Skip to main content

nodedb_types/timeseries/
partition.rs

1//! Partition types: metadata, state, interval, and flushed data.
2
3use std::collections::HashMap;
4
5use serde::{Deserialize, Serialize};
6
7use super::ingest::{LogEntry, TimeRange};
8
/// Error produced when a partition interval string (e.g., "1h", "3d") cannot be parsed.
///
/// Each variant's `#[error(...)]` attribute defines its `Display` message via
/// `thiserror`. Variants that carry `input` echo the offending string back in
/// that message.
#[derive(Debug, thiserror::Error)]
pub enum IntervalParseError {
    /// The string does not have the `<number><unit>` shape at all.
    #[error("invalid interval format: '{input}' — expected format like '1h', '3d', '1w'")]
    InvalidFormat { input: String },

    /// The numeric part of the interval could not be read as a number.
    #[error("invalid number in interval: '{input}'")]
    InvalidNumber { input: String },

    /// The numeric part was zero; a partition interval must be positive.
    #[error("partition interval must be > 0")]
    ZeroInterval,

    /// The trailing unit letter is not one of the recognised units.
    #[error("unknown unit '{unit}': expected s, m, h, d, w, M, y")]
    UnknownUnit { unit: String },

    /// A calendar unit was given in a form that is not supported; `hint`
    /// is a fixed message naming the supported form.
    #[error("unsupported calendar interval '{input}': {hint}")]
    UnsupportedCalendar { input: String, hint: &'static str },
}
27
/// Lifecycle state of a partition in the partition manifest.
///
/// This is a plain tag: the type itself does not enforce which transitions
/// between states are legal. `PartitionMeta::is_queryable` treats `Active`,
/// `Sealed`, and `Merged` as readable; every other state is not.
// NOTE(review): the enum is serde-derived, so reordering or renaming variants
// may change the stored encoding — confirm the manifest format before editing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionState {
    /// Actively receiving writes.
    Active,
    /// Immutable, compactable, archivable.
    Sealed,
    /// Being merged into a larger partition (transient).
    Merging,
    /// Result of a merge operation.
    Merged,
    /// Marked for deletion (sources of a completed merge).
    Deleted,
    /// Uploaded to S3/cold storage.
    Archived,
}
44
/// Metadata for a single time partition.
///
/// Stored in the partition manifest (redb). Shared between Origin and Lite.
///
/// `min_ts` and `max_ts` are both inclusive; `PartitionMeta::overlaps` compares
/// them directly against a `TimeRange`'s `start_ms` / `end_ms`.
// NOTE(review): timestamps are presumably epoch milliseconds, since they are
// compared with `*_ms` fields — confirm against the ingest path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionMeta {
    /// Inclusive lower bound of timestamps in this partition.
    pub min_ts: i64,
    /// Inclusive upper bound of timestamps in this partition.
    pub max_ts: i64,
    /// Number of rows.
    pub row_count: u64,
    /// On-disk size in bytes (all column files combined).
    pub size_bytes: u64,
    /// Schema version — incremented on column add/drop/rename.
    pub schema_version: u32,
    /// Current lifecycle state.
    pub state: PartitionState,
    /// The partition interval duration in milliseconds that produced this partition.
    pub interval_ms: u64,
    /// WAL LSN at last successful flush to this partition.
    pub last_flushed_wal_lsn: u64,
    /// Per-column statistics (codec, min/max/sum/count/cardinality), keyed by a
    /// `String` (presumably the column name — confirm with the writer).
    ///
    /// Left out of the serialized form when the map is empty, and filled with
    /// an empty map when the field is missing on deserialization.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub column_stats: HashMap<String, nodedb_codec::ColumnStatistics>,
}
70
71impl PartitionMeta {
72    /// Whether this partition's time range overlaps a query range.
73    pub fn overlaps(&self, range: &TimeRange) -> bool {
74        self.min_ts <= range.end_ms && range.start_ms <= self.max_ts
75    }
76
77    /// Whether this partition is queryable (Active, Sealed, or Merged).
78    pub fn is_queryable(&self) -> bool {
79        matches!(
80            self.state,
81            PartitionState::Active | PartitionState::Sealed | PartitionState::Merged
82        )
83    }
84}
85
/// Partition interval — how wide each time partition is.
///
/// Values are normally obtained from `PartitionInterval::parse` and rendered
/// back to text through the `Display` impl.
// NOTE(review): the enum is serde-derived, so reordering or renaming variants
// may change the stored encoding — confirm before editing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionInterval {
    /// Duration in milliseconds (e.g., 3600000 for 1h, 86400000 for 1d).
    Duration(u64),
    /// Calendar month (variable length). Written as `1M`.
    Month,
    /// Calendar year (variable length). Written as `1y`.
    Year,
    /// Single partition, split only when size threshold is reached.
    /// Written as `UNBOUNDED` (`NONE` is also accepted when parsing).
    Unbounded,
    /// Engine chooses adaptively based on ingest rate and partition row count.
    /// Written as `AUTO`.
    Auto,
}
100
101impl PartitionInterval {
102    /// Parse a duration string like "1h", "3d", "1w", "1M", "1y", "AUTO", "UNBOUNDED".
103    pub fn parse(s: &str) -> Result<Self, IntervalParseError> {
104        let s = s.trim();
105        match s.to_uppercase().as_str() {
106            "AUTO" => return Ok(Self::Auto),
107            "UNBOUNDED" | "NONE" => return Ok(Self::Unbounded),
108            _ => {}
109        }
110
111        if s.ends_with('M') && s.len() > 1 && s[..s.len() - 1].chars().all(|c| c.is_ascii_digit()) {
112            let n: u64 = s[..s.len() - 1]
113                .parse()
114                .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
115            if n != 1 {
116                return Err(IntervalParseError::UnsupportedCalendar {
117                    input: s.into(),
118                    hint: "only '1M' (one calendar month) is supported",
119                });
120            }
121            return Ok(Self::Month);
122        }
123
124        if s.ends_with('y') && s.len() > 1 && s[..s.len() - 1].chars().all(|c| c.is_ascii_digit()) {
125            let n: u64 = s[..s.len() - 1]
126                .parse()
127                .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
128            if n != 1 {
129                return Err(IntervalParseError::UnsupportedCalendar {
130                    input: s.into(),
131                    hint: "only '1y' (one calendar year) is supported",
132                });
133            }
134            return Ok(Self::Year);
135        }
136
137        let (num_str, unit) = if s.len() > 1 && s.as_bytes()[s.len() - 1].is_ascii_alphabetic() {
138            (&s[..s.len() - 1], &s[s.len() - 1..])
139        } else {
140            return Err(IntervalParseError::InvalidFormat { input: s.into() });
141        };
142
143        let n: u64 = num_str
144            .parse()
145            .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
146        if n == 0 {
147            return Err(IntervalParseError::ZeroInterval);
148        }
149
150        let ms = match unit {
151            "s" => n * 1_000,
152            "m" => n * 60_000,
153            "h" => n * 3_600_000,
154            "d" => n * 86_400_000,
155            "w" => n * 604_800_000,
156            _ => {
157                return Err(IntervalParseError::UnknownUnit { unit: unit.into() });
158            }
159        };
160
161        Ok(Self::Duration(ms))
162    }
163
164    /// Duration in milliseconds, if fixed-duration.
165    pub fn as_millis(&self) -> Option<u64> {
166        match self {
167            Self::Duration(ms) => Some(*ms),
168            _ => None,
169        }
170    }
171}
172
173impl std::fmt::Display for PartitionInterval {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        match self {
176            Self::Duration(ms) => {
177                if *ms % 604_800_000 == 0 {
178                    write!(f, "{}w", ms / 604_800_000)
179                } else if *ms % 86_400_000 == 0 {
180                    write!(f, "{}d", ms / 86_400_000)
181                } else if *ms % 3_600_000 == 0 {
182                    write!(f, "{}h", ms / 3_600_000)
183                } else if *ms % 60_000 == 0 {
184                    write!(f, "{}m", ms / 60_000)
185                } else {
186                    write!(f, "{}s", ms / 1_000)
187                }
188            }
189            Self::Month => write!(f, "1M"),
190            Self::Year => write!(f, "1y"),
191            Self::Unbounded => write!(f, "UNBOUNDED"),
192            Self::Auto => write!(f, "AUTO"),
193        }
194    }
195}
196
/// Data from a single series after memtable drain.
///
/// Carries the series identity, the type-specific payload, and the timestamp
/// bounds recorded for that payload.
#[derive(Debug)]
pub struct FlushedSeries {
    /// Identifier of the series this data belongs to.
    pub series_id: super::series::SeriesId,
    /// The flushed payload: metric samples or log entries.
    pub kind: FlushedKind,
    /// Lower timestamp bound for the flushed data.
    // NOTE(review): presumably inclusive and in the same unit as
    // `PartitionMeta::min_ts` — confirm against the memtable drain code.
    pub min_ts: i64,
    /// Upper timestamp bound for the flushed data (see note on `min_ts`).
    pub max_ts: i64,
}
205
/// Type-specific flushed data.
///
/// One variant per series type: metric samples or log entries.
#[derive(Debug)]
pub enum FlushedKind {
    /// Flushed metric samples.
    Metric {
        /// Encoded sample bytes.
        // NOTE(review): the name suggests Gorilla-style compression — confirm
        // against the encoder that produces this buffer.
        gorilla_block: Vec<u8>,
        /// Count of samples (presumably those encoded in `gorilla_block`).
        sample_count: u64,
    },
    /// Flushed log entries.
    Log {
        /// The log entries themselves.
        entries: Vec<LogEntry>,
        /// Byte total associated with `entries`.
        // NOTE(review): unclear from here whether this is payload bytes or
        // in-memory footprint — confirm with the producer.
        total_bytes: usize,
    },
}
218
/// Segment file reference for the L1/L2 index.
///
/// Describes one segment file: where it lives, which timestamps it spans,
/// what kind of data it holds, and how large it is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentRef {
    /// Location of the segment file, stored as a string.
    pub path: String,
    /// Lower timestamp bound of the segment's contents.
    // NOTE(review): presumably inclusive, matching `PartitionMeta` — confirm.
    pub min_ts: i64,
    /// Upper timestamp bound of the segment's contents (see note on `min_ts`).
    pub max_ts: i64,
    /// Whether the segment holds metrics or logs.
    pub kind: SegmentKind,
    /// Size of the segment in bytes.
    pub size_bytes: u64,
    /// Creation time in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub created_at_ms: i64,
}
229
/// Whether a segment contains metrics or logs.
///
/// Used as the `kind` tag on `SegmentRef`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SegmentKind {
    /// The segment holds metric data.
    Metric,
    /// The segment holds log data.
    Log,
}