luanti_protocol/peer/
split_receiver.rs1use crate::wire::packet::SplitBody;
2use crate::wire::sequence_number::WrappingSequenceNumber;
3use anyhow::bail;
4use log::warn;
5use std::collections::BTreeMap;
6use std::collections::HashMap;
7use std::time::Duration;
8use std::time::Instant;
9
10const SPLIT_TIMEOUT: Duration = Duration::from_secs(30);
11
12pub(super) struct IncomingBuffer {
13 chunk_count: u16,
14 chunks: BTreeMap<u16, Vec<u8>>,
15 timeout: Instant,
16}
17
18impl IncomingBuffer {
19 fn new(now: Instant, chunk_count: u16) -> Self {
20 Self {
21 chunk_count,
22 chunks: BTreeMap::new(),
23 timeout: now + SPLIT_TIMEOUT,
24 }
25 }
26
27 fn push(&mut self, now: Instant, body: SplitBody) -> anyhow::Result<bool> {
30 if body.chunk_count != self.chunk_count {
31 bail!("Split packet corrupt: chunk_count mismatch");
32 } else if body.chunk_num >= self.chunk_count {
33 bail!("Split packet corrupt: chunk_num >= chunk_count");
34 }
35 self.timeout = now + SPLIT_TIMEOUT;
36 if self
37 .chunks
38 .insert(body.chunk_num, body.chunk_data)
39 .is_some()
40 {
41 warn!("received duplicate packet for chunk #{}", body.chunk_num);
42 };
43 Ok(self.chunks.len() == self.chunk_count as usize)
44 }
45
46 fn take(self) -> Vec<u8> {
47 assert_eq!(
48 self.chunks.len(),
49 self.chunk_count as usize,
50 "chunk count mismatch"
51 );
52 let total_size: usize = self.chunks.values().map(Vec::len).sum();
54 let mut buf = Vec::with_capacity(total_size);
55 for chunk in self.chunks.values() {
56 buf.extend_from_slice(chunk);
57 }
58 assert_eq!(buf.len(), total_size, "buffer length mismatch");
59 buf
60 }
61}
62
63pub(super) struct SplitReceiver {
64 pending: HashMap<WrappingSequenceNumber, IncomingBuffer>,
65}
66
67impl SplitReceiver {
68 pub(super) fn new() -> Self {
69 Self {
70 pending: HashMap::new(),
71 }
72 }
73
74 pub(super) fn push(
77 &mut self,
78 now: Instant,
79 body: SplitBody,
80 ) -> anyhow::Result<Option<Vec<u8>>> {
81 let seqnum = body.seqnum;
82 let should_take = self
83 .pending
84 .entry(seqnum)
85 .or_insert_with(|| IncomingBuffer::new(now, body.chunk_count))
86 .push(now, body)?;
87
88 if should_take {
89 Ok(Some(self.pending.remove(&seqnum).unwrap().take()))
90 } else {
91 Ok(None)
92 }
93 }
94}