1pub mod actions;
66pub mod log_store;
67pub mod segments;
68pub mod table_state;
69
70pub use crate::metadata::table_metadata::{
71 TableKind, TableMeta, TableMetaDelta, TimeBucket, TimeIndexSpec,
72};
73pub use actions::{Commit, LogAction};
74pub use log_store::TransactionLogStore;
75pub use segments::{FileFormat, SegmentId, SegmentMeta};
76pub use table_state::TableState;
77
78use snafu::{Backtrace, prelude::*};
79
80use crate::storage::StorageError;
81
82#[derive(Debug, Snafu)]
84pub enum CommitError {
85 #[snafu(display("Commit conflict: expected version {expected}, but CURRENT is {found}"))]
87 Conflict {
88 expected: u64,
90 found: u64,
92 backtrace: Backtrace,
94 },
95
96 #[snafu(display("Storage error while accessing commit log: {source}"))]
100 Storage {
101 #[snafu(backtrace)]
103 source: StorageError,
104 },
105
106 #[snafu(display("Corrupt log state: {msg}"))]
108 CorruptState {
109 msg: String,
111 backtrace: Backtrace,
113 },
114}
115
#[cfg(test)]
mod tests {
    //! Serialization and validation tests for the types re-exported by this module.

    use crate::metadata::logical_schema::{
        LogicalDataType, LogicalField, LogicalSchema, LogicalSchemaError, LogicalTimestampUnit,
    };
    use crate::transaction_log::*;

    use chrono::{DateTime, TimeZone, Utc};

    /// Builds a UTC timestamp from calendar components.
    ///
    /// Panics when the components do not resolve to a single timestamp, which
    /// in these tests would mean a typo in the fixture values.
    fn utc_datetime(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .expect("valid UTC timestamp")
    }

    /// Builds a non-nullable, microsecond-precision timestamp column with no
    /// timezone, under the given column name.
    fn micros_timestamp_field(name: &str) -> LogicalField {
        LogicalField {
            name: name.to_string(),
            data_type: LogicalDataType::Timestamp {
                unit: LogicalTimestampUnit::Micros,
                timezone: None,
            },
            nullable: false,
        }
    }

    /// A commit carrying both a table-metadata update and a segment addition
    /// must compare equal after a JSON encode/decode cycle.
    #[test]
    fn commit_json_roundtrip() {
        let created = utc_datetime(2025, 1, 1, 0, 0, 0);
        let committed = utc_datetime(2025, 1, 1, 1, 0, 0);

        let time_index = TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec!["symbol".to_string()],
            bucket: TimeBucket::Minutes(60),
            timezone: Some("UTC".to_string()),
        };

        let schema = LogicalSchema::new(vec![
            micros_timestamp_field("ts"),
            LogicalField {
                name: "symbol".to_string(),
                data_type: LogicalDataType::Utf8,
                nullable: false,
            },
        ])
        .expect("valid logical schema");

        let table_meta = TableMeta {
            kind: TableKind::TimeSeries(time_index),
            logical_schema: Some(schema),
            created_at: created,
            format_version: 1,
            entity_identity: None,
        };

        let seg_meta = SegmentMeta {
            segment_id: SegmentId("seg-0001".to_string()),
            path: "data/nvda_1h_0001.parquet".to_string(),
            format: FileFormat::Parquet,
            ts_min: created,
            ts_max: committed,
            row_count: 1024,
            file_size: None,
            coverage_path: None,
        };

        let commit = Commit {
            version: 1,
            base_version: 0,
            timestamp: committed,
            actions: vec![
                LogAction::UpdateTableMeta(table_meta),
                LogAction::AddSegment(seg_meta),
            ],
        };

        let encoded = serde_json::to_string_pretty(&commit).expect("serialize commit");
        let decoded: Commit = serde_json::from_str(&encoded).expect("deserialize commit");

        assert_eq!(commit, decoded);
    }

    /// Two fields sharing the name `ts` must be rejected with
    /// `DuplicateColumn`, and the error must name the offending column.
    #[test]
    fn logical_schema_rejects_duplicate_columns() {
        let result = LogicalSchema::new(vec![
            micros_timestamp_field("ts"),
            micros_timestamp_field("ts"),
        ]);

        let err = result.expect_err("duplicate columns should be rejected");
        assert!(matches!(err, LogicalSchemaError::DuplicateColumn { column } if column == "ts"));
    }

    /// `entity_columns` and `timezone` are absent from the input JSON; the
    /// decoded spec must come back with an empty list and `None` respectively.
    #[test]
    fn time_index_spec_defaults() {
        let json = r#"{
            "timestamp_column": "ts",
            "bucket": { "Hours": 1 }
        }"#;

        let spec: TimeIndexSpec = serde_json::from_str(json).expect("deserialize");

        assert_eq!(spec.timestamp_column, "ts");
        assert_eq!(spec.entity_columns, Vec::<String>::new());
        assert_eq!(spec.bucket, TimeBucket::Hours(1));
        assert_eq!(spec.timezone, None);
    }

    /// A `None` timezone must not appear as a key in the serialized JSON.
    #[test]
    fn time_index_spec_skips_none_timezone_on_serialize() {
        let spec = TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec![],
            bucket: TimeBucket::Seconds(30),
            timezone: None,
        };

        let encoded = serde_json::to_string(&spec).expect("serialize");

        assert!(!encoded.contains("timezone"));
    }

    /// Omitting `nullable` from a field's JSON must be a deserialization
    /// error rather than silently picking a default.
    #[test]
    fn logical_column_nullable_requires_explicit_value() {
        let json = r#"{ "name": "price", "data_type": "Float64" }"#;

        let err = serde_json::from_str::<LogicalField>(json).unwrap_err();
        assert!(
            err.to_string().contains("missing field `nullable`"),
            "unexpected error: {err}"
        );
    }

    /// `TableKind::Generic` round-trips and is encoded as the bare JSON
    /// string `"Generic"`.
    #[test]
    fn table_kind_generic_roundtrip() {
        let kind = TableKind::Generic;
        let encoded = serde_json::to_string(&kind).expect("serialize");
        let decoded: TableKind = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(kind, decoded);
        assert_eq!(encoded, r#""Generic""#);
    }

    /// Each `TimeBucket` variant listed here survives a JSON round-trip.
    #[test]
    fn all_time_bucket_variants_roundtrip() {
        // A fixed-size array is enough here; no heap-allocated `Vec` is needed
        // just to iterate over a literal list.
        for bucket in [
            TimeBucket::Seconds(15),
            TimeBucket::Minutes(5),
            TimeBucket::Hours(24),
            TimeBucket::Days(7),
        ] {
            let encoded = serde_json::to_string(&bucket).expect("serialize");
            let decoded: TimeBucket = serde_json::from_str(&encoded).expect("deserialize");
            assert_eq!(bucket, decoded);
        }
    }

    /// `FileFormat::Parquet` is encoded as the lowercase string `"parquet"`
    /// and decodes back to the same variant.
    #[test]
    fn file_format_serializes_lowercase() {
        let format = FileFormat::Parquet;
        let encoded = serde_json::to_string(&format).expect("serialize");

        assert_eq!(encoded, r#""parquet""#);

        let decoded: FileFormat = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(format, decoded);
    }

    /// `FileFormat::default()` yields `Parquet`.
    #[test]
    fn file_format_default_is_parquet() {
        assert_eq!(FileFormat::default(), FileFormat::Parquet);
    }

    /// A `RemoveSegment` action survives a JSON round-trip.
    #[test]
    fn remove_segment_action_roundtrip() {
        let action = LogAction::RemoveSegment {
            segment_id: SegmentId("seg-to-remove".to_string()),
        };

        let encoded = serde_json::to_string(&action).expect("serialize");
        let decoded: LogAction = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(action, decoded);
    }

    /// A commit with no actions round-trips and still has no actions afterwards.
    #[test]
    fn commit_with_empty_actions() {
        let commit = Commit {
            version: 1,
            base_version: 0,
            timestamp: utc_datetime(2025, 6, 15, 12, 0, 0),
            actions: vec![],
        };

        let encoded = serde_json::to_string(&commit).expect("serialize");
        let decoded: Commit = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(commit, decoded);
        assert!(decoded.actions.is_empty());
    }

    /// `SegmentId` is encoded as a bare JSON string rather than a wrapper
    /// object, and decodes back from that form.
    #[test]
    fn segment_id_transparent_serialization() {
        let id = SegmentId("my-segment".to_string());
        let encoded = serde_json::to_string(&id).expect("serialize");

        assert_eq!(encoded, r#""my-segment""#);

        let decoded: SegmentId = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(id, decoded);
    }
}