fcdb_cas/
lib.rs

1//! # Enishi CAS (Content Addressable Storage)
2//!
3//! PackCAS implementation with cidx indexing and bloom filters.
4//!
5//! Merkle DAG: enishi_cas -> pack_cas, cidx, bloom_filters, wal, gc
6
7use fcdb_core::{Cid, varint};
8use std::collections::HashMap;
9use std::fs::{File, OpenOptions};
10use std::io::{self, Read, Write, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use memmap2::Mmap;
13use bloom::{BloomFilter, ASMS};
14use crc32fast::Hasher as Crc32;
15use tracing::{info, warn, error};
16
/// Size (256 MiB) at which the active pack is closed and a new one is started.
const PACK_SIZE_TARGET: u64 = 256 << 20;
/// Intended upper bound (512 MiB) on the size of a single pack.
const PACK_SIZE_MAX: u64 = 512 << 20;
20
/// Temperature band used to organize packs; the caller picks one per stored object.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PackBand {
    /// Small objects (< 4KB).
    Small,
    /// Index structures.
    Index,
    /// Large blobs (>= 4KB).
    Blob,
}
28
29/// Pack metadata
30#[derive(Clone, Debug)]
31pub struct PackMeta {
32    pub id: u32,
33    pub band: PackBand,
34    pub size: u64,
35    pub object_count: u64,
36    pub created_at: u64,
37}
38
39/// Content Index Record (64B fixed length)
40#[repr(C)]
41#[derive(Clone, Copy, Debug)]
42pub struct CidxRec {
43    pub cid: [u8; 32],      // CID
44    pub pack_id: u32,       // Pack ID
45    pub offset: u64,        // Offset in pack
46    pub len: u32,           // Object length
47    pub kind: u8,           // Object kind/type
48    pub flags: u8,          // Flags
49    pub crc: u32,           // CRC32 checksum
50    pub _pad: [u8; 10],     // Padding to 64B
51}
52
53impl CidxRec {
54    /// Create a new cidx record
55    pub fn new(cid: Cid, pack_id: u32, offset: u64, len: u32, kind: u8, flags: u8) -> Self {
56        let mut crc = Crc32::new();
57        crc.update(cid.as_bytes());
58        crc.update(&pack_id.to_le_bytes());
59        crc.update(&offset.to_le_bytes());
60        crc.update(&len.to_le_bytes());
61        crc.update(&[kind, flags]);
62
63        Self {
64            cid: *cid.as_bytes(),
65            pack_id,
66            offset,
67            len,
68            kind,
69            flags,
70            crc: crc.finalize(),
71            _pad: [0; 10],
72        }
73    }
74
75    /// Verify CRC
76    pub fn verify_crc(&self) -> bool {
77        let mut crc = Crc32::new();
78        crc.update(&self.cid);
79        crc.update(&self.pack_id.to_le_bytes());
80        crc.update(&self.offset.to_le_bytes());
81        crc.update(&self.len.to_le_bytes());
82        crc.update(&[self.kind, self.flags]);
83        crc.finalize() == self.crc
84    }
85}
86
/// Sizing parameters for a bloom filter.
#[derive(Clone, Debug)]
pub struct BloomConfig {
    /// Number of items the filter is sized for.
    pub expected_items: usize,
    /// Target false-positive probability.
    pub fp_rate: f64,
}

impl Default for BloomConfig {
    /// One million expected items at a very low (1e-6) false-positive rate.
    fn default() -> Self {
        BloomConfig {
            fp_rate: 1e-6,
            expected_items: 1_000_000,
        }
    }
}
102
103/// Multi-level bloom filter system
104pub struct BloomFilters {
105    global: BloomFilter,
106    pack_filters: HashMap<u32, BloomFilter>,
107    shard_filters: HashMap<(u16, u64), BloomFilter>, // (type, time_bucket) -> filter
108}
109
110impl BloomFilters {
111    pub fn new() -> Self {
112        Self {
113            global: BloomFilter::with_rate(BloomConfig::default().fp_rate as f32, BloomConfig::default().expected_items as u32),
114            pack_filters: HashMap::new(),
115            shard_filters: HashMap::new(),
116        }
117    }
118
119    pub fn insert(&mut self, cid: &Cid, pack_id: u32, type_part: u16, time_bucket: u64) {
120        // Global filter
121        self.global.insert(cid.as_bytes());
122
123        // Pack filter
124        self.pack_filters
125            .entry(pack_id)
126            .or_insert_with(|| BloomFilter::with_rate(1e-7, 100_000))
127            .insert(cid.as_bytes());
128
129        // Shard filter
130        self.shard_filters
131            .entry((type_part, time_bucket))
132            .or_insert_with(|| BloomFilter::with_rate(1e-8, 10_000))
133            .insert(cid.as_bytes());
134    }
135
136    pub fn contains(&self, cid: &Cid, pack_id: Option<u32>, shard: Option<(u16, u64)>) -> bool {
137        // Check global first
138        if !self.global.contains(cid.as_bytes()) {
139            return false;
140        }
141
142        // Check pack filter if specified
143        if let Some(pack_id) = pack_id {
144            if let Some(filter) = self.pack_filters.get(&pack_id) {
145                if !filter.contains(cid.as_bytes()) {
146                    return false;
147                }
148            }
149        }
150
151        // Check shard filter if specified
152        if let Some((type_part, time_bucket)) = shard {
153            if let Some(filter) = self.shard_filters.get(&(type_part, time_bucket)) {
154                if !filter.contains(cid.as_bytes()) {
155                    return false;
156                }
157            }
158        }
159
160        true
161    }
162}
163
164/// PackCAS - Content Addressable Storage with pack files
165pub struct PackCAS {
166    base_path: PathBuf,
167    current_pack: Option<PackWriter>,
168    packs: HashMap<u32, PackMeta>,
169    cidx_file: File,
170    bloom_filters: BloomFilters,
171    next_pack_id: u32,
172}
173
174impl PackCAS {
175    /// Open or create a PackCAS instance
176    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
177        let base_path = path.as_ref().to_path_buf();
178        std::fs::create_dir_all(&base_path)?;
179
180        let cidx_path = base_path.join("cidx.dat");
181        let cidx_file = OpenOptions::new()
182            .read(true)
183            .write(true)
184            .create(true)
185            .open(cidx_path)?;
186
187        let mut cas = Self {
188            base_path,
189            current_pack: None,
190            packs: HashMap::new(),
191            cidx_file,
192            bloom_filters: BloomFilters::new(),
193            next_pack_id: 0,
194        };
195
196        cas.load_existing_packs().await?;
197        cas.load_cidx().await?;
198
199        Ok(cas)
200    }
201
202    /// Load existing pack metadata
203    async fn load_existing_packs(&mut self) -> io::Result<()> {
204        let mut pack_id = 0;
205        loop {
206            let pack_path = self.base_path.join(format!("pack_{:08}.dat", pack_id));
207            if !pack_path.exists() {
208                break;
209            }
210
211            // Load pack metadata (simplified - in real impl, read from manifest)
212            let meta = PackMeta {
213                id: pack_id,
214                band: PackBand::Blob, // Default
215                size: std::fs::metadata(&pack_path)?.len(),
216                object_count: 0, // Would be loaded from manifest
217                created_at: 0,
218            };
219
220            self.packs.insert(pack_id, meta);
221            pack_id += 1;
222        }
223        self.next_pack_id = pack_id;
224
225        Ok(())
226    }
227
228    /// Load content index
229    async fn load_cidx(&mut self) -> io::Result<()> {
230        let file_size = self.cidx_file.metadata()?.len();
231        let record_count = file_size / std::mem::size_of::<CidxRec>() as u64;
232
233        // Memory map the cidx file for fast access
234        let mmap = unsafe { Mmap::map(&self.cidx_file)? };
235        let records = unsafe {
236            std::slice::from_raw_parts(
237                mmap.as_ptr() as *const CidxRec,
238                record_count as usize,
239            )
240        };
241
242        // Rebuild bloom filters from cidx
243        for record in records {
244            if !record.verify_crc() {
245                warn!("Cidx record CRC mismatch, skipping");
246                continue;
247            }
248
249            let cid = Cid::from_bytes(record.cid);
250            let pack_id = record.pack_id;
251            let type_part = (record.kind as u16) << 8; // Simplified type extraction
252            let time_bucket = 0; // Would be derived from metadata
253
254            self.bloom_filters.insert(&cid, pack_id, type_part, time_bucket);
255        }
256
257        info!("Loaded {} cidx records", record_count);
258        Ok(())
259    }
260
261    /// Store data and return CID
262    pub async fn put(&mut self, data: &[u8], kind: u8, band: PackBand) -> io::Result<Cid> {
263        let cid = Cid::hash(data);
264
265        // Check if already exists
266        if self.bloom_filters.contains(&cid, None, None) {
267            // Could do a full lookup here, but for now assume it's there
268            return Ok(cid);
269        }
270
271        // Ensure we have a pack writer
272        self.ensure_pack_writer(band).await?;
273
274        let (offset, pack_id) = if let Some(writer) = &mut self.current_pack {
275            let offset = writer.current_offset;
276            let pack_id = writer.pack_id;
277            writer.file.write_all(data)?;
278            writer.current_offset += data.len() as u64;
279            (offset, pack_id)
280        } else {
281            return Err(io::Error::new(io::ErrorKind::Other, "No current pack writer"));
282        };
283
284        // Add to cidx
285        let record = CidxRec::new(cid, pack_id, offset, data.len() as u32, kind, 0);
286        self.append_cidx_record(&record).await?;
287
288        // Update bloom filters
289        let type_part = (kind as u16) << 8;
290        let time_bucket = 0; // Would be current time bucket
291        self.bloom_filters.insert(&cid, pack_id, type_part, time_bucket);
292
293        // Check if pack is full
294        if offset + data.len() as u64 >= PACK_SIZE_TARGET {
295            self.close_current_pack().await?;
296        }
297
298        Ok(cid)
299    }
300
301    /// Retrieve data by CID
302    pub async fn get(&self, cid: &Cid) -> io::Result<Vec<u8>> {
303        // Use bloom filters to narrow search
304        if !self.bloom_filters.contains(cid, None, None) {
305            return Err(io::Error::new(io::ErrorKind::NotFound, "CID not found"));
306        }
307
308        // For now, do a linear search through packs
309        // In real implementation, would use cidx for direct lookup
310        for (pack_id, _meta) in &self.packs {
311            let pack_path = self.base_path.join(format!("pack_{:08}.dat", pack_id));
312            let mut file = File::open(pack_path)?;
313
314            // This is highly inefficient - real impl would use cidx for direct access
315            let mut data = Vec::new();
316            file.read_to_end(&mut data)?;
317
318            // Check if this pack contains our data (simplified)
319            if data.len() > 32 && &data[..32] == cid.as_bytes() {
320                return Ok(data[32..].to_vec()); // Remove CID prefix if stored
321            }
322        }
323
324        Err(io::Error::new(io::ErrorKind::NotFound, "CID not found"))
325    }
326
327    /// Ensure we have an active pack writer
328    async fn ensure_pack_writer(&mut self, band: PackBand) -> io::Result<()> {
329        if self.current_pack.is_none() {
330            let pack_id = self.next_pack_id;
331            self.next_pack_id += 1;
332
333            let pack_path = self.base_path.join(format!("pack_{:08}.dat", pack_id));
334            let file = OpenOptions::new()
335                .read(true)
336                .write(true)
337                .create(true)
338                .open(pack_path)?;
339
340            self.current_pack = Some(PackWriter {
341                pack_id,
342                file,
343                current_offset: 0,
344                band,
345            });
346
347            // Record pack metadata
348            self.packs.insert(pack_id, PackMeta {
349                id: pack_id,
350                band,
351                size: 0,
352                object_count: 0,
353                created_at: std::time::SystemTime::now()
354                    .duration_since(std::time::UNIX_EPOCH)
355                    .unwrap()
356                    .as_secs(),
357            });
358        }
359        Ok(())
360    }
361
362    /// Close current pack
363    async fn close_current_pack(&mut self) -> io::Result<()> {
364        if let Some(mut writer) = self.current_pack.take() {
365            writer.file.flush()?;
366            info!("Closed pack {}", writer.pack_id);
367        }
368        Ok(())
369    }
370
371    /// Append record to cidx file
372    async fn append_cidx_record(&mut self, record: &CidxRec) -> io::Result<()> {
373        self.cidx_file.seek(SeekFrom::End(0))?;
374        let bytes = unsafe {
375            std::slice::from_raw_parts(
376                record as *const CidxRec as *const u8,
377                std::mem::size_of::<CidxRec>(),
378            )
379        };
380        self.cidx_file.write_all(bytes)?;
381        self.cidx_file.flush()?;
382        Ok(())
383    }
384}
385
386/// Pack writer for building pack files
387struct PackWriter {
388    pack_id: u32,
389    file: File,
390    current_offset: u64,
391    band: PackBand,
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A small object stored in a fresh store can be read back unchanged.
    #[tokio::test]
    async fn test_pack_cas_basic() {
        let dir = tempdir().unwrap();
        let mut store = PackCAS::open(dir.path()).await.unwrap();

        let payload: &[u8] = b"Hello, PackCAS!";
        let id = store.put(payload, 1, PackBand::Small).await.unwrap();

        let fetched = store.get(&id).await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// A freshly built record carries a valid CRC and the fields it was given.
    #[test]
    fn test_cidx_record() {
        let rec = CidxRec::new(Cid::hash(b"test data"), 42, 1024, 100, 1, 0);

        assert!(rec.verify_crc());
        assert_eq!((rec.pack_id, rec.offset, rec.len), (42, 1024, 100));
    }

    /// An inserted CID is reported at every filter level; an unrelated one is not.
    #[test]
    fn test_bloom_filters() {
        let mut filters = BloomFilters::new();
        let present = Cid::hash(b"test");
        let shard = (100, 1234567890);

        filters.insert(&present, 1, shard.0, shard.1);

        assert!(filters.contains(&present, None, None));
        assert!(filters.contains(&present, Some(1), None));
        assert!(filters.contains(&present, Some(1), Some(shard)));

        let absent = Cid::hash(b"other");
        assert!(!filters.contains(&absent, None, None));
    }
}