spacetimedb_commitlog/repo/
mod.rs1use std::io;
2
3use log::{debug, warn};
4
5use crate::{
6 commit::Commit,
7 error,
8 index::{IndexFile, IndexFileMut},
9 segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer},
10 Options,
11};
12
13pub(crate) mod fs;
14#[cfg(any(test, feature = "test"))]
15pub mod mem;
16
17pub use fs::Fs;
18#[cfg(any(test, feature = "test"))]
19pub use mem::Memory;
20
21pub type TxOffset = u64;
22pub type TxOffsetIndexMut = IndexFileMut<TxOffset>;
23pub type TxOffsetIndex = IndexFile<TxOffset>;
24
/// A seekable stream that can report its total length in bytes.
pub trait SegmentLen: io::Seek {
    /// Determine the length of the stream in bytes.
    ///
    /// The default implementation remembers the current position, seeks to
    /// the end to learn the length, and then seeks back to where it started.
    /// The seek back is skipped when the stream was already positioned at
    /// its end.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised by the underlying seek operations.
    fn segment_len(&mut self) -> io::Result<u64> {
        let saved = self.stream_position()?;
        let end = self.seek(io::SeekFrom::End(0))?;

        // Already at the end: nothing to restore.
        if saved == end {
            return Ok(end);
        }

        self.seek(io::SeekFrom::Start(saved))?;
        Ok(end)
    }
}
50
51pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {}
52impl<T: io::BufRead + SegmentLen + Send + Sync> SegmentReader for T {}
53
54pub trait SegmentWriter: FileLike + io::Read + io::Write + SegmentLen + Send + Sync {}
55impl<T: FileLike + io::Read + io::Write + SegmentLen + Send + Sync> SegmentWriter for T {}
56
57pub trait Repo: Clone {
62 type SegmentWriter: SegmentWriter + 'static;
64 type SegmentReader: SegmentReader + 'static;
65
66 fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter>;
75
76 fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader>;
84
85 fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter>;
93
94 fn remove_segment(&self, offset: u64) -> io::Result<()>;
98
99 fn compress_segment(&self, offset: u64) -> io::Result<()>;
101
102 fn existing_offsets(&self) -> io::Result<Vec<u64>>;
105
106 fn create_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result<TxOffsetIndexMut> {
109 Err(io::Error::other("not implemented"))
110 }
111
112 fn remove_offset_index(&self, _offset: TxOffset) -> io::Result<()> {
114 Err(io::Error::other("not implemented"))
115 }
116
117 fn get_offset_index(&self, _offset: TxOffset) -> io::Result<TxOffsetIndex> {
119 Err(io::Error::other("not implemented"))
120 }
121}
122
123impl<T: Repo> Repo for &T {
124 type SegmentWriter = T::SegmentWriter;
125 type SegmentReader = T::SegmentReader;
126
127 fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
128 T::create_segment(self, offset)
129 }
130
131 fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
132 T::open_segment_reader(self, offset)
133 }
134
135 fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
136 T::open_segment_writer(self, offset)
137 }
138
139 fn remove_segment(&self, offset: u64) -> io::Result<()> {
140 T::remove_segment(self, offset)
141 }
142
143 fn compress_segment(&self, offset: u64) -> io::Result<()> {
144 T::compress_segment(self, offset)
145 }
146
147 fn existing_offsets(&self) -> io::Result<Vec<u64>> {
148 T::existing_offsets(self)
149 }
150
151 fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
152 T::create_offset_index(self, offset, cap)
153 }
154
155 fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
157 T::remove_offset_index(self, offset)
158 }
159
160 fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
162 T::get_offset_index(self, offset)
163 }
164}
165
166impl<T: SegmentLen> SegmentLen for io::BufReader<T> {
167 fn segment_len(&mut self) -> io::Result<u64> {
168 SegmentLen::segment_len(self.get_mut())
169 }
170}
171
172pub(crate) fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
173 repo.create_offset_index(offset, opts.offset_index_len())
174 .map(|index| OffsetIndexWriter::new(index, opts))
175 .map_err(|e| {
176 warn!("failed to get offset index for segment {offset}: {e}");
177 })
178 .ok()
179}
180
181pub fn create_segment_writer<R: Repo>(
188 repo: &R,
189 opts: Options,
190 epoch: u64,
191 offset: u64,
192) -> io::Result<Writer<R::SegmentWriter>> {
193 let mut storage = repo.create_segment(offset)?;
194 Header {
195 log_format_version: opts.log_format_version,
196 checksum_algorithm: Commit::CHECKSUM_ALGORITHM,
197 }
198 .write(&mut storage)?;
199 storage.fsync()?;
200
201 Ok(Writer {
202 commit: Commit {
203 min_tx_offset: offset,
204 n: 0,
205 records: Vec::new(),
206 epoch,
207 },
208 inner: io::BufWriter::new(storage),
209
210 min_tx_offset: offset,
211 bytes_written: Header::LEN as u64,
212
213 max_records_in_commit: opts.max_records_in_commit,
214
215 offset_index_head: create_offset_index_writer(repo, offset, opts),
216 })
217}
218
219pub fn resume_segment_writer<R: Repo>(
235 repo: &R,
236 opts: Options,
237 offset: u64,
238) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
239 let mut storage = repo.open_segment_writer(offset)?;
240 let offset_index = repo.get_offset_index(offset).ok();
241 let Metadata {
242 header,
243 tx_range,
244 size_in_bytes,
245 max_epoch,
246 max_commit_offset: _,
247 } = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
248 Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
249 warn!("invalid commit in segment {offset}: {source}");
250 debug!("sofar={sofar:?}");
251 return Ok(Err(sofar));
252 }
253 Err(error::SegmentMetadata::Io(e)) => return Err(e),
254 Ok(meta) => meta,
255 };
256 header
257 .ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
258 .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
259 if header.log_format_version != opts.log_format_version {
261 return Err(io::Error::new(
262 io::ErrorKind::InvalidData,
263 format!(
264 "log format version mismatch: current={} segment={}",
265 opts.log_format_version, header.log_format_version
266 ),
267 ));
268 }
269
270 Ok(Ok(Writer {
271 commit: Commit {
272 min_tx_offset: tx_range.end,
273 n: 0,
274 records: Vec::new(),
275 epoch: max_epoch,
276 },
277 inner: io::BufWriter::new(storage),
278
279 min_tx_offset: tx_range.start,
280 bytes_written: size_in_bytes,
281
282 max_records_in_commit: opts.max_records_in_commit,
283
284 offset_index_head: create_offset_index_writer(repo, offset, opts),
285 }))
286}
287
288pub fn open_segment_reader<R: Repo>(
294 repo: &R,
295 max_log_format_version: u8,
296 offset: u64,
297) -> io::Result<Reader<R::SegmentReader>> {
298 debug!("open segment reader at {offset}");
299 let storage = repo.open_segment_reader(offset)?;
300 Reader::new(max_log_format_version, offset, storage)
301}