// sendcipher_core/chunking.rs

use crate::span::*;
use crate::span_generator::*;
use std::sync::Arc;

/// A finalized, immutable chunk of the input stream.
///
/// The payload is held behind an `Arc`, so cloning a `Chunk` is cheap and
/// the bytes can be shared across threads without copying.
#[derive(Clone)]
pub struct Chunk {
    // Position/extent of this chunk within the overall stream.
    span: Span,
    // Shared ownership of the payload bytes; only read after construction.
    data: Arc<Vec<u8>>,
}
/// Incremental splitter that turns a byte stream into [`Chunk`]s.
///
/// Callers feed bytes via [`process_data`](ChunkGenerator::process_data) and
/// must call [`signal_eos`](ChunkGenerator::signal_eos) exactly once at the
/// end of the stream to flush the remaining buffered chunks.
pub trait ChunkGenerator: Send + Sync {
    /// Feed more input bytes; returns any chunks that became final.
    fn process_data(&mut self, data: &[u8]) -> Vec<Chunk>;
    /// Signal end-of-stream; returns all remaining buffered chunks.
    fn signal_eos(&mut self) -> Vec<Chunk>;
    /// Total number of chunks created so far.
    fn chunks_count(&self) -> u64;
    /// Total number of input bytes consumed so far.
    fn chunked_bytes_count(&self) -> u64;
}
22
23impl Chunk {
24 pub fn index(&self) -> u64 {
25 self.span.index()
26 }
27 pub fn size(&self) -> u64 {
28 self.data.len() as u64
29 }
30 pub fn data(&self) -> &Vec<u8> {
31 &self.data
32 }
33 pub fn span(&self) -> &Span {
34 &self.span
35 }
36}
37
/// Mutable, in-progress chunk used internally while the stream is buffered;
/// converted into an immutable [`Chunk`] when it is drained.
struct ProtoChunk {
    // Target position/extent assigned by the span generator.
    span: Span,
    // Bytes accumulated so far; may be shorter than the span while filling.
    data: Vec<u8>,
    // True once the chunk's contents are final and it may be emitted.
    is_ready: bool,
}
43
44impl ProtoChunk {
45 pub fn index(&self) -> u64 {
46 self.span.index()
47 }
48 pub fn size(&self) -> u64 {
49 self.data.len() as u64
50 }
51 pub fn data(&self) -> &Vec<u8> {
52 &self.data
53 }
54 pub fn data_mut(&mut self) -> &mut Vec<u8> {
55 &mut self.data
56 }
57 pub fn span(&self) -> &Span {
58 &self.span
59 }
60 pub fn span_mut(&mut self) -> &mut Span {
61 &mut self.span
62 }
63 pub fn is_ready(&self) -> bool {
64 self.is_ready
65 }
66 pub fn set_is_ready(&mut self, is_ready: bool) {
67 self.is_ready = is_ready;
68 }
69}
70
/// [`ChunkGenerator`] that cuts the input into randomly sized chunks, with
/// the chunk layout dictated by a `SpanGenerator`.
pub struct RandomChunkGenerator {
    // Produces the (randomized) span layout the chunks must follow.
    span_generator: SpanGenerator,
    // Chunks still being filled or awaiting hand-off, oldest first.
    chunks: Vec<ProtoChunk>,
    // Total number of chunks ever opened (monotonic; not the buffer length).
    chunks_count: u64,
    // Total number of input bytes processed so far.
    chunked_bytes: u64,
}
78
79impl ChunkGenerator for RandomChunkGenerator {
80 fn process_data(&mut self, data: &[u8]) -> Vec<Chunk> {
81 let mut remaining_data_offset = 0 as usize;
82 while remaining_data_offset != data.len() {
83 let mut available_space = if self.chunks.is_empty() {
84 0u64
85 } else {
86 let last_chunk = self.chunks.last().unwrap();
87 last_chunk.span().size() - last_chunk.size() as u64
88 };
89 if available_space == 0 {
90 self.add_new_chunk();
91 available_space = self.chunks.last().unwrap().span().size();
92 }
93 let copyable_amount =
94 ((data.len() - remaining_data_offset) as usize).min(available_space as usize);
95 let last_chunk = self.chunks.last_mut().unwrap();
96 last_chunk.data_mut().extend_from_slice(
97 &data[remaining_data_offset..remaining_data_offset + copyable_amount],
98 );
99 remaining_data_offset += copyable_amount;
100 }
101 self.chunked_bytes += data.len() as u64;
102 self.update_chunk_readiness();
103
104 let ready_count = self.chunks.iter().take_while(|c| c.is_ready()).count();
105 self.chunks
106 .drain(0..ready_count)
107 .map(|x| Chunk {
108 span: x.span,
109 data: Arc::new(x.data),
110 })
111 .collect()
112 }
113
114 fn signal_eos(&mut self) -> Vec<Chunk> {
115 self.adjust_last_chunks();
116 self.chunks
117 .drain(0..)
118 .map(|x| Chunk {
119 span: x.span,
120 data: Arc::new(x.data),
121 })
122 .collect()
123 }
124
125 fn chunks_count(&self) -> u64 {
126 self.chunks_count
127 }
128
129 fn chunked_bytes_count(&self) -> u64 {
130 self.chunked_bytes
131 }
132}
133
134impl RandomChunkGenerator {
135 pub fn new(chunking_threshold: u64, min_chunk_size: u64, max_chunk_size: u64) -> Self {
136 Self {
137 span_generator: SpanGenerator::new(chunking_threshold, min_chunk_size, max_chunk_size),
138 chunks: Vec::new(),
139 chunks_count: 0,
140 chunked_bytes: 0u64,
141 }
142 }
143
144 pub fn with_seed(
145 chunking_threshold: u64,
146 min_chunk_size: u64,
147 max_chunk_size: u64,
148 seed: u128,
149 ) -> Self {
150 Self {
151 span_generator: SpanGenerator::with_seed(
152 chunking_threshold,
153 min_chunk_size,
154 max_chunk_size,
155 seed,
156 ),
157 chunks: Vec::new(),
158 chunks_count: 0,
159 chunked_bytes: 0u64,
160 }
161 }
162
163 fn add_new_chunk(&mut self) -> &mut ProtoChunk {
164 let chunk = ProtoChunk {
165 span: self.span_generator.next_span(),
166 data: Vec::new(),
167 is_ready: false,
168 };
169 self.chunks.push(chunk);
170 self.chunks_count += 1;
171 self.chunks.last_mut().unwrap()
172 }
173
174 fn update_chunk_readiness(&mut self) {
178 self.chunks
180 .iter_mut()
181 .rev()
182 .skip(2)
183 .take_while(|chnk| !chnk.is_ready())
184 .for_each(|chnk| chnk.set_is_ready(true));
185 }
186
187 pub fn adjust_last_chunks(&mut self) {
189 self.chunks
191 .iter_mut()
192 .take_while(|chnk| !chnk.is_ready())
193 .for_each(|chnk| chnk.set_is_ready(true));
194 let opt_first_changed_index = self.span_generator.finalize(self.chunked_bytes);
195 if opt_first_changed_index.is_none() {
197 return;
198 }
199 let first_changed_index = opt_first_changed_index.unwrap();
200 let pos_in_chunks = self
202 .chunks
203 .iter()
204 .rposition(|chnk| chnk.index() == first_changed_index)
205 .unwrap();
206 let last_spans = self.span_generator.last_spans();
207 let pos_in_spans = last_spans
208 .iter()
209 .rposition(|sp| sp.index() == first_changed_index)
210 .unwrap();
211 let chunk_data_len = self.chunks[pos_in_chunks].data().len();
212 if chunk_data_len as u64 > last_spans[pos_in_spans].size() {
213 let surplus = chunk_data_len - last_spans[pos_in_spans].size() as usize;
215 let (left_chunks, right_chunks) = self.chunks.split_at_mut(pos_in_chunks + 1);
216 let lchunk = left_chunks.last_mut().unwrap();
217 let rchunk = right_chunks.first_mut().unwrap();
218 rchunk
219 .data_mut()
220 .splice(0..0, lchunk.data_mut().split_off(chunk_data_len - surplus));
221 } else if (chunk_data_len as u64) < last_spans[pos_in_spans].size() {
222 if pos_in_chunks + 1 < self.chunks.len() {
224 let deficit = last_spans[pos_in_spans].size() as usize - chunk_data_len;
225 let (left_chunks, right_chunks) = self.chunks.split_at_mut(pos_in_chunks + 1);
226 let lchunk = left_chunks.last_mut().unwrap();
227 let rchunk = right_chunks.first_mut().unwrap();
228 lchunk
229 .data_mut()
230 .extend(rchunk.data_mut().drain(0..deficit));
231 }
232 }
233 self.chunks
235 .iter_mut()
236 .skip(pos_in_chunks)
237 .zip(last_spans.iter().skip(pos_in_spans))
238 .for_each(|(chnk, spn)| *chnk.span_mut() = spn.clone());
239 }
240}