Skip to main content

sendcipher_core/
chunking.rs

1/* Created on 2025-10-27 */
2/* Copyright (c) 2025-2026 Youcef Lemsafer */
3/* SPDX-License-Identifier: MIT */
4
5use crate::span::*;
6use crate::span_generator::*;
7use std::sync::Arc;
8
9#[derive(Clone)]
10pub struct Chunk {
11    span: Span,
12    data: Arc<Vec<u8>>,
13}
14pub trait ChunkGenerator: Send + Sync {
15    fn process_data(&mut self, data: &[u8]) -> Vec<Chunk>;
16    fn signal_eos(&mut self) -> Vec<Chunk>;
17    /// Total number of chunks generated
18    fn chunks_count(&self) -> u64;
19    /// Total number of bytes received by the generator
20    fn chunked_bytes_count(&self) -> u64;
21}
22
23impl Chunk {
24    pub fn index(&self) -> u64 {
25        self.span.index()
26    }
27    pub fn size(&self) -> u64 {
28        self.data.len() as u64
29    }
30    pub fn data(&self) -> &Vec<u8> {
31        &self.data
32    }
33    pub fn span(&self) -> &Span {
34        &self.span
35    }
36}
37
38struct ProtoChunk {
39    span: Span,
40    data: Vec<u8>,
41    is_ready: bool,
42}
43
44impl ProtoChunk {
45    pub fn index(&self) -> u64 {
46        self.span.index()
47    }
48    pub fn size(&self) -> u64 {
49        self.data.len() as u64
50    }
51    pub fn data(&self) -> &Vec<u8> {
52        &self.data
53    }
54    pub fn data_mut(&mut self) -> &mut Vec<u8> {
55        &mut self.data
56    }
57    pub fn span(&self) -> &Span {
58        &self.span
59    }
60    pub fn span_mut(&mut self) -> &mut Span {
61        &mut self.span
62    }
63    pub fn is_ready(&self) -> bool {
64        self.is_ready
65    }
66    pub fn set_is_ready(&mut self, is_ready: bool) {
67        self.is_ready = is_ready;
68    }
69}
70
71pub struct RandomChunkGenerator {
72    /// Spans generator used for chunk sizes computation
73    span_generator: SpanGenerator,
74    chunks: Vec<ProtoChunk>,
75    chunks_count: u64,
76    chunked_bytes: u64,
77}
78
79impl ChunkGenerator for RandomChunkGenerator {
80    fn process_data(&mut self, data: &[u8]) -> Vec<Chunk> {
81        let mut remaining_data_offset = 0 as usize;
82        while remaining_data_offset != data.len() {
83            let mut available_space = if self.chunks.is_empty() {
84                0u64
85            } else {
86                let last_chunk = self.chunks.last().unwrap();
87                last_chunk.span().size() - last_chunk.size() as u64
88            };
89            if available_space == 0 {
90                self.add_new_chunk();
91                available_space = self.chunks.last().unwrap().span().size();
92            }
93            let copyable_amount =
94                ((data.len() - remaining_data_offset) as usize).min(available_space as usize);
95            let last_chunk = self.chunks.last_mut().unwrap();
96            last_chunk.data_mut().extend_from_slice(
97                &data[remaining_data_offset..remaining_data_offset + copyable_amount],
98            );
99            remaining_data_offset += copyable_amount;
100        }
101        self.chunked_bytes += data.len() as u64;
102        self.update_chunk_readiness();
103
104        let ready_count = self.chunks.iter().take_while(|c| c.is_ready()).count();
105        self.chunks
106            .drain(0..ready_count)
107            .map(|x| Chunk {
108                span: x.span,
109                data: Arc::new(x.data),
110            })
111            .collect()
112    }
113
114    fn signal_eos(&mut self) -> Vec<Chunk> {
115        self.adjust_last_chunks();
116        self.chunks
117            .drain(0..)
118            .map(|x| Chunk {
119                span: x.span,
120                data: Arc::new(x.data),
121            })
122            .collect()
123    }
124
125    fn chunks_count(&self) -> u64 {
126        self.chunks_count
127    }
128
129    fn chunked_bytes_count(&self) -> u64 {
130        self.chunked_bytes
131    }
132}
133
134impl RandomChunkGenerator {
135    pub fn new(chunking_threshold: u64, min_chunk_size: u64, max_chunk_size: u64) -> Self {
136        Self {
137            span_generator: SpanGenerator::new(chunking_threshold, min_chunk_size, max_chunk_size),
138            chunks: Vec::new(),
139            chunks_count: 0,
140            chunked_bytes: 0u64,
141        }
142    }
143
144    pub fn with_seed(
145        chunking_threshold: u64,
146        min_chunk_size: u64,
147        max_chunk_size: u64,
148        seed: u128,
149    ) -> Self {
150        Self {
151            span_generator: SpanGenerator::with_seed(
152                chunking_threshold,
153                min_chunk_size,
154                max_chunk_size,
155                seed,
156            ),
157            chunks: Vec::new(),
158            chunks_count: 0,
159            chunked_bytes: 0u64,
160        }
161    }
162
163    fn add_new_chunk(&mut self) -> &mut ProtoChunk {
164        let chunk = ProtoChunk {
165            span: self.span_generator.next_span(),
166            data: Vec::new(),
167            is_ready: false,
168        };
169        self.chunks.push(chunk);
170        self.chunks_count += 1;
171        self.chunks.last_mut().unwrap()
172    }
173
174    /// Updates the chunks readiness status
175    ///
176    /// This has to be called after each slice of data received by process_data
177    fn update_chunk_readiness(&mut self) {
178        // Skip last two because the last two can become ready only after finalize is called.
179        self.chunks
180            .iter_mut()
181            .rev()
182            .skip(2)
183            .take_while(|chnk| !chnk.is_ready())
184            .for_each(|chnk| chnk.set_is_ready(true));
185    }
186
187    /// Performs the necessary adjustements when the final size is known (i.e. upon finalization).
188    pub fn adjust_last_chunks(&mut self) {
189        // Now all chunks are ready, we mark them as such
190        self.chunks
191            .iter_mut()
192            .take_while(|chnk| !chnk.is_ready())
193            .for_each(|chnk| chnk.set_is_ready(true));
194        let opt_first_changed_index = self.span_generator.finalize(self.chunked_bytes);
195        // Move data over if at least one of the last two spans changed
196        if opt_first_changed_index.is_none() {
197            return;
198        }
199        let first_changed_index = opt_first_changed_index.unwrap();
200        // Get the first changed span/chunk
201        let pos_in_chunks = self
202            .chunks
203            .iter()
204            .rposition(|chnk| chnk.index() == first_changed_index)
205            .unwrap();
206        let last_spans = self.span_generator.last_spans();
207        let pos_in_spans = last_spans
208            .iter()
209            .rposition(|sp| sp.index() == first_changed_index)
210            .unwrap();
211        let chunk_data_len = self.chunks[pos_in_chunks].data().len();
212        if chunk_data_len as u64 > last_spans[pos_in_spans].size() {
213            // Move surplus to next chunk
214            let surplus = chunk_data_len - last_spans[pos_in_spans].size() as usize;
215            let (left_chunks, right_chunks) = self.chunks.split_at_mut(pos_in_chunks + 1);
216            let lchunk = left_chunks.last_mut().unwrap();
217            let rchunk = right_chunks.first_mut().unwrap();
218            rchunk
219                .data_mut()
220                .splice(0..0, lchunk.data_mut().split_off(chunk_data_len - surplus));
221        } else if (chunk_data_len as u64) < last_spans[pos_in_spans].size() {
222            // Take deficit from next chunk if any
223            if pos_in_chunks + 1 < self.chunks.len() {
224                let deficit = last_spans[pos_in_spans].size() as usize - chunk_data_len;
225                let (left_chunks, right_chunks) = self.chunks.split_at_mut(pos_in_chunks + 1);
226                let lchunk = left_chunks.last_mut().unwrap();
227                let rchunk = right_chunks.first_mut().unwrap();
228                lchunk
229                    .data_mut()
230                    .extend(rchunk.data_mut().drain(0..deficit));
231            }
232        }
233        // @todo: what if last chunk size became 0 ???
234        self.chunks
235            .iter_mut()
236            .skip(pos_in_chunks)
237            .zip(last_spans.iter().skip(pos_in_spans))
238            .for_each(|(chnk, spn)| *chnk.span_mut() = spn.clone());
239    }
240}