datex_core/network/
block_handler.rs

1use crate::collections::HashMap;
2use crate::global::dxb_block::{
3    BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId,
4    IncomingEndpointContextId, IncomingEndpointContextSectionId,
5    IncomingSection, IncomingSectionIndex, OutgoingContextId,
6    OutgoingSectionIndex,
7};
8use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
9use crate::std_random::RandomState;
10use crate::stdlib::boxed::Box;
11use crate::stdlib::collections::{BTreeMap, VecDeque};
12use crate::stdlib::rc::Rc;
13use crate::stdlib::vec;
14use crate::stdlib::vec::Vec;
15use crate::task::{
16    UnboundedReceiver, UnboundedSender, create_unbounded_channel,
17};
18use crate::utils::time::Time;
19use core::cell::RefCell;
20use core::fmt::Debug;
21use core::prelude::rust_2024::*;
22use log::info;
23use ringmap::RingMap;
24
25// TODO #170: store scope memory
26#[derive(Debug)]
27pub struct ScopeContext {
28    pub next_section_index: IncomingSectionIndex,
29    pub next_block_number: IncomingBlockNumber,
30    /// timestamp of the last keep alive block
31    /// when a specific time has passed since the timestamp, the scope context is disposed
32    /// TODO #171: implement dispose of scope context
33    pub keep_alive_timestamp: u64,
34    // a reference to the sender for the current section
35    pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
36    // a cache for all blocks indexed by their block number
37    pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
38}
39
40/// A scope context storing scopes of incoming DXB blocks
41impl Default for ScopeContext {
42    fn default() -> Self {
43        ScopeContext {
44            next_section_index: 0,
45            next_block_number: 0,
46            keep_alive_timestamp: Time::now(),
47            current_queue_sender: None,
48            cached_blocks: BTreeMap::new(),
49        }
50    }
51}
52
53// fn that gets a scope context as callback
54type SectionObserver = Box<dyn FnMut(IncomingSection)>;
55
56#[derive(Clone, Debug)]
57pub struct BlockHistoryData {
58    /// if block originated from local endpoint, the socket uuid is None,
59    /// otherwise it is the uuid of the incoming socket
60    pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
61}
62
63pub struct BlockHandler {
64    pub current_context_id: RefCell<OutgoingContextId>,
65
66    /// a map of active request scopes for incoming blocks
67    pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
68
69    /// a queue of incoming request scopes
70    /// the scopes can be retrieved from the request_scopes map
71    pub incoming_sections_queue: RefCell<VecDeque<IncomingSection>>,
72
73    /// a map of observers for incoming response blocks (by context_id + block_index)
74    /// contains an observer callback and an optional queue of blocks if the response block is a multi-block stream
75    pub section_observers: RefCell<
76        HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
77    >,
78
79    /// history of all incoming blocks
80    pub incoming_blocks_history:
81        RefCell<RingMap<BlockId, BlockHistoryData, RandomState>>,
82}
83
84impl Debug for BlockHandler {
85    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
86        f.debug_struct("BlockHandler")
87            .field("current_context_id", &self.current_context_id)
88            .field("block_cache", &self.block_cache)
89            .field("incoming_sections_queue", &self.incoming_sections_queue)
90            .field("incoming_blocks_history", &self.incoming_blocks_history)
91            .finish()
92    }
93}
94
95impl Default for BlockHandler {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl BlockHandler {
102    pub fn new() -> BlockHandler {
103        BlockHandler {
104            current_context_id: RefCell::new(0),
105            block_cache: RefCell::new(HashMap::new()),
106            incoming_sections_queue: RefCell::new(VecDeque::new()),
107            section_observers: RefCell::new(HashMap::new()),
108            incoming_blocks_history: RefCell::new(
109                RingMap::with_capacity_and_hasher(500, RandomState::default()),
110            ),
111        }
112    }
113
114    /// Adds a block to the history of incoming blocks
115    /// if the block is not already in the history
116    /// returns true if the block was added and not already in the history
117    pub fn add_block_to_history(
118        &self,
119        block: &DXBBlock,
120        original_socket_uuid: Option<ComInterfaceSocketUUID>,
121    ) {
122        let mut history = self.incoming_blocks_history.borrow_mut();
123        let block_id = block.get_block_id();
124        // only add if original block
125        if !history.contains_key(&block_id) {
126            let block_data = BlockHistoryData {
127                original_socket_uuid,
128            };
129            history.insert(block_id, block_data);
130        }
131    }
132
133    /// Checks if a block is already in the history
134    pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
135        let history = self.incoming_blocks_history.borrow();
136        let block_id = block.get_block_id();
137        history.contains_key(&block_id)
138    }
139
140    pub fn get_block_data_from_history(
141        &self,
142        block: &DXBBlock,
143    ) -> Option<BlockHistoryData> {
144        let history = self.incoming_blocks_history.borrow();
145        let block_id = block.get_block_id();
146        history.get(&block_id).cloned()
147    }
148
149    pub fn handle_incoming_block(&self, block: DXBBlock) {
150        info!("Handling incoming block...");
151        let context_id = block.block_header.context_id;
152        let section_index = block.block_header.section_index;
153        let block_number = block.block_header.block_number;
154        let is_response = block
155            .block_header
156            .flags_and_timestamp
157            .block_type()
158            .is_response();
159
160        info!(
161            "Received block (context={context_id}, section={section_index}, block_nr={block_number})"
162        );
163
164        // handle observers if response block
165        if is_response {
166            self.handle_incoming_response_block(block);
167        } else {
168            self.handle_incoming_request_block(block);
169        }
170    }
171
172    // Handles incoming request blocks by putting them into the request queue
173    fn handle_incoming_request_block(&self, block: DXBBlock) {
174        let new_sections =
175            self.extract_complete_sections_with_new_incoming_block(block);
176        // put into request queue
177        let mut request_queue = self.incoming_sections_queue.borrow_mut();
178        for section in new_sections {
179            request_queue.push_back(section);
180        }
181    }
182
183    /// Handles incoming response blocks by calling the observer if an observer is registered
184    /// Returns true when the observer has consumed all blocks and should be removed
185    fn handle_incoming_response_block(&self, block: DXBBlock) {
186        let context_id = block.block_header.context_id;
187        let endpoint_context_id = IncomingEndpointContextId {
188            sender: block.routing_header.sender.clone(),
189            context_id,
190        };
191        let new_sections =
192            self.extract_complete_sections_with_new_incoming_block(block);
193        // try to call the observer for the incoming response block
194        for section in new_sections {
195            let section_index = section.get_section_index();
196
197            if let Some(observer) = self
198                .section_observers
199                .borrow_mut()
200                .get_mut(&(context_id, section_index))
201            {
202                // call the observer with the new section
203                observer(section);
204            } else {
205                // no observer for this scope id + block index
206                log::warn!(
207                    "No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block"
208                );
209            };
210        }
211    }
212
213    /// Takes a new incoming block and returns a vector of all new available incoming sections
214    /// for the block's scope
215    fn extract_complete_sections_with_new_incoming_block(
216        &self,
217        block: DXBBlock,
218    ) -> Vec<IncomingSection> {
219        let section_index = block.block_header.section_index;
220        let block_number = block.block_header.block_number;
221        let is_end_of_section =
222            block.block_header.flags_and_timestamp.is_end_of_section();
223        let is_end_of_context =
224            block.block_header.flags_and_timestamp.is_end_of_context();
225        let endpoint_context_id = IncomingEndpointContextId {
226            sender: block.routing_header.sender.clone(),
227            context_id: block.block_header.context_id,
228        };
229        let section_context_id = IncomingEndpointContextSectionId::new(
230            endpoint_context_id.clone(),
231            section_index,
232        );
233
234        // get scope context if it already exists
235        let has_scope_context =
236            self.block_cache.borrow().contains_key(&endpoint_context_id);
237
238        // Case 1: shortcut if no scope context exists and the block is a single block
239        if !has_scope_context
240            && block_number == 0
241            && (is_end_of_section || is_end_of_context)
242        {
243            return vec![IncomingSection::SingleBlock((
244                Some(block),
245                section_context_id.clone(),
246            ))];
247        }
248
249        // make sure a scope context exists from here on
250        let mut request_scopes = self.block_cache.borrow_mut();
251        let scope_context = request_scopes
252            .entry(endpoint_context_id.clone())
253            .or_default();
254
255        // TODO #172: what happens if the endpoint has not received all blocks starting with block_number 0?
256        // we should still potentially process those blocks
257
258        // Case 2: if the block is the next expected block in the current section, put it into the
259        // section block queue and try to drain blocks from the cache
260        if block_number == scope_context.next_block_number {
261            // list of IncomingSections that is returned at the end
262            let mut new_blocks = vec![];
263
264            // initial values for loop variables from input block
265            let mut is_end_of_context = is_end_of_context;
266            let mut is_end_of_section = is_end_of_section;
267            let mut next_block = block;
268            let mut section_index = section_index;
269
270            // loop over the input block and potential blocks from the cache until the next block cannot be found
271            // or the end of the scope is reached
272            loop {
273                if let Some(sender) = &mut scope_context.current_queue_sender {
274                    // send the next block to the section queue receiver
275                    sender.start_send(next_block).expect(
276                        "Failed to send block to current section queue",
277                    );
278                } else {
279                    // create a new block queue for the current section
280                    let (mut sender, receiver) = create_unbounded_channel();
281
282                    // add the first block to the queue
283                    new_blocks.push(IncomingSection::BlockStream((
284                        Some(receiver),
285                        IncomingEndpointContextSectionId::new(
286                            endpoint_context_id.clone(),
287                            section_index,
288                        ),
289                    )));
290
291                    // send the next block to the section queue receiver
292                    sender.start_send(next_block).expect(
293                        "Failed to send first block to current section queue",
294                    );
295
296                    scope_context.current_queue_sender = Some(sender);
297                }
298
299                // cleanup / prepare for next block =======================
300                // increment next block number
301                scope_context.next_block_number += 1;
302
303                // if end of scope, remove the scope context
304                if is_end_of_context {
305                    request_scopes.remove(&endpoint_context_id);
306                    break;
307                }
308                // cleanup if section is finished
309                else if is_end_of_section {
310                    // increment section index
311                    scope_context.next_section_index += 1;
312                    // close and remove the current section queue sender
313                    if let Some(sender) =
314                        scope_context.current_queue_sender.take()
315                    {
316                        sender.close_channel();
317                    }
318                }
319                // ========================================================
320
321                // check if next block is in cache for next iteration
322                if let Some(next_cached_block) = scope_context
323                    .cached_blocks
324                    .remove(&scope_context.next_block_number)
325                {
326                    // check if block is end of section
327                    is_end_of_section = next_cached_block
328                        .block_header
329                        .flags_and_timestamp
330                        .is_end_of_section();
331                    // check if block is end of scope
332                    is_end_of_context = next_cached_block
333                        .block_header
334                        .flags_and_timestamp
335                        .is_end_of_context();
336                    // set next block
337                    next_block = next_cached_block;
338
339                    // update section index from next block
340                    section_index = next_block.block_header.section_index;
341                }
342                // no more blocks in cache, break
343                else {
344                    break;
345                }
346            }
347
348            new_blocks
349        }
350        // Case 3: if the block is not the next expected block in the current section,
351        // put it into the block cache
352        else {
353            // check if block is already in cache
354            // TODO #173: this should not happen, we should make sure duplicate blocks are dropped before
355            if scope_context.cached_blocks.contains_key(&block_number) {
356                log::warn!(
357                    "Block {block_number} already in cache, dropping block"
358                );
359            }
360
361            // add block to cache
362            scope_context.cached_blocks.insert(block_number, block);
363
364            vec![]
365        }
366    }
367
368    pub fn get_new_context_id(&self) -> OutgoingContextId {
369        *self.current_context_id.borrow_mut() += 1;
370        *self.current_context_id.borrow()
371    }
372
373    /// Adds a new observer for incoming blocks with a specific scope id and block index
374    /// Returns a receiver that can be awaited to get the incoming sections
375    pub fn register_incoming_block_observer(
376        &self,
377        context_id: OutgoingContextId,
378        section_index: OutgoingSectionIndex,
379    ) -> UnboundedReceiver<IncomingSection> {
380        let (tx, rx) = create_unbounded_channel::<IncomingSection>();
381        let tx = Rc::new(RefCell::new(tx));
382
383        // create observer callback for scope id + block index
384        let observer = move |blocks: IncomingSection| {
385            tx.clone().borrow_mut().start_send(blocks).unwrap();
386        };
387
388        // add new scope observer
389        self.section_observers
390            .borrow_mut()
391            .insert((context_id, section_index), Box::new(observer));
392
393        rx
394    }
395
396    /// Waits for incoming response block with a specific scope id and block index
397    pub async fn wait_for_incoming_response_block(
398        &self,
399        context_id: OutgoingContextId,
400        section_index: OutgoingSectionIndex,
401    ) -> Option<IncomingSection> {
402        let rx =
403            self.register_incoming_block_observer(context_id, section_index);
404        // Await the result from the callback
405        // FIXME #174
406        None
407        // rx.next().await
408    }
409}