1use std::collections::HashMap;
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::sync::RwLock;
6
7use anyhow::{Context, Result};
8
/// Size in bytes of each content-addressed chunk (64 KiB).
pub const CHUNK_SIZE: usize = 64 * 1024;

/// Storage backend for content-addressed chunks.
///
/// Implementations must be shareable across threads (`Send + Sync`),
/// since the backend reads and writes chunks concurrently.
pub trait ChunkStore: Send + Sync {
    /// Stores `data` and returns its content hash as a string key.
    fn put(&self, data: &[u8]) -> Result<String>;
    /// Returns the chunk stored under `hash`, or `None` if it is absent.
    fn get(&self, hash: &str) -> Result<Option<Vec<u8>>>;
}
19
/// [`ChunkStore`] that keeps each chunk as a file on the local
/// filesystem, named by the chunk's content hash.
pub struct LocalChunkStore {
    // Directory holding one file per chunk (the `chunks` subdirectory
    // of the CAS root passed to `open`).
    chunks_dir: PathBuf,
}
24
25impl LocalChunkStore {
26 pub fn open(cas_dir: &str) -> Result<Self> {
27 let chunks_dir = Path::new(cas_dir).join("chunks");
28 fs::create_dir_all(&chunks_dir)
29 .with_context(|| format!("failed to create chunks dir: {}", chunks_dir.display()))?;
30 Ok(LocalChunkStore { chunks_dir })
31 }
32
33 fn chunk_path(&self, hash: &str) -> PathBuf {
34 self.chunks_dir.join(hash)
35 }
36}
37
38impl ChunkStore for LocalChunkStore {
39 fn put(&self, data: &[u8]) -> Result<String> {
40 let hash = blake3::hash(data);
41 let hex = hash.to_hex().to_string();
42 let path = self.chunk_path(&hex);
43 match fs::OpenOptions::new().write(true).create_new(true).open(&path) {
44 Ok(mut f) => {
45 use std::io::Write;
46 f.write_all(data)
47 .with_context(|| format!("failed to write chunk {}", hex))?;
48 }
49 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
50 Err(e) => return Err(e).with_context(|| format!("failed to create chunk {}", hex)),
51 }
52 Ok(hex)
53 }
54
55 fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
56 let path = self.chunk_path(hash);
57 match fs::read(&path) {
58 Ok(data) => Ok(Some(data)),
59 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
60 Err(e) => Err(e).with_context(|| format!("failed to read chunk {}", hash)),
61 }
62 }
63}
64
/// Maps each fixed-size chunk of a virtual disk to its content hash.
pub struct ChunkIndex {
    // One hash per chunk; ZERO_CHUNK_HASH marks a chunk that is all
    // zeros and therefore not stored.
    hashes: Vec<String>,
    // Logical disk size in bytes (need not be a multiple of CHUNK_SIZE).
    disk_size: u64,
    /// Path of the parent (base) index when this index is a delta layered
    /// on top of another snapshot.
    pub parent_path: Option<String>,
    /// Path of a flat-file disk image to fall back to for chunks absent
    /// from this index and its parents.
    pub fallback_path: Option<String>,
}
75
/// Sentinel "hash" marking a chunk that is all zeros and is not stored.
const ZERO_CHUNK_HASH: &str = "ZERO";
77
78impl ChunkIndex {
79 pub fn new(disk_size: u64) -> Self {
80 let num_chunks = ((disk_size + CHUNK_SIZE as u64 - 1) / CHUNK_SIZE as u64) as usize;
81 ChunkIndex {
82 hashes: vec![ZERO_CHUNK_HASH.to_string(); num_chunks],
83 disk_size,
84 parent_path: None,
85 fallback_path: None,
86 }
87 }
88
89 pub fn disk_size(&self) -> u64 {
90 self.disk_size
91 }
92
93 pub fn num_chunks(&self) -> usize {
94 self.hashes.len()
95 }
96
97 pub fn get_hash(&self, chunk_idx: usize) -> Option<&str> {
98 self.hashes.get(chunk_idx).map(|s| s.as_str())
99 }
100
101 pub fn set_hash(&mut self, chunk_idx: usize, hash: String) {
102 if chunk_idx < self.hashes.len() {
103 self.hashes[chunk_idx] = hash;
104 }
105 }
106
107 pub fn save(&self, path: &str) -> Result<()> {
109 if let Some(parent) = Path::new(path).parent() {
110 fs::create_dir_all(parent)?;
111 }
112 let mut f = fs::File::create(path)
113 .with_context(|| format!("failed to create index: {}", path))?;
114 f.write_all(&self.disk_size.to_le_bytes())?;
116 f.write_all(&(self.hashes.len() as u64).to_le_bytes())?;
117 let parent_bytes = self.parent_path.as_deref().unwrap_or("").as_bytes();
118 f.write_all(&(parent_bytes.len() as u32).to_le_bytes())?;
119 f.write_all(parent_bytes)?;
120 let fallback_bytes = self.fallback_path.as_deref().unwrap_or("").as_bytes();
121 f.write_all(&(fallback_bytes.len() as u32).to_le_bytes())?;
122 f.write_all(fallback_bytes)?;
123 for hash in &self.hashes {
125 let bytes = hash.as_bytes();
126 f.write_all(&(bytes.len() as u32).to_le_bytes())?;
127 f.write_all(bytes)?;
128 }
129 Ok(())
130 }
131
132 pub fn load(path: &str) -> Result<Self> {
134 let mut f = fs::File::open(path)
135 .with_context(|| format!("failed to open index: {}", path))?;
136 let mut buf8 = [0u8; 8];
137 f.read_exact(&mut buf8)?;
138 let disk_size = u64::from_le_bytes(buf8);
139 f.read_exact(&mut buf8)?;
140 let num_chunks = u64::from_le_bytes(buf8) as usize;
141
142 let mut buf4 = [0u8; 4];
144 f.read_exact(&mut buf4)?;
145 let parent_len = u32::from_le_bytes(buf4) as usize;
146 let parent_path = if parent_len > 0 {
147 let mut parent_bytes = vec![0u8; parent_len];
148 f.read_exact(&mut parent_bytes)?;
149 Some(String::from_utf8(parent_bytes)?)
150 } else {
151 None
152 };
153
154 f.read_exact(&mut buf4)?;
155 let fallback_len = u32::from_le_bytes(buf4) as usize;
156 let fallback_path = if fallback_len > 0 {
157 let mut fallback_bytes = vec![0u8; fallback_len];
158 f.read_exact(&mut fallback_bytes)?;
159 Some(String::from_utf8(fallback_bytes)?)
160 } else {
161 None
162 };
163
164 let mut hashes = Vec::with_capacity(num_chunks);
165 for _ in 0..num_chunks {
166 f.read_exact(&mut buf4)?;
167 let len = u32::from_le_bytes(buf4) as usize;
168 let mut hash_bytes = vec![0u8; len];
169 f.read_exact(&mut hash_bytes)?;
170 hashes.push(String::from_utf8(hash_bytes)?);
171 }
172
173 Ok(ChunkIndex { hashes, disk_size, parent_path, fallback_path })
174 }
175}
176
/// Copy-on-write virtual block device backed by a content-addressed
/// chunk store, with an index chain and an optional flat-file fallback.
pub struct CasBackend {
    // Chunk storage backend (e.g. LocalChunkStore).
    store: Box<dyn ChunkStore>,
    // Current index: chunk number -> content hash.
    index: RwLock<ChunkIndex>,
    // Written-but-unflushed chunk data, keyed by chunk number.
    dirty: RwLock<HashMap<usize, Vec<u8>>>,
    // Ancestor indexes (nearest first), consulted for chunks this index
    // still marks as zero.
    parents: RwLock<Vec<ChunkIndex>>,
    // Optional flat-file disk image; chunks missing from the whole index
    // chain are lazily imported from it.
    fallback: Option<crate::backend::FlatFileBackend>,
    /// Path of the index this backend was opened from, if any; recorded
    /// as the parent link when the index is saved.
    pub source_index_path: Option<String>,
}
189
190impl CasBackend {
191 pub fn new(store: Box<dyn ChunkStore>, index: ChunkIndex) -> Self {
192 let parents = Self::load_parent_chain(&index);
194 CasBackend {
195 store,
196 index: RwLock::new(index),
197 dirty: RwLock::new(HashMap::new()),
198 parents: RwLock::new(parents),
199 fallback: None,
200 source_index_path: None,
201 }
202 }
203
204 pub fn with_fallback(store: Box<dyn ChunkStore>, index: ChunkIndex, fallback: crate::backend::FlatFileBackend) -> Self {
205 let parents = Self::load_parent_chain(&index);
206 CasBackend {
207 store,
208 index: RwLock::new(index),
209 dirty: RwLock::new(HashMap::new()),
210 parents: RwLock::new(parents),
211 fallback: Some(fallback),
212 source_index_path: None,
213 }
214 }
215
216 fn load_parent_chain(index: &ChunkIndex) -> Vec<ChunkIndex> {
217 let mut chain = Vec::new();
218 let mut current_parent = index.parent_path.clone();
219 while let Some(ref path) = current_parent {
220 match ChunkIndex::load(path) {
221 Ok(parent) => {
222 current_parent = parent.parent_path.clone();
223 chain.push(parent);
224 }
225 Err(e) => {
226 tracing::warn!("failed to load parent index {}: {}", path, e);
227 break;
228 }
229 }
230 }
231 chain
232 }
233
234 pub fn size(&self) -> u64 {
235 self.index.read().unwrap().disk_size()
236 }
237
238 pub fn set_disk_size(&mut self, new_size: u64) {
241 let mut index = self.index.write().unwrap();
242 if new_size > index.disk_size() {
243 let new_num_chunks = ((new_size + CHUNK_SIZE as u64 - 1) / CHUNK_SIZE as u64) as usize;
244 while index.num_chunks() < new_num_chunks {
245 index.hashes.push(ZERO_CHUNK_HASH.to_string());
246 }
247 index.disk_size = new_size;
248 }
249 }
250
251 pub fn read(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
252 let mut pos = 0usize;
253 let mut file_offset = offset;
254
255 while pos < buf.len() {
256 let chunk_idx = (file_offset / CHUNK_SIZE as u64) as usize;
257 let offset_in_chunk = (file_offset % CHUNK_SIZE as u64) as usize;
258 let remaining_in_chunk = CHUNK_SIZE - offset_in_chunk;
259 let to_read = remaining_in_chunk.min(buf.len() - pos);
260
261 let chunk_data = self.read_chunk(chunk_idx)?;
262 let available = chunk_data.len().saturating_sub(offset_in_chunk);
263 let copy_len = to_read.min(available);
264
265 if copy_len > 0 {
266 buf[pos..pos + copy_len]
267 .copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + copy_len]);
268 }
269 if copy_len < to_read {
271 buf[pos + copy_len..pos + to_read].fill(0);
272 }
273
274 pos += to_read;
275 file_offset += to_read as u64;
276 }
277
278 Ok(buf.len())
279 }
280
281 pub fn write(&self, offset: u64, data: &[u8]) -> std::io::Result<usize> {
282 let mut pos = 0usize;
283 let mut file_offset = offset;
284
285 while pos < data.len() {
286 let chunk_idx = (file_offset / CHUNK_SIZE as u64) as usize;
287 let offset_in_chunk = (file_offset % CHUNK_SIZE as u64) as usize;
288 let remaining_in_chunk = CHUNK_SIZE - offset_in_chunk;
289 let to_write = remaining_in_chunk.min(data.len() - pos);
290
291 let mut chunk_data = self.read_chunk(chunk_idx)?;
293 if chunk_data.len() < offset_in_chunk + to_write {
294 chunk_data.resize(offset_in_chunk + to_write, 0);
295 }
296 chunk_data[offset_in_chunk..offset_in_chunk + to_write]
297 .copy_from_slice(&data[pos..pos + to_write]);
298
299 self.dirty.write().unwrap().insert(chunk_idx, chunk_data);
300
301 pos += to_write;
302 file_offset += to_write as u64;
303 }
304
305 Ok(data.len())
306 }
307
308 pub fn flush(&self) -> std::io::Result<()> {
309 let mut dirty = self.dirty.write().unwrap();
310 let mut index = self.index.write().unwrap();
311
312 for (chunk_idx, data) in dirty.drain() {
313 let hash = self.store.put(&data).map_err(|e| {
314 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
315 })?;
316 index.set_hash(chunk_idx, hash);
317 }
318
319 Ok(())
320 }
321
322 pub fn save_index(&self, path: &str) -> Result<()> {
325 self.flush().map_err(|e| anyhow::anyhow!(e))?;
326 let mut index = self.index.write().unwrap();
327 index.parent_path = self.source_index_path.clone();
328 if index.fallback_path.is_none() {
330 if let Some(ref fb) = self.fallback {
331 index.fallback_path = Some(fb.path().to_string());
332 }
333 }
334 index.save(path)
335 }
336
337 fn read_chunk(&self, chunk_idx: usize) -> std::io::Result<Vec<u8>> {
338 if let Some(data) = self.dirty.read().unwrap().get(&chunk_idx) {
340 return Ok(data.clone());
341 }
342
343 let hash = {
345 let index = self.index.read().unwrap();
346 index.get_hash(chunk_idx).unwrap_or(ZERO_CHUNK_HASH).to_string()
347 };
348 if hash != ZERO_CHUNK_HASH {
349 return self.fetch_chunk(&hash);
350 }
351
352 for parent in self.parents.read().unwrap().iter() {
354 let parent_hash = parent.get_hash(chunk_idx).unwrap_or(ZERO_CHUNK_HASH);
355 if parent_hash != ZERO_CHUNK_HASH {
356 return self.fetch_chunk(parent_hash);
357 }
358 }
359
360 if let Some(ref fb) = self.fallback {
362 let offset = chunk_idx as u64 * CHUNK_SIZE as u64;
363 if offset < fb.size() {
364 let read_len = CHUNK_SIZE.min((fb.size() - offset) as usize);
365 let mut buf = vec![0u8; read_len];
366 fb.read(offset, &mut buf)?;
367
368 if buf.iter().all(|&b| b == 0) {
369 return Ok(vec![0u8; CHUNK_SIZE]);
370 }
371
372 let new_hash = self.store.put(&buf).map_err(|e| {
374 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
375 })?;
376 self.index.write().unwrap().set_hash(chunk_idx, new_hash);
377 return Ok(buf);
378 }
379 }
380
381 Ok(vec![0u8; CHUNK_SIZE])
383 }
384
385 fn fetch_chunk(&self, hash: &str) -> std::io::Result<Vec<u8>> {
386 match self.store.get(hash) {
387 Ok(Some(data)) => Ok(data),
388 Ok(None) => {
389 tracing::warn!("chunk {} not found in store, returning zeros", hash);
390 Ok(vec![0u8; CHUNK_SIZE])
391 }
392 Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())),
393 }
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// A chunk round-trips through the local store under its hash.
    #[test]
    fn test_chunk_store_put_get() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();

        let payload = b"hello world";
        let digest = cs.put(payload).unwrap();
        let fetched = cs.get(&digest).unwrap().unwrap();
        assert_eq!(fetched, payload);
    }

    /// Identical content must yield an identical chunk id (dedup).
    #[test]
    fn test_chunk_store_dedup() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();

        let payload = b"same content";
        let first = cs.put(payload).unwrap();
        let second = cs.put(payload).unwrap();
        assert_eq!(first, second);
    }

    /// Save/load preserves the disk size, the explicitly set hashes, and
    /// the zero sentinel for untouched chunks.
    #[test]
    fn test_index_save_load() {
        let dir = tempfile::tempdir().unwrap();
        let idx_file = dir.path().join("test.idx");

        let mut idx = ChunkIndex::new(1024 * 1024);
        idx.set_hash(0, "abc123".to_string());
        idx.set_hash(5, "def456".to_string());
        idx.save(idx_file.to_str().unwrap()).unwrap();

        let reloaded = ChunkIndex::load(idx_file.to_str().unwrap()).unwrap();
        assert_eq!(reloaded.disk_size(), 1024 * 1024);
        assert_eq!(reloaded.get_hash(0).unwrap(), "abc123");
        assert_eq!(reloaded.get_hash(5).unwrap(), "def456");
        assert_eq!(reloaded.get_hash(1).unwrap(), ZERO_CHUNK_HASH);
    }

    /// Data reads back both before a flush (from the dirty map) and after
    /// one (from the chunk store).
    #[test]
    fn test_cas_backend_read_write() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();
        let backend = CasBackend::new(Box::new(cs), ChunkIndex::new(256 * 1024));

        let payload = b"hello from CAS";
        backend.write(100, payload).unwrap();

        let mut before = vec![0u8; payload.len()];
        backend.read(100, &mut before).unwrap();
        assert_eq!(&before, payload);

        backend.flush().unwrap();
        let mut after = vec![0u8; payload.len()];
        backend.read(100, &mut after).unwrap();
        assert_eq!(&after, payload);
    }

    /// A write straddling a chunk boundary reads back intact.
    #[test]
    fn test_cas_backend_cross_chunk_write() {
        let dir = tempfile::tempdir().unwrap();
        let cs = LocalChunkStore::open(dir.path().to_str().unwrap()).unwrap();
        let backend = CasBackend::new(Box::new(cs), ChunkIndex::new(256 * 1024));

        let start = CHUNK_SIZE as u64 - 4;
        let payload = b"crosschunk";
        backend.write(start, payload).unwrap();

        let mut got = vec![0u8; payload.len()];
        backend.read(start, &mut got).unwrap();
        assert_eq!(&got, payload);
    }
}
479}