1use core::cmp;
2use core::convert::TryFrom;
3use core::fmt;
4use serde::de::{SeqAccess, Visitor};
5use serde::ser::{SerializeSeq, Serializer};
6use serde::{de, Deserialize, Deserializer, Serialize};
7
8use super::canonical::*;
9use super::crc::*;
10use super::dtntime::*;
11use super::eid::*;
12use super::flags::*;
13use super::primary::*;
14use crate::error::{Error, ErrorList};
15use thiserror::Error;
16
/// Raw byte storage used for payloads and serialized output.
pub type ByteBuffer = Vec<u8>;

/// Integer type holding the protocol version number.
pub type DtnVersionType = u32;
/// Integer type holding a canonical block number.
pub type CanonicalBlockNumberType = u64;
/// Integer type holding a fragment offset.
pub type FragOffsetType = u64;
/// Integer type holding the total application data length.
pub type TotalDataLengthType = u64;

/// Protocol version number used by this module (7).
pub const DTN_VERSION: DtnVersionType = 7;
26
27pub trait Block {
34 fn to_cbor(&self) -> ByteBuffer;
36}
37
38#[derive(Error, Debug)]
45pub enum BundleBuilderError {
46 #[error("Missing payload block")]
47 NoPayloadBlock,
48}
49
50#[derive(Debug, Clone, PartialEq, Default)]
51pub struct BundleBuilder {
52 primary: PrimaryBlock,
53 canonicals: Vec<CanonicalBlock>,
54}
55
56impl BundleBuilder {
57 pub fn new() -> Self {
58 Self::default()
59 }
60 pub fn primary(mut self, primary: PrimaryBlock) -> Self {
61 self.primary = primary;
62 self
63 }
64 pub fn canonicals(mut self, canonicals: Vec<CanonicalBlock>) -> Self {
65 self.canonicals = canonicals;
66 self
67 }
68 pub fn payload(mut self, payload: ByteBuffer) -> Self {
69 let payload = crate::canonical::new_payload_block(BlockControlFlags::empty(), payload);
70 self.canonicals.push(payload);
71 self
72 }
73 pub fn build(mut self) -> Result<Bundle, BundleBuilderError> {
74 self.canonicals
75 .sort_by(|a, b| b.block_number.cmp(&a.block_number));
76
77 if self.canonicals.is_empty() || self.canonicals.last().unwrap().payload_data().is_none() {
78 Err(BundleBuilderError::NoPayloadBlock)
79 } else {
80 Ok(Bundle {
81 primary: self.primary,
82 canonicals: self.canonicals,
83 })
84 }
85 }
86}
87
88#[derive(Debug, Clone, PartialEq, Default)]
91pub struct Bundle {
92 pub primary: PrimaryBlock,
93 pub canonicals: Vec<CanonicalBlock>,
94}
95
96impl Serialize for Bundle {
97 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
98 where
99 S: Serializer,
100 {
101 let mut seq = serializer.serialize_seq(Some(self.canonicals.len() + 1))?;
102 seq.serialize_element(&self.primary)?;
103 for e in &self.canonicals {
104 seq.serialize_element(&e)?;
105 }
106 seq.end()
107 }
108}
109
110impl<'de> Deserialize<'de> for Bundle {
111 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
112 where
113 D: Deserializer<'de>,
114 {
115 struct BundleVisitor;
116
117 impl<'de> Visitor<'de> for BundleVisitor {
118 type Value = Bundle;
119
120 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
121 formatter.write_str("bundle")
122 }
123
124 fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error>
125 where
126 V: SeqAccess<'de>,
127 {
128 let primary: PrimaryBlock = seq
129 .next_element()?
130 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
131
132 let mut canonicals: Vec<CanonicalBlock> = Vec::new();
133 while let Some(next) = seq.next_element::<CanonicalBlock>()? {
134 canonicals.push(next);
135 }
136
137 Ok(Bundle {
138 primary,
139 canonicals,
140 })
141 }
142 }
143
144 deserializer.deserialize_any(BundleVisitor)
145 }
146}
147
148impl Bundle {
149 pub fn new(primary: PrimaryBlock, canonicals: Vec<CanonicalBlock>) -> Bundle {
150 Bundle {
151 primary,
152 canonicals,
153 }
154 }
155
156 pub fn validate(&self) -> Result<(), ErrorList> {
158 let mut errors: ErrorList = Vec::new();
159 let mut b_num: std::collections::HashSet<u64> =
163 std::collections::HashSet::with_capacity(15);
164 let mut b_types: std::collections::HashSet<u64> =
165 std::collections::HashSet::with_capacity(15);
166
167 if let Err(mut err) = self.primary.validate() {
168 errors.append(&mut err);
169 }
170 for blck in &self.canonicals {
171 if let Err(mut err) = blck.validate() {
172 errors.append(&mut err);
173 }
174 if (self
175 .primary
176 .bundle_control_flags
177 .contains(BundleControlFlags::BUNDLE_ADMINISTRATIVE_RECORD_PAYLOAD)
178 || self.primary.source == EndpointID::none())
179 && blck
180 .block_control_flags
181 .flags()
182 .contains(BlockControlFlags::BLOCK_STATUS_REPORT)
183 {
184 errors.push(Error::BundleError(
185 "Bundle Processing Control Flags indicate that this bundle's payload is an administrative record or the source node is omitted, but the \"Transmit status report if block cannot be processed\" Block Processing Control Flag was set in a Canonical Block".to_string()
186 ));
187 }
188 if !b_num.insert(blck.block_number) {
189 errors.push(Error::BundleError(
190 "Block numbers occurred multiple times".to_string(),
191 ));
192 }
193 if !b_types.insert(blck.block_type)
194 && (blck.block_type == BUNDLE_AGE_BLOCK
195 || blck.block_type == HOP_COUNT_BLOCK
196 || blck.block_type == PREVIOUS_NODE_BLOCK)
197 {
198 errors.push(Error::BundleError(
199 "PreviousNode, BundleAge and HopCound blocks must not occure multiple times"
200 .to_string(),
201 ));
202 }
203 }
204 if self.primary.creation_timestamp.dtntime() == 0 && b_types.contains(&BUNDLE_AGE_BLOCK) {
205 errors.push(Error::BundleError(
206 "Creation Timestamp is zero, but no Bundle Age block is present".to_string(),
207 ));
208 }
209 if self.payload().is_none() {
210 errors.push(Error::BundleError("Missing Payload Block".to_string()));
211 } else if self.canonicals.last().unwrap().block_type != PAYLOAD_BLOCK {
212 errors.push(Error::BundleError(
213 "Last block must be a payload block".to_string(),
214 ));
215 }
216
217 if !errors.is_empty() {
218 return Err(errors);
219 }
220 Ok(())
221 }
222 pub fn sort_canonicals(&mut self) {
224 self.canonicals
225 .sort_by(|a, b| b.block_number.cmp(&a.block_number));
226 }
227 fn next_canonical_block_number(&self) -> u64 {
228 let mut highest_block_number = 1;
229 for c in self.canonicals.iter() {
230 highest_block_number = cmp::max(highest_block_number, c.block_number);
231 }
232 highest_block_number + 1
233 }
234
235 pub fn add_canonical_block(&mut self, mut cblock: CanonicalBlock) {
237 if (cblock.block_type == PAYLOAD_BLOCK
239 || cblock.block_type == HOP_COUNT_BLOCK
240 || cblock.block_type == BUNDLE_AGE_BLOCK
241 || cblock.block_type == PREVIOUS_NODE_BLOCK)
242 && self.extension_block_by_type(cblock.block_type).is_some()
243 {
244 return;
245 }
246 let block_num = if cblock.block_type == PAYLOAD_BLOCK {
247 crate::canonical::PAYLOAD_BLOCK_NUMBER
248 } else {
249 self.next_canonical_block_number()
250 };
251 cblock.block_number = block_num;
252 self.canonicals.push(cblock);
253 self.sort_canonicals();
254 }
255 pub fn is_administrative_record(&self) -> bool {
257 self.primary
258 .bundle_control_flags
259 .flags()
260 .contains(BundleControlFlags::BUNDLE_ADMINISTRATIVE_RECORD_PAYLOAD)
261 }
262 pub fn payload(&self) -> Option<&ByteBuffer> {
264 self.extension_block_by_type(crate::canonical::PAYLOAD_BLOCK)?
265 .payload_data()
266 }
267
268 pub fn set_payload_block(&mut self, payload: CanonicalBlock) {
270 self.canonicals
271 .retain(|c| c.block_type != crate::canonical::PAYLOAD_BLOCK);
272 self.add_canonical_block(payload);
273 }
274
275 pub fn set_payload(&mut self, payload: ByteBuffer) {
277 if let Some(pb) = self.extension_block_by_type_mut(crate::canonical::PAYLOAD_BLOCK) {
278 pb.set_data(crate::canonical::CanonicalData::Data(payload));
279 } else {
280 let new_payload =
281 crate::canonical::new_payload_block(BlockControlFlags::empty(), payload);
282 self.set_payload_block(new_payload);
283 }
284 }
285 pub fn set_crc(&mut self, crc_type: CrcRawType) {
288 self.primary.set_crc_type(crc_type);
289 for b in &mut self.canonicals {
290 b.set_crc_type(crc_type);
291 }
292 }
293 pub fn crc_valid(&mut self) -> bool {
295 if !self.primary.check_crc() {
296 return false;
297 }
298 for b in &mut self.canonicals {
299 if !b.check_crc() {
300 return false;
301 }
302 }
303 true
304 }
305 pub fn calculate_crc(&mut self) {
307 self.primary.update_crc();
308 for b in &mut self.canonicals {
309 b.update_crc();
310 }
311 }
312
313 pub fn extension_block_by_type(
315 &self,
316 block_type: CanonicalBlockType,
317 ) -> Option<&CanonicalBlock> {
318 self.canonicals
319 .iter()
320 .find(|&b| b.block_type == block_type && b.extension_validation().is_ok())
321 }
322 pub fn extension_block_by_type_mut(
324 &mut self,
325 block_type: CanonicalBlockType,
326 ) -> Option<&mut CanonicalBlock> {
327 self.canonicals
328 .iter_mut()
329 .find(|b| b.block_type == block_type && b.extension_validation().is_ok())
330 }
331
332 pub fn to_cbor(&mut self) -> ByteBuffer {
334 self.calculate_crc();
335 let mut bytebuf = serde_cbor::to_vec(&self).expect("Error serializing bundle as cbor.");
336 bytebuf[0] = 0x9f; bytebuf.push(0xff); bytebuf
339 }
340
341 pub fn to_json(&mut self) -> String {
343 self.calculate_crc();
344 serde_json::to_string(&self).unwrap()
345 }
346
347 pub fn id(&self) -> String {
351 let src = self.primary.source.to_string();
352 let mut id = format!(
353 "{}-{}-{}",
354 src,
357 self.primary.creation_timestamp.dtntime(),
358 self.primary.creation_timestamp.seqno(),
359 );
361 if self.primary.has_fragmentation() {
362 id = format!("{}-{}", id, self.primary.fragmentation_offset);
363 }
364 id
365 }
366
367 pub fn update_extensions(&mut self, local_node: EndpointID, residence_time: u128) -> bool {
371 if let Some(hcblock) = self.extension_block_by_type_mut(HOP_COUNT_BLOCK) {
372 hcblock.hop_count_increase();
373 if hcblock.hop_count_exceeded() {
374 return false;
375 }
376 }
377 if let Some(pnblock) = self.extension_block_by_type_mut(PREVIOUS_NODE_BLOCK) {
378 pnblock.previous_node_update(local_node);
379 }
380 if let Some(bablock) = self.extension_block_by_type_mut(BUNDLE_AGE_BLOCK) {
381 if let Some(ba_orig) = bablock.bundle_age_get() {
382 bablock.bundle_age_update(ba_orig + residence_time);
383 if ba_orig + residence_time > self.primary.lifetime.as_micros() {
384 return false;
386 }
387 }
388 }
389 !self.primary.is_lifetime_exceeded()
390 }
391
392 pub fn previous_node(&self) -> Option<&EndpointID> {
394 let pnblock = self.extension_block_by_type(PREVIOUS_NODE_BLOCK)?;
395 pnblock.previous_node_get()
396 }
397}
398
399impl fmt::Display for Bundle {
400 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401 write!(f, "{}_{}", self.id(), self.primary.destination)
402 }
403}
404
405impl TryFrom<ByteBuffer> for Bundle {
407 type Error = Error;
408
409 fn try_from(item: ByteBuffer) -> Result<Self, Self::Error> {
410 match serde_cbor::from_slice(&item) {
411 Ok(bndl) => Ok(bndl),
412 Err(err) => Err(err.into()),
413 }
414 }
415}
416
417impl TryFrom<&[u8]> for Bundle {
419 type Error = Error;
420
421 fn try_from(item: &[u8]) -> Result<Self, Self::Error> {
422 match serde_cbor::from_slice(item) {
423 Ok(bndl) => Ok(bndl),
424 Err(err) => Err(err.into()),
425 }
426 }
427}
428
429impl TryFrom<String> for Bundle {
431 type Error = Error;
432
433 fn try_from(item: String) -> Result<Self, Self::Error> {
434 match serde_json::from_str(&item) {
435 Ok(bndl) => Ok(bndl),
436 Err(err) => Err(err.into()),
437 }
438 }
439}
440
441pub fn new_std_payload_bundle(src: EndpointID, dst: EndpointID, data: ByteBuffer) -> Bundle {
445 let flags: BundleControlFlags = BundleControlFlags::BUNDLE_MUST_NOT_FRAGMENTED
446 | BundleControlFlags::BUNDLE_STATUS_REQUEST_DELIVERY;
447 let pblock = crate::primary::PrimaryBlockBuilder::default()
448 .bundle_control_flags(flags.bits())
449 .destination(dst)
450 .source(src.clone())
451 .report_to(src)
452 .creation_timestamp(CreationTimestamp::now())
453 .lifetime(core::time::Duration::from_secs(60 * 60))
454 .build()
455 .unwrap();
456 let mut b = crate::bundle::Bundle::new(
457 pblock,
458 vec![
459 new_payload_block(BlockControlFlags::empty(), data),
460 new_hop_count_block(2, BlockControlFlags::empty(), 32),
461 ],
462 );
463 b.set_crc(crate::crc::CRC_NO);
464 b.sort_canonicals();
465 b
466}