datex_core/global/
dxb_block.rs

1use std::fmt::Display;
2use std::io::{Cursor, Read};
3// FIXME #109 no-std
4
5use super::protocol_structures::{
6    block_header::BlockHeader,
7    encrypted_header::EncryptedHeader,
8    routing_header::{EncryptionType, RoutingHeader, SignatureType},
9};
10use crate::global::protocol_structures::routing_header::Receivers;
11use crate::utils::buffers::write_u16;
12use crate::values::core_values::endpoint::Endpoint;
13use binrw::{BinRead, BinWrite};
14use futures::channel::mpsc::UnboundedReceiver;
15use futures_util::StreamExt;
16use log::error;
17use strum::Display;
18use thiserror::Error;
19
20#[derive(Debug, Display, Error)]
21pub enum HeaderParsingError {
22    InvalidBlock,
23    InsufficientLength,
24}
25
26// TODO #110: RawDXBBlock that is received in com_hub, only containing RoutingHeader, BlockHeader and raw bytes
27
28// TODO #429 @Norbert
29// Add optional raw signature, and encrypted part
30#[cfg_attr(feature = "debug", derive(serde::Serialize, serde::Deserialize))]
31#[derive(Debug, Clone, Default)]
32pub struct DXBBlock {
33    pub routing_header: RoutingHeader,
34    pub block_header: BlockHeader,
35    pub encrypted_header: EncryptedHeader,
36    pub body: Vec<u8>,
37
38    #[cfg_attr(feature = "debug", serde(skip))]
39    pub raw_bytes: Option<Vec<u8>>,
40}
41
42impl PartialEq for DXBBlock {
43    fn eq(&self, other: &Self) -> bool {
44        self.routing_header == other.routing_header
45            && self.block_header == other.block_header
46            && self.encrypted_header == other.encrypted_header
47            && self.body == other.body
48    }
49}
50
51const SIZE_BYTE_POSITION: usize = 3; // magic number (2 bytes) + version (1 byte)
52const SIZE_BYTES: usize = 2;
53
54pub type IncomingContextId = u32;
55pub type IncomingSectionIndex = u16;
56pub type IncomingBlockNumber = u16;
57pub type OutgoingContextId = u32;
58pub type OutgoingSectionIndex = u16;
59pub type OutgoingBlockNumber = u16;
60
61#[allow(clippy::large_enum_variant)]
62#[derive(Debug)]
63pub enum IncomingSection {
64    /// a single block
65    SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
66    /// a stream of blocks
67    /// the stream is finished when a block has the end_of_block flag set
68    BlockStream(
69        (
70            Option<UnboundedReceiver<DXBBlock>>,
71            IncomingEndpointContextSectionId,
72        ),
73    ),
74}
75
76impl IncomingSection {
77    pub async fn next(&mut self) -> Option<DXBBlock> {
78        match self {
79            IncomingSection::SingleBlock((block, _)) => block.take(),
80            IncomingSection::BlockStream((blocks, _)) => {
81                if let Some(receiver) = blocks {
82                    receiver.next().await
83                } else {
84                    None // No blocks to receive
85                }
86            }
87        }
88    }
89
90    pub async fn drain(&mut self) -> Vec<DXBBlock> {
91        let mut blocks = Vec::new();
92        while let Some(block) = self.next().await {
93            blocks.push(block);
94        }
95        blocks
96    }
97}
98
99impl IncomingSection {
100    pub fn get_section_index(&self) -> IncomingSectionIndex {
101        self.get_section_context_id().section_index
102    }
103
104    pub fn get_sender(&self) -> Endpoint {
105        self.get_section_context_id()
106            .endpoint_context_id
107            .sender
108            .clone()
109    }
110
111    pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
112        match self {
113            IncomingSection::SingleBlock((_, section_context_id))
114            | IncomingSection::BlockStream((_, section_context_id)) => {
115                section_context_id
116            }
117        }
118    }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Hash)]
122pub struct IncomingEndpointContextId {
123    pub sender: Endpoint,
124    pub context_id: IncomingContextId,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq, Hash)]
128pub struct IncomingEndpointContextSectionId {
129    pub endpoint_context_id: IncomingEndpointContextId,
130    pub section_index: IncomingSectionIndex,
131}
132
133impl IncomingEndpointContextSectionId {
134    pub fn new(
135        endpoint_context_id: IncomingEndpointContextId,
136        section_index: IncomingSectionIndex,
137    ) -> Self {
138        IncomingEndpointContextSectionId {
139            endpoint_context_id,
140            section_index,
141        }
142    }
143}
144
145/// An identifier that defines a globally unique block
146#[derive(Debug, Clone, PartialEq, Eq, Hash)]
147pub struct BlockId {
148    pub endpoint_context_id: IncomingEndpointContextId,
149    pub timestamp: u64,
150    pub current_section_index: IncomingSectionIndex,
151    pub current_block_number: IncomingBlockNumber,
152}
153
154impl DXBBlock {
155    pub fn new(
156        routing_header: RoutingHeader,
157        block_header: BlockHeader,
158        encrypted_header: EncryptedHeader,
159        body: Vec<u8>,
160    ) -> DXBBlock {
161        DXBBlock {
162            routing_header,
163            block_header,
164            encrypted_header,
165            body,
166            raw_bytes: None,
167        }
168    }
169
170    pub fn to_bytes(&self) -> Result<Vec<u8>, binrw::Error> {
171        let mut writer = Cursor::new(Vec::new());
172        self.routing_header.write(&mut writer)?;
173        self.block_header.write(&mut writer)?;
174        self.encrypted_header.write(&mut writer)?;
175        let mut bytes = writer.into_inner();
176        bytes.extend_from_slice(&self.body);
177        Ok(DXBBlock::adjust_block_length(bytes))
178    }
179    pub fn recalculate_struct(&mut self) -> &mut Self {
180        let bytes = self.to_bytes().unwrap();
181        let size = bytes.len() as u16;
182        self.routing_header.block_size = size;
183        self
184    }
185
186    fn adjust_block_length(mut bytes: Vec<u8>) -> Vec<u8> {
187        let size = bytes.len() as u32;
188        write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
189        bytes
190    }
191
192    pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
193        dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
194    }
195
196    pub fn extract_dxb_block_length(
197        dxb: &[u8],
198    ) -> Result<u16, HeaderParsingError> {
199        if dxb.len() < SIZE_BYTE_POSITION + SIZE_BYTES {
200            return Err(HeaderParsingError::InsufficientLength);
201        }
202        let routing_header = RoutingHeader::read(&mut Cursor::new(dxb))
203            .map_err(|e| {
204                error!("Failed to read routing header: {e:?}");
205                HeaderParsingError::InvalidBlock
206            })?;
207        Ok(routing_header.block_size)
208    }
209
210    pub fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, binrw::Error> {
211        let mut reader = Cursor::new(bytes);
212        let routing_header = RoutingHeader::read(&mut reader)?;
213
214        let _signature = match routing_header.flags.signature_type() {
215            SignatureType::Encrypted => {
216                // extract next 255 bytes as the signature
217                let mut signature = Vec::with_capacity(255);
218                reader.read_exact(&mut signature)?;
219
220                // TODO #111: decrypt the signature
221                Some(signature)
222            }
223            SignatureType::Unencrypted => {
224                // extract next 255 bytes as the signature
225                let mut signature = Vec::with_capacity(255);
226                reader.read_exact(&mut signature)?;
227                Some(signature)
228            }
229            SignatureType::None => None,
230        };
231
232        // TODO #112: validate the signature
233        let decrypted_bytes = match routing_header.flags.encryption_type() {
234            EncryptionType::Encrypted => {
235                // TODO #113: decrypt the body
236                let mut decrypted_bytes = Vec::with_capacity(255);
237                reader.read_exact(&mut decrypted_bytes)?;
238                decrypted_bytes
239            }
240            EncryptionType::None => {
241                let mut bytes = Vec::new();
242                reader.read_to_end(&mut bytes)?;
243                bytes
244            }
245        };
246
247        let mut reader = Cursor::new(decrypted_bytes);
248        let block_header = BlockHeader::read(&mut reader)?;
249        let encrypted_header = EncryptedHeader::read(&mut reader)?;
250
251        let mut body = Vec::new();
252        reader.read_to_end(&mut body)?;
253
254        Ok(DXBBlock {
255            routing_header,
256            block_header,
257            encrypted_header,
258            body,
259            raw_bytes: Some(bytes.to_vec()),
260        })
261    }
262
263    /// Get a list of all receiver endpoints from the routing header.
264    pub fn receiver_endpoints(&self) -> Vec<Endpoint> {
265        match self.routing_header.receivers() {
266            Receivers::Endpoints(endpoints) => endpoints,
267            Receivers::EndpointsWithKeys(endpoints_with_keys) => {
268                endpoints_with_keys.into_iter().map(|(e, _)| e).collect()
269            }
270            Receivers::PointerId(_) => unimplemented!(),
271            _ => Vec::new(),
272        }
273    }
274    pub fn receivers(&self) -> Receivers {
275        self.routing_header.receivers()
276    }
277
278    /// Update the receivers list in the routing header.
279    pub fn set_receivers<T>(&mut self, endpoints: T)
280    where
281        T: Into<Receivers>,
282    {
283        self.routing_header.set_receivers(endpoints.into());
284    }
285
286    pub fn set_bounce_back(&mut self, bounce_back: bool) {
287        self.routing_header.flags.set_is_bounce_back(bounce_back);
288    }
289
290    pub fn is_bounce_back(&self) -> bool {
291        self.routing_header.flags.is_bounce_back()
292    }
293
294    pub fn get_sender(&self) -> &Endpoint {
295        &self.routing_header.sender
296    }
297
298    pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
299        IncomingEndpointContextId {
300            sender: self.routing_header.sender.clone(),
301            context_id: self.block_header.context_id,
302        }
303    }
304
305    pub fn get_block_id(&self) -> BlockId {
306        BlockId {
307            endpoint_context_id: self.get_endpoint_context_id(),
308            timestamp: self
309                .block_header
310                .flags_and_timestamp
311                .creation_timestamp(),
312            current_section_index: self.block_header.section_index,
313            current_block_number: self.block_header.block_number,
314        }
315    }
316
317    /// Returns true if the block has a fixed number of receivers
318    /// without wildcard instances, and no @@any receiver.
319    pub fn has_exact_receiver_count(&self) -> bool {
320        !self
321            .receiver_endpoints()
322            .iter()
323            .any(|e| e.is_broadcast() || e.is_any())
324    }
325
326    pub fn clone_with_new_receivers<T>(&self, new_receivers: T) -> DXBBlock
327    where
328        T: Into<Receivers>,
329    {
330        let mut new_block = self.clone();
331        new_block.set_receivers(new_receivers.into());
332        new_block
333    }
334}
335
336impl Display for DXBBlock {
337    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
338        let block_type = self.block_header.flags_and_timestamp.block_type();
339        let sender = &self.routing_header.sender;
340        let receivers = self.receivers();
341        write!(f, "[{block_type}] {sender} -> {receivers}")?;
342
343        Ok(())
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use std::str::FromStr;
350
351    use crate::{
352        global::{
353            dxb_block::DXBBlock,
354            protocol_structures::{
355                encrypted_header::{self, EncryptedHeader},
356                routing_header::RoutingHeader,
357            },
358        },
359        values::core_values::endpoint::Endpoint,
360    };
361
362    #[test]
363    pub fn test_recalculate() {
364        let mut routing_header = RoutingHeader::default()
365            .with_sender(Endpoint::from_str("@test").unwrap())
366            .to_owned();
367        routing_header.set_size(420);
368        let mut block = DXBBlock {
369            body: vec![0x01, 0x02, 0x03],
370            encrypted_header: EncryptedHeader {
371                flags: encrypted_header::Flags::new()
372                    .with_user_agent(encrypted_header::UserAgent::Unused11),
373                ..Default::default()
374            },
375            routing_header,
376            ..DXBBlock::default()
377        };
378
379        {
380            // invalid block size
381            let block_bytes = block.to_bytes().unwrap();
382            let block2: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
383            assert_ne!(block, block2);
384        }
385
386        {
387            // valid block size
388            block.recalculate_struct();
389            let block_bytes = block.to_bytes().unwrap();
390            let block3: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
391            assert_eq!(block, block3);
392        }
393    }
394}