datex_core/network/
block_handler.rs

1use crate::global::dxb_block::{BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId, IncomingEndpointContextId, IncomingEndpointContextSectionId, IncomingSection, IncomingSectionIndex, OutgoingContextId, OutgoingSectionIndex};
2use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
3use crate::runtime::global_context::get_global_context;
4use futures::channel::mpsc;
5use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6use log::info;
7use ringmap::RingMap;
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap, VecDeque};
10use std::fmt::Debug;
11use std::rc::Rc;
12// use tokio_stream::StreamExt;
13
14// TODO #170: store scope memory
15#[derive(Debug)]
16pub struct ScopeContext {
17    pub next_section_index: IncomingSectionIndex,
18    pub next_block_number: IncomingBlockNumber,
19    /// timestamp of the last keep alive block
20    /// when a specific time has passed since the timestamp, the scope context is disposed
21    /// TODO #171: implement dispose of scope context
22    pub keep_alive_timestamp: u64,
23    // a reference to the sender for the current section
24    pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
25    // a cache for all blocks indexed by their block number
26    pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
27}
28
29/// A scope context storing scopes of incoming DXB blocks
30impl Default for ScopeContext {
31    fn default() -> Self {
32        ScopeContext {
33            next_section_index: 0,
34            next_block_number: 0,
35            keep_alive_timestamp: get_global_context()
36                .time
37                .lock()
38                .unwrap()
39                .now(),
40            current_queue_sender: None,
41            cached_blocks: BTreeMap::new(),
42        }
43    }
44}
45
46// fn that gets a scope context as callback
47type SectionObserver = Box<dyn FnMut(IncomingSection)>;
48
49#[derive(Clone, Debug)]
50pub struct BlockHistoryData {
51    /// if block originated from local endpoint, the socket uuid is None,
52    /// otherwise it is the uuid of the incoming socket
53    pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
54}
55
56pub struct BlockHandler {
57    pub current_context_id: RefCell<OutgoingContextId>,
58
59    /// a map of active request scopes for incoming blocks
60    pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
61
62    /// a queue of incoming request scopes
63    /// the scopes can be retrieved from the request_scopes map
64    pub incoming_sections_queue: RefCell<VecDeque<IncomingSection>>,
65
66    /// a map of observers for incoming response blocks (by context_id + block_index)
67    /// contains an observer callback and an optional queue of blocks if the response block is a multi-block stream
68    pub section_observers: RefCell<
69        HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
70    >,
71
72    /// history of all incoming blocks
73    pub incoming_blocks_history: RefCell<RingMap<BlockId, BlockHistoryData>>,
74}
75
76impl Debug for BlockHandler {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("BlockHandler")
79            .field("current_context_id", &self.current_context_id)
80            .field("block_cache", &self.block_cache)
81            .field("incoming_sections_queue", &self.incoming_sections_queue)
82            .field("incoming_blocks_history", &self.incoming_blocks_history)
83            .finish()
84    }
85}
86
87impl Default for BlockHandler {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl BlockHandler {
94    pub fn new() -> BlockHandler {
95        BlockHandler {
96            current_context_id: RefCell::new(0),
97            block_cache: RefCell::new(HashMap::new()),
98            incoming_sections_queue: RefCell::new(VecDeque::new()),
99            section_observers: RefCell::new(HashMap::new()),
100            incoming_blocks_history: RefCell::new(RingMap::with_capacity(500)),
101        }
102    }
103
104    /// Adds a block to the history of incoming blocks
105    /// if the block is not already in the history
106    /// returns true if the block was added and not already in the history
107    pub fn add_block_to_history(
108        &self,
109        block: &DXBBlock,
110        original_socket_uuid: Option<ComInterfaceSocketUUID>,
111    ) {
112        let mut history = self.incoming_blocks_history.borrow_mut();
113        let block_id = block.get_block_id();
114        // only add if original block
115        if !history.contains_key(&block_id) {
116            let block_data = BlockHistoryData {
117                original_socket_uuid,
118            };
119            history.insert(block_id, block_data);
120        }
121    }
122
123    /// Checks if a block is already in the history
124    pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
125        let history = self.incoming_blocks_history.borrow();
126        let block_id = block.get_block_id();
127        history.contains_key(&block_id)
128    }
129
130    pub fn get_block_data_from_history(
131        &self,
132        block: &DXBBlock,
133    ) -> Option<BlockHistoryData> {
134        let history = self.incoming_blocks_history.borrow();
135        let block_id = block.get_block_id();
136        history.get(&block_id).cloned()
137    }
138
139    pub fn handle_incoming_block(&self, block: DXBBlock) {
140        info!("Handling incoming block...");
141        let context_id = block.block_header.context_id;
142        let section_index = block.block_header.section_index;
143        let block_number = block.block_header.block_number;
144        let is_response = block
145            .block_header
146            .flags_and_timestamp
147            .block_type()
148            .is_response();
149
150        info!("Received block (context={context_id}, section={section_index}, block_nr={block_number})");
151
152        // handle observers if response block
153        if is_response {
154            self.handle_incoming_response_block(block);
155        } else {
156            self.handle_incoming_request_block(block);
157        }
158    }
159
160    // Handles incoming request blocks by putting them into the request queue
161    fn handle_incoming_request_block(&self, block: DXBBlock) {
162        let new_sections =
163            self.extract_complete_sections_with_new_incoming_block(block);
164        // put into request queue
165        let mut request_queue = self.incoming_sections_queue.borrow_mut();
166        for section in new_sections {
167            request_queue.push_back(section);
168        }
169    }
170
171    /// Handles incoming response blocks by calling the observer if an observer is registered
172    /// Returns true when the observer has consumed all blocks and should be removed
173    fn handle_incoming_response_block(&self, block: DXBBlock) {
174        let context_id = block.block_header.context_id;
175        let endpoint_context_id = IncomingEndpointContextId {
176            sender: block.routing_header.sender.clone(),
177            context_id,
178        };
179        let new_sections =
180            self.extract_complete_sections_with_new_incoming_block(block);
181        // try to call the observer for the incoming response block
182        for section in new_sections {
183            let section_index = section.get_section_index();
184
185            if let Some(observer) = self
186                .section_observers
187                .borrow_mut()
188                .get_mut(&(context_id, section_index))
189            {
190                // call the observer with the new section
191                observer(section);
192            } else {
193                // no observer for this scope id + block index
194                log::warn!("No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block");
195            };
196        }
197    }
198
199    /// Takes a new incoming block and returns a vector of all new available incoming sections
200    /// for the block's scope
201    fn extract_complete_sections_with_new_incoming_block(
202        &self,
203        block: DXBBlock,
204    ) -> Vec<IncomingSection> {
205        let section_index = block.block_header.section_index;
206        let block_number = block.block_header.block_number;
207        let is_end_of_section =
208            block.block_header.flags_and_timestamp.is_end_of_section();
209        let is_end_of_context =
210            block.block_header.flags_and_timestamp.is_end_of_context();
211        let endpoint_context_id = IncomingEndpointContextId {
212            sender: block.routing_header.sender.clone(),
213            context_id: block.block_header.context_id,
214        };
215        let section_context_id = IncomingEndpointContextSectionId::new(endpoint_context_id.clone(), section_index);
216
217        // get scope context if it already exists
218        let has_scope_context =
219            self.block_cache.borrow().contains_key(&endpoint_context_id);
220
221
222        // Case 1: shortcut if no scope context exists and the block is a single block
223        if !has_scope_context
224            && block_number == 0
225            && (is_end_of_section || is_end_of_context)
226        {
227            return vec![IncomingSection::SingleBlock((Some(block), section_context_id.clone()))];
228        }
229
230        // make sure a scope context exists from here on
231        let mut request_scopes = self.block_cache.borrow_mut();
232        let scope_context = request_scopes
233            .entry(endpoint_context_id.clone())
234            .or_default();
235
236        // TODO #172: what happens if the endpoint has not received all blocks starting with block_number 0?
237        // we should still potentially process those blocks
238
239        // Case 2: if the block is the next expected block in the current section, put it into the
240        // section block queue and try to drain blocks from the cache
241        if block_number == scope_context.next_block_number {
242            // list of IncomingSections that is returned at the end
243            let mut new_blocks = vec![];
244
245            // initial values for loop variables from input block
246            let mut is_end_of_context = is_end_of_context;
247            let mut is_end_of_section = is_end_of_section;
248            let mut next_block = block;
249            let mut section_index = section_index;
250
251            // loop over the input block and potential blocks from the cache until the next block cannot be found
252            // or the end of the scope is reached
253            loop {
254                if let Some(sender ) = &mut scope_context.current_queue_sender {
255                    // send the next block to the section queue receiver
256                    sender.start_send(next_block)
257                        .expect("Failed to send block to current section queue");
258                }
259                else {
260                    // create a new block queue for the current section
261                    let (mut sender, receiver) = mpsc::unbounded();
262
263                    // add the first block to the queue
264                    new_blocks.push(IncomingSection::BlockStream((
265                        Some(receiver),
266                        IncomingEndpointContextSectionId::new(endpoint_context_id.clone(), section_index),
267                    )));
268
269                    // send the next block to the section queue receiver
270                    sender.start_send(next_block)
271                        .expect("Failed to send first block to current section queue");
272
273                    scope_context.current_queue_sender = Some(sender);
274                }
275
276                // cleanup / prepare for next block =======================
277                // increment next block number
278                scope_context.next_block_number += 1;
279
280                // if end of scope, remove the scope context
281                if is_end_of_context {
282                    request_scopes.remove(&endpoint_context_id);
283                    break;
284                }
285                // cleanup if section is finished
286                else if is_end_of_section {
287                    // increment section index
288                    scope_context.next_section_index += 1;
289                    // close and remove the current section queue sender
290                    if let Some(sender) = scope_context.current_queue_sender.take() {
291                        sender.close_channel();
292                    }
293                }
294                // ========================================================
295
296                // check if next block is in cache for next iteration
297                if let Some(next_cached_block) = scope_context
298                    .cached_blocks
299                    .remove(&scope_context.next_block_number)
300                {
301                    // check if block is end of section
302                    is_end_of_section = next_cached_block
303                        .block_header
304                        .flags_and_timestamp
305                        .is_end_of_section();
306                    // check if block is end of scope
307                    is_end_of_context = next_cached_block
308                        .block_header
309                        .flags_and_timestamp
310                        .is_end_of_context();
311                    // set next block
312                    next_block = next_cached_block;
313
314                    // update section index from next block
315                    section_index = next_block.block_header.section_index;
316                }
317                // no more blocks in cache, break
318                else {
319                    break;
320                }
321            }
322
323            new_blocks
324        }
325        // Case 3: if the block is not the next expected block in the current section,
326        // put it into the block cache
327        else {
328            // check if block is already in cache
329            // TODO #173: this should not happen, we should make sure duplicate blocks are dropped before
330            if scope_context.cached_blocks.contains_key(&block_number) {
331                log::warn!(
332                    "Block {block_number} already in cache, dropping block"
333                );
334            }
335
336            // add block to cache
337            scope_context.cached_blocks.insert(block_number, block);
338
339            vec![]
340        }
341    }
342
343    pub fn get_new_context_id(&self) -> OutgoingContextId {
344        *self.current_context_id.borrow_mut() += 1;
345        *self.current_context_id.borrow()
346    }
347
348    /// Adds a new observer for incoming blocks with a specific scope id and block index
349    /// Returns a receiver that can be awaited to get the incoming sections
350    pub fn register_incoming_block_observer(
351        &self,
352        context_id: OutgoingContextId,
353        section_index: OutgoingSectionIndex,
354    ) -> UnboundedReceiver<IncomingSection> {
355        let (tx, rx) = mpsc::unbounded();
356        let tx = Rc::new(RefCell::new(tx));
357
358        // create observer callback for scope id + block index
359        let observer = move |blocks: IncomingSection| {
360            tx.clone().borrow_mut().start_send(blocks).unwrap();
361        };
362
363        // add new scope observer
364        self.section_observers
365            .borrow_mut()
366            .insert((context_id, section_index), Box::new(observer));
367
368        rx
369    }
370
371    /// Waits for incoming response block with a specific scope id and block index
372    pub async fn wait_for_incoming_response_block(
373        &self,
374        context_id: OutgoingContextId,
375        section_index: OutgoingSectionIndex,
376    ) -> Option<IncomingSection> {
377        let rx =
378            self.register_incoming_block_observer(context_id, section_index);
379        // Await the result from the callback
380        // FIXME #174
381        None
382        // rx.next().await
383    }
384}