Skip to main content

lora_wal/wal/
wal.rs

1//! `Wal` — the durable log handle.
2//!
3//! Owns a WAL directory of the shape:
4//!
5//! ```text
6//! <dir>/
7//!   0000000001.wal      sealed segment
8//!   0000000002.wal      sealed segment
9//!   0000000003.wal      active segment
10//! ```
11//!
12//! The active segment is identified by the highest numeric file name —
13//! we deliberately do **not** keep a separate `CURRENT` pointer file.
14//! A pointer would be a second source of truth that crashes can
15//! desynchronise from the directory listing without buying anything:
16//! the file names already encode their ordering, and segment headers
17//! are self-describing.
18//!
19//! Lifecycle is [`Wal::open`] → acquire the directory lock → drain replay
20//! events into the store → resume normal `begin` / `append` / `commit`
21//! traffic. The directory lock is held until the `Wal` drops; a second
22//! live `Wal::open` on the same directory returns [`WalError::AlreadyOpen`].
23//!
24//! All public methods take `&self` and serialise through an internal
25//! [`Mutex`]. The store write lock already serialises query commits in
26//! production, so the inner mutex is uncontested and effectively free.
27
28use std::fs;
29use std::path::Path;
30use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Duration;
33
34use lora_store::MutationEvent;
35
36#[cfg(not(target_arch = "wasm32"))]
37use super::group_flusher::{spawn_group_flusher, GroupFlusherHandle};
38use crate::config::SyncMode;
39use crate::dir::{SegmentDir, SegmentId};
40use crate::errors::WalError;
41use crate::lock::DirLock;
42use crate::lsn::Lsn;
43use crate::record::WalRecord;
44use crate::recorder::WroteCommit;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
48/// State guarded by the inner `Mutex`. Nothing in this struct is
49/// `Send`-unsafe; the lock is purely for `&self`-safe interior
50/// mutation.
51struct WalState {
52    next_lsn: Lsn,
53    durable_lsn: Lsn,
54    active_segment_id: SegmentId,
55    active_writer: SegmentWriter,
56    /// Lowest segment id still on disk. Bumped by `truncate_up_to`.
57    oldest_segment_id: SegmentId,
58}
59
/// Latch for durability failures that happen outside the immediate caller
/// path.
///
/// A `Mutex<Option<String>>` is used rather than an
/// `AtomicCell<Option<String>>`: failures are rare, and the message should be
/// kept verbatim for operator-facing reporting (`/admin/wal/status`
/// `bgFailure`). Once the slot holds `Some`, every subsequent commit/flush
/// returns [`WalError::Poisoned`] and the operator is expected to restart
/// from the last consistent snapshot + WAL.
type BgFailure = Mutex<Option<String>>;
68
69/// Selects the durability work that [`Wal::flush_inner`] actually does.
70/// Centralising the normal write-only path and the forced fsync path keeps
71/// the call sites from duplicating durable-LSN rules.
72#[derive(Debug, Clone, Copy)]
73pub(super) enum FlushKind {
74    /// Write pending WAL bytes to the OS without forcing storage durability.
75    /// This is what commits do under [`SyncMode::GroupSync`].
76    PerConfiguredMode,
77    /// Write pending WAL bytes, fsync, and advance `durable_lsn`. Used by
78    /// checkpoints, explicit sync, the background flusher, and clean drop.
79    ForceFsync,
80}
81
82/// Live, append-side WAL handle.
83///
84/// Construct via [`Wal::open`]. The returned tuple includes the list of
85/// committed mutation events that need to be re-applied to the
86/// in-memory store before any new traffic is accepted.
87///
88/// `Wal::open` returns `Arc<Self>` because the optional GroupSync
89/// background flusher needs a `Weak<Wal>` to call back into without
90/// taking a strong reference (which would prevent shutdown).
91pub struct Wal {
92    segments: SegmentDir,
93    sync_mode: SyncMode,
94    segment_target_bytes: u64,
95    state: Mutex<WalState>,
96    /// Latched durability failure; surfaced via [`Wal::bg_failure`] and
97    /// propagated to commit/flush/force_fsync as [`WalError::Poisoned`].
98    bg_failure: Arc<BgFailure>,
99    /// Background flusher for `SyncMode::GroupSync`. `Drop` joins the
100    /// thread, so a `Wal` going out of scope is a clean shutdown
101    /// signal. Absent on `wasm32`, where GroupSync falls back to the
102    /// drop-time flush.
103    #[cfg(not(target_arch = "wasm32"))]
104    flusher: Mutex<Option<GroupFlusherHandle>>,
105    /// Held for the lifetime of the WAL so a second handle cannot append
106    /// to the same active segment concurrently.
107    _dir_lock: DirLock,
108}
109
110impl Wal {
111    /// Open or create the WAL directory at `dir`.
112    ///
113    /// `checkpoint_lsn` is the LSN stamped into the most recent
114    /// snapshot the caller is restoring from (or [`Lsn::ZERO`] if
115    /// there is no snapshot). Replay skips records at or below this
116    /// fence — they are already represented in the loaded state.
117    ///
118    /// Returns `(wal, committed_events)`. The caller is expected to
119    /// apply every event in `committed_events` to its in-memory store
120    /// in order before issuing any new `begin` / `append` calls.
121    pub fn open(
122        dir: impl Into<std::path::PathBuf>,
123        sync_mode: SyncMode,
124        segment_target_bytes: u64,
125        checkpoint_lsn: Lsn,
126    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
127        let segments = SegmentDir::new(dir);
128        fs::create_dir_all(segments.root())?;
129        let dir_lock = DirLock::acquire(segments.root())?;
130
131        let entries = segments.list()?;
132        let (active_id, active_writer, replay) = if entries.is_empty() {
133            Self::open_fresh(&segments)?
134        } else {
135            Self::open_existing(&segments, &entries, checkpoint_lsn)?
136        };
137
138        let next_lsn = if replay.max_lsn.is_zero() {
139            Lsn::new(1)
140        } else {
141            replay
142                .max_lsn
143                .checked_next()
144                .ok_or_else(|| WalError::Malformed("WAL LSN space is exhausted".into()))?
145        };
146        // Treat everything readable at open time as the recovered
147        // durability fence. This does not prove the bytes were
148        // fsync-confirmed before the previous process died; it means
149        // they survived to this open and future appends must start
150        // after them.
151        let durable_lsn = replay.max_lsn;
152
153        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
154
155        let state = WalState {
156            next_lsn,
157            durable_lsn,
158            active_segment_id: active_id,
159            active_writer,
160            oldest_segment_id,
161        };
162
163        let wal = Arc::new(Self {
164            segments,
165            sync_mode,
166            segment_target_bytes,
167            state: Mutex::new(state),
168            bg_failure: Arc::new(Mutex::new(None)),
169            #[cfg(not(target_arch = "wasm32"))]
170            flusher: Mutex::new(None),
171            _dir_lock: dir_lock,
172        });
173
174        // Spawn the GroupSync flusher *after* the Arc exists so it can hold a
175        // `Weak<Wal>` that drops when the last strong ref does. The flusher's
176        // own Drop joins the thread, so removing the field on `Wal::drop` is
177        // a clean shutdown signal. Wasm has no real fsync boundary and no
178        // thread support, so GroupSync there relies on the drop-time flush.
179        #[cfg(not(target_arch = "wasm32"))]
180        {
181            let SyncMode::GroupSync { interval_ms } = sync_mode;
182            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
183            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
184            *wal.flusher.lock().map_err(|_| WalError::Poisoned)? = Some(handle);
185        }
186
187        Ok((wal, replay.committed_events))
188    }
189
190    /// Brand-new WAL directory. Create segment 1 with `base_lsn = 1`
191    /// so LSN 0 stays reserved for "empty / never written".
192    fn open_fresh(
193        segments: &SegmentDir,
194    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
195        let id = SegmentId::FIRST;
196        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
197        segments.sync_dir()?;
198        let replay = ReplayOutcome {
199            committed_events: Vec::new(),
200            max_lsn: Lsn::ZERO,
201            torn_tail: None,
202            checkpoint_lsn_observed: None,
203            last_good_offset: crate::segment::SEGMENT_HEADER_LEN as u64,
204        };
205        Ok((id, writer, replay))
206    }
207
208    /// Existing directory. Replay every segment to surface committed
209    /// events + detect a torn tail; reopen the highest-id segment
210    /// for append; truncate it if the torn tail is in *that* segment.
211    fn open_existing(
212        segments: &SegmentDir,
213        entries: &[crate::dir::SegmentEntry],
214        checkpoint_lsn: Lsn,
215    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
216        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
217        let replay = replay_segments(&paths, checkpoint_lsn)?;
218
219        // The active segment is whichever file has the highest
220        // numeric id — segment file names are self-describing, so
221        // there is no separate CURRENT pointer.
222        let active = entries
223            .last()
224            .ok_or_else(|| WalError::Malformed("WAL directory has no segments".into()))?;
225        let mut writer = SegmentWriter::open_for_append_at(
226            segments.path_for(active.id),
227            replay.last_good_offset,
228        )?;
229
230        // A torn tail in a *sealed* segment is impossible (sealed
231        // segments are never appended to), so we only need to handle
232        // the active one.
233        if let Some(t) = &replay.torn_tail {
234            if t.segment_path == active.path {
235                writer.truncate_to(t.last_good_offset)?;
236            } else {
237                return Err(WalError::Malformed(format!(
238                    "torn tail found in sealed segment {}",
239                    t.segment_path.display()
240                )));
241            }
242        }
243
244        Ok((active.id, writer, replay))
245    }
246
247    pub fn dir(&self) -> &Path {
248        self.segments.root()
249    }
250
251    pub fn sync_mode(&self) -> SyncMode {
252        self.sync_mode
253    }
254
255    pub fn durable_lsn(&self) -> Lsn {
256        self.state
257            .lock()
258            .unwrap_or_else(PoisonError::into_inner)
259            .durable_lsn
260    }
261
262    /// Latched durability failure, if any. `None` means the WAL is healthy.
263    /// Once set, every commit / flush / force_fsync starts returning
264    /// [`WalError::Poisoned`] and the WAL stops accepting new
265    /// transactions until the operator restarts from the last
266    /// consistent snapshot + WAL.
267    pub fn bg_failure(&self) -> Option<String> {
268        self.bg_failure
269            .lock()
270            .unwrap_or_else(PoisonError::into_inner)
271            .clone()
272    }
273
274    /// Direct handle to the latched-failure mutex. Used by the bg
275    /// flusher to record an fsync failure exactly once. Hidden from
276    /// outside the module so the latch stays single-writer.
277    #[cfg(not(target_arch = "wasm32"))]
278    pub(super) fn bg_failure_slot(&self) -> &BgFailure {
279        &self.bg_failure
280    }
281
282    fn check_healthy(&self) -> Result<(), WalError> {
283        if self
284            .bg_failure
285            .lock()
286            .map_err(|_| WalError::Poisoned)?
287            .is_some()
288        {
289            return Err(WalError::Poisoned);
290        }
291        Ok(())
292    }
293
294    /// LSN that the *next* `begin` / `append` call will allocate.
295    /// Exposed for tests and for sanity checks at boot; not part of
296    /// any durability contract.
297    pub fn next_lsn(&self) -> Lsn {
298        self.state
299            .lock()
300            .unwrap_or_else(PoisonError::into_inner)
301            .next_lsn
302    }
303
304    pub fn oldest_segment_id(&self) -> u64 {
305        self.state
306            .lock()
307            .unwrap_or_else(PoisonError::into_inner)
308            .oldest_segment_id
309            .raw()
310    }
311
312    pub fn active_segment_id(&self) -> u64 {
313        self.state
314            .lock()
315            .unwrap_or_else(PoisonError::into_inner)
316            .active_segment_id
317            .raw()
318    }
319
320    // -------------------------------------------------------------
321    // Low-level record primitives.
322    //
323    // Production code does **not** use these directly — every commit
324    // goes through [`Self::commit_tx`], which writes the begin/batch/
325    // commit triple atomically and routes durability through the
326    // configured single-thread flush policy. The methods below remain
327    // `pub` for the crate's own integration tests and for the rare
328    // admin path (`checkpoint_marker`) that needs to insert a single record.
329    // Mixing them with `commit_tx` against the same WAL is supported
330    // but unnecessary; if you find yourself calling `begin` /
331    // `append` / `commit` from a new caller, prefer `commit_tx`
332    // unless you specifically need the partial-write shape.
333    // -------------------------------------------------------------
334
335    /// Allocate a `TxBegin` record and return its LSN. *Test/admin
336    /// primitive.* Production commits use [`Self::commit_tx`].
337    ///
338    /// Rotation happens here so a transaction is always wholly within
339    /// one segment.
340    pub fn begin(&self) -> Result<Lsn, WalError> {
341        self.check_healthy()?;
342        let mut state = self.lock_state()?;
343        self.maybe_rotate(&mut state)?;
344        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
345    }
346
347    /// Append a single mutation to the active segment's pending
348    /// buffer. *Test/admin primitive.* Not durable until `flush()`
349    /// runs; production commits use [`Self::commit_tx`].
350    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
351        self.check_healthy()?;
352        let mut state = self.lock_state()?;
353        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
354            lsn,
355            tx_begin_lsn,
356            event: event.clone(),
357        })
358    }
359
360    /// Append many mutations as one framed record. *Test/admin
361    /// primitive.* Production commits use [`Self::commit_tx`], which
362    /// writes the begin/batch/commit triple in a single critical
363    /// section.
364    pub fn append_batch(
365        &self,
366        tx_begin_lsn: Lsn,
367        events: Vec<MutationEvent>,
368    ) -> Result<Lsn, WalError> {
369        self.check_healthy()?;
370        if events.is_empty() {
371            return Err(WalError::Encode(
372                "mutation batch must contain at least one event".into(),
373            ));
374        }
375        let mut state = self.lock_state()?;
376        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
377            lsn,
378            tx_begin_lsn,
379            events,
380        })
381    }
382
383    /// Append a standalone `TxCommit` marker. *Test/admin primitive.*
384    /// Production commits use [`Self::commit_tx`].
385    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
386        self.check_healthy()?;
387        let mut state = self.lock_state()?;
388        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
389    }
390
391    /// Append a `TxAbort` marker. *Test/admin primitive.* Production
392    /// code never writes `TxAbort`: [`Self::commit_tx`] writes the
393    /// begin/batch/commit triple atomically, so an aborted query has
394    /// nothing on disk to mark as aborted.
395    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
396        self.check_healthy()?;
397        let mut state = self.lock_state()?;
398        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
399    }
400
401    /// One-shot transaction commit.
402    ///
403    /// Encodes `TxBegin` + `MutationBatch` + `TxCommit` as a single
404    /// contiguous run inside one short critical section, then applies the
405    /// configured flush policy. Compared to the legacy
406    /// `begin → append_batch → commit → flush` sequence this collapses
407    /// four separate state-lock acquisitions into one while preserving the
408    /// release's single-writer execution model. Future concurrent commit
409    /// plumbing can build around this one-shot boundary without changing the
410    /// recorder contract.
411    ///
412    /// Returns [`WroteCommit::No`] for an empty event list (no records
413    /// are written, no fsync is issued).
414    pub fn commit_tx(&self, events: Vec<MutationEvent>) -> Result<WroteCommit, WalError> {
415        self.check_healthy()?;
416        if events.is_empty() {
417            return Ok(WroteCommit::No);
418        }
419
420        // Phase 1: allocate the LSN window and encode all three
421        // records into the active segment's pending buffer in one
422        // critical section. Collapsing what was four separate state
423        // lock acquisitions (begin / append_batch / commit / flush)
424        // into one is the lock-side win that pairs with the
425        // lock-free emit short-circuit on the recorder side.
426        {
427            let mut state = self.lock_state()?;
428            self.maybe_rotate(&mut state)?;
429            let begin_lsn = state.next_lsn;
430            let batch_lsn = begin_lsn
431                .checked_next()
432                .ok_or_else(|| WalError::Malformed("WAL LSN space is exhausted".into()))?;
433            let commit_lsn = batch_lsn
434                .checked_next()
435                .ok_or_else(|| WalError::Malformed("WAL LSN space is exhausted".into()))?;
436            let next_lsn = commit_lsn
437                .checked_next()
438                .ok_or_else(|| WalError::Malformed("WAL LSN space is exhausted".into()))?;
439            state.next_lsn = next_lsn;
440            state
441                .active_writer
442                .append(&WalRecord::TxBegin { lsn: begin_lsn })?;
443            state.active_writer.append(&WalRecord::MutationBatch {
444                lsn: batch_lsn,
445                tx_begin_lsn: begin_lsn,
446                events,
447            })?;
448            state.active_writer.append(&WalRecord::TxCommit {
449                lsn: commit_lsn,
450                tx_begin_lsn: begin_lsn,
451            })?;
452        }
453
454        // Phase 2: make commit bytes visible to the OS page cache. Storage
455        // durability is provided by the GroupSync flusher or an explicit
456        // force_fsync/checkpoint/sync/drop boundary.
457        self.flush_inner(FlushKind::PerConfiguredMode)?;
458
459        Ok(WroteCommit::Yes)
460    }
461
462    /// Append a `Checkpoint` marker. `snapshot_lsn` should equal the
463    /// LSN written into the snapshot file's header — replay uses
464    /// it to defend against the snapshot-rename-but-no-marker race.
465    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
466        self.check_healthy()?;
467        let mut state = self.lock_state()?;
468        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
469            lsn,
470            snapshot_lsn,
471        })
472    }
473
474    /// Single-source-of-truth for "allocate the next LSN, build the
475    /// record, push it onto the active segment's pending buffer".
476    /// The five public append paths (`begin / append / commit / abort
477    /// / checkpoint_marker`) all funnel through here so the LSN
478    /// allocation never gets out of sync with the encoded record.
479    #[inline]
480    fn alloc_and_append(
481        state: &mut WalState,
482        build: impl FnOnce(Lsn) -> WalRecord,
483    ) -> Result<Lsn, WalError> {
484        let lsn = state.next_lsn;
485        let next_lsn = lsn
486            .checked_next()
487            .ok_or_else(|| WalError::Malformed("WAL LSN space is exhausted".into()))?;
488        state.active_writer.append(&build(lsn))?;
489        state.next_lsn = next_lsn;
490        Ok(lsn)
491    }
492
493    /// Flush the active segment's pending buffer.
494    ///
495    /// Under [`SyncMode::GroupSync`], a normal flush writes bytes to the OS
496    /// but leaves `durable_lsn` unchanged until an explicit `force_fsync`,
497    /// checkpoint, sync, the background flusher, or clean drop.
498    pub fn flush(&self) -> Result<(), WalError> {
499        self.check_healthy()?;
500        self.flush_inner(FlushKind::PerConfiguredMode)
501    }
502
503    /// Unconditionally write the buffer to the OS, `fsync`, and
504    /// advance `durable_lsn`. Used by callers that need a durability
505    /// point right now regardless of the configured cadence (e.g.
506    /// checkpoint). Returns [`WalError::Poisoned`] if the WAL has already
507    /// latched a durability failure.
508    pub fn force_fsync(&self) -> Result<(), WalError> {
509        self.check_healthy()?;
510        self.flush_inner(FlushKind::ForceFsync)
511    }
512
513    /// Single source of truth for the flush state machine. Skips the
514    /// `check_healthy` gate so clean shutdown can force a final GroupSync
515    /// sync even if callers are otherwise done with the handle.
516    pub(super) fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
517        let mut state = self.lock_state()?;
518        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
519
520        if matches!(kind, FlushKind::ForceFsync) {
521            state.active_writer.flush_and_sync()?;
522            state.durable_lsn = written_lsn;
523        } else {
524            state.active_writer.flush_buffer()?;
525        }
526        Ok(())
527    }
528
529    /// Drop sealed segments whose entire LSN range is at or below
530    /// `fence_lsn`. Idempotent and safe to call repeatedly.
531    ///
532    /// The active segment is never deleted — even if every record in
533    /// it predates the fence, it is still the rotation target for
534    /// new appends. The segment immediately before the active one
535    /// is also kept as a tombstone so a subsequent crash before the
536    /// next checkpoint still finds a self-describing log start.
537    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
538        let mut state = self.lock_state()?;
539        let active_id = state.active_segment_id;
540        let entries = self.segments.list()?;
541
542        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
543        for (i, entry) in entries.iter().enumerate() {
544            // Active segment and the one immediately preceding it
545            // are kept by policy.
546            if entry.id >= active_id.saturating_prev() {
547                break;
548            }
549            // Segment `i` covers `[base_i, base_{i+1} - 1]`. We are
550            // safe to drop only when `base_{i+1} - 1 <= fence_lsn`.
551            let next = match entries.get(i + 1) {
552                Some(n) => n,
553                None => break,
554            };
555            let next_base = SegmentDir::base_lsn(&next.path)?;
556            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
557                to_drop.push(entry.clone());
558            }
559        }
560
561        for entry in to_drop {
562            fs::remove_file(&entry.path)?;
563            if entry.id >= state.oldest_segment_id {
564                state.oldest_segment_id = entry.id.checked_next().ok_or_else(|| {
565                    WalError::Malformed("WAL segment id space is exhausted".into())
566                })?;
567            }
568        }
569        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
570            self.segments.sync_dir()?;
571        }
572        Ok(())
573    }
574
575    /// Rotate the active segment when it has grown past
576    /// `segment_target_bytes`. Called from `begin()` so rotation only
577    /// ever lands at a transaction boundary.
578    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
579        if state.active_writer.bytes_written() < self.segment_target_bytes {
580            return Ok(());
581        }
582        // Seal the current segment (forces a flush + fsync) and open
583        // a fresh one with `base_lsn = next_lsn` so the segment file
584        // names line up with the record LSNs they contain.
585        state.active_writer.seal()?;
586
587        let next_id = state
588            .active_segment_id
589            .checked_next()
590            .ok_or_else(|| WalError::Malformed("WAL segment id space is exhausted".into()))?;
591        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
592        self.segments.sync_dir()?;
593        state.active_writer = writer;
594        state.active_segment_id = next_id;
595        Ok(())
596    }
597
598    fn lock_state(&self) -> Result<MutexGuard<'_, WalState>, WalError> {
599        self.state.lock().map_err(|_| WalError::Poisoned)
600    }
601}
602
603impl Drop for Wal {
604    fn drop(&mut self) {
605        let _ = self.flush_inner(FlushKind::ForceFsync);
606        // Join the group flusher, if any, before the directory lock is
607        // released. That keeps the "one live append owner" boundary intact
608        // through shutdown.
609        #[cfg(not(target_arch = "wasm32"))]
610        if let Ok(slot) = self.flusher.get_mut() {
611            let _ = slot.take();
612        }
613    }
614}