//! spacetimedb_commitlog/lib.rs
1use std::{
2 io,
3 num::{NonZeroU16, NonZeroU64},
4 sync::RwLock,
5};
6
7use log::trace;
8use spacetimedb_paths::server::CommitLogDir;
9
10pub mod commit;
11pub mod commitlog;
12mod index;
13pub mod repo;
14pub mod segment;
15mod varchar;
16mod varint;
17
18pub use crate::{
19 commit::{Commit, StoredCommit},
20 payload::{Decoder, Encode},
21 segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
22 varchar::Varchar,
23};
24pub mod error;
25pub mod payload;
26
27#[cfg(any(test, feature = "test"))]
28pub mod tests;
29
/// [`Commitlog`] options.
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Set the log format version to write, and the maximum supported version.
    ///
    /// Choosing a payload format `T` of [`Commitlog`] should usually result in
    /// updating the [`DEFAULT_LOG_FORMAT_VERSION`] of this crate. Sometimes it
    /// may however be useful to set the version at runtime, e.g. to experiment
    /// with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    pub log_format_version: u8,
    /// The maximum size in bytes to which log segments should be allowed to
    /// grow.
    ///
    /// Default: 1GiB
    pub max_segment_size: u64,
    /// The maximum number of records in a commit.
    ///
    /// If this number is exceeded, the commit is flushed to disk even without
    /// explicitly calling [`Commitlog::flush`].
    ///
    /// Default: 65,535 (i.e. [`NonZeroU16::MAX`])
    pub max_records_in_commit: NonZeroU16,
    /// Whenever at least this many bytes have been written to the currently
    /// active segment, an entry is added to its offset index.
    ///
    /// Together with [`Options::max_segment_size`], this determines the
    /// maximum length of the offset index (see [`Options::offset_index_len`]).
    ///
    /// Default: 4096
    pub offset_index_interval_bytes: NonZeroU64,
    /// If `true`, require that the segment must be synced to disk before an
    /// index entry is added.
    ///
    /// Setting this to `false` (the default) will update the index every
    /// `offset_index_interval_bytes`, even if the commitlog wasn't synced.
    /// This means that the index could contain non-existent entries in the
    /// event of a crash.
    ///
    /// Setting it to `true` will update the index when the commitlog is synced,
    /// and `offset_index_interval_bytes` have been written.
    /// This means that the index could contain fewer index entries than
    /// strictly every `offset_index_interval_bytes`.
    ///
    /// Default: false
    pub offset_index_require_segment_fsync: bool,
}
75
76impl Default for Options {
77 fn default() -> Self {
78 Self {
79 log_format_version: DEFAULT_LOG_FORMAT_VERSION,
80 max_segment_size: 1024 * 1024 * 1024,
81 max_records_in_commit: NonZeroU16::MAX,
82 offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
83 offset_index_require_segment_fsync: false,
84 }
85 }
86}
87
88impl Options {
89 /// Compute the length in bytes of an offset index based on the settings in
90 /// `self`.
91 pub fn offset_index_len(&self) -> u64 {
92 self.max_segment_size / self.offset_index_interval_bytes
93 }
94}
95
/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
pub struct Commitlog<T> {
    // All access is mediated by this lock: traversal and offset queries take
    // the read lock, while append / flush / sync / epoch updates take the
    // write lock (see the methods below).
    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}
103
104impl<T> Commitlog<T> {
105 /// Open the log at root directory `root` with [`Options`].
106 ///
107 /// The root directory must already exist.
108 ///
109 /// Note that opening a commitlog involves I/O: some consistency checks are
110 /// performed, and the next writing position is determined.
111 ///
112 /// This is only necessary when opening the commitlog for writing. See the
113 /// free-standing functions in this module for how to traverse a read-only
114 /// commitlog.
115 pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
116 let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;
117
118 Ok(Self {
119 inner: RwLock::new(inner),
120 })
121 }
122
123 /// Determine the maximum transaction offset considered durable.
124 ///
125 /// The offset is `None` if the log hasn't been flushed to disk yet.
126 pub fn max_committed_offset(&self) -> Option<u64> {
127 self.inner.read().unwrap().max_committed_offset()
128 }
129
130 /// Get the current epoch.
131 ///
132 /// See also: [`Commit::epoch`].
133 pub fn epoch(&self) -> u64 {
134 self.inner.read().unwrap().epoch()
135 }
136
137 /// Update the current epoch.
138 ///
139 /// Does nothing if the given `epoch` is equal to the current epoch.
140 /// Otherwise flushes outstanding transactions to disk (equivalent to
141 /// [`Self::flush`]) before updating the epoch.
142 ///
143 /// Returns the maximum transaction offset written to disk. The offset is
144 /// `None` if the log is empty and no data was pending to be flushed.
145 ///
146 /// # Errors
147 ///
148 /// If `epoch` is smaller than the current epoch, an error of kind
149 /// [`io::ErrorKind::InvalidInput`] is returned.
150 ///
151 /// Errors from the implicit flush are propagated.
152 pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
153 let mut inner = self.inner.write().unwrap();
154 inner.set_epoch(epoch)?;
155
156 Ok(inner.max_committed_offset())
157 }
158
159 /// Sync all OS-buffered writes to disk.
160 ///
161 /// Note that this does **not** write outstanding records to disk.
162 /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
163 /// method to ensure all data is on disk.
164 ///
165 /// Returns the maximum transaction offset which is considered durable after
166 /// this method returns successfully. The offset is `None` if the log hasn't
167 /// been flushed to disk yet.
168 ///
169 /// # Panics
170 ///
171 /// This method panics if syncing fails irrecoverably.
172 pub fn sync(&self) -> Option<u64> {
173 let mut inner = self.inner.write().unwrap();
174 trace!("sync commitlog");
175 inner.sync();
176
177 inner.max_committed_offset()
178 }
179
180 /// Write all outstanding transaction records to disk.
181 ///
182 /// Note that this does **not** force the OS to sync the data to disk.
183 /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
184 /// to ensure all data is on disk.
185 ///
186 /// Returns the maximum transaction offset written to disk. The offset is
187 /// `None` if the log is empty and no data was pending to be flushed.
188 ///
189 /// Repeatedly calling this method may return the same value.
190 pub fn flush(&self) -> io::Result<Option<u64>> {
191 let mut inner = self.inner.write().unwrap();
192 trace!("flush commitlog");
193 inner.commit()?;
194
195 Ok(inner.max_committed_offset())
196 }
197
198 /// Write all outstanding transaction records to disk and flush OS buffers.
199 ///
200 /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
201 /// without releasing the write lock in between.
202 ///
203 /// # Errors
204 ///
205 /// An error is returned if writing to disk fails due to an I/O error.
206 ///
207 /// # Panics
208 ///
209 /// This method panics if syncing fails irrecoverably.
210 pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
211 let mut inner = self.inner.write().unwrap();
212 trace!("flush and sync commitlog");
213 inner.commit()?;
214 inner.sync();
215
216 Ok(inner.max_committed_offset())
217 }
218
219 /// Obtain an iterator which traverses the log from the start, yielding
220 /// [`StoredCommit`]s.
221 ///
222 /// The returned iterator is not aware of segment rotation. That is, if a
223 /// new segment is created after this method returns, the iterator will not
224 /// traverse it.
225 ///
226 /// Commits appended to the log while it is being traversed are generally
227 /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
228 /// however, a new iterator should be created using [`Self::commits_from`]
229 /// with the last transaction offset yielded.
230 ///
231 /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
232 /// (e.g. due to a partial write to disk), but a subsequent `append` will
233 /// bring the log into a consistent state.
234 ///
235 /// This means that, when this iterator yields an `Err` value, the consumer
236 /// may want to check if the iterator is exhausted (by calling `next()`)
237 /// before treating the `Err` value as an application error.
238 pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
239 self.commits_from(0)
240 }
241
242 /// Obtain an iterator starting from transaction offset `offset`, yielding
243 /// [`StoredCommit`]s.
244 ///
245 /// Similar to [`Self::commits`] but will skip until the offset is contained
246 /// in the next [`StoredCommit`] to yield.
247 ///
248 /// Note that the first [`StoredCommit`] yielded is the first commit
249 /// containing the given transaction offset, i.e. its `min_tx_offset` may be
250 /// smaller than `offset`.
251 pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
252 self.inner.read().unwrap().commits_from(offset)
253 }
254
255 /// Remove all data from the log and reopen it.
256 ///
257 /// Log segments are deleted starting from the newest. As multiple segments
258 /// cannot be deleted atomically, the log may not be completely empty if
259 /// the method returns an error.
260 ///
261 /// Note that the method consumes `self` to ensure the log is not modified
262 /// while resetting.
263 pub fn reset(self) -> io::Result<Self> {
264 let inner = self.inner.into_inner().unwrap().reset()?;
265 Ok(Self {
266 inner: RwLock::new(inner),
267 })
268 }
269
270 /// Remove all data past the given transaction `offset` from the log and
271 /// reopen it.
272 ///
273 /// Like with [`Self::reset`], it may happen that not all segments newer
274 /// than `offset` can be deleted.
275 ///
276 /// If the method returns successfully, the most recent [`Commit`] in the
277 /// log will contain the transaction at `offset`.
278 ///
279 /// Note that the method consumes `self` to ensure the log is not modified
280 /// while resetting.
281 pub fn reset_to(self, offset: u64) -> io::Result<Self> {
282 let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
283 Ok(Self {
284 inner: RwLock::new(inner),
285 })
286 }
287
288 /// Determine the size on disk of this commitlog.
289 pub fn size_on_disk(&self) -> io::Result<u64> {
290 let inner = self.inner.read().unwrap();
291 inner.repo.size_on_disk()
292 }
293}
294
295impl<T: Encode> Commitlog<T> {
296 /// Append the record `txdata` to the log.
297 ///
298 /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
299 /// argument is returned in an `Err`. The caller should [`Self::flush`] the
300 /// log and try again.
301 ///
302 /// In case the log is appended to from multiple threads, this may result in
303 /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
304 /// [`Self::append_maybe_flush`] is preferable.
305 pub fn append(&self, txdata: T) -> Result<(), T> {
306 let mut inner = self.inner.write().unwrap();
307 inner.append(txdata)
308 }
309
310 /// Append the record `txdata` to the log.
311 ///
312 /// The `txdata` payload is buffered in memory until either:
313 ///
314 /// - [`Self::flush`] is called explicitly, or
315 /// - [`Options::max_records_in_commit`] is exceeded
316 ///
317 /// In the latter case, [`Self::append`] flushes implicitly, _before_
318 /// appending the `txdata` argument.
319 ///
320 /// I.e. the argument is not guaranteed to be flushed after the method
321 /// returns. If that is desired, [`Self::flush`] must be called explicitly.
322 ///
323 /// # Errors
324 ///
325 /// If the log needs to be flushed, but an I/O error occurs, ownership of
326 /// `txdata` is returned back to the caller alongside the [`io::Error`].
327 ///
328 /// The value can then be used to retry appending.
329 pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
330 let mut inner = self.inner.write().unwrap();
331
332 if let Err(txdata) = inner.append(txdata) {
333 if let Err(source) = inner.commit() {
334 return Err(error::Append { txdata, source });
335 }
336 // `inner.commit.n` must be zero at this point
337 let res = inner.append(txdata);
338 debug_assert!(res.is_ok(), "failed to append while holding write lock");
339 }
340
341 Ok(())
342 }
343
344 /// Obtain an iterator which traverses the log from the start, yielding
345 /// [`Transaction`]s.
346 ///
347 /// The provided `decoder`'s [`Decoder::decode_record`] method will be
348 /// called [`Commit::n`] times per [`Commit`] to obtain the individual
349 /// transaction payloads.
350 ///
351 /// Like [`Self::commits`], the iterator is not aware of segment rotation.
352 /// That is, if a new segment is created after this method returns, the
353 /// iterator will not traverse it.
354 ///
355 /// Transactions appended to the log while it is being traversed are
356 /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
357 /// however, a new iterator should be created using [`Self::transactions_from`]
358 /// with the last transaction offset yielded.
359 ///
360 /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
361 /// due to a partial write to disk), but a subsequent `append` will bring
362 /// the log into a consistent state.
363 ///
364 /// This means that, when this iterator yields an `Err` value, the consumer
365 /// may want to check if the iterator is exhausted (by calling `next()`)
366 /// before treating the `Err` value as an application error.
367 pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
368 where
369 D: Decoder<Record = T>,
370 D::Error: From<error::Traversal>,
371 T: 'a,
372 {
373 self.transactions_from(0, de)
374 }
375
376 /// Obtain an iterator starting from transaction offset `offset`, yielding
377 /// [`Transaction`]s.
378 ///
379 /// Similar to [`Self::transactions`] but will skip until the provided
380 /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
381 /// with offset `offset`.
382 pub fn transactions_from<'a, D>(
383 &self,
384 offset: u64,
385 de: &'a D,
386 ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
387 where
388 D: Decoder<Record = T>,
389 D::Error: From<error::Traversal>,
390 T: 'a,
391 {
392 self.inner.read().unwrap().transactions_from(offset, de)
393 }
394
395 /// Traverse the log from the start and "fold" its transactions into the
396 /// provided [`Decoder`].
397 ///
398 /// A [`Decoder`] is a stateful object due to the requirement to store
399 /// schema information in the log itself. That is, a [`Decoder`] may need to
400 /// be able to resolve transaction schema information dynamically while
401 /// traversing the log.
402 ///
403 /// This is equivalent to "replaying" a log into a database state. In this
404 /// scenario, it is not interesting to consume the [`Transaction`] payload
405 /// as an iterator.
406 ///
407 /// This method allows the use of a [`Decoder`] which returns zero-sized
408 /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
409 /// payload into a struct.
410 ///
411 /// Note that, unlike [`Self::transactions`], this method will ignore a
412 /// corrupt commit at the very end of the traversed log.
413 pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
414 where
415 D: Decoder,
416 D::Error: From<error::Traversal>,
417 {
418 self.fold_transactions_from(0, de)
419 }
420
421 /// Traverse the log from the given transaction offset and "fold" its
422 /// transactions into the provided [`Decoder`].
423 ///
424 /// Similar to [`Self::fold_transactions`] but will skip until the provided
425 /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
426 /// will be equal to `offset`.
427 pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
428 where
429 D: Decoder,
430 D::Error: From<error::Traversal>,
431 {
432 self.inner.read().unwrap().fold_transactions_from(offset, de)
433 }
434}
435
436/// Obtain an iterator which traverses the commitlog located at the `root`
437/// directory from the start, yielding [`StoredCommit`]s.
438///
439/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
440/// See [`Commitlog::commits`] for more information.
441pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
442 commits_from(root, 0)
443}
444
445/// Obtain an iterator which traverses the commitlog located at the `root`
446/// directory starting from `offset` and yielding [`StoredCommit`]s.
447///
448/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
449/// See [`Commitlog::commits_from`] for more information.
450pub fn commits_from(
451 root: CommitLogDir,
452 offset: u64,
453) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
454 commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
455}
456
457/// Obtain an iterator which traverses the commitlog located at the `root`
458/// directory from the start, yielding [`Transaction`]s.
459///
460/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
461/// See [`Commitlog::transactions`] for more information.
462pub fn transactions<'a, D, T>(
463 root: CommitLogDir,
464 de: &'a D,
465) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
466where
467 D: Decoder<Record = T>,
468 D::Error: From<error::Traversal>,
469 T: 'a,
470{
471 transactions_from(root, 0, de)
472}
473
474/// Obtain an iterator which traverses the commitlog located at the `root`
475/// directory starting from `offset` and yielding [`Transaction`]s.
476///
477/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
478/// See [`Commitlog::transactions_from`] for more information.
479pub fn transactions_from<'a, D, T>(
480 root: CommitLogDir,
481 offset: u64,
482 de: &'a D,
483) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
484where
485 D: Decoder<Record = T>,
486 D::Error: From<error::Traversal>,
487 T: 'a,
488{
489 commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
490}
491
492/// Traverse the commitlog located at the `root` directory from the start and
493/// "fold" its transactions into the provided [`Decoder`].
494///
495/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
496/// See [`Commitlog::fold_transactions`] for more information.
497pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
498where
499 D: Decoder,
500 D::Error: From<error::Traversal> + From<io::Error>,
501{
502 fold_transactions_from(root, 0, de)
503}
504
505/// Traverse the commitlog located at the `root` directory starting from `offset`
506/// and "fold" its transactions into the provided [`Decoder`].
507///
508/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
509/// See [`Commitlog::fold_transactions_from`] for more information.
510pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
511where
512 D: Decoder,
513 D::Error: From<error::Traversal> + From<io::Error>,
514{
515 commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
516}