// sochdb_storage/object_store_tier.rs
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use thiserror::Error;

11#[derive(Debug, Error)]
12pub enum ObjectStoreError {
13 #[error("segment not found: {0}")]
14 SegmentNotFound(String),
15 #[error("manifest error: {0}")]
16 Manifest(String),
17 #[error("io error: {0}")]
18 Io(#[from] std::io::Error),
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SegmentDescriptor {
23 pub segment_id: String,
24 pub object_uri: String,
25 pub checksum: u64,
26 pub doc_count: u64,
27 pub byte_size: u64,
28 pub sealed_at: u64,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ObjectStoreTierConfig {
33 pub bucket_uri: String,
34 pub local_cache_dir: PathBuf,
35 pub seal_threshold_docs: usize,
36 pub max_cache_segments: usize,
37}
38
39impl Default for ObjectStoreTierConfig {
40 fn default() -> Self {
41 Self {
42 bucket_uri: "s3://sochdb-segments".to_string(),
43 local_cache_dir: PathBuf::from("./sochdb-cache"),
44 seal_threshold_docs: 10_000,
45 max_cache_segments: 64,
46 }
47 }
48}
49
50pub struct ObjectStoreTier {
52 config: ObjectStoreTierConfig,
53 delta_docs: HashMap<u64, String>,
54 sealed_segments: Vec<SegmentDescriptor>,
55 cache_hits: u64,
56 cache_misses: u64,
57}
58
59impl ObjectStoreTier {
60 pub fn new(config: ObjectStoreTierConfig) -> Self {
61 Self {
62 config,
63 delta_docs: HashMap::new(),
64 sealed_segments: Vec::new(),
65 cache_hits: 0,
66 cache_misses: 0,
67 }
68 }
69
70 pub fn insert_delta(&mut self, doc_id: u64, text: String) {
71 self.delta_docs.insert(doc_id, text);
72 if self.delta_docs.len() >= self.config.seal_threshold_docs {
73 let _ = self.seal_current_delta();
74 }
75 }
76
77 pub fn seal_current_delta(&mut self) -> Result<SegmentDescriptor, ObjectStoreError> {
78 if self.delta_docs.is_empty() {
79 return Err(ObjectStoreError::SegmentNotFound("empty delta".into()));
80 }
81 let segment_id = format!("seg-{}", self.sealed_segments.len());
82 let doc_count = self.delta_docs.len() as u64;
83 let byte_size: u64 = self.delta_docs.values().map(|t| t.len() as u64).sum();
84 let mut payload_bytes = Vec::new();
85 for (k, v) in &self.delta_docs {
86 payload_bytes.extend_from_slice(format!("{k}:{v}").as_bytes());
87 }
88 let checksum = crc32fast::hash(&payload_bytes) as u64;
89
90 let desc = SegmentDescriptor {
91 segment_id: segment_id.clone(),
92 object_uri: format!("{}/{}", self.config.bucket_uri, segment_id),
93 checksum,
94 doc_count,
95 byte_size,
96 sealed_at: std::time::SystemTime::now()
97 .duration_since(std::time::UNIX_EPOCH)
98 .map(|d| d.as_secs())
99 .unwrap_or(0),
100 };
101
102 std::fs::create_dir_all(&self.config.local_cache_dir)?;
104 let cache_path = self
105 .config
106 .local_cache_dir
107 .join(format!("{segment_id}.seg"));
108 let payload = serde_json::to_string(&self.delta_docs).map_err(|e| {
109 ObjectStoreError::Manifest(e.to_string())
110 })?;
111 std::fs::write(&cache_path, payload)?;
112
113 self.sealed_segments.push(desc.clone());
114 self.delta_docs.clear();
115 Ok(desc)
116 }
117
118 pub fn hydrate_from_manifest(&mut self, manifest_path: &Path) -> Result<(), ObjectStoreError> {
119 if !manifest_path.exists() {
120 return Ok(());
121 }
122 let data = std::fs::read_to_string(manifest_path)?;
123 let segments: Vec<SegmentDescriptor> = serde_json::from_str(&data)
124 .map_err(|e| ObjectStoreError::Manifest(e.to_string()))?;
125 self.sealed_segments = segments;
126 Ok(())
127 }
128
129 pub fn lookup(&mut self, doc_id: u64) -> Option<String> {
130 if let Some(t) = self.delta_docs.get(&doc_id) {
131 self.cache_hits += 1;
132 return Some(t.clone());
133 }
134 for seg in &self.sealed_segments {
135 let cache_path = self
136 .config
137 .local_cache_dir
138 .join(format!("{}.seg", seg.segment_id));
139 if cache_path.exists() {
140 if let Ok(data) = std::fs::read_to_string(&cache_path) {
141 if let Ok(map) = serde_json::from_str::<HashMap<u64, String>>(&data) {
142 if let Some(t) = map.get(&doc_id) {
143 self.cache_hits += 1;
144 return Some(t.clone());
145 }
146 }
147 }
148 }
149 self.cache_misses += 1;
150 }
151 None
152 }
153
154 pub fn stats(&self) -> (u64, u64, usize, usize) {
155 (
156 self.cache_hits,
157 self.cache_misses,
158 self.delta_docs.len(),
159 self.sealed_segments.len(),
160 )
161 }
162}