Skip to main content

datex_core/network/
block_handler.rs

1use crate::{
2    channel::mpsc::{
3        UnboundedReceiver, UnboundedSender, create_unbounded_channel,
4    },
5    collections::HashMap,
6    global::dxb_block::{
7        BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId,
8        IncomingEndpointContextId, IncomingEndpointContextSectionId,
9        IncomingSection, IncomingSectionIndex, OutgoingContextId,
10        OutgoingSectionIndex,
11    },
12    network::com_interfaces::com_interface::socket::ComInterfaceSocketUUID,
13    random::RandomState,
14};
15
16use crate::prelude::*;
17use core::{cell::RefCell, fmt::Debug, prelude::rust_2024::*};
18use log::info;
19use ringmap::RingMap;
20
21// TODO #170: store scope memory
22#[derive(Debug)]
23pub struct ScopeContext {
24    pub next_section_index: IncomingSectionIndex,
25    pub next_block_number: IncomingBlockNumber,
26    /// timestamp of the last keep alive block
27    /// when a specific time has passed since the timestamp, the scope context is disposed
28    /// TODO #171: implement dispose of scope context
29    pub keep_alive_timestamp: u64,
30    // a reference to the sender for the current section
31    pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
32    // a cache for all blocks indexed by their block number
33    pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
34}
35
36/// A scope context storing scopes of incoming DXB blocks
37impl Default for ScopeContext {
38    fn default() -> Self {
39        ScopeContext {
40            next_section_index: 0,
41            next_block_number: 0,
42            keep_alive_timestamp: crate::time::now_ms(),
43            current_queue_sender: None,
44            cached_blocks: BTreeMap::new(),
45        }
46    }
47}
48
49// fn that gets a scope context as callback
50type SectionObserver = Box<dyn FnMut(IncomingSection)>;
51
52#[derive(Clone, Debug)]
53pub struct BlockHistoryData {
54    /// if block originated from local endpoint, the socket uuid is None,
55    /// otherwise it is the uuid of the incoming socket
56    pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
57}
58
59pub struct BlockHandler {
60    pub current_context_id: RefCell<OutgoingContextId>,
61
62    /// a map of active request scopes for incoming blocks
63    pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
64
65    /// a queue of incoming request scopes
66    /// the scopes can be retrieved from the request_scopes map
67    pub incoming_sections_sender: RefCell<UnboundedSender<IncomingSection>>,
68
69    /// a map of observers for incoming response blocks (by context_id + block_index)
70    /// contains an observer callback and an optional queue of blocks if the response block is a multi-block stream
71    pub section_observers: RefCell<
72        HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
73    >,
74
75    /// history of all incoming blocks
76    pub incoming_blocks_history:
77        RefCell<RingMap<BlockId, BlockHistoryData, RandomState>>,
78}
79
80impl Debug for BlockHandler {
81    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82        f.debug_struct("BlockHandler")
83            .field("current_context_id", &self.current_context_id)
84            // .field("block_cache", &self.block_cache)
85            .field("incoming_blocks_history", &self.incoming_blocks_history)
86            .finish()
87    }
88}
89
90const RING_MAP_CAPACITY: usize = 500;
91
92impl BlockHandler {
93    pub fn init(
94        incoming_sections_sender: UnboundedSender<IncomingSection>,
95    ) -> BlockHandler {
96        BlockHandler {
97            current_context_id: RefCell::new(0),
98            block_cache: RefCell::new(HashMap::new()),
99            incoming_sections_sender: RefCell::new(incoming_sections_sender),
100            section_observers: RefCell::new(HashMap::new()),
101            incoming_blocks_history: RefCell::new(
102                RingMap::with_capacity_and_hasher(
103                    RING_MAP_CAPACITY,
104                    RandomState::default(),
105                ),
106            ),
107        }
108    }
109
110    /// Adds a block to the history of incoming blocks
111    /// if the block is not already in the history
112    /// returns true if the block was added and not already in the history
113    pub fn add_block_id_to_history(
114        &self,
115        block_id: BlockId,
116        original_socket_uuid: Option<ComInterfaceSocketUUID>,
117    ) {
118        let mut history = self.incoming_blocks_history.borrow_mut();
119        // only add if original block
120        if !history.contains_key(&block_id) {
121            let block_data = BlockHistoryData {
122                original_socket_uuid,
123            };
124            history.insert(block_id, block_data);
125        }
126    }
127
128    /// Checks if a block is already in the history
129    pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
130        let history = self.incoming_blocks_history.borrow();
131        let block_id = block.get_block_id();
132        history.contains_key(&block_id)
133    }
134
135    pub fn get_block_data_from_history(
136        &self,
137        block: &DXBBlock,
138    ) -> Option<BlockHistoryData> {
139        let history = self.incoming_blocks_history.borrow();
140        let block_id = block.get_block_id();
141        history.get(&block_id).cloned()
142    }
143
144    /// Handles an incoming block by either putting it into the request queue
145    /// or calling the observer for the block if it is a response block
146    pub fn handle_incoming_block(&self, block: DXBBlock) {
147        info!("Handling incoming block...");
148        let context_id = block.block_header.context_id;
149        let section_index = block.block_header.section_index;
150        let block_number = block.block_header.block_number;
151        let is_response = block
152            .block_header
153            .flags_and_timestamp
154            .block_type()
155            .is_response();
156
157        info!(
158            "Received block (context={context_id}, section={section_index}, block_nr={block_number})"
159        );
160
161        // handle observers if response block
162        if is_response {
163            self.handle_incoming_response_block(block);
164        } else {
165            self.handle_incoming_request_block(block);
166        }
167    }
168
169    // Handles incoming request blocks by putting them into the request queue
170    fn handle_incoming_request_block(&self, block: DXBBlock) {
171        let new_sections =
172            self.extract_complete_sections_with_new_incoming_block(block);
173        // put into request queue
174        let sender = &self.incoming_sections_sender;
175        for section in new_sections {
176            sender.borrow_mut().start_send(section).unwrap();
177        }
178    }
179
180    /// Handles incoming response blocks by calling the observer if an observer is registered
181    /// Returns true when the observer has consumed all blocks and should be removed
182    fn handle_incoming_response_block(&self, block: DXBBlock) {
183        let context_id = block.block_header.context_id;
184        let endpoint_context_id = IncomingEndpointContextId {
185            sender: block.routing_header.sender.clone(),
186            context_id,
187        };
188        let new_sections =
189            self.extract_complete_sections_with_new_incoming_block(block);
190        // try to call the observer for the incoming response block
191        for section in new_sections {
192            let section_index = section.get_section_index();
193
194            if let Some(observer) = self
195                .section_observers
196                .borrow_mut()
197                .get_mut(&(context_id, section_index))
198            {
199                // call the observer with the new section
200                observer(section);
201            } else {
202                // no observer for this scope id + block index
203                log::warn!(
204                    "No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block"
205                );
206            };
207        }
208    }
209
210    /// Takes a new incoming block and returns a vector of all new available incoming sections
211    /// for the block's scope
212    fn extract_complete_sections_with_new_incoming_block(
213        &self,
214        block: DXBBlock,
215    ) -> Vec<IncomingSection> {
216        let section_index = block.block_header.section_index;
217        let block_number = block.block_header.block_number;
218        let is_end_of_section =
219            block.block_header.flags_and_timestamp.is_end_of_section();
220        let is_end_of_context =
221            block.block_header.flags_and_timestamp.is_end_of_context();
222        let endpoint_context_id = IncomingEndpointContextId {
223            sender: block.routing_header.sender.clone(),
224            context_id: block.block_header.context_id,
225        };
226        let section_context_id = IncomingEndpointContextSectionId::new(
227            endpoint_context_id.clone(),
228            section_index,
229        );
230
231        // get scope context if it already exists
232        let has_scope_context =
233            { self.block_cache.borrow().contains_key(&endpoint_context_id) };
234
235        // Case 1: shortcut if no scope context exists and the block is a single block
236        if !has_scope_context
237            && block_number == 0
238            && (is_end_of_section || is_end_of_context)
239        {
240            return vec![IncomingSection::SingleBlock((
241                Some(block),
242                section_context_id.clone(),
243            ))];
244        }
245
246        // make sure a scope context exists from here on
247        let mut request_scopes = self.block_cache.borrow_mut();
248        let scope_context = request_scopes
249            .entry(endpoint_context_id.clone())
250            .or_default();
251
252        // TODO #172: what happens if the endpoint has not received all blocks starting with block_number 0?
253        // we should still potentially process those blocks
254
255        // Case 2: if the block is the next expected block in the current section, put it into the
256        // section block queue and try to drain blocks from the cache
257        if block_number == scope_context.next_block_number {
258            // list of IncomingSections that is returned at the end
259            let mut new_blocks = vec![];
260
261            // initial values for loop variables from input block
262            let mut is_end_of_context = is_end_of_context;
263            let mut is_end_of_section = is_end_of_section;
264            let mut next_block = block;
265            let mut section_index = section_index;
266
267            // loop over the input block and potential blocks from the cache until the next block cannot be found
268            // or the end of the scope is reached
269            loop {
270                if let Some(sender) = &mut scope_context.current_queue_sender {
271                    // send the next block to the section queue receiver
272                    sender.start_send(next_block).expect(
273                        "Failed to send block to current section queue",
274                    );
275                } else {
276                    // create a new block queue for the current section
277                    let (mut sender, receiver) = create_unbounded_channel();
278
279                    // add the first block to the queue
280                    new_blocks.push(IncomingSection::BlockStream((
281                        Some(receiver),
282                        IncomingEndpointContextSectionId::new(
283                            endpoint_context_id.clone(),
284                            section_index,
285                        ),
286                    )));
287
288                    // send the next block to the section queue receiver
289                    sender.start_send(next_block).expect(
290                        "Failed to send first block to current section queue",
291                    );
292
293                    scope_context.current_queue_sender = Some(sender);
294                }
295
296                // cleanup / prepare for next block =======================
297                // increment next block number
298                scope_context.next_block_number += 1;
299
300                // if end of scope, remove the scope context
301                if is_end_of_context {
302                    request_scopes.remove(&endpoint_context_id);
303                    break;
304                }
305                // cleanup if section is finished
306                else if is_end_of_section {
307                    // increment section index
308                    scope_context.next_section_index += 1;
309                    // close and remove the current section queue sender
310                    if let Some(sender) =
311                        scope_context.current_queue_sender.take()
312                    {
313                        sender.close_channel();
314                    }
315                }
316                // ========================================================
317
318                // check if next block is in cache for next iteration
319                if let Some(next_cached_block) = scope_context
320                    .cached_blocks
321                    .remove(&scope_context.next_block_number)
322                {
323                    // check if block is end of section
324                    is_end_of_section = next_cached_block
325                        .block_header
326                        .flags_and_timestamp
327                        .is_end_of_section();
328                    // check if block is end of scope
329                    is_end_of_context = next_cached_block
330                        .block_header
331                        .flags_and_timestamp
332                        .is_end_of_context();
333                    // set next block
334                    next_block = next_cached_block;
335
336                    // update section index from next block
337                    section_index = next_block.block_header.section_index;
338                }
339                // no more blocks in cache, break
340                else {
341                    break;
342                }
343            }
344
345            new_blocks
346        }
347        // Case 3: if the block is not the next expected block in the current section,
348        // put it into the block cache
349        else {
350            // check if block is already in cache
351            // TODO #173: this should not happen, we should make sure duplicate blocks are dropped before
352            if scope_context.cached_blocks.contains_key(&block_number) {
353                log::warn!(
354                    "Block {block_number} already in cache, dropping block"
355                );
356            }
357
358            // add block to cache
359            scope_context.cached_blocks.insert(block_number, block);
360
361            vec![]
362        }
363    }
364
365    pub fn get_new_context_id(&self) -> OutgoingContextId {
366        *self.current_context_id.borrow_mut() += 1;
367        *self.current_context_id.borrow()
368    }
369
370    /// Adds a new observer for incoming blocks with a specific scope id and block index
371    /// Returns a receiver that can be awaited to get the incoming sections
372    pub fn register_incoming_block_observer(
373        &self,
374        context_id: OutgoingContextId,
375        section_index: OutgoingSectionIndex,
376    ) -> UnboundedReceiver<IncomingSection> {
377        let (tx, rx) = create_unbounded_channel::<IncomingSection>();
378        let tx = Rc::new(RefCell::new(tx));
379
380        // create observer callback for scope id + block index
381        let observer = move |blocks: IncomingSection| {
382            tx.clone().borrow_mut().start_send(blocks).unwrap();
383        };
384
385        // add new scope observer
386        self.section_observers
387            .borrow_mut()
388            .insert((context_id, section_index), Box::new(observer));
389
390        rx
391    }
392
393    /// Waits for incoming response block with a specific scope id and block index
394    pub async fn wait_for_incoming_response_block(
395        &self,
396        context_id: OutgoingContextId,
397        section_index: OutgoingSectionIndex,
398    ) -> Option<IncomingSection> {
399        let _rx =
400            self.register_incoming_block_observer(context_id, section_index);
401        // Await the result from the callback
402        // FIXME #174
403        None
404        // rx.next().await
405    }
406}
407
408#[cfg(test)]
409mod tests {
410    use super::*;
411    use crate::{
412        global::{
413            dxb_block::{DXBBlock, IncomingSection},
414            protocol_structures::{
415                block_header::{BlockHeader, BlockType, FlagsAndTimestamp},
416                routing_header::{Receivers, RoutingHeader},
417            },
418        },
419        values::core_values::endpoint::Endpoint,
420    };
421    use core::str::FromStr;
422    use ntest_timeout::timeout;
423
424    lazy_static::lazy_static! {
425        pub static ref TEST_ENDPOINT_ORIGIN: Endpoint = Endpoint::from_str("@origin").unwrap();
426        pub static ref TEST_ENDPOINT_A: Endpoint = Endpoint::from_str("@test-a").unwrap();
427        pub static ref TEST_ENDPOINT_B: Endpoint = Endpoint::from_str("@test-b").unwrap();
428    }
429
430    #[tokio::test]
431    async fn receive_single_block() {
432        let (incoming_sections_sender, mut incoming_sections_receiver) =
433            create_unbounded_channel::<IncomingSection>();
434
435        let block_handler = BlockHandler::init(incoming_sections_sender);
436        let context_id = block_handler.get_new_context_id();
437
438        // Create a single DXB block
439        let mut block = DXBBlock {
440            block_header: BlockHeader {
441                context_id,
442                flags_and_timestamp: FlagsAndTimestamp::new()
443                    .with_is_end_of_section(true)
444                    .with_is_end_of_context(true),
445                ..BlockHeader::default()
446            },
447            routing_header: RoutingHeader::default()
448                .with_sender(TEST_ENDPOINT_A.clone())
449                .with_receivers(Receivers::Endpoints(vec![
450                    TEST_ENDPOINT_ORIGIN.clone(),
451                ]))
452                .to_owned(),
453            ..DXBBlock::default()
454        };
455        let block_endpoint_context_id = block.get_endpoint_context_id();
456
457        // Send incoming block to block handler
458        block_handler.handle_incoming_block(block);
459
460        // block must be in incoming_sections_receiver
461        let incoming_block = incoming_sections_receiver.next().await.unwrap();
462        match &incoming_block {
463            IncomingSection::SingleBlock((Some(received_block), ..)) => {
464                assert_eq!(
465                    received_block.get_endpoint_context_id(),
466                    block_endpoint_context_id
467                );
468            }
469            _ => panic!("Expected a SingleBlock section"),
470        }
471    }
472
473    #[tokio::test]
474    async fn receive_multiple_blocks() {
475        let (incoming_sections_sender, mut incoming_sections_receiver) =
476            create_unbounded_channel::<IncomingSection>();
477
478        let block_handler = BlockHandler::init(incoming_sections_sender);
479
480        let context_id = block_handler.get_new_context_id();
481        let section_index = 42;
482
483        // Create multiple DXB blocks for the same context and section
484        let mut blocks = gen move {
485            yield DXBBlock {
486                block_header: BlockHeader {
487                    context_id,
488                    section_index,
489                    block_number: 0,
490                    flags_and_timestamp: FlagsAndTimestamp::new()
491                        .with_is_end_of_section(false)
492                        .with_is_end_of_context(false),
493                    ..BlockHeader::default()
494                },
495                routing_header: RoutingHeader::default()
496                    .with_sender(TEST_ENDPOINT_A.clone())
497                    .with_receivers(Receivers::Endpoints(vec![
498                        TEST_ENDPOINT_ORIGIN.clone(),
499                    ]))
500                    .to_owned(),
501                ..DXBBlock::default()
502            };
503
504            yield DXBBlock {
505                block_header: BlockHeader {
506                    context_id,
507                    section_index,
508                    block_number: 1,
509                    flags_and_timestamp: FlagsAndTimestamp::new()
510                        .with_is_end_of_section(true)
511                        .with_is_end_of_context(true),
512                    ..BlockHeader::default()
513                },
514                routing_header: RoutingHeader::default()
515                    .with_sender(TEST_ENDPOINT_A.clone())
516                    .with_receivers(Receivers::Endpoints(vec![
517                        TEST_ENDPOINT_ORIGIN.clone(),
518                    ]))
519                    .to_owned(),
520                ..DXBBlock::default()
521            };
522        };
523
524        // 1. Send first block
525        block_handler.handle_incoming_block(blocks.next().unwrap());
526
527        info!("Checking incoming sections...");
528
529        // block must be in incoming_sections_queue
530        let mut section = incoming_sections_receiver.next().await.unwrap();
531        match &section {
532            IncomingSection::BlockStream((
533                Some(blocks),
534                incoming_context_section_id,
535            )) => {
536                // section must match
537                assert_eq!(
538                    incoming_context_section_id.section_index,
539                    section_index
540                );
541
542                // blocks queue must contain the first block
543                assert!(section.next().await.is_some());
544            }
545            _ => core::panic!("Expected a BlockStream section"),
546        }
547
548        // 2. Send second block
549        block_handler.handle_incoming_block(blocks.next().unwrap());
550
551        // no new incoming sections, old section receives new blocks
552        // block must be a block stream
553        match &section {
554            IncomingSection::BlockStream((
555                Some(blocks),
556                incoming_context_section_id,
557            )) => {
558                // section must match
559                assert_eq!(
560                    incoming_context_section_id.section_index,
561                    section_index
562                );
563                // blocks queue length must be 2 (was not yet drained)
564                assert_eq!(section.drain().await.len(), 1);
565            }
566            _ => core::panic!("Expected a BlockStream section"),
567        }
568    }
569
570    #[tokio::test]
571    async fn receive_multiple_blocks_wrong_order() {
572        let (incoming_sections_sender, mut incoming_sections_receiver) =
573            create_unbounded_channel::<IncomingSection>();
574
575        let block_handler = BlockHandler::init(incoming_sections_sender);
576
577        let context_id = block_handler.get_new_context_id();
578        let section_index = 42;
579
580        // Create multiple DXB blocks for the same context and section in wrong order
581        let mut blocks = gen move {
582            yield DXBBlock {
583                block_header: BlockHeader {
584                    context_id,
585                    section_index,
586                    block_number: 1,
587                    flags_and_timestamp: FlagsAndTimestamp::new()
588                        .with_is_end_of_section(true)
589                        .with_is_end_of_context(true),
590                    ..BlockHeader::default()
591                },
592                routing_header: RoutingHeader::default()
593                    .with_sender(TEST_ENDPOINT_A.clone())
594                    .with_receivers(Receivers::Endpoints(vec![
595                        TEST_ENDPOINT_ORIGIN.clone(),
596                    ]))
597                    .to_owned(),
598                ..DXBBlock::default()
599            };
600
601            yield DXBBlock {
602                block_header: BlockHeader {
603                    context_id,
604                    section_index,
605                    block_number: 0,
606                    flags_and_timestamp: FlagsAndTimestamp::new()
607                        .with_is_end_of_section(false)
608                        .with_is_end_of_context(false),
609                    ..BlockHeader::default()
610                },
611                routing_header: RoutingHeader::default()
612                    .with_sender(TEST_ENDPOINT_A.clone())
613                    .with_receivers(Receivers::Endpoints(vec![
614                        TEST_ENDPOINT_ORIGIN.clone(),
615                    ]))
616                    .to_owned(),
617                ..DXBBlock::default()
618            };
619        };
620
621        // 1. Send first block
622        block_handler.handle_incoming_block(blocks.next().unwrap());
623
624        // 2. Send second block
625        block_handler.handle_incoming_block(blocks.next().unwrap());
626
627        // block must be in incoming_sections_queue
628        let mut section = incoming_sections_receiver.next().await.unwrap();
629        // block must be a block stream
630        match &section {
631            IncomingSection::BlockStream((
632                Some(blocks),
633                incoming_context_section_id,
634            )) => {
635                // section must match
636                assert_eq!(
637                    incoming_context_section_id.section_index.clone(),
638                    section_index
639                );
640                // blocks queue length must be 2
641                let blocks = section.drain().await;
642                assert_eq!(blocks.len(), 2);
643
644                // check order:
645                // first block must have block number 0
646                let block = blocks.first().unwrap();
647                assert_eq!(block.block_header.block_number, 0);
648                // second block must have block number 1
649                let block = blocks.get(1).unwrap();
650                assert_eq!(block.block_header.block_number, 1);
651            }
652            _ => core::panic!("Expected a BlockStream section"),
653        }
654    }
655
656    #[tokio::test]
657    async fn receive_multiple_sections() {
658        let (incoming_sections_sender, mut incoming_sections_receiver) =
659            create_unbounded_channel::<IncomingSection>();
660
661        let block_handler = BlockHandler::init(incoming_sections_sender);
662
663        let context_id = block_handler.get_new_context_id();
664        let section_index_1 = 42;
665        let section_index_2 = 43;
666
667        // Create multiple DXB blocks for two sections
668        let mut blocks = gen move {
669            // first section
670            yield DXBBlock {
671                block_header: BlockHeader {
672                    context_id,
673                    section_index: section_index_1,
674                    block_number: 0,
675                    flags_and_timestamp: FlagsAndTimestamp::new()
676                        .with_is_end_of_section(false)
677                        .with_is_end_of_context(false),
678                    ..BlockHeader::default()
679                },
680                routing_header: RoutingHeader::default()
681                    .with_sender(TEST_ENDPOINT_A.clone())
682                    .with_receivers(Receivers::Endpoints(vec![
683                        TEST_ENDPOINT_ORIGIN.clone(),
684                    ]))
685                    .to_owned(),
686                ..DXBBlock::default()
687            };
688
689            yield DXBBlock {
690                block_header: BlockHeader {
691                    context_id,
692                    section_index: section_index_1,
693                    block_number: 1,
694                    flags_and_timestamp: FlagsAndTimestamp::new()
695                        .with_is_end_of_section(true)
696                        .with_is_end_of_context(false),
697                    ..BlockHeader::default()
698                },
699                routing_header: RoutingHeader::default()
700                    .with_sender(TEST_ENDPOINT_A.clone())
701                    .with_receivers(Receivers::Endpoints(vec![
702                        TEST_ENDPOINT_ORIGIN.clone(),
703                    ]))
704                    .to_owned(),
705                ..DXBBlock::default()
706            };
707
708            // second section, end of context
709            yield DXBBlock {
710                block_header: BlockHeader {
711                    context_id,
712                    section_index: section_index_2,
713                    block_number: 2,
714                    flags_and_timestamp: FlagsAndTimestamp::new()
715                        .with_is_end_of_section(false)
716                        .with_is_end_of_context(false),
717                    ..BlockHeader::default()
718                },
719                routing_header: RoutingHeader::default()
720                    .with_sender(TEST_ENDPOINT_A.clone())
721                    .with_receivers(Receivers::Endpoints(vec![
722                        TEST_ENDPOINT_ORIGIN.clone(),
723                    ]))
724                    .to_owned(),
725                ..DXBBlock::default()
726            };
727
728            yield DXBBlock {
729                block_header: BlockHeader {
730                    context_id,
731                    section_index: section_index_2,
732                    block_number: 3,
733                    flags_and_timestamp: FlagsAndTimestamp::new()
734                        .with_is_end_of_section(true)
735                        .with_is_end_of_context(true),
736                    ..BlockHeader::default()
737                },
738                routing_header: RoutingHeader::default()
739                    .with_sender(TEST_ENDPOINT_A.clone())
740                    .with_receivers(Receivers::Endpoints(vec![
741                        TEST_ENDPOINT_ORIGIN.clone(),
742                    ]))
743                    .to_owned(),
744                ..DXBBlock::default()
745            };
746        };
747
748        // 1. Send first block
749        block_handler.handle_incoming_block(blocks.next().unwrap());
750
751        // block must be in incoming_sections_queue
752        let mut section = incoming_sections_receiver.next().await.unwrap();
753        // block must be a block stream
754        match &section {
755            IncomingSection::BlockStream((
756                Some(blocks),
757                incoming_context_section_id,
758            )) => {
759                // section must match
760                assert_eq!(
761                    incoming_context_section_id.section_index,
762                    section_index_1
763                );
764                // block queue must contain the first block
765                assert!(section.next().await.is_some());
766            }
767            _ => core::panic!("Expected a BlockStream section"),
768        }
769
770        // 2. Send second block
771        block_handler.handle_incoming_block(blocks.next().unwrap());
772
773        // block must be a block stream
774        match &section {
775            IncomingSection::BlockStream((
776                Some(blocks),
777                incoming_context_section_id,
778            )) => {
779                // section must match
780                assert_eq!(
781                    incoming_context_section_id.section_index,
782                    section_index_1
783                );
784
785                // blocks queue length must be 1
786                assert_eq!(section.drain().await.len(), 1);
787            }
788            _ => core::panic!("Expected a BlockStream section"),
789        }
790
791        // 3. Send third block
792        block_handler.handle_incoming_block(blocks.next().unwrap());
793
794        // block must be in incoming_sections_queue
795        let mut section = incoming_sections_receiver.next().await.unwrap();
796        // block must be a block stream
797        match &section {
798            IncomingSection::BlockStream((
799                Some(blocks),
800                incoming_context_section_id,
801            )) => {
802                // section must match
803                assert_eq!(
804                    incoming_context_section_id.section_index,
805                    section_index_2
806                );
807                // block queue must contain the first block
808                assert!(section.next().await.is_some());
809            }
810            _ => core::panic!("Expected a BlockStream section"),
811        }
812
813        // 4. Send fourth block
814        block_handler.handle_incoming_block(blocks.next().unwrap());
815
816        // block must not be in incoming_sections_queue
817        // block must be a block stream
818        match &section {
819            IncomingSection::BlockStream((
820                Some(blocks),
821                incoming_context_section_id,
822            )) => {
823                // section must match
824                assert_eq!(
825                    incoming_context_section_id.section_index,
826                    section_index_2
827                );
828                // blocks queue length must be 1
829                assert_eq!(section.drain().await.len(), 1);
830            }
831            _ => core::panic!("Expected a BlockStream section"),
832        }
833    }
834
835    #[tokio::test]
836    #[timeout(2000)]
837    #[cfg(feature = "std")]
838    async fn await_response_block() {
839        let (incoming_sections_sender, mut incoming_sections_receiver) =
840            create_unbounded_channel::<IncomingSection>();
841
842        let block_handler = BlockHandler::init(incoming_sections_sender);
843
844        let context_id = block_handler.get_new_context_id();
845        let section_index = 42;
846
847        // Create a single DXB block
848        let block = DXBBlock {
849            block_header: BlockHeader {
850                context_id,
851                section_index,
852                flags_and_timestamp: FlagsAndTimestamp::new()
853                    .with_block_type(BlockType::Response)
854                    .with_is_end_of_section(true)
855                    .with_is_end_of_context(true),
856                ..BlockHeader::default()
857            },
858            routing_header: RoutingHeader::default()
859                .with_sender(TEST_ENDPOINT_A.clone())
860                .with_receivers(Receivers::Endpoints(vec![
861                    TEST_ENDPOINT_ORIGIN.clone(),
862                ]))
863                .to_owned(),
864            ..DXBBlock::default()
865        };
866
867        // set observer for the block
868        let mut rx = block_handler
869            .register_incoming_block_observer(context_id, section_index);
870
871        // Put into incoming queue of mock interface
872        block_handler.handle_incoming_block(block);
873
874        // await receiver
875        let response = rx.next().await.unwrap();
876
877        // IncomingSection must be a SingleBlock
878        match response {
879            IncomingSection::SingleBlock((Some(block), _)) => {
880                info!("section: {block:?}");
881                assert_eq!(block.block_header.context_id, context_id);
882                assert_eq!(block.block_header.section_index, section_index);
883            }
884            _ => core::panic!("Expected a SingleBlock section"),
885        }
886    }
887}