// spacetimedb_commitlog/repo/fs.rs

1use std::fs::{self, File};
2use std::io;
3
4use log::{debug, warn};
5use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
6use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
7use tempfile::NamedTempFile;
8
9use crate::segment::FileLike;
10
11use super::{Repo, SegmentLen, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
12
/// Filename extension of segment files within a [`CommitLogDir`].
///
/// The file stem is the segment's starting transaction offset in decimal
/// (see [`Fs::existing_offsets`], which parses it back to a `u64`).
const SEGMENT_FILE_EXT: &str = ".stdb.log";
14
15// TODO
16//
17// - should use advisory locks?
18//
19// Experiment:
20//
21// - O_DIRECT | O_DSYNC
22// - preallocation of disk space
23// - io_uring
24//
25
26/// A commitlog repository [`Repo`] which stores commits in ordinary files on
27/// disk.
/// A commitlog repository [`Repo`] which stores commits in ordinary files on
/// disk.
#[derive(Clone, Debug)]
pub struct Fs {
    /// The base directory within which segment files will be stored.
    ///
    /// Offset-index files (`self.root.index(offset)`) and temporary files used
    /// during compression are stored in the same directory.
    root: CommitLogDir,
}
33
34impl Fs {
35    /// Create a commitlog repository which stores segments in the directory `root`.
36    ///
37    /// `root` must name an extant, accessible, writeable directory.
38    pub fn new(root: CommitLogDir) -> io::Result<Self> {
39        root.create()?;
40        Ok(Self { root })
41    }
42
43    /// Get the filename for a segment starting with `offset` within this
44    /// repository.
45    pub fn segment_path(&self, offset: u64) -> SegmentFile {
46        self.root.segment(offset)
47    }
48
49    /// Determine the size on disk as the sum of the sizes of all segments.
50    ///
51    /// Note that the actively written-to segment (if any) is included.
52    pub fn size_on_disk(&self) -> io::Result<u64> {
53        let mut sz = 0;
54        for offset in self.existing_offsets()? {
55            sz += self.segment_path(offset).metadata()?.len();
56            // Add the size of the offset index file if present
57            sz += self.root.index(offset).metadata().map(|m| m.len()).unwrap_or(0);
58        }
59
60        Ok(sz)
61    }
62}
63
// Marker impl: plain `File`s can serve as segment writers with length
// queries via [`SegmentLen`] (presumably provided methods — see trait def).
impl SegmentLen for File {}
65
66impl FileLike for NamedTempFile {
67    fn fsync(&mut self) -> io::Result<()> {
68        self.as_file_mut().fsync()
69    }
70
71    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
72        self.as_file_mut().ftruncate(tx_offset, size)
73    }
74}
75
76impl Repo for Fs {
77    type SegmentWriter = File;
78    type SegmentReader = CompressReader;
79
80    fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
81        File::options()
82            .read(true)
83            .append(true)
84            .create_new(true)
85            .open(self.segment_path(offset))
86            .or_else(|e| {
87                if e.kind() == io::ErrorKind::AlreadyExists {
88                    debug!("segment {offset} already exists");
89                    let file = self.open_segment_writer(offset)?;
90                    if file.metadata()?.len() == 0 {
91                        debug!("segment {offset} is empty");
92                        return Ok(file);
93                    }
94                }
95
96                Err(e)
97            })
98    }
99
100    fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
101        File::options().read(true).append(true).open(self.segment_path(offset))
102    }
103
104    fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
105        let file = File::open(self.segment_path(offset))?;
106        CompressReader::new(file)
107    }
108
109    fn remove_segment(&self, offset: u64) -> io::Result<()> {
110        let _ = self.remove_offset_index(offset).map_err(|e| {
111            warn!("failed to remove offset index for segment {offset}, error: {e}");
112        });
113        fs::remove_file(self.segment_path(offset))
114    }
115
116    fn compress_segment(&self, offset: u64) -> io::Result<()> {
117        let src = self.open_segment_reader(offset)?;
118        // if it's already compressed, leave it be
119        let CompressReader::None(mut src) = src else {
120            return Ok(());
121        };
122
123        let mut dst = NamedTempFile::new_in(&self.root)?;
124        // bytes per frame. in the future, it might be worth looking into putting
125        // every commit into its own frame, to make seeking more efficient.
126        let max_frame_size = 0x1000;
127        compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
128        dst.persist(self.segment_path(offset))?;
129        Ok(())
130    }
131
132    fn existing_offsets(&self) -> io::Result<Vec<u64>> {
133        let mut segments = Vec::new();
134
135        for entry in fs::read_dir(&self.root)? {
136            let entry = entry?;
137            if entry.file_type()?.is_file() {
138                let path = entry.path();
139                let name = path.file_name().unwrap_or_default().to_string_lossy();
140                let Some(file_name) = name.strip_suffix(SEGMENT_FILE_EXT) else {
141                    continue;
142                };
143                let Ok(offset) = file_name.parse::<u64>() else {
144                    continue;
145                };
146
147                segments.push(offset);
148            }
149        }
150
151        segments.sort_unstable();
152
153        Ok(segments)
154    }
155
156    fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
157        TxOffsetIndexMut::create_index_file(&self.root.index(offset), cap)
158    }
159
160    fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
161        TxOffsetIndexMut::delete_index_file(&self.root.index(offset))
162    }
163
164    fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
165        TxOffsetIndex::open_index_file(&self.root.index(offset))
166    }
167}
168
// Marker impl: segment readers can answer length queries too, matching the
// `SegmentLen for File` impl above (see [`SegmentLen`] for the methods).
impl SegmentLen for CompressReader {}
170
#[cfg(feature = "streaming")]
impl crate::stream::AsyncRepo for Fs {
    type AsyncSegmentWriter = tokio::io::BufWriter<tokio::fs::File>;
    type AsyncSegmentReader = spacetimedb_fs_utils::compression::AsyncCompressReader<tokio::fs::File>;

    /// Async counterpart of [`Repo::open_segment_reader`]: open the segment at
    /// `offset` wrapped in an `AsyncCompressReader`.
    async fn open_segment_reader_async(&self, offset: u64) -> io::Result<Self::AsyncSegmentReader> {
        let path = self.segment_path(offset);
        let file = tokio::fs::File::open(path).await?;
        spacetimedb_fs_utils::compression::AsyncCompressReader::new(file).await
    }
}
181
// Blanket impl: any seekable, readable async source wrapped in an
// `AsyncCompressReader` can report its length (presumably via provided
// methods on `AsyncLen` — see the trait definition in `crate::stream`).
#[cfg(feature = "streaming")]
impl<T> crate::stream::AsyncLen for spacetimedb_fs_utils::compression::AsyncCompressReader<T> where
    T: tokio::io::AsyncSeek + tokio::io::AsyncRead + Unpin + Send
{
}