// sochdb_core/sharded_block_store.rs

// Copyright 2025 Sushanth (https://github.com/sushanthpy)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Sharded Block Store (Task 3/7)
//!
//! Partitions block storage by hash(file_id) for parallel I/O:
//! - Each shard has an independent buffer and index
//! - Lock-free offset allocation via atomic counters
//! - Per-shard reference counting for GC
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                   ShardedBlockStore                         │
//! │  total_writes: AtomicU64                                    │
//! │  shard_count: 8 (default, based on CPU cores)               │
//! └──────────────────────┬──────────────────────────────────────┘
//!                        │
//!        ┌───────────────┼───────────────┐
//!        ▼               ▼               ▼
//! ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
//! │  Shard 0    │ │  Shard 1    │ │  Shard N    │
//! │  file_0.blk │ │  file_1.blk │ │  file_N.blk │
//! │  index: Map │ │  index: Map │ │  index: Map │
//! │  refs: Map  │ │  refs: Map  │ │  refs: Map  │
//! └─────────────┘ └─────────────┘ └─────────────┘
//! ```
//!
//! ## Sharding Algorithm
//!
//! Shard assignment for writes: `shard_id = hash(file_id) % shard_count`
//!
//! Reads use the `shard_id` carried in `ShardedBlockRef`; an offset-based
//! lookup, `shard_id = (block_offset / segment_size) % shard_count`, is kept
//! as a fallback in `shard_for_offset`.
//!
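//! A minimal sketch of the write-path mapping (it mirrors `shard_for_file`
//! below; the constants are the MurmurHash3 64-bit finalizer, fmix64):
//!
//! ```text
//! h  = file_id
//! h ^= h >> 33;  h = h.wrapping_mul(0xff51afd7ed558ccd);
//! h ^= h >> 33;  h = h.wrapping_mul(0xc4ceb9fe1a85ec53);
//! h ^= h >> 33;
//! shard_id = h % shard_count
//! ```
//!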
//! ## Throughput Model (Amdahl's Law)
//!
//! Speedup(S) = 1 / ((1 - p) + p / S)
//!
//! With p ≈ 0.95 (parallelizable fraction) and S = 16 shards, speedup ≈ 9×.
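//!
//! Worked check of that figure: 1 / (0.05 + 0.95 / 16) = 1 / 0.109375 ≈ 9.14.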

use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

use crate::block_storage::{
    BlockCompression, BlockHeader, BlockRef, is_compressible, is_json_content, is_soch_content,
};
use crate::{Result, SochDBError};

/// Default number of shards (based on typical CPU core count)
const DEFAULT_SHARD_COUNT: usize = 8;

/// Default segment size (64 MB per shard)
const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;

/// Individual shard with its own storage and index
pub struct BlockShard {
    /// Shard ID
    id: usize,
    /// Data storage (append-only)
    data: RwLock<Vec<u8>>,
    /// Next write offset within this shard
    next_offset: AtomicU64,
    /// Block index: local_offset -> BlockRef
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Reference counts for GC
    ref_counts: RwLock<HashMap<u64, AtomicU32>>,
    /// Bytes written (for stats)
    bytes_written: AtomicU64,
    /// Blocks written (for stats)
    blocks_written: AtomicU64,
}

impl BlockShard {
    /// Create a new shard
    pub fn new(id: usize) -> Self {
        Self {
            id,
            data: RwLock::new(Vec::new()),
            next_offset: AtomicU64::new(0),
            index: RwLock::new(HashMap::new()),
            ref_counts: RwLock::new(HashMap::new()),
            bytes_written: AtomicU64::new(0),
            blocks_written: AtomicU64::new(0),
        }
    }

    /// Write a block to this shard
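    ///
    /// Each record is appended as:
    ///
    /// ```text
    /// [BlockHeader (BlockHeader::SIZE bytes)][compressed payload]
    /// ```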
    pub fn write_block(&self, data: &[u8], compression: BlockCompression) -> Result<BlockRef> {
        // Compress data
        let compressed = self.compress(data, compression)?;

        // Calculate checksum
        let checksum = crc32fast::hash(&compressed);

        // Allocate offset atomically (lock-free: concurrent writers each get
        // a disjoint [local_offset, local_offset + total_size) range)
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + compressed.len();
        let local_offset = self
            .next_offset
            .fetch_add(total_size as u64, Ordering::SeqCst);

        // Create header
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: compression as u8,
            original_size: data.len() as u32,
            compressed_size: compressed.len() as u32,
            checksum,
        };

        // Write to shard storage. Writers may acquire the lock out of
        // allocation order, so grow the buffer only when this write extends
        // past the current end.
        {
            let mut store = self.data.write();
            let required_size = (local_offset + total_size as u64) as usize;
            if store.len() < required_size {
                store.resize(required_size, 0);
            }

            // Write header
            let header_bytes = header.to_bytes();
            store[local_offset as usize..local_offset as usize + header_size]
                .copy_from_slice(&header_bytes);

            // Write data
            store[local_offset as usize + header_size..local_offset as usize + total_size]
                .copy_from_slice(&compressed);
        }

        // Create block reference with shard-local offset
        let block_ref = BlockRef {
            store_offset: local_offset,
            compressed_len: compressed.len() as u32,
            original_len: data.len() as u32,
            compression,
            checksum,
        };

        // Update index
        self.index.write().insert(local_offset, block_ref.clone());

        // Initialize ref count
        self.ref_counts
            .write()
            .insert(local_offset, AtomicU32::new(1));

        // Update stats
        self.bytes_written
            .fetch_add(total_size as u64, Ordering::Relaxed);
        self.blocks_written.fetch_add(1, Ordering::Relaxed);

        Ok(block_ref)
    }

    /// Read a block from this shard
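    ///
    /// Verification order: bounds check, header checksum against the
    /// `BlockRef`, CRC32 of the stored payload, then decompression.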
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        let offset = block_ref.store_offset as usize;
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + block_ref.compressed_len as usize;

        // Read from shard storage
        let compressed = {
            let store = self.data.read();
            if offset + total_size > store.len() {
                return Err(SochDBError::Corruption(format!(
                    "Block at offset {} extends beyond shard {} data (size {})",
                    offset,
                    self.id,
                    store.len()
                )));
            }

            // Verify header
            let header = BlockHeader::from_bytes(&store[offset..offset + header_size])?;

            // Verify the header checksum matches the reference
            if header.checksum != block_ref.checksum {
                return Err(SochDBError::Corruption(format!(
                    "Checksum mismatch in shard {}: expected {}, got {}",
                    self.id, block_ref.checksum, header.checksum
                )));
            }

            store[offset + header_size..offset + total_size].to_vec()
        };

        // Verify data checksum
        let computed_checksum = crc32fast::hash(&compressed);
        if computed_checksum != block_ref.checksum {
            return Err(SochDBError::Corruption(format!(
                "Data checksum mismatch in shard {}: expected {}, got {}",
                self.id, block_ref.checksum, computed_checksum
            )));
        }

        // Decompress
        self.decompress(
            &compressed,
            block_ref.compression,
            block_ref.original_len as usize,
        )
    }

    /// Increment reference count
    pub fn add_ref(&self, offset: u64) {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Decrement reference count; returns true if the block can be reclaimed
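    ///
    /// A sketch of the lifecycle (count after each call):
    ///
    /// ```text
    /// write_block  -> 1
    /// add_ref      -> 2
    /// release_ref  -> 1, returns false
    /// release_ref  -> 0, returns true (reclaimable)
    /// ```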
    pub fn release_ref(&self, offset: u64) -> bool {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            // fetch_sub is atomic, so exactly one caller observes prev == 1.
            let prev = count.fetch_sub(1, Ordering::Relaxed);
            return prev == 1; // Was 1, now 0
        }
        false
    }

    /// Get shard statistics
    pub fn stats(&self) -> ShardStats {
        let index = self.index.read();
        let mut total_original = 0u64;
        let mut total_compressed = 0u64;

        for block_ref in index.values() {
            total_original += block_ref.original_len as u64;
            total_compressed += block_ref.compressed_len as u64;
        }

        ShardStats {
            shard_id: self.id,
            block_count: index.len(),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
        }
    }

    /// Compress data with fallback
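    ///
    /// Invariant relied on by `decompress`: the stored payload is strictly
    /// shorter than the original when compression was kept, and exactly
    /// `original_size` bytes when the raw fallback was taken.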
    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            // Keep the compressed form only if it is actually smaller;
            // otherwise fall back to storing the raw bytes.
            BlockCompression::Lz4 => match lz4::block::compress(data, None, false) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
            BlockCompression::Zstd => match zstd::encode_all(data, 3) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
        }
    }

    /// Decompress data
    fn decompress(
        &self,
        data: &[u8],
        compression: BlockCompression,
        original_size: usize,
    ) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                // Equal length signals the store-raw fallback in `compress`.
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                lz4::block::decompress(data, Some(original_size as i32))
                    .map_err(|e| SochDBError::Corruption(format!("LZ4 decompress failed: {}", e)))
            }
            BlockCompression::Zstd => {
                // Equal length signals the store-raw fallback in `compress`.
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                zstd::decode_all(data)
                    .map_err(|e| SochDBError::Corruption(format!("ZSTD decompress failed: {}", e)))
            }
        }
    }
}

/// Statistics for a single shard
#[derive(Debug, Clone)]
pub struct ShardStats {
    pub shard_id: usize,
    pub block_count: usize,
    pub bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
}

/// Sharded block store for parallel I/O
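///
/// # Example
///
/// A minimal round-trip sketch (marked `ignore`; the module path
/// `sochdb_core::sharded_block_store` is an assumption):
///
/// ```rust,ignore
/// use sochdb_core::sharded_block_store::ShardedBlockStore;
///
/// let store = ShardedBlockStore::with_shards(4);
/// let shard_ref = store.write_block(42, b"payload").unwrap();
/// assert_eq!(store.read_block(&shard_ref).unwrap(), b"payload");
/// ```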
pub struct ShardedBlockStore {
    /// Individual shards
    shards: Vec<BlockShard>,
    /// Number of shards
    shard_count: usize,
    /// Segment size for offset-based shard lookup
    #[allow(dead_code)]
    segment_size: u64,
    /// Global write counter (for stats)
    total_writes: AtomicU64,
}

impl ShardedBlockStore {
    /// Create a new sharded block store with default settings
    pub fn new() -> Self {
        Self::with_shards(DEFAULT_SHARD_COUNT)
    }

    /// Create with a specific number of shards (clamped to at least 1)
    pub fn with_shards(shard_count: usize) -> Self {
        // Guard against a zero shard count, which would make the modulo in
        // `shard_for_file` divide by zero.
        let shard_count = shard_count.max(1);
        let shards = (0..shard_count).map(BlockShard::new).collect();

        Self {
            shards,
            shard_count,
            segment_size: DEFAULT_SEGMENT_SIZE,
            total_writes: AtomicU64::new(0),
        }
    }

    /// Get shard for a file ID (for writes)
    #[inline]
    fn shard_for_file(&self, file_id: u64) -> usize {
        // MurmurHash3 64-bit finalizer (fmix64) for fast, well-mixed hashing
        let mut h = file_id;
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51afd7ed558ccd);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ceb9fe1a85ec53);
        h ^= h >> 33;
        (h as usize) % self.shard_count
    }

    /// Get shard for an offset (for reads)
    #[inline]
    #[allow(dead_code)]
    fn shard_for_offset(&self, offset: u64) -> usize {
        ((offset / self.segment_size) as usize) % self.shard_count
    }

    /// Write a block for a specific file
    pub fn write_block(&self, file_id: u64, data: &[u8]) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let compression = self.select_compression(data);

        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Write a block with specific compression
    pub fn write_block_with_compression(
        &self,
        file_id: u64,
        data: &[u8],
        compression: BlockCompression,
    ) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Read a block
    pub fn read_block(&self, shard_ref: &ShardedBlockRef) -> Result<Vec<u8>> {
        if shard_ref.shard_id >= self.shard_count {
            return Err(SochDBError::Corruption(format!(
                "Invalid shard ID: {} (max {})",
                shard_ref.shard_id,
                self.shard_count - 1
            )));
        }
        self.shards[shard_ref.shard_id].read_block(&shard_ref.block_ref)
    }

    /// Increment reference count
    pub fn add_ref(&self, shard_ref: &ShardedBlockRef) {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].add_ref(shard_ref.block_ref.store_offset);
        }
    }

    /// Decrement reference count
    pub fn release_ref(&self, shard_ref: &ShardedBlockRef) -> bool {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].release_ref(shard_ref.block_ref.store_offset)
        } else {
            false
        }
    }

    /// Get aggregate statistics
    pub fn stats(&self) -> ShardedBlockStoreStats {
        let shard_stats: Vec<ShardStats> = self.shards.iter().map(|s| s.stats()).collect();

        let total_blocks: usize = shard_stats.iter().map(|s| s.block_count).sum();
        let total_bytes: u64 = shard_stats.iter().map(|s| s.bytes_written).sum();
        let total_original: u64 = shard_stats.iter().map(|s| s.total_original_bytes).sum();
        let total_compressed: u64 = shard_stats.iter().map(|s| s.total_compressed_bytes).sum();

        ShardedBlockStoreStats {
            shard_count: self.shard_count,
            total_blocks,
            total_bytes_written: total_bytes,
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
            compression_ratio: if total_compressed > 0 {
                total_original as f64 / total_compressed as f64
            } else {
                1.0
            },
            shard_stats,
        }
    }

    /// Select compression based on data content
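    ///
    /// Heuristic, as implemented below (the rationale notes are assumptions):
    ///
    /// ```text
    /// len < 128 bytes      -> None (header/CPU overhead outweighs savings)
    /// soch content         -> Zstd (better ratio on structured text)
    /// JSON / compressible  -> Lz4  (fast path)
    /// otherwise            -> None
    /// ```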
    fn select_compression(&self, data: &[u8]) -> BlockCompression {
        if data.len() < 128 {
            return BlockCompression::None;
        }

        if is_soch_content(data) {
            BlockCompression::Zstd
        } else if is_json_content(data) || is_compressible(data) {
            BlockCompression::Lz4
        } else {
            BlockCompression::None
        }
    }
}

impl Default for ShardedBlockStore {
    fn default() -> Self {
        Self::new()
    }
}

/// Reference to a block in a specific shard
#[derive(Debug, Clone)]
pub struct ShardedBlockRef {
    /// Which shard contains this block
    pub shard_id: usize,
    /// The block reference within that shard
    pub block_ref: BlockRef,
}

impl ShardedBlockRef {
    /// Serialize to bytes
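    ///
    /// Wire layout (25 bytes total; `BlockRef::to_bytes` emits 21 bytes):
    ///
    /// ```text
    /// [0..4)   shard_id  (u32, little-endian)
    /// [4..25)  BlockRef  (21 bytes)
    /// ```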
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + 21); // shard_id (u32) + BlockRef
        buf.extend(&(self.shard_id as u32).to_le_bytes());
        buf.extend(&self.block_ref.to_bytes().unwrap_or([0u8; 21]));
        buf
    }

    /// Deserialize from bytes
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < 25 {
            return Err(SochDBError::Corruption("ShardedBlockRef too short".into()));
        }
        let shard_id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        let block_ref = BlockRef::from_bytes(&data[4..])?;
        Ok(Self {
            shard_id,
            block_ref,
        })
    }
}

/// Statistics for the sharded block store
#[derive(Debug, Clone)]
pub struct ShardedBlockStoreStats {
    pub shard_count: usize,
    pub total_blocks: usize,
    pub total_bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
    pub compression_ratio: f64,
    pub shard_stats: Vec<ShardStats>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sharded_store_basic() {
        let store = ShardedBlockStore::new();

        let data = b"Hello, sharded world!";
        let shard_ref = store.write_block(1, data).unwrap();

        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_sharded_store_multiple_files() {
        let store = ShardedBlockStore::new();

        // Write blocks for different files
        let mut refs = Vec::new();
        for file_id in 0..100u64 {
            let data = format!("Data for file {}", file_id).into_bytes();
            let shard_ref = store.write_block(file_id, &data).unwrap();
            refs.push((file_id, shard_ref, data));
        }

        // Verify all blocks can be read
        for (file_id, shard_ref, expected) in refs {
            let recovered = store.read_block(&shard_ref).unwrap();
            assert_eq!(recovered, expected, "File {} mismatch", file_id);
        }
    }

    #[test]
    fn test_sharded_store_distribution() {
        let store = ShardedBlockStore::with_shards(4);

        // Write many blocks and check distribution
        for i in 0..1000u64 {
            let data = vec![i as u8; 64];
            store.write_block(i, &data).unwrap();
        }

        let stats = store.stats();

        // Check that blocks are distributed across shards
        for shard_stat in &stats.shard_stats {
            // Each shard should have some blocks (probabilistic)
            assert!(
                shard_stat.block_count > 0,
                "Shard {} has no blocks",
                shard_stat.shard_id
            );
        }

        // Total should be 1000
        assert_eq!(stats.total_blocks, 1000);
    }

    #[test]
    fn test_sharded_ref_serialization() {
        let shard_ref = ShardedBlockRef {
            shard_id: 3,
            block_ref: BlockRef {
                store_offset: 12345,
                compressed_len: 100,
                original_len: 200,
                compression: BlockCompression::Lz4,
                checksum: 0xDEADBEEF,
            },
        };

        let bytes = shard_ref.to_bytes();
        let recovered = ShardedBlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.shard_id, 3);
        assert_eq!(recovered.block_ref.store_offset, 12345);
        assert_eq!(recovered.block_ref.compression, BlockCompression::Lz4);
    }

    #[test]
    fn test_sharded_store_compression() {
        let store = ShardedBlockStore::new();

        // Compressible data
        let data = vec![0u8; 4096];
        let shard_ref = store.write_block(1, &data).unwrap();

        // Verify compression happened
        assert!(shard_ref.block_ref.compressed_len < shard_ref.block_ref.original_len);

        // Verify data is correct
        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }
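
    #[test]
    fn test_same_file_same_shard() {
        // Added sketch: `hash(file_id) % shard_count` is deterministic, so
        // two writes for the same file_id must land in the same shard.
        let store = ShardedBlockStore::with_shards(4);
        let a = store.write_block(7, b"first block").unwrap();
        let b = store.write_block(7, b"second block").unwrap();
        assert_eq!(a.shard_id, b.shard_id);
    }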

    #[test]
    fn test_ref_counting() {
        let store = ShardedBlockStore::new();

        let data = b"Reference counted block";
        let shard_ref = store.write_block(1, data).unwrap();

        // Add references (count goes 1 -> 3)
        store.add_ref(&shard_ref);
        store.add_ref(&shard_ref);

        // Release references
        assert!(!store.release_ref(&shard_ref)); // ref count: 3 -> 2
        assert!(!store.release_ref(&shard_ref)); // ref count: 2 -> 1
        assert!(store.release_ref(&shard_ref)); // ref count: 1 -> 0, can reclaim
    }
}
621}