antenna-protocol 0.1.1

SansIO core driving the antenna P2P mesh protocol.
Documentation
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use crate::{
        HandshakeInput, HandshakeOutput, Input, MAX_RECONNECT_ATTEMPTS, MeshNodeFSM, MsgPayload,
        Output, PeerID, RelayPayload, Scheduled,
        test::{drive_bootstrap_handshake, join_mesh},
    };

    fn connected_pair() -> (MeshNodeFSM, MeshNodeFSM) {
        let mut a = MeshNodeFSM::new();
        let mut b = MeshNodeFSM::new();
        drive_bootstrap_handshake::<()>(&mut a, &mut b);
        (a, b)
    }

    fn connected_triple() -> HashMap<PeerID, MeshNodeFSM> {
        let mut peers = HashMap::new();
        let mut a = MeshNodeFSM::new();
        let a_id = a.id().clone();
        let mut b = MeshNodeFSM::new();
        let b_id = b.id().clone();
        drive_bootstrap_handshake::<()>(&mut a, &mut b);
        peers.insert(a_id, a);
        peers.insert(b_id.clone(), b);
        let c = MeshNodeFSM::new();
        let c_id = c.id().clone();
        peers.insert(c_id.clone(), c);
        join_mesh(&c_id, &b_id, &mut peers);
        peers
    }

    #[test]
    fn abrupt_disconnect_schedules_reconnect_timer() {
        let (mut a, b) = connected_pair();
        let b_id = b.id().clone();

        let out = a
            .process::<()>(Input::Handshake {
                from: b_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        assert!(
            out.iter().any(|o| matches!(
                o,
                Output::ScheduleTimer { kind: Scheduled::ReconnectAttempt { peer }, .. }
                if peer == &b_id
            )),
            "abrupt disconnect must schedule a reconnect timer"
        );
    }

    #[test]
    fn graceful_disconnect_does_not_schedule_reconnect() {
        let (mut a, b) = connected_pair();
        let b_id = b.id().clone();

        let out = a
            .process::<()>(Input::MessageReceived {
                peer_from: b_id.clone(),
                data: MsgPayload::Disconnect,
            })
            .unwrap();

        assert!(
            !out.iter()
                .any(|o| matches!(o, Output::ScheduleTimer { .. })),
            "graceful disconnect must not schedule reconnect"
        );
    }

    #[test]
    fn peer_leaving_input_does_not_schedule_reconnect() {
        let (mut a, b) = connected_pair();
        let b_id = b.id().clone();

        let out = a
            .process::<()>(Input::PeerLeaving { peer: b_id.clone() })
            .unwrap();

        assert!(
            !out.iter()
                .any(|o| matches!(o, Output::ScheduleTimer { .. })),
            "PeerLeaving must not schedule reconnect"
        );
    }

    #[test]
    fn stale_tick_after_leave_is_noop() {
        let (mut a, b) = connected_pair();
        let b_id = b.id().clone();

        let _ = a
            .process::<()>(Input::Handshake {
                from: b_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        let _ = a.process::<()>(Input::Leave).unwrap();

        let out = a
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();

        assert!(
            out.is_empty(),
            "stale tick after leave must produce no outputs"
        );
    }

    #[test]
    fn tick_with_no_relays_gives_up_silently() {
        let (mut a, b) = connected_pair();
        let b_id = b.id().clone();

        let _ = a
            .process::<()>(Input::Handshake {
                from: b_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        let out = a
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();

        assert!(
            !out.iter()
                .any(|o| matches!(o, Output::ScheduleTimer { .. })),
            "no further reconnect timer should fire once we give up",
        );
        assert!(
            !out.iter().any(|o| matches!(o, Output::SendMessage { .. })),
            "no relay introduction can be sent without a connected relay peer",
        );

        let stale = a
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();
        assert!(stale.is_empty(), "stale tick after give-up must be a no-op");
    }

    #[test]
    fn lower_id_initiates_as_host_via_relay() {
        let mut peers = connected_triple();
        let mut sorted_ids: Vec<PeerID> = peers.keys().cloned().collect();
        sorted_ids.sort();
        let a_id = sorted_ids[0].clone();
        let b_id = sorted_ids[1].clone();
        let c_id = sorted_ids[2].clone();

        let _ = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::Handshake {
                from: b_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        let out = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();

        let relay_intro_sent = out.iter().any(|o| {
            matches!(
                o,
                Output::SendMessage {
                    peer_to,
                    data: MsgPayload::RelaySignalingTo { dst, data: RelayPayload::InitConnect(_) },
                } if peer_to == &c_id && dst == &b_id
            )
        });
        assert!(
            relay_intro_sent,
            "lower-id peer must send RelaySignalingTo via relay with InitConnect targeting the lost peer",
        );

        assert!(
            out.iter().any(|o| matches!(
                o,
                Output::Handshake { peer, event: HandshakeOutput::InitSDPOffer }
                if peer == &b_id
            )),
            "lower-id peer must locally drive its host handshake to InitSDPOffer",
        );

        assert!(
            out.iter().any(|o| matches!(
                o,
                Output::ScheduleTimer { kind: Scheduled::ReconnectAttempt { peer }, .. }
                if peer == &b_id
            )),
            "next reconnect tick must be babysat",
        );
    }

    #[test]
    fn higher_id_initiates_as_joiner_via_relay() {
        let mut peers = connected_triple();
        let mut sorted_ids: Vec<PeerID> = peers.keys().cloned().collect();
        sorted_ids.sort();
        let a_id = sorted_ids[0].clone();
        let b_id = sorted_ids[1].clone();
        let c_id = sorted_ids[2].clone();

        let _ = peers
            .get_mut(&b_id)
            .unwrap()
            .process::<()>(Input::Handshake {
                from: a_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        let out = peers
            .get_mut(&b_id)
            .unwrap()
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: a_id.clone() },
            })
            .unwrap();

        let relay_intro_sent = out.iter().any(|o| {
            matches!(
                o,
                Output::SendMessage {
                    peer_to,
                    data: MsgPayload::RelaySignalingTo { dst, data: RelayPayload::InitConnect(_) },
                } if peer_to == &c_id && dst == &a_id
            )
        });
        assert!(
            relay_intro_sent,
            "higher-id peer must send RelaySignalingTo via relay with InitConnect targeting the lost peer",
        );

        assert!(
            !out.iter().any(|o| matches!(
                o,
                Output::Handshake {
                    event: HandshakeOutput::InitSDPOffer,
                    ..
                }
            )),
            "higher-id (joiner role) peer must not start its own offer; it waits for the host",
        );
    }

    #[test]
    fn ticks_short_circuit_while_handshake_in_flight() {
        let mut peers = connected_triple();
        let mut sorted_ids: Vec<PeerID> = peers.keys().cloned().collect();
        sorted_ids.sort();
        let a_id = sorted_ids[0].clone();
        let b_id = sorted_ids[1].clone();

        let _ = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::Handshake {
                from: b_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        let _ = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();

        for _ in 0..(MAX_RECONNECT_ATTEMPTS + 5) {
            let out = peers
                .get_mut(&a_id)
                .unwrap()
                .process::<()>(Input::TimerFired {
                    kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
                })
                .unwrap();
            assert!(
                out.is_empty(),
                "tick during in-flight handshake must be a no-op",
            );
        }
    }

    #[test]
    fn exhausted_attempts_clear_state_and_stop_scheduling() {
        let mut peers = connected_triple();
        let mut sorted_ids: Vec<PeerID> = peers.keys().cloned().collect();
        sorted_ids.sort();
        let a_id = sorted_ids[0].clone();
        let b_id = sorted_ids[1].clone();

        let _ = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::Handshake {
                from: b_id.clone(),
                event: HandshakeInput::ConnectionDropped,
            })
            .unwrap();

        for _ in 0..MAX_RECONNECT_ATTEMPTS {
            let _ = peers
                .get_mut(&a_id)
                .unwrap()
                .process::<()>(Input::TimerFired {
                    kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
                })
                .unwrap();
            let _ = peers
                .get_mut(&a_id)
                .unwrap()
                .process::<()>(Input::Handshake {
                    from: b_id.clone(),
                    event: HandshakeInput::ConnectionDropped,
                })
                .unwrap();
        }

        let out = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();
        assert!(
            !out.iter()
                .any(|o| matches!(o, Output::ScheduleTimer { .. })),
            "no further reconnect timer should be scheduled after exhaustion",
        );
        assert!(
            !out.iter().any(|o| matches!(o, Output::SendMessage { .. })),
            "no relay introduction should be sent on the exhausting tick",
        );

        let stale = peers
            .get_mut(&a_id)
            .unwrap()
            .process::<()>(Input::TimerFired {
                kind: Scheduled::ReconnectAttempt { peer: b_id.clone() },
            })
            .unwrap();
        assert!(stale.is_empty(), "post-exhaustion ticks must be no-ops");
    }
}