Skip to main content

sochdb_storage/
object_store_tier.rs

//! Object-storage-native cold tier for immutable index segments.
//!
//! Combines an in-memory, mutable delta with sealed, immutable segments stored in
//! S3/GCS and cached on local NVMe. A restart reads only the manifest — no full corpus rebuild.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use thiserror::Error;
10
11#[derive(Debug, Error)]
12pub enum ObjectStoreError {
13    #[error("segment not found: {0}")]
14    SegmentNotFound(String),
15    #[error("manifest error: {0}")]
16    Manifest(String),
17    #[error("io error: {0}")]
18    Io(#[from] std::io::Error),
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SegmentDescriptor {
23    pub segment_id: String,
24    pub object_uri: String,
25    pub checksum: u64,
26    pub doc_count: u64,
27    pub byte_size: u64,
28    pub sealed_at: u64,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ObjectStoreTierConfig {
33    pub bucket_uri: String,
34    pub local_cache_dir: PathBuf,
35    pub seal_threshold_docs: usize,
36    pub max_cache_segments: usize,
37}
38
39impl Default for ObjectStoreTierConfig {
40    fn default() -> Self {
41        Self {
42            bucket_uri: "s3://sochdb-segments".to_string(),
43            local_cache_dir: PathBuf::from("./sochdb-cache"),
44            seal_threshold_docs: 10_000,
45            max_cache_segments: 64,
46        }
47    }
48}
49
50/// LSM-style tier: in-memory delta + immutable object-store segments.
51pub struct ObjectStoreTier {
52    config: ObjectStoreTierConfig,
53    delta_docs: HashMap<u64, String>,
54    sealed_segments: Vec<SegmentDescriptor>,
55    cache_hits: u64,
56    cache_misses: u64,
57}
58
59impl ObjectStoreTier {
60    pub fn new(config: ObjectStoreTierConfig) -> Self {
61        Self {
62            config,
63            delta_docs: HashMap::new(),
64            sealed_segments: Vec::new(),
65            cache_hits: 0,
66            cache_misses: 0,
67        }
68    }
69
70    pub fn insert_delta(&mut self, doc_id: u64, text: String) {
71        self.delta_docs.insert(doc_id, text);
72        if self.delta_docs.len() >= self.config.seal_threshold_docs {
73            let _ = self.seal_current_delta();
74        }
75    }
76
77    pub fn seal_current_delta(&mut self) -> Result<SegmentDescriptor, ObjectStoreError> {
78        if self.delta_docs.is_empty() {
79            return Err(ObjectStoreError::SegmentNotFound("empty delta".into()));
80        }
81        let segment_id = format!("seg-{}", self.sealed_segments.len());
82        let doc_count = self.delta_docs.len() as u64;
83        let byte_size: u64 = self.delta_docs.values().map(|t| t.len() as u64).sum();
84        let mut payload_bytes = Vec::new();
85        for (k, v) in &self.delta_docs {
86            payload_bytes.extend_from_slice(format!("{k}:{v}").as_bytes());
87        }
88        let checksum = crc32fast::hash(&payload_bytes) as u64;
89
90        let desc = SegmentDescriptor {
91            segment_id: segment_id.clone(),
92            object_uri: format!("{}/{}", self.config.bucket_uri, segment_id),
93            checksum,
94            doc_count,
95            byte_size,
96            sealed_at: std::time::SystemTime::now()
97                .duration_since(std::time::UNIX_EPOCH)
98                .map(|d| d.as_secs())
99                .unwrap_or(0),
100        };
101
102        // Persist to local cache (object upload would happen async in production)
103        std::fs::create_dir_all(&self.config.local_cache_dir)?;
104        let cache_path = self
105            .config
106            .local_cache_dir
107            .join(format!("{segment_id}.seg"));
108        let payload = serde_json::to_string(&self.delta_docs).map_err(|e| {
109            ObjectStoreError::Manifest(e.to_string())
110        })?;
111        std::fs::write(&cache_path, payload)?;
112
113        self.sealed_segments.push(desc.clone());
114        self.delta_docs.clear();
115        Ok(desc)
116    }
117
118    pub fn hydrate_from_manifest(&mut self, manifest_path: &Path) -> Result<(), ObjectStoreError> {
119        if !manifest_path.exists() {
120            return Ok(());
121        }
122        let data = std::fs::read_to_string(manifest_path)?;
123        let segments: Vec<SegmentDescriptor> = serde_json::from_str(&data)
124            .map_err(|e| ObjectStoreError::Manifest(e.to_string()))?;
125        self.sealed_segments = segments;
126        Ok(())
127    }
128
129    pub fn lookup(&mut self, doc_id: u64) -> Option<String> {
130        if let Some(t) = self.delta_docs.get(&doc_id) {
131            self.cache_hits += 1;
132            return Some(t.clone());
133        }
134        for seg in &self.sealed_segments {
135            let cache_path = self
136                .config
137                .local_cache_dir
138                .join(format!("{}.seg", seg.segment_id));
139            if cache_path.exists() {
140                if let Ok(data) = std::fs::read_to_string(&cache_path) {
141                    if let Ok(map) = serde_json::from_str::<HashMap<u64, String>>(&data) {
142                        if let Some(t) = map.get(&doc_id) {
143                            self.cache_hits += 1;
144                            return Some(t.clone());
145                        }
146                    }
147                }
148            }
149            self.cache_misses += 1;
150        }
151        None
152    }
153
154    pub fn stats(&self) -> (u64, u64, usize, usize) {
155        (
156            self.cache_hits,
157            self.cache_misses,
158            self.delta_docs.len(),
159            self.sealed_segments.len(),
160        )
161    }
162}