spacetimedb_commitlog/repo/
fs.rs1use std::fs::{self, File};
2use std::io;
3
4use log::{debug, warn};
5use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
6use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
7use tempfile::NamedTempFile;
8
9use crate::segment::FileLike;
10
11use super::{Repo, SegmentLen, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
12
13const SEGMENT_FILE_EXT: &str = ".stdb.log";
14
15#[derive(Clone, Debug)]
29pub struct Fs {
30 root: CommitLogDir,
32}
33
34impl Fs {
35 pub fn new(root: CommitLogDir) -> io::Result<Self> {
39 root.create()?;
40 Ok(Self { root })
41 }
42
43 pub fn segment_path(&self, offset: u64) -> SegmentFile {
46 self.root.segment(offset)
47 }
48
49 pub fn size_on_disk(&self) -> io::Result<u64> {
53 let mut sz = 0;
54 for offset in self.existing_offsets()? {
55 sz += self.segment_path(offset).metadata()?.len();
56 sz += self.root.index(offset).metadata().map(|m| m.len()).unwrap_or(0);
58 }
59
60 Ok(sz)
61 }
62}
63
64impl SegmentLen for File {}
65
66impl FileLike for NamedTempFile {
67 fn fsync(&mut self) -> io::Result<()> {
68 self.as_file_mut().fsync()
69 }
70
71 fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
72 self.as_file_mut().ftruncate(tx_offset, size)
73 }
74}
75
76impl Repo for Fs {
77 type SegmentWriter = File;
78 type SegmentReader = CompressReader;
79
80 fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
81 File::options()
82 .read(true)
83 .append(true)
84 .create_new(true)
85 .open(self.segment_path(offset))
86 .or_else(|e| {
87 if e.kind() == io::ErrorKind::AlreadyExists {
88 debug!("segment {offset} already exists");
89 let file = self.open_segment_writer(offset)?;
90 if file.metadata()?.len() == 0 {
91 debug!("segment {offset} is empty");
92 return Ok(file);
93 }
94 }
95
96 Err(e)
97 })
98 }
99
100 fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
101 File::options().read(true).append(true).open(self.segment_path(offset))
102 }
103
104 fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
105 let file = File::open(self.segment_path(offset))?;
106 CompressReader::new(file)
107 }
108
109 fn remove_segment(&self, offset: u64) -> io::Result<()> {
110 let _ = self.remove_offset_index(offset).map_err(|e| {
111 warn!("failed to remove offset index for segment {offset}, error: {e}");
112 });
113 fs::remove_file(self.segment_path(offset))
114 }
115
116 fn compress_segment(&self, offset: u64) -> io::Result<()> {
117 let src = self.open_segment_reader(offset)?;
118 let CompressReader::None(mut src) = src else {
120 return Ok(());
121 };
122
123 let mut dst = NamedTempFile::new_in(&self.root)?;
124 let max_frame_size = 0x1000;
127 compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
128 dst.persist(self.segment_path(offset))?;
129 Ok(())
130 }
131
132 fn existing_offsets(&self) -> io::Result<Vec<u64>> {
133 let mut segments = Vec::new();
134
135 for entry in fs::read_dir(&self.root)? {
136 let entry = entry?;
137 if entry.file_type()?.is_file() {
138 let path = entry.path();
139 let name = path.file_name().unwrap_or_default().to_string_lossy();
140 let Some(file_name) = name.strip_suffix(SEGMENT_FILE_EXT) else {
141 continue;
142 };
143 let Ok(offset) = file_name.parse::<u64>() else {
144 continue;
145 };
146
147 segments.push(offset);
148 }
149 }
150
151 segments.sort_unstable();
152
153 Ok(segments)
154 }
155
156 fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
157 TxOffsetIndexMut::create_index_file(&self.root.index(offset), cap)
158 }
159
160 fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
161 TxOffsetIndexMut::delete_index_file(&self.root.index(offset))
162 }
163
164 fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
165 TxOffsetIndex::open_index_file(&self.root.index(offset))
166 }
167}
168
169impl SegmentLen for CompressReader {}
170
171#[cfg(feature = "streaming")]
172impl crate::stream::AsyncRepo for Fs {
173 type AsyncSegmentWriter = tokio::io::BufWriter<tokio::fs::File>;
174 type AsyncSegmentReader = spacetimedb_fs_utils::compression::AsyncCompressReader<tokio::fs::File>;
175
176 async fn open_segment_reader_async(&self, offset: u64) -> io::Result<Self::AsyncSegmentReader> {
177 let file = tokio::fs::File::open(self.segment_path(offset)).await?;
178 spacetimedb_fs_utils::compression::AsyncCompressReader::new(file).await
179 }
180}
181
182#[cfg(feature = "streaming")]
183impl<T> crate::stream::AsyncLen for spacetimedb_fs_utils::compression::AsyncCompressReader<T> where
184 T: tokio::io::AsyncSeek + tokio::io::AsyncRead + Unpin + Send
185{
186}