apiary_storage/ledger.rs

//! Transaction ledger for frame-level ACID operations.
//!
//! Each frame has a ledger — an ordered sequence of JSON files describing
//! every mutation. The ledger is the source of truth for which cells are
//! active, the schema, and version history.
//!
//! Commits use conditional writes (`put_if_not_exists`) for optimistic
//! concurrency control — no consensus protocol needed.
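//!
//! # Example
//!
//! A minimal usage sketch, not compiled as a doctest: the backend
//! construction, module paths, frame path, and schema below are illustrative
//! assumptions rather than fixed API requirements.
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use apiary_core::{FrameSchema, LedgerAction, NodeId, Result, StorageBackend};
//! use apiary_storage::ledger::Ledger;
//! use apiary_storage::local::LocalBackend;
//!
//! async fn demo() -> Result<()> {
//!     let storage: Arc<dyn StorageBackend> =
//!         Arc::new(LocalBackend::new(std::path::PathBuf::from("/tmp/apiary")).await?);
//!     let node_id = NodeId::new("node_a");
//!
//!     // Version 0 is the CreateFrame entry; create() falls back to open()
//!     // if another writer already created the ledger.
//!     let mut ledger = Ledger::create(
//!         storage.clone(),
//!         "my_hive/my_box/my_frame",
//!         FrameSchema { fields: Vec::new() },
//!         Vec::new(),
//!         &node_id,
//!     )
//!     .await?;
//!
//!     // Each successful commit writes the next numbered JSON entry.
//!     let version = ledger
//!         .commit(LedgerAction::AddCells { cells: Vec::new() }, &node_id)
//!         .await?;
//!     assert_eq!(version, 1);
//!     Ok(())
//! }
//! ```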

use std::sync::Arc;

use bytes::Bytes;
use chrono::Utc;
use tracing::{info, warn};

use apiary_core::{
    ApiaryError, CellMetadata, FrameSchema, LedgerAction, LedgerCheckpoint, LedgerEntry, NodeId,
    Result, StorageBackend,
};

/// Maximum number of commit retries on conflict.
const MAX_RETRIES: usize = 10;

/// Checkpoint interval — write a checkpoint every N versions.
const CHECKPOINT_INTERVAL: u64 = 100;

/// The transaction ledger for a single frame.
pub struct Ledger {
    /// Frame path in storage (e.g., "my_hive/my_box/my_frame").
    frame_path: String,
    /// Storage backend.
    storage: Arc<dyn StorageBackend>,
    /// Current ledger version.
    current_version: u64,
    /// Currently active cells (derived from replay).
    active_cells: Vec<CellMetadata>,
    /// Frame schema.
    schema: FrameSchema,
    /// Partition columns.
    partition_by: Vec<String>,
}

impl Ledger {
    /// Create a new ledger for a frame, writing version 0 (CreateFrame entry).
    pub async fn create(
        storage: Arc<dyn StorageBackend>,
        frame_path: &str,
        schema: FrameSchema,
        partition_by: Vec<String>,
        node_id: &NodeId,
    ) -> Result<Self> {
        let entry = LedgerEntry {
            version: 0,
            timestamp: Utc::now(),
            node_id: node_id.clone(),
            action: LedgerAction::CreateFrame {
                schema: schema.clone(),
                partition_by: partition_by.clone(),
            },
        };

        let key = ledger_entry_key(frame_path, 0);
        let data = serde_json::to_vec_pretty(&entry)
            .map_err(|e| ApiaryError::Serialization(e.to_string()))?;

        let wrote = storage.put_if_not_exists(&key, Bytes::from(data)).await?;

        if !wrote {
            // Ledger already exists — open it instead
            return Self::open(storage, frame_path).await;
        }

        info!(frame_path, "Created ledger version 0");

        Ok(Self {
            frame_path: frame_path.to_string(),
            storage,
            current_version: 0,
            active_cells: Vec::new(),
            schema,
            partition_by,
        })
    }

    /// Open an existing ledger by loading the latest checkpoint and replaying entries.
    pub async fn open(storage: Arc<dyn StorageBackend>, frame_path: &str) -> Result<Self> {
        let (version, schema, partition_by, active_cells) =
            Self::load_state(&storage, frame_path).await?;

        Ok(Self {
            frame_path: frame_path.to_string(),
            storage,
            current_version: version,
            active_cells,
            schema,
            partition_by,
        })
    }

    /// Load the current state by finding the latest checkpoint and replaying entries after it.
    async fn load_state(
        storage: &Arc<dyn StorageBackend>,
        frame_path: &str,
    ) -> Result<(u64, FrameSchema, Vec<String>, Vec<CellMetadata>)> {
        // Try to load the latest checkpoint
        let checkpoint_prefix = format!("{frame_path}/_ledger/_checkpoint/");
        let checkpoint_files = storage.list(&checkpoint_prefix).await?;

        let mut start_version = 0u64;
        let mut schema = FrameSchema { fields: Vec::new() };
        let mut partition_by = Vec::new();
        let mut active_cells: Vec<CellMetadata> = Vec::new();
        let mut have_checkpoint = false;

        if let Some(latest_cp) = checkpoint_files
            .iter()
            .filter(|f| f.ends_with(".json"))
            .max()
        {
            let cp_data = storage.get(latest_cp).await?;
            let checkpoint: LedgerCheckpoint = serde_json::from_slice(&cp_data)
                .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
            start_version = checkpoint.version;
            schema = checkpoint.schema;
            partition_by = checkpoint.partition_by;
            active_cells = checkpoint.active_cells;
            have_checkpoint = true;
        }

        // List all ledger entries and replay from start_version. A checkpoint
        // already reflects the entry at its own version, so only strictly
        // newer entries are replayed when one was loaded.
        let ledger_prefix = format!("{frame_path}/_ledger/");
        let all_entries = storage.list(&ledger_prefix).await?;

        let mut entry_files: Vec<(u64, String)> = all_entries
            .iter()
            .filter(|f| !f.contains("_checkpoint") && f.ends_with(".json"))
            .filter_map(|f| {
                let filename = f.rsplit('/').next()?;
                let version_str = filename.strip_suffix(".json")?;
                let version: u64 = version_str.parse().ok()?;
                Some((version, f.clone()))
            })
            .filter(|(v, _)| {
                if have_checkpoint {
                    *v > start_version
                } else {
                    *v >= start_version
                }
            })
            .collect();

        entry_files.sort_by_key(|(v, _)| *v);

        // If no entries and no checkpoint found, the ledger doesn't exist
        if entry_files.is_empty() && start_version == 0 && checkpoint_files.is_empty() {
            return Err(ApiaryError::NotFound {
                key: format!("{frame_path}/_ledger"),
            });
        }

        let mut current_version = start_version;

        for (version, path) in &entry_files {
            let data = storage.get(path).await?;
            let entry: LedgerEntry = serde_json::from_slice(&data)
                .map_err(|e| ApiaryError::Serialization(e.to_string()))?;

            match &entry.action {
                LedgerAction::CreateFrame {
                    schema: s,
                    partition_by: pb,
                } => {
                    schema = s.clone();
                    partition_by = pb.clone();
                    active_cells.clear();
                }
                LedgerAction::AddCells { cells } => {
                    active_cells.extend(cells.iter().cloned());
                }
                LedgerAction::RewriteCells { removed, added } => {
                    active_cells.retain(|c| !removed.contains(&c.id));
                    active_cells.extend(added.iter().cloned());
                }
            }

            current_version = *version;
        }

        Ok((current_version, schema, partition_by, active_cells))
    }

    /// Reload the ledger state from storage.
    async fn reload(&mut self) -> Result<()> {
        let (version, schema, partition_by, active_cells) =
            Self::load_state(&self.storage, &self.frame_path).await?;
        self.current_version = version;
        self.schema = schema;
        self.partition_by = partition_by;
        self.active_cells = active_cells;
        Ok(())
    }

    /// Commit an action to the ledger with optimistic concurrency.
    /// Returns the committed version number.
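    ///
    /// A minimal sketch of a compaction-style commit, assuming `ledger`,
    /// `node_id`, `old_id`, and `new_cell` were built elsewhere (illustrative
    /// names, not part of this API):
    ///
    /// ```ignore
    /// let version = ledger
    ///     .commit(
    ///         LedgerAction::RewriteCells {
    ///             removed: vec![old_id],
    ///             added: vec![new_cell],
    ///         },
    ///         &node_id,
    ///     )
    ///     .await?;
    /// ```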
    pub async fn commit(&mut self, action: LedgerAction, node_id: &NodeId) -> Result<u64> {
        for attempt in 0..MAX_RETRIES {
            if attempt > 0 {
                // Reload state on retry to get fresh version
                self.reload().await?;
            }

            let next_version = self.current_version + 1;
            let entry = LedgerEntry {
                version: next_version,
                timestamp: Utc::now(),
                node_id: node_id.clone(),
                action: action.clone(),
            };

            let key = ledger_entry_key(&self.frame_path, next_version);
            let data = serde_json::to_vec_pretty(&entry)
                .map_err(|e| ApiaryError::Serialization(e.to_string()))?;

            let wrote = self
                .storage
                .put_if_not_exists(&key, Bytes::from(data))
                .await?;

            if wrote {
                // Apply the action to our in-memory state
                match &action {
                    LedgerAction::CreateFrame {
                        schema,
                        partition_by,
                    } => {
                        self.schema = schema.clone();
                        self.partition_by = partition_by.clone();
                        self.active_cells.clear();
                    }
                    LedgerAction::AddCells { cells } => {
                        self.active_cells.extend(cells.iter().cloned());
                    }
                    LedgerAction::RewriteCells { removed, added } => {
                        self.active_cells.retain(|c| !removed.contains(&c.id));
                        self.active_cells.extend(added.iter().cloned());
                    }
                }

                self.current_version = next_version;

                info!(
                    frame_path = %self.frame_path,
                    version = next_version,
                    "Committed ledger entry"
                );

                // Write checkpoint every CHECKPOINT_INTERVAL versions
                if next_version.is_multiple_of(CHECKPOINT_INTERVAL) {
                    self.write_checkpoint().await?;
                }

                return Ok(next_version);
            }

            warn!(
                frame_path = %self.frame_path,
                attempt,
                version = next_version,
                "Ledger commit conflict, retrying"
            );
        }

        Err(ApiaryError::Conflict {
            message: format!(
                "Failed to commit ledger entry for {} after {} retries",
                self.frame_path, MAX_RETRIES
            ),
        })
    }

    /// Write a checkpoint with the current state.
    async fn write_checkpoint(&self) -> Result<()> {
        let checkpoint = LedgerCheckpoint {
            version: self.current_version,
            schema: self.schema.clone(),
            partition_by: self.partition_by.clone(),
            active_cells: self.active_cells.clone(),
        };

        let key = format!(
            "{}/_ledger/_checkpoint/checkpoint_{:06}.json",
            self.frame_path, self.current_version
        );
        let data = serde_json::to_vec_pretty(&checkpoint)
            .map_err(|e| ApiaryError::Serialization(e.to_string()))?;

        self.storage.put(&key, Bytes::from(data)).await?;
        info!(
            frame_path = %self.frame_path,
            version = self.current_version,
            "Wrote ledger checkpoint"
        );
        Ok(())
    }

    /// Get the currently active cells.
    pub fn active_cells(&self) -> &[CellMetadata] {
        &self.active_cells
    }

    /// Get the current version.
    pub fn current_version(&self) -> u64 {
        self.current_version
    }

    /// Get the frame schema.
    pub fn schema(&self) -> &FrameSchema {
        &self.schema
    }

    /// Get the partition columns.
    pub fn partition_by(&self) -> &[String] {
        &self.partition_by
    }

    /// Get the frame path.
    pub fn frame_path(&self) -> &str {
        &self.frame_path
    }

    /// Prune cells based on partition filters and column stat filters.
    /// Returns references to cells that match the given filters.
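    ///
    /// A minimal sketch of building the two filter maps (the column names and
    /// bounds are illustrative). Cells that do not carry the filtered
    /// partition column, or have no stats for a filtered column, are
    /// conservatively kept:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// // Keep only cells in the region=north partition.
    /// let partition_filters = HashMap::from([("region".to_string(), "north".to_string())]);
    ///
    /// // Keep only cells whose `temp` range could contain values >= 25.0
    /// // (a minimum bound with no maximum).
    /// let stat_filters: HashMap<String, (Option<serde_json::Value>, Option<serde_json::Value>)> =
    ///     HashMap::from([("temp".to_string(), (Some(serde_json::json!(25.0)), None))]);
    ///
    /// let matching = ledger.prune_cells(&partition_filters, &stat_filters);
    /// ```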
    pub fn prune_cells(
        &self,
        partition_filters: &std::collections::HashMap<String, String>,
        stat_filters: &std::collections::HashMap<
            String,
            (Option<serde_json::Value>, Option<serde_json::Value>),
        >,
    ) -> Vec<&CellMetadata> {
        self.active_cells
            .iter()
            .filter(|cell| {
                // Partition pruning: skip cells whose partition values don't match
                for (col, val) in partition_filters {
                    if let Some(cell_val) = cell.partition_values.get(col) {
                        if cell_val != val {
                            return false;
                        }
                    }
                }

                // Stat pruning: skip cells whose min/max don't overlap the filter range
                for (col, (min_filter, max_filter)) in stat_filters {
                    if let Some(col_stats) = cell.stats.get(col) {
                        // If filter has a minimum and cell's max is less, skip
                        if let (Some(filter_min), Some(cell_max)) = (min_filter, &col_stats.max) {
                            if json_value_lt(cell_max, filter_min) {
                                return false;
                            }
                        }
                        // If filter has a maximum and cell's min is greater, skip
                        if let (Some(filter_max), Some(cell_min)) = (max_filter, &col_stats.min) {
                            if json_value_lt(filter_max, cell_min) {
                                return false;
                            }
                        }
                    }
                }

                true
            })
            .collect()
    }
}

/// Compare two JSON values as numbers (for stat pruning).
fn json_value_lt(a: &serde_json::Value, b: &serde_json::Value) -> bool {
    match (a.as_f64(), b.as_f64()) {
        (Some(a_f), Some(b_f)) => a_f < b_f,
        _ => {
            // Fall back to string comparison
            match (a.as_str(), b.as_str()) {
                (Some(a_s), Some(b_s)) => a_s < b_s,
                _ => false,
            }
        }
    }
}

/// Generate the storage key for a ledger entry.
fn ledger_entry_key(frame_path: &str, version: u64) -> String {
    format!("{frame_path}/_ledger/{version:06}.json")
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::local::LocalBackend;
    use apiary_core::{CellId, ColumnStats, FieldDef};
    use std::collections::HashMap;

    async fn make_storage() -> Arc<dyn StorageBackend> {
        let dir = tempfile::tempdir().unwrap();
        let backend = LocalBackend::new(dir.keep()).await.unwrap();
        Arc::new(backend)
    }

    fn test_schema() -> FrameSchema {
        FrameSchema {
            fields: vec![
                FieldDef {
                    name: "region".into(),
                    data_type: "string".into(),
                    nullable: false,
                },
                FieldDef {
                    name: "temp".into(),
                    data_type: "float64".into(),
                    nullable: true,
                },
            ],
        }
    }

    #[tokio::test]
    async fn test_ledger_create() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        assert_eq!(ledger.current_version(), 0);
        assert!(ledger.active_cells().is_empty());
        assert_eq!(ledger.partition_by(), &["region"]);
    }

    #[tokio::test]
    async fn test_ledger_create_idempotent() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let _ledger1 = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema.clone(),
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        // Creating again should open the existing ledger
        let ledger2 = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        assert_eq!(ledger2.current_version(), 0);
    }

    #[tokio::test]
    async fn test_ledger_commit_add_cells() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        let cells = vec![CellMetadata {
            id: CellId::new("cell_001"),
            path: "region=north/cell_001.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::from([("region".into(), "north".into())]),
            rows: 100,
            bytes: 2048,
            stats: HashMap::new(),
        }];

        let version = ledger
            .commit(LedgerAction::AddCells { cells }, &node_id)
            .await
            .unwrap();

        assert_eq!(version, 1);
        assert_eq!(ledger.active_cells().len(), 1);
        assert_eq!(ledger.active_cells()[0].rows, 100);
    }

    #[tokio::test]
    async fn test_ledger_open_replays() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        {
            let mut ledger = Ledger::create(
                storage.clone(),
                "hive/box/frame",
                schema,
                vec!["region".into()],
                &node_id,
            )
            .await
            .unwrap();

            let cells = vec![CellMetadata {
                id: CellId::new("cell_001"),
                path: "region=north/cell_001.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::from([("region".into(), "north".into())]),
                rows: 100,
                bytes: 2048,
                stats: HashMap::new(),
            }];

            ledger
                .commit(LedgerAction::AddCells { cells }, &node_id)
                .await
                .unwrap();
        }

        // Open and verify replay
        let ledger = Ledger::open(storage.clone(), "hive/box/frame")
            .await
            .unwrap();

        assert_eq!(ledger.current_version(), 1);
        assert_eq!(ledger.active_cells().len(), 1);
    }

    #[tokio::test]
    async fn test_ledger_rewrite_cells() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        // Add cells
        let cell1 = CellMetadata {
            id: CellId::new("cell_001"),
            path: "cell_001.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::new(),
            rows: 100,
            bytes: 2048,
            stats: HashMap::new(),
        };
        let cell2 = CellMetadata {
            id: CellId::new("cell_002"),
            path: "cell_002.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::new(),
            rows: 200,
            bytes: 4096,
            stats: HashMap::new(),
        };

        ledger
            .commit(
                LedgerAction::AddCells {
                    cells: vec![cell1, cell2],
                },
                &node_id,
            )
            .await
            .unwrap();

        assert_eq!(ledger.active_cells().len(), 2);

        // Rewrite: remove cell_001, add cell_003
        let cell3 = CellMetadata {
            id: CellId::new("cell_003"),
            path: "cell_003.parquet".into(),
            format: "parquet".into(),
            partition_values: HashMap::new(),
            rows: 300,
            bytes: 6144,
            stats: HashMap::new(),
        };

        ledger
            .commit(
                LedgerAction::RewriteCells {
                    removed: vec![CellId::new("cell_001")],
                    added: vec![cell3],
                },
                &node_id,
            )
            .await
            .unwrap();

        assert_eq!(ledger.active_cells().len(), 2);
        let ids: Vec<&str> = ledger
            .active_cells()
            .iter()
            .map(|c| c.id.as_str())
            .collect();
        assert!(ids.contains(&"cell_002"));
        assert!(ids.contains(&"cell_003"));
    }

    #[tokio::test]
    async fn test_prune_cells_by_partition() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger = Ledger::create(
            storage.clone(),
            "hive/box/frame",
            schema,
            vec!["region".into()],
            &node_id,
        )
        .await
        .unwrap();

        let cells = vec![
            CellMetadata {
                id: CellId::new("cell_north"),
                path: "region=north/cell_north.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::from([("region".into(), "north".into())]),
                rows: 100,
                bytes: 2048,
                stats: HashMap::new(),
            },
            CellMetadata {
                id: CellId::new("cell_south"),
                path: "region=south/cell_south.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::from([("region".into(), "south".into())]),
                rows: 200,
                bytes: 4096,
                stats: HashMap::new(),
            },
        ];

        ledger
            .commit(LedgerAction::AddCells { cells }, &node_id)
            .await
            .unwrap();

        // Filter for region=north
        let filters = HashMap::from([("region".into(), "north".into())]);
        let pruned = ledger.prune_cells(&filters, &HashMap::new());
        assert_eq!(pruned.len(), 1);
        assert_eq!(pruned[0].id.as_str(), "cell_north");
    }

    #[tokio::test]
    async fn test_prune_cells_by_stats() {
        let storage = make_storage().await;
        let node_id = NodeId::new("test_node");
        let schema = test_schema();

        let mut ledger =
            Ledger::create(storage.clone(), "hive/box/frame", schema, vec![], &node_id)
                .await
                .unwrap();

        let cells = vec![
            CellMetadata {
                id: CellId::new("cell_low"),
                path: "cell_low.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::new(),
                rows: 100,
                bytes: 2048,
                stats: HashMap::from([(
                    "temp".into(),
                    ColumnStats {
                        min: Some(serde_json::json!(10.0)),
                        max: Some(serde_json::json!(20.0)),
                        null_count: 0,
                        distinct_count: None,
                    },
                )]),
            },
            CellMetadata {
                id: CellId::new("cell_high"),
                path: "cell_high.parquet".into(),
                format: "parquet".into(),
                partition_values: HashMap::new(),
                rows: 100,
                bytes: 2048,
                stats: HashMap::from([(
                    "temp".into(),
                    ColumnStats {
                        min: Some(serde_json::json!(30.0)),
                        max: Some(serde_json::json!(40.0)),
                        null_count: 0,
                        distinct_count: None,
                    },
                )]),
            },
        ];

        ledger
            .commit(LedgerAction::AddCells { cells }, &node_id)
            .await
            .unwrap();

        // Filter: temp > 25 (min_filter=25, no max_filter)
        let stat_filters: HashMap<String, (Option<serde_json::Value>, Option<serde_json::Value>)> =
            HashMap::from([("temp".into(), (Some(serde_json::json!(25.0)), None))]);
        let pruned = ledger.prune_cells(&HashMap::new(), &stat_filters);
        assert_eq!(pruned.len(), 1);
        assert_eq!(pruned[0].id.as_str(), "cell_high");
    }
}