1use crate::collections::HashMap;
2use crate::global::dxb_block::{
3 BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId,
4 IncomingEndpointContextId, IncomingEndpointContextSectionId,
5 IncomingSection, IncomingSectionIndex, OutgoingContextId,
6 OutgoingSectionIndex,
7};
8use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
9use crate::std_random::RandomState;
10use crate::stdlib::boxed::Box;
11use crate::stdlib::collections::{BTreeMap, VecDeque};
12use crate::stdlib::rc::Rc;
13use crate::stdlib::vec;
14use crate::stdlib::vec::Vec;
15use crate::task::{
16 UnboundedReceiver, UnboundedSender, create_unbounded_channel,
17};
18use crate::utils::time::Time;
19use core::cell::RefCell;
20use core::fmt::Debug;
21use core::prelude::rust_2024::*;
22use log::info;
23use ringmap::RingMap;
24
25#[derive(Debug)]
27pub struct ScopeContext {
28 pub next_section_index: IncomingSectionIndex,
29 pub next_block_number: IncomingBlockNumber,
30 pub keep_alive_timestamp: u64,
34 pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
36 pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
38}
39
40impl Default for ScopeContext {
42 fn default() -> Self {
43 ScopeContext {
44 next_section_index: 0,
45 next_block_number: 0,
46 keep_alive_timestamp: Time::now(),
47 current_queue_sender: None,
48 cached_blocks: BTreeMap::new(),
49 }
50 }
51}
52
53type SectionObserver = Box<dyn FnMut(IncomingSection)>;
55
56#[derive(Clone, Debug)]
57pub struct BlockHistoryData {
58 pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
61}
62
63pub struct BlockHandler {
64 pub current_context_id: RefCell<OutgoingContextId>,
65
66 pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
68
69 pub incoming_sections_queue: RefCell<VecDeque<IncomingSection>>,
72
73 pub section_observers: RefCell<
76 HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
77 >,
78
79 pub incoming_blocks_history:
81 RefCell<RingMap<BlockId, BlockHistoryData, RandomState>>,
82}
83
84impl Debug for BlockHandler {
85 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
86 f.debug_struct("BlockHandler")
87 .field("current_context_id", &self.current_context_id)
88 .field("block_cache", &self.block_cache)
89 .field("incoming_sections_queue", &self.incoming_sections_queue)
90 .field("incoming_blocks_history", &self.incoming_blocks_history)
91 .finish()
92 }
93}
94
95impl Default for BlockHandler {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl BlockHandler {
102 pub fn new() -> BlockHandler {
103 BlockHandler {
104 current_context_id: RefCell::new(0),
105 block_cache: RefCell::new(HashMap::new()),
106 incoming_sections_queue: RefCell::new(VecDeque::new()),
107 section_observers: RefCell::new(HashMap::new()),
108 incoming_blocks_history: RefCell::new(
109 RingMap::with_capacity_and_hasher(500, RandomState::default()),
110 ),
111 }
112 }
113
114 pub fn add_block_to_history(
118 &self,
119 block: &DXBBlock,
120 original_socket_uuid: Option<ComInterfaceSocketUUID>,
121 ) {
122 let mut history = self.incoming_blocks_history.borrow_mut();
123 let block_id = block.get_block_id();
124 if !history.contains_key(&block_id) {
126 let block_data = BlockHistoryData {
127 original_socket_uuid,
128 };
129 history.insert(block_id, block_data);
130 }
131 }
132
133 pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
135 let history = self.incoming_blocks_history.borrow();
136 let block_id = block.get_block_id();
137 history.contains_key(&block_id)
138 }
139
140 pub fn get_block_data_from_history(
141 &self,
142 block: &DXBBlock,
143 ) -> Option<BlockHistoryData> {
144 let history = self.incoming_blocks_history.borrow();
145 let block_id = block.get_block_id();
146 history.get(&block_id).cloned()
147 }
148
149 pub fn handle_incoming_block(&self, block: DXBBlock) {
150 info!("Handling incoming block...");
151 let context_id = block.block_header.context_id;
152 let section_index = block.block_header.section_index;
153 let block_number = block.block_header.block_number;
154 let is_response = block
155 .block_header
156 .flags_and_timestamp
157 .block_type()
158 .is_response();
159
160 info!(
161 "Received block (context={context_id}, section={section_index}, block_nr={block_number})"
162 );
163
164 if is_response {
166 self.handle_incoming_response_block(block);
167 } else {
168 self.handle_incoming_request_block(block);
169 }
170 }
171
172 fn handle_incoming_request_block(&self, block: DXBBlock) {
174 let new_sections =
175 self.extract_complete_sections_with_new_incoming_block(block);
176 let mut request_queue = self.incoming_sections_queue.borrow_mut();
178 for section in new_sections {
179 request_queue.push_back(section);
180 }
181 }
182
183 fn handle_incoming_response_block(&self, block: DXBBlock) {
186 let context_id = block.block_header.context_id;
187 let endpoint_context_id = IncomingEndpointContextId {
188 sender: block.routing_header.sender.clone(),
189 context_id,
190 };
191 let new_sections =
192 self.extract_complete_sections_with_new_incoming_block(block);
193 for section in new_sections {
195 let section_index = section.get_section_index();
196
197 if let Some(observer) = self
198 .section_observers
199 .borrow_mut()
200 .get_mut(&(context_id, section_index))
201 {
202 observer(section);
204 } else {
205 log::warn!(
207 "No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block"
208 );
209 };
210 }
211 }
212
213 fn extract_complete_sections_with_new_incoming_block(
216 &self,
217 block: DXBBlock,
218 ) -> Vec<IncomingSection> {
219 let section_index = block.block_header.section_index;
220 let block_number = block.block_header.block_number;
221 let is_end_of_section =
222 block.block_header.flags_and_timestamp.is_end_of_section();
223 let is_end_of_context =
224 block.block_header.flags_and_timestamp.is_end_of_context();
225 let endpoint_context_id = IncomingEndpointContextId {
226 sender: block.routing_header.sender.clone(),
227 context_id: block.block_header.context_id,
228 };
229 let section_context_id = IncomingEndpointContextSectionId::new(
230 endpoint_context_id.clone(),
231 section_index,
232 );
233
234 let has_scope_context =
236 self.block_cache.borrow().contains_key(&endpoint_context_id);
237
238 if !has_scope_context
240 && block_number == 0
241 && (is_end_of_section || is_end_of_context)
242 {
243 return vec![IncomingSection::SingleBlock((
244 Some(block),
245 section_context_id.clone(),
246 ))];
247 }
248
249 let mut request_scopes = self.block_cache.borrow_mut();
251 let scope_context = request_scopes
252 .entry(endpoint_context_id.clone())
253 .or_default();
254
255 if block_number == scope_context.next_block_number {
261 let mut new_blocks = vec![];
263
264 let mut is_end_of_context = is_end_of_context;
266 let mut is_end_of_section = is_end_of_section;
267 let mut next_block = block;
268 let mut section_index = section_index;
269
270 loop {
273 if let Some(sender) = &mut scope_context.current_queue_sender {
274 sender.start_send(next_block).expect(
276 "Failed to send block to current section queue",
277 );
278 } else {
279 let (mut sender, receiver) = create_unbounded_channel();
281
282 new_blocks.push(IncomingSection::BlockStream((
284 Some(receiver),
285 IncomingEndpointContextSectionId::new(
286 endpoint_context_id.clone(),
287 section_index,
288 ),
289 )));
290
291 sender.start_send(next_block).expect(
293 "Failed to send first block to current section queue",
294 );
295
296 scope_context.current_queue_sender = Some(sender);
297 }
298
299 scope_context.next_block_number += 1;
302
303 if is_end_of_context {
305 request_scopes.remove(&endpoint_context_id);
306 break;
307 }
308 else if is_end_of_section {
310 scope_context.next_section_index += 1;
312 if let Some(sender) =
314 scope_context.current_queue_sender.take()
315 {
316 sender.close_channel();
317 }
318 }
319 if let Some(next_cached_block) = scope_context
323 .cached_blocks
324 .remove(&scope_context.next_block_number)
325 {
326 is_end_of_section = next_cached_block
328 .block_header
329 .flags_and_timestamp
330 .is_end_of_section();
331 is_end_of_context = next_cached_block
333 .block_header
334 .flags_and_timestamp
335 .is_end_of_context();
336 next_block = next_cached_block;
338
339 section_index = next_block.block_header.section_index;
341 }
342 else {
344 break;
345 }
346 }
347
348 new_blocks
349 }
350 else {
353 if scope_context.cached_blocks.contains_key(&block_number) {
356 log::warn!(
357 "Block {block_number} already in cache, dropping block"
358 );
359 }
360
361 scope_context.cached_blocks.insert(block_number, block);
363
364 vec![]
365 }
366 }
367
368 pub fn get_new_context_id(&self) -> OutgoingContextId {
369 *self.current_context_id.borrow_mut() += 1;
370 *self.current_context_id.borrow()
371 }
372
373 pub fn register_incoming_block_observer(
376 &self,
377 context_id: OutgoingContextId,
378 section_index: OutgoingSectionIndex,
379 ) -> UnboundedReceiver<IncomingSection> {
380 let (tx, rx) = create_unbounded_channel::<IncomingSection>();
381 let tx = Rc::new(RefCell::new(tx));
382
383 let observer = move |blocks: IncomingSection| {
385 tx.clone().borrow_mut().start_send(blocks).unwrap();
386 };
387
388 self.section_observers
390 .borrow_mut()
391 .insert((context_id, section_index), Box::new(observer));
392
393 rx
394 }
395
396 pub async fn wait_for_incoming_response_block(
398 &self,
399 context_id: OutgoingContextId,
400 section_index: OutgoingSectionIndex,
401 ) -> Option<IncomingSection> {
402 let rx =
403 self.register_incoming_block_observer(context_id, section_index);
404 None
407 }
409}