1use super::protocol_structures::{
2 block_header::BlockHeader,
3 encrypted_header::EncryptedHeader,
4 routing_header::{EncryptionType, RoutingHeader, SignatureType},
5};
6use crate::global::protocol_structures::routing_header::Receivers;
7use crate::stdlib::vec::Vec;
8use crate::task::UnboundedReceiver;
9use crate::utils::buffers::write_u16;
10use crate::values::core_values::endpoint::Endpoint;
11use binrw::io::{Cursor, Read};
12use binrw::{BinRead, BinWrite};
13use core::fmt::Display;
14use core::prelude::rust_2024::*;
15use core::result::Result;
16use core::unimplemented;
17use log::error;
18use strum::Display;
19use thiserror::Error;
20
21#[derive(Debug, Display, Error)]
22pub enum HeaderParsingError {
23 InvalidBlock,
24 InsufficientLength,
25}
26
27#[cfg_attr(feature = "debug", derive(serde::Serialize, serde::Deserialize))]
32#[derive(Debug, Clone, Default)]
33pub struct DXBBlock {
34 pub routing_header: RoutingHeader,
35 pub block_header: BlockHeader,
36 pub signature: Option<Vec<u8>>,
37 pub encrypted_header: EncryptedHeader,
38 pub body: Vec<u8>,
39
40 #[cfg_attr(feature = "debug", serde(skip))]
41 pub raw_bytes: Option<Vec<u8>>,
42}
43
44impl PartialEq for DXBBlock {
45 fn eq(&self, other: &Self) -> bool {
46 self.routing_header == other.routing_header
47 && self.block_header == other.block_header
48 && self.encrypted_header == other.encrypted_header
49 && self.body == other.body
50 }
51}
52
53const SIZE_BYTE_POSITION: usize = 3; const SIZE_BYTES: usize = 2;
55
56pub type IncomingContextId = u32;
57pub type IncomingSectionIndex = u16;
58pub type IncomingBlockNumber = u16;
59pub type OutgoingContextId = u32;
60pub type OutgoingSectionIndex = u16;
61pub type OutgoingBlockNumber = u16;
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug)]
65pub enum IncomingSection {
66 SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
68 BlockStream(
71 (
72 Option<UnboundedReceiver<DXBBlock>>,
73 IncomingEndpointContextSectionId,
74 ),
75 ),
76}
77
78impl IncomingSection {
79 pub async fn next(&mut self) -> Option<DXBBlock> {
80 match self {
81 IncomingSection::SingleBlock((block, _)) => block.take(),
82 IncomingSection::BlockStream((blocks, _)) => {
83 if let Some(receiver) = blocks {
84 receiver.next().await
85 } else {
86 None }
88 }
89 }
90 }
91
92 pub async fn drain(&mut self) -> Vec<DXBBlock> {
93 let mut blocks = Vec::new();
94 while let Some(block) = self.next().await {
95 blocks.push(block);
96 }
97 blocks
98 }
99}
100
101impl IncomingSection {
102 pub fn get_section_index(&self) -> IncomingSectionIndex {
103 self.get_section_context_id().section_index
104 }
105
106 pub fn get_sender(&self) -> Endpoint {
107 self.get_section_context_id()
108 .endpoint_context_id
109 .sender
110 .clone()
111 }
112
113 pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
114 match self {
115 IncomingSection::SingleBlock((_, section_context_id))
116 | IncomingSection::BlockStream((_, section_context_id)) => {
117 section_context_id
118 }
119 }
120 }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub struct IncomingEndpointContextId {
125 pub sender: Endpoint,
126 pub context_id: IncomingContextId,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq, Hash)]
130pub struct IncomingEndpointContextSectionId {
131 pub endpoint_context_id: IncomingEndpointContextId,
132 pub section_index: IncomingSectionIndex,
133}
134
135impl IncomingEndpointContextSectionId {
136 pub fn new(
137 endpoint_context_id: IncomingEndpointContextId,
138 section_index: IncomingSectionIndex,
139 ) -> Self {
140 IncomingEndpointContextSectionId {
141 endpoint_context_id,
142 section_index,
143 }
144 }
145}
146
147#[derive(Debug, Clone, PartialEq, Eq, Hash)]
149pub struct BlockId {
150 pub endpoint_context_id: IncomingEndpointContextId,
151 pub timestamp: u64,
152 pub current_section_index: IncomingSectionIndex,
153 pub current_block_number: IncomingBlockNumber,
154}
155
156impl DXBBlock {
157 pub fn new(
158 routing_header: RoutingHeader,
159 block_header: BlockHeader,
160 encrypted_header: EncryptedHeader,
161 body: Vec<u8>,
162 ) -> DXBBlock {
163 DXBBlock {
164 routing_header,
165 block_header,
166 signature: None,
167 encrypted_header,
168 body,
169 raw_bytes: None,
170 }
171 }
172
173 pub fn to_bytes(&self) -> Result<Vec<u8>, binrw::Error> {
174 let mut writer = Cursor::new(Vec::new());
175 self.routing_header.write(&mut writer)?;
176 self.signature.write(&mut writer)?;
177 self.block_header.write(&mut writer)?;
178 self.encrypted_header.write(&mut writer)?;
179 let mut bytes = writer.into_inner();
180 bytes.extend_from_slice(&self.body);
181 Ok(DXBBlock::adjust_block_length(bytes))
182 }
183 pub fn recalculate_struct(&mut self) -> &mut Self {
184 let bytes = self.to_bytes().unwrap();
185 let size = bytes.len() as u16;
186 self.routing_header.block_size = size;
187 self
188 }
189
190 fn adjust_block_length(mut bytes: Vec<u8>) -> Vec<u8> {
191 let size = bytes.len() as u32;
192 write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
193 bytes
194 }
195
196 pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
197 dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
198 }
199
200 pub fn extract_dxb_block_length(
201 dxb: &[u8],
202 ) -> Result<u16, HeaderParsingError> {
203 if dxb.len() < SIZE_BYTE_POSITION + SIZE_BYTES {
204 return Err(HeaderParsingError::InsufficientLength);
205 }
206 let routing_header = RoutingHeader::read(&mut Cursor::new(dxb))
207 .map_err(|e| {
208 error!("Failed to read routing header: {e:?}");
209 HeaderParsingError::InvalidBlock
210 })?;
211 Ok(routing_header.block_size)
212 }
213
214 pub async fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, binrw::Error> {
215 let mut reader = Cursor::new(bytes);
216 let routing_header = RoutingHeader::read(&mut reader)?;
217
218 let signature = match routing_header.flags.signature_type() {
219 SignatureType::Encrypted => {
220 let mut signature = Vec::from([0u8; 108]);
222 reader.read_exact(&mut signature)?;
223
224 Some(signature)
226 }
227 SignatureType::Unencrypted => {
228 let mut signature = Vec::from([0u8; 108]);
230 reader.read_exact(&mut signature)?;
231 Some(signature)
232 }
233 SignatureType::None => None,
234 };
235
236 let decrypted_bytes = match routing_header.flags.encryption_type() {
238 EncryptionType::Encrypted => {
239 let mut decrypted_bytes = Vec::from([0u8; 255]);
241 reader.read_exact(&mut decrypted_bytes)?;
242 decrypted_bytes
243 }
244 EncryptionType::None => {
245 let mut bytes = Vec::new();
246 reader.read_to_end(&mut bytes)?;
247 bytes
248 }
249 };
250
251 let mut reader = Cursor::new(decrypted_bytes);
252 let block_header = BlockHeader::read(&mut reader)?;
253 let encrypted_header = EncryptedHeader::read(&mut reader)?;
254
255 let mut body = Vec::new();
256 reader.read_to_end(&mut body)?;
257
258 cfg_if::cfg_if! {
259 if #[cfg(feature = "native_crypto")] {
260 use crate::crypto::crypto_native::CryptoNative;
266 use crate::crypto::crypto::CryptoTrait;
267 let crypto = CryptoNative {};
268
269 match routing_header.flags.signature_type() {
270 SignatureType::Encrypted => {
271 let raw_sign = signature
272 .as_ref()
273 .ok_or(binrw::Error::Custom { pos: 0u64, err: Box::new(HeaderParsingError::InvalidBlock) })?;
274 let (enc_sign, pub_key) = raw_sign.split_at(64);
275 let hash = crypto.hkdf_sha256(pub_key, &[0u8; 16])
276 .await
277 .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
278 let signature = crypto
279 .aes_ctr_decrypt(&hash, &[0u8; 16], enc_sign)
280 .await
281 .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
282
283 let raw_signed = [
284 pub_key,
285 &body.clone()
286 ]
287 .concat();
288 let hashed_signed = crypto
289 .hash_sha256(&raw_signed)
290 .await
291 .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
292
293 let ver = crypto
294 .ver_ed25519(pub_key, &signature, &hashed_signed)
295 .await
296 .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
297
298 if !ver {
299 return Err(
300 binrw::Error::Custom {
301 pos: 0u64,
302 err: Box::new("Something is off with the signature.")
303 });
304 }
305 },
306 SignatureType::Unencrypted => {
307 let raw_sign = signature
308 .as_ref()
309 .ok_or(binrw::Error::Custom { pos: 0u64, err: Box::new(HeaderParsingError::InvalidBlock) })?;
310 let (signature, pub_key) = raw_sign.split_at(64);
311
312 let raw_signed = [
313 pub_key,
314 &body.clone()
315 ]
316 .concat();
317 let hashed_signed = crypto
318 .hash_sha256(&raw_signed)
319 .await
320 .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
321
322 let ver = crypto
323 .ver_ed25519(pub_key, signature, &hashed_signed)
324 .await
325 .map_err(|e| binrw::Error::Custom { pos: 0u64, err: Box::new(e) })?;
326
327 if !ver {
328 return Err(
329 binrw::Error::Custom {
330 pos: 0u64,
331 err: Box::new("Something is off with the signature.")
332 });
333 }
334 },
335 SignatureType::None => {
336 },
338 };
339 }
340 else {}
341 }
342
343 Ok(DXBBlock {
344 routing_header,
345 block_header,
346 signature,
347 encrypted_header,
348 body,
349 raw_bytes: Some(bytes.to_vec()),
350 })
351 }
352
353 pub fn receiver_endpoints(&self) -> Vec<Endpoint> {
355 match self.routing_header.receivers() {
356 Receivers::Endpoints(endpoints) => endpoints,
357 Receivers::EndpointsWithKeys(endpoints_with_keys) => {
358 endpoints_with_keys.into_iter().map(|(e, _)| e).collect()
359 }
360 Receivers::PointerId(_) => unimplemented!(),
361 _ => Vec::new(),
362 }
363 }
364 pub fn receivers(&self) -> Receivers {
365 self.routing_header.receivers()
366 }
367
368 pub fn set_receivers<T>(&mut self, endpoints: T)
370 where
371 T: Into<Receivers>,
372 {
373 self.routing_header.set_receivers(endpoints.into());
374 }
375
376 pub fn set_bounce_back(&mut self, bounce_back: bool) {
377 self.routing_header.flags.set_is_bounce_back(bounce_back);
378 }
379
380 pub fn is_bounce_back(&self) -> bool {
381 self.routing_header.flags.is_bounce_back()
382 }
383
384 pub fn get_sender(&self) -> &Endpoint {
385 &self.routing_header.sender
386 }
387
388 pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
389 IncomingEndpointContextId {
390 sender: self.routing_header.sender.clone(),
391 context_id: self.block_header.context_id,
392 }
393 }
394
395 pub fn get_block_id(&self) -> BlockId {
396 BlockId {
397 endpoint_context_id: self.get_endpoint_context_id(),
398 timestamp: self
399 .block_header
400 .flags_and_timestamp
401 .creation_timestamp(),
402 current_section_index: self.block_header.section_index,
403 current_block_number: self.block_header.block_number,
404 }
405 }
406
407 pub fn has_exact_receiver_count(&self) -> bool {
410 !self
411 .receiver_endpoints()
412 .iter()
413 .any(|e| e.is_broadcast() || e.is_any())
414 }
415
416 pub fn clone_with_new_receivers<T>(&self, new_receivers: T) -> DXBBlock
417 where
418 T: Into<Receivers>,
419 {
420 let mut new_block = self.clone();
421 new_block.set_receivers(new_receivers.into());
422 new_block
423 }
424}
425
426impl Display for DXBBlock {
427 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
428 let block_type = self.block_header.flags_and_timestamp.block_type();
429 let sender = &self.routing_header.sender;
430 let receivers = self.receivers();
431 core::write!(f, "[{block_type}] {sender} -> {receivers}")?;
432
433 Ok(())
434 }
435}
436
437#[cfg(test)]
438mod tests {
439 use core::str::FromStr;
440
441 use crate::{
442 crypto::crypto::CryptoTrait,
443 crypto::crypto_native::CryptoNative,
444 global::{
445 dxb_block::DXBBlock,
446 protocol_structures::{
447 encrypted_header::{self, EncryptedHeader},
448 routing_header::{RoutingHeader, SignatureType},
449 },
450 },
451 values::core_values::endpoint::Endpoint,
452 };
453
454 #[tokio::test]
455 pub async fn test_recalculate() {
456 let mut routing_header = RoutingHeader::default()
457 .with_sender(Endpoint::from_str("@test").unwrap())
458 .to_owned();
459 routing_header.set_size(420);
460 let mut block = DXBBlock {
461 body: vec![0x01, 0x02, 0x03],
462 encrypted_header: EncryptedHeader {
463 flags: encrypted_header::Flags::new()
464 .with_user_agent(encrypted_header::UserAgent::Unused11),
465 ..Default::default()
466 },
467 routing_header,
468 ..DXBBlock::default()
469 };
470
471 {
472 let block_bytes = block.to_bytes().unwrap();
474 let block2: DXBBlock =
475 DXBBlock::from_bytes(&block_bytes).await.unwrap();
476 assert_ne!(block, block2);
477 }
478
479 {
480 block.recalculate_struct();
482 let block_bytes = block.to_bytes().unwrap();
483 let block3: DXBBlock =
484 DXBBlock::from_bytes(&block_bytes).await.unwrap();
485 assert_eq!(block, block3);
486 }
487 }
488
489 #[tokio::test]
490 pub async fn signature_to_and_from_bytes() {
491 let crypto = CryptoNative {};
492 let mut routing_header = RoutingHeader::default()
494 .with_sender(Endpoint::from_str("@test").unwrap())
495 .to_owned();
496 routing_header.set_size(157);
497 let mut block = DXBBlock {
498 body: vec![0x01, 0x02, 0x03],
499 encrypted_header: EncryptedHeader {
500 ..Default::default()
501 },
502 routing_header,
503 ..DXBBlock::default()
504 };
505
506 block
508 .routing_header
509 .flags
510 .set_signature_type(SignatureType::Unencrypted);
511
512 let (pub_key, pri_key) = crypto.gen_ed25519().await.unwrap();
513 let raw_signed = [pub_key.clone(), block.body.clone()].concat();
514 let hashed_signed = crypto.hash_sha256(&raw_signed).await.unwrap();
515
516 let signature =
517 crypto.sig_ed25519(&pri_key, &hashed_signed).await.unwrap();
518 block.signature = Some([signature.to_vec(), pub_key.clone()].concat());
520
521 let block_bytes = block.to_bytes().unwrap();
522 let block2: DXBBlock =
523 DXBBlock::from_bytes(&block_bytes).await.unwrap();
524 assert_eq!(block, block2);
525 assert_eq!(block.signature, block2.signature);
526
527 let mut other_sig = signature.clone();
529 if other_sig[42] != 42u8 {
530 other_sig[42] = 42u8;
531 } else {
532 other_sig[42] = 43u8;
533 }
534 block.signature = Some([other_sig.to_vec(), pub_key].concat());
535 let block_bytes2 = block.to_bytes().unwrap();
536 let block3 = DXBBlock::from_bytes(&block_bytes2).await;
537 assert!(block3.is_err());
538 assert_eq!(
539 block3.unwrap_err().to_string(),
540 "Something is off with the signature. at 0x0"
541 )
542 }
543}