azoth_core/traits/canonical.rs
1use crate::error::Result;
2use crate::lock_manager::LockManager;
3use crate::types::{BackupInfo, CanonicalMeta, CommitInfo, EventId};
4use std::path::Path;
5
/// Outcome of a transaction's preflight validation.
///
/// Besides the pass/fail verdict, it tracks the keys accessed during
/// preflight validation so they can be used for stripe locking.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreflightResult {
    /// Whether validation passed.
    pub valid: bool,

    /// Validation errors (empty if there are none).
    pub errors: Vec<String>,

    /// Keys read during preflight.
    pub read_keys: Vec<Vec<u8>>,

    /// Keys to be written.
    pub write_keys: Vec<Vec<u8>>,
}

impl PreflightResult {
    /// Creates a passing result with no errors and no recorded keys.
    #[must_use]
    pub fn success() -> Self {
        Self {
            valid: true,
            errors: Vec::new(),
            read_keys: Vec::new(),
            write_keys: Vec::new(),
        }
    }

    /// Creates a failing result whose only error message is `error`.
    ///
    /// No keys are recorded; attach them with [`PreflightResult::with_keys`].
    #[must_use]
    pub fn failure(error: String) -> Self {
        Self {
            valid: false,
            errors: vec![error],
            // Key lists start out empty, exactly as in a successful result.
            ..Self::success()
        }
    }

    /// Replaces both key lists and returns the updated result.
    ///
    /// Previously recorded keys are discarded; `valid` and `errors` are left
    /// untouched. The method consumes `self`, so the returned value must be
    /// used or the keys are lost.
    #[must_use]
    pub fn with_keys(mut self, read_keys: Vec<Vec<u8>>, write_keys: Vec<Vec<u8>>) -> Self {
        self.read_keys = read_keys;
        self.write_keys = write_keys;
        self
    }
}
49
50/// Iterator over events in the canonical store
51pub trait EventIter: Send {
52 /// Get the next event
53 ///
54 /// Returns None when iteration is complete
55 fn next(&mut self) -> Result<Option<(EventId, Vec<u8>)>>;
56}
57
58/// Transaction for canonical store operations
59///
60/// Supports three-phase commit:
61/// 1. Async preflight validation (with stripe locking)
62/// 2. Fast sync state updates and event appends
63/// 3. Atomic commit
64///
65/// Note: Not required to be Send, as some backends (LMDB) have thread-affine transactions
66pub trait CanonicalTxn {
67 /// Phase 1: Preflight validation (async with stripe locking)
68 ///
69 /// Returns keys that will be accessed and validation result.
70 /// This phase can run concurrently with other non-conflicting transactions.
71 fn preflight(&mut self) -> Result<PreflightResult> {
72 // Default implementation: no preflight validation
73 Ok(PreflightResult::success())
74 }
75
76 /// Phase 2: Read state (fast, single-writer)
77 fn get_state(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
78
79 /// Phase 2: Write state (fast, single-writer)
80 fn put_state(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
81
82 /// Phase 2: Delete state (fast, single-writer)
83 fn del_state(&mut self, key: &[u8]) -> Result<()>;
84
85 /// Phase 3: Append single event (fast, single-writer)
86 fn append_event(&mut self, event: &[u8]) -> Result<EventId>;
87
88 /// Phase 3: Append multiple events (fast, single-writer)
89 fn append_events(&mut self, events: &[Vec<u8>]) -> Result<(EventId, EventId)>;
90
91 /// Commit transaction (phases 2+3 succeed atomically)
92 ///
93 /// Note: Phase 1 (preflight) is separate and uses stripe locks
94 fn commit(self) -> Result<CommitInfo>;
95
96 /// Abort transaction
97 fn abort(self);
98}
99
100/// Canonical store: transactional KV + append-only event log
101///
102/// Provides:
103/// - Atomic commits over state + events
104/// - Stripe locking for concurrent preflight validation
105/// - Sequential event iteration
106/// - Seal mechanism for deterministic snapshots
107/// - Pausable ingestion for safe backups
108pub trait CanonicalStore: Send + Sync {
109 type Txn<'a>: CanonicalTxn
110 where
111 Self: 'a;
112
113 /// Open a canonical store
114 fn open(cfg: crate::config::CanonicalConfig) -> Result<Self>
115 where
116 Self: Sized;
117
118 /// Close the store
119 fn close(&self) -> Result<()>;
120
121 /// Begin a read-only transaction
122 fn read_txn(&self) -> Result<Self::Txn<'_>>;
123
124 /// Begin a write transaction
125 ///
126 /// Returns error if store is paused or sealed
127 fn write_txn(&self) -> Result<Self::Txn<'_>>;
128
129 /// Iterate events in a range
130 ///
131 /// - `from`: Starting event ID (inclusive)
132 /// - `to`: Optional ending event ID (exclusive)
133 fn iter_events(&self, from: EventId, to: Option<EventId>) -> Result<Box<dyn EventIter>>;
134
135 /// Seal the store at the current event ID
136 ///
137 /// Returns the sealed event ID. No future commits will change state
138 /// below or at this event ID.
139 fn seal(&self) -> Result<EventId>;
140
141 /// Get the lock manager for stripe locking
142 fn lock_manager(&self) -> &LockManager;
143
144 /// Pause ingestion (stop accepting new writes)
145 ///
146 /// Waits for in-flight transactions to complete
147 fn pause_ingestion(&self) -> Result<()>;
148
149 /// Resume ingestion (allow new writes)
150 fn resume_ingestion(&self) -> Result<()>;
151
152 /// Check if ingestion is paused
153 fn is_paused(&self) -> bool;
154
155 /// Create a backup
156 fn backup_to(&self, dir: &Path) -> Result<BackupInfo>;
157
158 /// Restore from a backup
159 fn restore_from(dir: &Path, cfg: crate::config::CanonicalConfig) -> Result<Self>
160 where
161 Self: Sized;
162
163 /// Get store metadata
164 fn meta(&self) -> Result<CanonicalMeta>;
165}