Skip to main content

array_format/delta/
allocator.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4
5use crate::{
6    address::{BlockAllocAddress, BlockId},
7    block::BlockMeta,
8    codec::{CompressionCodec, decompress_by_id},
9};
10
/// Reads exactly `buf.len()` bytes from `file`, starting at byte `offset`.
///
/// Thin shim over the standard Unix positional read. The call does not
/// depend on, or change, the file cursor.
#[cfg(unix)]
fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    std::os::unix::fs::FileExt::read_exact_at(file, buf, offset)
}
16
/// Reads exactly `buf.len()` bytes from `file`, starting at byte `offset`.
///
/// This mirrors the Unix `read_exact_at` on top of `seek_read`. It keeps
/// issuing reads until the buffer is full and retries on `Interrupted`. It
/// fails with `UnexpectedEof` if a read returns zero bytes before the buffer
/// is filled.
///
/// Unlike the Unix variant, `seek_read` moves the file cursor. Code that also
/// writes through the cursor must reposition it first.
#[cfg(windows)]
fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::io::{Error, ErrorKind};
    use std::os::windows::fs::FileExt;

    let mut filled = 0usize;
    while filled < buf.len() {
        let position = offset + filled as u64;
        match file.seek_read(&mut buf[filled..], position) {
            Ok(0) => {
                return Err(Error::new(
                    ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => filled += n,
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
40
41/// Packs raw chunk bytes into compressed blocks.
42///
43/// The current (unflushed) block is held in memory; completed blocks are
44/// compressed and written to a single backing tempfile. [`fetch`](DeltaAllocator::fetch)
45/// serves the current block from memory and decompresses completed blocks
46/// from that file on demand — no second file is needed.
47///
48/// Call [`commit`](DeltaAllocator::commit) to flush the final block and
49/// retrieve the compressed output file plus block metadata.
50pub struct DeltaAllocator {
51    codec: Arc<dyn CompressionCodec>,
52    block_target_size: usize,
53    // Current (not-yet-flushed) block — kept in memory for cheap fetch().
54    current_block_id: u32,
55    current_block: Vec<u8>,
56    // Single file: compressed blocks are appended here on each flush.
57    // Completed-block fetches decompress directly from this file.
58    output_file: std::fs::File,
59    completed_blocks: Vec<BlockMeta>,
60    file_offset: u64,
61}
62
63/// The output of [`DeltaAllocator::commit`]: a file handle to the compressed
64/// block bytes (seeked to the start), the total byte count, and the block
65/// metadata table.
66pub struct AllocatorOutput {
67    /// Compressed block data, seeked to position 0 and ready to read.
68    pub file: tokio::fs::File,
69    /// Total number of bytes in `file`.
70    pub output_size: u64,
71    pub blocks: Vec<BlockMeta>,
72}
73
74impl DeltaAllocator {
75    pub fn new(codec: Arc<dyn CompressionCodec>, block_target_size: usize) -> Self {
76        let output_file =
77            tempfile::tempfile().expect("DeltaAllocator: failed to create output tempfile");
78        Self {
79            codec,
80            block_target_size,
81            current_block_id: 0,
82            current_block: Vec::new(),
83            output_file,
84            completed_blocks: Vec::new(),
85            file_offset: 0,
86        }
87    }
88
89    /// Appends `slice` to the current block and returns its allocation address.
90    /// Flushes the current block to the output file if it reaches the target size.
91    pub fn allocate(&mut self, slice: &[u8]) -> BlockAllocAddress {
92        let block_id = self.current_block_id;
93        let intra_offset = self.current_block.len() as u64;
94        self.current_block.extend_from_slice(slice);
95        if self.current_block.len() >= self.block_target_size {
96            self.flush_block();
97        }
98        BlockAllocAddress::new(BlockId(block_id), intra_offset, slice.len() as u64)
99    }
100
101    fn flush_block(&mut self) {
102        use std::io::Write;
103        if self.current_block.is_empty() {
104            return;
105        }
106        let uncompressed_size = self.current_block.len() as u64;
107        let compressed = self
108            .codec
109            .compress(&self.current_block)
110            .expect("compression failed");
111        let compressed_size = compressed.len() as u64;
112        self.completed_blocks.push(BlockMeta {
113            id: BlockId(self.current_block_id),
114            file_offset: self.file_offset,
115            compressed_size,
116            uncompressed_size,
117            codec: self.codec.id(),
118        });
119        self.output_file
120            .write_all(&compressed)
121            .expect("output_file write failed");
122        self.file_offset += compressed_size;
123        self.current_block_id += 1;
124        self.current_block.clear();
125    }
126
127    /// Returns the raw (uncompressed) bytes for the given allocation address.
128    ///
129    /// Serves the current block from memory; for completed blocks it reads
130    /// from the tempfile at the block's offset (positional read, no seek),
131    /// decompresses the block, and slices out the bytes.
132    pub fn fetch(&self, addr: &BlockAllocAddress) -> Option<Bytes> {
133        let block_id = addr.id().0;
134        let off = addr.offset() as usize;
135        let sz = addr.size() as usize;
136
137        if block_id == self.current_block_id {
138            let end = off + sz;
139            if end > self.current_block.len() {
140                return None;
141            }
142            return Some(Bytes::copy_from_slice(&self.current_block[off..end]));
143        }
144
145        let block = self.completed_blocks.iter().find(|b| b.id.0 == block_id)?;
146        let file_offset = block.file_offset;
147        let compressed_size = block.compressed_size as usize;
148        let uncompressed_size = block.uncompressed_size as usize;
149        let codec = block.codec.clone();
150
151        let mut compressed = vec![0u8; compressed_size];
152        read_exact_at(&self.output_file, &mut compressed, file_offset).ok()?;
153
154        let decompressed = decompress_by_id(&codec, &compressed, uncompressed_size).ok()?;
155        let end = off + sz;
156        Some(Bytes::copy_from_slice(&decompressed[off..end]))
157    }
158
159    /// Flushes the remaining partial block and returns a file handle to the
160    /// compressed output (seeked to position 0), the total output size, and
161    /// the block metadata table.
162    pub async fn commit(mut self) -> AllocatorOutput {
163        use tokio::io::AsyncSeekExt;
164        self.flush_block();
165        let output_size = self.file_offset;
166        let mut file = tokio::fs::File::from_std(self.output_file);
167        file.seek(std::io::SeekFrom::Start(0))
168            .await
169            .expect("output_file seek failed");
170        AllocatorOutput {
171            file,
172            output_size,
173            blocks: self.completed_blocks,
174        }
175    }
176}
177
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{NoCompression, codec::CompressionCodec};

    /// Identity codec, so compressed sizes equal uncompressed sizes in tests.
    fn codec() -> Arc<dyn CompressionCodec> {
        Arc::new(NoCompression)
    }

    #[test]
    fn allocator_address_reflects_block_and_offset() {
        let mut alloc = DeltaAllocator::new(codec(), 1024);
        let a = alloc.allocate(&[1, 2, 3, 4]);
        assert_eq!(a.id(), BlockId(0));
        assert_eq!(a.offset(), 0);
        assert_eq!(a.size(), 4);

        let b = alloc.allocate(&[5, 6]);
        assert_eq!(b.id(), BlockId(0));
        assert_eq!(b.offset(), 4);
        assert_eq!(b.size(), 2);
    }

    #[test]
    fn allocator_fetch_from_current_block() {
        let mut alloc = DeltaAllocator::new(codec(), 1024);
        let addr = alloc.allocate(&[10, 20, 30]);
        let bytes = alloc.fetch(&addr).expect("fetch returned None");
        assert_eq!(bytes.as_ref(), &[10, 20, 30]);
    }

    #[test]
    fn allocator_fetch_second_slice_in_current_block() {
        let mut alloc = DeltaAllocator::new(codec(), 1024);
        alloc.allocate(&[0u8; 8]);
        let addr = alloc.allocate(&[42, 43, 44, 45]);
        let bytes = alloc.fetch(&addr).unwrap();
        assert_eq!(bytes.as_ref(), &[42, 43, 44, 45]);
    }

    #[test]
    fn allocator_fetch_out_of_range_in_current_block_returns_none() {
        let mut alloc = DeltaAllocator::new(codec(), 1024);
        alloc.allocate(&[1, 2, 3, 4]);
        let bogus = BlockAllocAddress::new(BlockId(0), 2, 10);
        assert!(alloc.fetch(&bogus).is_none());
    }

    #[test]
    fn allocator_flush_triggered_at_target_size() {
        let mut alloc = DeltaAllocator::new(codec(), 8);
        let addr = alloc.allocate(&[1u8; 8]);
        assert_eq!(
            alloc.current_block_id, 1,
            "expected flush to advance block_id"
        );
        assert_eq!(addr.id(), BlockId(0));
    }

    #[test]
    fn allocator_fetch_from_completed_block() {
        let payload = [0xABu8; 8];
        let mut alloc = DeltaAllocator::new(codec(), 8);
        let addr = alloc.allocate(&payload);
        alloc.allocate(&[0u8; 4]);
        let bytes = alloc
            .fetch(&addr)
            .expect("fetch from completed block returned None");
        assert_eq!(bytes.as_ref(), &payload);
    }

    #[test]
    fn allocator_fetch_second_slice_in_completed_block() {
        let mut alloc = DeltaAllocator::new(codec(), 8);
        alloc.allocate(&[1, 2, 3]);
        let addr = alloc.allocate(&[4, 5, 6, 7, 8]); // block reaches 8 bytes → flush
        assert_eq!(alloc.current_block_id, 1);
        assert_eq!(addr.offset(), 3);
        let bytes = alloc.fetch(&addr).expect("fetch returned None");
        assert_eq!(bytes.as_ref(), &[4, 5, 6, 7, 8]);
    }

    #[test]
    fn allocator_fetch_from_each_of_several_completed_blocks() {
        let mut alloc = DeltaAllocator::new(codec(), 4);
        let addrs: Vec<_> = (1u8..=3).map(|v| alloc.allocate(&[v; 4])).collect();
        assert_eq!(alloc.current_block_id, 3);
        for (v, addr) in (1u8..=3).zip(&addrs) {
            assert_eq!(addr.id(), BlockId(u32::from(v) - 1));
            let bytes = alloc.fetch(addr).expect("fetch returned None");
            assert_eq!(bytes.as_ref(), &[v; 4]);
        }
    }

    #[test]
    fn allocator_fetch_unknown_block_returns_none() {
        let mut alloc = DeltaAllocator::new(codec(), 4);
        alloc.allocate(&[1u8; 4]); // block 0 is flushed
        let bogus = BlockAllocAddress::new(BlockId(7), 0, 1);
        assert!(alloc.fetch(&bogus).is_none());
    }

    #[tokio::test]
    async fn allocator_commit_captures_all_blocks() {
        let mut alloc = DeltaAllocator::new(codec(), 8);
        alloc.allocate(&[1u8; 8]); // fills block 0 → flush
        alloc.allocate(&[2u8; 6]); // partial block 1

        let out = alloc.commit().await;
        assert_eq!(out.blocks.len(), 2);
        assert_eq!(out.blocks[0].id, BlockId(0));
        assert_eq!(out.blocks[1].id, BlockId(1));
        assert_eq!(out.blocks[0].uncompressed_size, 8);
        assert_eq!(out.blocks[1].uncompressed_size, 6);
    }

    #[tokio::test]
    async fn allocator_commit_output_decompresses_to_original() {
        let data: Vec<u8> = (0u8..=127).collect();
        let mut alloc = DeltaAllocator::new(codec(), 64);
        let addr = alloc.allocate(&data[..64]); // block 0
        alloc.allocate(&data[64..]); // block 1

        let mut out = alloc.commit().await;

        use tokio::io::AsyncReadExt;
        let mut all_bytes = vec![0u8; out.output_size as usize];
        out.file.read_exact(&mut all_bytes).await.unwrap();

        let b0 = &out.blocks[0];
        let raw0: Vec<u8> = crate::codec::decompress_by_id(
            &b0.codec,
            &all_bytes[..b0.compressed_size as usize],
            b0.uncompressed_size as usize,
        )
        .unwrap()
        .to_vec();
        assert_eq!(raw0, &data[..64]);

        let off1 = b0.compressed_size as usize;
        let b1 = &out.blocks[1];
        let raw1: Vec<u8> = crate::codec::decompress_by_id(
            &b1.codec,
            &all_bytes[off1..],
            b1.uncompressed_size as usize,
        )
        .unwrap()
        .to_vec();
        assert_eq!(raw1, &data[64..]);

        assert_eq!(addr.id(), BlockId(0));
        assert_eq!(addr.size() as usize, 64);
    }
}