Skip to main content

bp7/
bundle.rs

1use core::cmp;
2use core::convert::TryFrom;
3use core::fmt;
4use serde::de::{SeqAccess, Visitor};
5use serde::ser::{SerializeSeq, Serializer};
6use serde::{de, Deserialize, Deserializer, Serialize};
7
8use super::canonical::*;
9use super::crc::*;
10use super::dtntime::*;
11use super::eid::*;
12use super::flags::*;
13use super::primary::*;
14use crate::error::{Error, ErrorList};
15use thiserror::Error;
16
/// Version for upcoming bundle protocol standard is 7.
pub const DTN_VERSION: u32 = 7;

/// Owned byte vector; in this file it carries payload data and serialized (CBOR) output.
pub type ByteBuffer = Vec<u8>;

/// Integer type for the protocol version number (same width as `DTN_VERSION`).
pub type DtnVersionType = u32;
/// Integer type for canonical block numbers.
pub type CanonicalBlockNumberType = u64;
/// Integer type presumably used for a fragment's offset — not used in this file, confirm at use site.
pub type FragOffsetType = u64;
/// Integer type presumably used for the total application data length — not used in this file,
/// confirm at use site.
pub type TotalDataLengthType = u64;
26
27/******************************
28 *
29 * Block
30 *
31 ******************************/
32
/// Common interface of bundle blocks that can be turned into bytes.
pub trait Block {
    /// Serialize the block into a byte buffer.
    ///
    /// NOTE(review): judging by the method name the bytes are CBOR encoded; the
    /// implementors are not part of this file — confirm there.
    fn to_cbor(&self) -> ByteBuffer;
}
37
38/******************************
39 *
40 * Bundle
41 *
42 ******************************/
43
/// Errors reported by `BundleBuilder::build`.
#[derive(Error, Debug)]
pub enum BundleBuilderError {
    /// The builder holds no canonical blocks, or the last block after sorting
    /// does not provide payload data.
    #[error("Missing payload block")]
    NoPayloadBlock,
}
49
/// Step-by-step constructor for a `Bundle`.
///
/// Starts out (via `Default`) with a default primary block and no canonical
/// blocks; `build` turns the collected parts into a `Bundle`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct BundleBuilder {
    /// Primary block the finished bundle will carry.
    primary: PrimaryBlock,
    /// Canonical blocks collected so far; sorted when `build` is called.
    canonicals: Vec<CanonicalBlock>,
}
55
56impl BundleBuilder {
57    pub fn new() -> Self {
58        Self::default()
59    }
60    pub fn primary(mut self, primary: PrimaryBlock) -> Self {
61        self.primary = primary;
62        self
63    }
64    pub fn canonicals(mut self, canonicals: Vec<CanonicalBlock>) -> Self {
65        self.canonicals = canonicals;
66        self
67    }
68    pub fn payload(mut self, payload: ByteBuffer) -> Self {
69        let payload = crate::canonical::new_payload_block(BlockControlFlags::empty(), payload);
70        self.canonicals.push(payload);
71        self
72    }
73    pub fn build(mut self) -> Result<Bundle, BundleBuilderError> {
74        self.canonicals
75            .sort_by(|a, b| b.block_number.cmp(&a.block_number));
76
77        if self.canonicals.is_empty() || self.canonicals.last().unwrap().payload_data().is_none() {
78            Err(BundleBuilderError::NoPayloadBlock)
79        } else {
80            Ok(Bundle {
81                primary: self.primary,
82                canonicals: self.canonicals,
83            })
84        }
85    }
86}
87
/// Bundle represents a bundle as defined in section 4.2.1. Each Bundle contains
/// one primary block and multiple canonical blocks.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Bundle {
    /// The bundle's primary block (flags, endpoints, creation timestamp, lifetime, ...).
    pub primary: PrimaryBlock,
    /// Canonical blocks; the helpers in this file keep them sorted by
    /// descending block number.
    pub canonicals: Vec<CanonicalBlock>,
}
95
96impl Serialize for Bundle {
97    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
98    where
99        S: Serializer,
100    {
101        let mut seq = serializer.serialize_seq(Some(self.canonicals.len() + 1))?;
102        seq.serialize_element(&self.primary)?;
103        for e in &self.canonicals {
104            seq.serialize_element(&e)?;
105        }
106        seq.end()
107    }
108}
109
110impl<'de> Deserialize<'de> for Bundle {
111    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
112    where
113        D: Deserializer<'de>,
114    {
115        struct BundleVisitor;
116
117        impl<'de> Visitor<'de> for BundleVisitor {
118            type Value = Bundle;
119
120            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
121                formatter.write_str("bundle")
122            }
123
124            fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error>
125            where
126                V: SeqAccess<'de>,
127            {
128                let primary: PrimaryBlock = seq
129                    .next_element()?
130                    .ok_or_else(|| de::Error::invalid_length(0, &self))?;
131
132                let mut canonicals: Vec<CanonicalBlock> = Vec::new();
133                while let Some(next) = seq.next_element::<CanonicalBlock>()? {
134                    canonicals.push(next);
135                }
136
137                Ok(Bundle {
138                    primary,
139                    canonicals,
140                })
141            }
142        }
143
144        deserializer.deserialize_any(BundleVisitor)
145    }
146}
147
148impl Bundle {
149    pub fn new(primary: PrimaryBlock, canonicals: Vec<CanonicalBlock>) -> Bundle {
150        Bundle {
151            primary,
152            canonicals,
153        }
154    }
155
156    /// Validate bundle and optionally return list of errors.
157    pub fn validate(&self) -> Result<(), ErrorList> {
158        let mut errors: ErrorList = Vec::new();
159        //let mut block_numbers: Vec<CanonicalBlockNumberType> = Vec::new();
160        //let mut block_types: Vec<CanonicalBlockType> = Vec::new();
161
162        let mut b_num: std::collections::HashSet<u64> =
163            std::collections::HashSet::with_capacity(15);
164        let mut b_types: std::collections::HashSet<u64> =
165            std::collections::HashSet::with_capacity(15);
166
167        if let Err(mut err) = self.primary.validate() {
168            errors.append(&mut err);
169        }
170        for blck in &self.canonicals {
171            if let Err(mut err) = blck.validate() {
172                errors.append(&mut err);
173            }
174            if (self
175                .primary
176                .bundle_control_flags
177                .contains(BundleControlFlags::BUNDLE_ADMINISTRATIVE_RECORD_PAYLOAD)
178                || self.primary.source == EndpointID::none())
179                && blck
180                    .block_control_flags
181                    .flags()
182                    .contains(BlockControlFlags::BLOCK_STATUS_REPORT)
183            {
184                errors.push(Error::BundleError(
185                        "Bundle Processing Control Flags indicate that this bundle's payload is an administrative record or the source node is omitted, but the \"Transmit status report if block cannot be processed\" Block Processing Control Flag was set in a Canonical Block".to_string()
186                    ));
187            }
188            if !b_num.insert(blck.block_number) {
189                errors.push(Error::BundleError(
190                    "Block numbers occurred multiple times".to_string(),
191                ));
192            }
193            if !b_types.insert(blck.block_type)
194                && (blck.block_type == BUNDLE_AGE_BLOCK
195                    || blck.block_type == HOP_COUNT_BLOCK
196                    || blck.block_type == PREVIOUS_NODE_BLOCK)
197            {
198                errors.push(Error::BundleError(
199                    "PreviousNode, BundleAge and HopCound blocks must not occure multiple times"
200                        .to_string(),
201                ));
202            }
203        }
204        if self.primary.creation_timestamp.dtntime() == 0 && b_types.contains(&BUNDLE_AGE_BLOCK) {
205            errors.push(Error::BundleError(
206                "Creation Timestamp is zero, but no Bundle Age block is present".to_string(),
207            ));
208        }
209        if self.payload().is_none() {
210            errors.push(Error::BundleError("Missing Payload Block".to_string()));
211        } else if self.canonicals.last().unwrap().block_type != PAYLOAD_BLOCK {
212            errors.push(Error::BundleError(
213                "Last block must be a payload block".to_string(),
214            ));
215        }
216
217        if !errors.is_empty() {
218            return Err(errors);
219        }
220        Ok(())
221    }
222    /// Sort canonical blocks by block number
223    pub fn sort_canonicals(&mut self) {
224        self.canonicals
225            .sort_by(|a, b| b.block_number.cmp(&a.block_number));
226    }
227    fn next_canonical_block_number(&self) -> u64 {
228        let mut highest_block_number = 1;
229        for c in self.canonicals.iter() {
230            highest_block_number = cmp::max(highest_block_number, c.block_number);
231        }
232        highest_block_number + 1
233    }
234
235    /// Automatically assign a block number and add canonical block to bundle
236    pub fn add_canonical_block(&mut self, mut cblock: CanonicalBlock) {
237        // TODO: report errors
238        if (cblock.block_type == PAYLOAD_BLOCK
239            || cblock.block_type == HOP_COUNT_BLOCK
240            || cblock.block_type == BUNDLE_AGE_BLOCK
241            || cblock.block_type == PREVIOUS_NODE_BLOCK)
242            && self.extension_block_by_type(cblock.block_type).is_some()
243        {
244            return;
245        }
246        let block_num = if cblock.block_type == PAYLOAD_BLOCK {
247            crate::canonical::PAYLOAD_BLOCK_NUMBER
248        } else {
249            self.next_canonical_block_number()
250        };
251        cblock.block_number = block_num;
252        self.canonicals.push(cblock);
253        self.sort_canonicals();
254    }
255    /// Checks whether the bundle is an administrative record
256    pub fn is_administrative_record(&self) -> bool {
257        self.primary
258            .bundle_control_flags
259            .flags()
260            .contains(BundleControlFlags::BUNDLE_ADMINISTRATIVE_RECORD_PAYLOAD)
261    }
262    /// Return payload of bundle if an payload block exists and carries data.
263    pub fn payload(&self) -> Option<&ByteBuffer> {
264        self.extension_block_by_type(crate::canonical::PAYLOAD_BLOCK)?
265            .payload_data()
266    }
267
268    /// Sets or updates the payload block
269    pub fn set_payload_block(&mut self, payload: CanonicalBlock) {
270        self.canonicals
271            .retain(|c| c.block_type != crate::canonical::PAYLOAD_BLOCK);
272        self.add_canonical_block(payload);
273    }
274
275    /// Sets or updates the payload
276    pub fn set_payload(&mut self, payload: ByteBuffer) {
277        if let Some(pb) = self.extension_block_by_type_mut(crate::canonical::PAYLOAD_BLOCK) {
278            pb.set_data(crate::canonical::CanonicalData::Data(payload));
279        } else {
280            let new_payload =
281                crate::canonical::new_payload_block(BlockControlFlags::empty(), payload);
282            self.set_payload_block(new_payload);
283        }
284    }
285    /// Sets the given CRCType for each block. The crc value
286    /// is calculated on-the-fly before serializing.
287    pub fn set_crc(&mut self, crc_type: CrcRawType) {
288        self.primary.set_crc_type(crc_type);
289        for b in &mut self.canonicals {
290            b.set_crc_type(crc_type);
291        }
292    }
293    /// Check whether a bundle has only valid CRC checksums in all blocks.
294    pub fn crc_valid(&mut self) -> bool {
295        if !self.primary.check_crc() {
296            return false;
297        }
298        for b in &mut self.canonicals {
299            if !b.check_crc() {
300                return false;
301            }
302        }
303        true
304    }
305    /// Calculate crc for all blocks.
306    pub fn calculate_crc(&mut self) {
307        self.primary.update_crc();
308        for b in &mut self.canonicals {
309            b.update_crc();
310        }
311    }
312
313    /// Get first extension block matching the block type
314    pub fn extension_block_by_type(
315        &self,
316        block_type: CanonicalBlockType,
317    ) -> Option<&CanonicalBlock> {
318        self.canonicals
319            .iter()
320            .find(|&b| b.block_type == block_type && b.extension_validation().is_ok())
321    }
322    /// Get mutable reference for first extension block matching the block type
323    pub fn extension_block_by_type_mut(
324        &mut self,
325        block_type: CanonicalBlockType,
326    ) -> Option<&mut CanonicalBlock> {
327        self.canonicals
328            .iter_mut()
329            .find(|b| b.block_type == block_type && b.extension_validation().is_ok())
330    }
331
332    /// Serialize bundle as CBOR encoded byte buffer.
333    pub fn to_cbor(&mut self) -> ByteBuffer {
334        self.calculate_crc();
335        let mut bytebuf = serde_cbor::to_vec(&self).expect("Error serializing bundle as cbor.");
336        bytebuf[0] = 0x9f; // TODO: fix hack, indefinite-length array encoding
337        bytebuf.push(0xff); // break mark
338        bytebuf
339    }
340
341    /// Serialize bundle as JSON encoded string.
342    pub fn to_json(&mut self) -> String {
343        self.calculate_crc();
344        serde_json::to_string(&self).unwrap()
345    }
346
347    /// ID returns a kind of uniquene representation of this bundle, containing
348    /// the souce node and creation timestamp. If this bundle is a fragment, the
349    /// offset is also present.
350    pub fn id(&self) -> String {
351        let src = self.primary.source.to_string();
352        let mut id = format!(
353            "{}-{}-{}",
354            // should IDs contain trailing '/' in the source?
355            // src.strip_suffix('/').unwrap_or(&src),
356            src,
357            self.primary.creation_timestamp.dtntime(),
358            self.primary.creation_timestamp.seqno(),
359            //self.primary.destination
360        );
361        if self.primary.has_fragmentation() {
362            id = format!("{}-{}", id, self.primary.fragmentation_offset);
363        }
364        id
365    }
366
367    /// Update extension blocks such as hop count, bundle age and previous node.
368    /// Return true if all successful, omit missing blocks.
369    /// Return false if hop count is exceeded, bundle age exceeds life time or bundle lifetime itself is exceeded
370    pub fn update_extensions(&mut self, local_node: EndpointID, residence_time: u128) -> bool {
371        if let Some(hcblock) = self.extension_block_by_type_mut(HOP_COUNT_BLOCK) {
372            hcblock.hop_count_increase();
373            if hcblock.hop_count_exceeded() {
374                return false;
375            }
376        }
377        if let Some(pnblock) = self.extension_block_by_type_mut(PREVIOUS_NODE_BLOCK) {
378            pnblock.previous_node_update(local_node);
379        }
380        if let Some(bablock) = self.extension_block_by_type_mut(BUNDLE_AGE_BLOCK) {
381            if let Some(ba_orig) = bablock.bundle_age_get() {
382                bablock.bundle_age_update(ba_orig + residence_time);
383                if ba_orig + residence_time > self.primary.lifetime.as_micros() {
384                    // TODO: check lifetime exceeded calculations with rfc
385                    return false;
386                }
387            }
388        }
389        !self.primary.is_lifetime_exceeded()
390    }
391
392    /// Return the previous node of a bundle should a Previous Node Block exist
393    pub fn previous_node(&self) -> Option<&EndpointID> {
394        let pnblock = self.extension_block_by_type(PREVIOUS_NODE_BLOCK)?;
395        pnblock.previous_node_get()
396    }
397}
398
399impl fmt::Display for Bundle {
400    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401        write!(f, "{}_{}", self.id(), self.primary.destination)
402    }
403}
404
405/// Deserialize from CBOR byte buffer.
406impl TryFrom<ByteBuffer> for Bundle {
407    type Error = Error;
408
409    fn try_from(item: ByteBuffer) -> Result<Self, Self::Error> {
410        match serde_cbor::from_slice(&item) {
411            Ok(bndl) => Ok(bndl),
412            Err(err) => Err(err.into()),
413        }
414    }
415}
416
417/// Deserialize from CBOR byte slice.
418impl TryFrom<&[u8]> for Bundle {
419    type Error = Error;
420
421    fn try_from(item: &[u8]) -> Result<Self, Self::Error> {
422        match serde_cbor::from_slice(item) {
423            Ok(bndl) => Ok(bndl),
424            Err(err) => Err(err.into()),
425        }
426    }
427}
428
429/// Deserialize from JSON string.
430impl TryFrom<String> for Bundle {
431    type Error = Error;
432
433    fn try_from(item: String) -> Result<Self, Self::Error> {
434        match serde_json::from_str(&item) {
435            Ok(bndl) => Ok(bndl),
436            Err(err) => Err(err.into()),
437        }
438    }
439}
440
441/// Creates a new bundle with the given endpoints, a hop count block
442///  and a payload block.
443/// CRC is set to CrcNo by default and the lifetime is set to 60 * 60 seconds.
444pub fn new_std_payload_bundle(src: EndpointID, dst: EndpointID, data: ByteBuffer) -> Bundle {
445    let flags: BundleControlFlags = BundleControlFlags::BUNDLE_MUST_NOT_FRAGMENTED
446        | BundleControlFlags::BUNDLE_STATUS_REQUEST_DELIVERY;
447    let pblock = crate::primary::PrimaryBlockBuilder::default()
448        .bundle_control_flags(flags.bits())
449        .destination(dst)
450        .source(src.clone())
451        .report_to(src)
452        .creation_timestamp(CreationTimestamp::now())
453        .lifetime(core::time::Duration::from_secs(60 * 60))
454        .build()
455        .unwrap();
456    let mut b = crate::bundle::Bundle::new(
457        pblock,
458        vec![
459            new_payload_block(BlockControlFlags::empty(), data),
460            new_hop_count_block(2, BlockControlFlags::empty(), 32),
461        ],
462    );
463    b.set_crc(crate::crc::CRC_NO);
464    b.sort_canonicals();
465    b
466}