use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

use crate::block_storage::{
    BlockCompression, BlockHeader, BlockRef, is_compressible, is_json_content, is_soch_content,
};
use crate::{Result, SochDBError};

/// Default number of shards in a `ShardedBlockStore`.
const DEFAULT_SHARD_COUNT: usize = 8;

/// Default segment size (64 MiB) used for offset-based shard routing.
const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;

/// A single shard of the block store: an in-memory byte buffer plus a
/// per-shard index and reference counts.
pub struct BlockShard {
    /// Shard identifier (index into the parent store's shard list).
    id: usize,
    /// Raw block bytes: headers followed by (possibly compressed) payloads.
    data: RwLock<Vec<u8>>,
    /// Next write offset within this shard's buffer.
    next_offset: AtomicU64,
    /// Maps a block's local offset to its `BlockRef`.
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Reference counts keyed by local offset.
    ref_counts: RwLock<HashMap<u64, AtomicU32>>,
    /// Total bytes written (headers + payloads).
    bytes_written: AtomicU64,
    /// Total number of blocks written.
    blocks_written: AtomicU64,
}

impl BlockShard {
    /// Creates an empty shard with the given identifier.
    pub fn new(id: usize) -> Self {
        Self {
            id,
            data: RwLock::new(Vec::new()),
            next_offset: AtomicU64::new(0),
            index: RwLock::new(HashMap::new()),
            ref_counts: RwLock::new(HashMap::new()),
            bytes_written: AtomicU64::new(0),
            blocks_written: AtomicU64::new(0),
        }
    }

    /// Compresses `data` and appends it to this shard, returning a
    /// `BlockRef` that can later be passed to `read_block`.
    pub fn write_block(&self, data: &[u8], compression: BlockCompression) -> Result<BlockRef> {
        // Compress first so the exact on-store size is known up front.
        let compressed = self.compress(data, compression)?;

        // The checksum covers the (possibly compressed) payload bytes.
        let checksum = crc32fast::hash(&compressed);

        // Atomically reserve a contiguous region: header + payload.
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + compressed.len();
        let local_offset = self
            .next_offset
            .fetch_add(total_size as u64, Ordering::SeqCst);

        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: compression as u8,
            original_size: data.len() as u32,
            compressed_size: compressed.len() as u32,
            checksum,
        };

        // Copy header and payload into the reserved region under the
        // write lock, growing the buffer if necessary.
        {
            let mut store = self.data.write();
            let required_size = (local_offset + total_size as u64) as usize;
            if store.len() < required_size {
                store.resize(required_size, 0);
            }

            let header_bytes = header.to_bytes();
            store[local_offset as usize..local_offset as usize + header_size]
                .copy_from_slice(&header_bytes);

            store[local_offset as usize + header_size..local_offset as usize + total_size]
                .copy_from_slice(&compressed);
        }

        let block_ref = BlockRef {
            store_offset: local_offset,
            compressed_len: compressed.len() as u32,
            original_len: data.len() as u32,
            compression,
            checksum,
        };

        // Register the block in the index with an initial refcount of 1.
        self.index.write().insert(local_offset, block_ref.clone());

        self.ref_counts
            .write()
            .insert(local_offset, AtomicU32::new(1));

        self.bytes_written
            .fetch_add(total_size as u64, Ordering::Relaxed);
        self.blocks_written.fetch_add(1, Ordering::Relaxed);

        Ok(block_ref)
    }

    /// Reads the block described by `block_ref`, verifying bounds and
    /// checksums before decompressing.
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        let offset = block_ref.store_offset as usize;
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + block_ref.compressed_len as usize;

        // Copy the payload out under the read lock, validating as we go.
        let compressed = {
            let store = self.data.read();
            if offset + total_size > store.len() {
                return Err(SochDBError::Corruption(format!(
                    "Block at offset {} extends beyond shard {} data (size {})",
                    offset,
                    self.id,
                    store.len()
                )));
            }

            let header = BlockHeader::from_bytes(&store[offset..offset + header_size])?;

            // The stored header must agree with the caller's reference.
            if header.checksum != block_ref.checksum {
                return Err(SochDBError::Corruption(format!(
                    "Checksum mismatch in shard {}: expected {}, got {}",
                    self.id, block_ref.checksum, header.checksum
                )));
            }

            store[offset + header_size..offset + total_size].to_vec()
        };

        // Re-hash the payload to catch corruption of the data itself.
        let computed_checksum = crc32fast::hash(&compressed);
        if computed_checksum != block_ref.checksum {
            return Err(SochDBError::Corruption(format!(
                "Data checksum mismatch in shard {}: expected {}, got {}",
                self.id, block_ref.checksum, computed_checksum
            )));
        }

        self.decompress(
            &compressed,
            block_ref.compression,
            block_ref.original_len as usize,
        )
    }

    /// Increments the reference count for the block at `offset`, if present.
    pub fn add_ref(&self, offset: u64) {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Decrements the reference count for the block at `offset`.
    /// Returns `true` if this release dropped the count to zero.
    pub fn release_ref(&self, offset: u64) -> bool {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            let prev = count.fetch_sub(1, Ordering::Relaxed);
            return prev == 1;
        }
        false
    }

    /// Returns a snapshot of this shard's counters and index totals.
    pub fn stats(&self) -> ShardStats {
        let index = self.index.read();
        let mut total_original = 0u64;
        let mut total_compressed = 0u64;

        for block_ref in index.values() {
            total_original += block_ref.original_len as u64;
            total_compressed += block_ref.compressed_len as u64;
        }

        ShardStats {
            shard_id: self.id,
            block_count: index.len(),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
        }
    }

    /// Compresses `data` with the requested codec, falling back to the
    /// raw bytes whenever compression does not actually shrink the payload.
    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => match lz4::block::compress(data, None, false) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
            BlockCompression::Zstd => match zstd::encode_all(data, 3) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
        }
    }

    fn decompress(
        &self,
        data: &[u8],
        compression: BlockCompression,
        original_size: usize,
    ) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                // `compress` stores incompressible payloads verbatim; a
                // payload whose length equals the original size was never
                // actually compressed.
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                lz4::block::decompress(data, Some(original_size as i32))
                    .map_err(|e| SochDBError::Corruption(format!("LZ4 decompress failed: {}", e)))
            }
            BlockCompression::Zstd => {
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                zstd::decode_all(data)
                    .map_err(|e| SochDBError::Corruption(format!("ZSTD decompress failed: {}", e)))
            }
        }
    }
}

/// Per-shard statistics snapshot.
#[derive(Debug, Clone)]
pub struct ShardStats {
    pub shard_id: usize,
    pub block_count: usize,
    pub bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
}

/// A block store split into independent shards to spread lock contention.
/// Blocks are routed to a shard by hashing the owning file ID.
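///
/// # Example
///
/// A minimal usage sketch (marked `ignore` because the enclosing crate
/// path is not shown in this module):
///
/// ```ignore
/// let store = ShardedBlockStore::new();
/// let shard_ref = store.write_block(42, b"payload")?;
/// assert_eq!(store.read_block(&shard_ref)?, b"payload".to_vec());
/// ```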
pub struct ShardedBlockStore {
    /// The individual shards.
    shards: Vec<BlockShard>,
    /// Number of shards, fixed at construction.
    shard_count: usize,
    /// Segment size used by offset-based shard routing.
    #[allow(dead_code)]
    segment_size: u64,
    /// Total number of blocks written across all shards.
    total_writes: AtomicU64,
}

impl ShardedBlockStore {
    /// Creates a store with the default shard count.
    pub fn new() -> Self {
        Self::with_shards(DEFAULT_SHARD_COUNT)
    }

    /// Creates a store with `shard_count` shards.
    pub fn with_shards(shard_count: usize) -> Self {
        let shards = (0..shard_count).map(BlockShard::new).collect();

        Self {
            shards,
            shard_count,
            segment_size: DEFAULT_SEGMENT_SIZE,
            total_writes: AtomicU64::new(0),
        }
    }

    /// Routes a file ID to a shard. The mix is a 64-bit finalizer in the
    /// style of MurmurHash3's fmix64, so consecutive file IDs still spread
    /// evenly across shards.
    #[inline]
    fn shard_for_file(&self, file_id: u64) -> usize {
        let mut h = file_id;
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51afd7ed558ccd);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ceb9fe1a85ec53);
        h ^= h >> 33;
        (h as usize) % self.shard_count
    }

    /// Routes a raw offset to a shard by segment.
    #[inline]
    #[allow(dead_code)]
    fn shard_for_offset(&self, offset: u64) -> usize {
        ((offset / self.segment_size) as usize) % self.shard_count
    }

    /// Writes a block, choosing a compression codec from the content.
    pub fn write_block(&self, file_id: u64, data: &[u8]) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let compression = self.select_compression(data);

        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Writes a block with an explicitly chosen compression codec.
    pub fn write_block_with_compression(
        &self,
        file_id: u64,
        data: &[u8],
        compression: BlockCompression,
    ) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Reads a block back, validating the shard ID before dispatching.
    pub fn read_block(&self, shard_ref: &ShardedBlockRef) -> Result<Vec<u8>> {
        if shard_ref.shard_id >= self.shard_count {
            return Err(SochDBError::Corruption(format!(
                "Invalid shard ID: {} (max {})",
                shard_ref.shard_id,
                self.shard_count - 1
            )));
        }
        self.shards[shard_ref.shard_id].read_block(&shard_ref.block_ref)
    }

    /// Increments the reference count for the referenced block.
    pub fn add_ref(&self, shard_ref: &ShardedBlockRef) {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].add_ref(shard_ref.block_ref.store_offset);
        }
    }

    /// Decrements the reference count for the referenced block.
    /// Returns `true` if the count reached zero.
    pub fn release_ref(&self, shard_ref: &ShardedBlockRef) -> bool {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].release_ref(shard_ref.block_ref.store_offset)
        } else {
            false
        }
    }

    /// Aggregates statistics across all shards.
    pub fn stats(&self) -> ShardedBlockStoreStats {
        let shard_stats: Vec<ShardStats> = self.shards.iter().map(|s| s.stats()).collect();

        let total_blocks: usize = shard_stats.iter().map(|s| s.block_count).sum();
        let total_bytes: u64 = shard_stats.iter().map(|s| s.bytes_written).sum();
        let total_original: u64 = shard_stats.iter().map(|s| s.total_original_bytes).sum();
        let total_compressed: u64 = shard_stats.iter().map(|s| s.total_compressed_bytes).sum();

        ShardedBlockStoreStats {
            shard_count: self.shard_count,
            total_blocks,
            total_bytes_written: total_bytes,
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
            compression_ratio: if total_compressed > 0 {
                total_original as f64 / total_compressed as f64
            } else {
                1.0
            },
            shard_stats,
        }
    }

    /// Chooses a codec by content type: Zstd for SochDB-native content,
    /// LZ4 for JSON or otherwise compressible data, and no compression
    /// for small (< 128 byte) or incompressible payloads.
    fn select_compression(&self, data: &[u8]) -> BlockCompression {
        if data.len() < 128 {
            return BlockCompression::None;
        }

        if is_soch_content(data) {
            BlockCompression::Zstd
        } else if is_json_content(data) || is_compressible(data) {
            BlockCompression::Lz4
        } else {
            BlockCompression::None
        }
    }
}

impl Default for ShardedBlockStore {
    fn default() -> Self {
        Self::new()
    }
}

/// A block reference qualified with the shard that owns the block.
#[derive(Debug, Clone)]
pub struct ShardedBlockRef {
    /// Index of the owning shard.
    pub shard_id: usize,
    /// The shard-local block reference.
    pub block_ref: BlockRef,
}

impl ShardedBlockRef {
    /// Serializes as a 4-byte little-endian shard ID followed by the
    /// 21-byte encoded `BlockRef` (25 bytes total).
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + 21);
        buf.extend(&(self.shard_id as u32).to_le_bytes());
        // Fall back to a zeroed encoding if the inner ref fails to serialize.
        buf.extend(&self.block_ref.to_bytes().unwrap_or([0u8; 21]));
        buf
    }

    /// Deserializes from the 25-byte layout produced by `to_bytes`.
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < 25 {
            return Err(SochDBError::Corruption("ShardedBlockRef too short".into()));
        }
        let shard_id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        let block_ref = BlockRef::from_bytes(&data[4..])?;
        Ok(Self {
            shard_id,
            block_ref,
        })
    }
}

/// Aggregated statistics across the whole store.
#[derive(Debug, Clone)]
pub struct ShardedBlockStoreStats {
    pub shard_count: usize,
    pub total_blocks: usize,
    pub total_bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
    /// Ratio of original to compressed bytes (1.0 when nothing is stored).
    pub compression_ratio: f64,
    pub shard_stats: Vec<ShardStats>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sharded_store_basic() {
        let store = ShardedBlockStore::new();

        let data = b"Hello, sharded world!";
        let shard_ref = store.write_block(1, data).unwrap();

        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_sharded_store_multiple_files() {
        let store = ShardedBlockStore::new();

        let mut refs = Vec::new();
        for file_id in 0..100u64 {
            let data = format!("Data for file {}", file_id).into_bytes();
            let shard_ref = store.write_block(file_id, &data).unwrap();
            refs.push((file_id, shard_ref, data));
        }

        for (file_id, shard_ref, expected) in refs {
            let recovered = store.read_block(&shard_ref).unwrap();
            assert_eq!(recovered, expected, "File {} mismatch", file_id);
        }
    }

    #[test]
    fn test_sharded_store_distribution() {
        let store = ShardedBlockStore::with_shards(4);

        for i in 0..1000u64 {
            let data = vec![i as u8; 64];
            store.write_block(i, &data).unwrap();
        }

        let stats = store.stats();

        // With 1000 distinct file IDs hashed over 4 shards, every shard
        // should receive at least one block.
        for shard_stat in &stats.shard_stats {
            assert!(
                shard_stat.block_count > 0,
                "Shard {} has no blocks",
                shard_stat.shard_id
            );
        }

        assert_eq!(stats.total_blocks, 1000);
    }

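    // A minimal sketch of a routing invariant: `shard_for_file` is a pure
    // hash of the file ID, so repeated writes for the same file must land
    // on the same shard. (Test name is new here, not from the original suite.)
    #[test]
    fn test_routing_is_deterministic() {
        let store = ShardedBlockStore::with_shards(4);

        let a = store.write_block(77, b"first write").unwrap();
        let b = store.write_block(77, b"second write").unwrap();
        assert_eq!(a.shard_id, b.shard_id);
    }
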
    #[test]
    fn test_sharded_ref_serialization() {
        let shard_ref = ShardedBlockRef {
            shard_id: 3,
            block_ref: BlockRef {
                store_offset: 12345,
                compressed_len: 100,
                original_len: 200,
                compression: BlockCompression::Lz4,
                checksum: 0xDEADBEEF,
            },
        };

        let bytes = shard_ref.to_bytes();
        let recovered = ShardedBlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.shard_id, 3);
        assert_eq!(recovered.block_ref.store_offset, 12345);
        assert_eq!(recovered.block_ref.compression, BlockCompression::Lz4);
    }

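    // A minimal sketch of the negative path: `read_block` validates the
    // shard ID before indexing, so a forged out-of-range reference should
    // surface as an error rather than a panic. (Test name is new here.)
    #[test]
    fn test_read_block_rejects_invalid_shard() {
        let store = ShardedBlockStore::with_shards(4);
        let shard_ref = store.write_block(1, b"bounds check").unwrap();

        // Forge a reference pointing one past the last shard.
        let bogus = ShardedBlockRef {
            shard_id: 4,
            block_ref: shard_ref.block_ref.clone(),
        };
        assert!(store.read_block(&bogus).is_err());
    }
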
    #[test]
    fn test_sharded_store_compression() {
        let store = ShardedBlockStore::new();

        // 4 KiB of zeros is trivially compressible.
        let data = vec![0u8; 4096];
        let shard_ref = store.write_block(1, &data).unwrap();

        assert!(shard_ref.block_ref.compressed_len < shard_ref.block_ref.original_len);

        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_ref_counting() {
        let store = ShardedBlockStore::new();

        let data = b"Reference counted block";
        let shard_ref = store.write_block(1, data).unwrap();

        // The write itself holds one reference; add two more.
        store.add_ref(&shard_ref);
        store.add_ref(&shard_ref);

        // Only the final release reports that the count reached zero.
        assert!(!store.release_ref(&shard_ref));
        assert!(!store.release_ref(&shard_ref));
        assert!(store.release_ref(&shard_ref));
    }
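
    // A minimal sketch of the explicit-codec path: pins Zstd through
    // `write_block_with_compression` and round-trips the payload.
    // (Test name is new here, not from the original suite.)
    #[test]
    fn test_explicit_compression_roundtrip() {
        let store = ShardedBlockStore::new();

        let data = vec![7u8; 1024];
        let shard_ref = store
            .write_block_with_compression(9, &data, BlockCompression::Zstd)
            .unwrap();

        assert_eq!(shard_ref.block_ref.compression, BlockCompression::Zstd);
        assert_eq!(store.read_block(&shard_ref).unwrap(), data);
    }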
}