Skip to main content

pallas_network2/behavior/initiator/
blockfetch.rs

1use std::collections::VecDeque;
2
3use crate::protocol::blockfetch as blockfetch_proto;
4
5use crate::{
6    BehaviorOutput, InterfaceCommand, OutboundQueue, PeerId,
7    behavior::{AnyMessage, BlockRange, ConnectionState},
8};
9
10use super::{InitiatorBehavior, InitiatorEvent, InitiatorState, PeerVisitor};
11
/// Configuration type accepted by the block-fetch sub-behavior.
///
/// This is the unit type for now, so there is nothing to configure yet.
pub type BlockFetchConfig = ();
14
15/// A block-fetch request, defined as a range of points.
16pub type Request = BlockRange;
17
18/// Sub-behavior that manages block fetching from peers.
19pub struct BlockFetchBehavior {
20    //config: BlockFetchConfig,
21    requests: VecDeque<Request>,
22}
23
24impl Default for BlockFetchBehavior {
25    fn default() -> Self {
26        Self::new(())
27    }
28}
29
30impl BlockFetchBehavior {
31    /// Creates a new block-fetch behavior with the given configuration.
32    pub fn new(_config: BlockFetchConfig) -> Self {
33        Self {
34            requests: VecDeque::new(),
35        }
36    }
37
38    /// Adds a block range request to the pending queue.
39    pub fn enqueue(&mut self, request: Request) {
40        self.requests.push_back(request);
41        tracing::info!(total = self.requests.len(), "new request");
42    }
43
44    /// Sends a block range request to the specified peer.
45    pub fn request_block_batch(
46        &self,
47        pid: &PeerId,
48        range: BlockRange,
49        outbound: &mut OutboundQueue<super::InitiatorBehavior>,
50    ) {
51        tracing::info!("requesting block batch");
52
53        outbound.push_ready(BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
54            pid.clone(),
55            AnyMessage::BlockFetch(blockfetch_proto::Message::RequestRange(range)),
56        )));
57    }
58
59    /// Emits a [`BlockBodyReceived`](super::InitiatorEvent::BlockBodyReceived)
60    /// event if the peer's block-fetch state contains a new block.
61    pub fn dispatch_block(
62        &self,
63        pid: &PeerId,
64        state: &InitiatorState,
65        outbound: &mut OutboundQueue<super::InitiatorBehavior>,
66    ) {
67        if let blockfetch_proto::State::Streaming(Some(block)) = &state.blockfetch {
68            let out = InitiatorEvent::BlockBodyReceived(pid.clone(), block.clone());
69
70            outbound.push_ready(BehaviorOutput::ExternalEvent(out));
71        }
72    }
73}
74
75fn peer_is_available(state: &InitiatorState) -> bool {
76    matches!(state.connection, ConnectionState::Initialized)
77        && matches!(state.blockfetch, blockfetch_proto::State::Idle)
78}
79
80impl PeerVisitor for BlockFetchBehavior {
81    fn visit_inbound_msg(
82        &mut self,
83        pid: &PeerId,
84        state: &mut InitiatorState,
85        outbound: &mut OutboundQueue<InitiatorBehavior>,
86    ) {
87        self.dispatch_block(pid, state, outbound);
88    }
89
90    fn visit_housekeeping(
91        &mut self,
92        pid: &PeerId,
93        state: &mut InitiatorState,
94        outbound: &mut OutboundQueue<InitiatorBehavior>,
95    ) {
96        if self.requests.is_empty() {
97            tracing::trace!("no requests pending");
98            return;
99        }
100
101        if peer_is_available(state) {
102            tracing::debug!("peer looks available");
103
104            if let Some(request) = self.requests.pop_front() {
105                tracing::debug!("granting request to peer");
106                self.request_block_batch(pid, request, outbound);
107            }
108        } else {
109            tracing::warn!("no peer available");
110        }
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use crate::OutboundQueue;
    use crate::protocol::{Point, blockfetch as bf};

    /// Range from the origin to an arbitrary point, used as a sample request.
    fn sample_range() -> Request {
        (Point::Origin, Point::new(100, vec![0xAA; 32]))
    }

    /// True when `output` is a `BlockBodyReceived` external event.
    fn is_block_body_event(output: &BehaviorOutput<InitiatorBehavior>) -> bool {
        matches!(
            output,
            BehaviorOutput::ExternalEvent(InitiatorEvent::BlockBodyReceived(..))
        )
    }

    /// True when `output` is a command sending a block-fetch `RequestRange`.
    fn is_range_request(output: &BehaviorOutput<InitiatorBehavior>) -> bool {
        matches!(
            output,
            BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
                _,
                AnyMessage::BlockFetch(bf::Message::RequestRange(_))
            ))
        )
    }

    #[test]
    fn enqueue_adds_to_queue() {
        let mut behavior = BlockFetchBehavior::new(());

        behavior.enqueue(sample_range());
        assert_eq!(behavior.requests.len(), 1);

        behavior.enqueue((Point::Origin, Point::Origin));
        assert_eq!(behavior.requests.len(), 2);
    }

    #[test]
    fn dispatch_block_emits_event_when_streaming() {
        let behavior = BlockFetchBehavior::new(());
        let peer = PeerId::test(1);
        let mut outbound = OutboundQueue::new();

        let mut state = InitiatorState::new();
        state.blockfetch = bf::State::Streaming(Some(vec![0xBE; 64]));

        behavior.dispatch_block(&peer, &state, &mut outbound);

        let emitted = outbound.drain_ready();
        assert!(
            emitted.iter().any(is_block_body_event),
            "should emit BlockBodyReceived"
        );
    }

    #[test]
    fn dispatch_block_noop_when_idle() {
        let behavior = BlockFetchBehavior::new(());
        let peer = PeerId::test(1);
        let mut outbound = OutboundQueue::new();

        // A freshly created state has an idle block-fetch protocol.
        let state = InitiatorState::new();

        behavior.dispatch_block(&peer, &state, &mut outbound);

        let emitted = outbound.drain_ready();
        assert!(emitted.is_empty());
    }

    #[test]
    fn housekeeping_dispatches_request_for_available_peer() {
        let mut behavior = BlockFetchBehavior::new(());
        let peer = PeerId::test(1);
        let mut outbound = OutboundQueue::new();

        behavior.enqueue(sample_range());

        // Availability requires an initialized connection and idle block-fetch.
        let mut state = InitiatorState::new();
        state.connection = ConnectionState::Initialized;
        state.blockfetch = bf::State::Idle;

        behavior.visit_housekeeping(&peer, &mut state, &mut outbound);

        let emitted = outbound.drain_ready();
        assert!(
            emitted.iter().any(is_range_request),
            "should send RequestRange"
        );
        assert!(
            behavior.requests.is_empty(),
            "request should be consumed from queue"
        );
    }
}
205}