use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

use crate::block_storage::{
    BlockCompression, BlockHeader, BlockRef, is_compressible, is_json_content, is_soch_content,
};
use crate::{Result, SochDBError};

/// Default number of shards in a `ShardedBlockStore`.
const DEFAULT_SHARD_COUNT: usize = 8;

/// Logical segment size (64 MiB) used by offset-based shard routing.
const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;
/// A single shard: an append-only in-memory buffer plus per-block
/// metadata, locked independently of all other shards.
pub struct BlockShard {
    /// Shard identifier, used in error messages and statistics.
    id: usize,
    /// Append-only backing buffer holding headers and payloads.
    data: RwLock<Vec<u8>>,
    /// Next free offset in `data`; reserved atomically before writing.
    next_offset: AtomicU64,
    /// Maps a block's local offset to its `BlockRef`.
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Per-block reference counts, keyed by local offset.
    ref_counts: RwLock<HashMap<u64, AtomicU32>>,
    /// Total bytes appended, headers included.
    bytes_written: AtomicU64,
    /// Total blocks appended.
    blocks_written: AtomicU64,
}

impl BlockShard {
    /// Creates an empty shard with the given identifier.
    pub fn new(id: usize) -> Self {
        Self {
            id,
            data: RwLock::new(Vec::new()),
            next_offset: AtomicU64::new(0),
            index: RwLock::new(HashMap::new()),
            ref_counts: RwLock::new(HashMap::new()),
            bytes_written: AtomicU64::new(0),
            blocks_written: AtomicU64::new(0),
        }
    }

    /// Compresses `data`, reserves space atomically, and appends the
    /// block as `[BlockHeader | payload]` at the reserved offset.
    pub fn write_block(&self, data: &[u8], compression: BlockCompression) -> Result<BlockRef> {
        let compressed = self.compress(data, compression)?;

        // The checksum covers the payload exactly as stored (i.e. after
        // compression, or raw if compression was skipped).
        let checksum = crc32fast::hash(&compressed);

        let header_size = BlockHeader::SIZE;
        let total_size = header_size + compressed.len();
        // Reserve the write region up front so concurrent writers never
        // receive overlapping offsets.
        let local_offset = self
            .next_offset
            .fetch_add(total_size as u64, Ordering::SeqCst);

        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: compression as u8,
            original_size: data.len() as u32,
            compressed_size: compressed.len() as u32,
            checksum,
        };

        {
            let mut store = self.data.write();
            let required_size = (local_offset + total_size as u64) as usize;
            if store.len() < required_size {
                store.resize(required_size, 0);
            }

            let header_bytes = header.to_bytes();
            store[local_offset as usize..local_offset as usize + header_size]
                .copy_from_slice(&header_bytes);

            store[local_offset as usize + header_size..local_offset as usize + total_size]
                .copy_from_slice(&compressed);
        }

        let block_ref = BlockRef {
            store_offset: local_offset,
            compressed_len: compressed.len() as u32,
            original_len: data.len() as u32,
            compression,
            checksum,
        };

        self.index.write().insert(local_offset, block_ref.clone());

        // Every new block starts with a reference count of one.
        self.ref_counts
            .write()
            .insert(local_offset, AtomicU32::new(1));

        self.bytes_written
            .fetch_add(total_size as u64, Ordering::Relaxed);
        self.blocks_written.fetch_add(1, Ordering::Relaxed);

        Ok(block_ref)
    }

    /// Reads a block back, verifying bounds, the stored header checksum,
    /// and the payload checksum before decompressing.
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        let offset = block_ref.store_offset as usize;
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + block_ref.compressed_len as usize;

        // Copy the payload out so the read lock is dropped before the
        // checksum and decompression work.
        let compressed = {
            let store = self.data.read();
            if offset + total_size > store.len() {
                return Err(SochDBError::Corruption(format!(
                    "Block at offset {} extends beyond shard {} data (size {})",
                    offset,
                    self.id,
                    store.len()
                )));
            }

            let header = BlockHeader::from_bytes(&store[offset..offset + header_size])?;

            if header.checksum != block_ref.checksum {
                return Err(SochDBError::Corruption(format!(
                    "Checksum mismatch in shard {}: expected {}, got {}",
                    self.id, block_ref.checksum, header.checksum
                )));
            }

            store[offset + header_size..offset + total_size].to_vec()
        };

        // Recompute the checksum over the bytes actually read, catching
        // corruption that the header comparison alone would miss.
        let computed_checksum = crc32fast::hash(&compressed);
        if computed_checksum != block_ref.checksum {
            return Err(SochDBError::Corruption(format!(
                "Data checksum mismatch in shard {}: expected {}, got {}",
                self.id, block_ref.checksum, computed_checksum
            )));
        }

        self.decompress(
            &compressed,
            block_ref.compression,
            block_ref.original_len as usize,
        )
    }

    /// Increments the reference count of the block at `offset`, if known.
    pub fn add_ref(&self, offset: u64) {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Decrements the reference count of the block at `offset`.
    /// Returns `true` when this call dropped the count to zero.
    pub fn release_ref(&self, offset: u64) -> bool {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            let prev = count.fetch_sub(1, Ordering::Relaxed);
            return prev == 1;
        }
        false
    }

    /// Computes point-in-time statistics by walking the block index.
    pub fn stats(&self) -> ShardStats {
        let index = self.index.read();
        let mut total_original = 0u64;
        let mut total_compressed = 0u64;

        for block_ref in index.values() {
            total_original += block_ref.original_len as u64;
            total_compressed += block_ref.compressed_len as u64;
        }

        ShardStats {
            shard_id: self.id,
            block_count: index.len(),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
        }
    }

    /// Compresses `data` with the requested codec, falling back to the
    /// raw bytes whenever compression would not shrink them. The
    /// `decompress` side detects this fallback by comparing lengths.
    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => match lz4::block::compress(data, None, false) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
            BlockCompression::Zstd => match zstd::encode_all(data, 3) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
        }
    }

    /// Decompresses a stored payload. A payload whose length equals the
    /// original size was stored raw by the `compress` fallback and is
    /// returned as-is, regardless of the nominal codec.
    fn decompress(
        &self,
        data: &[u8],
        compression: BlockCompression,
        original_size: usize,
    ) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                lz4::block::decompress(data, Some(original_size as i32))
                    .map_err(|e| SochDBError::Corruption(format!("LZ4 decompress failed: {}", e)))
            }
            BlockCompression::Zstd => {
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                zstd::decode_all(data)
                    .map_err(|e| SochDBError::Corruption(format!("ZSTD decompress failed: {}", e)))
            }
        }
    }
}

/// Point-in-time statistics for a single shard.
#[derive(Debug, Clone)]
pub struct ShardStats {
    pub shard_id: usize,
    pub block_count: usize,
    pub bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
}
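
/// An in-memory block store split across independently locked shards.
/// File IDs are hashed to shards, so writes to unrelated files contend
/// on different locks.
///
/// A minimal usage sketch (marked `ignore` since the exact crate path to
/// this module is assumed):
///
/// ```ignore
/// let store = ShardedBlockStore::new();
/// let shard_ref = store.write_block(42, b"payload").unwrap();
/// assert_eq!(store.read_block(&shard_ref).unwrap(), b"payload");
/// ```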
pub struct ShardedBlockStore {
    /// The shards; each carries its own locks and counters.
    shards: Vec<BlockShard>,
    /// Cached shard count (equals `shards.len()`).
    shard_count: usize,
    /// Segment size for offset-based routing; kept for future use.
    #[allow(dead_code)]
    segment_size: u64,
    /// Total write operations across all shards.
    total_writes: AtomicU64,
}

impl ShardedBlockStore {
    /// Creates a store with `DEFAULT_SHARD_COUNT` shards.
    pub fn new() -> Self {
        Self::with_shards(DEFAULT_SHARD_COUNT)
    }

    /// Creates a store with an explicit number of shards.
    pub fn with_shards(shard_count: usize) -> Self {
        let shards = (0..shard_count).map(BlockShard::new).collect();

        Self {
            shards,
            shard_count,
            segment_size: DEFAULT_SEGMENT_SIZE,
            total_writes: AtomicU64::new(0),
        }
    }

    /// Routes a file ID to a shard using the 64-bit MurmurHash3
    /// finalizer (fmix64), so that even sequential IDs spread evenly
    /// across shards.
    #[inline]
    fn shard_for_file(&self, file_id: u64) -> usize {
        let mut h = file_id;
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51afd7ed558ccd);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ceb9fe1a85ec53);
        h ^= h >> 33;
        (h as usize) % self.shard_count
    }

    /// Alternative routing by byte offset: consecutive segments map to
    /// consecutive shards, round-robin.
    #[inline]
    #[allow(dead_code)]
    fn shard_for_offset(&self, offset: u64) -> usize {
        ((offset / self.segment_size) as usize) % self.shard_count
    }

    /// Writes a block, choosing a compression codec heuristically from
    /// the content (see `select_compression`).
    pub fn write_block(&self, file_id: u64, data: &[u8]) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let compression = self.select_compression(data);

        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Writes a block with an explicitly chosen compression codec,
    /// bypassing the content heuristic.
    pub fn write_block_with_compression(
        &self,
        file_id: u64,
        data: &[u8],
        compression: BlockCompression,
    ) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Reads a block after validating that the shard ID is in range.
    pub fn read_block(&self, shard_ref: &ShardedBlockRef) -> Result<Vec<u8>> {
        if shard_ref.shard_id >= self.shard_count {
            return Err(SochDBError::Corruption(format!(
                "Invalid shard ID: {} (max {})",
                shard_ref.shard_id,
                self.shard_count - 1
            )));
        }
        self.shards[shard_ref.shard_id].read_block(&shard_ref.block_ref)
    }

    /// Increments the reference count of the referenced block.
    pub fn add_ref(&self, shard_ref: &ShardedBlockRef) {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].add_ref(shard_ref.block_ref.store_offset);
        }
    }

    /// Decrements the reference count of the referenced block.
    /// Returns `true` when the count reaches zero.
    pub fn release_ref(&self, shard_ref: &ShardedBlockRef) -> bool {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].release_ref(shard_ref.block_ref.store_offset)
        } else {
            false
        }
    }

    /// Aggregates statistics across all shards.
    pub fn stats(&self) -> ShardedBlockStoreStats {
        let shard_stats: Vec<ShardStats> = self.shards.iter().map(|s| s.stats()).collect();

        let total_blocks: usize = shard_stats.iter().map(|s| s.block_count).sum();
        let total_bytes: u64 = shard_stats.iter().map(|s| s.bytes_written).sum();
        let total_original: u64 = shard_stats.iter().map(|s| s.total_original_bytes).sum();
        let total_compressed: u64 = shard_stats.iter().map(|s| s.total_compressed_bytes).sum();

        ShardedBlockStoreStats {
            shard_count: self.shard_count,
            total_blocks,
            total_bytes_written: total_bytes,
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
            compression_ratio: if total_compressed > 0 {
                total_original as f64 / total_compressed as f64
            } else {
                1.0
            },
            shard_stats,
        }
    }

    /// Picks a codec from the payload: very small blocks stay raw, SOCH
    /// content gets Zstd (better ratio), JSON and other compressible
    /// content gets LZ4 (faster), and everything else stays raw.
    fn select_compression(&self, data: &[u8]) -> BlockCompression {
        if data.len() < 128 {
            return BlockCompression::None;
        }

        if is_soch_content(data) {
            BlockCompression::Zstd
        } else if is_json_content(data) || is_compressible(data) {
            BlockCompression::Lz4
        } else {
            BlockCompression::None
        }
    }
}

impl Default for ShardedBlockStore {
    fn default() -> Self {
        Self::new()
    }
}

/// A block reference qualified by the shard that owns the block.
#[derive(Debug, Clone)]
pub struct ShardedBlockRef {
    pub shard_id: usize,
    pub block_ref: BlockRef,
}

impl ShardedBlockRef {
    /// Serializes to 25 bytes: a little-endian u32 shard ID followed by
    /// the 21-byte `BlockRef` encoding.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + 21);
        buf.extend(&(self.shard_id as u32).to_le_bytes());
        // A failed BlockRef encoding degrades to 21 zero bytes rather
        // than propagating an error through this infallible signature.
        buf.extend(&self.block_ref.to_bytes().unwrap_or([0u8; 21]));
        buf
    }

    /// Deserializes from the 25-byte layout produced by `to_bytes`.
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < 25 {
            return Err(SochDBError::Corruption("ShardedBlockRef too short".into()));
        }
        let shard_id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        let block_ref = BlockRef::from_bytes(&data[4..])?;
        Ok(Self {
            shard_id,
            block_ref,
        })
    }
}

/// Aggregate statistics across the whole sharded store.
#[derive(Debug, Clone)]
pub struct ShardedBlockStoreStats {
    pub shard_count: usize,
    pub total_blocks: usize,
    pub total_bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
    /// Original bytes divided by compressed bytes; 1.0 for an empty store.
    pub compression_ratio: f64,
    pub shard_stats: Vec<ShardStats>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sharded_store_basic() {
        let store = ShardedBlockStore::new();

        let data = b"Hello, sharded world!";
        let shard_ref = store.write_block(1, data).unwrap();

        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }
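
    // A hedged sketch exercising the shard-ID bounds check in `read_block`.
    // The out-of-range ID and zeroed BlockRef below are test fixtures, not
    // values the store would ever hand out.
    #[test]
    fn test_read_block_invalid_shard() {
        let store = ShardedBlockStore::with_shards(4);

        let bogus = ShardedBlockRef {
            shard_id: 99, // beyond the 4 shards created above
            block_ref: BlockRef {
                store_offset: 0,
                compressed_len: 0,
                original_len: 0,
                compression: BlockCompression::None,
                checksum: 0,
            },
        };

        assert!(store.read_block(&bogus).is_err());
    }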

    #[test]
    fn test_sharded_store_multiple_files() {
        let store = ShardedBlockStore::new();

        let mut refs = Vec::new();
        for file_id in 0..100u64 {
            let data = format!("Data for file {}", file_id).into_bytes();
            let shard_ref = store.write_block(file_id, &data).unwrap();
            refs.push((file_id, shard_ref, data));
        }

        for (file_id, shard_ref, expected) in refs {
            let recovered = store.read_block(&shard_ref).unwrap();
            assert_eq!(recovered, expected, "File {} mismatch", file_id);
        }
    }
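
    // A hedged concurrency sketch: shards guard their buffers with
    // `parking_lot::RwLock` and reserve offsets atomically, so parallel
    // writers should never corrupt each other. Thread and iteration
    // counts here are arbitrary.
    #[test]
    fn test_concurrent_writes() {
        use std::sync::Arc;
        use std::thread;

        let store = Arc::new(ShardedBlockStore::new());
        let mut handles = Vec::new();

        for t in 0..4u64 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..50u64 {
                    let data = format!("thread {} block {}", t, i).into_bytes();
                    let shard_ref = store.write_block(t * 1000 + i, &data).unwrap();
                    assert_eq!(store.read_block(&shard_ref).unwrap(), data);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(store.stats().total_blocks, 200);
    }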

    #[test]
    fn test_sharded_store_distribution() {
        let store = ShardedBlockStore::with_shards(4);

        for i in 0..1000u64 {
            let data = vec![i as u8; 64];
            store.write_block(i, &data).unwrap();
        }

        let stats = store.stats();

        for shard_stat in &stats.shard_stats {
            assert!(
                shard_stat.block_count > 0,
                "Shard {} has no blocks",
                shard_stat.shard_id
            );
        }

        assert_eq!(stats.total_blocks, 1000);
    }
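
    // A hedged check that routing is stable: the same file ID must always
    // land in the same shard, since `shard_for_file` is a pure function of
    // the ID. The file ID 7 is arbitrary.
    #[test]
    fn test_same_file_same_shard() {
        let store = ShardedBlockStore::with_shards(4);

        let first = store.write_block(7, b"first").unwrap();
        let second = store.write_block(7, b"second").unwrap();

        assert_eq!(first.shard_id, second.shard_id);
    }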

    #[test]
    fn test_sharded_ref_serialization() {
        let shard_ref = ShardedBlockRef {
            shard_id: 3,
            block_ref: BlockRef {
                store_offset: 12345,
                compressed_len: 100,
                original_len: 200,
                compression: BlockCompression::Lz4,
                checksum: 0xDEADBEEF,
            },
        };

        let bytes = shard_ref.to_bytes();
        let recovered = ShardedBlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.shard_id, 3);
        assert_eq!(recovered.block_ref.store_offset, 12345);
        assert_eq!(recovered.block_ref.compression, BlockCompression::Lz4);
    }
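
    // A hedged negative case for deserialization: anything shorter than
    // the 4-byte shard ID plus the 21-byte BlockRef encoding must be
    // rejected rather than read out of bounds.
    #[test]
    fn test_sharded_ref_too_short() {
        assert!(ShardedBlockRef::from_bytes(&[]).is_err());
        assert!(ShardedBlockRef::from_bytes(&[0u8; 10]).is_err());
    }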

    #[test]
    fn test_sharded_store_compression() {
        let store = ShardedBlockStore::new();

        let data = vec![0u8; 4096];
        let shard_ref = store.write_block(1, &data).unwrap();

        assert!(shard_ref.block_ref.compressed_len < shard_ref.block_ref.original_len);

        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }
607
608 #[test]
609 fn test_ref_counting() {
610 let store = ShardedBlockStore::new();
611
612 let data = b"Reference counted block";
613 let shard_ref = store.write_block(1, data).unwrap();
614
615 store.add_ref(&shard_ref);
617 store.add_ref(&shard_ref);
618
619 assert!(!store.release_ref(&shard_ref)); assert!(!store.release_ref(&shard_ref)); assert!(store.release_ref(&shard_ref)); }
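
    // A hedged look at aggregate stats. The `compress` fallback never
    // stores a payload larger than the original, so the reported ratio
    // (original / compressed) should be at least 1.0. Payload size and
    // block count are arbitrary.
    #[test]
    fn test_stats_compression_ratio() {
        let store = ShardedBlockStore::new();

        for file_id in 0..10u64 {
            store.write_block(file_id, &vec![b'a'; 1024]).unwrap();
        }

        let stats = store.stats();
        assert_eq!(stats.total_blocks, 10);
        assert!(stats.compression_ratio >= 1.0);
    }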
}