Skip to main content

nedb_engine/
exit.rs

1//! Durable-mode auto-flush-on-exit.
2//!
3//! A durable [`Db`] stages id-index updates in an in-memory WAL (`write_buf`) and
4//! only makes them durable in `flush_all()` — normally driven by the manifest
5//! ticker or by `Drop`. But `Drop` "only fires once every owning handle is gone",
6//! and a ticker thread (or a server that blocks forever in `serve()`) holds an
7//! `Arc<Db>` for the whole process lifetime — so on a hard exit (`Ctrl+C`,
8//! `SIGTERM` from an orchestrator, `kill`) `Drop` NEVER runs and the writes staged
9//! since the last tick are lost. That is the gap this module closes.
10//!
11//! [`Db::install_exit_flush`] registers a durable database to be flushed when the
12//! process receives `SIGINT` or `SIGTERM`. It flushes exactly once, on the way
13//! out — NOT on every put — so the hot write path stays hot.
14//!
15//! # Design
16//!
17//! - **Opt-in.** A *library* that unilaterally seizes signal handlers would
18//!   trample a host application's own shutdown logic, so the core never installs
19//!   this implicitly. The napi (`nedb-node`) and pyo3 (`nedb-py`) `open()` paths
20//!   call it for durable databases — so Node and Python embedders get
21//!   flush-on-exit for free — and Rust applications call it explicitly. `nedbd`
22//!   keeps its own tokio graceful-shutdown handler and does not use this.
23//!
24//! - **Async-signal-safe.** The installed handler does exactly one thing: a
25//!   non-blocking `write(2)` of the signal number to a self-pipe (`write` is on
26//!   the POSIX async-signal-safe list). A dedicated reader thread blocks on the
27//!   read end; when woken it runs `flush_all()` on every registered database from
28//!   a *normal* thread context (locks, allocation and file I/O are all safe
//!   there), restores the signal's default disposition, and re-raises it so the
//!   process is terminated by that same signal (which shells report as exit
//!   status `128 + signum`).
31//!
32//! - **Idempotent.** The handler and reader thread are installed once per
33//!   process; each subsequent call just registers another database. In-memory
34//!   (`:memory:`) databases are ignored — there is nothing to flush.
35//!
36//! - **Weak references.** The registry holds `Weak<Db>`, so a registered database
37//!   that is otherwise dropped is not kept alive (no leak) and is pruned on the
38//!   next flush pass.
39
40use std::sync::{Arc, Mutex, OnceLock, Weak};
41#[cfg(unix)]
42use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
43
44use crate::db::Db;
45
46/// Registered durable databases to flush on exit. Weak so we never keep a `Db`
47/// alive past its owner.
48static REGISTRY: OnceLock<Mutex<Vec<Weak<Db>>>> = OnceLock::new();
49
50fn registry() -> &'static Mutex<Vec<Weak<Db>>> {
51    REGISTRY.get_or_init(|| Mutex::new(Vec::new()))
52}
53
54/// Flush every still-live registered database. Runs on the reader thread (normal
55/// context) — safe to take locks and do I/O. Prunes dead weak refs as it goes.
56fn flush_all_registered() {
57    if let Some(reg) = REGISTRY.get() {
58        let mut guard = match reg.lock() {
59            Ok(g) => g,
60            Err(poisoned) => poisoned.into_inner(), // a panicked writer must not stop the flush
61        };
62        guard.retain(|w| w.strong_count() > 0);
63        for w in guard.iter() {
64            if let Some(db) = w.upgrade() {
65                db.flush_all();
66            }
67        }
68    }
69}
70
71impl Db {
72    /// Flush this durable database's buffered state on `SIGINT`/`SIGTERM`
73    /// (`Ctrl+C`, `kill`, orchestrator shutdown) — the flush-on-close contract
74    /// extended to hard exits that never run `Drop`.
75    ///
76    /// Call once, after the database is wrapped in an `Arc` (the registry holds a
77    /// `Weak`, so this never keeps the `Db` alive). Idempotent; safe to call from
78    /// multiple databases. A no-op for in-memory (`:memory:`) databases.
79    ///
80    /// ```no_run
81    /// # use std::sync::Arc;
82    /// # use nedb_engine::Db;
83    /// let db = Arc::new(Db::open(std::path::Path::new("/data/mydb"), None)?);
84    /// Db::install_exit_flush(Arc::clone(&db));   // durable across Ctrl+C / SIGTERM
85    /// # Ok::<(), anyhow::Error>(())
86    /// ```
87    pub fn install_exit_flush(self_arc: Arc<Db>) {
88        // Nothing to flush for an in-memory database.
89        if self_arc.root == std::path::PathBuf::from(":memory:") {
90            return;
91        }
92        // Register (dedup by pointer identity so repeated calls don't stack).
93        {
94            let mut reg = match registry().lock() {
95                Ok(g) => g,
96                Err(poisoned) => poisoned.into_inner(),
97            };
98            let already = reg
99                .iter()
100                .any(|w| w.upgrade().is_some_and(|a| Arc::ptr_eq(&a, &self_arc)));
101            if !already {
102                reg.push(Arc::downgrade(&self_arc));
103            }
104        }
105        install_signal_handler_once();
106    }
107}
108
109// ── Unix: self-pipe + sigaction, dependency-free (libc). ─────────────────────
110
/// Flipped to `true` (via `swap`) by the first installation attempt, so the
/// self-pipe, reader thread and signal handlers are set up at most once per
/// process. Reset to `false` when setup fails (e.g. `pipe(2)` errors), so a
/// later call can retry.
#[cfg(unix)]
static INSTALLED: AtomicBool = AtomicBool::new(false);
/// Write end of the self-pipe, loaded by the signal handler on every signal.
/// `-1` until installed; the handler writes nothing while it is negative.
#[cfg(unix)]
static PIPE_WRITE_FD: AtomicI32 = AtomicI32::new(-1);
116
117/// The signal handler. MUST be async-signal-safe: it does nothing but write the
118/// signal number to the self-pipe (non-blocking, so it can never stall the
119/// interrupted thread). All real work happens on the reader thread.
120#[cfg(unix)]
121extern "C" fn handler(sig: libc::c_int) {
122    let fd = PIPE_WRITE_FD.load(Ordering::SeqCst);
123    if fd >= 0 {
124        let byte = [sig as u8];
125        // write(2) is async-signal-safe; ignore the result (EAGAIN if the pipe is
126        // already full means a signal is already pending — which is all we need).
127        unsafe {
128            let _ = libc::write(fd, byte.as_ptr() as *const libc::c_void, 1);
129        }
130    }
131}
132
133#[cfg(unix)]
134fn install_signal_handler_once() {
135    // Exactly one handler + reader thread per process.
136    if INSTALLED.swap(true, Ordering::SeqCst) {
137        return;
138    }
139    unsafe {
140        // Self-pipe. write end non-blocking so the handler never blocks.
141        let mut fds = [0i32; 2];
142        if libc::pipe(fds.as_mut_ptr()) != 0 {
143            INSTALLED.store(false, Ordering::SeqCst); // let a later call retry
144            return;
145        }
146        let (read_fd, write_fd) = (fds[0], fds[1]);
147        let flags = libc::fcntl(write_fd, libc::F_GETFL);
148        if flags != -1 {
149            libc::fcntl(write_fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
150        }
151        PIPE_WRITE_FD.store(write_fd, Ordering::SeqCst);
152
153        // Install the handler for SIGINT (Ctrl+C) and SIGTERM (kill / orchestrator).
154        let mut sa: libc::sigaction = std::mem::zeroed();
155        sa.sa_sigaction = handler as extern "C" fn(libc::c_int) as libc::sighandler_t;
156        libc::sigemptyset(&mut sa.sa_mask);
157        sa.sa_flags = libc::SA_RESTART;
158        libc::sigaction(libc::SIGINT, &sa, std::ptr::null_mut());
159        libc::sigaction(libc::SIGTERM, &sa, std::ptr::null_mut());
160
161        // Reader thread: block on the pipe, flush, restore default, re-raise.
162        std::thread::Builder::new()
163            .name("nedb-exit-flush".into())
164            .spawn(move || {
165                reader_loop(read_fd);
166            })
167            .ok();
168    }
169}
170
171/// Block on the self-pipe. On the first signal: flush every registered database,
172/// restore that signal's default disposition, and re-raise so the process
173/// terminates with the correct status. Never returns.
174#[cfg(unix)]
175fn reader_loop(read_fd: i32) -> ! {
176    let mut buf = [0u8; 1];
177    loop {
178        let n = unsafe { libc::read(read_fd, buf.as_mut_ptr() as *mut libc::c_void, 1) };
179        if n <= 0 {
180            continue; // EINTR / spurious wakeup — keep waiting
181        }
182        let sig = buf[0] as libc::c_int;
183
184        flush_all_registered();
185
186        // Restore default disposition and re-raise: preserves 128+signum exit
187        // status and lets the OS terminate us the way the sender intended.
188        unsafe {
189            let mut sa: libc::sigaction = std::mem::zeroed();
190            sa.sa_sigaction = libc::SIG_DFL;
191            libc::sigemptyset(&mut sa.sa_mask);
192            libc::sigaction(sig, &sa, std::ptr::null_mut());
193            libc::raise(sig);
194        }
195        // If raise somehow returns, fall back to a clean exit after flushing.
196        std::process::exit(128 + sig);
197    }
198}
199
200// ── Non-Unix: no POSIX signals. Durability on exit relies on `Drop` / an
201//    explicit `flush()`. Documented, honest no-op. ────────────────────────────
202
/// Non-Unix fallback: deliberately installs nothing.
///
/// There is no `SIGTERM` on these platforms, and console-interrupt semantics
/// differ. So instead of pretending to install a handler, this is an honest
/// no-op. Embedders here should flush explicitly on shutdown, or rely on `Drop`
/// for short-lived handles.
#[cfg(not(unix))]
fn install_signal_handler_once() {}
209
/// Unit tests for the exit-flush registry and flush path.
///
/// NOTE(review): each test that calls `install_exit_flush` on a durable
/// database also installs the real process-wide SIGINT/SIGTERM handler (and
/// reader thread) in the test binary. That is a one-time side effect shared by
/// all tests in this process.
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory databases have nothing to flush → must never be registered.
    /// Checks this specific handle (not the registry length), so it stays correct
    /// under parallel test runs that share the process-global registry.
    #[test]
    fn in_memory_is_not_registered() {
        let db = Arc::new(Db::in_memory());
        Db::install_exit_flush(Arc::clone(&db));
        let found = registry()
            .lock()
            .unwrap()
            .iter()
            .any(|w| w.upgrade().is_some_and(|a| Arc::ptr_eq(&a, &db)));
        assert!(!found, ":memory: db must not be registered");
    }

    /// Registering the same durable db twice adds exactly one registry entry.
    /// Counts only entries pointing at this handle, so concurrent tests that
    /// register other databases do not affect the result.
    #[test]
    fn durable_registration_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let db = Arc::new(Db::open(dir.path(), None).unwrap());
        let count = || {
            registry()
                .lock()
                .unwrap()
                .iter()
                .filter(|w| w.upgrade().is_some_and(|a| Arc::ptr_eq(&a, &db)))
                .count()
        };
        Db::install_exit_flush(Arc::clone(&db));
        assert_eq!(count(), 1, "first install registers exactly once");
        Db::install_exit_flush(Arc::clone(&db));
        assert_eq!(count(), 1, "second install does not duplicate");
    }

    /// The flush path the reader thread runs makes staged writes durable: write,
    /// flush via the registry, reopen from disk, and the doc is present. Exercises
    /// everything the signal path does except the raise-and-die tail (which cannot
    /// be asserted in-process — see tests/exit_flush_signal.rs for the
    /// child-process end-to-end proof).
    #[test]
    fn registered_flush_makes_writes_durable_on_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_path_buf();
        {
            let db = Arc::new(Db::open(&path, None).unwrap());
            Db::install_exit_flush(Arc::clone(&db));
            db.put("k", "v1", serde_json::json!({ "n": 1 }), vec![], None, None)
                .unwrap();
            flush_all_registered(); // what the reader thread does on SIGTERM (no raise)
            // NOTE(review): `db` (the last strong handle) is dropped at the end of
            // this scope. Per the module docs, `Drop` also drives `flush_all()`.
            // So the reopen below may succeed even if `flush_all_registered` did
            // nothing. Confirm whether `Db`'s `Drop` flushes; if so, keep the
            // handle alive across the reopen to isolate the registry path.
        }
        let reopened = Db::open(&path, None).unwrap();
        let got = reopened.get("k", "v1");
        assert!(got.is_some(), "write must survive flush + reopen");
        assert_eq!(got.unwrap().data.get("n").and_then(|v| v.as_i64()), Some(1));
    }
}