1use futures::{Stream, StreamExt, stream::FusedStream};
2use std::{collections::HashMap, task::Poll};
3
4use crate::{
5 Behavior, BehaviorOutput, Message as MessageTrait, OutboundQueue, PeerId, protocol as proto,
6};
7
8use super::{AcceptedVersion, AnyMessage, BlockRange, ConnectionState};
9
10mod blockfetch;
11mod chainsync;
12mod connection;
13mod discovery;
14mod handshake;
15mod keepalive;
16mod promotion;
17
18pub use blockfetch::*;
19pub use chainsync::*;
20pub use connection::*;
21pub use discovery::*;
22pub use handshake::*;
23pub use keepalive::*;
24pub use promotion::*;
25
26pub trait PeerVisitor {
31 #[allow(unused_variables)]
33 fn visit_connected(
34 &mut self,
35 pid: &PeerId,
36 state: &mut InitiatorState,
37 outbound: &mut OutboundQueue<InitiatorBehavior>,
38 ) {
39 }
41
42 #[allow(unused_variables)]
44 fn visit_disconnected(
45 &mut self,
46 pid: &PeerId,
47 state: &mut InitiatorState,
48 outbound: &mut OutboundQueue<InitiatorBehavior>,
49 ) {
50 }
52
53 #[allow(unused_variables)]
55 fn visit_errored(
56 &mut self,
57 pid: &PeerId,
58 state: &mut InitiatorState,
59 outbound: &mut OutboundQueue<InitiatorBehavior>,
60 ) {
61 }
63
64 #[allow(unused_variables)]
66 fn visit_discovered(
67 &mut self,
68 pid: &PeerId,
69 state: &mut InitiatorState,
70 outbound: &mut OutboundQueue<InitiatorBehavior>,
71 ) {
72 }
74
75 #[allow(unused_variables)]
77 fn visit_inbound_msg(
78 &mut self,
79 pid: &PeerId,
80 state: &mut InitiatorState,
81 outbound: &mut OutboundQueue<InitiatorBehavior>,
82 ) {
83 }
85
86 #[allow(unused_variables)]
88 fn visit_outbound_msg(
89 &mut self,
90 pid: &PeerId,
91 state: &mut InitiatorState,
92 outbound: &mut OutboundQueue<InitiatorBehavior>,
93 ) {
94 }
96
97 #[allow(unused_variables)]
99 fn visit_tagged(
100 &mut self,
101 pid: &PeerId,
102 state: &mut InitiatorState,
103 outbound: &mut OutboundQueue<InitiatorBehavior>,
104 ) {
105 }
107
108 #[allow(unused_variables)]
110 fn visit_housekeeping(
111 &mut self,
112 pid: &PeerId,
113 state: &mut InitiatorState,
114 outbound: &mut OutboundQueue<InitiatorBehavior>,
115 ) {
116 }
118}
119
120#[derive(PartialEq, Debug, Default, Copy, Clone)]
122pub enum PromotionTag {
123 #[default]
125 Cold,
126 Warm,
128 Hot,
130 Banned,
132}
133
134#[derive(Default, Debug)]
137pub struct InitiatorState {
138 pub(crate) connection: ConnectionState,
139 pub(crate) promotion: PromotionTag,
140 pub(crate) handshake: proto::handshake::State<proto::handshake::n2n::VersionData>,
141 pub(crate) keepalive: proto::keepalive::State,
142 pub(crate) peersharing: proto::peersharing::State,
143 pub(crate) blockfetch: proto::blockfetch::State,
144 pub(crate) chainsync: proto::chainsync::State<proto::chainsync::HeaderContent>,
145 pub(crate) tx_submission: proto::txsubmission::State,
146 pub(crate) violation: bool,
147 pub(crate) error_count: u32,
148 pub(crate) continue_sync: bool,
149}
150
151impl InitiatorState {
152 pub fn new() -> Self {
154 InitiatorState {
155 connection: ConnectionState::default(),
156 promotion: PromotionTag::default(),
157 handshake: proto::handshake::State::default(),
158 keepalive: proto::keepalive::State::default(),
159 peersharing: proto::peersharing::State::default(),
160 blockfetch: proto::blockfetch::State::default(),
161 chainsync: crate::protocol::chainsync::State::default(),
162 tx_submission: crate::protocol::txsubmission::State::default(),
163 violation: false,
164 error_count: 0,
165 continue_sync: false,
166 }
167 }
168
169 pub fn is_initialized(&self) -> bool {
171 matches!(self.connection, ConnectionState::Initialized)
172 }
173
174 pub fn version(&self) -> Option<proto::handshake::n2n::VersionData> {
176 match &self.handshake {
177 proto::handshake::State::Done(proto::handshake::DoneState::Accepted(_, data)) => {
178 Some(data.clone())
179 }
180 _ => None,
181 }
182 }
183
184 pub fn promotion(&self) -> PromotionTag {
186 self.promotion
187 }
188
189 pub fn supports_peer_sharing(&self) -> bool {
191 let val = self
192 .version()
193 .as_ref()
194 .and_then(|v| v.peer_sharing)
195 .unwrap_or(0);
196
197 val > 0
198 }
199
200 pub fn apply_msg(&mut self, msg: &AnyMessage) {
202 match msg {
203 AnyMessage::Handshake(msg) => {
204 let result = self.handshake.apply(msg);
205
206 let Ok(new) = result else {
207 tracing::warn!("handshake violation");
208 self.violation = true;
209 return;
210 };
211
212 self.handshake = new;
213 }
214 AnyMessage::KeepAlive(msg) => {
215 let result = self.keepalive.apply(msg);
216
217 let Ok(new) = result else {
218 tracing::warn!("keepalive violation");
219 self.violation = true;
220 return;
221 };
222
223 self.keepalive = new;
224 }
225 AnyMessage::PeerSharing(msg) => {
226 let result = self.peersharing.apply(msg);
227
228 let Ok(new) = result else {
229 tracing::warn!("peer sharing violation");
230 self.violation = true;
231 return;
232 };
233
234 self.peersharing = new;
235 }
236 AnyMessage::BlockFetch(msg) => {
237 let result = self.blockfetch.apply(msg);
238
239 let Ok(new) = result else {
240 tracing::warn!("block fetch violation");
241 self.violation = true;
242 return;
243 };
244
245 self.blockfetch = new;
246 }
247 AnyMessage::ChainSync(msg) => {
248 let result = self.chainsync.apply(msg);
249
250 let Ok(new) = result else {
251 tracing::warn!("chain sync violation");
252 self.violation = true;
253 return;
254 };
255
256 self.chainsync = new;
257 }
258 AnyMessage::TxSubmission(msg) => {
259 let result = self.tx_submission.apply(msg);
260
261 let Ok(new) = result else {
262 tracing::warn!("tx submission violation");
263 self.violation = true;
264 return;
265 };
266
267 self.tx_submission = new;
268 }
269 }
270 }
271
272 pub fn reset(&mut self) {
274 self.connection = ConnectionState::default();
275 self.promotion = PromotionTag::default();
276 self.handshake = proto::handshake::State::default();
277 self.keepalive = proto::keepalive::State::default();
278 self.peersharing = proto::peersharing::State::default();
279 self.blockfetch = proto::blockfetch::State::default();
280 self.chainsync = proto::chainsync::State::default();
281 self.tx_submission = proto::txsubmission::State::default();
282 self.continue_sync = false;
283 self.violation = false;
284 }
285}
286
287pub type TagFn = fn(&mut InitiatorState);
290
291#[derive(Debug)]
293pub enum InitiatorCommand {
294 IncludePeer(PeerId),
296 Housekeeping,
298 StartSync(Vec<proto::Point>),
300 ContinueSync(PeerId),
302 RequestBlocks(BlockRange),
304 SendTx(
306 PeerId,
307 proto::txsubmission::EraTxId,
308 proto::txsubmission::EraTxBody,
309 ),
310 BanPeer(PeerId),
312 DemotePeer(PeerId),
314}
315
316#[derive(Debug)]
318pub enum InitiatorEvent {
319 PeerInitialized(PeerId, AcceptedVersion),
321 IntersectionFound(PeerId, proto::Point, proto::chainsync::Tip),
323 BlockHeaderReceived(
325 PeerId,
326 proto::chainsync::HeaderContent,
327 proto::chainsync::Tip,
328 ),
329 RollbackReceived(PeerId, proto::Point, proto::chainsync::Tip),
331 BlockBodyReceived(PeerId, proto::blockfetch::Body),
333 TxRequested(PeerId, proto::txsubmission::EraTxId),
335}
336
337#[derive(Default)]
343pub struct InitiatorBehavior {
344 pub promotion: promotion::PromotionBehavior,
345 pub connection: connection::ConnectionBehavior,
346 pub handshake: handshake::HandshakeBehavior,
347 pub keepalive: keepalive::KeepaliveBehavior,
348 pub discovery: discovery::DiscoveryBehavior,
349 pub blockfetch: blockfetch::BlockFetchBehavior,
350 pub chainsync: chainsync::ChainSyncBehavior,
351 pub peers: HashMap<PeerId, InitiatorState>,
352 pub outbound: OutboundQueue<Self>,
353}
354
355macro_rules! all_visitors {
356 ($self:ident, $pid:ident, $state:expr, $method:ident) => {
357 $self.promotion.$method($pid, $state, &mut $self.outbound);
358 $self.connection.$method($pid, $state, &mut $self.outbound);
359 $self.handshake.$method($pid, $state, &mut $self.outbound);
360 $self.keepalive.$method($pid, $state, &mut $self.outbound);
361 $self.discovery.$method($pid, $state, &mut $self.outbound);
362 $self.blockfetch.$method($pid, $state, &mut $self.outbound);
363 $self.chainsync.$method($pid, $state, &mut $self.outbound);
364 };
365}
366
367impl InitiatorBehavior {
368 #[tracing::instrument(skip_all, fields(pid = %pid, channel = %msg.channel()))]
369 pub fn on_inbound_msg(&mut self, pid: &PeerId, msg: &AnyMessage) {
371 tracing::debug!(channel = msg.channel(), "new inbound message");
372
373 self.peers.entry(pid.clone()).and_modify(|state| {
374 state.apply_msg(msg);
375
376 all_visitors!(self, pid, state, visit_inbound_msg);
377 });
378 }
379
380 #[tracing::instrument(skip_all, fields(pid = %pid, channel = %msg.channel()))]
381 pub fn on_outbound_msg(&mut self, pid: &PeerId, msg: &AnyMessage) {
383 tracing::debug!(channel = msg.channel(), "new outbound message");
384
385 self.peers.entry(pid.clone()).and_modify(|state| {
386 state.apply_msg(msg);
387
388 all_visitors!(self, pid, state, visit_outbound_msg);
389 });
390 }
391
392 #[tracing::instrument(skip_all, fields(pid = %pid))]
393 fn on_connected(&mut self, pid: &PeerId) {
394 tracing::info!("connected");
395
396 self.peers.entry(pid.clone()).and_modify(|state| {
397 state.connection = ConnectionState::Connected;
398
399 all_visitors!(self, pid, state, visit_connected);
400 });
401 }
402
403 #[tracing::instrument(skip_all, fields(pid = %pid))]
404 fn on_disconnected(&mut self, pid: &PeerId) {
405 tracing::info!("disconnected");
406
407 self.peers.entry(pid.clone()).and_modify(|state| {
408 state.connection = ConnectionState::Disconnected;
409 state.reset();
410
411 all_visitors!(self, pid, state, visit_disconnected);
412 });
413 }
414
415 #[tracing::instrument(skip_all, fields(pid = %pid))]
416 fn on_errored(&mut self, pid: &PeerId) {
417 tracing::error!("error");
418
419 self.peers.entry(pid.clone()).and_modify(|state| {
420 state.connection = ConnectionState::Errored;
421 state.error_count += 1;
422
423 all_visitors!(self, pid, state, visit_errored);
424 });
425 }
426
427 #[tracing::instrument(skip_all, fields(pid = %pid))]
428 fn on_tagged(&mut self, pid: &PeerId, tagger: TagFn) {
429 tracing::debug!("tagged");
430
431 self.peers.entry(pid.clone()).and_modify(|state| {
432 tagger(state);
433
434 all_visitors!(self, pid, state, visit_tagged);
435 });
436 }
437
438 #[tracing::instrument(skip_all, fields(pid = %pid))]
439 fn on_discovered(&mut self, pid: &PeerId) {
440 let mut state = InitiatorState::new();
441
442 all_visitors!(self, pid, &mut state, visit_discovered);
443
444 self.peers.insert(pid.clone(), state);
445 }
446
447 fn move_discovered_into_promotion(&mut self) {
448 let deficit = self.promotion.peer_deficit();
449
450 if deficit == 0 {
451 return;
452 }
453
454 let new = self.discovery.drain_new_peers(deficit);
455
456 if new.is_empty() {
457 tracing::trace!("no new peers discovered");
458 return;
459 }
460
461 tracing::info!(deficit = deficit, new = new.len(), "discovered new peers",);
462
463 for pid in new {
464 if !self.peers.contains_key(&pid) {
465 self.on_discovered(&pid);
466 }
467 }
468 }
469
470 #[tracing::instrument(skip_all)]
471 fn housekeeping(&mut self) {
472 for (pid, state) in self.peers.iter_mut() {
473 all_visitors!(self, pid, state, visit_housekeeping);
474 }
475
476 self.move_discovered_into_promotion();
477 }
478}
479
480impl Stream for InitiatorBehavior {
481 type Item = BehaviorOutput<Self>;
482
483 fn poll_next(
484 mut self: std::pin::Pin<&mut Self>,
485 cx: &mut std::task::Context<'_>,
486 ) -> std::task::Poll<Option<Self::Item>> {
487 let poll = self.outbound.futures.poll_next_unpin(cx);
488
489 match poll {
490 Poll::Ready(Some(x)) => Poll::Ready(Some(x)),
491 Poll::Ready(None) => Poll::Pending,
492 Poll::Pending => Poll::Pending,
493 }
494 }
495}
496
497impl FusedStream for InitiatorBehavior {
498 fn is_terminated(&self) -> bool {
499 false
500 }
501}
502
503impl Behavior for InitiatorBehavior {
504 type Event = InitiatorEvent;
505 type Command = InitiatorCommand;
506 type PeerState = InitiatorState;
507 type Message = AnyMessage;
508
509 fn handle_io(&mut self, event: crate::InterfaceEvent<Self::Message>) {
510 match &event {
511 crate::InterfaceEvent::Connected(pid) => {
512 self.on_connected(pid);
513 }
514 crate::InterfaceEvent::Disconnected(pid) => {
515 self.on_disconnected(pid);
516 }
517 crate::InterfaceEvent::Recv(pid, msgs) => {
518 for msg in msgs {
519 self.on_inbound_msg(pid, msg);
520 }
521 }
522 crate::InterfaceEvent::Sent(pid, msg) => {
523 self.on_outbound_msg(pid, msg);
524 }
525 crate::InterfaceEvent::Error(pid, _) => {
526 self.on_errored(pid);
527 }
528 crate::InterfaceEvent::Idle => {
529 self.housekeeping();
530 }
531 }
532 }
533
534 fn execute(&mut self, cmd: Self::Command) {
535 match cmd {
536 InitiatorCommand::IncludePeer(pid) => {
537 tracing::debug!("include peer command");
538 self.on_discovered(&pid);
539 }
540 InitiatorCommand::StartSync(points) => {
541 tracing::debug!("start sync command");
542 self.chainsync.start(points);
543 }
544 InitiatorCommand::ContinueSync(pid) => {
545 tracing::debug!("continue sync command");
546 self.on_tagged(&pid, |state| state.continue_sync = true);
547 }
548 InitiatorCommand::RequestBlocks(range) => {
549 tracing::debug!("request blocks command");
550 self.blockfetch.enqueue(range);
551 }
552 InitiatorCommand::Housekeeping => {
553 tracing::debug!("housekeeping command");
554 self.housekeeping();
555 }
556 InitiatorCommand::BanPeer(pid) => {
557 tracing::debug!("ban peer command");
558 self.on_tagged(&pid, |state| state.promotion = PromotionTag::Banned);
559 }
560 InitiatorCommand::DemotePeer(pid) => {
561 tracing::debug!("demote peer command");
562 self.on_tagged(&pid, |state| state.promotion = PromotionTag::Cold);
563 }
564 InitiatorCommand::SendTx(..) => {
565 tracing::warn!("SendTx not yet implemented");
566 }
567 }
568 }
569}
570
571#[cfg(test)]
572mod tests {
573 use super::*;
574 use crate::protocol::{
575 MAINNET_MAGIC, Point, blockfetch as bf, chainsync as cs, handshake, keepalive, peersharing,
576 };
577 use crate::testing::BehaviorOutputExt;
578 use crate::{InterfaceError, InterfaceEvent};
579 use futures::StreamExt;
580 use std::collections::HashMap;
581 use std::net::Ipv4Addr;
582
583 fn drain_outputs(behavior: &mut InitiatorBehavior) -> Vec<BehaviorOutput<InitiatorBehavior>> {
584 let mut outputs = Vec::new();
585 let waker = futures::task::noop_waker();
586 let mut cx = std::task::Context::from_waker(&waker);
587
588 while let std::task::Poll::Ready(Some(output)) = behavior.poll_next_unpin(&mut cx) {
589 outputs.push(output);
590 }
591
592 outputs
593 }
594
595 fn complete_handshake(behavior: &mut InitiatorBehavior, pid: &PeerId) {
596 let version_data =
597 handshake::n2n::VersionData::new(MAINNET_MAGIC, false, Some(1), Some(false));
598 let mut values = HashMap::new();
599 values.insert(13u64, version_data.clone());
600 let version_table = handshake::VersionTable { values };
601
602 let propose = AnyMessage::Handshake(handshake::Message::Propose(version_table));
603 behavior.handle_io(InterfaceEvent::Sent(pid.clone(), propose));
604 drain_outputs(behavior);
605
606 let accept = AnyMessage::Handshake(handshake::Message::Accept(13, version_data));
607 behavior.handle_io(InterfaceEvent::Recv(pid.clone(), vec![accept]));
608 drain_outputs(behavior);
609 }
610
611 #[tokio::test]
614 async fn banned_peer_not_reconnected() {
615 tokio::time::pause();
617
618 let mut behavior = InitiatorBehavior::default();
619 let pid = PeerId::test(1);
620
621 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
622 behavior.execute(InitiatorCommand::Housekeeping);
623 drain_outputs(&mut behavior);
624
625 behavior.handle_io(InterfaceEvent::Connected(pid.clone()));
626 drain_outputs(&mut behavior);
627
628 let bad_msg = AnyMessage::KeepAlive(keepalive::Message::ResponseKeepAlive(42));
629 behavior.handle_io(InterfaceEvent::Recv(pid.clone(), vec![bad_msg]));
630 behavior.execute(InitiatorCommand::Housekeeping);
631 drain_outputs(&mut behavior);
632
633 behavior.handle_io(InterfaceEvent::Disconnected(pid.clone()));
634 drain_outputs(&mut behavior);
635
636 for _ in 0..10 {
637 behavior.execute(InitiatorCommand::Housekeeping);
638 let outputs = drain_outputs(&mut behavior);
639 assert!(!outputs.has_connect_for(&pid));
640 }
641 }
642
643 #[tokio::test]
644 async fn demote_peer_returns_to_cold() {
645 tokio::time::pause();
647
648 let mut behavior = InitiatorBehavior::default();
649 let pid = PeerId::test(2);
650
651 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
652 behavior.execute(InitiatorCommand::Housekeeping);
653 drain_outputs(&mut behavior);
654
655 behavior.handle_io(InterfaceEvent::Connected(pid.clone()));
656 drain_outputs(&mut behavior);
657 complete_handshake(&mut behavior, &pid);
658
659 behavior.execute(InitiatorCommand::Housekeeping);
660 drain_outputs(&mut behavior);
661 assert!(behavior.promotion.hot_peers.contains(&pid));
662
663 behavior.execute(InitiatorCommand::DemotePeer(pid.clone()));
664 drain_outputs(&mut behavior);
665
666 let state = behavior.peers.get(&pid).unwrap();
667 assert_eq!(state.promotion, PromotionTag::Cold);
668
669 behavior.execute(InitiatorCommand::Housekeeping);
670 let outputs = drain_outputs(&mut behavior);
671 assert!(outputs.has_disconnect_for(&pid));
672 }
673
674 #[tokio::test]
675 async fn error_count_persists_across_disconnect() {
676 tokio::time::pause();
679
680 let mut behavior = InitiatorBehavior {
681 promotion: PromotionBehavior::new(PromotionConfig {
682 max_error_count: 2,
683 ..PromotionConfig::default()
684 }),
685 ..Default::default()
686 };
687 let pid = PeerId::test(3);
688
689 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
690 behavior.execute(InitiatorCommand::Housekeeping);
691 drain_outputs(&mut behavior);
692
693 for _ in 0..2 {
694 behavior.handle_io(InterfaceEvent::Error(
695 pid.clone(),
696 InterfaceError::Other("err".into()),
697 ));
698 behavior.execute(InitiatorCommand::Housekeeping);
699 drain_outputs(&mut behavior);
700 behavior.handle_io(InterfaceEvent::Disconnected(pid.clone()));
701 drain_outputs(&mut behavior);
702 }
703
704 assert!(!behavior.promotion.banned_peers.contains(&pid));
705
706 behavior.handle_io(InterfaceEvent::Error(
707 pid.clone(),
708 InterfaceError::Other("err".into()),
709 ));
710 behavior.execute(InitiatorCommand::Housekeeping);
711 drain_outputs(&mut behavior);
712
713 assert!(behavior.promotion.banned_peers.contains(&pid));
714 }
715
716 #[tokio::test]
719 async fn full_peer_lifecycle_include_to_chainsync() {
720 tokio::time::pause();
722
723 let mut behavior = InitiatorBehavior::default();
724 let pid = PeerId::test(10);
725
726 behavior.execute(InitiatorCommand::StartSync(vec![Point::Origin]));
728
729 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
731 behavior.execute(InitiatorCommand::Housekeeping);
732 let outputs = drain_outputs(&mut behavior);
733
734 assert!(behavior.promotion.warm_peers.contains(&pid));
735 assert!(outputs.has_connect_for(&pid));
736
737 behavior.handle_io(InterfaceEvent::Connected(pid.clone()));
739 let outputs = drain_outputs(&mut behavior);
740 assert!(
741 outputs
742 .has_send(|m| matches!(m, AnyMessage::Handshake(handshake::Message::Propose(_))))
743 );
744
745 complete_handshake(&mut behavior, &pid);
747
748 behavior.execute(InitiatorCommand::Housekeeping);
750 let outputs = drain_outputs(&mut behavior);
751
752 assert!(behavior.promotion.hot_peers.contains(&pid));
753 assert!(
754 outputs.has_send(|m| matches!(m, AnyMessage::ChainSync(cs::Message::FindIntersect(_)))),
755 "chainsync should start for hot initialized peer"
756 );
757 }
758
759 #[tokio::test]
760 async fn housekeeping_promotes_and_connects_in_same_pass() {
761 tokio::time::pause();
763
764 let mut behavior = InitiatorBehavior::default();
765 let pid = PeerId::test(11);
766
767 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
768
769 behavior.execute(InitiatorCommand::Housekeeping);
771 let outputs = drain_outputs(&mut behavior);
772
773 assert!(
774 behavior.promotion.warm_peers.contains(&pid),
775 "peer should be promoted to warm"
776 );
777 assert!(
778 outputs.has_connect_for(&pid),
779 "Connect should be issued in the same housekeeping pass"
780 );
781 }
782
783 #[tokio::test]
784 async fn discovery_feeds_into_promotion() {
785 tokio::time::pause();
787
788 let mut behavior = InitiatorBehavior::default();
789 let seed_pid = PeerId::test(12);
790
791 behavior.execute(InitiatorCommand::IncludePeer(seed_pid.clone()));
793 behavior.execute(InitiatorCommand::Housekeeping);
794 drain_outputs(&mut behavior);
795
796 behavior.handle_io(InterfaceEvent::Connected(seed_pid.clone()));
797 drain_outputs(&mut behavior);
798 complete_handshake(&mut behavior, &seed_pid);
799 behavior.execute(InitiatorCommand::Housekeeping);
800 drain_outputs(&mut behavior);
801
802 let share_response = AnyMessage::PeerSharing(peersharing::Message::SharePeers(vec![
804 peersharing::PeerAddress::V4(Ipv4Addr::new(192, 168, 1, 1), 3000),
805 peersharing::PeerAddress::V4(Ipv4Addr::new(192, 168, 1, 2), 3001),
806 ]));
807
808 let share_req = AnyMessage::PeerSharing(peersharing::Message::ShareRequest(10));
811 behavior.handle_io(InterfaceEvent::Sent(seed_pid.clone(), share_req));
812 drain_outputs(&mut behavior);
813
814 behavior.handle_io(InterfaceEvent::Recv(seed_pid.clone(), vec![share_response]));
816 drain_outputs(&mut behavior);
817
818 behavior.execute(InitiatorCommand::Housekeeping);
820 drain_outputs(&mut behavior);
821
822 let discovered_1 = PeerId {
824 host: "192.168.1.1".to_string(),
825 port: 3000,
826 };
827 let discovered_2 = PeerId {
828 host: "192.168.1.2".to_string(),
829 port: 3001,
830 };
831
832 assert!(
833 behavior.peers.contains_key(&discovered_1),
834 "discovered peer 1 should be tracked after housekeeping"
835 );
836 assert!(
837 behavior.peers.contains_key(&discovered_2),
838 "discovered peer 2 should be tracked after housekeeping"
839 );
840 }
841
842 #[tokio::test]
843 async fn violation_bans_and_disconnects() {
844 tokio::time::pause();
846
847 let mut behavior = InitiatorBehavior::default();
848 let pid = PeerId::test(13);
849
850 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
851 behavior.execute(InitiatorCommand::Housekeeping);
852 drain_outputs(&mut behavior);
853
854 behavior.handle_io(InterfaceEvent::Connected(pid.clone()));
855 drain_outputs(&mut behavior);
856
857 let bad_msg = AnyMessage::KeepAlive(keepalive::Message::ResponseKeepAlive(42));
859 behavior.handle_io(InterfaceEvent::Recv(pid.clone(), vec![bad_msg]));
860
861 behavior.execute(InitiatorCommand::Housekeeping);
863 let outputs = drain_outputs(&mut behavior);
864
865 assert!(
866 behavior.promotion.banned_peers.contains(&pid),
867 "promotion should ban the violating peer"
868 );
869 assert!(
870 outputs.has_disconnect_for(&pid),
871 "connection should disconnect the banned peer"
872 );
873 }
874
875 #[tokio::test]
876 async fn blockfetch_requires_initialized_and_idle() {
877 tokio::time::pause();
879
880 let mut behavior = InitiatorBehavior::default();
881 let pid = PeerId::test(14);
882
883 let range = (Point::Origin, Point::new(100, vec![0xAA; 32]));
884 behavior.blockfetch.enqueue(range.clone());
885
886 behavior.execute(InitiatorCommand::IncludePeer(pid.clone()));
888 behavior.execute(InitiatorCommand::Housekeeping);
889 drain_outputs(&mut behavior);
890
891 behavior.handle_io(InterfaceEvent::Connected(pid.clone()));
892 drain_outputs(&mut behavior);
893
894 behavior.execute(InitiatorCommand::Housekeeping);
896 let outputs = drain_outputs(&mut behavior);
897 assert!(
898 !outputs
899 .has_send(|m| matches!(m, AnyMessage::BlockFetch(bf::Message::RequestRange(_)))),
900 "should NOT send RequestRange before handshake"
901 );
902
903 complete_handshake(&mut behavior, &pid);
905
906 behavior.execute(InitiatorCommand::Housekeeping);
910 let outputs = drain_outputs(&mut behavior);
911 assert!(
912 outputs.has_send(|m| matches!(m, AnyMessage::BlockFetch(bf::Message::RequestRange(_)))),
913 "should send RequestRange after handshake completes"
914 );
915 }
916}