spacetimedb_durability/imp/
local.rs

1use std::{
2    io,
3    num::NonZeroU16,
4    panic,
5    sync::{
6        atomic::{
7            AtomicI64, AtomicU64,
8            Ordering::{Acquire, Relaxed, Release},
9        },
10        Arc, Weak,
11    },
12    time::Duration,
13};
14
15use anyhow::Context as _;
16use itertools::Itertools as _;
17use log::{info, trace, warn};
18use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
19use spacetimedb_paths::server::CommitLogDir;
20use tokio::{
21    sync::mpsc,
22    task::{spawn_blocking, AbortHandle, JoinHandle},
23    time::{interval, MissedTickBehavior},
24};
25use tracing::instrument;
26
27use crate::{Durability, History, TxOffset};
28
/// [`Local`] configuration.
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Periodically flush and sync the log this often.
    ///
    /// This is the period of the background [`FlushAndSyncTask`], and thus
    /// bounds how long an appended transaction may remain non-durable.
    ///
    /// Default: 500ms
    pub sync_interval: Duration,
    /// [`Commitlog`] configuration.
    pub commitlog: spacetimedb_commitlog::Options,
}
39
40impl Default for Options {
41    fn default() -> Self {
42        Self {
43            sync_interval: Duration::from_millis(500),
44            commitlog: Default::default(),
45        }
46    }
47}
48
/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
///
/// The commitlog is constrained to store the canonical [`Txdata`] payload,
/// where the generic parameter `T` is the type of the row data stored in
/// the mutations section.
///
/// `T` is left generic in order to allow bypassing the `ProductValue`
/// intermediate representation in the future.
///
/// Note, however, that instantiating `T` to a different type may require to
/// change the log format version!
pub struct Local<T> {
    /// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps.
    clog: Arc<Commitlog<Txdata<T>>>,
    /// The durable transaction offset, as reported by the background
    /// [`FlushAndSyncTask`].
    ///
    /// A negative number indicates that we haven't flushed yet, or that the
    /// number overflowed. In either case, appending new transactions shall panic.
    ///
    /// The offset will be used by the datastore to squash durable transactions
    /// into the committed state, thereby making them visible to durable-only
    /// readers.
    ///
    /// We don't want to hang on to those transactions longer than needed, so
    /// acquire / release or stronger should be used to prevent stale reads.
    durable_offset: Arc<AtomicI64>,
    /// Backlog of transactions to be written to disk by the background
    /// [`PersisterTask`].
    ///
    /// Note that this is unbounded! Dropping this sender (see [`Self::close`])
    /// is what signals the persister task to drain and exit.
    queue: mpsc::UnboundedSender<Txdata<T>>,
    /// How many transactions are sitting in the `queue`.
    ///
    /// This is mainly for observability purposes, and can thus be updated with
    /// relaxed memory ordering.
    queue_depth: Arc<AtomicU64>,
    /// Handle to the [`PersisterTask`], allowing to drain the `queue` when
    /// explicitly dropped via [`Self::close`].
    persister_task: JoinHandle<()>,
}
90
impl<T: Encode + Send + Sync + 'static> Local<T> {
    /// Create a [`Local`] instance at the `root` directory.
    ///
    /// The `root` directory must already exist.
    ///
    /// Background tasks are spawned onto the provided tokio runtime.
    pub fn open(root: CommitLogDir, rt: tokio::runtime::Handle, opts: Options) -> io::Result<Self> {
        info!("open local durability");

        let clog = Arc::new(Commitlog::open(root, opts.commitlog)?);
        let (queue, rx) = mpsc::unbounded_channel();
        let queue_depth = Arc::new(AtomicU64::new(0));
        let offset = {
            // `-1` encodes "nothing durable yet" (see the `durable_offset`
            // field docs); otherwise start from what's already on disk.
            let offset = clog.max_committed_offset().map(|x| x as i64).unwrap_or(-1);
            Arc::new(AtomicI64::new(offset))
        };

        // Persister holds a strong ref to the log: it must outlive `Local`
        // long enough to drain the queue on `close`.
        let persister_task = rt.spawn(
            PersisterTask {
                clog: clog.clone(),
                rx,
                queue_depth: queue_depth.clone(),
                max_records_in_commit: opts.commitlog.max_records_in_commit,
            }
            .run(),
        );
        // Syncer only holds a weak ref, so it terminates once all strong
        // refs (i.e. `Local` itself) are gone.
        rt.spawn(
            FlushAndSyncTask {
                clog: Arc::downgrade(&clog),
                period: opts.sync_interval,
                offset: offset.clone(),
                abort: persister_task.abort_handle(),
            }
            .run(),
        );

        Ok(Self {
            clog,
            durable_offset: offset,
            queue,
            queue_depth,
            persister_task,
        })
    }

    /// Inspect how many transactions added via [`Self::append_tx`] are pending
    /// to be applied to the underlying [`Commitlog`].
    pub fn queue_depth(&self) -> u64 {
        self.queue_depth.load(Relaxed)
    }

    /// Obtain an iterator over the [`Commit`]s in the underlying log,
    /// starting from the commit containing `offset`.
    pub fn commits_from(&self, offset: TxOffset) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
        self.clog.commits_from(offset).map_ok(Commit::from)
    }

    /// Get a list of segment offsets, sorted in ascending order.
    pub fn existing_segment_offsets(&self) -> io::Result<Vec<TxOffset>> {
        self.clog.existing_segment_offsets()
    }

    /// Compress the segments at the offsets provided, marking them as immutable.
    pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
        self.clog.compress_segments(offsets)
    }

    /// Apply all outstanding transactions to the [`Commitlog`] and flush it
    /// to disk.
    ///
    /// Returns the durable [`TxOffset`], if any.
    pub async fn close(self) -> anyhow::Result<Option<TxOffset>> {
        info!("close local durability");

        // Dropping the sender closes the channel; the persister task drains
        // whatever is still queued and then exits.
        drop(self.queue);
        if let Err(e) = self.persister_task.await {
            // Cancellation is expected during shutdown; only a panic in the
            // persister is reported to the caller.
            if e.is_panic() {
                return Err(e).context("persister task panicked");
            }
        }

        // Final synchronous flush + fsync, off the async executor.
        spawn_blocking(move || self.clog.flush_and_sync())
            .await?
            .context("failed to sync commitlog")
    }

    /// Get the size on disk of the underlying [`Commitlog`].
    pub fn size_on_disk(&self) -> io::Result<u64> {
        self.clog.size_on_disk()
    }
}
181
/// Background task applying queued transactions to the [`Commitlog`].
///
/// Exits when the sending half of `rx` (held by [`Local`]) is dropped.
struct PersisterTask<T> {
    /// Strong ref to the log; kept alive until the queue is drained.
    clog: Arc<Commitlog<Txdata<T>>>,
    /// Receiving end of [`Local`]'s unbounded transaction queue.
    rx: mpsc::UnboundedReceiver<Txdata<T>>,
    /// Shared gauge of queued transactions, decremented on receive.
    queue_depth: Arc<AtomicU64>,
    /// Copied from the commitlog options; `1` enables the flush-per-tx
    /// fast path in [`Self::run`].
    max_records_in_commit: NonZeroU16,
}
188
impl<T: Encode + Send + Sync + 'static> PersisterTask<T> {
    /// Main loop: pop transactions off the queue and append them to the log
    /// until the channel is closed (i.e. [`Local`] dropped the sender).
    #[instrument(name = "durability::local::persister_task", skip_all)]
    async fn run(mut self) {
        info!("starting persister task");

        while let Some(txdata) = self.rx.recv().await {
            self.queue_depth.fetch_sub(1, Relaxed);
            trace!("received txdata");

            // If we are writing one commit per tx, trying to buffer is
            // fairly pointless. Immediately flush instead.
            //
            // Otherwise, try `Commitlog::append` as a fast-path which doesn't
            // require `spawn_blocking`.
            if self.max_records_in_commit.get() == 1 {
                self.flush_append(txdata, true).await;
            } else if let Err(retry) = self.clog.append(txdata) {
                self.flush_append(retry, false).await
            }

            trace!("appended txdata");
        }

        info!("exiting persister task");
    }

    /// Append `txdata` on a blocking thread, flushing and retrying until it
    /// sticks; optionally flush again afterwards (`flush_after`).
    ///
    /// Retries forever on transient flush errors; `flush_error` panics if the
    /// log looks permanently unwritable, and that panic is re-raised here.
    #[instrument(skip_all)]
    async fn flush_append(&self, txdata: Txdata<T>, flush_after: bool) {
        let clog = self.clog.clone();
        let task = spawn_blocking(move || {
            // `append_maybe_flush` hands the txdata back on failure, so we
            // can retry the same payload after reporting the flush error.
            let mut retry = Some(txdata);
            while let Some(txdata) = retry.take() {
                if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) {
                    flush_error(source);
                    retry = Some(txdata);
                }
            }

            if flush_after {
                clog.flush().map(drop).unwrap_or_else(flush_error);
            }

            trace!("flush-append succeeded");
        })
        .await;
        if let Err(e) = task {
            // Resume panic on the spawned task,
            // which will drop the channel receiver,
            // which will cause `append_tx` to panic.
            if e.is_panic() {
                panic::resume_unwind(e.into_panic())
            }
        }
    }
}
244
245/// Handle an error flushing the commitlog.
246///
247/// Panics if the error indicates that the log may be permanently unwritable.
248#[inline]
249fn flush_error(e: io::Error) {
250    warn!("error flushing commitlog: {e:?}");
251    if e.kind() == io::ErrorKind::AlreadyExists {
252        panic!("commitlog unwritable!");
253    }
254}
255
/// Background task periodically flushing and fsyncing the [`Commitlog`],
/// publishing the resulting durable offset to [`Local::durable_offset`].
struct FlushAndSyncTask<T> {
    /// Weak ref to the log: the task exits once [`Local`] (and thus all
    /// strong refs) is gone.
    clog: Weak<Commitlog<Txdata<T>>>,
    /// How often to flush and sync (see [`Options::sync_interval`]).
    period: Duration,
    /// Shared durable offset, written with release ordering after a sync.
    offset: Arc<AtomicI64>,
    /// Handle to abort the [`PersisterTask`] if fsync panics.
    abort: AbortHandle,
}
263
264impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
265    #[instrument(name = "durability::local::flush_and_sync_task", skip_all)]
266    async fn run(self) {
267        info!("starting syncer task");
268
269        let mut interval = interval(self.period);
270        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
271
272        loop {
273            interval.tick().await;
274
275            let Some(clog) = self.clog.upgrade() else {
276                break;
277            };
278            // Skip if nothing changed.
279            if let Some(committed) = clog.max_committed_offset() {
280                let durable = self.offset.load(Acquire);
281                if durable.is_positive() && committed == durable as _ {
282                    continue;
283                }
284            }
285
286            let task = spawn_blocking(move || clog.flush_and_sync()).await;
287            match task {
288                Err(e) => {
289                    if e.is_panic() {
290                        self.abort.abort();
291                        panic::resume_unwind(e.into_panic())
292                    }
293                    break;
294                }
295                Ok(Err(e)) => {
296                    warn!("flush failed: {e}");
297                }
298                Ok(Ok(Some(new_offset))) => {
299                    trace!("synced to offset {new_offset}");
300                    // NOTE: Overflow will make `durable_tx_offset` return `None`
301                    self.offset.store(new_offset as i64, Release);
302                }
303                // No data to flush.
304                Ok(Ok(None)) => {}
305            }
306        }
307
308        info!("exiting syncer task");
309    }
310}
311
312impl<T: Send + Sync + 'static> Durability for Local<T> {
313    type TxData = Txdata<T>;
314
315    fn append_tx(&self, tx: Self::TxData) {
316        self.queue.send(tx).expect("commitlog persister task vanished");
317        self.queue_depth.fetch_add(1, Relaxed);
318    }
319
320    fn durable_tx_offset(&self) -> Option<TxOffset> {
321        let offset = self.durable_offset.load(Acquire);
322        (offset > -1).then_some(offset as u64)
323    }
324}
325
326impl<T: Encode + 'static> History for Local<T> {
327    type TxData = Txdata<T>;
328
329    fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
330    where
331        D: Decoder,
332        D::Error: From<error::Traversal>,
333    {
334        self.clog.fold_transactions_from(offset, decoder)
335    }
336
337    fn transactions_from<'a, D>(
338        &self,
339        offset: TxOffset,
340        decoder: &'a D,
341    ) -> impl Iterator<Item = Result<Transaction<Self::TxData>, D::Error>>
342    where
343        D: Decoder<Record = Self::TxData>,
344        D::Error: From<error::Traversal>,
345        Self::TxData: 'a,
346    {
347        self.clog.transactions_from(offset, decoder)
348    }
349
350    fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
351        let min = self.clog.min_committed_offset().unwrap_or_default();
352        let max = self.clog.max_committed_offset();
353
354        (min, max)
355    }
356}