1use std::collections::HashMap;
6
7use serde::{Deserialize, Serialize};
8use zerompk::{FromMessagePack, ToMessagePack};
9
10use super::ingest::{LogEntry, TimeRange};
11
12#[derive(Debug, thiserror::Error)]
14#[non_exhaustive]
15pub enum IntervalParseError {
16 #[error("invalid interval format: '{input}' — expected format like '1h', '3d', '1w'")]
17 InvalidFormat { input: String },
18
19 #[error("invalid number in interval: '{input}'")]
20 InvalidNumber { input: String },
21
22 #[error("partition interval must be > 0")]
23 ZeroInterval,
24
25 #[error("unknown unit '{unit}': expected s, m, h, d, w, M, y")]
26 UnknownUnit { unit: String },
27
28 #[error("unsupported calendar interval '{input}': {hint}")]
29 UnsupportedCalendar { input: String, hint: &'static str },
30}
31
32#[derive(
34 Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack,
35)]
36#[non_exhaustive]
37pub enum PartitionState {
38 Active,
40 Sealed,
42 Merging,
44 Merged,
46 Deleted,
48 Archived,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
56pub struct PartitionMeta {
57 pub min_ts: i64,
59 pub max_ts: i64,
61 pub row_count: u64,
63 pub size_bytes: u64,
65 pub schema_version: u32,
67 pub state: PartitionState,
69 pub interval_ms: u64,
71 pub last_flushed_wal_lsn: u64,
73 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
75 pub column_stats: HashMap<String, nodedb_codec::ColumnStatistics>,
76 #[serde(default)]
81 pub max_system_ts: i64,
82}
83
84impl PartitionMeta {
85 pub fn overlaps(&self, range: &TimeRange) -> bool {
87 self.min_ts <= range.end_ms && range.start_ms <= self.max_ts
88 }
89
90 pub fn is_queryable(&self) -> bool {
92 matches!(
93 self.state,
94 PartitionState::Active | PartitionState::Sealed | PartitionState::Merged
95 )
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
101#[non_exhaustive]
102pub enum PartitionInterval {
103 Duration(u64),
105 Month,
107 Year,
109 Unbounded,
111 Auto,
113}
114
115impl PartitionInterval {
116 pub fn parse(s: &str) -> Result<Self, IntervalParseError> {
118 let s = s.trim();
119 match s.to_uppercase().as_str() {
120 "AUTO" => return Ok(Self::Auto),
121 "UNBOUNDED" | "NONE" => return Ok(Self::Unbounded),
122 _ => {}
123 }
124
125 if s.ends_with('M') && s.len() > 1 && s[..s.len() - 1].chars().all(|c| c.is_ascii_digit()) {
126 let n: u64 = s[..s.len() - 1]
127 .parse()
128 .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
129 if n != 1 {
130 return Err(IntervalParseError::UnsupportedCalendar {
131 input: s.into(),
132 hint: "only '1M' (one calendar month) is supported",
133 });
134 }
135 return Ok(Self::Month);
136 }
137
138 if s.ends_with('y') && s.len() > 1 && s[..s.len() - 1].chars().all(|c| c.is_ascii_digit()) {
139 let n: u64 = s[..s.len() - 1]
140 .parse()
141 .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
142 if n != 1 {
143 return Err(IntervalParseError::UnsupportedCalendar {
144 input: s.into(),
145 hint: "only '1y' (one calendar year) is supported",
146 });
147 }
148 return Ok(Self::Year);
149 }
150
151 let (num_str, unit) = if s.len() > 1 && s.as_bytes()[s.len() - 1].is_ascii_alphabetic() {
152 (&s[..s.len() - 1], &s[s.len() - 1..])
153 } else {
154 return Err(IntervalParseError::InvalidFormat { input: s.into() });
155 };
156
157 let n: u64 = num_str
158 .parse()
159 .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
160 if n == 0 {
161 return Err(IntervalParseError::ZeroInterval);
162 }
163
164 let ms = match unit {
165 "s" => n * 1_000,
166 "m" => n * 60_000,
167 "h" => n * 3_600_000,
168 "d" => n * 86_400_000,
169 "w" => n * 604_800_000,
170 _ => {
171 return Err(IntervalParseError::UnknownUnit { unit: unit.into() });
172 }
173 };
174
175 Ok(Self::Duration(ms))
176 }
177
178 pub fn as_millis(&self) -> Option<u64> {
180 match self {
181 Self::Duration(ms) => Some(*ms),
182 _ => None,
183 }
184 }
185}
186
187impl std::fmt::Display for PartitionInterval {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 match self {
190 Self::Duration(ms) => {
191 if *ms % 604_800_000 == 0 {
192 write!(f, "{}w", ms / 604_800_000)
193 } else if *ms % 86_400_000 == 0 {
194 write!(f, "{}d", ms / 86_400_000)
195 } else if *ms % 3_600_000 == 0 {
196 write!(f, "{}h", ms / 3_600_000)
197 } else if *ms % 60_000 == 0 {
198 write!(f, "{}m", ms / 60_000)
199 } else {
200 write!(f, "{}s", ms / 1_000)
201 }
202 }
203 Self::Month => write!(f, "1M"),
204 Self::Year => write!(f, "1y"),
205 Self::Unbounded => write!(f, "UNBOUNDED"),
206 Self::Auto => write!(f, "AUTO"),
207 }
208 }
209}
210
211#[derive(Debug)]
213pub struct FlushedSeries {
214 pub series_id: super::series::SeriesId,
215 pub kind: FlushedKind,
216 pub min_ts: i64,
217 pub max_ts: i64,
218}
219
220#[derive(Debug)]
222#[non_exhaustive]
223pub enum FlushedKind {
224 Metric {
225 gorilla_block: Vec<u8>,
226 sample_count: u64,
227 },
228 Log {
229 entries: Vec<LogEntry>,
230 total_bytes: usize,
231 },
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct SegmentRef {
237 pub path: String,
238 pub min_ts: i64,
239 pub max_ts: i64,
240 pub kind: SegmentKind,
241 pub size_bytes: u64,
242 pub created_at_ms: i64,
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
247#[non_exhaustive]
248pub enum SegmentKind {
249 Metric,
250 Log,
251}