Skip to main content

silk/
obslog.rs

1//! ObservationLog — append-only, TTL-pruned time-series store (D-025).
2//!
3//! The "log" half of Silk's log/KG duality (SA-014).
4//! Stores raw observations (health checks, metrics, container status).
5//! Local-only — never syncs between instances. TTL-pruned — entries older
6//! than `max_age_secs` are deleted on truncate().
7//!
8//! Backed by redb in a separate file from GraphStore.
9
10use std::collections::BTreeMap;
11use std::path::Path;
12use std::time::{SystemTime, UNIX_EPOCH};
13
14use redb::{Database, ReadableTable, TableDefinition};
15
16/// Observations table: (source, timestamp_ms) → msgpack(value, metadata).
17/// Using a regular table with a compound key encoded as bytes.
18const OBS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("observations");
19
20/// Meta table for bookkeeping.
21const OBS_META: TableDefinition<&str, &[u8]> = TableDefinition::new("obs_meta");
22
/// A single observation record, as returned by `ObservationLog` queries.
///
/// Derives `PartialEq` so callers and tests can compare records directly. `Eq` and `Hash`
/// are not derivable because `value` is an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct Observation {
    /// Wall-clock milliseconds since the Unix epoch at which the observation was stored.
    pub timestamp_ms: u64,
    /// Name of the series this observation belongs to (e.g. `"health.claro"`).
    pub source: String,
    /// The numeric reading.
    pub value: f64,
    /// Free-form string annotations supplied at append time.
    pub metadata: BTreeMap<String, String>,
}
31
32/// Serialized form stored in redb value.
33#[derive(serde::Serialize, serde::Deserialize)]
34struct ObsValue {
35    value: f64,
36    metadata: BTreeMap<String, String>,
37}
38
39/// Compound key: source + timestamp_ms, encoded for lexicographic ordering.
40/// Format: [source_len(2 bytes)][source bytes][timestamp_ms(8 bytes BE)]
41/// S-13: returns error instead of silently truncating source names > 65535 bytes.
42fn encode_key(source: &str, timestamp_ms: u64) -> Result<Vec<u8>, ObsLogError> {
43    let src = source.as_bytes();
44    if src.len() > u16::MAX as usize {
45        return Err(ObsLogError::Io(format!(
46            "source name too long: {} bytes (max {})",
47            src.len(),
48            u16::MAX
49        )));
50    }
51    let mut key = Vec::with_capacity(2 + src.len() + 8);
52    key.extend_from_slice(&(src.len() as u16).to_be_bytes());
53    key.extend_from_slice(src);
54    key.extend_from_slice(&timestamp_ms.to_be_bytes());
55    Ok(key)
56}
57
/// Inverse of `encode_key`: split a compound key into `(source, timestamp_ms)`.
///
/// Returns `None` when `key` is too short for its own length prefix plus the 8-byte
/// timestamp. Bytes after the timestamp are ignored. A source that is not valid UTF-8
/// is decoded lossily, with invalid sequences becoming U+FFFD.
fn decode_key(key: &[u8]) -> Option<(String, u64)> {
    let prefix = key.get(..2)?;
    let name_end = 2 + usize::from(u16::from_be_bytes([prefix[0], prefix[1]]));

    let name = key.get(2..name_end)?;
    let ts_slice = key.get(name_end..name_end + 8)?;

    let mut ts_be = [0u8; 8];
    ts_be.copy_from_slice(ts_slice);
    Some((
        String::from_utf8_lossy(name).into_owned(),
        u64::from_be_bytes(ts_be),
    ))
}
72
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
79
/// Errors returned by `ObservationLog` operations.
///
/// Each variant carries the underlying error rendered as text. Deriving `Clone`,
/// `PartialEq` and `Eq` lets callers and tests compare or keep copies of errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObsLogError {
    /// Storage-level failure: redb open/transaction/table/commit errors. It is also used
    /// when a source name is too long to be encoded into a key.
    Io(String),
    /// A stored value could not be encoded to or decoded from MessagePack.
    Serialization(String),
}

impl std::fmt::Display for ObsLogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObsLogError::Io(s) => write!(f, "ObsLog I/O error: {}", s),
            ObsLogError::Serialization(s) => write!(f, "ObsLog serialization error: {}", s),
        }
    }
}

impl std::error::Error for ObsLogError {}
96
97/// Append-only, TTL-pruned observation log backed by redb.
98pub struct ObservationLog {
99    db: Database,
100    pub max_age_secs: u64,
101}
102
103impl ObservationLog {
104    /// Open or create an observation log at the given path.
105    pub fn open(path: &Path, max_age_secs: u64) -> Result<Self, ObsLogError> {
106        let db = Database::create(path).map_err(|e| ObsLogError::Io(e.to_string()))?;
107
108        // S-09: restrict file permissions to owner-only on Unix
109        #[cfg(unix)]
110        {
111            use std::os::unix::fs::PermissionsExt;
112            let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
113        }
114
115        // Ensure tables exist.
116        {
117            let txn = db
118                .begin_write()
119                .map_err(|e| ObsLogError::Io(e.to_string()))?;
120            {
121                let _t = txn
122                    .open_table(OBS_TABLE)
123                    .map_err(|e| ObsLogError::Io(e.to_string()))?;
124                let _m = txn
125                    .open_table(OBS_META)
126                    .map_err(|e| ObsLogError::Io(e.to_string()))?;
127            }
128            txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
129        }
130
131        Ok(Self { db, max_age_secs })
132    }
133
134    /// Append a single observation.
135    pub fn append(
136        &self,
137        source: &str,
138        value: f64,
139        metadata: BTreeMap<String, String>,
140    ) -> Result<(), ObsLogError> {
141        let ts = now_ms();
142        let key = encode_key(source, ts)?;
143        let obs = ObsValue { value, metadata };
144        let val = rmp_serde::to_vec(&obs).map_err(|e| ObsLogError::Serialization(e.to_string()))?;
145
146        let txn = self
147            .db
148            .begin_write()
149            .map_err(|e| ObsLogError::Io(e.to_string()))?;
150        {
151            let mut table = txn
152                .open_table(OBS_TABLE)
153                .map_err(|e| ObsLogError::Io(e.to_string()))?;
154            table
155                .insert(key.as_slice(), val.as_slice())
156                .map_err(|e| ObsLogError::Io(e.to_string()))?;
157        }
158        txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
159        Ok(())
160    }
161
162    /// Append a batch of observations in a single transaction.
163    pub fn append_batch(
164        &self,
165        observations: &[(String, f64, BTreeMap<String, String>)],
166    ) -> Result<(), ObsLogError> {
167        let ts = now_ms();
168        let txn = self
169            .db
170            .begin_write()
171            .map_err(|e| ObsLogError::Io(e.to_string()))?;
172        {
173            let mut table = txn
174                .open_table(OBS_TABLE)
175                .map_err(|e| ObsLogError::Io(e.to_string()))?;
176            for (i, (source, value, metadata)) in observations.iter().enumerate() {
177                // Offset each by 1ms to ensure unique keys within batch
178                let key = encode_key(source, ts + i as u64)?;
179                let obs = ObsValue {
180                    value: *value,
181                    metadata: metadata.clone(),
182                };
183                let val = rmp_serde::to_vec(&obs)
184                    .map_err(|e| ObsLogError::Serialization(e.to_string()))?;
185                table
186                    .insert(key.as_slice(), val.as_slice())
187                    .map_err(|e| ObsLogError::Io(e.to_string()))?;
188            }
189        }
190        txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
191        Ok(())
192    }
193
194    /// Query observations for a source since a given timestamp.
195    pub fn query(&self, source: &str, since_ts_ms: u64) -> Result<Vec<Observation>, ObsLogError> {
196        let start = encode_key(source, since_ts_ms)?;
197        let end = encode_key(source, u64::MAX)?;
198
199        let txn = self
200            .db
201            .begin_read()
202            .map_err(|e| ObsLogError::Io(e.to_string()))?;
203        let table = txn
204            .open_table(OBS_TABLE)
205            .map_err(|e| ObsLogError::Io(e.to_string()))?;
206
207        let mut results = Vec::new();
208        let range = table
209            .range(start.as_slice()..=end.as_slice())
210            .map_err(|e| ObsLogError::Io(e.to_string()))?;
211
212        for entry in range {
213            let (k, v) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
214            let key_bytes = k.value();
215            let val_bytes = v.value();
216
217            if let Some((src, ts)) = decode_key(key_bytes) {
218                if src == source {
219                    let obs: ObsValue = rmp_serde::from_slice(val_bytes)
220                        .map_err(|e| ObsLogError::Serialization(e.to_string()))?;
221                    results.push(Observation {
222                        timestamp_ms: ts,
223                        source: src,
224                        value: obs.value,
225                        metadata: obs.metadata,
226                    });
227                }
228            }
229        }
230        Ok(results)
231    }
232
233    /// Get the most recent observation for a source.
234    pub fn query_latest(&self, source: &str) -> Result<Option<Observation>, ObsLogError> {
235        let start = encode_key(source, 0)?;
236        let end = encode_key(source, u64::MAX)?;
237
238        let txn = self
239            .db
240            .begin_read()
241            .map_err(|e| ObsLogError::Io(e.to_string()))?;
242        let table = txn
243            .open_table(OBS_TABLE)
244            .map_err(|e| ObsLogError::Io(e.to_string()))?;
245
246        let range = table
247            .range(start.as_slice()..=end.as_slice())
248            .map_err(|e| ObsLogError::Io(e.to_string()))?;
249
250        let mut latest: Option<Observation> = None;
251        for entry in range {
252            let (k, v) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
253            if let Some((src, ts)) = decode_key(k.value()) {
254                if src == source {
255                    let obs: ObsValue = rmp_serde::from_slice(v.value())
256                        .map_err(|e| ObsLogError::Serialization(e.to_string()))?;
257                    latest = Some(Observation {
258                        timestamp_ms: ts,
259                        source: src,
260                        value: obs.value,
261                        metadata: obs.metadata,
262                    });
263                }
264            }
265        }
266        Ok(latest)
267    }
268
269    /// List distinct sources that have observations.
270    pub fn sources(&self) -> Result<Vec<String>, ObsLogError> {
271        let txn = self
272            .db
273            .begin_read()
274            .map_err(|e| ObsLogError::Io(e.to_string()))?;
275        let table = txn
276            .open_table(OBS_TABLE)
277            .map_err(|e| ObsLogError::Io(e.to_string()))?;
278
279        let mut seen = std::collections::BTreeSet::new();
280        let range = table.iter().map_err(|e| ObsLogError::Io(e.to_string()))?;
281
282        for entry in range {
283            let (k, _) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
284            if let Some((src, _)) = decode_key(k.value()) {
285                seen.insert(src);
286            }
287        }
288        Ok(seen.into_iter().collect())
289    }
290
291    /// Delete all observations older than `before_ts_ms`. Returns count deleted.
292    pub fn truncate(&self, before_ts_ms: u64) -> Result<u64, ObsLogError> {
293        let txn = self
294            .db
295            .begin_write()
296            .map_err(|e| ObsLogError::Io(e.to_string()))?;
297        let mut deleted = 0u64;
298        {
299            let mut table = txn
300                .open_table(OBS_TABLE)
301                .map_err(|e| ObsLogError::Io(e.to_string()))?;
302
303            // Collect keys to delete (can't delete while iterating).
304            let mut to_delete = Vec::new();
305            {
306                let range = table.iter().map_err(|e| ObsLogError::Io(e.to_string()))?;
307                for entry in range {
308                    let (k, _) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
309                    if let Some((_, ts)) = decode_key(k.value()) {
310                        if ts < before_ts_ms {
311                            to_delete.push(k.value().to_vec());
312                        }
313                    }
314                }
315            }
316
317            for key in &to_delete {
318                table
319                    .remove(key.as_slice())
320                    .map_err(|e| ObsLogError::Io(e.to_string()))?;
321                deleted += 1;
322            }
323        }
324        txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
325        Ok(deleted)
326    }
327
328    /// Total number of observations.
329    pub fn count(&self) -> Result<u64, ObsLogError> {
330        let txn = self
331            .db
332            .begin_read()
333            .map_err(|e| ObsLogError::Io(e.to_string()))?;
334        let table = txn
335            .open_table(OBS_TABLE)
336            .map_err(|e| ObsLogError::Io(e.to_string()))?;
337        let mut count = 0u64;
338        let iter = table.iter().map_err(|e| ObsLogError::Io(e.to_string()))?;
339        for _ in iter {
340            count += 1;
341        }
342        Ok(count)
343    }
344
345    /// Size of the redb file in bytes.
346    pub fn size_bytes(&self) -> u64 {
347        // redb doesn't expose file size directly; we approximate from metadata
348        // or use the file system
349        0 // Caller should use std::fs::metadata on the path
350    }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    /// Open a fresh log (86400 s retention) in its own temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the log, so the directory is deleted
    /// when the test ends instead of being leaked. Bind the guard to a named variable
    /// such as `_dir`; binding it to `_` would drop (and delete) it immediately.
    fn temp_log() -> (tempfile::TempDir, ObservationLog) {
        let dir = tempfile::tempdir().unwrap();
        let log = ObservationLog::open(&dir.path().join("test_obs.redb"), 86400).unwrap();
        (dir, log)
    }

    #[test]
    fn test_append_and_query() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        // Keep the two appends in different milliseconds so their timestamps differ.
        sleep(Duration::from_millis(2));
        log.append("health.claro", 500.0, BTreeMap::new()).unwrap();

        let all = log.query("health.claro", 0).unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(all[0].value, 200.0);
        assert_eq!(all[1].value, 500.0);
    }

    #[test]
    fn test_query_since_filters_older() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        sleep(Duration::from_millis(5));
        let cutoff = now_ms();
        sleep(Duration::from_millis(5));
        log.append("health.claro", 201.0, BTreeMap::new()).unwrap();

        let recent = log.query("health.claro", cutoff).unwrap();
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].value, 201.0);
        assert!(recent[0].timestamp_ms >= cutoff);
    }

    #[test]
    fn test_query_latest() {
        let (_dir, log) = temp_log();
        log.append("metrics.cpu", 45.0, BTreeMap::new()).unwrap();
        sleep(Duration::from_millis(2));
        log.append("metrics.cpu", 67.0, BTreeMap::new()).unwrap();

        let latest = log.query_latest("metrics.cpu").unwrap().unwrap();
        assert_eq!(latest.value, 67.0);
    }

    #[test]
    fn test_query_latest_empty() {
        let (_dir, log) = temp_log();
        assert!(log.query_latest("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_sources() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        log.append("health.colibri", 200.0, BTreeMap::new()).unwrap();
        log.append("metrics.cpu", 45.0, BTreeMap::new()).unwrap();

        let sources = log.sources().unwrap();
        assert_eq!(
            sources,
            vec!["health.claro", "health.colibri", "metrics.cpu"]
        );
    }

    #[test]
    fn test_truncate() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        sleep(Duration::from_millis(50));
        let cutoff = now_ms();
        sleep(Duration::from_millis(50));
        log.append("health.claro", 201.0, BTreeMap::new()).unwrap();

        let deleted = log.truncate(cutoff).unwrap();
        assert_eq!(deleted, 1);

        let remaining = log.query("health.claro", 0).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].value, 201.0);
    }

    #[test]
    fn test_count() {
        let (_dir, log) = temp_log();
        assert_eq!(log.count().unwrap(), 0);
        log.append("a", 1.0, BTreeMap::new()).unwrap();
        log.append("b", 2.0, BTreeMap::new()).unwrap();
        assert_eq!(log.count().unwrap(), 2);
    }

    #[test]
    fn test_metadata() {
        let (_dir, log) = temp_log();
        let mut meta = BTreeMap::new();
        meta.insert("status_text".to_string(), "OK".to_string());
        meta.insert("response_ms".to_string(), "45".to_string());
        log.append("health.claro", 200.0, meta).unwrap();

        let obs = log.query_latest("health.claro").unwrap().unwrap();
        assert_eq!(obs.metadata.get("status_text").unwrap(), "OK");
        assert_eq!(obs.metadata.get("response_ms").unwrap(), "45");
    }

    #[test]
    fn test_batch_append() {
        let (_dir, log) = temp_log();
        let batch = vec![
            ("health.a".to_string(), 200.0, BTreeMap::new()),
            ("health.b".to_string(), 200.0, BTreeMap::new()),
            ("metrics.cpu".to_string(), 55.0, BTreeMap::new()),
        ];
        log.append_batch(&batch).unwrap();
        assert_eq!(log.count().unwrap(), 3);
        assert_eq!(log.sources().unwrap().len(), 3);
    }

    #[test]
    fn test_isolation_between_sources() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        log.append("health.colibri", 500.0, BTreeMap::new()).unwrap();

        let claro = log.query("health.claro", 0).unwrap();
        assert_eq!(claro.len(), 1);
        assert_eq!(claro[0].value, 200.0);

        let colibri = log.query("health.colibri", 0).unwrap();
        assert_eq!(colibri.len(), 1);
        assert_eq!(colibri[0].value, 500.0);
    }

    #[test]
    fn test_source_name_length_limit() {
        let (_dir, log) = temp_log();
        let too_long = "x".repeat(u16::MAX as usize + 1);
        assert!(log.append(&too_long, 1.0, BTreeMap::new()).is_err());
        assert!(log.query(&too_long, 0).is_err());
        assert_eq!(log.count().unwrap(), 0);
    }

    #[test]
    fn test_key_roundtrip() {
        let key = encode_key("health.claro", 1234).unwrap();
        assert_eq!(decode_key(&key), Some(("health.claro".to_string(), 1234)));
        // A truncated key is rejected rather than misread.
        assert_eq!(decode_key(&key[..key.len() - 1]), None);
    }
}