spacetimedb_paths/
server.rs

1use std::{fs, io};
2
3use crate::utils::{path_type, PathBufExt};
4use chrono::NaiveDate;
5
path_type! {
    /// The data-dir, where all database data is stored for a spacetime server process.
    ///
    /// It is the root from which the config, logs, wasmtime cache, metadata,
    /// pid-file and replica paths are derived.
    ServerDataDir: dir
}
10
11impl ServerDataDir {
12    pub fn config_toml(&self) -> ConfigToml {
13        ConfigToml(self.0.join("config.toml"))
14    }
15
16    pub fn logs(&self) -> LogsDir {
17        LogsDir(self.0.join("logs"))
18    }
19
20    pub fn wasmtime_cache(&self) -> WasmtimeCacheDir {
21        WasmtimeCacheDir(self.0.join("cache/wasmtime"))
22    }
23
24    pub fn metadata_toml(&self) -> MetadataTomlPath {
25        MetadataTomlPath(self.0.join("metadata.toml"))
26    }
27
28    pub fn pid_file(&self) -> Result<PidFile, PidFileError> {
29        use fs2::FileExt;
30        use io::{Read, Write};
31        self.create()?;
32        let path = self.0.join("spacetime.pid");
33        let mut file = fs::File::options()
34            .create(true)
35            .write(true)
36            .truncate(false)
37            .read(true)
38            .open(&path)?;
39        match file.try_lock_exclusive() {
40            Ok(()) => {}
41            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
42                let mut s = String::new();
43                let pid = file.read_to_string(&mut s).ok().and_then(|_| s.trim().parse().ok());
44                return Err(PidFileError::Exists { pid });
45            }
46            Err(e) => return Err(e.into()),
47        }
48        let mut pidfile = PidFile { file, path };
49        pidfile.file.set_len(0)?;
50        write!(pidfile.file, "{}", std::process::id())?;
51        pidfile.file.flush()?;
52        Ok(pidfile)
53    }
54
55    pub fn replica(&self, replica_id: u64) -> ReplicaDir {
56        ReplicaDir(self.0.join("replicas").joined_int(replica_id))
57    }
58}
59
path_type! {
    /// The `config.toml` file, where server configuration is stored.
    ///
    /// Located at `{data-dir}/config.toml`.
    ConfigToml: file
}

path_type! {
    /// The directory in which server logs are to be stored: `{data-dir}/logs`.
    ///
    /// The files in this directory have the naming format `spacetime-{edition}.YYYY-MM-DD.log`.
    LogsDir: dir
}
71
72impl LogsDir {
73    // we can't be as strongly typed as we might like here, because `tracing_subscriber`'s
74    // `RollingFileAppender` specifically takes the prefix and suffix of the filename, and
75    // sticks the date in between them - so we have to expose those components of the
76    // filename separately, rather than `fn logfile(&self, edition, date) -> LogFilePath`
77
78    /// The prefix before the first `.` of a logfile name.
79    pub fn filename_prefix(edition: &str) -> String {
80        format!("spacetime-{edition}")
81    }
82
83    /// The file extension of a logfile name.
84    pub fn filename_extension() -> String {
85        "log".to_owned()
86    }
87}
88
path_type! {
    /// The directory we give to wasmtime to cache its compiled artifacts in.
    ///
    /// Located at `{data-dir}/cache/wasmtime`.
    WasmtimeCacheDir: dir
}

path_type! {
    /// The `metadata.toml` file, where metadata about the server that owns this data-dir
    /// is stored. Machine-writable only.
    ///
    /// Located at `{data-dir}/metadata.toml`.
    MetadataTomlPath: file
}
99
/// Errors that can occur while acquiring the `spacetime.pid` lock file.
#[derive(thiserror::Error, Debug)]
pub enum PidFileError {
    /// An I/O operation needed to create, lock or write `spacetime.pid` failed.
    #[error("error while taking database lock on spacetime.pid")]
    Io(#[from] io::Error),
    /// The exclusive lock on `spacetime.pid` is already held elsewhere.
    ///
    /// `pid` is the process id parsed from the file's contents, or `None` if
    /// the contents could not be read or parsed.
    #[error("cannot take lock on database; spacetime.pid already exists (owned by pid {pid:?})")]
    Exists { pid: Option<u32> },
}
107
/// Guard for the `spacetime.pid` file of a data-dir.
///
/// Removes file upon drop
pub struct PidFile {
    /// Open handle to the pid file; the exclusive lock was taken on this handle.
    file: fs::File,
    /// Location of the pid file, used to delete it on drop.
    path: std::path::PathBuf,
}
113
114impl Drop for PidFile {
115    fn drop(&mut self) {
116        let _ = fs::remove_file(&self.path);
117    }
118}
119
path_type! {
    /// A replica directory, where all the data for a module's database is stored.
    ///
    /// Located at `{data-dir}/replicas/$replica_id/`.
    ReplicaDir: dir
}
125
126impl ReplicaDir {
127    pub fn module_logs(self) -> ModuleLogsDir {
128        ModuleLogsDir(self.0.joined("module_logs"))
129    }
130
131    pub fn snapshots(&self) -> SnapshotsPath {
132        SnapshotsPath(self.0.join("snapshots"))
133    }
134
135    pub fn commit_log(&self) -> CommitLogDir {
136        CommitLogDir(self.0.join("clog"))
137    }
138}
139
path_type! {
    /// The directory where module logs are stored
    /// (the `module_logs` subdirectory of a replica directory).
    ///
    /// Log files in it are named `{date}.log`.
    ModuleLogsDir: dir
}
144
145impl ModuleLogsDir {
146    /// `date` should be in UTC.
147    pub fn logfile(self, date: NaiveDate) -> ModuleLogPath {
148        ModuleLogPath(self.0.joined(format!("{date}.log")))
149    }
150
151    pub fn today(self) -> ModuleLogPath {
152        self.logfile(chrono::Utc::now().date_naive())
153    }
154
155    pub fn most_recent(self) -> io::Result<Option<ModuleLogPath>> {
156        let mut max_file_name = None;
157        for entry in std::fs::read_dir(&self)? {
158            let file_name = entry?.file_name();
159            max_file_name = std::cmp::max(max_file_name, Some(file_name));
160        }
161        Ok(max_file_name.map(|file_name| ModuleLogPath(self.0.joined(file_name))))
162    }
163}
164
path_type! {
    /// A module log from a specific date.
    ///
    /// The file name is expected to have the form `{date}.log`.
    ModuleLogPath: file
}
169
170impl ModuleLogPath {
171    pub fn date(&self) -> NaiveDate {
172        self.0
173            .file_stem()
174            .and_then(|s| s.to_str()?.parse().ok())
175            .expect("ModuleLogPath should always have a filename of the form `{date}.log`")
176    }
177
178    pub fn with_date(&self, date: NaiveDate) -> Self {
179        Self(self.0.with_file_name(format!("{date}.log")))
180    }
181
182    pub fn yesterday(&self) -> Self {
183        self.with_date(self.date().pred_opt().unwrap())
184    }
185
186    pub fn popped(mut self) -> ModuleLogsDir {
187        self.0.pop();
188        ModuleLogsDir(self.0)
189    }
190}
191
path_type! {
    /// The snapshots directory. `{data-dir}/replicas/$replica_id/snapshots`
    SnapshotsPath: dir
}
196
197impl SnapshotsPath {
198    pub fn snapshot_dir(&self, tx_offset: u64) -> SnapshotDirPath {
199        let dir_name = format!("{tx_offset:0>20}.snapshot_dir");
200        SnapshotDirPath(self.0.join(dir_name))
201    }
202}
203
path_type! {
    /// A snapshot directory. `{data-dir}/replicas/$replica_id/snapshots/$tx_offset.snapshot_dir`
    SnapshotDirPath: dir
}
208
209impl SnapshotDirPath {
210    pub fn snapshot_file(&self, tx_offset: u64) -> SnapshotFilePath {
211        let file_name = format!("{tx_offset:0>20}.snapshot_bsatn");
212        SnapshotFilePath(self.0.join(file_name))
213    }
214
215    pub fn objects(&self) -> SnapshotObjectsPath {
216        SnapshotObjectsPath(self.0.join("objects"))
217    }
218
219    pub fn rename_invalid(&self) -> io::Result<()> {
220        let invalid_path = self.0.with_extension("invalid_snapshot");
221        fs::rename(self, invalid_path)
222    }
223
224    pub fn tx_offset(&self) -> Option<u64> {
225        self.0
226            .file_stem()
227            .and_then(|s| s.to_str()?.split('.').next()?.parse::<u64>().ok())
228    }
229}
230
path_type! {
    /// A snapshot file.
    /// `{data-dir}/replicas/$replica_id/snapshots/$tx_offset.snapshot_dir/$tx_offset.snapshot_bsatn`
    SnapshotFilePath: file
}
path_type! {
    /// The objects directory for a snapshot.
    /// `{data-dir}/replicas/$replica_id/snapshots/$tx_offset.snapshot_dir/objects`
    SnapshotObjectsPath: dir
}
241
path_type! {
    /// The commit log directory. `{data-dir}/replicas/$replica_id/clog`
    CommitLogDir: dir
}
246
247impl CommitLogDir {
248    /// By convention, the file name of a segment consists of the minimum
249    /// transaction offset contained in it, left-padded with zeroes to 20 digits,
250    /// and the file extension `.stdb.log`.
251    pub fn segment(&self, offset: u64) -> SegmentFile {
252        let file_name = format!("{offset:0>20}.stdb.log");
253        SegmentFile(self.0.join(file_name))
254    }
255
256    /// Returns the offset index file path based on the root path and offset
257    pub fn index(&self, offset: u64) -> OffsetIndexFile {
258        let file_name = format!("{offset:0>20}.stdb.ofs");
259        OffsetIndexFile(self.0.join(file_name))
260    }
261}
262
// A commit log segment file: `{offset:0>20}.stdb.log` inside the commit log directory.
path_type!(SegmentFile: file);
// An offset index file: `{offset:0>20}.stdb.ofs` inside the commit log directory.
path_type!(OffsetIndexFile: file);
265
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use std::fs;
    use tempfile::TempDir;

    /// Reads back the process id stored in the pid file guarded by `lock`.
    fn read_pid(lock: &PidFile) -> Result<u32> {
        let contents = fs::read_to_string(&lock.path)?;
        Ok(contents.trim().parse::<u32>()?)
    }

    /// Acquiring the pid file records this process's id in it.
    #[test]
    fn test_pid_file_is_written() -> Result<()> {
        let tempdir = TempDir::new()?;
        let sdd = ServerDataDir(tempdir.path().to_path_buf());

        let lock = sdd.pid_file()?;

        // Make sure we wrote the pid file, and that it holds our own pid.
        assert_eq!(read_pid(&lock)?, std::process::id());

        Ok(())
    }

    /// The pid file cannot be acquired twice, but can be re-acquired once the
    /// first guard has been dropped.
    #[test]
    fn test_pid_is_exclusive() -> Result<()> {
        let tempdir = TempDir::new()?;
        let sdd = ServerDataDir(tempdir.path().to_path_buf());

        let lock = sdd.pid_file()?;

        // Make sure we wrote the pid file, and that it holds our own pid.
        assert_eq!(read_pid(&lock)?, std::process::id());

        let attempt = sdd.pid_file();
        assert!(attempt.is_err());

        // The failed attempt must not have clobbered the recorded pid.
        assert_eq!(read_pid(&lock)?, std::process::id());

        drop(lock);
        // Make sure it can be acquired now.
        sdd.pid_file()?;
        Ok(())
    }

    /// A snapshot directory's name can be parsed back into its transaction offset.
    #[test]
    fn test_snapshot_parsing() -> Result<()> {
        let tempdir = TempDir::new()?;
        let sdd = ServerDataDir(tempdir.path().to_path_buf());
        const SNAPSHOT_OFFSET: u64 = 123456;
        let snapshot_dir = sdd.replica(1).snapshots().snapshot_dir(SNAPSHOT_OFFSET);
        assert_eq!(Some(SNAPSHOT_OFFSET), snapshot_dir.tx_offset());
        Ok(())
    }
}