cat_dev/fsemul/pcfs/sata/server/wal/
mod.rs

//! A "WAL"-style log of all SATA requests: before processing each request we
//! write down the operation that occurred, along with its return code.
//!
//! The goal is to make it easy to get a full log of what the PCFS SATA server
//! is doing, and where it might differ, without having to break out into a
//! full "scientist"-style API that diffs everything that's happening.
//!
//! THIS DOES HAVE A PERFORMANCE OVERHEAD: every SATA request is parsed twice
//! (or at least an attempt is made to). If you can't spare that much CPU, be
//! aware of this.
12
13pub mod layer;
14mod mapper;
15
16use crate::{
17	errors::CatBridgeError,
18	fsemul::pcfs::sata::server::connection_flags::SataConnectionFlags,
19	net::models::{FromRequest, FromRequestParts, Request},
20};
21use bytes::Bytes;
22use fnv::FnvHashMap;
23use std::{
24	collections::VecDeque,
25	path::PathBuf,
26	time::{Duration, Instant},
27};
28use tokio::{
29	fs::File,
30	io::{AsyncWriteExt, BufWriter},
31	sync::mpsc::{
32		Receiver as BoundedReceiver, Sender as BoundedSender, channel as bounded_channel,
33	},
34	task::Builder as TaskBuilder,
35	time::sleep,
36};
37use tracing::error;
38
39/// A reference to a single unique 'write-ahead log' for SATA.
40///
41/// This keeps track of requests, and responses coming in and out of a sata
42/// server, to determine what is actually happening.
43///
44/// This WAL is expected to be from the 'servers' point of view, NOT the clients.
45#[derive(Clone, Debug)]
46pub struct WriteAheadLog {
47	logger: BoundedSender<WriteAheadLogMessage>,
48}
49
50impl WriteAheadLog {
51	/// Create a new WAL for sata requests/responses.
52	///
53	/// This is the 'entrypoint', and as a result will spawn the task
54	/// to start receiving messages.
55	///
56	/// ## Errors
57	///
58	/// If we cannot spawn the background task.
59	pub fn new(wal: PathBuf) -> Result<Self, CatBridgeError> {
60		let (sender, receiver) = bounded_channel(8192);
61
62		TaskBuilder::new()
63			.name("cat_dev::fsemul::pcfs::sata::server::wal::write_to_log")
64			.spawn(async move {
65				process_wal(receiver, wal).await;
66				error!("WAL SHUTTING DOWN...");
67			})
68			.map_err(CatBridgeError::SpawnFailure)?;
69
70		Ok(Self { logger: sender })
71	}
72
73	/// Communicate that a stream has opened up.
74	pub async fn record_open_stream(&self, stream_id: u64) {
75		_ = self
76			.logger
77			.send(WriteAheadLogMessage::OpenStream(stream_id))
78			.await;
79	}
80
81	/// Communicate that a stream has closed.
82	pub async fn record_close_stream(&self, stream_id: u64) {
83		_ = self
84			.logger
85			.send(WriteAheadLogMessage::CloseStream(stream_id))
86			.await;
87	}
88
89	/// Communicate that an out of band (e.g. nor equest/response flow) read for write file.
90	pub async fn record_oob_file_write_read(&self, stream_id: u64, fd: i32, length: usize) {
91		_ = self
92			.logger
93			.send(WriteAheadLogMessage::WriteFileRead(stream_id, fd, length))
94			.await;
95	}
96
97	/// Communicate that a request has come into the server.
98	pub async fn record_request(&self, stream_id: u64, request: Bytes) {
99		_ = self
100			.logger
101			.send(WriteAheadLogMessage::StreamEvent(stream_id, false, request))
102			.await;
103	}
104
105	/// Communicate that a response has come into the server.
106	pub async fn record_response(&self, stream_id: u64, request: Bytes) {
107		_ = self
108			.logger
109			.send(WriteAheadLogMessage::StreamEvent(stream_id, true, request))
110			.await;
111	}
112}
113
114impl<State: Clone + Send + Sync + 'static> FromRequestParts<State> for Option<WriteAheadLog> {
115	async fn from_request_parts(parts: &mut Request<State>) -> Result<Self, CatBridgeError> {
116		Ok(parts.extensions().get::<WriteAheadLog>().cloned())
117	}
118}
119
120impl<State: Clone + Send + Sync + 'static> FromRequest<State> for Option<WriteAheadLog> {
121	async fn from_request(req: Request<State>) -> Result<Self, CatBridgeError> {
122		Ok(req.extensions_owned().remove::<WriteAheadLog>())
123	}
124}
125
126/// A message sent over the logging channel to be written to the WAL.
127#[derive(Clone, Debug)]
128enum WriteAheadLogMessage {
129	/// A new TCP stream has been opened (stream id).
130	OpenStream(u64),
131	/// A TCP stream has closed (stream id).
132	CloseStream(u64),
133	/// An out of band Write File Response has been read in (stream id, fd, length read).
134	WriteFileRead(u64, i32, usize),
135	/// A regular stream request/response has happened (stream id, is response, data).
136	StreamEvent(u64, bool, Bytes),
137}
138
139#[allow(
140  // TODO(mythra): clean this up.
141  clippy::too_many_lines,
142)]
143async fn process_wal(mut stream: BoundedReceiver<WriteAheadLogMessage>, path: PathBuf) {
144	let mut fd = BufWriter::new(match File::create_new(path).await {
145		Ok(fd) => fd,
146		Err(cause) => {
147			error!(?cause, "Failed to open WAL file, will not generate WAL!");
148			return;
149		}
150	});
151	// A list of open fd's so we just write operations on which files they're happening on
152	// rather than an explicit fd.
153	let mut fd_map: FnvHashMap<u64, FnvHashMap<i32, String>> = FnvHashMap::default();
154	let mut folder_map: FnvHashMap<u64, FnvHashMap<i32, String>> = FnvHashMap::default();
155	let mut connection_flags: FnvHashMap<u64, SataConnectionFlags> = FnvHashMap::default();
156	let mut waiting_requests: FnvHashMap<u64, VecDeque<mapper::WaitingRequest>> =
157		FnvHashMap::default();
158
159	// When the last 'WriteAheadLog' is dropped, and thus the producer is closed,
160	// we will automatically save. This keeps us from running forever.
161	let mut last_flush = Instant::now();
162	let mut needs_flush = false;
163
164	loop {
165		let msg_opt;
166		tokio::select! {
167			opt = stream.recv() => {
168				msg_opt = opt;
169			}
170			() = sleep(Duration::from_secs(5)) => {
171				if needs_flush {
172					if let Err(cause) = fd.flush().await {
173						error!(?cause, "failed to flush WAL log for SATA!");
174					}
175					needs_flush = false;
176				}
177				continue;
178			}
179		}
180
181		let Some(msg) = msg_opt else {
182			break;
183		};
184		let current_time = Instant::now();
185		if current_time.duration_since(last_flush) > Duration::from_secs(3) {
186			if let Err(cause) = fd.flush().await {
187				error!(?cause, "failed to flush WAL log for SATA!");
188			}
189
190			last_flush = current_time;
191			needs_flush = false;
192		} else {
193			needs_flush = true;
194		}
195
196		let (stream_id, is_response, data) = match msg {
197			WriteAheadLogMessage::OpenStream(stream_id) => {
198				waiting_requests.insert(stream_id, VecDeque::with_capacity(1));
199				fd_map.insert(stream_id, FnvHashMap::default());
200				folder_map.insert(stream_id, FnvHashMap::default());
201				connection_flags.insert(stream_id, SataConnectionFlags::new());
202				continue;
203			}
204			WriteAheadLogMessage::CloseStream(stream_id) => {
205				waiting_requests.remove(&stream_id);
206				fd_map.remove(&stream_id);
207				folder_map.remove(&stream_id);
208				connection_flags.remove(&stream_id);
209				continue;
210			}
211			WriteAheadLogMessage::WriteFileRead(stream_id, file_desc, size) => {
212				if let Some(req_waiting) = waiting_requests.get_mut(&stream_id) {
213					// TODO(mythra): god this is messy, clean this shit up.
214					if let Some(front) = req_waiting.front() {
215						match &front {
216							&mapper::WaitingRequest::WriteFile(path, _) => {
217								let final_path = if let Some(p) = fd_map
218									.get(&stream_id)
219									.expect("impossible: fd_map / waiting_request out of sync?")
220									.get(&file_desc)
221								{
222									p.to_owned()
223								} else {
224									"<Unknown; {file_desc}>".to_owned()
225								};
226
227								if path == &final_path {
228									if let Err(cause) =
229										fd.write_all(format!("  ->{size}\n").as_bytes()).await
230									{
231										error!(
232											?cause,
233											"Failed to write request to WAL log! MAY BE INCOMPLETE!"
234										);
235									}
236								} else {
237									error!(
238										stream_id,
239										fd = file_desc,
240										size,
241										"Mismatched WriteFileRead????"
242									);
243								}
244							}
245							_ => {
246								error!(
247									stream_id,
248									fd = file_desc,
249									size,
250									"Got WriteFileRead for non write-file request???"
251								);
252							}
253						}
254					} else {
255						error!(
256							stream_id,
257							fd = file_desc,
258							size,
259							"Got WriteFileRead when not waiting a request???"
260						);
261					}
262				} else {
263					error!(
264						stream_id,
265						"Got WriteFileRead for stream that doesn't exist???"
266					);
267				}
268
269				continue;
270			}
271			WriteAheadLogMessage::StreamEvent(sid, isresp, data) => (sid, isresp, data),
272		};
273
274		if is_response {
275			let Some(list_mut) = waiting_requests.get_mut(&stream_id) else {
276				error!(
277					stream_id,
278					"got WAL message for stream that doesn't exist???"
279				);
280				return;
281			};
282			let Some(req) = list_mut.pop_front() else {
283				error!(
284					stream_id,
285					response = format!("{data:02x?}"),
286					"got WAL response for stream that is not waiting on request???"
287				);
288				return;
289			};
290			let fd_map_mut = fd_map
291				.get_mut(&stream_id)
292				.expect("impossible fd_map/waiting_requests out of sync!");
293			let folder_map_mut = folder_map
294				.get_mut(&stream_id)
295				.expect("impossible folder_map/waiting_requests out of sync!");
296			let conn_flags = connection_flags
297				.get(&stream_id)
298				.expect("impossible connection_flags/waiting_requests out of sync!");
299
300			let resp = mapper::WaitingResponse::parse(conn_flags, &req, data);
301			match resp {
302				mapper::WaitingResponse::OpenFile(fdres) => {
303					if let Ok(fd) = fdres.result()
304						&& let mapper::WaitingRequest::OpenFile(path, _) = req
305					{
306						fd_map_mut.insert(fd, path);
307					}
308				}
309				mapper::WaitingResponse::OpenFolder(fdres) => {
310					if let Ok(fd) = fdres.result()
311						&& let mapper::WaitingRequest::OpenFolder(path) = req
312					{
313						folder_map_mut.insert(fd, path);
314					}
315				}
316				mapper::WaitingResponse::CloseFile(rc) => {
317					if rc.0 == 0
318						&& let mapper::WaitingRequest::CloseFile(fd, _) = req
319					{
320						fd_map_mut.remove(&fd);
321					}
322				}
323				mapper::WaitingResponse::CloseFolder(rc) => {
324					if rc.0 == 0
325						&& let mapper::WaitingRequest::CloseFolder(fd, _) = req
326					{
327						folder_map_mut.remove(&fd);
328					}
329				}
330				mapper::WaitingResponse::Pong(ffio, csr) => {
331					conn_flags.set_ffio_enabled(ffio);
332					conn_flags.set_csr_enabled(csr);
333				}
334				_ => {}
335			}
336			if let Err(cause) = fd.write_all(format!("<-{resp}\n").as_bytes()).await {
337				error!(
338					?cause,
339					"Failed to write response to WAL log! MAY BE INCOMPLETE!"
340				);
341			}
342		} else {
343			let Some(list_mut) = waiting_requests.get_mut(&stream_id) else {
344				error!(
345					stream_id,
346					"got WAL message for stream that doesn't exist???"
347				);
348				return;
349			};
350
351			let req = mapper::WaitingRequest::parse(
352				fd_map
353					.get(&stream_id)
354					.expect("impossible fd_map/folder_map/waiting_requests out of sync!"),
355				folder_map
356					.get(&stream_id)
357					.expect("impossible fd_map/folder_map/waiting_requests out of sync!"),
358				data,
359			);
360			if let Err(cause) = fd.write_all(format!("->{req}\n").as_bytes()).await {
361				error!(
362					?cause,
363					"Failed to write request to WAL log! MAY BE INCOMPLETE!"
364				);
365			}
366			list_mut.push_back(req);
367		}
368	}
369
370	if let Err(cause) = fd.flush().await {
371		error!(?cause, "failed to flush WAL log for SATA!");
372	}
373}