Skip to main content

noxu_recovery/
checkpointer.rs

1//! Checkpoint daemon for Noxu DB.
2//!
3//!
4//! The Checkpointer flushes dirty IN nodes from the tree to the log in
5//! bottom-up order. This bounds recovery time and ensures durability.
6
7use crate::checkpoint_end::CheckpointEnd;
8use crate::checkpoint_start::CheckpointStart;
9use crate::checkpoint_stat::CheckpointStats;
10use crate::dirty_in_map::DirtyINMap;
11use crate::error::{RecoveryError, Result};
12use noxu_cleaner::UtilizationTracker;
13use noxu_log::entry::FileSummaryLnEntry;
14use noxu_log::entry::bin_delta_log_entry::BinDeltaLogEntry;
15use noxu_log::entry::in_log_entry::InLogEntry;
16use noxu_log::{LogEntryType, LogManager, Provisional};
17use noxu_sync::Mutex;
18use noxu_tree::tree::{Tree, TreeNode};
19use noxu_txn::TxnManager;
20use noxu_util::{Lsn, NULL_LSN};
21use parking_lot::RwLock as NodeRwLock;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, Condvar, RwLock};
25
/// Tunables that govern when and how the `Checkpointer` runs.
///
/// Start from `CheckpointConfig::new()` (or `Default::default()`) and adjust
/// individual fields with the chained setters.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Run the checkpoint even when there is nothing dirty to flush.
    pub force: bool,
    /// Flush every dirty node so that recovery has as little work as possible.
    pub minimize_recovery_time: bool,
    /// Log volume, in bytes, between checkpoints; 0 means only the time
    /// interval applies.
    ///
    /// NOTE(review): `Checkpointer::new` does not copy this value into its
    /// byte threshold; that threshold is set via
    /// `Checkpointer::with_bytes_interval`.
    pub bytes_interval: u64,
    /// Interval, in milliseconds, between checkpoints; 0 disables time-based
    /// checkpoints.
    ///
    /// NOTE(review): like `bytes_interval`, this is not read by
    /// `Checkpointer::new`; see `Checkpointer::with_time_interval`.
    pub time_interval: u64,
    /// BIN-delta threshold in percent (JE `TREE_BIN_DELTA` /
    /// `BIN_DELTA_PERCENT`; documented range 0–75, default 25).
    ///
    /// A BIN is written as a delta only if its delta-slot count is at most
    /// `nEntries * bin_delta_percent / 100`. See `BinStub::should_log_delta`
    /// and JE `DatabaseImpl.getBinDeltaPercent()`. The value is not
    /// range-checked by this type.
    pub bin_delta_percent: i32,
}
47
48impl CheckpointConfig {
49    /// Create a new checkpoint configuration with default values.
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    /// Set force flag.
55    pub fn force(mut self, force: bool) -> Self {
56        self.force = force;
57        self
58    }
59
60    /// Set minimize recovery time flag.
61    pub fn minimize_recovery_time(mut self, minimize: bool) -> Self {
62        self.minimize_recovery_time = minimize;
63        self
64    }
65
66    /// Set bytes interval.
67    pub fn bytes_interval(mut self, bytes: u64) -> Self {
68        self.bytes_interval = bytes;
69        self
70    }
71
72    /// Set time interval in milliseconds.
73    pub fn time_interval(mut self, millis: u64) -> Self {
74        self.time_interval = millis;
75        self
76    }
77
78    /// Set the BIN-delta percent threshold (`TREE_BIN_DELTA`, 0–75).
79    pub fn bin_delta_percent(mut self, percent: i32) -> Self {
80        self.bin_delta_percent = percent;
81        self
82    }
83}
84
85impl Default for CheckpointConfig {
86    fn default() -> Self {
87        CheckpointConfig {
88            force: false,
89            minimize_recovery_time: false,
90            bytes_interval: 20_000_000, // 20MB default
91            time_interval: 0, // Time-based checkpoints disabled by default
92            // JE BIN_DELTA_PERCENT default (TREE_BIN_DELTA, 0–75).
93            bin_delta_percent: 25,
94        }
95    }
96}
97
98/// Result of a checkpoint operation.
99///
100/// Contains information about what was flushed during the checkpoint.
101#[derive(Debug, Clone)]
102pub struct CheckpointResult {
103    /// The checkpoint ID.
104    pub checkpoint_id: u64,
105    /// LSN of the CheckpointStart entry.
106    pub start_lsn: Lsn,
107    /// LSN of the CheckpointEnd entry.
108    pub end_lsn: Lsn,
109    /// Number of full INs flushed.
110    pub full_ins_flushed: u64,
111    /// Number of full BINs flushed.
112    pub full_bins_flushed: u64,
113    /// Number of delta INs flushed.
114    pub delta_ins_flushed: u64,
115    /// Time spent on checkpoint in milliseconds.
116    pub elapsed_ms: u64,
117}
118
119impl CheckpointResult {
120    /// Total nodes flushed.
121    pub fn total_nodes_flushed(&self) -> u64 {
122        self.full_ins_flushed + self.full_bins_flushed + self.delta_ins_flushed
123    }
124}
125
126/// The Checkpointer flushes dirty IN nodes to the log.
127///
128///
129///
130/// Checkpoint flushes must be done in ascending order from the bottom
131/// of the tree up. This ensures that recovery can reconstruct the tree
132/// from the checkpoint.
133///
134/// # Checkpoint Algorithm
135///
136/// 1. Generate checkpoint ID
137/// 2. Create and log CheckpointStart
138/// 3. Build dirty IN map (organized by Btree level)
139/// 4. Flush dirty INs level by level (bottom-up)
140///    - Bottom levels logged provisionally
141///    - Top level logged non-provisionally
142/// 5. Create and log CheckpointEnd
143/// 6. Update statistics
144///
145/// This implementation flushes dirty BINs via `flush_dirty_bins_internal()`,
146/// which writes full BIN or BINDelta log entries depending on the dirty-slot
147/// fraction (TREE_BIN_DELTA = 25%). Upper INs (level ≥ 2) are flushed
148/// by `flush_upper_ins_internal()` after the BIN pass, bottom-up, using
149/// `Provisional::Yes` for intermediate levels and `Provisional::No` for
150/// the root. File utilization summaries are persisted via
151/// `persist_file_summaries()` at the end of each checkpoint.
152pub struct Checkpointer {
153    /// Checkpoint statistics
154    stats: Arc<CheckpointStats>,
155    /// Next checkpoint ID
156    next_checkpoint_id: AtomicU64,
157    /// The dirty IN map for the current checkpoint
158    dirty_map: Mutex<DirtyINMap>,
159    /// LSN of the last checkpoint start
160    last_checkpoint_start: Mutex<Lsn>,
161    /// LSN of the last checkpoint end
162    last_checkpoint_end: Mutex<Lsn>,
163    /// Whether a checkpoint is in progress
164    checkpoint_in_progress: AtomicBool,
165    /// Per-database highest IN-level being flushed in the current checkpoint.
166    ///
167    /// Maps `db_id → highest dirty upper-IN level` for every tree that has
168    /// dirty upper INs in this checkpoint pass.  A tree absent from the map
169    /// has no dirty upper INs → its highest flush level is 0 → an evicted BIN
170    /// from that tree gets `Provisional::No` (no covering ancestor will be
171    /// written).  Cleared when the checkpoint finishes or is abandoned.
172    ///
173    /// JE ref: `DirtyINMap.highestFlushLevels` (per-`DatabaseImpl` map) /
174    /// `DirtyINMap.coordinateEvictionWithCheckpoint` / `getHighestFlushLevel`.
175    ///
176    /// CC-4 residual fix: the old single `AtomicI32` held the *global* max
177    /// across all trees, causing a BIN evicted from a tree with **no** dirty
178    /// upper INs to be logged `Provisional::Yes` (covered by a non-provisional
179    /// ancestor that the checkpoint never actually writes for that tree).
180    checkpoint_flush_levels: std::sync::Mutex<HashMap<u64, i32>>,
181    /// Shutdown flag
182    shutdown: AtomicBool,
183    /// Condvar for interruptible daemon sleep — notified by `request_shutdown()`
184    /// so the daemon thread wakes up immediately instead of waiting the full
185    /// sleep interval.
186    shutdown_condvar: Condvar,
187    /// Mutex paired with `shutdown_condvar`.
188    shutdown_mutex: std::sync::Mutex<bool>,
189    /// Configuration
190    config: CheckpointConfig,
191    /// Optional LogManager for writing CkptStart/CkptEnd WAL entries.
192    log_manager: Option<Arc<LogManager>>,
193    /// Optional Tree reference for flushing dirty BINs in step 4.
194    ///
195    /// When `None` (unit tests without a real tree) step 4 is a no-op.
196    tree: Option<Arc<RwLock<Tree>>>,
197    /// Database ID to pass to `Tree::collect_dirty_bins()`.
198    db_id: u64,
199    /// Registry of ALL open user-database trees (Stage-1 fix).
200    ///
201    /// Maps `db_id as i64` → `Arc<RwLock<Tree>>` for every database the
202    /// environment has opened.  The checkpointer must flush dirty BINs from
203    /// EVERY tree, not just the primary one, so that committed LNs written to
204    /// user databases are captured in a BIN entry before `CkptEnd` is written.
205    /// JE walks a single env-wide `INList` that covers all databases;
206    /// Noxu achieves the same effect by iterating this registry.
207    ///
208    /// `None` until `with_db_trees_registry` is called (unit tests without a
209    /// full environment).
210    db_trees_registry:
211        Option<Arc<std::sync::Mutex<HashMap<i64, Arc<RwLock<Tree>>>>>>,
212    /// Bytes written to the log since the last checkpoint.
213    ///
214    /// Incremented by `wakeup_after_write()`. When this exceeds
215    /// `checkpoint_bytes_interval` a checkpoint is triggered immediately.
216    ///
217    /// Write-byte accumulation.
218    bytes_since_checkpoint: AtomicU64,
219    /// Bytes-written threshold that triggers an immediate checkpoint.
220    ///
221    /// Default: 10 MiB (10 * 1024 * 1024).  Set to 0 to disable.
222    ///
223    /// REC-D: wired from `CHECKPOINTER_BYTES_INTERVAL` (default 20 MB) by the
224    /// environment via `with_bytes_interval`. JE Checkpointer ctor:
225    /// `logSizeBytesInterval = configManager.getLong(CHECKPOINTER_BYTES_INTERVAL)`.
226    checkpoint_bytes_interval: u64,
227    /// Time-based checkpoint interval in milliseconds (0 = time-based
228    /// checkpoints disabled, bytes-only).
229    ///
230    /// REC-D: wired from `CHECKPOINTER_WAKEUP_INTERVAL` by the environment via
231    /// `with_time_interval`. JE `getWakeupPeriod`: bytes-OR-time, with the
232    /// byte interval taking precedence when non-zero. The daemon paces its
233    /// own sleep at this interval; `is_runnable` consults it only when the
234    /// byte interval is disabled (matches JE `isRunnable` useTimeInterval
235    /// branch, which fires only when `logSizeBytesInterval == 0`).
236    checkpoint_time_interval_ms: u64,
237    /// Optional utilization tracker for persisting file summaries.
238    ///
239    /// When set, `persist_file_summaries()` iterates tracked summaries and
240    /// writes `FileSummaryLN` WAL entries.
241    utilization_tracker: Option<Arc<Mutex<UtilizationTracker>>>,
242    /// Optional cleaner reference for the post-checkpoint callback.
243    ///
244    /// After each successful `do_checkpoint`, the checkpointer calls
245    /// `cleaner.after_checkpoint(&state)` to advance the three-state
246    /// checkpoint barrier in `FileSelector`.  X-5 fix.
247    cleaner: Option<Arc<noxu_cleaner::Cleaner>>,
248    /// Optional transaction manager for T-F3/T-F4: first-active-LSN tracking.
249    ///
250    /// When `Some`, `do_checkpoint` queries `txn_manager.get_first_active_lsn()`
251    /// and writes the result into `CkptEnd.first_active_lsn` instead of the
252    /// conservative `Lsn::new(0,0)` full-scan sentinel.  This bounds the
253    /// recovery scan to entries at or after the earliest active transaction's
254    /// first logged LSN, reducing crash-recovery time.
255    ///
256    /// Safe only after Stage 1 (all user-database BINs are checkpointed);
257    /// `None` for unit tests without a full environment.
258    txn_manager: Option<Arc<TxnManager>>,
259    /// REC-S: id sources read at checkpoint time to write the real last
260    /// node/db/txn id values into `CheckpointEnd` (instead of zeros).
261    ///
262    /// JE `Checkpointer.doCheckpoint` writes `getLastLocalNodeId` /
263    /// `getLastLocalDbId` / `getLastLocalTxnId` into the `CheckpointEnd`.
264    /// `next_db_id` mirrors the env's db-id counter (last db-id = value-1);
265    /// the last txn-id is read from `txn_manager.get_last_local_txn_id()`;
266    /// the last node-id comes from the single tree-wide node counter
267    /// (`noxu_tree::peek_next_node_id_counter`, L-30).  `None` keeps the old
268    /// zero behaviour for unit tests without a full environment.
269    next_db_id: Option<Arc<std::sync::atomic::AtomicI64>>,
270}
271
272impl Checkpointer {
273    /// Create a new Checkpointer.
274    ///
275    /// # Arguments
276    /// * `config` - Checkpoint configuration
277    pub fn new(config: CheckpointConfig) -> Self {
278        Self {
279            stats: Arc::new(CheckpointStats::new()),
280            next_checkpoint_id: AtomicU64::new(1),
281            dirty_map: Mutex::new(DirtyINMap::new()),
282            last_checkpoint_start: Mutex::new(noxu_util::NULL_LSN),
283            last_checkpoint_end: Mutex::new(noxu_util::NULL_LSN),
284            checkpoint_in_progress: AtomicBool::new(false),
285            checkpoint_flush_levels: std::sync::Mutex::new(HashMap::new()),
286            shutdown: AtomicBool::new(false),
287            shutdown_condvar: Condvar::new(),
288            shutdown_mutex: std::sync::Mutex::new(false),
289            config,
290            log_manager: None,
291            tree: None,
292            db_id: 0,
293            db_trees_registry: None,
294            bytes_since_checkpoint: AtomicU64::new(0),
295            checkpoint_bytes_interval: 10 * 1024 * 1024, // 10 MiB default
296            checkpoint_time_interval_ms: 0, // time-based disabled by default
297            utilization_tracker: None,
298            cleaner: None,
299            txn_manager: None,
300            next_db_id: None,
301        }
302    }
303
304    /// Set the bytes-written threshold that triggers an immediate checkpoint.
305    ///
306    ///
307    pub fn with_bytes_interval(mut self, bytes: u64) -> Self {
308        self.checkpoint_bytes_interval = bytes;
309        self
310    }
311
312    /// Set the time-based checkpoint interval (milliseconds).
313    ///
314    /// REC-D: wired from `CHECKPOINTER_WAKEUP_INTERVAL`. JE `getWakeupPeriod`
315    /// computes bytes-OR-time with bytes taking precedence; `isRunnable`
316    /// consults the time interval only when the byte interval is 0.
317    pub fn with_time_interval(mut self, millis: u64) -> Self {
318        self.checkpoint_time_interval_ms = millis;
319        self
320    }
321
322    /// Attach a LogManager so that `do_checkpoint` writes real WAL entries.
323    ///
324    /// Call this before invoking `do_checkpoint` when a writable log is
325    /// available (i.e. from `EnvironmentImpl`).
326    pub fn with_log_manager(mut self, lm: Arc<LogManager>) -> Self {
327        self.log_manager = Some(lm);
328        self
329    }
330
331    /// Attach a Tree so that `do_checkpoint` flushes dirty BINs in step 4.
332    ///
333    /// `db_id` is the database ID passed to `Tree::collect_dirty_bins()`.
334    /// `Checkpointer` receiving the environment's tree reference.
335    pub fn with_tree(mut self, tree: Arc<RwLock<Tree>>, db_id: u64) -> Self {
336        self.tree = Some(tree);
337        self.db_id = db_id;
338        self
339    }
340
341    /// Wire the env-wide db-tree registry so the checkpointer flushes ALL
342    /// user-database dirty BINs, not just the primary tree.
343    ///
344    /// This is the Stage-1 fix: JE's `Checkpointer.processINList` walks a
345    /// single env-wide `INList` covering all databases.  Noxu achieves the
346    /// same effect by iterating `db_trees_registry` and flushing each tree.
347    pub fn with_db_trees_registry(
348        mut self,
349        registry: Arc<std::sync::Mutex<HashMap<i64, Arc<RwLock<Tree>>>>>,
350    ) -> Self {
351        self.db_trees_registry = Some(registry);
352        self
353    }
354
355    /// Attach a UtilizationTracker so that `persist_file_summaries()` writes
356    /// real `FileSummaryLN` WAL entries during each checkpoint.
357    ///
358    /// `Checkpointer` receiving the environment's utilization tracker.
359    pub fn with_utilization_tracker(
360        mut self,
361        tracker: Arc<Mutex<UtilizationTracker>>,
362    ) -> Self {
363        self.utilization_tracker = Some(tracker);
364        self
365    }
366
367    /// Wire a cleaner so that `do_checkpoint` calls
368    /// `cleaner.after_checkpoint()` after a successful checkpoint.
369    ///
370    /// This is the X-5 fix: it activates the three-state checkpoint barrier
371    /// (`cleaned → checkpointed → safe_to_delete`) in `FileSelector` so that
372    /// log files are only deleted after their migrations have been captured by
373    /// two successive checkpoints.
374    pub fn with_cleaner(mut self, cleaner: Arc<noxu_cleaner::Cleaner>) -> Self {
375        self.cleaner = Some(cleaner);
376        self
377    }
378
379    /// Wire the transaction manager so `do_checkpoint` can compute the real
380    /// `first_active_lsn` for `CkptEnd` (T-F3/T-F4).
381    ///
382    /// Safe to call only after Stage 1 (user-database BINs are checkpointed);
383    /// before Stage 1 a non-zero `first_active_lsn` would cause recovery to
384    /// skip committed LNs not captured in any BIN.
385    pub fn with_txn_manager(mut self, txn_manager: Arc<TxnManager>) -> Self {
386        self.txn_manager = Some(txn_manager);
387        self
388    }
389
390    /// REC-S: wire the env's db-id counter so `do_checkpoint` writes the real
391    /// last node/db/txn id values into `CheckpointEnd` instead of zeros.
392    ///
393    /// `next_db_id` is the env's `AtomicI64` (last allocated db-id =
394    /// `next_db_id - 1`).  The last txn-id is read from the wired
395    /// `txn_manager`; the last node-id from the tree-wide node counter.
396    ///
397    /// JE `Checkpointer.doCheckpoint` reads `envImpl.getNodeSequence()
398    /// .getLastLocalNodeId()`, `getDbTree().getLastLocalDbId()`, and
399    /// `getTxnManager().getLastLocalTxnId()` into the `CheckpointEnd`.
400    pub fn with_id_sources(
401        mut self,
402        next_db_id: Arc<std::sync::atomic::AtomicI64>,
403    ) -> Self {
404        self.next_db_id = Some(next_db_id);
405        self
406    }
407
408    /// Accumulate bytes written and trigger a checkpoint when the threshold
409    /// is exceeded.
410    ///
411    /// Called after each WAL write from `EnvironmentImpl` (or LogManager) with
412    /// the number of bytes appended.  When the running total exceeds
413    /// `checkpoint_bytes_interval` the counter is reset and
414    /// `do_checkpoint("wakeup")` is invoked synchronously.
415    ///
416    ///
417    pub fn wakeup_after_write(&self, bytes: u64) {
418        if self.checkpoint_bytes_interval == 0 {
419            return;
420        }
421        let prev =
422            self.bytes_since_checkpoint.fetch_add(bytes, Ordering::Relaxed);
423        if prev + bytes >= self.checkpoint_bytes_interval {
424            // Reset counter *before* triggering so parallel callers don't
425            // all pile in at once — best-effort, not strictly once.
426            self.bytes_since_checkpoint.store(0, Ordering::Relaxed);
427            // Ignore errors: a concurrent checkpoint may be in progress.
428            let _ = self.do_checkpoint("wakeup_after_write");
429        }
430    }
431
432    /// Whether a periodic (daemon) checkpoint should run now (JE
433    /// `Checkpointer.isRunnable`). Without this gate the daemon wrote a
434    /// checkpoint on every wakeup tick even on a fully idle environment
435    /// (wasted I/O). Returns true if:
436    ///   - `force`, OR
437    ///   - REC-F: the cleaner has files pending reclaim
438    ///     (`needCheckpointForCleanedFiles()` → `isCheckpointNeeded()`), even
439    ///     with no writes — so an idle env still reclaims cleaned files, OR
440    ///   - bytes written since the last checkpoint >= the byte interval, OR
441    ///   - (only when the byte interval is disabled) the time interval elapsed
442    ///     AND something was written since the last checkpoint
443    ///     (`bytes_since_checkpoint > 0` — JE's `lastUsedLsn !=
444    ///     lastCheckpointEnd` idle-guard).
445    ///
446    /// JE ref: `Checkpointer.isRunnable` — order is force, then
447    /// `wakeupAfterNoWrites && needCheckpointForCleanedFiles()`, then the
448    /// bytes-OR-time interval (bytes takes precedence; the time branch only
449    /// runs when `logSizeBytesInterval == 0`).
450    pub fn is_runnable(&self, force: bool) -> bool {
451        if force {
452            return true;
453        }
454        // REC-F: wake for cleaner-pending files even on an idle environment.
455        // JE `isRunnable`: `if (wakeupAfterNoWrites && needCheckpointForCleanedFiles())
456        // return true;`.  Noxu folds `wakeupAfterNoWrites` into the cleaner
457        // query directly — `needs_checkpoint_for_cleaned_files()` is true iff
458        // the cleaner reports CLEANED/FULLY_PROCESSED files pending reclaim.
459        if self.needs_checkpoint_for_cleaned_files() {
460            return true;
461        }
462        let bytes_since = self.bytes_since_checkpoint.load(Ordering::Relaxed);
463        if self.checkpoint_bytes_interval != 0 {
464            // Bytes interval takes precedence (JE getWakeupPeriod): when it is
465            // non-zero the time branch is never consulted.
466            return bytes_since >= self.checkpoint_bytes_interval;
467        }
468        // Time-cadence branch (only reached when the byte interval is 0): the
469        // caller (the daemon) only invokes this once per wakeup interval, so
470        // reaching here means the time interval has elapsed. JE's idle-guard
471        // (`lastUsedLsn != lastCheckpointEnd`) maps to "something was written
472        // since the last checkpoint" — i.e. bytes_since > 0. Skip the
473        // checkpoint entirely on an idle environment.
474        bytes_since > 0
475    }
476
477    /// REC-F: whether the cleaner has files pending reclaim that a checkpoint
478    /// would unblock.  Mirrors JE `Checkpointer.needCheckpointForCleanedFiles`
479    /// → `cleaner.getFileSelector().isCheckpointNeeded()` (any CLEANED or
480    /// FULLY_PROCESSED files exist).  Returns `false` when no cleaner is
481    /// wired.
482    fn needs_checkpoint_for_cleaned_files(&self) -> bool {
483        self.cleaner.as_ref().map(|c| c.is_checkpoint_needed()).unwrap_or(false)
484    }
485
486    /// Test-only: bump `bytes_since_checkpoint` without triggering a
487    /// checkpoint (wakeup_after_write would fire do_checkpoint at the
488    /// threshold). Used to exercise `is_runnable`.
489    #[cfg(test)]
490    pub fn note_bytes_for_test(&self, bytes: u64) {
491        self.bytes_since_checkpoint.fetch_add(bytes, Ordering::Relaxed);
492    }
493
494    /// Returns `true` if the given BIN node has been checkpointed at least
495    /// once (its `last_full_lsn` is not NULL_LSN).
496    ///
497    /// The evictor calls this before evicting a node: a node that has never
498    /// been checkpointed would be lost on eviction because it has no on-disk
499    /// representation yet.
500    ///
501    ///
502    pub fn is_checkpointed(node: &NodeRwLock<TreeNode>) -> bool {
503        let guard = node.read();
504        match &*guard {
505            TreeNode::Bottom(b) => b.last_full_lsn != NULL_LSN,
506            // Non-BIN internal nodes are always considered checkpointed for
507            // eviction purposes (they are reconstructed from their children).
508            _ => true,
509        }
510    }
511
512    /// Persist file utilization summaries to the WAL.
513    ///
514    /// Writes a `FileSummaryLN` log entry for each tracked file summary so
515    /// that utilization data survives a restart.
516    ///
517    ///
518    ///
519    /// Requires both a `LogManager` (via `with_log_manager`) and a
520    /// `UtilizationTracker` (via `with_utilization_tracker`) to be wired.
521    /// Returns `Ok(())` without writing if either is absent.
522    pub fn persist_file_summaries(&self) -> Result<()> {
523        let (Some(lm), Some(tracker_lock)) =
524            (&self.log_manager, &self.utilization_tracker)
525        else {
526            return Ok(());
527        };
528
529        // Snapshot the tracked summaries into owned values, then DROP the
530        // tracker lock BEFORE writing any FileSummaryLN.  This avoids a
531        // reentrant deadlock: `lm.log()` calls the installed write observer
532        // (the same UtilizationTracker, behind the same Mutex) to
533        // countNewLogEntry for the FileSummaryLN it just wrote.  JE
534        // (UtilizationProfile.putFileSummary) likewise reads the
535        // TrackedFileSummary out and then logs the FileSummaryLN without
536        // holding the tracker latch across the log write.
537        //
538        // noxu_sync::Mutex::lock() returns the guard directly (no poison).
539        let snapshot: Vec<(u32, noxu_cleaner::FileSummary, Vec<u32>)> = {
540            let tracker = tracker_lock.lock();
541            let tracked_files = tracker.get_tracked_files();
542            if tracked_files.is_empty() {
543                return Ok(());
544            }
545            tracked_files
546                .iter()
547                .map(|(file_number, tracked)| {
548                    (
549                        *file_number,
550                        tracked.get_summary().clone(),
551                        tracked.get_obsolete_offsets().to_vec(),
552                    )
553                })
554                .collect()
555        };
556
557        for (file_number, summary, offsets) in &snapshot {
558            // C7: persist the full FileSummary breakdown (LN/IN totals +
559            // obsolete + maxLNSize) AND the packed obsolete-offset list, so
560            // the on-disk FileSummaryLN is as faithful as the in-memory
561            // TrackedFileSummary.  JE: FileSummaryLN.writeToLog ->
562            // baseSummary.writeToLog (11 ints) + obsoleteOffsets.writeToLog.
563            let mut packed = noxu_cleaner::PackedOffsets::new();
564            packed.pack(offsets);
565            // CLN-24: attach the serialized per-file expiration histogram so
566            // the cleaner's TTL expiration prediction survives restart.  JE
567            // persists this in a separate EXPIRATION DB (FileExpirationLN);
568            // Noxu folds it into the FileSummaryLN trailer.  Built from the
569            // file's LN entries via the wired cleaner; empty when no cleaner
570            // is wired or the file has no expiring data.
571            let expiration_histogram = self
572                .cleaner
573                .as_ref()
574                .map(|c| c.serialize_expiration_histogram(*file_number))
575                .unwrap_or_default();
576            let entry = FileSummaryLnEntry::new(
577                *file_number as u64,
578                summary.total_count,
579                summary.total_size,
580                summary.total_in_count,
581                summary.total_in_size,
582                summary.total_ln_count,
583                summary.total_ln_size,
584                summary.max_ln_size,
585                summary.obsolete_in_count,
586                summary.obsolete_ln_count,
587                summary.obsolete_ln_size,
588                summary.obsolete_ln_size_counted,
589                packed.get_count() as u32,
590                packed.get_data().to_vec(),
591                expiration_histogram,
592            );
593            let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
594            entry.write_to_log(&mut buf);
595            lm.log(
596                LogEntryType::FileSummaryLN,
597                &buf,
598                Provisional::No,
599                false,
600                false,
601            )
602            .map_err(|e| {
603                RecoveryError::CheckpointError(format!(
604                    "persist_file_summaries log write failed: {e}"
605                ))
606            })?;
607            log::debug!(
608                "persist_file_summaries: wrote FileSummaryLN for file {}",
609                file_number
610            );
611        }
612        Ok(())
613    }
614
615    /// Perform a checkpoint.
616    pub fn do_checkpoint(&self, invoker: &str) -> Result<CheckpointResult> {
617        // Check if shutdown
618        if self.shutdown.load(Ordering::Acquire) {
619            return Err(RecoveryError::CheckpointError(
620                "Checkpointer has been shut down".to_string(),
621            ));
622        }
623
624        // Check if already in progress
625        if self
626            .checkpoint_in_progress
627            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
628            .is_err()
629        {
630            return Err(RecoveryError::CheckpointError(
631                "Checkpoint already in progress".to_string(),
632            ));
633        }
634
635        let start_time = std::time::Instant::now();
636
637        // Ensure we clear the in-progress flag (and flush_levels map) on exit.
638        let _guard = CheckpointGuard {
639            flag: &self.checkpoint_in_progress,
640            flush_levels: &self.checkpoint_flush_levels,
641        };
642
643        // Step 1: Generate checkpoint ID
644        let checkpoint_id =
645            self.next_checkpoint_id.fetch_add(1, Ordering::SeqCst);
646
647        // X-5: snapshot the cleaner's "cleaned" file set at checkpoint START
648        // (before we write CkptStart) so we know which files were in the
649        // cleaned state when this checkpoint began.  Passed to
650        // `after_checkpoint` at the end of this function.
651        let cleaner_state =
652            self.cleaner.as_ref().map(|c| c.get_checkpoint_start_state());
653
654        // Step 2: Write CkptStart entry to WAL (or synthesise a fake LSN when
655        // no LogManager is wired — used by unit tests that don't need I/O).
656        let start_lsn = if let Some(lm) = &self.log_manager {
657            let ckpt_start = CheckpointStart::new(checkpoint_id, invoker);
658            let mut buf = Vec::with_capacity(ckpt_start.log_size());
659            ckpt_start.write_to_log(&mut buf).map_err(|e| {
660                RecoveryError::CheckpointError(format!(
661                    "CkptStart serialization failed: {e}"
662                ))
663            })?;
664            lm.log(
665                LogEntryType::CkptStart,
666                &buf,
667                Provisional::No,
668                false, // flush_required
669                false, // fsync_required
670            )
671            .map_err(|e| {
672                RecoveryError::CheckpointError(format!(
673                    "CkptStart WAL write failed: {e}"
674                ))
675            })?
676        } else {
677            // No LogManager attached — synthetic LSN so existing tests pass.
678            Lsn::new(0, checkpoint_id as u32)
679        };
680
681        // Step 3: Build dirty IN map
682        let mut dirty_map = self.dirty_map.lock();
683        dirty_map.clear();
684        drop(dirty_map);
685
686        // Step 4a: Flush dirty BINs.
687        //
688        // For each dirty BIN in the tree decide — using TREE_BIN_DELTA
689        // threshold of 25 % — whether to write a BINDelta or a full BIN.
690        //
691        // `Checkpointer.processINList()` + `logIN()` (BIN path).
692        let mut flush_result = self.flush_dirty_bins_internal()?;
693
694        // Step 4b: Flush dirty upper INs (level ≥ 2) bottom-up.
695        //
696        // After BINs are written their parent INs are dirtied by splits.
697        // These must be logged before CkptEnd to make the checkpoint complete.
698        // Intermediate levels use Provisional::Yes (subsumed by root);
699        // the root level uses Provisional::No (anchors the checkpoint).
700        //
701        // `Checkpointer.processINList()` upper-IN loop +
702        // `Checkpointer.logIN()` for non-BIN nodes.
703        let upper_result = self.flush_upper_ins_internal()?;
704        flush_result.full_ins_flushed += upper_result.full_ins_flushed;
705
706        // Step 5: Write CkptEnd entry to WAL.
707        //
708        // T-F3 is NOT yet active: first_active_lsn stays Lsn::new(0,0) (full
709        // scan from start of log).  Setting a non-zero first_active_lsn would
710        // bound the recovery scan — but that requires pre-loading BINs from
711        // the checkpoint into the recovery tree before replaying LNs (P-2
712        // BIN-preload infrastructure).  Without P-2, starting from any LSN
713        // other than 0 silently drops pre-checkpoint committed LNs.
714        //
715        // Stage 2 wires T-F4 (update_first_lsn is called on first txn write,
716        // get_first_active_lsn() now returns a real LSN), but the consumer
717        // (T-F3 scan bounding) is deferred until P-2 lands.
718        //
719        // Backward compat: Lsn::new(0,0) tells recovery to full-scan from
720        // the start, which is correct and was always the behaviour.
721        let first_active_lsn: noxu_util::Lsn = noxu_util::Lsn::new(0, 0);
722        // (T-F4: txn_manager is wired; get_first_active_lsn() returns real
723        // LSN for future P-2 use; suppress unused warning.)
724        let _ = &self.txn_manager;
725
726        // REC-S: read the env's current last node/db/txn ids and write the
727        // REAL values into CheckpointEnd (instead of the old hardcoded zeros)
728        // so recovery folds them into use_max_* and the env seeds its
729        // sequences past them on restart.  JE Checkpointer.doCheckpoint writes
730        // getLastLocalNodeId / getLastLocalDbId / getLastLocalTxnId.
731        //   - last node-id: the tree-wide node counter (L-30); the next id to
732        //     be handed out is `peek_next_node_id_counter()`, so the last
733        //     allocated id is that minus 1 (saturating).
734        //   - last db-id: the env's next_db_id minus 1.
735        //   - last txn-id: txn_manager.get_last_local_txn_id().
736        let last_local_node_id: u64 =
737            noxu_tree::tree::peek_next_node_id_counter().saturating_sub(1);
738        let last_local_db_id: u64 = self
739            .next_db_id
740            .as_ref()
741            .map(|n| {
742                n.load(std::sync::atomic::Ordering::Relaxed).saturating_sub(1)
743                    as u64
744            })
745            .unwrap_or(0);
746        let last_local_txn_id: u64 = self
747            .txn_manager
748            .as_ref()
749            .map(|t| t.get_last_local_txn_id().max(0) as u64)
750            .unwrap_or(0);
751
752        let end_lsn = if let Some(lm) = &self.log_manager {
753            let ckpt_end = CheckpointEnd::new(
754                checkpoint_id,
755                invoker,
756                start_lsn,
757                // REC-P / REC-B: root_lsn is intentionally always None.  JE
758                // records the mapping-tree root here (Checkpointer.flushRoot
759                // → CheckpointEnd.rootLsn), but Noxu's catalog is an in-memory
760                // HashMap rebuilt from NameLN WAL entries during recovery
761                // (REC-B authorized divergence), so there is no mapping tree
762                // to flush and no root LSN to record.  Per-DB utilization is
763                // persisted via persist_file_summaries (FileSummaryLN), not a
764                // mapping-tree MapLN flush.
765                None, // root_lsn
766                first_active_lsn,
767                // REC-S: real id maxima (were hardcoded 0).
768                last_local_node_id,
769                0, // last_replicated_node_id (HA: deferred)
770                last_local_db_id,
771                0, // last_replicated_db_id (HA: deferred)
772                last_local_txn_id,
773                0,     // last_replicated_txn_id (HA: deferred)
774                false, // cleaned_files_to_delete
775            );
776            let mut buf = Vec::with_capacity(ckpt_end.log_size());
777            ckpt_end.write_to_log(&mut buf).map_err(|e| {
778                RecoveryError::CheckpointError(format!(
779                    "CkptEnd serialization failed: {e}"
780                ))
781            })?;
782            lm.log(
783                LogEntryType::CkptEnd,
784                &buf,
785                Provisional::No,
786                true, // flush_required
787                // REC-F1: fsync the CkptEnd entry before returning.  JE
788                // Checkpointer.doCheckpoint (~line 895):
789                //   lastCheckpointEnd = logManager.logForceFlush(
790                //       endEntry, true /*fsyncRequired*/, ...);
791                // "We must flush and fsync to ensure that cleaned files are
792                // not referenced. This also ensures that this checkpoint is
793                // not wasted if we crash."  The fsync MUST precede the
794                // cleaner.after_checkpoint() barrier advance below (and JE
795                // fsyncs inside doCheckpoint before
796                // updateFilesAtCheckpointEnd), so ALL callers — close,
797                // daemon, and bytes-triggered wakeup_after_write — get a
798                // durable CkptEnd, not just close.
799                true, // fsync_required
800            )
801            .map_err(|e| {
802                RecoveryError::CheckpointError(format!(
803                    "CkptEnd WAL write failed: {e}"
804                ))
805            })?
806        } else {
807            // No LogManager attached — synthetic LSN so existing tests pass.
808            Lsn::new(0, (checkpoint_id as u32) + 1)
809        };
810
811        // Step 6: Update statistics
812        *self.last_checkpoint_start.lock() = start_lsn;
813        *self.last_checkpoint_end.lock() = end_lsn;
814        // Reset the runnable-gate state: bytes written since this checkpoint.
815        self.bytes_since_checkpoint.store(0, Ordering::Relaxed);
816
817        let elapsed_ms = start_time.elapsed().as_millis() as u64;
818
819        self.stats.checkpoints.fetch_add(1, Ordering::Relaxed);
820        self.stats
821            .full_in_flush
822            .fetch_add(flush_result.full_ins_flushed, Ordering::Relaxed);
823        self.stats
824            .full_bin_flush
825            .fetch_add(flush_result.full_bins_flushed, Ordering::Relaxed);
826        self.stats
827            .delta_in_flush
828            .fetch_add(flush_result.delta_ins_flushed, Ordering::Relaxed);
829        self.stats.last_ckpt_id.store(checkpoint_id, Ordering::Relaxed);
830        self.stats.last_ckpt_start.store(start_lsn.as_u64(), Ordering::Relaxed);
831        self.stats.last_ckpt_end.store(end_lsn.as_u64(), Ordering::Relaxed);
832        self.stats.last_ckpt_interval.store(elapsed_ms, Ordering::Relaxed);
833
834        // X-5: advance the cleaner's three-state checkpoint barrier now that
835        // a checkpoint has successfully completed.  Cleaned files that were
836        // snapshotted at checkpoint-start (`cleaner_state`) move to
837        // `checkpointed`; previously-checkpointed files move to
838        // `safe_to_delete` and will be removed on the next `delete_safe_files`
839        // call.
840        if let (Some(cleaner), Some(state)) = (&self.cleaner, cleaner_state) {
841            cleaner.after_checkpoint(&state);
842        }
843
844        Ok(CheckpointResult {
845            checkpoint_id,
846            start_lsn,
847            end_lsn,
848            full_ins_flushed: flush_result.full_ins_flushed,
849            full_bins_flushed: flush_result.full_bins_flushed,
850            delta_ins_flushed: flush_result.delta_ins_flushed,
851            elapsed_ms,
852        })
853    }
854
855    /// Get the LSN of the last checkpoint start.
856    pub fn get_last_checkpoint_start(&self) -> Lsn {
857        *self.last_checkpoint_start.lock()
858    }
859
860    /// Get the LSN of the last checkpoint end.
861    pub fn get_last_checkpoint_end(&self) -> Lsn {
862        *self.last_checkpoint_end.lock()
863    }
864
865    /// Check if a checkpoint is currently in progress.
866    pub fn is_checkpoint_in_progress(&self) -> bool {
867        self.checkpoint_in_progress.load(Ordering::Acquire)
868    }
869
870    /// Choose the [`Provisional`] flag for a node being evicted by the evictor.
871    ///
872    /// Returns `Provisional::Yes` when a checkpoint is in progress **and** the
873    /// node's level is strictly below the **tree-specific** highest flush level
874    /// for `db_id` (meaning the checkpoint will write a non-provisional ancestor
875    /// for that tree that subsumes this entry).  Returns `Provisional::No` if
876    /// no checkpoint is in progress, or if `db_id` has no dirty upper INs in
877    /// this checkpoint (level absent from map → 0 → not covered).
878    ///
879    /// # JE reference
880    /// `Checkpointer.coordinateEvictionWithCheckpoint` →
881    /// `DirtyINMap.coordinateEvictionWithCheckpoint` which calls
882    /// `getHighestFlushLevel(db)` — **per-`DatabaseImpl`** lookup.  If the db
883    /// is absent from `highestFlushLevels`, `getHighestFlushLevel` returns
884    /// `IN.MIN_LEVEL` (≤ 0) making the comparison false → `Provisional::NO`.
885    ///
886    /// # CC-4 residual
887    /// The prior implementation stored a single global max-level (`AtomicI32`)
888    /// that was the maximum across ALL trees.  A BIN evicted from tree A (no
889    /// dirty upper INs) got `Provisional::Yes` because tree B's level was
890    /// non-zero, but NO non-provisional ancestor was written for tree A →
891    /// recovery discards the provisional BIN → data loss on crash before the
892    /// next checkpoint.  Per-tree lookup (this method) fixes that: tree A's
893    /// level is absent → 0 → `Provisional::No` (authoritative log entry).
894    ///
895    /// # Race window
896    /// Same benign race as JE: if the checkpoint finishes between the
897    /// `in_progress` read and the log write, the BIN may be logged
898    /// `Provisional::Yes` without a covering ancestor in *this* checkpoint, but
899    /// the next checkpoint will cover it.  Logging `Yes` without strict need is
900    /// safe (log bloat only); the reverse is what causes recovery inconsistency.
901    pub fn get_eviction_provisional(
902        &self,
903        db_id: u64,
904        node_level: i32,
905    ) -> Provisional {
906        if !self.checkpoint_in_progress.load(Ordering::Acquire) {
907            return Provisional::No;
908        }
909        // Look up this tree's flush level.  Missing entry means no dirty upper
910        // INs → level 0 → condition false → Provisional::No.
911        let max_flush = self
912            .checkpoint_flush_levels
913            .lock()
914            .unwrap_or_else(|e| e.into_inner())
915            .get(&db_id)
916            .copied()
917            .unwrap_or(0);
918        if max_flush > 0 && node_level < max_flush {
919            Provisional::Yes
920        } else {
921            Provisional::No
922        }
923    }
924
925    pub fn get_stats(&self) -> Arc<CheckpointStats> {
926        Arc::clone(&self.stats)
927    }
928
929    /// Get the configuration.
930    pub fn get_config(&self) -> &CheckpointConfig {
931        &self.config
932    }
933
934    /// Request shutdown of the checkpointer.
935    ///
936    /// Sets the shutdown flag AND wakes up the daemon thread so it exits
937    /// immediately without waiting the full sleep interval.
938    pub fn request_shutdown(&self) {
939        self.shutdown.store(true, Ordering::Release);
940        // Wake up any thread sleeping in wait_for_shutdown_or_timeout().
941        if let Ok(mut guard) = self.shutdown_mutex.lock() {
942            *guard = true;
943        }
944        self.shutdown_condvar.notify_all();
945    }
946
947    /// Check if shutdown has been requested.
948    pub fn is_shutdown(&self) -> bool {
949        self.shutdown.load(Ordering::Acquire)
950    }
951
952    /// CLN-14: wake the checkpointer daemon promptly after a cleaning pass so
953    /// cleaned files are deleted without waiting the full wakeup interval
954    /// (default 60 s).
955    ///
956    /// The cleaner registers this via `Cleaner::set_checkpoint_wakeup_fn`; it
957    /// is invoked at the end of a successful `do_clean`.  It notifies the
958    /// daemon's sleep condvar WITHOUT setting the shutdown flag, so the daemon
959    /// thread returns early from `wait_for_shutdown_or_timeout` and re-checks
960    /// `is_runnable(false)` — which returns `true` because
961    /// `needs_checkpoint_for_cleaned_files()` now reports the just-cleaned
962    /// files pending reclaim.  The result is a prompt checkpoint that runs
963    /// `after_checkpoint()` and lets `delete_safe_files` remove the files.
964    ///
965    /// JE: `FileProcessor.doClean` calls
966    /// `envImpl.getCheckpointer().wakeupAfterNoWrites()` (Cleaner/FileProcessor),
967    /// which sets `wakeupAfterNoWrites = true` and wakes the checkpointer;
968    /// `Checkpointer.isRunnable` then returns true via
969    /// `needCheckpointForCleanedFiles()`.  Noxu folds the flag into the
970    /// cleaner query (`is_runnable`), so this method only has to wake the
971    /// sleeping daemon.
972    pub fn wakeup_after_no_writes(&self) {
973        // Notify the sleep condvar without touching the shutdown flag: the
974        // daemon wakes, sees is_shutdown() == false, and re-evaluates
975        // is_runnable(false).
976        if self.shutdown_mutex.lock().is_ok() {
977            self.shutdown_condvar.notify_all();
978        }
979    }
980
981    /// Sleep for `duration` or until `request_shutdown()` is called.
982    ///
983    /// Used by the daemon thread in `EnvironmentImpl` instead of
984    /// `thread::sleep()` so that shutdown is immediate.
985    pub fn wait_for_shutdown_or_timeout(&self, duration: std::time::Duration) {
986        if let Ok(guard) = self.shutdown_mutex.lock() {
987            // wait_timeout returns immediately when the condvar is notified.
988            let _ = self.shutdown_condvar.wait_timeout(guard, duration);
989        }
990    }
991
992    /// Get the next checkpoint ID (without incrementing).
993    pub fn peek_next_checkpoint_id(&self) -> u64 {
994        self.next_checkpoint_id.load(Ordering::SeqCst)
995    }
996
997    /// REC-G: seed the checkpoint-interval baselines from a recovered
998    /// checkpoint, so the FIRST post-recovery checkpoint interval is measured
999    /// from the recovered `CkptEnd` rather than from process start.
1000    ///
1001    /// Without this, `last_checkpoint_start`/`_end` start at `NULL_LSN` and
1002    /// `bytes_since_checkpoint` at 0 after recovery, so the bytes/time gate
1003    /// would treat all log written before the crash as "since the last
1004    /// checkpoint" — firing a redundant checkpoint immediately, or (for the
1005    /// time branch) measuring the interval from the wrong baseline.
1006    ///
1007    /// JE ref: `Checkpointer.initIntervals(lastCheckpointStart,
1008    /// lastCheckpointEnd, lastCheckpointMillis)` — called from
1009    /// `RecoveryManager.recover()` after the recovery scan completes.  Noxu
1010    /// passes the recovered `checkpoint_start_lsn` / `checkpoint_end_lsn`
1011    /// (NULL_LSN when the log had no prior checkpoint, matching JE).
1012    pub fn init_intervals(
1013        &self,
1014        last_checkpoint_start: Lsn,
1015        last_checkpoint_end: Lsn,
1016    ) {
1017        *self.last_checkpoint_start.lock() = last_checkpoint_start;
1018        *self.last_checkpoint_end.lock() = last_checkpoint_end;
1019        // A freshly-recovered environment has written nothing since the
1020        // recovered checkpoint; reset the byte accumulator so the gate does
1021        // not immediately fire on pre-crash log volume.
1022        self.bytes_since_checkpoint.store(0, Ordering::Relaxed);
1023    }
1024
1025    /// REC-H: continue the checkpoint-ID sequence after recovery instead of
1026    /// restarting at 1.  The next checkpoint will use `last_checkpoint_id + 1`.
1027    ///
1028    /// The ID is a debug/log tag (not a correctness key), but it should not
1029    /// regress or collide across restarts.  Seeded from the recovered
1030    /// `CkptEnd.id`.
1031    ///
1032    /// JE ref: `Checkpointer.setCheckpointId(lastCheckpointId)` — "can only be
1033    /// done after recovery"; JE stores `checkpointId = lastCheckpointId` and
1034    /// `incrementProgress`/`generateCheckpointId` advances from there.  Noxu's
1035    /// `do_checkpoint` does `fetch_add(1)`, so we seed `next_checkpoint_id =
1036    /// last_checkpoint_id + 1` to make the next emitted ID strictly greater.
1037    pub fn set_checkpoint_id(&self, last_checkpoint_id: u64) {
1038        self.next_checkpoint_id.store(last_checkpoint_id + 1, Ordering::SeqCst);
1039    }
1040
1041    /// Flush all dirty BINs to the log (public, unit-result API).
1042    ///
1043    /// Calls the internal flush logic and discards the detailed `FlushResult`,
1044    /// returning only success/failure.  Use this from external callers (e.g.
1045    /// daemon threads) that do not need per-BIN counts.
1046    ///
1047    /// `Checkpointer.doCheckpoint()` partial flush path.
1048    pub fn flush_dirty_bins(&self) -> Result<()> {
1049        self.flush_dirty_bins_internal().map(|_| ())
1050    }
1051
1052    /// Internal flush all dirty BINs to the log.
1053    ///
1054    /// Flushes dirty BINs from `self.tree` (primary tree) AND from every
1055    /// tree in `self.db_trees_registry` (user databases).
1056    ///
1057    /// JE `Checkpointer.processINList` walks a single env-wide `INList`
1058    /// covering all databases; Noxu achieves the same effect by iterating the
1059    /// `db_trees_registry` and calling the per-tree BIN-flush logic for each.
1060    ///
1061    /// For each dirty BIN `BinStub::should_log_delta(bin_delta_percent)`
1062    /// (faithful JE `BIN.shouldLogDelta`, BIN.java:1892) decides:
1063    /// - delta-slot count `<= nEntries * bin_delta_percent / 100` (and no
1064    ///   prohibit / a prior full exists) → write `BINDelta` entry (delta path)
1065    /// - otherwise                  → write full `BIN` entry (full path)
1066    ///
1067    /// Also calls `persist_file_summaries()` to ensure utilization data is
1068    /// durable.
1069    ///
1070    /// `Checkpointer.processINList()` + `Checkpointer.logIN()`.
1071    pub(crate) fn flush_dirty_bins_internal(&self) -> Result<FlushResult> {
1072        let mut result = FlushResult::default();
1073
1074        let lm = match &self.log_manager {
1075            Some(lm) => lm,
1076            // No log manager — nothing to flush (unit tests).
1077            None => return Ok(result),
1078        };
1079
1080        // Stage-1: flush the primary tree (if wired) then every user-database
1081        // tree from the registry.  JE's equivalent is processINList walking
1082        // the single env-wide INList that covers all databases.
1083        //
1084        // IMPORTANT: the primary_tree (self.tree, db_id=1) and the user-database
1085        // real_tree for db_id=1 are DIFFERENT Arc<RwLock<Tree>> objects.  The
1086        // primary_tree is used by the cleaner for LN migration but is never
1087        // written by user operations.  User data lives in the real_trees stored
1088        // in db_trees_registry.  We flush both: primary_tree first (harmless
1089        // if empty), then all registry trees (where user data lives).
1090        // No skip guard — the registry trees are always distinct objects from
1091        // self.tree even when their db_id happens to match self.db_id.
1092        let mut trees_to_flush: Vec<(u64, Arc<RwLock<Tree>>)> = Vec::new();
1093        if let Some(t) = &self.tree {
1094            trees_to_flush.push((self.db_id, Arc::clone(t)));
1095        }
1096        if let Some(reg) = &self.db_trees_registry
1097            && let Ok(guard) = reg.lock()
1098        {
1099            for (&db_id_i64, tree_arc) in guard.iter() {
1100                let db_id = db_id_i64 as u64;
1101                trees_to_flush.push((db_id, Arc::clone(tree_arc)));
1102            }
1103        }
1104
1105        for (db_id, tree_arc) in trees_to_flush {
1106            let r = Self::flush_one_tree_bins(
1107                db_id,
1108                &tree_arc,
1109                lm,
1110                self.config.bin_delta_percent,
1111            )?;
1112            result.full_bins_flushed += r.full_bins_flushed;
1113            result.delta_ins_flushed += r.delta_ins_flushed;
1114            result.obsolete_delta_lsns.extend(r.obsolete_delta_lsns);
1115        }
1116
1117        // L-5-delta: count the superseded prior BIN-deltas (auxOldLsn)
1118        // obsolete via the wired UtilizationTracker, BEFORE
1119        // persist_file_summaries so the counts land in this checkpoint's
1120        // FileSummaryLN.  JE: LogManager.serialLogWork counts auxOldLsn via
1121        // countObsoleteNodeDupsAllowed(auxOldLsn, type, size=0, nodeDb).
1122        if !result.obsolete_delta_lsns.is_empty()
1123            && let Some(tracker_lock) = &self.utilization_tracker
1124        {
1125            let mut tracker = tracker_lock.lock();
1126            for (lsn, db_id) in &result.obsolete_delta_lsns {
1127                // size 0 (auxOldLsn carries no size); count_as_ln = false (an
1128                // IN/BIN-delta, not an LN); dups-allowed variant (INs use it).
1129                tracker.count_obsolete_node_dups_allowed(
1130                    lsn.file_number(),
1131                    lsn.file_offset(),
1132                    0,
1133                    false,
1134                    Some(*db_id),
1135                );
1136            }
1137        }
1138
1139        // Persist file utilization summaries so they survive restarts.
1140        self.persist_file_summaries()?;
1141
1142        Ok(result)
1143    }
1144
    /// Flush dirty BINs for a single tree to the WAL.
    ///
    /// Extracted so both `flush_dirty_bins_internal` (primary tree) and the
    /// per-user-database loop can share the same logic without duplicating the
    /// TREE_BIN_DELTA decision or the X-8 early-exit guard.
    ///
    /// Dirty BINs are first snapshotted under a read lock on `tree_arc`. That
    /// lock is released before each BIN's own write lock is taken. Each BIN
    /// that is still dirty is then logged with `Provisional::No` and no
    /// flush/fsync. It goes out either as a `BINDelta` or as a full `BIN`
    /// (decided by `should_log_delta(bin_delta_percent)`), after which its
    /// dirty state is cleared.
    ///
    /// # Parameters
    /// - `db_id`: database id written into every log entry, and recorded
    ///   (cast to `u32`) against superseded deltas.
    /// - `tree_arc`: the tree whose dirty BINs are flushed.
    /// - `lm`: log manager the entries are appended to.
    /// - `bin_delta_percent`: delta threshold forwarded to `should_log_delta`.
    ///
    /// # Returns
    /// A `FlushResult` with `full_bins_flushed`, `delta_ins_flushed` and
    /// `obsolete_delta_lsns` filled in. Counting those LSNs obsolete is left
    /// to the caller.
    ///
    /// # Errors
    /// Returns `RecoveryError::CheckpointError` if the tree lock is poisoned
    /// or a WAL write fails. BINs handled before the failing one remain logged
    /// and have already had their dirty state cleared.
    fn flush_one_tree_bins(
        db_id: u64,
        tree_arc: &Arc<RwLock<Tree>>,
        lm: &Arc<LogManager>,
        bin_delta_percent: i32,
    ) -> Result<FlushResult> {
        let mut result = FlushResult::default();

        // Collect dirty BINs under a read lock on the tree. The block scope
        // releases the tree lock before any per-BIN write lock is taken.
        let dirty_bins = {
            let tree_guard = tree_arc.read().map_err(|_| {
                RecoveryError::CheckpointError(
                    "tree lock poisoned during checkpoint".to_string(),
                )
            })?;
            tree_guard.collect_dirty_bins(db_id)
        };

        // The delta-vs-full decision per BIN is made by
        // `BinStub::should_log_delta(bin_delta_percent)` below — faithful JE
        // `BIN.shouldLogDelta` (count-based + configurable percent).

        // The per-node db id from collect_dirty_bins is ignored; log entries
        // use the caller-supplied `db_id`.
        for (_node_db_id, bin_arc) in dirty_bins {
            // Acquire write lock to serialize + clear dirty flags.
            let mut bin_guard = bin_arc.write();

            let b = match &mut *bin_guard {
                TreeNode::Bottom(b) => b,
                _ => continue, // not a BIN (defensive)
            };

            let dirty = b.dirty_count();

            // X-8: skip nodes that the evictor already flushed and cleared
            // between our dirty-BIN snapshot (under tree read lock) and the
            // per-node write-lock acquisition.
            if !b.dirty && dirty == 0 {
                continue;
            }

            // TREE_BIN_DELTA decision — faithful JE `BIN.shouldLogDelta`
            // (BIN.java:1892): COUNT-based (numDeltas = dirty slots) against the
            // CONFIGURABLE percent limit, with the isBINDelta fast path, the
            // numDeltas<=0 guard, and the isDeltaProhibited / lastFullLsn==NULL
            // bound — all encapsulated in `BinStub::should_log_delta`.
            let use_delta = b.should_log_delta(bin_delta_percent);

            if use_delta {
                // --- BIN-delta path ---
                // L-5-delta: the prior BIN-delta this one supersedes (JE
                // `auxOldLsn = logEntry.getPrevDeltaLsn()`) becomes obsolete
                // when we log the new delta non-provisionally.  Capture it
                // BEFORE advancing the delta chain; counted obsolete by
                // `flush_dirty_bins_internal` via the tracker.
                let superseded_delta_lsn = b.last_delta_lsn;
                let delta_bytes = b.serialize_delta();
                let entry = BinDeltaLogEntry::new(
                    db_id,
                    b.last_full_lsn,
                    b.last_delta_lsn, // prev_delta_lsn
                    delta_bytes,
                );
                let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
                entry.write_to_log(&mut buf);
                let delta_logged_lsn = lm
                    .log(
                        LogEntryType::BINDelta,
                        &buf,
                        Provisional::No,
                        false, // flush_required
                        false, // fsync_required — fsync at CkptEnd
                    )
                    .map_err(|e| {
                        RecoveryError::CheckpointError(format!(
                            "BINDelta WAL write failed: {e}"
                        ))
                    })?;
                b.last_delta_lsn = delta_logged_lsn; // advance chain for next delta
                b.clear_dirty_after_delta_log();
                result.delta_ins_flushed += 1;
                // L-5-delta: record the superseded prior delta (auxOldLsn) for
                // obsolete counting.  Only when non-null — a delta whose prior
                // version was a full BIN has prev_delta_lsn == NULL (the full
                // version stays live, referenced by the delta).
                // NOTE(review): `db_id as u32` truncates ids above u32::MAX;
                // confirm db ids are guaranteed to fit.
                if superseded_delta_lsn != NULL_LSN {
                    result
                        .obsolete_delta_lsns
                        .push((superseded_delta_lsn, db_id as u32));
                }
            } else {
                // --- Full BIN path ---
                // NOTE(review): any prior delta in `b.last_delta_lsn` is
                // discarded below without being pushed to
                // `obsolete_delta_lsns`, unlike the delta path. Confirm whether
                // the superseded delta should also be counted obsolete here
                // (or is counted elsewhere, e.g. by the LogManager).
                let full_bytes = b.serialize_full();
                let entry = InLogEntry::new(
                    db_id,
                    b.last_full_lsn,
                    NULL_LSN, // prev_delta_lsn
                    full_bytes,
                );
                let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
                entry.write_to_log(&mut buf);
                let logged_lsn = lm
                    .log(
                        LogEntryType::BIN,
                        &buf,
                        Provisional::No,
                        false, // flush_required
                        false, // fsync_required — fsync at CkptEnd
                    )
                    .map_err(|e| {
                        RecoveryError::CheckpointError(format!(
                            "BIN WAL write failed: {e}"
                        ))
                    })?;
                b.last_delta_lsn = NULL_LSN; // full BIN resets delta chain
                b.clear_dirty_after_full_log(logged_lsn);
                result.full_bins_flushed += 1;
            }
        }

        Ok(result)
    }
1271
1272    /// Flush all dirty upper INs (level ≥ 2) bottom-up to the WAL.
1273    ///
1274    /// Flushes upper INs from `self.tree` (primary tree) AND from every tree
1275    /// in `self.db_trees_registry` (user databases), mirroring
1276    /// `flush_dirty_bins_internal`'s all-trees iteration.
1277    ///
1278    /// `Checkpointer.processINList()` upper-IN pass +
1279    /// `Checkpointer.logIN()` for `TreeNode::Internal` nodes.
1280    fn flush_upper_ins_internal(&self) -> Result<FlushResult> {
1281        let mut result = FlushResult::default();
1282
1283        let lm = match &self.log_manager {
1284            Some(lm) => lm,
1285            None => return Ok(result),
1286        };
1287
1288        let mut trees_to_flush: Vec<(u64, Arc<RwLock<Tree>>)> = Vec::new();
1289        if let Some(t) = &self.tree {
1290            trees_to_flush.push((self.db_id, Arc::clone(t)));
1291        }
1292        if let Some(reg) = &self.db_trees_registry
1293            && let Ok(guard) = reg.lock()
1294        {
1295            for (&db_id_i64, tree_arc) in guard.iter() {
1296                let db_id = db_id_i64 as u64;
1297                // No skip guard: registry trees are distinct objects from self.tree.
1298                trees_to_flush.push((db_id, Arc::clone(tree_arc)));
1299            }
1300        }
1301
1302        // CC-4 residual fix: compute the per-tree highest flush level before
1303        // any logging begins.  Populate checkpoint_flush_levels with one entry
1304        // per tree that has dirty upper INs.  Trees absent from the map have
1305        // no dirty upper INs → their BINs must NOT be logged Provisional::Yes.
1306        //
1307        // JE ref: DirtyINMap.highestFlushLevels (Map<DatabaseImpl, Integer>);
1308        // getHighestFlushLevel(db) returns IN.MIN_LEVEL (0) for absent keys,
1309        // making coordinateEvictionWithCheckpoint return Provisional.NO.
1310        //
1311        // Memory ordering: the map is populated inside the Mutex before the
1312        // first WAL write.  The evictor acquires the same Mutex to read it
1313        // (Mutex provides the necessary happens-before).  The RAII guard in
1314        // do_checkpoint clears the map via CheckpointGuard::drop.
1315        {
1316            let mut levels = self
1317                .checkpoint_flush_levels
1318                .lock()
1319                .unwrap_or_else(|e| e.into_inner());
1320            levels.clear();
1321            for (db_id, tree_arc) in &trees_to_flush {
1322                // REC-AA: the recorded highest flush level is
1323                // `max(dirty-upper-IN-level) + 1`, bounded by the root level
1324                // — JE DirtyINMap.updateFlushLevels flushes at least one level
1325                // ABOVE the highest dirty node (`(ckptFlushExtraLevel || isBIN)
1326                // && !isRoot` → `level += 1`) so the lower level is logged
1327                // provisionally and recovery skips reprocessing it.  The `+1`
1328                // is bounded by the root level (`!isRoot` guard) so we never
1329                // claim to flush above the tree root — a node AT the root level
1330                // is the non-provisional anchor and must NOT itself be marked
1331                // coverable.
1332                let flush_level = tree_arc.read().ok().and_then(|guard| {
1333                    let dirty_ins = guard.collect_dirty_upper_ins(*db_id);
1334                    let max_dirty =
1335                        dirty_ins.iter().map(|(lvl, _)| *lvl).max()?;
1336                    // Root level bounds the +1.  The root is the
1337                    // highest-level resident node.
1338                    let root_level = guard
1339                        .get_root()
1340                        .map(|r| r.read().level())
1341                        .unwrap_or(max_dirty);
1342                    Some((max_dirty + 1).min(root_level))
1343                });
1344                if let Some(level) = flush_level
1345                    && level > 0
1346                {
1347                    levels.insert(*db_id, level);
1348                }
1349            }
1350        }
1351
1352        for (db_id, tree_arc) in trees_to_flush {
1353            let r = Self::flush_one_tree_upper_ins(db_id, &tree_arc, lm)?;
1354            result.full_ins_flushed += r.full_ins_flushed;
1355        }
1356
1357        Ok(result)
1358    }
1359
1360    /// Flush dirty upper INs for a single tree to the WAL.
1361    fn flush_one_tree_upper_ins(
1362        db_id: u64,
1363        tree_arc: &Arc<RwLock<Tree>>,
1364        lm: &Arc<LogManager>,
1365    ) -> Result<FlushResult> {
1366        let mut result = FlushResult::default();
1367
1368        // Collect dirty upper INs under a read lock.
1369        let dirty_ins = {
1370            let tree_guard = tree_arc.read().map_err(|_| {
1371                RecoveryError::CheckpointError(
1372                    "tree lock poisoned during upper-IN flush".to_string(),
1373                )
1374            })?;
1375            tree_guard.collect_dirty_upper_ins(db_id)
1376        };
1377
1378        if dirty_ins.is_empty() {
1379            return Ok(result);
1380        }
1381
1382        // The maximum level present is the root level; it must be logged
1383        // Provisional::No.  All others use Provisional::Yes.
1384        let max_level =
1385            dirty_ins.iter().map(|(lvl, _)| *lvl).max().unwrap_or(0);
1386
1387        for (level, node_arc) in &dirty_ins {
1388            let mut node_guard = node_arc.write();
1389
1390            if !node_guard.is_dirty() {
1391                continue; // may have been cleared by a concurrent checkpoint
1392            }
1393
1394            // Serialize the upper IN using the existing `write_to_bytes()` path.
1395            let node_bytes = node_guard.write_to_bytes();
1396            let provisional = if *level == max_level {
1397                Provisional::No
1398            } else {
1399                Provisional::Yes
1400            };
1401
1402            let entry = InLogEntry::new(
1403                db_id,
1404                noxu_util::NULL_LSN, // prev_full_lsn — no previous version tracking for upper INs yet
1405                noxu_util::NULL_LSN, // prev_delta_lsn
1406                node_bytes,
1407            );
1408            let mut buf = bytes::BytesMut::with_capacity(entry.log_size());
1409            entry.write_to_log(&mut buf);
1410            lm.log(
1411                LogEntryType::IN,
1412                &buf,
1413                provisional,
1414                false, // flush_required
1415                false, // fsync_required — fsync at CkptEnd
1416            )
1417            .map_err(|e| {
1418                RecoveryError::CheckpointError(format!(
1419                    "IN WAL write failed: {e}"
1420                ))
1421            })?;
1422
1423            node_guard.set_dirty(false);
1424            result.full_ins_flushed += 1;
1425        }
1426
1427        Ok(result)
1428    }
1429}
1430
/// RAII guard that ends a checkpoint's "in progress" window when dropped,
/// whether the checkpoint finished, returned an error, or unwound.
///
/// On drop it empties `checkpoint_flush_levels` and then resets
/// `checkpoint_in_progress` to `false`.
///
/// CC-4 residual: `flush_levels` must be cleared so the evictor stops
/// returning `Provisional::Yes` for any tree after the checkpoint ends.
struct CheckpointGuard<'a> {
    /// The checkpointer's `checkpoint_in_progress` flag; stored `false` on drop.
    flag: &'a AtomicBool,
    /// Per-database highest flush level of the running checkpoint
    /// (db id -> level); emptied on drop.
    flush_levels: &'a std::sync::Mutex<HashMap<u64, i32>>,
}

impl<'a> Drop for CheckpointGuard<'a> {
    fn drop(&mut self) {
        // Clear the per-tree flush levels BEFORE clearing the in-progress
        // flag.  An evictor that still reads in_progress=true may see the
        // stale map; once in_progress goes false the map is already empty.
        //
        // A poisoned mutex (a panic while it was held, e.g. in the checkpoint
        // that is now unwinding) must not leave stale levels behind.  Recover
        // the inner map instead of skipping the clear, in the same
        // poison-tolerant way the checkpoint populates it.
        self.flush_levels
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clear();
        self.flag.store(false, Ordering::Release);
    }
}
1447            levels.clear();
1448        }
1449        self.flag.store(false, Ordering::Release);
1450    }
1451}
1452
/// Counters produced by one flush pass; callers add them into their own
/// running totals.
#[derive(Debug, Default)]
pub(crate) struct FlushResult {
    /// Number of full upper-IN entries written to the WAL.
    full_ins_flushed: u64,
    /// Number of full BIN entries written to the WAL.
    full_bins_flushed: u64,
    /// Number of BIN-delta entries written to the WAL.
    delta_ins_flushed: u64,
    /// L-5-delta: per-DB superseded prior BIN-delta LSNs made obsolete by a
    /// newly-logged BIN-delta (the `prev_delta_lsn` / JE `auxOldLsn`).
    /// `flush_dirty_bins_internal` counts these obsolete via the wired
    /// `UtilizationTracker` after the flush (it has `&self`; the per-tree
    /// flush is a static helper without tracker access).  Tuple is
    /// `(prev_delta_lsn, db_id)`.  JE: IN.java auxOldLsn ->
    /// LogManager.countObsoleteNodeDupsAllowed.
    /// NOTE(review): db ids are `u64` elsewhere in this file (e.g. the
    /// flush-levels map key) but `u32` here -- confirm the narrowing is
    /// intentional.
    obsolete_delta_lsns: Vec<(Lsn, u32)>,
}
1463    /// flush is a static helper without tracker access).  Tuple is
1464    /// `(prev_delta_lsn, db_id)`.  JE: IN.java auxOldLsn ->
1465    /// LogManager.countObsoleteNodeDupsAllowed.
1466    obsolete_delta_lsns: Vec<(Lsn, u32)>,
1467}
1468
1469#[cfg(test)]
1470mod tests {
1471    use super::*;
1472
    /// `CheckpointConfig::default()` leaves `force` and
    /// `minimize_recovery_time` off, uses a 20_000_000-byte interval and
    /// disables the time interval (0).
    #[test]
    fn test_checkpoint_config_default() {
        let config = CheckpointConfig::default();
        assert!(!config.force);
        assert!(!config.minimize_recovery_time);
        assert_eq!(config.bytes_interval, 20_000_000);
        assert_eq!(config.time_interval, 0);
    }
1481
    /// Each chained builder setter on `CheckpointConfig` stores its argument
    /// in the matching public field.
    #[test]
    fn test_checkpoint_config_builder() {
        let config = CheckpointConfig::new()
            .force(true)
            .minimize_recovery_time(true)
            .bytes_interval(10_000_000)
            .time_interval(5000);
        assert!(config.force);
        assert!(config.minimize_recovery_time);
        assert_eq!(config.bytes_interval, 10_000_000);
        assert_eq!(config.time_interval, 5000);
    }
1494
    /// `CheckpointResult::total_nodes_flushed()` reports 35 for 10 full INs,
    /// 20 full BINs and 5 deltas (`elapsed_ms` is not counted).
    #[test]
    fn test_checkpoint_result() {
        let result = CheckpointResult {
            checkpoint_id: 42,
            start_lsn: Lsn::new(1, 100),
            end_lsn: Lsn::new(1, 200),
            full_ins_flushed: 10,
            full_bins_flushed: 20,
            delta_ins_flushed: 5,
            elapsed_ms: 250,
        };
        assert_eq!(result.checkpoint_id, 42);
        assert_eq!(result.total_nodes_flushed(), 35);
    }
1509
    /// A freshly built checkpointer is idle and not shut down, will use
    /// checkpoint id 1 next, and has NULL last start/end LSNs.
    #[test]
    fn test_checkpointer_new() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        assert!(!checkpointer.is_checkpoint_in_progress());
        assert!(!checkpointer.is_shutdown());
        assert_eq!(checkpointer.peek_next_checkpoint_id(), 1);
        assert_eq!(
            checkpointer.get_last_checkpoint_start(),
            noxu_util::NULL_LSN
        );
        assert_eq!(checkpointer.get_last_checkpoint_end(), noxu_util::NULL_LSN);
    }
1522
    /// A checkpoint with no log manager or trees wired still succeeds: it
    /// gets id 1, non-NULL start/end LSNs, and flushes no nodes.
    #[test]
    fn test_checkpointer_do_checkpoint() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        let result = checkpointer.do_checkpoint("test").unwrap();
        assert_eq!(result.checkpoint_id, 1);
        assert!(result.start_lsn != noxu_util::NULL_LSN);
        assert!(result.end_lsn != noxu_util::NULL_LSN);
        assert_eq!(result.total_nodes_flushed(), 0);
    }
1532
    /// Checkpoint ids are handed out sequentially, starting at 1.
    #[test]
    fn test_checkpointer_sequential_ids() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        let result1 = checkpointer.do_checkpoint("test1").unwrap();
        let result2 = checkpointer.do_checkpoint("test2").unwrap();
        assert_eq!(result1.checkpoint_id, 1);
        assert_eq!(result2.checkpoint_id, 2);
    }
1541
    /// `do_checkpoint` refuses to start while `checkpoint_in_progress` is
    /// set, returning a `CheckpointError` that mentions "already in progress".
    #[test]
    fn test_checkpointer_concurrent_checkpoint_fails() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        // Simulate another checkpoint that is still running.
        checkpointer.checkpoint_in_progress.store(true, Ordering::Release);
        let result = checkpointer.do_checkpoint("test");
        assert!(result.is_err());
        if let Err(RecoveryError::CheckpointError(msg)) = result {
            assert!(msg.contains("already in progress"));
        } else {
            panic!("Expected CheckpointError");
        }
    }
1554
    /// After `request_shutdown`, the checkpointer reports shutdown and
    /// `do_checkpoint` fails with a `CheckpointError` mentioning "shut down".
    #[test]
    fn test_checkpointer_shutdown() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        checkpointer.request_shutdown();
        assert!(checkpointer.is_shutdown());
        let result = checkpointer.do_checkpoint("test");
        assert!(result.is_err());
        if let Err(RecoveryError::CheckpointError(msg)) = result {
            assert!(msg.contains("shut down"));
        } else {
            panic!("Expected CheckpointError");
        }
    }
1568
    /// The checkpointer's recorded last start/end LSNs match the ones
    /// returned in the checkpoint result.
    #[test]
    fn test_checkpointer_last_lsns() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        let result = checkpointer.do_checkpoint("test").unwrap();
        assert_eq!(checkpointer.get_last_checkpoint_start(), result.start_lsn);
        assert_eq!(checkpointer.get_last_checkpoint_end(), result.end_lsn);
    }
1576
    /// The `checkpoints` stat starts at 0 and increments per completed
    /// checkpoint.
    #[test]
    fn test_checkpointer_stats() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        // `stats` is fetched once and re-read after the checkpoint, so this
        // relies on get_stats() returning a live handle, not a snapshot.
        let stats = checkpointer.get_stats();
        assert_eq!(stats.checkpoints.load(Ordering::Relaxed), 0);
        checkpointer.do_checkpoint("test").unwrap();
        assert_eq!(stats.checkpoints.load(Ordering::Relaxed), 1);
    }
1585
    /// Dropping a `CheckpointGuard` resets the in-progress flag and empties
    /// the per-tree flush-levels map.
    #[test]
    fn test_checkpoint_guard() {
        let flag = AtomicBool::new(false);
        let levels: std::sync::Mutex<HashMap<u64, i32>> =
            std::sync::Mutex::new(HashMap::from([(1u64, 3i32)]));
        {
            flag.store(true, Ordering::Release);
            let _guard = CheckpointGuard { flag: &flag, flush_levels: &levels };
            assert!(flag.load(Ordering::Acquire));
        } // `_guard` dropped here.
        assert!(!flag.load(Ordering::Acquire));
        assert!(
            levels.lock().unwrap().is_empty(),
            "guard must clear flush_levels map"
        );
    }
1602
    /// Cloning a `CheckpointConfig` copies its field values.
    #[test]
    fn test_checkpoint_config_cloning() {
        let config1 = CheckpointConfig::new().force(true).bytes_interval(1000);
        let config2 = config1.clone();
        assert_eq!(config1.force, config2.force);
        assert_eq!(config1.bytes_interval, config2.bytes_interval);
    }
1610
    /// Cloning a `CheckpointResult` preserves its fields (spot-checked via
    /// `checkpoint_id`).
    #[test]
    fn test_checkpoint_result_cloning() {
        let result1 = CheckpointResult {
            checkpoint_id: 1,
            start_lsn: Lsn::new(1, 100),
            end_lsn: Lsn::new(1, 200),
            full_ins_flushed: 10,
            full_bins_flushed: 20,
            delta_ins_flushed: 5,
            elapsed_ms: 100,
        };
        let result2 = result1.clone();
        assert_eq!(result1.checkpoint_id, result2.checkpoint_id);
    }
1625
    /// `peek_next_checkpoint_id` reports the id the next checkpoint will use:
    /// 1 initially, 2 after one checkpoint.
    #[test]
    fn test_peek_next_checkpoint_id() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        assert_eq!(checkpointer.peek_next_checkpoint_id(), 1);
        checkpointer.do_checkpoint("test").unwrap();
        assert_eq!(checkpointer.peek_next_checkpoint_id(), 2);
    }
1633
    /// After two checkpoints the recorded last start/end LSNs are those of
    /// the second one, and the two start LSNs differ.
    #[test]
    fn test_multiple_checkpoints_update_lsns() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        let result1 = checkpointer.do_checkpoint("test1").unwrap();
        let result2 = checkpointer.do_checkpoint("test2").unwrap();
        assert_eq!(checkpointer.get_last_checkpoint_start(), result2.start_lsn);
        assert_eq!(checkpointer.get_last_checkpoint_end(), result2.end_lsn);
        assert_ne!(result1.start_lsn, result2.start_lsn);
    }
1643
1644    // -----------------------------------------------------------------------
1645    // Tests that require a real LogManager / FileManager
1646    // -----------------------------------------------------------------------
1647
    /// With a real `LogManager` wired in, a checkpoint produces non-NULL
    /// start/end LSNs with end after start, and the checkpointer records them.
    #[test]
    fn test_checkpoint_writes_wal_entries() {
        use noxu_log::{FileManager, LogManager};
        use std::sync::Arc;
        use tempfile::TempDir;

        // Fresh temporary log directory per test run.
        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm));

        let result = checkpointer.do_checkpoint("test_wal").unwrap();

        // Both LSNs must be non-null and the end must follow the start.
        assert!(
            !result.start_lsn.is_null(),
            "start_lsn must not be NULL after a WAL-backed checkpoint"
        );
        assert!(
            !result.end_lsn.is_null(),
            "end_lsn must not be NULL after a WAL-backed checkpoint"
        );
        assert!(
            result.end_lsn.as_u64() > result.start_lsn.as_u64(),
            "end_lsn ({:?}) must be greater than start_lsn ({:?})",
            result.end_lsn,
            result.start_lsn
        );

        // The stored LSNs on the checkpointer must match the returned result.
        assert_eq!(checkpointer.get_last_checkpoint_start(), result.start_lsn);
        assert_eq!(checkpointer.get_last_checkpoint_end(), result.end_lsn);
    }
1686
    /// Two WAL-backed checkpoints have strictly increasing LSNs: the second
    /// start follows the first end, and the second end follows its start.
    #[test]
    fn test_two_sequential_wal_checkpoints_have_increasing_lsns() {
        use noxu_log::{FileManager, LogManager};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm));

        let r1 = checkpointer.do_checkpoint("first").unwrap();
        let r2 = checkpointer.do_checkpoint("second").unwrap();

        // Each successive checkpoint must have strictly higher LSNs.
        assert!(
            r2.start_lsn.as_u64() > r1.end_lsn.as_u64(),
            "second start ({:?}) must follow first end ({:?})",
            r2.start_lsn,
            r1.end_lsn
        );
        assert!(
            r2.end_lsn.as_u64() > r2.start_lsn.as_u64(),
            "second end ({:?}) must follow second start ({:?})",
            r2.end_lsn,
            r2.start_lsn
        );
    }
1720
    /// REC-F1 reproduce-first: every `do_checkpoint` path must make the
    /// `CkptEnd` entry durable with an fsync BEFORE the cleaner barrier is
    /// advanced.  JE Checkpointer.doCheckpoint (~line 895) calls
    /// `logManager.logForceFlush(endEntry, true /*fsyncRequired*/, ...)`
    /// with the comment "We must flush and fsync to ensure that cleaned
    /// files are not referenced. This also ensures that this checkpoint is
    /// not wasted if we crash."  Without the fsync, an auto/daemon
    /// checkpoint advances the safe-to-delete barrier off a non-durable
    /// checkpoint — a crash can then lose committed/migrated data.
    #[test]
    fn test_do_checkpoint_fsyncs_ckpt_end() {
        use noxu_log::{FileManager, LogManager};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm));

        // Bracket exactly one checkpoint with the log manager's fsync counter.
        let before = lm.fsync_count();
        checkpointer.do_checkpoint("daemon").unwrap();
        let after = lm.fsync_count();

        assert!(
            after > before,
            "do_checkpoint must fsync the CkptEnd entry (JE logForceFlush \
             fsyncRequired=true); fsync_count before={before} after={after}"
        );
    }
1756
1757    // -----------------------------------------------------------------------
1758    // Tests for new methods: wakeup_after_write, is_checkpointed,
1759    // persist_file_summaries
1760    // -----------------------------------------------------------------------
1761
    /// `wakeup_after_write` triggers a checkpoint once accumulated bytes
    /// reach the configured threshold, then resets the byte counter.
    ///
    /// Each assertion runs right after the call, so this relies on
    /// `wakeup_after_write` running the checkpoint synchronously.
    #[test]
    fn test_wakeup_after_write_triggers_checkpoint() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_bytes_interval(100); // tiny threshold for testing

        // Initial state: no checkpoints performed yet.
        assert_eq!(checkpointer.stats.checkpoints.load(Ordering::Relaxed), 0);

        // Write 99 bytes — below threshold; no checkpoint yet.
        checkpointer.wakeup_after_write(99);
        assert_eq!(
            checkpointer.stats.checkpoints.load(Ordering::Relaxed),
            0,
            "no checkpoint should fire below the threshold"
        );

        // Write 1 more byte — reaches threshold; checkpoint fires.
        checkpointer.wakeup_after_write(1);
        assert_eq!(
            checkpointer.stats.checkpoints.load(Ordering::Relaxed),
            1,
            "exactly one checkpoint should fire when threshold is reached"
        );

        // Counter should have been reset; another 100 bytes should trigger again.
        checkpointer.wakeup_after_write(100);
        assert_eq!(
            checkpointer.stats.checkpoints.load(Ordering::Relaxed),
            2,
            "second checkpoint should fire after counter reset"
        );
    }
1796
    /// `wakeup_after_write` with interval=0 is a no-op: even `u64::MAX`
    /// written bytes must not trigger a checkpoint.
    #[test]
    fn test_wakeup_after_write_disabled_when_interval_zero() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_bytes_interval(0);

        // Largest possible byte count; also guards against overflow panics
        // in the accumulation path.
        checkpointer.wakeup_after_write(u64::MAX);
        assert_eq!(
            checkpointer.stats.checkpoints.load(Ordering::Relaxed),
            0,
            "no checkpoint should fire when interval is 0"
        );
    }
1810
    /// CLN-14: `wakeup_after_no_writes` wakes a daemon-style thread blocked in
    /// `wait_for_shutdown_or_timeout` PROMPTLY — well under the (long) sleep
    /// interval — without setting the shutdown flag.  This is the primitive
    /// the cleaner's wakeup callback uses so cleaned files are deleted at the
    /// next early checkpoint instead of after the full wakeup interval
    /// (default 60 s).
    ///
    /// JE: `Checkpointer.wakeupAfterNoWrites` sets a flag and wakes the
    /// daemon; the daemon re-checks `isRunnable` (`needCheckpointForCleanedFiles`).
    #[test]
    fn test_cln14_wakeup_after_no_writes_wakes_daemon_promptly() {
        use std::time::{Duration, Instant};

        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        // A daemon-style thread that sleeps for a "60 s" interval on the
        // condvar, exactly like the EnvironmentImpl checkpointer daemon.
        let ckpt = Arc::clone(&checkpointer);
        let woke = Arc::new(AtomicBool::new(false));
        let woke2 = Arc::clone(&woke);
        let start = Instant::now();
        let handle = std::thread::spawn(move || {
            ckpt.wait_for_shutdown_or_timeout(Duration::from_secs(60));
            woke2.store(true, Ordering::Release);
            // The wake must NOT be a shutdown — the daemon would keep running.
            // A failure here panics the thread and surfaces via join() below.
            assert!(
                !ckpt.is_shutdown(),
                "wakeup_after_no_writes must not set the shutdown flag"
            );
        });

        // Give the daemon a moment to enter the wait, then wake it.
        // NOTE(review): the sleep only makes it likely that the thread is
        // already blocked; whether an earlier wakeup is latched depends on
        // wakeup_after_no_writes, which is not visible here.
        std::thread::sleep(Duration::from_millis(50));
        checkpointer.wakeup_after_no_writes();

        handle.join().unwrap();
        let elapsed = start.elapsed();
        assert!(woke.load(Ordering::Acquire), "daemon thread must have woken");
        assert!(
            elapsed < Duration::from_secs(5),
            "CLN-14: daemon must wake promptly ({:?}), not after the 60 s interval",
            elapsed
        );
    }
1856
    /// `is_runnable(false)` is false for an idle checkpointer, and
    /// `is_runnable(true)` (force) is always true.  With a byte interval set,
    /// only crossing that interval makes it runnable.
    #[test]
    fn test_is_runnable_idle_guard() {
        // The daemon must NOT checkpoint an idle environment every wakeup.
        // is_runnable(false) is false until the relevant interval trips; force
        // is always true.
        //
        // REC-D: when the byte interval is set (non-zero) it takes precedence
        // (JE getWakeupPeriod / isRunnable: useTimeInterval stays 0), so a
        // sub-interval write is NOT runnable — only crossing the byte interval
        // is.
        let cp = Checkpointer::new(CheckpointConfig::default())
            .with_bytes_interval(1024);
        // Idle: nothing written since the last checkpoint.
        assert!(!cp.is_runnable(false), "idle env must not be runnable");
        // Force always runs (JE config.getForce()).
        assert!(cp.is_runnable(true), "force must always be runnable");
        // A sub-interval write is NOT runnable when a byte interval is set
        // (bytes takes precedence over time per JE isRunnable).
        cp.note_bytes_for_test(100);
        assert!(
            !cp.is_runnable(false),
            "sub-interval write must not be runnable when a byte interval is set \
             (REC-D: bytes takes precedence over the time branch)"
        );
        // Crossing the byte interval is runnable (100 + 2000 > 1024).
        cp.note_bytes_for_test(2000);
        assert!(cp.is_runnable(false));
    }
1887
    /// REC-D: when the byte interval is DISABLED (0) the time branch applies
    /// — any write since the last checkpoint makes the daemon runnable on its
    /// next wakeup (JE isRunnable useTimeInterval branch with the
    /// `lastUsedLsn != lastCheckpointEnd` idle-guard).  A single byte is
    /// enough; an idle checkpointer stays not runnable.
    #[test]
    fn test_is_runnable_time_branch_when_bytes_disabled() {
        let cp = Checkpointer::new(CheckpointConfig::default())
            .with_bytes_interval(0); // bytes disabled → time-based
        // Idle: nothing written → not runnable (idle-guard).
        assert!(!cp.is_runnable(false), "idle time-based env must not run");
        // Any write makes it runnable on the next wakeup tick.
        cp.note_bytes_for_test(1);
        assert!(
            cp.is_runnable(false),
            "time branch: a write since the last checkpoint makes it runnable"
        );
    }
1905
    /// `is_checkpointed` returns `false` for a BIN whose `last_full_lsn` is
    /// NULL_LSN (never checkpointed) and `true` after setting a non-NULL LSN.
    #[test]
    fn test_is_checkpointed() {
        use noxu_tree::tree::{BinStub, TreeNode};
        use parking_lot::RwLock as NodeRwLock;

        // Build a BIN node with last_full_lsn = NULL_LSN.
        let bin = BinStub {
            node_id: 1,
            level: 0,
            entries: vec![],
            key_prefix: vec![],
            dirty: false,
            is_delta: false,
            last_full_lsn: noxu_util::NULL_LSN,
            last_delta_lsn: noxu_util::NULL_LSN,
            generation: 0,
            parent: None,
            // St-H6: test-only BIN; use true to match the engine-wide
            // hours-only invariant and avoid any accidental comparison with
            // a non-zero expiration_time.
            expiration_in_hours: true,
            cursor_count: 0,
            prohibit_next_delta: false,
            lsn_rep: noxu_tree::tree::LsnRep::Empty,
            keys: noxu_tree::tree::KeyRep::new(),
            compact_max_key_length:
                noxu_tree::tree::INKeyRep_DEFAULT_MAX_KEY_LENGTH,
        };
        let node = NodeRwLock::new(TreeNode::Bottom(bin));

        // Not yet checkpointed.
        assert!(
            !Checkpointer::is_checkpointed(&node),
            "fresh BIN should not be checkpointed"
        );

        // Simulate a checkpoint by setting last_full_lsn.
        {
            let mut guard = node.write();
            if let TreeNode::Bottom(ref mut b) = *guard {
                b.last_full_lsn = Lsn::new(1, 100);
            }
        } // write guard released before is_checkpointed takes its own lock.

        assert!(
            Checkpointer::is_checkpointed(&node),
            "BIN should be checkpointed after last_full_lsn is set"
        );
    }
1955
    /// `persist_file_summaries` returns Ok(()) without panicking.
    ///
    /// No log manager or utilization tracker is wired in, so this presumably
    /// exercises the nothing-to-persist path.
    #[test]
    fn test_persist_file_summaries_is_ok() {
        let checkpointer = Checkpointer::new(CheckpointConfig::default());
        assert!(checkpointer.persist_file_summaries().is_ok());
    }
1962
    /// `persist_file_summaries` with a wired UtilizationTracker actually writes
    /// a `FileSummaryLN` entry to the WAL.
    ///
    /// Wires a real LogManager + UtilizationTracker, calls
    /// `persist_file_summaries()`, then scans the log file with
    /// `LogFileReader` to verify at least one `FileSummaryLN` entry was
    /// written.
    #[test]
    fn test_persist_file_summaries_writes_file_summary_ln_to_log() {
        use noxu_cleaner::UtilizationTracker;
        use noxu_log::{FileManager, LogEntryType, LogFileReader};
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        // Populate the tracker with a non-empty file summary so something is
        // written when persist_file_summaries() is called.
        let mut tracker = UtilizationTracker::new(true);
        tracker.count_new_log_entry(0, 128, true, false);
        tracker.track_obsolete(0, 64, 64, true);
        let tracker_arc = Arc::new(Mutex::new(tracker));

        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm))
            .with_utilization_tracker(Arc::clone(&tracker_arc));

        checkpointer.persist_file_summaries().unwrap();

        // Flush to disk so the reader can see the bytes.
        lm.flush_sync().unwrap();

        // Scan all log entries in file 0 and look for FileSummaryLN; stop at
        // the first match.
        let mut reader = LogFileReader::open(Arc::clone(&fm), 0).unwrap();
        let mut found = false;
        while let Some((_lsn, entry_type, _payload)) = reader.read_next() {
            if entry_type == LogEntryType::FileSummaryLN {
                found = true;
                break;
            }
        }
        assert!(
            found,
            "expected a FileSummaryLN entry in the log after persist_file_summaries()"
        );
    }
2013
    /// Checkpoint with a real tree flushes dirty BINs — step 4.
    ///
    /// Inserts a few keys (marking BIN slots dirty), then runs a checkpoint
    /// and verifies the dirty count drops to zero after the checkpoint writes
    /// BIN/BINDelta entries to the WAL.
    #[test]
    fn test_checkpoint_flushes_dirty_bins() {
        use noxu_log::FileManager;
        use noxu_tree::tree::Tree;
        use noxu_util::lsn::Lsn;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        // Build a tree with dirty BINs.
        let tree = Tree::new(1, 256);
        tree.insert(b"apple".to_vec(), b"fruit".to_vec(), Lsn::new(1, 1))
            .unwrap();
        tree.insert(b"banana".to_vec(), b"fruit".to_vec(), Lsn::new(1, 2))
            .unwrap();
        tree.insert(b"cherry".to_vec(), b"fruit".to_vec(), Lsn::new(1, 3))
            .unwrap();

        let tree_arc = Arc::new(RwLock::new(tree));

        // Verify dirty BINs exist before checkpoint.
        let dirty_before = tree_arc.read().unwrap().collect_dirty_bins(1);
        assert!(
            !dirty_before.is_empty(),
            "should have dirty BINs before checkpoint"
        );

        // The same id (1) is used for Tree::new, collect_dirty_bins and
        // with_tree; presumably it is the database id.
        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm))
            .with_tree(Arc::clone(&tree_arc), 1);

        let result = checkpointer.do_checkpoint("test").unwrap();
        assert!(
            result.total_nodes_flushed() > 0,
            "checkpoint should flush dirty BINs"
        );

        // After checkpoint, dirty BINs should be cleared.
        let dirty_after = tree_arc.read().unwrap().collect_dirty_bins(1);
        assert!(dirty_after.is_empty(), "no dirty BINs after checkpoint");
    }
2065
    /// X-8 regression: checkpointer must not write a redundant empty BINDelta
    /// for a node that the evictor already flushed and cleared between the
    /// dirty-BIN snapshot and the per-node write-lock acquisition.
    ///
    /// Simulates the race by:
    /// 1. Building a tree with dirty BINs.
    /// 2. Collecting the dirty-BIN snapshot (as the checkpointer would).
    /// 3. Acquiring each BIN's write lock and calling
    ///    `clear_dirty_after_full_log` (simulating the evictor flushing).
    /// 4. Running `flush_dirty_bins_internal` and asserting that zero
    ///    BINDelta or full-BIN entries are written (nothing left to flush).
    #[test]
    fn test_x8_no_redundant_bindelta_after_evictor_flush() {
        use noxu_log::FileManager;
        use noxu_tree::tree::Tree;
        use noxu_util::lsn::Lsn;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        // Build a tree with a dirty BIN.
        let tree = Tree::new(1, 256);
        tree.insert(b"alpha".to_vec(), b"v1".to_vec(), Lsn::new(1, 1)).unwrap();
        tree.insert(b"beta".to_vec(), b"v2".to_vec(), Lsn::new(1, 2)).unwrap();

        let tree_arc = Arc::new(RwLock::new(tree));

        // Snapshot dirty BINs (as the checkpointer does under tree read lock).
        let dirty_bins = tree_arc.read().unwrap().collect_dirty_bins(1);
        assert!(!dirty_bins.is_empty(), "precondition: must have dirty BINs");

        // Simulate the evictor flushing every dirty BIN (writes a full BIN
        // entry to WAL and clears the dirty flag) BEFORE the checkpointer
        // acquires the per-node write lock.
        let evictor_lsn = Lsn::new(2, 0); // fake "evictor-wrote" LSN
        for (_db_id, bin_arc) in &dirty_bins {
            let mut guard = bin_arc.write();
            if let TreeNode::Bottom(ref mut b) = *guard {
                // Mark the BIN as "already flushed" by the evictor.
                b.clear_dirty_after_full_log(evictor_lsn);
            }
        }

        // Now build the checkpointer and run the internal flush over the
        // stale snapshot (all BINs are now clean).
        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm))
            .with_tree(Arc::clone(&tree_arc), 1);

        let result = checkpointer
            .flush_dirty_bins_internal()
            .expect("flush_dirty_bins_internal failed");

        // X-8 fix: with the guard `if !b.dirty && dirty == 0 { continue; }`,
        // the checkpointer must skip the already-clean BINs entirely.  No
        // BINDelta or full-BIN entries should be written.
        assert_eq!(
            result.delta_ins_flushed, 0,
            "X-8: checkpointer must not write a redundant BINDelta for a BIN the evictor already flushed"
        );
        assert_eq!(
            result.full_bins_flushed, 0,
            "X-8: checkpointer must not write a redundant full-BIN for a BIN the evictor already flushed"
        );
    }
2136
    /// L-5-delta: when the checkpointer logs a BIN-delta that supersedes a
    /// PRIOR delta (`prev_delta_lsn != NULL`), the prior delta's LSN (JE
    /// `auxOldLsn`) must be counted obsolete through the wired
    /// UtilizationTracker. The dups-allowed variant is used (size 0,
    /// count_as_ln = false). The counting must happen BEFORE
    /// persist_file_summaries so that it lands in this checkpoint's
    /// FileSummaryLN.
    ///
    /// FAIL-PRE: the prior delta LSN was never counted obsolete. The
    /// superseded BIN-delta version leaked, and the cleaner under-counted the
    /// obsolete bytes in that file.
    ///
    /// PASS-POST: the prior delta's file/offset shows one obsolete IN
    /// (`obsolete_in_count == 1`) and the offset is tracked.
    ///
    /// JE: IN.java auxOldLsn -> LogManager.serialLogWork
    /// countObsoleteNodeDupsAllowed.
    #[test]
    fn test_l5_delta_counts_prior_delta_obsolete() {
        use noxu_cleaner::UtilizationTracker;
        use noxu_log::FileManager;
        use noxu_tree::tree::{Tree, TreeNode};
        use noxu_util::lsn::Lsn;
        use tempfile::TempDir;

        // Log manager backed by a temporary directory; the BIN-delta the
        // checkpointer writes goes through it.
        let dir = TempDir::new().unwrap();
        let fm = Arc::new(
            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
        );
        let lm =
            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));

        // Build a tree with a BIN holding several keys (16 keys with a
        // second Tree::new argument of 256, presumably fanout, so this is
        // expected to be a single BIN; the `delta_ins_flushed == 1`
        // assertion below relies on that).
        let tree = Tree::new(1, 256);
        for i in 0u16..16 {
            let key = format!("key{i:03}");
            tree.insert(
                key.into_bytes(),
                b"v".to_vec(),
                Lsn::new(1, i as u32 + 1),
            )
            .unwrap();
        }
        let tree_arc = Arc::new(RwLock::new(tree));

        // Put the BIN into the "already has a full version AND a prior delta"
        // state, then dirty exactly ONE slot so `should_log_delta` chooses the
        // delta path (numDeltas=1 <= delta_limit at the default percent).  The
        // prior delta LSN we install here is the auxOldLsn that must be
        // counted obsolete when the new delta is logged.
        let prior_delta_lsn = Lsn::new(3, 4096);
        let dirty_bins = tree_arc.read().unwrap().collect_dirty_bins(1);
        assert!(!dirty_bins.is_empty(), "precondition: dirty BINs");
        for (_db, bin_arc) in &dirty_bins {
            let mut guard = bin_arc.write();
            if let TreeNode::Bottom(ref mut b) = *guard {
                // Mark clean first, then set the prior full + prior delta
                // chain and re-dirty exactly one slot.  Order matters: the
                // clear call runs first, so the fields set after it are the
                // state the checkpointer observes.
                b.clear_dirty_after_full_log(Lsn::new(2, 100));
                b.last_delta_lsn = prior_delta_lsn;
                // NOTE(review): marks the in-memory BIN as a delta version;
                // confirm this is the intended precondition for the
                // prior-delta chain.
                b.is_delta = true;
                b.prohibit_next_delta = false;
                b.dirty = true;
                // Exactly one dirty slot keeps the delta-slot count at 1.
                if let Some(e) = b.entries.first_mut() {
                    e.dirty = true;
                }
            }
        }

        let mut tracker = UtilizationTracker::new(true);
        // Seed file 3 so the obsolete counting has a summary to update.
        tracker.count_new_log_entry(3, 64, false, true);
        let tracker_arc = Arc::new(Mutex::new(tracker));

        let checkpointer = Checkpointer::new(CheckpointConfig::default())
            .with_log_manager(Arc::clone(&lm))
            .with_tree(Arc::clone(&tree_arc), 1)
            .with_utilization_tracker(Arc::clone(&tracker_arc));

        let result = checkpointer
            .flush_dirty_bins_internal()
            .expect("flush_dirty_bins_internal failed");

        // The checkpointer must have taken the delta path (producer exists).
        assert_eq!(
            result.delta_ins_flushed, 1,
            "L-5-delta: checkpointer must log exactly one BIN-delta here"
        );

        // PASS-POST: the prior delta (auxOldLsn) was counted obsolete in its
        // file (3) as an IN, size 0, offset tracked.
        let tracker = tracker_arc.lock();
        let summary = tracker
            .get_tracked_summary(prior_delta_lsn.file_number())
            .expect("file 3 summary must exist after counting the prior delta");
        assert_eq!(
            summary.get_summary().obsolete_in_count,
            1,
            "L-5-delta: the superseded prior BIN-delta must be counted obsolete \
             (obsolete_in_count == 1); was the auxOldLsn wiring dropped?"
        );
        assert!(
            summary
                .get_obsolete_offsets()
                .contains(&prior_delta_lsn.file_offset()),
            "L-5-delta: the prior delta's offset must be tracked obsolete"
        );
    }
2243
2244    // -----------------------------------------------------------------------
2245    // CC-4: get_eviction_provisional tests (per-tree after residual fix)
2246    // -----------------------------------------------------------------------
2247
2248    /// CC-4 acceptance test 1: Provisional::No when no checkpoint is in
2249    /// progress, regardless of db_id or node level.
2250    ///
2251    /// JE ref: coordinateEvictionWithCheckpoint — if no checkpoint is active,
2252    /// evicted nodes are logged non-provisionally.
2253    #[test]
2254    fn test_cc4_no_checkpoint_in_progress_yields_provisional_no() {
2255        let ckpt = Checkpointer::new(CheckpointConfig::default());
2256        assert_eq!(
2257            ckpt.get_eviction_provisional(1, 1),
2258            Provisional::No,
2259            "CC-4: no checkpoint in progress must yield Provisional::No"
2260        );
2261        assert_eq!(ckpt.get_eviction_provisional(1, 2), Provisional::No);
2262    }
2263
2264    /// CC-4 acceptance test 2: Provisional::Yes when a checkpoint is in
2265    /// progress and the node's level is below the tree's max flush level.
2266    ///
2267    /// JE ref: coordinateEvictionWithCheckpoint — node.level < highestFlushLevel
2268    /// (for THIS db) => Provisional::YES.
2269    #[test]
2270    fn test_cc4_below_max_flush_level_yields_provisional_yes() {
2271        let ckpt = Checkpointer::new(CheckpointConfig::default());
2272        ckpt.checkpoint_in_progress.store(true, Ordering::Release);
2273        ckpt.checkpoint_flush_levels.lock().unwrap().insert(42u64, 2i32);
2274
2275        assert_eq!(
2276            ckpt.get_eviction_provisional(42, 1),
2277            Provisional::Yes,
2278            "CC-4: BIN below tree's max_flush_level must yield Provisional::Yes"
2279        );
2280
2281        ckpt.checkpoint_in_progress.store(false, Ordering::Release);
2282        ckpt.checkpoint_flush_levels.lock().unwrap().clear();
2283    }
2284
2285    /// CC-4 acceptance test 3: Provisional::No when the node's level is at or
2286    /// above the tree's max flush level.
2287    ///
2288    /// JE ref: coordinateEvictionWithCheckpoint — node.level >= highestFlushLevel
2289    /// => Provisional::NO.
2290    #[test]
2291    fn test_cc4_at_or_above_max_flush_level_yields_provisional_no() {
2292        let ckpt = Checkpointer::new(CheckpointConfig::default());
2293        ckpt.checkpoint_in_progress.store(true, Ordering::Release);
2294        ckpt.checkpoint_flush_levels.lock().unwrap().insert(42u64, 2i32);
2295
2296        assert_eq!(
2297            ckpt.get_eviction_provisional(42, 2),
2298            Provisional::No,
2299            "CC-4: node at max_flush_level must yield Provisional::No"
2300        );
2301        assert_eq!(
2302            ckpt.get_eviction_provisional(42, 3),
2303            Provisional::No,
2304            "CC-4: node above max_flush_level must yield Provisional::No"
2305        );
2306
2307        ckpt.checkpoint_in_progress.store(false, Ordering::Release);
2308        ckpt.checkpoint_flush_levels.lock().unwrap().clear();
2309    }
2310
2311    /// CC-4 residual acceptance test: a BIN from tree A (no dirty upper INs)
2312    /// must NOT be logged Provisional::Yes even when tree B has dirty upper INs
2313    /// at a higher level.  This is the exact scenario that caused data loss
2314    /// with the old global `AtomicI32`.
2315    ///
2316    /// Fail-pre: on `origin/main` (global level) `get_eviction_provisional(DB_A, 1)`
2317    /// returned `Provisional::Yes` — a lie, no covering ancestor was written for
2318    /// tree A.  Pass-post: per-tree lookup returns `Provisional::No` for tree A.
2319    ///
2320    /// JE ref: DirtyINMap.getHighestFlushLevel returns IN.MIN_LEVEL (0) for a
2321    /// DatabaseImpl absent from highestFlushLevels → comparison false → NO.
2322    #[test]
2323    fn test_cc4_residual_tree_a_no_upper_ins_yields_provisional_no() {
2324        const DB_A: u64 = 1; // only BINs dirty; no dirty upper INs
2325        const DB_B: u64 = 2; // has dirty upper INs at level 2
2326
2327        let ckpt = Checkpointer::new(CheckpointConfig::default());
2328        ckpt.checkpoint_in_progress.store(true, Ordering::Release);
2329
2330        // Only tree B gets an entry in the per-tree flush levels map.
2331        ckpt.checkpoint_flush_levels.lock().unwrap().insert(DB_B, 2i32);
2332
2333        // Tree A's BIN (level 1) must be non-provisional: no covering ancestor.
2334        assert_eq!(
2335            ckpt.get_eviction_provisional(DB_A, 1),
2336            Provisional::No,
2337            "CC-4 residual: tree A has no dirty upper INs; BIN must be \
2338             Provisional::No (not covered by any ancestor in this checkpoint)"
2339        );
2340
2341        // Tree B's BIN (level 1) may be provisional: its level-2 ancestor will
2342        // be written non-provisionally.
2343        assert_eq!(
2344            ckpt.get_eviction_provisional(DB_B, 1),
2345            Provisional::Yes,
2346            "CC-4: tree B has a dirty upper IN at level 2; BIN must be Provisional::Yes"
2347        );
2348
2349        ckpt.checkpoint_in_progress.store(false, Ordering::Release);
2350        ckpt.checkpoint_flush_levels.lock().unwrap().clear();
2351    }
2352
2353    /// CC-4: CheckpointGuard clears the flush_levels map on drop.
2354    #[test]
2355    fn test_cc4_guard_resets_max_flush_level() {
2356        let flag = AtomicBool::new(true);
2357        let levels: std::sync::Mutex<HashMap<u64, i32>> =
2358            std::sync::Mutex::new(HashMap::from([(7u64, 5i32)]));
2359        {
2360            let _guard = CheckpointGuard { flag: &flag, flush_levels: &levels };
2361        }
2362        assert!(levels.lock().unwrap().is_empty(), "guard must clear map");
2363        assert!(!flag.load(Ordering::Acquire));
2364    }
2365
2366    // -----------------------------------------------------------------------
2367    // REC-D: the configured bytes-interval must reach the runnable gate
2368    // (not the hardcoded 10 MiB default).
2369    // -----------------------------------------------------------------------
2370
2371    /// REC-D fail-pre/pass-post: a Checkpointer built with a configured
2372    /// bytes-interval must use THAT value in `is_runnable`, not the hardcoded
2373    /// 10 MiB.  JE Checkpointer ctor:
2374    /// `logSizeBytesInterval = configManager.getLong(CHECKPOINTER_BYTES_INTERVAL)`
2375    /// and `isRunnable` compares the bytes-since-checkpoint against it.
2376    ///
2377    /// Fail-pre: before REC-D the env wired only `CheckpointConfig.bytes_interval`
2378    /// (a field `is_runnable` never reads) while the gate used the hardcoded
2379    /// `checkpoint_bytes_interval = 10 MiB`.  A 1 KiB configured interval would
2380    /// NOT trip the gate at 1 KiB of writes.  Pass-post: `with_bytes_interval`
2381    /// threads the configured value into the gate.
2382    #[test]
2383    fn test_rec_d_configured_bytes_interval_drives_runnable() {
2384        // Configure a 1 KiB interval (far below the old 10 MiB default).
2385        let cp = Checkpointer::new(CheckpointConfig::default())
2386            .with_bytes_interval(1024);
2387
2388        // Just below the configured interval: not runnable.
2389        cp.note_bytes_for_test(1000);
2390        assert!(
2391            !cp.is_runnable(false),
2392            "REC-D: 1000 bytes < configured 1 KiB interval must not be runnable"
2393        );
2394
2395        // Cross the configured interval: runnable.  (With the old hardcoded
2396        // 10 MiB default this would stay false until 10 MiB of writes.)
2397        cp.note_bytes_for_test(100);
2398        assert!(
2399            cp.is_runnable(false),
2400            "REC-D: crossing the configured 1 KiB interval must be runnable; \
2401             the gate must use the configured interval, not 10 MiB"
2402        );
2403    }
2404
2405    // -----------------------------------------------------------------------
2406    // REC-F: an idle environment with cleaner-pending files must trigger a
2407    // checkpoint (JE wakeupAfterNoWrites / needCheckpointForCleanedFiles).
2408    // -----------------------------------------------------------------------
2409
2410    /// REC-F fail-pre/pass-post: with no bytes written since the last
2411    /// checkpoint, `is_runnable(false)` must still return true when the
2412    /// cleaner reports files pending reclaim (CLEANED set non-empty).
2413    ///
2414    /// JE `Checkpointer.isRunnable`:
2415    /// `if (wakeupAfterNoWrites && needCheckpointForCleanedFiles()) return true;`
2416    /// where `needCheckpointForCleanedFiles()` →
2417    /// `cleaner.getFileSelector().isCheckpointNeeded()`.
2418    ///
2419    /// Fail-pre: before REC-F `is_runnable` consulted only bytes; an idle env
2420    /// with cleaned-but-unreclaimed files returned false, so reclamation
2421    /// stalled until the next write-driven checkpoint.
2422    #[test]
2423    fn test_rec_f_idle_env_with_cleaner_pending_is_runnable() {
2424        use noxu_cleaner::Cleaner;
2425        use std::sync::Arc;
2426
2427        let cleaner = Arc::new(Cleaner::new(50, 1, 0));
2428
2429        let cp = Checkpointer::new(CheckpointConfig::default())
2430            .with_bytes_interval(1024)
2431            .with_cleaner(Arc::clone(&cleaner));
2432
2433        // Idle environment: nothing written since the last checkpoint, no
2434        // cleaned files yet.
2435        assert!(
2436            !cp.is_runnable(false),
2437            "REC-F precondition: idle env with no pending files must not be runnable"
2438        );
2439
2440        // Simulate the cleaner cleaning a file: it moves to the CLEANED state
2441        // (cleaned-but-not-checkpointed).  A checkpoint is now needed to
2442        // advance the deletion barrier.
2443        {
2444            let mut selector = cleaner.get_file_selector().lock();
2445            selector.add_file_to_clean(7);
2446            selector.mark_file_cleaned(7);
2447        }
2448
2449        assert!(
2450            cleaner.is_checkpoint_needed(),
2451            "REC-F: cleaner must report a checkpoint is needed for the CLEANED file"
2452        );
2453        // Still no bytes written, but the idle-reclaim trigger fires.
2454        assert!(
2455            cp.is_runnable(false),
2456            "REC-F: idle env with cleaner-pending files must be runnable \
2457             (JE wakeupAfterNoWrites / needCheckpointForCleanedFiles)"
2458        );
2459    }
2460
2461    // -----------------------------------------------------------------------
2462    // REC-G: init_intervals seeds the interval baselines from a recovered
2463    // checkpoint (JE Checkpointer.initIntervals).
2464    // -----------------------------------------------------------------------
2465
2466    /// REC-G fail-pre/pass-post: after recovery the checkpointer's interval
2467    /// baselines must equal the recovered CkptEnd LSNs, not NULL_LSN.
2468    ///
2469    /// Fail-pre: a freshly-constructed Checkpointer has
2470    /// `last_checkpoint_start`/`_end` == NULL_LSN, so the first post-recovery
2471    /// interval is measured from process start.  Pass-post: `init_intervals`
2472    /// seeds them from the recovered CkptEnd.
2473    ///
2474    /// JE ref: `Checkpointer.initIntervals(lastCheckpointStart,
2475    /// lastCheckpointEnd, lastCheckpointMillis)`.
2476    #[test]
2477    fn test_rec_g_init_intervals_seeds_baselines() {
2478        let cp = Checkpointer::new(CheckpointConfig::default());
2479        // Fail-pre baseline: fresh checkpointer starts at NULL_LSN.
2480        assert_eq!(cp.get_last_checkpoint_start(), noxu_util::NULL_LSN);
2481        assert_eq!(cp.get_last_checkpoint_end(), noxu_util::NULL_LSN);
2482
2483        // Simulate recovery surfacing a CkptEnd at (start=4:400, end=5:500).
2484        let recovered_start = Lsn::new(4, 400);
2485        let recovered_end = Lsn::new(5, 500);
2486        // Pretend the env wrote some pre-crash bytes before recovery.
2487        cp.note_bytes_for_test(9999);
2488
2489        cp.init_intervals(recovered_start, recovered_end);
2490
2491        assert_eq!(
2492            cp.get_last_checkpoint_start(),
2493            recovered_start,
2494            "REC-G: baseline start must equal recovered CkptEnd start"
2495        );
2496        assert_eq!(
2497            cp.get_last_checkpoint_end(),
2498            recovered_end,
2499            "REC-G: baseline end must equal recovered CkptEnd end"
2500        );
2501        // The byte accumulator is reset so pre-crash volume does not
2502        // immediately trip the runnable gate.
2503        assert!(
2504            !cp.is_runnable(false),
2505            "REC-G: byte accumulator must reset on init_intervals"
2506        );
2507    }
2508
2509    // -----------------------------------------------------------------------
2510    // REC-H: set_checkpoint_id continues the sequence after recovery
2511    // (JE Checkpointer.setCheckpointId).
2512    // -----------------------------------------------------------------------
2513
2514    /// REC-H fail-pre/pass-post: after recovery the next checkpoint ID must
2515    /// continue from the recovered CkptEnd id, not restart at 1.
2516    ///
2517    /// Fail-pre: a fresh Checkpointer's first checkpoint id is 1, colliding
2518    /// with pre-crash ids.  Pass-post: `set_checkpoint_id(recovered_id)` makes
2519    /// the next emitted id `recovered_id + 1`.
2520    ///
2521    /// JE ref: `Checkpointer.setCheckpointId(lastCheckpointId)`.
2522    #[test]
2523    fn test_rec_h_set_checkpoint_id_continues_sequence() {
2524        let cp = Checkpointer::new(CheckpointConfig::default());
2525        // Fail-pre: a fresh checkpointer would issue id 1.
2526        assert_eq!(cp.peek_next_checkpoint_id(), 1);
2527
2528        // Recovery found a CkptEnd with id 42.
2529        cp.set_checkpoint_id(42);
2530        assert_eq!(
2531            cp.peek_next_checkpoint_id(),
2532            43,
2533            "REC-H: next checkpoint id must be recovered_id + 1"
2534        );
2535
2536        // The next checkpoint must use 43, not 1.
2537        let result = cp.do_checkpoint("post_recovery").unwrap();
2538        assert_eq!(
2539            result.checkpoint_id, 43,
2540            "REC-H: post-recovery checkpoint id must continue the sequence"
2541        );
2542    }
2543
2544    // -----------------------------------------------------------------------
2545    // REC-AA: the highest-flush-level is max(dirty-upper-IN-level) + 1,
2546    // bounded by the root level (JE DirtyINMap.updateFlushLevels).
2547    // -----------------------------------------------------------------------
2548
2549    /// REC-AA fail-pre/pass-post: the per-tree highest flush level recorded
2550    /// for eviction coordination must be `max(dirty-upper-IN-level) + 1`
2551    /// (bounded by the root level), so a BIN evicted during the checkpoint is
2552    /// logged `Provisional::Yes` (covered by a non-provisional ancestor).
2553    ///
2554    /// Fail-pre: before REC-AA `collect_dirty_upper_ins` returned a
2555    /// root-relative depth (root=0) instead of the node's tree level, so the
2556    /// flush-levels map held tiny depths (1, 2) while the evictor compared the
2557    /// BIN's real `BIN_LEVEL` (`MAIN_LEVEL|1`); `BIN_LEVEL < 2` is always false
2558    /// → every BIN was logged `Provisional::No`, and the JE `+1` adjustment
2559    /// was absent entirely.
2560    ///
2561    /// Pass-post: levels are real tree levels, the recorded flush level is
2562    /// `max_dirty_upper_in_level + 1` bounded by the root, and a BIN at
2563    /// `BIN_LEVEL` gets `Provisional::Yes`.
2564    ///
2565    /// JE ref: `DirtyINMap.updateFlushLevels` (`(ckptFlushExtraLevel || isBIN)
2566    /// && !isRoot` → `level += 1`) / `Checkpointer.flushDirtyNodes`.
2567    #[test]
2568    fn test_rec_aa_flush_level_is_max_dirty_plus_one() {
2569        use noxu_log::FileManager;
2570        use noxu_tree::tree::{BIN_LEVEL, Tree};
2571        use noxu_util::lsn::Lsn;
2572        use tempfile::TempDir;
2573
2574        let dir = TempDir::new().unwrap();
2575        let fm = Arc::new(
2576            FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
2577        );
2578        let lm =
2579            Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));
2580
2581        // Fanout 4 + 20 inserts forces root splits → a 3-level tree
2582        // (root at MAIN_LEVEL|3, upper INs at |2, BINs at BIN_LEVEL=|1), with
2583        // dirty upper INs from the splits.
2584        let tree = Tree::new(1, 4);
2585        for i in 0u32..20 {
2586            let key = format!("key{:04}", i).into_bytes();
2587            let data = format!("data{}", i).into_bytes();
2588            tree.insert(key, data, Lsn::new(1, 100 + i)).unwrap();
2589        }
2590        let root_level = tree.get_root().unwrap().read().level();
2591        let dirty_uppers = tree.collect_dirty_upper_ins(1);
2592        assert!(
2593            !dirty_uppers.is_empty(),
2594            "precondition: the split tree must have dirty upper INs"
2595        );
2596        let max_dirty = dirty_uppers.iter().map(|(l, _)| *l).max().unwrap();
2597        // Levels must be real tree levels, not depths (REC-AA fail-pre would
2598        // have tiny depths here).
2599        assert!(
2600            max_dirty >= (noxu_tree::MAIN_LEVEL | 2),
2601            "upper-IN levels must be real tree levels (>= MAIN_LEVEL|2), got {max_dirty}"
2602        );
2603
2604        let tree_arc = Arc::new(RwLock::new(tree));
2605        let cp = Checkpointer::new(CheckpointConfig::default())
2606            .with_log_manager(Arc::clone(&lm))
2607            .with_tree(Arc::clone(&tree_arc), 1);
2608
2609        // Mark a checkpoint in progress and run the upper-IN flush, which
2610        // populates checkpoint_flush_levels with the REC-AA value.
2611        cp.checkpoint_in_progress.store(true, Ordering::Release);
2612        cp.flush_upper_ins_internal().unwrap();
2613
2614        let recorded = cp
2615            .checkpoint_flush_levels
2616            .lock()
2617            .unwrap()
2618            .get(&1u64)
2619            .copied()
2620            .expect("db 1 must have a recorded flush level");
2621
2622        let expected = (max_dirty + 1).min(root_level);
2623        assert_eq!(
2624            recorded, expected,
2625            "REC-AA: flush level must be max(dirty-upper-IN-level)+1 bounded by root"
2626        );
2627
2628        // A BIN at BIN_LEVEL must be covered (Provisional::Yes): the recorded
2629        // flush level is strictly above it.
2630        assert!(
2631            BIN_LEVEL < recorded,
2632            "BIN_LEVEL ({BIN_LEVEL}) must be < recorded flush level ({recorded})"
2633        );
2634        assert_eq!(
2635            cp.get_eviction_provisional(1, BIN_LEVEL),
2636            Provisional::Yes,
2637            "REC-AA: a BIN below the flush level must be Provisional::Yes"
2638        );
2639
2640        cp.checkpoint_in_progress.store(false, Ordering::Release);
2641        cp.checkpoint_flush_levels.lock().unwrap().clear();
2642    }
2643}