1use std::sync::Arc;
11
12use bytes::Bytes;
13use chrono::Utc;
14use tracing::{info, warn};
15
16use apiary_core::{
17 ApiaryError, CellMetadata, FrameSchema, LedgerAction, LedgerCheckpoint, LedgerEntry, NodeId,
18 Result, StorageBackend,
19};
20
/// Maximum number of optimistic-concurrency attempts for a single commit
/// before giving up with a `Conflict` error.
const MAX_RETRIES: usize = 10;

/// A checkpoint is written after every version divisible by this interval.
const CHECKPOINT_INTERVAL: u64 = 100;
26
/// Append-only transaction log for a single frame, persisted as JSON entries
/// under `<frame_path>/_ledger/` with periodic checkpoints for fast opens.
pub struct Ledger {
    /// Storage key prefix identifying the frame this ledger belongs to.
    frame_path: String,
    /// Backend used for all ledger reads and writes.
    storage: Arc<dyn StorageBackend>,
    /// Version of the most recently applied ledger entry.
    current_version: u64,
    /// Cells visible after replaying the log up to `current_version`.
    active_cells: Vec<CellMetadata>,
    /// Frame schema as of `current_version`.
    schema: FrameSchema,
    /// Partition columns as of `current_version`.
    partition_by: Vec<String>,
}
42
43impl Ledger {
44 pub async fn create(
46 storage: Arc<dyn StorageBackend>,
47 frame_path: &str,
48 schema: FrameSchema,
49 partition_by: Vec<String>,
50 node_id: &NodeId,
51 ) -> Result<Self> {
52 let entry = LedgerEntry {
53 version: 0,
54 timestamp: Utc::now(),
55 node_id: node_id.clone(),
56 action: LedgerAction::CreateFrame {
57 schema: schema.clone(),
58 partition_by: partition_by.clone(),
59 },
60 };
61
62 let key = ledger_entry_key(frame_path, 0);
63 let data = serde_json::to_vec_pretty(&entry)
64 .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
65
66 let wrote = storage.put_if_not_exists(&key, Bytes::from(data)).await?;
67
68 if !wrote {
69 return Self::open(storage, frame_path).await;
71 }
72
73 info!(frame_path, "Created ledger version 0");
74
75 Ok(Self {
76 frame_path: frame_path.to_string(),
77 storage,
78 current_version: 0,
79 active_cells: Vec::new(),
80 schema,
81 partition_by,
82 })
83 }
84
85 pub async fn open(storage: Arc<dyn StorageBackend>, frame_path: &str) -> Result<Self> {
87 let (version, schema, partition_by, active_cells) =
88 Self::load_state(&storage, frame_path).await?;
89
90 Ok(Self {
91 frame_path: frame_path.to_string(),
92 storage,
93 current_version: version,
94 active_cells,
95 schema,
96 partition_by,
97 })
98 }
99
100 async fn load_state(
102 storage: &Arc<dyn StorageBackend>,
103 frame_path: &str,
104 ) -> Result<(u64, FrameSchema, Vec<String>, Vec<CellMetadata>)> {
105 let checkpoint_prefix = format!("{frame_path}/_ledger/_checkpoint/");
107 let checkpoint_files = storage.list(&checkpoint_prefix).await?;
108
109 let mut start_version = 0u64;
110 let mut schema = FrameSchema { fields: Vec::new() };
111 let mut partition_by = Vec::new();
112 let mut active_cells: Vec<CellMetadata> = Vec::new();
113
114 if let Some(latest_cp) = checkpoint_files
115 .iter()
116 .filter(|f| f.ends_with(".json"))
117 .max()
118 {
119 let cp_data = storage.get(latest_cp).await?;
120 let checkpoint: LedgerCheckpoint = serde_json::from_slice(&cp_data)
121 .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
122 start_version = checkpoint.version;
123 schema = checkpoint.schema;
124 partition_by = checkpoint.partition_by;
125 active_cells = checkpoint.active_cells;
126 }
127
128 let ledger_prefix = format!("{frame_path}/_ledger/");
130 let all_entries = storage.list(&ledger_prefix).await?;
131
132 let mut entry_files: Vec<(u64, String)> = all_entries
133 .iter()
134 .filter(|f| !f.contains("_checkpoint") && f.ends_with(".json"))
135 .filter_map(|f| {
136 let filename = f.rsplit('/').next()?;
137 let version_str = filename.strip_suffix(".json")?;
138 let version: u64 = version_str.parse().ok()?;
139 Some((version, f.clone()))
140 })
141 .filter(|(v, _)| *v >= start_version)
142 .collect();
143
144 entry_files.sort_by_key(|(v, _)| *v);
145
146 if entry_files.is_empty() && start_version == 0 && checkpoint_files.is_empty() {
148 return Err(ApiaryError::NotFound {
149 key: format!("{frame_path}/_ledger"),
150 });
151 }
152
153 let mut current_version = start_version;
154
155 for (version, path) in &entry_files {
156 let data = storage.get(path).await?;
157 let entry: LedgerEntry = serde_json::from_slice(&data)
158 .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
159
160 match &entry.action {
161 LedgerAction::CreateFrame {
162 schema: s,
163 partition_by: pb,
164 } => {
165 schema = s.clone();
166 partition_by = pb.clone();
167 active_cells.clear();
168 }
169 LedgerAction::AddCells { cells } => {
170 active_cells.extend(cells.iter().cloned());
171 }
172 LedgerAction::RewriteCells { removed, added } => {
173 active_cells.retain(|c| !removed.contains(&c.id));
174 active_cells.extend(added.iter().cloned());
175 }
176 }
177
178 current_version = *version;
179 }
180
181 Ok((current_version, schema, partition_by, active_cells))
182 }
183
184 async fn reload(&mut self) -> Result<()> {
186 let (version, schema, partition_by, active_cells) =
187 Self::load_state(&self.storage, &self.frame_path).await?;
188 self.current_version = version;
189 self.schema = schema;
190 self.partition_by = partition_by;
191 self.active_cells = active_cells;
192 Ok(())
193 }
194
195 pub async fn commit(&mut self, action: LedgerAction, node_id: &NodeId) -> Result<u64> {
198 for attempt in 0..MAX_RETRIES {
199 if attempt > 0 {
200 self.reload().await?;
202 }
203
204 let next_version = self.current_version + 1;
205 let entry = LedgerEntry {
206 version: next_version,
207 timestamp: Utc::now(),
208 node_id: node_id.clone(),
209 action: action.clone(),
210 };
211
212 let key = ledger_entry_key(&self.frame_path, next_version);
213 let data = serde_json::to_vec_pretty(&entry)
214 .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
215
216 let wrote = self
217 .storage
218 .put_if_not_exists(&key, Bytes::from(data))
219 .await?;
220
221 if wrote {
222 match &action {
224 LedgerAction::CreateFrame {
225 schema,
226 partition_by,
227 } => {
228 self.schema = schema.clone();
229 self.partition_by = partition_by.clone();
230 self.active_cells.clear();
231 }
232 LedgerAction::AddCells { cells } => {
233 self.active_cells.extend(cells.iter().cloned());
234 }
235 LedgerAction::RewriteCells { removed, added } => {
236 self.active_cells.retain(|c| !removed.contains(&c.id));
237 self.active_cells.extend(added.iter().cloned());
238 }
239 }
240
241 self.current_version = next_version;
242
243 info!(
244 frame_path = %self.frame_path,
245 version = next_version,
246 "Committed ledger entry"
247 );
248
249 if next_version.is_multiple_of(CHECKPOINT_INTERVAL) {
251 self.write_checkpoint().await?;
252 }
253
254 return Ok(next_version);
255 }
256
257 warn!(
258 frame_path = %self.frame_path,
259 attempt,
260 version = next_version,
261 "Ledger commit conflict, retrying"
262 );
263 }
264
265 Err(ApiaryError::Conflict {
266 message: format!(
267 "Failed to commit ledger entry for {} after {} retries",
268 self.frame_path, MAX_RETRIES
269 ),
270 })
271 }
272
273 async fn write_checkpoint(&self) -> Result<()> {
275 let checkpoint = LedgerCheckpoint {
276 version: self.current_version,
277 schema: self.schema.clone(),
278 partition_by: self.partition_by.clone(),
279 active_cells: self.active_cells.clone(),
280 };
281
282 let key = format!(
283 "{}/_ledger/_checkpoint/checkpoint_{:06}.json",
284 self.frame_path, self.current_version
285 );
286 let data = serde_json::to_vec_pretty(&checkpoint)
287 .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
288
289 self.storage.put(&key, Bytes::from(data)).await?;
290 info!(
291 frame_path = %self.frame_path,
292 version = self.current_version,
293 "Wrote ledger checkpoint"
294 );
295 Ok(())
296 }
297
298 pub fn active_cells(&self) -> &[CellMetadata] {
300 &self.active_cells
301 }
302
303 pub fn current_version(&self) -> u64 {
305 self.current_version
306 }
307
308 pub fn schema(&self) -> &FrameSchema {
310 &self.schema
311 }
312
313 pub fn partition_by(&self) -> &[String] {
315 &self.partition_by
316 }
317
318 pub fn frame_path(&self) -> &str {
320 &self.frame_path
321 }
322
323 pub fn prune_cells(
326 &self,
327 partition_filters: &std::collections::HashMap<String, String>,
328 stat_filters: &std::collections::HashMap<
329 String,
330 (Option<serde_json::Value>, Option<serde_json::Value>),
331 >,
332 ) -> Vec<&CellMetadata> {
333 self.active_cells
334 .iter()
335 .filter(|cell| {
336 for (col, val) in partition_filters {
338 if let Some(cell_val) = cell.partition_values.get(col) {
339 if cell_val != val {
340 return false;
341 }
342 }
343 }
344
345 for (col, (min_filter, max_filter)) in stat_filters {
347 if let Some(col_stats) = cell.stats.get(col) {
348 if let (Some(filter_min), Some(cell_max)) = (min_filter, &col_stats.max) {
350 if json_value_lt(cell_max, filter_min) {
351 return false;
352 }
353 }
354 if let (Some(filter_max), Some(cell_min)) = (max_filter, &col_stats.min) {
356 if json_value_lt(filter_max, cell_min) {
357 return false;
358 }
359 }
360 }
361 }
362
363 true
364 })
365 .collect()
366 }
367}
368
369fn json_value_lt(a: &serde_json::Value, b: &serde_json::Value) -> bool {
371 match (a.as_f64(), b.as_f64()) {
372 (Some(a_f), Some(b_f)) => a_f < b_f,
373 _ => {
374 match (a.as_str(), b.as_str()) {
376 (Some(a_s), Some(b_s)) => a_s < b_s,
377 _ => false,
378 }
379 }
380 }
381}
382
/// Storage key of the ledger entry for `version` under `frame_path`.
///
/// The version is zero-padded to six digits so that lexicographic listing
/// order matches numeric version order.
fn ledger_entry_key(frame_path: &str, version: u64) -> String {
    format!("{}/_ledger/{:06}.json", frame_path, version)
}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::local::LocalBackend;
    use apiary_core::{CellId, ColumnStats, FieldDef};
    use std::collections::HashMap;

    // Build a fresh local backend rooted in a temp directory. `keep()`
    // detaches the directory from the `TempDir` guard so it survives this
    // function returning; the directory is intentionally never cleaned up.
    async fn make_storage() -> Arc<dyn StorageBackend> {
        let dir = tempfile::tempdir().unwrap();
        let backend = LocalBackend::new(dir.keep()).await.unwrap();
        Arc::new(backend)
    }

    // Two-column schema shared by all tests: a non-nullable string used for
    // partitioning and a nullable float used for stat-based pruning.
    fn test_schema() -> FrameSchema {
        FrameSchema {
            fields: vec![
                FieldDef {
                    name: "region".into(),
                    data_type: "string".into(),
                    nullable: false,
                },
                FieldDef {
                    name: "temp".into(),
                    data_type: "float64".into(),
                    nullable: true,
                },
            ],
        }
    }

    // `create` on an empty backend yields version 0 with no active cells.
    #[tokio::test]
    async fn test_ledger_create() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        assert_eq!(ledger.current_version(), 0);
        assert!(ledger.active_cells().is_empty());
        assert_eq!(ledger.partition_by(), &["region"]);
    }

    // A second `create` for the same path must not fail: the loser of the
    // version-0 write falls back to opening the existing ledger.
    #[tokio::test]
    async fn test_ledger_create_idempotent() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let _ledger1 = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema.clone(),
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        let ledger2 = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        assert_eq!(ledger2.current_version(), 0);
    }

    // Committing `AddCells` bumps the version to 1 and exposes the new cell
    // through `active_cells`.
    #[tokio::test]
    async fn test_ledger_commit_add_cells() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        let cells = vec![CellMetadata {
            id: CellId::new("cell_001"),
            path: "region=north/cell_001.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::from([("region".into(), "north".into())]),
            rows: 100,
            bytes: 2048,
            stats: HashMap::new(),
        }];

        let version = ledger
            .commit(LedgerAction::AddCells { cells }, &node_id)
            .await
            .unwrap();

        assert_eq!(version, 1);
        assert_eq!(ledger.active_cells().len(), 1);
        assert_eq!(ledger.active_cells()[0].rows, 100);
    }

    // Re-opening a ledger replays the persisted entries and reproduces the
    // state built by the (dropped) original instance.
    #[tokio::test]
    async fn test_ledger_open_replays() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        // Scope so the first ledger is dropped before reopening.
        {
            let mut ledger = Ledger::create(
                storage.clone(),
                "hive/box/frame",
                schema,
                vec!["region".into()],
                &node_id,
            )
            .await
            .unwrap();

            let cells = vec![CellMetadata {
                id: CellId::new("cell_001"),
                path: "region=north/cell_001.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::from([("region".into(), "north".into())]),
                rows: 100,
                bytes: 2048,
                stats: HashMap::new(),
            }];

            ledger
                .commit(LedgerAction::AddCells { cells }, &node_id)
                .await
                .unwrap();
        }

        let ledger = Ledger::open(storage.clone(), "hive/box/frame")
            .await
            .unwrap();

        assert_eq!(ledger.current_version(), 1);
        assert_eq!(ledger.active_cells().len(), 1);
    }

    // `RewriteCells` atomically swaps removed cell ids for added cells
    // (compaction-style rewrite): cell_001 is replaced by cell_003.
    #[tokio::test]
    async fn test_ledger_rewrite_cells() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        let cell1 = CellMetadata {
            id: CellId::new("cell_001"),
            path: "cell_001.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::new(),
            rows: 100,
            bytes: 2048,
            stats: HashMap::new(),
        };
        let cell2 = CellMetadata {
            id: CellId::new("cell_002"),
            path: "cell_002.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::new(),
            rows: 200,
            bytes: 4096,
            stats: HashMap::new(),
        };

        ledger
            .commit(
                LedgerAction::AddCells {
                    cells: vec![cell1, cell2],
                },
                &node_id,
            )
            .await
            .unwrap();

        assert_eq!(ledger.active_cells().len(), 2);

        let cell3 = CellMetadata {
            id: CellId::new("cell_003"),
            path: "cell_003.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::new(),
            rows: 300,
            bytes: 6144,
            stats: HashMap::new(),
        };

        ledger
            .commit(
                LedgerAction::RewriteCells {
                    removed: vec![CellId::new("cell_001")],
                    added: vec![cell3],
                },
                &node_id,
            )
            .await
            .unwrap();

        assert_eq!(ledger.active_cells().len(), 2);
        let ids: Vec<&str> = ledger
            .active_cells()
            .iter()
            .map(|c| c.id.as_str())
            .collect();
        assert!(ids.contains(&"cell_002"));
        assert!(ids.contains(&"cell_003"));
    }

    // Partition filters keep only cells whose recorded partition value
    // matches the requested value.
    #[tokio::test]
    async fn test_prune_cells_by_partition() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        let cells = vec![
            CellMetadata {
                id: CellId::new("cell_north"),
                path: "region=north/cell_north.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::from([("region".into(), "north".into())]),
                rows: 100,
                bytes: 2048,
                stats: HashMap::new(),
            },
            CellMetadata {
                id: CellId::new("cell_south"),
                path: "region=south/cell_south.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::from([("region".into(), "south".into())]),
                rows: 200,
                bytes: 4096,
                stats: HashMap::new(),
            },
        ];

        ledger
            .commit(LedgerAction::AddCells { cells }, &node_id)
            .await
            .unwrap();

        let filters = HashMap::from([("region".into(), "north".into())]);
        let pruned = ledger.prune_cells(&filters, &HashMap::new());
        assert_eq!(pruned.len(), 1);
        assert_eq!(pruned[0].id.as_str(), "cell_north");
    }

    // Stat filters drop cells whose recorded min/max range cannot satisfy the
    // requested bound: a min-filter of 25.0 eliminates the [10, 20] cell.
    #[tokio::test]
    async fn test_prune_cells_by_stats() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger =
            Ledger::create(storage.clone(), "hive/box/frame", schema, vec![], &node_id)
                .await
                .unwrap();

        let cells = vec![
            CellMetadata {
                id: CellId::new("cell_low"),
                path: "cell_low.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::new(),
                rows: 100,
                bytes: 2048,
                stats: HashMap::from([(
                    "temp".into(),
                    ColumnStats {
                        min: Some(serde_json::json!(10.0)),
                        max: Some(serde_json::json!(20.0)),
                        null_count: 0,
                        distinct_count: None,
                    },
                )]),
            },
            CellMetadata {
                id: CellId::new("cell_high"),
                path: "cell_high.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::new(),
                rows: 100,
                bytes: 2048,
                stats: HashMap::from([(
                    "temp".into(),
                    ColumnStats {
                        min: Some(serde_json::json!(30.0)),
                        max: Some(serde_json::json!(40.0)),
                        null_count: 0,
                        distinct_count: None,
                    },
                )]),
            },
        ];

        ledger
            .commit(LedgerAction::AddCells { cells }, &node_id)
            .await
            .unwrap();

        let stat_filters: HashMap<String, (Option<serde_json::Value>, Option<serde_json::Value>)> =
            HashMap::from([("temp".into(), (Some(serde_json::json!(25.0)), None))]);
        let pruned = ledger.prune_cells(&HashMap::new(), &stat_filters);
        assert_eq!(pruned.len(), 1);
        assert_eq!(pruned[0].id.as_str(), "cell_high");
    }
}