Skip to main content

nodedb_types/timeseries/
partition.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Partition types: metadata, state, interval, and flushed data.
4
5use std::collections::HashMap;
6
7use serde::{Deserialize, Serialize};
8use zerompk::{FromMessagePack, ToMessagePack};
9
10use super::ingest::{LogEntry, TimeRange};
11
/// Error parsing a partition interval string (e.g., "1h", "3d").
///
/// This is the error type returned by [`PartitionInterval::parse`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum IntervalParseError {
    /// The input is too short or does not end in a unit letter.
    #[error("invalid interval format: '{input}' — expected format like '1h', '3d', '1w'")]
    InvalidFormat { input: String },

    /// The numeric part of the input could not be parsed as an unsigned integer.
    #[error("invalid number in interval: '{input}'")]
    InvalidNumber { input: String },

    /// The interval count was zero (e.g. `"0h"`).
    #[error("partition interval must be > 0")]
    ZeroInterval,

    /// The trailing unit letter is not one of the recognized units.
    #[error("unknown unit '{unit}': expected s, m, h, d, w, M, y")]
    UnknownUnit { unit: String },

    /// A calendar unit (`M` or `y`) was used with a count other than 1;
    /// `hint` states which form is supported.
    #[error("unsupported calendar interval '{input}': {hint}")]
    UnsupportedCalendar { input: String, hint: &'static str },
}
31
/// Lifecycle state of a partition in the partition manifest.
///
/// [`PartitionMeta::is_queryable`] treats `Active`, `Sealed`, and `Merged` as queryable;
/// `Merging`, `Deleted`, and `Archived` partitions are not queryable.
// NOTE(review): this enum derives both serde and MessagePack (zerompk) codecs; variant order
// may be part of the encoded form — confirm before reordering or inserting variants.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
#[non_exhaustive]
pub enum PartitionState {
    /// Actively receiving writes.
    Active,
    /// Immutable, compactable, archivable.
    Sealed,
    /// Being merged into a larger partition (transient).
    Merging,
    /// Result of a merge operation.
    Merged,
    /// Marked for deletion (sources of a completed merge).
    Deleted,
    /// Uploaded to S3/cold storage.
    Archived,
}
51
/// Metadata for a single time partition.
///
/// Stored in the partition manifest (redb). Shared between Origin and Lite.
// NOTE(review): this struct is encoded with both serde and MessagePack (zerompk); field order
// may be significant for the MessagePack form — confirm before reordering or inserting fields.
#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
pub struct PartitionMeta {
    /// Inclusive lower bound of timestamps in this partition.
    pub min_ts: i64,
    /// Inclusive upper bound of timestamps in this partition.
    pub max_ts: i64,
    /// Number of rows.
    pub row_count: u64,
    /// On-disk size in bytes (all column files combined).
    pub size_bytes: u64,
    /// Schema version — incremented on column add/drop/rename.
    pub schema_version: u32,
    /// Current lifecycle state.
    pub state: PartitionState,
    /// The partition interval duration in milliseconds that produced this partition.
    pub interval_ms: u64,
    /// WAL LSN at last successful flush to this partition.
    pub last_flushed_wal_lsn: u64,
    /// Per-column statistics (codec, min/max/sum/count/cardinality).
    ///
    /// With serde, this field is omitted when empty and defaults to an empty map when absent.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub column_stats: HashMap<String, nodedb_codec::ColumnStatistics>,
    /// Maximum `_ts_system` value across rows in this partition
    /// (bitemporal only; 0 for non-bitemporal partitions). Retention on
    /// bitemporal collections uses this instead of `max_ts` so that
    /// late-arriving backfill survives an event-time-based TTL.
    ///
    /// With serde, a missing value deserializes as 0.
    #[serde(default)]
    pub max_system_ts: i64,
}
83
84impl PartitionMeta {
85    /// Whether this partition's time range overlaps a query range.
86    pub fn overlaps(&self, range: &TimeRange) -> bool {
87        self.min_ts <= range.end_ms && range.start_ms <= self.max_ts
88    }
89
90    /// Whether this partition is queryable (Active, Sealed, or Merged).
91    pub fn is_queryable(&self) -> bool {
92        matches!(
93            self.state,
94            PartitionState::Active | PartitionState::Sealed | PartitionState::Merged
95        )
96    }
97}
98
/// Partition interval — how wide each time partition is.
///
/// Built from user-facing strings by [`PartitionInterval::parse`] and rendered back by its
/// `Display` impl. Only [`PartitionInterval::Duration`] has a fixed length; see
/// [`PartitionInterval::as_millis`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum PartitionInterval {
    /// Fixed duration in milliseconds (e.g., 3600000 for 1h, 86400000 for 1d).
    Duration(u64),
    /// Calendar month (variable length); written as `1M`.
    Month,
    /// Calendar year (variable length); written as `1y`.
    Year,
    /// Single partition, split only when size threshold is reached.
    Unbounded,
    /// Engine chooses adaptively based on ingest rate and partition row count.
    Auto,
}
114
115impl PartitionInterval {
116    /// Parse a duration string like "1h", "3d", "1w", "1M", "1y", "AUTO", "UNBOUNDED".
117    pub fn parse(s: &str) -> Result<Self, IntervalParseError> {
118        let s = s.trim();
119        match s.to_uppercase().as_str() {
120            "AUTO" => return Ok(Self::Auto),
121            "UNBOUNDED" | "NONE" => return Ok(Self::Unbounded),
122            _ => {}
123        }
124
125        if s.ends_with('M') && s.len() > 1 && s[..s.len() - 1].chars().all(|c| c.is_ascii_digit()) {
126            let n: u64 = s[..s.len() - 1]
127                .parse()
128                .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
129            if n != 1 {
130                return Err(IntervalParseError::UnsupportedCalendar {
131                    input: s.into(),
132                    hint: "only '1M' (one calendar month) is supported",
133                });
134            }
135            return Ok(Self::Month);
136        }
137
138        if s.ends_with('y') && s.len() > 1 && s[..s.len() - 1].chars().all(|c| c.is_ascii_digit()) {
139            let n: u64 = s[..s.len() - 1]
140                .parse()
141                .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
142            if n != 1 {
143                return Err(IntervalParseError::UnsupportedCalendar {
144                    input: s.into(),
145                    hint: "only '1y' (one calendar year) is supported",
146                });
147            }
148            return Ok(Self::Year);
149        }
150
151        let (num_str, unit) = if s.len() > 1 && s.as_bytes()[s.len() - 1].is_ascii_alphabetic() {
152            (&s[..s.len() - 1], &s[s.len() - 1..])
153        } else {
154            return Err(IntervalParseError::InvalidFormat { input: s.into() });
155        };
156
157        let n: u64 = num_str
158            .parse()
159            .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
160        if n == 0 {
161            return Err(IntervalParseError::ZeroInterval);
162        }
163
164        let ms = match unit {
165            "s" => n * 1_000,
166            "m" => n * 60_000,
167            "h" => n * 3_600_000,
168            "d" => n * 86_400_000,
169            "w" => n * 604_800_000,
170            _ => {
171                return Err(IntervalParseError::UnknownUnit { unit: unit.into() });
172            }
173        };
174
175        Ok(Self::Duration(ms))
176    }
177
178    /// Duration in milliseconds, if fixed-duration.
179    pub fn as_millis(&self) -> Option<u64> {
180        match self {
181            Self::Duration(ms) => Some(*ms),
182            _ => None,
183        }
184    }
185}
186
187impl std::fmt::Display for PartitionInterval {
188    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189        match self {
190            Self::Duration(ms) => {
191                if *ms % 604_800_000 == 0 {
192                    write!(f, "{}w", ms / 604_800_000)
193                } else if *ms % 86_400_000 == 0 {
194                    write!(f, "{}d", ms / 86_400_000)
195                } else if *ms % 3_600_000 == 0 {
196                    write!(f, "{}h", ms / 3_600_000)
197                } else if *ms % 60_000 == 0 {
198                    write!(f, "{}m", ms / 60_000)
199                } else {
200                    write!(f, "{}s", ms / 1_000)
201                }
202            }
203            Self::Month => write!(f, "1M"),
204            Self::Year => write!(f, "1y"),
205            Self::Unbounded => write!(f, "UNBOUNDED"),
206            Self::Auto => write!(f, "AUTO"),
207        }
208    }
209}
210
/// Data from a single series after memtable drain.
#[derive(Debug)]
pub struct FlushedSeries {
    /// Identifier of the series this data belongs to.
    pub series_id: super::series::SeriesId,
    /// Metric or log payload drained for this series.
    pub kind: FlushedKind,
    /// Lower timestamp bound of the drained data.
    // NOTE(review): presumably inclusive and in the same units as `PartitionMeta::min_ts` — confirm.
    pub min_ts: i64,
    /// Upper timestamp bound of the drained data.
    // NOTE(review): presumably inclusive and in the same units as `PartitionMeta::max_ts` — confirm.
    pub max_ts: i64,
}
219
/// Type-specific flushed data.
#[derive(Debug)]
#[non_exhaustive]
pub enum FlushedKind {
    /// Metric samples drained from the memtable.
    Metric {
        /// Encoded sample bytes.
        // NOTE(review): presumably Gorilla-compressed, per the field name — confirm against the encoder.
        gorilla_block: Vec<u8>,
        /// Number of samples, presumably those encoded in `gorilla_block`.
        sample_count: u64,
    },
    /// Log entries drained from the memtable.
    Log {
        /// The drained log entries.
        entries: Vec<LogEntry>,
        /// Byte size attributed to `entries`.
        // NOTE(review): how this total is computed is not shown here — confirm with the producer.
        total_bytes: usize,
    },
}
233
/// Segment file reference for the L1/L2 index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentRef {
    /// Location of the segment file.
    pub path: String,
    /// Lower timestamp bound of the segment's data (inclusivity not established here).
    pub min_ts: i64,
    /// Upper timestamp bound of the segment's data (inclusivity not established here).
    pub max_ts: i64,
    /// Whether the segment holds metrics or logs.
    pub kind: SegmentKind,
    /// Size of the segment in bytes.
    pub size_bytes: u64,
    /// Creation time in milliseconds.
    // NOTE(review): epoch / clock source is not shown here — confirm (likely Unix epoch ms).
    pub created_at_ms: i64,
}
244
/// Whether a segment contains metrics or logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SegmentKind {
    /// Segment holds metric data.
    Metric,
    /// Segment holds log data.
    Log,
}