Skip to main content

lora_wal/wal/
wal.rs

1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//!   0000000001.wal      sealed segment
8//!   0000000002.wal      sealed segment
9//!   0000000003.wal      active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
//! events into the store → resume normal `begin` / `append` / `commit`
//! traffic. The directory lock is held until the `Wal` drops; a second
//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
27
28use std::fs;
29use std::path::Path;
30use std::sync::{Arc, Mutex};
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Duration;
33
34use lora_store::MutationEvent;
35
36#[cfg(not(target_arch = "wasm32"))]
37use super::group_flusher::{spawn_group_flusher, GroupFlusherHandle};
38use crate::config::SyncMode;
39use crate::dir::{SegmentDir, SegmentId};
40use crate::errors::WalError;
41use crate::lock::DirLock;
42use crate::lsn::Lsn;
43use crate::record::WalRecord;
44use crate::recorder::WroteCommit;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
/// Mutable append-side state guarded by the inner `Mutex` of [`Wal`].
/// Nothing in this struct is `Send`-unsafe; the lock is purely for
/// `&self`-safe interior mutation.
struct WalState {
    /// LSN that the next appended record will be stamped with.
    next_lsn: Lsn,
    /// Current durability fence. Seeded from replay at open time and
    /// advanced by `flush_inner` according to the flush kind / sync mode.
    durable_lsn: Lsn,
    /// Id of the segment that currently receives appends.
    active_segment_id: SegmentId,
    /// Writer for the active segment; replaced on rotation.
    active_writer: SegmentWriter,
    /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
    oldest_segment_id: SegmentId,
}
59
/// Reserved latch for durability failures that occur outside the immediate
/// caller path. Wrapped in a `Mutex` instead of an
/// `AtomicCell<Option<String>>` because failures are rare and we want the
/// message preserved verbatim for operator-facing reporting
/// (`/admin/wal/status` `bgFailure`).
///
/// `None` means healthy. Once `Some`, every gated entry point (the ones
/// that call `check_healthy` first) returns [`WalError::Poisoned`] and the
/// operator is expected to restart from the last consistent snapshot + WAL.
type BgFailure = Mutex<Option<String>>;
68
/// Selects the durability work that [`Wal::flush_inner`] actually does.
/// Centralising the decision here means `flush` and `force_fsync` share
/// one code path and the call sites don't have to remember which
/// combination of flush kind and [`SyncMode`] advances `durable_lsn` and
/// which does not.
#[derive(Debug, Clone, Copy)]
pub(super) enum FlushKind {
    /// Honour the configured [`SyncMode`]. This is what the recorder's
    /// `flush()` calls into.
    PerConfiguredMode,
    /// Always write the buffer + fsync + advance `durable_lsn`, regardless of
    /// mode. Used by checkpoints, explicit sync, and clean Group-mode drop.
    ForceFsync,
}
82
/// Live, append-side WAL handle.
///
/// Construct via [`Wal::open`]. The returned tuple includes the list of
/// committed mutation events that need to be re-applied to the
/// in-memory store before any new traffic is accepted.
///
/// `Wal::open` returns `Arc<Self>` because the optional Group-mode
/// background flusher needs a `Weak<Wal>` to call back into without
/// taking a strong reference (which would prevent shutdown).
///
/// Field order matters: fields drop in declaration order, so `flusher`
/// is torn down before `_dir_lock` is released.
pub struct Wal {
    /// Handle on the WAL directory (listing, path construction, dir sync).
    segments: SegmentDir,
    /// Durability policy chosen at open time; fixed for the handle's life.
    sync_mode: SyncMode,
    /// Size threshold at which `maybe_rotate` seals the active segment.
    segment_target_bytes: u64,
    /// All mutable append-side state, serialised behind one lock.
    state: Mutex<WalState>,
    /// Latched durability failure; surfaced via [`Wal::bg_failure`] and
    /// propagated to commit/flush/force_fsync as [`WalError::Poisoned`].
    bg_failure: Arc<BgFailure>,
    /// Background flusher for `SyncMode::Group`. `Drop` joins the
    /// thread, so a `Wal` going out of scope is a clean shutdown
    /// signal. Absent on `wasm32`, where Group mode falls back to the
    /// drop-time flush.
    #[cfg(not(target_arch = "wasm32"))]
    flusher: Mutex<Option<GroupFlusherHandle>>,
    /// Held for the lifetime of the WAL so a second handle cannot append
    /// to the same active segment concurrently.
    _dir_lock: DirLock,
}
110
111impl Wal {
112    /// Open or create the WAL directory at `dir`.
113    ///
114    /// `checkpoint_lsn` is the LSN stamped into the most recent
115    /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
116    /// there is no snapshot). Replay skips records at or below this
117    /// fence — they are already represented in the loaded state.
118    ///
119    /// Returns `(wal, committed_events)`. The caller is expected to
120    /// apply every event in `committed_events` to its in-memory store
121    /// in order before issuing any new `begin` / `append` calls.
122    pub fn open(
123        dir: impl Into<std::path::PathBuf>,
124        sync_mode: SyncMode,
125        segment_target_bytes: u64,
126        checkpoint_lsn: Lsn,
127    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
128        let segments = SegmentDir::new(dir);
129        fs::create_dir_all(segments.root())?;
130        let dir_lock = DirLock::acquire(segments.root())?;
131
132        let entries = segments.list()?;
133        let (active_id, active_writer, replay) = if entries.is_empty() {
134            Self::open_fresh(&segments)?
135        } else {
136            Self::open_existing(&segments, &entries, checkpoint_lsn)?
137        };
138
139        let next_lsn = if replay.max_lsn.is_zero() {
140            Lsn::new(1)
141        } else {
142            replay.max_lsn.next()
143        };
144        // Treat everything readable at open time as the recovered
145        // durability fence. This does not prove the bytes were
146        // fsync-confirmed before the previous process died; it means
147        // they survived to this open and future appends must start
148        // after them.
149        let durable_lsn = replay.max_lsn;
150
151        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
152
153        let state = WalState {
154            next_lsn,
155            durable_lsn,
156            active_segment_id: active_id,
157            active_writer,
158            oldest_segment_id,
159        };
160
161        let wal = Arc::new(Self {
162            segments,
163            sync_mode,
164            segment_target_bytes,
165            state: Mutex::new(state),
166            bg_failure: Arc::new(Mutex::new(None)),
167            #[cfg(not(target_arch = "wasm32"))]
168            flusher: Mutex::new(None),
169            _dir_lock: dir_lock,
170        });
171
172        // Spawn the Group flusher *after* the Arc exists so it can hold a
173        // `Weak<Wal>` that drops when the last strong ref does. The flusher's
174        // own Drop joins the thread, so removing the field on `Wal::drop` is
175        // a clean shutdown signal. Wasm has no real fsync boundary and no
176        // thread support, so Group there relies on the drop-time flush.
177        #[cfg(not(target_arch = "wasm32"))]
178        if let SyncMode::Group { interval_ms } = sync_mode {
179            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
180            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
181            *wal.flusher.lock().unwrap() = Some(handle);
182        }
183
184        Ok((wal, replay.committed_events))
185    }
186
187    /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
188    /// so LSN 0 stays reserved for "empty / never written".
189    fn open_fresh(
190        segments: &SegmentDir,
191    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
192        let id = SegmentId::FIRST;
193        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
194        segments.sync_dir()?;
195        let replay = ReplayOutcome {
196            committed_events: Vec::new(),
197            max_lsn: Lsn::ZERO,
198            torn_tail: None,
199            checkpoint_lsn_observed: None,
200        };
201        Ok((id, writer, replay))
202    }
203
204    /// Existing directory. Replay every segment to surface committed
205    /// events + detect a torn tail; reopen the highest-id segment
206    /// for append; truncate it if the torn tail is in *that* segment.
207    fn open_existing(
208        segments: &SegmentDir,
209        entries: &[crate::dir::SegmentEntry],
210        checkpoint_lsn: Lsn,
211    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
212        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
213        let replay = replay_segments(&paths, checkpoint_lsn)?;
214
215        // The active segment is whichever file has the highest
216        // numeric id — segment file names are self-describing, so
217        // there is no separate CURRENT pointer.
218        let active = entries.last().expect("entries non-empty in open_existing");
219        let (mut writer, _torn_from_writer) =
220            SegmentWriter::open_for_append(segments.path_for(active.id))?;
221
222        // A torn tail in a *sealed* segment is impossible (sealed
223        // segments are never appended to), so we only need to handle
224        // the active one.
225        if let Some(t) = &replay.torn_tail {
226            if t.segment_path == active.path {
227                writer.truncate_to(t.last_good_offset)?;
228            } else {
229                return Err(WalError::Malformed(format!(
230                    "torn tail found in sealed segment {}",
231                    t.segment_path.display()
232                )));
233            }
234        }
235
236        Ok((active.id, writer, replay))
237    }
238
    /// Root path of the WAL directory this handle owns.
    pub fn dir(&self) -> &Path {
        self.segments.root()
    }
242
    /// The [`SyncMode`] this WAL was opened with.
    pub fn sync_mode(&self) -> SyncMode {
        self.sync_mode
    }
246
247    pub fn durable_lsn(&self) -> Lsn {
248        self.state.lock().unwrap().durable_lsn
249    }
250
251    /// Latched durability failure, if any. `None` means the WAL is healthy.
252    /// Once set, every commit / flush / force_fsync starts returning
253    /// [`WalError::Poisoned`] and the WAL stops accepting new
254    /// transactions until the operator restarts from the last
255    /// consistent snapshot + WAL.
256    pub fn bg_failure(&self) -> Option<String> {
257        self.bg_failure.lock().unwrap().clone()
258    }
259
260    /// Direct handle to the latched-failure mutex. Used by the bg
261    /// flusher to record an fsync failure exactly once. Hidden from
262    /// outside the module so the latch stays single-writer.
263    #[cfg(not(target_arch = "wasm32"))]
264    pub(super) fn bg_failure_slot(&self) -> &BgFailure {
265        &self.bg_failure
266    }
267
268    fn check_healthy(&self) -> Result<(), WalError> {
269        if self.bg_failure.lock().unwrap().is_some() {
270            return Err(WalError::Poisoned);
271        }
272        Ok(())
273    }
274
275    /// LSN that the *next* `begin` / `append` call will allocate.
276    /// Exposed for tests and for sanity checks at boot; not part of
277    /// any durability contract.
278    pub fn next_lsn(&self) -> Lsn {
279        self.state.lock().unwrap().next_lsn
280    }
281
282    pub fn oldest_segment_id(&self) -> u64 {
283        self.state.lock().unwrap().oldest_segment_id.raw()
284    }
285
286    pub fn active_segment_id(&self) -> u64 {
287        self.state.lock().unwrap().active_segment_id.raw()
288    }
289
290    // -------------------------------------------------------------
291    // Low-level record primitives.
292    //
293    // Production code does **not** use these directly — every commit
294    // goes through [`Self::commit_tx`], which writes the begin/batch/
295    // commit triple atomically and routes durability through the
296    // configured single-thread flush policy. The methods below remain
297    // `pub` for the crate's own integration tests and for the rare
298    // admin path (`checkpoint_marker`) that needs to insert a single record.
299    // Mixing them with `commit_tx` against the same WAL is supported
300    // but unnecessary; if you find yourself calling `begin` /
301    // `append` / `commit` from a new caller, prefer `commit_tx`
302    // unless you specifically need the partial-write shape.
303    // -------------------------------------------------------------
304
305    /// Allocate a `TxBegin` record and return its LSN. *Test/admin
306    /// primitive.* Production commits use [`Self::commit_tx`].
307    ///
308    /// Rotation happens here so a transaction is always wholly within
309    /// one segment.
310    pub fn begin(&self) -> Result<Lsn, WalError> {
311        self.check_healthy()?;
312        let mut state = self.state.lock().unwrap();
313        self.maybe_rotate(&mut state)?;
314        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
315    }
316
317    /// Append a single mutation to the active segment's pending
318    /// buffer. *Test/admin primitive.* Not durable until `flush()`
319    /// runs; production commits use [`Self::commit_tx`].
320    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
321        self.check_healthy()?;
322        let mut state = self.state.lock().unwrap();
323        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
324            lsn,
325            tx_begin_lsn,
326            event: event.clone(),
327        })
328    }
329
330    /// Append many mutations as one framed record. *Test/admin
331    /// primitive.* Production commits use [`Self::commit_tx`], which
332    /// writes the begin/batch/commit triple in a single critical
333    /// section.
334    pub fn append_batch(
335        &self,
336        tx_begin_lsn: Lsn,
337        events: Vec<MutationEvent>,
338    ) -> Result<Lsn, WalError> {
339        self.check_healthy()?;
340        if events.is_empty() {
341            return Err(WalError::Encode(
342                "mutation batch must contain at least one event".into(),
343            ));
344        }
345        let mut state = self.state.lock().unwrap();
346        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
347            lsn,
348            tx_begin_lsn,
349            events,
350        })
351    }
352
353    /// Append a standalone `TxCommit` marker. *Test/admin primitive.*
354    /// Production commits use [`Self::commit_tx`].
355    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
356        self.check_healthy()?;
357        let mut state = self.state.lock().unwrap();
358        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
359    }
360
361    /// Append a `TxAbort` marker. *Test/admin primitive.* Production
362    /// code never writes `TxAbort`: [`Self::commit_tx`] writes the
363    /// begin/batch/commit triple atomically, so an aborted query has
364    /// nothing on disk to mark as aborted.
365    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
366        self.check_healthy()?;
367        let mut state = self.state.lock().unwrap();
368        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
369    }
370
371    /// One-shot transaction commit.
372    ///
373    /// Encodes `TxBegin` + `MutationBatch` + `TxCommit` as a single
374    /// contiguous run inside one short critical section, then applies the
375    /// configured flush policy. Compared to the legacy
376    /// `begin → append_batch → commit → flush` sequence this collapses
377    /// four separate state-lock acquisitions into one while preserving the
378    /// release's single-writer execution model. Future concurrent commit
379    /// plumbing can build around this one-shot boundary without changing the
380    /// recorder contract.
381    ///
382    /// Returns [`WroteCommit::No`] for an empty event list (no records
383    /// are written, no fsync is issued).
384    pub fn commit_tx(&self, events: Vec<MutationEvent>) -> Result<WroteCommit, WalError> {
385        self.check_healthy()?;
386        if events.is_empty() {
387            return Ok(WroteCommit::No);
388        }
389
390        // Phase 1: allocate the LSN window and encode all three
391        // records into the active segment's pending buffer in one
392        // critical section. Collapsing what was four separate state
393        // lock acquisitions (begin / append_batch / commit / flush)
394        // into one is the lock-side win that pairs with the
395        // lock-free emit short-circuit on the recorder side.
396        {
397            let mut state = self.state.lock().unwrap();
398            self.maybe_rotate(&mut state)?;
399            let begin_lsn = state.next_lsn;
400            let batch_lsn = begin_lsn.next();
401            let commit_lsn = batch_lsn.next();
402            state.next_lsn = commit_lsn.next();
403            state
404                .active_writer
405                .append(&WalRecord::TxBegin { lsn: begin_lsn })?;
406            state.active_writer.append(&WalRecord::MutationBatch {
407                lsn: batch_lsn,
408                tx_begin_lsn: begin_lsn,
409                events,
410            })?;
411            state.active_writer.append(&WalRecord::TxCommit {
412                lsn: commit_lsn,
413                tx_begin_lsn: begin_lsn,
414            })?;
415        }
416
417        // Phase 2: durability per sync mode. PerCommit fsyncs inline;
418        // Group is cooperative in this single-threaded release (no bg
419        // flusher thread); None just pushes bytes to the page cache.
420        match self.sync_mode {
421            SyncMode::PerCommit => self.flush_inner(FlushKind::ForceFsync)?,
422            SyncMode::Group { .. } | SyncMode::None => {
423                self.flush_inner(FlushKind::PerConfiguredMode)?;
424            }
425        }
426
427        Ok(WroteCommit::Yes)
428    }
429
430    /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
431    /// LSN written into the snapshot file's header — replay uses
432    /// it to defend against the snapshot-rename-but-no-marker race.
433    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
434        self.check_healthy()?;
435        let mut state = self.state.lock().unwrap();
436        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
437            lsn,
438            snapshot_lsn,
439        })
440    }
441
442    /// Single-source-of-truth for "allocate the next LSN, build the
443    /// record, push it onto the active segment's pending buffer".
444    /// The five public append paths (`begin / append / commit / abort
445    /// / checkpoint_marker`) all funnel through here so the LSN
446    /// allocation never gets out of sync with the encoded record.
447    #[inline]
448    fn alloc_and_append(
449        state: &mut WalState,
450        build: impl FnOnce(Lsn) -> WalRecord,
451    ) -> Result<Lsn, WalError> {
452        let lsn = state.next_lsn;
453        state.next_lsn = lsn.next();
454        state.active_writer.append(&build(lsn))?;
455        Ok(lsn)
456    }
457
458    /// Flush the active segment's pending buffer.
459    ///
460    /// What "flush" means depends on [`SyncMode`]:
461    ///
462    /// - `PerCommit` — write the buffer to the OS, `fsync`, and
463    ///   advance `durable_lsn`. The strongest contract: every
464    ///   record up to `next_lsn - 1` is on disk.
465    /// - `Group` — write the buffer to the OS, but leave
466    ///   `durable_lsn` unchanged until an explicit `force_fsync`,
467    ///   checkpoint, sync, or clean drop.
468    /// - `None` — write the buffer to the OS only, but advance
469    ///   `durable_lsn` anyway. The mode opts out of crash
470    ///   durability, so the checkpoint fence reports
471    ///   "what's been written" instead of "what's actually safe".
472    pub fn flush(&self) -> Result<(), WalError> {
473        self.check_healthy()?;
474        self.flush_inner(FlushKind::PerConfiguredMode)
475    }
476
477    /// Unconditionally write the buffer to the OS, `fsync`, and
478    /// advance `durable_lsn`. Used by callers that need a durability
479    /// point right now regardless of the configured cadence (e.g.
480    /// checkpoint). Returns [`WalError::Poisoned`] if the WAL has already
481    /// latched a durability failure.
482    pub fn force_fsync(&self) -> Result<(), WalError> {
483        self.check_healthy()?;
484        self.flush_inner(FlushKind::ForceFsync)
485    }
486
487    /// Single source of truth for the flush state machine. Skips the
488    /// `check_healthy` gate so clean shutdown can force a final Group-mode
489    /// sync even if callers are otherwise done with the handle.
490    pub(super) fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
491        let mut state = self.state.lock().unwrap();
492        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
493
494        // Decide whether this call is allowed to advance `durable_lsn`.
495        // PerCommit and forced syncs advance after the fsync boundary. None
496        // advances after write because the mode opts out of crash durability.
497        // Group is cooperative for this release: normal `flush()` writes bytes
498        // but leaves the durable fence alone until `force_fsync`, checkpoint,
499        // sync(), or drop.
500        let do_fsync = matches!(
501            (kind, self.sync_mode),
502            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
503        );
504        let advance_durable = matches!(
505            (kind, self.sync_mode),
506            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
507        );
508
509        if do_fsync {
510            state.active_writer.flush_and_sync()?;
511        } else {
512            state.active_writer.flush_buffer()?;
513        }
514        if advance_durable {
515            state.durable_lsn = written_lsn;
516        }
517        Ok(())
518    }
519
520    /// Drop sealed segments whose entire LSN range is at or below
521    /// `fence_lsn`. Idempotent and safe to call repeatedly.
522    ///
523    /// The active segment is never deleted — even if every record in
524    /// it predates the fence, it is still the rotation target for
525    /// new appends. The segment immediately before the active one
526    /// is also kept as a tombstone so a subsequent crash before the
527    /// next checkpoint still finds a self-describing log start.
528    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
529        let mut state = self.state.lock().unwrap();
530        let active_id = state.active_segment_id;
531        let entries = self.segments.list()?;
532
533        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
534        for (i, entry) in entries.iter().enumerate() {
535            // Active segment and the one immediately preceding it
536            // are kept by policy.
537            if entry.id >= active_id.saturating_prev() {
538                break;
539            }
540            // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
541            // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
542            let next = match entries.get(i + 1) {
543                Some(n) => n,
544                None => break,
545            };
546            let next_base = SegmentDir::base_lsn(&next.path)?;
547            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
548                to_drop.push(entry.clone());
549            }
550        }
551
552        for entry in to_drop {
553            fs::remove_file(&entry.path)?;
554            if entry.id >= state.oldest_segment_id {
555                state.oldest_segment_id = entry.id.next();
556            }
557        }
558        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
559            self.segments.sync_dir()?;
560        }
561        Ok(())
562    }
563
    /// Rotate the active segment when it has grown to at least
    /// `segment_target_bytes`. Called from `begin()` and `commit_tx()`
    /// before the first record of a transaction is appended, so rotation
    /// lands ahead of that transaction's records.
    ///
    /// Returns `Ok(())` without doing anything while the active writer
    /// reports fewer than `segment_target_bytes` bytes written.
    ///
    /// NOTE(review): `state` is only updated after every step succeeds;
    /// if `seal`, `create` or `sync_dir` fails, `state.active_writer`
    /// still refers to the old writer — confirm that is acceptable.
    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
        if state.active_writer.bytes_written() < self.segment_target_bytes {
            return Ok(());
        }
        // Seal the current segment (forces a flush + fsync) and open
        // a fresh one with `base_lsn = next_lsn` so the segment file
        // names line up with the record LSNs they contain.
        state.active_writer.flush_and_sync()?;
        state.active_writer.seal()?;

        let next_id = state.active_segment_id.next();
        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
        // Sync the directory after creating the new segment file and
        // before switching the in-memory state over to it.
        self.segments.sync_dir()?;
        state.active_writer = writer;
        state.active_segment_id = next_id;
        Ok(())
    }
584}
585
586impl Drop for Wal {
587    fn drop(&mut self) {
588        if matches!(self.sync_mode, SyncMode::Group { .. }) {
589            let _ = self.flush_inner(FlushKind::ForceFsync);
590        }
591        // Join the group flusher, if any, before the directory lock is
592        // released. That keeps the "one live append owner" boundary intact
593        // through shutdown.
594        #[cfg(not(target_arch = "wasm32"))]
595        if let Ok(slot) = self.flusher.get_mut() {
596            let _ = slot.take();
597        }
598    }
599}