Skip to main content

chunked_wal/wal/
mod.rs

1pub mod callback;
2pub mod file_persisted;
3pub mod wal_record;
4
5pub(crate) mod atomic_flush_metrics;
6pub(crate) mod batch_metrics;
7mod closed_chunk_reader;
8pub(crate) mod file_entry;
9pub(crate) mod flush_request;
10pub(crate) mod flush_worker;
11pub(crate) mod queued_write;
12pub(crate) mod write_batch;
13
14use std::collections::BTreeMap;
15use std::fmt;
16use std::io;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use std::sync::atomic::Ordering;
20use std::sync::mpsc::SyncSender;
21use std::time::Instant;
22
23pub use closed_chunk_reader::ClosedChunkReader;
24use codeq::OffsetSize;
25pub use flush_request::FlushStat;
26pub(crate) use flush_request::WorkerRequest;
27use log::info;
28
29use crate::Chunk;
30use crate::ChunkId;
31use crate::Config;
32use crate::WALRecord;
33use crate::WalLock;
34use crate::WalTypes;
35use crate::api::state_machine::StateMachine;
36use crate::api::wal::WAL;
37use crate::chunk::closed_chunk::ClosedChunk;
38use crate::chunk::open_chunk::OpenChunk;
39use crate::num::format_pad_u64;
40use crate::stat::ChunkStat;
41use crate::stat::FlushMetrics;
42use crate::types::Segment;
43use crate::wal::atomic_flush_metrics::AtomicFlushMetrics;
44use crate::wal::file_entry::FileEntry;
45use crate::wal::file_persisted::ChunkPersisted;
46use crate::wal::file_persisted::ChunkPersistedCallback;
47pub use crate::wal::file_persisted::ChunkPersistedFn;
48use crate::wal::flush_request::SeqRequest;
49use crate::wal::flush_request::WriteRequest;
50use crate::wal::flush_worker::FlushWorker;
51
52/// Chunked write-ahead log implementation.
53///
54/// This WAL implementation manages both open and closed chunks of data.
55/// An open chunk is actively being written to, while closed chunks are
56/// immutable and may be used for reading historical data.
57pub struct ChunkedWal<W>
58where W: WalTypes
59{
60    config: Arc<Config>,
61    open: OpenChunk<WALRecord<W>>,
62    closed: BTreeMap<ChunkId, ClosedChunk<W>>,
63
64    /// Sends user write operations to the flush worker.
65    ///
66    /// Each write operation may carry its own callback, defined by
67    /// `W::Callback`.
68    flush_tx: SyncSender<SeqRequest<W>>,
69
70    /// File-level callback invoked after fsync.
71    ///
72    /// This callback is called once for each synced chunk file.
73    on_chunk_persisted: ChunkPersistedFn<W>,
74
75    /// The next sequence number to assign. Incremented on each `send_request`.
76    /// Only accessed by the main thread, so a plain `u64` suffices.
77    sent_seq: u64,
78
79    /// Shared with `FlushWorker`; stores the highest completed seq.
80    done_seq: Arc<AtomicU64>,
81
82    /// Shared with `FlushWorker`; stores aggregated flush metrics.
83    flush_metrics: Arc<AtomicFlushMetrics>,
84
85    /// Holds the exclusive lock on the WAL directory for this WAL instance.
86    _dir_lock: WalLock,
87}
88
89impl<W> fmt::Debug for ChunkedWal<W>
90where W: WalTypes
91{
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("ChunkedWal")
94            .field("config", &self.config)
95            .field("open", &self.open)
96            .field("closed", &self.closed)
97            .field("sent_seq", &self.sent_seq)
98            .field("done_seq", &self.done_seq)
99            .field("flush_metrics", &self.flush_metrics)
100            .finish_non_exhaustive()
101    }
102}
103
104impl<W> ChunkedWal<W>
105where W: WalTypes
106{
107    /// Opens a ChunkedWal instance and replays existing records into a state
108    /// machine.
109    pub fn open<SM>(
110        config: Arc<Config>,
111        state_machine: &mut SM,
112        on_chunk_persisted: ChunkPersistedFn<W>,
113    ) -> Result<Self, io::Error>
114    where
115        SM: StateMachine<W>,
116    {
117        let dir_lock = Self::acquire_lock(&config)?;
118        Self::open_locked(config, state_machine, on_chunk_persisted, dir_lock)
119    }
120
121    /// Acquires the exclusive WAL directory lock.
122    pub fn acquire_lock(config: &Config) -> Result<WalLock, io::Error> {
123        WalLock::new(config)
124    }
125
126    /// Opens a ChunkedWal instance with an already-held WAL directory lock.
127    pub fn open_locked<SM>(
128        config: Arc<Config>,
129        state_machine: &mut SM,
130        on_chunk_persisted: ChunkPersistedFn<W>,
131        dir_lock: WalLock,
132    ) -> Result<Self, io::Error>
133    where
134        SM: StateMachine<W>,
135    {
136        let chunk_ids = Self::load_chunk_ids(&config, &dir_lock)?;
137
138        let mut closed = BTreeMap::new();
139        let mut prev_end_offset = None;
140        let mut prev_checkpoint = None;
141
142        for chunk_id in chunk_ids.iter().copied() {
143            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;
144
145            let (chunk, records) =
146                Chunk::<WALRecord<W>>::open(config.clone(), chunk_id)?;
147
148            on_chunk_persisted(
149                ChunkPersisted {
150                    file: chunk.f.clone(),
151                    starting_offset: chunk.global_start(),
152                    synced_offset: chunk.global_end(),
153                },
154                prev_checkpoint.clone(),
155            );
156
157            for (i, record) in records.iter().enumerate() {
158                let seg = chunk.record_segment(i);
159                state_machine
160                    .apply(record, chunk_id, seg)
161                    .map_err(|e| io::Error::other(e.to_string()))?;
162            }
163
164            prev_end_offset = Some(chunk.last_segment().end().0);
165            let checkpoint = Arc::new(state_machine.checkpoint());
166            prev_checkpoint = Some(checkpoint.clone());
167
168            closed.insert(chunk_id, ClosedChunk::new(chunk, checkpoint));
169        }
170
171        let open = Self::reopen_last_closed(&mut closed);
172
173        let open = if let Some(open) = open {
174            open
175        } else {
176            OpenChunk::create(
177                config.clone(),
178                ChunkId(prev_end_offset.unwrap_or_default()),
179                WALRecord::Checkpoint(state_machine.checkpoint()),
180            )?
181        };
182
183        Ok(Self::new(
184            config,
185            closed,
186            open,
187            on_chunk_persisted,
188            dir_lock,
189        ))
190    }
191
192    /// Dumps all records while holding the WAL directory lock.
193    pub fn dump_records<D>(
194        config: &Config,
195        _dir_lock: &WalLock,
196        mut write_record: D,
197    ) -> Result<(), io::Error>
198    where
199        D: FnMut(
200            ChunkId,
201            u64,
202            Result<(Segment, WALRecord<W>), io::Error>,
203        ) -> Result<(), io::Error>,
204    {
205        let chunk_ids = Self::load_chunk_ids(config, _dir_lock)?;
206        for chunk_id in chunk_ids {
207            let it = Chunk::<WALRecord<W>>::dump(config, chunk_id)?;
208            for (i, res) in it.into_iter().enumerate() {
209                write_record(chunk_id, i as u64, res)?;
210            }
211        }
212
213        Ok(())
214    }
215
216    /// Creates a new ChunkedWal instance after recovery has completed.
217    ///
218    /// # Arguments
219    ///
220    /// * `config` - Configuration for the WAL
221    /// * `closed` - Map of closed (immutable) chunks indexed by chunk ID
222    /// * `open` - The currently active chunk that can be written to
223    /// * `on_chunk_persisted` - Callback invoked after chunk data is persisted
224    fn new(
225        config: Arc<Config>,
226        closed: BTreeMap<ChunkId, ClosedChunk<W>>,
227        open: OpenChunk<WALRecord<W>>,
228        on_chunk_persisted: ChunkPersistedFn<W>,
229        dir_lock: WalLock,
230    ) -> Self {
231        let prev_checkpoint =
232            closed.iter().last().map(|(_, c)| c.state.clone());
233
234        let offset = open.chunk.global_start();
235        let f = open.chunk.f.clone();
236
237        let file_entry = FileEntry::new(
238            offset,
239            f,
240            ChunkPersistedCallback::new(
241                on_chunk_persisted.clone(),
242                prev_checkpoint,
243            ),
244        );
245
246        let done_seq = Arc::new(AtomicU64::new(0));
247        let flush_metrics = Arc::new(AtomicFlushMetrics::default());
248
249        let (flush_tx, rx) = std::sync::mpsc::sync_channel(1024);
250        let worker = FlushWorker::new(
251            rx,
252            file_entry,
253            done_seq.clone(),
254            flush_metrics.clone(),
255            config.flush_batch_wait(),
256            config.flush_batch_max_items(),
257        );
258
259        worker.spawn();
260
261        Self {
262            config,
263            open,
264            closed,
265            flush_tx,
266            on_chunk_persisted,
267            sent_seq: 0,
268            done_seq,
269            flush_metrics,
270            _dir_lock: dir_lock,
271        }
272    }
273
274    fn ensure_consecutive_chunks(
275        prev_end_offset: Option<u64>,
276        chunk_id: ChunkId,
277    ) -> Result<(), io::Error> {
278        let Some(prev_end) = prev_end_offset else {
279            return Ok(());
280        };
281
282        if prev_end != chunk_id.offset() {
283            let message = format!(
284                "Gap between chunks: {} -> {}; Can not open, \
285                        fix this error and re-open",
286                format_pad_u64(prev_end),
287                format_pad_u64(chunk_id.offset()),
288            );
289            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
290        }
291
292        Ok(())
293    }
294
295    fn reopen_last_closed(
296        closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<W>>,
297    ) -> Option<OpenChunk<WALRecord<W>>> {
298        {
299            let (_chunk_id, closed) = closed_chunks.iter().last()?;
300
301            if closed.chunk.is_truncated() {
302                return None;
303            }
304        }
305
306        let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
307        let open = OpenChunk::new(last.chunk);
308        Some(open)
309    }
310
311    pub fn load_chunk_ids(
312        config: &Config,
313        _dir_lock: &WalLock,
314    ) -> Result<Vec<ChunkId>, io::Error> {
315        let path = &config.dir;
316        let entries = std::fs::read_dir(path)?;
317        let mut chunk_ids = vec![];
318        for entry in entries {
319            let entry = entry?;
320            let file_name = entry.file_name();
321
322            let fn_str = file_name.to_string_lossy();
323            if fn_str == WalLock::LOCK_FILE_NAME {
324                continue;
325            }
326
327            let res = Config::parse_chunk_file_name(&fn_str);
328
329            match res {
330                Ok(offset) => {
331                    chunk_ids.push(ChunkId(offset));
332                }
333                Err(err) => {
334                    log::warn!(
335                        "Ignore invalid WAL file name: '{}': {}",
336                        fn_str,
337                        err
338                    );
339                    continue;
340                }
341            };
342        }
343
344        chunk_ids.sort();
345
346        Ok(chunk_ids)
347    }
348
349    pub fn open_chunk_id(&self) -> ChunkId {
350        self.open.chunk.chunk_id()
351    }
352
353    pub fn closed_chunk_stats(&self) -> Vec<ChunkStat<W::Checkpoint>> {
354        self.closed.values().map(|c| c.stat()).collect()
355    }
356
357    pub fn open_chunk_stat(
358        &self,
359        checkpoint: W::Checkpoint,
360    ) -> ChunkStat<W::Checkpoint> {
361        ChunkStat {
362            chunk_id: self.open.chunk.chunk_id(),
363            records_count: self.open.chunk.records_count() as u64,
364            global_start: self.open.chunk.global_start(),
365            global_end: self.open.chunk.global_end(),
366            size: self.open.chunk.chunk_size(),
367            log_state: checkpoint,
368        }
369    }
370
371    pub fn closed_chunk_reader(&self) -> ClosedChunkReader<W> {
372        ClosedChunkReader::new(self.closed.clone())
373    }
374
375    pub fn drain_closed_chunks_while<F>(
376        &mut self,
377        mut should_drain: F,
378    ) -> Vec<ChunkId>
379    where
380        F: FnMut(&W::Checkpoint) -> bool,
381    {
382        let mut chunk_ids = Vec::new();
383
384        while let Some((_chunk_id, closed)) = self.closed.first_key_value() {
385            if !should_drain(closed.state.as_ref()) {
386                break;
387            }
388
389            let (chunk_id, _closed) = self.closed.pop_first().unwrap();
390            chunk_ids.push(chunk_id);
391        }
392
393        chunk_ids
394    }
395
396    pub fn dump_loaded_records<D>(
397        &self,
398        mut write_record: D,
399    ) -> Result<(), io::Error>
400    where
401        D: FnMut(
402            ChunkId,
403            u64,
404            Result<(Segment, WALRecord<W>), io::Error>,
405        ) -> Result<(), io::Error>,
406    {
407        let closed = self.closed.keys().copied();
408        let chunk_ids = closed.chain([self.open.chunk.chunk_id()]);
409
410        for chunk_id in chunk_ids {
411            let f =
412                Chunk::<WALRecord<W>>::open_chunk_file(&self.config, chunk_id)?;
413
414            let it = Chunk::<WALRecord<W>>::load_records_iter(
415                &self.config,
416                Arc::new(f),
417                chunk_id,
418            )?;
419
420            for (i, res) in it.enumerate() {
421                write_record(chunk_id, i as u64, res)?;
422            }
423        }
424
425        Ok(())
426    }
427
428    pub fn on_disk_size(&self) -> u64 {
429        let end = self.open.chunk.global_end();
430        let open_start = self.open.chunk.global_start();
431        let first_closed_start = self
432            .closed
433            .first_key_value()
434            .map(|(_, v)| v.chunk.global_start())
435            .unwrap_or(open_start);
436
437        end - first_closed_start
438    }
439
440    pub fn last_closed_chunk_truncated_file_size(&self) -> Option<u64> {
441        self.closed
442            .last_key_value()
443            .and_then(|(_chunk_id, closed)| closed.chunk.truncated_file_size())
444    }
445
446    /// Wraps a `WorkerRequest` with an auto-incrementing seq and sends it to
447    /// the FlushWorker.
448    fn send_request(&mut self, req: WorkerRequest<W>) -> Result<(), io::Error> {
449        self.sent_seq += 1;
450        self.flush_tx
451            .send(SeqRequest {
452                seq: self.sent_seq,
453                queued_at: Instant::now(),
454                req,
455            })
456            .map_err(|e| {
457                io::Error::other(format!("Failed to send request: {}", e))
458            })
459    }
460
461    /// Block until the FlushWorker has processed all requests sent so far.
462    ///
463    /// Polls `done_seq` in a 1 ms sleep loop until it reaches `sent_seq`.
464    pub fn wait_worker_idle(&self) {
465        while self.done_seq.load(Ordering::Relaxed) < self.sent_seq {
466            std::thread::sleep(std::time::Duration::from_millis(1));
467        }
468    }
469
470    pub fn flush_metrics(&self) -> FlushMetrics {
471        self.flush_metrics.snapshot()
472    }
473
474    /// Hand the pending data buffer to the worker for writing.
475    ///
476    /// Drains `OpenChunk::pending_data` and packages it as a `WriteRequest`.
477    /// The worker always writes the bytes to the OS file. When `sync` is
478    /// `true` it also calls `fsync` so the data is on stable storage; when
479    /// `sync` is `false` it skips the fsync and durability is deferred to
480    /// the next sync write that lands in the same or a later batch.
481    pub fn send_pending(
482        &mut self,
483        sync: bool,
484        callback: Option<W::Callback>,
485    ) -> Result<(), io::Error> {
486        let data = self.open.take_pending_data();
487        self.send_request(WorkerRequest::Write(WriteRequest {
488            upto_offset: self.open.chunk.global_end(),
489            data,
490            sync,
491            callback,
492        }))
493    }
494
495    /// Requests removal of specified chunks.
496    ///
497    /// # Arguments
498    ///
499    /// * `chunk_ids` - IDs of chunk files to be removed
500    ///
501    /// # Errors
502    ///
503    /// Returns an IO error if the remove request cannot be sent
504    pub fn send_remove_chunks(
505        &mut self,
506        chunk_ids: Vec<ChunkId>,
507    ) -> Result<(), io::Error> {
508        let chunk_paths = chunk_ids
509            .into_iter()
510            .map(|chunk_id| self.config.chunk_path(chunk_id))
511            .collect();
512
513        self.send_request(WorkerRequest::RemoveChunks { chunk_paths })
514    }
515
516    #[allow(dead_code)]
517    pub fn get_stat(&mut self) -> Result<Vec<FlushStat>, io::Error> {
518        let (tx, rx) = std::sync::mpsc::sync_channel(1);
519        self.send_get_stat(tx)?;
520        rx.recv().map_err(|e| {
521            io::Error::other(format!(
522                "Failed to receive get state response: {}",
523                e
524            ))
525        })
526    }
527
528    #[allow(dead_code)]
529    pub(crate) fn send_get_stat(
530        &mut self,
531        callback: SyncSender<Vec<FlushStat>>,
532    ) -> Result<(), io::Error> {
533        self.send_request(WorkerRequest::GetFlushStat { tx: callback })
534    }
535
536    /// Checks if the current open chunk has reached its capacity.
537    ///
538    /// Returns true if either the maximum number of records or maximum chunk
539    /// size is reached.
540    pub fn is_open_chunk_full(&self) -> bool {
541        self.open.chunk.records_count() >= self.config.chunk_max_records()
542            || (self.open.chunk.chunk_size() as usize)
543                >= self.config.chunk_max_size()
544    }
545
546    /// Attempts to close the current chunk if it's full and creates a new open
547    /// chunk.
548    ///
549    /// # Arguments
550    ///
551    /// * `state_machine` - The state machine that provides the checkpoint to
552    ///   store at the start of the next chunk.
553    ///
554    /// # Returns
555    ///
556    /// Returns the checkpoint if a chunk was closed, None otherwise.
557    ///
558    /// # Errors
559    ///
560    /// Returns an IO error if chunk operations fail
561    pub fn try_close_full_chunk<SM>(
562        &mut self,
563        state_machine: &SM,
564    ) -> Result<Option<W::Checkpoint>, io::Error>
565    where
566        SM: StateMachine<W>,
567    {
568        if !self.is_open_chunk_full() {
569            return Ok(None);
570        }
571
572        let config = self.config.clone();
573        let offset = self.open.chunk.last_segment().end();
574
575        info!(
576            "Closing full chunk: {}, open new: {}",
577            self.open.chunk.chunk_id(),
578            ChunkId(offset.0)
579        );
580
581        let checkpoint = state_machine.checkpoint();
582
583        let new_open = {
584            let chunk_id = ChunkId(offset.0);
585            OpenChunk::create(
586                config,
587                chunk_id,
588                WALRecord::Checkpoint(checkpoint.clone()),
589            )?
590        };
591
592        let mut old_open = std::mem::replace(&mut self.open, new_open);
593
594        let prev_pending_data = old_open.take_pending_data();
595        if !prev_pending_data.is_empty() {
596            self.send_request(WorkerRequest::Write(WriteRequest {
597                upto_offset: offset.0,
598                data: prev_pending_data,
599                sync: true,
600                callback: None,
601            }))?;
602        }
603
604        let checkpoint = Arc::new(checkpoint);
605
606        self.send_request(WorkerRequest::AppendFile(FileEntry::new(
607            offset.0,
608            self.open.chunk.f.clone(),
609            ChunkPersistedCallback::new(
610                self.on_chunk_persisted.clone(),
611                Some(checkpoint.clone()),
612            ),
613        )))?;
614
615        let chunk = old_open.chunk;
616        let closed_id = chunk.chunk_id();
617        let closed = ClosedChunk::new(chunk, checkpoint.clone());
618        self.closed.insert(closed_id, closed);
619        Ok(Some(checkpoint.as_ref().clone()))
620    }
621
622    /// Loads a record from a closed chunk.
623    ///
624    /// # Arguments
625    ///
626    /// * `log_data` - Metadata about the log entry to load
627    ///
628    /// # Returns
629    ///
630    /// Returns the log payload if found
631    ///
632    /// # Errors
633    ///
634    /// Returns an IO error if the chunk is not found or reading fails
635    pub fn load_record(
636        &self,
637        chunk_id: &ChunkId,
638        segment: Segment,
639    ) -> Result<WALRecord<W>, io::Error> {
640        // All logs in the open chunk are served before this fallback.
641
642        let record = {
643            let closed = self.closed.get(chunk_id).ok_or_else(|| {
644                io::Error::new(
645                    io::ErrorKind::NotFound,
646                    format!(
647                        "Chunk not found: {}; when:(open cache-miss read)",
648                        chunk_id
649                    ),
650                )
651            })?;
652            closed.chunk.read_record(segment)?
653        };
654
655        Ok(record)
656    }
657}
658
659impl<W> WAL<WALRecord<W>> for ChunkedWal<W>
660where W: WalTypes
661{
662    fn append(&mut self, rec: &WALRecord<W>) -> Result<(), io::Error> {
663        self.open.append_record(rec)?;
664        Ok(())
665    }
666
667    fn last_segment(&self) -> Segment {
668        self.open.chunk.last_segment()
669    }
670}
671
672#[cfg(test)]
673mod tests {
674    use std::io;
675    use std::io::Seek;
676    use std::io::Write;
677    use std::sync::Arc;
678    use std::sync::Mutex;
679    use std::sync::mpsc::SyncSender;
680    use std::sync::mpsc::sync_channel;
681
682    use codeq::Encode;
683    use codeq::OffsetSize;
684
685    use crate::Chunk;
686    use crate::ChunkId;
687    use crate::ChunkPersisted;
688    use crate::ChunkPersistedFn;
689    use crate::ChunkedWal;
690    use crate::Config;
691    use crate::Segment;
692    use crate::StateMachine;
693    use crate::WAL;
694    use crate::WALRecord;
695    use crate::WalTypes;
696
697    #[derive(Debug, Default, Clone, PartialEq, Eq)]
698    struct TestWal;
699
700    impl WalTypes for TestWal {
701        type Action = String;
702        type Checkpoint = String;
703        type Callback = SyncSender<Result<(), io::Error>>;
704    }
705
706    #[derive(Debug, Default)]
707    struct TestStateMachine {
708        values: Vec<String>,
709    }
710
711    impl StateMachine<TestWal> for TestStateMachine {
712        type Error = io::Error;
713
714        fn apply(
715            &mut self,
716            record: &WALRecord<TestWal>,
717            _chunk_id: ChunkId,
718            _global_segment: crate::Segment,
719        ) -> Result<(), Self::Error> {
720            match record {
721                WALRecord::Action(v) => self.values.push(v.clone()),
722                WALRecord::Checkpoint(checkpoint) => {
723                    self.values = decode_checkpoint(checkpoint);
724                }
725            }
726
727            Ok(())
728        }
729
730        fn checkpoint(&self) -> String {
731            encode_checkpoint(&self.values)
732        }
733    }
734
735    #[derive(Debug, Clone, PartialEq, Eq)]
736    struct PersistedCall {
737        starting_offset: u64,
738        synced_offset: u64,
739        checkpoint: Option<String>,
740    }
741
742    fn encode_checkpoint(values: &[String]) -> String {
743        values.join(",")
744    }
745
746    fn decode_checkpoint(checkpoint: &str) -> Vec<String> {
747        if checkpoint.is_empty() {
748            return Vec::new();
749        }
750
751        checkpoint.split(',').map(str::to_string).collect()
752    }
753
754    fn callback(
755        calls: Arc<Mutex<Vec<PersistedCall>>>,
756    ) -> ChunkPersistedFn<TestWal> {
757        Arc::new(
758            move |persisted: ChunkPersisted,
759                  checkpoint: Option<Arc<String>>| {
760                calls.lock().unwrap().push(PersistedCall {
761                    starting_offset: persisted.starting_offset,
762                    synced_offset: persisted.synced_offset,
763                    checkpoint: checkpoint.as_deref().cloned(),
764                });
765            },
766        )
767    }
768
769    fn open_wal(
770        config: &Config,
771        calls: Arc<Mutex<Vec<PersistedCall>>>,
772    ) -> Result<(ChunkedWal<TestWal>, TestStateMachine), io::Error> {
773        let mut sm = TestStateMachine::default();
774        let wal = ChunkedWal::open(
775            Arc::new(config.clone()),
776            &mut sm,
777            callback(calls),
778        )?;
779
780        Ok((wal, sm))
781    }
782
783    fn append_action(
784        wal: &mut ChunkedWal<TestWal>,
785        sm: &mut TestStateMachine,
786        value: &str,
787    ) -> Result<crate::Segment, io::Error> {
788        let record = WALRecord::Action(value.to_string());
789        wal.append(&record)?;
790        let segment = wal.last_segment();
791        sm.apply(&record, wal.open.chunk.chunk_id(), segment)?;
792        wal.try_close_full_chunk(sm)?;
793        Ok(segment)
794    }
795
796    fn sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
797        let (tx, rx) = sync_channel(1);
798        wal.send_pending(true, Some(tx))?;
799        rx.recv()
800            .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
801        wal.wait_worker_idle();
802        Ok(())
803    }
804
805    fn no_sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
806        let (tx, rx) = sync_channel(1);
807        wal.send_pending(false, Some(tx))?;
808        rx.recv()
809            .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
810        wal.wait_worker_idle();
811        Ok(())
812    }
813
814    fn temp_config() -> (tempfile::TempDir, Config) {
815        let td = tempfile::tempdir().unwrap();
816        let config = Config::new(td.path().to_str().unwrap());
817        (td, config)
818    }
819
820    fn records_in_chunk(
821        config: &Config,
822        chunk_id: ChunkId,
823    ) -> Result<Vec<WALRecord<TestWal>>, io::Error> {
824        Chunk::<WALRecord<TestWal>>::dump(config, chunk_id)?
825            .into_iter()
826            .map(|res| res.map(|(_, record)| record))
827            .collect()
828    }
829
830    #[test]
831    fn test_open_append_flush_reopen() -> Result<(), io::Error> {
832        let (_td, config) = temp_config();
833
834        {
835            let calls = Arc::new(Mutex::new(Vec::new()));
836            let (mut wal, mut sm) = open_wal(&config, calls)?;
837
838            append_action(&mut wal, &mut sm, "a")?;
839            append_action(&mut wal, &mut sm, "b")?;
840            append_action(&mut wal, &mut sm, "c")?;
841            sync_flush(&mut wal)?;
842
843            assert_eq!(vec!["a", "b", "c"], sm.values);
844            assert!(wal.closed.is_empty());
845            assert_eq!(4, wal.open.chunk.records_count());
846            assert!(format!("{wal:?}").contains("ChunkedWal"));
847        }
848
849        {
850            let calls = Arc::new(Mutex::new(Vec::new()));
851            let (wal, sm) = open_wal(&config, calls)?;
852
853            assert_eq!(vec!["a", "b", "c"], sm.values);
854            assert!(wal.closed.is_empty());
855            assert_eq!(4, wal.open.chunk.records_count());
856        }
857
858        Ok(())
859    }
860
861    #[test]
862    fn test_list_chunk_ids_ignores_invalid_file_names() -> Result<(), io::Error>
863    {
864        let (_td, config) = temp_config();
865        std::fs::write(config.chunk_path(ChunkId(12)), [])?;
866        std::fs::write(format!("{}/not-a-chunk", config.dir), [])?;
867
868        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
869        let chunk_ids = ChunkedWal::<TestWal>::load_chunk_ids(&config, &lock)?;
870
871        assert_eq!(vec![ChunkId(12)], chunk_ids);
872        Ok(())
873    }
874
875    #[test]
876    fn test_rotate_chunk_writes_checkpoint() -> Result<(), io::Error> {
877        let (_td, mut config) = temp_config();
878        config.chunk_max_records = Some(3);
879
880        let calls = Arc::new(Mutex::new(Vec::new()));
881        let (mut wal, mut sm) = open_wal(&config, calls)?;
882
883        append_action(&mut wal, &mut sm, "a")?;
884        append_action(&mut wal, &mut sm, "b")?;
885        append_action(&mut wal, &mut sm, "c")?;
886        sync_flush(&mut wal)?;
887
888        assert_eq!(1, wal.closed.len());
889        assert_eq!(
890            "a,b",
891            wal.closed.first_key_value().unwrap().1.state.as_ref()
892        );
893
894        let records = records_in_chunk(&config, wal.open.chunk.chunk_id())?;
895        assert_eq!(
896            vec![
897                WALRecord::Checkpoint("a,b".to_string()),
898                WALRecord::Action("c".to_string()),
899            ],
900            records
901        );
902
903        Ok(())
904    }
905
906    #[test]
907    fn test_reopen_reuses_last_healthy_chunk() -> Result<(), io::Error> {
908        let (_td, mut config) = temp_config();
909        config.chunk_max_records = Some(3);
910
911        let open_chunk_id = {
912            let calls = Arc::new(Mutex::new(Vec::new()));
913            let (mut wal, mut sm) = open_wal(&config, calls)?;
914
915            for value in ["a", "b", "c", "d"] {
916                append_action(&mut wal, &mut sm, value)?;
917            }
918            sync_flush(&mut wal)?;
919
920            assert_eq!(2, wal.closed.len());
921            wal.open.chunk.chunk_id()
922        };
923
924        let calls = Arc::new(Mutex::new(Vec::new()));
925        let (wal, sm) = open_wal(&config, calls)?;
926
927        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
928        assert_eq!(2, wal.closed.len());
929        assert_eq!(open_chunk_id, wal.open.chunk.chunk_id());
930        assert_eq!(1, wal.open.chunk.records_count());
931
932        Ok(())
933    }
934
935    #[test]
936    fn test_reopen_truncates_incomplete_last_record() -> Result<(), io::Error> {
937        let (_td, config) = temp_config();
938
939        let truncated_from = {
940            let calls = Arc::new(Mutex::new(Vec::new()));
941            let (mut wal, mut sm) = open_wal(&config, calls)?;
942
943            append_action(&mut wal, &mut sm, "a")?;
944            append_action(&mut wal, &mut sm, "b")?;
945            let segment = append_action(&mut wal, &mut sm, "c")?;
946            sync_flush(&mut wal)?;
947
948            let chunk_id = wal.open.chunk.chunk_id();
949            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
950                &config, chunk_id,
951            )?;
952            let damaged_len = segment.end().0 - chunk_id.offset() - 1;
953            f.set_len(damaged_len)?;
954            damaged_len
955        };
956
957        let calls = Arc::new(Mutex::new(Vec::new()));
958        let (wal, sm) = open_wal(&config, calls)?;
959
960        assert_eq!(vec!["a", "b"], sm.values);
961        assert_eq!(1, wal.closed.len());
962        assert_eq!(
963            Some(truncated_from),
964            wal.last_closed_chunk_truncated_file_size()
965        );
966        assert_eq!(
967            Some(truncated_from),
968            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
969        );
970        assert_eq!(
971            WALRecord::Checkpoint("a,b".to_string()),
972            wal.open.chunk.read_record(wal.open.chunk.last_segment())?
973        );
974
975        Ok(())
976    }
977
978    #[test]
979    fn test_reopen_truncates_trailing_zeroes() -> Result<(), io::Error> {
980        let (_td, config) = temp_config();
981
982        let original_len = {
983            let calls = Arc::new(Mutex::new(Vec::new()));
984            let (mut wal, mut sm) = open_wal(&config, calls)?;
985
986            append_action(&mut wal, &mut sm, "a")?;
987            append_action(&mut wal, &mut sm, "b")?;
988            sync_flush(&mut wal)?;
989
990            let chunk_id = wal.open.chunk.chunk_id();
991            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
992            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
993                &config, chunk_id,
994            )?;
995            f.seek(io::SeekFrom::Start(original_len))?;
996            f.write_all(&[0, 0, 0])?;
997            original_len
998        };
999
1000        let calls = Arc::new(Mutex::new(Vec::new()));
1001        let (wal, sm) = open_wal(&config, calls)?;
1002
1003        assert_eq!(vec!["a", "b"], sm.values);
1004        assert_eq!(1, wal.closed.len());
1005        assert_eq!(
1006            Some(original_len + 3),
1007            wal.last_closed_chunk_truncated_file_size()
1008        );
1009        assert_eq!(
1010            Some(original_len + 3),
1011            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
1012        );
1013
1014        Ok(())
1015    }
1016
1017    #[test]
1018    fn test_reopen_rejects_damaged_trailing_checkpoint() -> Result<(), io::Error>
1019    {
1020        let (_td, config) = temp_config();
1021
1022        {
1023            let calls = Arc::new(Mutex::new(Vec::new()));
1024            let (mut wal, mut sm) = open_wal(&config, calls)?;
1025
1026            append_action(&mut wal, &mut sm, "a")?;
1027            append_action(&mut wal, &mut sm, "b")?;
1028            sync_flush(&mut wal)?;
1029
1030            let chunk_id = wal.open.chunk.chunk_id();
1031            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
1032            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
1033                &config, chunk_id,
1034            )?;
1035            let mut damaged = Vec::new();
1036            WALRecord::<TestWal>::Checkpoint("bad".to_string())
1037                .encode(&mut damaged)?;
1038            *damaged.last_mut().unwrap() ^= 1;
1039
1040            f.seek(io::SeekFrom::Start(original_len))?;
1041            f.write_all(&damaged)?;
1042        }
1043
1044        let calls = Arc::new(Mutex::new(Vec::new()));
1045        let err = match open_wal(&config, calls) {
1046            Ok(_) => panic!("damaged checkpoint record must fail"),
1047            Err(err) => err,
1048        };
1049
1050        assert!(err.to_string().contains("decode Record at offset"));
1051
1052        Ok(())
1053    }
1054
1055    #[test]
1056    fn test_reopen_rejects_gap_between_chunks() -> Result<(), io::Error> {
1057        let (_td, mut config) = temp_config();
1058        config.chunk_max_records = Some(3);
1059
1060        {
1061            let calls = Arc::new(Mutex::new(Vec::new()));
1062            let (mut wal, mut sm) = open_wal(&config, calls)?;
1063
1064            append_action(&mut wal, &mut sm, "a")?;
1065            let truncated_segment = append_action(&mut wal, &mut sm, "b")?;
1066            append_action(&mut wal, &mut sm, "c")?;
1067            sync_flush(&mut wal)?;
1068
1069            let chunk_id = *wal.closed.first_key_value().unwrap().0;
1070            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
1071                &config, chunk_id,
1072            )?;
1073            let truncated_len = truncated_segment.end().0 - chunk_id.offset();
1074            f.set_len(truncated_len - 1)?;
1075        }
1076
1077        let calls = Arc::new(Mutex::new(Vec::new()));
1078        let err = open_wal(&config, calls).expect_err("chunk gap must fail");
1079
1080        assert!(err.to_string().contains("Gap between chunks"));
1081
1082        Ok(())
1083    }
1084
1085    #[test]
1086    fn test_on_chunk_persisted_called_on_recovery() -> Result<(), io::Error> {
1087        let (_td, mut config) = temp_config();
1088        config.chunk_max_records = Some(3);
1089
1090        {
1091            let calls = Arc::new(Mutex::new(Vec::new()));
1092            let (mut wal, mut sm) = open_wal(&config, calls)?;
1093
1094            for value in ["a", "b", "c", "d"] {
1095                append_action(&mut wal, &mut sm, value)?;
1096            }
1097            sync_flush(&mut wal)?;
1098        }
1099
1100        let calls = Arc::new(Mutex::new(Vec::new()));
1101        let (_wal, sm) = open_wal(&config, calls.clone())?;
1102
1103        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
1104        assert_eq!(
1105            vec![None, Some("a,b".to_string()), Some("a,b,c,d".to_string()),],
1106            calls
1107                .lock()
1108                .unwrap()
1109                .iter()
1110                .map(|call| call.checkpoint.clone())
1111                .collect::<Vec<_>>()
1112        );
1113
1114        Ok(())
1115    }
1116
1117    #[test]
1118    fn test_on_chunk_persisted_tracks_rotated_file() -> Result<(), io::Error> {
1119        let (_td, mut config) = temp_config();
1120        config.chunk_max_records = Some(3);
1121
1122        let calls = Arc::new(Mutex::new(Vec::new()));
1123        let (mut wal, mut sm) = open_wal(&config, calls.clone())?;
1124
1125        append_action(&mut wal, &mut sm, "a")?;
1126        append_action(&mut wal, &mut sm, "b")?;
1127
1128        let open_start = wal.open.chunk.global_start();
1129        sync_flush(&mut wal)?;
1130
1131        assert!(calls.lock().unwrap().contains(&PersistedCall {
1132            starting_offset: open_start,
1133            synced_offset: wal.open.chunk.global_end(),
1134            checkpoint: Some("a,b".to_string()),
1135        }));
1136
1137        Ok(())
1138    }
1139
1140    #[test]
1141    fn test_loaded_chunk_accessors() -> Result<(), io::Error> {
1142        let (_td, mut config) = temp_config();
1143        config.chunk_max_records = Some(3);
1144
1145        let calls = Arc::new(Mutex::new(Vec::new()));
1146        let (mut wal, mut sm) = open_wal(&config, calls)?;
1147
1148        let segment_a = append_action(&mut wal, &mut sm, "a")?;
1149        append_action(&mut wal, &mut sm, "b")?;
1150        append_action(&mut wal, &mut sm, "c")?;
1151        sync_flush(&mut wal)?;
1152
1153        let open_chunk_id = wal.open_chunk_id();
1154        let closed_stats = wal.closed_chunk_stats();
1155        let open_stat = wal.open_chunk_stat(sm.checkpoint());
1156
1157        assert_eq!(1, closed_stats.len());
1158        assert_eq!(ChunkId(0), closed_stats[0].chunk_id);
1159        assert_eq!(3, closed_stats[0].records_count);
1160        assert_eq!("a,b", closed_stats[0].log_state);
1161        assert_eq!(open_chunk_id, open_stat.chunk_id);
1162        assert_eq!(2, open_stat.records_count);
1163        assert_eq!("a,b,c", open_stat.log_state);
1164        assert_eq!(open_stat.global_end, wal.on_disk_size());
1165        assert_eq!(None, wal.last_closed_chunk_truncated_file_size());
1166
1167        assert_eq!(
1168            WALRecord::Action("a".to_string()),
1169            wal.closed_chunk_reader().read_record(ChunkId(0), segment_a)?
1170        );
1171
1172        let err =
1173            wal.load_record(&ChunkId(999), Segment::new(999, 1)).unwrap_err();
1174        assert_eq!(io::ErrorKind::NotFound, err.kind());
1175        assert!(err.to_string().contains("Chunk not found"));
1176
1177        let mut dumped = Vec::new();
1178        wal.dump_loaded_records(|chunk_id, index, res| {
1179            dumped.push((chunk_id, index, res.map(|(_segment, rec)| rec)?));
1180            Ok(())
1181        })?;
1182
1183        assert_eq!(
1184            vec![
1185                (ChunkId(0), 0, WALRecord::Checkpoint(String::new())),
1186                (ChunkId(0), 1, WALRecord::Action("a".to_string())),
1187                (ChunkId(0), 2, WALRecord::Action("b".to_string())),
1188                (open_chunk_id, 0, WALRecord::Checkpoint("a,b".to_string())),
1189                (open_chunk_id, 1, WALRecord::Action("c".to_string())),
1190            ],
1191            dumped
1192        );
1193
1194        let drained =
1195            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
1196        assert_eq!(vec![ChunkId(0)], drained);
1197        assert!(wal.closed_chunk_stats().is_empty());
1198
1199        let path = config.chunk_path(ChunkId(0));
1200        assert!(std::path::Path::new(&path).exists());
1201        wal.send_remove_chunks(drained)?;
1202        wal.wait_worker_idle();
1203        assert!(!std::path::Path::new(&path).exists());
1204
1205        Ok(())
1206    }
1207
1208    #[test]
1209    fn test_drain_closed_chunks_while_stops_at_first_unmatched()
1210    -> Result<(), io::Error> {
1211        let (_td, mut config) = temp_config();
1212        config.chunk_max_records = Some(3);
1213
1214        let calls = Arc::new(Mutex::new(Vec::new()));
1215        let (mut wal, mut sm) = open_wal(&config, calls)?;
1216
1217        for value in ["a", "b", "c", "d", "e"] {
1218            append_action(&mut wal, &mut sm, value)?;
1219        }
1220        sync_flush(&mut wal)?;
1221
1222        let closed_before = wal
1223            .closed_chunk_stats()
1224            .into_iter()
1225            .map(|stat| (stat.chunk_id, stat.log_state))
1226            .collect::<Vec<_>>();
1227        assert_eq!(
1228            vec![
1229                (ChunkId(0), "a,b".to_string()),
1230                (ChunkId(26), "a,b,c,d".to_string()),
1231            ],
1232            closed_before
1233        );
1234
1235        let drained =
1236            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
1237        assert_eq!(vec![ChunkId(0)], drained);
1238
1239        let closed_after = wal
1240            .closed_chunk_stats()
1241            .into_iter()
1242            .map(|stat| (stat.chunk_id, stat.log_state))
1243            .collect::<Vec<_>>();
1244        assert_eq!(vec![(ChunkId(26), "a,b,c,d".to_string())], closed_after);
1245
1246        Ok(())
1247    }
1248
1249    #[test]
1250    fn test_lock_blocks_second_open_and_dump() -> Result<(), io::Error> {
1251        let (_td, config) = temp_config();
1252
1253        let calls = Arc::new(Mutex::new(Vec::new()));
1254        let (wal, _sm) = open_wal(&config, calls.clone())?;
1255
1256        let err = ChunkedWal::<TestWal>::acquire_lock(&config)
1257            .expect_err("second lock must fail");
1258        assert_eq!(io::ErrorKind::WouldBlock, err.kind());
1259
1260        drop(wal);
1261
1262        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
1263        let mut records = Vec::new();
1264        ChunkedWal::<TestWal>::dump_records(
1265            &config,
1266            &lock,
1267            |chunk_id, i, res| {
1268                records.push((chunk_id, i, res.map(|(_, record)| record)?));
1269                Ok(())
1270            },
1271        )?;
1272
1273        assert_eq!(
1274            vec![(ChunkId(0), 0, WALRecord::Checkpoint(String::new()))],
1275            records
1276        );
1277
1278        Ok(())
1279    }
1280
1281    #[test]
1282    fn test_flush_without_sync_writes_without_advancing_sync_id()
1283    -> Result<(), io::Error> {
1284        let (_td, config) = temp_config();
1285        let calls = Arc::new(Mutex::new(Vec::new()));
1286        let (mut wal, mut sm) = open_wal(&config, calls)?;
1287
1288        append_action(&mut wal, &mut sm, "a")?;
1289        append_action(&mut wal, &mut sm, "b")?;
1290        no_sync_flush(&mut wal)?;
1291
1292        assert_eq!(
1293            vec![(0, 0)],
1294            wal.get_stat()?
1295                .iter()
1296                .map(|stat| stat.offset_sync_id())
1297                .collect::<Vec<_>>()
1298        );
1299
1300        sync_flush(&mut wal)?;
1301
1302        assert!(
1303            wal.get_stat()?
1304                .iter()
1305                .any(|stat| stat.sync_id == wal.open.chunk.global_end())
1306        );
1307
1308        Ok(())
1309    }
1310}