datex_core/global/
dxb_block.rs

1use super::protocol_structures::{
2    block_header::BlockHeader,
3    encrypted_header::EncryptedHeader,
4    routing_header::{EncryptionType, RoutingHeader, SignatureType},
5};
6use crate::global::protocol_structures::routing_header::Receivers;
7use crate::stdlib::vec::Vec;
8use crate::task::UnboundedReceiver;
9use crate::utils::buffers::write_u16;
10use crate::values::core_values::endpoint::Endpoint;
11use binrw::io::{Cursor, Read};
12use binrw::{BinRead, BinWrite};
13use core::fmt::Display;
14use core::prelude::rust_2024::*;
15use core::result::Result;
16use core::unimplemented;
17use log::error;
18use strum::Display;
19use thiserror::Error;
20
21#[derive(Debug, Display, Error)]
22pub enum HeaderParsingError {
23    InvalidBlock,
24    InsufficientLength,
25}
26
27// TODO #110: RawDXBBlock that is received in com_hub, only containing RoutingHeader, BlockHeader and raw bytes
28
29// TODO #429 @Norbert
30// Add optional raw signature, and encrypted part
31#[cfg_attr(feature = "debug", derive(serde::Serialize, serde::Deserialize))]
32#[derive(Debug, Clone, Default)]
33pub struct DXBBlock {
34    pub routing_header: RoutingHeader,
35    pub block_header: BlockHeader,
36    pub signature: Option<Vec<u8>>,
37    pub encrypted_header: EncryptedHeader,
38    pub body: Vec<u8>,
39
40    #[cfg_attr(feature = "debug", serde(skip))]
41    pub raw_bytes: Option<Vec<u8>>,
42}
43
44impl PartialEq for DXBBlock {
45    fn eq(&self, other: &Self) -> bool {
46        self.routing_header == other.routing_header
47            && self.block_header == other.block_header
48            && self.encrypted_header == other.encrypted_header
49            && self.body == other.body
50    }
51}
52
53const SIZE_BYTE_POSITION: usize = 3; // magic number (2 bytes) + version (1 byte)
54const SIZE_BYTES: usize = 2;
55
56pub type IncomingContextId = u32;
57pub type IncomingSectionIndex = u16;
58pub type IncomingBlockNumber = u16;
59pub type OutgoingContextId = u32;
60pub type OutgoingSectionIndex = u16;
61pub type OutgoingBlockNumber = u16;
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug)]
65pub enum IncomingSection {
66    /// a single block
67    SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
68    /// a stream of blocks
69    /// the stream is finished when a block has the end_of_block flag set
70    BlockStream(
71        (
72            Option<UnboundedReceiver<DXBBlock>>,
73            IncomingEndpointContextSectionId,
74        ),
75    ),
76}
77
78impl IncomingSection {
79    pub async fn next(&mut self) -> Option<DXBBlock> {
80        match self {
81            IncomingSection::SingleBlock((block, _)) => block.take(),
82            IncomingSection::BlockStream((blocks, _)) => {
83                if let Some(receiver) = blocks {
84                    receiver.next().await
85                } else {
86                    None // No blocks to receive
87                }
88            }
89        }
90    }
91
92    pub async fn drain(&mut self) -> Vec<DXBBlock> {
93        let mut blocks = Vec::new();
94        while let Some(block) = self.next().await {
95            blocks.push(block);
96        }
97        blocks
98    }
99}
100
101impl IncomingSection {
102    pub fn get_section_index(&self) -> IncomingSectionIndex {
103        self.get_section_context_id().section_index
104    }
105
106    pub fn get_sender(&self) -> Endpoint {
107        self.get_section_context_id()
108            .endpoint_context_id
109            .sender
110            .clone()
111    }
112
113    pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
114        match self {
115            IncomingSection::SingleBlock((_, section_context_id))
116            | IncomingSection::BlockStream((_, section_context_id)) => {
117                section_context_id
118            }
119        }
120    }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub struct IncomingEndpointContextId {
125    pub sender: Endpoint,
126    pub context_id: IncomingContextId,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq, Hash)]
130pub struct IncomingEndpointContextSectionId {
131    pub endpoint_context_id: IncomingEndpointContextId,
132    pub section_index: IncomingSectionIndex,
133}
134
135impl IncomingEndpointContextSectionId {
136    pub fn new(
137        endpoint_context_id: IncomingEndpointContextId,
138        section_index: IncomingSectionIndex,
139    ) -> Self {
140        IncomingEndpointContextSectionId {
141            endpoint_context_id,
142            section_index,
143        }
144    }
145}
146
147/// An identifier that defines a globally unique block
148#[derive(Debug, Clone, PartialEq, Eq, Hash)]
149pub struct BlockId {
150    pub endpoint_context_id: IncomingEndpointContextId,
151    pub timestamp: u64,
152    pub current_section_index: IncomingSectionIndex,
153    pub current_block_number: IncomingBlockNumber,
154}
155
156impl DXBBlock {
157    pub fn new(
158        routing_header: RoutingHeader,
159        block_header: BlockHeader,
160        encrypted_header: EncryptedHeader,
161        body: Vec<u8>,
162    ) -> DXBBlock {
163        DXBBlock {
164            routing_header,
165            block_header,
166            signature: None,
167            encrypted_header,
168            body,
169            raw_bytes: None,
170        }
171    }
172
173    pub fn to_bytes(&self) -> Result<Vec<u8>, binrw::Error> {
174        let mut writer = Cursor::new(Vec::new());
175        self.routing_header.write(&mut writer)?;
176        self.signature.write(&mut writer)?;
177        self.block_header.write(&mut writer)?;
178        self.encrypted_header.write(&mut writer)?;
179        let mut bytes = writer.into_inner();
180        bytes.extend_from_slice(&self.body);
181        Ok(DXBBlock::adjust_block_length(bytes))
182    }
183    pub fn recalculate_struct(&mut self) -> &mut Self {
184        let bytes = self.to_bytes().unwrap();
185        let size = bytes.len() as u16;
186        self.routing_header.block_size = size;
187        self
188    }
189
190    fn adjust_block_length(mut bytes: Vec<u8>) -> Vec<u8> {
191        let size = bytes.len() as u32;
192        write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
193        bytes
194    }
195
196    pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
197        dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
198    }
199
200    pub fn extract_dxb_block_length(
201        dxb: &[u8],
202    ) -> Result<u16, HeaderParsingError> {
203        if dxb.len() < SIZE_BYTE_POSITION + SIZE_BYTES {
204            return Err(HeaderParsingError::InsufficientLength);
205        }
206        let routing_header = RoutingHeader::read(&mut Cursor::new(dxb))
207            .map_err(|e| {
208                error!("Failed to read routing header: {e:?}");
209                HeaderParsingError::InvalidBlock
210            })?;
211        Ok(routing_header.block_size)
212    }
213
214    pub async fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, binrw::Error> {
215        let mut reader = Cursor::new(bytes);
216        let routing_header = RoutingHeader::read(&mut reader)?;
217
218        let signature = match routing_header.flags.signature_type() {
219            SignatureType::Encrypted => {
220                // extract next 255 bytes as the signature
221                let mut signature = Vec::from([0u8; 108]);
222                reader.read_exact(&mut signature)?;
223
224                // TODO #111: decrypt the signature
225                Some(signature)
226            }
227            SignatureType::Unencrypted => {
228                // extract next 255 bytes as the signature
229                let mut signature = Vec::from([0u8; 108]);
230                reader.read_exact(&mut signature)?;
231                Some(signature)
232            }
233            SignatureType::None => None,
234        };
235
236        // TODO #112: validate the signature
237        let decrypted_bytes = match routing_header.flags.encryption_type() {
238            EncryptionType::Encrypted => {
239                // TODO #113: decrypt the body
240                let mut decrypted_bytes = Vec::from([0u8; 255]);
241                reader.read_exact(&mut decrypted_bytes)?;
242                decrypted_bytes
243            }
244            EncryptionType::None => {
245                let mut bytes = Vec::new();
246                reader.read_to_end(&mut bytes)?;
247                bytes
248            }
249        };
250
251        let mut reader = Cursor::new(decrypted_bytes);
252        let block_header = BlockHeader::read(&mut reader)?;
253        let encrypted_header = EncryptedHeader::read(&mut reader)?;
254
255        let mut body = Vec::new();
256        reader.read_to_end(&mut body)?;
257
258        cfg_if::cfg_if! {
259            if #[cfg(feature = "native_crypto")] {
260                /*
261                use crate::runtime::global_context::{GlobalContext, set_global_context, get_global_context};
262                set_global_context(GlobalContext::native());
263                let crypto = get_global_context().crypto;
264                */
265                use crate::crypto::crypto_native::CryptoNative;
266                use crate::crypto::crypto::CryptoTrait;
267                let crypto = CryptoNative {};
268
269                match routing_header.flags.signature_type() {
270                    SignatureType::Encrypted => {
271                        let raw_sign = signature
272                            .as_ref()
273                            .ok_or(binrw::Error::Custom { pos: 0u64, err: Box::new(HeaderParsingError::InvalidBlock) })?;
274                        let (enc_sign, pub_key) = raw_sign.split_at(64);
275                        let hash = crypto.hkdf_sha256(pub_key, &[0u8; 16])
276                            .await
277                            .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
278                        let signature = crypto
279                            .aes_ctr_decrypt(&hash, &[0u8; 16], enc_sign)
280                            .await
281                            .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
282
283                        let raw_signed = [
284                            pub_key,
285                            &body.clone()
286                            ]
287                            .concat();
288                        let hashed_signed = crypto
289                            .hash_sha256(&raw_signed)
290                            .await
291                            .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
292
293                        let ver = crypto
294                            .ver_ed25519(pub_key, &signature, &hashed_signed)
295                            .await
296                            .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
297
298                        if !ver {
299                            return Err(
300                                binrw::Error::Custom {
301                                    pos: 0u64,
302                                    err: Box::new("Something is off with the signature.")
303                                });
304                        }
305                    },
306                    SignatureType::Unencrypted => {
307                        let raw_sign = signature
308                            .as_ref()
309                            .ok_or(binrw::Error::Custom { pos: 0u64, err: Box::new(HeaderParsingError::InvalidBlock) })?;
310                        let (signature, pub_key) = raw_sign.split_at(64);
311
312                        let raw_signed = [
313                            pub_key,
314                            &body.clone()
315                            ]
316                            .concat();
317                        let hashed_signed = crypto
318                            .hash_sha256(&raw_signed)
319                            .await
320                            .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
321
322                        let ver = crypto
323                            .ver_ed25519(pub_key, signature, &hashed_signed)
324                            .await
325                            .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
326
327                        if !ver {
328                            return Err(
329                                binrw::Error::Custom {
330                                    pos: 0u64,
331                                    err: Box::new("Something is off with the signature.")
332                                });
333                        }
334                    },
335                    SignatureType::None => {
336                        /* Ignored */
337                    },
338                };
339            }
340            else {/* Add other crypto implementations */}
341        }
342
343        Ok(DXBBlock {
344            routing_header,
345            block_header,
346            signature,
347            encrypted_header,
348            body,
349            raw_bytes: Some(bytes.to_vec()),
350        })
351    }
352
353    /// Get a list of all receiver endpoints from the routing header.
354    pub fn receiver_endpoints(&self) -> Vec<Endpoint> {
355        match self.routing_header.receivers() {
356            Receivers::Endpoints(endpoints) => endpoints,
357            Receivers::EndpointsWithKeys(endpoints_with_keys) => {
358                endpoints_with_keys.into_iter().map(|(e, _)| e).collect()
359            }
360            Receivers::PointerId(_) => unimplemented!(),
361            _ => Vec::new(),
362        }
363    }
364    pub fn receivers(&self) -> Receivers {
365        self.routing_header.receivers()
366    }
367
368    /// Update the receivers list in the routing header.
369    pub fn set_receivers<T>(&mut self, endpoints: T)
370    where
371        T: Into<Receivers>,
372    {
373        self.routing_header.set_receivers(endpoints.into());
374    }
375
376    pub fn set_bounce_back(&mut self, bounce_back: bool) {
377        self.routing_header.flags.set_is_bounce_back(bounce_back);
378    }
379
380    pub fn is_bounce_back(&self) -> bool {
381        self.routing_header.flags.is_bounce_back()
382    }
383
384    pub fn get_sender(&self) -> &Endpoint {
385        &self.routing_header.sender
386    }
387
388    pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
389        IncomingEndpointContextId {
390            sender: self.routing_header.sender.clone(),
391            context_id: self.block_header.context_id,
392        }
393    }
394
395    pub fn get_block_id(&self) -> BlockId {
396        BlockId {
397            endpoint_context_id: self.get_endpoint_context_id(),
398            timestamp: self
399                .block_header
400                .flags_and_timestamp
401                .creation_timestamp(),
402            current_section_index: self.block_header.section_index,
403            current_block_number: self.block_header.block_number,
404        }
405    }
406
407    /// Returns true if the block has a fixed number of receivers
408    /// without wildcard instances, and no @@any receiver.
409    pub fn has_exact_receiver_count(&self) -> bool {
410        !self
411            .receiver_endpoints()
412            .iter()
413            .any(|e| e.is_broadcast() || e.is_any())
414    }
415
416    pub fn clone_with_new_receivers<T>(&self, new_receivers: T) -> DXBBlock
417    where
418        T: Into<Receivers>,
419    {
420        let mut new_block = self.clone();
421        new_block.set_receivers(new_receivers.into());
422        new_block
423    }
424}
425
426impl Display for DXBBlock {
427    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
428        let block_type = self.block_header.flags_and_timestamp.block_type();
429        let sender = &self.routing_header.sender;
430        let receivers = self.receivers();
431        core::write!(f, "[{block_type}] {sender} -> {receivers}")?;
432
433        Ok(())
434    }
435}
436
437#[cfg(test)]
438mod tests {
439    use core::str::FromStr;
440
441    use crate::{
442        crypto::crypto::CryptoTrait,
443        crypto::crypto_native::CryptoNative,
444        global::{
445            dxb_block::DXBBlock,
446            protocol_structures::{
447                encrypted_header::{self, EncryptedHeader},
448                routing_header::{RoutingHeader, SignatureType},
449            },
450        },
451        values::core_values::endpoint::Endpoint,
452    };
453
454    #[tokio::test]
455    pub async fn test_recalculate() {
456        let mut routing_header = RoutingHeader::default()
457            .with_sender(Endpoint::from_str("@test").unwrap())
458            .to_owned();
459        routing_header.set_size(420);
460        let mut block = DXBBlock {
461            body: vec![0x01, 0x02, 0x03],
462            encrypted_header: EncryptedHeader {
463                flags: encrypted_header::Flags::new()
464                    .with_user_agent(encrypted_header::UserAgent::Unused11),
465                ..Default::default()
466            },
467            routing_header,
468            ..DXBBlock::default()
469        };
470
471        {
472            // invalid block size
473            let block_bytes = block.to_bytes().unwrap();
474            let block2: DXBBlock =
475                DXBBlock::from_bytes(&block_bytes).await.unwrap();
476            assert_ne!(block, block2);
477        }
478
479        {
480            // valid block size
481            block.recalculate_struct();
482            let block_bytes = block.to_bytes().unwrap();
483            let block3: DXBBlock =
484                DXBBlock::from_bytes(&block_bytes).await.unwrap();
485            assert_eq!(block, block3);
486        }
487    }
488
489    #[tokio::test]
490    pub async fn signature_to_and_from_bytes() {
491        let crypto = CryptoNative {};
492        // setup block
493        let mut routing_header = RoutingHeader::default()
494            .with_sender(Endpoint::from_str("@test").unwrap())
495            .to_owned();
496        routing_header.set_size(157);
497        let mut block = DXBBlock {
498            body: vec![0x01, 0x02, 0x03],
499            encrypted_header: EncryptedHeader {
500                ..Default::default()
501            },
502            routing_header,
503            ..DXBBlock::default()
504        };
505
506        // setup correct signature
507        block
508            .routing_header
509            .flags
510            .set_signature_type(SignatureType::Unencrypted);
511
512        let (pub_key, pri_key) = crypto.gen_ed25519().await.unwrap();
513        let raw_signed = [pub_key.clone(), block.body.clone()].concat();
514        let hashed_signed = crypto.hash_sha256(&raw_signed).await.unwrap();
515
516        let signature =
517            crypto.sig_ed25519(&pri_key, &hashed_signed).await.unwrap();
518        // 64 + 44 = 108
519        block.signature = Some([signature.to_vec(), pub_key.clone()].concat());
520
521        let block_bytes = block.to_bytes().unwrap();
522        let block2: DXBBlock =
523            DXBBlock::from_bytes(&block_bytes).await.unwrap();
524        assert_eq!(block, block2);
525        assert_eq!(block.signature, block2.signature);
526
527        // setup faulty signature
528        let mut other_sig = signature.clone();
529        if other_sig[42] != 42u8 {
530            other_sig[42] = 42u8;
531        } else {
532            other_sig[42] = 43u8;
533        }
534        block.signature = Some([other_sig.to_vec(), pub_key].concat());
535        let block_bytes2 = block.to_bytes().unwrap();
536        let block3 = DXBBlock::from_bytes(&block_bytes2).await;
537        assert!(block3.is_err());
538        assert_eq!(
539            block3.unwrap_err().to_string(),
540            "Something is off with the signature. at 0x0"
541        )
542    }
543}