1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
//! Persistence + durability + observability API for `PersistentVocabARTrie` — OVERLAY-ONLY (V6).
//!
//! - `is_dirty` — observability
//! - `checkpoint` — publish the lock-free overlay image (retaining the WAL)
//! - `sync` / `sync_to_disk` / `sync_to_disk_async` / `rotate_wal` — WAL durability
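//! - `current_lsn` / `synced_lsn` — LSN observability (next LSN / last synced LSN)
//! - `durability_policy` / `set_durability_policy` — read / replace the durability policy
//! - `enable_slot_tracking` / `flush_sequential` — arena-manager slot tracking and ordered flush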
#![allow(dead_code)]
use std::sync::atomic::Ordering;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::dict_impl::DurabilityPolicy;
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::persistent_artrie::wal::WalRecord;
use super::sync_handle::VocabSyncHandle;
use super::types::{VocabTrieFileHeader, VOCAB_HEADER_VERSION_V2, VOCAB_TRIE_MAGIC};
impl<S: BlockStorage> super::dict_impl::PersistentVocabARTrie<S> {
/// Report whether the `dirty` flag is currently set (Acquire load).
///
/// Within this file the flag is cleared only at the end of a successful
/// `checkpoint_overlay`; `rotate_wal` deliberately leaves it set. Where the flag gets set is
/// not visible here.
pub fn is_dirty(&self) -> bool {
self.dirty.load(Ordering::Acquire)
}
/// Checkpoint current state to disk by publishing the lock-free overlay image; the WAL is
/// retained rather than truncated.
///
/// This is a thin wrapper over `checkpoint_overlay` and returns its result unchanged. Per the
/// original note the owned-tree implementation has been deleted, which leaves the overlay path
/// as the only checkpoint route.
pub fn checkpoint(&self) -> Result<()> {
self.checkpoint_overlay()
}
/// Lock-free OVERLAY checkpoint — the durable-snapshot path (mirrors char's
/// `publish_immutable_snapshot_retaining_wal`).
///
/// Publishes the current overlay as an on-disk image and appends a `Checkpoint` record to the
/// WAL without truncating it. Returns `Ok(())` immediately when the trie is both clean and
/// empty. Otherwise the steps run in this order (numbering matches the inline comments):
///
/// 0. Capture the COMMITTED watermark BEFORE the root load (the safe `checkpoint_lsn`: the
///    snapshot is guaranteed to contain every write `<= w`; appended-but-uncommitted writes
///    beyond `w` stay in the WAL). The value used is the larger of the watermark and the
///    recovery-image coverage. The current `commit_seq` is captured at the same point.
/// 1. Serialize the immutable overlay into the dense char-arena image (root pointer 0 for an
///    empty trie), then flush the arenas.
/// 2. Write the VOCB header (version 2 = overlay; `root_ptr` = the root NODE
///    `SwizzledPtr.to_raw()`), flush the buffer manager and sync the storage.
/// 3. Append a `Checkpoint` record + sync, and RETAIN the WAL (no destructive truncate —
///    reversible + non-double-counting via the Checkpoint gate; idempotent InsertOnce replay
///    tolerates re-applying `(w, frontier]`). `mark_committed` on the Checkpoint record (#49)
///    keeps the watermark == the committed frontier; the commit_seq floor (S5-2) keeps
///    post-checkpoint ops out-ranking pre-checkpoint survivors.
///
/// The `dirty` flag is cleared only after every step above has succeeded.
///
/// # Errors
/// - Any error from overlay serialization or the arena flush is propagated.
/// - An internal error ("No buffer manager for overlay checkpoint") when no buffer manager is
///   configured; note this is detected only after serialization and the arena flush have run.
/// - Any error from writing the header, flushing the buffer manager or syncing storage.
/// - `PersistentARTrieError::Wal` when appending or syncing the `Checkpoint` record, or setting
///   the commit_seq floor, fails. At that point the header has already been written and the
///   `dirty` flag is still set.
fn checkpoint_overlay(&self) -> Result<()> {
use std::time::{SystemTime, UNIX_EPOCH};
let entry_count = self.entry_count.load(Ordering::Acquire);
// Skip only when the trie is clean AND empty; a clean but non-empty trie still publishes.
// NOTE(review): confirm that re-publishing a clean, non-empty trie is intended (`&&`, not `||`).
if !self.dirty.load(Ordering::Acquire) && entry_count == 0 {
return Ok(());
}
// (0) Capture watermark + commit_seq floor BEFORE the root load (Order-A safe lsn).
// NOTE(review): the `take_` prefix suggests the recovery-image coverage is consumed here; if
// so, a failure later in this function would lose it — confirm against the watermark type.
let checkpoint_lsn = self
.committed_watermark
.watermark()
.max(self.committed_watermark.take_recovery_image_coverage());
let commit_seq_floor = self.commit_seq.load(Ordering::Acquire);
// (1) Serialize the immutable overlay root (empty -> root_ptr 0). CX-universal: the
// PATH-COMPRESSED serializer (proven no-truncation; the loader expands prefixes on reopen,
// which vocab inherits from the char loader). Uncompressed prefix_len=0 images still load.
// A missing root, or a root with `entry_count == 0`, both yield a raw root pointer of 0.
let root_ptr_raw: u64 = match self.lockfree_root.as_ref().and_then(|r| r.load()) {
Some(root) if entry_count > 0 => {
self.serialize_overlay_snapshot_compressed(&root)?.to_raw()
}
_ => 0,
};
// Flush the arenas to disk so block_count reflects the serialized overlay nodes (mirrors
// the owned persist_to_disk's arena flush; without it block_count stays at the create-time
// value and reopen skips arena loading -> "arena has 0 nodes").
if let Some(ref arena_manager) = self.arena_manager {
arena_manager.write().flush()?;
}
// (2) Write the VOCB header (version 2 = overlay image; no owned reverse index).
let buffer_manager = self.buffer_manager.as_ref().ok_or_else(|| {
PersistentARTrieError::internal("No buffer manager for overlay checkpoint")
})?;
// Read the block count under a short-lived read guard; falls back to 1 when
// `block_count()` yields no value.
let block_count = {
let bm = buffer_manager.read();
bm.storage().block_count().unwrap_or(1)
};
// `header_checksum` is left at 0 here; the bytes actually written come from
// `to_bytes_with_checksum()` below. `reverse_index_capacity` is 0 for overlay images.
let mut header = VocabTrieFileHeader {
magic: VOCAB_TRIE_MAGIC,
version: VOCAB_HEADER_VERSION_V2,
_reserved: [0; 3],
root_ptr: root_ptr_raw,
entry_count: entry_count as u64,
block_count,
_pad1: 0,
checkpoint_lsn,
header_checksum: 0,
_padding: [0; 20],
start_index: self.start_index,
next_index: self.next_index.load(Ordering::Acquire),
reverse_index_capacity: 0,
_ext_padding: [0; 8],
};
// Header write, buffer flush and storage sync all happen under one write guard, which is
// released at the end of this scope before the WAL is touched.
{
let bm = buffer_manager.write();
let dm = bm.storage();
dm.write_header_bytes(&header.to_bytes_with_checksum())?;
bm.flush_all()?;
dm.sync()?;
}
// (3) Append Checkpoint + sync; mark it committed (#49); raise the commit_seq floor
// (S5-2); RETAIN the WAL (no truncate — the Checkpoint record gates replay).
// Skipped entirely when no WAL writer is configured.
if let Some(ref wal) = self.wal_writer {
// Seconds since the Unix epoch; a clock before the epoch degrades to 0 rather than failing.
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let cp_lsn = wal
.append(WalRecord::Checkpoint {
checkpoint_lsn,
timestamp,
})
.map_err(|e| {
PersistentARTrieError::Wal(format!("append overlay checkpoint record: {e}"))
})?;
wal.sync().map_err(|e| {
PersistentARTrieError::Wal(format!("sync overlay checkpoint record: {e}"))
})?;
// The LSN returned by `wal.sync()` is not used; the Checkpoint record's own LSN is what
// gets folded into `synced_lsn` (monotonic via `fetch_max`).
self.synced_lsn.fetch_max(cp_lsn, Ordering::AcqRel);
self.committed_watermark.mark_committed(cp_lsn);
wal.set_commit_seq_floor(commit_seq_floor).map_err(|e| {
PersistentARTrieError::Wal(format!("set overlay commit_seq floor: {e}"))
})?;
}
// NOTE(review): this clears `dirty` unconditionally. If a writer set the flag while the
// checkpoint was running, that write would now be reported clean although it is only in the
// WAL — confirm callers tolerate this.
self.dirty.store(false, Ordering::Release);
Ok(())
}
/// Flush the WAL to disk without publishing an overlay image.
///
/// When a WAL writer is configured, its `sync()` is invoked and the LSN it returns is folded
/// into `synced_lsn` with `fetch_max`, so the recorded value never moves backwards. Without a
/// WAL writer this is a no-op that returns `Ok(())`.
///
/// # Errors
/// A failed WAL sync is reported through `PersistentARTrieError::io_error` (labelled
/// `"sync WAL"` / `"WAL"`), wrapping an `io::Error` of kind `Other` that carries the original
/// error's message.
pub fn sync(&self) -> Result<()> {
    let wal = match self.wal_writer.as_ref() {
        Some(writer) => writer,
        // No WAL configured: nothing to make durable.
        None => return Ok(()),
    };
    let durable_lsn = wal.sync().map_err(|err| {
        let source = std::io::Error::new(std::io::ErrorKind::Other, err.to_string());
        PersistentARTrieError::io_error("sync WAL", "WAL", source)
    })?;
    self.synced_lsn.fetch_max(durable_lsn, Ordering::AcqRel);
    Ok(())
}
/// Rotate the WAL without serializing a full checkpoint.
///
/// If the trie is dirty, the WAL is synced (and `synced_lsn` advanced) while being RETAINED
/// for replay: no overlay image or header is written and nothing is truncated, so crash
/// recovery replays the WAL tail. If the trie is clean this returns `Ok(())` without touching
/// the WAL.
///
/// The `dirty` flag is intentionally left as-is; only `checkpoint()` clears it, after the
/// overlay image has been published.
///
/// # Errors
/// Propagates the I/O error produced by `sync()` when the WAL sync fails.
pub fn rotate_wal(&self) -> Result<()> {
    let has_unpublished_writes = self.dirty.load(Ordering::Acquire);
    if has_unpublished_writes {
        // `sync()` performs exactly the WAL sync + `synced_lsn.fetch_max` this method needs,
        // with the same error mapping. A rotate publishes no header, so it must not truncate.
        self.sync()?;
    }
    Ok(())
}
/// Sync to disk without checkpoint bookkeeping and hand back a completion handle.
///
/// Only the WAL is synced; the overlay image is published exclusively by `checkpoint()`. The
/// sync itself runs before this method returns, so the returned [`VocabSyncHandle`] is
/// already completed.
///
/// The sync goes through [`Self::sync`], which also folds the LSN returned by the WAL into
/// `synced_lsn`. Previously this method discarded that LSN, so `synced_lsn()` could keep
/// reporting a stale value (or `None`) after a successful `sync_to_disk*` call, unlike `sync()`
/// and `rotate_wal()`.
///
/// # Errors
/// Propagates the I/O error produced by `sync()` when the WAL sync fails.
pub fn sync_to_disk_async(&self) -> Result<VocabSyncHandle> {
    // Delegate instead of calling `wal.sync()` directly so the synced-LSN bookkeeping and the
    // error mapping stay identical across every WAL-sync entry point.
    self.sync()?;
    Ok(VocabSyncHandle::already_synced())
}
/// Blocking counterpart of `sync_to_disk_async`: start the sync, then wait on its handle.
///
/// # Errors
/// Propagates any error from `sync_to_disk_async`. A failure reported by the handle's `wait()`
/// is converted into an internal error.
pub fn sync_to_disk(&self) -> Result<()> {
    self.sync_to_disk_async()?
        .wait()
        .map_err(|wait_err| PersistentARTrieError::internal(&wait_err))?;
    Ok(())
}
/// Get the current (next) LSN.
///
/// Returns the value of the `next_lsn` counter (Acquire load). Presumably this is the LSN the
/// next WAL append will receive — confirm against the code that advances `next_lsn`, which is
/// not visible in this file.
#[inline]
pub fn current_lsn(&self) -> u64 {
self.next_lsn.load(Ordering::Acquire)
}
/// Last LSN recorded as synced, or `None` if nothing has been synced yet.
///
/// The underlying counter uses `0` as its "never synced" sentinel, which is translated to
/// `None` here; any other value is returned as `Some(lsn)`.
#[inline]
pub fn synced_lsn(&self) -> Option<u64> {
    match self.synced_lsn.load(Ordering::Acquire) {
        0 => None,
        recorded => Some(recorded),
    }
}
/// Get the durability policy.
///
/// Returns the stored `DurabilityPolicy` by value; the trie's own copy is unaffected by
/// whatever the caller does with the result.
#[inline]
pub fn durability_policy(&self) -> DurabilityPolicy {
self.durability_policy
}
/// Set the durability policy.
///
/// Overwrites the stored policy with `policy`. Requires `&mut self`, i.e. exclusive access to
/// the trie. Only the field is replaced here; how and when writers consult the policy is not
/// visible in this file.
#[inline]
pub fn set_durability_policy(&mut self, policy: DurabilityPolicy) {
self.durability_policy = policy;
}
/// Enable slot-level dirty tracking for reduced checkpoint I/O. Idempotent.
pub fn enable_slot_tracking(&self) {
if let Some(ref am) = self.arena_manager {
am.write().enable_slot_tracking();
}
}
/// Internal version that doesn't require &mut self for use during construction.
pub(crate) fn enable_slot_tracking_internal(&self) {
if let Some(ref am) = self.arena_manager {
am.write().enable_slot_tracking();
}
}
/// Flush dirty arenas in sequential order for optimized disk I/O.
pub fn flush_sequential(&self) -> Result<()> {
if let Some(ref am) = self.arena_manager {
am.write().flush_sequential()?;
}
Ok(())
}
}