1use fcdb_core::{Cid, varint};
8use std::collections::HashMap;
9use std::fs::{File, OpenOptions};
10use std::io::{self, Read, Write, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use memmap2::Mmap;
13use bloom::{BloomFilter, ASMS};
14use crc32fast::Hasher as Crc32;
15use tracing::{info, warn, error};
16
/// Pack size, in bytes, at which `PackCAS::put` closes the current pack: 256 MiB.
const PACK_SIZE_TARGET: u64 = 256 << 20;

/// Upper bound for a pack's size, in bytes: 512 MiB.
///
/// NOTE(review): nothing in the visible code reads this constant — confirm
/// whether a hard cap is still meant to be enforced.
const PACK_SIZE_MAX: u64 = 512 << 20;

/// Band tag attached to a pack.
///
/// The visible code only stores this tag alongside pack metadata; it does not
/// branch on it. The variant names suggest a split by object class
/// (small objects / index data / large blobs) — TODO confirm with the design.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackBand {
    Small,
    Index,
    Blob,
}
28
29#[derive(Clone, Debug)]
31pub struct PackMeta {
32 pub id: u32,
33 pub band: PackBand,
34 pub size: u64,
35 pub object_count: u64,
36 pub created_at: u64,
37}
38
/// One fixed-size entry of the cidx (content index) file: where the object
/// identified by `cid` lives inside a pack.
///
/// The struct is `repr(C)`, so field order here *is* the in-memory layout;
/// do not reorder fields.
///
/// NOTE(review): with `repr(C)` the compiler inserts implicit padding (before
/// `offset`, before `crc`, and at the tail), so on typical 64-bit targets
/// `size_of::<CidxRec>()` is 72 rather than the 64 bytes the explicit `_pad`
/// appears to aim for — confirm the intended record size.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct CidxRec {
    /// Raw 32-byte content id.
    pub cid: [u8; 32],
    /// Id of the pack holding the object.
    pub pack_id: u32,
    /// Byte offset of the object inside the pack.
    pub offset: u64,
    /// Object length in bytes.
    pub len: u32,
    /// Caller-supplied object kind tag.
    pub kind: u8,
    /// Flag bits (the visible code always stores 0).
    pub flags: u8,
    /// CRC-32 over `cid`, `pack_id`, `offset`, `len`, `kind` and `flags`.
    pub crc: u32,
    /// Explicit filler bytes, always zero; not covered by `crc`.
    pub _pad: [u8; 10],
}
52
53impl CidxRec {
54 pub fn new(cid: Cid, pack_id: u32, offset: u64, len: u32, kind: u8, flags: u8) -> Self {
56 let mut crc = Crc32::new();
57 crc.update(cid.as_bytes());
58 crc.update(&pack_id.to_le_bytes());
59 crc.update(&offset.to_le_bytes());
60 crc.update(&len.to_le_bytes());
61 crc.update(&[kind, flags]);
62
63 Self {
64 cid: *cid.as_bytes(),
65 pack_id,
66 offset,
67 len,
68 kind,
69 flags,
70 crc: crc.finalize(),
71 _pad: [0; 10],
72 }
73 }
74
75 pub fn verify_crc(&self) -> bool {
77 let mut crc = Crc32::new();
78 crc.update(&self.cid);
79 crc.update(&self.pack_id.to_le_bytes());
80 crc.update(&self.offset.to_le_bytes());
81 crc.update(&self.len.to_le_bytes());
82 crc.update(&[self.kind, self.flags]);
83 crc.finalize() == self.crc
84 }
85}
86
/// Sizing parameters for a bloom filter.
#[derive(Debug, Clone)]
pub struct BloomConfig {
    /// Number of items the filter is expected to hold.
    pub expected_items: usize,
    /// Target false-positive rate.
    pub fp_rate: f64,
}

impl Default for BloomConfig {
    /// One million expected items at a false-positive rate of one in a million.
    fn default() -> Self {
        BloomConfig {
            expected_items: 1_000_000,
            fp_rate: 1e-6,
        }
    }
}
102
103pub struct BloomFilters {
105 global: BloomFilter,
106 pack_filters: HashMap<u32, BloomFilter>,
107 shard_filters: HashMap<(u16, u64), BloomFilter>, }
109
110impl BloomFilters {
111 pub fn new() -> Self {
112 Self {
113 global: BloomFilter::with_rate(BloomConfig::default().fp_rate as f32, BloomConfig::default().expected_items as u32),
114 pack_filters: HashMap::new(),
115 shard_filters: HashMap::new(),
116 }
117 }
118
119 pub fn insert(&mut self, cid: &Cid, pack_id: u32, type_part: u16, time_bucket: u64) {
120 self.global.insert(cid.as_bytes());
122
123 self.pack_filters
125 .entry(pack_id)
126 .or_insert_with(|| BloomFilter::with_rate(1e-7, 100_000))
127 .insert(cid.as_bytes());
128
129 self.shard_filters
131 .entry((type_part, time_bucket))
132 .or_insert_with(|| BloomFilter::with_rate(1e-8, 10_000))
133 .insert(cid.as_bytes());
134 }
135
136 pub fn contains(&self, cid: &Cid, pack_id: Option<u32>, shard: Option<(u16, u64)>) -> bool {
137 if !self.global.contains(cid.as_bytes()) {
139 return false;
140 }
141
142 if let Some(pack_id) = pack_id {
144 if let Some(filter) = self.pack_filters.get(&pack_id) {
145 if !filter.contains(cid.as_bytes()) {
146 return false;
147 }
148 }
149 }
150
151 if let Some((type_part, time_bucket)) = shard {
153 if let Some(filter) = self.shard_filters.get(&(type_part, time_bucket)) {
154 if !filter.contains(cid.as_bytes()) {
155 return false;
156 }
157 }
158 }
159
160 true
161 }
162}
163
164pub struct PackCAS {
166 base_path: PathBuf,
167 current_pack: Option<PackWriter>,
168 packs: HashMap<u32, PackMeta>,
169 cidx_file: File,
170 bloom_filters: BloomFilters,
171 next_pack_id: u32,
172}
173
174impl PackCAS {
175 pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
177 let base_path = path.as_ref().to_path_buf();
178 std::fs::create_dir_all(&base_path)?;
179
180 let cidx_path = base_path.join("cidx.dat");
181 let cidx_file = OpenOptions::new()
182 .read(true)
183 .write(true)
184 .create(true)
185 .open(cidx_path)?;
186
187 let mut cas = Self {
188 base_path,
189 current_pack: None,
190 packs: HashMap::new(),
191 cidx_file,
192 bloom_filters: BloomFilters::new(),
193 next_pack_id: 0,
194 };
195
196 cas.load_existing_packs().await?;
197 cas.load_cidx().await?;
198
199 Ok(cas)
200 }
201
202 async fn load_existing_packs(&mut self) -> io::Result<()> {
204 let mut pack_id = 0;
205 loop {
206 let pack_path = self.base_path.join(format!("pack_{:08}.dat", pack_id));
207 if !pack_path.exists() {
208 break;
209 }
210
211 let meta = PackMeta {
213 id: pack_id,
214 band: PackBand::Blob, size: std::fs::metadata(&pack_path)?.len(),
216 object_count: 0, created_at: 0,
218 };
219
220 self.packs.insert(pack_id, meta);
221 pack_id += 1;
222 }
223 self.next_pack_id = pack_id;
224
225 Ok(())
226 }
227
228 async fn load_cidx(&mut self) -> io::Result<()> {
230 let file_size = self.cidx_file.metadata()?.len();
231 let record_count = file_size / std::mem::size_of::<CidxRec>() as u64;
232
233 let mmap = unsafe { Mmap::map(&self.cidx_file)? };
235 let records = unsafe {
236 std::slice::from_raw_parts(
237 mmap.as_ptr() as *const CidxRec,
238 record_count as usize,
239 )
240 };
241
242 for record in records {
244 if !record.verify_crc() {
245 warn!("Cidx record CRC mismatch, skipping");
246 continue;
247 }
248
249 let cid = Cid::from_bytes(record.cid);
250 let pack_id = record.pack_id;
251 let type_part = (record.kind as u16) << 8; let time_bucket = 0; self.bloom_filters.insert(&cid, pack_id, type_part, time_bucket);
255 }
256
257 info!("Loaded {} cidx records", record_count);
258 Ok(())
259 }
260
261 pub async fn put(&mut self, data: &[u8], kind: u8, band: PackBand) -> io::Result<Cid> {
263 let cid = Cid::hash(data);
264
265 if self.bloom_filters.contains(&cid, None, None) {
267 return Ok(cid);
269 }
270
271 self.ensure_pack_writer(band).await?;
273
274 let (offset, pack_id) = if let Some(writer) = &mut self.current_pack {
275 let offset = writer.current_offset;
276 let pack_id = writer.pack_id;
277 writer.file.write_all(data)?;
278 writer.current_offset += data.len() as u64;
279 (offset, pack_id)
280 } else {
281 return Err(io::Error::new(io::ErrorKind::Other, "No current pack writer"));
282 };
283
284 let record = CidxRec::new(cid, pack_id, offset, data.len() as u32, kind, 0);
286 self.append_cidx_record(&record).await?;
287
288 let type_part = (kind as u16) << 8;
290 let time_bucket = 0; self.bloom_filters.insert(&cid, pack_id, type_part, time_bucket);
292
293 if offset + data.len() as u64 >= PACK_SIZE_TARGET {
295 self.close_current_pack().await?;
296 }
297
298 Ok(cid)
299 }
300
301 pub async fn get(&self, cid: &Cid) -> io::Result<Vec<u8>> {
303 if !self.bloom_filters.contains(cid, None, None) {
305 return Err(io::Error::new(io::ErrorKind::NotFound, "CID not found"));
306 }
307
308 for (pack_id, _meta) in &self.packs {
311 let pack_path = self.base_path.join(format!("pack_{:08}.dat", pack_id));
312 let mut file = File::open(pack_path)?;
313
314 let mut data = Vec::new();
316 file.read_to_end(&mut data)?;
317
318 if data.len() > 32 && &data[..32] == cid.as_bytes() {
320 return Ok(data[32..].to_vec()); }
322 }
323
324 Err(io::Error::new(io::ErrorKind::NotFound, "CID not found"))
325 }
326
327 async fn ensure_pack_writer(&mut self, band: PackBand) -> io::Result<()> {
329 if self.current_pack.is_none() {
330 let pack_id = self.next_pack_id;
331 self.next_pack_id += 1;
332
333 let pack_path = self.base_path.join(format!("pack_{:08}.dat", pack_id));
334 let file = OpenOptions::new()
335 .read(true)
336 .write(true)
337 .create(true)
338 .open(pack_path)?;
339
340 self.current_pack = Some(PackWriter {
341 pack_id,
342 file,
343 current_offset: 0,
344 band,
345 });
346
347 self.packs.insert(pack_id, PackMeta {
349 id: pack_id,
350 band,
351 size: 0,
352 object_count: 0,
353 created_at: std::time::SystemTime::now()
354 .duration_since(std::time::UNIX_EPOCH)
355 .unwrap()
356 .as_secs(),
357 });
358 }
359 Ok(())
360 }
361
362 async fn close_current_pack(&mut self) -> io::Result<()> {
364 if let Some(mut writer) = self.current_pack.take() {
365 writer.file.flush()?;
366 info!("Closed pack {}", writer.pack_id);
367 }
368 Ok(())
369 }
370
371 async fn append_cidx_record(&mut self, record: &CidxRec) -> io::Result<()> {
373 self.cidx_file.seek(SeekFrom::End(0))?;
374 let bytes = unsafe {
375 std::slice::from_raw_parts(
376 record as *const CidxRec as *const u8,
377 std::mem::size_of::<CidxRec>(),
378 )
379 };
380 self.cidx_file.write_all(bytes)?;
381 self.cidx_file.flush()?;
382 Ok(())
383 }
384}
385
386struct PackWriter {
388 pack_id: u32,
389 file: File,
390 current_offset: u64,
391 band: PackBand,
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// An object written with `put` must be readable with `get`.
    #[tokio::test]
    async fn test_pack_cas_basic() {
        let dir = tempdir().unwrap();
        let mut store = PackCAS::open(dir.path()).await.unwrap();

        let payload = b"Hello, PackCAS!";
        let cid = store.put(payload, 1, PackBand::Small).await.unwrap();

        let fetched = store.get(&cid).await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// A freshly built record verifies its own CRC and keeps its fields.
    #[test]
    fn test_cidx_record() {
        let cid = Cid::hash(b"test data");
        let rec = CidxRec::new(cid, 42, 1024, 100, 1, 0);

        assert!(rec.verify_crc());
        assert_eq!(rec.pack_id, 42);
        assert_eq!(rec.offset, 1024);
        assert_eq!(rec.len, 100);
    }

    /// An inserted CID is reported at every filter tier; an unrelated CID is
    /// rejected by the global filter.
    #[test]
    fn test_bloom_filters() {
        let mut blooms = BloomFilters::new();
        let present = Cid::hash(b"test");

        blooms.insert(&present, 1, 100, 1234567890);

        assert!(blooms.contains(&present, None, None));
        assert!(blooms.contains(&present, Some(1), None));
        assert!(blooms.contains(&present, Some(1), Some((100, 1234567890))));

        let absent = Cid::hash(b"other");
        assert!(!blooms.contains(&absent, None, None));
    }
}