//! shuru_store/cas.rs — content-addressable chunk storage.

1use std::collections::HashMap;
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::sync::RwLock;
6
7use anyhow::{Context, Result};
8
/// Fixed size used to split disk images into content-addressed chunks.
pub const CHUNK_SIZE: usize = 64 * 1024; // 64KB
10
/// Trait for content-addressable chunk storage backends.
/// Implement this for S3, SSH, or any other remote storage.
pub trait ChunkStore: Send + Sync {
    /// Store a chunk, returning its blake3 hash hex string.
    /// Content-addressed: storing the same bytes twice returns the same hash.
    fn put(&self, data: &[u8]) -> Result<String>;
    /// Read a chunk by hash. Returns None if not found.
    fn get(&self, hash: &str) -> Result<Option<Vec<u8>>>;
}
19
/// Local filesystem chunk store — chunks stored as files named by hash.
pub struct LocalChunkStore {
    // Directory holding one file per chunk, filename = blake3 hex hash.
    chunks_dir: PathBuf,
}
24
25impl LocalChunkStore {
26    pub fn open(cas_dir: &str) -> Result<Self> {
27        let chunks_dir = Path::new(cas_dir).join("chunks");
28        fs::create_dir_all(&chunks_dir)
29            .with_context(|| format!("failed to create chunks dir: {}", chunks_dir.display()))?;
30        Ok(LocalChunkStore { chunks_dir })
31    }
32
33    fn chunk_path(&self, hash: &str) -> PathBuf {
34        self.chunks_dir.join(hash)
35    }
36}
37
38impl ChunkStore for LocalChunkStore {
39    fn put(&self, data: &[u8]) -> Result<String> {
40        let hash = blake3::hash(data);
41        let hex = hash.to_hex().to_string();
42        let path = self.chunk_path(&hex);
43        match fs::OpenOptions::new().write(true).create_new(true).open(&path) {
44            Ok(mut f) => {
45                use std::io::Write;
46                f.write_all(data)
47                    .with_context(|| format!("failed to write chunk {}", hex))?;
48            }
49            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
50            Err(e) => return Err(e).with_context(|| format!("failed to create chunk {}", hex)),
51        }
52        Ok(hex)
53    }
54
55    fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
56        let path = self.chunk_path(hash);
57        match fs::read(&path) {
58            Ok(data) => Ok(Some(data)),
59            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
60            Err(e) => Err(e).with_context(|| format!("failed to read chunk {}", hash)),
61        }
62    }
63}
64
/// Index mapping chunk positions to hashes. One index per disk image / checkpoint.
/// ZERO entries mean "ask parent" — enables delta-only checkpoints.
pub struct ChunkIndex {
    // One hash string per chunk slot; ZERO_CHUNK_HASH marks an unmapped slot.
    hashes: Vec<String>,
    // Virtual disk size in bytes (the last chunk may be partial).
    disk_size: u64,
    /// Path to parent index. On read, ZERO entries resolve through the parent chain.
    pub parent_path: Option<String>,
    /// Path to the flat rootfs file at the bottom of the chain (for lazy ingestion).
    pub fallback_path: Option<String>,
}
75
/// Sentinel hash meaning "no data recorded at this slot": resolve through
/// the parent chain / fallback file, or treat as all zeros.
const ZERO_CHUNK_HASH: &str = "ZERO";
77
78impl ChunkIndex {
79    pub fn new(disk_size: u64) -> Self {
80        let num_chunks = ((disk_size + CHUNK_SIZE as u64 - 1) / CHUNK_SIZE as u64) as usize;
81        ChunkIndex {
82            hashes: vec![ZERO_CHUNK_HASH.to_string(); num_chunks],
83            disk_size,
84            parent_path: None,
85            fallback_path: None,
86        }
87    }
88
89    pub fn disk_size(&self) -> u64 {
90        self.disk_size
91    }
92
93    pub fn num_chunks(&self) -> usize {
94        self.hashes.len()
95    }
96
97    pub fn get_hash(&self, chunk_idx: usize) -> Option<&str> {
98        self.hashes.get(chunk_idx).map(|s| s.as_str())
99    }
100
101    pub fn set_hash(&mut self, chunk_idx: usize, hash: String) {
102        if chunk_idx < self.hashes.len() {
103            self.hashes[chunk_idx] = hash;
104        }
105    }
106
107    /// Save index to a file.
108    pub fn save(&self, path: &str) -> Result<()> {
109        if let Some(parent) = Path::new(path).parent() {
110            fs::create_dir_all(parent)?;
111        }
112        let mut f = fs::File::create(path)
113            .with_context(|| format!("failed to create index: {}", path))?;
114        // Header: disk_size, num_chunks, parent_path, fallback_path
115        f.write_all(&self.disk_size.to_le_bytes())?;
116        f.write_all(&(self.hashes.len() as u64).to_le_bytes())?;
117        let parent_bytes = self.parent_path.as_deref().unwrap_or("").as_bytes();
118        f.write_all(&(parent_bytes.len() as u32).to_le_bytes())?;
119        f.write_all(parent_bytes)?;
120        let fallback_bytes = self.fallback_path.as_deref().unwrap_or("").as_bytes();
121        f.write_all(&(fallback_bytes.len() as u32).to_le_bytes())?;
122        f.write_all(fallback_bytes)?;
123        // Chunk hashes
124        for hash in &self.hashes {
125            let bytes = hash.as_bytes();
126            f.write_all(&(bytes.len() as u32).to_le_bytes())?;
127            f.write_all(bytes)?;
128        }
129        Ok(())
130    }
131
132    /// Load index from a file.
133    pub fn load(path: &str) -> Result<Self> {
134        let mut f = fs::File::open(path)
135            .with_context(|| format!("failed to open index: {}", path))?;
136        let mut buf8 = [0u8; 8];
137        f.read_exact(&mut buf8)?;
138        let disk_size = u64::from_le_bytes(buf8);
139        f.read_exact(&mut buf8)?;
140        let num_chunks = u64::from_le_bytes(buf8) as usize;
141
142        // Parent path
143        let mut buf4 = [0u8; 4];
144        f.read_exact(&mut buf4)?;
145        let parent_len = u32::from_le_bytes(buf4) as usize;
146        let parent_path = if parent_len > 0 {
147            let mut parent_bytes = vec![0u8; parent_len];
148            f.read_exact(&mut parent_bytes)?;
149            Some(String::from_utf8(parent_bytes)?)
150        } else {
151            None
152        };
153
154        f.read_exact(&mut buf4)?;
155        let fallback_len = u32::from_le_bytes(buf4) as usize;
156        let fallback_path = if fallback_len > 0 {
157            let mut fallback_bytes = vec![0u8; fallback_len];
158            f.read_exact(&mut fallback_bytes)?;
159            Some(String::from_utf8(fallback_bytes)?)
160        } else {
161            None
162        };
163
164        let mut hashes = Vec::with_capacity(num_chunks);
165        for _ in 0..num_chunks {
166            f.read_exact(&mut buf4)?;
167            let len = u32::from_le_bytes(buf4) as usize;
168            let mut hash_bytes = vec![0u8; len];
169            f.read_exact(&mut hash_bytes)?;
170            hashes.push(String::from_utf8(hash_bytes)?);
171        }
172
173        Ok(ChunkIndex { hashes, disk_size, parent_path, fallback_path })
174    }
175}
176
/// CAS-backed storage backend for the NBD server.
///
/// Reads resolve in order: in-memory dirty chunks → current index →
/// parent index chain → optional flat-file fallback → zeros. Writes land
/// in the dirty map and only reach the chunk store on `flush`.
pub struct CasBackend {
    // Chunk bytes keyed by blake3 hash.
    store: Box<dyn ChunkStore>,
    // Current (writable) chunk index for this disk.
    index: RwLock<ChunkIndex>,
    // Written-but-unflushed chunk data, keyed by chunk index.
    dirty: RwLock<HashMap<usize, Vec<u8>>>,
    /// Parent indexes for chain resolution, nearest ancestor first
    /// (loaded upfront in the constructors via `load_parent_chain`).
    parents: RwLock<Vec<ChunkIndex>>,
    /// Optional flat file for lazy ingestion at the bottom of the chain.
    fallback: Option<crate::backend::FlatFileBackend>,
    /// The index path we booted from (becomes the parent when saving a checkpoint).
    pub source_index_path: Option<String>,
}
189
impl CasBackend {
    /// Build a backend over `store` with `index` as the writable top layer.
    pub fn new(store: Box<dyn ChunkStore>, index: ChunkIndex) -> Self {
        // Load the parent chain upfront (typically 0-3 levels)
        let parents = Self::load_parent_chain(&index);
        CasBackend {
            store,
            index: RwLock::new(index),
            dirty: RwLock::new(HashMap::new()),
            parents: RwLock::new(parents),
            fallback: None,
            source_index_path: None,
        }
    }

    /// Like [`CasBackend::new`], but with a flat file at the bottom of the
    /// chain: chunks unresolved anywhere in the index chain are read from
    /// `fallback` and lazily ingested into the chunk store.
    pub fn with_fallback(store: Box<dyn ChunkStore>, index: ChunkIndex, fallback: crate::backend::FlatFileBackend) -> Self {
        let parents = Self::load_parent_chain(&index);
        CasBackend {
            store,
            index: RwLock::new(index),
            dirty: RwLock::new(HashMap::new()),
            parents: RwLock::new(parents),
            fallback: Some(fallback),
            source_index_path: None,
        }
    }

    /// Load every ancestor index by following `parent_path` links, nearest
    /// ancestor first. A load failure logs a warning and truncates the
    /// chain there; reads then fall through to the fallback/zero path.
    /// NOTE(review): a cyclic parent_path chain would loop forever here —
    /// confirm checkpoint creation can never form a cycle.
    fn load_parent_chain(index: &ChunkIndex) -> Vec<ChunkIndex> {
        let mut chain = Vec::new();
        let mut current_parent = index.parent_path.clone();
        while let Some(ref path) = current_parent {
            match ChunkIndex::load(path) {
                Ok(parent) => {
                    current_parent = parent.parent_path.clone();
                    chain.push(parent);
                }
                Err(e) => {
                    tracing::warn!("failed to load parent index {}: {}", path, e);
                    break;
                }
            }
        }
        chain
    }

    /// Current virtual disk size in bytes.
    pub fn size(&self) -> u64 {
        self.index.read().unwrap().disk_size()
    }

    /// Extend the virtual disk size. The index grows to cover the new size;
    /// new chunks default to ZERO (sparse). Shrinking is a no-op.
    pub fn set_disk_size(&mut self, new_size: u64) {
        let mut index = self.index.write().unwrap();
        if new_size > index.disk_size() {
            // Round up so a partial trailing chunk still gets a slot.
            let new_num_chunks = ((new_size + CHUNK_SIZE as u64 - 1) / CHUNK_SIZE as u64) as usize;
            while index.num_chunks() < new_num_chunks {
                index.hashes.push(ZERO_CHUNK_HASH.to_string());
            }
            index.disk_size = new_size;
        }
    }

    /// Read `buf.len()` bytes starting at `offset`, resolving each chunk
    /// via `read_chunk`. Always fills the whole buffer and returns its
    /// length; regions with no stored data read as zeros.
    /// NOTE(review): offsets past `disk_size` also read as zeros here —
    /// presumably the NBD layer bounds-checks requests; confirm.
    pub fn read(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut pos = 0usize;
        let mut file_offset = offset;

        while pos < buf.len() {
            // Split the request at chunk boundaries.
            let chunk_idx = (file_offset / CHUNK_SIZE as u64) as usize;
            let offset_in_chunk = (file_offset % CHUNK_SIZE as u64) as usize;
            let remaining_in_chunk = CHUNK_SIZE - offset_in_chunk;
            let to_read = remaining_in_chunk.min(buf.len() - pos);

            let chunk_data = self.read_chunk(chunk_idx)?;
            // Stored chunks may be shorter than CHUNK_SIZE (e.g. the tail
            // of the disk); copy what exists, zero-fill the rest.
            let available = chunk_data.len().saturating_sub(offset_in_chunk);
            let copy_len = to_read.min(available);

            if copy_len > 0 {
                buf[pos..pos + copy_len]
                    .copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + copy_len]);
            }
            // Zero-fill if chunk is shorter than expected
            if copy_len < to_read {
                buf[pos + copy_len..pos + to_read].fill(0);
            }

            pos += to_read;
            file_offset += to_read as u64;
        }

        Ok(buf.len())
    }

    /// Write `data` at `offset`. Touched chunks are staged in the dirty
    /// map only; nothing reaches the chunk store until `flush`.
    pub fn write(&self, offset: u64, data: &[u8]) -> std::io::Result<usize> {
        let mut pos = 0usize;
        let mut file_offset = offset;

        while pos < data.len() {
            // Split the request at chunk boundaries.
            let chunk_idx = (file_offset / CHUNK_SIZE as u64) as usize;
            let offset_in_chunk = (file_offset % CHUNK_SIZE as u64) as usize;
            let remaining_in_chunk = CHUNK_SIZE - offset_in_chunk;
            let to_write = remaining_in_chunk.min(data.len() - pos);

            // Read-modify-write: get current chunk, overlay the write
            let mut chunk_data = self.read_chunk(chunk_idx)?;
            if chunk_data.len() < offset_in_chunk + to_write {
                chunk_data.resize(offset_in_chunk + to_write, 0);
            }
            chunk_data[offset_in_chunk..offset_in_chunk + to_write]
                .copy_from_slice(&data[pos..pos + to_write]);

            self.dirty.write().unwrap().insert(chunk_idx, chunk_data);

            pos += to_write;
            file_offset += to_write as u64;
        }

        Ok(data.len())
    }

    /// Persist all dirty chunks to the chunk store and point the index at
    /// the new hashes. Holds the `dirty` write lock then the `index` write
    /// lock for the whole drain.
    pub fn flush(&self) -> std::io::Result<()> {
        let mut dirty = self.dirty.write().unwrap();
        let mut index = self.index.write().unwrap();

        for (chunk_idx, data) in dirty.drain() {
            let hash = self.store.put(&data).map_err(|e| {
                std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
            })?;
            index.set_hash(chunk_idx, hash);
        }

        Ok(())
    }

    /// Save the current index as a checkpoint. Only writes the delta — ZERO entries
    /// resolve through the parent chain at read time. Instant regardless of disk size.
    /// NOTE(review): this overwrites `parent_path` with `source_index_path`
    /// unconditionally (including with None) — confirm that is intended.
    pub fn save_index(&self, path: &str) -> Result<()> {
        self.flush().map_err(|e| anyhow::anyhow!(e))?;
        let mut index = self.index.write().unwrap();
        index.parent_path = self.source_index_path.clone();
        // Propagate fallback path so the chain can always resolve to the flat file
        if index.fallback_path.is_none() {
            if let Some(ref fb) = self.fallback {
                index.fallback_path = Some(fb.path().to_string());
            }
        }
        index.save(path)
    }

    /// Resolve one chunk's bytes: dirty map → current index → parent
    /// chain → flat-file fallback (with lazy ingestion) → zeros.
    /// NOTE(review): between the dirty-map check and the index update in
    /// the fallback branch there is a race window with concurrent
    /// `write`/`flush` on the same chunk — confirm the single-NBD-client
    /// usage makes this benign.
    fn read_chunk(&self, chunk_idx: usize) -> std::io::Result<Vec<u8>> {
        // 1. Check dirty map
        if let Some(data) = self.dirty.read().unwrap().get(&chunk_idx) {
            return Ok(data.clone());
        }

        // 2. Check current index
        let hash = {
            let index = self.index.read().unwrap();
            index.get_hash(chunk_idx).unwrap_or(ZERO_CHUNK_HASH).to_string()
        };
        if hash != ZERO_CHUNK_HASH {
            return self.fetch_chunk(&hash);
        }

        // 3. Walk parent chain
        for parent in self.parents.read().unwrap().iter() {
            let parent_hash = parent.get_hash(chunk_idx).unwrap_or(ZERO_CHUNK_HASH);
            if parent_hash != ZERO_CHUNK_HASH {
                return self.fetch_chunk(parent_hash);
            }
        }

        // 4. Fallback to flat file (lazy ingestion at the bottom of the chain)
        if let Some(ref fb) = self.fallback {
            let offset = chunk_idx as u64 * CHUNK_SIZE as u64;
            if offset < fb.size() {
                let read_len = CHUNK_SIZE.min((fb.size() - offset) as usize);
                let mut buf = vec![0u8; read_len];
                fb.read(offset, &mut buf)?;

                // All-zero regions stay ZERO in the index (keeps sparseness).
                if buf.iter().all(|&b| b == 0) {
                    return Ok(vec![0u8; CHUNK_SIZE]);
                }

                // Cache in chunk store for future reads
                let new_hash = self.store.put(&buf).map_err(|e| {
                    std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
                })?;
                self.index.write().unwrap().set_hash(chunk_idx, new_hash);
                return Ok(buf);
            }
        }

        // 5. Truly zero
        Ok(vec![0u8; CHUNK_SIZE])
    }

    /// Fetch a chunk's bytes from the store by hash. A missing chunk is
    /// logged and served as zeros rather than failing the read.
    fn fetch_chunk(&self, hash: &str) -> std::io::Result<Vec<u8>> {
        match self.store.get(hash) {
            Ok(Some(data)) => Ok(data),
            Ok(None) => {
                tracing::warn!("chunk {} not found in store, returning zeros", hash);
                Ok(vec![0u8; CHUNK_SIZE])
            }
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())),
        }
    }
}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic round-trip through the local chunk store.
    #[test]
    fn test_chunk_store_put_get() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();

        let payload = b"hello world";
        let h = cs.put(payload).unwrap();
        assert_eq!(cs.get(&h).unwrap().unwrap(), payload);
    }

    /// Identical content must hash to the identical key (dedup).
    #[test]
    fn test_chunk_store_dedup() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();

        let payload = b"same content";
        let first = cs.put(payload).unwrap();
        let second = cs.put(payload).unwrap();
        assert_eq!(first, second);
    }

    /// Index survives a save/load round-trip, including ZERO slots.
    #[test]
    fn test_index_save_load() {
        let dir = tempfile::tempdir().unwrap();
        let file = dir.path().join("test.idx");

        let mut idx = ChunkIndex::new(1024 * 1024);
        idx.set_hash(0, "abc123".to_string());
        idx.set_hash(5, "def456".to_string());
        idx.save(file.to_str().unwrap()).unwrap();

        let round = ChunkIndex::load(file.to_str().unwrap()).unwrap();
        assert_eq!(round.disk_size(), 1024 * 1024);
        assert_eq!(round.get_hash(0).unwrap(), "abc123");
        assert_eq!(round.get_hash(5).unwrap(), "def456");
        assert_eq!(round.get_hash(1).unwrap(), ZERO_CHUNK_HASH);
    }

    /// Data written via the backend reads back both before and after flush.
    #[test]
    fn test_cas_backend_read_write() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();
        let idx = ChunkIndex::new(256 * 1024); // 256KB = 4 chunks
        let be = CasBackend::new(Box::new(cs), idx);

        let payload = b"hello from CAS";
        be.write(100, payload).unwrap();

        // Still in the dirty map at this point.
        let mut out = vec![0u8; payload.len()];
        be.read(100, &mut out).unwrap();
        assert_eq!(&out, payload);

        // After flush the data comes from the chunk store instead.
        be.flush().unwrap();
        let mut out2 = vec![0u8; payload.len()];
        be.read(100, &mut out2).unwrap();
        assert_eq!(&out2, payload);
    }

    /// A write spanning two chunks must be split and reassembled correctly.
    #[test]
    fn test_cas_backend_cross_chunk_write() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();
        let idx = ChunkIndex::new(256 * 1024);
        let be = CasBackend::new(Box::new(cs), idx);

        // Starts 4 bytes before the end of chunk 0 (boundary at 65536).
        let start = CHUNK_SIZE as u64 - 4;
        let payload = b"crosschunk";
        be.write(start, payload).unwrap();

        let mut out = vec![0u8; payload.len()];
        be.read(start, &mut out).unwrap();
        assert_eq!(&out, payload);
    }
}