Skip to main content

chunked_wal/wal/
mod.rs

1pub mod callback;
2pub mod file_persisted;
3pub mod wal_record;
4
5pub(crate) mod atomic_flush_metrics;
6pub(crate) mod batch_metrics;
7mod closed_chunk_reader;
8pub(crate) mod file_entry;
9pub(crate) mod flush_request;
10pub(crate) mod flush_worker;
11pub(crate) mod queued_write;
12pub(crate) mod write_batch;
13
14use std::collections::BTreeMap;
15use std::fmt;
16use std::io;
17use std::sync::Arc;
18use std::sync::mpsc::SyncSender;
19use std::time::Instant;
20
21pub use closed_chunk_reader::ClosedChunkReader;
22use codeq::OffsetSize;
23pub use flush_request::FlushStat;
24pub(crate) use flush_request::WorkerRequest;
25use log::info;
26
27use crate::Chunk;
28use crate::ChunkId;
29use crate::Config;
30use crate::WALRecord;
31use crate::WalLock;
32use crate::WalTypes;
33use crate::api::state_machine::StateMachine;
34use crate::api::wal::WAL;
35use crate::chunk::closed_chunk::ClosedChunk;
36use crate::chunk::open_chunk::OpenChunk;
37use crate::num::format_pad_u64;
38use crate::stat::ChunkStat;
39use crate::stat::FlushMetrics;
40use crate::types::Segment;
41use crate::wal::atomic_flush_metrics::AtomicFlushMetrics;
42use crate::wal::file_entry::FileEntry;
43use crate::wal::file_persisted::ChunkPersisted;
44use crate::wal::file_persisted::ChunkPersistedCallback;
45pub use crate::wal::file_persisted::ChunkPersistedFn;
46use crate::wal::flush_request::SeqRequest;
47use crate::wal::flush_request::WriteRequest;
48use crate::wal::flush_worker::FlushWorker;
49use crate::wal::flush_worker::WorkerState;
50
51/// Chunked write-ahead log implementation.
52///
53/// This WAL implementation manages both open and closed chunks of data.
54/// An open chunk is actively being written to, while closed chunks are
55/// immutable and may be used for reading historical data.
56pub struct ChunkedWal<W>
57where W: WalTypes
58{
59    config: Arc<Config>,
60    open: OpenChunk<WALRecord<W>>,
61    closed: BTreeMap<ChunkId, ClosedChunk<W>>,
62
63    /// Sends user write operations to the flush worker.
64    ///
65    /// Each write operation may carry its own callback, defined by
66    /// `W::Callback`.
67    flush_tx: SyncSender<SeqRequest<W>>,
68
69    /// File-level callback invoked after fsync.
70    ///
71    /// This callback is called once for each synced chunk file.
72    on_chunk_persisted: ChunkPersistedFn<W>,
73
74    /// The next sequence number to assign. Incremented on each `send_request`.
75    /// Only accessed by the main thread, so a plain `u64` suffices.
76    sent_seq: u64,
77
78    /// Shared with `FlushWorker`; stores completion and failure state.
79    worker_state: Arc<WorkerState>,
80
81    /// Shared with `FlushWorker`; stores aggregated flush metrics.
82    flush_metrics: Arc<AtomicFlushMetrics>,
83
84    /// Holds the exclusive lock on the WAL directory for this WAL instance.
85    _dir_lock: WalLock,
86}
87
88impl<W> fmt::Debug for ChunkedWal<W>
89where W: WalTypes
90{
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("ChunkedWal")
93            .field("config", &self.config)
94            .field("open", &self.open)
95            .field("closed", &self.closed)
96            .field("sent_seq", &self.sent_seq)
97            .field("done_seq", &self.worker_state.done_seq())
98            .field("flush_metrics", &self.flush_metrics)
99            .finish_non_exhaustive()
100    }
101}
102
103impl<W> ChunkedWal<W>
104where W: WalTypes
105{
106    /// Opens a ChunkedWal instance and replays existing records into a state
107    /// machine.
108    pub fn open<SM>(
109        config: Arc<Config>,
110        state_machine: &mut SM,
111        on_chunk_persisted: ChunkPersistedFn<W>,
112    ) -> Result<Self, io::Error>
113    where
114        SM: StateMachine<W>,
115    {
116        let dir_lock = Self::acquire_lock(&config)?;
117        Self::open_locked(config, state_machine, on_chunk_persisted, dir_lock)
118    }
119
120    /// Acquires the exclusive WAL directory lock.
121    pub fn acquire_lock(config: &Config) -> Result<WalLock, io::Error> {
122        WalLock::new(config)
123    }
124
125    /// Opens a ChunkedWal instance with an already-held WAL directory lock.
126    pub fn open_locked<SM>(
127        config: Arc<Config>,
128        state_machine: &mut SM,
129        on_chunk_persisted: ChunkPersistedFn<W>,
130        dir_lock: WalLock,
131    ) -> Result<Self, io::Error>
132    where
133        SM: StateMachine<W>,
134    {
135        let chunk_ids = Self::load_chunk_ids(&config, &dir_lock)?;
136
137        let mut closed = BTreeMap::new();
138        let mut prev_end_offset = None;
139        let mut prev_checkpoint = None;
140        let tail_chunk_id = chunk_ids.last().copied();
141
142        for chunk_id in chunk_ids.iter().copied() {
143            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;
144
145            let (chunk, records) = Chunk::<WALRecord<W>>::open_with_truncate(
146                config.clone(),
147                chunk_id,
148                Some(chunk_id) == tail_chunk_id,
149            )?;
150
151            on_chunk_persisted(
152                ChunkPersisted {
153                    file: chunk.f.clone(),
154                    starting_offset: chunk.global_start(),
155                    synced_offset: chunk.global_end(),
156                },
157                prev_checkpoint.clone(),
158            );
159
160            for (i, record) in records.iter().enumerate() {
161                let seg = chunk.record_segment(i);
162                state_machine
163                    .apply(record, chunk_id, seg)
164                    .map_err(|e| io::Error::other(e.to_string()))?;
165            }
166
167            prev_end_offset = Some(chunk.last_segment().end().0);
168            let checkpoint = Arc::new(state_machine.checkpoint());
169            prev_checkpoint = Some(checkpoint.clone());
170
171            closed.insert(chunk_id, ClosedChunk::new(chunk, checkpoint));
172        }
173
174        let open = Self::reopen_last_closed(&mut closed);
175
176        let open = if let Some(open) = open {
177            open
178        } else {
179            OpenChunk::create(
180                config.clone(),
181                ChunkId(prev_end_offset.unwrap_or_default()),
182                WALRecord::Checkpoint(state_machine.checkpoint()),
183            )?
184        };
185
186        Ok(Self::new(
187            config,
188            closed,
189            open,
190            on_chunk_persisted,
191            dir_lock,
192        ))
193    }
194
195    /// Dumps all records while holding the WAL directory lock.
196    pub fn dump_records<D>(
197        config: &Config,
198        _dir_lock: &WalLock,
199        mut write_record: D,
200    ) -> Result<(), io::Error>
201    where
202        D: FnMut(
203            ChunkId,
204            u64,
205            Result<(Segment, WALRecord<W>), io::Error>,
206        ) -> Result<(), io::Error>,
207    {
208        let chunk_ids = Self::load_chunk_ids(config, _dir_lock)?;
209        for chunk_id in chunk_ids {
210            let it = Chunk::<WALRecord<W>>::dump(config, chunk_id)?;
211            for (i, res) in it.into_iter().enumerate() {
212                write_record(chunk_id, i as u64, res)?;
213            }
214        }
215
216        Ok(())
217    }
218
219    /// Creates a new ChunkedWal instance after recovery has completed.
220    ///
221    /// # Arguments
222    ///
223    /// * `config` - Configuration for the WAL
224    /// * `closed` - Map of closed (immutable) chunks indexed by chunk ID
225    /// * `open` - The currently active chunk that can be written to
226    /// * `on_chunk_persisted` - Callback invoked after chunk data is persisted
227    fn new(
228        config: Arc<Config>,
229        closed: BTreeMap<ChunkId, ClosedChunk<W>>,
230        open: OpenChunk<WALRecord<W>>,
231        on_chunk_persisted: ChunkPersistedFn<W>,
232        dir_lock: WalLock,
233    ) -> Self {
234        let prev_checkpoint =
235            closed.iter().last().map(|(_, c)| c.state.clone());
236
237        let offset = open.chunk.global_start();
238        let f = open.chunk.f.clone();
239
240        let file_entry = FileEntry::new(
241            offset,
242            f,
243            ChunkPersistedCallback::new(
244                on_chunk_persisted.clone(),
245                prev_checkpoint,
246            ),
247        );
248
249        let worker_state = Arc::new(WorkerState::new());
250        let flush_metrics = Arc::new(AtomicFlushMetrics::default());
251
252        let (flush_tx, rx) = std::sync::mpsc::sync_channel(1024);
253        let worker = FlushWorker::new(
254            rx,
255            file_entry,
256            worker_state.clone(),
257            flush_metrics.clone(),
258            config.flush_batch_wait(),
259            config.flush_batch_max_items(),
260        );
261
262        worker.spawn();
263
264        Self {
265            config,
266            open,
267            closed,
268            flush_tx,
269            on_chunk_persisted,
270            sent_seq: 0,
271            worker_state,
272            flush_metrics,
273            _dir_lock: dir_lock,
274        }
275    }
276
277    fn ensure_consecutive_chunks(
278        prev_end_offset: Option<u64>,
279        chunk_id: ChunkId,
280    ) -> Result<(), io::Error> {
281        let Some(prev_end) = prev_end_offset else {
282            return Ok(());
283        };
284
285        if prev_end != chunk_id.offset() {
286            let message = format!(
287                "Gap between chunks: {} -> {}; Can not open, \
288                        fix this error and re-open",
289                format_pad_u64(prev_end),
290                format_pad_u64(chunk_id.offset()),
291            );
292            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
293        }
294
295        Ok(())
296    }
297
298    fn reopen_last_closed(
299        closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<W>>,
300    ) -> Option<OpenChunk<WALRecord<W>>> {
301        {
302            let (_chunk_id, closed) = closed_chunks.iter().last()?;
303
304            if closed.chunk.is_truncated() {
305                return None;
306            }
307        }
308
309        let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
310        let open = OpenChunk::new(last.chunk);
311        Some(open)
312    }
313
314    pub fn load_chunk_ids(
315        config: &Config,
316        _dir_lock: &WalLock,
317    ) -> Result<Vec<ChunkId>, io::Error> {
318        let path = &config.dir;
319        let entries = std::fs::read_dir(path)?;
320        let mut chunk_ids = vec![];
321        for entry in entries {
322            let entry = entry?;
323            let file_name = entry.file_name();
324
325            let fn_str = file_name.to_string_lossy();
326            if fn_str == WalLock::LOCK_FILE_NAME {
327                continue;
328            }
329
330            let res = Config::parse_chunk_file_name(&fn_str);
331
332            match res {
333                Ok(offset) => {
334                    chunk_ids.push(ChunkId(offset));
335                }
336                Err(err) => {
337                    log::warn!(
338                        "Ignore invalid WAL file name: '{}': {}",
339                        fn_str,
340                        err
341                    );
342                    continue;
343                }
344            };
345        }
346
347        chunk_ids.sort();
348
349        Ok(chunk_ids)
350    }
351
352    pub fn open_chunk_id(&self) -> ChunkId {
353        self.open.chunk.chunk_id()
354    }
355
356    pub fn closed_chunk_stats(&self) -> Vec<ChunkStat<W::Checkpoint>> {
357        self.closed.values().map(|c| c.stat()).collect()
358    }
359
360    pub fn open_chunk_stat(
361        &self,
362        checkpoint: W::Checkpoint,
363    ) -> ChunkStat<W::Checkpoint> {
364        ChunkStat {
365            chunk_id: self.open.chunk.chunk_id(),
366            records_count: self.open.chunk.records_count() as u64,
367            global_start: self.open.chunk.global_start(),
368            global_end: self.open.chunk.global_end(),
369            size: self.open.chunk.chunk_size(),
370            log_state: checkpoint,
371        }
372    }
373
374    pub fn closed_chunk_reader(&self) -> ClosedChunkReader<W> {
375        ClosedChunkReader::new(self.closed.clone())
376    }
377
378    pub fn drain_closed_chunks_while<F>(
379        &mut self,
380        mut should_drain: F,
381    ) -> Vec<ChunkId>
382    where
383        F: FnMut(&W::Checkpoint) -> bool,
384    {
385        let mut chunk_ids = Vec::new();
386
387        while let Some((_chunk_id, closed)) = self.closed.first_key_value() {
388            if !should_drain(closed.state.as_ref()) {
389                break;
390            }
391
392            let (chunk_id, _closed) = self.closed.pop_first().unwrap();
393            chunk_ids.push(chunk_id);
394        }
395
396        chunk_ids
397    }
398
399    pub fn dump_loaded_records<D>(
400        &self,
401        mut write_record: D,
402    ) -> Result<(), io::Error>
403    where
404        D: FnMut(
405            ChunkId,
406            u64,
407            Result<(Segment, WALRecord<W>), io::Error>,
408        ) -> Result<(), io::Error>,
409    {
410        let closed = self.closed.keys().copied();
411        let chunk_ids = closed.chain([self.open.chunk.chunk_id()]);
412
413        for chunk_id in chunk_ids {
414            let f =
415                Chunk::<WALRecord<W>>::open_chunk_file(&self.config, chunk_id)?;
416
417            let it = Chunk::<WALRecord<W>>::load_records_iter(
418                &self.config,
419                Arc::new(f),
420                chunk_id,
421            )?;
422
423            for (i, res) in it.enumerate() {
424                write_record(chunk_id, i as u64, res)?;
425            }
426        }
427
428        Ok(())
429    }
430
431    pub fn on_disk_size(&self) -> u64 {
432        let end = self.open.chunk.global_end();
433        let open_start = self.open.chunk.global_start();
434        let first_closed_start = self
435            .closed
436            .first_key_value()
437            .map(|(_, v)| v.chunk.global_start())
438            .unwrap_or(open_start);
439
440        end - first_closed_start
441    }
442
443    pub fn last_closed_chunk_truncated_file_size(&self) -> Option<u64> {
444        self.closed
445            .last_key_value()
446            .and_then(|(_chunk_id, closed)| closed.chunk.truncated_file_size())
447    }
448
449    /// Wraps a `WorkerRequest` with an auto-incrementing seq and sends it to
450    /// the FlushWorker.
451    fn send_request(&mut self, req: WorkerRequest<W>) -> Result<(), io::Error> {
452        self.sent_seq += 1;
453        self.flush_tx
454            .send(SeqRequest {
455                seq: self.sent_seq,
456                queued_at: Instant::now(),
457                req,
458            })
459            .map_err(|e| {
460                io::Error::other(format!("Failed to send request: {}", e))
461            })
462    }
463
464    /// Block until the FlushWorker has processed all requests sent so far.
465    pub fn wait_worker_idle(&self) -> Result<(), io::Error> {
466        self.worker_state.wait_for(self.sent_seq)
467    }
468
469    pub fn flush_metrics(&self) -> FlushMetrics {
470        self.flush_metrics.snapshot()
471    }
472
473    /// Hand the pending data buffer to the worker for writing.
474    ///
475    /// Drains `OpenChunk::pending_data` and packages it as a `WriteRequest`.
476    /// The worker always writes the bytes to the OS file. When `sync` is
477    /// `true` it also calls `fsync` so the data is on stable storage; when
478    /// `sync` is `false` it skips the fsync and durability is deferred to
479    /// the next sync write that lands in the same or a later batch.
480    pub fn send_pending(
481        &mut self,
482        sync: bool,
483        callback: Option<W::Callback>,
484    ) -> Result<(), io::Error> {
485        let data = self.open.take_pending_data();
486        self.send_request(WorkerRequest::Write(WriteRequest {
487            upto_offset: self.open.chunk.global_end(),
488            data,
489            sync,
490            callback,
491        }))
492    }
493
494    /// Requests removal of specified chunks.
495    ///
496    /// # Arguments
497    ///
498    /// * `chunk_ids` - IDs of chunk files to be removed
499    ///
500    /// # Errors
501    ///
502    /// Returns an IO error if the remove request cannot be sent
503    pub fn send_remove_chunks(
504        &mut self,
505        chunk_ids: Vec<ChunkId>,
506    ) -> Result<(), io::Error> {
507        let chunk_paths = chunk_ids
508            .into_iter()
509            .map(|chunk_id| self.config.chunk_path(chunk_id))
510            .collect();
511
512        self.send_request(WorkerRequest::RemoveChunks { chunk_paths })
513    }
514
515    #[allow(dead_code)]
516    pub fn get_stat(&mut self) -> Result<Vec<FlushStat>, io::Error> {
517        let (tx, rx) = std::sync::mpsc::sync_channel(1);
518        self.send_get_stat(tx)?;
519        rx.recv().map_err(|e| {
520            io::Error::other(format!(
521                "Failed to receive get state response: {}",
522                e
523            ))
524        })
525    }
526
527    #[allow(dead_code)]
528    pub(crate) fn send_get_stat(
529        &mut self,
530        callback: SyncSender<Vec<FlushStat>>,
531    ) -> Result<(), io::Error> {
532        self.send_request(WorkerRequest::GetFlushStat { tx: callback })
533    }
534
535    /// Checks if the current open chunk has reached its capacity.
536    ///
537    /// Returns true if either the maximum number of records or maximum chunk
538    /// size is reached.
539    pub fn is_open_chunk_full(&self) -> bool {
540        self.open.chunk.records_count() >= self.config.chunk_max_records()
541            || (self.open.chunk.chunk_size() as usize)
542                >= self.config.chunk_max_size()
543    }
544
545    /// Attempts to close the current chunk if it's full and creates a new open
546    /// chunk.
547    ///
548    /// # Arguments
549    ///
550    /// * `state_machine` - The state machine that provides the checkpoint to
551    ///   store at the start of the next chunk.
552    ///
553    /// # Returns
554    ///
555    /// Returns the checkpoint if a chunk was closed, None otherwise.
556    ///
557    /// # Errors
558    ///
559    /// Returns an IO error if chunk operations fail
560    pub fn try_close_full_chunk<SM>(
561        &mut self,
562        state_machine: &SM,
563    ) -> Result<Option<W::Checkpoint>, io::Error>
564    where
565        SM: StateMachine<W>,
566    {
567        if !self.is_open_chunk_full() {
568            return Ok(None);
569        }
570
571        let config = self.config.clone();
572        let offset = self.open.chunk.last_segment().end();
573
574        info!(
575            "Closing full chunk: {}, open new: {}",
576            self.open.chunk.chunk_id(),
577            ChunkId(offset.0)
578        );
579
580        let checkpoint = state_machine.checkpoint();
581
582        let new_open = {
583            let chunk_id = ChunkId(offset.0);
584            OpenChunk::create(
585                config,
586                chunk_id,
587                WALRecord::Checkpoint(checkpoint.clone()),
588            )?
589        };
590
591        let mut old_open = std::mem::replace(&mut self.open, new_open);
592
593        let prev_pending_data = old_open.take_pending_data();
594        if !prev_pending_data.is_empty() {
595            self.send_request(WorkerRequest::Write(WriteRequest {
596                upto_offset: offset.0,
597                data: prev_pending_data,
598                sync: true,
599                callback: None,
600            }))?;
601        }
602
603        let checkpoint = Arc::new(checkpoint);
604
605        self.send_request(WorkerRequest::AppendFile(FileEntry::new(
606            offset.0,
607            self.open.chunk.f.clone(),
608            ChunkPersistedCallback::new(
609                self.on_chunk_persisted.clone(),
610                Some(checkpoint.clone()),
611            ),
612        )))?;
613
614        let chunk = old_open.chunk;
615        let closed_id = chunk.chunk_id();
616        let closed = ClosedChunk::new(chunk, checkpoint.clone());
617        self.closed.insert(closed_id, closed);
618        Ok(Some(checkpoint.as_ref().clone()))
619    }
620
621    /// Loads a record from a closed chunk.
622    ///
623    /// # Arguments
624    ///
625    /// * `log_data` - Metadata about the log entry to load
626    ///
627    /// # Returns
628    ///
629    /// Returns the log payload if found
630    ///
631    /// # Errors
632    ///
633    /// Returns an IO error if the chunk is not found or reading fails
634    pub fn load_record(
635        &self,
636        chunk_id: &ChunkId,
637        segment: Segment,
638    ) -> Result<WALRecord<W>, io::Error> {
639        // All logs in the open chunk are served before this fallback.
640
641        let record = {
642            let closed = self.closed.get(chunk_id).ok_or_else(|| {
643                io::Error::new(
644                    io::ErrorKind::NotFound,
645                    format!(
646                        "Chunk not found: {}; when:(open cache-miss read)",
647                        chunk_id
648                    ),
649                )
650            })?;
651            closed.chunk.read_record(segment)?
652        };
653
654        Ok(record)
655    }
656}
657
658impl<W> WAL<WALRecord<W>> for ChunkedWal<W>
659where W: WalTypes
660{
661    fn append(&mut self, rec: &WALRecord<W>) -> Result<(), io::Error> {
662        self.open.append_record(rec)?;
663        Ok(())
664    }
665
666    fn last_segment(&self) -> Segment {
667        self.open.chunk.last_segment()
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use std::io;
674    use std::io::Seek;
675    use std::io::Write;
676    use std::sync::Arc;
677    use std::sync::Mutex;
678    use std::sync::mpsc::SyncSender;
679    use std::sync::mpsc::sync_channel;
680
681    use codeq::Decode;
682    use codeq::Encode;
683    use codeq::OffsetSize;
684
685    use crate::Chunk;
686    use crate::ChunkId;
687    use crate::ChunkPersisted;
688    use crate::ChunkPersistedFn;
689    use crate::ChunkedWal;
690    use crate::Config;
691    use crate::Segment;
692    use crate::StateMachine;
693    use crate::WAL;
694    use crate::WALRecord;
695    use crate::WalTypes;
696
/// Type id written in front of every encoded `TestAction`.
const TEST_ACTION_TYPE: u32 = 1;

/// Action used by the tests: a single string payload.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestAction(String);
701
702    impl Encode for TestAction {
703        fn encode<Wt: io::Write>(&self, mut w: Wt) -> Result<usize, io::Error> {
704            let mut n = TEST_ACTION_TYPE.encode(&mut w)?;
705            n += self.0.encode(&mut w)?;
706            Ok(n)
707        }
708
709        fn type_id(&self) -> Option<u32> {
710            Some(TEST_ACTION_TYPE)
711        }
712    }
713
714    impl Decode for TestAction {
715        fn decode<R: io::Read>(mut r: R) -> Result<Self, io::Error> {
716            let type_id = u32::decode(&mut r)?;
717            if type_id != TEST_ACTION_TYPE {
718                return Err(io::Error::new(
719                    io::ErrorKind::InvalidData,
720                    format!("unexpected action type id {}", type_id),
721                ));
722            }
723
724            Ok(Self(String::decode(&mut r)?))
725        }
726    }
727
728    #[derive(Debug, Default, Clone, PartialEq, Eq)]
729    struct TestWal;
730
731    impl WalTypes for TestWal {
732        type Action = TestAction;
733        type Checkpoint = String;
734        type Callback = SyncSender<Result<(), io::Error>>;
735    }
736
737    #[derive(Debug, Default)]
738    struct TestStateMachine {
739        values: Vec<String>,
740    }
741
742    impl StateMachine<TestWal> for TestStateMachine {
743        type Error = io::Error;
744
745        fn apply(
746            &mut self,
747            record: &WALRecord<TestWal>,
748            _chunk_id: ChunkId,
749            _global_segment: crate::Segment,
750        ) -> Result<(), Self::Error> {
751            match record {
752                WALRecord::Action(v) => self.values.push(v.0.clone()),
753                WALRecord::Checkpoint(checkpoint) => {
754                    self.values = decode_checkpoint(checkpoint);
755                }
756            }
757
758            Ok(())
759        }
760
761        fn checkpoint(&self) -> String {
762            encode_checkpoint(&self.values)
763        }
764    }
765
/// One recorded invocation of the chunk-persisted callback.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PersistedCall {
    /// `starting_offset` of the reported `ChunkPersisted`.
    starting_offset: u64,
    /// `synced_offset` of the reported `ChunkPersisted`.
    synced_offset: u64,
    /// Copy of the checkpoint passed to the callback, if any.
    checkpoint: Option<String>,
}
772
/// Encodes the values as one comma-separated string.
fn encode_checkpoint(values: &[String]) -> String {
    let mut encoded = String::new();

    for (index, value) in values.iter().enumerate() {
        if index > 0 {
            encoded.push(',');
        }
        encoded.push_str(value);
    }

    encoded
}
776
/// Splits a comma-separated checkpoint back into its values.
///
/// An empty checkpoint yields an empty list rather than a single empty
/// value.
fn decode_checkpoint(checkpoint: &str) -> Vec<String> {
    match checkpoint {
        "" => Vec::new(),
        joined => joined.split(',').map(String::from).collect(),
    }
}
784
785    fn action(value: &str) -> WALRecord<TestWal> {
786        WALRecord::Action(TestAction(value.to_string()))
787    }
788
789    fn callback(
790        calls: Arc<Mutex<Vec<PersistedCall>>>,
791    ) -> ChunkPersistedFn<TestWal> {
792        Arc::new(
793            move |persisted: ChunkPersisted,
794                  checkpoint: Option<Arc<String>>| {
795                calls.lock().unwrap().push(PersistedCall {
796                    starting_offset: persisted.starting_offset,
797                    synced_offset: persisted.synced_offset,
798                    checkpoint: checkpoint.as_deref().cloned(),
799                });
800            },
801        )
802    }
803
804    fn open_wal(
805        config: &Config,
806        calls: Arc<Mutex<Vec<PersistedCall>>>,
807    ) -> Result<(ChunkedWal<TestWal>, TestStateMachine), io::Error> {
808        let mut sm = TestStateMachine::default();
809        let wal = ChunkedWal::open(
810            Arc::new(config.clone()),
811            &mut sm,
812            callback(calls),
813        )?;
814
815        Ok((wal, sm))
816    }
817
818    fn append_action(
819        wal: &mut ChunkedWal<TestWal>,
820        sm: &mut TestStateMachine,
821        value: &str,
822    ) -> Result<crate::Segment, io::Error> {
823        let record = action(value);
824        wal.append(&record)?;
825        let segment = wal.last_segment();
826        sm.apply(&record, wal.open.chunk.chunk_id(), segment)?;
827        wal.try_close_full_chunk(sm)?;
828        Ok(segment)
829    }
830
831    fn sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
832        let (tx, rx) = sync_channel(1);
833        wal.send_pending(true, Some(tx))?;
834        rx.recv()
835            .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
836        wal.wait_worker_idle()?;
837        Ok(())
838    }
839
840    fn no_sync_flush(wal: &mut ChunkedWal<TestWal>) -> Result<(), io::Error> {
841        let (tx, rx) = sync_channel(1);
842        wal.send_pending(false, Some(tx))?;
843        rx.recv()
844            .map_err(|e| io::Error::other(format!("flush callback: {e}")))??;
845        wal.wait_worker_idle()?;
846        Ok(())
847    }
848
849    fn temp_config() -> (tempfile::TempDir, Config) {
850        let td = tempfile::tempdir().unwrap();
851        let config = Config::new(td.path().to_str().unwrap());
852        (td, config)
853    }
854
855    fn records_in_chunk(
856        config: &Config,
857        chunk_id: ChunkId,
858    ) -> Result<Vec<WALRecord<TestWal>>, io::Error> {
859        Chunk::<WALRecord<TestWal>>::dump(config, chunk_id)?
860            .into_iter()
861            .map(|res| res.map(|(_, record)| record))
862            .collect()
863    }
864
/// Records appended and flushed in one session are replayed when the WAL
/// is opened again on the same directory.
#[test]
fn test_open_append_flush_reopen() -> Result<(), io::Error> {
    let (_td, config) = temp_config();

    // First session: append three actions and flush them.
    {
        let (mut wal, mut sm) =
            open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

        for value in ["a", "b", "c"] {
            append_action(&mut wal, &mut sm, value)?;
        }
        sync_flush(&mut wal)?;

        assert_eq!(vec!["a", "b", "c"], sm.values);
        assert!(wal.closed.is_empty());
        assert_eq!(4, wal.open.chunk.records_count());
        assert!(format!("{wal:?}").contains("ChunkedWal"));
    }

    // Second session: the same state is rebuilt from disk.
    {
        let (wal, sm) = open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

        assert_eq!(vec!["a", "b", "c"], sm.values);
        assert!(wal.closed.is_empty());
        assert_eq!(4, wal.open.chunk.records_count());
    }

    Ok(())
}
895
/// Listing chunk IDs returns only files with a valid chunk file name;
/// the lock file and unrelated files are left out.
#[test]
fn test_list_chunk_ids_ignores_invalid_file_names() -> Result<(), io::Error>
{
    let (_td, config) = temp_config();

    let valid_chunk = config.chunk_path(ChunkId(12));
    let stray_file = format!("{}/not-a-chunk", config.dir);
    std::fs::write(valid_chunk, [])?;
    std::fs::write(stray_file, [])?;

    let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
    let found = ChunkedWal::<TestWal>::load_chunk_ids(&config, &lock)?;

    assert_eq!(vec![ChunkId(12)], found);
    Ok(())
}
909
/// When a chunk fills up, the next chunk starts with a checkpoint record
/// that holds the state at the moment of rotation.
#[test]
fn test_rotate_chunk_writes_checkpoint() -> Result<(), io::Error> {
    let (_td, mut config) = temp_config();
    config.chunk_max_records = Some(3);

    let (mut wal, mut sm) =
        open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

    for value in ["a", "b", "c"] {
        append_action(&mut wal, &mut sm, value)?;
    }
    sync_flush(&mut wal)?;

    // Exactly one rotation happened, after "a" and "b" were applied.
    assert_eq!(1, wal.closed.len());
    let (_first_id, first_closed) = wal.closed.first_key_value().unwrap();
    assert_eq!("a,b", first_closed.state.as_ref());

    let expected = vec![WALRecord::Checkpoint("a,b".to_string()), action("c")];
    let records = records_in_chunk(&config, wal.open.chunk.chunk_id())?;
    assert_eq!(expected, records);

    Ok(())
}
937
/// Reopening a WAL whose last chunk is intact continues with that same
/// chunk as the open chunk instead of creating a new one.
#[test]
fn test_reopen_reuses_last_healthy_chunk() -> Result<(), io::Error> {
    let (_td, mut config) = temp_config();
    config.chunk_max_records = Some(3);

    // First session: write enough actions to rotate twice.
    let open_chunk_id = {
        let (mut wal, mut sm) =
            open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        append_action(&mut wal, &mut sm, "c")?;
        append_action(&mut wal, &mut sm, "d")?;
        sync_flush(&mut wal)?;

        assert_eq!(2, wal.closed.len());
        wal.open.chunk.chunk_id()
    };

    // Second session: same chunk layout, same open chunk.
    let (wal, sm) = open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

    assert_eq!(vec!["a", "b", "c", "d"], sm.values);
    assert_eq!(2, wal.closed.len());
    assert_eq!(open_chunk_id, wal.open.chunk.chunk_id());
    assert_eq!(1, wal.open.chunk.records_count());

    Ok(())
}
966
/// A chunk file cut off in the middle of its last record loses that
/// record on reopen, stays closed, and is followed by a new open chunk.
#[test]
fn test_reopen_truncates_incomplete_last_record() -> Result<(), io::Error> {
    let (_td, config) = temp_config();

    // First session: write three actions, then chop one byte off the
    // end of the last record in the chunk file.
    let truncated_from = {
        let (mut wal, mut sm) =
            open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

        append_action(&mut wal, &mut sm, "a")?;
        append_action(&mut wal, &mut sm, "b")?;
        let last_segment = append_action(&mut wal, &mut sm, "c")?;
        sync_flush(&mut wal)?;

        let chunk_id = wal.open.chunk.chunk_id();
        let damaged_len = last_segment.end().0 - chunk_id.offset() - 1;

        let file = Chunk::<WALRecord<TestWal>>::open_chunk_file(
            &config, chunk_id,
        )?;
        file.set_len(damaged_len)?;

        damaged_len
    };

    let (wal, sm) = open_wal(&config, Arc::new(Mutex::new(Vec::new())))?;

    assert_eq!(vec!["a", "b"], sm.values);
    assert_eq!(1, wal.closed.len());
    assert_eq!(
        Some(truncated_from),
        wal.last_closed_chunk_truncated_file_size()
    );

    let (_id, damaged_chunk) = wal.closed.first_key_value().unwrap();
    assert_eq!(
        Some(truncated_from),
        damaged_chunk.chunk.truncated_file_size()
    );

    let open_chunk = &wal.open.chunk;
    assert_eq!(
        WALRecord::Checkpoint("a,b".to_string()),
        open_chunk.read_record(open_chunk.last_segment())?
    );

    Ok(())
}
1009
1010    #[test]
1011    fn test_reopen_truncates_trailing_zeroes() -> Result<(), io::Error> {
1012        let (_td, config) = temp_config();
1013
1014        let original_len = {
1015            let calls = Arc::new(Mutex::new(Vec::new()));
1016            let (mut wal, mut sm) = open_wal(&config, calls)?;
1017
1018            append_action(&mut wal, &mut sm, "a")?;
1019            append_action(&mut wal, &mut sm, "b")?;
1020            sync_flush(&mut wal)?;
1021
1022            let chunk_id = wal.open.chunk.chunk_id();
1023            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
1024            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
1025                &config, chunk_id,
1026            )?;
1027            f.seek(io::SeekFrom::Start(original_len))?;
1028            f.write_all(&[0, 0, 0])?;
1029            original_len
1030        };
1031
1032        let calls = Arc::new(Mutex::new(Vec::new()));
1033        let (wal, sm) = open_wal(&config, calls)?;
1034
1035        assert_eq!(vec!["a", "b"], sm.values);
1036        assert_eq!(1, wal.closed.len());
1037        assert_eq!(
1038            Some(original_len + 3),
1039            wal.last_closed_chunk_truncated_file_size()
1040        );
1041        assert_eq!(
1042            Some(original_len + 3),
1043            wal.closed.first_key_value().unwrap().1.chunk.truncated_file_size()
1044        );
1045
1046        Ok(())
1047    }
1048
1049    #[test]
1050    fn test_reopen_rejects_damaged_trailing_checkpoint() -> Result<(), io::Error>
1051    {
1052        let (_td, config) = temp_config();
1053
1054        {
1055            let calls = Arc::new(Mutex::new(Vec::new()));
1056            let (mut wal, mut sm) = open_wal(&config, calls)?;
1057
1058            append_action(&mut wal, &mut sm, "a")?;
1059            append_action(&mut wal, &mut sm, "b")?;
1060            sync_flush(&mut wal)?;
1061
1062            let chunk_id = wal.open.chunk.chunk_id();
1063            let original_len = wal.open.chunk.global_end() - chunk_id.offset();
1064            let mut f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
1065                &config, chunk_id,
1066            )?;
1067            let mut damaged = Vec::new();
1068            WALRecord::<TestWal>::Checkpoint("bad".to_string())
1069                .encode(&mut damaged)?;
1070            *damaged.last_mut().unwrap() ^= 1;
1071
1072            f.seek(io::SeekFrom::Start(original_len))?;
1073            f.write_all(&damaged)?;
1074        }
1075
1076        let calls = Arc::new(Mutex::new(Vec::new()));
1077        let err = match open_wal(&config, calls) {
1078            Ok(_) => panic!("damaged checkpoint record must fail"),
1079            Err(err) => err,
1080        };
1081
1082        assert!(err.to_string().contains("decode Record at offset"));
1083
1084        Ok(())
1085    }
1086
1087    #[test]
1088    fn test_reopen_rejects_damaged_non_tail_chunk_without_truncating()
1089    -> Result<(), io::Error> {
1090        let (_td, mut config) = temp_config();
1091        config.chunk_max_records = Some(3);
1092
1093        let (chunk_id, damaged_len) = {
1094            let calls = Arc::new(Mutex::new(Vec::new()));
1095            let (mut wal, mut sm) = open_wal(&config, calls)?;
1096
1097            append_action(&mut wal, &mut sm, "a")?;
1098            let truncated_segment = append_action(&mut wal, &mut sm, "b")?;
1099            append_action(&mut wal, &mut sm, "c")?;
1100            sync_flush(&mut wal)?;
1101
1102            let chunk_id = *wal.closed.first_key_value().unwrap().0;
1103            let f = Chunk::<WALRecord<TestWal>>::open_chunk_file(
1104                &config, chunk_id,
1105            )?;
1106            let truncated_len = truncated_segment.end().0 - chunk_id.offset();
1107            let damaged_len = truncated_len - 1;
1108            f.set_len(damaged_len)?;
1109            (chunk_id, damaged_len)
1110        };
1111
1112        let calls = Arc::new(Mutex::new(Vec::new()));
1113        let err = open_wal(&config, calls)
1114            .expect_err("damaged non-tail chunk must fail");
1115
1116        assert!(err.to_string().contains("decode Record at offset"));
1117
1118        let f =
1119            Chunk::<WALRecord<TestWal>>::open_chunk_file(&config, chunk_id)?;
1120        assert_eq!(damaged_len, f.metadata()?.len());
1121
1122        Ok(())
1123    }
1124
1125    #[test]
1126    fn test_on_chunk_persisted_called_on_recovery() -> Result<(), io::Error> {
1127        let (_td, mut config) = temp_config();
1128        config.chunk_max_records = Some(3);
1129
1130        {
1131            let calls = Arc::new(Mutex::new(Vec::new()));
1132            let (mut wal, mut sm) = open_wal(&config, calls)?;
1133
1134            for value in ["a", "b", "c", "d"] {
1135                append_action(&mut wal, &mut sm, value)?;
1136            }
1137            sync_flush(&mut wal)?;
1138        }
1139
1140        let calls = Arc::new(Mutex::new(Vec::new()));
1141        let (_wal, sm) = open_wal(&config, calls.clone())?;
1142
1143        assert_eq!(vec!["a", "b", "c", "d"], sm.values);
1144        assert_eq!(
1145            vec![None, Some("a,b".to_string()), Some("a,b,c,d".to_string()),],
1146            calls
1147                .lock()
1148                .unwrap()
1149                .iter()
1150                .map(|call| call.checkpoint.clone())
1151                .collect::<Vec<_>>()
1152        );
1153
1154        Ok(())
1155    }
1156
1157    #[test]
1158    fn test_on_chunk_persisted_tracks_rotated_file() -> Result<(), io::Error> {
1159        let (_td, mut config) = temp_config();
1160        config.chunk_max_records = Some(3);
1161
1162        let calls = Arc::new(Mutex::new(Vec::new()));
1163        let (mut wal, mut sm) = open_wal(&config, calls.clone())?;
1164
1165        append_action(&mut wal, &mut sm, "a")?;
1166        append_action(&mut wal, &mut sm, "b")?;
1167
1168        let open_start = wal.open.chunk.global_start();
1169        sync_flush(&mut wal)?;
1170
1171        assert!(calls.lock().unwrap().contains(&PersistedCall {
1172            starting_offset: open_start,
1173            synced_offset: wal.open.chunk.global_end(),
1174            checkpoint: Some("a,b".to_string()),
1175        }));
1176
1177        Ok(())
1178    }
1179
1180    #[test]
1181    fn test_loaded_chunk_accessors() -> Result<(), io::Error> {
1182        let (_td, mut config) = temp_config();
1183        config.chunk_max_records = Some(3);
1184
1185        let calls = Arc::new(Mutex::new(Vec::new()));
1186        let (mut wal, mut sm) = open_wal(&config, calls)?;
1187
1188        let segment_a = append_action(&mut wal, &mut sm, "a")?;
1189        append_action(&mut wal, &mut sm, "b")?;
1190        append_action(&mut wal, &mut sm, "c")?;
1191        sync_flush(&mut wal)?;
1192
1193        let open_chunk_id = wal.open_chunk_id();
1194        let closed_stats = wal.closed_chunk_stats();
1195        let open_stat = wal.open_chunk_stat(sm.checkpoint());
1196
1197        assert_eq!(1, closed_stats.len());
1198        assert_eq!(ChunkId(0), closed_stats[0].chunk_id);
1199        assert_eq!(3, closed_stats[0].records_count);
1200        assert_eq!("a,b", closed_stats[0].log_state);
1201        assert_eq!(open_chunk_id, open_stat.chunk_id);
1202        assert_eq!(2, open_stat.records_count);
1203        assert_eq!("a,b,c", open_stat.log_state);
1204        assert_eq!(open_stat.global_end, wal.on_disk_size());
1205        assert_eq!(None, wal.last_closed_chunk_truncated_file_size());
1206
1207        assert_eq!(
1208            action("a"),
1209            wal.closed_chunk_reader().read_record(ChunkId(0), segment_a)?
1210        );
1211
1212        let err =
1213            wal.load_record(&ChunkId(999), Segment::new(999, 1)).unwrap_err();
1214        assert_eq!(io::ErrorKind::NotFound, err.kind());
1215        assert!(err.to_string().contains("Chunk not found"));
1216
1217        let mut dumped = Vec::new();
1218        wal.dump_loaded_records(|chunk_id, index, res| {
1219            dumped.push((chunk_id, index, res.map(|(_segment, rec)| rec)?));
1220            Ok(())
1221        })?;
1222
1223        assert_eq!(
1224            vec![
1225                (ChunkId(0), 0, WALRecord::Checkpoint(String::new())),
1226                (ChunkId(0), 1, action("a")),
1227                (ChunkId(0), 2, action("b")),
1228                (open_chunk_id, 0, WALRecord::Checkpoint("a,b".to_string())),
1229                (open_chunk_id, 1, action("c")),
1230            ],
1231            dumped
1232        );
1233
1234        let drained =
1235            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
1236        assert_eq!(vec![ChunkId(0)], drained);
1237        assert!(wal.closed_chunk_stats().is_empty());
1238
1239        let path = config.chunk_path(ChunkId(0));
1240        assert!(std::path::Path::new(&path).exists());
1241        wal.send_remove_chunks(drained)?;
1242        wal.wait_worker_idle()?;
1243        assert!(!std::path::Path::new(&path).exists());
1244
1245        Ok(())
1246    }
1247
1248    #[test]
1249    fn test_drain_closed_chunks_while_stops_at_first_unmatched()
1250    -> Result<(), io::Error> {
1251        let (_td, mut config) = temp_config();
1252        config.chunk_max_records = Some(3);
1253
1254        let calls = Arc::new(Mutex::new(Vec::new()));
1255        let (mut wal, mut sm) = open_wal(&config, calls)?;
1256
1257        for value in ["a", "b", "c", "d", "e"] {
1258            append_action(&mut wal, &mut sm, value)?;
1259        }
1260        sync_flush(&mut wal)?;
1261
1262        let closed_before = wal
1263            .closed_chunk_stats()
1264            .into_iter()
1265            .map(|stat| (stat.chunk_id, stat.log_state))
1266            .collect::<Vec<_>>();
1267        assert_eq!(
1268            vec![
1269                (ChunkId(0), "a,b".to_string()),
1270                (ChunkId(34), "a,b,c,d".to_string()),
1271            ],
1272            closed_before
1273        );
1274
1275        let drained =
1276            wal.drain_closed_chunks_while(|checkpoint| checkpoint == "a,b");
1277        assert_eq!(vec![ChunkId(0)], drained);
1278
1279        let closed_after = wal
1280            .closed_chunk_stats()
1281            .into_iter()
1282            .map(|stat| (stat.chunk_id, stat.log_state))
1283            .collect::<Vec<_>>();
1284        assert_eq!(vec![(ChunkId(34), "a,b,c,d".to_string())], closed_after);
1285
1286        Ok(())
1287    }
1288
1289    #[test]
1290    fn test_lock_blocks_second_open_and_dump() -> Result<(), io::Error> {
1291        let (_td, config) = temp_config();
1292
1293        let calls = Arc::new(Mutex::new(Vec::new()));
1294        let (wal, _sm) = open_wal(&config, calls.clone())?;
1295
1296        let err = ChunkedWal::<TestWal>::acquire_lock(&config)
1297            .expect_err("second lock must fail");
1298        assert_eq!(io::ErrorKind::WouldBlock, err.kind());
1299
1300        drop(wal);
1301
1302        let lock = ChunkedWal::<TestWal>::acquire_lock(&config)?;
1303        let mut records = Vec::new();
1304        ChunkedWal::<TestWal>::dump_records(
1305            &config,
1306            &lock,
1307            |chunk_id, i, res| {
1308                records.push((chunk_id, i, res.map(|(_, record)| record)?));
1309                Ok(())
1310            },
1311        )?;
1312
1313        assert_eq!(
1314            vec![(ChunkId(0), 0, WALRecord::Checkpoint(String::new()))],
1315            records
1316        );
1317
1318        Ok(())
1319    }
1320
1321    #[test]
1322    fn test_flush_without_sync_writes_without_advancing_sync_id()
1323    -> Result<(), io::Error> {
1324        let (_td, config) = temp_config();
1325        let calls = Arc::new(Mutex::new(Vec::new()));
1326        let (mut wal, mut sm) = open_wal(&config, calls)?;
1327
1328        append_action(&mut wal, &mut sm, "a")?;
1329        append_action(&mut wal, &mut sm, "b")?;
1330        no_sync_flush(&mut wal)?;
1331
1332        assert_eq!(
1333            vec![(0, 0)],
1334            wal.get_stat()?
1335                .iter()
1336                .map(|stat| stat.offset_sync_id())
1337                .collect::<Vec<_>>()
1338        );
1339
1340        sync_flush(&mut wal)?;
1341
1342        assert!(
1343            wal.get_stat()?
1344                .iter()
1345                .any(|stat| stat.sync_id == wal.open.chunk.global_end())
1346        );
1347
1348        Ok(())
1349    }
1350
1351    #[test]
1352    fn test_worker_failure_wakes_waiter_and_fails_later_waits()
1353    -> Result<(), io::Error> {
1354        let (_td, config) = temp_config();
1355        let calls = Arc::new(Mutex::new(Vec::new()));
1356        let (mut wal, _sm) = open_wal(&config, calls)?;
1357
1358        wal.send_remove_chunks(vec![ChunkId(999)])?;
1359
1360        let err = wal.wait_worker_idle().unwrap_err();
1361        assert_eq!(io::ErrorKind::NotFound, err.kind());
1362
1363        let res = wal.send_remove_chunks(vec![ChunkId(999)]);
1364        if let Err(err) = res {
1365            assert_eq!(io::ErrorKind::Other, err.kind());
1366        }
1367
1368        let err = wal.wait_worker_idle().unwrap_err();
1369        assert_eq!(io::ErrorKind::NotFound, err.kind());
1370
1371        Ok(())
1372    }
1373}