1use std::fmt::Display;
2use std::io::{Cursor, Read};
3use super::protocol_structures::{
6 block_header::BlockHeader,
7 encrypted_header::EncryptedHeader,
8 routing_header::{EncryptionType, RoutingHeader, SignatureType},
9};
10use crate::global::protocol_structures::routing_header::Receivers;
11use crate::utils::buffers::write_u16;
12use crate::values::core_values::endpoint::Endpoint;
13use binrw::{BinRead, BinWrite};
14use futures::channel::mpsc::UnboundedReceiver;
15use futures_util::StreamExt;
16use log::error;
17use strum::Display;
18use thiserror::Error;
19
20#[derive(Debug, Display, Error)]
21pub enum HeaderParsingError {
22 InvalidBlock,
23 InsufficientLength,
24}
25
26#[cfg_attr(feature = "debug", derive(serde::Serialize, serde::Deserialize))]
31#[derive(Debug, Clone, Default)]
32pub struct DXBBlock {
33 pub routing_header: RoutingHeader,
34 pub block_header: BlockHeader,
35 pub encrypted_header: EncryptedHeader,
36 pub body: Vec<u8>,
37
38 #[cfg_attr(feature = "debug", serde(skip))]
39 pub raw_bytes: Option<Vec<u8>>,
40}
41
42impl PartialEq for DXBBlock {
43 fn eq(&self, other: &Self) -> bool {
44 self.routing_header == other.routing_header
45 && self.block_header == other.block_header
46 && self.encrypted_header == other.encrypted_header
47 && self.body == other.body
48 }
49}
50
51const SIZE_BYTE_POSITION: usize = 3; const SIZE_BYTES: usize = 2;
53
54pub type IncomingContextId = u32;
55pub type IncomingSectionIndex = u16;
56pub type IncomingBlockNumber = u16;
57pub type OutgoingContextId = u32;
58pub type OutgoingSectionIndex = u16;
59pub type OutgoingBlockNumber = u16;
60
61#[allow(clippy::large_enum_variant)]
62#[derive(Debug)]
63pub enum IncomingSection {
64 SingleBlock((Option<DXBBlock>, IncomingEndpointContextSectionId)),
66 BlockStream(
69 (
70 Option<UnboundedReceiver<DXBBlock>>,
71 IncomingEndpointContextSectionId,
72 ),
73 ),
74}
75
76impl IncomingSection {
77 pub async fn next(&mut self) -> Option<DXBBlock> {
78 match self {
79 IncomingSection::SingleBlock((block, _)) => block.take(),
80 IncomingSection::BlockStream((blocks, _)) => {
81 if let Some(receiver) = blocks {
82 receiver.next().await
83 } else {
84 None }
86 }
87 }
88 }
89
90 pub async fn drain(&mut self) -> Vec<DXBBlock> {
91 let mut blocks = Vec::new();
92 while let Some(block) = self.next().await {
93 blocks.push(block);
94 }
95 blocks
96 }
97}
98
99impl IncomingSection {
100 pub fn get_section_index(&self) -> IncomingSectionIndex {
101 self.get_section_context_id().section_index
102 }
103
104 pub fn get_sender(&self) -> Endpoint {
105 self.get_section_context_id()
106 .endpoint_context_id
107 .sender
108 .clone()
109 }
110
111 pub fn get_section_context_id(&self) -> &IncomingEndpointContextSectionId {
112 match self {
113 IncomingSection::SingleBlock((_, section_context_id))
114 | IncomingSection::BlockStream((_, section_context_id)) => {
115 section_context_id
116 }
117 }
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Hash)]
122pub struct IncomingEndpointContextId {
123 pub sender: Endpoint,
124 pub context_id: IncomingContextId,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq, Hash)]
128pub struct IncomingEndpointContextSectionId {
129 pub endpoint_context_id: IncomingEndpointContextId,
130 pub section_index: IncomingSectionIndex,
131}
132
133impl IncomingEndpointContextSectionId {
134 pub fn new(
135 endpoint_context_id: IncomingEndpointContextId,
136 section_index: IncomingSectionIndex,
137 ) -> Self {
138 IncomingEndpointContextSectionId {
139 endpoint_context_id,
140 section_index,
141 }
142 }
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, Hash)]
147pub struct BlockId {
148 pub endpoint_context_id: IncomingEndpointContextId,
149 pub timestamp: u64,
150 pub current_section_index: IncomingSectionIndex,
151 pub current_block_number: IncomingBlockNumber,
152}
153
154impl DXBBlock {
155 pub fn new(
156 routing_header: RoutingHeader,
157 block_header: BlockHeader,
158 encrypted_header: EncryptedHeader,
159 body: Vec<u8>,
160 ) -> DXBBlock {
161 DXBBlock {
162 routing_header,
163 block_header,
164 encrypted_header,
165 body,
166 raw_bytes: None,
167 }
168 }
169
170 pub fn to_bytes(&self) -> Result<Vec<u8>, binrw::Error> {
171 let mut writer = Cursor::new(Vec::new());
172 self.routing_header.write(&mut writer)?;
173 self.block_header.write(&mut writer)?;
174 self.encrypted_header.write(&mut writer)?;
175 let mut bytes = writer.into_inner();
176 bytes.extend_from_slice(&self.body);
177 Ok(DXBBlock::adjust_block_length(bytes))
178 }
179 pub fn recalculate_struct(&mut self) -> &mut Self {
180 let bytes = self.to_bytes().unwrap();
181 let size = bytes.len() as u16;
182 self.routing_header.block_size = size;
183 self
184 }
185
186 fn adjust_block_length(mut bytes: Vec<u8>) -> Vec<u8> {
187 let size = bytes.len() as u32;
188 write_u16(&mut bytes, &mut SIZE_BYTE_POSITION.clone(), size as u16);
189 bytes
190 }
191
192 pub fn has_dxb_magic_number(dxb: &[u8]) -> bool {
193 dxb.len() >= 2 && dxb[0] == 0x01 && dxb[1] == 0x64
194 }
195
196 pub fn extract_dxb_block_length(
197 dxb: &[u8],
198 ) -> Result<u16, HeaderParsingError> {
199 if dxb.len() < SIZE_BYTE_POSITION + SIZE_BYTES {
200 return Err(HeaderParsingError::InsufficientLength);
201 }
202 let routing_header = RoutingHeader::read(&mut Cursor::new(dxb))
203 .map_err(|e| {
204 error!("Failed to read routing header: {e:?}");
205 HeaderParsingError::InvalidBlock
206 })?;
207 Ok(routing_header.block_size)
208 }
209
210 pub fn from_bytes(bytes: &[u8]) -> Result<DXBBlock, binrw::Error> {
211 let mut reader = Cursor::new(bytes);
212 let routing_header = RoutingHeader::read(&mut reader)?;
213
214 let _signature = match routing_header.flags.signature_type() {
215 SignatureType::Encrypted => {
216 let mut signature = Vec::with_capacity(255);
218 reader.read_exact(&mut signature)?;
219
220 Some(signature)
222 }
223 SignatureType::Unencrypted => {
224 let mut signature = Vec::with_capacity(255);
226 reader.read_exact(&mut signature)?;
227 Some(signature)
228 }
229 SignatureType::None => None,
230 };
231
232 let decrypted_bytes = match routing_header.flags.encryption_type() {
234 EncryptionType::Encrypted => {
235 let mut decrypted_bytes = Vec::with_capacity(255);
237 reader.read_exact(&mut decrypted_bytes)?;
238 decrypted_bytes
239 }
240 EncryptionType::None => {
241 let mut bytes = Vec::new();
242 reader.read_to_end(&mut bytes)?;
243 bytes
244 }
245 };
246
247 let mut reader = Cursor::new(decrypted_bytes);
248 let block_header = BlockHeader::read(&mut reader)?;
249 let encrypted_header = EncryptedHeader::read(&mut reader)?;
250
251 let mut body = Vec::new();
252 reader.read_to_end(&mut body)?;
253
254 Ok(DXBBlock {
255 routing_header,
256 block_header,
257 encrypted_header,
258 body,
259 raw_bytes: Some(bytes.to_vec()),
260 })
261 }
262
263 pub fn receiver_endpoints(&self) -> Vec<Endpoint> {
265 match self.routing_header.receivers() {
266 Receivers::Endpoints(endpoints) => endpoints,
267 Receivers::EndpointsWithKeys(endpoints_with_keys) => {
268 endpoints_with_keys.into_iter().map(|(e, _)| e).collect()
269 }
270 Receivers::PointerId(_) => unimplemented!(),
271 _ => Vec::new(),
272 }
273 }
274 pub fn receivers(&self) -> Receivers {
275 self.routing_header.receivers()
276 }
277
278 pub fn set_receivers<T>(&mut self, endpoints: T)
280 where
281 T: Into<Receivers>,
282 {
283 self.routing_header.set_receivers(endpoints.into());
284 }
285
286 pub fn set_bounce_back(&mut self, bounce_back: bool) {
287 self.routing_header.flags.set_is_bounce_back(bounce_back);
288 }
289
290 pub fn is_bounce_back(&self) -> bool {
291 self.routing_header.flags.is_bounce_back()
292 }
293
294 pub fn get_sender(&self) -> &Endpoint {
295 &self.routing_header.sender
296 }
297
298 pub fn get_endpoint_context_id(&self) -> IncomingEndpointContextId {
299 IncomingEndpointContextId {
300 sender: self.routing_header.sender.clone(),
301 context_id: self.block_header.context_id,
302 }
303 }
304
305 pub fn get_block_id(&self) -> BlockId {
306 BlockId {
307 endpoint_context_id: self.get_endpoint_context_id(),
308 timestamp: self
309 .block_header
310 .flags_and_timestamp
311 .creation_timestamp(),
312 current_section_index: self.block_header.section_index,
313 current_block_number: self.block_header.block_number,
314 }
315 }
316
317 pub fn has_exact_receiver_count(&self) -> bool {
320 !self
321 .receiver_endpoints()
322 .iter()
323 .any(|e| e.is_broadcast() || e.is_any())
324 }
325
326 pub fn clone_with_new_receivers<T>(&self, new_receivers: T) -> DXBBlock
327 where
328 T: Into<Receivers>,
329 {
330 let mut new_block = self.clone();
331 new_block.set_receivers(new_receivers.into());
332 new_block
333 }
334}
335
336impl Display for DXBBlock {
337 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
338 let block_type = self.block_header.flags_and_timestamp.block_type();
339 let sender = &self.routing_header.sender;
340 let receivers = self.receivers();
341 write!(f, "[{block_type}] {sender} -> {receivers}")?;
342
343 Ok(())
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use std::str::FromStr;
350
351 use crate::{
352 global::{
353 dxb_block::DXBBlock,
354 protocol_structures::{
355 encrypted_header::{self, EncryptedHeader},
356 routing_header::RoutingHeader,
357 },
358 },
359 values::core_values::endpoint::Endpoint,
360 };
361
362 #[test]
363 pub fn test_recalculate() {
364 let mut routing_header = RoutingHeader::default()
365 .with_sender(Endpoint::from_str("@test").unwrap())
366 .to_owned();
367 routing_header.set_size(420);
368 let mut block = DXBBlock {
369 body: vec![0x01, 0x02, 0x03],
370 encrypted_header: EncryptedHeader {
371 flags: encrypted_header::Flags::new()
372 .with_user_agent(encrypted_header::UserAgent::Unused11),
373 ..Default::default()
374 },
375 routing_header,
376 ..DXBBlock::default()
377 };
378
379 {
380 let block_bytes = block.to_bytes().unwrap();
382 let block2: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
383 assert_ne!(block, block2);
384 }
385
386 {
387 block.recalculate_struct();
389 let block_bytes = block.to_bytes().unwrap();
390 let block3: DXBBlock = DXBBlock::from_bytes(&block_bytes).unwrap();
391 assert_eq!(block, block3);
392 }
393 }
394}