Skip to main content

datex_core/global/
dxb_block.rs

1use super::protocol_structures::{
2    block_header::BlockHeader,
3    encrypted_header::EncryptedHeader,
4    routing_header::{EncryptionType, RoutingHeader, SignatureType},
5};
6use crate::{
7    channel::mpsc::UnboundedReceiver,
8    crypto::CryptoImpl,
9    global::protocol_structures::{
10        block_header::BlockType, routing_header::Receivers,
11    },
12    utils::buffers::write_u16,
13    values::core_values::endpoint::Endpoint,
14};
15use binrw::{
16    BinRead, BinWrite,
17    io::{Cursor, Read},
18};
19use core::{fmt::Display, result::Result, unimplemented};
20use datex_crypto_facade::crypto::Crypto;
21use strum::Display;
22use thiserror::Error;
23
24use crate::{prelude::*, utils::maybe_async::MaybeAsync};
25
/// Errors returned by `DXBBlock::extract_dxb_block_length` when the leading
/// bytes of a buffer cannot be interpreted as the start of a DXB block.
#[derive(Debug, Display, Error)]
pub enum HeaderParsingError {
    /// The buffer is shorter than `SIZE_BYTE_POSITION + SIZE_BYTES` bytes, so
    /// the block-size field cannot be read.
    InsufficientLength,
    /// The buffer does not start with the DXB magic number (`0x01 0x64`).
    InvalidMagicNumber,
}
31
32// TODO #110: RawDXBBlock that is received in com_hub, only containing RoutingHeader, BlockHeader and raw bytes
33
34// TODO #429 @Norbert
35// Add optional raw signature, and encrypted part
/// A single DXB block consisting of a routing header, an optional signature,
/// a block header, an encrypted header and the body bytes.
///
/// `to_bytes` serializes the parts in the order routing header, signature,
/// block header, encrypted header, body; `from_bytes` reads them back in the
/// same order.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
pub struct DXBBlock {
    /// Routing information; also carries the serialized block size and the
    /// signature / encryption type flags.
    pub routing_header: RoutingHeader,
    /// Block-level metadata (block type, timestamp, context id, section index,
    /// block number).
    pub block_header: BlockHeader,
    /// Raw signature section, present when the routing header flags announce
    /// a signature. Not part of the `PartialEq` comparison.
    pub signature: Option<Vec<u8>>,
    /// Header that follows the block header in the serialized block.
    pub encrypted_header: EncryptedHeader,
    /// Remaining payload bytes after the encrypted header.
    pub body: Vec<u8>,

    /// Copy of the bytes this block was parsed from (`from_bytes` sets it);
    /// `None` for blocks built via `new`. Skipped by serde and ignored by
    /// `PartialEq`.
    #[serde(skip)]
    pub raw_bytes: Option<Vec<u8>>,
}
47
48impl PartialEq for DXBBlock {
49    fn eq(&self, other: &Self) -> bool {
50        self.routing_header == other.routing_header
51            && self.block_header == other.block_header
52            && self.encrypted_header == other.encrypted_header
53            && self.body == other.body
54    }
55}
56
/// Byte offset of the block-size field inside a serialized block.
const SIZE_BYTE_POSITION: usize = 3; // magic number (2 bytes) + version (1 byte)
/// Width of the block-size field in bytes (it is read as a little-endian `u16`).
const SIZE_BYTES: usize = 2;

/// Context id of a received block (type of `IncomingEndpointContextId::context_id`).
pub type IncomingContextId = u32;
/// Section index of a received block within its context.
pub type IncomingSectionIndex = u16;
/// Block number of a received block.
pub type IncomingBlockNumber = u16;
/// Context id for blocks being sent; not used in this part of the file —
/// presumably the sending-side counterpart of `IncomingContextId`.
pub type OutgoingContextId = u32;
/// Section index for blocks being sent (presumably the sending-side
/// counterpart of `IncomingSectionIndex`).
pub type OutgoingSectionIndex = u16;
/// Block number for blocks being sent (presumably the sending-side
/// counterpart of `IncomingBlockNumber`).
pub type OutgoingBlockNumber = u16;
66
/// A received section, delivered either as one block or as a stream of
/// blocks. Every variant pairs its payload with the id of the section it
/// belongs to.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum IncomingSection {
    /// a single block
    /// (the `Option` is emptied by the first call to `next`)
    SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
    /// a stream of blocks
    /// the stream is finished when a block has the end_of_block flag set
    /// (a `None` receiver makes `next` return `None` immediately)
    BlockStream(
        (
            Option<UnboundedReceiver<DXBBlock>>,
            IncomingEndpointContextSectionId,
        ),
    ),
}
81
82impl IncomingSection {
83    pub async fn next(&mut self) -> Option<DXBBlock> {
84        match self {
85            IncomingSection::SingleBlock((block, _)) => block.take(),
86            IncomingSection::BlockStream((blocks, _)) => {
87                if let Some(receiver) = blocks {
88                    receiver.next().await
89                } else {
90                    None // No blocks to receive
91                }
92            }
93        }
94    }
95
96    pub async fn drain(&mut self) -> Vec<DXBBlock> {
97        let mut blocks = Vec::new();
98        while let Some(block) = self.next().await {
99            blocks.push(block);
100        }
101        blocks
102    }
103}
104
105impl IncomingSection {
106    pub fn get_section_index(&self) -> IncomingSectionIndex {
107        self.get_section_context_id().section_index
108    }
109
110    pub fn get_sender(&self) -> Endpoint {
111        self.get_section_context_id()
112            .endpoint_context_id
113            .sender
114            .clone()
115    }
116
117    pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
118        match self {
119            IncomingSection::SingleBlock((_, section_context_id))
120            | IncomingSection::BlockStream((_, section_context_id)) => {
121                section_context_id
122            }
123        }
124    }
125}
126
/// Identifies a context of a specific sender: the sending endpoint together
/// with the context id taken from the block header
/// (see `DXBBlock::get_endpoint_context_id`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IncomingEndpointContextId {
    /// Endpoint that sent the block (the routing header's `sender`).
    pub sender: Endpoint,
    /// Context id from the block header.
    pub context_id: IncomingContextId,
}
132
/// Identifies one section inside a sender's context: the endpoint/context
/// pair plus the section index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IncomingEndpointContextSectionId {
    /// Sender and context this section belongs to.
    pub endpoint_context_id: IncomingEndpointContextId,
    /// Index of the section within that context.
    pub section_index: IncomingSectionIndex,
}
138
139impl IncomingEndpointContextSectionId {
140    pub fn new(
141        endpoint_context_id: IncomingEndpointContextId,
142        section_index: IncomingSectionIndex,
143    ) -> Self {
144        IncomingEndpointContextSectionId {
145            endpoint_context_id,
146            section_index,
147        }
148    }
149}
150
/// An identifier that defines a globally unique block
///
/// Built by `DXBBlock::get_block_id` from the sender, the context id and the
/// timestamp / section index / block number stored in the block header.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockId {
    /// Sender and context id of the block.
    pub endpoint_context_id: IncomingEndpointContextId,
    /// Creation timestamp taken from the block header.
    pub timestamp: u64,
    /// Section index taken from the block header.
    pub current_section_index: IncomingSectionIndex,
    /// Block number taken from the block header.
    pub current_block_number: IncomingBlockNumber,
}
159
/// Errors returned by `DXBBlock::from_bytes`.
#[derive(Debug)]
pub enum DXBBlockParseError {
    /// Reading raw bytes (signature, encrypted part, body) from the input
    /// failed; the string carries the formatted I/O error.
    IOError(String),
    /// One of the headers could not be parsed by `binrw`.
    ParseError(binrw::Error),
    /// The block is structurally readable but not allowed, e.g. a Trace or
    /// TraceBack block that carries a signature.
    IllegalState,
}
166
/// Errors returned by `DXBBlock::validate_signature`.
#[derive(Debug)]
pub enum SignatureValidationError {
    /// The block is expected to have a signature, but no signature was found.
    MissingSignature,
    /// The signature could not be parsed correctly (e.g. wrong length).
    SignatureParseError,
    /// The signature is invalid (e.g. does not match the expected value).
    InvalidSignature,
}
176
177impl From<binrw::Error> for DXBBlockParseError {
178    fn from(err: binrw::Error) -> Self {
179        DXBBlockParseError::ParseError(err)
180    }
181}
182
183impl From<String> for DXBBlockParseError {
184    fn from(err: String) -> Self {
185        DXBBlockParseError::IOError(err)
186    }
187}
188
189impl DXBBlock {
190    pub fn new_with_body(body: &[u8]) -> DXBBlock {
191        let mut block = DXBBlock {
192            body: body.to_vec(),
193            ..DXBBlock::default()
194        };
195        block.recalculate_struct();
196        block
197    }
198    pub fn new(
199        routing_header: RoutingHeader,
200        block_header: BlockHeader,
201        encrypted_header: EncryptedHeader,
202        body: Vec<u8>,
203    ) -> DXBBlock {
204        let mut block = DXBBlock {
205            routing_header,
206            block_header,
207            signature: None,
208            encrypted_header,
209            body,
210            raw_bytes: None,
211        };
212        block.recalculate_struct();
213        block
214    }
215
216    // TODO #743: guarantee that all unwraps are safe and no binrw::Error is possible
217    pub fn to_bytes(&self) -> Vec<u8> {
218        let mut writer = Cursor::new(Vec::new());
219        self.routing_header.write(&mut writer).unwrap();
220        self.signature.write(&mut writer).unwrap();
221        self.block_header.write(&mut writer).unwrap();
222        self.encrypted_header.write(&mut writer).unwrap();
223        let mut bytes = writer.into_inner();
224        bytes.extend_from_slice(&self.body);
225        DXBBlock::adjust_block_length(bytes)
226    }
227    pub fn recalculate_struct(&mut self) -> &mut Self {
228        let bytes = self.to_bytes();
229        let size = bytes.len() as u16;
230        self.routing_header.block_size = size;
231        self
232    }
233
234    fn adjust_block_length(mut bytes: Vec<u8>) -> Vec<u8> {
235        let size = bytes.len() as u32;
236        write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
237        bytes
238    }
239
240    pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
241        dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
242    }
243
244    pub fn extract_dxb_block_length(
245        dxb: &[u8],
246    ) -> Result<u16, HeaderParsingError> {
247        if dxb.len() < SIZE_BYTE_POSITION + SIZE_BYTES {
248            return Err(HeaderParsingError::InsufficientLength);
249        }
250
251        // make sure magic number is correct
252        if !DXBBlock::has_dxb_magic_number(dxb) {
253            return Err(HeaderParsingError::InvalidMagicNumber);
254        }
255
256        // block size is u16 at SIZE_BYTE_POSITION
257        let block_size_bytes =
258            &dxb[SIZE_BYTE_POSITION..SIZE_BYTE_POSITION + SIZE_BYTES];
259        Ok(u16::from_le_bytes(block_size_bytes.try_into().unwrap()))
260    }
261
262    pub fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, DXBBlockParseError> {
263        let mut reader = Cursor::new(bytes);
264        let routing_header = RoutingHeader::read(&mut reader)?;
265
266        let signature = match routing_header.flags.signature_type() {
267            SignatureType::Encrypted => {
268                // extract next 255 bytes as the signature
269                let mut signature = Vec::from([0u8; 108]);
270                reader.read_exact(&mut signature).map_err(|e| {
271                    DXBBlockParseError::IOError(format!(
272                        "Failed to read encrypted signature: {}",
273                        e
274                    ))
275                })?;
276
277                // TODO #111: decrypt the signature
278                Some(signature)
279            }
280            SignatureType::Unencrypted => {
281                // extract next 255 bytes as the signature
282                let mut signature = Vec::from([0u8; 108]);
283                reader.read_exact(&mut signature).map_err(|e| {
284                    DXBBlockParseError::IOError(format!(
285                        "Failed to read unencrypted signature: {}",
286                        e
287                    ))
288                })?;
289                Some(signature)
290            }
291            SignatureType::None => None,
292        };
293
294        let decrypted_bytes = match routing_header.flags.encryption_type() {
295            EncryptionType::Encrypted => {
296                // TODO #113: decrypt the body
297                let mut decrypted_bytes = Vec::from([0u8; 255]);
298                reader.read_exact(&mut decrypted_bytes).map_err(|e| {
299                    DXBBlockParseError::IOError(format!(
300                        "Failed to read encrypted body: {}",
301                        e
302                    ))
303                })?;
304                decrypted_bytes
305            }
306            EncryptionType::None => {
307                let mut bytes = Vec::new();
308                reader.read_to_end(&mut bytes).map_err(|e| e.to_string())?;
309                bytes
310            }
311        };
312
313        let mut reader = Cursor::new(decrypted_bytes);
314        let block_header = BlockHeader::read(&mut reader)?;
315
316        // invalid state: trace / trace back blocks with a signature
317        if signature.is_some()
318            && matches!(
319                block_header.flags_and_timestamp.block_type(),
320                BlockType::Trace | BlockType::TraceBack
321            )
322        {
323            return Err(DXBBlockParseError::IllegalState);
324        }
325
326        let encrypted_header = EncryptedHeader::read(&mut reader)?;
327
328        let mut body = Vec::new();
329        reader.read_to_end(&mut body).map_err(|e| e.to_string())?;
330
331        let block = DXBBlock {
332            routing_header,
333            block_header,
334            signature,
335            encrypted_header,
336            body,
337            raw_bytes: Some(bytes.to_vec()),
338        };
339
340        Ok(block)
341    }
342
343    /// Validates the signature of the block based on the signature type specified in the routing header.
344    /// Returns Ok(self) if the signature is valid, or a SignatureValidationError if the signature is missing, cannot be parsed, or is invalid.
345    pub fn validate_signature(
346        self,
347    ) -> MaybeAsync<
348        Result<DXBBlock, SignatureValidationError>,
349        impl Future<Output = Result<DXBBlock, SignatureValidationError>>,
350    > {
351        // if not crypto_enabled, but allow_unsigned_blocks is set, just return Ok(()) for all blocks, as signature validation is not possible
352        #[cfg(all(
353            not(feature = "crypto_enabled"),
354            feature = "allow_unsigned_blocks"
355        ))]
356        {
357            log::info!(
358                "Crypto and signature validation are disabled, allowing block without signature validation"
359            );
360            return MaybeAsync::Sync(Ok(self));
361        }
362
363        // TODO #179 check for creation time, withdraw if too old (TBD) or in the future
364        match self.routing_header.flags.signature_type() {
365            // TODO #180: verify signature and abort if invalid
366            // Check if signature is following in some later block and add them to
367            // a pool of incoming blocks awaiting some signature
368            signature_type @ (SignatureType::Encrypted
369            | SignatureType::Unencrypted) => MaybeAsync::Async(async move {
370                let is_valid = match signature_type {
371                    SignatureType::Encrypted => {
372                        let raw_sign = self.signature.as_ref().ok_or(
373                            SignatureValidationError::MissingSignature,
374                        )?;
375                        let (enc_sign, pub_key) = raw_sign.split_at(64);
376                        let hash = CryptoImpl::hkdf_sha256(pub_key, &[0u8; 16])
377                            .await
378                            .map_err(|_| {
379                                SignatureValidationError::SignatureParseError
380                            })?;
381                        let signature = CryptoImpl::aes_ctr_decrypt(
382                            &hash, &[0u8; 16], enc_sign,
383                        )
384                        .await
385                        .map_err(|_| {
386                            SignatureValidationError::SignatureParseError
387                        })?;
388
389                        let raw_signed = [pub_key, &self.body.clone()].concat();
390                        let hashed_signed = CryptoImpl::hash_sha256(
391                            &raw_signed,
392                        )
393                        .await
394                        .map_err(|_| {
395                            SignatureValidationError::SignatureParseError
396                        })?;
397
398                        CryptoImpl::ver_ed25519(
399                            pub_key,
400                            &signature,
401                            &hashed_signed,
402                        )
403                        .await
404                        .map_err(|_| {
405                            SignatureValidationError::InvalidSignature
406                        })?
407                    }
408
409                    SignatureType::Unencrypted => {
410                        let raw_sign = self.signature.as_ref().ok_or(
411                            SignatureValidationError::MissingSignature,
412                        )?;
413                        let (signature, pub_key) = raw_sign.split_at(64);
414
415                        let raw_signed = [pub_key, &self.body.clone()].concat();
416                        let hashed_signed = CryptoImpl::hash_sha256(
417                            &raw_signed,
418                        )
419                        .await
420                        .map_err(|_| {
421                            SignatureValidationError::SignatureParseError
422                        })?;
423
424                        CryptoImpl::ver_ed25519(
425                            pub_key,
426                            signature,
427                            &hashed_signed,
428                        )
429                        .await
430                        .map_err(|_| {
431                            SignatureValidationError::InvalidSignature
432                        })?
433                    }
434                    _ => unreachable!(),
435                };
436
437                match is_valid {
438                    true => Ok(self),
439                    false => Err(SignatureValidationError::InvalidSignature),
440                }
441            }),
442
443            SignatureType::None => {
444                let is_valid = {
445                    cfg_if::cfg_if! {
446                        // if unsigned blocks are allowed, return true
447                        if #[cfg(feature = "allow_unsigned_blocks")] {
448                            log::info!("Signature validation is disabled, allowing block without signature validation");
449                            true
450                        }
451                        // otherwise, only allow unsigned Trace and TraceBack blocks,
452                        // as they are used for debugging and should not be used in production with real data
453                        else {
454                            match self.block_type() {
455                                BlockType::Trace | BlockType::TraceBack => true,
456                                // TODO #181 Check if the sender is trusted (endpoint + interface) connection
457                                _ => return MaybeAsync::Sync(Err(SignatureValidationError::MissingSignature))
458                            }
459                        }
460                    }
461                };
462
463                MaybeAsync::Sync(match is_valid {
464                    true => Ok(self),
465                    false => Err(SignatureValidationError::InvalidSignature),
466                })
467            }
468        }
469    }
470
471    /// Get a list of all receiver endpoints from the routing header.
472    pub fn receiver_endpoints(&self) -> Vec<Endpoint> {
473        match self.routing_header.receivers() {
474            Receivers::Endpoints(endpoints) => endpoints,
475            Receivers::EndpointsWithKeys(endpoints_with_keys) => {
476                endpoints_with_keys.into_iter().map(|(e, _)| e).collect()
477            }
478            Receivers::PointerId(_) => unimplemented!(),
479            _ => Vec::new(),
480        }
481    }
    /// Returns the receivers stored in the routing header.
    pub fn receivers(&self) -> Receivers {
        self.routing_header.receivers()
    }
485
    /// Update the receivers list in the routing header.
    ///
    /// Accepts anything convertible into `Receivers`. The stored block size is
    /// not recalculated here.
    pub fn set_receivers<T>(&mut self, endpoints: T)
    where
        T: Into<Receivers>,
    {
        self.routing_header.set_receivers(endpoints.into());
    }
493
    /// Sets the bounce-back flag in the routing header flags.
    pub fn set_bounce_back(&mut self, bounce_back: bool) {
        self.routing_header.flags.set_is_bounce_back(bounce_back);
    }
497
    /// Returns the bounce-back flag from the routing header flags.
    pub fn is_bounce_back(&self) -> bool {
        self.routing_header.flags.is_bounce_back()
    }
501
    /// Returns the sender endpoint stored in the routing header.
    pub fn sender(&self) -> &Endpoint {
        &self.routing_header.sender
    }
505
    /// Returns the block type stored in the block header.
    pub fn block_type(&self) -> BlockType {
        self.block_header.flags_and_timestamp.block_type()
    }
509
510    pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
511        IncomingEndpointContextId {
512            sender: self.routing_header.sender.clone(),
513            context_id: self.block_header.context_id,
514        }
515    }
516
517    pub fn get_block_id(&self) -> BlockId {
518        BlockId {
519            endpoint_context_id: self.get_endpoint_context_id(),
520            timestamp: self
521                .block_header
522                .flags_and_timestamp
523                .creation_timestamp(),
524            current_section_index: self.block_header.section_index,
525            current_block_number: self.block_header.block_number,
526        }
527    }
528
529    /// Returns true if the block has a fixed number of receivers
530    /// without wildcard instances, and no @@any receiver.
531    pub fn has_exact_receiver_count(&self) -> bool {
532        !self
533            .receiver_endpoints()
534            .iter()
535            .any(|e| e.is_broadcast() || e.is_any())
536    }
537
538    pub fn clone_with_new_receivers<T>(&self, new_receivers: T) -> DXBBlock
539    where
540        T: Into<Receivers>,
541    {
542        let mut new_block = self.clone();
543        new_block.set_receivers(new_receivers.into());
544        new_block
545    }
546}
547
548impl Display for DXBBlock {
549    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
550        let block_type = self.block_header.flags_and_timestamp.block_type();
551        let sender = &self.routing_header.sender;
552        let receivers = self.receivers();
553        core::write!(f, "[{block_type}] {sender} -> {receivers}")?;
554
555        Ok(())
556    }
557}
558
559#[cfg(test)]
560mod tests {
561    use core::str::FromStr;
562
563    use crate::{
564        crypto::CryptoImpl,
565        global::{
566            dxb_block::{
567                DXBBlock, DXBBlockParseError, SignatureValidationError,
568            },
569            protocol_structures::{
570                block_header::BlockType,
571                encrypted_header::{self, EncryptedHeader},
572                routing_header::{RoutingHeader, SignatureType},
573            },
574        },
575        prelude::*,
576        values::core_values::endpoint::Endpoint,
577    };
578    use core::assert_matches;
579    use datex_crypto_facade::crypto::Crypto;
580
    /// A block whose stored size does not match its serialized length must
    /// not round-trip to an equal block; after `recalculate_struct` it must.
    #[test]
    pub fn test_recalculate() {
        let mut routing_header = RoutingHeader::default()
            .with_sender(Endpoint::from_str("@test").unwrap())
            .to_owned();
        routing_header.flags.set_signature_type(SignatureType::None);
        // deliberately wrong size; `to_bytes` writes the real length instead
        routing_header.set_size(420);
        let mut block = DXBBlock {
            body: vec![0x01, 0x02, 0x03],
            encrypted_header: EncryptedHeader {
                flags: encrypted_header::Flags::default()
                    .with_user_agent(encrypted_header::UserAgent::Unused11),
                ..Default::default()
            },
            routing_header,
            ..DXBBlock::default()
        };

        {
            // invalid block size
            // (the parsed block carries the real size, so it differs from 420)
            let block_bytes = block.to_bytes();
            let block2: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
            assert_ne!(block, block2);
        }

        {
            // valid block size
            block.recalculate_struct();
            let block_bytes = block.to_bytes();
            let block3: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
            assert_eq!(block, block3);
        }
    }
614
    /// A signed block must round-trip through `to_bytes` / `from_bytes` with
    /// its signature intact, and a tampered signature must be rejected by
    /// `validate_signature`.
    #[tokio::test]
    #[cfg(feature = "std")]
    pub async fn signature_to_and_from_bytes() {
        // setup block
        let mut routing_header = RoutingHeader::default()
            .with_sender(Endpoint::from_str("@test").unwrap())
            .to_owned();
        routing_header.set_size(157);
        let mut block = DXBBlock {
            body: vec![0x01, 0x02, 0x03],
            encrypted_header: EncryptedHeader {
                ..Default::default()
            },
            routing_header,
            ..DXBBlock::default()
        };

        // setup correct signature
        block
            .routing_header
            .flags
            .set_signature_type(SignatureType::Unencrypted);

        // the signed message is the hash of `public key || body`
        let (pub_key, pri_key) = CryptoImpl::gen_ed25519().await.unwrap();
        let raw_signed = [pub_key.clone(), block.body.clone()].concat();
        let hashed_signed = CryptoImpl::hash_sha256(&raw_signed).await.unwrap();

        let signature = CryptoImpl::sig_ed25519(&pri_key, &hashed_signed)
            .await
            .unwrap();
        // 64 + 44 = 108
        block.signature = Some([signature.to_vec(), pub_key.clone()].concat());

        // `PartialEq` ignores the signature, so it is compared separately
        let block_bytes = block.to_bytes();
        let block2: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
        assert_eq!(block, block2);
        assert_eq!(block.signature, block2.signature);

        // setup faulty signature
        // (force one byte of the signature to a different value)
        let mut other_sig = signature;
        if other_sig[42] != 42u8 {
            other_sig[42] = 42u8;
        } else {
            other_sig[42] = 43u8;
        }
        block.signature = Some([other_sig.to_vec(), pub_key].concat());
        let block_bytes2 = block.to_bytes();
        let signature_validation = DXBBlock::from_bytes(&block_bytes2)
            .unwrap()
            .validate_signature()
            .into_future()
            .await;
        assert!(signature_validation.is_err());
        assert_matches!(
            signature_validation.unwrap_err(),
            SignatureValidationError::InvalidSignature
        )
    }
673
    /// Parsing a Trace block that carries a signature must fail with
    /// `DXBBlockParseError::IllegalState`.
    #[test]
    fn illegal_signed_trace_block() {
        let mut block = DXBBlock::new_with_body(&[0x01, 0x02, 0x03]);
        block
            .block_header
            .flags_and_timestamp
            .set_block_type(BlockType::Trace);
        block
            .routing_header
            .flags
            .set_signature_type(SignatureType::Unencrypted);
        // dummy signature section of the size `from_bytes` reads (108 bytes)
        block.signature = Some(vec![0u8; 108]);

        let block_bytes = block.to_bytes();
        let parse_result = DXBBlock::from_bytes(&block_bytes);
        assert!(parse_result.is_err());
        assert_matches!(
            parse_result.unwrap_err(),
            DXBBlockParseError::IllegalState
        );
    }
695}