1pub mod callback;
2pub mod file_persisted;
3pub mod wal_record;
4
5pub(crate) mod atomic_flush_metrics;
6pub(crate) mod batch_metrics;
7mod closed_chunk_reader;
8pub(crate) mod file_entry;
9pub(crate) mod flush_request;
10pub(crate) mod flush_worker;
11pub(crate) mod queued_write;
12pub(crate) mod write_batch;
13
14use std::collections::BTreeMap;
15use std::fmt;
16use std::io;
17use std::sync::Arc;
18use std::sync::mpsc::SyncSender;
19use std::time::Instant;
20
21pub use closed_chunk_reader::ClosedChunkReader;
22use codeq::OffsetSize;
23pub use flush_request::FlushStat;
24pub(crate) use flush_request::WorkerRequest;
25use log::info;
26
27use crate::Chunk;
28use crate::ChunkId;
29use crate::Config;
30use crate::WALRecord;
31use crate::WalLock;
32use crate::WalTypes;
33use crate::api::state_machine::StateMachine;
34use crate::api::wal::WAL;
35use crate::chunk::closed_chunk::ClosedChunk;
36use crate::chunk::open_chunk::OpenChunk;
37use crate::num::format_pad_u64;
38use crate::stat::ChunkStat;
39use crate::stat::FlushMetrics;
40use crate::types::Segment;
41use crate::wal::atomic_flush_metrics::AtomicFlushMetrics;
42use crate::wal::file_entry::FileEntry;
43use crate::wal::file_persisted::ChunkPersisted;
44use crate::wal::file_persisted::ChunkPersistedCallback;
45pub use crate::wal::file_persisted::ChunkPersistedFn;
46use crate::wal::flush_request::SeqRequest;
47use crate::wal::flush_request::WriteRequest;
48use crate::wal::flush_worker::FlushWorker;
49use crate::wal::flush_worker::WorkerState;
50
/// A write-ahead log stored as a sequence of chunk files in one directory.
///
/// Exactly one chunk is open and receives appended records; earlier chunks
/// are kept as closed chunks for reads. Writes, syncs and file removals are
/// handed to a background flush worker through a bounded channel.
pub struct ChunkedWal<W>
where W: WalTypes
{
    /// Shared configuration: directory, chunk limits and flush batching.
    config: Arc<Config>,
    /// The chunk that currently receives appended records.
    open: OpenChunk<WALRecord<W>>,
    /// Closed chunks ordered by chunk id; a chunk id is the global offset at
    /// which the chunk starts.
    closed: BTreeMap<ChunkId, ClosedChunk<W>>,

    /// Sending half of the bounded channel to the flush worker.
    flush_tx: SyncSender<SeqRequest<W>>,

    /// User callback forwarded to the worker for each chunk file; it is also
    /// invoked directly for every chunk replayed during open.
    on_chunk_persisted: ChunkPersistedFn<W>,

    /// Sequence number attached to the last request sent to the worker.
    sent_seq: u64,

    /// State shared with the flush worker; used to wait until a given
    /// sequence number has been processed.
    worker_state: Arc<WorkerState>,

    /// Flush statistics updated by the worker and read via `flush_metrics()`.
    flush_metrics: Arc<AtomicFlushMetrics>,

    /// Directory lock held for the lifetime of the WAL. Fields drop in
    /// declaration order, so keeping this one last releases the lock only
    /// after everything above has been dropped.
    _dir_lock: WalLock,
}
87
88impl<W> fmt::Debug for ChunkedWal<W>
89where W: WalTypes
90{
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("ChunkedWal")
93 .field("config", &self.config)
94 .field("open", &self.open)
95 .field("closed", &self.closed)
96 .field("sent_seq", &self.sent_seq)
97 .field("done_seq", &self.worker_state.done_seq())
98 .field("flush_metrics", &self.flush_metrics)
99 .finish_non_exhaustive()
100 }
101}
102
103impl<W> ChunkedWal<W>
104where W: WalTypes
105{
106 pub fn open<SM>(
109 config: Arc<Config>,
110 state_machine: &mut SM,
111 on_chunk_persisted: ChunkPersistedFn<W>,
112 ) -> Result<Self, io::Error>
113 where
114 SM: StateMachine<W>,
115 {
116 let dir_lock = Self::acquire_lock(&config)?;
117 Self::open_locked(config, state_machine, on_chunk_persisted, dir_lock)
118 }
119
120 pub fn acquire_lock(config: &Config) -> Result<WalLock, io::Error> {
122 WalLock::new(config)
123 }
124
125 pub fn open_locked<SM>(
127 config: Arc<Config>,
128 state_machine: &mut SM,
129 on_chunk_persisted: ChunkPersistedFn<W>,
130 dir_lock: WalLock,
131 ) -> Result<Self, io::Error>
132 where
133 SM: StateMachine<W>,
134 {
135 let chunk_ids = Self::load_chunk_ids(&config, &dir_lock)?;
136
137 let mut closed = BTreeMap::new();
138 let mut prev_end_offset = None;
139 let mut prev_checkpoint = None;
140 let tail_chunk_id = chunk_ids.last().copied();
141
142 for chunk_id in chunk_ids.iter().copied() {
143 Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;
144
145 let (chunk, records) = Chunk::<WALRecord<W>>::open_with_truncate(
146 config.clone(),
147 chunk_id,
148 Some(chunk_id) == tail_chunk_id,
149 )?;
150
151 on_chunk_persisted(
152 ChunkPersisted {
153 file: chunk.f.clone(),
154 starting_offset: chunk.global_start(),
155 synced_offset: chunk.global_end(),
156 },
157 prev_checkpoint.clone(),
158 );
159
160 for (i, record) in records.iter().enumerate() {
161 let seg = chunk.record_segment(i);
162 state_machine
163 .apply(record, chunk_id, seg)
164 .map_err(|e| io::Error::other(e.to_string()))?;
165 }
166
167 prev_end_offset = Some(chunk.last_segment().end().0);
168 let checkpoint = Arc::new(state_machine.checkpoint());
169 prev_checkpoint = Some(checkpoint.clone());
170
171 closed.insert(chunk_id, ClosedChunk::new(chunk, checkpoint));
172 }
173
174 let open = Self::reopen_last_closed(&mut closed);
175
176 let open = if let Some(open) = open {
177 open
178 } else {
179 OpenChunk::create(
180 config.clone(),
181 ChunkId(prev_end_offset.unwrap_or_default()),
182 WALRecord::Checkpoint(state_machine.checkpoint()),
183 )?
184 };
185
186 Ok(Self::new(
187 config,
188 closed,
189 open,
190 on_chunk_persisted,
191 dir_lock,
192 ))
193 }
194
195 pub fn dump_records<D>(
197 config: &Config,
198 _dir_lock: &WalLock,
199 mut write_record: D,
200 ) -> Result<(), io::Error>
201 where
202 D: FnMut(
203 ChunkId,
204 u64,
205 Result<(Segment, WALRecord<W>), io::Error>,
206 ) -> Result<(), io::Error>,
207 {
208 let chunk_ids = Self::load_chunk_ids(config, _dir_lock)?;
209 for chunk_id in chunk_ids {
210 let it = Chunk::<WALRecord<W>>::dump(config, chunk_id)?;
211 for (i, res) in it.into_iter().enumerate() {
212 write_record(chunk_id, i as u64, res)?;
213 }
214 }
215
216 Ok(())
217 }
218
219 fn new(
228 config: Arc<Config>,
229 closed: BTreeMap<ChunkId, ClosedChunk<W>>,
230 open: OpenChunk<WALRecord<W>>,
231 on_chunk_persisted: ChunkPersistedFn<W>,
232 dir_lock: WalLock,
233 ) -> Self {
234 let prev_checkpoint =
235 closed.iter().last().map(|(_, c)| c.state.clone());
236
237 let offset = open.chunk.global_start();
238 let f = open.chunk.f.clone();
239
240 let file_entry = FileEntry::new(
241 offset,
242 f,
243 ChunkPersistedCallback::new(
244 on_chunk_persisted.clone(),
245 prev_checkpoint,
246 ),
247 );
248
249 let worker_state = Arc::new(WorkerState::new());
250 let flush_metrics = Arc::new(AtomicFlushMetrics::default());
251
252 let (flush_tx, rx) = std::sync::mpsc::sync_channel(1024);
253 let worker = FlushWorker::new(
254 rx,
255 file_entry,
256 worker_state.clone(),
257 flush_metrics.clone(),
258 config.flush_batch_wait(),
259 config.flush_batch_max_items(),
260 );
261
262 worker.spawn();
263
264 Self {
265 config,
266 open,
267 closed,
268 flush_tx,
269 on_chunk_persisted,
270 sent_seq: 0,
271 worker_state,
272 flush_metrics,
273 _dir_lock: dir_lock,
274 }
275 }
276
277 fn ensure_consecutive_chunks(
278 prev_end_offset: Option<u64>,
279 chunk_id: ChunkId,
280 ) -> Result<(), io::Error> {
281 let Some(prev_end) = prev_end_offset else {
282 return Ok(());
283 };
284
285 if prev_end != chunk_id.offset() {
286 let message = format!(
287 "Gap between chunks: {} -> {}; Can not open, \
288 fix this error and re-open",
289 format_pad_u64(prev_end),
290 format_pad_u64(chunk_id.offset()),
291 );
292 return Err(io::Error::new(io::ErrorKind::InvalidData, message));
293 }
294
295 Ok(())
296 }
297
298 fn reopen_last_closed(
299 closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<W>>,
300 ) -> Option<OpenChunk<WALRecord<W>>> {
301 {
302 let (_chunk_id, closed) = closed_chunks.iter().last()?;
303
304 if closed.chunk.is_truncated() {
305 return None;
306 }
307 }
308
309 let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
310 let open = OpenChunk::new(last.chunk);
311 Some(open)
312 }
313
314 pub fn load_chunk_ids(
315 config: &Config,
316 _dir_lock: &WalLock,
317 ) -> Result<Vec<ChunkId>, io::Error> {
318 let path = &config.dir;
319 let entries = std::fs::read_dir(path)?;
320 let mut chunk_ids = vec![];
321 for entry in entries {
322 let entry = entry?;
323 let file_name = entry.file_name();
324
325 let fn_str = file_name.to_string_lossy();
326 if fn_str == WalLock::LOCK_FILE_NAME {
327 continue;
328 }
329
330 let res = Config::parse_chunk_file_name(&fn_str);
331
332 match res {
333 Ok(offset) => {
334 chunk_ids.push(ChunkId(offset));
335 }
336 Err(err) => {
337 log::warn!(
338 "Ignore invalid WAL file name: '{}': {}",
339 fn_str,
340 err
341 );
342 continue;
343 }
344 };
345 }
346
347 chunk_ids.sort();
348
349 Ok(chunk_ids)
350 }
351
352 pub fn open_chunk_id(&self) -> ChunkId {
353 self.open.chunk.chunk_id()
354 }
355
356 pub fn closed_chunk_stats(&self) -> Vec<ChunkStat<W::Checkpoint>> {
357 self.closed.values().map(|c| c.stat()).collect()
358 }
359
360 pub fn open_chunk_stat(
361 &self,
362 checkpoint: W::Checkpoint,
363 ) -> ChunkStat<W::Checkpoint> {
364 ChunkStat {
365 chunk_id: self.open.chunk.chunk_id(),
366 records_count: self.open.chunk.records_count() as u64,
367 global_start: self.open.chunk.global_start(),
368 global_end: self.open.chunk.global_end(),
369 size: self.open.chunk.chunk_size(),
370 log_state: checkpoint,
371 }
372 }
373
374 pub fn closed_chunk_reader(&self) -> ClosedChunkReader<W> {
375 ClosedChunkReader::new(self.closed.clone())
376 }
377
378 pub fn drain_closed_chunks_while<F>(
379 &mut self,
380 mut should_drain: F,
381 ) -> Vec<ChunkId>
382 where
383 F: FnMut(&W::Checkpoint) -> bool,
384 {
385 let mut chunk_ids = Vec::new();
386
387 while let Some((_chunk_id, closed)) = self.closed.first_key_value() {
388 if !should_drain(closed.state.as_ref()) {
389 break;
390 }
391
392 let (chunk_id, _closed) = self.closed.pop_first().unwrap();
393 chunk_ids.push(chunk_id);
394 }
395
396 chunk_ids
397 }
398
399 pub fn dump_loaded_records<D>(
400 &self,
401 mut write_record: D,
402 ) -> Result<(), io::Error>
403 where
404 D: FnMut(
405 ChunkId,
406 u64,
407 Result<(Segment, WALRecord<W>), io::Error>,
408 ) -> Result<(), io::Error>,
409 {
410 let closed = self.closed.keys().copied();
411 let chunk_ids = closed.chain([self.open.chunk.chunk_id()]);
412
413 for chunk_id in chunk_ids {
414 let f =
415 Chunk::<WALRecord<W>>::open_chunk_file(&self.config, chunk_id)?;
416
417 let it = Chunk::<WALRecord<W>>::load_records_iter(
418 &self.config,
419 Arc::new(f),
420 chunk_id,
421 )?;
422
423 for (i, res) in it.enumerate() {
424 write_record(chunk_id, i as u64, res)?;
425 }
426 }
427
428 Ok(())
429 }
430
431 pub fn on_disk_size(&self) -> u64 {
432 let end = self.open.chunk.global_end();
433 let open_start = self.open.chunk.global_start();
434 let first_closed_start = self
435 .closed
436 .first_key_value()
437 .map(|(_, v)| v.chunk.global_start())
438 .unwrap_or(open_start);
439
440 end - first_closed_start
441 }
442
443 pub fn last_closed_chunk_truncated_file_size(&self) -> Option<u64> {
444 self.closed
445 .last_key_value()
446 .and_then(|(_chunk_id, closed)| closed.chunk.truncated_file_size())
447 }
448
449 fn send_request(&mut self, req: WorkerRequest<W>) -> Result<(), io::Error> {
452 self.sent_seq += 1;
453 self.flush_tx
454 .send(SeqRequest {
455 seq: self.sent_seq,
456 queued_at: Instant::now(),
457 req,
458 })
459 .map_err(|e| {
460 io::Error::other(format!("Failed to send request: {}", e))
461 })
462 }
463
464 pub fn wait_worker_idle(&self) -> Result<(), io::Error> {
466 self.worker_state.wait_for(self.sent_seq)
467 }
468
469 pub fn flush_metrics(&self) -> FlushMetrics {
470 self.flush_metrics.snapshot()
471 }
472
473 pub fn send_pending(
481 &mut self,
482 sync: bool,
483 callback: Option<W::Callback>,
484 ) -> Result<(), io::Error> {
485 let data = self.open.take_pending_data();
486 self.send_request(WorkerRequest::Write(WriteRequest {
487 upto_offset: self.open.chunk.global_end(),
488 data,
489 sync,
490 callback,
491 }))
492 }
493
494 pub fn send_remove_chunks(
504 &mut self,
505 chunk_ids: Vec<ChunkId>,
506 ) -> Result<(), io::Error> {
507 let chunk_paths = chunk_ids
508 .into_iter()
509 .map(|chunk_id| self.config.chunk_path(chunk_id))
510 .collect();
511
512 self.send_request(WorkerRequest::RemoveChunks { chunk_paths })
513 }
514
515 #[allow(dead_code)]
516 pub fn get_stat(&mut self) -> Result<Vec<FlushStat>, io::Error> {
517 let (tx, rx) = std::sync::mpsc::sync_channel(1);
518 self.send_get_stat(tx)?;
519 rx.recv().map_err(|e| {
520 io::Error::other(format!(
521 "Failed to receive get state response: {}",
522 e
523 ))
524 })
525 }
526
527 #[allow(dead_code)]
528 pub(crate) fn send_get_stat(
529 &mut self,
530 callback: SyncSender<Vec<FlushStat>>,
531 ) -> Result<(), io::Error> {
532 self.send_request(WorkerRequest::GetFlushStat { tx: callback })
533 }
534
535 pub fn is_open_chunk_full(&self) -> bool {
540 self.open.chunk.records_count() >= self.config.chunk_max_records()
541 || (self.open.chunk.chunk_size() as usize)
542 >= self.config.chunk_max_size()
543 }
544
545 pub fn try_close_full_chunk<SM>(
561 &mut self,
562 state_machine: &SM,
563 ) -> Result<Option<W::Checkpoint>, io::Error>
564 where
565 SM: StateMachine<W>,
566 {
567 if !self.is_open_chunk_full() {
568 return Ok(None);
569 }
570
571 let config = self.config.clone();
572 let offset = self.open.chunk.last_segment().end();
573
574 info!(
575 "Closing full chunk: {}, open new: {}",
576 self.open.chunk.chunk_id(),
577 ChunkId(offset.0)
578 );
579
580 let checkpoint = state_machine.checkpoint();
581
582 let new_open = {
583 let chunk_id = ChunkId(offset.0);
584 OpenChunk::create(
585 config,
586 chunk_id,
587 WALRecord::Checkpoint(checkpoint.clone()),
588 )?
589 };
590
591 let mut old_open = std::mem::replace(&mut self.open, new_open);
592
593 let prev_pending_data = old_open.take_pending_data();
594 if !prev_pending_data.is_empty() {
595 self.send_request(WorkerRequest::Write(WriteRequest {
596 upto_offset: offset.0,
597 data: prev_pending_data,
598 sync: true,
599 callback: None,
600 }))?;
601 }
602
603 let checkpoint = Arc::new(checkpoint);
604
605 self.send_request(WorkerRequest::AppendFile(FileEntry::new(
606 offset.0,
607 self.open.chunk.f.clone(),
608 ChunkPersistedCallback::new(
609 self.on_chunk_persisted.clone(),
610 Some(checkpoint.clone()),
611 ),
612 )))?;
613
614 let chunk = old_open.chunk;
615 let closed_id = chunk.chunk_id();
616 let closed = ClosedChunk::new(chunk, checkpoint.clone());
617 self.closed.insert(closed_id, closed);
618 Ok(Some(checkpoint.as_ref().clone()))
619 }
620
621 pub fn load_record(
635 &self,
636 chunk_id: &ChunkId,
637 segment: Segment,
638 ) -> Result<WALRecord<W>, io::Error> {
639 let record = {
642 let closed = self.closed.get(chunk_id).ok_or_else(|| {
643 io::Error::new(
644 io::ErrorKind::NotFound,
645 format!(
646 "Chunk not found: {}; when:(open cache-miss read)",
647 chunk_id
648 ),
649 )
650 })?;
651 closed.chunk.read_record(segment)?
652 };
653
654 Ok(record)
655 }
656}
657
658impl<W> WAL<WALRecord<W>> for ChunkedWal<W>
659where W: WalTypes
660{
661 fn append(&mut self, rec: &WALRecord<W>) -> Result<(), io::Error> {
662 self.open.append_record(rec)?;
663 Ok(())
664 }
665
666 fn last_segment(&self) -> Segment {
667 self.open.chunk.last_segment()
668 }
669}
670
#[cfg(test)]
mod tests {
    // Tests for `ChunkedWal`. They cover:
    // - open/append/flush/reopen round trips and chunk rotation;
    // - recovery from damaged chunk files;
    // - the chunk-persisted callback;
    // - accessors, draining, locking and flush worker behavior.
    use std::io;
    use std::io::Seek;
    use std::io::Write;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::mpsc::SyncSender;
    use std::sync::mpsc::sync_channel;

    use codeq::Decode;
    use codeq::Encode;
    use codeq::OffsetSize;

    use crate::Chunk;
    use crate::ChunkId;
    use crate::ChunkPersisted;
    use crate::ChunkPersistedFn;
    use crate::ChunkedWal;
    use crate::Config;
    use crate::Segment;
    use crate::StateMachine;
    use crate::WAL;
    use crate::WALRecord;
    use crate::WalTypes;

    /// Type tag written in front of every encoded `TestAction`.
    const TEST_ACTION_TYPE: u32 = 1;

    /// Test action payload: a single string value.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TestAction(String);

    impl Encode for TestAction {
        // Encoded as the `TEST_ACTION_TYPE` tag followed by the string.
        fn encode<Wt: io::Write>(&self, mut w: Wt) -> Result<usize, io::Error> {
            let mut n = TEST_ACTION_TYPE.encode(&mut w)?;
            n += self.0.encode(&mut w)?;
            Ok(n)
        }

        fn type_id(&self) -> Option<u32> {
            Some(TEST_ACTION_TYPE)
        }
    }

    impl Decode for TestAction {
        // Rejects any tag other than `TEST_ACTION_TYPE` with `InvalidData`.
        fn decode<R: io::Read>(mut r: R) -> Result<Self, io::Error> {
            let type_id = u32::decode(&mut r)?;
            if type_id != TEST_ACTION_TYPE {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("unexpected action type id {}", type_id),
                ));
            }

            Ok(Self(String::decode(&mut r)?))
        }
    }

    /// `WalTypes` used by the tests. The checkpoint is the comma-joined list
    /// of applied values.
    #[derive(Debug, Default, Clone, PartialEq, Eq)]
    struct TestWal;

    impl WalTypes for TestWal {
        type Action = TestAction;
        type Checkpoint = String;
        type Callback = SyncSender<Result<(), io::Error>>;
    }

    /// State machine that records every applied action value in order.
    #[derive(Debug, Default)]
    struct TestStateMachine {
        values: Vec<String>,
    }

    impl StateMachine<TestWal> for TestStateMachine {
        type Error = io::Error;

        // Actions are appended; a checkpoint record replaces the whole state.
        fn apply(
            &mut self,
            record: &WALRecord<TestWal>,
            _chunk_id: ChunkId,
            _global_segment: crate::Segment,
        ) -> Result<(), Self::Error> {
            match record {
                WALRecord::Action(v) => self.values.push(v.0.clone()),
                WALRecord::Checkpoint(checkpoint) => {
                    self.values = decode_checkpoint(checkpoint);
                }
            }

            Ok(())
        }

        fn checkpoint(&self) -> String {
            encode_checkpoint(&self.values)
        }
    }

    /// One recorded invocation of the chunk-persisted callback.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct PersistedCall {
        starting_offset: u64,
        synced_offset: u64,
        checkpoint: Option<String>,
    }

    /// Joins values with `,`; an empty state encodes as the empty string.
    fn encode_checkpoint(values: &[String]) -> String {
        values.join(",")
    }

    /// Inverse of `encode_checkpoint`; the empty string decodes to no values.
    fn decode_checkpoint(checkpoint: &str) -> Vec<String> {
        if checkpoint.is_empty() {
            return Vec::new();
        }

        checkpoint.split(',').map(str::to_string).collect()
    }

    /// Builds an action record carrying `value`.
    fn action(value: &str) -> WALRecord<TestWal> {
        WALRecord::Action(TestAction(value.to_string()))
    }

    /// Returns a persisted callback that appends each invocation to `calls`.
    fn callback(
        calls: Arc<Mutex<Vec<PersistedCall>>>,
    ) -> ChunkPersistedFn<TestWal> {
        Arc::new(
            move |persisted: ChunkPersisted,
                  checkpoint: Option<Arc<String>>| {
                calls.lock().unwrap().push(PersistedCall {
                    starting_offset: persisted.starting_offset,
                    synced_offset: persisted.synced_offset,
                    checkpoint: checkpoint.as_deref().cloned(),
                });
            },
        )
    }

    /// Opens a WAL in `config.dir`, replaying into a fresh state machine.
    fn open_wal(
        config: &Config,
        calls: Arc<Mutex<Vec<PersistedCall>>>,
    ) -> Result<(ChunkedWal<TestWal>, TestStateMachine), io::Error> {
        let mut sm = TestStateMachine::default();
        let wal = ChunkedWal::open(
            Arc::new(config.clone()),
            &mut sm,
            callback(calls),
        )?;

        Ok((wal, sm))
    }

    /// Appends an action, applies it to `sm`, then rotates the chunk if full.
    /// Returns the segment of the appended record.
    fn append_action(
        wal: &mut ChunkedWal<TestWal>,
        sm: &mut TestStateMachine,
        value: &str,
    ) -> Result<crate::Segment, io::Error> {
        let record = action(value);
        wal.append(&record)?;
        let segment = wal.last_segment();
        sm.apply(&record, wal.open.chunk.chunk_id(), segment)?;
        wal.try_close_full_chunk(sm)?;
        Ok(segment)
    }

    /// Sends pending data with `sync = true`, then waits for the callback
    /// result and for the worker to become idle.
    fn sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
        let (tx, rx) = sync_channel(1);
        wal.send_pending(true, Some(tx))?;
        rx.recv()
            .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
        wal.wait_worker_idle()?;
        Ok(())
    }

    /// Same as `sync_flush`, but with `sync = false`.
    fn no_sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
        let (tx, rx) = sync_channel(1);
        wal.send_pending(false, Some(tx))?;
        rx.recv()
            .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
        wal.wait_worker_idle()?;
        Ok(())
    }

    /// Creates a temp dir and a default `Config` pointing at it. Keep the
    /// returned `TempDir` alive for the whole test.
    fn temp_config() -> (tempfile::TempDir, Config) {
        let td = tempfile::tempdir().unwrap();
        let config = Config::new(td.path().to_str().unwrap());
        (td, config)
    }

    /// Decodes every record of one chunk file directly from disk.
    fn records_in_chunk(
        config: &Config,
        chunk_id: ChunkId,
    ) -> Result<Vec<WALRecord<TestWal>>, io::Error> {
        Chunk::<WALRecord<TestWal>>::dump(config, chunk_id)?
            .into_iter()
            .map(|res| res.map(|(_, record)| record))
            .collect()
    }

    #[test]
    fn test_open_append_flush_reopen() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            append_action(&mut wal, &mut sm, "c")?;
            sync_flush(&mut wal)?;

            // One chunk: the initial checkpoint record plus three actions.
            assert_eq!(vec!["a", "b", "c"], sm.values);
            assert!(wal.closed.is_empty());
            assert_eq!(4, wal.open.chunk.records_count());
            assert!(format!("{wal:?}").contains("ChunkedWal"));
        }

        {
            // Reopening replays the same records and reuses the chunk.
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (wal, sm) = open_wal(&config, calls)?;

            assert_eq!(vec!["a", "b", "c"], sm.values);
            assert!(wal.closed.is_empty());
            assert_eq!(4, wal.open.chunk.records_count());
        }

        Ok(())
    }

    #[test]
    fn test_list_chunk_ids_ignores_invalid_file_names() -> Result<(), io::Error>
    {
        let (_td, config) = temp_config();
        std::fs::write(config.chunk_path(ChunkId(12)), [])?;
        std::fs::write(format!("{}/not-a-chunk", config.dir), [])?;

        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
        let chunk_ids = ChunkedWal::<TestWal>::load_chunk_ids(&config, &lock)?;

        assert_eq!(vec![ChunkId(12)], chunk_ids);
        Ok(())
    }

    #[test]
    fn test_rotate_chunk_writes_checkpoint() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        // Chunk 0 reaches 3 records (checkpoint, a, b) and is rotated after
        // `b`; the new chunk begins with checkpoint "a,b".
        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        append_action(&mut wal, &mut sm, "c")?;
        sync_flush(&mut wal)?;

        assert_eq!(1, wal.closed.len());
        assert_eq!(
            "a,b",
            wal.closed.first_key_value().unwrap().1.state.as_ref()
        );

        let records = records_in_chunk(&config, wal.open.chunk.chunk_id())?;
        assert_eq!(
            vec![WALRecord::Checkpoint("a,b".to_string()), action("c"),],
            records
        );

        Ok(())
    }

    #[test]
    fn test_reopen_reuses_last_healthy_chunk() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let open_chunk_id = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            for value in ["a", "b", "c", "d"] {
                append_action(&mut wal, &mut sm, value)?;
            }
            sync_flush(&mut wal)?;

            assert_eq!(2, wal.closed.len());
            wal.open.chunk.chunk_id()
        };

        // The untruncated tail chunk (only its checkpoint) is reopened as the
        // open chunk rather than creating a new one.
        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, sm) = open_wal(&config, calls)?;

        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
        assert_eq!(2, wal.closed.len());
        assert_eq!(open_chunk_id, wal.open.chunk.chunk_id());
        assert_eq!(1, wal.open.chunk.records_count());

        Ok(())
    }

    #[test]
    fn test_reopen_truncates_incomplete_last_record() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        let truncated_from = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            let segment = append_action(&mut wal, &mut sm, "c")?;
            sync_flush(&mut wal)?;

            // Cut the last byte of record "c" to simulate a torn write.
            let chunk_id = wal.open.chunk.chunk_id();
            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            let damaged_len = segment.end().0 - chunk_id.offset() - 1;
            f.set_len(damaged_len)?;
            damaged_len
        };

        // "c" is dropped. The truncated chunk stays closed and a new chunk
        // starting with checkpoint "a,b" becomes the open chunk.
        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, sm) = open_wal(&config, calls)?;

        assert_eq!(vec!["a", "b"], sm.values);
        assert_eq!(1, wal.closed.len());
        assert_eq!(
            Some(truncated_from),
            wal.last_closed_chunk_truncated_file_size()
        );
        assert_eq!(
            Some(truncated_from),
            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
        );
        assert_eq!(
            WALRecord::Checkpoint("a,b".to_string()),
            wal.open.chunk.read_record(wal.open.chunk.last_segment())?
        );

        Ok(())
    }

    #[test]
    fn test_reopen_truncates_trailing_zeroes() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        let original_len = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            sync_flush(&mut wal)?;

            // Append three zero bytes after the last complete record.
            let chunk_id = wal.open.chunk.chunk_id();
            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            f.seek(io::SeekFrom::Start(original_len))?;
            f.write_all(&[0, 0, 0])?;
            original_len
        };

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, sm) = open_wal(&config, calls)?;

        // The reported truncated size is the file length including the zeroes.
        assert_eq!(vec!["a", "b"], sm.values);
        assert_eq!(1, wal.closed.len());
        assert_eq!(
            Some(original_len + 3),
            wal.last_closed_chunk_truncated_file_size()
        );
        assert_eq!(
            Some(original_len + 3),
            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
        );

        Ok(())
    }

    #[test]
    fn test_reopen_rejects_damaged_trailing_checkpoint() -> Result<(), io::Error>
    {
        let (_td, config) = temp_config();

        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            sync_flush(&mut wal)?;

            // Append a full-length checkpoint record with its last byte
            // flipped (presumably breaking its checksum).
            let chunk_id = wal.open.chunk.chunk_id();
            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            let mut damaged = Vec::new();
            WALRecord::<TestWal>::Checkpoint("bad".to_string())
                .encode(&mut damaged)?;
            *damaged.last_mut().unwrap() ^= 1;

            f.seek(io::SeekFrom::Start(original_len))?;
            f.write_all(&damaged)?;
        }

        // Unlike an incomplete tail, a complete but corrupt record fails open.
        let calls = Arc::new(Mutex::new(Vec::new()));
        let err = match open_wal(&config, calls) {
            Ok(_) => panic!("damaged checkpoint record must fail"),
            Err(err) => err,
        };

        assert!(err.to_string().contains("decode Record at offset"));

        Ok(())
    }

    #[test]
    fn test_reopen_rejects_damaged_non_tail_chunk_without_truncating()
    -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let (chunk_id, damaged_len) = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            let truncated_segment = append_action(&mut wal, &mut sm, "b")?;
            append_action(&mut wal, &mut sm, "c")?;
            sync_flush(&mut wal)?;

            // Damage the first (closed, non-tail) chunk: cut the last byte
            // of record "b".
            let chunk_id = *wal.closed.first_key_value().unwrap().0;
            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            let truncated_len = truncated_segment.end().0 - chunk_id.offset();
            let damaged_len = truncated_len - 1;
            f.set_len(damaged_len)?;
            (chunk_id, damaged_len)
        };

        let calls = Arc::new(Mutex::new(Vec::new()));
        let err = open_wal(&config, calls)
            .expect_err("damaged non-tail chunk must fail");

        assert!(err.to_string().contains("decode Record at offset"));

        // The damaged file must be left exactly as it was.
        let f =
            Chunk::<WALRecord<TestWal>>::open_chunk_file(&config, chunk_id)?;
        assert_eq!(damaged_len, f.metadata()?.len());

        Ok(())
    }

    #[test]
    fn test_on_chunk_persisted_called_on_recovery() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            for value in ["a", "b", "c", "d"] {
                append_action(&mut wal, &mut sm, value)?;
            }
            sync_flush(&mut wal)?;
        }

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (_wal, sm) = open_wal(&config, calls.clone())?;

        // One call per chunk on disk, each carrying the checkpoint taken
        // after the previous chunk (None for the first).
        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
        assert_eq!(
            vec![None, Some("a,b".to_string()), Some("a,b,c,d".to_string()),],
            calls
                .lock()
                .unwrap()
                .iter()
                .map(|call| call.checkpoint.clone())
                .collect::<Vec<_>>()
        );

        Ok(())
    }

    #[test]
    fn test_on_chunk_persisted_tracks_rotated_file() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls.clone())?;

        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;

        // The chunk was rotated after "b"; flushing must report the new
        // chunk's file with the rotation checkpoint.
        let open_start = wal.open.chunk.global_start();
        sync_flush(&mut wal)?;

        assert!(calls.lock().unwrap().contains(&PersistedCall {
            starting_offset: open_start,
            synced_offset: wal.open.chunk.global_end(),
            checkpoint: Some("a,b".to_string()),
        }));

        Ok(())
    }

    #[test]
    fn test_loaded_chunk_accessors() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        let segment_a = append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        append_action(&mut wal, &mut sm, "c")?;
        sync_flush(&mut wal)?;

        let open_chunk_id = wal.open_chunk_id();
        let closed_stats = wal.closed_chunk_stats();
        let open_stat = wal.open_chunk_stat(sm.checkpoint());

        assert_eq!(1, closed_stats.len());
        assert_eq!(ChunkId(0), closed_stats[0].chunk_id);
        assert_eq!(3, closed_stats[0].records_count);
        assert_eq!("a,b", closed_stats[0].log_state);
        assert_eq!(open_chunk_id, open_stat.chunk_id);
        assert_eq!(2, open_stat.records_count);
        assert_eq!("a,b,c", open_stat.log_state);
        assert_eq!(open_stat.global_end, wal.on_disk_size());
        assert_eq!(None, wal.last_closed_chunk_truncated_file_size());

        assert_eq!(
            action("a"),
            wal.closed_chunk_reader().read_record(ChunkId(0), segment_a)?
        );

        let err =
            wal.load_record(&ChunkId(999), Segment::new(999, 1)).unwrap_err();
        assert_eq!(io::ErrorKind::NotFound, err.kind());
        assert!(err.to_string().contains("Chunk not found"));

        let mut dumped = Vec::new();
        wal.dump_loaded_records(|chunk_id, index, res| {
            dumped.push((chunk_id, index, res.map(|(_segment, rec)| rec)?));
            Ok(())
        })?;

        assert_eq!(
            vec![
                (ChunkId(0), 0, WALRecord::Checkpoint(String::new())),
                (ChunkId(0), 1, action("a")),
                (ChunkId(0), 2, action("b")),
                (open_chunk_id, 0, WALRecord::Checkpoint("a,b".to_string())),
                (open_chunk_id, 1, action("c")),
            ],
            dumped
        );

        // Draining only forgets the chunk; the file is deleted later by the
        // worker via `send_remove_chunks`.
        let drained =
            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
        assert_eq!(vec![ChunkId(0)], drained);
        assert!(wal.closed_chunk_stats().is_empty());

        let path = config.chunk_path(ChunkId(0));
        assert!(std::path::Path::new(&path).exists());
        wal.send_remove_chunks(drained)?;
        wal.wait_worker_idle()?;
        assert!(!std::path::Path::new(&path).exists());

        Ok(())
    }

    #[test]
    fn test_drain_closed_chunks_while_stops_at_first_unmatched()
    -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        for value in ["a", "b", "c", "d", "e"] {
            append_action(&mut wal, &mut sm, value)?;
        }
        sync_flush(&mut wal)?;

        // NOTE(review): 34 is presumably the encoded byte length of chunk 0
        // (checkpoint "" + "a" + "b") with this record encoding.
        let closed_before = wal
            .closed_chunk_stats()
            .into_iter()
            .map(|stat| (stat.chunk_id, stat.log_state))
            .collect::<Vec<_>>();
        assert_eq!(
            vec![
                (ChunkId(0), "a,b".to_string()),
                (ChunkId(34), "a,b,c,d".to_string()),
            ],
            closed_before
        );

        let drained =
            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
        assert_eq!(vec![ChunkId(0)], drained);

        let closed_after = wal
            .closed_chunk_stats()
            .into_iter()
            .map(|stat| (stat.chunk_id, stat.log_state))
            .collect::<Vec<_>>();
        assert_eq!(vec![(ChunkId(34), "a,b,c,d".to_string())], closed_after);

        Ok(())
    }

    #[test]
    fn test_lock_blocks_second_open_and_dump() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, _sm) = open_wal(&config, calls.clone())?;

        let err = ChunkedWal::<TestWal>::acquire_lock(&config)
            .expect_err("second lock must fail");
        assert_eq!(io::ErrorKind::WouldBlock, err.kind());

        // Dropping the WAL releases its directory lock.
        drop(wal);

        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
        let mut records = Vec::new();
        ChunkedWal::<TestWal>::dump_records(
            &config,
            &lock,
            |chunk_id, i, res| {
                records.push((chunk_id, i, res.map(|(_, record)| record)?));
                Ok(())
            },
        )?;

        assert_eq!(
            vec![(ChunkId(0), 0, WALRecord::Checkpoint(String::new()))],
            records
        );

        Ok(())
    }

    #[test]
    fn test_flush_without_sync_writes_without_advancing_sync_id()
    -> Result<(), io::Error> {
        let (_td, config) = temp_config();
        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        no_sync_flush(&mut wal)?;

        assert_eq!(
            vec![(0, 0)],
            wal.get_stat()?
                .iter()
                .map(|stat| stat.offset_sync_id())
                .collect::<Vec<_>>()
        );

        sync_flush(&mut wal)?;

        assert!(
            wal.get_stat()?
                .iter()
                .any(|stat| stat.sync_id == wal.open.chunk.global_end())
        );

        Ok(())
    }

    #[test]
    fn test_worker_failure_wakes_waiter_and_fails_later_waits()
    -> Result<(), io::Error> {
        let (_td, config) = temp_config();
        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, _sm) = open_wal(&config, calls)?;

        // Removing a chunk file that does not exist makes the worker fail.
        wal.send_remove_chunks(vec![ChunkId(999)])?;

        let err = wal.wait_worker_idle().unwrap_err();
        assert_eq!(io::ErrorKind::NotFound, err.kind());

        // The send may or may not fail, depending on whether the failed
        // worker has already dropped its receiver.
        let res = wal.send_remove_chunks(vec![ChunkId(999)]);
        if let Err(err) = res {
            assert_eq!(io::ErrorKind::Other, err.kind());
        }

        let err = wal.wait_worker_idle().unwrap_err();
        assert_eq!(io::ErrorKind::NotFound, err.kind());

        Ok(())
    }
}