Skip to main content

sdtn/store/
bundle_descriptor.rs

1use crate::bpv7::bundle::Bundle;
2use crate::bpv7::endpoint::EndpointId;
3use std::collections::HashSet;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6/// BundleDescriptor manages the forwarding state of a bundle
7/// It tracks which endpoints have already received this bundle to prevent duplicates
8#[derive(Debug, Clone)]
9pub struct BundleDescriptor {
10    pub bundle: Bundle,
11    pub already_sent: HashSet<EndpointId>,
12    pub forwarding_attempts: u32,
13    pub created_at: u64,
14}
15
16impl BundleDescriptor {
17    pub fn new(bundle: Bundle) -> Self {
18        let now = SystemTime::now()
19            .duration_since(UNIX_EPOCH)
20            .unwrap()
21            .as_secs();
22
23        Self {
24            bundle,
25            already_sent: HashSet::new(),
26            forwarding_attempts: 0,
27            created_at: now,
28        }
29    }
30
31    pub fn mark_sent(&mut self, eid: EndpointId) {
32        self.already_sent.insert(eid);
33    }
34
35    pub fn get_already_sent(&self) -> &HashSet<EndpointId> {
36        &self.already_sent
37    }
38
39    /// Check if this bundle has been sent to a specific endpoint
40    pub fn has_been_sent_to(&self, eid: &EndpointId) -> bool {
41        self.already_sent.contains(eid)
42    }
43
44    /// Increment the forwarding attempt counter
45    pub fn increment_forwarding_attempts(&mut self) {
46        self.forwarding_attempts += 1;
47    }
48
49    /// Get the number of forwarding attempts
50    pub fn get_forwarding_attempts(&self) -> u32 {
51        self.forwarding_attempts
52    }
53
54    /// Check if this bundle is ready for forwarding (not expired and not over limit)
55    pub fn is_ready_for_forwarding(&self, max_attempts: u32) -> bool {
56        !self.bundle.is_expired() && self.forwarding_attempts < max_attempts
57    }
58
59    /// Get a unique identifier for this bundle
60    pub fn get_bundle_id(&self) -> String {
61        format!(
62            "{}-{}",
63            self.bundle.primary.source, self.bundle.primary.creation_timestamp
64        )
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use super::*;
71    use crate::bpv7::bundle::Bundle;
72    use std::thread;
73    use std::time::Duration;
74
75    #[test]
76    fn test_bundle_descriptor_new() {
77        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
78        let descriptor = BundleDescriptor::new(bundle.clone());
79
80        assert_eq!(descriptor.bundle.primary.source, bundle.primary.source);
81        assert_eq!(
82            descriptor.bundle.primary.destination,
83            bundle.primary.destination
84        );
85        assert_eq!(descriptor.bundle.payload, bundle.payload);
86        assert_eq!(descriptor.forwarding_attempts, 0);
87        assert!(descriptor.already_sent.is_empty());
88        assert!(descriptor.created_at > 0);
89    }
90
91    #[test]
92    fn test_mark_sent() {
93        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
94        let mut descriptor = BundleDescriptor::new(bundle);
95
96        let eid = EndpointId::from("dtn://peer1");
97        descriptor.mark_sent(eid.clone());
98
99        assert!(descriptor.already_sent.contains(&eid));
100        assert_eq!(descriptor.already_sent.len(), 1);
101    }
102
103    #[test]
104    fn test_mark_sent_multiple() {
105        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
106        let mut descriptor = BundleDescriptor::new(bundle);
107
108        let eid1 = EndpointId::from("dtn://peer1");
109        let eid2 = EndpointId::from("dtn://peer2");
110
111        descriptor.mark_sent(eid1.clone());
112        descriptor.mark_sent(eid2.clone());
113
114        assert!(descriptor.already_sent.contains(&eid1));
115        assert!(descriptor.already_sent.contains(&eid2));
116        assert_eq!(descriptor.already_sent.len(), 2);
117    }
118
119    #[test]
120    fn test_mark_sent_duplicate() {
121        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
122        let mut descriptor = BundleDescriptor::new(bundle);
123
124        let eid = EndpointId::from("dtn://peer1");
125        descriptor.mark_sent(eid.clone());
126        descriptor.mark_sent(eid.clone()); // Duplicate
127
128        assert!(descriptor.already_sent.contains(&eid));
129        assert_eq!(descriptor.already_sent.len(), 1); // Should still be 1
130    }
131
132    #[test]
133    fn test_get_already_sent() {
134        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
135        let mut descriptor = BundleDescriptor::new(bundle);
136
137        let eid1 = EndpointId::from("dtn://peer1");
138        let eid2 = EndpointId::from("dtn://peer2");
139
140        descriptor.mark_sent(eid1.clone());
141        descriptor.mark_sent(eid2.clone());
142
143        let already_sent = descriptor.get_already_sent();
144        assert!(already_sent.contains(&eid1));
145        assert!(already_sent.contains(&eid2));
146        assert_eq!(already_sent.len(), 2);
147    }
148
149    #[test]
150    fn test_has_been_sent_to() {
151        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
152        let mut descriptor = BundleDescriptor::new(bundle);
153
154        let eid1 = EndpointId::from("dtn://peer1");
155        let eid2 = EndpointId::from("dtn://peer2");
156
157        descriptor.mark_sent(eid1.clone());
158
159        assert!(descriptor.has_been_sent_to(&eid1));
160        assert!(!descriptor.has_been_sent_to(&eid2));
161    }
162
163    #[test]
164    fn test_increment_forwarding_attempts() {
165        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
166        let mut descriptor = BundleDescriptor::new(bundle);
167
168        assert_eq!(descriptor.get_forwarding_attempts(), 0);
169
170        descriptor.increment_forwarding_attempts();
171        assert_eq!(descriptor.get_forwarding_attempts(), 1);
172
173        descriptor.increment_forwarding_attempts();
174        assert_eq!(descriptor.get_forwarding_attempts(), 2);
175    }
176
177    #[test]
178    fn test_get_forwarding_attempts() {
179        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
180        let descriptor = BundleDescriptor::new(bundle);
181
182        assert_eq!(descriptor.get_forwarding_attempts(), 0);
183    }
184
185    #[test]
186    fn test_is_ready_for_forwarding_valid() {
187        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
188        let descriptor = BundleDescriptor::new(bundle);
189
190        assert!(descriptor.is_ready_for_forwarding(5));
191        assert!(descriptor.is_ready_for_forwarding(1));
192    }
193
194    #[test]
195    fn test_is_ready_for_forwarding_max_attempts_reached() {
196        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
197        let mut descriptor = BundleDescriptor::new(bundle);
198
199        descriptor.increment_forwarding_attempts();
200        descriptor.increment_forwarding_attempts();
201        descriptor.increment_forwarding_attempts();
202
203        assert!(!descriptor.is_ready_for_forwarding(3));
204        assert!(!descriptor.is_ready_for_forwarding(2));
205        assert!(descriptor.is_ready_for_forwarding(4));
206    }
207
208    #[test]
209    fn test_is_ready_for_forwarding_expired_bundle() {
210        // Create a bundle with very short lifetime
211        let mut bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
212        bundle.primary.lifetime = 1; // 1 second
213
214        let descriptor = BundleDescriptor::new(bundle);
215
216        // Wait for bundle to expire
217        thread::sleep(Duration::from_secs(2));
218
219        assert!(!descriptor.is_ready_for_forwarding(5));
220    }
221
222    #[test]
223    fn test_get_bundle_id() {
224        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
225        let descriptor = BundleDescriptor::new(bundle.clone());
226
227        let bundle_id = descriptor.get_bundle_id();
228        let expected_id = format!(
229            "{}-{}",
230            bundle.primary.source, bundle.primary.creation_timestamp
231        );
232
233        assert_eq!(bundle_id, expected_id);
234    }
235
236    #[test]
237    fn test_get_bundle_id_different_bundles() {
238        let bundle1 = Bundle::new("dtn://src1", "dtn://dest", b"test1".to_vec());
239        let bundle2 = Bundle::new("dtn://src2", "dtn://dest", b"test2".to_vec());
240
241        let descriptor1 = BundleDescriptor::new(bundle1);
242        let descriptor2 = BundleDescriptor::new(bundle2);
243
244        assert_ne!(descriptor1.get_bundle_id(), descriptor2.get_bundle_id());
245    }
246
247    #[test]
248    fn test_bundle_descriptor_clone() {
249        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
250        let mut descriptor = BundleDescriptor::new(bundle);
251
252        let eid = EndpointId::from("dtn://peer1");
253        descriptor.mark_sent(eid.clone());
254        descriptor.increment_forwarding_attempts();
255
256        let cloned = descriptor.clone();
257
258        assert_eq!(
259            descriptor.bundle.primary.source,
260            cloned.bundle.primary.source
261        );
262        assert_eq!(descriptor.forwarding_attempts, cloned.forwarding_attempts);
263        assert_eq!(descriptor.created_at, cloned.created_at);
264        assert!(cloned.already_sent.contains(&eid));
265    }
266
267    #[test]
268    fn test_bundle_descriptor_debug() {
269        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
270        let descriptor = BundleDescriptor::new(bundle);
271
272        let debug_str = format!("{:?}", descriptor);
273        assert!(debug_str.contains("BundleDescriptor"));
274        assert!(debug_str.contains("dtn://src"));
275        assert!(debug_str.contains("dtn://dest"));
276    }
277
278    #[test]
279    fn test_created_at_timestamp() {
280        let bundle1 = Bundle::new("dtn://src", "dtn://dest", b"test1".to_vec());
281        let descriptor1 = BundleDescriptor::new(bundle1);
282
283        thread::sleep(Duration::from_millis(10));
284
285        let bundle2 = Bundle::new("dtn://src", "dtn://dest", b"test2".to_vec());
286        let descriptor2 = BundleDescriptor::new(bundle2);
287
288        assert!(descriptor2.created_at >= descriptor1.created_at);
289    }
290
291    #[test]
292    fn test_complex_forwarding_scenario() {
293        let bundle = Bundle::new("dtn://src", "dtn://dest", b"test".to_vec());
294        let mut descriptor = BundleDescriptor::new(bundle);
295
296        // Mark several peers as sent
297        descriptor.mark_sent(EndpointId::from("dtn://peer1"));
298        descriptor.mark_sent(EndpointId::from("dtn://peer2"));
299        descriptor.mark_sent(EndpointId::from("dtn://peer3"));
300
301        // Increment forwarding attempts
302        descriptor.increment_forwarding_attempts();
303        descriptor.increment_forwarding_attempts();
304
305        // Test state
306        assert_eq!(descriptor.already_sent.len(), 3);
307        assert_eq!(descriptor.forwarding_attempts, 2);
308        assert!(descriptor.has_been_sent_to(&EndpointId::from("dtn://peer1")));
309        assert!(descriptor.has_been_sent_to(&EndpointId::from("dtn://peer2")));
310        assert!(descriptor.has_been_sent_to(&EndpointId::from("dtn://peer3")));
311        assert!(!descriptor.has_been_sent_to(&EndpointId::from("dtn://peer4")));
312        assert!(descriptor.is_ready_for_forwarding(5));
313        assert!(!descriptor.is_ready_for_forwarding(2));
314    }
315}