pallas_network2/behavior/initiator/
blockfetch.rs1use std::collections::VecDeque;
2
3use crate::protocol::blockfetch as blockfetch_proto;
4
5use crate::{
6 BehaviorOutput, InterfaceCommand, OutboundQueue, PeerId,
7 behavior::{AnyMessage, BlockRange, ConnectionState},
8};
9
10use super::{InitiatorBehavior, InitiatorEvent, InitiatorState, PeerVisitor};
11
/// Configuration accepted by [`BlockFetchBehavior::new`].
///
/// This is the unit type for now: the constructor ignores the value it is given.
pub type BlockFetchConfig = ();
14
15pub type Request = BlockRange;
17
18pub struct BlockFetchBehavior {
20 requests: VecDeque<Request>,
22}
23
24impl Default for BlockFetchBehavior {
25 fn default() -> Self {
26 Self::new(())
27 }
28}
29
30impl BlockFetchBehavior {
31 pub fn new(_config: BlockFetchConfig) -> Self {
33 Self {
34 requests: VecDeque::new(),
35 }
36 }
37
38 pub fn enqueue(&mut self, request: Request) {
40 self.requests.push_back(request);
41 tracing::info!(total = self.requests.len(), "new request");
42 }
43
44 pub fn request_block_batch(
46 &self,
47 pid: &PeerId,
48 range: BlockRange,
49 outbound: &mut OutboundQueue<super::InitiatorBehavior>,
50 ) {
51 tracing::info!("requesting block batch");
52
53 outbound.push_ready(BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
54 pid.clone(),
55 AnyMessage::BlockFetch(blockfetch_proto::Message::RequestRange(range)),
56 )));
57 }
58
59 pub fn dispatch_block(
62 &self,
63 pid: &PeerId,
64 state: &InitiatorState,
65 outbound: &mut OutboundQueue<super::InitiatorBehavior>,
66 ) {
67 if let blockfetch_proto::State::Streaming(Some(block)) = &state.blockfetch {
68 let out = InitiatorEvent::BlockBodyReceived(pid.clone(), block.clone());
69
70 outbound.push_ready(BehaviorOutput::ExternalEvent(out));
71 }
72 }
73}
74
75fn peer_is_available(state: &InitiatorState) -> bool {
76 matches!(state.connection, ConnectionState::Initialized)
77 && matches!(state.blockfetch, blockfetch_proto::State::Idle)
78}
79
80impl PeerVisitor for BlockFetchBehavior {
81 fn visit_inbound_msg(
82 &mut self,
83 pid: &PeerId,
84 state: &mut InitiatorState,
85 outbound: &mut OutboundQueue<InitiatorBehavior>,
86 ) {
87 self.dispatch_block(pid, state, outbound);
88 }
89
90 fn visit_housekeeping(
91 &mut self,
92 pid: &PeerId,
93 state: &mut InitiatorState,
94 outbound: &mut OutboundQueue<InitiatorBehavior>,
95 ) {
96 if self.requests.is_empty() {
97 tracing::trace!("no requests pending");
98 return;
99 }
100
101 if peer_is_available(state) {
102 tracing::debug!("peer looks available");
103
104 if let Some(request) = self.requests.pop_front() {
105 tracing::debug!("granting request to peer");
106 self.request_block_batch(pid, request, outbound);
107 }
108 } else {
109 tracing::warn!("no peer available");
110 }
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use crate::OutboundQueue;
    use crate::protocol::{Point, blockfetch as bf};

    /// Takes every ready output off `outbound` so a test can inspect it.
    fn drain_outputs(
        outbound: &mut OutboundQueue<InitiatorBehavior>,
    ) -> Vec<BehaviorOutput<InitiatorBehavior>> {
        outbound.drain_ready()
    }

    /// The non-trivial range shared by the tests below.
    fn sample_range() -> Request {
        (Point::Origin, Point::new(100, vec![0xAA; 32]))
    }

    #[test]
    fn enqueue_adds_to_queue() {
        let mut behavior = BlockFetchBehavior::new(());

        behavior.enqueue(sample_range());
        assert_eq!(behavior.requests.len(), 1);

        behavior.enqueue((Point::Origin, Point::Origin));
        assert_eq!(behavior.requests.len(), 2);
    }

    #[test]
    fn dispatch_block_emits_event_when_streaming() {
        let behavior = BlockFetchBehavior::new(());
        let peer = PeerId::test(1);
        let mut peer_state = InitiatorState::new();
        let mut outbound = OutboundQueue::new();

        peer_state.blockfetch = bf::State::Streaming(Some(vec![0xBE; 64]));

        behavior.dispatch_block(&peer, &peer_state, &mut outbound);

        let emitted = drain_outputs(&mut outbound).iter().any(|output| {
            matches!(
                output,
                BehaviorOutput::ExternalEvent(InitiatorEvent::BlockBodyReceived(..))
            )
        });
        assert!(emitted, "should emit BlockBodyReceived");
    }

    #[test]
    fn dispatch_block_noop_when_idle() {
        let behavior = BlockFetchBehavior::new(());
        let peer = PeerId::test(1);
        let peer_state = InitiatorState::new();
        let mut outbound = OutboundQueue::new();

        behavior.dispatch_block(&peer, &peer_state, &mut outbound);

        assert!(drain_outputs(&mut outbound).is_empty());
    }

    #[test]
    fn housekeeping_dispatches_request_for_available_peer() {
        let mut behavior = BlockFetchBehavior::new(());
        let peer = PeerId::test(1);
        let mut peer_state = InitiatorState::new();
        let mut outbound = OutboundQueue::new();

        behavior.enqueue(sample_range());

        // Put the peer in the only state housekeeping treats as available.
        peer_state.connection = ConnectionState::Initialized;
        peer_state.blockfetch = bf::State::Idle;

        behavior.visit_housekeeping(&peer, &mut peer_state, &mut outbound);

        let sent_request = drain_outputs(&mut outbound).iter().any(|output| {
            matches!(
                output,
                BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
                    _,
                    AnyMessage::BlockFetch(bf::Message::RequestRange(_))
                ))
            )
        });
        assert!(sent_request, "should send RequestRange");
        assert!(
            behavior.requests.is_empty(),
            "request should be consumed from queue"
        );
    }
}