Skip to main content

cat_dev/fsemul/pcfs/sata/server/
mod.rs

1//! Server implementation for SATA Server.
2
3mod change_mode;
4mod change_owner;
5mod close_file;
6mod close_folder;
7pub mod connection_flags;
8mod create_folder;
9mod get_file_position;
10mod info_by_query;
11mod open_file;
12mod open_folder;
13mod ping;
14mod read_file;
15mod read_folder;
16mod remove;
17mod rename;
18mod rewind_folder;
19mod set_file_position;
20pub mod wal;
21mod write_file;
22
23use crate::{
24	errors::{APIError, CatBridgeError},
25	fsemul::{
26		filesystem::host::HostFilesystem,
27		pcfs::sata::{
28			proto::SataRequest,
29			server::{
30				connection_flags::{
31					SATA_CONNECTION_FLAGS, SataConnectionFlags, SataConnectionFlagsLayer,
32				},
33				wal::{
34					WriteAheadLog,
35					layer::{WALBeginStreamLayer, WALEndStreamLayer, WALMessageLayer},
36				},
37			},
38		},
39	},
40	net::{
41		DEFAULT_CAT_DEV_CHUNK_SIZE,
42		additions::{RequestIDLayer, StreamIDLayer},
43		models::{Endianness, FromRef, NagleGuard, Response},
44		server::{Router, TCPServer, models::ResponseStreamEvent, requestable::Body},
45	},
46};
47use bytes::Bytes;
48use local_ip_address::local_ip;
49use std::{
50	net::{IpAddr, Ipv4Addr, SocketAddrV4},
51	path::PathBuf,
52	time::Duration,
53};
54use tower::ServiceBuilder;
55use tracing::{field::valuable, warn};
56use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
57
/// Port the SATA Server is hosted on when no explicit port has been chosen.
pub const DEFAULT_SATA_PORT: u16 = 7500;
60
61/// The 'state' of the server that is server wide.
62#[derive(Clone, Debug, Valuable)]
63pub struct PcfsServerState {
64	/// If we're not actually removing files, and preventing things from being removed.
65	disable_real_removal: bool,
66	/// The underlying host filesystem to write too.
67	host_filesystem: HostFilesystem,
68	/// Our process id.
69	pid: u32,
70}
71
72impl PcfsServerState {
73	#[must_use]
74	pub const fn new(
75		disable_real_removal: bool,
76		host_filesystem: HostFilesystem,
77		pid: u32,
78	) -> Self {
79		PcfsServerState {
80			disable_real_removal,
81			host_filesystem,
82			pid,
83		}
84	}
85
86	#[must_use]
87	pub const fn disable_real_removal(&self) -> bool {
88		self.disable_real_removal
89	}
90
91	#[must_use]
92	pub const fn host_filesystem(&self) -> &HostFilesystem {
93		&self.host_filesystem
94	}
95
96	#[must_use]
97	pub const fn pid(&self) -> u32 {
98		self.pid
99	}
100}
101
102impl FromRef<PcfsServerState> for HostFilesystem {
103	fn from_ref(input: &PcfsServerState) -> Self {
104		input.host_filesystem.clone()
105	}
106}
107
108impl FromRef<PcfsServerState> for u32 {
109	fn from_ref(input: &PcfsServerState) -> Self {
110		input.pid
111	}
112}
113
114#[allow(
115	// TODO(mythra): maybe refactor these into two variant enums.
116	clippy::struct_excessive_bools,
117)]
118#[derive(Clone, Debug)]
119pub struct PcfsSataServerBuilder {
120	/// The explicit address to bind too, otherwise a local ip will be fetched.
121	address: Option<Ipv4Addr>,
122	/// Allow overriding the amount of time that we wait to make sure everything
123	/// is safe for a cat-dev.
124	///
125	/// *note: `None` does not mean cat-dev sleep is disabled, that is controleld
126	/// through [`Self::fully_disable_cat_dev_sleep`]*.
127	cat_dev_sleep_override: Option<Duration>,
128	/// Override the chunking value, and how many bytes we'll purposefully chunk on
129	/// when sending out TCP data.
130	///
131	/// *note: `None` does not mean cat-dev sleep is disabled, that is controleld
132	/// through [`Self::fully_disable_chunk_override`]*.
133	chunk_override: Option<usize>,
134	/// If we should disable Combined Send+Recv.
135	disable_csr: bool,
136	/// If we should disable FFIO.
137	disable_ffio: bool,
138	/// If we should disable real removal of files. Prevents anything from being lost.
139	disable_real_removal: bool,
140	/// An override to fully disable any cat-dev sleeping to prevent issues.
141	fully_disable_cat_dev_sleep: bool,
142	/// An override to fully disable any TCP level chunking.
143	fully_disable_chunk_override: bool,
144	/// The host filesystem to treat when serving PCFS-SATA requests.
145	host_filesystem: HostFilesystem,
146	/// The explicit port to bind too if not the default.
147	port: Option<u16>,
148	/// A place to write a SATA WAL too.
149	sata_wal_location: Option<PathBuf>,
150	/// If we should trace all I/O when in debug mode.
151	trace_during_debug: bool,
152}
153
154impl PcfsSataServerBuilder {
155	/// Create a new builder for creating a PCFS-Sata Server.
156	#[must_use]
157	pub const fn new(host_filesystem: HostFilesystem) -> Self {
158		Self {
159			address: None,
160			cat_dev_sleep_override: None,
161			chunk_override: None,
162			disable_csr: false,
163			disable_ffio: false,
164			disable_real_removal: false,
165			fully_disable_cat_dev_sleep: false,
166			fully_disable_chunk_override: false,
167			host_filesystem,
168			port: None,
169			sata_wal_location: None,
170			trace_during_debug: false,
171		}
172	}
173
174	#[must_use]
175	pub const fn address(&self) -> Option<Ipv4Addr> {
176		self.address
177	}
178	#[must_use]
179	pub const fn set_address(mut self, new_address: Option<Ipv4Addr>) -> Self {
180		self.address = new_address;
181		self
182	}
183
184	#[must_use]
185	pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
186		self.cat_dev_sleep_override
187	}
188	#[must_use]
189	pub const fn set_cat_dev_sleep_override(mut self, new: Option<Duration>) -> Self {
190		self.cat_dev_sleep_override = new;
191		self
192	}
193
194	#[must_use]
195	pub const fn chunk_override(&self) -> Option<usize> {
196		self.chunk_override
197	}
198	#[must_use]
199	pub const fn set_chunk_override(mut self, chunk: Option<usize>) -> Self {
200		self.chunk_override = chunk;
201		self
202	}
203
204	#[must_use]
205	pub const fn disable_csr(&self) -> bool {
206		self.disable_csr
207	}
208	#[must_use]
209	pub const fn set_disable_csr(mut self, new: bool) -> Self {
210		self.disable_csr = new;
211		self
212	}
213
214	#[must_use]
215	pub const fn disable_ffio(&self) -> bool {
216		self.disable_ffio
217	}
218	#[must_use]
219	pub const fn set_disable_ffio(mut self, new: bool) -> Self {
220		self.disable_ffio = new;
221		self
222	}
223
224	#[must_use]
225	pub const fn disable_real_removal(&self) -> bool {
226		self.disable_real_removal
227	}
228	#[must_use]
229	pub const fn set_disable_real_removal(mut self, new: bool) -> Self {
230		self.disable_real_removal = new;
231		self
232	}
233
234	#[must_use]
235	pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
236		self.fully_disable_cat_dev_sleep
237	}
238	#[must_use]
239	pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
240		self.fully_disable_cat_dev_sleep = new;
241		self
242	}
243
244	#[must_use]
245	pub const fn fully_disable_chunk_override(&self) -> bool {
246		self.fully_disable_chunk_override
247	}
248	#[must_use]
249	pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
250		self.fully_disable_chunk_override = new;
251		self
252	}
253
254	#[must_use]
255	pub const fn host_filesystem(&self) -> &HostFilesystem {
256		&self.host_filesystem
257	}
258	#[must_use]
259	pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
260		self.host_filesystem = new;
261		self
262	}
263
264	#[must_use]
265	pub const fn port(&self) -> Option<u16> {
266		self.port
267	}
268	#[must_use]
269	pub const fn set_port(mut self, new: Option<u16>) -> Self {
270		self.port = new;
271		self
272	}
273
274	#[must_use]
275	pub fn sata_wal_location(&self) -> Option<&PathBuf> {
276		self.sata_wal_location.as_ref()
277	}
278	#[must_use]
279	pub fn set_sata_wal_location(mut self, new_location: Option<PathBuf>) -> Self {
280		self.sata_wal_location = new_location;
281		self
282	}
283
284	#[must_use]
285	pub const fn trace_during_debug(&self) -> bool {
286		self.trace_during_debug
287	}
288	#[must_use]
289	pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
290		self.trace_during_debug = new;
291		self
292	}
293
294	/// Create a TCP server that is capable of serving PCFS-SATA to a cat-dev
295	/// console.
296	///
297	/// ## Errors
298	///
299	/// If we cannot lookup the address to bind too, or there's been some
300	/// programming error, and we run into an API error calling an internal
301	/// api wrong.
302	#[allow(
303		// TODO(mythra): temporary while we determine if we can remove PCFS Load-bearing sleep.
304		// TODO(mythra): this will make it easier to revert.
305		clippy::manual_map,
306	)]
307	pub async fn build(self) -> Result<TCPServer<PcfsServerState>, CatBridgeError> {
308		let ip = self
309			.address
310			.or_else(|| {
311				// This always returns an ipv4 address, but is still returning
312				// an ip address for legacy reasons.
313				local_ip().ok().map(|ip| match ip {
314					IpAddr::V4(v4) => v4,
315					IpAddr::V6(_v6) => unreachable!(),
316				})
317			})
318			.ok_or(APIError::NoHostIpFound)?;
319		let bound_address = SocketAddrV4::new(ip, self.port.unwrap_or(DEFAULT_SATA_PORT));
320
321		let mut router = Router::<PcfsServerState>::new_with_offset(0x30);
322		router.add_route(&0x0_u32.to_be_bytes(), create_folder::handle_create_folder)?;
323		router.add_route(&0x1_u32.to_be_bytes(), open_folder::handle_open_folder)?;
324		router.add_route(&0x2_u32.to_be_bytes(), read_folder::handle_read_folder)?;
325		router.add_route(&0x3_u32.to_be_bytes(), rewind_folder::handle_rewind_folder)?;
326		router.add_route(&0x4_u32.to_be_bytes(), close_folder::handle_close_folder)?;
327		router.add_route(&0x5_u32.to_be_bytes(), open_file::handle_open_file)?;
328		router.add_route(&0x6_u32.to_be_bytes(), read_file::handle_read_file)?;
329		router.add_route(&0x7_u32.to_be_bytes(), write_file::handle_write_file)?;
330		router.add_route(
331			&0x8_u32.to_be_bytes(),
332			get_file_position::handle_get_file_position,
333		)?;
334		router.add_route(
335			&0x9_u32.to_be_bytes(),
336			set_file_position::handle_set_file_position,
337		)?;
338		router.add_route(&0xB_u32.to_be_bytes(), info_by_query::stat_fd)?;
339		router.add_route(&0xD_u32.to_be_bytes(), close_file::handle_close_file)?;
340		router.add_route(&0xE_u32.to_be_bytes(), remove::handle_removal)?;
341		router.add_route(&0xF_u32.to_be_bytes(), rename::handle_rename)?;
342		router.add_route(
343			&0x10_u32.to_be_bytes(),
344			info_by_query::handle_get_info_by_query,
345		)?;
346		router.add_route(&0x12_u32.to_be_bytes(), change_owner::handle_change_owner)?;
347		router.add_route(&0x13_u32.to_be_bytes(), change_mode::handle_change_mode)?;
348		router.add_route(&0x14_u32.to_be_bytes(), ping::handle_ping)?;
349		router.fallback_handler(unknown_packet_handler)?;
350
351		let mut server = TCPServer::new_with_state(
352			"pcfs-sata",
353			bound_address,
354			router,
355			(None, None),
356			NagleGuard::U32LengthPrefixed(Endianness::Big, Some(0x20)),
357			PcfsServerState::new(
358				self.disable_real_removal,
359				self.host_filesystem,
360				std::process::id(),
361			),
362			self.trace_during_debug,
363		)
364		.await?;
365		let wal = self
366			.sata_wal_location
367			.and_then(|path| WriteAheadLog::new(path).ok());
368
369		server.set_on_stream_begin(async move |event: ResponseStreamEvent<PcfsServerState>| {
370			let sid = event.stream_id();
371
372			_ = SATA_CONNECTION_FLAGS
373				.insert_async(
374					sid,
375					SataConnectionFlags::new_with_flags(!self.disable_ffio, !self.disable_csr),
376				)
377				.await;
378
379			Ok(true)
380		})?;
381		if let Some(w) = wal.as_ref() {
382			server.layer_on_stream_begin(WALBeginStreamLayer(w.clone()))?;
383		}
384		server.set_on_stream_end(on_sata_stream_end)?;
385		if let Some(w) = wal.as_ref() {
386			server.layer_on_stream_end(WALEndStreamLayer(w.clone()))?;
387		}
388
389		create_initial_server_layer(&mut server, wal, self.trace_during_debug);
390
391		server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
392			None
393		} else if let Some(over_ride) = self.chunk_override {
394			Some(over_ride)
395		} else {
396			Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
397		});
398		server.set_cat_dev_slowdown(if self.fully_disable_cat_dev_sleep {
399			None
400		} else if let Some(over_ride) = self.cat_dev_sleep_override {
401			Some(over_ride)
402		} else {
403			None
404		});
405
406		Ok(server)
407	}
408}
409
410const PCFS_SATA_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
411	NamedField::new("address"),
412	NamedField::new("cat_dev_sleep_override"),
413	NamedField::new("chunk_override"),
414	NamedField::new("disable_csr"),
415	NamedField::new("disable_ffio"),
416	NamedField::new("disable_real_removal"),
417	NamedField::new("fully_disable_cat_dev_sleep"),
418	NamedField::new("fully_disable_chunk_override"),
419	NamedField::new("host_filesystem"),
420	NamedField::new("port"),
421	NamedField::new("sata_wal_location"),
422	NamedField::new("trace_during_debug"),
423];
424
425impl Structable for PcfsSataServerBuilder {
426	fn definition(&self) -> StructDef<'_> {
427		StructDef::new_static(
428			"PcfsSataServerBuilder",
429			Fields::Named(PCFS_SATA_SERVER_BUILDER_FIELDS),
430		)
431	}
432}
433
434impl Valuable for PcfsSataServerBuilder {
435	fn as_value(&self) -> Value<'_> {
436		Value::Structable(self)
437	}
438
439	fn visit(&self, visitor: &mut dyn Visit) {
440		visitor.visit_named_fields(&NamedValues::new(
441			PCFS_SATA_SERVER_BUILDER_FIELDS,
442			&[
443				Valuable::as_value(
444					&self
445						.address
446						.map_or_else(|| "<none>".to_owned(), |ip| format!("{ip}")),
447				),
448				Valuable::as_value(
449					&self
450						.cat_dev_sleep_override
451						.map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
452				),
453				Valuable::as_value(&self.chunk_override),
454				Valuable::as_value(&self.disable_csr),
455				Valuable::as_value(&self.disable_ffio),
456				Valuable::as_value(&self.disable_real_removal),
457				Valuable::as_value(&self.fully_disable_cat_dev_sleep),
458				Valuable::as_value(&self.fully_disable_chunk_override),
459				Valuable::as_value(&self.host_filesystem),
460				Valuable::as_value(&self.port),
461				Valuable::as_value(&self.sata_wal_location),
462				Valuable::as_value(&self.trace_during_debug),
463			],
464		));
465	}
466}
467
468fn create_initial_server_layer(
469	server: &mut TCPServer<PcfsServerState>,
470	mut wal: Option<WriteAheadLog>,
471	trace_during_debug: bool,
472) {
473	if let Some(w) = wal.take() {
474		if trace_during_debug {
475			server.layer_initial_service(
476				ServiceBuilder::new()
477					.layer(RequestIDLayer::new("sata".to_owned()))
478					.layer(StreamIDLayer)
479					.layer(SataConnectionFlagsLayer)
480					.layer(WALMessageLayer(w)),
481			);
482		} else {
483			server.layer_initial_service(
484				ServiceBuilder::new()
485					.layer(RequestIDLayer::new("sata".to_owned()))
486					.layer(SataConnectionFlagsLayer)
487					.layer(WALMessageLayer(w)),
488			);
489		}
490	} else if trace_during_debug {
491		server.layer_initial_service(
492			ServiceBuilder::new()
493				.layer(RequestIDLayer::new("sata".to_owned()))
494				.layer(StreamIDLayer)
495				.layer(SataConnectionFlagsLayer),
496		);
497	} else {
498		server.layer_initial_service(
499			ServiceBuilder::new()
500				.layer(RequestIDLayer::new("sata".to_owned()))
501				.layer(SataConnectionFlagsLayer),
502		);
503	}
504}
505
506async fn unknown_packet_handler(Body(request): Body<Bytes>) -> Response {
507	if let Ok(req) = SataRequest::<Bytes>::parse_opaque(request.clone()) {
508		warn!(
509			header = valuable(req.header()),
510			command_info = valuable(req.command_info()),
511			body = format!("{:02X?}", req.body()),
512			"Unknown Pcfs Sata packet!",
513		);
514	} else {
515		warn!(
516			packet = format!("{:02X?}", request),
517			"Unknown Unparsable Pcfs Sata Packet!",
518		);
519	}
520
521	Response::empty_close()
522}
523
524/// Gets called when an PCFS SATA stream ends.
525///
526/// This is where we actually 'cleanup' all the data related to the PCFS
527/// stream. For us this really just means clearing the stream id from the map
528/// of version/capability flags we're using.
529///
530/// ## Errors
531///
532/// This shouldn't ever error, but needs to make the signature pass to auto
533/// turn into a tower service.
534async fn on_sata_stream_end(
535	event: ResponseStreamEvent<PcfsServerState>,
536) -> Result<(), CatBridgeError> {
537	let sid = event.stream_id();
538	_ = SATA_CONNECTION_FLAGS.remove_async(&sid).await;
539	Ok(())
540}