// sochdb_storage/object_store_tier.rs

//! Object-storage-native cold tier for immutable index segments.
//!
//! Combines an in-memory, mutable delta with sealed, immutable segments stored
//! in S3/GCS and cached on local NVMe. On restart only the manifest is read, so
//! the full corpus does not have to be rebuilt.

use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};
use thiserror::Error;

11#[derive(Debug, Error)]
12pub enum ObjectStoreError {
13    #[error("segment not found: {0}")]
14    SegmentNotFound(String),
15    #[error("manifest error: {0}")]
16    Manifest(String),
17    #[error("io error: {0}")]
18    Io(#[from] std::io::Error),
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SegmentDescriptor {
23    pub segment_id: String,
24    pub object_uri: String,
25    pub checksum: u64,
26    pub doc_count: u64,
27    pub byte_size: u64,
28    pub sealed_at: u64,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ObjectStoreTierConfig {
33    pub bucket_uri: String,
34    pub local_cache_dir: PathBuf,
35    pub seal_threshold_docs: usize,
36    pub max_cache_segments: usize,
37}
38
39impl Default for ObjectStoreTierConfig {
40    fn default() -> Self {
41        Self {
42            bucket_uri: "s3://sochdb-segments".to_string(),
43            local_cache_dir: PathBuf::from("./sochdb-cache"),
44            seal_threshold_docs: 10_000,
45            max_cache_segments: 64,
46        }
47    }
48}
49
50/// LSM-style tier: in-memory delta + immutable object-store segments.
51pub struct ObjectStoreTier {
52    config: ObjectStoreTierConfig,
53    delta_docs: HashMap<u64, String>,
54    sealed_segments: Vec<SegmentDescriptor>,
55    cache_hits: u64,
56    cache_misses: u64,
57}
58
59impl ObjectStoreTier {
60    pub fn new(config: ObjectStoreTierConfig) -> Self {
61        Self {
62            config,
63            delta_docs: HashMap::new(),
64            sealed_segments: Vec::new(),
65            cache_hits: 0,
66            cache_misses: 0,
67        }
68    }
69
70    pub fn insert_delta(&mut self, doc_id: u64, text: String) {
71        self.delta_docs.insert(doc_id, text);
72        if self.delta_docs.len() >= self.config.seal_threshold_docs {
73            let _ = self.seal_current_delta();
74        }
75    }
76
77    pub fn seal_current_delta(&mut self) -> Result<SegmentDescriptor, ObjectStoreError> {
78        if self.delta_docs.is_empty() {
79            return Err(ObjectStoreError::SegmentNotFound("empty delta".into()));
80        }
81        let segment_id = format!("seg-{}", self.sealed_segments.len());
82        let doc_count = self.delta_docs.len() as u64;
83        let byte_size: u64 = self.delta_docs.values().map(|t| t.len() as u64).sum();
84        let mut payload_bytes = Vec::new();
85        for (k, v) in &self.delta_docs {
86            payload_bytes.extend_from_slice(format!("{k}:{v}").as_bytes());
87        }
88        let checksum = crc32fast::hash(&payload_bytes) as u64;
89
90        let desc = SegmentDescriptor {
91            segment_id: segment_id.clone(),
92            object_uri: format!("{}/{}", self.config.bucket_uri, segment_id),
93            checksum,
94            doc_count,
95            byte_size,
96            sealed_at: std::time::SystemTime::now()
97                .duration_since(std::time::UNIX_EPOCH)
98                .map(|d| d.as_secs())
99                .unwrap_or(0),
100        };
101
102        // Persist to local cache (object upload would happen async in production)
103        std::fs::create_dir_all(&self.config.local_cache_dir)?;
104        let cache_path = self
105            .config
106            .local_cache_dir
107            .join(format!("{segment_id}.seg"));
108        let payload = serde_json::to_string(&self.delta_docs)
109            .map_err(|e| ObjectStoreError::Manifest(e.to_string()))?;
110        std::fs::write(&cache_path, payload)?;
111
112        self.sealed_segments.push(desc.clone());
113        self.delta_docs.clear();
114        Ok(desc)
115    }
116
117    pub fn hydrate_from_manifest(&mut self, manifest_path: &Path) -> Result<(), ObjectStoreError> {
118        if !manifest_path.exists() {
119            return Ok(());
120        }
121        let data = std::fs::read_to_string(manifest_path)?;
122        let segments: Vec<SegmentDescriptor> =
123            serde_json::from_str(&data).map_err(|e| ObjectStoreError::Manifest(e.to_string()))?;
124        self.sealed_segments = segments;
125        Ok(())
126    }
127
128    pub fn lookup(&mut self, doc_id: u64) -> Option<String> {
129        if let Some(t) = self.delta_docs.get(&doc_id) {
130            self.cache_hits += 1;
131            return Some(t.clone());
132        }
133        for seg in &self.sealed_segments {
134            let cache_path = self
135                .config
136                .local_cache_dir
137                .join(format!("{}.seg", seg.segment_id));
138            if cache_path.exists() {
139                if let Ok(data) = std::fs::read_to_string(&cache_path) {
140                    if let Ok(map) = serde_json::from_str::<HashMap<u64, String>>(&data) {
141                        if let Some(t) = map.get(&doc_id) {
142                            self.cache_hits += 1;
143                            return Some(t.clone());
144                        }
145                    }
146                }
147            }
148            self.cache_misses += 1;
149        }
150        None
151    }
152
153    pub fn stats(&self) -> (u64, u64, usize, usize) {
154        (
155            self.cache_hits,
156            self.cache_misses,
157            self.delta_docs.len(),
158            self.sealed_segments.len(),
159        )
160    }
161}