// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Sharded Block Store (Task 3/7)
//!
//! Partitions block storage by hash(file_id) for parallel I/O:
//! - Each shard has an independent data region and index
//! - Lock-free offset allocation via per-shard atomic counters
//! - Per-shard reference counting for GC
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                   ShardedBlockStore                         │
//! │  total_writes: AtomicU64                                    │
//! │  shard_count: 8 (default, based on CPU cores)               │
//! └──────────────────────┬──────────────────────────────────────┘
//!                        │
//!        ┌───────────────┼───────────────┐
//!        ▼               ▼               ▼
//! ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
//! │  Shard 0    │ │  Shard 1    │ │  Shard N    │
//! │  file_0.blk │ │  file_1.blk │ │  file_N.blk │
//! │  index: Map │ │  index: Map │ │  index: Map │
//! │  refs: Map  │ │  refs: Map  │ │  refs: Map  │
//! └─────────────┘ └─────────────┘ └─────────────┘
//! ```
//!
//! ## Sharding Algorithm
//!
//! Write assignment: `shard_id = hash(file_id) % shard_count`, so all of a
//! file's blocks land in the same shard and writes to different files rarely
//! contend.
//!
//! Reads resolve through the `shard_id` stored in each `ShardedBlockRef`;
//! the offset-based lookup `shard_id = (block_offset / segment_size) %
//! shard_count` is retained for offset-only callers but is currently unused.
//!
//! ## Throughput Model (Amdahl's Law)
//!
//! Speedup(S) = 1 / ((1 - p) + p / S)
//!
//! With p ≈ 0.95 (fraction parallelizable) and S = 16 shards:
//! Speedup = 1 / (0.05 + 0.95 / 16) ≈ 9.1×
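//!
//! ## Example
//!
//! A minimal usage sketch. The module path below assumes this file is exposed
//! as `sochdb_core::sharded_block_store`; adjust to the crate's actual
//! re-exports.
//!
//! ```no_run
//! use sochdb_core::sharded_block_store::ShardedBlockStore;
//!
//! let store = ShardedBlockStore::with_shards(4);
//! let shard_ref = store.write_block(42, b"payload").unwrap();
//! assert_eq!(store.read_block(&shard_ref).unwrap(), b"payload");
//! ```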

use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

use crate::block_storage::{
    BlockCompression, BlockHeader, BlockRef, is_compressible, is_json_content, is_soch_content,
};
use crate::{Result, SochDBError};

/// Default number of shards (based on typical CPU core count)
const DEFAULT_SHARD_COUNT: usize = 8;

/// Default segment size (64MB per shard)
const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;

/// Individual shard with its own storage and index
pub struct BlockShard {
    /// Shard ID
    id: usize,
    /// Data storage (append-only)
    data: RwLock<Vec<u8>>,
    /// Next write offset within this shard
    next_offset: AtomicU64,
    /// Block index: local_offset -> BlockRef
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Reference counts for GC
    ref_counts: RwLock<HashMap<u64, AtomicU32>>,
    /// Bytes written (for stats)
    bytes_written: AtomicU64,
    /// Blocks written (for stats)
    blocks_written: AtomicU64,
}

impl BlockShard {
    /// Create a new shard
    pub fn new(id: usize) -> Self {
        Self {
            id,
            data: RwLock::new(Vec::new()),
            next_offset: AtomicU64::new(0),
            index: RwLock::new(HashMap::new()),
            ref_counts: RwLock::new(HashMap::new()),
            bytes_written: AtomicU64::new(0),
            blocks_written: AtomicU64::new(0),
        }
    }

    /// Write a block to this shard
    pub fn write_block(&self, data: &[u8], compression: BlockCompression) -> Result<BlockRef> {
        // Compress data (falls back to the raw bytes if compression does not help)
        let compressed = self.compress(data, compression)?;

        // Calculate checksum
        let checksum = crc32fast::hash(&compressed);

        // Allocate offset atomically
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + compressed.len();
        let local_offset = self
            .next_offset
            .fetch_add(total_size as u64, Ordering::SeqCst);

        // Create header
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: compression as u8,
            original_size: data.len() as u32,
            compressed_size: compressed.len() as u32,
            checksum,
        };

        // Write to shard storage
        {
            let mut store = self.data.write();
            let required_size = (local_offset + total_size as u64) as usize;
            if store.len() < required_size {
                store.resize(required_size, 0);
            }

            // Write header
            let header_bytes = header.to_bytes();
            store[local_offset as usize..local_offset as usize + header_size]
                .copy_from_slice(&header_bytes);

            // Write data
            store[local_offset as usize + header_size..local_offset as usize + total_size]
                .copy_from_slice(&compressed);
        }

        // Create block reference with shard-local offset
        let block_ref = BlockRef {
            store_offset: local_offset,
            compressed_len: compressed.len() as u32,
            original_len: data.len() as u32,
            compression,
            checksum,
        };

        // Update index
        self.index.write().insert(local_offset, block_ref.clone());

        // Initialize ref count
        self.ref_counts
            .write()
            .insert(local_offset, AtomicU32::new(1));

        // Update stats
        self.bytes_written
            .fetch_add(total_size as u64, Ordering::Relaxed);
        self.blocks_written.fetch_add(1, Ordering::Relaxed);

        Ok(block_ref)
    }

    /// Read a block from this shard
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        let offset = block_ref.store_offset as usize;
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + block_ref.compressed_len as usize;

        // Read from shard storage
        let compressed = {
            let store = self.data.read();
            if offset + total_size > store.len() {
                return Err(SochDBError::Corruption(format!(
                    "Block at offset {} extends beyond shard {} data (size {})",
                    offset,
                    self.id,
                    store.len()
                )));
            }

            // Verify header
            let header = BlockHeader::from_bytes(&store[offset..offset + header_size])?;

            // Verify checksum matches
            if header.checksum != block_ref.checksum {
                return Err(SochDBError::Corruption(format!(
                    "Checksum mismatch in shard {}: expected {}, got {}",
                    self.id, block_ref.checksum, header.checksum
                )));
            }

            store[offset + header_size..offset + total_size].to_vec()
        };

        // Verify data checksum
        let computed_checksum = crc32fast::hash(&compressed);
        if computed_checksum != block_ref.checksum {
            return Err(SochDBError::Corruption(format!(
                "Data checksum mismatch in shard {}: expected {}, got {}",
                self.id, block_ref.checksum, computed_checksum
            )));
        }

        // Decompress
        self.decompress(
            &compressed,
            block_ref.compression,
            block_ref.original_len as usize,
        )
    }

    /// Increment reference count
    pub fn add_ref(&self, offset: u64) {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Decrement reference count, returns true if block can be reclaimed
    pub fn release_ref(&self, offset: u64) -> bool {
        let refs = self.ref_counts.read();
        if let Some(count) = refs.get(&offset) {
            let prev = count.fetch_sub(1, Ordering::Relaxed);
            return prev == 1; // Was 1, now 0
        }
        false
    }

    /// Get shard statistics
    pub fn stats(&self) -> ShardStats {
        let index = self.index.read();
        let mut total_original = 0u64;
        let mut total_compressed = 0u64;

        for block_ref in index.values() {
            total_original += block_ref.original_len as u64;
            total_compressed += block_ref.compressed_len as u64;
        }

        ShardStats {
            shard_id: self.id,
            block_count: index.len(),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
        }
    }

    /// Compress data, falling back to the raw bytes when compression fails
    /// or does not shrink the payload
    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => match lz4::block::compress(data, None, false) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
            BlockCompression::Zstd => match zstd::encode_all(data, 3) {
                Ok(compressed) if compressed.len() < data.len() => Ok(compressed),
                _ => Ok(data.to_vec()),
            },
        }
    }

    /// Decompress data
    fn decompress(
        &self,
        data: &[u8],
        compression: BlockCompression,
        original_size: usize,
    ) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                // Equal lengths mean `compress` stored the raw bytes
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                lz4::block::decompress(data, Some(original_size as i32))
                    .map_err(|e| SochDBError::Corruption(format!("LZ4 decompress failed: {}", e)))
            }
            BlockCompression::Zstd => {
                // Equal lengths mean `compress` stored the raw bytes
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }
                zstd::decode_all(data)
                    .map_err(|e| SochDBError::Corruption(format!("ZSTD decompress failed: {}", e)))
            }
        }
    }
}

/// Statistics for a single shard
#[derive(Debug, Clone)]
pub struct ShardStats {
    pub shard_id: usize,
    pub block_count: usize,
    pub bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
}

/// Sharded block store for parallel I/O
pub struct ShardedBlockStore {
    /// Individual shards
    shards: Vec<BlockShard>,
    /// Number of shards
    shard_count: usize,
    /// Segment size for offset-based shard lookup
    #[allow(dead_code)]
    segment_size: u64,
    /// Global write counter (for stats)
    total_writes: AtomicU64,
}

impl ShardedBlockStore {
    /// Create a new sharded block store with default settings
    pub fn new() -> Self {
        Self::with_shards(DEFAULT_SHARD_COUNT)
    }

    /// Create with specific number of shards
    pub fn with_shards(shard_count: usize) -> Self {
        let shards = (0..shard_count).map(BlockShard::new).collect();

        Self {
            shards,
            shard_count,
            segment_size: DEFAULT_SEGMENT_SIZE,
            total_writes: AtomicU64::new(0),
        }
    }

    /// Get shard for a file ID (for writes)
    #[inline]
    fn shard_for_file(&self, file_id: u64) -> usize {
        // MurmurHash3 fmix64 finalizer: fast, well-distributed 64-bit mixing
        let mut h = file_id;
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51afd7ed558ccd);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ceb9fe1a85ec53);
        h ^= h >> 33;
        (h as usize) % self.shard_count
    }

    /// Get shard for an offset (for reads)
    #[inline]
    #[allow(dead_code)]
    fn shard_for_offset(&self, offset: u64) -> usize {
        ((offset / self.segment_size) as usize) % self.shard_count
    }

    /// Write a block for a specific file
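    ///
    /// Compression is chosen per block by `select_compression`: payloads
    /// smaller than 128 bytes, and payloads the content heuristics deem
    /// incompressible, are stored raw.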
    pub fn write_block(&self, file_id: u64, data: &[u8]) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let compression = self.select_compression(data);

        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Write a block with specific compression
    pub fn write_block_with_compression(
        &self,
        file_id: u64,
        data: &[u8],
        compression: BlockCompression,
    ) -> Result<ShardedBlockRef> {
        let shard_id = self.shard_for_file(file_id);
        let block_ref = self.shards[shard_id].write_block(data, compression)?;

        self.total_writes.fetch_add(1, Ordering::Relaxed);

        Ok(ShardedBlockRef {
            shard_id,
            block_ref,
        })
    }

    /// Read a block
    pub fn read_block(&self, shard_ref: &ShardedBlockRef) -> Result<Vec<u8>> {
        if shard_ref.shard_id >= self.shard_count {
            return Err(SochDBError::Corruption(format!(
                "Invalid shard ID: {} (max {})",
                shard_ref.shard_id,
                self.shard_count - 1
            )));
        }
        self.shards[shard_ref.shard_id].read_block(&shard_ref.block_ref)
    }

    /// Increment reference count
    pub fn add_ref(&self, shard_ref: &ShardedBlockRef) {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].add_ref(shard_ref.block_ref.store_offset);
        }
    }

    /// Decrement reference count
    pub fn release_ref(&self, shard_ref: &ShardedBlockRef) -> bool {
        if shard_ref.shard_id < self.shard_count {
            self.shards[shard_ref.shard_id].release_ref(shard_ref.block_ref.store_offset)
        } else {
            false
        }
    }

    /// Get aggregate statistics
    pub fn stats(&self) -> ShardedBlockStoreStats {
        let shard_stats: Vec<ShardStats> = self.shards.iter().map(|s| s.stats()).collect();

        let total_blocks: usize = shard_stats.iter().map(|s| s.block_count).sum();
        let total_bytes: u64 = shard_stats.iter().map(|s| s.bytes_written).sum();
        let total_original: u64 = shard_stats.iter().map(|s| s.total_original_bytes).sum();
        let total_compressed: u64 = shard_stats.iter().map(|s| s.total_compressed_bytes).sum();

        ShardedBlockStoreStats {
            shard_count: self.shard_count,
            total_blocks,
            total_bytes_written: total_bytes,
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
            compression_ratio: if total_compressed > 0 {
                total_original as f64 / total_compressed as f64
            } else {
                1.0
            },
            shard_stats,
        }
    }

    /// Select compression based on data content
    fn select_compression(&self, data: &[u8]) -> BlockCompression {
        if data.len() < 128 {
            return BlockCompression::None;
        }

        if is_soch_content(data) {
            BlockCompression::Zstd
        } else if is_json_content(data) || is_compressible(data) {
            BlockCompression::Lz4
        } else {
            BlockCompression::None
        }
    }
}

impl Default for ShardedBlockStore {
    fn default() -> Self {
        Self::new()
    }
}

/// Reference to a block in a specific shard
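///
/// Serialized as 25 bytes: a little-endian `u32` shard ID followed by the
/// 21-byte `BlockRef` encoding (see `to_bytes` / `from_bytes`).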
#[derive(Debug, Clone)]
pub struct ShardedBlockRef {
    /// Which shard contains this block
    pub shard_id: usize,
    /// The block reference within that shard
    pub block_ref: BlockRef,
}

impl ShardedBlockRef {
    /// Serialize to bytes
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + 21); // shard_id (u32) + BlockRef
        buf.extend(&(self.shard_id as u32).to_le_bytes());
        buf.extend(&self.block_ref.to_bytes().unwrap_or([0u8; 21]));
        buf
    }

    /// Deserialize from bytes
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < 25 {
            return Err(SochDBError::Corruption("ShardedBlockRef too short".into()));
        }
        let shard_id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        let block_ref = BlockRef::from_bytes(&data[4..])?;
        Ok(Self {
            shard_id,
            block_ref,
        })
    }
}

/// Statistics for the sharded block store
#[derive(Debug, Clone)]
pub struct ShardedBlockStoreStats {
    pub shard_count: usize,
    pub total_blocks: usize,
    pub total_bytes_written: u64,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
    pub compression_ratio: f64,
    pub shard_stats: Vec<ShardStats>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sharded_store_basic() {
        let store = ShardedBlockStore::new();

        let data = b"Hello, sharded world!";
        let shard_ref = store.write_block(1, data).unwrap();

        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_sharded_store_multiple_files() {
        let store = ShardedBlockStore::new();

        // Write blocks for different files
        let mut refs = Vec::new();
        for file_id in 0..100u64 {
            let data = format!("Data for file {}", file_id).into_bytes();
            let shard_ref = store.write_block(file_id, &data).unwrap();
            refs.push((file_id, shard_ref, data));
        }

        // Verify all blocks can be read
        for (file_id, shard_ref, expected) in refs {
            let recovered = store.read_block(&shard_ref).unwrap();
            assert_eq!(recovered, expected, "File {} mismatch", file_id);
        }
    }

    #[test]
    fn test_sharded_store_distribution() {
        let store = ShardedBlockStore::with_shards(4);

        // Write many blocks and check distribution
        for i in 0..1000u64 {
            let data = vec![i as u8; 64];
            store.write_block(i, &data).unwrap();
        }

        let stats = store.stats();

        // Check that blocks are distributed across shards
        for shard_stat in &stats.shard_stats {
            // Each shard should have some blocks (probabilistic)
            assert!(
                shard_stat.block_count > 0,
                "Shard {} has no blocks",
                shard_stat.shard_id
            );
        }

        // Total should be 1000
        assert_eq!(stats.total_blocks, 1000);
    }

    #[test]
    fn test_sharded_ref_serialization() {
        let shard_ref = ShardedBlockRef {
            shard_id: 3,
            block_ref: BlockRef {
                store_offset: 12345,
                compressed_len: 100,
                original_len: 200,
                compression: BlockCompression::Lz4,
                checksum: 0xDEADBEEF,
            },
        };

        let bytes = shard_ref.to_bytes();
        let recovered = ShardedBlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.shard_id, 3);
        assert_eq!(recovered.block_ref.store_offset, 12345);
        assert_eq!(recovered.block_ref.compression, BlockCompression::Lz4);
    }

    #[test]
    fn test_sharded_store_compression() {
        let store = ShardedBlockStore::new();

        // Compressible data
        let data = vec![0u8; 4096];
        let shard_ref = store.write_block(1, &data).unwrap();

        // Verify compression happened
        assert!(shard_ref.block_ref.compressed_len < shard_ref.block_ref.original_len);

        // Verify data is correct
        let recovered = store.read_block(&shard_ref).unwrap();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_ref_counting() {
        let store = ShardedBlockStore::new();

        let data = b"Reference counted block";
        let shard_ref = store.write_block(1, data).unwrap();

        // Write initializes the count at 1; add two more references
        store.add_ref(&shard_ref);
        store.add_ref(&shard_ref);

        // Release references
        assert!(!store.release_ref(&shard_ref)); // ref count: 3 -> 2
        assert!(!store.release_ref(&shard_ref)); // ref count: 2 -> 1
        assert!(store.release_ref(&shard_ref)); // ref count: 1 -> 0, can reclaim
    }
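
    // A minimal sketch: explicit compression requests should round-trip
    // through `write_block_with_compression`, covering the raw-storage
    // fallback (`None`) as well as the LZ4 and Zstd paths.
    #[test]
    fn test_explicit_compression_roundtrip() {
        let store = ShardedBlockStore::new();
        let data = vec![7u8; 1024];

        for compression in [
            BlockCompression::None,
            BlockCompression::Lz4,
            BlockCompression::Zstd,
        ] {
            let shard_ref = store
                .write_block_with_compression(9, &data, compression)
                .unwrap();
            assert_eq!(store.read_block(&shard_ref).unwrap(), data);
        }
    }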
}