1use std::fmt::Display;
2use std::io::{Cursor, Read};
3use super::protocol_structures::{
6 block_header::BlockHeader,
7 encrypted_header::EncryptedHeader,
8 routing_header::{BlockSize, EncryptionType, RoutingHeader, SignatureType},
9};
10use crate::global::protocol_structures::routing_header::ReceiverEndpoints;
11use crate::utils::buffers::{clear_bit, set_bit, write_u16, write_u32};
12use crate::values::core_values::endpoint::Endpoint;
13use binrw::{BinRead, BinWrite};
14use futures::channel::mpsc::UnboundedReceiver;
15use futures_util::StreamExt;
16use log::error;
17use strum::Display;
18use thiserror::Error;
19
20#[derive(Debug, Display, Error)]
21pub enum HeaderParsingError {
22 InvalidBlock,
23 InsufficientLength,
24}
25
26#[derive(Debug, Clone, Default)]
29pub struct DXBBlock {
30 pub routing_header: RoutingHeader,
31 pub block_header: BlockHeader,
32 pub encrypted_header: EncryptedHeader,
33 pub body: Vec<u8>,
34 pub raw_bytes: Option<Vec<u8>>,
35}
36
37impl PartialEq for DXBBlock {
38 fn eq(&self, other: &Self) -> bool {
39 self.routing_header == other.routing_header
40 && self.block_header == other.block_header
41 && self.encrypted_header == other.encrypted_header
42 && self.body == other.body
43 }
44}
45
/// Byte index, within a serialized block, of the routing header flags byte
/// whose size bit is patched by `adjust_block_length`.
const ROUTING_HEADER_FLAGS_POSITION: usize = 5;
/// Byte index at which the block size field starts (directly after the
/// flags byte).
const SIZE_BYTE_POSITION: usize = ROUTING_HEADER_FLAGS_POSITION + 1;
/// Largest width of the size field in bytes; used as the minimum-length
/// guard before the block length is extracted.
const MAX_SIZE_BYTE_LENGTH: usize = 4;
/// Bit index inside the flags byte that is set when the size is written as
/// a `u32` and cleared when it is written as a `u16`.
const ROUTING_HEADER_FLAGS_SIZE_BIT_POSITION: u8 = 3;
50
/// Type of the context id carried by a received block's block header.
pub type IncomingContextId = u32;
/// Type of the section index carried by a received block's block header.
pub type IncomingSectionIndex = u16;
/// Type of the block number carried by a received block's block header.
pub type IncomingBlockNumber = u16;
/// Outgoing counterpart of [`IncomingContextId`].
pub type OutgoingContextId = u32;
/// Outgoing counterpart of [`IncomingSectionIndex`].
pub type OutgoingSectionIndex = u16;
/// Outgoing counterpart of [`IncomingBlockNumber`].
pub type OutgoingBlockNumber = u16;
57
58#[allow(clippy::large_enum_variant)]
59#[derive(Debug)]
60pub enum IncomingSection {
61 SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
63 BlockStream(
66 (
67 Option<UnboundedReceiver<DXBBlock>>,
68 IncomingEndpointContextSectionId,
69 ),
70 ),
71}
72
73impl IncomingSection {
74 pub async fn next(&mut self) -> Option<DXBBlock> {
75 match self {
76 IncomingSection::SingleBlock((block, _)) => block.take(),
77 IncomingSection::BlockStream((blocks, _)) => {
78 if let Some(receiver) = blocks {
79 receiver.next().await
80 } else {
81 None }
83 }
84 }
85 }
86
87 pub async fn drain(&mut self) -> Vec<DXBBlock> {
88 let mut blocks = Vec::new();
89 while let Some(block) = self.next().await {
90 blocks.push(block);
91 }
92 blocks
93 }
94}
95
96impl IncomingSection {
97 pub fn get_section_index(&self) -> IncomingSectionIndex {
98 self.get_section_context_id().section_index
99 }
100
101 pub fn get_sender(&self) -> Endpoint {
102 self.get_section_context_id()
103 .endpoint_context_id
104 .sender
105 .clone()
106 }
107
108 pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
109 match self {
110 IncomingSection::SingleBlock((_, section_context_id))
111 | IncomingSection::BlockStream((_, section_context_id)) => {
112 section_context_id
113 }
114 }
115 }
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, Hash)]
119pub struct IncomingEndpointContextId {
120 pub sender: Endpoint,
121 pub context_id: IncomingContextId,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Hash)]
125pub struct IncomingEndpointContextSectionId {
126 pub endpoint_context_id: IncomingEndpointContextId,
127 pub section_index: IncomingSectionIndex,
128}
129
130impl IncomingEndpointContextSectionId {
131 pub fn new(
132 endpoint_context_id: IncomingEndpointContextId,
133 section_index: IncomingSectionIndex,
134 ) -> Self {
135 IncomingEndpointContextSectionId {
136 endpoint_context_id,
137 section_index,
138 }
139 }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
144pub struct BlockId {
145 pub endpoint_context_id: IncomingEndpointContextId,
146 pub timestamp: u64,
147 pub current_section_index: IncomingSectionIndex,
148 pub current_block_number: IncomingBlockNumber,
149}
150
151impl DXBBlock {
152 pub fn new(
153 routing_header: RoutingHeader,
154 block_header: BlockHeader,
155 encrypted_header: EncryptedHeader,
156 body: Vec<u8>,
157 ) -> DXBBlock {
158 DXBBlock {
159 routing_header,
160 block_header,
161 encrypted_header,
162 body,
163 raw_bytes: None,
164 }
165 }
166
167 pub fn to_bytes(&self) -> Result<Vec<u8>, binrw::Error> {
168 let mut writer = Cursor::new(Vec::new());
169 self.routing_header.write(&mut writer)?;
170 self.block_header.write(&mut writer)?;
171 self.encrypted_header.write(&mut writer)?;
172 let mut bytes = writer.into_inner();
173 bytes.extend_from_slice(&self.body);
174 Ok(DXBBlock::adjust_block_length(bytes, &self.routing_header))
175 }
176 pub fn recalculate_struct(&mut self) -> &mut Self {
177 let bytes = self.to_bytes().unwrap();
178 let size = bytes.len() as u32;
179 let is_small_size = size <= u16::MAX as u32;
180 self.routing_header.flags.set_block_size(if is_small_size {
181 BlockSize::Default
182 } else {
183 BlockSize::Large
184 });
185 self.routing_header.block_size_u16 = if is_small_size {
186 Some(size as u16)
187 } else {
188 None
189 };
190 self.routing_header.block_size_u32 =
191 if is_small_size { None } else { Some(size) };
192 self
193 }
194
195 fn adjust_block_length(
196 mut bytes: Vec<u8>,
197 routing_header: &RoutingHeader,
198 ) -> Vec<u8> {
199 let size = bytes.len() as u32;
200 let is_small_size = size <= u16::MAX as u32;
201
202 if is_small_size {
203 if routing_header.flags.block_size() == BlockSize::Large {
205 bytes.remove(SIZE_BYTE_POSITION);
206 }
207 write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
208 } else {
209 if routing_header.flags.block_size() == BlockSize::Default {
211 bytes.insert(SIZE_BYTE_POSITION, 0);
212 }
213 write_u32(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size);
214 }
215
216 if is_small_size {
218 clear_bit(
219 &mut bytes,
220 ROUTING_HEADER_FLAGS_POSITION,
221 ROUTING_HEADER_FLAGS_SIZE_BIT_POSITION,
222 );
223 } else {
224 set_bit(
225 &mut bytes,
226 ROUTING_HEADER_FLAGS_POSITION,
227 ROUTING_HEADER_FLAGS_SIZE_BIT_POSITION,
228 );
229 }
230 bytes
231 }
232
/// Returns `true` when `dxb` starts with the two magic bytes `0x01 0x64`.
///
/// Inputs shorter than two bytes never match.
pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
    matches!(dxb, [0x01, 0x64, ..])
}
236
237 pub fn extract_dxb_block_length(
238 dxb: &[u8],
239 ) -> Result<u32, HeaderParsingError> {
240 if dxb.len() < SIZE_BYTE_POSITION + MAX_SIZE_BYTE_LENGTH {
241 return Err(HeaderParsingError::InsufficientLength);
242 }
243 let routing_header = RoutingHeader::read(&mut Cursor::new(dxb))
244 .map_err(|e| {
245 error!("Failed to read routing header: {e:?}");
246 HeaderParsingError::InvalidBlock
247 })?;
248 if routing_header.block_size_u16.is_some() {
249 Ok(routing_header.block_size_u16.unwrap() as u32)
250 } else {
251 Ok(routing_header.block_size_u32.unwrap())
252 }
253 }
254
255 pub fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, binrw::Error> {
256 let mut reader = Cursor::new(bytes);
257 let routing_header = RoutingHeader::read(&mut reader)?;
258
259 let _signature = match routing_header.flags.signature_type() {
260 SignatureType::Encrypted => {
261 let mut signature = Vec::with_capacity(255);
263 reader.read_exact(&mut signature)?;
264
265 Some(signature)
267 }
268 SignatureType::Unencrypted => {
269 let mut signature = Vec::with_capacity(255);
271 reader.read_exact(&mut signature)?;
272 Some(signature)
273 }
274 SignatureType::None => None,
275 };
276
277 let decrypted_bytes = match routing_header.flags.encryption_type() {
279 EncryptionType::Encrypted => {
280 let mut decrypted_bytes = Vec::with_capacity(255);
282 reader.read_exact(&mut decrypted_bytes)?;
283 decrypted_bytes
284 }
285 EncryptionType::Unencrypted => {
286 let mut bytes = Vec::new();
287 reader.read_to_end(&mut bytes)?;
288 bytes
289 }
290 };
291
292 let mut reader = Cursor::new(decrypted_bytes);
293 let block_header = BlockHeader::read(&mut reader)?;
294 let encrypted_header = EncryptedHeader::read(&mut reader)?;
295
296 let mut body = Vec::new();
297 reader.read_to_end(&mut body)?;
298
299 Ok(DXBBlock {
300 routing_header,
301 block_header,
302 encrypted_header,
303 body,
304 raw_bytes: Some(bytes.to_vec()),
305 })
306 }
307
308 pub fn receivers(&self) -> Option<&Vec<Endpoint>> {
310 if let Some(endpoints) = &self.routing_header.receivers.endpoints {
311 Some(&endpoints.endpoints)
312 } else {
313 None
314 }
315 }
316
317 pub fn set_receivers(&mut self, receivers: &[Endpoint]) {
319 self.routing_header.receivers.endpoints =
320 Some(ReceiverEndpoints::new(receivers.to_vec()));
321 self.routing_header
322 .receivers
323 .flags
324 .set_has_endpoints(!receivers.is_empty());
325 }
326
327 pub fn set_bounce_back(&mut self, bounce_back: bool) {
328 self.routing_header.flags.set_is_bounce_back(bounce_back);
329 }
330
331 pub fn is_bounce_back(&self) -> bool {
332 self.routing_header.flags.is_bounce_back()
333 }
334
335 pub fn get_receivers(&self) -> Vec<Endpoint> {
336 if let Some(ref endpoints) = self.routing_header.receivers.endpoints {
337 endpoints.endpoints.clone()
338 } else if let Some(ref endpoints) =
339 self.routing_header.receivers.endpoints_with_keys
340 {
341 endpoints
342 .endpoints_with_keys
343 .iter()
344 .map(|(e, _)| e.clone())
345 .collect()
346 } else {
347 unreachable!("No receivers set in the routing header")
348 }
349 }
350
351 pub fn get_sender(&self) -> &Endpoint {
352 &self.routing_header.sender
353 }
354
355 pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
356 IncomingEndpointContextId {
357 sender: self.routing_header.sender.clone(),
358 context_id: self.block_header.context_id,
359 }
360 }
361
362 pub fn get_block_id(&self) -> BlockId {
363 BlockId {
364 endpoint_context_id: self.get_endpoint_context_id(),
365 timestamp: self.block_header.flags_and_timestamp.creation_timestamp(),
366 current_section_index: self.block_header.section_index,
367 current_block_number: self.block_header.block_number,
368 }
369 }
370
371 pub fn has_exact_receiver_count(&self) -> bool {
374 !self
375 .get_receivers()
376 .iter()
377 .any(|e| e.is_broadcast() || e.is_any())
378 }
379
380 pub fn clone_with_new_receivers(
381 &self,
382 new_receivers: &[Endpoint],
383 ) -> DXBBlock {
384 let mut new_block = self.clone();
385 new_block.set_receivers(new_receivers);
386 new_block
387 }
388}
389
390impl Display for DXBBlock {
391 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
392 let block_type = self.block_header.flags_and_timestamp.block_type();
393 let sender = &self.routing_header.sender;
394 let receivers = self
395 .receivers()
396 .map(|endpoints| {
397 endpoints
398 .iter()
399 .map(|e| e.to_string())
400 .collect::<Vec<_>>()
401 .join(", ")
402 })
403 .unwrap_or("none".to_string());
404
405 write!(f, "[{block_type}] {sender} -> {receivers}")?;
406
407 Ok(())
408 }
409}