Skip to main content

nodedb_types/timeseries/
ingest.rs

1//! Ingest types, time range, and symbol dictionary.
2
3use std::collections::HashMap;
4
5use serde::{Deserialize, Serialize};
6
7/// A single metric sample (timestamp + scalar value).
8#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
9pub struct MetricSample {
10    pub timestamp_ms: i64,
11    pub value: f64,
12}
13
14/// A single log entry (timestamp + arbitrary bytes).
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct LogEntry {
17    pub timestamp_ms: i64,
18    pub data: Vec<u8>,
19}
20
/// Outcome reported for a single ingest operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestResult {
    /// The write was accepted and the memtable is healthy.
    Ok,
    /// The write was accepted, but the memtable should be flushed (memory pressure).
    FlushNeeded,
    /// The write was refused: the memory budget is exhausted and nothing more can be evicted.
    Rejected,
}

impl IngestResult {
    /// Returns `true` only for [`IngestResult::FlushNeeded`].
    pub fn is_flush_needed(&self) -> bool {
        *self == IngestResult::FlushNeeded
    }

    /// Returns `true` only for [`IngestResult::Rejected`].
    pub fn is_rejected(&self) -> bool {
        match self {
            IngestResult::Rejected => true,
            IngestResult::Ok | IngestResult::FlushNeeded => false,
        }
    }
}
41
42/// Time range for queries (inclusive on both ends).
43#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
44pub struct TimeRange {
45    pub start_ms: i64,
46    pub end_ms: i64,
47}
48
49impl TimeRange {
50    pub fn new(start_ms: i64, end_ms: i64) -> Self {
51        Self { start_ms, end_ms }
52    }
53
54    pub fn contains(&self, ts: i64) -> bool {
55        ts >= self.start_ms && ts <= self.end_ms
56    }
57
58    /// Whether two ranges overlap.
59    pub fn overlaps(&self, other: &TimeRange) -> bool {
60        self.start_ms <= other.end_ms && other.start_ms <= self.end_ms
61    }
62}
63
64/// Bidirectional symbol dictionary for tag value interning.
65///
66/// Tag columns store 4-byte u32 IDs instead of full strings. Shared by
67/// Origin columnar segments, Lite native segments, and WASM segments.
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct SymbolDictionary {
70    /// String → symbol ID.
71    forward: HashMap<String, u32>,
72    /// Symbol ID → string (index = id).
73    reverse: Vec<String>,
74}
75
76impl SymbolDictionary {
77    pub fn new() -> Self {
78        Self::default()
79    }
80
81    /// Resolve a string to its symbol ID, inserting if new.
82    ///
83    /// Returns `None` if the dictionary has reached `max_cardinality`.
84    pub fn resolve(&mut self, value: &str, max_cardinality: u32) -> Option<u32> {
85        if let Some(&id) = self.forward.get(value) {
86            return Some(id);
87        }
88        if self.reverse.len() as u32 >= max_cardinality {
89            return None;
90        }
91        let id = self.reverse.len() as u32;
92        self.forward.insert(value.to_string(), id);
93        self.reverse.push(value.to_string());
94        Some(id)
95    }
96
97    /// Look up a string by symbol ID.
98    pub fn get(&self, id: u32) -> Option<&str> {
99        self.reverse.get(id as usize).map(|s| s.as_str())
100    }
101
102    /// Look up a symbol ID by string.
103    pub fn get_id(&self, value: &str) -> Option<u32> {
104        self.forward.get(value).copied()
105    }
106
107    /// Number of symbols.
108    pub fn len(&self) -> usize {
109        self.reverse.len()
110    }
111
112    pub fn is_empty(&self) -> bool {
113        self.reverse.is_empty()
114    }
115
116    /// Merge another dictionary into this one.
117    ///
118    /// Returns a remap table: `old_id → new_id` for the source dictionary.
119    pub fn merge(&mut self, other: &SymbolDictionary, max_cardinality: u32) -> Vec<u32> {
120        let mut remap = Vec::with_capacity(other.reverse.len());
121        for symbol in &other.reverse {
122            match self.resolve(symbol, max_cardinality) {
123                Some(new_id) => remap.push(new_id),
124                None => remap.push(u32::MAX),
125            }
126        }
127        remap
128    }
129}