datex_core/network/
block_handler.rs

1use crate::global::dxb_block::{
2    BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId,
3    IncomingEndpointContextId, IncomingEndpointContextSectionId,
4    IncomingSection, IncomingSectionIndex, OutgoingContextId,
5    OutgoingSectionIndex,
6};
7use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
8use crate::utils::time::Time;
9use futures::channel::mpsc;
10use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
11use log::info;
12use ringmap::RingMap;
13use std::cell::RefCell;
14use std::collections::{BTreeMap, HashMap, VecDeque};
15use std::fmt::Debug;
16use std::rc::Rc;
17// use tokio_stream::StreamExt;
18
// TODO #170: store scope memory
/// Reassembly state kept per incoming context while its blocks are still
/// arriving (one entry per `IncomingEndpointContextId` in
/// `BlockHandler::block_cache`).
#[derive(Debug)]
pub struct ScopeContext {
    /// section counter; starts at 0 and is incremented each time a block
    /// flagged as end-of-section has been forwarded
    pub next_section_index: IncomingSectionIndex,
    /// number the next in-order block must carry; starts at 0 and is
    /// incremented after every forwarded block
    pub next_block_number: IncomingBlockNumber,
    /// timestamp of the last keep alive block
    /// when a specific time has passed since the timestamp, the scope context is disposed
    /// TODO #171: implement dispose of scope context
    pub keep_alive_timestamp: u64,
    /// sender half of the channel feeding the section that is currently
    /// being streamed; `None` while no multi-block section is open
    pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
    /// blocks that arrived ahead of `next_block_number`, indexed by their
    /// block number, waiting until it is their turn
    pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
}
33
34/// A scope context storing scopes of incoming DXB blocks
35impl Default for ScopeContext {
36    fn default() -> Self {
37        ScopeContext {
38            next_section_index: 0,
39            next_block_number: 0,
40            keep_alive_timestamp: Time::now(),
41            current_queue_sender: None,
42            cached_blocks: BTreeMap::new(),
43        }
44    }
45}
46
/// Boxed callback that is handed an `IncomingSection`; stored in
/// `BlockHandler::section_observers` and called once per delivered section.
type SectionObserver = Box<dyn FnMut(IncomingSection)>;
49
/// Metadata stored in the incoming-block history for one block id.
#[derive(Clone, Debug)]
pub struct BlockHistoryData {
    /// if block originated from local endpoint, the socket uuid is None,
    /// otherwise it is the uuid of the incoming socket
    pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
}
56
/// Bookkeeping for incoming DXB blocks: reassembles blocks into sections,
/// queues request sections, dispatches response sections to observers and
/// remembers which block ids have been seen.
///
/// All state sits behind `RefCell`s, so every method takes `&self`.
pub struct BlockHandler {
    /// last context id handed out by `get_new_context_id` (starts at 0)
    pub current_context_id: RefCell<OutgoingContextId>,

    /// reassembly state of all contexts that currently have blocks pending,
    /// keyed by sender endpoint + context id
    pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,

    /// queue of incoming request sections, in the order they became
    /// available; filled by `handle_incoming_block` for non-response blocks
    pub incoming_sections_queue: RefCell<VecDeque<IncomingSection>>,

    /// observers for incoming response sections, keyed by
    /// (context_id, section_index); each observer is called with every
    /// section delivered for its key
    pub section_observers: RefCell<
        HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
    >,

    /// history of all incoming blocks
    pub incoming_blocks_history: RefCell<RingMap<BlockId, BlockHistoryData>>,
}
76
77impl Debug for BlockHandler {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("BlockHandler")
80            .field("current_context_id", &self.current_context_id)
81            .field("block_cache", &self.block_cache)
82            .field("incoming_sections_queue", &self.incoming_sections_queue)
83            .field("incoming_blocks_history", &self.incoming_blocks_history)
84            .finish()
85    }
86}
87
88impl Default for BlockHandler {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl BlockHandler {
95    pub fn new() -> BlockHandler {
96        BlockHandler {
97            current_context_id: RefCell::new(0),
98            block_cache: RefCell::new(HashMap::new()),
99            incoming_sections_queue: RefCell::new(VecDeque::new()),
100            section_observers: RefCell::new(HashMap::new()),
101            incoming_blocks_history: RefCell::new(RingMap::with_capacity(500)),
102        }
103    }
104
105    /// Adds a block to the history of incoming blocks
106    /// if the block is not already in the history
107    /// returns true if the block was added and not already in the history
108    pub fn add_block_to_history(
109        &self,
110        block: &DXBBlock,
111        original_socket_uuid: Option<ComInterfaceSocketUUID>,
112    ) {
113        let mut history = self.incoming_blocks_history.borrow_mut();
114        let block_id = block.get_block_id();
115        // only add if original block
116        if !history.contains_key(&block_id) {
117            let block_data = BlockHistoryData {
118                original_socket_uuid,
119            };
120            history.insert(block_id, block_data);
121        }
122    }
123
124    /// Checks if a block is already in the history
125    pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
126        let history = self.incoming_blocks_history.borrow();
127        let block_id = block.get_block_id();
128        history.contains_key(&block_id)
129    }
130
131    pub fn get_block_data_from_history(
132        &self,
133        block: &DXBBlock,
134    ) -> Option<BlockHistoryData> {
135        let history = self.incoming_blocks_history.borrow();
136        let block_id = block.get_block_id();
137        history.get(&block_id).cloned()
138    }
139
140    pub fn handle_incoming_block(&self, block: DXBBlock) {
141        info!("Handling incoming block...");
142        let context_id = block.block_header.context_id;
143        let section_index = block.block_header.section_index;
144        let block_number = block.block_header.block_number;
145        let is_response = block
146            .block_header
147            .flags_and_timestamp
148            .block_type()
149            .is_response();
150
151        info!(
152            "Received block (context={context_id}, section={section_index}, block_nr={block_number})"
153        );
154
155        // handle observers if response block
156        if is_response {
157            self.handle_incoming_response_block(block);
158        } else {
159            self.handle_incoming_request_block(block);
160        }
161    }
162
163    // Handles incoming request blocks by putting them into the request queue
164    fn handle_incoming_request_block(&self, block: DXBBlock) {
165        let new_sections =
166            self.extract_complete_sections_with_new_incoming_block(block);
167        // put into request queue
168        let mut request_queue = self.incoming_sections_queue.borrow_mut();
169        for section in new_sections {
170            request_queue.push_back(section);
171        }
172    }
173
174    /// Handles incoming response blocks by calling the observer if an observer is registered
175    /// Returns true when the observer has consumed all blocks and should be removed
176    fn handle_incoming_response_block(&self, block: DXBBlock) {
177        let context_id = block.block_header.context_id;
178        let endpoint_context_id = IncomingEndpointContextId {
179            sender: block.routing_header.sender.clone(),
180            context_id,
181        };
182        let new_sections =
183            self.extract_complete_sections_with_new_incoming_block(block);
184        // try to call the observer for the incoming response block
185        for section in new_sections {
186            let section_index = section.get_section_index();
187
188            if let Some(observer) = self
189                .section_observers
190                .borrow_mut()
191                .get_mut(&(context_id, section_index))
192            {
193                // call the observer with the new section
194                observer(section);
195            } else {
196                // no observer for this scope id + block index
197                log::warn!(
198                    "No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block"
199                );
200            };
201        }
202    }
203
204    /// Takes a new incoming block and returns a vector of all new available incoming sections
205    /// for the block's scope
206    fn extract_complete_sections_with_new_incoming_block(
207        &self,
208        block: DXBBlock,
209    ) -> Vec<IncomingSection> {
210        let section_index = block.block_header.section_index;
211        let block_number = block.block_header.block_number;
212        let is_end_of_section =
213            block.block_header.flags_and_timestamp.is_end_of_section();
214        let is_end_of_context =
215            block.block_header.flags_and_timestamp.is_end_of_context();
216        let endpoint_context_id = IncomingEndpointContextId {
217            sender: block.routing_header.sender.clone(),
218            context_id: block.block_header.context_id,
219        };
220        let section_context_id = IncomingEndpointContextSectionId::new(
221            endpoint_context_id.clone(),
222            section_index,
223        );
224
225        // get scope context if it already exists
226        let has_scope_context =
227            self.block_cache.borrow().contains_key(&endpoint_context_id);
228
229        // Case 1: shortcut if no scope context exists and the block is a single block
230        if !has_scope_context
231            && block_number == 0
232            && (is_end_of_section || is_end_of_context)
233        {
234            return vec![IncomingSection::SingleBlock((
235                Some(block),
236                section_context_id.clone(),
237            ))];
238        }
239
240        // make sure a scope context exists from here on
241        let mut request_scopes = self.block_cache.borrow_mut();
242        let scope_context = request_scopes
243            .entry(endpoint_context_id.clone())
244            .or_default();
245
246        // TODO #172: what happens if the endpoint has not received all blocks starting with block_number 0?
247        // we should still potentially process those blocks
248
249        // Case 2: if the block is the next expected block in the current section, put it into the
250        // section block queue and try to drain blocks from the cache
251        if block_number == scope_context.next_block_number {
252            // list of IncomingSections that is returned at the end
253            let mut new_blocks = vec![];
254
255            // initial values for loop variables from input block
256            let mut is_end_of_context = is_end_of_context;
257            let mut is_end_of_section = is_end_of_section;
258            let mut next_block = block;
259            let mut section_index = section_index;
260
261            // loop over the input block and potential blocks from the cache until the next block cannot be found
262            // or the end of the scope is reached
263            loop {
264                if let Some(sender) = &mut scope_context.current_queue_sender {
265                    // send the next block to the section queue receiver
266                    sender.start_send(next_block).expect(
267                        "Failed to send block to current section queue",
268                    );
269                } else {
270                    // create a new block queue for the current section
271                    let (mut sender, receiver) = mpsc::unbounded();
272
273                    // add the first block to the queue
274                    new_blocks.push(IncomingSection::BlockStream((
275                        Some(receiver),
276                        IncomingEndpointContextSectionId::new(
277                            endpoint_context_id.clone(),
278                            section_index,
279                        ),
280                    )));
281
282                    // send the next block to the section queue receiver
283                    sender.start_send(next_block).expect(
284                        "Failed to send first block to current section queue",
285                    );
286
287                    scope_context.current_queue_sender = Some(sender);
288                }
289
290                // cleanup / prepare for next block =======================
291                // increment next block number
292                scope_context.next_block_number += 1;
293
294                // if end of scope, remove the scope context
295                if is_end_of_context {
296                    request_scopes.remove(&endpoint_context_id);
297                    break;
298                }
299                // cleanup if section is finished
300                else if is_end_of_section {
301                    // increment section index
302                    scope_context.next_section_index += 1;
303                    // close and remove the current section queue sender
304                    if let Some(sender) =
305                        scope_context.current_queue_sender.take()
306                    {
307                        sender.close_channel();
308                    }
309                }
310                // ========================================================
311
312                // check if next block is in cache for next iteration
313                if let Some(next_cached_block) = scope_context
314                    .cached_blocks
315                    .remove(&scope_context.next_block_number)
316                {
317                    // check if block is end of section
318                    is_end_of_section = next_cached_block
319                        .block_header
320                        .flags_and_timestamp
321                        .is_end_of_section();
322                    // check if block is end of scope
323                    is_end_of_context = next_cached_block
324                        .block_header
325                        .flags_and_timestamp
326                        .is_end_of_context();
327                    // set next block
328                    next_block = next_cached_block;
329
330                    // update section index from next block
331                    section_index = next_block.block_header.section_index;
332                }
333                // no more blocks in cache, break
334                else {
335                    break;
336                }
337            }
338
339            new_blocks
340        }
341        // Case 3: if the block is not the next expected block in the current section,
342        // put it into the block cache
343        else {
344            // check if block is already in cache
345            // TODO #173: this should not happen, we should make sure duplicate blocks are dropped before
346            if scope_context.cached_blocks.contains_key(&block_number) {
347                log::warn!(
348                    "Block {block_number} already in cache, dropping block"
349                );
350            }
351
352            // add block to cache
353            scope_context.cached_blocks.insert(block_number, block);
354
355            vec![]
356        }
357    }
358
359    pub fn get_new_context_id(&self) -> OutgoingContextId {
360        *self.current_context_id.borrow_mut() += 1;
361        *self.current_context_id.borrow()
362    }
363
364    /// Adds a new observer for incoming blocks with a specific scope id and block index
365    /// Returns a receiver that can be awaited to get the incoming sections
366    pub fn register_incoming_block_observer(
367        &self,
368        context_id: OutgoingContextId,
369        section_index: OutgoingSectionIndex,
370    ) -> UnboundedReceiver<IncomingSection> {
371        let (tx, rx) = mpsc::unbounded();
372        let tx = Rc::new(RefCell::new(tx));
373
374        // create observer callback for scope id + block index
375        let observer = move |blocks: IncomingSection| {
376            tx.clone().borrow_mut().start_send(blocks).unwrap();
377        };
378
379        // add new scope observer
380        self.section_observers
381            .borrow_mut()
382            .insert((context_id, section_index), Box::new(observer));
383
384        rx
385    }
386
387    /// Waits for incoming response block with a specific scope id and block index
388    pub async fn wait_for_incoming_response_block(
389        &self,
390        context_id: OutgoingContextId,
391        section_index: OutgoingSectionIndex,
392    ) -> Option<IncomingSection> {
393        let rx =
394            self.register_incoming_block_observer(context_id, section_index);
395        // Await the result from the callback
396        // FIXME #174
397        None
398        // rx.next().await
399    }
400}