nedb_engine/exit.rs
1//! Durable-mode auto-flush-on-exit.
2//!
3//! A durable [`Db`] stages id-index updates in an in-memory WAL (`write_buf`) and
4//! only makes them durable in `flush_all()` — normally driven by the manifest
5//! ticker or by `Drop`. But `Drop` "only fires once every owning handle is gone",
6//! and a ticker thread (or a server that blocks forever in `serve()`) holds an
7//! `Arc<Db>` for the whole process lifetime — so on a hard exit (`Ctrl+C`,
8//! `SIGTERM` from an orchestrator, `kill`) `Drop` NEVER runs and the writes staged
9//! since the last tick are lost. That is the gap this module closes.
10//!
11//! [`Db::install_exit_flush`] registers a durable database to be flushed when the
12//! process receives `SIGINT` or `SIGTERM`. It flushes exactly once, on the way
13//! out — NOT on every put — so the hot write path stays hot.
14//!
15//! # Design
16//!
17//! - **Opt-in.** A *library* that unilaterally seizes signal handlers would
18//! trample a host application's own shutdown logic, so the core never installs
19//! this implicitly. The napi (`nedb-node`) and pyo3 (`nedb-py`) `open()` paths
20//! call it for durable databases — so Node and Python embedders get
21//! flush-on-exit for free — and Rust applications call it explicitly. `nedbd`
22//! keeps its own tokio graceful-shutdown handler and does not use this.
23//!
24//! - **Async-signal-safe.** The installed handler does exactly one thing: a
25//! non-blocking `write(2)` of the signal number to a self-pipe (`write` is on
26//! the POSIX async-signal-safe list). A dedicated reader thread blocks on the
27//! read end; when woken it runs `flush_all()` on every registered database from
28//! a *normal* thread context (locks, allocation and file I/O are all safe
//! there), restores the signal's default disposition, and re-raises it so the
//! process is terminated by that same signal (which shells report as exit
//! status `128 + signum`).
31//!
32//! - **Idempotent.** The handler and reader thread are installed once per
33//! process; each subsequent call just registers another database. In-memory
34//! (`:memory:`) databases are ignored — there is nothing to flush.
35//!
36//! - **Weak references.** The registry holds `Weak<Db>`, so a registered database
37//! that is otherwise dropped is not kept alive (no leak) and is pruned on the
38//! next flush pass.
39
40use std::sync::{Arc, Mutex, OnceLock, Weak};
41#[cfg(unix)]
42use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
43
44use crate::db::Db;
45
46/// Registered durable databases to flush on exit. Weak so we never keep a `Db`
47/// alive past its owner.
48static REGISTRY: OnceLock<Mutex<Vec<Weak<Db>>>> = OnceLock::new();
49
50fn registry() -> &'static Mutex<Vec<Weak<Db>>> {
51 REGISTRY.get_or_init(|| Mutex::new(Vec::new()))
52}
53
54/// Flush every still-live registered database. Runs on the reader thread (normal
55/// context) — safe to take locks and do I/O. Prunes dead weak refs as it goes.
56fn flush_all_registered() {
57 if let Some(reg) = REGISTRY.get() {
58 let mut guard = match reg.lock() {
59 Ok(g) => g,
60 Err(poisoned) => poisoned.into_inner(), // a panicked writer must not stop the flush
61 };
62 guard.retain(|w| w.strong_count() > 0);
63 for w in guard.iter() {
64 if let Some(db) = w.upgrade() {
65 db.flush_all();
66 }
67 }
68 }
69}
70
71impl Db {
72 /// Flush this durable database's buffered state on `SIGINT`/`SIGTERM`
73 /// (`Ctrl+C`, `kill`, orchestrator shutdown) — the flush-on-close contract
74 /// extended to hard exits that never run `Drop`.
75 ///
76 /// Call once, after the database is wrapped in an `Arc` (the registry holds a
77 /// `Weak`, so this never keeps the `Db` alive). Idempotent; safe to call from
78 /// multiple databases. A no-op for in-memory (`:memory:`) databases.
79 ///
80 /// ```no_run
81 /// # use std::sync::Arc;
82 /// # use nedb_engine::Db;
83 /// let db = Arc::new(Db::open(std::path::Path::new("/data/mydb"), None)?);
84 /// Db::install_exit_flush(Arc::clone(&db)); // durable across Ctrl+C / SIGTERM
85 /// # Ok::<(), anyhow::Error>(())
86 /// ```
87 pub fn install_exit_flush(self_arc: Arc<Db>) {
88 // Nothing to flush for an in-memory database.
89 if self_arc.root == std::path::PathBuf::from(":memory:") {
90 return;
91 }
92 // Register (dedup by pointer identity so repeated calls don't stack).
93 {
94 let mut reg = match registry().lock() {
95 Ok(g) => g,
96 Err(poisoned) => poisoned.into_inner(),
97 };
98 let already = reg
99 .iter()
100 .any(|w| w.upgrade().is_some_and(|a| Arc::ptr_eq(&a, &self_arc)));
101 if !already {
102 reg.push(Arc::downgrade(&self_arc));
103 }
104 }
105 install_signal_handler_once();
106 }
107}
108
109// ── Unix: self-pipe + sigaction, dependency-free (libc). ─────────────────────
110
111#[cfg(unix)]
112static INSTALLED: AtomicBool = AtomicBool::new(false);
113/// Write end of the self-pipe, read by the signal handler. `-1` until installed.
114#[cfg(unix)]
115static PIPE_WRITE_FD: AtomicI32 = AtomicI32::new(-1);
116
117/// The signal handler. MUST be async-signal-safe: it does nothing but write the
118/// signal number to the self-pipe (non-blocking, so it can never stall the
119/// interrupted thread). All real work happens on the reader thread.
120#[cfg(unix)]
121extern "C" fn handler(sig: libc::c_int) {
122 let fd = PIPE_WRITE_FD.load(Ordering::SeqCst);
123 if fd >= 0 {
124 let byte = [sig as u8];
125 // write(2) is async-signal-safe; ignore the result (EAGAIN if the pipe is
126 // already full means a signal is already pending — which is all we need).
127 unsafe {
128 let _ = libc::write(fd, byte.as_ptr() as *const libc::c_void, 1);
129 }
130 }
131}
132
133#[cfg(unix)]
134fn install_signal_handler_once() {
135 // Exactly one handler + reader thread per process.
136 if INSTALLED.swap(true, Ordering::SeqCst) {
137 return;
138 }
139 unsafe {
140 // Self-pipe. write end non-blocking so the handler never blocks.
141 let mut fds = [0i32; 2];
142 if libc::pipe(fds.as_mut_ptr()) != 0 {
143 INSTALLED.store(false, Ordering::SeqCst); // let a later call retry
144 return;
145 }
146 let (read_fd, write_fd) = (fds[0], fds[1]);
147 let flags = libc::fcntl(write_fd, libc::F_GETFL);
148 if flags != -1 {
149 libc::fcntl(write_fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
150 }
151 PIPE_WRITE_FD.store(write_fd, Ordering::SeqCst);
152
153 // Install the handler for SIGINT (Ctrl+C) and SIGTERM (kill / orchestrator).
154 let mut sa: libc::sigaction = std::mem::zeroed();
155 sa.sa_sigaction = handler as extern "C" fn(libc::c_int) as libc::sighandler_t;
156 libc::sigemptyset(&mut sa.sa_mask);
157 sa.sa_flags = libc::SA_RESTART;
158 libc::sigaction(libc::SIGINT, &sa, std::ptr::null_mut());
159 libc::sigaction(libc::SIGTERM, &sa, std::ptr::null_mut());
160
161 // Reader thread: block on the pipe, flush, restore default, re-raise.
162 std::thread::Builder::new()
163 .name("nedb-exit-flush".into())
164 .spawn(move || {
165 reader_loop(read_fd);
166 })
167 .ok();
168 }
169}
170
171/// Block on the self-pipe. On the first signal: flush every registered database,
172/// restore that signal's default disposition, and re-raise so the process
173/// terminates with the correct status. Never returns.
174#[cfg(unix)]
175fn reader_loop(read_fd: i32) -> ! {
176 let mut buf = [0u8; 1];
177 loop {
178 let n = unsafe { libc::read(read_fd, buf.as_mut_ptr() as *mut libc::c_void, 1) };
179 if n <= 0 {
180 continue; // EINTR / spurious wakeup — keep waiting
181 }
182 let sig = buf[0] as libc::c_int;
183
184 flush_all_registered();
185
186 // Restore default disposition and re-raise: preserves 128+signum exit
187 // status and lets the OS terminate us the way the sender intended.
188 unsafe {
189 let mut sa: libc::sigaction = std::mem::zeroed();
190 sa.sa_sigaction = libc::SIG_DFL;
191 libc::sigemptyset(&mut sa.sa_mask);
192 libc::sigaction(sig, &sa, std::ptr::null_mut());
193 libc::raise(sig);
194 }
195 // If raise somehow returns, fall back to a clean exit after flushing.
196 std::process::exit(128 + sig);
197 }
198}
199
200// ── Non-Unix: no POSIX signals. Durability on exit relies on `Drop` / an
201// explicit `flush()`. Documented, honest no-op. ────────────────────────────
202
/// Non-Unix fallback: deliberately installs nothing.
///
/// Windows has no `SIGTERM` and its `SIGINT` semantics differ. Rather than
/// pretend to provide flush-on-exit, this stays an honest no-op. Embedders on
/// these platforms should flush explicitly at shutdown, or rely on `Drop` for
/// short-lived handles.
#[cfg(not(unix))]
fn install_signal_handler_once() {}
209
/// Unit tests for the registry and the flush path. The signal handler / reader
/// thread cannot be exercised in-process here; the tests share the
/// process-global registry, so each one inspects only its own handle.
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory databases have nothing to flush → must never be registered.
    /// Checks this specific handle (not the registry length), so it stays correct
    /// under parallel test runs that share the process-global registry.
    #[test]
    fn in_memory_is_not_registered() {
        let db = Arc::new(Db::in_memory());
        Db::install_exit_flush(Arc::clone(&db));
        let found = registry()
            .lock()
            .unwrap()
            .iter()
            .any(|w| w.upgrade().is_some_and(|a| Arc::ptr_eq(&a, &db)));
        assert!(!found, ":memory: db must not be registered");
    }

    /// Registering the same durable db twice adds exactly one registry entry.
    #[test]
    fn durable_registration_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let db = Arc::new(Db::open(dir.path(), None).unwrap());
        // Number of registry entries pointing at this exact `Db` allocation.
        let count = || {
            registry()
                .lock()
                .unwrap()
                .iter()
                .filter(|w| w.upgrade().is_some_and(|a| Arc::ptr_eq(&a, &db)))
                .count()
        };
        Db::install_exit_flush(Arc::clone(&db));
        assert_eq!(count(), 1, "first install registers exactly once");
        Db::install_exit_flush(Arc::clone(&db));
        assert_eq!(count(), 1, "second install does not duplicate");
    }

    /// The flush path the reader thread runs makes staged writes durable: write,
    /// flush via the registry, reopen from disk, and the doc is present. Exercises
    /// everything the signal path does except the raise-and-die tail (which cannot
    /// be asserted in-process — see tests/exit_flush_signal.rs for the
    /// child-process end-to-end proof).
    #[test]
    fn registered_flush_makes_writes_durable_on_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_path_buf();
        {
            let db = Arc::new(Db::open(&path, None).unwrap());
            Db::install_exit_flush(Arc::clone(&db));
            db.put("k", "v1", serde_json::json!({ "n": 1 }), vec![], None, None)
                .unwrap();
            flush_all_registered(); // what the reader thread does on SIGTERM (no raise)
            // NOTE(review): `db` is dropped at the end of this scope, before the
            // reopen below. The module docs say `Drop` also drives `flush_all()`,
            // so this test may pass even if `flush_all_registered` flushed nothing.
            // Consider keeping a handle alive (or leaking one) across the reopen
            // to isolate the registry path — first confirm `Db::open` tolerates a
            // second live handle on the same path.
        }
        let reopened = Db::open(&path, None).unwrap();
        let got = reopened.get("k", "v1");
        assert!(got.is_some(), "write must survive flush + reopen");
        assert_eq!(got.unwrap().data.get("n").and_then(|v| v.as_i64()), Some(1));
    }
}