cat_dev/fsemul/pcfs/sata/server/
mod.rs

1//! Server implementation for SATA Server.
2
3mod change_mode;
4mod change_owner;
5mod close_file;
6mod close_folder;
7pub mod connection_flags;
8mod create_folder;
9mod info_by_query;
10mod open_file;
11mod open_folder;
12mod ping;
13mod read_file;
14mod read_folder;
15mod remove;
16mod rename;
17mod rewind_folder;
18mod set_file_position;
19pub mod wal;
20mod write_file;
21
22use crate::{
23	errors::{APIError, CatBridgeError},
24	fsemul::{
25		HostFilesystem,
26		pcfs::sata::{
27			proto::SataRequest,
28			server::{
29				connection_flags::{
30					SATA_CONNECTION_FLAGS, SataConnectionFlags, SataConnectionFlagsLayer,
31				},
32				wal::{
33					WriteAheadLog,
34					layer::{WALBeginStreamLayer, WALEndStreamLayer, WALMessageLayer},
35				},
36			},
37		},
38	},
39	net::{
40		DEFAULT_CAT_DEV_CHUNK_SIZE, DEFAULT_CAT_DEV_SLOWDOWN,
41		additions::{RequestIDLayer, StreamIDLayer},
42		models::{Endianness, FromRef, NagleGuard, Response},
43		server::{Router, TCPServer, models::ResponseStreamEvent, requestable::Body},
44	},
45};
46use bytes::Bytes;
47use local_ip_address::local_ip;
48use std::{
49	net::{IpAddr, Ipv4Addr, SocketAddrV4},
50	path::PathBuf,
51	time::Duration,
52};
53use tower::ServiceBuilder;
54use tracing::{field::valuable, warn};
55use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
56
57/// The default port to use for hosting the SATA Server.
58pub const DEFAULT_SATA_PORT: u16 = 7500_u16;
59
60/// The 'state' of the server that is server wide.
61#[derive(Clone, Debug, Valuable)]
62pub struct PcfsServerState {
63	/// If we're not actually removing files, and preventing things from being removed.
64	disable_real_removal: bool,
65	/// The underlying host filesystem to write too.
66	host_filesystem: HostFilesystem,
67	/// Our process id.
68	pid: u32,
69}
70
71impl PcfsServerState {
72	#[must_use]
73	pub const fn new(
74		disable_real_removal: bool,
75		host_filesystem: HostFilesystem,
76		pid: u32,
77	) -> Self {
78		PcfsServerState {
79			disable_real_removal,
80			host_filesystem,
81			pid,
82		}
83	}
84
85	#[must_use]
86	pub const fn disable_real_removal(&self) -> bool {
87		self.disable_real_removal
88	}
89
90	#[must_use]
91	pub const fn host_filesystem(&self) -> &HostFilesystem {
92		&self.host_filesystem
93	}
94
95	#[must_use]
96	pub const fn pid(&self) -> u32 {
97		self.pid
98	}
99}
100
101impl FromRef<PcfsServerState> for HostFilesystem {
102	fn from_ref(input: &PcfsServerState) -> Self {
103		input.host_filesystem.clone()
104	}
105}
106
107impl FromRef<PcfsServerState> for u32 {
108	fn from_ref(input: &PcfsServerState) -> Self {
109		input.pid
110	}
111}
112
113#[allow(
114	// TODO(mythra): maybe refactor these into two variant enums.
115	clippy::struct_excessive_bools,
116)]
117#[derive(Clone, Debug)]
118pub struct PcfsSataServerBuilder {
119	/// The explicit address to bind too, otherwise a local ip will be fetched.
120	address: Option<Ipv4Addr>,
121	/// Allow overriding the amount of time that we wait to make sure everything
122	/// is safe for a cat-dev.
123	///
124	/// *note: `None` does not mean cat-dev sleep is disabled, that is controleld
125	/// through [`Self::fully_disable_cat_dev_sleep`]*.
126	cat_dev_sleep_override: Option<Duration>,
127	/// Override the chunking value, and how many bytes we'll purposefully chunk on
128	/// when sending out TCP data.
129	///
130	/// *note: `None` does not mean cat-dev sleep is disabled, that is controleld
131	/// through [`Self::fully_disable_chunk_override`]*.
132	chunk_override: Option<usize>,
133	/// If we should disable Combined Send+Recv.
134	disable_csr: bool,
135	/// If we should disable FFIO.
136	disable_ffio: bool,
137	/// If we should disable real removal of files. Prevents anything from being lost.
138	disable_real_removal: bool,
139	/// An override to fully disable any cat-dev sleeping to prevent issues.
140	fully_disable_cat_dev_sleep: bool,
141	/// An override to fully disable any TCP level chunking.
142	fully_disable_chunk_override: bool,
143	/// The host filesystem to treat when serving PCFS-SATA requests.
144	host_filesystem: HostFilesystem,
145	/// The explicit port to bind too if not the default.
146	port: Option<u16>,
147	/// A place to write a SATA WAL too.
148	sata_wal_location: Option<PathBuf>,
149	/// If we should trace all I/O when in debug mode.
150	trace_during_debug: bool,
151}
152
153impl PcfsSataServerBuilder {
154	/// Create a new builder for creating a PCFS-Sata Server.
155	#[must_use]
156	pub const fn new(host_filesystem: HostFilesystem) -> Self {
157		Self {
158			address: None,
159			cat_dev_sleep_override: None,
160			chunk_override: None,
161			disable_csr: false,
162			disable_ffio: false,
163			disable_real_removal: false,
164			fully_disable_cat_dev_sleep: false,
165			fully_disable_chunk_override: false,
166			host_filesystem,
167			port: None,
168			sata_wal_location: None,
169			trace_during_debug: false,
170		}
171	}
172
173	#[must_use]
174	pub const fn address(&self) -> Option<Ipv4Addr> {
175		self.address
176	}
177	#[must_use]
178	pub const fn set_address(mut self, new_address: Option<Ipv4Addr>) -> Self {
179		self.address = new_address;
180		self
181	}
182
183	#[must_use]
184	pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
185		self.cat_dev_sleep_override
186	}
187	#[must_use]
188	pub const fn set_cat_dev_sleep_override(mut self, new: Option<Duration>) -> Self {
189		self.cat_dev_sleep_override = new;
190		self
191	}
192
193	#[must_use]
194	pub const fn chunk_override(&self) -> Option<usize> {
195		self.chunk_override
196	}
197	#[must_use]
198	pub const fn set_chunk_override(mut self, chunk: Option<usize>) -> Self {
199		self.chunk_override = chunk;
200		self
201	}
202
203	#[must_use]
204	pub const fn disable_csr(&self) -> bool {
205		self.disable_csr
206	}
207	#[must_use]
208	pub const fn set_disable_csr(mut self, new: bool) -> Self {
209		self.disable_csr = new;
210		self
211	}
212
213	#[must_use]
214	pub const fn disable_ffio(&self) -> bool {
215		self.disable_ffio
216	}
217	#[must_use]
218	pub const fn set_disable_ffio(mut self, new: bool) -> Self {
219		self.disable_ffio = new;
220		self
221	}
222
223	#[must_use]
224	pub const fn disable_real_removal(&self) -> bool {
225		self.disable_real_removal
226	}
227	#[must_use]
228	pub const fn set_disable_real_removal(mut self, new: bool) -> Self {
229		self.disable_real_removal = new;
230		self
231	}
232
233	#[must_use]
234	pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
235		self.fully_disable_cat_dev_sleep
236	}
237	#[must_use]
238	pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
239		self.fully_disable_cat_dev_sleep = new;
240		self
241	}
242
243	#[must_use]
244	pub const fn fully_disable_chunk_override(&self) -> bool {
245		self.fully_disable_chunk_override
246	}
247	#[must_use]
248	pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
249		self.fully_disable_chunk_override = new;
250		self
251	}
252
253	#[must_use]
254	pub const fn host_filesystem(&self) -> &HostFilesystem {
255		&self.host_filesystem
256	}
257	#[must_use]
258	pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
259		self.host_filesystem = new;
260		self
261	}
262
263	#[must_use]
264	pub const fn port(&self) -> Option<u16> {
265		self.port
266	}
267	#[must_use]
268	pub const fn set_port(mut self, new: Option<u16>) -> Self {
269		self.port = new;
270		self
271	}
272
273	#[must_use]
274	pub fn sata_wal_location(&self) -> Option<&PathBuf> {
275		self.sata_wal_location.as_ref()
276	}
277	#[must_use]
278	pub fn set_sata_wal_location(mut self, new_location: Option<PathBuf>) -> Self {
279		self.sata_wal_location = new_location;
280		self
281	}
282
283	#[must_use]
284	pub const fn trace_during_debug(&self) -> bool {
285		self.trace_during_debug
286	}
287	#[must_use]
288	pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
289		self.trace_during_debug = new;
290		self
291	}
292
293	/// Create a TCP server that is capable of serving PCFS-SATA to a cat-dev
294	/// console.
295	///
296	/// ## Errors
297	///
298	/// If we cannot lookup the address to bind too, or there's been some
299	/// programming error, and we run into an API error calling an internal
300	/// api wrong.
301	pub async fn build(self) -> Result<TCPServer<PcfsServerState>, CatBridgeError> {
302		let ip = self
303			.address
304			.or_else(|| {
305				// This always returns an ipv4 address, but is still returning
306				// an ip address for legacy reasons.
307				local_ip().ok().map(|ip| match ip {
308					IpAddr::V4(v4) => v4,
309					IpAddr::V6(_v6) => unreachable!(),
310				})
311			})
312			.ok_or(APIError::NoHostIpFound)?;
313		let bound_address = SocketAddrV4::new(ip, self.port.unwrap_or(DEFAULT_SATA_PORT));
314
315		let mut router = Router::<PcfsServerState>::new_with_offset(0x30);
316		router.add_route(&0x0_u32.to_be_bytes(), create_folder::handle_create_folder)?;
317		router.add_route(&0x1_u32.to_be_bytes(), open_folder::handle_open_folder)?;
318		router.add_route(&0x2_u32.to_be_bytes(), read_folder::handle_read_folder)?;
319		router.add_route(&0x3_u32.to_be_bytes(), rewind_folder::handle_rewind_folder)?;
320		router.add_route(&0x4_u32.to_be_bytes(), close_folder::handle_close_folder)?;
321		router.add_route(&0x5_u32.to_be_bytes(), open_file::handle_open_file)?;
322		router.add_route(&0x6_u32.to_be_bytes(), read_file::handle_read_file)?;
323		router.add_route(&0x7_u32.to_be_bytes(), write_file::handle_write_file)?;
324		router.add_route(
325			&0x9_u32.to_be_bytes(),
326			set_file_position::handle_set_file_position,
327		)?;
328		router.add_route(&0xB_u32.to_be_bytes(), info_by_query::stat_fd)?;
329		router.add_route(&0xD_u32.to_be_bytes(), close_file::handle_close_file)?;
330		router.add_route(&0xE_u32.to_be_bytes(), remove::handle_removal)?;
331		router.add_route(&0xF_u32.to_be_bytes(), rename::handle_rename)?;
332		router.add_route(
333			&0x10_u32.to_be_bytes(),
334			info_by_query::handle_get_info_by_query,
335		)?;
336		router.add_route(&0x12_u32.to_be_bytes(), change_owner::handle_change_owner)?;
337		router.add_route(&0x13_u32.to_be_bytes(), change_mode::handle_change_mode)?;
338		router.add_route(&0x14_u32.to_be_bytes(), ping::handle_ping)?;
339		router.fallback_handler(unknown_packet_handler)?;
340
341		let mut server = TCPServer::new_with_state(
342			"pcfs-sata",
343			bound_address,
344			router,
345			(None, None),
346			NagleGuard::U32LengthPrefixed(Endianness::Big, Some(0x20)),
347			PcfsServerState::new(
348				self.disable_real_removal,
349				self.host_filesystem,
350				std::process::id(),
351			),
352			self.trace_during_debug,
353		)
354		.await?;
355		let wal = self
356			.sata_wal_location
357			.and_then(|path| WriteAheadLog::new(path).ok());
358
359		server.set_on_stream_begin(async move |event: ResponseStreamEvent<PcfsServerState>| {
360			let sid = event.stream_id();
361
362			_ = SATA_CONNECTION_FLAGS
363				.insert_async(
364					sid,
365					SataConnectionFlags::new_with_flags(!self.disable_ffio, !self.disable_csr),
366				)
367				.await;
368
369			Ok(true)
370		})?;
371		if let Some(w) = wal.as_ref() {
372			server.layer_on_stream_begin(WALBeginStreamLayer(w.clone()))?;
373		}
374		server.set_on_stream_end(on_sata_stream_end)?;
375		if let Some(w) = wal.as_ref() {
376			server.layer_on_stream_end(WALEndStreamLayer(w.clone()))?;
377		}
378
379		create_initial_server_layer(&mut server, wal, self.trace_during_debug);
380
381		server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
382			None
383		} else if let Some(over_ride) = self.chunk_override {
384			Some(over_ride)
385		} else {
386			Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
387		});
388		server.set_cat_dev_slowdown(if self.fully_disable_cat_dev_sleep {
389			None
390		} else if let Some(over_ride) = self.cat_dev_sleep_override {
391			Some(over_ride)
392		} else {
393			Some(DEFAULT_CAT_DEV_SLOWDOWN)
394		});
395
396		Ok(server)
397	}
398}
399
400const PCFS_SATA_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
401	NamedField::new("address"),
402	NamedField::new("cat_dev_sleep_override"),
403	NamedField::new("chunk_override"),
404	NamedField::new("disable_csr"),
405	NamedField::new("disable_ffio"),
406	NamedField::new("disable_real_removal"),
407	NamedField::new("fully_disable_cat_dev_sleep"),
408	NamedField::new("fully_disable_chunk_override"),
409	NamedField::new("host_filesystem"),
410	NamedField::new("port"),
411	NamedField::new("sata_wal_location"),
412	NamedField::new("trace_during_debug"),
413];
414
415impl Structable for PcfsSataServerBuilder {
416	fn definition(&self) -> StructDef<'_> {
417		StructDef::new_static(
418			"PcfsSataServerBuilder",
419			Fields::Named(PCFS_SATA_SERVER_BUILDER_FIELDS),
420		)
421	}
422}
423
424impl Valuable for PcfsSataServerBuilder {
425	fn as_value(&self) -> Value<'_> {
426		Value::Structable(self)
427	}
428
429	fn visit(&self, visitor: &mut dyn Visit) {
430		visitor.visit_named_fields(&NamedValues::new(
431			PCFS_SATA_SERVER_BUILDER_FIELDS,
432			&[
433				Valuable::as_value(
434					&self
435						.address
436						.map_or_else(|| "<none>".to_owned(), |ip| format!("{ip}")),
437				),
438				Valuable::as_value(
439					&self
440						.cat_dev_sleep_override
441						.map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
442				),
443				Valuable::as_value(&self.chunk_override),
444				Valuable::as_value(&self.disable_csr),
445				Valuable::as_value(&self.disable_ffio),
446				Valuable::as_value(&self.disable_real_removal),
447				Valuable::as_value(&self.fully_disable_cat_dev_sleep),
448				Valuable::as_value(&self.fully_disable_chunk_override),
449				Valuable::as_value(&self.host_filesystem),
450				Valuable::as_value(&self.port),
451				Valuable::as_value(&self.sata_wal_location),
452				Valuable::as_value(&self.trace_during_debug),
453			],
454		));
455	}
456}
457
458fn create_initial_server_layer(
459	server: &mut TCPServer<PcfsServerState>,
460	mut wal: Option<WriteAheadLog>,
461	trace_during_debug: bool,
462) {
463	if let Some(w) = wal.take() {
464		if trace_during_debug {
465			server.layer_initial_service(
466				ServiceBuilder::new()
467					.layer(RequestIDLayer::new("sata".to_owned()))
468					.layer(StreamIDLayer)
469					.layer(SataConnectionFlagsLayer)
470					.layer(WALMessageLayer(w)),
471			);
472		} else {
473			server.layer_initial_service(
474				ServiceBuilder::new()
475					.layer(RequestIDLayer::new("sata".to_owned()))
476					.layer(SataConnectionFlagsLayer)
477					.layer(WALMessageLayer(w)),
478			);
479		}
480	} else if trace_during_debug {
481		server.layer_initial_service(
482			ServiceBuilder::new()
483				.layer(RequestIDLayer::new("sata".to_owned()))
484				.layer(StreamIDLayer)
485				.layer(SataConnectionFlagsLayer),
486		);
487	} else {
488		server.layer_initial_service(
489			ServiceBuilder::new()
490				.layer(RequestIDLayer::new("sata".to_owned()))
491				.layer(SataConnectionFlagsLayer),
492		);
493	}
494}
495
496async fn unknown_packet_handler(Body(request): Body<Bytes>) -> Response {
497	if let Ok(req) = SataRequest::<Bytes>::parse_opaque(request.clone()) {
498		warn!(
499			header = valuable(req.header()),
500			command_info = valuable(req.command_info()),
501			body = format!("{:02X?}", req.body()),
502			"Unknown Pcfs Sata packet!",
503		);
504	} else {
505		warn!(
506			packet = format!("{:02X?}", request),
507			"Unknown Unparsable Pcfs Sata Packet!",
508		);
509	}
510
511	Response::empty_close()
512}
513
514/// Gets called when an PCFS SATA stream ends.
515///
516/// This is where we actually 'cleanup' all the data related to the PCFS
517/// stream. For us this really just means clearing the stream id from the map
518/// of version/capability flags we're using.
519///
520/// ## Errors
521///
522/// This shouldn't ever error, but needs to make the signature pass to auto
523/// turn into a tower service.
524async fn on_sata_stream_end(
525	event: ResponseStreamEvent<PcfsServerState>,
526) -> Result<(), CatBridgeError> {
527	let sid = event.stream_id();
528	_ = SATA_CONNECTION_FLAGS.remove_async(&sid).await;
529	Ok(())
530}