//! `chunked_wal/wal/flush_request.rs`
//!
//! Request types that the main thread sends to the FlushWorker.

1use std::fmt;
2use std::sync::mpsc::SyncSender;
3use std::time::Instant;
4
5use crate::WalTypes;
6use crate::wal::file_entry::FileEntry;
7
8/// A `WorkerRequest` tagged with a monotonically increasing sequence number.
9///
10/// The main thread assigns an incrementing `seq` to every request it sends.
11/// After processing a request, the FlushWorker stores the highest completed
12/// seq into a shared `AtomicU64`, allowing the main thread to wait until all
13/// sent requests have been processed.
14pub(crate) struct SeqRequest<W>
15where W: WalTypes
16{
17    pub(crate) seq: u64,
18    pub(crate) queued_at: Instant,
19    pub(crate) req: WorkerRequest<W>,
20}
21
22impl<W> fmt::Debug for SeqRequest<W>
23where W: WalTypes
24{
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        f.debug_struct("SeqRequest")
27            .field("seq", &self.seq)
28            .field("queued_at", &self.queued_at)
29            .finish_non_exhaustive()
30    }
31}
32
33pub(crate) struct WriteRequest<W>
34where W: WalTypes
35{
36    pub(crate) upto_offset: u64,
37    pub(crate) data: Vec<u8>,
38    pub(crate) sync: bool,
39    pub(crate) callback: Option<W::Callback>,
40}
41
42impl<W> fmt::Debug for WriteRequest<W>
43where W: WalTypes
44{
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        f.debug_struct("WriteRequest")
47            .field("upto_offset", &self.upto_offset)
48            .field("data_len", &self.data.len())
49            .field("sync", &self.sync)
50            .field("has_callback", &self.callback.is_some())
51            .finish()
52    }
53}
54
/// Debug snapshot describing one file, sent back in reply to
/// `WorkerRequest::GetFlushStat`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub struct FlushStat {
    /// Starting offset of the file.
    pub starting_offset: u64,

    /// Sync id of the file.
    pub sync_id: u64,

    /// NOTE(review): presumably the file's inode number — confirm against the
    /// code that fills in this struct.
    pub ino: u64,
}

impl FlushStat {
    /// Returns `(starting_offset, sync_id)`; `ino` is not included.
    #[allow(dead_code)]
    pub fn offset_sync_id(&self) -> (u64, u64) {
        let Self {
            starting_offset,
            sync_id,
            ..
        } = *self;
        (starting_offset, sync_id)
    }
}
69
70pub(crate) enum WorkerRequest<W>
71where W: WalTypes
72{
73    /// Append a new file that will be need to be sync.
74    AppendFile(FileEntry<W>),
75
76    /// Remove chunks that have been purged.
77    ///
78    /// This job must be done in FlushWorker to ensure it is after the
79    /// corresponding purge record is flushed.
80    RemoveChunks { chunk_paths: Vec<String> },
81
82    /// Write data, and optionally sync all files.
83    Write(WriteRequest<W>),
84
85    /// For debug, return a list of offset and sync id of all files.
86    #[allow(dead_code)]
87    GetFlushStat { tx: SyncSender<Vec<FlushStat>> },
88}
89
90impl<W> fmt::Debug for WorkerRequest<W>
91where W: WalTypes
92{
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        match self {
95            WorkerRequest::AppendFile(file_entry) => {
96                f.debug_tuple("AppendFile").field(file_entry).finish()
97            }
98            WorkerRequest::RemoveChunks { chunk_paths } => f
99                .debug_struct("RemoveChunks")
100                .field("chunk_paths", chunk_paths)
101                .finish(),
102            WorkerRequest::Write(write) => {
103                f.debug_tuple("Write").field(write).finish()
104            }
105            WorkerRequest::GetFlushStat { .. } => {
106                f.debug_struct("GetFlushStat").finish_non_exhaustive()
107            }
108        }
109    }
110}
111
#[cfg(test)]
mod tests {
    use std::io;
    use std::sync::Arc;
    use std::sync::mpsc::SyncSender;
    use std::sync::mpsc::sync_channel;
    use std::time::Instant;

    use crate::WalTypes;
    use crate::wal::file_entry::FileEntry;
    use crate::wal::file_persisted::ChunkPersistedCallback;
    use crate::wal::file_persisted::ChunkPersistedFn;
    use crate::wal::flush_request::FlushStat;
    use crate::wal::flush_request::SeqRequest;
    use crate::wal::flush_request::WorkerRequest;
    use crate::wal::flush_request::WriteRequest;

    /// `WalTypes` implementation used only to instantiate the generic
    /// request types in these tests.
    #[derive(Debug, Default, Clone, PartialEq, Eq)]
    struct TestWal;

    impl WalTypes for TestWal {
        type Action = String;
        type Checkpoint = String;
        type Callback = SyncSender<Result<(), io::Error>>;
    }

    /// Builds a chunk-persisted callback whose closure does nothing.
    fn callback() -> ChunkPersistedCallback<TestWal> {
        let noop: ChunkPersistedFn<TestWal> =
            Arc::new(|_persisted, _state| {});
        ChunkPersistedCallback::new(noop, None)
    }

    /// `offset_sync_id` returns the first two fields, and the derived
    /// `Debug` output lists all three.
    #[test]
    fn test_flush_stat_offset_sync_id() {
        let (starting_offset, sync_id, ino) = (12, 34, 56);
        let stat = FlushStat {
            starting_offset,
            sync_id,
            ino,
        };

        assert_eq!((12, 34), stat.offset_sync_id());

        let rendered = format!("{stat:?}");
        assert_eq!(
            "FlushStat { starting_offset: 12, sync_id: 34, ino: 56 }",
            rendered
        );
    }

    /// Pins the `Debug` output of every request type in this module.
    #[test]
    fn test_request_debug() -> Result<(), io::Error> {
        // A write request on its own, then wrapped in `WorkerRequest`.
        let (callback_tx, _callback_rx) = sync_channel(1);
        let write_req = WriteRequest::<TestWal> {
            upto_offset: 99,
            data: vec![1, 2, 3],
            sync: true,
            callback: Some(callback_tx),
        };
        let want_write = "WriteRequest { upto_offset: 99, data_len: 3, sync: true, has_callback: true }";
        assert_eq!(want_write, format!("{write_req:?}"));

        let worker_req = WorkerRequest::Write(write_req);
        assert_eq!(format!("Write({want_write})"), format!("{worker_req:?}"));

        // `SeqRequest` prints its own fields and elides the wrapped request.
        let seq_req = SeqRequest {
            seq: 7,
            queued_at: Instant::now(),
            req: worker_req,
        };
        let seq_debug = format!("{seq_req:?}");
        for fragment in ["SeqRequest", "seq: 7", ".."] {
            assert!(seq_debug.contains(fragment));
        }
        assert!(matches!(seq_req.req, WorkerRequest::Write(_)));

        // Chunk removal lists the paths verbatim.
        let remove_req = WorkerRequest::<TestWal>::RemoveChunks {
            chunk_paths: vec!["a".to_string(), "b".to_string()],
        };
        assert_eq!(
            "RemoveChunks { chunk_paths: [\"a\", \"b\"] }",
            format!("{remove_req:?}")
        );

        // The reply sender of `GetFlushStat` is not printed.
        let (stat_tx, _stat_rx) = sync_channel(1);
        let stat_req = WorkerRequest::<TestWal>::GetFlushStat { tx: stat_tx };
        assert_eq!("GetFlushStat { .. }", format!("{stat_req:?}"));

        // Appending a file delegates to `FileEntry`'s own `Debug`.
        let tmp = Arc::new(tempfile::tempfile()?);
        let entry = FileEntry::<TestWal>::new(12, tmp, callback());
        let append_req = WorkerRequest::AppendFile(entry);
        assert_eq!(
            "AppendFile(FileEntry { starting_offset: ChunkId(12), sync_id: 0 })",
            format!("{append_req:?}")
        );

        Ok(())
    }
}