spacetimedb_paths/
server.rs1use std::{fs, io};
2
3use crate::utils::{path_type, PathBufExt};
4use chrono::NaiveDate;
5
6path_type! {
7 ServerDataDir: dir
9}
10
11impl ServerDataDir {
12 pub fn config_toml(&self) -> ConfigToml {
13 ConfigToml(self.0.join("config.toml"))
14 }
15
16 pub fn logs(&self) -> LogsDir {
17 LogsDir(self.0.join("logs"))
18 }
19
20 pub fn wasmtime_cache(&self) -> WasmtimeCacheDir {
21 WasmtimeCacheDir(self.0.join("cache/wasmtime"))
22 }
23
24 pub fn metadata_toml(&self) -> MetadataTomlPath {
25 MetadataTomlPath(self.0.join("metadata.toml"))
26 }
27
28 pub fn pid_file(&self) -> Result<PidFile, PidFileError> {
29 use fs2::FileExt;
30 use io::{Read, Write};
31 self.create()?;
32 let path = self.0.join("spacetime.pid");
33 let mut file = fs::File::options()
34 .create(true)
35 .write(true)
36 .truncate(false)
37 .read(true)
38 .open(&path)?;
39 match file.try_lock_exclusive() {
40 Ok(()) => {}
41 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
42 let mut s = String::new();
43 let pid = file.read_to_string(&mut s).ok().and_then(|_| s.trim().parse().ok());
44 return Err(PidFileError::Exists { pid });
45 }
46 Err(e) => return Err(e.into()),
47 }
48 let mut pidfile = PidFile { file, path };
49 pidfile.file.set_len(0)?;
50 write!(pidfile.file, "{}", std::process::id())?;
51 pidfile.file.flush()?;
52 Ok(pidfile)
53 }
54
55 pub fn replica(&self, replica_id: u64) -> ReplicaDir {
56 ReplicaDir(self.0.join("replicas").joined_int(replica_id))
57 }
58}
59
60path_type! {
61 ConfigToml: file
63}
64
65path_type! {
66 LogsDir: dir
70}
71
72impl LogsDir {
73 pub fn filename_prefix(edition: &str) -> String {
80 format!("spacetime-{edition}")
81 }
82
83 pub fn filename_extension() -> String {
85 "log".to_owned()
86 }
87}
88
89path_type! {
90 WasmtimeCacheDir: dir
92}
93
94path_type! {
95 MetadataTomlPath: file
98}
99
100#[derive(thiserror::Error, Debug)]
101pub enum PidFileError {
102 #[error("error while taking database lock on spacetime.pid")]
103 Io(#[from] io::Error),
104 #[error("cannot take lock on database; spacetime.pid already exists (owned by pid {pid:?})")]
105 Exists { pid: Option<u32> },
106}
107
/// Guard returned by [`ServerDataDir::pid_file`].
///
/// Dropping it deletes the file at `path` (see the `Drop` impl).
pub struct PidFile {
    /// Open handle to the pid file, through which `pid_file` took the exclusive lock.
    file: std::fs::File,
    /// Location of the pid file on disk, used to remove it on drop.
    path: std::path::PathBuf,
}
113
114impl Drop for PidFile {
115 fn drop(&mut self) {
116 let _ = fs::remove_file(&self.path);
117 }
118}
119
120path_type! {
121 ReplicaDir: dir
124}
125
126impl ReplicaDir {
127 pub fn module_logs(self) -> ModuleLogsDir {
128 ModuleLogsDir(self.0.joined("module_logs"))
129 }
130
131 pub fn snapshots(&self) -> SnapshotsPath {
132 SnapshotsPath(self.0.join("snapshots"))
133 }
134
135 pub fn commit_log(&self) -> CommitLogDir {
136 CommitLogDir(self.0.join("clog"))
137 }
138}
139
140path_type! {
141 ModuleLogsDir: dir
143}
144
145impl ModuleLogsDir {
146 pub fn logfile(self, date: NaiveDate) -> ModuleLogPath {
148 ModuleLogPath(self.0.joined(format!("{date}.log")))
149 }
150
151 pub fn today(self) -> ModuleLogPath {
152 self.logfile(chrono::Utc::now().date_naive())
153 }
154
155 pub fn most_recent(self) -> io::Result<Option<ModuleLogPath>> {
156 let mut max_file_name = None;
157 for entry in std::fs::read_dir(&self)? {
158 let file_name = entry?.file_name();
159 max_file_name = std::cmp::max(max_file_name, Some(file_name));
160 }
161 Ok(max_file_name.map(|file_name| ModuleLogPath(self.0.joined(file_name))))
162 }
163}
164
165path_type! {
166 ModuleLogPath: file
168}
169
170impl ModuleLogPath {
171 pub fn date(&self) -> NaiveDate {
172 self.0
173 .file_stem()
174 .and_then(|s| s.to_str()?.parse().ok())
175 .expect("ModuleLogPath should always have a filename of the form `{date}.log`")
176 }
177
178 pub fn with_date(&self, date: NaiveDate) -> Self {
179 Self(self.0.with_file_name(format!("{date}.log")))
180 }
181
182 pub fn yesterday(&self) -> Self {
183 self.with_date(self.date().pred_opt().unwrap())
184 }
185
186 pub fn popped(mut self) -> ModuleLogsDir {
187 self.0.pop();
188 ModuleLogsDir(self.0)
189 }
190}
191
192path_type! {
193 SnapshotsPath: dir
195}
196
197impl SnapshotsPath {
198 pub fn snapshot_dir(&self, tx_offset: u64) -> SnapshotDirPath {
199 let dir_name = format!("{tx_offset:0>20}.snapshot_dir");
200 SnapshotDirPath(self.0.join(dir_name))
201 }
202}
203
204path_type! {
205 SnapshotDirPath: dir
207}
208
209impl SnapshotDirPath {
210 pub fn snapshot_file(&self, tx_offset: u64) -> SnapshotFilePath {
211 let file_name = format!("{tx_offset:0>20}.snapshot_bsatn");
212 SnapshotFilePath(self.0.join(file_name))
213 }
214
215 pub fn objects(&self) -> SnapshotObjectsPath {
216 SnapshotObjectsPath(self.0.join("objects"))
217 }
218
219 pub fn rename_invalid(&self) -> io::Result<()> {
220 let invalid_path = self.0.with_extension("invalid_snapshot");
221 fs::rename(self, invalid_path)
222 }
223
224 pub fn tx_offset(&self) -> Option<u64> {
225 self.0
226 .file_stem()
227 .and_then(|s| s.to_str()?.split('.').next()?.parse::<u64>().ok())
228 }
229}
230
231path_type! {
232 SnapshotFilePath: file
235}
236path_type! {
237 SnapshotObjectsPath: dir
240}
241
242path_type! {
243 CommitLogDir: dir
245}
246
247impl CommitLogDir {
248 pub fn segment(&self, offset: u64) -> SegmentFile {
252 let file_name = format!("{offset:0>20}.stdb.log");
253 SegmentFile(self.0.join(file_name))
254 }
255
256 pub fn index(&self, offset: u64) -> OffsetIndexFile {
258 let file_name = format!("{offset:0>20}.stdb.ofs");
259 OffsetIndexFile(self.0.join(file_name))
260 }
261}
262
263path_type!(SegmentFile: file);
264path_type!(OffsetIndexFile: file);
265
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use std::fs;
    use tempfile::TempDir;

    /// Reads and parses the pid stored in the file guarded by `lock`.
    fn recorded_pid(lock: &PidFile) -> Result<u32> {
        let contents = fs::read_to_string(&lock.path)?;
        Ok(contents.trim().parse::<u32>()?)
    }

    /// Taking the pid file must record the id of the current process in it.
    #[test]
    fn test_pid_file_is_written() -> Result<()> {
        let tempdir = TempDir::new()?;
        let sdd = ServerDataDir(tempdir.path().to_path_buf());

        let lock = sdd.pid_file()?;

        // Parsing alone would accept any number; check it is really our pid.
        assert_eq!(recorded_pid(&lock)?, std::process::id());

        Ok(())
    }

    /// While one `PidFile` is alive a second attempt must fail; once it is
    /// dropped the lock can be taken again.
    #[test]
    fn test_pid_is_exclusive() -> Result<()> {
        let tempdir = TempDir::new()?;
        let sdd = ServerDataDir(tempdir.path().to_path_buf());

        let lock = sdd.pid_file()?;

        assert_eq!(recorded_pid(&lock)?, std::process::id());

        let attempt = sdd.pid_file();
        assert!(attempt.is_err());

        drop(lock);
        sdd.pid_file()?;
        Ok(())
    }

    /// A snapshot directory name must round-trip back to its transaction offset.
    #[test]
    fn test_snapshot_parsing() -> Result<()> {
        let tempdir = TempDir::new()?;
        let sdd = ServerDataDir(tempdir.path().to_path_buf());
        const SNAPSHOT_OFFSET: u64 = 123456;
        let snapshot_dir = sdd.replica(1).snapshots().snapshot_dir(SNAPSHOT_OFFSET);
        assert_eq!(Some(SNAPSHOT_OFFSET), snapshot_dir.tx_offset());
        Ok(())
    }
}