cat_dev/fsemul/pcfs/sata/server/wal/
mod.rs1pub mod layer;
14mod mapper;
15
16use crate::{
17 errors::CatBridgeError,
18 fsemul::pcfs::sata::server::connection_flags::SataConnectionFlags,
19 net::models::{FromRequest, FromRequestParts, Request},
20};
21use bytes::Bytes;
22use fnv::FnvHashMap;
23use std::{
24 collections::VecDeque,
25 path::PathBuf,
26 time::{Duration, Instant},
27};
28use tokio::{
29 fs::File,
30 io::{AsyncWriteExt, BufWriter},
31 sync::mpsc::{
32 Receiver as BoundedReceiver, Sender as BoundedSender, channel as bounded_channel,
33 },
34 task::Builder as TaskBuilder,
35 time::sleep,
36};
37use tracing::error;
38
39#[derive(Clone, Debug)]
46pub struct WriteAheadLog {
47 logger: BoundedSender<WriteAheadLogMessage>,
48}
49
50impl WriteAheadLog {
51 pub fn new(wal: PathBuf) -> Result<Self, CatBridgeError> {
60 let (sender, receiver) = bounded_channel(8192);
61
62 TaskBuilder::new()
63 .name("cat_dev::fsemul::pcfs::sata::server::wal::write_to_log")
64 .spawn(async move {
65 process_wal(receiver, wal).await;
66 error!("WAL SHUTTING DOWN...");
67 })
68 .map_err(CatBridgeError::SpawnFailure)?;
69
70 Ok(Self { logger: sender })
71 }
72
73 pub async fn record_open_stream(&self, stream_id: u64) {
75 _ = self
76 .logger
77 .send(WriteAheadLogMessage::OpenStream(stream_id))
78 .await;
79 }
80
81 pub async fn record_close_stream(&self, stream_id: u64) {
83 _ = self
84 .logger
85 .send(WriteAheadLogMessage::CloseStream(stream_id))
86 .await;
87 }
88
89 pub async fn record_oob_file_write_read(&self, stream_id: u64, fd: i32, length: usize) {
91 _ = self
92 .logger
93 .send(WriteAheadLogMessage::WriteFileRead(stream_id, fd, length))
94 .await;
95 }
96
97 pub async fn record_request(&self, stream_id: u64, request: Bytes) {
99 _ = self
100 .logger
101 .send(WriteAheadLogMessage::StreamEvent(stream_id, false, request))
102 .await;
103 }
104
105 pub async fn record_response(&self, stream_id: u64, request: Bytes) {
107 _ = self
108 .logger
109 .send(WriteAheadLogMessage::StreamEvent(stream_id, true, request))
110 .await;
111 }
112}
113
114impl<State: Clone + Send + Sync + 'static> FromRequestParts<State> for Option<WriteAheadLog> {
115 async fn from_request_parts(parts: &mut Request<State>) -> Result<Self, CatBridgeError> {
116 Ok(parts.extensions().get::<WriteAheadLog>().cloned())
117 }
118}
119
120impl<State: Clone + Send + Sync + 'static> FromRequest<State> for Option<WriteAheadLog> {
121 async fn from_request(req: Request<State>) -> Result<Self, CatBridgeError> {
122 Ok(req.extensions_owned().remove::<WriteAheadLog>())
123 }
124}
125
126#[derive(Clone, Debug)]
128enum WriteAheadLogMessage {
129 OpenStream(u64),
131 CloseStream(u64),
133 WriteFileRead(u64, i32, usize),
135 StreamEvent(u64, bool, Bytes),
137}
138
139#[allow(
140 clippy::too_many_lines,
142)]
143async fn process_wal(mut stream: BoundedReceiver<WriteAheadLogMessage>, path: PathBuf) {
144 let mut fd = BufWriter::new(match File::create_new(path).await {
145 Ok(fd) => fd,
146 Err(cause) => {
147 error!(?cause, "Failed to open WAL file, will not generate WAL!");
148 return;
149 }
150 });
151 let mut fd_map: FnvHashMap<u64, FnvHashMap<i32, String>> = FnvHashMap::default();
154 let mut folder_map: FnvHashMap<u64, FnvHashMap<i32, String>> = FnvHashMap::default();
155 let mut connection_flags: FnvHashMap<u64, SataConnectionFlags> = FnvHashMap::default();
156 let mut waiting_requests: FnvHashMap<u64, VecDeque<mapper::WaitingRequest>> =
157 FnvHashMap::default();
158
159 let mut last_flush = Instant::now();
162 let mut needs_flush = false;
163
164 loop {
165 let msg_opt;
166 tokio::select! {
167 opt = stream.recv() => {
168 msg_opt = opt;
169 }
170 () = sleep(Duration::from_secs(5)) => {
171 if needs_flush {
172 if let Err(cause) = fd.flush().await {
173 error!(?cause, "failed to flush WAL log for SATA!");
174 }
175 needs_flush = false;
176 }
177 continue;
178 }
179 }
180
181 let Some(msg) = msg_opt else {
182 break;
183 };
184 let current_time = Instant::now();
185 if current_time.duration_since(last_flush) > Duration::from_secs(3) {
186 if let Err(cause) = fd.flush().await {
187 error!(?cause, "failed to flush WAL log for SATA!");
188 }
189
190 last_flush = current_time;
191 needs_flush = false;
192 } else {
193 needs_flush = true;
194 }
195
196 let (stream_id, is_response, data) = match msg {
197 WriteAheadLogMessage::OpenStream(stream_id) => {
198 waiting_requests.insert(stream_id, VecDeque::with_capacity(1));
199 fd_map.insert(stream_id, FnvHashMap::default());
200 folder_map.insert(stream_id, FnvHashMap::default());
201 connection_flags.insert(stream_id, SataConnectionFlags::new());
202 continue;
203 }
204 WriteAheadLogMessage::CloseStream(stream_id) => {
205 waiting_requests.remove(&stream_id);
206 fd_map.remove(&stream_id);
207 folder_map.remove(&stream_id);
208 connection_flags.remove(&stream_id);
209 continue;
210 }
211 WriteAheadLogMessage::WriteFileRead(stream_id, file_desc, size) => {
212 if let Some(req_waiting) = waiting_requests.get_mut(&stream_id) {
213 if let Some(front) = req_waiting.front() {
215 match &front {
216 &mapper::WaitingRequest::WriteFile(path, _) => {
217 let final_path = if let Some(p) = fd_map
218 .get(&stream_id)
219 .expect("impossible: fd_map / waiting_request out of sync?")
220 .get(&file_desc)
221 {
222 p.to_owned()
223 } else {
224 "<Unknown; {file_desc}>".to_owned()
225 };
226
227 if path == &final_path {
228 if let Err(cause) =
229 fd.write_all(format!(" ->{size}\n").as_bytes()).await
230 {
231 error!(
232 ?cause,
233 "Failed to write request to WAL log! MAY BE INCOMPLETE!"
234 );
235 }
236 } else {
237 error!(
238 stream_id,
239 fd = file_desc,
240 size,
241 "Mismatched WriteFileRead????"
242 );
243 }
244 }
245 _ => {
246 error!(
247 stream_id,
248 fd = file_desc,
249 size,
250 "Got WriteFileRead for non write-file request???"
251 );
252 }
253 }
254 } else {
255 error!(
256 stream_id,
257 fd = file_desc,
258 size,
259 "Got WriteFileRead when not waiting a request???"
260 );
261 }
262 } else {
263 error!(
264 stream_id,
265 "Got WriteFileRead for stream that doesn't exist???"
266 );
267 }
268
269 continue;
270 }
271 WriteAheadLogMessage::StreamEvent(sid, isresp, data) => (sid, isresp, data),
272 };
273
274 if is_response {
275 let Some(list_mut) = waiting_requests.get_mut(&stream_id) else {
276 error!(
277 stream_id,
278 "got WAL message for stream that doesn't exist???"
279 );
280 return;
281 };
282 let Some(req) = list_mut.pop_front() else {
283 error!(
284 stream_id,
285 response = format!("{data:02x?}"),
286 "got WAL response for stream that is not waiting on request???"
287 );
288 return;
289 };
290 let fd_map_mut = fd_map
291 .get_mut(&stream_id)
292 .expect("impossible fd_map/waiting_requests out of sync!");
293 let folder_map_mut = folder_map
294 .get_mut(&stream_id)
295 .expect("impossible folder_map/waiting_requests out of sync!");
296 let conn_flags = connection_flags
297 .get(&stream_id)
298 .expect("impossible connection_flags/waiting_requests out of sync!");
299
300 let resp = mapper::WaitingResponse::parse(conn_flags, &req, data);
301 match resp {
302 mapper::WaitingResponse::OpenFile(fdres) => {
303 if let Ok(fd) = fdres.result()
304 && let mapper::WaitingRequest::OpenFile(path, _) = req
305 {
306 fd_map_mut.insert(fd, path);
307 }
308 }
309 mapper::WaitingResponse::OpenFolder(fdres) => {
310 if let Ok(fd) = fdres.result()
311 && let mapper::WaitingRequest::OpenFolder(path) = req
312 {
313 folder_map_mut.insert(fd, path);
314 }
315 }
316 mapper::WaitingResponse::CloseFile(rc) => {
317 if rc.0 == 0
318 && let mapper::WaitingRequest::CloseFile(fd, _) = req
319 {
320 fd_map_mut.remove(&fd);
321 }
322 }
323 mapper::WaitingResponse::CloseFolder(rc) => {
324 if rc.0 == 0
325 && let mapper::WaitingRequest::CloseFolder(fd, _) = req
326 {
327 folder_map_mut.remove(&fd);
328 }
329 }
330 mapper::WaitingResponse::Pong(ffio, csr) => {
331 conn_flags.set_ffio_enabled(ffio);
332 conn_flags.set_csr_enabled(csr);
333 }
334 _ => {}
335 }
336 if let Err(cause) = fd.write_all(format!("<-{resp}\n").as_bytes()).await {
337 error!(
338 ?cause,
339 "Failed to write response to WAL log! MAY BE INCOMPLETE!"
340 );
341 }
342 } else {
343 let Some(list_mut) = waiting_requests.get_mut(&stream_id) else {
344 error!(
345 stream_id,
346 "got WAL message for stream that doesn't exist???"
347 );
348 return;
349 };
350
351 let req = mapper::WaitingRequest::parse(
352 fd_map
353 .get(&stream_id)
354 .expect("impossible fd_map/folder_map/waiting_requests out of sync!"),
355 folder_map
356 .get(&stream_id)
357 .expect("impossible fd_map/folder_map/waiting_requests out of sync!"),
358 data,
359 );
360 if let Err(cause) = fd.write_all(format!("->{req}\n").as_bytes()).await {
361 error!(
362 ?cause,
363 "Failed to write request to WAL log! MAY BE INCOMPLETE!"
364 );
365 }
366 list_mut.push_back(req);
367 }
368 }
369
370 if let Err(cause) = fd.flush().await {
371 error!(?cause, "failed to flush WAL log for SATA!");
372 }
373}