1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
//! `IoUringDiskManager`-specific constructors for `PersistentARTrie<V>`.
//!
//! Split out of byte `dict_impl.rs` (lines ~1113-1445, ~333 LOC) as
//! the twelfth Phase-5 byte sub-module. These constructors
//! (`create_with_io_uring`, `open_with_io_uring`) are feature-gated
//! on `io-uring-backend` and target the `IoUringDiskManager` storage
//! backend. The MmapDiskManager (default) constructors live in
//! `dict_impl.rs`.
#![cfg(feature = "io-uring-backend")]
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
use log::warn;
use crate::sync_compat::RwLock;
use crate::value::DictionaryValue;
use super::arena_manager::ArenaManager;
use super::block_storage::BlockStorage;
use super::buffer_manager::BufferManager;
use super::dict_impl::{DurabilityPolicy, PersistentARTrie};
use super::disk_load::read_root_descriptor_arena_count;
use super::error::{PersistentARTrieError, Result};
use super::recovery::RecoveryManager;
use super::wal::{AsyncWalConfig, AsyncWalWriter, WalConfig};
use super::{IoUringDiskManager, DEFAULT_BUFFER_POOL_SIZE};
impl<V: DictionaryValue> PersistentARTrie<V, IoUringDiskManager> {
/// io_uring counterpart of the mmap backend's `install_overlay_on_create`.
///
/// The mmap version is defined in the default-`S` (`MmapDiskManager`) impl block and
/// is therefore not visible on `PersistentARTrie<V, IoUringDiskManager>`, so the
/// io_uring create path carries its own entry point. It forwards to the shared
/// `install_overlay_on_create` / `install_overlay` defaults, which live on the
/// `<V, S: BlockStorage>` `LockFreeOverlay` block and are visible for any `S`.
///
/// A freshly created io_uring byte trie builds the lock-free overlay directly; the
/// overlay is the sole representation for all `V`. Because the WAL is fresh, the
/// Overlay stamp must take: a failure to engage is a hard error (V-2). Note that the
/// byte counter monomorph is `u64` (char's is `u64` as well).
///
/// # Errors
///
/// Returns whatever error the trait-level `install_overlay_on_create` reports.
fn install_overlay_on_create(self) -> Result<Self> {
    use crate::persistent_artrie_core::key_encoding::ByteKey;
    use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;

    // Trait-qualified on purpose: this inherent method has the same name as the
    // trait method, so the call must name the trait to reach the shared default.
    <Self as LockFreeOverlay<ByteKey, _, _>>::install_overlay_on_create(self)
}
/// Create a brand-new persistent dictionary at `path`, backed by io_uring + O_DIRECT.
///
/// Storage goes through `IoUringDiskManager` instead of `MmapDiskManager`, which:
/// - bypasses the kernel page cache (O_DIRECT) to eliminate double caching;
/// - uses io_uring for async I/O with predictable latency;
/// - supports batched block submissions for better throughput.
///
/// The WAL file is `path` with its extension replaced by `wal`. The async WAL
/// writer's `pending_dir` is `wal_pending` inside the parent directory of `path`
/// (or inside `.` when `path` has no parent).
///
/// # Errors
///
/// - An `io_error` tagged `"create"` (kind `AlreadyExists`) when `path` already exists.
/// - Any error returned by `IoUringDiskManager::create`.
/// - An `io_error` tagged `"create_wal"` when the WAL writer cannot be created.
/// - Any error returned by `install_overlay_on_create`.
pub fn create_with_io_uring<P: AsRef<Path>>(path: P) -> Result<Self> {
    use std::sync::atomic::AtomicU64;

    let dict_path = path.as_ref();

    // Never overwrite an existing dictionary file.
    if dict_path.exists() {
        let already_exists = std::io::Error::new(
            std::io::ErrorKind::AlreadyExists,
            "Dictionary file already exists",
        );
        return Err(PersistentARTrieError::io_error(
            "create",
            dict_path.display().to_string(),
            already_exists,
        ));
    }

    // Block storage: the io_uring disk manager wrapped in a shared buffer pool.
    let storage = IoUringDiskManager::create(dict_path)?;
    let buffer_manager = Arc::new(RwLock::new(BufferManager::new(
        storage,
        DEFAULT_BUFFER_POOL_SIZE,
    )));

    // Write-ahead log, placed next to the dictionary file.
    let wal_path = dict_path.with_extension("wal");
    let pending_dir = dict_path
        .parent()
        .unwrap_or(Path::new("."))
        .join("wal_pending");
    let async_config = AsyncWalConfig {
        pending_dir,
        ..Default::default()
    };
    let wal_writer = match AsyncWalWriter::create(&wal_path, async_config, WalConfig::default())
    {
        Ok(writer) => Arc::new(writer),
        Err(e) => {
            // Re-wrap the WAL error as an I/O error carrying the WAL path.
            return Err(PersistentARTrieError::io_error(
                "create_wal",
                wal_path.display().to_string(),
                std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
            ));
        }
    };

    // The arena manager shares the same buffer manager as the trie itself.
    let arena_manager = Arc::new(RwLock::new(ArenaManager::with_buffer_manager(
        Arc::clone(&buffer_manager),
    )));

    // Fresh on-disk trie with an empty WAL: zero terms, clean, LSNs start at 1,
    // watermark base 0 and commit_seq 0 (M2b).
    let fresh = Self {
        term_count: AtomicUsize::new(0),
        dirty: AtomicBool::new(false),
        buffer_manager: Some(buffer_manager),
        wal_writer: Some(wal_writer),
        next_lsn: AtomicU64::new(1),
        prefetcher: super::prefetch::Prefetcher::new(),
        arena_manager: Some(arena_manager),
        durability_policy: crate::persistent_artrie_core::shared_access::AtomicEnumCell::new(
            DurabilityPolicy::default(),
        ),
        epoch_manager: Arc::new(super::concurrency::EpochManager::new()),
        stats: Arc::new(super::concurrency::TrieStats::new()),
        eviction_coordinator: std::sync::Mutex::new(None),
        // The lock-free fields start unset here; the struct is handed to
        // `install_overlay_on_create` immediately below.
        #[cfg(feature = "persistent-artrie")]
        lockfree_root: None,
        #[cfg(feature = "persistent-artrie")]
        lockfree_cache: None,
        #[cfg(feature = "persistent-artrie")]
        cas_retries: AtomicU64::new(0),
        committed_watermark:
            crate::persistent_artrie_core::committed_watermark::CommittedWatermark::new(0),
        checkpoint_lock: Arc::new(parking_lot::Mutex::new(())),
        merge_lock: Arc::new(parking_lot::Mutex::new(())),
        commit_seq: AtomicU64::new(0),
    };

    // M4b EDIT 1: install the overlay on the freshly built trie; an error here
    // fails the whole create.
    Self::install_overlay_on_create(fresh)
}
/// Open an existing persistent dictionary from disk using io_uring + O_DIRECT.
///
/// This opens an existing dictionary file and replays the WAL if needed
/// to recover from any crash, using `IoUringDiskManager` for block I/O.
///
/// The open sequence, in order:
/// 1. Call `recover_in_place_compaction_finalization(path)` (by its name, this
///    completes an interrupted in-place compaction).
/// 2. Open the file and read `root_ptr`, the entry count and the block count.
/// 3. Read the arena count from the root descriptor and, when it is plausible,
///    load arena blocks `1..=arena_count`.
/// 4. Run WAL recovery to obtain `next_lsn` and the checkpoint LSN, then open
///    (or create) the async WAL writer.
/// 5. Compute the committed-watermark base and the `commit_seq` seed from the WAL.
/// 6. Depending on the rank regime in the WAL header, either convert an
///    Owned-regime file to the overlay, or load the overlay image and drain the
///    WAL segments into it.
///
/// # Errors
///
/// Returns an `io_error` tagged `"open"` (kind `NotFound`) when `path` does not
/// exist, and an `io_error` tagged `"open_wal"` when the WAL writer cannot be
/// opened. Errors from step 1, from opening the disk manager or reading its
/// header fields, and from the convert / load / drain calls are propagated with `?`.
///
/// The following failures are only logged with `warn!` and do NOT fail the open:
/// an unreadable or out-of-range root-descriptor arena count (treated as 0
/// arenas), a failing arena block load (the arena manager is reset via
/// `clear_for_loading` + `ensure_valid`), and a WAL recovery error (treated as
/// no recovered operations with `next_lsn = 1`).
pub fn open_with_io_uring<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
// A missing dictionary file is an error here; this constructor never creates one.
if !path.exists() {
return Err(PersistentARTrieError::io_error(
"open",
path.display().to_string(),
std::io::Error::new(std::io::ErrorKind::NotFound, "Dictionary file not found"),
));
}
// Runs before the file is opened for block I/O; any error aborts the open.
super::compaction_impl::recover_in_place_compaction_finalization(path)?;
let disk_manager = IoUringDiskManager::open(path)?;
let root_ptr = disk_manager.root_ptr()?;
// The value itself is unused, but a failing `entry_count()` still aborts the open via `?`.
let _entry_count = disk_manager.entry_count()?;
let storage_block_count = disk_manager.block_count()?;
// Number of arena blocks to load. `root_ptr == 0` means there is no root descriptor to
// read. A count is accepted only if it is at most `storage_block_count - 1`; a larger
// count or a read error is logged and treated as "no arenas".
let arena_count = if root_ptr != 0 {
match read_root_descriptor_arena_count(&disk_manager, root_ptr) {
Ok(count) if count <= storage_block_count.saturating_sub(1) => count,
Ok(count) => {
warn!(
"Ignoring invalid root descriptor arena_count {} for {} storage blocks",
count, storage_block_count
);
0
}
Err(e) => {
warn!("Failed to read root descriptor arena_count: {:?}", e);
0
}
}
} else {
0
};
// From here on the disk manager is owned by the buffer manager, which is shared
// between the trie and the arena manager.
let buffer_manager = BufferManager::new(disk_manager, DEFAULT_BUFFER_POOL_SIZE);
let buffer_manager = Arc::new(RwLock::new(buffer_manager));
let arena_manager = ArenaManager::with_buffer_manager(Arc::clone(&buffer_manager));
let arena_manager = Arc::new(RwLock::new(arena_manager));
if arena_count > 0 {
let mut am = arena_manager.write();
am.clear_for_loading();
let mut load_failed = false;
// Arena blocks are loaded by block id, starting at 1.
for block_id in 1..=arena_count {
if let Err(e) = am.load_arena(block_id) {
// All-or-nothing: on the first failure discard whatever was loaded so far,
// reset the manager, and stop loading. The open itself continues.
warn!("Failed to load arena block {}: {:?}", block_id, e);
am.clear_for_loading();
am.ensure_valid();
load_failed = true;
break;
}
}
if !load_failed {
// Make the last loaded arena the active one.
let count = am.arena_count();
am.set_active_arena(count.saturating_sub(1));
}
}
// The WAL file is the dictionary path with its extension replaced by `wal`.
let wal_path = path.with_extension("wal");
// Recover the WAL before the writer is opened. A missing WAL or a recovery error both
// yield (no operations, next_lsn = 1, no checkpoint LSN); a recovery error is only logged.
let (recovered_ops, next_lsn, checkpoint_lsn) = if wal_path.exists() {
let recovery_manager = RecoveryManager::new(&wal_path);
match recovery_manager.recover() {
Ok(state) => {
// Copy the scalar fields out first: `into_operations` consumes `state`.
let lsn = state.next_lsn;
let cp_lsn = state.stats.checkpoint_lsn;
(state.into_operations(), lsn, cp_lsn)
}
Err(e) => {
warn!("WAL recovery error: {:?}", e);
(Vec::new(), 1, None)
}
}
} else {
(Vec::new(), 1, None)
};
// `pending_dir` is `wal_pending` in the dictionary's parent directory (or in `.` when the
// path has no parent).
let async_config = AsyncWalConfig {
pending_dir: path.parent().unwrap_or(Path::new(".")).join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig::default();
// Unlike WAL recovery above, a failure to open/create the writer is fatal for the open.
let wal_writer = AsyncWalWriter::open_or_create(&wal_path, async_config, archive_config)
.map_err(|e| {
PersistentARTrieError::io_error(
"open_wal",
wal_path.display().to_string(),
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
let wal_writer = Arc::new(wal_writer);
// M2b — Order-A durable-overlay recovery seeding (mirrors mmap `open`):
// watermark base = recovered durable WAL frontier (`next_lsn - 1`),
// commit_seq seed = max(durable header floor, surviving CommitRank
// generation) (the A.2 cross-restart fix). One-time WAL scan on open. See the
// mmap `open` body for the full rationale.
// F7 FIX C: watermark base = max LSN over ALL segments (archive + active), so a
// converted/under-load file's archived committed tail is covered before the first
// post-conversion checkpoint (else a BatchIncrement delta double-applies). Falls
// back to the active-only frontier when no segments are enumerable. (io_uring twin
// of the mmap ctor's FIX-C seed.)
// The trailing `.max(..)` means the result is never below `next_lsn - 1`, even when the
// segment scan succeeds but reports a smaller maximum.
let recovered_frontier = {
let archive_config_for_base = WalConfig::default();
let full_max = wal_writer
.collect_wal_segments(&archive_config_for_base)
.ok()
.and_then(|segments| AsyncWalWriter::max_lsn_in_segments(&segments));
full_max
.unwrap_or_else(|| next_lsn.saturating_sub(1))
.max(next_lsn.saturating_sub(1))
};
// commit_seq seed: the largest `CommitRank` generation found in the active WAL file,
// combined (max) with the writer's `commit_seq_floor()`. The scan is best-effort: a
// reader that cannot be constructed is skipped, and the first record error ends the
// scan, keeping the maximum seen so far.
let commit_seq_seed = {
let mut max_commit_seq_gen = 0u64;
if wal_path.exists() {
use crate::persistent_artrie_core::wal::{WalReader, WalRecord};
if let Ok(mut reader) = WalReader::new(&wal_path) {
while let Some(result) = reader.next_record() {
match result {
Ok((_lsn, WalRecord::CommitRank { generation, .. })) => {
max_commit_seq_gen = max_commit_seq_gen.max(generation);
}
Ok(_) => {}
Err(_) => break,
}
}
}
}
wal_writer.commit_seq_floor().max(max_commit_seq_gen)
};
// The on-disk rank-regime + the F5 gate (read up-front so F5 can avoid
// installing the owned dense tree). No-drift with the byte mmap ctor.
// An unreadable WAL header is treated as `RankRegime::Owned`.
let rank_regime = {
use crate::persistent_artrie_core::wal::WalReader;
WalReader::read_header(&wal_path)
.map(|h| h.regime())
.unwrap_or(crate::persistent_artrie_core::wal::RankRegime::Owned)
};
// F5 gate: the trait's `USE_F5_REOPEN_LOADER` const AND an Overlay-regime WAL header.
let use_f5 = {
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
<Self as LockFreeOverlay<
crate::persistent_artrie_core::key_encoding::ByteKey,
V,
IoUringDiskManager,
>>::USE_F5_REOPEN_LOADER
&& rank_regime == crate::persistent_artrie_core::wal::RankRegime::Overlay
};
// **F7 convert gate** (io_uring twin): an Owned-regime eligible file is CONVERTED
// into the overlay (rotate-if-records-non-empty → stamp → F5 build → archive-aware
// drain). io_uring has no legacy/f5 test ctors, so the convert is gated on the F5
// const directly (always true). Ineligible V stays owned.
// `convert_owned` and `use_f5` test the same const against different regimes, so at
// most one of them is true.
let convert_owned = {
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
<Self as LockFreeOverlay<
crate::persistent_artrie_core::key_encoding::ByteKey,
V,
IoUringDiskManager,
>>::USE_F5_REOPEN_LOADER
&& rank_regime == crate::persistent_artrie_core::wal::RankRegime::Owned
};
// L3.3c (BLOCKER#4, io_uring twin): no eager owned pre-load; the owned `dict.root` is a
// vestigial EMPTY placeholder (deleted at L3.3c-C2). The REAL codec `image_loaded` (with
// the in-loader Err→empty fallback) drives the WAL drain-skip — not a separate eager
// probe that could disagree with the codec on a corrupt-NODE image and brick the reopen.
// L3.3c: the owned root is gone; the overlay (built below via `load_root_immutable`)
// is the sole representation. The legacy owned term counter starts at 0.
let initial_term_count = 0usize;
// Assemble the trie with the recovered LSN, watermark base and commit_seq seed. The
// lock-free fields start as `None`/0 here; the branches below call into `dict` to
// convert or load + drain.
let mut dict = Self {
term_count: AtomicUsize::new(initial_term_count),
dirty: AtomicBool::new(false),
buffer_manager: Some(buffer_manager),
wal_writer: Some(Arc::clone(&wal_writer)),
next_lsn: std::sync::atomic::AtomicU64::new(next_lsn),
prefetcher: super::prefetch::Prefetcher::new(),
arena_manager: Some(arena_manager),
durability_policy: crate::persistent_artrie_core::shared_access::AtomicEnumCell::new(
DurabilityPolicy::default(),
),
epoch_manager: Arc::new(super::concurrency::EpochManager::new()),
stats: Arc::new(super::concurrency::TrieStats::new()),
eviction_coordinator: std::sync::Mutex::new(None),
#[cfg(feature = "persistent-artrie")]
lockfree_root: None,
#[cfg(feature = "persistent-artrie")]
lockfree_cache: None,
#[cfg(feature = "persistent-artrie")]
cas_retries: std::sync::atomic::AtomicU64::new(0),
// M2b: seed watermark base + commit_seq from recovery.
committed_watermark:
crate::persistent_artrie_core::committed_watermark::CommittedWatermark::new(
recovered_frontier,
),
checkpoint_lock: std::sync::Arc::new(parking_lot::Mutex::new(())),
merge_lock: std::sync::Arc::new(parking_lot::Mutex::new(())),
commit_seq: std::sync::atomic::AtomicU64::new(commit_seq_seed),
};
// F5 trait methods resolve through the seam.
// The trait import makes the method calls on `dict` below resolvable.
#[allow(unused_imports)]
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
// NOTE(review): when neither `convert_owned` nor `use_f5` holds (the F5 const is false, or
// the regime is neither Owned nor Overlay), `recovered_ops` is dropped unapplied and no
// root image is loaded before `dict` is returned — confirm this is intended.
if convert_owned {
// ===== F7 CONVERT PATH (Owned-regime eligible → overlay; io_uring twin) =====
// Rotate-if-records-non-empty → stamp Overlay (+ fsync, OBL-1) → F5 build →
// archive-aware drain (FIX B) with the REAL (loaded_from_disk, image
// checkpoint_lsn) (OBL-2; `checkpoint_lsn` is read PRE-rotate = the image redo
// frontier). A `?` aborts open with the durable state intact.
// `recovered_ops` is not applied on this path; `let _ =` only marks it as used.
let _ = recovered_ops;
let archive_config = WalConfig::default();
// A missing checkpoint LSN is passed as 0.
dict.convert_owned_to_overlay_on_reopen(
root_ptr,
/* was_loaded_from_disk */ root_ptr != 0,
checkpoint_lsn.unwrap_or(0),
&archive_config,
)?;
// The trie is marked clean once the conversion has succeeded.
dict.dirty.store(false, AtomicOrdering::Release);
} else if use_f5 {
// ===== F5 PATH (Overlay-regime; direct dense→overlay) =====
// A corrupt/absent image ⇒ `image_loaded = false` (in-loader fallback) ⇒ empty
// overlay + full WAL drain (corrupt-descriptor parity).
let (_lc, image_loaded) = dict.load_root_immutable(root_ptr)?;
// The image counts as loaded only if a root pointer exists AND the loader reports success.
let effective_loaded = (root_ptr != 0) && image_loaded;
// **F7 FIX B:** drain ALL segments (archive + active) into the overlay (not
// active-only), so an Overlay tail archived under load (or a post-S2-crash
// converted file reopened as Overlay) recovers its archived tail. OBL-2:
// image_checkpoint_lsn = the recovery `checkpoint_lsn`. RES-3 fail-loud (FIX E).
// `recovered_ops` is not applied on this path; `let _ =` only marks it as used.
let _ = recovered_ops;
let archive_config = WalConfig::default();
// Without a loaded image the checkpoint LSN passed to the drain is 0.
let _applied = dict.reconcile_and_drain_overlay(
&archive_config,
/* loaded_from_disk */ effective_loaded,
if effective_loaded {
checkpoint_lsn.unwrap_or(0)
} else {
0
},
)?;
// The trie is marked clean once the drain has succeeded.
dict.dirty.store(false, AtomicOrdering::Release);
}
Ok(dict)
}
}