Skip to main content

// pallas_network2/behavior/initiator/mod.rs

1use futures::{Stream, StreamExt, stream::FusedStream};
2use std::{collections::HashMap, task::Poll};
3
4use crate::{
5    Behavior, BehaviorOutput, Message as MessageTrait, OutboundQueue, PeerId, protocol as proto,
6};
7
8use super::{AcceptedVersion, AnyMessage, BlockRange, ConnectionState};
9
10mod blockfetch;
11mod chainsync;
12mod connection;
13mod discovery;
14mod handshake;
15mod keepalive;
16mod promotion;
17
18pub use blockfetch::*;
19pub use chainsync::*;
20pub use connection::*;
21pub use discovery::*;
22pub use handshake::*;
23pub use keepalive::*;
24pub use promotion::*;
25
26/// A visitor trait that allows sub-behaviors to react to peer lifecycle events.
27///
28/// Each method is called by the initiator behavior at the appropriate point
29/// in a peer's lifecycle. Default implementations are no-ops.
30pub trait PeerVisitor {
31    /// Called when a TCP connection to the peer is established.
32    #[allow(unused_variables)]
33    fn visit_connected(
34        &mut self,
35        pid: &PeerId,
36        state: &mut InitiatorState,
37        outbound: &mut OutboundQueue<InitiatorBehavior>,
38    ) {
39        // default implementation does nothing
40    }
41
42    /// Called when the peer has been disconnected.
43    #[allow(unused_variables)]
44    fn visit_disconnected(
45        &mut self,
46        pid: &PeerId,
47        state: &mut InitiatorState,
48        outbound: &mut OutboundQueue<InitiatorBehavior>,
49    ) {
50        // default implementation does nothing
51    }
52
53    /// Called when an error occurred on the peer's connection.
54    #[allow(unused_variables)]
55    fn visit_errored(
56        &mut self,
57        pid: &PeerId,
58        state: &mut InitiatorState,
59        outbound: &mut OutboundQueue<InitiatorBehavior>,
60    ) {
61        // default implementation does nothing
62    }
63
64    /// Called when a new peer has been discovered.
65    #[allow(unused_variables)]
66    fn visit_discovered(
67        &mut self,
68        pid: &PeerId,
69        state: &mut InitiatorState,
70        outbound: &mut OutboundQueue<InitiatorBehavior>,
71    ) {
72        // default implementation does nothing
73    }
74
75    /// Called when a message has been received from the peer.
76    #[allow(unused_variables)]
77    fn visit_inbound_msg(
78        &mut self,
79        pid: &PeerId,
80        state: &mut InitiatorState,
81        outbound: &mut OutboundQueue<InitiatorBehavior>,
82    ) {
83        // default implementation does nothing
84    }
85
86    /// Called after a message has been sent to the peer.
87    #[allow(unused_variables)]
88    fn visit_outbound_msg(
89        &mut self,
90        pid: &PeerId,
91        state: &mut InitiatorState,
92        outbound: &mut OutboundQueue<InitiatorBehavior>,
93    ) {
94        // default implementation does nothing
95    }
96
97    /// Called when a peer's state has been modified by a tag function.
98    #[allow(unused_variables)]
99    fn visit_tagged(
100        &mut self,
101        pid: &PeerId,
102        state: &mut InitiatorState,
103        outbound: &mut OutboundQueue<InitiatorBehavior>,
104    ) {
105        // default implementation does nothing
106    }
107
108    /// Called during periodic housekeeping for each tracked peer.
109    #[allow(unused_variables)]
110    fn visit_housekeeping(
111        &mut self,
112        pid: &PeerId,
113        state: &mut InitiatorState,
114        outbound: &mut OutboundQueue<InitiatorBehavior>,
115    ) {
116        // default implementation does nothing
117    }
118}
119
/// The promotion level of a peer, controlling which mini-protocols are active.
///
/// The enum is field-less, so equality is total: besides `PartialEq` it also
/// derives `Eq` and `Hash`, which lets tags be used as set members or map keys.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PromotionTag {
    /// Peer is known but not connected. This is the default level.
    #[default]
    Cold,
    /// Peer is connected and performing basic protocols (handshake, keepalive).
    Warm,
    /// Peer is fully active with all mini-protocols.
    Hot,
    /// Peer has been banned and will not be connected.
    Banned,
}
133
134/// The per-peer state tracked by the initiator behavior, including connection
135/// status and all mini-protocol state machines.
136#[derive(Default, Debug)]
137pub struct InitiatorState {
138    pub(crate) connection: ConnectionState,
139    pub(crate) promotion: PromotionTag,
140    pub(crate) handshake: proto::handshake::State<proto::handshake::n2n::VersionData>,
141    pub(crate) keepalive: proto::keepalive::State,
142    pub(crate) peersharing: proto::peersharing::State,
143    pub(crate) blockfetch: proto::blockfetch::State,
144    pub(crate) chainsync: proto::chainsync::State<proto::chainsync::HeaderContent>,
145    pub(crate) tx_submission: proto::txsubmission::State,
146    pub(crate) violation: bool,
147    pub(crate) error_count: u32,
148    pub(crate) continue_sync: bool,
149}
150
151impl InitiatorState {
152    /// Creates a new initiator state with default values for all protocols.
153    pub fn new() -> Self {
154        InitiatorState {
155            connection: ConnectionState::default(),
156            promotion: PromotionTag::default(),
157            handshake: proto::handshake::State::default(),
158            keepalive: proto::keepalive::State::default(),
159            peersharing: proto::peersharing::State::default(),
160            blockfetch: proto::blockfetch::State::default(),
161            chainsync: crate::protocol::chainsync::State::default(),
162            tx_submission: crate::protocol::txsubmission::State::default(),
163            violation: false,
164            error_count: 0,
165            continue_sync: false,
166        }
167    }
168
169    /// Returns true if the handshake has completed and mini-protocols are active.
170    pub fn is_initialized(&self) -> bool {
171        matches!(self.connection, ConnectionState::Initialized)
172    }
173
174    /// Returns the accepted version data if the handshake completed successfully.
175    pub fn version(&self) -> Option<proto::handshake::n2n::VersionData> {
176        match &self.handshake {
177            proto::handshake::State::Done(proto::handshake::DoneState::Accepted(_, data)) => {
178                Some(data.clone())
179            }
180            _ => None,
181        }
182    }
183
184    /// Returns the current promotion level of this peer.
185    pub fn promotion(&self) -> PromotionTag {
186        self.promotion
187    }
188
189    /// Returns true if the negotiated version supports peer sharing.
190    pub fn supports_peer_sharing(&self) -> bool {
191        let val = self
192            .version()
193            .as_ref()
194            .and_then(|v| v.peer_sharing)
195            .unwrap_or(0);
196
197        val > 0
198    }
199
200    /// Applies a message to the corresponding mini-protocol state machine.
201    pub fn apply_msg(&mut self, msg: &AnyMessage) {
202        match msg {
203            AnyMessage::Handshake(msg) => {
204                let result = self.handshake.apply(msg);
205
206                let Ok(new) = result else {
207                    tracing::warn!("handshake violation");
208                    self.violation = true;
209                    return;
210                };
211
212                self.handshake = new;
213            }
214            AnyMessage::KeepAlive(msg) => {
215                let result = self.keepalive.apply(msg);
216
217                let Ok(new) = result else {
218                    tracing::warn!("keepalive violation");
219                    self.violation = true;
220                    return;
221                };
222
223                self.keepalive = new;
224            }
225            AnyMessage::PeerSharing(msg) => {
226                let result = self.peersharing.apply(msg);
227
228                let Ok(new) = result else {
229                    tracing::warn!("peer sharing violation");
230                    self.violation = true;
231                    return;
232                };
233
234                self.peersharing = new;
235            }
236            AnyMessage::BlockFetch(msg) => {
237                let result = self.blockfetch.apply(msg);
238
239                let Ok(new) = result else {
240                    tracing::warn!("block fetch violation");
241                    self.violation = true;
242                    return;
243                };
244
245                self.blockfetch = new;
246            }
247            AnyMessage::ChainSync(msg) => {
248                let result = self.chainsync.apply(msg);
249
250                let Ok(new) = result else {
251                    tracing::warn!("chain sync violation");
252                    self.violation = true;
253                    return;
254                };
255
256                self.chainsync = new;
257            }
258            AnyMessage::TxSubmission(msg) => {
259                let result = self.tx_submission.apply(msg);
260
261                let Ok(new) = result else {
262                    tracing::warn!("tx submission violation");
263                    self.violation = true;
264                    return;
265                };
266
267                self.tx_submission = new;
268            }
269        }
270    }
271
272    /// Resets the state back to its initial state, except for error count
273    pub fn reset(&mut self) {
274        self.connection = ConnectionState::default();
275        self.promotion = PromotionTag::default();
276        self.handshake = proto::handshake::State::default();
277        self.keepalive = proto::keepalive::State::default();
278        self.peersharing = proto::peersharing::State::default();
279        self.blockfetch = proto::blockfetch::State::default();
280        self.chainsync = proto::chainsync::State::default();
281        self.tx_submission = proto::txsubmission::State::default();
282        self.continue_sync = false;
283        self.violation = false;
284    }
285}
286
287/// A function that mutates an [`InitiatorState`], used for tagging operations
288/// like banning or demoting peers.
289pub type TagFn = fn(&mut InitiatorState);
290
291/// Commands that can be sent to the initiator behavior from external code.
292#[derive(Debug)]
293pub enum InitiatorCommand {
294    /// Add a new peer to be tracked and potentially connected.
295    IncludePeer(PeerId),
296    /// Trigger periodic housekeeping (peer promotion, discovery, etc.).
297    Housekeeping,
298    /// Begin chain synchronization from the given known points.
299    StartSync(Vec<proto::Point>),
300    /// Resume chain synchronization for a specific peer.
301    ContinueSync(PeerId),
302    /// Request a range of blocks to be fetched.
303    RequestBlocks(BlockRange),
304    /// Submit a transaction to a specific peer.
305    SendTx(
306        PeerId,
307        proto::txsubmission::EraTxId,
308        proto::txsubmission::EraTxBody,
309    ),
310    /// Ban a peer, preventing future connections.
311    BanPeer(PeerId),
312    /// Demote a peer back to cold status.
313    DemotePeer(PeerId),
314}
315
316/// Events emitted by the initiator behavior to external consumers.
317#[derive(Debug)]
318pub enum InitiatorEvent {
319    /// A peer completed the handshake and is ready for mini-protocols.
320    PeerInitialized(PeerId, AcceptedVersion),
321    /// An intersection point was found during chain-sync.
322    IntersectionFound(PeerId, proto::Point, proto::chainsync::Tip),
323    /// A new block header was received via chain-sync.
324    BlockHeaderReceived(
325        PeerId,
326        proto::chainsync::HeaderContent,
327        proto::chainsync::Tip,
328    ),
329    /// A rollback was received via chain-sync.
330    RollbackReceived(PeerId, proto::Point, proto::chainsync::Tip),
331    /// A block body was received via block-fetch.
332    BlockBodyReceived(PeerId, proto::blockfetch::Body),
333    /// The remote peer requested a transaction via tx-submission.
334    TxRequested(PeerId, proto::txsubmission::EraTxId),
335}
336
337/// The main initiator behavior that orchestrates outbound Cardano connections.
338///
339/// Manages peer lifecycle (discovery, connection, promotion) and coordinates
340/// all mini-protocol sub-behaviors (handshake, keepalive, chain-sync,
341/// block-fetch, peer-sharing, discovery).
342#[derive(Default)]
343pub struct InitiatorBehavior {
344    pub promotion: promotion::PromotionBehavior,
345    pub connection: connection::ConnectionBehavior,
346    pub handshake: handshake::HandshakeBehavior,
347    pub keepalive: keepalive::KeepaliveBehavior,
348    pub discovery: discovery::DiscoveryBehavior,
349    pub blockfetch: blockfetch::BlockFetchBehavior,
350    pub chainsync: chainsync::ChainSyncBehavior,
351    pub peers: HashMap<PeerId, InitiatorState>,
352    pub outbound: OutboundQueue<Self>,
353}
354
// Invokes `$method` on every sub-behavior field of `$self`, passing the peer
// id, the peer state and `$self.outbound`. The field list below fixes the call
// order: promotion runs first, chainsync last.
macro_rules! all_visitors {
    ($self:ident, $pid:ident, $state:expr, $method:ident) => {
        all_visitors!(
            @each $self, $pid, $state, $method;
            promotion, connection, handshake, keepalive, discovery, blockfetch, chainsync
        );
    };
    (@each $self:ident, $pid:ident, $state:expr, $method:ident; $($field:ident),+) => {
        $(
            $self.$field.$method($pid, $state, &mut $self.outbound);
        )+
    };
}
366
367impl InitiatorBehavior {
368    #[tracing::instrument(skip_all, fields(pid = %pid, channel = %msg.channel()))]
369    /// Processes an inbound message from a peer, updating state and notifying visitors.
370    pub fn on_inbound_msg(&mut self, pid: &PeerId, msg: &AnyMessage) {
371        tracing::debug!(channel = msg.channel(), "new inbound message");
372
373        self.peers.entry(pid.clone()).and_modify(|state| {
374            state.apply_msg(msg);
375
376            all_visitors!(self, pid, state, visit_inbound_msg);
377        });
378    }
379
380    #[tracing::instrument(skip_all, fields(pid = %pid, channel = %msg.channel()))]
381    /// Processes a confirmed outbound message to a peer, updating state and notifying visitors.
382    pub fn on_outbound_msg(&mut self, pid: &PeerId, msg: &AnyMessage) {
383        tracing::debug!(channel = msg.channel(), "new outbound message");
384
385        self.peers.entry(pid.clone()).and_modify(|state| {
386            state.apply_msg(msg);
387
388            all_visitors!(self, pid, state, visit_outbound_msg);
389        });
390    }
391
392    #[tracing::instrument(skip_all, fields(pid = %pid))]
393    fn on_connected(&mut self, pid: &PeerId) {
394        tracing::info!("connected");
395
396        self.peers.entry(pid.clone()).and_modify(|state| {
397            state.connection = ConnectionState::Connected;
398
399            all_visitors!(self, pid, state, visit_connected);
400        });
401    }
402
403    #[tracing::instrument(skip_all, fields(pid = %pid))]
404    fn on_disconnected(&mut self, pid: &PeerId) {
405        tracing::info!("disconnected");
406
407        self.peers.entry(pid.clone()).and_modify(|state| {
408            state.connection = ConnectionState::Disconnected;
409            state.reset();
410
411            all_visitors!(self, pid, state, visit_disconnected);
412        });
413    }
414
415    #[tracing::instrument(skip_all, fields(pid = %pid))]
416    fn on_errored(&mut self, pid: &PeerId) {
417        tracing::error!("error");
418
419        self.peers.entry(pid.clone()).and_modify(|state| {
420            state.connection = ConnectionState::Errored;
421            state.error_count += 1;
422
423            all_visitors!(self, pid, state, visit_errored);
424        });
425    }
426
427    #[tracing::instrument(skip_all, fields(pid = %pid))]
428    fn on_tagged(&mut self, pid: &PeerId, tagger: TagFn) {
429        tracing::debug!("tagged");
430
431        self.peers.entry(pid.clone()).and_modify(|state| {
432            tagger(state);
433
434            all_visitors!(self, pid, state, visit_tagged);
435        });
436    }
437
438    #[tracing::instrument(skip_all, fields(pid = %pid))]
439    fn on_discovered(&mut self, pid: &PeerId) {
440        let mut state = InitiatorState::new();
441
442        all_visitors!(self, pid, &mut state, visit_discovered);
443
444        self.peers.insert(pid.clone(), state);
445    }
446
447    fn move_discovered_into_promotion(&mut self) {
448        let deficit = self.promotion.peer_deficit();
449
450        if deficit == 0 {
451            return;
452        }
453
454        let new = self.discovery.drain_new_peers(deficit);
455
456        if new.is_empty() {
457            tracing::trace!("no new peers discovered");
458            return;
459        }
460
461        tracing::info!(deficit = deficit, new = new.len(), "discovered new peers",);
462
463        for pid in new {
464            if !self.peers.contains_key(&pid) {
465                self.on_discovered(&pid);
466            }
467        }
468    }
469
470    #[tracing::instrument(skip_all)]
471    fn housekeeping(&mut self) {
472        for (pid, state) in self.peers.iter_mut() {
473            all_visitors!(self, pid, state, visit_housekeeping);
474        }
475
476        self.move_discovered_into_promotion();
477    }
478}
479
480impl Stream for InitiatorBehavior {
481    type Item = BehaviorOutput<Self>;
482
483    fn poll_next(
484        mut self: std::pin::Pin<&mut Self>,
485        cx: &mut std::task::Context<'_>,
486    ) -> std::task::Poll<Option<Self::Item>> {
487        let poll = self.outbound.futures.poll_next_unpin(cx);
488
489        match poll {
490            Poll::Ready(Some(x)) => Poll::Ready(Some(x)),
491            Poll::Ready(None) => Poll::Pending,
492            Poll::Pending => Poll::Pending,
493        }
494    }
495}
496
497impl FusedStream for InitiatorBehavior {
498    fn is_terminated(&self) -> bool {
499        false
500    }
501}
502
503impl Behavior for InitiatorBehavior {
504    type Event = InitiatorEvent;
505    type Command = InitiatorCommand;
506    type PeerState = InitiatorState;
507    type Message = AnyMessage;
508
509    fn handle_io(&mut self, event: crate::InterfaceEvent<Self::Message>) {
510        match &event {
511            crate::InterfaceEvent::Connected(pid) => {
512                self.on_connected(pid);
513            }
514            crate::InterfaceEvent::Disconnected(pid) => {
515                self.on_disconnected(pid);
516            }
517            crate::InterfaceEvent::Recv(pid, msgs) => {
518                for msg in msgs {
519                    self.on_inbound_msg(pid, msg);
520                }
521            }
522            crate::InterfaceEvent::Sent(pid, msg) => {
523                self.on_outbound_msg(pid, msg);
524            }
525            crate::InterfaceEvent::Error(pid, _) => {
526                self.on_errored(pid);
527            }
528            crate::InterfaceEvent::Idle => {
529                self.housekeeping();
530            }
531        }
532    }
533
534    fn execute(&mut self, cmd: Self::Command) {
535        match cmd {
536            InitiatorCommand::IncludePeer(pid) => {
537                tracing::debug!("include peer command");
538                self.on_discovered(&pid);
539            }
540            InitiatorCommand::StartSync(points) => {
541                tracing::debug!("start sync command");
542                self.chainsync.start(points);
543            }
544            InitiatorCommand::ContinueSync(pid) => {
545                tracing::debug!("continue sync command");
546                self.on_tagged(&pid, |state| state.continue_sync = true);
547            }
548            InitiatorCommand::RequestBlocks(range) => {
549                tracing::debug!("request blocks command");
550                self.blockfetch.enqueue(range);
551            }
552            InitiatorCommand::Housekeeping => {
553                tracing::debug!("housekeeping command");
554                self.housekeeping();
555            }
556            InitiatorCommand::BanPeer(pid) => {
557                tracing::debug!("ban peer command");
558                self.on_tagged(&pid, |state| state.promotion = PromotionTag::Banned);
559            }
560            InitiatorCommand::DemotePeer(pid) => {
561                tracing::debug!("demote peer command");
562                self.on_tagged(&pid, |state| state.promotion = PromotionTag::Cold);
563            }
564            InitiatorCommand::SendTx(..) => {
565                tracing::warn!("SendTx not yet implemented");
566            }
567        }
568    }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{
        MAINNET_MAGIC, Point, blockfetch as bf, chainsync as cs, handshake, keepalive, peersharing,
    };
    use crate::testing::BehaviorOutputExt;
    use crate::{InterfaceError, InterfaceEvent};
    use futures::StreamExt;
    use std::collections::HashMap;
    use std::net::Ipv4Addr;

    type Outputs = Vec<BehaviorOutput<InitiatorBehavior>>;

    /// Polls the behavior with a no-op waker until it stops yielding items and
    /// returns everything it produced.
    fn drain_outputs(behavior: &mut InitiatorBehavior) -> Outputs {
        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);

        std::iter::from_fn(|| match behavior.poll_next_unpin(&mut cx) {
            std::task::Poll::Ready(Some(output)) => Some(output),
            _ => None,
        })
        .collect()
    }

    /// Executes one housekeeping command and returns the outputs it caused.
    fn housekeep(behavior: &mut InitiatorBehavior) -> Outputs {
        behavior.execute(InitiatorCommand::Housekeeping);
        drain_outputs(behavior)
    }

    /// Includes the peer, then runs one housekeeping pass.
    fn include_peer(behavior: &mut InitiatorBehavior, pid: &PeerId) -> Outputs {
        behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
        housekeep(behavior)
    }

    /// Reports the peer as connected and returns the resulting outputs.
    fn connect(behavior: &mut InitiatorBehavior, pid: &PeerId) -> Outputs {
        behavior.handle_io(InterfaceEvent::Connected(pid.clone()));
        drain_outputs(behavior)
    }

    /// Reports the peer as disconnected and returns the resulting outputs.
    fn disconnect(behavior: &mut InitiatorBehavior, pid: &PeerId) -> Outputs {
        behavior.handle_io(InterfaceEvent::Disconnected(pid.clone()));
        drain_outputs(behavior)
    }

    /// Reports an interface error for the peer, then runs one housekeeping pass.
    fn fail(behavior: &mut InitiatorBehavior, pid: &PeerId) -> Outputs {
        behavior.handle_io(InterfaceEvent::Error(
            pid.clone(),
            InterfaceError::Other("err".into()),
        ));
        housekeep(behavior)
    }

    /// Plays a full handshake: a sent `Propose` followed by a received `Accept`
    /// for version 13.
    fn complete_handshake(behavior: &mut InitiatorBehavior, pid: &PeerId) {
        let data = handshake::n2n::VersionData::new(MAINNET_MAGIC, false, Some(1), Some(false));
        let table = handshake::VersionTable {
            values: HashMap::from([(13u64, data.clone())]),
        };

        let proposal = AnyMessage::Handshake(handshake::Message::Propose(table));
        behavior.handle_io(InterfaceEvent::Sent(pid.clone(), proposal));
        drain_outputs(behavior);

        let acceptance = AnyMessage::Handshake(handshake::Message::Accept(13, data));
        behavior.handle_io(InterfaceEvent::Recv(pid.clone(), vec![acceptance]));
        drain_outputs(behavior);
    }

    // ---- Cross-cutting tests ----

    #[tokio::test]
    async fn banned_peer_not_reconnected() {
        // Chain under test: violation flag → promotion ban → connection guard
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let pid = PeerId::test(1);

        include_peer(&mut behavior, &pid);
        connect(&mut behavior, &pid);

        // A keep-alive response nobody asked for.
        let unsolicited = AnyMessage::KeepAlive(keepalive::Message::ResponseKeepAlive(42));
        behavior.handle_io(InterfaceEvent::Recv(pid.clone(), vec![unsolicited]));
        housekeep(&mut behavior);

        disconnect(&mut behavior, &pid);

        for _ in 0..10 {
            let outputs = housekeep(&mut behavior);
            assert!(!outputs.has_connect_for(&pid));
        }
    }

    #[tokio::test]
    async fn demote_peer_returns_to_cold() {
        // Chain under test: handshake → promotion hot → demote tag → disconnect
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let pid = PeerId::test(2);

        include_peer(&mut behavior, &pid);
        connect(&mut behavior, &pid);
        complete_handshake(&mut behavior, &pid);

        housekeep(&mut behavior);
        assert!(behavior.promotion.hot_peers.contains(&pid));

        behavior.execute(InitiatorCommand::DemotePeer(pid.clone()));
        drain_outputs(&mut behavior);

        let demoted = behavior.peers.get(&pid).unwrap();
        assert_eq!(demoted.promotion, PromotionTag::Cold);

        let outputs = housekeep(&mut behavior);
        assert!(outputs.has_disconnect_for(&pid));
    }

    #[tokio::test]
    async fn error_count_persists_across_disconnect() {
        // Chain under test: on_errored increments → on_disconnected resets but
        // keeps error_count → promotion bans once the threshold is crossed
        tokio::time::pause();

        let config = PromotionConfig {
            max_error_count: 2,
            ..PromotionConfig::default()
        };
        let mut behavior = InitiatorBehavior {
            promotion: PromotionBehavior::new(config),
            ..Default::default()
        };
        let pid = PeerId::test(3);

        include_peer(&mut behavior, &pid);

        for _ in 0..2 {
            fail(&mut behavior, &pid);
            disconnect(&mut behavior, &pid);
        }

        assert!(!behavior.promotion.banned_peers.contains(&pid));

        fail(&mut behavior, &pid);

        assert!(behavior.promotion.banned_peers.contains(&pid));
    }

    // ---- Composition tests ----

    #[tokio::test]
    async fn full_peer_lifecycle_include_to_chainsync() {
        // Chain under test: promotion → connection → handshake → promotion
        // (warm→hot) → chainsync
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let pid = PeerId::test(10);

        // Chain-sync has to be started up front so hot peers get it initiated.
        behavior.execute(InitiatorCommand::StartSync(vec![Point::Origin]));

        // Including the peer and housekeeping once: cold→warm plus a connect.
        let outputs = include_peer(&mut behavior, &pid);

        assert!(behavior.promotion.warm_peers.contains(&pid));
        assert!(outputs.has_connect_for(&pid));

        // Once connected, the handshake proposal goes out.
        let outputs = connect(&mut behavior, &pid);
        assert!(
            outputs
                .has_send(|m| matches!(m, AnyMessage::Handshake(handshake::Message::Propose(_))))
        );

        // Finishing the handshake brings the peer to Initialized.
        complete_handshake(&mut behavior, &pid);

        // Next housekeeping: warm→hot, and chain-sync sends FindIntersect.
        let outputs = housekeep(&mut behavior);

        assert!(behavior.promotion.hot_peers.contains(&pid));
        assert!(
            outputs.has_send(|m| matches!(m, AnyMessage::ChainSync(cs::Message::FindIntersect(_)))),
            "chainsync should start for hot initialized peer"
        );
    }

    #[tokio::test]
    async fn housekeeping_promotes_and_connects_in_same_pass() {
        // Visitor ordering: promotion runs before connection in all_visitors!
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let pid = PeerId::test(11);

        // One include + one housekeeping pass must promote cold→warm AND
        // issue the Connect.
        let outputs = include_peer(&mut behavior, &pid);

        assert!(
            behavior.promotion.warm_peers.contains(&pid),
            "peer should be promoted to warm"
        );
        assert!(
            outputs.has_connect_for(&pid),
            "Connect should be issued in the same housekeeping pass"
        );
    }

    #[tokio::test]
    async fn discovery_feeds_into_promotion() {
        // Chain under test: discovery collects peers → housekeeping drains them
        // → they end up tracked
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let seed = PeerId::test(12);

        // Bring a seed peer (with peer-sharing enabled) all the way up.
        include_peer(&mut behavior, &seed);
        connect(&mut behavior, &seed);
        complete_handshake(&mut behavior, &seed);
        housekeep(&mut behavior);

        // The peer-sharing answer advertises two fresh addresses.
        let shared = AnyMessage::PeerSharing(peersharing::Message::SharePeers(vec![
            peersharing::PeerAddress::V4(Ipv4Addr::new(192, 168, 1, 1), 3000),
            peersharing::PeerAddress::V4(Ipv4Addr::new(192, 168, 1, 2), 3001),
        ]));

        // The seed's peer-sharing machine must have an outstanding request
        // before the answer is valid, so report the ShareRequest as sent first.
        let request = AnyMessage::PeerSharing(peersharing::Message::ShareRequest(10));
        behavior.handle_io(InterfaceEvent::Sent(seed.clone(), request));
        drain_outputs(&mut behavior);

        // Deliver the answer.
        behavior.handle_io(InterfaceEvent::Recv(seed.clone(), vec![shared]));
        drain_outputs(&mut behavior);

        // Housekeeping is what moves discovered peers into the peer table.
        housekeep(&mut behavior);

        let first = PeerId {
            host: "192.168.1.1".to_string(),
            port: 3000,
        };
        let second = PeerId {
            host: "192.168.1.2".to_string(),
            port: 3001,
        };

        assert!(
            behavior.peers.contains_key(&first),
            "discovered peer 1 should be tracked after housekeeping"
        );
        assert!(
            behavior.peers.contains_key(&second),
            "discovered peer 2 should be tracked after housekeeping"
        );
    }

    #[tokio::test]
    async fn violation_bans_and_disconnects() {
        // Chain under test: apply_msg raises violation → promotion bans →
        // connection disconnects
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let pid = PeerId::test(13);

        include_peer(&mut behavior, &pid);
        connect(&mut behavior, &pid);

        // Protocol violation: a keep-alive response without a request.
        let unsolicited = AnyMessage::KeepAlive(keepalive::Message::ResponseKeepAlive(42));
        behavior.handle_io(InterfaceEvent::Recv(pid.clone(), vec![unsolicited]));

        // A single housekeeping pass must ban (promotion) AND disconnect
        // (connection).
        let outputs = housekeep(&mut behavior);

        assert!(
            behavior.promotion.banned_peers.contains(&pid),
            "promotion should ban the violating peer"
        );
        assert!(
            outputs.has_disconnect_for(&pid),
            "connection should disconnect the banned peer"
        );
    }

    #[tokio::test]
    async fn blockfetch_requires_initialized_and_idle() {
        // Chain under test: handshake state gates block-fetch dispatch
        tokio::time::pause();

        let mut behavior = InitiatorBehavior::default();
        let pid = PeerId::test(14);

        let range = (Point::Origin, Point::new(100, vec![0xAA; 32]));
        behavior.blockfetch.enqueue(range.clone());

        // Include, promote to warm and connect, but do NOT handshake yet.
        include_peer(&mut behavior, &pid);
        connect(&mut behavior, &pid);

        // Connected but not Initialized: housekeeping must not dispatch.
        let outputs = housekeep(&mut behavior);
        assert!(
            !outputs
                .has_send(|m| matches!(m, AnyMessage::BlockFetch(bf::Message::RequestRange(_)))),
            "should NOT send RequestRange before handshake"
        );

        // Finishing the handshake brings the peer to Initialized.
        complete_handshake(&mut behavior, &pid);

        // Nothing is re-enqueued here: the test expects the range queued at
        // the top to still be pending, so this pass should dispatch it.
        let outputs = housekeep(&mut behavior);
        assert!(
            outputs.has_send(|m| matches!(m, AnyMessage::BlockFetch(bf::Message::RequestRange(_)))),
            "should send RequestRange after handshake completes"
        );
    }
}