datex_core/global/
dxb_block.rs

1use std::fmt::Display;
2use std::io::{Cursor, Read};
3// FIXME #109 no-std
4
5use super::protocol_structures::{
6    block_header::BlockHeader,
7    encrypted_header::EncryptedHeader,
8    routing_header::{BlockSize, EncryptionType, RoutingHeader, SignatureType},
9};
10use crate::global::protocol_structures::routing_header::ReceiverEndpoints;
11use crate::utils::buffers::{clear_bit, set_bit, write_u16, write_u32};
12use crate::values::core_values::endpoint::Endpoint;
13use binrw::{BinRead, BinWrite};
14use futures::channel::mpsc::UnboundedReceiver;
15use futures_util::StreamExt;
16use log::error;
17use strum::Display;
18use thiserror::Error;
19
20#[derive(Debug, Display, Error)]
21pub enum HeaderParsingError {
22    InvalidBlock,
23    InsufficientLength,
24}
25
26// TODO #110: RawDXBBlock that is received in com_hub, only containing RoutingHeader, BlockHeader and raw bytes
27
28#[derive(Debug, Clone, Default)]
29pub struct DXBBlock {
30    pub routing_header: RoutingHeader,
31    pub block_header: BlockHeader,
32    pub encrypted_header: EncryptedHeader,
33    pub body: Vec<u8>,
34    pub raw_bytes: Option<Vec<u8>>,
35}
36
37impl PartialEq for DXBBlock {
38    fn eq(&self, other: &Self) -> bool {
39        self.routing_header == other.routing_header
40            && self.block_header == other.block_header
41            && self.encrypted_header == other.encrypted_header
42            && self.body == other.body
43    }
44}
45
46const ROUTING_HEADER_FLAGS_POSITION: usize = 5;
47const SIZE_BYTE_POSITION: usize = ROUTING_HEADER_FLAGS_POSITION + 1;
48const MAX_SIZE_BYTE_LENGTH: usize = 4;
49const ROUTING_HEADER_FLAGS_SIZE_BIT_POSITION: u8 = 3;
50
51pub type IncomingContextId = u32;
52pub type IncomingSectionIndex = u16;
53pub type IncomingBlockNumber = u16;
54pub type OutgoingContextId = u32;
55pub type OutgoingSectionIndex = u16;
56pub type OutgoingBlockNumber = u16;
57
58#[allow(clippy::large_enum_variant)]
59#[derive(Debug)]
60pub enum IncomingSection {
61    /// a single block
62    SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
63    /// a stream of blocks
64    /// the stream is finished when a block has the end_of_block flag set
65    BlockStream(
66        (
67            Option<UnboundedReceiver<DXBBlock>>,
68            IncomingEndpointContextSectionId,
69        ),
70    ),
71}
72
73impl IncomingSection {
74    pub async fn next(&mut self) -> Option<DXBBlock> {
75        match self {
76            IncomingSection::SingleBlock((block, _)) => block.take(),
77            IncomingSection::BlockStream((blocks, _)) => {
78                if let Some(receiver) = blocks {
79                    receiver.next().await
80                } else {
81                    None // No blocks to receive
82                }
83            }
84        }
85    }
86
87    pub async fn drain(&mut self) -> Vec<DXBBlock> {
88        let mut blocks = Vec::new();
89        while let Some(block) = self.next().await {
90            blocks.push(block);
91        }
92        blocks
93    }
94}
95
96impl IncomingSection {
97    pub fn get_section_index(&self) -> IncomingSectionIndex {
98        self.get_section_context_id().section_index
99    }
100
101    pub fn get_sender(&self) -> Endpoint {
102        self.get_section_context_id()
103            .endpoint_context_id
104            .sender
105            .clone()
106    }
107
108    pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
109        match self {
110            IncomingSection::SingleBlock((_, section_context_id))
111            | IncomingSection::BlockStream((_, section_context_id)) => {
112                section_context_id
113            }
114        }
115    }
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, Hash)]
119pub struct IncomingEndpointContextId {
120    pub sender: Endpoint,
121    pub context_id: IncomingContextId,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Hash)]
125pub struct IncomingEndpointContextSectionId {
126    pub endpoint_context_id: IncomingEndpointContextId,
127    pub section_index: IncomingSectionIndex,
128}
129
130impl IncomingEndpointContextSectionId {
131    pub fn new(
132        endpoint_context_id: IncomingEndpointContextId,
133        section_index: IncomingSectionIndex,
134    ) -> Self {
135        IncomingEndpointContextSectionId {
136            endpoint_context_id,
137            section_index,
138        }
139    }
140}
141
142/// An identifier that defines a globally unique block
143#[derive(Debug, Clone, PartialEq, Eq, Hash)]
144pub struct BlockId {
145    pub endpoint_context_id: IncomingEndpointContextId,
146    pub timestamp: u64,
147    pub current_section_index: IncomingSectionIndex,
148    pub current_block_number: IncomingBlockNumber,
149}
150
151impl DXBBlock {
152    pub fn new(
153        routing_header: RoutingHeader,
154        block_header: BlockHeader,
155        encrypted_header: EncryptedHeader,
156        body: Vec<u8>,
157    ) -> DXBBlock {
158        DXBBlock {
159            routing_header,
160            block_header,
161            encrypted_header,
162            body,
163            raw_bytes: None,
164        }
165    }
166
167    pub fn to_bytes(&self) -> Result<Vec<u8>, binrw::Error> {
168        let mut writer = Cursor::new(Vec::new());
169        self.routing_header.write(&mut writer)?;
170        self.block_header.write(&mut writer)?;
171        self.encrypted_header.write(&mut writer)?;
172        let mut bytes = writer.into_inner();
173        bytes.extend_from_slice(&self.body);
174        Ok(DXBBlock::adjust_block_length(bytes, &self.routing_header))
175    }
176    pub fn recalculate_struct(&mut self) -> &mut Self {
177        let bytes = self.to_bytes().unwrap();
178        let size = bytes.len() as u32;
179        let is_small_size = size <= u16::MAX as u32;
180        self.routing_header.flags.set_block_size(if is_small_size {
181            BlockSize::Default
182        } else {
183            BlockSize::Large
184        });
185        self.routing_header.block_size_u16 = if is_small_size {
186            Some(size as u16)
187        } else {
188            None
189        };
190        self.routing_header.block_size_u32 =
191            if is_small_size { None } else { Some(size) };
192        self
193    }
194
195    fn adjust_block_length(
196        mut bytes: Vec<u8>,
197        routing_header: &RoutingHeader,
198    ) -> Vec<u8> {
199        let size = bytes.len() as u32;
200        let is_small_size = size <= u16::MAX as u32;
201
202        if is_small_size {
203            // replace u32 size with u16 size
204            if routing_header.flags.block_size() == BlockSize::Large {
205                bytes.remove(SIZE_BYTE_POSITION);
206            }
207            write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
208        } else {
209            // replace u16 size with u32 size
210            if routing_header.flags.block_size() == BlockSize::Default {
211                bytes.insert(SIZE_BYTE_POSITION, 0);
212            }
213            write_u32(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size);
214        }
215
216        // update small size flag
217        if is_small_size {
218            clear_bit(
219                &mut bytes,
220                ROUTING_HEADER_FLAGS_POSITION,
221                ROUTING_HEADER_FLAGS_SIZE_BIT_POSITION,
222            );
223        } else {
224            set_bit(
225                &mut bytes,
226                ROUTING_HEADER_FLAGS_POSITION,
227                ROUTING_HEADER_FLAGS_SIZE_BIT_POSITION,
228            );
229        }
230        bytes
231    }
232
233    pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
234        dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
235    }
236
237    pub fn extract_dxb_block_length(
238        dxb: &[u8],
239    ) -> Result<u32, HeaderParsingError> {
240        if dxb.len() < SIZE_BYTE_POSITION + MAX_SIZE_BYTE_LENGTH {
241            return Err(HeaderParsingError::InsufficientLength);
242        }
243        let routing_header = RoutingHeader::read(&mut Cursor::new(dxb))
244            .map_err(|e| {
245                error!("Failed to read routing header: {e:?}");
246                HeaderParsingError::InvalidBlock
247            })?;
248        if routing_header.block_size_u16.is_some() {
249            Ok(routing_header.block_size_u16.unwrap() as u32)
250        } else {
251            Ok(routing_header.block_size_u32.unwrap())
252        }
253    }
254
255    pub fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, binrw::Error> {
256        let mut reader = Cursor::new(bytes);
257        let routing_header = RoutingHeader::read(&mut reader)?;
258
259        let _signature = match routing_header.flags.signature_type() {
260            SignatureType::Encrypted => {
261                // extract next 255 bytes as the signature
262                let mut signature = Vec::with_capacity(255);
263                reader.read_exact(&mut signature)?;
264
265                // TODO #111: decrypt the signature
266                Some(signature)
267            }
268            SignatureType::Unencrypted => {
269                // extract next 255 bytes as the signature
270                let mut signature = Vec::with_capacity(255);
271                reader.read_exact(&mut signature)?;
272                Some(signature)
273            }
274            SignatureType::None => None,
275        };
276
277        // TODO #112: validate the signature
278        let decrypted_bytes = match routing_header.flags.encryption_type() {
279            EncryptionType::Encrypted => {
280                // TODO #113: decrypt the body
281                let mut decrypted_bytes = Vec::with_capacity(255);
282                reader.read_exact(&mut decrypted_bytes)?;
283                decrypted_bytes
284            }
285            EncryptionType::Unencrypted => {
286                let mut bytes = Vec::new();
287                reader.read_to_end(&mut bytes)?;
288                bytes
289            }
290        };
291
292        let mut reader = Cursor::new(decrypted_bytes);
293        let block_header = BlockHeader::read(&mut reader)?;
294        let encrypted_header = EncryptedHeader::read(&mut reader)?;
295
296        let mut body = Vec::new();
297        reader.read_to_end(&mut body)?;
298
299        Ok(DXBBlock {
300            routing_header,
301            block_header,
302            encrypted_header,
303            body,
304            raw_bytes: Some(bytes.to_vec()),
305        })
306    }
307
308    /// Get a list of all receiver endpoints from the routing header.
309    pub fn receivers(&self) -> Option<&Vec<Endpoint>> {
310        if let Some(endpoints) = &self.routing_header.receivers.endpoints {
311            Some(&endpoints.endpoints)
312        } else {
313            None
314        }
315    }
316
317    /// Update the receivers list in the routing header.
318    pub fn set_receivers(&mut self, receivers: &[Endpoint]) {
319        self.routing_header.receivers.endpoints =
320            Some(ReceiverEndpoints::new(receivers.to_vec()));
321        self.routing_header
322            .receivers
323            .flags
324            .set_has_endpoints(!receivers.is_empty());
325    }
326
327    pub fn set_bounce_back(&mut self, bounce_back: bool) {
328        self.routing_header.flags.set_is_bounce_back(bounce_back);
329    }
330
331    pub fn is_bounce_back(&self) -> bool {
332        self.routing_header.flags.is_bounce_back()
333    }
334
335    pub fn get_receivers(&self) -> Vec<Endpoint> {
336        if let Some(ref endpoints) = self.routing_header.receivers.endpoints {
337            endpoints.endpoints.clone()
338        } else if let Some(ref endpoints) =
339            self.routing_header.receivers.endpoints_with_keys
340        {
341            endpoints
342                .endpoints_with_keys
343                .iter()
344                .map(|(e, _)| e.clone())
345                .collect()
346        } else {
347            unreachable!("No receivers set in the routing header")
348        }
349    }
350
351    pub fn get_sender(&self) -> &Endpoint {
352        &self.routing_header.sender
353    }
354
355    pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
356        IncomingEndpointContextId {
357            sender: self.routing_header.sender.clone(),
358            context_id: self.block_header.context_id,
359        }
360    }
361
362    pub fn get_block_id(&self) -> BlockId {
363        BlockId {
364            endpoint_context_id: self.get_endpoint_context_id(),
365            timestamp: self.block_header.flags_and_timestamp.creation_timestamp(),
366            current_section_index: self.block_header.section_index,
367            current_block_number: self.block_header.block_number,
368        }
369    }
370
371    /// Returns true if the block has a fixed number of receivers
372    /// without wildcard instances, and no @@any receiver.
373    pub fn has_exact_receiver_count(&self) -> bool {
374        !self
375            .get_receivers()
376            .iter()
377            .any(|e| e.is_broadcast() || e.is_any())
378    }
379
380    pub fn clone_with_new_receivers(
381        &self,
382        new_receivers: &[Endpoint],
383    ) -> DXBBlock {
384        let mut new_block = self.clone();
385        new_block.set_receivers(new_receivers);
386        new_block
387    }
388}
389
390impl Display for DXBBlock {
391    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
392        let block_type = self.block_header.flags_and_timestamp.block_type();
393        let sender = &self.routing_header.sender;
394        let receivers = self
395            .receivers()
396            .map(|endpoints| {
397                endpoints
398                    .iter()
399                    .map(|e| e.to_string())
400                    .collect::<Vec<_>>()
401                    .join(", ")
402            })
403            .unwrap_or("none".to_string());
404
405        write!(f, "[{block_type}] {sender} -> {receivers}")?;
406
407        Ok(())
408    }
409}