1use std::sync::Arc;
2
3use bytes::Bytes;
4
5use crate::{
6 address::{BlockAllocAddress, BlockId},
7 block::BlockMeta,
8 codec::{CompressionCodec, decompress_by_id},
9};
10
/// Reads exactly `buf.len()` bytes from `file`, starting at byte `offset`.
///
/// Unix implementation: forwards directly to the platform's
/// `FileExt::read_exact_at` and returns its result unchanged.
///
/// # Errors
///
/// Returns whatever I/O error the underlying positional read reports.
#[cfg(unix)]
fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    // Fully qualified so it is obvious this calls the trait method, not this function.
    std::os::unix::fs::FileExt::read_exact_at(file, buf, offset)
}
16
/// Reads exactly `buf.len()` bytes from `file`, starting at byte `offset`.
///
/// Windows implementation: `seek_read` may return fewer bytes than requested,
/// so it is called repeatedly until the buffer is full.
///
/// NOTE(review): per the standard library documentation, `seek_read` moves the
/// file cursor to the end of each read — callers that also write through the
/// same handle should not rely on the cursor position afterwards.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before the buffer is
/// filled, or the first I/O error other than `ErrorKind::Interrupted`.
#[cfg(windows)]
fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::io::{Error, ErrorKind};
    use std::os::windows::fs::FileExt;

    // `filled` counts bytes already placed in `buf`; `position` is the file
    // offset the next read starts from.
    let mut filled = 0usize;
    let mut position = offset;

    while filled < buf.len() {
        match file.seek_read(&mut buf[filled..], position) {
            // A zero-length read with space remaining means end of file.
            Ok(0) => {
                return Err(Error::new(
                    ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(read) => {
                filled += read;
                position += read as u64;
            }
            Err(err) => {
                // Interrupted reads are retried; everything else is fatal.
                if err.kind() != ErrorKind::Interrupted {
                    return Err(err);
                }
            }
        }
    }

    Ok(())
}
40
/// Accumulates byte slices into blocks, compresses each full block, and
/// appends the compressed bytes to an output file.
///
/// Slices are addressed by the block they were stored in plus their byte
/// offset and length within that block's uncompressed contents.
pub struct DeltaAllocator {
    /// Codec used to compress each block when it is flushed; its id is
    /// recorded in the block's metadata.
    codec: Arc<dyn CompressionCodec>,
    /// Size threshold in bytes: once the in-memory block reaches at least this
    /// many bytes after an allocation, it is flushed.
    block_target_size: usize,
    /// Id of the block currently being filled; incremented on every flush.
    current_block_id: u32,
    /// Uncompressed contents of the block currently being filled.
    current_block: Vec<u8>,
    /// File that receives the compressed blocks, one after another.
    output_file: std::fs::File,
    /// Metadata for every block flushed so far, in flush order.
    completed_blocks: Vec<BlockMeta>,
    /// Total compressed bytes written so far, i.e. the file offset at which
    /// the next flushed block is recorded to start.
    file_offset: u64,
}
62
/// Result of committing a `DeltaAllocator`: the output file plus the metadata
/// needed to locate each block inside it.
pub struct AllocatorOutput {
    /// Async handle to the output file, rewound to the start by `commit`.
    pub file: tokio::fs::File,
    /// Total number of compressed bytes written to `file`.
    pub output_size: u64,
    /// Metadata for every flushed block, in flush order.
    pub blocks: Vec<BlockMeta>,
}
73
74impl DeltaAllocator {
75 pub fn new(codec: Arc<dyn CompressionCodec>, block_target_size: usize) -> Self {
76 let output_file =
77 tempfile::tempfile().expect("DeltaAllocator: failed to create output tempfile");
78 Self {
79 codec,
80 block_target_size,
81 current_block_id: 0,
82 current_block: Vec::new(),
83 output_file,
84 completed_blocks: Vec::new(),
85 file_offset: 0,
86 }
87 }
88
89 pub fn allocate(&mut self, slice: &[u8]) -> BlockAllocAddress {
92 let block_id = self.current_block_id;
93 let intra_offset = self.current_block.len() as u64;
94 self.current_block.extend_from_slice(slice);
95 if self.current_block.len() >= self.block_target_size {
96 self.flush_block();
97 }
98 BlockAllocAddress::new(BlockId(block_id), intra_offset, slice.len() as u64)
99 }
100
101 fn flush_block(&mut self) {
102 use std::io::Write;
103 if self.current_block.is_empty() {
104 return;
105 }
106 let uncompressed_size = self.current_block.len() as u64;
107 let compressed = self
108 .codec
109 .compress(&self.current_block)
110 .expect("compression failed");
111 let compressed_size = compressed.len() as u64;
112 self.completed_blocks.push(BlockMeta {
113 id: BlockId(self.current_block_id),
114 file_offset: self.file_offset,
115 compressed_size,
116 uncompressed_size,
117 codec: self.codec.id(),
118 });
119 self.output_file
120 .write_all(&compressed)
121 .expect("output_file write failed");
122 self.file_offset += compressed_size;
123 self.current_block_id += 1;
124 self.current_block.clear();
125 }
126
127 pub fn fetch(&self, addr: &BlockAllocAddress) -> Option<Bytes> {
133 let block_id = addr.id().0;
134 let off = addr.offset() as usize;
135 let sz = addr.size() as usize;
136
137 if block_id == self.current_block_id {
138 let end = off + sz;
139 if end > self.current_block.len() {
140 return None;
141 }
142 return Some(Bytes::copy_from_slice(&self.current_block[off..end]));
143 }
144
145 let block = self.completed_blocks.iter().find(|b| b.id.0 == block_id)?;
146 let file_offset = block.file_offset;
147 let compressed_size = block.compressed_size as usize;
148 let uncompressed_size = block.uncompressed_size as usize;
149 let codec = block.codec.clone();
150
151 let mut compressed = vec![0u8; compressed_size];
152 read_exact_at(&self.output_file, &mut compressed, file_offset).ok()?;
153
154 let decompressed = decompress_by_id(&codec, &compressed, uncompressed_size).ok()?;
155 let end = off + sz;
156 Some(Bytes::copy_from_slice(&decompressed[off..end]))
157 }
158
159 pub async fn commit(mut self) -> AllocatorOutput {
163 use tokio::io::AsyncSeekExt;
164 self.flush_block();
165 let output_size = self.file_offset;
166 let mut file = tokio::fs::File::from_std(self.output_file);
167 file.seek(std::io::SeekFrom::Start(0))
168 .await
169 .expect("output_file seek failed");
170 AllocatorOutput {
171 file,
172 output_size,
173 blocks: self.completed_blocks,
174 }
175 }
176}
177
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{NoCompression, codec::CompressionCodec};

    /// Builds the `NoCompression` codec used by every test.
    fn codec() -> Arc<dyn CompressionCodec> {
        Arc::new(NoCompression)
    }

    /// Creates an allocator over the test codec with the given flush threshold.
    fn allocator_with_target(block_target_size: usize) -> DeltaAllocator {
        DeltaAllocator::new(codec(), block_target_size)
    }

    /// Consecutive allocations in one block report increasing offsets.
    #[test]
    fn allocator_address_reflects_block_and_offset() {
        let mut allocator = allocator_with_target(1024);

        let first = allocator.allocate(&[1, 2, 3, 4]);
        assert_eq!(first.id(), BlockId(0));
        assert_eq!(first.offset(), 0);
        assert_eq!(first.size(), 4);

        let second = allocator.allocate(&[5, 6]);
        assert_eq!(second.id(), BlockId(0));
        assert_eq!(second.offset(), 4);
        assert_eq!(second.size(), 2);
    }

    /// A slice in the not-yet-flushed block can be fetched back.
    #[test]
    fn allocator_fetch_from_current_block() {
        let mut allocator = allocator_with_target(1024);
        let stored = allocator.allocate(&[10, 20, 30]);

        let bytes = allocator.fetch(&stored).expect("fetch returned None");
        assert_eq!(bytes.as_ref(), &[10, 20, 30]);
    }

    /// Fetching honours a non-zero offset inside the current block.
    #[test]
    fn allocator_fetch_second_slice_in_current_block() {
        let mut allocator = allocator_with_target(1024);
        allocator.allocate(&[0u8; 8]);
        let stored = allocator.allocate(&[42, 43, 44, 45]);

        let bytes = allocator.fetch(&stored).unwrap();
        assert_eq!(bytes.as_ref(), &[42, 43, 44, 45]);
    }

    /// Reaching the target size flushes the block and advances the block id.
    #[test]
    fn allocator_flush_triggered_at_target_size() {
        let mut allocator = allocator_with_target(8);
        let filled = allocator.allocate(&[1u8; 8]);

        assert_eq!(
            allocator.current_block_id, 1,
            "expected flush to advance block_id"
        );
        assert_eq!(filled.id(), BlockId(0));
    }

    /// A slice whose block has already been flushed is read back from the file.
    #[test]
    fn allocator_fetch_from_completed_block() {
        let payload = [0xABu8; 8];
        let mut allocator = allocator_with_target(8);
        let flushed = allocator.allocate(&payload);
        allocator.allocate(&[0u8; 4]);

        let bytes = allocator
            .fetch(&flushed)
            .expect("fetch from completed block returned None");
        assert_eq!(bytes.as_ref(), &payload);
    }

    /// `commit` flushes the trailing partial block and reports every block.
    #[tokio::test]
    async fn allocator_commit_captures_all_blocks() {
        let mut allocator = allocator_with_target(8);
        allocator.allocate(&[1u8; 8]);
        allocator.allocate(&[2u8; 6]);

        let output = allocator.commit().await;

        assert_eq!(output.blocks.len(), 2);
        assert_eq!(output.blocks[0].id, BlockId(0));
        assert_eq!(output.blocks[1].id, BlockId(1));
        assert_eq!(output.blocks[0].uncompressed_size, 8);
        assert_eq!(output.blocks[1].uncompressed_size, 6);
    }

    /// The committed file contains both blocks and decodes to the original data.
    #[tokio::test]
    async fn allocator_commit_output_decompresses_to_original() {
        use tokio::io::AsyncReadExt;

        let data: Vec<u8> = (0u8..=127).collect();
        let mut allocator = allocator_with_target(64);
        let first_addr = allocator.allocate(&data[..64]);
        allocator.allocate(&data[64..]);
        let mut output = allocator.commit().await;

        // Read the whole committed file into memory.
        let mut file_bytes = vec![0u8; output.output_size as usize];
        output.file.read_exact(&mut file_bytes).await.unwrap();

        // Decodes one block's raw bytes using the codec and size in its metadata.
        let decode = |meta: &BlockMeta, raw: &[u8]| -> Vec<u8> {
            crate::codec::decompress_by_id(&meta.codec, raw, meta.uncompressed_size as usize)
                .unwrap()
                .to_vec()
        };

        let first_meta = &output.blocks[0];
        let first_len = first_meta.compressed_size as usize;
        let first_raw = decode(first_meta, &file_bytes[..first_len]);
        assert_eq!(first_raw, &data[..64]);

        // The second block starts right after the first and runs to the end.
        let second_meta = &output.blocks[1];
        let second_raw = decode(second_meta, &file_bytes[first_len..]);
        assert_eq!(second_raw, &data[64..]);

        assert_eq!(first_addr.id(), BlockId(0));
        assert_eq!(first_addr.size() as usize, 64);
    }
}