zerodds-xrce 1.0.0-rc.1

DDS-XRCE Wire-Codec (16 Submessages, MessageHeader, RFC-1982, UDP-Mapping)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors

//! XRCE FRAGMENT-Reassembler (Spec §8.4.13).
//!
//! Spec §8.3.5.14 erlaubt FRAGMENT-Submessages nur in reliable Streams.
//! Jedes Fragment traegt einen Body-Slice der zerlegten Original-
//! Submessage und ein `LAST`-Flag (siehe `crate::submessages::fragment`).
//!
//! Der Reassembler ist tick-frei: Fragmente werden via `add_fragment`
//! eingespielt; sobald `LAST` empfangen ist und alle Bytes-Bereiche luecken-
//! frei sind, wird `Some(Vec<u8>)` zurueckgeliefert. Bis dahin behaelt der
//! Reassembler eine einzige Pre-Allokation pro `(stream, base_seq)`.
//!
//! ## DoS-Caps
//! - `MAX_FRAGMENTS_PER_STREAM = 256`: maximale Anzahl Fragmente pro
//!   in-flight Reassembly. 256 deckt 256·1500 ≈ 384 KiB ab — mehr als
//!   genug fuer den realistischen Use-Case.
//! - `MAX_TOTAL_PAYLOAD = 1 MiB`: harter Cap fuer den reassemblierten
//!   Payload. Wird auch zum Pre-Allocation-Limit genutzt (kein
//!   "behauptete 4 GiB" → OOM).
//! - `MAX_PENDING_STREAMS = 32`: gleichzeitig zulaessig laufende
//!   Reassemblies. Schuetzt vor "1000 Streams a 1 Fragment".
//! - GC: jedes Fragment hat einen `arrival`-Tick; alte Reassemblies
//!   werden via `gc(now)` nach `gc_ttl` verworfen.

extern crate alloc;
use alloc::collections::BTreeMap;
use alloc::vec::Vec;
use core::time::Duration;

use crate::error::XrceError;

/// DoS cap: maximum number of fragments per in-flight reassembly.
pub const MAX_FRAGMENTS_PER_STREAM: usize = 256;
/// DoS cap: maximum total size, in bytes, of a reassembled submessage.
pub const MAX_TOTAL_PAYLOAD: usize = 1 << 20; // 1 MiB
/// DoS cap: maximum number of reassemblies that may be pending at once.
pub const MAX_PENDING_STREAMS: usize = 32;
/// Default TTL after which an idle reassembly is garbage-collected (10 s).
pub const DEFAULT_GC_TTL: Duration = Duration::from_secs(10);

/// Key of an in-flight reassembly: `(stream_id, base_seq)`.
///
/// The derived ordering compares `stream_id` first, then `base_seq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AssemblerKey {
    /// Id of the reliable stream.
    pub stream_id: u8,
    /// Base sequence number of the first fragment submessage.
    pub base_seq: u16,
}

/// State of one reassembly that has not completed yet.
#[derive(Debug, Clone)]
struct PendingAssembly {
    /// Received fragments, ordered by their byte offset (`offset → bytes`).
    /// The reassembly is finished once `final_size` is known and the stored
    /// fragments cover `[0, final_size)` without a gap.
    fragments: BTreeMap<u32, Vec<u8>>,
    /// End offset (`offset + len`) of the fragment that carried `LAST`, i.e.
    /// the total payload length. `None` until that fragment has been seen.
    final_size: Option<u32>,
    /// Tick of the most recent input; consulted by garbage collection.
    last_arrival: Duration,
}

impl PendingAssembly {
    /// Creates an empty assembly whose last-input tick is `now`.
    fn new(now: Duration) -> Self {
        PendingAssembly {
            last_arrival: now,
            final_size: None,
            fragments: BTreeMap::new(),
        }
    }

    /// Number of fragments stored so far.
    fn fragment_count(&self) -> usize {
        self.fragments.len()
    }

    /// Sum of the byte lengths of all stored fragments.
    fn current_total(&self) -> usize {
        let mut total = 0;
        for chunk in self.fragments.values() {
            total += chunk.len();
        }
        total
    }

    /// Returns `true` when the final size is known and the stored fragments
    /// form one contiguous run from offset 0 up to exactly that size.
    fn is_complete(&self) -> bool {
        match self.final_size {
            None => false,
            Some(expected_len) => {
                // Walk the fragments in offset order; `None` signals a gap
                // (or a fragment that does not start where the previous ended).
                let covered = self.fragments.iter().try_fold(0u32, |next, (&start, chunk)| {
                    (start == next).then(|| next.saturating_add(chunk.len() as u32))
                });
                covered == Some(expected_len)
            }
        }
    }

    /// Consumes the assembly and concatenates its fragments in offset order.
    fn assemble(self) -> Vec<u8> {
        let capacity = self.final_size.map_or(0, |len| len as usize);
        let mut payload = Vec::with_capacity(capacity);
        for chunk in self.fragments.into_values() {
            payload.extend_from_slice(&chunk);
        }
        payload
    }
}

/// FRAGMENT reassembler shared by all reliable streams.
///
/// Every in-flight reassembly lives in its own bucket, addressed by an
/// [`AssemblerKey`]. The assembler never reads a clock itself: callers pass
/// the current tick to [`FragmentAssembler::add_fragment`] and
/// [`FragmentAssembler::gc`].
#[derive(Debug, Clone)]
pub struct FragmentAssembler {
    /// In-flight reassemblies, keyed by `(stream_id, base_seq)`.
    pending: BTreeMap<AssemblerKey, PendingAssembly>,
    /// Idle time after which `gc` discards a pending reassembly.
    gc_ttl: Duration,
    /// Saturating counter of rejected fragments and discarded buckets.
    drop_count: u64,
}

impl Default for FragmentAssembler {
    /// Equivalent to [`FragmentAssembler::new`].
    ///
    /// Written by hand because a derived `Default` would initialise `gc_ttl`
    /// with a zero duration instead of `DEFAULT_GC_TTL`.
    fn default() -> Self {
        Self::new()
    }
}

impl FragmentAssembler {
    /// Creates an assembler that uses `DEFAULT_GC_TTL`.
    #[must_use]
    pub fn new() -> Self {
        Self::with_gc_ttl(DEFAULT_GC_TTL)
    }

    /// Creates an assembler with an explicit GC TTL.
    #[must_use]
    pub fn with_gc_ttl(ttl: Duration) -> Self {
        Self {
            pending: BTreeMap::new(),
            gc_ttl: ttl,
            drop_count: 0,
        }
    }

    /// Number of reassemblies currently in flight.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Drop counter for DoS diagnostics.
    ///
    /// Counts every fragment rejected or ignored by `add_fragment` plus every
    /// bucket discarded by `gc` or `reset`. The counter saturates.
    #[must_use]
    pub fn drop_count(&self) -> u64 {
        self.drop_count
    }

    /// Feeds one fragment into the reassembly identified by `key`.
    ///
    /// `offset` is the byte offset of `bytes` within the original,
    /// unfragmented submessage. `last_flag` marks the fragment whose end
    /// offset defines the total length. `now` is the caller's current tick;
    /// it becomes the bucket's last-input tick used by `gc`.
    ///
    /// Returns `Ok(Some(payload))` when this call completes the reassembly
    /// (the bucket is removed) and `Ok(None)` otherwise. Two kinds of input
    /// are dropped without an error, returning `Ok(None)` and bumping the
    /// drop counter: a fragment whose offset is already stored, and an empty
    /// fragment without `last_flag`.
    ///
    /// # Errors
    /// - `ValueOutOfRange` if `offset + bytes.len()` does not fit into `u32`,
    ///   if a cap is hit (`MAX_PENDING_STREAMS`, `MAX_FRAGMENTS_PER_STREAM`),
    ///   or if the fragment overlaps one that is already stored.
    /// - `PayloadTooLarge` if the fragment would push the reassembly past
    ///   `MAX_TOTAL_PAYLOAD`.
    ///
    /// Every error increments the drop counter.
    pub fn add_fragment(
        &mut self,
        key: AssemblerKey,
        offset: u32,
        last_flag: bool,
        bytes: Vec<u8>,
        now: Duration,
    ) -> Result<Option<Vec<u8>>, XrceError> {
        // Compute the end offset in `usize` so that an oversized buffer is
        // rejected instead of being truncated by a narrowing cast.
        let end = match (offset as usize).checked_add(bytes.len()) {
            Some(end) if end <= u32::MAX as usize => end,
            _ => {
                self.drop_count = self.drop_count.saturating_add(1);
                return Err(XrceError::ValueOutOfRange {
                    message: "fragment offset+len overflow",
                });
            }
        };
        if end > MAX_TOTAL_PAYLOAD {
            self.drop_count = self.drop_count.saturating_add(1);
            return Err(XrceError::PayloadTooLarge {
                limit: MAX_TOTAL_PAYLOAD,
                actual: end,
            });
        }
        // `end` is at most MAX_TOTAL_PAYLOAD here, so narrowing cannot truncate.
        let new_end = end as u32;

        // An empty fragment without LAST carries no information. Storing it
        // would occupy `offset` and make the real fragment at that offset
        // look like a duplicate, so it is ignored up front.
        if bytes.is_empty() && !last_flag {
            self.drop_count = self.drop_count.saturating_add(1);
            return Ok(None);
        }

        // Opening a new bucket is subject to the pending-streams cap.
        if !self.pending.contains_key(&key) && self.pending.len() >= MAX_PENDING_STREAMS {
            self.drop_count = self.drop_count.saturating_add(1);
            return Err(XrceError::ValueOutOfRange {
                message: "fragment assembler max-pending-streams reached",
            });
        }
        let entry = self
            .pending
            .entry(key)
            .or_insert_with(|| PendingAssembly::new(now));
        // Any input for the bucket refreshes its tick, even if the fragment
        // is rejected by one of the checks below.
        entry.last_arrival = now;

        if entry.fragment_count() >= MAX_FRAGMENTS_PER_STREAM {
            self.drop_count = self.drop_count.saturating_add(1);
            return Err(XrceError::ValueOutOfRange {
                message: "fragment assembler max-fragments-per-stream reached",
            });
        }

        // Same offset seen before: treat as a retransmission and drop it.
        if entry.fragments.contains_key(&offset) {
            self.drop_count = self.drop_count.saturating_add(1);
            return Ok(None);
        }

        if Self::overlaps_stored(entry, offset, new_end) {
            self.drop_count = self.drop_count.saturating_add(1);
            return Err(XrceError::ValueOutOfRange {
                message: "overlapping fragment",
            });
        }

        // Defence in depth: bound the stored byte count independently of the
        // offset-based check above.
        let total_after = entry.current_total() + bytes.len();
        if total_after > MAX_TOTAL_PAYLOAD {
            self.drop_count = self.drop_count.saturating_add(1);
            return Err(XrceError::PayloadTooLarge {
                limit: MAX_TOTAL_PAYLOAD,
                actual: total_after,
            });
        }

        entry.fragments.insert(offset, bytes);
        if last_flag {
            entry.final_size = Some(new_end);
        }

        if !entry.is_complete() {
            return Ok(None);
        }
        Ok(self.pending.remove(&key).map(PendingAssembly::assemble))
    }

    /// Returns `true` if `[offset, new_end)` intersects a fragment that is
    /// already stored in `entry`.
    fn overlaps_stored(entry: &PendingAssembly, offset: u32, new_end: u32) -> bool {
        // Nearest fragment starting before `offset`: does it reach past our start?
        if let Some((&start, data)) = entry.fragments.range(..offset).next_back() {
            if start as usize + data.len() > offset as usize {
                return true;
            }
        }
        // Nearest fragment starting at or after `offset`: do we reach past its start?
        if let Some((&start, _)) = entry.fragments.range(offset..).next() {
            if new_end > start {
                return true;
            }
        }
        false
    }

    /// Garbage collection: discards every reassembly whose last input is
    /// older than `gc_ttl` relative to `now`.
    ///
    /// Returns the number of discarded buckets; each one also increments the
    /// drop counter.
    pub fn gc(&mut self, now: Duration) -> usize {
        let cutoff = now.saturating_sub(self.gc_ttl);
        let before = self.pending.len();
        self.pending
            .retain(|_, assembly| assembly.last_arrival >= cutoff);
        let dropped = before - self.pending.len();
        self.drop_count = self.drop_count.saturating_add(dropped as u64);
        dropped
    }

    /// Discards all pending buckets (e.g. on `RESET`) and adds their number
    /// to the drop counter.
    pub fn reset(&mut self) {
        let discarded = self.pending.len() as u64;
        self.pending.clear();
        self.drop_count = self.drop_count.saturating_add(discarded);
    }
}

// Unit tests for the fragment assembler: reassembly order, duplicate and
// overlap handling, the three DoS caps, garbage collection and reset.
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;

    // Fixed key shared by most tests: stream 0x80, base sequence number 0.
    fn k() -> AssemblerKey {
        AssemblerKey {
            stream_id: 0x80,
            base_seq: 0,
        }
    }

    // Two in-order fragments; the second carries LAST and completes the payload.
    #[test]
    fn happy_path_two_fragments() {
        let mut a = FragmentAssembler::new();
        let r = a
            .add_fragment(k(), 0, false, vec![1, 2, 3, 4], Duration::ZERO)
            .unwrap();
        assert!(r.is_none());
        let r = a
            .add_fragment(k(), 4, true, vec![5, 6, 7, 8], Duration::ZERO)
            .unwrap();
        assert_eq!(r.unwrap(), vec![1, 2, 3, 4, 5, 6, 7, 8]);
        assert_eq!(a.pending_count(), 0);
    }

    // The LAST fragment arrives first; the payload completes once the head follows.
    #[test]
    fn out_of_order_reassembly() {
        let mut a = FragmentAssembler::new();
        a.add_fragment(k(), 4, true, vec![5, 6, 7, 8], Duration::ZERO)
            .unwrap();
        let r = a
            .add_fragment(k(), 0, false, vec![1, 2, 3, 4], Duration::ZERO)
            .unwrap();
        assert_eq!(r.unwrap(), vec![1, 2, 3, 4, 5, 6, 7, 8]);
    }

    // Bytes 2..4 are never delivered, so the bucket stays pending.
    #[test]
    fn missing_middle_fragment_does_not_complete() {
        let mut a = FragmentAssembler::new();
        a.add_fragment(k(), 0, false, vec![1, 2], Duration::ZERO)
            .unwrap();
        let r = a
            .add_fragment(k(), 4, true, vec![5, 6], Duration::ZERO)
            .unwrap();
        assert!(r.is_none());
        assert_eq!(a.pending_count(), 1);
    }

    // A second fragment at an already stored offset is dropped and counted.
    #[test]
    fn duplicate_offset_dropped() {
        let mut a = FragmentAssembler::new();
        a.add_fragment(k(), 0, false, vec![1, 2], Duration::ZERO)
            .unwrap();
        let before = a.drop_count();
        let r = a
            .add_fragment(k(), 0, false, vec![9, 9], Duration::ZERO)
            .unwrap();
        assert!(r.is_none());
        assert_eq!(a.drop_count(), before + 1);
    }

    // A fragment starting inside an already stored range is rejected.
    #[test]
    fn overlapping_fragments_rejected() {
        let mut a = FragmentAssembler::new();
        a.add_fragment(k(), 0, false, vec![1, 2, 3, 4], Duration::ZERO)
            .unwrap();
        let res = a.add_fragment(k(), 2, false, vec![9, 9], Duration::ZERO);
        assert!(matches!(res, Err(XrceError::ValueOutOfRange { .. })));
    }

    // A single fragment one byte larger than the payload cap is rejected.
    #[test]
    fn dos_cap_max_total_payload() {
        let mut a = FragmentAssembler::new();
        let res = a.add_fragment(
            k(),
            0,
            true,
            vec![0u8; MAX_TOTAL_PAYLOAD + 1],
            Duration::ZERO,
        );
        assert!(matches!(res, Err(XrceError::PayloadTooLarge { .. })));
    }

    // After MAX_PENDING_STREAMS distinct keys, a further new key is rejected.
    #[test]
    fn dos_cap_max_pending_streams() {
        let mut a = FragmentAssembler::new();
        for i in 0..MAX_PENDING_STREAMS as u16 {
            let key = AssemblerKey {
                stream_id: 0x80,
                base_seq: i,
            };
            a.add_fragment(key, 0, false, vec![0u8; 4], Duration::ZERO)
                .unwrap();
        }
        let key = AssemblerKey {
            stream_id: 0x80,
            base_seq: 999,
        };
        let res = a.add_fragment(key, 0, false, vec![0u8; 4], Duration::ZERO);
        assert!(matches!(res, Err(XrceError::ValueOutOfRange { .. })));
    }

    // After MAX_FRAGMENTS_PER_STREAM fragments in one bucket, the next one is rejected.
    #[test]
    fn dos_cap_max_fragments_per_stream() {
        let mut a = FragmentAssembler::new();
        for i in 0..MAX_FRAGMENTS_PER_STREAM as u32 {
            a.add_fragment(k(), i * 4, false, vec![0u8; 4], Duration::ZERO)
                .unwrap();
        }
        let res = a.add_fragment(
            k(),
            (MAX_FRAGMENTS_PER_STREAM as u32) * 4,
            true,
            vec![0u8; 4],
            Duration::ZERO,
        );
        assert!(matches!(res, Err(XrceError::ValueOutOfRange { .. })));
    }

    // With a 5 s TTL, a bucket fed at tick 0 survives gc at 4 s and is dropped at 11 s.
    #[test]
    fn gc_drops_stale_assemblies() {
        let mut a = FragmentAssembler::with_gc_ttl(Duration::from_secs(5));
        a.add_fragment(k(), 0, false, vec![1, 2], Duration::ZERO)
            .unwrap();
        assert_eq!(a.pending_count(), 1);
        // 4 s later → not expired yet
        assert_eq!(a.gc(Duration::from_secs(4)), 0);
        assert_eq!(a.pending_count(), 1);
        // 11 s later → expired
        assert_eq!(a.gc(Duration::from_secs(11)), 1);
        assert_eq!(a.pending_count(), 0);
    }

    // reset() empties the assembler and accounts for the discarded bucket.
    #[test]
    fn reset_clears_all() {
        let mut a = FragmentAssembler::new();
        a.add_fragment(k(), 0, false, vec![1], Duration::ZERO)
            .unwrap();
        a.reset();
        assert_eq!(a.pending_count(), 0);
        assert!(a.drop_count() >= 1);
    }
}