reddb_server/storage/timeseries/log_pipeline.rs

//! Log-workload helpers built on top of the hypertable + retention
//! + continuous-aggregate primitives.
//!
//! This module gives application code a tight, typed surface to:
//!
//! * Batch-ingest structured log lines (`LogLine`) with explicit
//!   severity levels and label maps.
//! * Tail new lines since a timestamp watermark for streaming UIs.
//! * Enforce retention windows alongside the daemon that already
//!   ships in [`super::retention`].
//!
//! The underlying storage is the same [`super::hypertable::HypertableRegistry`];
//! physical records live in whatever store the caller plugs the
//! registry into. This module owns the log-shaped conveniences that
//! make "log workload" idiomatic without forcing every consumer to
//! hand-roll timestamp parsing + label hashing.
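//!
//! A minimal end-to-end sketch (illustrative only; the interval and
//! names are example values, not a fixed contract):
//!
//! ```ignore
//! let pipe = LogPipeline::new("access_log", "ts", "1h").expect("valid interval");
//! pipe.set_retention(7 * 24 * 3_600); // drop data older than a week
//! pipe.ingest_one(LogLine::now(LogSeverity::Info, "api", "GET /health 200"));
//! let fresh = pipe.tail_since(0); // stream everything buffered so far
//! ```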

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

use super::hypertable::{ChunkId, HypertableRegistry, HypertableSpec};
use super::retention::{RetentionBackend, RetentionPolicy, RetentionRegistry};

/// Severity levels mapped to a numeric rank that ascends with
/// severity (note: the reverse of raw syslog codes, where 0 is most
/// severe) so downstream queries can do `WHERE severity >= WARN`
/// cheaply.
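///
/// A sketch of the intended query shape (illustrative; `lines` is any
/// collection of `LogLine`):
///
/// ```ignore
/// // One integer comparison per line implements the threshold filter.
/// let alerts = lines.iter().filter(|l| l.severity >= LogSeverity::Warn);
/// ```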
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogSeverity {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Fatal = 5,
}

impl LogSeverity {
    pub fn as_i64(self) -> i64 {
        self as i64
    }

    pub fn token(self) -> &'static str {
        match self {
            LogSeverity::Trace => "TRACE",
            LogSeverity::Debug => "DEBUG",
            LogSeverity::Info => "INFO",
            LogSeverity::Warn => "WARN",
            LogSeverity::Error => "ERROR",
            LogSeverity::Fatal => "FATAL",
        }
    }

    pub fn from_token(token: &str) -> Option<LogSeverity> {
        match token.to_ascii_uppercase().as_str() {
            "TRACE" => Some(LogSeverity::Trace),
            "DEBUG" => Some(LogSeverity::Debug),
            "INFO" => Some(LogSeverity::Info),
            "WARN" | "WARNING" => Some(LogSeverity::Warn),
            "ERROR" | "ERR" => Some(LogSeverity::Error),
            "FATAL" | "CRITICAL" => Some(LogSeverity::Fatal),
            _ => None,
        }
    }
}

/// One structured log line. `labels` is the low-cardinality map
/// (`service`, `region`, `severity_str`); `numeric_fields` carries
/// typed extra payload (`latency_ms`, `status`, `bytes_out`). Keeping
/// them separate lets the codec layer pick `Dict` for labels and
/// `T64` / `Delta` for numeric fields.
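///
/// A sketch of the intended split (values illustrative):
///
/// ```ignore
/// let line = LogLine::now(LogSeverity::Info, "api", "GET /users 200")
///     .with_label("region", "us-east-1") // low-cardinality → Dict
///     .with_field("latency_ms", 12.5);   // numeric → T64 / Delta
/// ```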
#[derive(Debug, Clone)]
pub struct LogLine {
    pub ts_ns: u64,
    pub severity: LogSeverity,
    pub service: String,
    pub message: String,
    pub labels: HashMap<String, String>,
    pub numeric_fields: HashMap<String, f64>,
    /// Optional trace / span identifiers — wiring for graph-based
    /// span traversal comes later; the fields live here so the
    /// ingest pipe doesn't need to reshape when that lands.
    pub trace_id: Option<String>,
    pub span_id: Option<String>,
}

impl LogLine {
    pub fn now(
        severity: LogSeverity,
        service: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        let ts_ns = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0);
        Self {
            ts_ns,
            severity,
            service: service.into(),
            message: message.into(),
            labels: HashMap::new(),
            numeric_fields: HashMap::new(),
            trace_id: None,
            span_id: None,
        }
    }

    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    pub fn with_field(mut self, key: impl Into<String>, value: f64) -> Self {
        self.numeric_fields.insert(key.into(), value);
        self
    }

    pub fn with_trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
        self.trace_id = Some(trace_id.into());
        self.span_id = Some(span_id.into());
        self
    }
}

/// Stats surface for dashboards + health probes.
#[derive(Debug, Default, Clone)]
pub struct LogIngestStats {
    pub lines_ingested: u64,
    pub batches_flushed: u64,
    pub chunks_touched: u64,
    pub last_flush_unix_ns: u64,
}

/// Hypertable-backed log pipeline. Accepts batched writes, routes
/// each line to the owning chunk, and exposes retention + tail APIs.
#[derive(Clone)]
pub struct LogPipeline {
    name: String,
    hypertables: HypertableRegistry,
    retention: RetentionRegistry,
    stats: Arc<Mutex<LogIngestStats>>,
    /// Records tailed by `tail_since`. Kept in a small ring so the
    /// tail API can back a `WATCH` endpoint without re-scanning
    /// chunk metadata on every poll.
    recent: Arc<Mutex<Vec<LogLine>>>,
    recent_capacity: usize,
}

impl std::fmt::Debug for LogPipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LogPipeline")
            .field("name", &self.name)
            .field("recent_capacity", &self.recent_capacity)
            .finish()
    }
}

impl LogPipeline {
    /// Build a pipeline routing into `hypertable_name` with the
    /// given chunk interval (e.g. `"1d"`).
    pub fn new(
        hypertable_name: impl Into<String>,
        time_column: impl Into<String>,
        chunk_interval: &str,
    ) -> Option<Self> {
        let spec = HypertableSpec::from_interval_string(
            hypertable_name.into(),
            time_column.into(),
            chunk_interval,
        )?;
        let registry = HypertableRegistry::new();
        registry.register(spec.clone());
        Some(Self {
            name: spec.name,
            hypertables: registry,
            retention: RetentionRegistry::new(),
            stats: Arc::new(Mutex::new(LogIngestStats::default())),
            recent: Arc::new(Mutex::new(Vec::new())),
            recent_capacity: 4096,
        })
    }

    pub fn with_recent_capacity(mut self, capacity: usize) -> Self {
        self.recent_capacity = capacity.max(64);
        self
    }

    pub fn hypertables(&self) -> &HypertableRegistry {
        &self.hypertables
    }

    pub fn retention(&self) -> &RetentionRegistry {
        &self.retention
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Ingest one line. Returns the chunk it landed in (useful for
    /// diagnostics and tests).
    pub fn ingest_one(&self, line: LogLine) -> Option<ChunkId> {
        let chunk = self.hypertables.route(&self.name, line.ts_ns)?;
        self.record_recent(&line);
        let mut stats = match self.stats.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        stats.lines_ingested += 1;
        Some(chunk)
    }

    /// Ingest a batch. Lines are routed to chunks one by one; stats
    /// accounting happens once per batch under a single lock, the
    /// tight-loop shape expected at 100k lines/s ingest. Returns the
    /// number of lines that actually routed to a chunk.
    pub fn ingest_batch(&self, lines: &[LogLine]) -> u64 {
        if lines.is_empty() {
            return 0;
        }
        let mut distinct_chunks: Vec<ChunkId> = Vec::new();
        let mut routed: u64 = 0;
        for line in lines {
            if let Some(id) = self.hypertables.route(&self.name, line.ts_ns) {
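                // Cheap dedup, assuming mostly time-ordered batches:
                // the `last()` check skips the scan in the common case,
                // while `contains` still catches out-of-order lines.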
                if distinct_chunks.last() != Some(&id) && !distinct_chunks.contains(&id) {
                    distinct_chunks.push(id);
                }
                self.record_recent(line);
                routed += 1;
            }
        }
        let mut stats = match self.stats.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        stats.lines_ingested += routed;
        stats.batches_flushed += 1;
        stats.chunks_touched = stats
            .chunks_touched
            .saturating_add(distinct_chunks.len() as u64);
        stats.last_flush_unix_ns = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0);
        routed
    }

    fn record_recent(&self, line: &LogLine) {
        let mut guard = match self.recent.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.push(line.clone());
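        // Trim from the front so the Vec behaves as the bounded FIFO
        // "ring" described on the `recent` field.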
        let overflow = guard.len().saturating_sub(self.recent_capacity);
        if overflow > 0 {
            guard.drain(0..overflow);
        }
    }

    /// Return the buffered lines with `ts_ns > watermark`. Only the
    /// most recent `recent_capacity` lines are retained, so a very
    /// old watermark can miss lines that were already evicted.
    /// Streaming UIs call this from a polling loop or a `WATCH`
    /// subscription.
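    ///
    /// A caller-side polling sketch (illustrative; `render` is an
    /// assumed consumer, not part of this module):
    ///
    /// ```ignore
    /// let mut watermark = 0;
    /// loop {
    ///     for line in pipe.tail_since(watermark) {
    ///         watermark = watermark.max(line.ts_ns);
    ///         render(&line);
    ///     }
    /// }
    /// ```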
    pub fn tail_since(&self, watermark_ns: u64) -> Vec<LogLine> {
        let guard = match self.recent.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard
            .iter()
            .filter(|l| l.ts_ns > watermark_ns)
            .cloned()
            .collect()
    }

    /// Install / replace a retention policy for this pipeline's
    /// hypertable. The daemon in [`super::retention::RetentionRegistry`]
    /// is the consumer — wire it separately at startup.
    pub fn set_retention(&self, max_age_secs: u64) {
        self.retention
            .set_policy(self.name.clone(), RetentionPolicy::from_secs(max_age_secs));
    }

    /// Declare a partition-level TTL that applies to every chunk
    /// belonging to this pipeline's hypertable. Combines with the
    /// retention-daemon sweep: expired chunks disappear in O(1)
    /// metadata updates instead of row-by-row deletes. Returns
    /// `false` (leaving any existing TTL untouched) if `ttl` does
    /// not parse as a positive duration.
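    ///
    /// A sketch of the intended flow (illustrative; `release_storage`
    /// stands in for whatever frees the physical bytes):
    ///
    /// ```ignore
    /// pipe.set_partition_ttl("7d");
    /// let expired = pipe.sweep_expired_chunks(now_ns);
    /// release_storage(&expired); // the registry only drops metadata
    /// ```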
    pub fn set_partition_ttl(&self, ttl: &str) -> bool {
        let ns = match super::retention::parse_duration_ns(ttl) {
            Some(n) if n > 0 => n,
            _ => return false,
        };
        self.hypertables.set_default_ttl_ns(&self.name, Some(ns));
        true
    }

    /// Run the partition-level sweep once. Returns the chunks that
    /// expired (their physical storage release is the caller's
    /// responsibility — the registry only owns the metadata).
    pub fn sweep_expired_chunks(&self, now_ns: u64) -> Vec<super::hypertable::ChunkMeta> {
        self.hypertables.sweep_expired(&self.name, now_ns)
    }

    pub fn stats(&self) -> LogIngestStats {
        let guard = match self.stats.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.clone()
    }

    /// Count of lines at or above a severity threshold — used by
    /// health-check endpoints that alert on "ERROR rate in the last
    /// minute". Looks at the in-memory tail buffer only; for the
    /// full historical view go through `time_bucket` over the
    /// hypertable.
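    ///
    /// An alerting sketch (illustrative; `threshold` and `page_oncall`
    /// are assumed caller-side pieces):
    ///
    /// ```ignore
    /// if pipe.recent_count_at_or_above(LogSeverity::Error) > threshold {
    ///     page_oncall();
    /// }
    /// ```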
    pub fn recent_count_at_or_above(&self, severity: LogSeverity) -> u64 {
        let guard = match self.recent.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.iter().filter(|l| l.severity >= severity).count() as u64
    }

    /// Adapter so the retention daemon can see this pipeline's
    /// hypertable. Returning `Arc<dyn RetentionBackend>` lets the
    /// caller hand the backend to `RetentionRegistry::start`.
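    ///
    /// Wiring sketch (illustrative; the exact `start` signature lives
    /// in [`super::retention`]):
    ///
    /// ```ignore
    /// pipe.retention().start(pipe.retention_backend());
    /// ```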
    pub fn retention_backend(&self) -> Arc<dyn RetentionBackend> {
        Arc::new(LogRetentionBackend {
            name: self.name.clone(),
            hypertables: self.hypertables.clone(),
        })
    }
}

struct LogRetentionBackend {
    name: String,
    hypertables: HypertableRegistry,
}

impl RetentionBackend for LogRetentionBackend {
    fn time_series_collections(&self) -> Vec<String> {
        vec![self.name.clone()]
    }

    fn drop_chunks_older_than(&self, collection: &str, cutoff_ns: u64) -> u64 {
        if collection != self.name {
            return 0;
        }
        self.hypertables
            .drop_chunks_before(collection, cutoff_ns)
            .len() as u64
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn line_at(ts_ns: u64, sev: LogSeverity, msg: &str) -> LogLine {
        LogLine {
            ts_ns,
            severity: sev,
            service: "demo".into(),
            message: msg.into(),
            labels: HashMap::new(),
            numeric_fields: HashMap::new(),
            trace_id: None,
            span_id: None,
        }
    }

    #[test]
    fn severity_token_round_trips() {
        for s in [
            LogSeverity::Trace,
            LogSeverity::Debug,
            LogSeverity::Info,
            LogSeverity::Warn,
            LogSeverity::Error,
            LogSeverity::Fatal,
        ] {
            assert_eq!(LogSeverity::from_token(s.token()), Some(s));
        }
        // Aliases.
        assert_eq!(LogSeverity::from_token("warning"), Some(LogSeverity::Warn));
        assert_eq!(LogSeverity::from_token("err"), Some(LogSeverity::Error));
        assert_eq!(
            LogSeverity::from_token("critical"),
            Some(LogSeverity::Fatal)
        );
        assert!(LogSeverity::from_token("nope").is_none());
    }

    #[test]
    fn severity_comparisons_ascend_with_rank() {
        assert!(LogSeverity::Error > LogSeverity::Warn);
        assert!(LogSeverity::Warn > LogSeverity::Info);
        assert_eq!(LogSeverity::Info.as_i64(), 2);
    }

    #[test]
    fn ingest_one_routes_to_chunk_and_updates_stats() {
        let pipe = LogPipeline::new("access_log", "ts", "1h").unwrap();
        let id = pipe
            .ingest_one(line_at(3_600_000_000_001, LogSeverity::Info, "hi"))
            .unwrap();
        assert_eq!(id.start_ns, 3_600_000_000_000);
        assert_eq!(pipe.stats().lines_ingested, 1);
        assert_eq!(pipe.hypertables().total_rows("access_log"), 1);
    }

    #[test]
    fn ingest_batch_bumps_stats_with_distinct_chunk_count() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        let lines: Vec<_> = (0..5)
            .map(|i| line_at(i * 3_600_000_000_000, LogSeverity::Info, "x"))
            .collect();
        let written = pipe.ingest_batch(&lines);
        assert_eq!(written, 5);
        let stats = pipe.stats();
        assert_eq!(stats.lines_ingested, 5);
        assert_eq!(stats.batches_flushed, 1);
        assert_eq!(stats.chunks_touched, 5);
    }

    #[test]
    fn tail_since_returns_only_newer_lines() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        for t in [10, 20, 30, 40] {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        let tailed = pipe.tail_since(25);
        assert_eq!(tailed.len(), 2);
        assert_eq!(tailed[0].ts_ns, 30);
        assert_eq!(tailed[1].ts_ns, 40);
    }

    #[test]
    fn recent_ring_respects_capacity() {
        let pipe = LogPipeline::new("logs", "ts", "1h")
            .unwrap()
            .with_recent_capacity(100);
        for t in 0..250 {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        let all = pipe.tail_since(0);
        assert_eq!(all.len(), 100, "only the last 100 lines should remain");
        assert_eq!(all[0].ts_ns, 150);
        assert_eq!(all.last().unwrap().ts_ns, 249);
    }

    #[test]
    fn recent_count_at_or_above_filters_correctly() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        pipe.ingest_one(line_at(1, LogSeverity::Debug, "d"));
        pipe.ingest_one(line_at(2, LogSeverity::Info, "i"));
        pipe.ingest_one(line_at(3, LogSeverity::Warn, "w"));
        pipe.ingest_one(line_at(4, LogSeverity::Error, "e"));
        pipe.ingest_one(line_at(5, LogSeverity::Fatal, "f"));
        assert_eq!(pipe.recent_count_at_or_above(LogSeverity::Warn), 3);
        assert_eq!(pipe.recent_count_at_or_above(LogSeverity::Error), 2);
    }

    #[test]
    fn retention_backend_drops_expired_chunks() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        // Three chunks (0, 1h, 2h).
        for t in [0, 3_600_000_000_000, 7_200_000_000_000] {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        assert_eq!(pipe.hypertables().show_chunks("logs").len(), 3);

        pipe.set_retention(3_600); // keep last hour only
        let backend = pipe.retention_backend();
        // cutoff exactly at 1h boundary drops chunks whose max_ts ≤ cutoff.
        let dropped = backend.drop_chunks_older_than("logs", 3_600_000_000_000);
        assert_eq!(dropped, 2);
        assert_eq!(pipe.hypertables().show_chunks("logs").len(), 1);
    }

    #[test]
    fn log_line_builder_composes_labels_and_fields() {
        let line = LogLine::now(LogSeverity::Error, "api", "boom")
            .with_label("region", "us-east-1")
            .with_field("latency_ms", 230.0)
            .with_trace("trace-42", "span-7");
        assert_eq!(line.service, "api");
        assert_eq!(line.severity, LogSeverity::Error);
        assert_eq!(line.labels.get("region").unwrap(), "us-east-1");
        assert_eq!(line.numeric_fields.get("latency_ms").unwrap(), &230.0);
        assert_eq!(line.trace_id.as_deref(), Some("trace-42"));
    }

    #[test]
    fn pipeline_without_valid_interval_returns_none() {
        assert!(LogPipeline::new("x", "ts", "raw").is_none());
        assert!(LogPipeline::new("x", "ts", "bogus").is_none());
    }

    #[test]
    fn partition_ttl_sweep_drops_expired_chunks() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        assert!(pipe.set_partition_ttl("2h"));
        const HOUR: u64 = 3_600_000_000_000;
        // Three hourly chunks with exactly one row each at the
        // chunk boundary — so max_ts = 0, 1h, 2h.
        for t in [0, HOUR, 2 * HOUR] {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        // now = 3h + 1ns → per-chunk expiries (max_ts + ttl) are 2h,
        // 3h, and 4h. The chunks expiring at 2h and 3h are both past
        // expiry, so they drop; the chunk expiring at 4h survives.
        let dropped = pipe.sweep_expired_chunks(3 * HOUR + 1);
        assert_eq!(dropped.len(), 2);
        assert_eq!(pipe.hypertables().show_chunks("logs").len(), 1);
    }

    #[test]
    fn partition_ttl_rejects_invalid_duration() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        assert!(!pipe.set_partition_ttl("raw"));
        assert!(!pipe.set_partition_ttl("nonsense"));
        // A valid TTL still works after a rejected call.
        assert!(pipe.set_partition_ttl("1d"));
    }
}