1use crate::global::dxb_block::{
2 BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId,
3 IncomingEndpointContextId, IncomingEndpointContextSectionId,
4 IncomingSection, IncomingSectionIndex, OutgoingContextId,
5 OutgoingSectionIndex,
6};
7use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
8use crate::utils::time::Time;
9use futures::channel::mpsc;
10use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
11use log::info;
12use ringmap::RingMap;
13use std::cell::RefCell;
14use std::collections::{BTreeMap, HashMap, VecDeque};
15use std::fmt::Debug;
16use std::rc::Rc;
17#[derive(Debug)]
21pub struct ScopeContext {
22 pub next_section_index: IncomingSectionIndex,
23 pub next_block_number: IncomingBlockNumber,
24 pub keep_alive_timestamp: u64,
28 pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
30 pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
32}
33
34impl Default for ScopeContext {
36 fn default() -> Self {
37 ScopeContext {
38 next_section_index: 0,
39 next_block_number: 0,
40 keep_alive_timestamp: Time::now(),
41 current_queue_sender: None,
42 cached_blocks: BTreeMap::new(),
43 }
44 }
45}
46
47type SectionObserver = Box<dyn FnMut(IncomingSection)>;
49
50#[derive(Clone, Debug)]
51pub struct BlockHistoryData {
52 pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
55}
56
57pub struct BlockHandler {
58 pub current_context_id: RefCell<OutgoingContextId>,
59
60 pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
62
63 pub incoming_sections_queue: RefCell<VecDeque<IncomingSection>>,
66
67 pub section_observers: RefCell<
70 HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
71 >,
72
73 pub incoming_blocks_history: RefCell<RingMap<BlockId, BlockHistoryData>>,
75}
76
77impl Debug for BlockHandler {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("BlockHandler")
80 .field("current_context_id", &self.current_context_id)
81 .field("block_cache", &self.block_cache)
82 .field("incoming_sections_queue", &self.incoming_sections_queue)
83 .field("incoming_blocks_history", &self.incoming_blocks_history)
84 .finish()
85 }
86}
87
88impl Default for BlockHandler {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl BlockHandler {
95 pub fn new() -> BlockHandler {
96 BlockHandler {
97 current_context_id: RefCell::new(0),
98 block_cache: RefCell::new(HashMap::new()),
99 incoming_sections_queue: RefCell::new(VecDeque::new()),
100 section_observers: RefCell::new(HashMap::new()),
101 incoming_blocks_history: RefCell::new(RingMap::with_capacity(500)),
102 }
103 }
104
105 pub fn add_block_to_history(
109 &self,
110 block: &DXBBlock,
111 original_socket_uuid: Option<ComInterfaceSocketUUID>,
112 ) {
113 let mut history = self.incoming_blocks_history.borrow_mut();
114 let block_id = block.get_block_id();
115 if !history.contains_key(&block_id) {
117 let block_data = BlockHistoryData {
118 original_socket_uuid,
119 };
120 history.insert(block_id, block_data);
121 }
122 }
123
124 pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
126 let history = self.incoming_blocks_history.borrow();
127 let block_id = block.get_block_id();
128 history.contains_key(&block_id)
129 }
130
131 pub fn get_block_data_from_history(
132 &self,
133 block: &DXBBlock,
134 ) -> Option<BlockHistoryData> {
135 let history = self.incoming_blocks_history.borrow();
136 let block_id = block.get_block_id();
137 history.get(&block_id).cloned()
138 }
139
140 pub fn handle_incoming_block(&self, block: DXBBlock) {
141 info!("Handling incoming block...");
142 let context_id = block.block_header.context_id;
143 let section_index = block.block_header.section_index;
144 let block_number = block.block_header.block_number;
145 let is_response = block
146 .block_header
147 .flags_and_timestamp
148 .block_type()
149 .is_response();
150
151 info!(
152 "Received block (context={context_id}, section={section_index}, block_nr={block_number})"
153 );
154
155 if is_response {
157 self.handle_incoming_response_block(block);
158 } else {
159 self.handle_incoming_request_block(block);
160 }
161 }
162
163 fn handle_incoming_request_block(&self, block: DXBBlock) {
165 let new_sections =
166 self.extract_complete_sections_with_new_incoming_block(block);
167 let mut request_queue = self.incoming_sections_queue.borrow_mut();
169 for section in new_sections {
170 request_queue.push_back(section);
171 }
172 }
173
174 fn handle_incoming_response_block(&self, block: DXBBlock) {
177 let context_id = block.block_header.context_id;
178 let endpoint_context_id = IncomingEndpointContextId {
179 sender: block.routing_header.sender.clone(),
180 context_id,
181 };
182 let new_sections =
183 self.extract_complete_sections_with_new_incoming_block(block);
184 for section in new_sections {
186 let section_index = section.get_section_index();
187
188 if let Some(observer) = self
189 .section_observers
190 .borrow_mut()
191 .get_mut(&(context_id, section_index))
192 {
193 observer(section);
195 } else {
196 log::warn!(
198 "No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block"
199 );
200 };
201 }
202 }
203
204 fn extract_complete_sections_with_new_incoming_block(
207 &self,
208 block: DXBBlock,
209 ) -> Vec<IncomingSection> {
210 let section_index = block.block_header.section_index;
211 let block_number = block.block_header.block_number;
212 let is_end_of_section =
213 block.block_header.flags_and_timestamp.is_end_of_section();
214 let is_end_of_context =
215 block.block_header.flags_and_timestamp.is_end_of_context();
216 let endpoint_context_id = IncomingEndpointContextId {
217 sender: block.routing_header.sender.clone(),
218 context_id: block.block_header.context_id,
219 };
220 let section_context_id = IncomingEndpointContextSectionId::new(
221 endpoint_context_id.clone(),
222 section_index,
223 );
224
225 let has_scope_context =
227 self.block_cache.borrow().contains_key(&endpoint_context_id);
228
229 if !has_scope_context
231 && block_number == 0
232 && (is_end_of_section || is_end_of_context)
233 {
234 return vec![IncomingSection::SingleBlock((
235 Some(block),
236 section_context_id.clone(),
237 ))];
238 }
239
240 let mut request_scopes = self.block_cache.borrow_mut();
242 let scope_context = request_scopes
243 .entry(endpoint_context_id.clone())
244 .or_default();
245
246 if block_number == scope_context.next_block_number {
252 let mut new_blocks = vec![];
254
255 let mut is_end_of_context = is_end_of_context;
257 let mut is_end_of_section = is_end_of_section;
258 let mut next_block = block;
259 let mut section_index = section_index;
260
261 loop {
264 if let Some(sender) = &mut scope_context.current_queue_sender {
265 sender.start_send(next_block).expect(
267 "Failed to send block to current section queue",
268 );
269 } else {
270 let (mut sender, receiver) = mpsc::unbounded();
272
273 new_blocks.push(IncomingSection::BlockStream((
275 Some(receiver),
276 IncomingEndpointContextSectionId::new(
277 endpoint_context_id.clone(),
278 section_index,
279 ),
280 )));
281
282 sender.start_send(next_block).expect(
284 "Failed to send first block to current section queue",
285 );
286
287 scope_context.current_queue_sender = Some(sender);
288 }
289
290 scope_context.next_block_number += 1;
293
294 if is_end_of_context {
296 request_scopes.remove(&endpoint_context_id);
297 break;
298 }
299 else if is_end_of_section {
301 scope_context.next_section_index += 1;
303 if let Some(sender) =
305 scope_context.current_queue_sender.take()
306 {
307 sender.close_channel();
308 }
309 }
310 if let Some(next_cached_block) = scope_context
314 .cached_blocks
315 .remove(&scope_context.next_block_number)
316 {
317 is_end_of_section = next_cached_block
319 .block_header
320 .flags_and_timestamp
321 .is_end_of_section();
322 is_end_of_context = next_cached_block
324 .block_header
325 .flags_and_timestamp
326 .is_end_of_context();
327 next_block = next_cached_block;
329
330 section_index = next_block.block_header.section_index;
332 }
333 else {
335 break;
336 }
337 }
338
339 new_blocks
340 }
341 else {
344 if scope_context.cached_blocks.contains_key(&block_number) {
347 log::warn!(
348 "Block {block_number} already in cache, dropping block"
349 );
350 }
351
352 scope_context.cached_blocks.insert(block_number, block);
354
355 vec![]
356 }
357 }
358
359 pub fn get_new_context_id(&self) -> OutgoingContextId {
360 *self.current_context_id.borrow_mut() += 1;
361 *self.current_context_id.borrow()
362 }
363
364 pub fn register_incoming_block_observer(
367 &self,
368 context_id: OutgoingContextId,
369 section_index: OutgoingSectionIndex,
370 ) -> UnboundedReceiver<IncomingSection> {
371 let (tx, rx) = mpsc::unbounded();
372 let tx = Rc::new(RefCell::new(tx));
373
374 let observer = move |blocks: IncomingSection| {
376 tx.clone().borrow_mut().start_send(blocks).unwrap();
377 };
378
379 self.section_observers
381 .borrow_mut()
382 .insert((context_id, section_index), Box::new(observer));
383
384 rx
385 }
386
387 pub async fn wait_for_incoming_response_block(
389 &self,
390 context_id: OutgoingContextId,
391 section_index: OutgoingSectionIndex,
392 ) -> Option<IncomingSection> {
393 let rx =
394 self.register_incoming_block_observer(context_id, section_index);
395 None
398 }
400}