luanti_protocol/peer/
split_receiver.rs

1use crate::wire::packet::SplitBody;
2use crate::wire::sequence_number::WrappingSequenceNumber;
3use anyhow::bail;
4use log::warn;
5use std::collections::BTreeMap;
6use std::collections::HashMap;
7use std::time::Duration;
8use std::time::Instant;
9
10const SPLIT_TIMEOUT: Duration = Duration::from_secs(30);
11
12pub(super) struct IncomingBuffer {
13    chunk_count: u16,
14    chunks: BTreeMap<u16, Vec<u8>>,
15    timeout: Instant,
16}
17
18impl IncomingBuffer {
19    fn new(now: Instant, chunk_count: u16) -> Self {
20        Self {
21            chunk_count,
22            chunks: BTreeMap::new(),
23            timeout: now + SPLIT_TIMEOUT,
24        }
25    }
26
27    /// Push a new split packet into the split receiver
28    /// If a command has become ready as a result, true is returned.
29    fn push(&mut self, now: Instant, body: SplitBody) -> anyhow::Result<bool> {
30        if body.chunk_count != self.chunk_count {
31            bail!("Split packet corrupt: chunk_count mismatch");
32        } else if body.chunk_num >= self.chunk_count {
33            bail!("Split packet corrupt: chunk_num >= chunk_count");
34        }
35        self.timeout = now + SPLIT_TIMEOUT;
36        if self
37            .chunks
38            .insert(body.chunk_num, body.chunk_data)
39            .is_some()
40        {
41            warn!("received duplicate packet for chunk #{}", body.chunk_num);
42        };
43        Ok(self.chunks.len() == self.chunk_count as usize)
44    }
45
46    fn take(self) -> Vec<u8> {
47        assert_eq!(
48            self.chunks.len(),
49            self.chunk_count as usize,
50            "chunk count mismatch"
51        );
52        // TODO replace with `flatten`
53        let total_size: usize = self.chunks.values().map(Vec::len).sum();
54        let mut buf = Vec::with_capacity(total_size);
55        for chunk in self.chunks.values() {
56            buf.extend_from_slice(chunk);
57        }
58        assert_eq!(buf.len(), total_size, "buffer length mismatch");
59        buf
60    }
61}
62
63pub(super) struct SplitReceiver {
64    pending: HashMap<WrappingSequenceNumber, IncomingBuffer>,
65}
66
67impl SplitReceiver {
68    pub(super) fn new() -> Self {
69        Self {
70            pending: HashMap::new(),
71        }
72    }
73
74    /// Push a split packet for reconstruction
75    /// Returns the finished command if it is ready
76    pub(super) fn push(
77        &mut self,
78        now: Instant,
79        body: SplitBody,
80    ) -> anyhow::Result<Option<Vec<u8>>> {
81        let seqnum = body.seqnum;
82        let should_take = self
83            .pending
84            .entry(seqnum)
85            .or_insert_with(|| IncomingBuffer::new(now, body.chunk_count))
86            .push(now, body)?;
87
88        if should_take {
89            Ok(Some(self.pending.remove(&seqnum).unwrap().take()))
90        } else {
91            Ok(None)
92        }
93    }
94}