1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
//! Epoch-based checkpoint tracking for `PersistentARTrieChar<V, S>`.
//!
//! Split out of the char variant's `dict_impl_char.rs` (lines ~422-552,
//! ~131 LOC) as the eighteenth Phase-6 char sub-module. Methods covered:
//!
//! - `enable_epoch_checkpointing` (+ `_default`, `_high_throughput`,
//! `_low_latency` convenience variants)
//! - `disable_epoch_checkpointing`
//! - `has_epoch_checkpointing`
//! - `record_epoch_operation`
//! - `current_epoch_id`
//! - `force_epoch_checkpoint`
//! - `last_durable_epoch`
//! - `epoch_stats` / `epoch_metadata` / `epoch_config`
use std::sync::Arc;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::epoch::{
CheckpointManager, EpochConfig, EpochId, EpochMetadata, EpochStats,
};
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::value::DictionaryValue;
impl<V: DictionaryValue, S: BlockStorage> super::PersistentARTrieChar<V, S> {
// ==================== Epoch-Based Checkpointing Methods ====================
/// Turns on epoch-based checkpoint tracking for this trie.
///
/// Once enabled, the checkpoint manager tracks successful WAL appends and
/// advances epoch metadata whenever one of the configured thresholds trips:
/// - operation count per epoch
/// - WAL size limit
/// - time-based epoch duration
///
/// Public mutation APIs record epoch operations automatically after their
/// WAL append succeeds; call [`Self::force_epoch_checkpoint`] to publish the
/// current trie checkpoint and then mark the previous epoch durable.
///
/// **Important:** the checkpoint manager keeps its own WAL under a path
/// derived from the trie's file path by replacing its extension with
/// `epoch`. [`Self::record_epoch_operation`] remains available for
/// external/manual WAL records, but ordinary trie mutations should not call
/// it directly.
///
/// Calling this while checkpointing is already enabled re-arms it: the new
/// manager is installed and this trie's reference to the previous one is
/// released.
///
/// # Arguments
/// * `config` - Thresholds and behavior for epoch rotation
///
/// # Errors
/// * The trie has no file path, so no epoch location can be derived.
/// * `CheckpointManager::new` failed (e.g., directory creation failed).
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
///
/// # Example
/// ```text
/// // Enable with custom thresholds
/// let config = EpochConfig {
///     epoch_duration: Duration::from_millis(500),
///     max_ops_per_epoch: 5000,
///     max_wal_size_bytes: 32 * 1024 * 1024, // 32MB
///     ..EpochConfig::default()
/// };
/// trie.enable_epoch_checkpointing(config)?;
/// ```
///
/// **F4:** `&self` (subsystem family). Manager built outside the field lock;
/// re-arm = take-old-then-drop-guard-then-drop-old (V11.3 site 9).
pub fn enable_epoch_checkpointing(&self, config: EpochConfig) -> Result<()> {
    // The epoch location sits next to the trie file: same stem, `epoch` extension.
    let epoch_dir = self
        .file_path
        .as_ref()
        .map(|path| path.with_extension("epoch"))
        .ok_or_else(|| {
            PersistentARTrieError::internal(
                "Cannot enable epoch checkpointing without a file path",
            )
        })?;

    // Construct the replacement before touching the slot so that manager
    // construction never runs while the field mutex is held.
    // NOTE(review): on re-arm the previous manager is still alive while the new
    // one is built for the same path — confirm `CheckpointManager::new`
    // tolerates that.
    let manager = Arc::new(CheckpointManager::new(&epoch_dir, config)?);

    // The guard is a statement temporary, so it is released at the `;` below
    // and the previous manager (if any) is dropped with the mutex unlocked.
    let previous = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned")
        .replace(manager);
    drop(previous);

    Ok(())
}
/// Convenience wrapper: enables epoch-based checkpoint tracking using
/// `EpochConfig::default()`.
///
/// # Errors
/// Propagates any failure from [`Self::enable_epoch_checkpointing`].
pub fn enable_epoch_checkpointing_default(&self) -> Result<()> {
    let config = EpochConfig::default();
    self.enable_epoch_checkpointing(config)
}
/// Convenience wrapper: enables epoch-based checkpoint tracking using
/// `EpochConfig::high_throughput()`.
///
/// That preset uses longer epochs and higher operation limits, which suits
/// batch processing workloads.
///
/// # Errors
/// Propagates any failure from [`Self::enable_epoch_checkpointing`].
pub fn enable_epoch_checkpointing_high_throughput(&self) -> Result<()> {
    let config = EpochConfig::high_throughput();
    self.enable_epoch_checkpointing(config)
}
/// Convenience wrapper: enables epoch-based checkpoint tracking using
/// `EpochConfig::low_latency()`.
///
/// That preset uses shorter epochs for lower-latency epoch rotation, which
/// suits real-time tracking.
///
/// # Errors
/// Propagates any failure from [`Self::enable_epoch_checkpointing`].
pub fn enable_epoch_checkpointing_low_latency(&self) -> Result<()> {
    let config = EpochConfig::low_latency();
    self.enable_epoch_checkpointing(config)
}
/// Turns epoch-based checkpointing off.
///
/// The trie's reference to the checkpoint manager is removed and released.
/// When that was the last reference, the manager is stopped and dropped, and
/// any pending checkpoint operations complete before this returns. A
/// concurrent [`Self::force_epoch_checkpoint`] holds its own clone of the
/// manager, in which case teardown happens when that clone is released.
///
/// Does nothing if checkpointing is not currently enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
///
/// **F4 drop-before-join (V11.3 site 7):** take the manager into a temporary so
/// the field-mutex guard DROPS before the old `Arc`'s `Drop` joins its thread.
pub fn disable_epoch_checkpointing(&self) {
    let detached = {
        let mut slot = self
            .checkpoint_manager
            .lock()
            .expect("checkpoint_manager mutex poisoned");
        slot.take()
        // `slot` (the guard) is released here, before `detached` is dropped.
    };
    drop(detached);
}
/// Reports whether a checkpoint manager is currently installed, i.e. whether
/// epoch-based checkpointing is enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
pub fn has_epoch_checkpointing(&self) -> bool {
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    slot.is_some()
}
/// Records an externally managed operation against the current epoch.
///
/// Public trie mutation APIs call this automatically after successful WAL
/// appends. Call it manually only for WAL records written outside the trie's
/// normal mutation path.
///
/// # Arguments
/// * `wal_bytes` - Size of the WAL record that was written
///
/// # Returns
/// The epoch ID reported by the checkpoint manager for this operation, or
/// `None` if checkpointing is not enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
pub fn record_epoch_operation(&self, wal_bytes: usize) -> Option<EpochId> {
    // The field mutex stays held for the duration of the manager call.
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    let manager = slot.as_ref()?;
    Some(manager.record_operation(wal_bytes))
}
/// Returns the checkpoint manager's current epoch ID, or `None` if
/// checkpointing is not enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
pub fn current_epoch_id(&self) -> Option<EpochId> {
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    let manager = slot.as_ref()?;
    Some(manager.current_epoch_id())
}
/// Forces an immediate trie checkpoint and epoch metadata publication.
///
/// This first persists and verifies the trie checkpoint through
/// [`Self::checkpoint`], then advances to a new epoch and marks the previous
/// epoch durable. The ordering is intentional: epoch metadata is published
/// only after the trie state itself is durable.
///
/// # Returns
/// * `Some(Ok(epoch_id))` - The new current epoch ID after checkpoint publication
/// * `Some(Err(_))` - The trie checkpoint or epoch metadata publication failed
/// * `None` - Checkpoint manager not enabled
///
/// **F4:** `&self` — clones the manager Arc out under a BRIEF lock (released
/// before `self.checkpoint()`, which takes CK + OR — never under the
/// checkpoint-manager field mutex).
pub fn force_epoch_checkpoint(&self) -> Option<Result<EpochId>> {
let checkpoint_manager = self
.checkpoint_manager
.lock()
.expect("checkpoint_manager mutex poisoned")
.as_ref()
.cloned()?;
Some((|| {
self.checkpoint()?;
checkpoint_manager.force_checkpoint()
})())
}
/// Returns the last durable (fully checkpointed) epoch ID.
///
/// Yields `None` when checkpointing is not enabled, or when the checkpoint
/// manager itself reports no durable epoch.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
pub fn last_durable_epoch(&self) -> Option<EpochId> {
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    let manager = slot.as_ref()?;
    manager.last_durable_epoch()
}
/// Returns the checkpoint manager's epoch statistics, or `None` if
/// checkpointing is not enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
pub fn epoch_stats(&self) -> Option<EpochStats> {
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    let manager = slot.as_ref()?;
    Some(manager.stats())
}
/// Returns metadata for recent epochs as reported by the checkpoint manager,
/// or `None` if checkpointing is not enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
pub fn epoch_metadata(&self) -> Option<Vec<EpochMetadata>> {
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    let manager = slot.as_ref()?;
    Some(manager.epoch_metadata())
}
/// Returns a copy of the active epoch checkpointing configuration, or `None`
/// if checkpointing is not enabled.
///
/// # Panics
/// Panics if the `checkpoint_manager` mutex is poisoned.
///
/// **F4:** returns an OWNED `EpochConfig` (clone) — the field is now behind a
/// `Mutex`, so a `&EpochConfig` borrow into it can't outlive the guard.
pub fn epoch_config(&self) -> Option<EpochConfig> {
    let slot = self
        .checkpoint_manager
        .lock()
        .expect("checkpoint_manager mutex poisoned");
    let manager = slot.as_ref()?;
    // Clone while the guard is still alive; the caller gets an independent value.
    Some(manager.config().clone())
}
}