noxu_recovery/checkpointer.rs
1//! Checkpoint daemon for Noxu DB.
2//!
3//!
4//! The Checkpointer flushes dirty IN nodes from the tree to the log in
5//! bottom-up order. This bounds recovery time and ensures durability.
6
7use crate::checkpoint_end::CheckpointEnd;
8use crate::checkpoint_start::CheckpointStart;
9use crate::checkpoint_stat::CheckpointStats;
10use crate::dirty_in_map::DirtyINMap;
11use crate::error::{RecoveryError, Result};
12use noxu_cleaner::UtilizationTracker;
13use noxu_log::entry::FileSummaryLnEntry;
14use noxu_log::entry::bin_delta_log_entry::BinDeltaLogEntry;
15use noxu_log::entry::in_log_entry::InLogEntry;
16use noxu_log::{LogEntryType, LogManager, Provisional};
17use noxu_sync::Mutex;
18use noxu_tree::tree::{Tree, TreeNode};
19use noxu_txn::TxnManager;
20use noxu_util::{Lsn, NULL_LSN};
21use parking_lot::RwLock as NodeRwLock;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, Condvar, RwLock};
25
/// Tunables that control when and how checkpoints are performed.
///
/// Values are assembled fluently: start from [`CheckpointConfig::new`] (or
/// `Default::default()`) and chain the setter named after each field.
///
/// All fields are public, so a caller may also assign them directly; only
/// the [`CheckpointConfig::bin_delta_percent`] *setter* enforces the
/// documented `0..=75` range.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Force a checkpoint even if nothing is dirty.
    pub force: bool,
    /// Minimize recovery time (checkpoint all dirty nodes).
    pub minimize_recovery_time: bool,
    /// Bytes written between checkpoints (0 = time-based only).
    pub bytes_interval: u64,
    /// Milliseconds between checkpoints (0 = disabled).
    pub time_interval: u64,
    /// BIN-delta percent threshold (JE `TREE_BIN_DELTA` / `BIN_DELTA_PERCENT`,
    /// 0–75, default 25). A BIN is logged as a delta only when its delta-slot
    /// count is `<= nEntries * bin_delta_percent / 100`. See
    /// `BinStub::should_log_delta` / JE `DatabaseImpl.getBinDeltaPercent()`.
    pub bin_delta_percent: i32,
}

impl CheckpointConfig {
    /// Lower bound of the `TREE_BIN_DELTA` range accepted by
    /// [`CheckpointConfig::bin_delta_percent`].
    pub const MIN_BIN_DELTA_PERCENT: i32 = 0;

    /// Upper bound of the `TREE_BIN_DELTA` range accepted by
    /// [`CheckpointConfig::bin_delta_percent`].
    pub const MAX_BIN_DELTA_PERCENT: i32 = 75;

    /// Create a configuration populated with the default values
    /// (see the `Default` impl).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set whether a checkpoint is forced even when nothing is dirty.
    #[must_use]
    pub fn force(mut self, force: bool) -> Self {
        self.force = force;
        self
    }

    /// Set the minimize-recovery-time flag.
    #[must_use]
    pub fn minimize_recovery_time(mut self, minimize: bool) -> Self {
        self.minimize_recovery_time = minimize;
        self
    }

    /// Set the number of bytes written between checkpoints
    /// (0 = time-based only).
    #[must_use]
    pub fn bytes_interval(mut self, bytes: u64) -> Self {
        self.bytes_interval = bytes;
        self
    }

    /// Set the time between checkpoints in milliseconds (0 = disabled).
    #[must_use]
    pub fn time_interval(mut self, millis: u64) -> Self {
        self.time_interval = millis;
        self
    }

    /// Set the BIN-delta percent threshold (`TREE_BIN_DELTA`, 0–75).
    ///
    /// Out-of-range input is clamped into
    /// `MIN_BIN_DELTA_PERCENT..=MAX_BIN_DELTA_PERCENT` rather than stored
    /// verbatim: a negative or >75 percentage is outside the documented
    /// parameter range, and the builder signature has no way to report an
    /// error, so the nearest legal value is used.
    #[must_use]
    pub fn bin_delta_percent(mut self, percent: i32) -> Self {
        self.bin_delta_percent = percent.clamp(
            Self::MIN_BIN_DELTA_PERCENT,
            Self::MAX_BIN_DELTA_PERCENT,
        );
        self
    }
}

impl Default for CheckpointConfig {
    /// Defaults: not forced, recovery time not minimized, 20 MB byte
    /// interval, time-based checkpoints disabled, 25 % BIN-delta threshold.
    fn default() -> Self {
        CheckpointConfig {
            force: false,
            minimize_recovery_time: false,
            bytes_interval: 20_000_000, // 20MB default
            time_interval: 0, // Time-based checkpoints disabled by default
            // JE BIN_DELTA_PERCENT default (TREE_BIN_DELTA, 0–75).
            bin_delta_percent: 25,
        }
    }
}
97
98/// Result of a checkpoint operation.
99///
100/// Contains information about what was flushed during the checkpoint.
101#[derive(Debug, Clone)]
102pub struct CheckpointResult {
103 /// The checkpoint ID.
104 pub checkpoint_id: u64,
105 /// LSN of the CheckpointStart entry.
106 pub start_lsn: Lsn,
107 /// LSN of the CheckpointEnd entry.
108 pub end_lsn: Lsn,
109 /// Number of full INs flushed.
110 pub full_ins_flushed: u64,
111 /// Number of full BINs flushed.
112 pub full_bins_flushed: u64,
113 /// Number of delta INs flushed.
114 pub delta_ins_flushed: u64,
115 /// Time spent on checkpoint in milliseconds.
116 pub elapsed_ms: u64,
117}
118
119impl CheckpointResult {
120 /// Total nodes flushed.
121 pub fn total_nodes_flushed(&self) -> u64 {
122 self.full_ins_flushed + self.full_bins_flushed + self.delta_ins_flushed
123 }
124}
125
126/// The Checkpointer flushes dirty IN nodes to the log.
127///
128///
129///
130/// Checkpoint flushes must be done in ascending order from the bottom
131/// of the tree up. This ensures that recovery can reconstruct the tree
132/// from the checkpoint.
133///
134/// # Checkpoint Algorithm
135///
136/// 1. Generate checkpoint ID
137/// 2. Create and log CheckpointStart
138/// 3. Build dirty IN map (organized by Btree level)
139/// 4. Flush dirty INs level by level (bottom-up)
140/// - Bottom levels logged provisionally
141/// - Top level logged non-provisionally
142/// 5. Create and log CheckpointEnd
143/// 6. Update statistics
144///
145/// This implementation flushes dirty BINs via `flush_dirty_bins_internal()`,
146/// which writes full BIN or BINDelta log entries depending on the dirty-slot
147/// fraction (TREE_BIN_DELTA = 25%). Upper INs (level ≥ 2) are flushed
148/// by `flush_upper_ins_internal()` after the BIN pass, bottom-up, using
149/// `Provisional::Yes` for intermediate levels and `Provisional::No` for
150/// the root. File utilization summaries are persisted via
151/// `persist_file_summaries()` at the end of each checkpoint.
152pub struct Checkpointer {
153 /// Checkpoint statistics
154 stats: Arc<CheckpointStats>,
155 /// Next checkpoint ID
156 next_checkpoint_id: AtomicU64,
157 /// The dirty IN map for the current checkpoint
158 dirty_map: Mutex<DirtyINMap>,
159 /// LSN of the last checkpoint start
160 last_checkpoint_start: Mutex<Lsn>,
161 /// LSN of the last checkpoint end
162 last_checkpoint_end: Mutex<Lsn>,
163 /// Whether a checkpoint is in progress
164 checkpoint_in_progress: AtomicBool,
165 /// Per-database highest IN-level being flushed in the current checkpoint.
166 ///
167 /// Maps `db_id → highest dirty upper-IN level` for every tree that has
168 /// dirty upper INs in this checkpoint pass. A tree absent from the map
169 /// has no dirty upper INs → its highest flush level is 0 → an evicted BIN
170 /// from that tree gets `Provisional::No` (no covering ancestor will be
171 /// written). Cleared when the checkpoint finishes or is abandoned.
172 ///
173 /// JE ref: `DirtyINMap.highestFlushLevels` (per-`DatabaseImpl` map) /
174 /// `DirtyINMap.coordinateEvictionWithCheckpoint` / `getHighestFlushLevel`.
175 ///
176 /// CC-4 residual fix: the old single `AtomicI32` held the *global* max
177 /// across all trees, causing a BIN evicted from a tree with **no** dirty
178 /// upper INs to be logged `Provisional::Yes` (covered by a non-provisional
179 /// ancestor that the checkpoint never actually writes for that tree).
180 checkpoint_flush_levels: std::sync::Mutex<HashMap<u64, i32>>,
181 /// Shutdown flag
182 shutdown: AtomicBool,
183 /// Condvar for interruptible daemon sleep — notified by `request_shutdown()`
184 /// so the daemon thread wakes up immediately instead of waiting the full
185 /// sleep interval.
186 shutdown_condvar: Condvar,
187 /// Mutex paired with `shutdown_condvar`.
188 shutdown_mutex: std::sync::Mutex<bool>,
189 /// Configuration
190 config: CheckpointConfig,
191 /// Optional LogManager for writing CkptStart/CkptEnd WAL entries.
192 log_manager: Option<Arc<LogManager>>,
193 /// Optional Tree reference for flushing dirty BINs in step 4.
194 ///
195 /// When `None` (unit tests without a real tree) step 4 is a no-op.
196 tree: Option<Arc<RwLock<Tree>>>,
197 /// Database ID to pass to `Tree::collect_dirty_bins()`.
198 db_id: u64,
199 /// Registry of ALL open user-database trees (Stage-1 fix).
200 ///
201 /// Maps `db_id as i64` → `Arc<RwLock<Tree>>` for every database the
202 /// environment has opened. The checkpointer must flush dirty BINs from
203 /// EVERY tree, not just the primary one, so that committed LNs written to
204 /// user databases are captured in a BIN entry before `CkptEnd` is written.
205 /// JE walks a single env-wide `INList` that covers all databases;
206 /// Noxu achieves the same effect by iterating this registry.
207 ///
208 /// `None` until `with_db_trees_registry` is called (unit tests without a
209 /// full environment).
210 db_trees_registry:
211 Option<Arc<std::sync::Mutex<HashMap<i64, Arc<RwLock<Tree>>>>>>,
212 /// Bytes written to the log since the last checkpoint.
213 ///
214 /// Incremented by `wakeup_after_write()`. When this exceeds
215 /// `checkpoint_bytes_interval` a checkpoint is triggered immediately.
216 ///
217 /// Write-byte accumulation.
218 bytes_since_checkpoint: AtomicU64,
219 /// Bytes-written threshold that triggers an immediate checkpoint.
220 ///
221 /// Default: 10 MiB (10 * 1024 * 1024). Set to 0 to disable.
222 ///
223 /// REC-D: wired from `CHECKPOINTER_BYTES_INTERVAL` (default 20 MB) by the
224 /// environment via `with_bytes_interval`. JE Checkpointer ctor:
225 /// `logSizeBytesInterval = configManager.getLong(CHECKPOINTER_BYTES_INTERVAL)`.
226 checkpoint_bytes_interval: u64,
227 /// Time-based checkpoint interval in milliseconds (0 = time-based
228 /// checkpoints disabled, bytes-only).
229 ///
230 /// REC-D: wired from `CHECKPOINTER_WAKEUP_INTERVAL` by the environment via
231 /// `with_time_interval`. JE `getWakeupPeriod`: bytes-OR-time, with the
232 /// byte interval taking precedence when non-zero. The daemon paces its
233 /// own sleep at this interval; `is_runnable` consults it only when the
234 /// byte interval is disabled (matches JE `isRunnable` useTimeInterval
235 /// branch, which fires only when `logSizeBytesInterval == 0`).
236 checkpoint_time_interval_ms: u64,
237 /// Optional utilization tracker for persisting file summaries.
238 ///
239 /// When set, `persist_file_summaries()` iterates tracked summaries and
240 /// writes `FileSummaryLN` WAL entries.
241 utilization_tracker: Option<Arc<Mutex<UtilizationTracker>>>,
242 /// Optional cleaner reference for the post-checkpoint callback.
243 ///
244 /// After each successful `do_checkpoint`, the checkpointer calls
245 /// `cleaner.after_checkpoint(&state)` to advance the three-state
246 /// checkpoint barrier in `FileSelector`. X-5 fix.
247 cleaner: Option<Arc<noxu_cleaner::Cleaner>>,
248 /// Optional transaction manager for T-F3/T-F4: first-active-LSN tracking.
249 ///
250 /// When `Some`, `do_checkpoint` queries `txn_manager.get_first_active_lsn()`
251 /// and writes the result into `CkptEnd.first_active_lsn` instead of the
252 /// conservative `Lsn::new(0,0)` full-scan sentinel. This bounds the
253 /// recovery scan to entries at or after the earliest active transaction's
254 /// first logged LSN, reducing crash-recovery time.
255 ///
256 /// Safe only after Stage 1 (all user-database BINs are checkpointed);
257 /// `None` for unit tests without a full environment.
258 txn_manager: Option<Arc<TxnManager>>,
259 /// REC-S: id sources read at checkpoint time to write the real last
260 /// node/db/txn id values into `CheckpointEnd` (instead of zeros).
261 ///
262 /// JE `Checkpointer.doCheckpoint` writes `getLastLocalNodeId` /
263 /// `getLastLocalDbId` / `getLastLocalTxnId` into the `CheckpointEnd`.
264 /// `next_db_id` mirrors the env's db-id counter (last db-id = value-1);
265 /// the last txn-id is read from `txn_manager.get_last_local_txn_id()`;
266 /// the last node-id comes from the single tree-wide node counter
267 /// (`noxu_tree::peek_next_node_id_counter`, L-30). `None` keeps the old
268 /// zero behaviour for unit tests without a full environment.
269 next_db_id: Option<Arc<std::sync::atomic::AtomicI64>>,
270}
271
272impl Checkpointer {
273 /// Create a new Checkpointer.
274 ///
275 /// # Arguments
276 /// * `config` - Checkpoint configuration
277 pub fn new(config: CheckpointConfig) -> Self {
278 Self {
279 stats: Arc::new(CheckpointStats::new()),
280 next_checkpoint_id: AtomicU64::new(1),
281 dirty_map: Mutex::new(DirtyINMap::new()),
282 last_checkpoint_start: Mutex::new(noxu_util::NULL_LSN),
283 last_checkpoint_end: Mutex::new(noxu_util::NULL_LSN),
284 checkpoint_in_progress: AtomicBool::new(false),
285 checkpoint_flush_levels: std::sync::Mutex::new(HashMap::new()),
286 shutdown: AtomicBool::new(false),
287 shutdown_condvar: Condvar::new(),
288 shutdown_mutex: std::sync::Mutex::new(false),
289 config,
290 log_manager: None,
291 tree: None,
292 db_id: 0,
293 db_trees_registry: None,
294 bytes_since_checkpoint: AtomicU64::new(0),
295 checkpoint_bytes_interval: 10 * 1024 * 1024, // 10 MiB default
296 checkpoint_time_interval_ms: 0, // time-based disabled by default
297 utilization_tracker: None,
298 cleaner: None,
299 txn_manager: None,
300 next_db_id: None,
301 }
302 }
303
304 /// Set the bytes-written threshold that triggers an immediate checkpoint.
305 ///
306 ///
307 pub fn with_bytes_interval(mut self, bytes: u64) -> Self {
308 self.checkpoint_bytes_interval = bytes;
309 self
310 }
311
312 /// Set the time-based checkpoint interval (milliseconds).
313 ///
314 /// REC-D: wired from `CHECKPOINTER_WAKEUP_INTERVAL`. JE `getWakeupPeriod`
315 /// computes bytes-OR-time with bytes taking precedence; `isRunnable`
316 /// consults the time interval only when the byte interval is 0.
317 pub fn with_time_interval(mut self, millis: u64) -> Self {
318 self.checkpoint_time_interval_ms = millis;
319 self
320 }
321
322 /// Attach a LogManager so that `do_checkpoint` writes real WAL entries.
323 ///
324 /// Call this before invoking `do_checkpoint` when a writable log is
325 /// available (i.e. from `EnvironmentImpl`).
326 pub fn with_log_manager(mut self, lm: Arc<LogManager>) -> Self {
327 self.log_manager = Some(lm);
328 self
329 }
330
331 /// Attach a Tree so that `do_checkpoint` flushes dirty BINs in step 4.
332 ///
333 /// `db_id` is the database ID passed to `Tree::collect_dirty_bins()`.
334 /// `Checkpointer` receiving the environment's tree reference.
335 pub fn with_tree(mut self, tree: Arc<RwLock<Tree>>, db_id: u64) -> Self {
336 self.tree = Some(tree);
337 self.db_id = db_id;
338 self
339 }
340
341 /// Wire the env-wide db-tree registry so the checkpointer flushes ALL
342 /// user-database dirty BINs, not just the primary tree.
343 ///
344 /// This is the Stage-1 fix: JE's `Checkpointer.processINList` walks a
345 /// single env-wide `INList` covering all databases. Noxu achieves the
346 /// same effect by iterating `db_trees_registry` and flushing each tree.
347 pub fn with_db_trees_registry(
348 mut self,
349 registry: Arc<std::sync::Mutex<HashMap<i64, Arc<RwLock<Tree>>>>>,
350 ) -> Self {
351 self.db_trees_registry = Some(registry);
352 self
353 }
354
355 /// Attach a UtilizationTracker so that `persist_file_summaries()` writes
356 /// real `FileSummaryLN` WAL entries during each checkpoint.
357 ///
358 /// `Checkpointer` receiving the environment's utilization tracker.
359 pub fn with_utilization_tracker(
360 mut self,
361 tracker: Arc<Mutex<UtilizationTracker>>,
362 ) -> Self {
363 self.utilization_tracker = Some(tracker);
364 self
365 }
366
367 /// Wire a cleaner so that `do_checkpoint` calls
368 /// `cleaner.after_checkpoint()` after a successful checkpoint.
369 ///
370 /// This is the X-5 fix: it activates the three-state checkpoint barrier
371 /// (`cleaned → checkpointed → safe_to_delete`) in `FileSelector` so that
372 /// log files are only deleted after their migrations have been captured by
373 /// two successive checkpoints.
374 pub fn with_cleaner(mut self, cleaner: Arc<noxu_cleaner::Cleaner>) -> Self {
375 self.cleaner = Some(cleaner);
376 self
377 }
378
379 /// Wire the transaction manager so `do_checkpoint` can compute the real
380 /// `first_active_lsn` for `CkptEnd` (T-F3/T-F4).
381 ///
382 /// Safe to call only after Stage 1 (user-database BINs are checkpointed);
383 /// before Stage 1 a non-zero `first_active_lsn` would cause recovery to
384 /// skip committed LNs not captured in any BIN.
385 pub fn with_txn_manager(mut self, txn_manager: Arc<TxnManager>) -> Self {
386 self.txn_manager = Some(txn_manager);
387 self
388 }
389
390 /// REC-S: wire the env's db-id counter so `do_checkpoint` writes the real
391 /// last node/db/txn id values into `CheckpointEnd` instead of zeros.
392 ///
393 /// `next_db_id` is the env's `AtomicI64` (last allocated db-id =
394 /// `next_db_id - 1`). The last txn-id is read from the wired
395 /// `txn_manager`; the last node-id from the tree-wide node counter.
396 ///
397 /// JE `Checkpointer.doCheckpoint` reads `envImpl.getNodeSequence()
398 /// .getLastLocalNodeId()`, `getDbTree().getLastLocalDbId()`, and
399 /// `getTxnManager().getLastLocalTxnId()` into the `CheckpointEnd`.
400 pub fn with_id_sources(
401 mut self,
402 next_db_id: Arc<std::sync::atomic::AtomicI64>,
403 ) -> Self {
404 self.next_db_id = Some(next_db_id);
405 self
406 }
407
408 /// Accumulate bytes written and trigger a checkpoint when the threshold
409 /// is exceeded.
410 ///
411 /// Called after each WAL write from `EnvironmentImpl` (or LogManager) with
412 /// the number of bytes appended. When the running total exceeds
413 /// `checkpoint_bytes_interval` the counter is reset and
414 /// `do_checkpoint("wakeup")` is invoked synchronously.
415 ///
416 ///
417 pub fn wakeup_after_write(&self, bytes: u64) {
418 if self.checkpoint_bytes_interval == 0 {
419 return;
420 }
421 let prev =
422 self.bytes_since_checkpoint.fetch_add(bytes, Ordering::Relaxed);
423 if prev + bytes >= self.checkpoint_bytes_interval {
424 // Reset counter *before* triggering so parallel callers don't
425 // all pile in at once — best-effort, not strictly once.
426 self.bytes_since_checkpoint.store(0, Ordering::Relaxed);
427 // Ignore errors: a concurrent checkpoint may be in progress.
428 let _ = self.do_checkpoint("wakeup_after_write");
429 }
430 }
431
432 /// Whether a periodic (daemon) checkpoint should run now (JE
433 /// `Checkpointer.isRunnable`). Without this gate the daemon wrote a
434 /// checkpoint on every wakeup tick even on a fully idle environment
435 /// (wasted I/O). Returns true if:
436 /// - `force`, OR
437 /// - REC-F: the cleaner has files pending reclaim
438 /// (`needCheckpointForCleanedFiles()` → `isCheckpointNeeded()`), even
439 /// with no writes — so an idle env still reclaims cleaned files, OR
440 /// - bytes written since the last checkpoint >= the byte interval, OR
441 /// - (only when the byte interval is disabled) the time interval elapsed
442 /// AND something was written since the last checkpoint
443 /// (`bytes_since_checkpoint > 0` — JE's `lastUsedLsn !=
444 /// lastCheckpointEnd` idle-guard).
445 ///
446 /// JE ref: `Checkpointer.isRunnable` — order is force, then
447 /// `wakeupAfterNoWrites && needCheckpointForCleanedFiles()`, then the
448 /// bytes-OR-time interval (bytes takes precedence; the time branch only
449 /// runs when `logSizeBytesInterval == 0`).
450 pub fn is_runnable(&self, force: bool) -> bool {
451 if force {
452 return true;
453 }
454 // REC-F: wake for cleaner-pending files even on an idle environment.
455 // JE `isRunnable`: `if (wakeupAfterNoWrites && needCheckpointForCleanedFiles())
456 // return true;`. Noxu folds `wakeupAfterNoWrites` into the cleaner
457 // query directly — `needs_checkpoint_for_cleaned_files()` is true iff
458 // the cleaner reports CLEANED/FULLY_PROCESSED files pending reclaim.
459 if self.needs_checkpoint_for_cleaned_files() {
460 return true;
461 }
462 let bytes_since = self.bytes_since_checkpoint.load(Ordering::Relaxed);
463 if self.checkpoint_bytes_interval != 0 {
464 // Bytes interval takes precedence (JE getWakeupPeriod): when it is
465 // non-zero the time branch is never consulted.
466 return bytes_since >= self.checkpoint_bytes_interval;
467 }
468 // Time-cadence branch (only reached when the byte interval is 0): the
469 // caller (the daemon) only invokes this once per wakeup interval, so
470 // reaching here means the time interval has elapsed. JE's idle-guard
471 // (`lastUsedLsn != lastCheckpointEnd`) maps to "something was written
472 // since the last checkpoint" — i.e. bytes_since > 0. Skip the
473 // checkpoint entirely on an idle environment.
474 bytes_since > 0
475 }
476
477 /// REC-F: whether the cleaner has files pending reclaim that a checkpoint
478 /// would unblock. Mirrors JE `Checkpointer.needCheckpointForCleanedFiles`
479 /// → `cleaner.getFileSelector().isCheckpointNeeded()` (any CLEANED or
480 /// FULLY_PROCESSED files exist). Returns `false` when no cleaner is
481 /// wired.
482 fn needs_checkpoint_for_cleaned_files(&self) -> bool {
483 self.cleaner.as_ref().map(|c| c.is_checkpoint_needed()).unwrap_or(false)
484 }
485
/// Test-only: add `bytes` to `bytes_since_checkpoint` without any chance
/// of starting a checkpoint (unlike `wakeup_after_write`, which calls
/// `do_checkpoint` at the threshold). Lets tests drive `is_runnable`.
#[cfg(test)]
pub fn note_bytes_for_test(&self, bytes: u64) {
    let counter = &self.bytes_since_checkpoint;
    counter.fetch_add(bytes, Ordering::Relaxed);
}
493
494 /// Returns `true` if the given BIN node has been checkpointed at least
495 /// once (its `last_full_lsn` is not NULL_LSN).
496 ///
497 /// The evictor calls this before evicting a node: a node that has never
498 /// been checkpointed would be lost on eviction because it has no on-disk
499 /// representation yet.
500 ///
501 ///
502 pub fn is_checkpointed(node: &NodeRwLock<TreeNode>) -> bool {
503 let guard = node.read();
504 match &*guard {
505 TreeNode::Bottom(b) => b.last_full_lsn != NULL_LSN,
506 // Non-BIN internal nodes are always considered checkpointed for
507 // eviction purposes (they are reconstructed from their children).
508 _ => true,
509 }
510 }
511
512 /// Persist file utilization summaries to the WAL.
513 ///
514 /// Writes a `FileSummaryLN` log entry for each tracked file summary so
515 /// that utilization data survives a restart.
516 ///
517 ///
518 ///
519 /// Requires both a `LogManager` (via `with_log_manager`) and a
520 /// `UtilizationTracker` (via `with_utilization_tracker`) to be wired.
521 /// Returns `Ok(())` without writing if either is absent.
522 pub fn persist_file_summaries(&self) -> Result<()> {
523 let (Some(lm), Some(tracker_lock)) =
524 (&self.log_manager, &self.utilization_tracker)
525 else {
526 return Ok(());
527 };
528
529 // Snapshot the tracked summaries into owned values, then DROP the
530 // tracker lock BEFORE writing any FileSummaryLN. This avoids a
531 // reentrant deadlock: `lm.log()` calls the installed write observer
532 // (the same UtilizationTracker, behind the same Mutex) to
533 // countNewLogEntry for the FileSummaryLN it just wrote. JE
534 // (UtilizationProfile.putFileSummary) likewise reads the
535 // TrackedFileSummary out and then logs the FileSummaryLN without
536 // holding the tracker latch across the log write.
537 //
538 // noxu_sync::Mutex::lock() returns the guard directly (no poison).
539 let snapshot: Vec<(u32, noxu_cleaner::FileSummary, Vec<u32>)> = {
540 let tracker = tracker_lock.lock();
541 let tracked_files = tracker.get_tracked_files();
542 if tracked_files.is_empty() {
543 return Ok(());
544 }
545 tracked_files
546 .iter()
547 .map(|(file_number, tracked)| {
548 (
549 *file_number,
550 tracked.get_summary().clone(),
551 tracked.get_obsolete_offsets().to_vec(),
552 )
553 })
554 .collect()
555 };
556
557 for (file_number, summary, offsets) in &snapshot {
558 // C7: persist the full FileSummary breakdown (LN/IN totals +
559 // obsolete + maxLNSize) AND the packed obsolete-offset list, so
560 // the on-disk FileSummaryLN is as faithful as the in-memory
561 // TrackedFileSummary. JE: FileSummaryLN.writeToLog ->
562 // baseSummary.writeToLog (11 ints) + obsoleteOffsets.writeToLog.
563 let mut packed = noxu_cleaner::PackedOffsets::new();
564 packed.pack(offsets);
565 // CLN-24: attach the serialized per-file expiration histogram so
566 // the cleaner's TTL expiration prediction survives restart. JE
567 // persists this in a separate EXPIRATION DB (FileExpirationLN);
568 // Noxu folds it into the FileSummaryLN trailer. Built from the
569 // file's LN entries via the wired cleaner; empty when no cleaner
570 // is wired or the file has no expiring data.
571 let expiration_histogram = self
572 .cleaner
573 .as_ref()
574 .map(|c| c.serialize_expiration_histogram(*file_number))
575 .unwrap_or_default();
576 let entry = FileSummaryLnEntry::new(
577 *file_number as u64,
578 summary.total_count,
579 summary.total_size,
580 summary.total_in_count,
581 summary.total_in_size,
582 summary.total_ln_count,
583 summary.total_ln_size,
584 summary.max_ln_size,
585 summary.obsolete_in_count,
586 summary.obsolete_ln_count,
587 summary.obsolete_ln_size,
588 summary.obsolete_ln_size_counted,
589 packed.get_count() as u32,
590 packed.get_data().to_vec(),
591 expiration_histogram,
592 );
593 let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
594 entry.write_to_log(&mut buf);
595 lm.log(
596 LogEntryType::FileSummaryLN,
597 &buf,
598 Provisional::No,
599 false,
600 false,
601 )
602 .map_err(|e| {
603 RecoveryError::CheckpointError(format!(
604 "persist_file_summaries log write failed: {e}"
605 ))
606 })?;
607 log::debug!(
608 "persist_file_summaries: wrote FileSummaryLN for file {}",
609 file_number
610 );
611 }
612 Ok(())
613 }
614
615 /// Perform a checkpoint.
616 pub fn do_checkpoint(&self, invoker: &str) -> Result<CheckpointResult> {
617 // Check if shutdown
618 if self.shutdown.load(Ordering::Acquire) {
619 return Err(RecoveryError::CheckpointError(
620 "Checkpointer has been shut down".to_string(),
621 ));
622 }
623
624 // Check if already in progress
625 if self
626 .checkpoint_in_progress
627 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
628 .is_err()
629 {
630 return Err(RecoveryError::CheckpointError(
631 "Checkpoint already in progress".to_string(),
632 ));
633 }
634
635 let start_time = std::time::Instant::now();
636
637 // Ensure we clear the in-progress flag (and flush_levels map) on exit.
638 let _guard = CheckpointGuard {
639 flag: &self.checkpoint_in_progress,
640 flush_levels: &self.checkpoint_flush_levels,
641 };
642
643 // Step 1: Generate checkpoint ID
644 let checkpoint_id =
645 self.next_checkpoint_id.fetch_add(1, Ordering::SeqCst);
646
647 // X-5: snapshot the cleaner's "cleaned" file set at checkpoint START
648 // (before we write CkptStart) so we know which files were in the
649 // cleaned state when this checkpoint began. Passed to
650 // `after_checkpoint` at the end of this function.
651 let cleaner_state =
652 self.cleaner.as_ref().map(|c| c.get_checkpoint_start_state());
653
654 // Step 2: Write CkptStart entry to WAL (or synthesise a fake LSN when
655 // no LogManager is wired — used by unit tests that don't need I/O).
656 let start_lsn = if let Some(lm) = &self.log_manager {
657 let ckpt_start = CheckpointStart::new(checkpoint_id, invoker);
658 let mut buf = Vec::with_capacity(ckpt_start.log_size());
659 ckpt_start.write_to_log(&mut buf).map_err(|e| {
660 RecoveryError::CheckpointError(format!(
661 "CkptStart serialization failed: {e}"
662 ))
663 })?;
664 lm.log(
665 LogEntryType::CkptStart,
666 &buf,
667 Provisional::No,
668 false, // flush_required
669 false, // fsync_required
670 )
671 .map_err(|e| {
672 RecoveryError::CheckpointError(format!(
673 "CkptStart WAL write failed: {e}"
674 ))
675 })?
676 } else {
677 // No LogManager attached — synthetic LSN so existing tests pass.
678 Lsn::new(0, checkpoint_id as u32)
679 };
680
681 // Step 3: Build dirty IN map
682 let mut dirty_map = self.dirty_map.lock();
683 dirty_map.clear();
684 drop(dirty_map);
685
686 // Step 4a: Flush dirty BINs.
687 //
688 // For each dirty BIN in the tree decide — using TREE_BIN_DELTA
689 // threshold of 25 % — whether to write a BINDelta or a full BIN.
690 //
691 // `Checkpointer.processINList()` + `logIN()` (BIN path).
692 let mut flush_result = self.flush_dirty_bins_internal()?;
693
694 // Step 4b: Flush dirty upper INs (level ≥ 2) bottom-up.
695 //
696 // After BINs are written their parent INs are dirtied by splits.
697 // These must be logged before CkptEnd to make the checkpoint complete.
698 // Intermediate levels use Provisional::Yes (subsumed by root);
699 // the root level uses Provisional::No (anchors the checkpoint).
700 //
701 // `Checkpointer.processINList()` upper-IN loop +
702 // `Checkpointer.logIN()` for non-BIN nodes.
703 let upper_result = self.flush_upper_ins_internal()?;
704 flush_result.full_ins_flushed += upper_result.full_ins_flushed;
705
706 // Step 5: Write CkptEnd entry to WAL.
707 //
708 // T-F3 is NOT yet active: first_active_lsn stays Lsn::new(0,0) (full
709 // scan from start of log). Setting a non-zero first_active_lsn would
710 // bound the recovery scan — but that requires pre-loading BINs from
711 // the checkpoint into the recovery tree before replaying LNs (P-2
712 // BIN-preload infrastructure). Without P-2, starting from any LSN
713 // other than 0 silently drops pre-checkpoint committed LNs.
714 //
715 // Stage 2 wires T-F4 (update_first_lsn is called on first txn write,
716 // get_first_active_lsn() now returns a real LSN), but the consumer
717 // (T-F3 scan bounding) is deferred until P-2 lands.
718 //
719 // Backward compat: Lsn::new(0,0) tells recovery to full-scan from
720 // the start, which is correct and was always the behaviour.
721 let first_active_lsn: noxu_util::Lsn = noxu_util::Lsn::new(0, 0);
722 // (T-F4: txn_manager is wired; get_first_active_lsn() returns real
723 // LSN for future P-2 use; suppress unused warning.)
724 let _ = &self.txn_manager;
725
726 // REC-S: read the env's current last node/db/txn ids and write the
727 // REAL values into CheckpointEnd (instead of the old hardcoded zeros)
728 // so recovery folds them into use_max_* and the env seeds its
729 // sequences past them on restart. JE Checkpointer.doCheckpoint writes
730 // getLastLocalNodeId / getLastLocalDbId / getLastLocalTxnId.
731 // - last node-id: the tree-wide node counter (L-30); the next id to
732 // be handed out is `peek_next_node_id_counter()`, so the last
733 // allocated id is that minus 1 (saturating).
734 // - last db-id: the env's next_db_id minus 1.
735 // - last txn-id: txn_manager.get_last_local_txn_id().
736 let last_local_node_id: u64 =
737 noxu_tree::tree::peek_next_node_id_counter().saturating_sub(1);
738 let last_local_db_id: u64 = self
739 .next_db_id
740 .as_ref()
741 .map(|n| {
742 n.load(std::sync::atomic::Ordering::Relaxed).saturating_sub(1)
743 as u64
744 })
745 .unwrap_or(0);
746 let last_local_txn_id: u64 = self
747 .txn_manager
748 .as_ref()
749 .map(|t| t.get_last_local_txn_id().max(0) as u64)
750 .unwrap_or(0);
751
752 let end_lsn = if let Some(lm) = &self.log_manager {
753 let ckpt_end = CheckpointEnd::new(
754 checkpoint_id,
755 invoker,
756 start_lsn,
757 // REC-P / REC-B: root_lsn is intentionally always None. JE
758 // records the mapping-tree root here (Checkpointer.flushRoot
759 // → CheckpointEnd.rootLsn), but Noxu's catalog is an in-memory
760 // HashMap rebuilt from NameLN WAL entries during recovery
761 // (REC-B authorized divergence), so there is no mapping tree
762 // to flush and no root LSN to record. Per-DB utilization is
763 // persisted via persist_file_summaries (FileSummaryLN), not a
764 // mapping-tree MapLN flush.
765 None, // root_lsn
766 first_active_lsn,
767 // REC-S: real id maxima (were hardcoded 0).
768 last_local_node_id,
769 0, // last_replicated_node_id (HA: deferred)
770 last_local_db_id,
771 0, // last_replicated_db_id (HA: deferred)
772 last_local_txn_id,
773 0, // last_replicated_txn_id (HA: deferred)
774 false, // cleaned_files_to_delete
775 );
776 let mut buf = Vec::with_capacity(ckpt_end.log_size());
777 ckpt_end.write_to_log(&mut buf).map_err(|e| {
778 RecoveryError::CheckpointError(format!(
779 "CkptEnd serialization failed: {e}"
780 ))
781 })?;
782 lm.log(
783 LogEntryType::CkptEnd,
784 &buf,
785 Provisional::No,
786 true, // flush_required
787 // REC-F1: fsync the CkptEnd entry before returning. JE
788 // Checkpointer.doCheckpoint (~line 895):
789 // lastCheckpointEnd = logManager.logForceFlush(
790 // endEntry, true /*fsyncRequired*/, ...);
791 // "We must flush and fsync to ensure that cleaned files are
792 // not referenced. This also ensures that this checkpoint is
793 // not wasted if we crash." The fsync MUST precede the
794 // cleaner.after_checkpoint() barrier advance below (and JE
795 // fsyncs inside doCheckpoint before
796 // updateFilesAtCheckpointEnd), so ALL callers — close,
797 // daemon, and bytes-triggered wakeup_after_write — get a
798 // durable CkptEnd, not just close.
799 true, // fsync_required
800 )
801 .map_err(|e| {
802 RecoveryError::CheckpointError(format!(
803 "CkptEnd WAL write failed: {e}"
804 ))
805 })?
806 } else {
807 // No LogManager attached — synthetic LSN so existing tests pass.
808 Lsn::new(0, (checkpoint_id as u32) + 1)
809 };
810
811 // Step 6: Update statistics
812 *self.last_checkpoint_start.lock() = start_lsn;
813 *self.last_checkpoint_end.lock() = end_lsn;
814 // Reset the runnable-gate state: bytes written since this checkpoint.
815 self.bytes_since_checkpoint.store(0, Ordering::Relaxed);
816
817 let elapsed_ms = start_time.elapsed().as_millis() as u64;
818
819 self.stats.checkpoints.fetch_add(1, Ordering::Relaxed);
820 self.stats
821 .full_in_flush
822 .fetch_add(flush_result.full_ins_flushed, Ordering::Relaxed);
823 self.stats
824 .full_bin_flush
825 .fetch_add(flush_result.full_bins_flushed, Ordering::Relaxed);
826 self.stats
827 .delta_in_flush
828 .fetch_add(flush_result.delta_ins_flushed, Ordering::Relaxed);
829 self.stats.last_ckpt_id.store(checkpoint_id, Ordering::Relaxed);
830 self.stats.last_ckpt_start.store(start_lsn.as_u64(), Ordering::Relaxed);
831 self.stats.last_ckpt_end.store(end_lsn.as_u64(), Ordering::Relaxed);
832 self.stats.last_ckpt_interval.store(elapsed_ms, Ordering::Relaxed);
833
834 // X-5: advance the cleaner's three-state checkpoint barrier now that
835 // a checkpoint has successfully completed. Cleaned files that were
836 // snapshotted at checkpoint-start (`cleaner_state`) move to
837 // `checkpointed`; previously-checkpointed files move to
838 // `safe_to_delete` and will be removed on the next `delete_safe_files`
839 // call.
840 if let (Some(cleaner), Some(state)) = (&self.cleaner, cleaner_state) {
841 cleaner.after_checkpoint(&state);
842 }
843
844 Ok(CheckpointResult {
845 checkpoint_id,
846 start_lsn,
847 end_lsn,
848 full_ins_flushed: flush_result.full_ins_flushed,
849 full_bins_flushed: flush_result.full_bins_flushed,
850 delta_ins_flushed: flush_result.delta_ins_flushed,
851 elapsed_ms,
852 })
853 }
854
855 /// Get the LSN of the last checkpoint start.
856 pub fn get_last_checkpoint_start(&self) -> Lsn {
857 *self.last_checkpoint_start.lock()
858 }
859
860 /// Get the LSN of the last checkpoint end.
861 pub fn get_last_checkpoint_end(&self) -> Lsn {
862 *self.last_checkpoint_end.lock()
863 }
864
865 /// Check if a checkpoint is currently in progress.
866 pub fn is_checkpoint_in_progress(&self) -> bool {
867 self.checkpoint_in_progress.load(Ordering::Acquire)
868 }
869
870 /// Choose the [`Provisional`] flag for a node being evicted by the evictor.
871 ///
872 /// Returns `Provisional::Yes` when a checkpoint is in progress **and** the
873 /// node's level is strictly below the **tree-specific** highest flush level
874 /// for `db_id` (meaning the checkpoint will write a non-provisional ancestor
875 /// for that tree that subsumes this entry). Returns `Provisional::No` if
876 /// no checkpoint is in progress, or if `db_id` has no dirty upper INs in
877 /// this checkpoint (level absent from map → 0 → not covered).
878 ///
879 /// # JE reference
880 /// `Checkpointer.coordinateEvictionWithCheckpoint` →
881 /// `DirtyINMap.coordinateEvictionWithCheckpoint` which calls
882 /// `getHighestFlushLevel(db)` — **per-`DatabaseImpl`** lookup. If the db
883 /// is absent from `highestFlushLevels`, `getHighestFlushLevel` returns
884 /// `IN.MIN_LEVEL` (≤ 0) making the comparison false → `Provisional::NO`.
885 ///
886 /// # CC-4 residual
887 /// The prior implementation stored a single global max-level (`AtomicI32`)
888 /// that was the maximum across ALL trees. A BIN evicted from tree A (no
889 /// dirty upper INs) got `Provisional::Yes` because tree B's level was
890 /// non-zero, but NO non-provisional ancestor was written for tree A →
891 /// recovery discards the provisional BIN → data loss on crash before the
892 /// next checkpoint. Per-tree lookup (this method) fixes that: tree A's
893 /// level is absent → 0 → `Provisional::No` (authoritative log entry).
894 ///
895 /// # Race window
896 /// Same benign race as JE: if the checkpoint finishes between the
897 /// `in_progress` read and the log write, the BIN may be logged
898 /// `Provisional::Yes` without a covering ancestor in *this* checkpoint, but
899 /// the next checkpoint will cover it. Logging `Yes` without strict need is
900 /// safe (log bloat only); the reverse is what causes recovery inconsistency.
901 pub fn get_eviction_provisional(
902 &self,
903 db_id: u64,
904 node_level: i32,
905 ) -> Provisional {
906 if !self.checkpoint_in_progress.load(Ordering::Acquire) {
907 return Provisional::No;
908 }
909 // Look up this tree's flush level. Missing entry means no dirty upper
910 // INs → level 0 → condition false → Provisional::No.
911 let max_flush = self
912 .checkpoint_flush_levels
913 .lock()
914 .unwrap_or_else(|e| e.into_inner())
915 .get(&db_id)
916 .copied()
917 .unwrap_or(0);
918 if max_flush > 0 && node_level < max_flush {
919 Provisional::Yes
920 } else {
921 Provisional::No
922 }
923 }
924
925 pub fn get_stats(&self) -> Arc<CheckpointStats> {
926 Arc::clone(&self.stats)
927 }
928
929 /// Get the configuration.
930 pub fn get_config(&self) -> &CheckpointConfig {
931 &self.config
932 }
933
934 /// Request shutdown of the checkpointer.
935 ///
936 /// Sets the shutdown flag AND wakes up the daemon thread so it exits
937 /// immediately without waiting the full sleep interval.
938 pub fn request_shutdown(&self) {
939 self.shutdown.store(true, Ordering::Release);
940 // Wake up any thread sleeping in wait_for_shutdown_or_timeout().
941 if let Ok(mut guard) = self.shutdown_mutex.lock() {
942 *guard = true;
943 }
944 self.shutdown_condvar.notify_all();
945 }
946
947 /// Check if shutdown has been requested.
948 pub fn is_shutdown(&self) -> bool {
949 self.shutdown.load(Ordering::Acquire)
950 }
951
952 /// CLN-14: wake the checkpointer daemon promptly after a cleaning pass so
953 /// cleaned files are deleted without waiting the full wakeup interval
954 /// (default 60 s).
955 ///
956 /// The cleaner registers this via `Cleaner::set_checkpoint_wakeup_fn`; it
957 /// is invoked at the end of a successful `do_clean`. It notifies the
958 /// daemon's sleep condvar WITHOUT setting the shutdown flag, so the daemon
959 /// thread returns early from `wait_for_shutdown_or_timeout` and re-checks
960 /// `is_runnable(false)` — which returns `true` because
961 /// `needs_checkpoint_for_cleaned_files()` now reports the just-cleaned
962 /// files pending reclaim. The result is a prompt checkpoint that runs
963 /// `after_checkpoint()` and lets `delete_safe_files` remove the files.
964 ///
965 /// JE: `FileProcessor.doClean` calls
966 /// `envImpl.getCheckpointer().wakeupAfterNoWrites()` (Cleaner/FileProcessor),
967 /// which sets `wakeupAfterNoWrites = true` and wakes the checkpointer;
968 /// `Checkpointer.isRunnable` then returns true via
969 /// `needCheckpointForCleanedFiles()`. Noxu folds the flag into the
970 /// cleaner query (`is_runnable`), so this method only has to wake the
971 /// sleeping daemon.
972 pub fn wakeup_after_no_writes(&self) {
973 // Notify the sleep condvar without touching the shutdown flag: the
974 // daemon wakes, sees is_shutdown() == false, and re-evaluates
975 // is_runnable(false).
976 if self.shutdown_mutex.lock().is_ok() {
977 self.shutdown_condvar.notify_all();
978 }
979 }
980
981 /// Sleep for `duration` or until `request_shutdown()` is called.
982 ///
983 /// Used by the daemon thread in `EnvironmentImpl` instead of
984 /// `thread::sleep()` so that shutdown is immediate.
985 pub fn wait_for_shutdown_or_timeout(&self, duration: std::time::Duration) {
986 if let Ok(guard) = self.shutdown_mutex.lock() {
987 // wait_timeout returns immediately when the condvar is notified.
988 let _ = self.shutdown_condvar.wait_timeout(guard, duration);
989 }
990 }
991
992 /// Get the next checkpoint ID (without incrementing).
993 pub fn peek_next_checkpoint_id(&self) -> u64 {
994 self.next_checkpoint_id.load(Ordering::SeqCst)
995 }
996
997 /// REC-G: seed the checkpoint-interval baselines from a recovered
998 /// checkpoint, so the FIRST post-recovery checkpoint interval is measured
999 /// from the recovered `CkptEnd` rather than from process start.
1000 ///
1001 /// Without this, `last_checkpoint_start`/`_end` start at `NULL_LSN` and
1002 /// `bytes_since_checkpoint` at 0 after recovery, so the bytes/time gate
1003 /// would treat all log written before the crash as "since the last
1004 /// checkpoint" — firing a redundant checkpoint immediately, or (for the
1005 /// time branch) measuring the interval from the wrong baseline.
1006 ///
1007 /// JE ref: `Checkpointer.initIntervals(lastCheckpointStart,
1008 /// lastCheckpointEnd, lastCheckpointMillis)` — called from
1009 /// `RecoveryManager.recover()` after the recovery scan completes. Noxu
1010 /// passes the recovered `checkpoint_start_lsn` / `checkpoint_end_lsn`
1011 /// (NULL_LSN when the log had no prior checkpoint, matching JE).
1012 pub fn init_intervals(
1013 &self,
1014 last_checkpoint_start: Lsn,
1015 last_checkpoint_end: Lsn,
1016 ) {
1017 *self.last_checkpoint_start.lock() = last_checkpoint_start;
1018 *self.last_checkpoint_end.lock() = last_checkpoint_end;
1019 // A freshly-recovered environment has written nothing since the
1020 // recovered checkpoint; reset the byte accumulator so the gate does
1021 // not immediately fire on pre-crash log volume.
1022 self.bytes_since_checkpoint.store(0, Ordering::Relaxed);
1023 }
1024
1025 /// REC-H: continue the checkpoint-ID sequence after recovery instead of
1026 /// restarting at 1. The next checkpoint will use `last_checkpoint_id + 1`.
1027 ///
1028 /// The ID is a debug/log tag (not a correctness key), but it should not
1029 /// regress or collide across restarts. Seeded from the recovered
1030 /// `CkptEnd.id`.
1031 ///
1032 /// JE ref: `Checkpointer.setCheckpointId(lastCheckpointId)` — "can only be
1033 /// done after recovery"; JE stores `checkpointId = lastCheckpointId` and
1034 /// `incrementProgress`/`generateCheckpointId` advances from there. Noxu's
1035 /// `do_checkpoint` does `fetch_add(1)`, so we seed `next_checkpoint_id =
1036 /// last_checkpoint_id + 1` to make the next emitted ID strictly greater.
1037 pub fn set_checkpoint_id(&self, last_checkpoint_id: u64) {
1038 self.next_checkpoint_id.store(last_checkpoint_id + 1, Ordering::SeqCst);
1039 }
1040
1041 /// Flush all dirty BINs to the log (public, unit-result API).
1042 ///
1043 /// Calls the internal flush logic and discards the detailed `FlushResult`,
1044 /// returning only success/failure. Use this from external callers (e.g.
1045 /// daemon threads) that do not need per-BIN counts.
1046 ///
1047 /// `Checkpointer.doCheckpoint()` partial flush path.
1048 pub fn flush_dirty_bins(&self) -> Result<()> {
1049 self.flush_dirty_bins_internal().map(|_| ())
1050 }
1051
1052 /// Internal flush all dirty BINs to the log.
1053 ///
1054 /// Flushes dirty BINs from `self.tree` (primary tree) AND from every
1055 /// tree in `self.db_trees_registry` (user databases).
1056 ///
1057 /// JE `Checkpointer.processINList` walks a single env-wide `INList`
1058 /// covering all databases; Noxu achieves the same effect by iterating the
1059 /// `db_trees_registry` and calling the per-tree BIN-flush logic for each.
1060 ///
1061 /// For each dirty BIN `BinStub::should_log_delta(bin_delta_percent)`
1062 /// (faithful JE `BIN.shouldLogDelta`, BIN.java:1892) decides:
1063 /// - delta-slot count `<= nEntries * bin_delta_percent / 100` (and no
1064 /// prohibit / a prior full exists) → write `BINDelta` entry (delta path)
1065 /// - otherwise → write full `BIN` entry (full path)
1066 ///
1067 /// Also calls `persist_file_summaries()` to ensure utilization data is
1068 /// durable.
1069 ///
1070 /// `Checkpointer.processINList()` + `Checkpointer.logIN()`.
1071 pub(crate) fn flush_dirty_bins_internal(&self) -> Result<FlushResult> {
1072 let mut result = FlushResult::default();
1073
1074 let lm = match &self.log_manager {
1075 Some(lm) => lm,
1076 // No log manager — nothing to flush (unit tests).
1077 None => return Ok(result),
1078 };
1079
1080 // Stage-1: flush the primary tree (if wired) then every user-database
1081 // tree from the registry. JE's equivalent is processINList walking
1082 // the single env-wide INList that covers all databases.
1083 //
1084 // IMPORTANT: the primary_tree (self.tree, db_id=1) and the user-database
1085 // real_tree for db_id=1 are DIFFERENT Arc<RwLock<Tree>> objects. The
1086 // primary_tree is used by the cleaner for LN migration but is never
1087 // written by user operations. User data lives in the real_trees stored
1088 // in db_trees_registry. We flush both: primary_tree first (harmless
1089 // if empty), then all registry trees (where user data lives).
1090 // No skip guard — the registry trees are always distinct objects from
1091 // self.tree even when their db_id happens to match self.db_id.
1092 let mut trees_to_flush: Vec<(u64, Arc<RwLock<Tree>>)> = Vec::new();
1093 if let Some(t) = &self.tree {
1094 trees_to_flush.push((self.db_id, Arc::clone(t)));
1095 }
1096 if let Some(reg) = &self.db_trees_registry
1097 && let Ok(guard) = reg.lock()
1098 {
1099 for (&db_id_i64, tree_arc) in guard.iter() {
1100 let db_id = db_id_i64 as u64;
1101 trees_to_flush.push((db_id, Arc::clone(tree_arc)));
1102 }
1103 }
1104
1105 for (db_id, tree_arc) in trees_to_flush {
1106 let r = Self::flush_one_tree_bins(
1107 db_id,
1108 &tree_arc,
1109 lm,
1110 self.config.bin_delta_percent,
1111 )?;
1112 result.full_bins_flushed += r.full_bins_flushed;
1113 result.delta_ins_flushed += r.delta_ins_flushed;
1114 result.obsolete_delta_lsns.extend(r.obsolete_delta_lsns);
1115 }
1116
1117 // L-5-delta: count the superseded prior BIN-deltas (auxOldLsn)
1118 // obsolete via the wired UtilizationTracker, BEFORE
1119 // persist_file_summaries so the counts land in this checkpoint's
1120 // FileSummaryLN. JE: LogManager.serialLogWork counts auxOldLsn via
1121 // countObsoleteNodeDupsAllowed(auxOldLsn, type, size=0, nodeDb).
1122 if !result.obsolete_delta_lsns.is_empty()
1123 && let Some(tracker_lock) = &self.utilization_tracker
1124 {
1125 let mut tracker = tracker_lock.lock();
1126 for (lsn, db_id) in &result.obsolete_delta_lsns {
1127 // size 0 (auxOldLsn carries no size); count_as_ln = false (an
1128 // IN/BIN-delta, not an LN); dups-allowed variant (INs use it).
1129 tracker.count_obsolete_node_dups_allowed(
1130 lsn.file_number(),
1131 lsn.file_offset(),
1132 0,
1133 false,
1134 Some(*db_id),
1135 );
1136 }
1137 }
1138
1139 // Persist file utilization summaries so they survive restarts.
1140 self.persist_file_summaries()?;
1141
1142 Ok(result)
1143 }
1144
1145 /// Flush dirty BINs for a single tree to the WAL.
1146 ///
1147 /// Extracted so both `flush_dirty_bins_internal` (primary tree) and the
1148 /// per-user-database loop can share the same logic without duplicating the
1149 /// TREE_BIN_DELTA decision or the X-8 early-exit guard.
1150 fn flush_one_tree_bins(
1151 db_id: u64,
1152 tree_arc: &Arc<RwLock<Tree>>,
1153 lm: &Arc<LogManager>,
1154 bin_delta_percent: i32,
1155 ) -> Result<FlushResult> {
1156 let mut result = FlushResult::default();
1157
1158 // Collect dirty BINs under a read lock on the tree.
1159 let dirty_bins = {
1160 let tree_guard = tree_arc.read().map_err(|_| {
1161 RecoveryError::CheckpointError(
1162 "tree lock poisoned during checkpoint".to_string(),
1163 )
1164 })?;
1165 tree_guard.collect_dirty_bins(db_id)
1166 };
1167
1168 // The delta-vs-full decision per BIN is made by
1169 // `BinStub::should_log_delta(bin_delta_percent)` below — faithful JE
1170 // `BIN.shouldLogDelta` (count-based + configurable percent).
1171
1172 for (_node_db_id, bin_arc) in dirty_bins {
1173 // Acquire write lock to serialize + clear dirty flags.
1174 let mut bin_guard = bin_arc.write();
1175
1176 let b = match &mut *bin_guard {
1177 TreeNode::Bottom(b) => b,
1178 _ => continue, // not a BIN (defensive)
1179 };
1180
1181 let dirty = b.dirty_count();
1182
1183 // X-8: skip nodes that the evictor already flushed and cleared
1184 // between our dirty-BIN snapshot (under tree read lock) and the
1185 // per-node write-lock acquisition.
1186 if !b.dirty && dirty == 0 {
1187 continue;
1188 }
1189
1190 // TREE_BIN_DELTA decision — faithful JE `BIN.shouldLogDelta`
1191 // (BIN.java:1892): COUNT-based (numDeltas = dirty slots) against the
1192 // CONFIGURABLE percent limit, with the isBINDelta fast path, the
1193 // numDeltas<=0 guard, and the isDeltaProhibited / lastFullLsn==NULL
1194 // bound — all encapsulated in `BinStub::should_log_delta`.
1195 let use_delta = b.should_log_delta(bin_delta_percent);
1196
1197 if use_delta {
1198 // --- BIN-delta path ---
1199 // L-5-delta: the prior BIN-delta this one supersedes (JE
1200 // `auxOldLsn = logEntry.getPrevDeltaLsn()`) becomes obsolete
1201 // when we log the new delta non-provisionally. Capture it
1202 // BEFORE advancing the delta chain; counted obsolete by
1203 // `flush_dirty_bins_internal` via the tracker.
1204 let superseded_delta_lsn = b.last_delta_lsn;
1205 let delta_bytes = b.serialize_delta();
1206 let entry = BinDeltaLogEntry::new(
1207 db_id,
1208 b.last_full_lsn,
1209 b.last_delta_lsn, // prev_delta_lsn
1210 delta_bytes,
1211 );
1212 let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
1213 entry.write_to_log(&mut buf);
1214 let delta_logged_lsn = lm
1215 .log(
1216 LogEntryType::BINDelta,
1217 &buf,
1218 Provisional::No,
1219 false, // flush_required
1220 false, // fsync_required — fsync at CkptEnd
1221 )
1222 .map_err(|e| {
1223 RecoveryError::CheckpointError(format!(
1224 "BINDelta WAL write failed: {e}"
1225 ))
1226 })?;
1227 b.last_delta_lsn = delta_logged_lsn; // advance chain for next delta
1228 b.clear_dirty_after_delta_log();
1229 result.delta_ins_flushed += 1;
1230 // L-5-delta: record the superseded prior delta (auxOldLsn) for
1231 // obsolete counting. Only when non-null — a delta whose prior
1232 // version was a full BIN has prev_delta_lsn == NULL (the full
1233 // version stays live, referenced by the delta).
1234 if superseded_delta_lsn != NULL_LSN {
1235 result
1236 .obsolete_delta_lsns
1237 .push((superseded_delta_lsn, db_id as u32));
1238 }
1239 } else {
1240 // --- Full BIN path ---
1241 let full_bytes = b.serialize_full();
1242 let entry = InLogEntry::new(
1243 db_id,
1244 b.last_full_lsn,
1245 NULL_LSN, // prev_delta_lsn
1246 full_bytes,
1247 );
1248 let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
1249 entry.write_to_log(&mut buf);
1250 let logged_lsn = lm
1251 .log(
1252 LogEntryType::BIN,
1253 &buf,
1254 Provisional::No,
1255 false, // flush_required
1256 false, // fsync_required — fsync at CkptEnd
1257 )
1258 .map_err(|e| {
1259 RecoveryError::CheckpointError(format!(
1260 "BIN WAL write failed: {e}"
1261 ))
1262 })?;
1263 b.last_delta_lsn = NULL_LSN; // full BIN resets delta chain
1264 b.clear_dirty_after_full_log(logged_lsn);
1265 result.full_bins_flushed += 1;
1266 }
1267 }
1268
1269 Ok(result)
1270 }
1271
1272 /// Flush all dirty upper INs (level ≥ 2) bottom-up to the WAL.
1273 ///
1274 /// Flushes upper INs from `self.tree` (primary tree) AND from every tree
1275 /// in `self.db_trees_registry` (user databases), mirroring
1276 /// `flush_dirty_bins_internal`'s all-trees iteration.
1277 ///
1278 /// `Checkpointer.processINList()` upper-IN pass +
1279 /// `Checkpointer.logIN()` for `TreeNode::Internal` nodes.
1280 fn flush_upper_ins_internal(&self) -> Result<FlushResult> {
1281 let mut result = FlushResult::default();
1282
1283 let lm = match &self.log_manager {
1284 Some(lm) => lm,
1285 None => return Ok(result),
1286 };
1287
1288 let mut trees_to_flush: Vec<(u64, Arc<RwLock<Tree>>)> = Vec::new();
1289 if let Some(t) = &self.tree {
1290 trees_to_flush.push((self.db_id, Arc::clone(t)));
1291 }
1292 if let Some(reg) = &self.db_trees_registry
1293 && let Ok(guard) = reg.lock()
1294 {
1295 for (&db_id_i64, tree_arc) in guard.iter() {
1296 let db_id = db_id_i64 as u64;
1297 // No skip guard: registry trees are distinct objects from self.tree.
1298 trees_to_flush.push((db_id, Arc::clone(tree_arc)));
1299 }
1300 }
1301
1302 // CC-4 residual fix: compute the per-tree highest flush level before
1303 // any logging begins. Populate checkpoint_flush_levels with one entry
1304 // per tree that has dirty upper INs. Trees absent from the map have
1305 // no dirty upper INs → their BINs must NOT be logged Provisional::Yes.
1306 //
1307 // JE ref: DirtyINMap.highestFlushLevels (Map<DatabaseImpl, Integer>);
1308 // getHighestFlushLevel(db) returns IN.MIN_LEVEL (0) for absent keys,
1309 // making coordinateEvictionWithCheckpoint return Provisional.NO.
1310 //
1311 // Memory ordering: the map is populated inside the Mutex before the
1312 // first WAL write. The evictor acquires the same Mutex to read it
1313 // (Mutex provides the necessary happens-before). The RAII guard in
1314 // do_checkpoint clears the map via CheckpointGuard::drop.
1315 {
1316 let mut levels = self
1317 .checkpoint_flush_levels
1318 .lock()
1319 .unwrap_or_else(|e| e.into_inner());
1320 levels.clear();
1321 for (db_id, tree_arc) in &trees_to_flush {
1322 // REC-AA: the recorded highest flush level is
1323 // `max(dirty-upper-IN-level) + 1`, bounded by the root level
1324 // — JE DirtyINMap.updateFlushLevels flushes at least one level
1325 // ABOVE the highest dirty node (`(ckptFlushExtraLevel || isBIN)
1326 // && !isRoot` → `level += 1`) so the lower level is logged
1327 // provisionally and recovery skips reprocessing it. The `+1`
1328 // is bounded by the root level (`!isRoot` guard) so we never
1329 // claim to flush above the tree root — a node AT the root level
1330 // is the non-provisional anchor and must NOT itself be marked
1331 // coverable.
1332 let flush_level = tree_arc.read().ok().and_then(|guard| {
1333 let dirty_ins = guard.collect_dirty_upper_ins(*db_id);
1334 let max_dirty =
1335 dirty_ins.iter().map(|(lvl, _)| *lvl).max()?;
1336 // Root level bounds the +1. The root is the
1337 // highest-level resident node.
1338 let root_level = guard
1339 .get_root()
1340 .map(|r| r.read().level())
1341 .unwrap_or(max_dirty);
1342 Some((max_dirty + 1).min(root_level))
1343 });
1344 if let Some(level) = flush_level
1345 && level > 0
1346 {
1347 levels.insert(*db_id, level);
1348 }
1349 }
1350 }
1351
1352 for (db_id, tree_arc) in trees_to_flush {
1353 let r = Self::flush_one_tree_upper_ins(db_id, &tree_arc, lm)?;
1354 result.full_ins_flushed += r.full_ins_flushed;
1355 }
1356
1357 Ok(result)
1358 }
1359
1360 /// Flush dirty upper INs for a single tree to the WAL.
1361 fn flush_one_tree_upper_ins(
1362 db_id: u64,
1363 tree_arc: &Arc<RwLock<Tree>>,
1364 lm: &Arc<LogManager>,
1365 ) -> Result<FlushResult> {
1366 let mut result = FlushResult::default();
1367
1368 // Collect dirty upper INs under a read lock.
1369 let dirty_ins = {
1370 let tree_guard = tree_arc.read().map_err(|_| {
1371 RecoveryError::CheckpointError(
1372 "tree lock poisoned during upper-IN flush".to_string(),
1373 )
1374 })?;
1375 tree_guard.collect_dirty_upper_ins(db_id)
1376 };
1377
1378 if dirty_ins.is_empty() {
1379 return Ok(result);
1380 }
1381
1382 // The maximum level present is the root level; it must be logged
1383 // Provisional::No. All others use Provisional::Yes.
1384 let max_level =
1385 dirty_ins.iter().map(|(lvl, _)| *lvl).max().unwrap_or(0);
1386
1387 for (level, node_arc) in &dirty_ins {
1388 let mut node_guard = node_arc.write();
1389
1390 if !node_guard.is_dirty() {
1391 continue; // may have been cleared by a concurrent checkpoint
1392 }
1393
1394 // Serialize the upper IN using the existing `write_to_bytes()` path.
1395 let node_bytes = node_guard.write_to_bytes();
1396 let provisional = if *level == max_level {
1397 Provisional::No
1398 } else {
1399 Provisional::Yes
1400 };
1401
1402 let entry = InLogEntry::new(
1403 db_id,
1404 noxu_util::NULL_LSN, // prev_full_lsn — no previous version tracking for upper INs yet
1405 noxu_util::NULL_LSN, // prev_delta_lsn
1406 node_bytes,
1407 );
1408 let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
1409 entry.write_to_log(&mut buf);
1410 lm.log(
1411 LogEntryType::IN,
1412 &buf,
1413 provisional,
1414 false, // flush_required
1415 false, // fsync_required — fsync at CkptEnd
1416 )
1417 .map_err(|e| {
1418 RecoveryError::CheckpointError(format!(
1419 "IN WAL write failed: {e}"
1420 ))
1421 })?;
1422
1423 node_guard.set_dirty(false);
1424 result.full_ins_flushed += 1;
1425 }
1426
1427 Ok(result)
1428 }
1429}
1430
/// RAII guard to ensure `checkpoint_in_progress` and `checkpoint_flush_levels`
/// are cleared when the checkpoint finishes or is abandoned.
///
/// CC-4 residual: `flush_levels` must be cleared so the evictor stops
/// returning `Provisional::Yes` for any tree after the checkpoint ends.
struct CheckpointGuard<'a> {
    /// The "checkpoint in progress" flag; reset to `false` on drop.
    flag: &'a AtomicBool,
    /// Per-tree highest flush levels (db id → level); emptied on drop.
    flush_levels: &'a std::sync::Mutex<HashMap<u64, i32>>,
}

impl<'a> Drop for CheckpointGuard<'a> {
    fn drop(&mut self) {
        // Clear per-tree flush levels before clearing the in_progress flag.
        // An evictor that reads in_progress=true will still see the (stale)
        // map; once in_progress goes false the map contents are irrelevant.
        //
        // A poisoned mutex is recovered rather than skipped, so the map is
        // emptied even if some thread panicked while holding the lock. This
        // matches the `unwrap_or_else(|e| e.into_inner())` handling applied
        // to the same mutex by its other users.
        self.flush_levels
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clear();
        self.flag.store(false, Ordering::Release);
    }
}
1452
/// Internal struct for tracking flush results.
///
/// Produced by the per-tree flush helpers and summed across trees by
/// `flush_dirty_bins_internal` / `flush_upper_ins_internal`.
#[derive(Debug, Default)]
pub(crate) struct FlushResult {
    /// Number of upper INs written as `IN` entries
    /// (incremented by `flush_one_tree_upper_ins`).
    full_ins_flushed: u64,
    /// Number of BINs written as full `BIN` entries
    /// (incremented on the full-BIN path of `flush_one_tree_bins`).
    full_bins_flushed: u64,
    /// Number of BINs written as `BINDelta` entries
    /// (incremented on the delta path of `flush_one_tree_bins`).
    delta_ins_flushed: u64,
    /// L-5-delta: per-DB superseded prior BIN-delta LSNs made obsolete by a
    /// newly-logged BIN-delta (the `prev_delta_lsn` / JE `auxOldLsn`).
    /// `flush_dirty_bins_internal` counts these obsolete via the wired
    /// `UtilizationTracker` after the flush (it has `&self`; the per-tree
    /// flush is a static helper without tracker access). Tuple is
    /// `(prev_delta_lsn, db_id)`. JE: IN.java auxOldLsn ->
    /// LogManager.countObsoleteNodeDupsAllowed.
    obsolete_delta_lsns: Vec<(Lsn, u32)>,
}
1468
1469#[cfg(test)]
1470mod tests {
1471 use super::*;
1472
1473 #[test]
1474 fn test_checkpoint_config_default() {
1475 let config = CheckpointConfig::default();
1476 assert!(!config.force);
1477 assert!(!config.minimize_recovery_time);
1478 assert_eq!(config.bytes_interval, 20_000_000);
1479 assert_eq!(config.time_interval, 0);
1480 }
1481
1482 #[test]
1483 fn test_checkpoint_config_builder() {
1484 let config = CheckpointConfig::new()
1485 .force(true)
1486 .minimize_recovery_time(true)
1487 .bytes_interval(10_000_000)
1488 .time_interval(5000);
1489 assert!(config.force);
1490 assert!(config.minimize_recovery_time);
1491 assert_eq!(config.bytes_interval, 10_000_000);
1492 assert_eq!(config.time_interval, 5000);
1493 }
1494
1495 #[test]
1496 fn test_checkpoint_result() {
1497 let result = CheckpointResult {
1498 checkpoint_id: 42,
1499 start_lsn: Lsn::new(1, 100),
1500 end_lsn: Lsn::new(1, 200),
1501 full_ins_flushed: 10,
1502 full_bins_flushed: 20,
1503 delta_ins_flushed: 5,
1504 elapsed_ms: 250,
1505 };
1506 assert_eq!(result.checkpoint_id, 42);
1507 assert_eq!(result.total_nodes_flushed(), 35);
1508 }
1509
1510 #[test]
1511 fn test_checkpointer_new() {
1512 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1513 assert!(!checkpointer.is_checkpoint_in_progress());
1514 assert!(!checkpointer.is_shutdown());
1515 assert_eq!(checkpointer.peek_next_checkpoint_id(), 1);
1516 assert_eq!(
1517 checkpointer.get_last_checkpoint_start(),
1518 noxu_util::NULL_LSN
1519 );
1520 assert_eq!(checkpointer.get_last_checkpoint_end(), noxu_util::NULL_LSN);
1521 }
1522
1523 #[test]
1524 fn test_checkpointer_do_checkpoint() {
1525 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1526 let result = checkpointer.do_checkpoint("test").unwrap();
1527 assert_eq!(result.checkpoint_id, 1);
1528 assert!(result.start_lsn != noxu_util::NULL_LSN);
1529 assert!(result.end_lsn != noxu_util::NULL_LSN);
1530 assert_eq!(result.total_nodes_flushed(), 0);
1531 }
1532
1533 #[test]
1534 fn test_checkpointer_sequential_ids() {
1535 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1536 let result1 = checkpointer.do_checkpoint("test1").unwrap();
1537 let result2 = checkpointer.do_checkpoint("test2").unwrap();
1538 assert_eq!(result1.checkpoint_id, 1);
1539 assert_eq!(result2.checkpoint_id, 2);
1540 }
1541
1542 #[test]
1543 fn test_checkpointer_concurrent_checkpoint_fails() {
1544 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1545 checkpointer.checkpoint_in_progress.store(true, Ordering::Release);
1546 let result = checkpointer.do_checkpoint("test");
1547 assert!(result.is_err());
1548 if let Err(RecoveryError::CheckpointError(msg)) = result {
1549 assert!(msg.contains("already in progress"));
1550 } else {
1551 panic!("Expected CheckpointError");
1552 }
1553 }
1554
1555 #[test]
1556 fn test_checkpointer_shutdown() {
1557 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1558 checkpointer.request_shutdown();
1559 assert!(checkpointer.is_shutdown());
1560 let result = checkpointer.do_checkpoint("test");
1561 assert!(result.is_err());
1562 if let Err(RecoveryError::CheckpointError(msg)) = result {
1563 assert!(msg.contains("shut down"));
1564 } else {
1565 panic!("Expected CheckpointError");
1566 }
1567 }
1568
1569 #[test]
1570 fn test_checkpointer_last_lsns() {
1571 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1572 let result = checkpointer.do_checkpoint("test").unwrap();
1573 assert_eq!(checkpointer.get_last_checkpoint_start(), result.start_lsn);
1574 assert_eq!(checkpointer.get_last_checkpoint_end(), result.end_lsn);
1575 }
1576
1577 #[test]
1578 fn test_checkpointer_stats() {
1579 let checkpointer = Checkpointer::new(CheckpointConfig::default());
1580 let stats = checkpointer.get_stats();
1581 assert_eq!(stats.checkpoints.load(Ordering::Relaxed), 0);
1582 checkpointer.do_checkpoint("test").unwrap();
1583 assert_eq!(stats.checkpoints.load(Ordering::Relaxed), 1);
1584 }
1585
1586 #[test]
1587 fn test_checkpoint_guard() {
1588 let flag = AtomicBool::new(false);
1589 let levels: std::sync::Mutex<HashMap<u64, i32>> =
1590 std::sync::Mutex::new(HashMap::from([(1u64, 3i32)]));
1591 {
1592 flag.store(true, Ordering::Release);
1593 let _guard = CheckpointGuard { flag: &flag, flush_levels: &levels };
1594 assert!(flag.load(Ordering::Acquire));
1595 }
1596 assert!(!flag.load(Ordering::Acquire));
1597 assert!(
1598 levels.lock().unwrap().is_empty(),
1599 "guard must clear flush_levels map"
1600 );
1601 }
1602
1603 #[test]
1604 fn test_checkpoint_config_cloning() {
1605 let config1 = CheckpointConfig::new().force(true).bytes_interval(1000);
1606 let config2 = config1.clone();
1607 assert_eq!(config1.force, config2.force);
1608 assert_eq!(config1.bytes_interval, config2.bytes_interval);
1609 }
1610
1611 #[test]
1612 fn test_checkpoint_result_cloning() {
1613 let result1 = CheckpointResult {
1614 checkpoint_id: 1,
1615 start_lsn: Lsn::new(1, 100),
1616 end_lsn: Lsn::new(1, 200),
1617 full_ins_flushed: 10,
1618 full_bins_flushed: 20,
1619 delta_ins_flushed: 5,
1620 elapsed_ms: 100,
1621 };
1622 let result2 = result1.clone();
1623 assert_eq!(result1.checkpoint_id, result2.checkpoint_id);
1624 }
1625
/// `peek_next_checkpoint_id` reports 1 on a fresh checkpointer and 2 once a
/// single checkpoint has completed.
#[test]
fn test_peek_next_checkpoint_id() {
    let ckpt = Checkpointer::new(CheckpointConfig::default());

    let id_before = ckpt.peek_next_checkpoint_id();
    assert_eq!(id_before, 1);

    ckpt.do_checkpoint("test").unwrap();

    let id_after = ckpt.peek_next_checkpoint_id();
    assert_eq!(id_after, 2);
}
1633
/// After two back-to-back checkpoints the stored start/end LSNs are those of
/// the second run, and the two runs report different start LSNs.
#[test]
fn test_multiple_checkpoints_update_lsns() {
    let ckpt = Checkpointer::new(CheckpointConfig::default());
    let first = ckpt.do_checkpoint("test1").unwrap();
    let second = ckpt.do_checkpoint("test2").unwrap();

    // The checkpointer remembers the most recent run only.
    let stored_start = ckpt.get_last_checkpoint_start();
    assert_eq!(stored_start, second.start_lsn);
    let stored_end = ckpt.get_last_checkpoint_end();
    assert_eq!(stored_end, second.end_lsn);

    assert_ne!(first.start_lsn, second.start_lsn);
}
1643
1644 // -----------------------------------------------------------------------
1645 // Tests that require a real LogManager / FileManager
1646 // -----------------------------------------------------------------------
1647
/// A checkpoint backed by a real `LogManager` yields non-null start and end
/// LSNs, orders the end after the start, and records both on the
/// checkpointer.
#[test]
fn test_checkpoint_writes_wal_entries() {
    use noxu_log::{FileManager, LogManager};
    use std::sync::Arc;
    use tempfile::TempDir;

    // Real log files in a throwaway directory.
    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr));
    let outcome = ckpt.do_checkpoint("test_wal").unwrap();
    let (start, end) = (&outcome.start_lsn, &outcome.end_lsn);

    // Neither LSN may be null, and the end has to come after the start.
    assert!(
        !start.is_null(),
        "start_lsn must not be NULL after a WAL-backed checkpoint"
    );
    assert!(
        !end.is_null(),
        "end_lsn must not be NULL after a WAL-backed checkpoint"
    );
    assert!(
        end.as_u64() > start.as_u64(),
        "end_lsn ({:?}) must be greater than start_lsn ({:?})",
        end,
        start
    );

    // What the checkpointer remembers must agree with what it returned.
    assert_eq!(ckpt.get_last_checkpoint_start(), outcome.start_lsn);
    assert_eq!(ckpt.get_last_checkpoint_end(), outcome.end_lsn);
}
1686
/// Two consecutive WAL-backed checkpoints produce strictly increasing LSNs:
/// the second start follows the first end, and the second end follows the
/// second start.
#[test]
fn test_two_sequential_wal_checkpoints_have_increasing_lsns() {
    use noxu_log::{FileManager, LogManager};
    use std::sync::Arc;
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr));

    let r1 = ckpt.do_checkpoint("first").unwrap();
    let r2 = ckpt.do_checkpoint("second").unwrap();

    // Compare on the raw u64 form of each LSN.
    let first_end = r1.end_lsn.as_u64();
    let second_start = r2.start_lsn.as_u64();
    let second_end = r2.end_lsn.as_u64();

    assert!(
        second_start > first_end,
        "second start ({:?}) must follow first end ({:?})",
        r2.start_lsn,
        r1.end_lsn
    );
    assert!(
        second_end > second_start,
        "second end ({:?}) must follow second start ({:?})",
        r2.end_lsn,
        r2.start_lsn
    );
}
1720
/// REC-F1 (reproduce-first): `do_checkpoint` must fsync so that the
/// `CkptEnd` entry is durable BEFORE the cleaner barrier is advanced.
///
/// JE reference: `Checkpointer.doCheckpoint` (~line 895) calls
/// `logManager.logForceFlush(endEntry, true /*fsyncRequired*/, ...)`, with
/// the comment "We must flush and fsync to ensure that cleaned files are
/// not referenced. This also ensures that this checkpoint is not wasted if
/// we crash." If the fsync is missing, an auto/daemon checkpoint advances
/// the safe-to-delete barrier off a non-durable checkpoint, and a crash can
/// then lose committed/migrated data.
///
/// The test observes this through the log manager's fsync counter, which
/// must grow across a single `do_checkpoint` call.
#[test]
fn test_do_checkpoint_fsyncs_ckpt_end() {
    use noxu_log::{FileManager, LogManager};
    use std::sync::Arc;
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr));

    // Sample the fsync counter on either side of the checkpoint.
    let before = log_mgr.fsync_count();
    ckpt.do_checkpoint("daemon").unwrap();
    let after = log_mgr.fsync_count();

    assert!(
        after > before,
        "do_checkpoint must fsync the CkptEnd entry (JE logForceFlush \
         fsyncRequired=true); fsync_count before={before} after={after}"
    );
}
1756
1757 // -----------------------------------------------------------------------
1758 // Tests for new methods: wakeup_after_write, is_checkpointed,
1759 // persist_file_summaries
1760 // -----------------------------------------------------------------------
1761
/// With a 100-byte interval, `wakeup_after_write` fires no checkpoint at 99
/// accumulated bytes, fires one when the 100th byte arrives, and fires a
/// second one after a further 100 bytes.
#[test]
fn test_wakeup_after_write_triggers_checkpoint() {
    // Tiny threshold so the test can cross it with a few calls.
    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_bytes_interval(100);
    let fired = || ckpt.stats.checkpoints.load(Ordering::Relaxed);

    // Baseline: nothing has been checkpointed yet.
    assert_eq!(fired(), 0);

    // (bytes reported, checkpoints expected afterwards, failure message)
    let steps = [
        (99, 0, "no checkpoint should fire below the threshold"),
        (1, 1, "exactly one checkpoint should fire when threshold is reached"),
        (100, 2, "second checkpoint should fire after counter reset"),
    ];
    for (bytes, expected, why) in steps {
        ckpt.wakeup_after_write(bytes);
        assert_eq!(fired(), expected, "{why}");
    }
}
1796
/// A bytes interval of 0 turns `wakeup_after_write` into a no-op: even a
/// `u64::MAX` write leaves the checkpoint counter at zero.
#[test]
fn test_wakeup_after_write_disabled_when_interval_zero() {
    let ckpt =
        Checkpointer::new(CheckpointConfig::default()).with_bytes_interval(0);

    ckpt.wakeup_after_write(u64::MAX);

    let fired = ckpt.stats.checkpoints.load(Ordering::Relaxed);
    assert_eq!(fired, 0, "no checkpoint should fire when interval is 0");
}
1810
/// CLN-14: a thread parked in `wait_for_shutdown_or_timeout` with a 60 s
/// timeout must return PROMPTLY once `wakeup_after_no_writes` is called, and
/// the wake must not set the shutdown flag.
///
/// This is the primitive the cleaner's wakeup callback uses so cleaned files
/// are deleted at the next early checkpoint instead of after the full wakeup
/// interval (default 60 s).
///
/// JE: `Checkpointer.wakeupAfterNoWrites` sets a flag and wakes the daemon;
/// the daemon re-checks `isRunnable` (`needCheckpointForCleanedFiles`).
#[test]
fn test_cln14_wakeup_after_no_writes_wakes_daemon_promptly() {
    use std::time::{Duration, Instant};

    let shared = Arc::new(Checkpointer::new(CheckpointConfig::default()));
    let woke = Arc::new(AtomicBool::new(false));

    let started_at = Instant::now();
    // Daemon-style thread: waits for the full "60 s" interval, mirroring how
    // the EnvironmentImpl checkpointer daemon sleeps.
    let daemon = {
        let ckpt = Arc::clone(&shared);
        let woke_flag = Arc::clone(&woke);
        std::thread::spawn(move || {
            ckpt.wait_for_shutdown_or_timeout(Duration::from_secs(60));
            woke_flag.store(true, Ordering::Release);
            // A real daemon would keep running, so this must not be a
            // shutdown.
            assert!(
                !ckpt.is_shutdown(),
                "wakeup_after_no_writes must not set the shutdown flag"
            );
        })
    };

    // Let the daemon reach its wait before poking it.
    std::thread::sleep(Duration::from_millis(50));
    shared.wakeup_after_no_writes();

    daemon.join().unwrap();
    let elapsed = started_at.elapsed();

    assert!(woke.load(Ordering::Acquire), "daemon thread must have woken");
    assert!(
        elapsed < Duration::from_secs(5),
        "CLN-14: daemon must wake promptly ({:?}), not after the 60 s interval",
        elapsed
    );
}
1856
/// Idle-guard for the daemon: with a byte interval configured,
/// `is_runnable(false)` is false on an idle environment and after a
/// sub-interval write, and becomes true only once the accumulated bytes
/// cross the interval. `is_runnable(true)` (force) is always true.
#[test]
fn test_is_runnable_idle_guard() {
    // The daemon must NOT checkpoint an idle environment every wakeup.
    // is_runnable(false) is false until the relevant interval trips; force
    // is always true.
    //
    // REC-D: when the byte interval is set (non-zero) it takes precedence
    // (JE getWakeupPeriod / isRunnable: useTimeInterval stays 0), so a
    // sub-interval write is NOT runnable — only crossing the byte interval
    // is.
    let cp = Checkpointer::new(CheckpointConfig::default())
        .with_bytes_interval(1024);
    // Idle: nothing written since the last checkpoint.
    assert!(!cp.is_runnable(false), "idle env must not be runnable");
    // Force always runs (JE config.getForce()).
    assert!(cp.is_runnable(true), "force must always be runnable");
    // A sub-interval write is NOT runnable when a byte interval is set
    // (bytes takes precedence over time per JE isRunnable).
    cp.note_bytes_for_test(100);
    assert!(
        !cp.is_runnable(false),
        "sub-interval write must not be runnable when a byte interval is set \
         (REC-D: bytes takes precedence over the time branch)"
    );
    // Crossing the byte interval is runnable.
    cp.note_bytes_for_test(2000);
    assert!(cp.is_runnable(false));
}
1887
/// REC-D: with the byte interval DISABLED (0) the time branch governs
/// `is_runnable(false)`: it is false while nothing has been written and
/// true after any write since the last checkpoint (JE isRunnable
/// useTimeInterval branch with the `lastUsedLsn != lastCheckpointEnd`
/// idle-guard).
#[test]
fn test_is_runnable_time_branch_when_bytes_disabled() {
    // Interval 0 => bytes disabled => time-based.
    let ckpt =
        Checkpointer::new(CheckpointConfig::default()).with_bytes_interval(0);

    // Idle-guard: nothing written, so not runnable.
    let runnable_when_idle = ckpt.is_runnable(false);
    assert!(!runnable_when_idle, "idle time-based env must not run");

    // One byte is enough to make the next wakeup tick runnable.
    ckpt.note_bytes_for_test(1);
    let runnable_after_write = ckpt.is_runnable(false);
    assert!(
        runnable_after_write,
        "time branch: a write since the last checkpoint makes it runnable"
    );
}
1905
/// `Checkpointer::is_checkpointed` is false for a BIN whose `last_full_lsn`
/// is `NULL_LSN` and true once `last_full_lsn` is set to a non-null LSN.
#[test]
fn test_is_checkpointed() {
    use noxu_tree::tree::{BinStub, TreeNode};
    use parking_lot::RwLock as NodeRwLock;

    // A BIN that has never been logged in full: last_full_lsn = NULL_LSN.
    let node = NodeRwLock::new(TreeNode::Bottom(BinStub {
        node_id: 1,
        level: 0,
        entries: vec![],
        key_prefix: vec![],
        dirty: false,
        is_delta: false,
        last_full_lsn: noxu_util::NULL_LSN,
        last_delta_lsn: noxu_util::NULL_LSN,
        generation: 0,
        parent: None,
        // St-H6: test-only BIN; use true to match the engine-wide
        // hours-only invariant and avoid any accidental comparison with
        // a non-zero expiration_time.
        expiration_in_hours: true,
        cursor_count: 0,
        prohibit_next_delta: false,
        lsn_rep: noxu_tree::tree::LsnRep::Empty,
        keys: noxu_tree::tree::KeyRep::new(),
        compact_max_key_length:
            noxu_tree::tree::INKeyRep_DEFAULT_MAX_KEY_LENGTH,
    }));

    let seen_before = Checkpointer::is_checkpointed(&node);
    assert!(!seen_before, "fresh BIN should not be checkpointed");

    // Stand in for a checkpoint by stamping a full-version LSN; the write
    // guard is released at the end of this block.
    {
        let mut locked = node.write();
        if let TreeNode::Bottom(bin) = &mut *locked {
            bin.last_full_lsn = Lsn::new(1, 100);
        }
    }

    let seen_after = Checkpointer::is_checkpointed(&node);
    assert!(
        seen_after,
        "BIN should be checkpointed after last_full_lsn is set"
    );
}
1955
/// `persist_file_summaries` succeeds on a checkpointer built from the
/// default config with nothing else wired in.
///
/// Uses `expect` rather than `assert!(.. .is_ok())` so that a failure
/// reports the actual error value instead of a bare "assertion failed".
#[test]
fn test_persist_file_summaries_is_ok() {
    let checkpointer = Checkpointer::new(CheckpointConfig::default());
    checkpointer
        .persist_file_summaries()
        .expect("persist_file_summaries must succeed on a bare checkpointer");
}
1962
/// With a real `LogManager` and a populated `UtilizationTracker` wired in,
/// `persist_file_summaries` must leave at least one `FileSummaryLN` entry in
/// the log.
///
/// The log is flushed after the call and file 0 is then scanned with
/// `LogFileReader` until a `FileSummaryLN` entry is found or the reader
/// runs out of entries.
#[test]
fn test_persist_file_summaries_writes_file_summary_ln_to_log() {
    use noxu_cleaner::UtilizationTracker;
    use noxu_log::{FileManager, LogEntryType, LogFileReader};
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    // Give the tracker a non-empty summary so there is something to persist.
    let shared_tracker = {
        let mut tracker = UtilizationTracker::new(true);
        tracker.count_new_log_entry(0, 128, true, false);
        tracker.track_obsolete(0, 64, 64, true);
        Arc::new(Mutex::new(tracker))
    };

    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr))
        .with_utilization_tracker(Arc::clone(&shared_tracker));

    ckpt.persist_file_summaries().unwrap();

    // The reader works on the file, so push the bytes out first.
    log_mgr.flush_sync().unwrap();

    // Walk file 0 and stop at the first FileSummaryLN, if any.
    let mut reader = LogFileReader::open(Arc::clone(&file_mgr), 0).unwrap();
    let found = loop {
        match reader.read_next() {
            Some((_, kind, _)) if kind == LogEntryType::FileSummaryLN => {
                break true;
            }
            Some(_) => continue,
            None => break false,
        }
    };
    assert!(
        found,
        "expected a FileSummaryLN entry in the log after persist_file_summaries()"
    );
}
2013
/// Step 4: a checkpoint over a real tree flushes its dirty BINs.
///
/// Three keys are inserted so that `collect_dirty_bins` is non-empty, a
/// checkpoint is run, and afterwards the checkpoint must report at least one
/// flushed node while `collect_dirty_bins` must come back empty.
#[test]
fn test_checkpoint_flushes_dirty_bins() {
    use noxu_log::FileManager;
    use noxu_tree::tree::Tree;
    use noxu_util::lsn::Lsn;
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    // Populate a tree; each insert gets the next offset in file 1.
    let tree = Tree::new(1, 256);
    let fruit_keys: [&[u8]; 3] = [b"apple", b"banana", b"cherry"];
    for (key, offset) in fruit_keys.iter().zip(1u32..) {
        tree.insert(key.to_vec(), b"fruit".to_vec(), Lsn::new(1, offset))
            .unwrap();
    }
    let shared_tree = Arc::new(RwLock::new(tree));
    let dirty_bins = || shared_tree.read().unwrap().collect_dirty_bins(1);

    // Precondition: the inserts left something dirty.
    assert!(
        !dirty_bins().is_empty(),
        "should have dirty BINs before checkpoint"
    );

    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr))
        .with_tree(Arc::clone(&shared_tree), 1);

    let outcome = ckpt.do_checkpoint("test").unwrap();
    assert!(
        outcome.total_nodes_flushed() > 0,
        "checkpoint should flush dirty BINs"
    );

    // Postcondition: nothing is left dirty.
    assert!(dirty_bins().is_empty(), "no dirty BINs after checkpoint");
}
2065
/// X-8 regression: if the evictor has already flushed and cleaned a BIN
/// between the dirty-BIN snapshot and the per-node write-lock acquisition,
/// the checkpointer must not log a redundant (empty) BINDelta or full BIN
/// for it.
///
/// The race is staged deterministically:
///   1. Build a tree whose BINs are dirty.
///   2. Take the dirty-BIN snapshot the checkpointer would take.
///   3. Under each BIN's write lock call `clear_dirty_after_full_log`,
///      playing the part of the evictor.
///   4. Run `flush_dirty_bins_internal` and require zero BINDelta and zero
///      full-BIN entries.
#[test]
fn test_x8_no_redundant_bindelta_after_evictor_flush() {
    use noxu_log::FileManager;
    use noxu_tree::tree::Tree;
    use noxu_util::lsn::Lsn;
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    // Two inserts are enough to leave a dirty BIN behind.
    let tree = Tree::new(1, 256);
    tree.insert(b"alpha".to_vec(), b"v1".to_vec(), Lsn::new(1, 1)).unwrap();
    tree.insert(b"beta".to_vec(), b"v2".to_vec(), Lsn::new(1, 2)).unwrap();
    let shared_tree = Arc::new(RwLock::new(tree));

    // The snapshot the checkpointer would collect under the tree read lock.
    let snapshot = shared_tree.read().unwrap().collect_dirty_bins(1);
    assert!(!snapshot.is_empty(), "precondition: must have dirty BINs");

    // Play the evictor: mark every snapshotted BIN as already logged in full
    // at a made-up LSN, before the checkpointer gets to lock any of them.
    let evictor_lsn = Lsn::new(2, 0);
    for (_, node) in snapshot.iter() {
        let mut locked = node.write();
        if let TreeNode::Bottom(bin) = &mut *locked {
            bin.clear_dirty_after_full_log(evictor_lsn);
        }
    }

    // Only now does the checkpointer run, with every BIN already clean.
    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr))
        .with_tree(Arc::clone(&shared_tree), 1);
    let outcome = ckpt
        .flush_dirty_bins_internal()
        .expect("flush_dirty_bins_internal failed");

    // X-8 fix: the `if !b.dirty && dirty == 0 { continue; }` guard makes the
    // checkpointer skip clean BINs, so neither counter may move.
    assert_eq!(
        outcome.delta_ins_flushed, 0,
        "X-8: checkpointer must not write a redundant BINDelta for a BIN the evictor already flushed"
    );
    assert_eq!(
        outcome.full_bins_flushed, 0,
        "X-8: checkpointer must not write a redundant full-BIN for a BIN the evictor already flushed"
    );
}
2136
/// L-5-delta: when the checkpointer logs a BIN-delta that supersedes a PRIOR
/// delta (`prev_delta_lsn != NULL`), the prior delta's LSN (JE `auxOldLsn`)
/// must be counted obsolete through the wired UtilizationTracker using the
/// dups-allowed variant (size 0, count_as_ln = false), BEFORE
/// persist_file_summaries, so that it lands in this checkpoint's
/// FileSummaryLN.
///
/// FAIL-PRE: the prior delta LSN was never counted obsolete, so the
/// superseded BIN-delta version leaked and the cleaner under-counted the
/// obsolete bytes in that file.
///
/// PASS-POST: the prior delta's file shows one obsolete IN
/// (`obsolete_in_count == 1`) and its offset is tracked.
///
/// JE: IN.java auxOldLsn -> LogManager.serialLogWork
/// countObsoleteNodeDupsAllowed.
#[test]
fn test_l5_delta_counts_prior_delta_obsolete() {
    use noxu_cleaner::UtilizationTracker;
    use noxu_log::FileManager;
    use noxu_tree::tree::{Tree, TreeNode};
    use noxu_util::lsn::Lsn;
    use tempfile::TempDir;

    let tmp = TempDir::new().unwrap();
    let file_mgr = Arc::new(
        FileManager::new(tmp.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_mgr = Arc::new(LogManager::new(
        Arc::clone(&file_mgr),
        3,
        1024 * 1024,
        65536,
    ));

    // Sixteen keys, key000..key015, at offsets 1..=16 of file 1.
    let tree = Tree::new(1, 256);
    (0u16..16).for_each(|i| {
        let key = format!("key{i:03}").into_bytes();
        tree.insert(key, b"v".to_vec(), Lsn::new(1, i as u32 + 1)).unwrap();
    });
    let shared_tree = Arc::new(RwLock::new(tree));

    // Stage each dirty BIN as "has a full version AND a prior delta", with
    // exactly ONE dirty slot so that `should_log_delta` picks the delta path
    // (numDeltas=1 <= delta_limit at the default percent). The prior delta
    // LSN installed here is the auxOldLsn that must end up obsolete.
    let prior_delta_lsn = Lsn::new(3, 4096);
    let snapshot = shared_tree.read().unwrap().collect_dirty_bins(1);
    assert!(!snapshot.is_empty(), "precondition: dirty BINs");
    for (_, node) in snapshot.iter() {
        let mut locked = node.write();
        if let TreeNode::Bottom(bin) = &mut *locked {
            // Clean slate first, then the prior full + prior delta chain,
            // then re-dirty the BIN and its first slot only.
            bin.clear_dirty_after_full_log(Lsn::new(2, 100));
            bin.last_delta_lsn = prior_delta_lsn;
            bin.is_delta = true;
            bin.prohibit_next_delta = false;
            bin.dirty = true;
            if let Some(slot) = bin.entries.first_mut() {
                slot.dirty = true;
            }
        }
    }

    // File 3 needs a seeded summary for the obsolete count to update.
    let shared_tracker = {
        let mut tracker = UtilizationTracker::new(true);
        tracker.count_new_log_entry(3, 64, false, true);
        Arc::new(Mutex::new(tracker))
    };

    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_mgr))
        .with_tree(Arc::clone(&shared_tree), 1)
        .with_utilization_tracker(Arc::clone(&shared_tracker));

    let outcome = ckpt
        .flush_dirty_bins_internal()
        .expect("flush_dirty_bins_internal failed");

    // Sanity: the delta path was actually taken.
    assert_eq!(
        outcome.delta_ins_flushed, 1,
        "L-5-delta: checkpointer must log exactly one BIN-delta here"
    );

    // PASS-POST: the prior delta is obsolete in file 3, counted as an IN,
    // with its offset recorded.
    let tracker = shared_tracker.lock();
    let prior_file = prior_delta_lsn.file_number();
    let summary = tracker
        .get_tracked_summary(prior_file)
        .expect("file 3 summary must exist after counting the prior delta");
    assert_eq!(
        summary.get_summary().obsolete_in_count,
        1,
        "L-5-delta: the superseded prior BIN-delta must be counted obsolete \
         (obsolete_in_count == 1); was the auxOldLsn wiring dropped?"
    );
    let prior_offset = prior_delta_lsn.file_offset();
    assert!(
        summary.get_obsolete_offsets().contains(&prior_offset),
        "L-5-delta: the prior delta's offset must be tracked obsolete"
    );
}
2243
2244 // -----------------------------------------------------------------------
2245 // CC-4: get_eviction_provisional tests (per-tree after residual fix)
2246 // -----------------------------------------------------------------------
2247
/// CC-4 acceptance test 1: on a fresh checkpointer (no checkpoint in
/// progress) `get_eviction_provisional` returns `Provisional::No` for both
/// level 1 and level 2 of database 1.
///
/// JE ref: coordinateEvictionWithCheckpoint — with no active checkpoint,
/// evicted nodes are logged non-provisionally.
#[test]
fn test_cc4_no_checkpoint_in_progress_yields_provisional_no() {
    let checkpointer = Checkpointer::new(CheckpointConfig::default());

    let at_level_1 = checkpointer.get_eviction_provisional(1, 1);
    assert_eq!(
        at_level_1,
        Provisional::No,
        "CC-4: no checkpoint in progress must yield Provisional::No"
    );

    let at_level_2 = checkpointer.get_eviction_provisional(1, 2);
    assert_eq!(at_level_2, Provisional::No);
}
2263
/// CC-4 acceptance test 2: while a checkpoint is in progress, a node whose
/// level (1) is below its tree's recorded flush level (2) is evicted as
/// `Provisional::Yes`.
///
/// JE ref: coordinateEvictionWithCheckpoint — node.level < highestFlushLevel
/// (for THIS db) => Provisional::YES.
#[test]
fn test_cc4_below_max_flush_level_yields_provisional_yes() {
    const DB: u64 = 42;

    let checkpointer = Checkpointer::new(CheckpointConfig::default());

    // Fake an in-flight checkpoint whose flush level for DB is 2.
    checkpointer.checkpoint_in_progress.store(true, Ordering::Release);
    checkpointer.checkpoint_flush_levels.lock().unwrap().insert(DB, 2i32);

    let verdict = checkpointer.get_eviction_provisional(DB, 1);
    assert_eq!(
        verdict,
        Provisional::Yes,
        "CC-4: BIN below tree's max_flush_level must yield Provisional::Yes"
    );

    // Undo the faked state.
    checkpointer.checkpoint_in_progress.store(false, Ordering::Release);
    checkpointer.checkpoint_flush_levels.lock().unwrap().clear();
}
2284
/// CC-4 acceptance test 3: while a checkpoint is in progress, a node at
/// (level 2) or above (level 3) its tree's recorded flush level (2) is
/// evicted as `Provisional::No`.
///
/// JE ref: coordinateEvictionWithCheckpoint — node.level >= highestFlushLevel
/// => Provisional::NO.
#[test]
fn test_cc4_at_or_above_max_flush_level_yields_provisional_no() {
    let checkpointer = Checkpointer::new(CheckpointConfig::default());

    // Fake an in-flight checkpoint whose flush level for db 42 is 2.
    checkpointer.checkpoint_in_progress.store(true, Ordering::Release);
    checkpointer.checkpoint_flush_levels.lock().unwrap().insert(42u64, 2i32);

    // (node level, failure message)
    let cases = [
        (2, "CC-4: node at max_flush_level must yield Provisional::No"),
        (3, "CC-4: node above max_flush_level must yield Provisional::No"),
    ];
    for (level, why) in cases {
        assert_eq!(
            checkpointer.get_eviction_provisional(42, level),
            Provisional::No,
            "{why}"
        );
    }

    // Undo the faked state.
    checkpointer.checkpoint_in_progress.store(false, Ordering::Release);
    checkpointer.checkpoint_flush_levels.lock().unwrap().clear();
}
2310
/// CC-4 residual acceptance test: flush levels are looked up per tree. A BIN
/// of tree A, which has no entry in the flush-levels map, must be
/// `Provisional::No` even though tree B has an entry at level 2; tree B's
/// own BIN is `Provisional::Yes`. This is the exact scenario that caused
/// data loss with the old global `AtomicI32`.
///
/// Fail-pre: on `origin/main` (global level)
/// `get_eviction_provisional(DB_A, 1)` returned `Provisional::Yes` — a lie,
/// since no covering ancestor was written for tree A. Pass-post: the
/// per-tree lookup returns `Provisional::No` for tree A.
///
/// JE ref: DirtyINMap.getHighestFlushLevel returns IN.MIN_LEVEL (0) for a
/// DatabaseImpl absent from highestFlushLevels → comparison false → NO.
#[test]
fn test_cc4_residual_tree_a_no_upper_ins_yields_provisional_no() {
    const DB_A: u64 = 1; // only BINs dirty; no dirty upper INs
    const DB_B: u64 = 2; // has dirty upper INs at level 2

    let checkpointer = Checkpointer::new(CheckpointConfig::default());
    checkpointer.checkpoint_in_progress.store(true, Ordering::Release);

    // Register a flush level for tree B only; tree A stays absent.
    {
        let mut levels = checkpointer.checkpoint_flush_levels.lock().unwrap();
        levels.insert(DB_B, 2i32);
    }

    // Tree A, level-1 BIN: no covering ancestor, so non-provisional.
    let for_tree_a = checkpointer.get_eviction_provisional(DB_A, 1);
    assert_eq!(
        for_tree_a,
        Provisional::No,
        "CC-4 residual: tree A has no dirty upper INs; BIN must be \
         Provisional::No (not covered by any ancestor in this checkpoint)"
    );

    // Tree B, level-1 BIN: its level-2 ancestor will be written
    // non-provisionally, so the BIN itself may be provisional.
    let for_tree_b = checkpointer.get_eviction_provisional(DB_B, 1);
    assert_eq!(
        for_tree_b,
        Provisional::Yes,
        "CC-4: tree B has a dirty upper IN at level 2; BIN must be Provisional::Yes"
    );

    // Undo the faked state.
    checkpointer.checkpoint_in_progress.store(false, Ordering::Release);
    checkpointer.checkpoint_flush_levels.lock().unwrap().clear();
}
2352
/// CC-4: when a `CheckpointGuard` is dropped, the flush-levels map it
/// borrowed is emptied and the in-progress flag reads false.
#[test]
fn test_cc4_guard_resets_max_flush_level() {
    let in_progress = AtomicBool::new(true);
    let flush_levels: std::sync::Mutex<HashMap<u64, i32>> =
        std::sync::Mutex::new(HashMap::from([(7u64, 5i32)]));

    // Build the guard and let it go straight away.
    drop(CheckpointGuard { flag: &in_progress, flush_levels: &flush_levels });

    assert!(flush_levels.lock().unwrap().is_empty(), "guard must clear map");
    assert!(!in_progress.load(Ordering::Acquire));
}
2365
2366 // -----------------------------------------------------------------------
2367 // REC-D: the configured bytes-interval must reach the runnable gate
2368 // (not the hardcoded 10 MiB default).
2369 // -----------------------------------------------------------------------
2370
/// REC-D fail-pre/pass-post: `is_runnable` must gate on the bytes interval
/// given to `with_bytes_interval`, not on the hardcoded 10 MiB. JE
/// Checkpointer ctor:
/// `logSizeBytesInterval = configManager.getLong(CHECKPOINTER_BYTES_INTERVAL)`
/// and `isRunnable` compares the bytes-since-checkpoint against it.
///
/// Fail-pre: before REC-D the env wired only
/// `CheckpointConfig.bytes_interval` (a field `is_runnable` never reads)
/// while the gate used the hardcoded `checkpoint_bytes_interval = 10 MiB`,
/// so a configured 1 KiB interval would NOT trip at 1 KiB of writes.
/// Pass-post: `with_bytes_interval` threads the configured value into the
/// gate.
#[test]
fn test_rec_d_configured_bytes_interval_drives_runnable() {
    // 1 KiB interval — far below the old 10 MiB default.
    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_bytes_interval(1024);

    // 1000 bytes noted: still under the interval.
    ckpt.note_bytes_for_test(1000);
    let below_interval = ckpt.is_runnable(false);
    assert!(
        !below_interval,
        "REC-D: 1000 bytes < configured 1 KiB interval must not be runnable"
    );

    // 100 more bytes cross it. Under the old hardcoded default this would
    // have stayed false until 10 MiB had been written.
    ckpt.note_bytes_for_test(100);
    let past_interval = ckpt.is_runnable(false);
    assert!(
        past_interval,
        "REC-D: crossing the configured 1 KiB interval must be runnable; \
         the gate must use the configured interval, not 10 MiB"
    );
}
2404
2405 // -----------------------------------------------------------------------
2406 // REC-F: an idle environment with cleaner-pending files must trigger a
2407 // checkpoint (JE wakeupAfterNoWrites / needCheckpointForCleanedFiles).
2408 // -----------------------------------------------------------------------
2409
/// REC-F fail-pre/pass-post: even with no bytes written since the last
/// checkpoint, `is_runnable(false)` must be true once the wired cleaner has
/// a file in the CLEANED state awaiting reclaim.
///
/// JE `Checkpointer.isRunnable`:
///   `if (wakeupAfterNoWrites && needCheckpointForCleanedFiles()) return true;`
/// where `needCheckpointForCleanedFiles()` →
/// `cleaner.getFileSelector().isCheckpointNeeded()`.
///
/// Fail-pre: before REC-F `is_runnable` looked at bytes only, so an idle env
/// with cleaned-but-unreclaimed files returned false and reclamation stalled
/// until the next write-driven checkpoint.
#[test]
fn test_rec_f_idle_env_with_cleaner_pending_is_runnable() {
    use noxu_cleaner::Cleaner;
    use std::sync::Arc;

    let cleaner = Arc::new(Cleaner::new(50, 1, 0));
    let ckpt = Checkpointer::new(CheckpointConfig::default())
        .with_bytes_interval(1024)
        .with_cleaner(Arc::clone(&cleaner));

    // Nothing written and nothing cleaned yet: the daemon has no work.
    let runnable_when_idle = ckpt.is_runnable(false);
    assert!(
        !runnable_when_idle,
        "REC-F precondition: idle env with no pending files must not be runnable"
    );

    // Have the cleaner "clean" file 7 so it sits in the CLEANED state
    // (cleaned but not yet checkpointed). The selector lock is released at
    // the end of this block.
    {
        let mut file_selector = cleaner.get_file_selector().lock();
        file_selector.add_file_to_clean(7);
        file_selector.mark_file_cleaned(7);
    }

    let checkpoint_needed = cleaner.is_checkpoint_needed();
    assert!(
        checkpoint_needed,
        "REC-F: cleaner must report a checkpoint is needed for the CLEANED file"
    );

    // Still zero bytes written; the idle-reclaim trigger alone must fire.
    let runnable_with_pending = ckpt.is_runnable(false);
    assert!(
        runnable_with_pending,
        "REC-F: idle env with cleaner-pending files must be runnable \
         (JE wakeupAfterNoWrites / needCheckpointForCleanedFiles)"
    );
}
2460
2461 // -----------------------------------------------------------------------
2462 // REC-G: init_intervals seeds the interval baselines from a recovered
2463 // checkpoint (JE Checkpointer.initIntervals).
2464 // -----------------------------------------------------------------------
2465
/// REC-G fail-pre/pass-post: after recovery the checkpointer's interval
/// baselines must equal the recovered CkptEnd LSNs, not NULL_LSN, and the
/// byte accumulator must be reset.
///
/// Fail-pre: a freshly-constructed Checkpointer has
/// `last_checkpoint_start`/`_end` == NULL_LSN, so the first post-recovery
/// interval is measured from process start. Pass-post: `init_intervals`
/// seeds them from the recovered CkptEnd.
///
/// The checkpointer is configured with a 1 KiB byte interval so that the
/// 9999 simulated pre-crash bytes actually exceed the runnable gate. The
/// test first asserts that the gate is tripped, then that `init_intervals`
/// clears it; without the explicit small interval the final assertion
/// could pass even if the accumulator were never reset.
///
/// JE ref: `Checkpointer.initIntervals(lastCheckpointStart,
/// lastCheckpointEnd, lastCheckpointMillis)`.
#[test]
fn test_rec_g_init_intervals_seeds_baselines() {
    // A small, explicit byte interval makes the accumulator-reset check
    // below meaningful regardless of the default configuration.
    let cp = Checkpointer::new(CheckpointConfig::default())
        .with_bytes_interval(1024);

    // Fail-pre baseline: fresh checkpointer starts at NULL_LSN.
    assert_eq!(cp.get_last_checkpoint_start(), noxu_util::NULL_LSN);
    assert_eq!(cp.get_last_checkpoint_end(), noxu_util::NULL_LSN);

    // Simulate recovery surfacing a CkptEnd at (start=4:400, end=5:500).
    let recovered_start = Lsn::new(4, 400);
    let recovered_end = Lsn::new(5, 500);

    // Pretend the env wrote some pre-crash bytes before recovery. This
    // volume is above the configured interval, so the gate must be open.
    cp.note_bytes_for_test(9999);
    assert!(
        cp.is_runnable(false),
        "REC-G precondition: pre-crash bytes above the configured interval must be runnable"
    );

    cp.init_intervals(recovered_start, recovered_end);

    assert_eq!(
        cp.get_last_checkpoint_start(),
        recovered_start,
        "REC-G: baseline start must equal recovered CkptEnd start"
    );
    assert_eq!(
        cp.get_last_checkpoint_end(),
        recovered_end,
        "REC-G: baseline end must equal recovered CkptEnd end"
    );
    // The byte accumulator is reset so pre-crash volume does not
    // immediately trip the runnable gate.
    assert!(
        !cp.is_runnable(false),
        "REC-G: byte accumulator must reset on init_intervals"
    );
}
2508
2509 // -----------------------------------------------------------------------
2510 // REC-H: set_checkpoint_id continues the sequence after recovery
2511 // (JE Checkpointer.setCheckpointId).
2512 // -----------------------------------------------------------------------
2513
/// REC-H (fail-pre/pass-post): checkpoint ids issued after recovery must
/// carry on from the id of the recovered CkptEnd rather than starting over
/// at 1.
///
/// Fail-pre: the first id handed out by a fresh Checkpointer is 1, which
/// collides with ids used before the crash. Pass-post: calling
/// `set_checkpoint_id(recovered_id)` causes the next emitted id to be
/// `recovered_id + 1`.
///
/// JE ref: `Checkpointer.setCheckpointId(lastCheckpointId)`.
#[test]
fn test_rec_h_set_checkpoint_id_continues_sequence() {
    let config = CheckpointConfig::default();
    let cp = Checkpointer::new(config);

    // Fail-pre: with no recovered id installed, the next id is 1.
    assert_eq!(cp.peek_next_checkpoint_id(), 1);

    // Install the id of the CkptEnd that recovery found (42) and confirm
    // the sequence continues from it.
    cp.set_checkpoint_id(42);
    let upcoming_id = cp.peek_next_checkpoint_id();
    assert_eq!(
        upcoming_id,
        43,
        "REC-H: next checkpoint id must be recovered_id + 1"
    );

    // Running a real checkpoint must stamp it with 43, not 1.
    let outcome = cp.do_checkpoint("post_recovery").unwrap();
    assert_eq!(
        outcome.checkpoint_id, 43,
        "REC-H: post-recovery checkpoint id must continue the sequence"
    );
}
2543
2544 // -----------------------------------------------------------------------
2545 // REC-AA: the highest-flush-level is max(dirty-upper-IN-level) + 1,
2546 // bounded by the root level (JE DirtyINMap.updateFlushLevels).
2547 // -----------------------------------------------------------------------
2548
/// REC-AA (fail-pre/pass-post): the highest flush level recorded per tree
/// for eviction coordination has to be `max(dirty-upper-IN-level) + 1`,
/// capped at the root level. With that value in place, a BIN evicted while
/// the checkpoint is running is logged `Provisional::Yes`, because a
/// non-provisional ancestor covers it.
///
/// Fail-pre: `collect_dirty_upper_ins` used to hand back a root-relative
/// depth (root=0) rather than the node's tree level. The flush-levels map
/// therefore held tiny depths (1, 2) while the evictor compared against the
/// BIN's real `BIN_LEVEL` (`MAIN_LEVEL|1`); `BIN_LEVEL < 2` never holds, so
/// every BIN was logged `Provisional::No`, and the JE `+1` adjustment was
/// missing altogether.
///
/// Pass-post: the levels are real tree levels, the recorded flush level is
/// `max_dirty_upper_in_level + 1` capped at the root, and a BIN at
/// `BIN_LEVEL` is reported as `Provisional::Yes`.
///
/// JE ref: `DirtyINMap.updateFlushLevels` (`(ckptFlushExtraLevel || isBIN)
/// && !isRoot` → `level += 1`) / `Checkpointer.flushDirtyNodes`.
#[test]
fn test_rec_aa_flush_level_is_max_dirty_plus_one() {
    use noxu_log::FileManager;
    use noxu_tree::tree::{BIN_LEVEL, Tree};
    use noxu_util::lsn::Lsn;
    use tempfile::TempDir;

    // Log stack backed by a throwaway directory.
    let scratch = TempDir::new().unwrap();
    let file_manager = Arc::new(
        FileManager::new(scratch.path(), false, 64 * 1024 * 1024, 100).unwrap(),
    );
    let log_manager = Arc::new(LogManager::new(
        Arc::clone(&file_manager),
        3,
        1024 * 1024,
        65536,
    ));

    // With fanout 4, twenty inserts force root splits and yield a 3-level
    // tree (root at MAIN_LEVEL|3, upper INs at |2, BINs at BIN_LEVEL=|1);
    // the splits leave dirty upper INs behind.
    let split_tree = Tree::new(1, 4);
    for n in 0u32..20 {
        split_tree
            .insert(
                format!("key{:04}", n).into_bytes(),
                format!("data{}", n).into_bytes(),
                Lsn::new(1, 100 + n),
            )
            .unwrap();
    }

    let root_level = split_tree.get_root().unwrap().read().level();
    let dirty_upper_ins = split_tree.collect_dirty_upper_ins(1);
    assert!(
        !dirty_upper_ins.is_empty(),
        "precondition: the split tree must have dirty upper INs"
    );

    let max_dirty = dirty_upper_ins
        .iter()
        .map(|(level, _)| *level)
        .max()
        .unwrap();
    // Guard against the REC-AA fail-pre shape, where these would be small
    // root-relative depths instead of real tree levels.
    assert!(
        (noxu_tree::MAIN_LEVEL | 2) <= max_dirty,
        "upper-IN levels must be real tree levels (>= MAIN_LEVEL|2), got {max_dirty}"
    );

    let shared_tree = Arc::new(RwLock::new(split_tree));
    let cp = Checkpointer::new(CheckpointConfig::default())
        .with_log_manager(Arc::clone(&log_manager))
        .with_tree(Arc::clone(&shared_tree), 1);

    // Flag a checkpoint as in progress, then run the upper-IN flush; that
    // flush is what fills checkpoint_flush_levels with the REC-AA value.
    cp.checkpoint_in_progress.store(true, Ordering::Release);
    cp.flush_upper_ins_internal().unwrap();

    // Read the level recorded for db 1, releasing the lock right after.
    let recorded = {
        let flush_levels = cp.checkpoint_flush_levels.lock().unwrap();
        flush_levels
            .get(&1u64)
            .copied()
            .expect("db 1 must have a recorded flush level")
    };

    let expected = std::cmp::min(max_dirty + 1, root_level);
    assert_eq!(
        recorded, expected,
        "REC-AA: flush level must be max(dirty-upper-IN-level)+1 bounded by root"
    );

    // The recorded flush level sits strictly above BIN_LEVEL, so a BIN at
    // that level must be reported as covered (Provisional::Yes).
    assert!(
        recorded > BIN_LEVEL,
        "BIN_LEVEL ({BIN_LEVEL}) must be < recorded flush level ({recorded})"
    );
    let bin_provisional = cp.get_eviction_provisional(1, BIN_LEVEL);
    assert_eq!(
        bin_provisional,
        Provisional::Yes,
        "REC-AA: a BIN below the flush level must be Provisional::Yes"
    );

    // Undo the manually-staged checkpoint state.
    cp.checkpoint_in_progress.store(false, Ordering::Release);
    cp.checkpoint_flush_levels.lock().unwrap().clear();
}
2643}