Skip to main content

nodedb_types/timeseries/
ingest.rs

// SPDX-License-Identifier: Apache-2.0

//! Ingest types, time range, and symbol dictionary.

5use std::collections::HashMap;
6
7use serde::{Deserialize, Serialize};
8
9/// A single metric sample (timestamp + scalar value).
10#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
11pub struct MetricSample {
12    pub timestamp_ms: i64,
13    pub value: f64,
14}
15
16/// A single log entry (timestamp + arbitrary bytes).
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct LogEntry {
19    pub timestamp_ms: i64,
20    pub data: Vec<u8>,
21}
22
/// Outcome reported by an ingest call.
///
/// Distinguishes a healthy accept, an accept that signals memory pressure,
/// and an outright rejection. The enum is `#[non_exhaustive]`, so code outside
/// the defining crate must include a wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum IngestResult {
    /// The write was stored and the memtable is healthy.
    Ok,
    /// The write was stored, but memory pressure means the memtable
    /// should be flushed.
    FlushNeeded,
    /// The write was refused: the memory budget is exhausted and nothing
    /// more can be evicted.
    Rejected,
}

impl IngestResult {
    /// Returns `true` only for [`IngestResult::FlushNeeded`], i.e. the write
    /// succeeded but a flush is advised.
    pub fn is_flush_needed(&self) -> bool {
        *self == IngestResult::FlushNeeded
    }

    /// Returns `true` only for [`IngestResult::Rejected`], i.e. the write was
    /// not stored.
    pub fn is_rejected(&self) -> bool {
        *self == IngestResult::Rejected
    }
}

45/// Time range for queries (inclusive on both ends).
46#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
47pub struct TimeRange {
48    pub start_ms: i64,
49    pub end_ms: i64,
50}
51
52impl TimeRange {
53    pub fn new(start_ms: i64, end_ms: i64) -> Self {
54        Self { start_ms, end_ms }
55    }
56
57    pub fn contains(&self, ts: i64) -> bool {
58        ts >= self.start_ms && ts <= self.end_ms
59    }
60
61    /// Whether two ranges overlap.
62    pub fn overlaps(&self, other: &TimeRange) -> bool {
63        self.start_ms <= other.end_ms && other.start_ms <= self.end_ms
64    }
65}
66
67/// Bidirectional symbol dictionary for tag value interning.
68///
69/// Tag columns store 4-byte u32 IDs instead of full strings. Shared by
70/// Origin columnar segments, Lite native segments, and WASM segments.
71#[derive(Debug, Clone, Default, Serialize, Deserialize)]
72pub struct SymbolDictionary {
73    /// String → symbol ID.
74    forward: HashMap<String, u32>,
75    /// Symbol ID → string (index = id).
76    reverse: Vec<String>,
77}
78
79impl SymbolDictionary {
80    pub fn new() -> Self {
81        Self::default()
82    }
83
84    /// Resolve a string to its symbol ID, inserting if new.
85    ///
86    /// Returns `None` if the dictionary has reached `max_cardinality`.
87    pub fn resolve(&mut self, value: &str, max_cardinality: u32) -> Option<u32> {
88        if let Some(&id) = self.forward.get(value) {
89            return Some(id);
90        }
91        if self.reverse.len() as u32 >= max_cardinality {
92            return None;
93        }
94        let id = self.reverse.len() as u32;
95        self.forward.insert(value.to_string(), id);
96        self.reverse.push(value.to_string());
97        Some(id)
98    }
99
100    /// Look up a string by symbol ID.
101    pub fn get(&self, id: u32) -> Option<&str> {
102        self.reverse.get(id as usize).map(|s| s.as_str())
103    }
104
105    /// Look up a symbol ID by string.
106    pub fn get_id(&self, value: &str) -> Option<u32> {
107        self.forward.get(value).copied()
108    }
109
110    /// Number of symbols.
111    pub fn len(&self) -> usize {
112        self.reverse.len()
113    }
114
115    pub fn is_empty(&self) -> bool {
116        self.reverse.is_empty()
117    }
118
119    /// Merge another dictionary into this one.
120    ///
121    /// Returns a remap table: `old_id → new_id` for the source dictionary.
122    pub fn merge(&mut self, other: &SymbolDictionary, max_cardinality: u32) -> Vec<u32> {
123        let mut remap = Vec::with_capacity(other.reverse.len());
124        for symbol in &other.reverse {
125            match self.resolve(symbol, max_cardinality) {
126                Some(new_id) => remap.push(new_id),
127                None => remap.push(u32::MAX),
128            }
129        }
130        remap
131    }
132}