1pub mod callback;
2pub mod file_persisted;
3pub mod wal_record;
4
5pub(crate) mod atomic_flush_metrics;
6pub(crate) mod batch_metrics;
7mod closed_chunk_reader;
8pub(crate) mod file_entry;
9pub(crate) mod flush_request;
10pub(crate) mod flush_worker;
11pub(crate) mod queued_write;
12pub(crate) mod write_batch;
13
14use std::collections::BTreeMap;
15use std::fmt;
16use std::io;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use std::sync::atomic::Ordering;
20use std::sync::mpsc::SyncSender;
21use std::time::Instant;
22
23pub use closed_chunk_reader::ClosedChunkReader;
24use codeq::OffsetSize;
25pub use flush_request::FlushStat;
26pub(crate) use flush_request::WorkerRequest;
27use log::info;
28
29use crate::Chunk;
30use crate::ChunkId;
31use crate::Config;
32use crate::WALRecord;
33use crate::WalLock;
34use crate::WalTypes;
35use crate::api::state_machine::StateMachine;
36use crate::api::wal::WAL;
37use crate::chunk::closed_chunk::ClosedChunk;
38use crate::chunk::open_chunk::OpenChunk;
39use crate::num::format_pad_u64;
40use crate::stat::ChunkStat;
41use crate::stat::FlushMetrics;
42use crate::types::Segment;
43use crate::wal::atomic_flush_metrics::AtomicFlushMetrics;
44use crate::wal::file_entry::FileEntry;
45use crate::wal::file_persisted::ChunkPersisted;
46use crate::wal::file_persisted::ChunkPersistedCallback;
47pub use crate::wal::file_persisted::ChunkPersistedFn;
48use crate::wal::flush_request::SeqRequest;
49use crate::wal::flush_request::WriteRequest;
50use crate::wal::flush_worker::FlushWorker;
51
/// A write-ahead log stored as a sequence of chunk files.
///
/// Exactly one chunk is open and receives appended records; earlier chunks
/// are kept in `closed`. Writes and file removals are not performed here but
/// are sent as sequenced requests to a `FlushWorker` through `flush_tx`.
pub struct ChunkedWal<W>
where W: WalTypes
{
    /// Configuration shared with the chunks and used to build chunk paths.
    config: Arc<Config>,
    /// The chunk that `append` writes records into.
    open: OpenChunk<WALRecord<W>>,
    /// Closed chunks keyed by chunk id; each one stores the checkpoint taken
    /// when it was closed or loaded.
    closed: BTreeMap<ChunkId, ClosedChunk<W>>,

    /// Sending half of the bounded channel consumed by the flush worker.
    flush_tx: SyncSender<SeqRequest<W>>,

    /// User callback: invoked directly for each chunk loaded by
    /// `open_locked`, and wrapped into a `ChunkPersistedCallback` for every
    /// file entry handed to the flush worker.
    on_chunk_persisted: ChunkPersistedFn<W>,

    /// Sequence number assigned to the most recent request passed to
    /// `send_request`; starts at 0.
    sent_seq: u64,

    /// Counter shared with the flush worker. `wait_worker_idle` polls it
    /// until it is no longer below `sent_seq`.
    done_seq: Arc<AtomicU64>,

    /// Flush metrics shared with the flush worker; `flush_metrics()` returns
    /// a snapshot of them.
    flush_metrics: Arc<AtomicFlushMetrics>,

    /// Directory lock. It is never read; it is stored so that it lives
    /// exactly as long as this WAL.
    _dir_lock: WalLock,
}
88
89impl<W> fmt::Debug for ChunkedWal<W>
90where W: WalTypes
91{
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("ChunkedWal")
94 .field("config", &self.config)
95 .field("open", &self.open)
96 .field("closed", &self.closed)
97 .field("sent_seq", &self.sent_seq)
98 .field("done_seq", &self.done_seq)
99 .field("flush_metrics", &self.flush_metrics)
100 .finish_non_exhaustive()
101 }
102}
103
104impl<W> ChunkedWal<W>
105where W: WalTypes
106{
107 pub fn open<SM>(
110 config: Arc<Config>,
111 state_machine: &mut SM,
112 on_chunk_persisted: ChunkPersistedFn<W>,
113 ) -> Result<Self, io::Error>
114 where
115 SM: StateMachine<W>,
116 {
117 let dir_lock = Self::acquire_lock(&config)?;
118 Self::open_locked(config, state_machine, on_chunk_persisted, dir_lock)
119 }
120
    /// Acquires the lock for the WAL directory described by `config`.
    ///
    /// The returned `WalLock` is what `open_locked`, `dump_records` and
    /// `load_chunk_ids` require as proof that the directory is locked.
    ///
    /// # Errors
    ///
    /// Returns whatever error `WalLock::new` reports.
    pub fn acquire_lock(config: &Config) -> Result<WalLock, io::Error> {
        WalLock::new(config)
    }
125
    /// Opens the WAL using an already acquired directory lock and replays
    /// every stored record into `state_machine`.
    ///
    /// Chunks are visited in ascending chunk-id order. For each chunk,
    /// `on_chunk_persisted` is called with the chunk's file and offsets plus
    /// the checkpoint taken after the previous chunk (`None` for the first
    /// chunk); only then are the chunk's records applied.
    ///
    /// The last loaded chunk becomes the open chunk unless it is truncated.
    /// Otherwise a new open chunk is created whose first record is a
    /// checkpoint of the current state machine.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be listed, if two consecutive chunks do
    /// not line up (`InvalidData`), if a chunk cannot be opened, if the state
    /// machine rejects a record, or if the new open chunk cannot be created.
    pub fn open_locked<SM>(
        config: Arc<Config>,
        state_machine: &mut SM,
        on_chunk_persisted: ChunkPersistedFn<W>,
        dir_lock: WalLock,
    ) -> Result<Self, io::Error>
    where
        SM: StateMachine<W>,
    {
        let chunk_ids = Self::load_chunk_ids(&config, &dir_lock)?;

        let mut closed = BTreeMap::new();
        // End offset of the previously loaded chunk; `None` before the first.
        let mut prev_end_offset = None;
        // Checkpoint taken after the previously loaded chunk was replayed.
        let mut prev_checkpoint = None;

        for chunk_id in chunk_ids.iter().copied() {
            // A chunk must start exactly where the previous one ended.
            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;

            let (chunk, records) =
                Chunk::<WALRecord<W>>::open(config.clone(), chunk_id)?;

            // Report the chunk before its records are applied, paired with
            // the checkpoint that precedes it.
            on_chunk_persisted(
                ChunkPersisted {
                    file: chunk.f.clone(),
                    starting_offset: chunk.global_start(),
                    synced_offset: chunk.global_end(),
                },
                prev_checkpoint.clone(),
            );

            for (i, record) in records.iter().enumerate() {
                let seg = chunk.record_segment(i);
                state_machine
                    .apply(record, chunk_id, seg)
                    // The state machine's error type is flattened to its
                    // message inside an `io::Error`.
                    .map_err(|e| io::Error::other(e.to_string()))?;
            }

            prev_end_offset = Some(chunk.last_segment().end().0);
            let checkpoint = Arc::new(state_machine.checkpoint());
            prev_checkpoint = Some(checkpoint.clone());

            // Every loaded chunk starts out as closed; the last one may be
            // promoted to the open chunk below.
            closed.insert(chunk_id, ClosedChunk::new(chunk, checkpoint));
        }

        let open = Self::reopen_last_closed(&mut closed);

        let open = if let Some(open) = open {
            open
        } else {
            // No reusable chunk: start a new one at the end of the last
            // loaded chunk, or at offset 0 when the directory had no chunks.
            OpenChunk::create(
                config.clone(),
                ChunkId(prev_end_offset.unwrap_or_default()),
                WALRecord::Checkpoint(state_machine.checkpoint()),
            )?
        };

        Ok(Self::new(
            config,
            closed,
            open,
            on_chunk_persisted,
            dir_lock,
        ))
    }
191
192 pub fn dump_records<D>(
194 config: &Config,
195 _dir_lock: &WalLock,
196 mut write_record: D,
197 ) -> Result<(), io::Error>
198 where
199 D: FnMut(
200 ChunkId,
201 u64,
202 Result<(Segment, WALRecord<W>), io::Error>,
203 ) -> Result<(), io::Error>,
204 {
205 let chunk_ids = Self::load_chunk_ids(config, _dir_lock)?;
206 for chunk_id in chunk_ids {
207 let it = Chunk::<WALRecord<W>>::dump(config, chunk_id)?;
208 for (i, res) in it.into_iter().enumerate() {
209 write_record(chunk_id, i as u64, res)?;
210 }
211 }
212
213 Ok(())
214 }
215
216 fn new(
225 config: Arc<Config>,
226 closed: BTreeMap<ChunkId, ClosedChunk<W>>,
227 open: OpenChunk<WALRecord<W>>,
228 on_chunk_persisted: ChunkPersistedFn<W>,
229 dir_lock: WalLock,
230 ) -> Self {
231 let prev_checkpoint =
232 closed.iter().last().map(|(_, c)| c.state.clone());
233
234 let offset = open.chunk.global_start();
235 let f = open.chunk.f.clone();
236
237 let file_entry = FileEntry::new(
238 offset,
239 f,
240 ChunkPersistedCallback::new(
241 on_chunk_persisted.clone(),
242 prev_checkpoint,
243 ),
244 );
245
246 let done_seq = Arc::new(AtomicU64::new(0));
247 let flush_metrics = Arc::new(AtomicFlushMetrics::default());
248
249 let (flush_tx, rx) = std::sync::mpsc::sync_channel(1024);
250 let worker = FlushWorker::new(
251 rx,
252 file_entry,
253 done_seq.clone(),
254 flush_metrics.clone(),
255 config.flush_batch_wait(),
256 config.flush_batch_max_items(),
257 );
258
259 worker.spawn();
260
261 Self {
262 config,
263 open,
264 closed,
265 flush_tx,
266 on_chunk_persisted,
267 sent_seq: 0,
268 done_seq,
269 flush_metrics,
270 _dir_lock: dir_lock,
271 }
272 }
273
274 fn ensure_consecutive_chunks(
275 prev_end_offset: Option<u64>,
276 chunk_id: ChunkId,
277 ) -> Result<(), io::Error> {
278 let Some(prev_end) = prev_end_offset else {
279 return Ok(());
280 };
281
282 if prev_end != chunk_id.offset() {
283 let message = format!(
284 "Gap between chunks: {} -> {}; Can not open, \
285 fix this error and re-open",
286 format_pad_u64(prev_end),
287 format_pad_u64(chunk_id.offset()),
288 );
289 return Err(io::Error::new(io::ErrorKind::InvalidData, message));
290 }
291
292 Ok(())
293 }
294
295 fn reopen_last_closed(
296 closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<W>>,
297 ) -> Option<OpenChunk<WALRecord<W>>> {
298 {
299 let (_chunk_id, closed) = closed_chunks.iter().last()?;
300
301 if closed.chunk.is_truncated() {
302 return None;
303 }
304 }
305
306 let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
307 let open = OpenChunk::new(last.chunk);
308 Some(open)
309 }
310
311 pub fn load_chunk_ids(
312 config: &Config,
313 _dir_lock: &WalLock,
314 ) -> Result<Vec<ChunkId>, io::Error> {
315 let path = &config.dir;
316 let entries = std::fs::read_dir(path)?;
317 let mut chunk_ids = vec![];
318 for entry in entries {
319 let entry = entry?;
320 let file_name = entry.file_name();
321
322 let fn_str = file_name.to_string_lossy();
323 if fn_str == WalLock::LOCK_FILE_NAME {
324 continue;
325 }
326
327 let res = Config::parse_chunk_file_name(&fn_str);
328
329 match res {
330 Ok(offset) => {
331 chunk_ids.push(ChunkId(offset));
332 }
333 Err(err) => {
334 log::warn!(
335 "Ignore invalid WAL file name: '{}': {}",
336 fn_str,
337 err
338 );
339 continue;
340 }
341 };
342 }
343
344 chunk_ids.sort();
345
346 Ok(chunk_ids)
347 }
348
    /// Returns the id of the chunk that is currently open for appends.
    pub fn open_chunk_id(&self) -> ChunkId {
        self.open.chunk.chunk_id()
    }
352
    /// Returns the stat of every closed chunk, in ascending chunk-id order
    /// (the iteration order of the underlying `BTreeMap`).
    pub fn closed_chunk_stats(&self) -> Vec<ChunkStat<W::Checkpoint>> {
        self.closed.values().map(|c| c.stat()).collect()
    }
356
357 pub fn open_chunk_stat(
358 &self,
359 checkpoint: W::Checkpoint,
360 ) -> ChunkStat<W::Checkpoint> {
361 ChunkStat {
362 chunk_id: self.open.chunk.chunk_id(),
363 records_count: self.open.chunk.records_count() as u64,
364 global_start: self.open.chunk.global_start(),
365 global_end: self.open.chunk.global_end(),
366 size: self.open.chunk.chunk_size(),
367 log_state: checkpoint,
368 }
369 }
370
    /// Creates a reader over a clone of the current closed-chunk map.
    ///
    /// Because the map is cloned, chunks closed or drained afterwards do not
    /// change what the returned reader holds.
    pub fn closed_chunk_reader(&self) -> ClosedChunkReader<W> {
        ClosedChunkReader::new(self.closed.clone())
    }
374
375 pub fn drain_closed_chunks_while<F>(
376 &mut self,
377 mut should_drain: F,
378 ) -> Vec<ChunkId>
379 where
380 F: FnMut(&W::Checkpoint) -> bool,
381 {
382 let mut chunk_ids = Vec::new();
383
384 while let Some((_chunk_id, closed)) = self.closed.first_key_value() {
385 if !should_drain(closed.state.as_ref()) {
386 break;
387 }
388
389 let (chunk_id, _closed) = self.closed.pop_first().unwrap();
390 chunk_ids.push(chunk_id);
391 }
392
393 chunk_ids
394 }
395
396 pub fn dump_loaded_records<D>(
397 &self,
398 mut write_record: D,
399 ) -> Result<(), io::Error>
400 where
401 D: FnMut(
402 ChunkId,
403 u64,
404 Result<(Segment, WALRecord<W>), io::Error>,
405 ) -> Result<(), io::Error>,
406 {
407 let closed = self.closed.keys().copied();
408 let chunk_ids = closed.chain([self.open.chunk.chunk_id()]);
409
410 for chunk_id in chunk_ids {
411 let f =
412 Chunk::<WALRecord<W>>::open_chunk_file(&self.config, chunk_id)?;
413
414 let it = Chunk::<WALRecord<W>>::load_records_iter(
415 &self.config,
416 Arc::new(f),
417 chunk_id,
418 )?;
419
420 for (i, res) in it.enumerate() {
421 write_record(chunk_id, i as u64, res)?;
422 }
423 }
424
425 Ok(())
426 }
427
428 pub fn on_disk_size(&self) -> u64 {
429 let end = self.open.chunk.global_end();
430 let open_start = self.open.chunk.global_start();
431 let first_closed_start = self
432 .closed
433 .first_key_value()
434 .map(|(_, v)| v.chunk.global_start())
435 .unwrap_or(open_start);
436
437 end - first_closed_start
438 }
439
440 pub fn last_closed_chunk_truncated_file_size(&self) -> Option<u64> {
441 self.closed
442 .last_key_value()
443 .and_then(|(_chunk_id, closed)| closed.chunk.truncated_file_size())
444 }
445
446 fn send_request(&mut self, req: WorkerRequest<W>) -> Result<(), io::Error> {
449 self.sent_seq += 1;
450 self.flush_tx
451 .send(SeqRequest {
452 seq: self.sent_seq,
453 queued_at: Instant::now(),
454 req,
455 })
456 .map_err(|e| {
457 io::Error::other(format!("Failed to send request: {}", e))
458 })
459 }
460
461 pub fn wait_worker_idle(&self) {
465 while self.done_seq.load(Ordering::Relaxed) < self.sent_seq {
466 std::thread::sleep(std::time::Duration::from_millis(1));
467 }
468 }
469
    /// Returns a snapshot of the flush metrics shared with the flush worker.
    pub fn flush_metrics(&self) -> FlushMetrics {
        self.flush_metrics.snapshot()
    }
473
474 pub fn send_pending(
482 &mut self,
483 sync: bool,
484 callback: Option<W::Callback>,
485 ) -> Result<(), io::Error> {
486 let data = self.open.take_pending_data();
487 self.send_request(WorkerRequest::Write(WriteRequest {
488 upto_offset: self.open.chunk.global_end(),
489 data,
490 sync,
491 callback,
492 }))
493 }
494
495 pub fn send_remove_chunks(
505 &mut self,
506 chunk_ids: Vec<ChunkId>,
507 ) -> Result<(), io::Error> {
508 let chunk_paths = chunk_ids
509 .into_iter()
510 .map(|chunk_id| self.config.chunk_path(chunk_id))
511 .collect();
512
513 self.send_request(WorkerRequest::RemoveChunks { chunk_paths })
514 }
515
516 #[allow(dead_code)]
517 pub fn get_stat(&mut self) -> Result<Vec<FlushStat>, io::Error> {
518 let (tx, rx) = std::sync::mpsc::sync_channel(1);
519 self.send_get_stat(tx)?;
520 rx.recv().map_err(|e| {
521 io::Error::other(format!(
522 "Failed to receive get state response: {}",
523 e
524 ))
525 })
526 }
527
    /// Enqueues a `GetFlushStat` request carrying `callback` as the sender
    /// for the reply, and returns without waiting for it.
    ///
    /// `get_stat` is the blocking variant: it passes the sending half of a
    /// channel here and then waits on the receiving half.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be enqueued.
    #[allow(dead_code)]
    pub(crate) fn send_get_stat(
        &mut self,
        callback: SyncSender<Vec<FlushStat>>,
    ) -> Result<(), io::Error> {
        self.send_request(WorkerRequest::GetFlushStat { tx: callback })
    }
535
536 pub fn is_open_chunk_full(&self) -> bool {
541 self.open.chunk.records_count() >= self.config.chunk_max_records()
542 || (self.open.chunk.chunk_size() as usize)
543 >= self.config.chunk_max_size()
544 }
545
    /// Rotates to a new open chunk if the current one is full.
    ///
    /// When the open chunk is not full this does nothing and returns
    /// `Ok(None)`. Otherwise it:
    ///
    /// 1. creates a new chunk that starts at the end of the current one and
    ///    whose first record is a checkpoint of `state_machine`;
    /// 2. makes that chunk the open chunk;
    /// 3. sends any data still pending in the old chunk to the flush worker
    ///    as a synced write;
    /// 4. tells the flush worker to continue with the new chunk's file;
    /// 5. stores the old chunk, together with the checkpoint, in `closed`.
    ///
    /// Returns the checkpoint that was written into the new chunk.
    ///
    /// # Errors
    ///
    /// Returns an error if the new chunk cannot be created or if a request
    /// cannot be enqueued.
    ///
    /// NOTE(review): if one of the `send_request` calls fails, `self.open`
    /// has already been replaced and the old chunk is dropped without being
    /// added to `closed`.
    pub fn try_close_full_chunk<SM>(
        &mut self,
        state_machine: &SM,
    ) -> Result<Option<W::Checkpoint>, io::Error>
    where
        SM: StateMachine<W>,
    {
        if !self.is_open_chunk_full() {
            return Ok(None);
        }

        let config = self.config.clone();
        // The new chunk begins exactly where the current one ends.
        let offset = self.open.chunk.last_segment().end();

        info!(
            "Closing full chunk: {}, open new: {}",
            self.open.chunk.chunk_id(),
            ChunkId(offset.0)
        );

        let checkpoint = state_machine.checkpoint();

        let new_open = {
            let chunk_id = ChunkId(offset.0);
            OpenChunk::create(
                config,
                chunk_id,
                WALRecord::Checkpoint(checkpoint.clone()),
            )?
        };

        // From here on `self.open` is the new chunk.
        let mut old_open = std::mem::replace(&mut self.open, new_open);

        // Flush whatever the old chunk still holds before the worker is
        // pointed at the new file.
        let prev_pending_data = old_open.take_pending_data();
        if !prev_pending_data.is_empty() {
            self.send_request(WorkerRequest::Write(WriteRequest {
                upto_offset: offset.0,
                data: prev_pending_data,
                sync: true,
                callback: None,
            }))?;
        }

        let checkpoint = Arc::new(checkpoint);

        // Hand the new chunk's file to the worker, tagged with the
        // checkpoint that precedes it.
        self.send_request(WorkerRequest::AppendFile(FileEntry::new(
            offset.0,
            self.open.chunk.f.clone(),
            ChunkPersistedCallback::new(
                self.on_chunk_persisted.clone(),
                Some(checkpoint.clone()),
            ),
        )))?;

        let chunk = old_open.chunk;
        let closed_id = chunk.chunk_id();
        let closed = ClosedChunk::new(chunk, checkpoint.clone());
        self.closed.insert(closed_id, closed);
        Ok(Some(checkpoint.as_ref().clone()))
    }
621
622 pub fn load_record(
636 &self,
637 chunk_id: &ChunkId,
638 segment: Segment,
639 ) -> Result<WALRecord<W>, io::Error> {
640 let record = {
643 let closed = self.closed.get(chunk_id).ok_or_else(|| {
644 io::Error::new(
645 io::ErrorKind::NotFound,
646 format!(
647 "Chunk not found: {}; when:(open cache-miss read)",
648 chunk_id
649 ),
650 )
651 })?;
652 closed.chunk.read_record(segment)?
653 };
654
655 Ok(record)
656 }
657}
658
// `WAL` implementation: both operations act on the open chunk only.
impl<W> WAL<WALRecord<W>> for ChunkedWal<W>
where W: WalTypes
{
    /// Appends `rec` to the open chunk.
    ///
    /// This does not send anything to the flush worker; that happens in
    /// `send_pending`.
    fn append(&mut self, rec: &WALRecord<W>) -> Result<(), io::Error> {
        self.open.append_record(rec)?;
        Ok(())
    }

    /// Returns the segment of the last record in the open chunk.
    fn last_segment(&self) -> Segment {
        self.open.chunk.last_segment()
    }
}
671
672#[cfg(test)]
673mod tests {
674 use std::io;
675 use std::io::Seek;
676 use std::io::Write;
677 use std::sync::Arc;
678 use std::sync::Mutex;
679 use std::sync::mpsc::SyncSender;
680 use std::sync::mpsc::sync_channel;
681
682 use codeq::Encode;
683 use codeq::OffsetSize;
684
685 use crate::Chunk;
686 use crate::ChunkId;
687 use crate::ChunkPersisted;
688 use crate::ChunkPersistedFn;
689 use crate::ChunkedWal;
690 use crate::Config;
691 use crate::Segment;
692 use crate::StateMachine;
693 use crate::WAL;
694 use crate::WALRecord;
695 use crate::WalTypes;
696
    /// Type bundle used by the tests: actions and checkpoints are plain
    /// strings, and the write callback is a channel sender.
    #[derive(Debug, Default, Clone, PartialEq, Eq)]
    struct TestWal;

    impl WalTypes for TestWal {
        type Action = String;
        type Checkpoint = String;
        type Callback = SyncSender<Result<(), io::Error>>;
    }
705
    /// State machine used by the tests: it records every applied action
    /// value in order.
    #[derive(Debug, Default)]
    struct TestStateMachine {
        /// Action values applied so far.
        values: Vec<String>,
    }
710
    impl StateMachine<TestWal> for TestStateMachine {
        type Error = io::Error;

        /// Applies one record: an action is appended to `values`, while a
        /// checkpoint replaces `values` with the decoded checkpoint content.
        fn apply(
            &mut self,
            record: &WALRecord<TestWal>,
            _chunk_id: ChunkId,
            _global_segment: crate::Segment,
        ) -> Result<(), Self::Error> {
            match record {
                WALRecord::Action(v) => self.values.push(v.clone()),
                WALRecord::Checkpoint(checkpoint) => {
                    self.values = decode_checkpoint(checkpoint);
                }
            }

            Ok(())
        }

        /// Encodes the current `values` as a comma-separated string.
        fn checkpoint(&self) -> String {
            encode_checkpoint(&self.values)
        }
    }
734
    /// One recorded invocation of the chunk-persisted callback, reduced to
    /// the values the tests compare.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct PersistedCall {
        /// `starting_offset` of the reported `ChunkPersisted`.
        starting_offset: u64,
        /// `synced_offset` of the reported `ChunkPersisted`.
        synced_offset: u64,
        /// Checkpoint passed alongside the report, if any.
        checkpoint: Option<String>,
    }
741
742 fn encode_checkpoint(values: &[String]) -> String {
743 values.join(",")
744 }
745
746 fn decode_checkpoint(checkpoint: &str) -> Vec<String> {
747 if checkpoint.is_empty() {
748 return Vec::new();
749 }
750
751 checkpoint.split(',').map(str::to_string).collect()
752 }
753
    /// Builds a chunk-persisted callback that appends every invocation to
    /// `calls` as a `PersistedCall`.
    fn callback(
        calls: Arc<Mutex<Vec<PersistedCall>>>,
    ) -> ChunkPersistedFn<TestWal> {
        Arc::new(
            move |persisted: ChunkPersisted,
                  checkpoint: Option<Arc<String>>| {
                calls.lock().unwrap().push(PersistedCall {
                    starting_offset: persisted.starting_offset,
                    synced_offset: persisted.synced_offset,
                    // Copy the checkpoint out of its `Arc` for comparison.
                    checkpoint: checkpoint.as_deref().cloned(),
                });
            },
        )
    }
768
769 fn open_wal(
770 config: &Config,
771 calls: Arc<Mutex<Vec<PersistedCall>>>,
772 ) -> Result<(ChunkedWal<TestWal>, TestStateMachine), io::Error> {
773 let mut sm = TestStateMachine::default();
774 let wal = ChunkedWal::open(
775 Arc::new(config.clone()),
776 &mut sm,
777 callback(calls),
778 )?;
779
780 Ok((wal, sm))
781 }
782
    /// Appends an action record with `value` to the WAL, applies it to `sm`,
    /// and then rotates the open chunk if it has become full.
    ///
    /// Returns the segment of the appended record. The segment is read
    /// before the possible rotation, so it belongs to the chunk the record
    /// was written into.
    fn append_action(
        wal: &mut ChunkedWal<TestWal>,
        sm: &mut TestStateMachine,
        value: &str,
    ) -> Result<crate::Segment, io::Error> {
        let record = WALRecord::Action(value.to_string());
        wal.append(&record)?;
        let segment = wal.last_segment();
        sm.apply(&record, wal.open.chunk.chunk_id(), segment)?;
        wal.try_close_full_chunk(sm)?;
        Ok(segment)
    }
795
796 fn sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
797 let (tx, rx) = sync_channel(1);
798 wal.send_pending(true, Some(tx))?;
799 rx.recv()
800 .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
801 wal.wait_worker_idle();
802 Ok(())
803 }
804
805 fn no_sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
806 let (tx, rx) = sync_channel(1);
807 wal.send_pending(false, Some(tx))?;
808 rx.recv()
809 .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
810 wal.wait_worker_idle();
811 Ok(())
812 }
813
814 fn temp_config() -> (tempfile::TempDir, Config) {
815 let td = tempfile::tempdir().unwrap();
816 let config = Config::new(td.path().to_str().unwrap());
817 (td, config)
818 }
819
820 fn records_in_chunk(
821 config: &Config,
822 chunk_id: ChunkId,
823 ) -> Result<Vec<WALRecord<TestWal>>, io::Error> {
824 Chunk::<WALRecord<TestWal>>::dump(config, chunk_id)?
825 .into_iter()
826 .map(|res| res.map(|(_, record)| record))
827 .collect()
828 }
829
    /// Records appended and flushed in one session are replayed unchanged
    /// when the WAL is opened again.
    #[test]
    fn test_open_append_flush_reopen() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        // First session: write three actions and flush them.
        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            append_action(&mut wal, &mut sm, "c")?;
            sync_flush(&mut wal)?;

            assert_eq!(vec!["a", "b", "c"], sm.values);
            assert!(wal.closed.is_empty());
            // The initial checkpoint record plus the three actions.
            assert_eq!(4, wal.open.chunk.records_count());
            assert!(format!("{wal:?}").contains("ChunkedWal"));
        }

        // Second session: the same state is recovered from disk.
        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (wal, sm) = open_wal(&config, calls)?;

            assert_eq!(vec!["a", "b", "c"], sm.values);
            assert!(wal.closed.is_empty());
            assert_eq!(4, wal.open.chunk.records_count());
        }

        Ok(())
    }
860
    /// `load_chunk_ids` returns only files with a valid chunk file name and
    /// skips everything else in the directory.
    #[test]
    fn test_list_chunk_ids_ignores_invalid_file_names() -> Result<(), io::Error>
    {
        let (_td, config) = temp_config();
        // One file with a valid chunk name, one with an unrelated name.
        std::fs::write(config.chunk_path(ChunkId(12)), [])?;
        std::fs::write(format!("{}/not-a-chunk", config.dir), [])?;

        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
        let chunk_ids = ChunkedWal::<TestWal>::load_chunk_ids(&config, &lock)?;

        assert_eq!(vec![ChunkId(12)], chunk_ids);
        Ok(())
    }
874
    /// When a chunk fills up, it is closed with the current checkpoint and
    /// the new open chunk starts with that checkpoint as its first record.
    #[test]
    fn test_rotate_chunk_writes_checkpoint() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        // The initial checkpoint counts as a record, so the first chunk is
        // full after two actions.
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        append_action(&mut wal, &mut sm, "c")?;
        sync_flush(&mut wal)?;

        assert_eq!(1, wal.closed.len());
        assert_eq!(
            "a,b",
            wal.closed.first_key_value().unwrap().1.state.as_ref()
        );

        // The new open chunk on disk: checkpoint first, then the action
        // appended after the rotation.
        let records = records_in_chunk(&config, wal.open.chunk.chunk_id())?;
        assert_eq!(
            vec![
                WALRecord::Checkpoint("a,b".to_string()),
                WALRecord::Action("c".to_string()),
            ],
            records
        );

        Ok(())
    }
905
    /// On reopen, an intact last chunk becomes the open chunk again instead
    /// of a new chunk being created.
    #[test]
    fn test_reopen_reuses_last_healthy_chunk() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let open_chunk_id = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            // Four actions fill two chunks and leave a third one open.
            for value in ["a", "b", "c", "d"] {
                append_action(&mut wal, &mut sm, value)?;
            }
            sync_flush(&mut wal)?;

            assert_eq!(2, wal.closed.len());
            wal.open.chunk.chunk_id()
        };

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, sm) = open_wal(&config, calls)?;

        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
        assert_eq!(2, wal.closed.len());
        // Same open chunk as before, holding only its checkpoint record.
        assert_eq!(open_chunk_id, wal.open.chunk.chunk_id());
        assert_eq!(1, wal.open.chunk.records_count());

        Ok(())
    }
934
    /// A chunk whose last record is cut short is recovered without that
    /// record, stays closed, and a new open chunk is started after it.
    #[test]
    fn test_reopen_truncates_incomplete_last_record() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        let truncated_from = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            let segment = append_action(&mut wal, &mut sm, "c")?;
            sync_flush(&mut wal)?;

            // Damage the file: cut off the last byte of record "c".
            let chunk_id = wal.open.chunk.chunk_id();
            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            let damaged_len = segment.end().0 - chunk_id.offset() - 1;
            f.set_len(damaged_len)?;
            damaged_len
        };

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, sm) = open_wal(&config, calls)?;

        // Record "c" is gone; the damaged chunk is kept as a closed chunk
        // that remembers the file size it was truncated from.
        assert_eq!(vec!["a", "b"], sm.values);
        assert_eq!(1, wal.closed.len());
        assert_eq!(
            Some(truncated_from),
            wal.last_closed_chunk_truncated_file_size()
        );
        assert_eq!(
            Some(truncated_from),
            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
        );
        // The new open chunk holds the checkpoint of the recovered state.
        assert_eq!(
            WALRecord::Checkpoint("a,b".to_string()),
            wal.open.chunk.read_record(wal.open.chunk.last_segment())?
        );

        Ok(())
    }
977
    /// Zero bytes after the last record are treated as a truncated tail:
    /// all records are recovered and the chunk is kept closed.
    #[test]
    fn test_reopen_truncates_trailing_zeroes() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        let original_len = {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            sync_flush(&mut wal)?;

            // Append three zero bytes directly after the valid data.
            let chunk_id = wal.open.chunk.chunk_id();
            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            f.seek(io::SeekFrom::Start(original_len))?;
            f.write_all(&[0, 0, 0])?;
            original_len
        };

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, sm) = open_wal(&config, calls)?;

        assert_eq!(vec!["a", "b"], sm.values);
        assert_eq!(1, wal.closed.len());
        // The reported size is the file size including the three zeroes.
        assert_eq!(
            Some(original_len + 3),
            wal.last_closed_chunk_truncated_file_size()
        );
        assert_eq!(
            Some(original_len + 3),
            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
        );

        Ok(())
    }
1016
    /// A complete but corrupted record at the end of a chunk makes opening
    /// the WAL fail instead of being silently dropped.
    #[test]
    fn test_reopen_rejects_damaged_trailing_checkpoint() -> Result<(), io::Error>
    {
        let (_td, config) = temp_config();

        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            append_action(&mut wal, &mut sm, "b")?;
            sync_flush(&mut wal)?;

            let chunk_id = wal.open.chunk.chunk_id();
            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            // Encode a full checkpoint record, then flip one bit in its
            // last byte so that it no longer decodes.
            let mut damaged = Vec::new();
            WALRecord::<TestWal>::Checkpoint("bad".to_string())
                .encode(&mut damaged)?;
            *damaged.last_mut().unwrap() ^= 1;

            f.seek(io::SeekFrom::Start(original_len))?;
            f.write_all(&damaged)?;
        }

        let calls = Arc::new(Mutex::new(Vec::new()));
        let err = match open_wal(&config, calls) {
            Ok(_) => panic!("damaged checkpoint record must fail"),
            Err(err) => err,
        };

        assert!(err.to_string().contains("decode Record at offset"));

        Ok(())
    }
1054
    /// If an earlier chunk loses its tail, it no longer ends where the next
    /// chunk starts, and opening the WAL fails with a gap error.
    #[test]
    fn test_reopen_rejects_gap_between_chunks() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            append_action(&mut wal, &mut sm, "a")?;
            let truncated_segment = append_action(&mut wal, &mut sm, "b")?;
            append_action(&mut wal, &mut sm, "c")?;
            sync_flush(&mut wal)?;

            // Cut the last byte of record "b" from the first (closed) chunk.
            let chunk_id = *wal.closed.first_key_value().unwrap().0;
            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
                &config, chunk_id,
            )?;
            let truncated_len = truncated_segment.end().0 - chunk_id.offset();
            f.set_len(truncated_len - 1)?;
        }

        let calls = Arc::new(Mutex::new(Vec::new()));
        let err = open_wal(&config, calls).expect_err("chunk gap must fail");

        assert!(err.to_string().contains("Gap between chunks"));

        Ok(())
    }
1084
    /// During recovery the persisted-callback is invoked once per chunk,
    /// each time with the checkpoint that precedes that chunk.
    #[test]
    fn test_on_chunk_persisted_called_on_recovery() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        // Produce three chunk files on disk.
        {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let (mut wal, mut sm) = open_wal(&config, calls)?;

            for value in ["a", "b", "c", "d"] {
                append_action(&mut wal, &mut sm, value)?;
            }
            sync_flush(&mut wal)?;
        }

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (_wal, sm) = open_wal(&config, calls.clone())?;

        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
        // The first chunk has no preceding checkpoint.
        assert_eq!(
            vec![None, Some("a,b".to_string()), Some("a,b,c,d".to_string()),],
            calls
                .lock()
                .unwrap()
                .iter()
                .map(|call| call.checkpoint.clone())
                .collect::<Vec<_>>()
        );

        Ok(())
    }
1116
    /// After a rotation, a synced flush reports the new chunk through the
    /// persisted-callback together with the rotation checkpoint.
    #[test]
    fn test_on_chunk_persisted_tracks_rotated_file() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls.clone())?;

        // The second action fills the first chunk and triggers a rotation.
        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;

        let open_start = wal.open.chunk.global_start();
        sync_flush(&mut wal)?;

        assert!(calls.lock().unwrap().contains(&PersistedCall {
            starting_offset: open_start,
            synced_offset: wal.open.chunk.global_end(),
            checkpoint: Some("a,b".to_string()),
        }));

        Ok(())
    }
1139
    /// Exercises the read-only accessors, record loading, dumping, draining
    /// and chunk removal on a WAL with one closed and one open chunk.
    #[test]
    fn test_loaded_chunk_accessors() -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        let segment_a = append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        append_action(&mut wal, &mut sm, "c")?;
        sync_flush(&mut wal)?;

        // Chunk stats.
        let open_chunk_id = wal.open_chunk_id();
        let closed_stats = wal.closed_chunk_stats();
        let open_stat = wal.open_chunk_stat(sm.checkpoint());

        assert_eq!(1, closed_stats.len());
        assert_eq!(ChunkId(0), closed_stats[0].chunk_id);
        assert_eq!(3, closed_stats[0].records_count);
        assert_eq!("a,b", closed_stats[0].log_state);
        assert_eq!(open_chunk_id, open_stat.chunk_id);
        assert_eq!(2, open_stat.records_count);
        assert_eq!("a,b,c", open_stat.log_state);
        // The first chunk starts at offset 0, so the size equals the end.
        assert_eq!(open_stat.global_end, wal.on_disk_size());
        assert_eq!(None, wal.last_closed_chunk_truncated_file_size());

        // Reading a record from a closed chunk.
        assert_eq!(
            WALRecord::Action("a".to_string()),
            wal.closed_chunk_reader().read_record(ChunkId(0), segment_a)?
        );

        // Loading from an unknown chunk fails with `NotFound`.
        let err =
            wal.load_record(&ChunkId(999), Segment::new(999, 1)).unwrap_err();
        assert_eq!(io::ErrorKind::NotFound, err.kind());
        assert!(err.to_string().contains("Chunk not found"));

        // Dumping covers the closed chunk first, then the open chunk.
        let mut dumped = Vec::new();
        wal.dump_loaded_records(|chunk_id, index, res| {
            dumped.push((chunk_id, index, res.map(|(_segment, rec)| rec)?));
            Ok(())
        })?;

        assert_eq!(
            vec![
                (ChunkId(0), 0, WALRecord::Checkpoint(String::new())),
                (ChunkId(0), 1, WALRecord::Action("a".to_string())),
                (ChunkId(0), 2, WALRecord::Action("b".to_string())),
                (open_chunk_id, 0, WALRecord::Checkpoint("a,b".to_string())),
                (open_chunk_id, 1, WALRecord::Action("c".to_string())),
            ],
            dumped
        );

        // Draining removes the chunk from memory only.
        let drained =
            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
        assert_eq!(vec![ChunkId(0)], drained);
        assert!(wal.closed_chunk_stats().is_empty());

        // The file disappears once the worker has handled the removal.
        let path = config.chunk_path(ChunkId(0));
        assert!(std::path::Path::new(&path).exists());
        wal.send_remove_chunks(drained)?;
        wal.wait_worker_idle();
        assert!(!std::path::Path::new(&path).exists());

        Ok(())
    }
1207
    /// Draining stops at the first closed chunk whose checkpoint the
    /// predicate rejects, leaving that chunk and all later ones in place.
    #[test]
    fn test_drain_closed_chunks_while_stops_at_first_unmatched()
    -> Result<(), io::Error> {
        let (_td, mut config) = temp_config();
        config.chunk_max_records = Some(3);

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        // Five actions produce two closed chunks.
        for value in ["a", "b", "c", "d", "e"] {
            append_action(&mut wal, &mut sm, value)?;
        }
        sync_flush(&mut wal)?;

        let closed_before = wal
            .closed_chunk_stats()
            .into_iter()
            .map(|stat| (stat.chunk_id, stat.log_state))
            .collect::<Vec<_>>();
        // The second chunk id is the byte offset at which the first chunk
        // ended.
        assert_eq!(
            vec![
                (ChunkId(0), "a,b".to_string()),
                (ChunkId(26), "a,b,c,d".to_string()),
            ],
            closed_before
        );

        // Only the first chunk matches the predicate.
        let drained =
            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
        assert_eq!(vec![ChunkId(0)], drained);

        let closed_after = wal
            .closed_chunk_stats()
            .into_iter()
            .map(|stat| (stat.chunk_id, stat.log_state))
            .collect::<Vec<_>>();
        assert_eq!(vec![(ChunkId(26), "a,b,c,d".to_string())], closed_after);

        Ok(())
    }
1248
    /// While a WAL is open its directory cannot be locked again; after the
    /// WAL is dropped the lock can be taken and the records dumped.
    #[test]
    fn test_lock_blocks_second_open_and_dump() -> Result<(), io::Error> {
        let (_td, config) = temp_config();

        let calls = Arc::new(Mutex::new(Vec::new()));
        let (wal, _sm) = open_wal(&config, calls.clone())?;

        // The open WAL holds the lock.
        let err = ChunkedWal::<TestWal>::acquire_lock(&config)
            .expect_err("second lock must fail");
        assert_eq!(io::ErrorKind::WouldBlock, err.kind());

        // Dropping the WAL releases the lock.
        drop(wal);

        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
        let mut records = Vec::new();
        ChunkedWal::<TestWal>::dump_records(
            &config,
            &lock,
            |chunk_id, i, res| {
                records.push((chunk_id, i, res.map(|(_, record)| record)?));
                Ok(())
            },
        )?;

        // A newly created WAL contains only its initial, empty checkpoint.
        assert_eq!(
            vec![(ChunkId(0), 0, WALRecord::Checkpoint(String::new()))],
            records
        );

        Ok(())
    }
1280
    /// A flush with `sync = false` leaves the worker's reported sync state
    /// at its initial value; a later synced flush advances it to the end of
    /// the open chunk.
    #[test]
    fn test_flush_without_sync_writes_without_advancing_sync_id()
    -> Result<(), io::Error> {
        let (_td, config) = temp_config();
        let calls = Arc::new(Mutex::new(Vec::new()));
        let (mut wal, mut sm) = open_wal(&config, calls)?;

        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        no_sync_flush(&mut wal)?;

        // Still the initial value after the unsynced flush.
        assert_eq!(
            vec![(0, 0)],
            wal.get_stat()?
                .iter()
                .map(|stat| stat.offset_sync_id())
                .collect::<Vec<_>>()
        );

        sync_flush(&mut wal)?;

        assert!(
            wal.get_stat()?
                .iter()
                .any(|stat| stat.sync_id == wal.open.chunk.global_end())
        );

        Ok(())
    }
1310}