1use std::collections::HashMap;
19use std::sync::{Arc, Mutex};
20use std::time::{SystemTime, UNIX_EPOCH};
21
22use super::hypertable::{ChunkId, HypertableRegistry, HypertableSpec};
23use super::retention::{RetentionBackend, RetentionPolicy, RetentionRegistry};
24
/// Syslog-style log severity, ordered from least (`Trace`) to most
/// (`Fatal`) severe; the derived `Ord` follows the explicit discriminants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogSeverity {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Fatal = 5,
}

impl LogSeverity {
    /// Numeric rank, identical to the enum discriminant (0..=5).
    pub fn as_i64(self) -> i64 {
        self as i64
    }

    /// Canonical upper-case token for this severity.
    pub fn token(self) -> &'static str {
        match self {
            LogSeverity::Trace => "TRACE",
            LogSeverity::Debug => "DEBUG",
            LogSeverity::Info => "INFO",
            LogSeverity::Warn => "WARN",
            LogSeverity::Error => "ERROR",
            LogSeverity::Fatal => "FATAL",
        }
    }

    /// Parses a severity token case-insensitively, also accepting the
    /// common aliases `WARNING`, `ERR`, and `CRITICAL`. Returns `None`
    /// for anything else.
    ///
    /// Compares with `eq_ignore_ascii_case` instead of uppercasing the
    /// input, avoiding a temporary `String` allocation on every call;
    /// the accepted token set is unchanged.
    pub fn from_token(token: &str) -> Option<LogSeverity> {
        const ALIASES: &[(&str, LogSeverity)] = &[
            ("TRACE", LogSeverity::Trace),
            ("DEBUG", LogSeverity::Debug),
            ("INFO", LogSeverity::Info),
            ("WARN", LogSeverity::Warn),
            ("WARNING", LogSeverity::Warn),
            ("ERROR", LogSeverity::Error),
            ("ERR", LogSeverity::Error),
            ("FATAL", LogSeverity::Fatal),
            ("CRITICAL", LogSeverity::Fatal),
        ];
        for &(alias, sev) in ALIASES {
            if token.eq_ignore_ascii_case(alias) {
                return Some(sev);
            }
        }
        None
    }
}
65
/// One structured log record: timestamp, severity, and message, plus
/// optional string labels, numeric fields, and trace-correlation ids.
#[derive(Debug, Clone)]
pub struct LogLine {
    /// Event time, nanoseconds since the Unix epoch.
    pub ts_ns: u64,
    /// Severity rank (see [`LogSeverity`]).
    pub severity: LogSeverity,
    /// Name of the emitting service.
    pub service: String,
    /// Free-form log message text.
    pub message: String,
    /// String key/value labels attached to the line.
    pub labels: HashMap<String, String>,
    /// Numeric measurements attached to the line (e.g. a latency value).
    pub numeric_fields: HashMap<String, f64>,
    /// Distributed-tracing trace id, when the line is trace-correlated.
    pub trace_id: Option<String>,
    /// Distributed-tracing span id, when the line is trace-correlated.
    pub span_id: Option<String>,
}
85
86impl LogLine {
87 pub fn now(
88 severity: LogSeverity,
89 service: impl Into<String>,
90 message: impl Into<String>,
91 ) -> Self {
92 let ts_ns = SystemTime::now()
93 .duration_since(UNIX_EPOCH)
94 .map(|d| d.as_nanos() as u64)
95 .unwrap_or(0);
96 Self {
97 ts_ns,
98 severity,
99 service: service.into(),
100 message: message.into(),
101 labels: HashMap::new(),
102 numeric_fields: HashMap::new(),
103 trace_id: None,
104 span_id: None,
105 }
106 }
107
108 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
109 self.labels.insert(key.into(), value.into());
110 self
111 }
112
113 pub fn with_field(mut self, key: impl Into<String>, value: f64) -> Self {
114 self.numeric_fields.insert(key.into(), value);
115 self
116 }
117
118 pub fn with_trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
119 self.trace_id = Some(trace_id.into());
120 self.span_id = Some(span_id.into());
121 self
122 }
123}
124
/// Counters describing what the pipeline has ingested so far.
#[derive(Debug, Default, Clone)]
pub struct LogIngestStats {
    /// Total lines counted across `ingest_one` and `ingest_batch` calls.
    pub lines_ingested: u64,
    /// Number of non-empty `ingest_batch` calls.
    pub batches_flushed: u64,
    /// Running (saturating) sum of distinct chunks touched per batch.
    pub chunks_touched: u64,
    /// Wall-clock time of the last batch flush, ns since the Unix epoch.
    pub last_flush_unix_ns: u64,
}
133
/// Ingestion front-end that routes log lines into a single hypertable,
/// keeps a bounded in-memory buffer of recent lines, and tracks counters.
///
/// Clones share the mutable state: `stats` and `recent` are behind
/// `Arc<Mutex<_>>`, and the registries are `Clone` handles (the retention
/// backend relies on a registry clone observing the same chunks).
#[derive(Clone)]
pub struct LogPipeline {
    /// Hypertable (and retention-policy) collection name written to.
    name: String,
    /// Registry holding the hypertable spec and its chunks.
    hypertables: HypertableRegistry,
    /// Registry of retention policies keyed by collection name.
    retention: RetentionRegistry,
    /// Shared ingest counters; poisoned locks are recovered on access.
    stats: Arc<Mutex<LogIngestStats>>,
    /// Bounded buffer of the most recently ingested lines; oldest evicted.
    recent: Arc<Mutex<Vec<LogLine>>>,
    /// Maximum number of lines retained in `recent`.
    recent_capacity: usize,
}
148
149impl std::fmt::Debug for LogPipeline {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("LogPipeline")
152 .field("name", &self.name)
153 .field("recent_capacity", &self.recent_capacity)
154 .finish()
155 }
156}
157
158impl LogPipeline {
159 pub fn new(
162 hypertable_name: impl Into<String>,
163 time_column: impl Into<String>,
164 chunk_interval: &str,
165 ) -> Option<Self> {
166 let spec = HypertableSpec::from_interval_string(
167 hypertable_name.into(),
168 time_column.into(),
169 chunk_interval,
170 )?;
171 let registry = HypertableRegistry::new();
172 registry.register(spec.clone());
173 Some(Self {
174 name: spec.name,
175 hypertables: registry,
176 retention: RetentionRegistry::new(),
177 stats: Arc::new(Mutex::new(LogIngestStats::default())),
178 recent: Arc::new(Mutex::new(Vec::new())),
179 recent_capacity: 4096,
180 })
181 }
182
183 pub fn with_recent_capacity(mut self, capacity: usize) -> Self {
184 self.recent_capacity = capacity.max(64);
185 self
186 }
187
188 pub fn hypertables(&self) -> &HypertableRegistry {
189 &self.hypertables
190 }
191
192 pub fn retention(&self) -> &RetentionRegistry {
193 &self.retention
194 }
195
196 pub fn name(&self) -> &str {
197 &self.name
198 }
199
200 pub fn ingest_one(&self, line: LogLine) -> Option<ChunkId> {
203 let chunk = self.hypertables.route(&self.name, line.ts_ns)?;
204 self.record_recent(&line);
205 let mut stats = match self.stats.lock() {
206 Ok(g) => g,
207 Err(p) => p.into_inner(),
208 };
209 stats.lines_ingested += 1;
210 Some(chunk)
211 }
212
213 pub fn ingest_batch(&self, lines: &[LogLine]) -> u64 {
217 if lines.is_empty() {
218 return 0;
219 }
220 let mut distinct_chunks: Vec<ChunkId> = Vec::new();
221 for line in lines {
222 if let Some(id) = self.hypertables.route(&self.name, line.ts_ns) {
223 if distinct_chunks.last() != Some(&id) && !distinct_chunks.contains(&id) {
224 distinct_chunks.push(id);
225 }
226 self.record_recent(line);
227 }
228 }
229 let mut stats = match self.stats.lock() {
230 Ok(g) => g,
231 Err(p) => p.into_inner(),
232 };
233 stats.lines_ingested += lines.len() as u64;
234 stats.batches_flushed += 1;
235 stats.chunks_touched = stats
236 .chunks_touched
237 .saturating_add(distinct_chunks.len() as u64);
238 stats.last_flush_unix_ns = SystemTime::now()
239 .duration_since(UNIX_EPOCH)
240 .map(|d| d.as_nanos() as u64)
241 .unwrap_or(0);
242 lines.len() as u64
243 }
244
245 fn record_recent(&self, line: &LogLine) {
246 let mut guard = match self.recent.lock() {
247 Ok(g) => g,
248 Err(p) => p.into_inner(),
249 };
250 guard.push(line.clone());
251 let overflow = guard.len().saturating_sub(self.recent_capacity);
252 if overflow > 0 {
253 guard.drain(0..overflow);
254 }
255 }
256
257 pub fn tail_since(&self, watermark_ns: u64) -> Vec<LogLine> {
261 let guard = match self.recent.lock() {
262 Ok(g) => g,
263 Err(p) => p.into_inner(),
264 };
265 guard
266 .iter()
267 .filter(|l| l.ts_ns > watermark_ns)
268 .cloned()
269 .collect()
270 }
271
272 pub fn set_retention(&self, max_age_secs: u64) {
276 self.retention
277 .set_policy(self.name.clone(), RetentionPolicy::from_secs(max_age_secs));
278 }
279
280 pub fn set_partition_ttl(&self, ttl: &str) -> bool {
286 let ns = match super::retention::parse_duration_ns(ttl) {
287 Some(n) if n > 0 => n,
288 _ => return false,
289 };
290 self.hypertables.set_default_ttl_ns(&self.name, Some(ns));
291 true
292 }
293
294 pub fn sweep_expired_chunks(&self, now_ns: u64) -> Vec<super::hypertable::ChunkMeta> {
298 self.hypertables.sweep_expired(&self.name, now_ns)
299 }
300
301 pub fn stats(&self) -> LogIngestStats {
302 let guard = match self.stats.lock() {
303 Ok(g) => g,
304 Err(p) => p.into_inner(),
305 };
306 guard.clone()
307 }
308
309 pub fn recent_count_at_or_above(&self, severity: LogSeverity) -> u64 {
315 let guard = match self.recent.lock() {
316 Ok(g) => g,
317 Err(p) => p.into_inner(),
318 };
319 guard.iter().filter(|l| l.severity >= severity).count() as u64
320 }
321
322 pub fn retention_backend(&self) -> Arc<dyn RetentionBackend> {
326 Arc::new(LogRetentionBackend {
327 name: self.name.clone(),
328 hypertables: self.hypertables.clone(),
329 })
330 }
331}
332
/// Retention-sweeper adapter for a single pipeline's hypertable; holds a
/// clone of the pipeline's name and registry handle so expired chunks can
/// be dropped without a reference back to the pipeline.
struct LogRetentionBackend {
    /// Collection (hypertable) name this backend is responsible for.
    name: String,
    /// Registry handle cloned from the owning pipeline.
    hypertables: HypertableRegistry,
}
337
338impl RetentionBackend for LogRetentionBackend {
339 fn time_series_collections(&self) -> Vec<String> {
340 vec![self.name.clone()]
341 }
342
343 fn drop_chunks_older_than(&self, collection: &str, cutoff_ns: u64) -> u64 {
344 if collection != self.name {
345 return 0;
346 }
347 self.hypertables
348 .drop_chunks_before(collection, cutoff_ns)
349 .len() as u64
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    // Deterministic LogLine factory pinned to an explicit timestamp
    // (avoids LogLine::now(), which reads the wall clock).
    fn line_at(ts_ns: u64, sev: LogSeverity, msg: &str) -> LogLine {
        LogLine {
            ts_ns,
            severity: sev,
            service: "demo".into(),
            message: msg.into(),
            labels: HashMap::new(),
            numeric_fields: HashMap::new(),
            trace_id: None,
            span_id: None,
        }
    }

    #[test]
    fn severity_token_round_trips() {
        for s in [
            LogSeverity::Trace,
            LogSeverity::Debug,
            LogSeverity::Info,
            LogSeverity::Warn,
            LogSeverity::Error,
            LogSeverity::Fatal,
        ] {
            assert_eq!(LogSeverity::from_token(s.token()), Some(s));
        }
        // Lowercase input and the WARNING/ERR/CRITICAL aliases must map
        // onto the canonical severities.
        assert_eq!(LogSeverity::from_token("warning"), Some(LogSeverity::Warn));
        assert_eq!(LogSeverity::from_token("err"), Some(LogSeverity::Error));
        assert_eq!(
            LogSeverity::from_token("critical"),
            Some(LogSeverity::Fatal)
        );
        assert!(LogSeverity::from_token("nope").is_none());
    }

    #[test]
    fn severity_comparisons_use_syslog_rank() {
        // Ord is derived from the explicit discriminants (Trace=0 .. Fatal=5).
        assert!(LogSeverity::Error > LogSeverity::Warn);
        assert!(LogSeverity::Warn > LogSeverity::Info);
        assert_eq!(LogSeverity::Info.as_i64(), 2);
    }

    #[test]
    fn ingest_one_routes_to_chunk_and_updates_stats() {
        let pipe = LogPipeline::new("access_log", "ts", "1h").unwrap();
        // With 1h chunks, a timestamp 1ns past the hour boundary is
        // expected to land in the chunk starting exactly on that boundary.
        let id = pipe
            .ingest_one(line_at(3_600_000_000_001, LogSeverity::Info, "hi"))
            .unwrap();
        assert_eq!(id.start_ns, 3_600_000_000_000);
        assert_eq!(pipe.stats().lines_ingested, 1);
        assert_eq!(pipe.hypertables().total_rows("access_log"), 1);
    }

    #[test]
    fn ingest_batch_bumps_stats_with_distinct_chunk_count() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        // Five lines spaced exactly one hour apart -> five distinct chunks.
        let lines: Vec<_> = (0..5)
            .map(|i| line_at(i * 3_600_000_000_000, LogSeverity::Info, "x"))
            .collect();
        let written = pipe.ingest_batch(&lines);
        assert_eq!(written, 5);
        let stats = pipe.stats();
        assert_eq!(stats.lines_ingested, 5);
        assert_eq!(stats.batches_flushed, 1);
        assert_eq!(stats.chunks_touched, 5);
    }

    #[test]
    fn tail_since_returns_only_newer_lines() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        for t in [10, 20, 30, 40] {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        // Watermark is exclusive (strictly greater-than): 30 and 40
        // survive, 20 does not.
        let tailed = pipe.tail_since(25);
        assert_eq!(tailed.len(), 2);
        assert_eq!(tailed[0].ts_ns, 30);
        assert_eq!(tailed[1].ts_ns, 40);
    }

    #[test]
    fn recent_ring_respects_capacity() {
        let pipe = LogPipeline::new("logs", "ts", "1h")
            .unwrap()
            .with_recent_capacity(100);
        // Overfill the buffer: 250 lines into a capacity-100 buffer should
        // leave only timestamps 150..=249.
        for t in 0..250 {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        let all = pipe.tail_since(0);
        assert_eq!(all.len(), 100, "only the last 100 lines should remain");
        assert_eq!(all[0].ts_ns, 150);
        assert_eq!(all.last().unwrap().ts_ns, 249);
    }

    #[test]
    fn recent_count_at_or_above_filters_correctly() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        pipe.ingest_one(line_at(1, LogSeverity::Debug, "d"));
        pipe.ingest_one(line_at(2, LogSeverity::Info, "i"));
        pipe.ingest_one(line_at(3, LogSeverity::Warn, "w"));
        pipe.ingest_one(line_at(4, LogSeverity::Error, "e"));
        pipe.ingest_one(line_at(5, LogSeverity::Fatal, "f"));
        // Threshold is inclusive: Warn counts itself plus Error and Fatal.
        assert_eq!(pipe.recent_count_at_or_above(LogSeverity::Warn), 3);
        assert_eq!(pipe.recent_count_at_or_above(LogSeverity::Error), 2);
    }

    #[test]
    fn retention_backend_drops_expired_chunks() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        // One line per hourly chunk at t = 0h, 1h, 2h.
        for t in [0, 3_600_000_000_000, 7_200_000_000_000] {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        assert_eq!(pipe.hypertables().show_chunks("logs").len(), 3);

        pipe.set_retention(3_600);
        let backend = pipe.retention_backend();
        // Per the assertions below, a cutoff at the 1h boundary expires two
        // of the three chunks (exact inclusivity is defined by
        // drop_chunks_before in the hypertable module).
        let dropped = backend.drop_chunks_older_than("logs", 3_600_000_000_000);
        assert_eq!(dropped, 2);
        assert_eq!(pipe.hypertables().show_chunks("logs").len(), 1);
    }

    #[test]
    fn log_line_builder_composes_labels_and_fields() {
        let line = LogLine::now(LogSeverity::Error, "api", "boom")
            .with_label("region", "us-east-1")
            .with_field("latency_ms", 230.0)
            .with_trace("trace-42", "span-7");
        assert_eq!(line.service, "api");
        assert_eq!(line.severity, LogSeverity::Error);
        assert_eq!(line.labels.get("region").unwrap(), "us-east-1");
        assert_eq!(line.numeric_fields.get("latency_ms").unwrap(), &230.0);
        assert_eq!(line.trace_id.as_deref(), Some("trace-42"));
    }

    #[test]
    fn pipeline_without_valid_interval_returns_none() {
        // Construction fails fast on unparsable chunk-interval strings.
        assert!(LogPipeline::new("x", "ts", "raw").is_none());
        assert!(LogPipeline::new("x", "ts", "bogus").is_none());
    }

    #[test]
    fn partition_ttl_sweep_drops_expired_chunks() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        assert!(pipe.set_partition_ttl("2h"));
        const HOUR: u64 = 3_600_000_000_000;
        for t in [0, HOUR, 2 * HOUR] {
            pipe.ingest_one(line_at(t, LogSeverity::Info, "m"));
        }
        // At now = 3h + 1ns with a 2h TTL, the two oldest chunks expire.
        let dropped = pipe.sweep_expired_chunks(3 * HOUR + 1);
        assert_eq!(dropped.len(), 2);
        assert_eq!(pipe.hypertables().show_chunks("logs").len(), 1);
    }

    #[test]
    fn partition_ttl_rejects_invalid_duration() {
        let pipe = LogPipeline::new("logs", "ts", "1h").unwrap();
        assert!(!pipe.set_partition_ttl("raw"));
        assert!(!pipe.set_partition_ttl("nonsense"));
        assert!(pipe.set_partition_ttl("1d"));
    }
}