spacetimedb_durability/imp/
local.rs1use std::{
2 io,
3 num::NonZeroU16,
4 panic,
5 sync::{
6 atomic::{
7 AtomicI64, AtomicU64,
8 Ordering::{Acquire, Relaxed, Release},
9 },
10 Arc, Weak,
11 },
12 time::Duration,
13};
14
15use anyhow::Context as _;
16use itertools::Itertools as _;
17use log::{info, trace, warn};
18use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
19use spacetimedb_paths::server::CommitLogDir;
20use tokio::{
21 sync::mpsc,
22 task::{spawn_blocking, AbortHandle, JoinHandle},
23 time::{interval, MissedTickBehavior},
24};
25use tracing::instrument;
26
27use crate::{Durability, History, TxOffset};
28
29#[derive(Clone, Copy, Debug)]
31pub struct Options {
32 pub sync_interval: Duration,
36 pub commitlog: spacetimedb_commitlog::Options,
38}
39
40impl Default for Options {
41 fn default() -> Self {
42 Self {
43 sync_interval: Duration::from_millis(500),
44 commitlog: Default::default(),
45 }
46 }
47}
48
49pub struct Local<T> {
61 clog: Arc<Commitlog<Txdata<T>>>,
63 durable_offset: Arc<AtomicI64>,
76 queue: mpsc::UnboundedSender<Txdata<T>>,
81 queue_depth: Arc<AtomicU64>,
86 persister_task: JoinHandle<()>,
89}
90
91impl<T: Encode + Send + Sync + 'static> Local<T> {
92 pub fn open(root: CommitLogDir, rt: tokio::runtime::Handle, opts: Options) -> io::Result<Self> {
98 info!("open local durability");
99
100 let clog = Arc::new(Commitlog::open(root, opts.commitlog)?);
101 let (queue, rx) = mpsc::unbounded_channel();
102 let queue_depth = Arc::new(AtomicU64::new(0));
103 let offset = {
104 let offset = clog.max_committed_offset().map(|x| x as i64).unwrap_or(-1);
105 Arc::new(AtomicI64::new(offset))
106 };
107
108 let persister_task = rt.spawn(
109 PersisterTask {
110 clog: clog.clone(),
111 rx,
112 queue_depth: queue_depth.clone(),
113 max_records_in_commit: opts.commitlog.max_records_in_commit,
114 }
115 .run(),
116 );
117 rt.spawn(
118 FlushAndSyncTask {
119 clog: Arc::downgrade(&clog),
120 period: opts.sync_interval,
121 offset: offset.clone(),
122 abort: persister_task.abort_handle(),
123 }
124 .run(),
125 );
126
127 Ok(Self {
128 clog,
129 durable_offset: offset,
130 queue,
131 queue_depth,
132 persister_task,
133 })
134 }
135
136 pub fn queue_depth(&self) -> u64 {
139 self.queue_depth.load(Relaxed)
140 }
141
142 pub fn commits_from(&self, offset: TxOffset) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
144 self.clog.commits_from(offset).map_ok(Commit::from)
145 }
146
147 pub fn existing_segment_offsets(&self) -> io::Result<Vec<TxOffset>> {
149 self.clog.existing_segment_offsets()
150 }
151
152 pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
154 self.clog.compress_segments(offsets)
155 }
156
157 pub async fn close(self) -> anyhow::Result<Option<TxOffset>> {
162 info!("close local durability");
163
164 drop(self.queue);
165 if let Err(e) = self.persister_task.await {
166 if e.is_panic() {
167 return Err(e).context("persister task panicked");
168 }
169 }
170
171 spawn_blocking(move || self.clog.flush_and_sync())
172 .await?
173 .context("failed to sync commitlog")
174 }
175
176 pub fn size_on_disk(&self) -> io::Result<u64> {
178 self.clog.size_on_disk()
179 }
180}
181
182struct PersisterTask<T> {
183 clog: Arc<Commitlog<Txdata<T>>>,
184 rx: mpsc::UnboundedReceiver<Txdata<T>>,
185 queue_depth: Arc<AtomicU64>,
186 max_records_in_commit: NonZeroU16,
187}
188
189impl<T: Encode + Send + Sync + 'static> PersisterTask<T> {
190 #[instrument(name = "durability::local::persister_task", skip_all)]
191 async fn run(mut self) {
192 info!("starting persister task");
193
194 while let Some(txdata) = self.rx.recv().await {
195 self.queue_depth.fetch_sub(1, Relaxed);
196 trace!("received txdata");
197
198 if self.max_records_in_commit.get() == 1 {
204 self.flush_append(txdata, true).await;
205 } else if let Err(retry) = self.clog.append(txdata) {
206 self.flush_append(retry, false).await
207 }
208
209 trace!("appended txdata");
210 }
211
212 info!("exiting persister task");
213 }
214
215 #[instrument(skip_all)]
216 async fn flush_append(&self, txdata: Txdata<T>, flush_after: bool) {
217 let clog = self.clog.clone();
218 let task = spawn_blocking(move || {
219 let mut retry = Some(txdata);
220 while let Some(txdata) = retry.take() {
221 if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) {
222 flush_error(source);
223 retry = Some(txdata);
224 }
225 }
226
227 if flush_after {
228 clog.flush().map(drop).unwrap_or_else(flush_error);
229 }
230
231 trace!("flush-append succeeded");
232 })
233 .await;
234 if let Err(e) = task {
235 if e.is_panic() {
239 panic::resume_unwind(e.into_panic())
240 }
241 }
242 }
243}
244
245#[inline]
249fn flush_error(e: io::Error) {
250 warn!("error flushing commitlog: {e:?}");
251 if e.kind() == io::ErrorKind::AlreadyExists {
252 panic!("commitlog unwritable!");
253 }
254}
255
256struct FlushAndSyncTask<T> {
257 clog: Weak<Commitlog<Txdata<T>>>,
258 period: Duration,
259 offset: Arc<AtomicI64>,
260 abort: AbortHandle,
262}
263
264impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
265 #[instrument(name = "durability::local::flush_and_sync_task", skip_all)]
266 async fn run(self) {
267 info!("starting syncer task");
268
269 let mut interval = interval(self.period);
270 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
271
272 loop {
273 interval.tick().await;
274
275 let Some(clog) = self.clog.upgrade() else {
276 break;
277 };
278 if let Some(committed) = clog.max_committed_offset() {
280 let durable = self.offset.load(Acquire);
281 if durable.is_positive() && committed == durable as _ {
282 continue;
283 }
284 }
285
286 let task = spawn_blocking(move || clog.flush_and_sync()).await;
287 match task {
288 Err(e) => {
289 if e.is_panic() {
290 self.abort.abort();
291 panic::resume_unwind(e.into_panic())
292 }
293 break;
294 }
295 Ok(Err(e)) => {
296 warn!("flush failed: {e}");
297 }
298 Ok(Ok(Some(new_offset))) => {
299 trace!("synced to offset {new_offset}");
300 self.offset.store(new_offset as i64, Release);
302 }
303 Ok(Ok(None)) => {}
305 }
306 }
307
308 info!("exiting syncer task");
309 }
310}
311
312impl<T: Send + Sync + 'static> Durability for Local<T> {
313 type TxData = Txdata<T>;
314
315 fn append_tx(&self, tx: Self::TxData) {
316 self.queue.send(tx).expect("commitlog persister task vanished");
317 self.queue_depth.fetch_add(1, Relaxed);
318 }
319
320 fn durable_tx_offset(&self) -> Option<TxOffset> {
321 let offset = self.durable_offset.load(Acquire);
322 (offset > -1).then_some(offset as u64)
323 }
324}
325
326impl<T: Encode + 'static> History for Local<T> {
327 type TxData = Txdata<T>;
328
329 fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
330 where
331 D: Decoder,
332 D::Error: From<error::Traversal>,
333 {
334 self.clog.fold_transactions_from(offset, decoder)
335 }
336
337 fn transactions_from<'a, D>(
338 &self,
339 offset: TxOffset,
340 decoder: &'a D,
341 ) -> impl Iterator<Item = Result<Transaction<Self::TxData>, D::Error>>
342 where
343 D: Decoder<Record = Self::TxData>,
344 D::Error: From<error::Traversal>,
345 Self::TxData: 'a,
346 {
347 self.clog.transactions_from(offset, decoder)
348 }
349
350 fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
351 let min = self.clog.min_committed_offset().unwrap_or_default();
352 let max = self.clog.max_committed_offset();
353
354 (min, max)
355 }
356}