chunked_wal/wal/
flush_request.rs1use std::fmt;
2use std::sync::mpsc::SyncSender;
3use std::time::Instant;
4
5use crate::WalTypes;
6use crate::wal::file_entry::FileEntry;
7
8pub(crate) struct SeqRequest<W>
15where W: WalTypes
16{
17 pub(crate) seq: u64,
18 pub(crate) queued_at: Instant,
19 pub(crate) req: WorkerRequest<W>,
20}
21
22impl<W> fmt::Debug for SeqRequest<W>
23where W: WalTypes
24{
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 f.debug_struct("SeqRequest")
27 .field("seq", &self.seq)
28 .field("queued_at", &self.queued_at)
29 .finish_non_exhaustive()
30 }
31}
32
33pub(crate) struct WriteRequest<W>
34where W: WalTypes
35{
36 pub(crate) upto_offset: u64,
37 pub(crate) data: Vec<u8>,
38 pub(crate) sync: bool,
39 pub(crate) callback: Option<W::Callback>,
40}
41
42impl<W> fmt::Debug for WriteRequest<W>
43where W: WalTypes
44{
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.debug_struct("WriteRequest")
47 .field("upto_offset", &self.upto_offset)
48 .field("data_len", &self.data.len())
49 .field("sync", &self.sync)
50 .field("has_callback", &self.callback.is_some())
51 .finish()
52 }
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
56#[allow(dead_code)]
57pub struct FlushStat {
58 pub starting_offset: u64,
59 pub sync_id: u64,
60 pub ino: u64,
61}
62
63impl FlushStat {
64 #[allow(dead_code)]
65 pub fn offset_sync_id(&self) -> (u64, u64) {
66 (self.starting_offset, self.sync_id)
67 }
68}
69
70pub(crate) enum WorkerRequest<W>
71where W: WalTypes
72{
73 AppendFile(FileEntry<W>),
75
76 RemoveChunks { chunk_paths: Vec<String> },
81
82 Write(WriteRequest<W>),
84
85 #[allow(dead_code)]
87 GetFlushStat { tx: SyncSender<Vec<FlushStat>> },
88}
89
90impl<W> fmt::Debug for WorkerRequest<W>
91where W: WalTypes
92{
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 match self {
95 WorkerRequest::AppendFile(file_entry) => {
96 f.debug_tuple("AppendFile").field(file_entry).finish()
97 }
98 WorkerRequest::RemoveChunks { chunk_paths } => f
99 .debug_struct("RemoveChunks")
100 .field("chunk_paths", chunk_paths)
101 .finish(),
102 WorkerRequest::Write(write) => {
103 f.debug_tuple("Write").field(write).finish()
104 }
105 WorkerRequest::GetFlushStat { .. } => {
106 f.debug_struct("GetFlushStat").finish_non_exhaustive()
107 }
108 }
109 }
110}
111
112#[cfg(test)]
113mod tests {
114 use std::io;
115 use std::sync::Arc;
116 use std::sync::mpsc::SyncSender;
117 use std::sync::mpsc::sync_channel;
118 use std::time::Instant;
119
120 use crate::WalTypes;
121 use crate::wal::file_entry::FileEntry;
122 use crate::wal::file_persisted::ChunkPersistedCallback;
123 use crate::wal::file_persisted::ChunkPersistedFn;
124 use crate::wal::flush_request::FlushStat;
125 use crate::wal::flush_request::SeqRequest;
126 use crate::wal::flush_request::WorkerRequest;
127 use crate::wal::flush_request::WriteRequest;
128
129 #[derive(Debug, Default, Clone, PartialEq, Eq)]
130 struct TestWal;
131
132 impl WalTypes for TestWal {
133 type Action = String;
134 type Checkpoint = String;
135 type Callback = SyncSender<Result<(), io::Error>>;
136 }
137
138 fn callback() -> ChunkPersistedCallback<TestWal> {
139 let cb: ChunkPersistedFn<TestWal> = Arc::new(|_persisted, _state| {});
140 ChunkPersistedCallback::new(cb, None)
141 }
142
143 #[test]
144 fn test_flush_stat_offset_sync_id() {
145 let stat = FlushStat {
146 starting_offset: 12,
147 sync_id: 34,
148 ino: 56,
149 };
150
151 assert_eq!((12, 34), stat.offset_sync_id());
152 assert_eq!(
153 "FlushStat { starting_offset: 12, sync_id: 34, ino: 56 }",
154 format!("{stat:?}")
155 );
156 }
157
158 #[test]
159 fn test_request_debug() -> Result<(), io::Error> {
160 let (tx, _rx) = sync_channel(1);
161 let write = WriteRequest::<TestWal> {
162 upto_offset: 99,
163 data: vec![1, 2, 3],
164 sync: true,
165 callback: Some(tx),
166 };
167 assert_eq!(
168 "WriteRequest { upto_offset: 99, data_len: 3, sync: true, has_callback: true }",
169 format!("{write:?}")
170 );
171
172 let req = WorkerRequest::Write(write);
173 assert_eq!(
174 "Write(WriteRequest { upto_offset: 99, data_len: 3, sync: true, has_callback: true })",
175 format!("{req:?}")
176 );
177
178 let seq_req = SeqRequest {
179 seq: 7,
180 queued_at: Instant::now(),
181 req,
182 };
183 let seq_debug = format!("{seq_req:?}");
184 assert!(seq_debug.contains("SeqRequest"));
185 assert!(seq_debug.contains("seq: 7"));
186 assert!(seq_debug.contains(".."));
187 assert!(matches!(seq_req.req, WorkerRequest::Write(_)));
188
189 let remove = WorkerRequest::<TestWal>::RemoveChunks {
190 chunk_paths: vec!["a".to_string(), "b".to_string()],
191 };
192 assert_eq!(
193 "RemoveChunks { chunk_paths: [\"a\", \"b\"] }",
194 format!("{remove:?}")
195 );
196
197 let (tx, _rx) = sync_channel(1);
198 let stat = WorkerRequest::<TestWal>::GetFlushStat { tx };
199 assert_eq!("GetFlushStat { .. }", format!("{stat:?}"));
200
201 let file = Arc::new(tempfile::tempfile()?);
202 let append = WorkerRequest::AppendFile(FileEntry::<TestWal>::new(
203 12,
204 file,
205 callback(),
206 ));
207 assert_eq!(
208 "AppendFile(FileEntry { starting_offset: ChunkId(12), sync_id: 0 })",
209 format!("{append:?}")
210 );
211
212 Ok(())
213 }
214}