datex_core/network/
block_handler.rs1use crate::global::dxb_block::{BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId, IncomingEndpointContextId, IncomingEndpointContextSectionId, IncomingSection, IncomingSectionIndex, OutgoingContextId, OutgoingSectionIndex};
2use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
3use crate::runtime::global_context::get_global_context;
4use futures::channel::mpsc;
5use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6use log::info;
7use ringmap::RingMap;
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap, VecDeque};
10use std::fmt::Debug;
11use std::rc::Rc;
12#[derive(Debug)]
16pub struct ScopeContext {
17 pub next_section_index: IncomingSectionIndex,
18 pub next_block_number: IncomingBlockNumber,
19 pub keep_alive_timestamp: u64,
23 pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
25 pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
27}
28
29impl Default for ScopeContext {
31 fn default() -> Self {
32 ScopeContext {
33 next_section_index: 0,
34 next_block_number: 0,
35 keep_alive_timestamp: get_global_context()
36 .time
37 .lock()
38 .unwrap()
39 .now(),
40 current_queue_sender: None,
41 cached_blocks: BTreeMap::new(),
42 }
43 }
44}
45
46type SectionObserver = Box<dyn FnMut(IncomingSection)>;
48
49#[derive(Clone, Debug)]
50pub struct BlockHistoryData {
51 pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
54}
55
56pub struct BlockHandler {
57 pub current_context_id: RefCell<OutgoingContextId>,
58
59 pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
61
62 pub incoming_sections_queue: RefCell<VecDeque<IncomingSection>>,
65
66 pub section_observers: RefCell<
69 HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
70 >,
71
72 pub incoming_blocks_history: RefCell<RingMap<BlockId, BlockHistoryData>>,
74}
75
76impl Debug for BlockHandler {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("BlockHandler")
79 .field("current_context_id", &self.current_context_id)
80 .field("block_cache", &self.block_cache)
81 .field("incoming_sections_queue", &self.incoming_sections_queue)
82 .field("incoming_blocks_history", &self.incoming_blocks_history)
83 .finish()
84 }
85}
86
87impl Default for BlockHandler {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl BlockHandler {
94 pub fn new() -> BlockHandler {
95 BlockHandler {
96 current_context_id: RefCell::new(0),
97 block_cache: RefCell::new(HashMap::new()),
98 incoming_sections_queue: RefCell::new(VecDeque::new()),
99 section_observers: RefCell::new(HashMap::new()),
100 incoming_blocks_history: RefCell::new(RingMap::with_capacity(500)),
101 }
102 }
103
104 pub fn add_block_to_history(
108 &self,
109 block: &DXBBlock,
110 original_socket_uuid: Option<ComInterfaceSocketUUID>,
111 ) {
112 let mut history = self.incoming_blocks_history.borrow_mut();
113 let block_id = block.get_block_id();
114 if !history.contains_key(&block_id) {
116 let block_data = BlockHistoryData {
117 original_socket_uuid,
118 };
119 history.insert(block_id, block_data);
120 }
121 }
122
123 pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
125 let history = self.incoming_blocks_history.borrow();
126 let block_id = block.get_block_id();
127 history.contains_key(&block_id)
128 }
129
130 pub fn get_block_data_from_history(
131 &self,
132 block: &DXBBlock,
133 ) -> Option<BlockHistoryData> {
134 let history = self.incoming_blocks_history.borrow();
135 let block_id = block.get_block_id();
136 history.get(&block_id).cloned()
137 }
138
139 pub fn handle_incoming_block(&self, block: DXBBlock) {
140 info!("Handling incoming block...");
141 let context_id = block.block_header.context_id;
142 let section_index = block.block_header.section_index;
143 let block_number = block.block_header.block_number;
144 let is_response = block
145 .block_header
146 .flags_and_timestamp
147 .block_type()
148 .is_response();
149
150 info!("Received block (context={context_id}, section={section_index}, block_nr={block_number})");
151
152 if is_response {
154 self.handle_incoming_response_block(block);
155 } else {
156 self.handle_incoming_request_block(block);
157 }
158 }
159
160 fn handle_incoming_request_block(&self, block: DXBBlock) {
162 let new_sections =
163 self.extract_complete_sections_with_new_incoming_block(block);
164 let mut request_queue = self.incoming_sections_queue.borrow_mut();
166 for section in new_sections {
167 request_queue.push_back(section);
168 }
169 }
170
171 fn handle_incoming_response_block(&self, block: DXBBlock) {
174 let context_id = block.block_header.context_id;
175 let endpoint_context_id = IncomingEndpointContextId {
176 sender: block.routing_header.sender.clone(),
177 context_id,
178 };
179 let new_sections =
180 self.extract_complete_sections_with_new_incoming_block(block);
181 for section in new_sections {
183 let section_index = section.get_section_index();
184
185 if let Some(observer) = self
186 .section_observers
187 .borrow_mut()
188 .get_mut(&(context_id, section_index))
189 {
190 observer(section);
192 } else {
193 log::warn!("No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block");
195 };
196 }
197 }
198
199 fn extract_complete_sections_with_new_incoming_block(
202 &self,
203 block: DXBBlock,
204 ) -> Vec<IncomingSection> {
205 let section_index = block.block_header.section_index;
206 let block_number = block.block_header.block_number;
207 let is_end_of_section =
208 block.block_header.flags_and_timestamp.is_end_of_section();
209 let is_end_of_context =
210 block.block_header.flags_and_timestamp.is_end_of_context();
211 let endpoint_context_id = IncomingEndpointContextId {
212 sender: block.routing_header.sender.clone(),
213 context_id: block.block_header.context_id,
214 };
215 let section_context_id = IncomingEndpointContextSectionId::new(endpoint_context_id.clone(), section_index);
216
217 let has_scope_context =
219 self.block_cache.borrow().contains_key(&endpoint_context_id);
220
221
222 if !has_scope_context
224 && block_number == 0
225 && (is_end_of_section || is_end_of_context)
226 {
227 return vec![IncomingSection::SingleBlock((Some(block), section_context_id.clone()))];
228 }
229
230 let mut request_scopes = self.block_cache.borrow_mut();
232 let scope_context = request_scopes
233 .entry(endpoint_context_id.clone())
234 .or_default();
235
236 if block_number == scope_context.next_block_number {
242 let mut new_blocks = vec![];
244
245 let mut is_end_of_context = is_end_of_context;
247 let mut is_end_of_section = is_end_of_section;
248 let mut next_block = block;
249 let mut section_index = section_index;
250
251 loop {
254 if let Some(sender ) = &mut scope_context.current_queue_sender {
255 sender.start_send(next_block)
257 .expect("Failed to send block to current section queue");
258 }
259 else {
260 let (mut sender, receiver) = mpsc::unbounded();
262
263 new_blocks.push(IncomingSection::BlockStream((
265 Some(receiver),
266 IncomingEndpointContextSectionId::new(endpoint_context_id.clone(), section_index),
267 )));
268
269 sender.start_send(next_block)
271 .expect("Failed to send first block to current section queue");
272
273 scope_context.current_queue_sender = Some(sender);
274 }
275
276 scope_context.next_block_number += 1;
279
280 if is_end_of_context {
282 request_scopes.remove(&endpoint_context_id);
283 break;
284 }
285 else if is_end_of_section {
287 scope_context.next_section_index += 1;
289 if let Some(sender) = scope_context.current_queue_sender.take() {
291 sender.close_channel();
292 }
293 }
294 if let Some(next_cached_block) = scope_context
298 .cached_blocks
299 .remove(&scope_context.next_block_number)
300 {
301 is_end_of_section = next_cached_block
303 .block_header
304 .flags_and_timestamp
305 .is_end_of_section();
306 is_end_of_context = next_cached_block
308 .block_header
309 .flags_and_timestamp
310 .is_end_of_context();
311 next_block = next_cached_block;
313
314 section_index = next_block.block_header.section_index;
316 }
317 else {
319 break;
320 }
321 }
322
323 new_blocks
324 }
325 else {
328 if scope_context.cached_blocks.contains_key(&block_number) {
331 log::warn!(
332 "Block {block_number} already in cache, dropping block"
333 );
334 }
335
336 scope_context.cached_blocks.insert(block_number, block);
338
339 vec![]
340 }
341 }
342
343 pub fn get_new_context_id(&self) -> OutgoingContextId {
344 *self.current_context_id.borrow_mut() += 1;
345 *self.current_context_id.borrow()
346 }
347
348 pub fn register_incoming_block_observer(
351 &self,
352 context_id: OutgoingContextId,
353 section_index: OutgoingSectionIndex,
354 ) -> UnboundedReceiver<IncomingSection> {
355 let (tx, rx) = mpsc::unbounded();
356 let tx = Rc::new(RefCell::new(tx));
357
358 let observer = move |blocks: IncomingSection| {
360 tx.clone().borrow_mut().start_send(blocks).unwrap();
361 };
362
363 self.section_observers
365 .borrow_mut()
366 .insert((context_id, section_index), Box::new(observer));
367
368 rx
369 }
370
371 pub async fn wait_for_incoming_response_block(
373 &self,
374 context_id: OutgoingContextId,
375 section_index: OutgoingSectionIndex,
376 ) -> Option<IncomingSection> {
377 let rx =
378 self.register_incoming_block_observer(context_id, section_index);
379 None
382 }
384}