1use crate::{
2 channel::mpsc::{
3 UnboundedReceiver, UnboundedSender, create_unbounded_channel,
4 },
5 collections::HashMap,
6 global::dxb_block::{
7 BlockId, DXBBlock, IncomingBlockNumber, IncomingContextId,
8 IncomingEndpointContextId, IncomingEndpointContextSectionId,
9 IncomingSection, IncomingSectionIndex, OutgoingContextId,
10 OutgoingSectionIndex,
11 },
12 network::com_interfaces::com_interface::socket::ComInterfaceSocketUUID,
13 random::RandomState,
14};
15
16use crate::prelude::*;
17use core::{cell::RefCell, fmt::Debug, prelude::rust_2024::*};
18use log::info;
19use ringmap::RingMap;
20
21#[derive(Debug)]
23pub struct ScopeContext {
24 pub next_section_index: IncomingSectionIndex,
25 pub next_block_number: IncomingBlockNumber,
26 pub keep_alive_timestamp: u64,
30 pub current_queue_sender: Option<UnboundedSender<DXBBlock>>,
32 pub cached_blocks: BTreeMap<IncomingBlockNumber, DXBBlock>,
34}
35
36impl Default for ScopeContext {
38 fn default() -> Self {
39 ScopeContext {
40 next_section_index: 0,
41 next_block_number: 0,
42 keep_alive_timestamp: crate::time::now_ms(),
43 current_queue_sender: None,
44 cached_blocks: BTreeMap::new(),
45 }
46 }
47}
48
49type SectionObserver = Box<dyn FnMut(IncomingSection)>;
51
52#[derive(Clone, Debug)]
53pub struct BlockHistoryData {
54 pub original_socket_uuid: Option<ComInterfaceSocketUUID>,
57}
58
59pub struct BlockHandler {
60 pub current_context_id: RefCell<OutgoingContextId>,
61
62 pub block_cache: RefCell<HashMap<IncomingEndpointContextId, ScopeContext>>,
64
65 pub incoming_sections_sender: RefCell<UnboundedSender<IncomingSection>>,
68
69 pub section_observers: RefCell<
72 HashMap<(IncomingContextId, IncomingSectionIndex), SectionObserver>,
73 >,
74
75 pub incoming_blocks_history:
77 RefCell<RingMap<BlockId, BlockHistoryData, RandomState>>,
78}
79
80impl Debug for BlockHandler {
81 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82 f.debug_struct("BlockHandler")
83 .field("current_context_id", &self.current_context_id)
84 .field("incoming_blocks_history", &self.incoming_blocks_history)
86 .finish()
87 }
88}
89
90const RING_MAP_CAPACITY: usize = 500;
91
92impl BlockHandler {
93 pub fn init(
94 incoming_sections_sender: UnboundedSender<IncomingSection>,
95 ) -> BlockHandler {
96 BlockHandler {
97 current_context_id: RefCell::new(0),
98 block_cache: RefCell::new(HashMap::new()),
99 incoming_sections_sender: RefCell::new(incoming_sections_sender),
100 section_observers: RefCell::new(HashMap::new()),
101 incoming_blocks_history: RefCell::new(
102 RingMap::with_capacity_and_hasher(
103 RING_MAP_CAPACITY,
104 RandomState::default(),
105 ),
106 ),
107 }
108 }
109
110 pub fn add_block_id_to_history(
114 &self,
115 block_id: BlockId,
116 original_socket_uuid: Option<ComInterfaceSocketUUID>,
117 ) {
118 let mut history = self.incoming_blocks_history.borrow_mut();
119 if !history.contains_key(&block_id) {
121 let block_data = BlockHistoryData {
122 original_socket_uuid,
123 };
124 history.insert(block_id, block_data);
125 }
126 }
127
128 pub fn is_block_in_history(&self, block: &DXBBlock) -> bool {
130 let history = self.incoming_blocks_history.borrow();
131 let block_id = block.get_block_id();
132 history.contains_key(&block_id)
133 }
134
135 pub fn get_block_data_from_history(
136 &self,
137 block: &DXBBlock,
138 ) -> Option<BlockHistoryData> {
139 let history = self.incoming_blocks_history.borrow();
140 let block_id = block.get_block_id();
141 history.get(&block_id).cloned()
142 }
143
144 pub fn handle_incoming_block(&self, block: DXBBlock) {
147 info!("Handling incoming block...");
148 let context_id = block.block_header.context_id;
149 let section_index = block.block_header.section_index;
150 let block_number = block.block_header.block_number;
151 let is_response = block
152 .block_header
153 .flags_and_timestamp
154 .block_type()
155 .is_response();
156
157 info!(
158 "Received block (context={context_id}, section={section_index}, block_nr={block_number})"
159 );
160
161 if is_response {
163 self.handle_incoming_response_block(block);
164 } else {
165 self.handle_incoming_request_block(block);
166 }
167 }
168
169 fn handle_incoming_request_block(&self, block: DXBBlock) {
171 let new_sections =
172 self.extract_complete_sections_with_new_incoming_block(block);
173 let sender = &self.incoming_sections_sender;
175 for section in new_sections {
176 sender.borrow_mut().start_send(section).unwrap();
177 }
178 }
179
180 fn handle_incoming_response_block(&self, block: DXBBlock) {
183 let context_id = block.block_header.context_id;
184 let endpoint_context_id = IncomingEndpointContextId {
185 sender: block.routing_header.sender.clone(),
186 context_id,
187 };
188 let new_sections =
189 self.extract_complete_sections_with_new_incoming_block(block);
190 for section in new_sections {
192 let section_index = section.get_section_index();
193
194 if let Some(observer) = self
195 .section_observers
196 .borrow_mut()
197 .get_mut(&(context_id, section_index))
198 {
199 observer(section);
201 } else {
202 log::warn!(
204 "No observer for incoming response block (scope={endpoint_context_id:?}, block={section_index}), dropping block"
205 );
206 };
207 }
208 }
209
210 fn extract_complete_sections_with_new_incoming_block(
213 &self,
214 block: DXBBlock,
215 ) -> Vec<IncomingSection> {
216 let section_index = block.block_header.section_index;
217 let block_number = block.block_header.block_number;
218 let is_end_of_section =
219 block.block_header.flags_and_timestamp.is_end_of_section();
220 let is_end_of_context =
221 block.block_header.flags_and_timestamp.is_end_of_context();
222 let endpoint_context_id = IncomingEndpointContextId {
223 sender: block.routing_header.sender.clone(),
224 context_id: block.block_header.context_id,
225 };
226 let section_context_id = IncomingEndpointContextSectionId::new(
227 endpoint_context_id.clone(),
228 section_index,
229 );
230
231 let has_scope_context =
233 { self.block_cache.borrow().contains_key(&endpoint_context_id) };
234
235 if !has_scope_context
237 && block_number == 0
238 && (is_end_of_section || is_end_of_context)
239 {
240 return vec![IncomingSection::SingleBlock((
241 Some(block),
242 section_context_id.clone(),
243 ))];
244 }
245
246 let mut request_scopes = self.block_cache.borrow_mut();
248 let scope_context = request_scopes
249 .entry(endpoint_context_id.clone())
250 .or_default();
251
252 if block_number == scope_context.next_block_number {
258 let mut new_blocks = vec![];
260
261 let mut is_end_of_context = is_end_of_context;
263 let mut is_end_of_section = is_end_of_section;
264 let mut next_block = block;
265 let mut section_index = section_index;
266
267 loop {
270 if let Some(sender) = &mut scope_context.current_queue_sender {
271 sender.start_send(next_block).expect(
273 "Failed to send block to current section queue",
274 );
275 } else {
276 let (mut sender, receiver) = create_unbounded_channel();
278
279 new_blocks.push(IncomingSection::BlockStream((
281 Some(receiver),
282 IncomingEndpointContextSectionId::new(
283 endpoint_context_id.clone(),
284 section_index,
285 ),
286 )));
287
288 sender.start_send(next_block).expect(
290 "Failed to send first block to current section queue",
291 );
292
293 scope_context.current_queue_sender = Some(sender);
294 }
295
296 scope_context.next_block_number += 1;
299
300 if is_end_of_context {
302 request_scopes.remove(&endpoint_context_id);
303 break;
304 }
305 else if is_end_of_section {
307 scope_context.next_section_index += 1;
309 if let Some(sender) =
311 scope_context.current_queue_sender.take()
312 {
313 sender.close_channel();
314 }
315 }
316 if let Some(next_cached_block) = scope_context
320 .cached_blocks
321 .remove(&scope_context.next_block_number)
322 {
323 is_end_of_section = next_cached_block
325 .block_header
326 .flags_and_timestamp
327 .is_end_of_section();
328 is_end_of_context = next_cached_block
330 .block_header
331 .flags_and_timestamp
332 .is_end_of_context();
333 next_block = next_cached_block;
335
336 section_index = next_block.block_header.section_index;
338 }
339 else {
341 break;
342 }
343 }
344
345 new_blocks
346 }
347 else {
350 if scope_context.cached_blocks.contains_key(&block_number) {
353 log::warn!(
354 "Block {block_number} already in cache, dropping block"
355 );
356 }
357
358 scope_context.cached_blocks.insert(block_number, block);
360
361 vec![]
362 }
363 }
364
365 pub fn get_new_context_id(&self) -> OutgoingContextId {
366 *self.current_context_id.borrow_mut() += 1;
367 *self.current_context_id.borrow()
368 }
369
370 pub fn register_incoming_block_observer(
373 &self,
374 context_id: OutgoingContextId,
375 section_index: OutgoingSectionIndex,
376 ) -> UnboundedReceiver<IncomingSection> {
377 let (tx, rx) = create_unbounded_channel::<IncomingSection>();
378 let tx = Rc::new(RefCell::new(tx));
379
380 let observer = move |blocks: IncomingSection| {
382 tx.clone().borrow_mut().start_send(blocks).unwrap();
383 };
384
385 self.section_observers
387 .borrow_mut()
388 .insert((context_id, section_index), Box::new(observer));
389
390 rx
391 }
392
393 pub async fn wait_for_incoming_response_block(
395 &self,
396 context_id: OutgoingContextId,
397 section_index: OutgoingSectionIndex,
398 ) -> Option<IncomingSection> {
399 let _rx =
400 self.register_incoming_block_observer(context_id, section_index);
401 None
404 }
406}
407
408#[cfg(test)]
409mod tests {
410 use super::*;
411 use crate::{
412 global::{
413 dxb_block::{DXBBlock, IncomingSection},
414 protocol_structures::{
415 block_header::{BlockHeader, BlockType, FlagsAndTimestamp},
416 routing_header::{Receivers, RoutingHeader},
417 },
418 },
419 values::core_values::endpoint::Endpoint,
420 };
421 use core::str::FromStr;
422 use ntest_timeout::timeout;
423
424 lazy_static::lazy_static! {
425 pub static ref TEST_ENDPOINT_ORIGIN: Endpoint = Endpoint::from_str("@origin").unwrap();
426 pub static ref TEST_ENDPOINT_A: Endpoint = Endpoint::from_str("@test-a").unwrap();
427 pub static ref TEST_ENDPOINT_B: Endpoint = Endpoint::from_str("@test-b").unwrap();
428 }
429
430 #[tokio::test]
431 async fn receive_single_block() {
432 let (incoming_sections_sender, mut incoming_sections_receiver) =
433 create_unbounded_channel::<IncomingSection>();
434
435 let block_handler = BlockHandler::init(incoming_sections_sender);
436 let context_id = block_handler.get_new_context_id();
437
438 let mut block = DXBBlock {
440 block_header: BlockHeader {
441 context_id,
442 flags_and_timestamp: FlagsAndTimestamp::new()
443 .with_is_end_of_section(true)
444 .with_is_end_of_context(true),
445 ..BlockHeader::default()
446 },
447 routing_header: RoutingHeader::default()
448 .with_sender(TEST_ENDPOINT_A.clone())
449 .with_receivers(Receivers::Endpoints(vec![
450 TEST_ENDPOINT_ORIGIN.clone(),
451 ]))
452 .to_owned(),
453 ..DXBBlock::default()
454 };
455 let block_endpoint_context_id = block.get_endpoint_context_id();
456
457 block_handler.handle_incoming_block(block);
459
460 let incoming_block = incoming_sections_receiver.next().await.unwrap();
462 match &incoming_block {
463 IncomingSection::SingleBlock((Some(received_block), ..)) => {
464 assert_eq!(
465 received_block.get_endpoint_context_id(),
466 block_endpoint_context_id
467 );
468 }
469 _ => panic!("Expected a SingleBlock section"),
470 }
471 }
472
473 #[tokio::test]
474 async fn receive_multiple_blocks() {
475 let (incoming_sections_sender, mut incoming_sections_receiver) =
476 create_unbounded_channel::<IncomingSection>();
477
478 let block_handler = BlockHandler::init(incoming_sections_sender);
479
480 let context_id = block_handler.get_new_context_id();
481 let section_index = 42;
482
483 let mut blocks = gen move {
485 yield DXBBlock {
486 block_header: BlockHeader {
487 context_id,
488 section_index,
489 block_number: 0,
490 flags_and_timestamp: FlagsAndTimestamp::new()
491 .with_is_end_of_section(false)
492 .with_is_end_of_context(false),
493 ..BlockHeader::default()
494 },
495 routing_header: RoutingHeader::default()
496 .with_sender(TEST_ENDPOINT_A.clone())
497 .with_receivers(Receivers::Endpoints(vec![
498 TEST_ENDPOINT_ORIGIN.clone(),
499 ]))
500 .to_owned(),
501 ..DXBBlock::default()
502 };
503
504 yield DXBBlock {
505 block_header: BlockHeader {
506 context_id,
507 section_index,
508 block_number: 1,
509 flags_and_timestamp: FlagsAndTimestamp::new()
510 .with_is_end_of_section(true)
511 .with_is_end_of_context(true),
512 ..BlockHeader::default()
513 },
514 routing_header: RoutingHeader::default()
515 .with_sender(TEST_ENDPOINT_A.clone())
516 .with_receivers(Receivers::Endpoints(vec![
517 TEST_ENDPOINT_ORIGIN.clone(),
518 ]))
519 .to_owned(),
520 ..DXBBlock::default()
521 };
522 };
523
524 block_handler.handle_incoming_block(blocks.next().unwrap());
526
527 info!("Checking incoming sections...");
528
529 let mut section = incoming_sections_receiver.next().await.unwrap();
531 match §ion {
532 IncomingSection::BlockStream((
533 Some(blocks),
534 incoming_context_section_id,
535 )) => {
536 assert_eq!(
538 incoming_context_section_id.section_index,
539 section_index
540 );
541
542 assert!(section.next().await.is_some());
544 }
545 _ => core::panic!("Expected a BlockStream section"),
546 }
547
548 block_handler.handle_incoming_block(blocks.next().unwrap());
550
551 match §ion {
554 IncomingSection::BlockStream((
555 Some(blocks),
556 incoming_context_section_id,
557 )) => {
558 assert_eq!(
560 incoming_context_section_id.section_index,
561 section_index
562 );
563 assert_eq!(section.drain().await.len(), 1);
565 }
566 _ => core::panic!("Expected a BlockStream section"),
567 }
568 }
569
570 #[tokio::test]
571 async fn receive_multiple_blocks_wrong_order() {
572 let (incoming_sections_sender, mut incoming_sections_receiver) =
573 create_unbounded_channel::<IncomingSection>();
574
575 let block_handler = BlockHandler::init(incoming_sections_sender);
576
577 let context_id = block_handler.get_new_context_id();
578 let section_index = 42;
579
580 let mut blocks = gen move {
582 yield DXBBlock {
583 block_header: BlockHeader {
584 context_id,
585 section_index,
586 block_number: 1,
587 flags_and_timestamp: FlagsAndTimestamp::new()
588 .with_is_end_of_section(true)
589 .with_is_end_of_context(true),
590 ..BlockHeader::default()
591 },
592 routing_header: RoutingHeader::default()
593 .with_sender(TEST_ENDPOINT_A.clone())
594 .with_receivers(Receivers::Endpoints(vec![
595 TEST_ENDPOINT_ORIGIN.clone(),
596 ]))
597 .to_owned(),
598 ..DXBBlock::default()
599 };
600
601 yield DXBBlock {
602 block_header: BlockHeader {
603 context_id,
604 section_index,
605 block_number: 0,
606 flags_and_timestamp: FlagsAndTimestamp::new()
607 .with_is_end_of_section(false)
608 .with_is_end_of_context(false),
609 ..BlockHeader::default()
610 },
611 routing_header: RoutingHeader::default()
612 .with_sender(TEST_ENDPOINT_A.clone())
613 .with_receivers(Receivers::Endpoints(vec![
614 TEST_ENDPOINT_ORIGIN.clone(),
615 ]))
616 .to_owned(),
617 ..DXBBlock::default()
618 };
619 };
620
621 block_handler.handle_incoming_block(blocks.next().unwrap());
623
624 block_handler.handle_incoming_block(blocks.next().unwrap());
626
627 let mut section = incoming_sections_receiver.next().await.unwrap();
629 match §ion {
631 IncomingSection::BlockStream((
632 Some(blocks),
633 incoming_context_section_id,
634 )) => {
635 assert_eq!(
637 incoming_context_section_id.section_index.clone(),
638 section_index
639 );
640 let blocks = section.drain().await;
642 assert_eq!(blocks.len(), 2);
643
644 let block = blocks.first().unwrap();
647 assert_eq!(block.block_header.block_number, 0);
648 let block = blocks.get(1).unwrap();
650 assert_eq!(block.block_header.block_number, 1);
651 }
652 _ => core::panic!("Expected a BlockStream section"),
653 }
654 }
655
656 #[tokio::test]
657 async fn receive_multiple_sections() {
658 let (incoming_sections_sender, mut incoming_sections_receiver) =
659 create_unbounded_channel::<IncomingSection>();
660
661 let block_handler = BlockHandler::init(incoming_sections_sender);
662
663 let context_id = block_handler.get_new_context_id();
664 let section_index_1 = 42;
665 let section_index_2 = 43;
666
667 let mut blocks = gen move {
669 yield DXBBlock {
671 block_header: BlockHeader {
672 context_id,
673 section_index: section_index_1,
674 block_number: 0,
675 flags_and_timestamp: FlagsAndTimestamp::new()
676 .with_is_end_of_section(false)
677 .with_is_end_of_context(false),
678 ..BlockHeader::default()
679 },
680 routing_header: RoutingHeader::default()
681 .with_sender(TEST_ENDPOINT_A.clone())
682 .with_receivers(Receivers::Endpoints(vec![
683 TEST_ENDPOINT_ORIGIN.clone(),
684 ]))
685 .to_owned(),
686 ..DXBBlock::default()
687 };
688
689 yield DXBBlock {
690 block_header: BlockHeader {
691 context_id,
692 section_index: section_index_1,
693 block_number: 1,
694 flags_and_timestamp: FlagsAndTimestamp::new()
695 .with_is_end_of_section(true)
696 .with_is_end_of_context(false),
697 ..BlockHeader::default()
698 },
699 routing_header: RoutingHeader::default()
700 .with_sender(TEST_ENDPOINT_A.clone())
701 .with_receivers(Receivers::Endpoints(vec![
702 TEST_ENDPOINT_ORIGIN.clone(),
703 ]))
704 .to_owned(),
705 ..DXBBlock::default()
706 };
707
708 yield DXBBlock {
710 block_header: BlockHeader {
711 context_id,
712 section_index: section_index_2,
713 block_number: 2,
714 flags_and_timestamp: FlagsAndTimestamp::new()
715 .with_is_end_of_section(false)
716 .with_is_end_of_context(false),
717 ..BlockHeader::default()
718 },
719 routing_header: RoutingHeader::default()
720 .with_sender(TEST_ENDPOINT_A.clone())
721 .with_receivers(Receivers::Endpoints(vec![
722 TEST_ENDPOINT_ORIGIN.clone(),
723 ]))
724 .to_owned(),
725 ..DXBBlock::default()
726 };
727
728 yield DXBBlock {
729 block_header: BlockHeader {
730 context_id,
731 section_index: section_index_2,
732 block_number: 3,
733 flags_and_timestamp: FlagsAndTimestamp::new()
734 .with_is_end_of_section(true)
735 .with_is_end_of_context(true),
736 ..BlockHeader::default()
737 },
738 routing_header: RoutingHeader::default()
739 .with_sender(TEST_ENDPOINT_A.clone())
740 .with_receivers(Receivers::Endpoints(vec![
741 TEST_ENDPOINT_ORIGIN.clone(),
742 ]))
743 .to_owned(),
744 ..DXBBlock::default()
745 };
746 };
747
748 block_handler.handle_incoming_block(blocks.next().unwrap());
750
751 let mut section = incoming_sections_receiver.next().await.unwrap();
753 match §ion {
755 IncomingSection::BlockStream((
756 Some(blocks),
757 incoming_context_section_id,
758 )) => {
759 assert_eq!(
761 incoming_context_section_id.section_index,
762 section_index_1
763 );
764 assert!(section.next().await.is_some());
766 }
767 _ => core::panic!("Expected a BlockStream section"),
768 }
769
770 block_handler.handle_incoming_block(blocks.next().unwrap());
772
773 match §ion {
775 IncomingSection::BlockStream((
776 Some(blocks),
777 incoming_context_section_id,
778 )) => {
779 assert_eq!(
781 incoming_context_section_id.section_index,
782 section_index_1
783 );
784
785 assert_eq!(section.drain().await.len(), 1);
787 }
788 _ => core::panic!("Expected a BlockStream section"),
789 }
790
791 block_handler.handle_incoming_block(blocks.next().unwrap());
793
794 let mut section = incoming_sections_receiver.next().await.unwrap();
796 match §ion {
798 IncomingSection::BlockStream((
799 Some(blocks),
800 incoming_context_section_id,
801 )) => {
802 assert_eq!(
804 incoming_context_section_id.section_index,
805 section_index_2
806 );
807 assert!(section.next().await.is_some());
809 }
810 _ => core::panic!("Expected a BlockStream section"),
811 }
812
813 block_handler.handle_incoming_block(blocks.next().unwrap());
815
816 match §ion {
819 IncomingSection::BlockStream((
820 Some(blocks),
821 incoming_context_section_id,
822 )) => {
823 assert_eq!(
825 incoming_context_section_id.section_index,
826 section_index_2
827 );
828 assert_eq!(section.drain().await.len(), 1);
830 }
831 _ => core::panic!("Expected a BlockStream section"),
832 }
833 }
834
835 #[tokio::test]
836 #[timeout(2000)]
837 #[cfg(feature = "std")]
838 async fn await_response_block() {
839 let (incoming_sections_sender, mut incoming_sections_receiver) =
840 create_unbounded_channel::<IncomingSection>();
841
842 let block_handler = BlockHandler::init(incoming_sections_sender);
843
844 let context_id = block_handler.get_new_context_id();
845 let section_index = 42;
846
847 let block = DXBBlock {
849 block_header: BlockHeader {
850 context_id,
851 section_index,
852 flags_and_timestamp: FlagsAndTimestamp::new()
853 .with_block_type(BlockType::Response)
854 .with_is_end_of_section(true)
855 .with_is_end_of_context(true),
856 ..BlockHeader::default()
857 },
858 routing_header: RoutingHeader::default()
859 .with_sender(TEST_ENDPOINT_A.clone())
860 .with_receivers(Receivers::Endpoints(vec![
861 TEST_ENDPOINT_ORIGIN.clone(),
862 ]))
863 .to_owned(),
864 ..DXBBlock::default()
865 };
866
867 let mut rx = block_handler
869 .register_incoming_block_observer(context_id, section_index);
870
871 block_handler.handle_incoming_block(block);
873
874 let response = rx.next().await.unwrap();
876
877 match response {
879 IncomingSection::SingleBlock((Some(block), _)) => {
880 info!("section: {block:?}");
881 assert_eq!(block.block_header.context_id, context_id);
882 assert_eq!(block.block_header.section_index, section_index);
883 }
884 _ => core::panic!("Expected a SingleBlock section"),
885 }
886 }
887}