1use super::protocol_structures::{
2 block_header::BlockHeader,
3 encrypted_header::EncryptedHeader,
4 routing_header::{EncryptionType, RoutingHeader, SignatureType},
5};
6use crate::{
7 channel::mpsc::UnboundedReceiver,
8 crypto::CryptoImpl,
9 global::protocol_structures::{
10 block_header::BlockType, routing_header::Receivers,
11 },
12 utils::buffers::write_u16,
13 values::core_values::endpoint::Endpoint,
14};
15use binrw::{
16 BinRead, BinWrite,
17 io::{Cursor, Read},
18};
19use core::{fmt::Display, result::Result, unimplemented};
20use datex_crypto_facade::crypto::Crypto;
21use strum::Display;
22use thiserror::Error;
23
24use crate::{prelude::*, utils::maybe_async::MaybeAsync};
25
/// Errors produced while reading the block length from the start of a DXB
/// buffer (see `DXBBlock::extract_dxb_block_length`).
#[derive(Debug, Display, Error)]
pub enum HeaderParsingError {
    /// The buffer is too short to contain the block-size field.
    InsufficientLength,
    /// The buffer does not start with the DXB magic number (`0x01 0x64`).
    InvalidMagicNumber,
}
31
/// A single DXB block: routing header, optional signature, block header,
/// encrypted header and body.
///
/// `to_bytes` serializes the parts in the order routing header, signature,
/// block header, encrypted header, body.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
pub struct DXBBlock {
    /// Routing information (sender, receivers, flags, block size).
    pub routing_header: RoutingHeader,
    /// Block metadata (context id, section index, block number, type, ...).
    pub block_header: BlockHeader,
    /// Raw signature bytes, if the block is signed. `validate_signature`
    /// treats the first 64 bytes as the signature and the rest as the
    /// signer's public key.
    pub signature: Option<Vec<u8>>,
    /// Header that follows the block header in the serialized form.
    pub encrypted_header: EncryptedHeader,
    /// Block payload.
    pub body: Vec<u8>,

    /// The bytes this block was parsed from; set by `from_bytes`, `None` for
    /// blocks built in memory. Not serialized by serde and ignored by the
    /// `PartialEq` implementation.
    #[serde(skip)]
    pub raw_bytes: Option<Vec<u8>>,
}
47
48impl PartialEq for DXBBlock {
49 fn eq(&self, other: &Self) -> bool {
50 self.routing_header == other.routing_header
51 && self.block_header == other.block_header
52 && self.encrypted_header == other.encrypted_header
53 && self.body == other.body
54 }
55}
56
/// Byte offset of the block-size field inside a serialized block.
const SIZE_BYTE_POSITION: usize = 3;
/// Width in bytes of the block-size field (it is read as a little-endian `u16`).
const SIZE_BYTES: usize = 2;
59
/// Context id carried in the block header of an incoming block.
pub type IncomingContextId = u32;
/// Section index carried in the block header of an incoming block.
pub type IncomingSectionIndex = u16;
/// Block number carried in the block header of an incoming block.
pub type IncomingBlockNumber = u16;
/// Context id for outgoing blocks (presumably the sending-side counterpart of
/// `IncomingContextId`; not used in this part of the file).
pub type OutgoingContextId = u32;
/// Section index for outgoing blocks (presumably the sending-side counterpart
/// of `IncomingSectionIndex`; not used in this part of the file).
pub type OutgoingSectionIndex = u16;
/// Block number for outgoing blocks (presumably the sending-side counterpart
/// of `IncomingBlockNumber`; not used in this part of the file).
pub type OutgoingBlockNumber = u16;
66
/// A section of incoming blocks together with the id of the endpoint
/// context section it belongs to.
// The size difference between the variants is accepted here, hence the lint
// is silenced instead of boxing the larger variant.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum IncomingSection {
    /// A section made of exactly one block. The block is wrapped in an
    /// `Option` so that `next` can hand it out once and leave `None` behind.
    SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
    /// A section whose blocks arrive through a channel receiver. With a
    /// `None` receiver, `next` yields nothing.
    BlockStream(
        (
            Option<UnboundedReceiver<DXBBlock>>,
            IncomingEndpointContextSectionId,
        ),
    ),
}
81
82impl IncomingSection {
83 pub async fn next(&mut self) -> Option<DXBBlock> {
84 match self {
85 IncomingSection::SingleBlock((block, _)) => block.take(),
86 IncomingSection::BlockStream((blocks, _)) => {
87 if let Some(receiver) = blocks {
88 receiver.next().await
89 } else {
90 None }
92 }
93 }
94 }
95
96 pub async fn drain(&mut self) -> Vec<DXBBlock> {
97 let mut blocks = Vec::new();
98 while let Some(block) = self.next().await {
99 blocks.push(block);
100 }
101 blocks
102 }
103}
104
105impl IncomingSection {
106 pub fn get_section_index(&self) -> IncomingSectionIndex {
107 self.get_section_context_id().section_index
108 }
109
110 pub fn get_sender(&self) -> Endpoint {
111 self.get_section_context_id()
112 .endpoint_context_id
113 .sender
114 .clone()
115 }
116
117 pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
118 match self {
119 IncomingSection::SingleBlock((_, section_context_id))
120 | IncomingSection::BlockStream((_, section_context_id)) => {
121 section_context_id
122 }
123 }
124 }
125}
126
/// Identifies a context of a specific sender: the sending endpoint plus the
/// context id taken from the block header.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IncomingEndpointContextId {
    /// Endpoint that sent the block.
    pub sender: Endpoint,
    /// Context id from the block header.
    pub context_id: IncomingContextId,
}
132
/// Identifies one section inside a sender's context.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IncomingEndpointContextSectionId {
    /// Sender and context the section belongs to.
    pub endpoint_context_id: IncomingEndpointContextId,
    /// Index of the section.
    pub section_index: IncomingSectionIndex,
}
138
139impl IncomingEndpointContextSectionId {
140 pub fn new(
141 endpoint_context_id: IncomingEndpointContextId,
142 section_index: IncomingSectionIndex,
143 ) -> Self {
144 IncomingEndpointContextSectionId {
145 endpoint_context_id,
146 section_index,
147 }
148 }
149}
150
/// Identifier of a single block, built by `DXBBlock::get_block_id` from the
/// sender, the context id and the block header's timestamp, section index
/// and block number.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockId {
    /// Sender and context the block belongs to.
    pub endpoint_context_id: IncomingEndpointContextId,
    /// Creation timestamp taken from the block header.
    pub timestamp: u64,
    /// Section index taken from the block header.
    pub current_section_index: IncomingSectionIndex,
    /// Block number taken from the block header.
    pub current_block_number: IncomingBlockNumber,
}
159
/// Errors returned by `DXBBlock::from_bytes`.
#[derive(Debug)]
pub enum DXBBlockParseError {
    /// Reading raw bytes from the input failed; carries a description.
    IOError(String),
    /// A header could not be decoded by `binrw`.
    ParseError(binrw::Error),
    /// The block is structurally readable but not allowed, e.g. a signed
    /// `Trace` / `TraceBack` block.
    IllegalState,
}
166
/// Errors returned by `DXBBlock::validate_signature`.
#[derive(Debug)]
pub enum SignatureValidationError {
    /// A signature is required but the block does not carry one.
    MissingSignature,
    /// The signature data could not be processed (key derivation,
    /// decryption or hashing failed).
    SignatureParseError,
    /// The signature was checked and does not match the block.
    InvalidSignature,
}
176
177impl From<binrw::Error> for DXBBlockParseError {
178 fn from(err: binrw::Error) -> Self {
179 DXBBlockParseError::ParseError(err)
180 }
181}
182
183impl From<String> for DXBBlockParseError {
184 fn from(err: String) -> Self {
185 DXBBlockParseError::IOError(err)
186 }
187}
188
189impl DXBBlock {
190 pub fn new_with_body(body: &[u8]) -> DXBBlock {
191 let mut block = DXBBlock {
192 body: body.to_vec(),
193 ..DXBBlock::default()
194 };
195 block.recalculate_struct();
196 block
197 }
198 pub fn new(
199 routing_header: RoutingHeader,
200 block_header: BlockHeader,
201 encrypted_header: EncryptedHeader,
202 body: Vec<u8>,
203 ) -> DXBBlock {
204 let mut block = DXBBlock {
205 routing_header,
206 block_header,
207 signature: None,
208 encrypted_header,
209 body,
210 raw_bytes: None,
211 };
212 block.recalculate_struct();
213 block
214 }
215
216 pub fn to_bytes(&self) -> Vec<u8> {
218 let mut writer = Cursor::new(Vec::new());
219 self.routing_header.write(&mut writer).unwrap();
220 self.signature.write(&mut writer).unwrap();
221 self.block_header.write(&mut writer).unwrap();
222 self.encrypted_header.write(&mut writer).unwrap();
223 let mut bytes = writer.into_inner();
224 bytes.extend_from_slice(&self.body);
225 DXBBlock::adjust_block_length(bytes)
226 }
227 pub fn recalculate_struct(&mut self) -> &mut Self {
228 let bytes = self.to_bytes();
229 let size = bytes.len() as u16;
230 self.routing_header.block_size = size;
231 self
232 }
233
234 fn adjust_block_length(mut bytes: Vec<u8>) -> Vec<u8> {
235 let size = bytes.len() as u32;
236 write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
237 bytes
238 }
239
240 pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
241 dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
242 }
243
244 pub fn extract_dxb_block_length(
245 dxb: &[u8],
246 ) -> Result<u16, HeaderParsingError> {
247 if dxb.len() < SIZE_BYTE_POSITION + SIZE_BYTES {
248 return Err(HeaderParsingError::InsufficientLength);
249 }
250
251 if !DXBBlock::has_dxb_magic_number(dxb) {
253 return Err(HeaderParsingError::InvalidMagicNumber);
254 }
255
256 let block_size_bytes =
258 &dxb[SIZE_BYTE_POSITION..SIZE_BYTE_POSITION + SIZE_BYTES];
259 Ok(u16::from_le_bytes(block_size_bytes.try_into().unwrap()))
260 }
261
262 pub fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, DXBBlockParseError> {
263 let mut reader = Cursor::new(bytes);
264 let routing_header = RoutingHeader::read(&mut reader)?;
265
266 let signature = match routing_header.flags.signature_type() {
267 SignatureType::Encrypted => {
268 let mut signature = Vec::from([0u8; 108]);
270 reader.read_exact(&mut signature).map_err(|e| {
271 DXBBlockParseError::IOError(format!(
272 "Failed to read encrypted signature: {}",
273 e
274 ))
275 })?;
276
277 Some(signature)
279 }
280 SignatureType::Unencrypted => {
281 let mut signature = Vec::from([0u8; 108]);
283 reader.read_exact(&mut signature).map_err(|e| {
284 DXBBlockParseError::IOError(format!(
285 "Failed to read unencrypted signature: {}",
286 e
287 ))
288 })?;
289 Some(signature)
290 }
291 SignatureType::None => None,
292 };
293
294 let decrypted_bytes = match routing_header.flags.encryption_type() {
295 EncryptionType::Encrypted => {
296 let mut decrypted_bytes = Vec::from([0u8; 255]);
298 reader.read_exact(&mut decrypted_bytes).map_err(|e| {
299 DXBBlockParseError::IOError(format!(
300 "Failed to read encrypted body: {}",
301 e
302 ))
303 })?;
304 decrypted_bytes
305 }
306 EncryptionType::None => {
307 let mut bytes = Vec::new();
308 reader.read_to_end(&mut bytes).map_err(|e| e.to_string())?;
309 bytes
310 }
311 };
312
313 let mut reader = Cursor::new(decrypted_bytes);
314 let block_header = BlockHeader::read(&mut reader)?;
315
316 if signature.is_some()
318 && matches!(
319 block_header.flags_and_timestamp.block_type(),
320 BlockType::Trace | BlockType::TraceBack
321 )
322 {
323 return Err(DXBBlockParseError::IllegalState);
324 }
325
326 let encrypted_header = EncryptedHeader::read(&mut reader)?;
327
328 let mut body = Vec::new();
329 reader.read_to_end(&mut body).map_err(|e| e.to_string())?;
330
331 let block = DXBBlock {
332 routing_header,
333 block_header,
334 signature,
335 encrypted_header,
336 body,
337 raw_bytes: Some(bytes.to_vec()),
338 };
339
340 Ok(block)
341 }
342
343 pub fn validate_signature(
346 self,
347 ) -> MaybeAsync<
348 Result<DXBBlock, SignatureValidationError>,
349 impl Future<Output = Result<DXBBlock, SignatureValidationError>>,
350 > {
351 #[cfg(all(
353 not(feature = "crypto_enabled"),
354 feature = "allow_unsigned_blocks"
355 ))]
356 {
357 log::info!(
358 "Crypto and signature validation are disabled, allowing block without signature validation"
359 );
360 return MaybeAsync::Sync(Ok(self));
361 }
362
363 match self.routing_header.flags.signature_type() {
365 signature_type @ (SignatureType::Encrypted
369 | SignatureType::Unencrypted) => MaybeAsync::Async(async move {
370 let is_valid = match signature_type {
371 SignatureType::Encrypted => {
372 let raw_sign = self.signature.as_ref().ok_or(
373 SignatureValidationError::MissingSignature,
374 )?;
375 let (enc_sign, pub_key) = raw_sign.split_at(64);
376 let hash = CryptoImpl::hkdf_sha256(pub_key, &[0u8; 16])
377 .await
378 .map_err(|_| {
379 SignatureValidationError::SignatureParseError
380 })?;
381 let signature = CryptoImpl::aes_ctr_decrypt(
382 &hash, &[0u8; 16], enc_sign,
383 )
384 .await
385 .map_err(|_| {
386 SignatureValidationError::SignatureParseError
387 })?;
388
389 let raw_signed = [pub_key, &self.body.clone()].concat();
390 let hashed_signed = CryptoImpl::hash_sha256(
391 &raw_signed,
392 )
393 .await
394 .map_err(|_| {
395 SignatureValidationError::SignatureParseError
396 })?;
397
398 CryptoImpl::ver_ed25519(
399 pub_key,
400 &signature,
401 &hashed_signed,
402 )
403 .await
404 .map_err(|_| {
405 SignatureValidationError::InvalidSignature
406 })?
407 }
408
409 SignatureType::Unencrypted => {
410 let raw_sign = self.signature.as_ref().ok_or(
411 SignatureValidationError::MissingSignature,
412 )?;
413 let (signature, pub_key) = raw_sign.split_at(64);
414
415 let raw_signed = [pub_key, &self.body.clone()].concat();
416 let hashed_signed = CryptoImpl::hash_sha256(
417 &raw_signed,
418 )
419 .await
420 .map_err(|_| {
421 SignatureValidationError::SignatureParseError
422 })?;
423
424 CryptoImpl::ver_ed25519(
425 pub_key,
426 signature,
427 &hashed_signed,
428 )
429 .await
430 .map_err(|_| {
431 SignatureValidationError::InvalidSignature
432 })?
433 }
434 _ => unreachable!(),
435 };
436
437 match is_valid {
438 true => Ok(self),
439 false => Err(SignatureValidationError::InvalidSignature),
440 }
441 }),
442
443 SignatureType::None => {
444 let is_valid = {
445 cfg_if::cfg_if! {
446 if #[cfg(feature = "allow_unsigned_blocks")] {
448 log::info!("Signature validation is disabled, allowing block without signature validation");
449 true
450 }
451 else {
454 match self.block_type() {
455 BlockType::Trace | BlockType::TraceBack => true,
456 _ => return MaybeAsync::Sync(Err(SignatureValidationError::MissingSignature))
458 }
459 }
460 }
461 };
462
463 MaybeAsync::Sync(match is_valid {
464 true => Ok(self),
465 false => Err(SignatureValidationError::InvalidSignature),
466 })
467 }
468 }
469 }
470
471 pub fn receiver_endpoints(&self) -> Vec<Endpoint> {
473 match self.routing_header.receivers() {
474 Receivers::Endpoints(endpoints) => endpoints,
475 Receivers::EndpointsWithKeys(endpoints_with_keys) => {
476 endpoints_with_keys.into_iter().map(|(e, _)| e).collect()
477 }
478 Receivers::PointerId(_) => unimplemented!(),
479 _ => Vec::new(),
480 }
481 }
482 pub fn receivers(&self) -> Receivers {
483 self.routing_header.receivers()
484 }
485
486 pub fn set_receivers<T>(&mut self, endpoints: T)
488 where
489 T: Into<Receivers>,
490 {
491 self.routing_header.set_receivers(endpoints.into());
492 }
493
494 pub fn set_bounce_back(&mut self, bounce_back: bool) {
495 self.routing_header.flags.set_is_bounce_back(bounce_back);
496 }
497
498 pub fn is_bounce_back(&self) -> bool {
499 self.routing_header.flags.is_bounce_back()
500 }
501
502 pub fn sender(&self) -> &Endpoint {
503 &self.routing_header.sender
504 }
505
506 pub fn block_type(&self) -> BlockType {
507 self.block_header.flags_and_timestamp.block_type()
508 }
509
510 pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
511 IncomingEndpointContextId {
512 sender: self.routing_header.sender.clone(),
513 context_id: self.block_header.context_id,
514 }
515 }
516
517 pub fn get_block_id(&self) -> BlockId {
518 BlockId {
519 endpoint_context_id: self.get_endpoint_context_id(),
520 timestamp: self
521 .block_header
522 .flags_and_timestamp
523 .creation_timestamp(),
524 current_section_index: self.block_header.section_index,
525 current_block_number: self.block_header.block_number,
526 }
527 }
528
529 pub fn has_exact_receiver_count(&self) -> bool {
532 !self
533 .receiver_endpoints()
534 .iter()
535 .any(|e| e.is_broadcast() || e.is_any())
536 }
537
538 pub fn clone_with_new_receivers<T>(&self, new_receivers: T) -> DXBBlock
539 where
540 T: Into<Receivers>,
541 {
542 let mut new_block = self.clone();
543 new_block.set_receivers(new_receivers.into());
544 new_block
545 }
546}
547
548impl Display for DXBBlock {
549 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
550 let block_type = self.block_header.flags_and_timestamp.block_type();
551 let sender = &self.routing_header.sender;
552 let receivers = self.receivers();
553 core::write!(f, "[{block_type}] {sender} -> {receivers}")?;
554
555 Ok(())
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use core::str::FromStr;
562
563 use crate::{
564 crypto::CryptoImpl,
565 global::{
566 dxb_block::{
567 DXBBlock, DXBBlockParseError, SignatureValidationError,
568 },
569 protocol_structures::{
570 block_header::BlockType,
571 encrypted_header::{self, EncryptedHeader},
572 routing_header::{RoutingHeader, SignatureType},
573 },
574 },
575 prelude::*,
576 values::core_values::endpoint::Endpoint,
577 };
578 use core::assert_matches;
579 use datex_crypto_facade::crypto::Crypto;
580
    /// A block with a stale size in its routing header does not round-trip
    /// to an equal block until `recalculate_struct` has been called.
    #[test]
    pub fn test_recalculate() {
        let mut routing_header = RoutingHeader::default()
            .with_sender(Endpoint::from_str("@test").unwrap())
            .to_owned();
        routing_header.flags.set_signature_type(SignatureType::None);
        // Presumably a deliberately wrong size, so that the in-memory block
        // and the serialized bytes disagree.
        routing_header.set_size(420);
        let mut block = DXBBlock {
            body: vec![0x01, 0x02, 0x03],
            encrypted_header: EncryptedHeader {
                flags: encrypted_header::Flags::default()
                    .with_user_agent(encrypted_header::UserAgent::Unused11),
                ..Default::default()
            },
            routing_header,
            ..DXBBlock::default()
        };

        {
            // `to_bytes` patches the real length into the output, so the
            // parsed block differs from the in-memory one.
            let block_bytes = block.to_bytes();
            let block2: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
            assert_ne!(block, block2);
        }

        {
            // After recalculating, both agree on the size.
            block.recalculate_struct();
            let block_bytes = block.to_bytes();
            let block3: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
            assert_eq!(block, block3);
        }
    }
614
    /// A signed block keeps its signature through a serialize/parse round
    /// trip, and a tampered signature is rejected by `validate_signature`.
    #[tokio::test]
    #[cfg(feature = "std")]
    pub async fn signature_to_and_from_bytes() {
        let mut routing_header = RoutingHeader::default()
            .with_sender(Endpoint::from_str("@test").unwrap())
            .to_owned();
        // Presumably the serialized size of the signed block built below;
        // the equality assertion further down depends on it.
        routing_header.set_size(157);
        let mut block = DXBBlock {
            body: vec![0x01, 0x02, 0x03],
            encrypted_header: EncryptedHeader {
                ..Default::default()
            },
            routing_header,
            ..DXBBlock::default()
        };

        block
            .routing_header
            .flags
            .set_signature_type(SignatureType::Unencrypted);

        // Sign the hash of `public key || body`, mirroring what
        // `validate_signature` verifies.
        let (pub_key, pri_key) = CryptoImpl::gen_ed25519().await.unwrap();
        let raw_signed = [pub_key.clone(), block.body.clone()].concat();
        let hashed_signed = CryptoImpl::hash_sha256(&raw_signed).await.unwrap();

        let signature = CryptoImpl::sig_ed25519(&pri_key, &hashed_signed)
            .await
            .unwrap();
        // Stored layout: signature bytes followed by the public key.
        block.signature = Some([signature.to_vec(), pub_key.clone()].concat());

        let block_bytes = block.to_bytes();
        let block2: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
        assert_eq!(block, block2);
        // `PartialEq` for `DXBBlock` ignores the signature, so it is compared
        // separately.
        assert_eq!(block.signature, block2.signature);

        // Change one byte of the signature; the branch guarantees the byte
        // really differs from its previous value.
        let mut other_sig = signature;
        if other_sig[42] != 42u8 {
            other_sig[42] = 42u8;
        } else {
            other_sig[42] = 43u8;
        }
        block.signature = Some([other_sig.to_vec(), pub_key].concat());
        let block_bytes2 = block.to_bytes();
        let signature_validation = DXBBlock::from_bytes(&block_bytes2)
            .unwrap()
            .validate_signature()
            .into_future()
            .await;
        assert!(signature_validation.is_err());
        assert_matches!(
            signature_validation.unwrap_err(),
            SignatureValidationError::InvalidSignature
        )
    }
673
    /// Parsing a `Trace` block that carries a signature must fail with
    /// `DXBBlockParseError::IllegalState`.
    #[test]
    fn illegal_signed_trace_block() {
        let mut block = DXBBlock::new_with_body(&[0x01, 0x02, 0x03]);
        block
            .block_header
            .flags_and_timestamp
            .set_block_type(BlockType::Trace);
        block
            .routing_header
            .flags
            .set_signature_type(SignatureType::Unencrypted);
        // The content of the signature is irrelevant here; only its presence
        // and its length of 108 bytes (what `from_bytes` reads) matter.
        block.signature = Some(vec![0u8; 108]);

        let block_bytes = block.to_bytes();
        let parse_result = DXBBlock::from_bytes(&block_bytes);
        assert!(parse_result.is_err());
        assert_matches!(
            parse_result.unwrap_err(),
            DXBBlockParseError::IllegalState
        );
    }
695}