1use std::collections::BTreeMap;
11use std::path::Path;
12use std::time::{SystemTime, UNIX_EPOCH};
13
14use redb::{Database, ReadableTable, TableDefinition};
15
/// Table holding the observation records. Keys are the byte strings produced by `encode_key`
/// (length-prefixed source name followed by a big-endian millisecond timestamp); values are the
/// `rmp_serde`-encoded `ObsValue` payloads.
const OBS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("observations");

/// String-keyed metadata table. It is created when the log is opened; none of the code visible
/// in this file reads or writes entries in it.
const OBS_META: TableDefinition<&str, &[u8]> = TableDefinition::new("obs_meta");
22
/// A single reading returned by the query methods of the observation log.
///
/// `PartialEq` is derived so observations can be compared directly (for example with
/// `assert_eq!`). `Eq` is intentionally not derived because `value` is an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct Observation {
    /// Time of the reading, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// Name of the source the reading was recorded under.
    pub source: String,
    /// Numeric value of the reading.
    pub value: f64,
    /// String key/value annotations stored alongside the reading.
    pub metadata: BTreeMap<String, String>,
}
31
/// Serialized payload stored as the value of each observations-table entry.
///
/// The source name and timestamp are not part of this payload; they are carried by the
/// entry's key.
#[derive(serde::Serialize, serde::Deserialize)]
struct ObsValue {
    /// Numeric value of the reading.
    value: f64,
    /// String key/value annotations stored alongside the reading.
    metadata: BTreeMap<String, String>,
}
38
39fn encode_key(source: &str, timestamp_ms: u64) -> Result<Vec<u8>, ObsLogError> {
43 let src = source.as_bytes();
44 if src.len() > u16::MAX as usize {
45 return Err(ObsLogError::Io(format!(
46 "source name too long: {} bytes (max {})",
47 src.len(),
48 u16::MAX
49 )));
50 }
51 let mut key = Vec::with_capacity(2 + src.len() + 8);
52 key.extend_from_slice(&(src.len() as u16).to_be_bytes());
53 key.extend_from_slice(src);
54 key.extend_from_slice(×tamp_ms.to_be_bytes());
55 Ok(key)
56}
57
/// Splits a table key back into its source name and millisecond timestamp.
///
/// Expects the layout `[u16 BE source length][source bytes][u64 BE timestamp]`. Returns `None`
/// when the key is shorter than that layout requires. Bytes after the timestamp are ignored,
/// and source bytes that are not valid UTF-8 are replaced lossily.
fn decode_key(key: &[u8]) -> Option<(String, u64)> {
    const LEN_PREFIX: usize = 2;
    const TS_WIDTH: usize = 8;

    // Even an empty source name needs the length prefix plus a full timestamp.
    if key.len() < LEN_PREFIX + TS_WIDTH {
        return None;
    }

    let (prefix, rest) = key.split_at(LEN_PREFIX);
    let name_len = usize::from(u16::from_be_bytes([prefix[0], prefix[1]]));

    // `get` yields `None` whenever the declared name length overruns the key.
    let name_bytes = rest.get(..name_len)?;
    let stamp_bytes = rest.get(name_len..name_len + TS_WIDTH)?;

    let mut stamp = [0u8; TS_WIDTH];
    stamp.copy_from_slice(stamp_bytes);

    let name = String::from_utf8_lossy(name_bytes).into_owned();
    Some((name, u64::from_be_bytes(stamp)))
}
72
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
79
/// Errors reported by the observation log.
///
/// Both variants carry the already-formatted message of the underlying failure.
#[derive(Debug)]
pub enum ObsLogError {
    /// A storage-layer failure, or an input rejected before it reached storage.
    Io(String),
    /// A payload could not be encoded or decoded.
    Serialization(String),
}

impl std::error::Error for ObsLogError {}

impl std::fmt::Display for ObsLogError {
    /// Writes a fixed prefix naming the error class, followed by the carried message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(detail) => write!(f, "ObsLog I/O error: {}", detail),
            Self::Serialization(detail) => write!(f, "ObsLog serialization error: {}", detail),
        }
    }
}
96
97pub struct ObservationLog {
99 db: Database,
100 pub max_age_secs: u64,
101}
102
103impl ObservationLog {
104 pub fn open(path: &Path, max_age_secs: u64) -> Result<Self, ObsLogError> {
106 let db = Database::create(path).map_err(|e| ObsLogError::Io(e.to_string()))?;
107
108 #[cfg(unix)]
110 {
111 use std::os::unix::fs::PermissionsExt;
112 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
113 }
114
115 {
117 let txn = db
118 .begin_write()
119 .map_err(|e| ObsLogError::Io(e.to_string()))?;
120 {
121 let _t = txn
122 .open_table(OBS_TABLE)
123 .map_err(|e| ObsLogError::Io(e.to_string()))?;
124 let _m = txn
125 .open_table(OBS_META)
126 .map_err(|e| ObsLogError::Io(e.to_string()))?;
127 }
128 txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
129 }
130
131 Ok(Self { db, max_age_secs })
132 }
133
134 pub fn append(
136 &self,
137 source: &str,
138 value: f64,
139 metadata: BTreeMap<String, String>,
140 ) -> Result<(), ObsLogError> {
141 let ts = now_ms();
142 let key = encode_key(source, ts)?;
143 let obs = ObsValue { value, metadata };
144 let val = rmp_serde::to_vec(&obs).map_err(|e| ObsLogError::Serialization(e.to_string()))?;
145
146 let txn = self
147 .db
148 .begin_write()
149 .map_err(|e| ObsLogError::Io(e.to_string()))?;
150 {
151 let mut table = txn
152 .open_table(OBS_TABLE)
153 .map_err(|e| ObsLogError::Io(e.to_string()))?;
154 table
155 .insert(key.as_slice(), val.as_slice())
156 .map_err(|e| ObsLogError::Io(e.to_string()))?;
157 }
158 txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
159 Ok(())
160 }
161
162 pub fn append_batch(
164 &self,
165 observations: &[(String, f64, BTreeMap<String, String>)],
166 ) -> Result<(), ObsLogError> {
167 let ts = now_ms();
168 let txn = self
169 .db
170 .begin_write()
171 .map_err(|e| ObsLogError::Io(e.to_string()))?;
172 {
173 let mut table = txn
174 .open_table(OBS_TABLE)
175 .map_err(|e| ObsLogError::Io(e.to_string()))?;
176 for (i, (source, value, metadata)) in observations.iter().enumerate() {
177 let key = encode_key(source, ts + i as u64)?;
179 let obs = ObsValue {
180 value: *value,
181 metadata: metadata.clone(),
182 };
183 let val = rmp_serde::to_vec(&obs)
184 .map_err(|e| ObsLogError::Serialization(e.to_string()))?;
185 table
186 .insert(key.as_slice(), val.as_slice())
187 .map_err(|e| ObsLogError::Io(e.to_string()))?;
188 }
189 }
190 txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
191 Ok(())
192 }
193
194 pub fn query(&self, source: &str, since_ts_ms: u64) -> Result<Vec<Observation>, ObsLogError> {
196 let start = encode_key(source, since_ts_ms)?;
197 let end = encode_key(source, u64::MAX)?;
198
199 let txn = self
200 .db
201 .begin_read()
202 .map_err(|e| ObsLogError::Io(e.to_string()))?;
203 let table = txn
204 .open_table(OBS_TABLE)
205 .map_err(|e| ObsLogError::Io(e.to_string()))?;
206
207 let mut results = Vec::new();
208 let range = table
209 .range(start.as_slice()..=end.as_slice())
210 .map_err(|e| ObsLogError::Io(e.to_string()))?;
211
212 for entry in range {
213 let (k, v) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
214 let key_bytes = k.value();
215 let val_bytes = v.value();
216
217 if let Some((src, ts)) = decode_key(key_bytes) {
218 if src == source {
219 let obs: ObsValue = rmp_serde::from_slice(val_bytes)
220 .map_err(|e| ObsLogError::Serialization(e.to_string()))?;
221 results.push(Observation {
222 timestamp_ms: ts,
223 source: src,
224 value: obs.value,
225 metadata: obs.metadata,
226 });
227 }
228 }
229 }
230 Ok(results)
231 }
232
233 pub fn query_latest(&self, source: &str) -> Result<Option<Observation>, ObsLogError> {
235 let start = encode_key(source, 0)?;
236 let end = encode_key(source, u64::MAX)?;
237
238 let txn = self
239 .db
240 .begin_read()
241 .map_err(|e| ObsLogError::Io(e.to_string()))?;
242 let table = txn
243 .open_table(OBS_TABLE)
244 .map_err(|e| ObsLogError::Io(e.to_string()))?;
245
246 let range = table
247 .range(start.as_slice()..=end.as_slice())
248 .map_err(|e| ObsLogError::Io(e.to_string()))?;
249
250 let mut latest: Option<Observation> = None;
251 for entry in range {
252 let (k, v) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
253 if let Some((src, ts)) = decode_key(k.value()) {
254 if src == source {
255 let obs: ObsValue = rmp_serde::from_slice(v.value())
256 .map_err(|e| ObsLogError::Serialization(e.to_string()))?;
257 latest = Some(Observation {
258 timestamp_ms: ts,
259 source: src,
260 value: obs.value,
261 metadata: obs.metadata,
262 });
263 }
264 }
265 }
266 Ok(latest)
267 }
268
269 pub fn sources(&self) -> Result<Vec<String>, ObsLogError> {
271 let txn = self
272 .db
273 .begin_read()
274 .map_err(|e| ObsLogError::Io(e.to_string()))?;
275 let table = txn
276 .open_table(OBS_TABLE)
277 .map_err(|e| ObsLogError::Io(e.to_string()))?;
278
279 let mut seen = std::collections::BTreeSet::new();
280 let range = table.iter().map_err(|e| ObsLogError::Io(e.to_string()))?;
281
282 for entry in range {
283 let (k, _) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
284 if let Some((src, _)) = decode_key(k.value()) {
285 seen.insert(src);
286 }
287 }
288 Ok(seen.into_iter().collect())
289 }
290
291 pub fn truncate(&self, before_ts_ms: u64) -> Result<u64, ObsLogError> {
293 let txn = self
294 .db
295 .begin_write()
296 .map_err(|e| ObsLogError::Io(e.to_string()))?;
297 let mut deleted = 0u64;
298 {
299 let mut table = txn
300 .open_table(OBS_TABLE)
301 .map_err(|e| ObsLogError::Io(e.to_string()))?;
302
303 let mut to_delete = Vec::new();
305 {
306 let range = table.iter().map_err(|e| ObsLogError::Io(e.to_string()))?;
307 for entry in range {
308 let (k, _) = entry.map_err(|e| ObsLogError::Io(e.to_string()))?;
309 if let Some((_, ts)) = decode_key(k.value()) {
310 if ts < before_ts_ms {
311 to_delete.push(k.value().to_vec());
312 }
313 }
314 }
315 }
316
317 for key in &to_delete {
318 table
319 .remove(key.as_slice())
320 .map_err(|e| ObsLogError::Io(e.to_string()))?;
321 deleted += 1;
322 }
323 }
324 txn.commit().map_err(|e| ObsLogError::Io(e.to_string()))?;
325 Ok(deleted)
326 }
327
328 pub fn count(&self) -> Result<u64, ObsLogError> {
330 let txn = self
331 .db
332 .begin_read()
333 .map_err(|e| ObsLogError::Io(e.to_string()))?;
334 let table = txn
335 .open_table(OBS_TABLE)
336 .map_err(|e| ObsLogError::Io(e.to_string()))?;
337 let mut count = 0u64;
338 let iter = table.iter().map_err(|e| ObsLogError::Io(e.to_string()))?;
339 for _ in iter {
340 count += 1;
341 }
342 Ok(count)
343 }
344
345 pub fn size_bytes(&self) -> u64 {
347 0 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    /// Opens a fresh log inside its own temporary directory.
    ///
    /// The `TempDir` guard is returned together with the log: bind it (for example as `_dir`)
    /// for the duration of the test. The directory is deleted when the guard drops, so test
    /// runs no longer leave database files behind.
    fn temp_log() -> (tempfile::TempDir, ObservationLog) {
        let dir = tempfile::tempdir().unwrap();
        let log = ObservationLog::open(&dir.path().join("test_obs.redb"), 86400).unwrap();
        (dir, log)
    }

    #[test]
    fn test_append_and_query() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        // Keep the two appends in different milliseconds so they get distinct keys.
        sleep(Duration::from_millis(2));
        log.append("health.claro", 500.0, BTreeMap::new()).unwrap();

        let all = log.query("health.claro", 0).unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(all[0].value, 200.0);
        assert_eq!(all[1].value, 500.0);
    }

    #[test]
    fn test_query_latest() {
        let (_dir, log) = temp_log();
        log.append("metrics.cpu", 45.0, BTreeMap::new()).unwrap();
        sleep(Duration::from_millis(2));
        log.append("metrics.cpu", 67.0, BTreeMap::new()).unwrap();

        let latest = log.query_latest("metrics.cpu").unwrap().unwrap();
        assert_eq!(latest.value, 67.0);
    }

    #[test]
    fn test_query_latest_empty() {
        let (_dir, log) = temp_log();
        assert!(log.query_latest("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_sources() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        log.append("health.colibri", 200.0, BTreeMap::new())
            .unwrap();
        log.append("metrics.cpu", 45.0, BTreeMap::new()).unwrap();

        let sources = log.sources().unwrap();
        assert_eq!(
            sources,
            vec!["health.claro", "health.colibri", "metrics.cpu"]
        );
    }

    #[test]
    fn test_truncate() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        // The cutoff is taken strictly between the two appends.
        sleep(Duration::from_millis(50));
        let cutoff = now_ms();
        sleep(Duration::from_millis(50));
        log.append("health.claro", 201.0, BTreeMap::new()).unwrap();

        let deleted = log.truncate(cutoff).unwrap();
        assert_eq!(deleted, 1);

        let remaining = log.query("health.claro", 0).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].value, 201.0);
    }

    #[test]
    fn test_count() {
        let (_dir, log) = temp_log();
        assert_eq!(log.count().unwrap(), 0);
        log.append("a", 1.0, BTreeMap::new()).unwrap();
        log.append("b", 2.0, BTreeMap::new()).unwrap();
        assert_eq!(log.count().unwrap(), 2);
    }

    #[test]
    fn test_metadata() {
        let (_dir, log) = temp_log();
        let mut meta = BTreeMap::new();
        meta.insert("status_text".to_string(), "OK".to_string());
        meta.insert("response_ms".to_string(), "45".to_string());
        log.append("health.claro", 200.0, meta).unwrap();

        let obs = log.query_latest("health.claro").unwrap().unwrap();
        assert_eq!(obs.metadata.get("status_text").unwrap(), "OK");
        assert_eq!(obs.metadata.get("response_ms").unwrap(), "45");
    }

    #[test]
    fn test_batch_append() {
        let (_dir, log) = temp_log();
        let batch = vec![
            ("health.a".to_string(), 200.0, BTreeMap::new()),
            ("health.b".to_string(), 200.0, BTreeMap::new()),
            ("metrics.cpu".to_string(), 55.0, BTreeMap::new()),
        ];
        log.append_batch(&batch).unwrap();
        assert_eq!(log.count().unwrap(), 3);
        assert_eq!(log.sources().unwrap().len(), 3);
    }

    #[test]
    fn test_isolation_between_sources() {
        let (_dir, log) = temp_log();
        log.append("health.claro", 200.0, BTreeMap::new()).unwrap();
        log.append("health.colibri", 500.0, BTreeMap::new())
            .unwrap();

        let claro = log.query("health.claro", 0).unwrap();
        assert_eq!(claro.len(), 1);
        assert_eq!(claro[0].value, 200.0);

        let colibri = log.query("health.colibri", 0).unwrap();
        assert_eq!(colibri.len(), 1);
        assert_eq!(colibri[0].value, 500.0);
    }
}