1use super::marker::{serialize_marker_item, Marker};
6use crate::{
7 batch::item::Item as BatchItem, file::fsync_directory, journal::recovery::JournalId, InnerItem,
8 PartitionHandle,
9};
10use lsm_tree::{coding::Encode, EncodeError, SeqNo, ValueType};
11use std::{
12 fs::{File, OpenOptions},
13 hash::Hasher,
14 io::{BufWriter, Write},
15 path::{Path, PathBuf},
16};
17
/// Size (32 MiB) journal files are pre-allocated to on creation
/// (via `File::set_len` in `Writer::create_new`/`Writer::from_file`);
/// presumably to avoid repeated file growth while appending — TODO confirm.
pub const PRE_ALLOCATED_BYTES: u64 = 32 * 1_024 * 1_024;

/// Capacity (8 KiB) of the in-memory `BufWriter` wrapping the journal file.
pub const JOURNAL_BUFFER_BYTES: usize = 8 * 1_024;
22
/// Writer for the active journal file.
pub struct Writer {
    /// Path of the journal file on disk.
    pub(crate) path: PathBuf,
    /// Buffered handle to the journal file.
    file: BufWriter<File>,
    /// Scratch buffer used to encode markers and items before writing.
    buf: Vec<u8>,

    /// True when data has been written since the last flush;
    /// lets `persist` skip a redundant `BufWriter::flush`.
    is_buffer_dirty: bool,
}
30
/// Durability level used when persisting the journal (see `Writer::persist`).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum PersistMode {
    /// Flush the application write buffer to the OS only; no fsync.
    Buffer,

    /// Additionally fsync file data (`File::sync_data`).
    SyncData,

    /// Additionally fsync file data and metadata (`File::sync_all`).
    SyncAll,
}
49
50impl Writer {
51 pub fn len(&self) -> crate::Result<u64> {
52 Ok(self.file.get_ref().metadata()?.len())
53 }
54
55 pub fn rotate(&mut self) -> crate::Result<(PathBuf, PathBuf)> {
56 self.persist(PersistMode::SyncAll)?;
57
58 log::debug!(
59 "Sealing active journal at {:?}, len={}B",
60 self.path,
61 self.path
62 .metadata()
63 .inspect_err(|e| {
64 log::error!(
65 "Failed to get file metadata of journal file at {:?}: {e:?}",
66 self.path
67 );
68 })?
69 .len(),
70 );
71
72 let prev_path = self.path.clone();
73
74 let folder = self
75 .path
76 .parent()
77 .expect("should have parent")
78 .to_path_buf();
79
80 let journal_id = self
81 .path
82 .file_name()
83 .expect("should be valid file name")
84 .to_str()
85 .expect("should be valid journal file name")
86 .parse::<JournalId>()
87 .expect("should be valid journal ID");
88
89 let new_path = folder.join((journal_id + 1).to_string());
90 log::debug!("Rotating active journal to {new_path:?}");
91
92 *self = Self::create_new(new_path.clone())?;
93
94 fsync_directory(&folder)?;
96
97 Ok((prev_path, new_path))
98 }
99
100 pub fn create_new<P: Into<PathBuf>>(path: P) -> crate::Result<Self> {
101 let path = path.into();
102
103 let file = File::create(&path).inspect_err(|e| {
104 log::error!("Failed to create journal file at {path:?}: {e:?}");
105 })?;
106
107 file.set_len(PRE_ALLOCATED_BYTES).inspect_err(|e| {
108 log::error!(
109 "Failed to set journal file size to {PRE_ALLOCATED_BYTES}B at {path:?}: {e:?}"
110 );
111 })?;
112
113 file.sync_all().inspect_err(|e| {
114 log::error!("Failed to fsync journal file at {path:?}: {e:?}");
115 })?;
116
117 Ok(Self {
118 path,
119 file: BufWriter::new(file),
120 buf: Vec::new(),
121 is_buffer_dirty: false,
122 })
123 }
124
125 pub fn from_file<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
126 let path = path.as_ref();
127
128 if !path.try_exists()? {
129 let file = OpenOptions::new()
130 .create_new(true)
131 .write(true)
132 .open(path)
133 .inspect_err(|e| {
134 log::error!("Failed to create journal file at {path:?}: {e:?}");
135 })?;
136
137 file.set_len(PRE_ALLOCATED_BYTES).inspect_err(|e| {
138 log::error!(
139 "Failed to set journal file size to {PRE_ALLOCATED_BYTES}B at {path:?}: {e:?}"
140 );
141 })?;
142
143 file.sync_all().inspect_err(|e| {
144 log::error!("Failed to fsync journal file at {path:?}: {e:?}");
145 })?;
146
147 return Ok(Self {
148 path: path.into(),
149 file: BufWriter::with_capacity(JOURNAL_BUFFER_BYTES, file),
150 buf: Vec::new(),
151 is_buffer_dirty: false,
152 });
153 }
154
155 let file = OpenOptions::new()
156 .append(true)
157 .open(path)
158 .inspect_err(|e| {
159 log::error!("Failed to open journal file at {path:?}: {e:?}");
160 })?;
161
162 Ok(Self {
163 path: path.into(),
164 file: BufWriter::with_capacity(JOURNAL_BUFFER_BYTES, file),
165 buf: Vec::new(),
166 is_buffer_dirty: false,
167 })
168 }
169
170 pub(crate) fn persist(&mut self, mode: PersistMode) -> std::io::Result<()> {
172 log::trace!("Persisting journal at {:?} with mode={mode:?}", self.path);
173
174 if self.is_buffer_dirty {
175 self.file.flush().inspect_err(|e| {
176 log::error!(
177 "Failed to flush journal IO buffers at {:?}: {e:?}",
178 self.path
179 );
180 })?;
181 self.is_buffer_dirty = false;
182 }
183
184 match mode {
185 PersistMode::SyncAll => self.file.get_mut().sync_all().inspect_err(|e| {
186 log::error!("Failed to fsync journal file at {:?}: {e:?}", self.path);
187 }),
188 PersistMode::SyncData => self.file.get_mut().sync_data().inspect_err(|e| {
189 log::error!("Failed to fsyncdata journal file at {:?}: {e:?}", self.path);
190 }),
191 PersistMode::Buffer => Ok(()),
192 }
193 }
194
195 fn write_start(&mut self, item_count: u32, seqno: SeqNo) -> Result<usize, EncodeError> {
197 debug_assert!(self.buf.is_empty());
198
199 Marker::Start {
200 item_count,
201 seqno,
202 compression: lsm_tree::CompressionType::None,
203 }
204 .encode_into(&mut self.buf)?;
205
206 self.file.write_all(&self.buf)?;
207
208 Ok(self.buf.len())
209 }
210
211 fn write_end(&mut self, checksum: u64) -> Result<usize, EncodeError> {
213 debug_assert!(self.buf.is_empty());
214
215 Marker::End(checksum).encode_into(&mut self.buf)?;
216
217 self.file.write_all(&self.buf)?;
218
219 Ok(self.buf.len())
220 }
221
222 pub(crate) fn write_raw(
223 &mut self,
224 partition: &str,
225 key: &[u8],
226 value: &[u8],
227 value_type: ValueType,
228 seqno: u64,
229 ) -> crate::Result<usize> {
230 self.is_buffer_dirty = true;
231
232 let mut hasher = xxhash_rust::xxh3::Xxh3::new();
233 let mut byte_count = 0;
234
235 self.buf.clear();
236 byte_count += self.write_start(1, seqno)?;
237 self.buf.clear();
238
239 serialize_marker_item(&mut self.buf, partition, key, value, value_type)?;
240
241 self.file.write_all(&self.buf)?;
242
243 hasher.update(&self.buf);
244 byte_count += self.buf.len();
245
246 self.buf.clear();
247 let checksum = hasher.finish();
248 byte_count += self.write_end(checksum)?;
249
250 Ok(byte_count)
251 }
252
253 pub fn write_optimized_batch(
254 &mut self,
255 tuples: &[(&PartitionHandle, Vec<InnerItem>)],
256 batch_size: usize,
257 seqno: SeqNo,
258 ) -> crate::Result<usize> {
259 if batch_size == 0 {
260 return Ok(0);
261 }
262
263 self.is_buffer_dirty = true;
264
265 self.buf.clear();
266
267 #[allow(clippy::cast_possible_truncation)]
269 let item_count = batch_size as u32;
270
271 let mut hasher = xxhash_rust::xxh3::Xxh3::new();
272 let mut byte_count = 0;
273
274 byte_count += self.write_start(item_count, seqno)?;
275 self.buf.clear();
276
277 for (partition, items) in tuples {
278 let partition = partition.name.clone();
279
280 for item in items {
281 debug_assert!(self.buf.is_empty());
282
283 serialize_marker_item(
284 &mut self.buf,
285 &partition,
286 &item.key,
287 &item.value,
288 item.value_type,
289 )?;
290
291 self.file.write_all(&self.buf)?;
292
293 hasher.update(&self.buf);
294 byte_count += self.buf.len();
295
296 self.buf.clear();
297 }
298 }
299
300 let checksum = hasher.finish();
301 byte_count += self.write_end(checksum)?;
302
303 Ok(byte_count)
304 }
305
306 pub fn write_batch<'a>(
307 &mut self,
308 items: impl Iterator<Item = &'a BatchItem>,
309 batch_size: usize,
310 seqno: SeqNo,
311 ) -> crate::Result<usize> {
312 if batch_size == 0 {
313 return Ok(0);
314 }
315
316 self.is_buffer_dirty = true;
317
318 self.buf.clear();
319
320 #[allow(clippy::cast_possible_truncation)]
322 let item_count = batch_size as u32;
323
324 let mut hasher = xxhash_rust::xxh3::Xxh3::new();
325 let mut byte_count = 0;
326
327 byte_count += self.write_start(item_count, seqno)?;
328 self.buf.clear();
329
330 for item in items {
331 debug_assert!(self.buf.is_empty());
332
333 serialize_marker_item(
334 &mut self.buf,
335 &item.partition,
336 &item.key,
337 &item.value,
338 item.value_type,
339 )?;
340
341 self.file.write_all(&self.buf)?;
342
343 hasher.update(&self.buf);
344 byte_count += self.buf.len();
345
346 self.buf.clear();
347 }
348
349 let checksum = hasher.finish();
350 byte_count += self.write_end(checksum)?;
351
352 Ok(byte_count)
353 }
354}