// etchdb — store.rs
1//! Core persistence engine.
2//!
3//! Generic `Store<T, B>` that holds state in memory behind an `RwLock` and
4//! delegates persistence to a `Backend`. Reads are zero-copy borrows; writes
5//! use transaction capture (overlay + ops) so the read lock is only held
6//! briefly during merge.
7//!
8//! # Flush Policies
9//!
10//! - **Immediate** (default): every write fsyncs before returning.
11//! - **Grouped**: writes are coalesced; a background thread fsyncs at most
12//!   every `interval`. Only the latest state is persisted — intermediate
13//!   mutations are folded in.
14
15use parking_lot::{Condvar, Mutex, RwLock, RwLockReadGuard};
16use serde::{Serialize, de::DeserializeOwned};
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::time::Duration;
21
22use crate::backend::{Backend, NullBackend};
23use crate::error::{Error, Result};
24use crate::wal::{IncrementalSave, Op, Replayable, Transactable, WalBackend};
25
/// Controls how writes are persisted to disk.
#[derive(Debug, Clone)]
pub enum FlushPolicy {
    /// Every write fsyncs immediately before returning (current behavior, default).
    Immediate,
    /// Writes are coalesced; a background thread fsyncs at most every
    /// `interval`. Writes acknowledged between flushes are not yet durable
    /// and can be lost on a crash before the next flush.
    Grouped { interval: Duration },
}
34
/// Shared state between the store and the flusher thread (Grouped mode only).
struct FlushShared<T, B: Backend<T>> {
    /// In-memory state; the flusher reads (and clones) it for full snapshots.
    state: Arc<RwLock<T>>,
    /// Backend used for full-state saves when no WAL is configured.
    backend: Arc<B>,
    /// Optional incremental saver (WAL). When present, the flusher syncs the
    /// WAL instead of doing full backend.save().
    incremental: Option<Arc<dyn IncrementalSave<T>>>,
    /// Pending ops buffer for WAL grouped mode. Writers push ops here; the
    /// flusher drains and writes them to the WAL file in bulk, then fsyncs.
    /// This avoids per-write WAL mutex acquisition and BufWriter I/O.
    pending_ops: Mutex<Vec<Vec<Op>>>,
    /// Monotonic counter bumped once per completed write.
    gen_written: AtomicU64,
    /// Generation most recently persisted by the flusher (or a durable write).
    gen_flushed: AtomicU64,
    /// Wakes the flusher early on new writes, `flush()`, or shutdown.
    notify: Condvar,
    /// Mutex paired with `notify` for `wait_for`; guards no data of its own.
    notify_mu: Mutex<()>,
    /// Last background flush error; surfaced by `write()` fail-fast,
    /// `flush()`, and `flush_error()`.
    last_error: Mutex<Option<Error>>,
    /// Set to request flusher termination; checked after each wakeup.
    shutdown: AtomicBool,
}
53
/// Background flusher state (only present in Grouped mode).
struct FlushState {
    /// Join handle for the flusher thread; taken (and joined) exactly once
    /// on shutdown or drop.
    handle: Mutex<Option<std::thread::JoinHandle<()>>>,
}
58
/// Persistent state store.
///
/// Holds `T` in memory behind a read-write lock. On write, mutations execute
/// against a transaction overlay that captures ops directly. The overlay is
/// merged into state in O(changed keys).
///
/// A separate `Mutex` serializes writers so the `RwLock` write-lock is held
/// only for the final overlay merge (~microseconds), keeping reads unblocked
/// during persistence.
pub struct Store<T, B: Backend<T> = NullBackend> {
    /// Committed in-memory state; shared with the flusher in Grouped mode.
    state: Arc<RwLock<T>>,
    /// Serializes writers; held for the entire write, including persistence.
    write_gate: Mutex<()>,
    /// Persistence backend for full-state saves.
    backend: Arc<B>,
    /// Optional incremental save (WAL). When present, write() diffs and
    /// appends ops instead of full backend.save().
    incremental: Option<Arc<dyn IncrementalSave<T>>>,
    /// Shared state with flusher thread. None = Immediate mode.
    shared: Option<Arc<FlushShared<T, B>>>,
    /// Flusher thread handle. None = Immediate mode or not yet started.
    flusher: Option<FlushState>,
}
80
/// Shared read guard — holds the store's read lock for its lifetime and
/// provides zero-copy `Deref` access to the state.
pub struct Ref<'a, T>(RwLockReadGuard<'a, T>);
83
84impl<'a, T> std::ops::Deref for Ref<'a, T> {
85    type Target = T;
86    fn deref(&self) -> &T {
87        &self.0
88    }
89}
90
91impl<T: Default> Store<T, NullBackend> {
92    /// In-memory only store (for tests).
93    pub fn memory() -> Self {
94        Self {
95            state: Arc::new(RwLock::new(T::default())),
96            write_gate: Mutex::new(()),
97            backend: Arc::new(NullBackend),
98            incremental: None,
99            shared: None,
100            flusher: None,
101        }
102    }
103}
104
105impl<T: Replayable + Serialize + DeserializeOwned + Default> Store<T, WalBackend<T>> {
106    /// Open store with WAL backend. Immediate mode (every write fsyncs WAL).
107    pub fn open_wal(dir: PathBuf) -> Result<Self> {
108        let backend = WalBackend::open(&dir)?;
109        let state = backend.load()?;
110        let backend = Arc::new(backend);
111        let incremental: Arc<dyn IncrementalSave<T>> = Arc::clone(&backend) as _;
112        Ok(Self {
113            state: Arc::new(RwLock::new(state)),
114            write_gate: Mutex::new(()),
115            backend,
116            incremental: Some(incremental),
117            shared: None,
118            flusher: None,
119        })
120    }
121}
122
123impl<T: Clone, B: Backend<T>> Store<T, B> {
124    /// Create a store from an existing backend.
125    pub fn with_backend(backend: B) -> Result<Self>
126    where
127        T: DeserializeOwned,
128    {
129        let state = backend.load()?;
130        Ok(Self {
131            state: Arc::new(RwLock::new(state)),
132            write_gate: Mutex::new(()),
133            backend: Arc::new(backend),
134            incremental: None,
135            shared: None,
136            flusher: None,
137        })
138    }
139
140    /// Zero-copy shared read. Multiple readers can hold this concurrently.
141    pub fn read(&self) -> Ref<'_, T> {
142        Ref(self.state.read())
143    }
144
145    /// Returns the last background flush error, if any.
146    pub fn flush_error(&self) -> Option<Error> {
147        self.shared
148            .as_ref()
149            .and_then(|s| s.last_error.lock().take())
150    }
151
152    /// Returns a reference to the backend.
153    pub fn backend(&self) -> &B {
154        &self.backend
155    }
156}
157
158// Write methods — zero-clone transaction capture.
159impl<T: Transactable, B: Backend<T>> Store<T, B> {
160    /// Atomic write via transaction capture.
161    ///
162    /// Borrows committed state via a read lock, executes mutations against an
163    /// overlay that captures ops directly, then merges the overlay into state.
164    /// O(changed keys), not O(total entries).
165    ///
166    /// **With WAL (Immediate)**: begin_tx → mutate → finish → append ops → fsync → merge.
167    /// **With WAL (Grouped)**: begin_tx → mutate → finish → buffer ops → merge → bump gen.
168    /// **Without WAL**: falls through to overlay merge only (no persistence).
169    pub fn write<F, R>(&self, f: F) -> Result<R>
170    where
171        F: for<'a> FnOnce(&mut T::Tx<'a>) -> Result<R>,
172    {
173        let _gate = self.write_gate.lock();
174
175        // Fail-fast on grouped flusher error.
176        if let Some(ref shared) = self.shared
177            && let Some(err) = shared.last_error.lock().take()
178        {
179            return Err(err);
180        }
181
182        // Borrow committed state via read lock — no clone.
183        let state_guard = self.state.read();
184        let mut tx = state_guard.begin_tx();
185        let result = f(&mut tx)?;
186        let (ops, overlay) = T::finish_tx(tx);
187        drop(state_guard); // release read lock before write lock
188
189        // Persist, then merge overlay into in-memory state.
190        if let Some(ref inc) = self.incremental {
191            // WAL path: append ops, then sync or buffer.
192            if !ops.is_empty() {
193                match &self.shared {
194                    None => {
195                        inc.save_ops(&ops)?;
196                        inc.sync()?;
197                    }
198                    Some(shared) => {
199                        shared.pending_ops.lock().push(ops);
200                    }
201                }
202            }
203        } else {
204            // Non-WAL path: merge overlay first, then persist full state.
205            self.state.write().apply_overlay(overlay);
206            match &self.shared {
207                None => {
208                    self.backend.save(&self.state.read())?;
209                }
210                Some(shared) => {
211                    shared.gen_written.fetch_add(1, Ordering::Release);
212                    shared.notify.notify_one();
213                }
214            }
215            return Ok(result);
216        }
217
218        self.state.write().apply_overlay(overlay);
219
220        if let Some(ref shared) = self.shared {
221            shared.gen_written.fetch_add(1, Ordering::Release);
222            shared.notify.notify_one();
223        }
224
225        Ok(result)
226    }
227
228    /// Atomic write with guaranteed immediate persistence.
229    ///
230    /// Same as `write()` but forces an immediate fsync regardless of flush
231    /// policy. Use for critical writes that must survive a crash.
232    pub fn write_durable<F, R>(&self, f: F) -> Result<R>
233    where
234        F: for<'a> FnOnce(&mut T::Tx<'a>) -> Result<R>,
235    {
236        let _gate = self.write_gate.lock();
237
238        let state_guard = self.state.read();
239        let mut tx = state_guard.begin_tx();
240        let result = f(&mut tx)?;
241        let (ops, overlay) = T::finish_tx(tx);
242        drop(state_guard);
243
244        if let Some(ref inc) = self.incremental {
245            // WAL path: drain pending ops, append ours, fsync.
246            if let Some(ref shared) = self.shared {
247                let batched: Vec<Vec<Op>> = {
248                    let mut pending = shared.pending_ops.lock();
249                    std::mem::take(&mut *pending)
250                };
251                for batch in &batched {
252                    inc.save_ops(batch)?;
253                }
254            }
255
256            if !ops.is_empty() {
257                inc.save_ops(&ops)?;
258            }
259            inc.sync()?;
260        } else {
261            // Non-WAL path: merge overlay, then full persist.
262            self.state.write().apply_overlay(overlay);
263            self.backend.save(&self.state.read())?;
264            if let Some(ref shared) = self.shared {
265                let generation = shared.gen_written.fetch_add(1, Ordering::Release) + 1;
266                shared.gen_flushed.store(generation, Ordering::Release);
267            }
268            return Ok(result);
269        }
270
271        self.state.write().apply_overlay(overlay);
272
273        if let Some(ref shared) = self.shared {
274            let generation = shared.gen_written.fetch_add(1, Ordering::Release) + 1;
275            shared.gen_flushed.store(generation, Ordering::Release);
276        }
277
278        Ok(result)
279    }
280}
281
impl<T: Clone + Send + Sync + 'static, B: Backend<T> + Send + Sync + 'static> Store<T, B> {
    /// Set flush policy. Must be called before first write.
    /// Starts background flusher thread for Grouped policy.
    pub fn set_flush_policy(&mut self, policy: FlushPolicy) {
        // Shut down existing flusher if any. The flusher performs a final
        // flush before exiting (see `flusher_loop`), so switching policies
        // does not drop buffered work.
        self.shutdown_flusher();

        match policy {
            FlushPolicy::Immediate => {
                // No `shared` means write() takes the synchronous path.
                self.shared = None;
                self.flusher = None;
            }
            FlushPolicy::Grouped { interval } => {
                // Fresh generation counters: 0 written / 0 flushed = clean.
                let shared = Arc::new(FlushShared {
                    state: Arc::clone(&self.state),
                    backend: Arc::clone(&self.backend),
                    incremental: self.incremental.clone(),
                    pending_ops: Mutex::new(Vec::new()),
                    gen_written: AtomicU64::new(0),
                    gen_flushed: AtomicU64::new(0),
                    notify: Condvar::new(),
                    notify_mu: Mutex::new(()),
                    last_error: Mutex::new(None),
                    shutdown: AtomicBool::new(false),
                });

                let thread_shared = Arc::clone(&shared);
                let handle = std::thread::Builder::new()
                    .name("store-flusher".into())
                    .spawn(move || flusher_loop(&thread_shared, interval))
                    .expect("failed to spawn flusher thread");

                self.shared = Some(shared);
                self.flusher = Some(FlushState {
                    handle: Mutex::new(Some(handle)),
                });
            }
        }
    }

    /// Flush dirty state now and wait for completion.
    ///
    /// In grouped mode, wakes the flusher thread and spins until it catches
    /// up with the current generation. In immediate mode, this is a no-op
    /// since writes are already persisted synchronously.
    pub fn flush(&self) -> Result<()> {
        let Some(ref shared) = self.shared else {
            return Ok(());
        };

        // Snapshot the target; writes that land after this point are not
        // waited on.
        let target_gen = shared.gen_written.load(Ordering::Acquire);
        if target_gen == shared.gen_flushed.load(Ordering::Acquire) {
            return Ok(());
        }

        // Wake the flusher repeatedly until it catches up. Re-notifying every
        // iteration compensates for a notify that lands while the flusher is
        // mid-cycle rather than waiting on the condvar.
        let start = std::time::Instant::now();
        loop {
            shared.notify.notify_one();

            if shared.gen_flushed.load(Ordering::Acquire) >= target_gen {
                break;
            }

            // Hard 5s cap so a dead/stuck flusher can't hang the caller.
            if start.elapsed() > std::time::Duration::from_secs(5) {
                return Err(Error::Io(std::io::Error::new(
                    std::io::ErrorKind::TimedOut,
                    "flush timed out waiting for flusher",
                )));
            }

            // Check for flusher error.
            if let Some(err) = shared.last_error.lock().take() {
                return Err(err);
            }

            std::thread::sleep(std::time::Duration::from_millis(1));
        }

        // Check for any error that occurred during the flush.
        if let Some(err) = shared.last_error.lock().take() {
            return Err(err);
        }
        Ok(())
    }

    /// Shut down the flusher thread gracefully.
    pub fn close(&mut self) -> Result<()> {
        self.shutdown_flusher();
        Ok(())
    }

    // Request flusher termination, then join. Safe to call when no flusher
    // is running (both branches are no-ops). `Drop` repeats this logic
    // because it cannot rely on the Send + Sync + 'static bounds here.
    fn shutdown_flusher(&mut self) {
        if let Some(ref shared) = self.shared {
            shared.shutdown.store(true, Ordering::Release);
            shared.notify.notify_one();
        }
        if let Some(ref flusher) = self.flusher
            && let Some(handle) = flusher.handle.lock().take()
        {
            let _ = handle.join();
        }
    }
}
386
/// Flusher thread main loop.
///
/// With WAL: sync WAL (fsync buffered entries) + check snapshot threshold.
/// Without WAL: clone state + full backend.save().
fn flusher_loop<T: Clone, B: Backend<T>>(shared: &FlushShared<T, B>, interval: Duration) {
    loop {
        {
            // Sleep until `interval` elapses or a writer / flush() / shutdown
            // notifies. Spurious wakeups are harmless: the generation check
            // below makes an idle cycle a no-op.
            let mut guard = shared.notify_mu.lock();
            shared.notify.wait_for(&mut guard, interval);
        }

        // Read the shutdown flag *before* flushing so one final flush still
        // runs after shutdown is requested (graceful drain on close/drop).
        let should_shutdown = shared.shutdown.load(Ordering::Acquire);

        let current_gen = shared.gen_written.load(Ordering::Acquire);
        let flushed_gen = shared.gen_flushed.load(Ordering::Acquire);

        if current_gen != flushed_gen {
            let result = if let Some(ref inc) = shared.incremental {
                // WAL mode: drain buffered ops, write to WAL, fsync once.
                let batched: Vec<Vec<Op>> = {
                    let mut pending = shared.pending_ops.lock();
                    std::mem::take(&mut *pending)
                };
                // NOTE(review): if save_ops fails partway through, the
                // already-drained batches (including those after the failing
                // one) are never restored to `pending_ops`, so those ops are
                // dropped even though `gen_flushed` does not advance —
                // consider re-queueing unwritten batches on error. Verify
                // against WAL replay semantics before changing.
                let mut write_err = None;
                for ops in &batched {
                    if let Err(e) = inc.save_ops(ops) {
                        write_err = Some(e);
                        break;
                    }
                }
                match write_err {
                    Some(e) => Err(e),
                    None => match inc.sync() {
                        Ok(()) => {
                            // Check if we should compact.
                            if inc.should_snapshot() {
                                let snapshot = shared.state.read().clone();
                                inc.snapshot(&snapshot)
                            } else {
                                Ok(())
                            }
                        }
                        Err(e) => Err(e),
                    },
                }
            } else {
                // Non-WAL: full state clone + serialize. Cloning outside the
                // lock scope is impossible here; the read lock is held only
                // for the clone, not the save.
                let snapshot = shared.state.read().clone();
                shared.backend.save(&snapshot)
            };

            match result {
                Ok(()) => {
                    shared.gen_flushed.store(current_gen, Ordering::Release);
                }
                Err(e) => {
                    // Surface to writers (write() fail-fast), flush(), and
                    // flush_error(). gen_flushed is left behind so flush()
                    // keeps seeing the gap.
                    *shared.last_error.lock() = Some(e);
                }
            }
        }

        if should_shutdown {
            break;
        }
    }
}
453
454impl<T, B: Backend<T>> Drop for Store<T, B> {
455    fn drop(&mut self) {
456        if let Some(ref shared) = self.shared {
457            shared.shutdown.store(true, Ordering::Release);
458            shared.notify.notify_one();
459        }
460        if let Some(ref flusher) = self.flusher
461            && let Some(handle) = flusher.handle.lock().take()
462        {
463            let _ = handle.join();
464        }
465    }
466}
467
// Unit tests live in a sibling file, compiled only for test builds.
#[cfg(test)]
#[path = "store_test.rs"]
mod store_test;