1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
//! WAL + durability helpers for `PersistentARTrieChar<V, S>`.
//!
//! Originally split out of char `dict_impl_char.rs` (lines ~346-426, ~81 LOC)
//! as the twenty-second Phase-6 char sub-module. Methods defined here:
//!
//! - `durability_policy` / `set_durability_policy`
//! - `append_to_wal_returning_lsn` / `append_commit_rank` (pub(super); route
//!   through group commit when it is enabled and installed)
//! - private helpers `sync_wal_after_append` / `verify_full_policy_sync_coverage`,
//!   which enforce sync coverage under the full durability policies
//!   (`Immediate` / `GroupCommit`)
#[cfg(feature = "group-commit")]
use std::sync::Arc;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::dict_impl::DurabilityPolicy;
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::persistent_artrie::wal::WalRecord;
use crate::value::DictionaryValue;
impl<V: DictionaryValue, S: BlockStorage> super::PersistentARTrieChar<V, S> {
    /// Get the current durability policy.
    ///
    /// The durability policy controls when fsync is called after WAL writes.
    /// See [`DurabilityPolicy`] for available options and their trade-offs.
    pub fn durability_policy(&self) -> DurabilityPolicy {
        self.durability_policy.load()
    }

    /// Set the durability policy for this trie.
    ///
    /// The durability policy controls when fsync is called after WAL writes,
    /// providing a trade-off between durability and performance. The policy
    /// is re-read on every WAL append, so the change applies to subsequent
    /// appends.
    ///
    /// # Arguments
    ///
    /// * `policy` - The new durability policy
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use libdictenstein::persistent_artrie_char::{PersistentARTrieChar, DurabilityPolicy};
    ///
    /// let trie: PersistentARTrieChar<()> = PersistentARTrieChar::create("words.trie")?;
    ///
    /// // Use periodic sync for better performance (accepts bounded data loss)
    /// trie.set_durability_policy(DurabilityPolicy::Periodic);
    /// # Ok(())
    /// # }
    /// ```
    pub fn set_durability_policy(&self, policy: DurabilityPolicy) {
        self.durability_policy.store(policy);
    }

    /// Append a record to the WAL, returning the assigned WAL **LSN** (in the
    /// WAL-writer LSN domain — the same domain `WalRecord::Checkpoint` and
    /// recovery use). This is the foundation of the lock-free **Order-A** durable
    /// write path: the returned LSN is durable-per-policy at return (group-commit
    /// blocks on the batch fsync; the direct path verifies `synced_lsn >= lsn`),
    /// so the WAL record is durable BEFORE the caller performs the
    /// visibility-publishing root CAS. Returns `0` when no WAL writer is
    /// installed (no durability is available — Order-A callers MUST treat a `0`
    /// return as "no WAL" and refuse to acknowledge durability).
    ///
    /// # Errors
    ///
    /// Returns an error if the append itself fails. Under the `Immediate` or
    /// `GroupCommit` policy it also returns an error if the resulting WAL sync
    /// does not cover the appended LSN.
    pub(super) fn append_to_wal_returning_lsn(&self, record: WalRecord) -> Result<u64> {
        self.append_to_wal_inner(record)
    }

    /// **Order-A replay-order fix (design C′, step 2.5).** Append + sync a
    /// [`WalRecord::CommitRank`] binding the durable data record at `data_lsn` to
    /// the commit `generation` its visibility CAS landed at, returning the rank
    /// record's own LSN. Called AFTER the visibility CAS wins and BEFORE the op is
    /// acked, so it STRENGTHENS Order-A (an ack now also waits for the rank to be
    /// durable). Recovery's `reconcile_lww` consumes these to order same-term
    /// replay by commit generation instead of WAL physical/LSN order.
    ///
    /// Returns `0` when no WAL writer is installed (same convention as
    /// [`Self::append_to_wal_returning_lsn`]).
    ///
    /// # Errors
    ///
    /// Same as [`Self::append_to_wal_returning_lsn`].
    pub(super) fn append_commit_rank(
        &self,
        data_lsn: u64,
        term: &[u8],
        generation: u64,
    ) -> Result<u64> {
        self.append_to_wal_returning_lsn(WalRecord::CommitRank {
            data_lsn,
            term: term.to_vec(),
            generation,
        })
    }

    /// Shared body of the WAL-append entry points in this module
    /// ([`Self::append_to_wal_returning_lsn`], and through it
    /// [`Self::append_commit_rank`]).
    ///
    /// Routes the record through the group-commit coordinator when one is
    /// installed (feature `group-commit`). Otherwise it appends directly to
    /// `self.wal_writer` and syncs according to the durability policy. Returns
    /// the appended LSN, or `0` when neither a coordinator nor a WAL writer is
    /// available.
    fn append_to_wal_inner(&self, record: WalRecord) -> Result<u64> {
        // A durable mutation is being logged: the in-memory trie is diverging
        // from the last checkpoint's on-disk image, so any published eviction
        // registry now references potentially-stale on-disk data. Invalidate it
        // here — the single chokepoint every public mutation passes through — so
        // eviction cannot unswizzle a live node onto a stale disk location until
        // the next checkpoint rebuilds a fresh registry. No-op when eviction is
        // disabled. See `invalidate_eviction_registry` for the full rationale.
        self.invalidate_eviction_registry();

        // Measured before `record` is moved into the append call.
        let wal_bytes = record.serialized_size();

        // Check if group commit is enabled first. F4: clone the coordinator Arc out
        // under a BRIEF lock then RELEASE it before `append_with_sync` (which may
        // block on fsync) — never hold the subsystem mutex across I/O.
        #[cfg(feature = "group-commit")]
        {
            let gc = self
                .group_commit
                .lock()
                .expect("group_commit mutex poisoned")
                .as_ref()
                .map(Arc::clone);
            if let Some(gc) = gc {
                let appended_lsn = gc.append_with_sync(record).map_err(|e| {
                    PersistentARTrieError::WalError {
                        reason: format!("{:?}", e),
                    }
                })?;
                self.record_epoch_operation(wal_bytes);
                // `append_with_sync` has already waited for the batch fsync;
                // only confirm the writer's synced LSN covers this record.
                self.verify_full_policy_sync_coverage(appended_lsn)?;
                return Ok(appended_lsn);
            }
        }

        // Fall back to direct WAL write.
        if let Some(ref wal_writer) = self.wal_writer {
            let appended_lsn =
                wal_writer
                    .append(record)
                    .map_err(|e| PersistentARTrieError::WalError {
                        reason: format!("{:?}", e),
                    })?;
            self.record_epoch_operation(wal_bytes);
            self.sync_wal_after_append(appended_lsn)?;
            return Ok(appended_lsn);
        }

        // No WAL at all: callers must treat 0 as "not durable".
        Ok(0)
    }

    /// Whether `policy` requires every append to be covered by a completed WAL
    /// sync before it returns.
    ///
    /// `Immediate` and `GroupCommit` do. `Periodic` and `None` skip the
    /// per-append coverage check.
    fn policy_requires_sync_coverage(policy: DurabilityPolicy) -> bool {
        match policy {
            DurabilityPolicy::Immediate | DurabilityPolicy::GroupCommit => true,
            DurabilityPolicy::Periodic | DurabilityPolicy::None => false,
        }
    }

    /// Sync `self.wal_writer` after a direct (non-group-commit) append and
    /// verify that the sync covers `appended_lsn`.
    ///
    /// Does nothing unless the current policy requires per-append coverage.
    ///
    /// # Errors
    ///
    /// Returns an error if the sync fails. It also returns an error if the LSN
    /// reported as synced is below `appended_lsn`.
    fn sync_wal_after_append(&self, appended_lsn: u64) -> Result<()> {
        if !Self::policy_requires_sync_coverage(self.durability_policy.load()) {
            return Ok(());
        }
        // This path is only reached after the record was appended straight to
        // `self.wal_writer`, bypassing group commit. A coordinator may have been
        // installed concurrently after the routing decision in
        // `append_to_wal_inner`. Even so, this record was not submitted through
        // it, so nothing guarantees a batch fsync has already covered it.
        // Always sync the writer the record was appended to.
        if let Some(ref wal_writer) = self.wal_writer {
            let synced_lsn = wal_writer
                .sync()
                .map_err(|e| PersistentARTrieError::WalError {
                    reason: format!("{:?}", e),
                })?;
            if synced_lsn < appended_lsn {
                return Err(PersistentARTrieError::Wal(format!(
                    "char WAL sync failed to cover appended LSN {appended_lsn}; synced {synced_lsn}"
                )));
            }
        }
        Ok(())
    }

    /// Verify, without issuing a sync, that the WAL writer's `synced_lsn()`
    /// already covers `appended_lsn`.
    ///
    /// Used after a group-commit `append_with_sync`, which is expected to have
    /// completed the batch fsync. Does nothing unless the current policy
    /// requires per-append coverage. Also does nothing when no direct WAL
    /// writer is installed, because there is then no synced LSN to inspect.
    ///
    /// # Errors
    ///
    /// Returns an error if the synced LSN is below `appended_lsn`.
    #[cfg(feature = "group-commit")]
    fn verify_full_policy_sync_coverage(&self, appended_lsn: u64) -> Result<()> {
        if !Self::policy_requires_sync_coverage(self.durability_policy.load()) {
            return Ok(());
        }
        if let Some(ref wal_writer) = self.wal_writer {
            let synced_lsn = wal_writer.synced_lsn();
            if synced_lsn < appended_lsn {
                return Err(PersistentARTrieError::Wal(format!(
                    "char WAL sync failed to cover appended LSN {appended_lsn}; synced {synced_lsn}"
                )));
            }
        }
        Ok(())
    }
}