cat_dev/fsemul/pcfs/
mod.rs

//! Code for "PCFS", or the actual serving of a filesystem for the cat-dev.
//!
//! PCFS is the most common thing users are 'expecting' to interact with. It
//! provides a full filesystem to the cat-dev. The easiest way to think about
//! it might be like `FSEmul` provides access to raw block data, while `PCFS`
//! provides a filesystem on top of that block data.
//!
//! Its main protocols implement things like `CreateDirectory`, `OpenFile`,
//! etc.
10
11pub mod errors;
12pub mod sata_proto;
13
14use crate::{
15	errors::{APIError, CatBridgeError, NetworkError},
16	fsemul::{
17		pcfs::sata_proto::{
18			SataGetInfoByQueryPacketBody, SataProtoChunker, SataRequest, SataRequestBody,
19		},
20		HostFilesystem,
21	},
22};
23use futures::{SinkExt, StreamExt};
24use local_ip_address::local_ip;
25use std::{
26	net::{IpAddr, Ipv4Addr, SocketAddrV4},
27	sync::{atomic::AtomicUsize, Arc},
28};
29use tokio::{
30	net::{TcpListener, TcpStream},
31	task::Builder as TaskBuilder,
32};
33use tokio_util::codec::Framed;
34use tracing::{debug, error, error_span, field::valuable, Instrument};
35use valuable::Valuable;
36
/// Port the SATA server listens on when the caller does not pick one.
pub const DEFAULT_PCFS_OVER_SATA_PORT: u16 = 7500;
39
40/// An implementation of a PCFS server speaking the Sata over PCFS protocol.
41#[derive(Debug)]
42pub struct PCFSSataServer<'fs> {
43	/// The address we're actively bound and listening on.
44	bound_address: SocketAddrV4,
45	/// Disable actually removing files from the filesystem.
46	disable_real_removal: bool,
47	/// A pointer to our integration with the host filesystem.
48	///
49	/// This let's us read data from the cafe directory and otherwise.
50	host_filesystem: &'fs HostFilesystem,
51	/// The listener to serve traffic on.
52	server: TcpListener,
53	/// If our SATA server should support FFIO (fast file I/O).
54	///
55	/// I'm gonna be honest I don't know what the difference here is yet.
56	should_support_ffio: bool,
57	/// If our SATA server should support Combined Send+Recv.
58	///
59	/// Combined Send/Recv presumably allows us to send data, and receive some
60	/// in the same paccket. However, I haven't fully reverse engineered the
61	/// difference between these two.
62	should_support_csr: bool,
63}
64
65impl<'fs> PCFSSataServer<'fs> {
66	/// Create and bind a new PCFS Sata Server to a port on an address.
67	///
68	/// ## Errors
69	///
70	/// If we cannot bind to the request host ip address.
71	#[allow(
72		// I think the overhead of converting here isn't worth it, but maybe one
73		// day it'd be good to change to two variant enums.
74		clippy::fn_params_excessive_bools,
75	)]
76	pub async fn new(
77		host_filesystem: &'fs HostFilesystem,
78		address: Option<Ipv4Addr>,
79		port: Option<u16>,
80		disable_real_removal: bool,
81		should_support_ffio: bool,
82		should_support_csr: bool,
83	) -> Result<Self, CatBridgeError> {
84		let Some(ip) = address.or_else(|| {
85			// This always returns an ipv4 address, but is still returning
86			// an ip address for legacy reasons.
87			local_ip().ok().map(|ip| match ip {
88				IpAddr::V4(v4) => v4,
89				IpAddr::V6(_v6) => unreachable!(),
90			})
91		}) else {
92			return Err(APIError::NoHostIpFound.into());
93		};
94
95		let bound_address = SocketAddrV4::new(ip, port.unwrap_or(DEFAULT_PCFS_OVER_SATA_PORT));
96		let server = TcpListener::bind(bound_address)
97			.await
98			.map_err(NetworkError::IO)?;
99
100		Ok(Self {
101			bound_address,
102			disable_real_removal,
103			host_filesystem,
104			server,
105			should_support_ffio,
106			should_support_csr,
107		})
108	}
109
110	/// If our server supports FFIO.
111	#[must_use]
112	pub const fn supports_ffio(&self) -> bool {
113		self.should_support_ffio
114	}
115
116	/// If our server supports combined send+recv.
117	#[must_use]
118	pub const fn supports_combined_send_and_recv(&self) -> bool {
119		self.should_support_csr
120	}
121
122	/// Get the port that the sata server will use.
123	#[must_use]
124	pub const fn port(&self) -> u16 {
125		self.bound_address.port()
126	}
127
128	/// Actually end up serving connections to clients.
129	///
130	/// This will continue serving for as long as it is able. Although it doesn't
131	/// do this as 'effeciently' because it cannot server multiple clients
132	/// concurrently itself.
133	///
134	/// In order to do this, we would need a `'static` lifetime'd host filesystem
135	/// which this method works without even `'static` filesystems. If you do
136	/// have a `'static` host filesystem, you should prefer to use the method
137	/// [`PCFSSataServer::serve_concurrently`] to more effeciently serve many
138	/// clients at once.
139	pub async fn serve(self) {
140		let host_filesystem = self.host_filesystem;
141
142		loop {
143			match self.server.accept().await {
144				Ok((stream, address)) => {
145					let client = address;
146					let bound = self.bound_address;
147					if let Err(cause) = stream.set_nodelay(true) {
148						error!(
149						  ?cause,
150						  server.address = %bound,
151						  client.address = %client,
152						  "Failed to disable NAGLE on connection, disabling...",
153						);
154						continue;
155					}
156
157					let result = Self::serve_connection(
158						host_filesystem,
159						stream,
160						self.disable_real_removal,
161						self.should_support_ffio,
162						self.should_support_csr,
163					)
164					.instrument(error_span!(
165					  "cat_dev::fsemul::pcfs::sata::serve_connection",
166					  server.address = %bound,
167					  client.address = %client,
168					))
169					.await;
170
171					if let Err(cause) = result {
172						error!(
173						  ?cause,
174						  server.address = %bound,
175						  client.address = %client,
176						  "Failed to actually handle packets from client SATA connection.",
177						);
178					}
179				}
180				Err(cause) => {
181					error!(
182					  ?cause,
183					  server.address = %self.bound_address,
184					  "Failed to accept PCFS SATA Connection from client, cannot serve.",
185					);
186				}
187			}
188		}
189	}
190
191	/// Serve a connection non-concurrently (E.g. when not dealing with a static
192	/// filesystem).
193	#[allow(
194		// This is okay as most of the logic outside of this function.
195		clippy::too_many_lines,
196		// I think the overhead of converting here isn't worth it, but maybe one
197		// day it'd be good to change to two variant enums.
198		clippy::fn_params_excessive_bools,
199	)]
200	async fn serve_connection(
201		host_filesystem: &'fs HostFilesystem,
202		connection: TcpStream,
203		disable_real_removal: bool,
204		mut supports_ffio: bool,
205		mut supports_csr: bool,
206	) -> Result<(), CatBridgeError> {
207		connection.set_nodelay(true).map_err(NetworkError::IO)?;
208		let bypass_buff_to_read = Arc::new(AtomicUsize::new(0));
209		let (mut sink, mut stream) =
210			Framed::new(connection, SataProtoChunker(bypass_buff_to_read.clone())).split();
211		let mut first_packet = true;
212
213		loop {
214			while let Some(result) = stream.next().await {
215				let packet = result.map_err(NetworkError::IO)?.freeze();
216				let parsed_packet = SataRequest::try_from(packet)?;
217				parsed_packet.header().ensure_not_from_host()?;
218				if first_packet {
219					let flags = SataCapabilitiesFlags(parsed_packet.header().flags());
220					if !flags.intersects(SataCapabilitiesFlags::FAST_FILE_IO_SUPPORTED) {
221						debug!(
222							flags = valuable(&flags),
223							"Disabling FFIO because first packet header requested it..."
224						);
225						supports_ffio = false;
226					}
227					if !flags.intersects(SataCapabilitiesFlags::COMBINED_SEND_RECV_SUPPORTED) {
228						debug!(
229							flags = valuable(&flags),
230							"Disabling CSR because first packet header requested it..."
231						);
232						supports_csr = false;
233					}
234				}
235				first_packet = false;
236
237				debug!("{}", parsed_packet.command_info());
238				match parsed_packet.body() {
239					SataRequestBody::ChangeMode(ref mode) => {
240						sink.send(mode.handle(parsed_packet.header(), host_filesystem)?)
241							.await
242							.map_err(NetworkError::IO)?;
243					}
244					SataRequestBody::ChangeOwner(ref co) => {
245						sink.send(co.handle(parsed_packet.header())?)
246							.await
247							.map_err(NetworkError::IO)?;
248					}
249					SataRequestBody::CloseFile(ref cf) => {
250						sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
251							.await
252							.map_err(NetworkError::IO)?;
253					}
254					SataRequestBody::CloseFolder(ref cf) => {
255						sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
256							.await
257							.map_err(NetworkError::IO)?;
258					}
259					SataRequestBody::CreateDirectory(ref cd) => {
260						sink.send(cd.handle(parsed_packet.header(), host_filesystem).await?)
261							.await
262							.map_err(NetworkError::IO)?;
263					}
264					SataRequestBody::GetInfoByQuery(ref info) => {
265						sink.send(info.handle(parsed_packet.header(), host_filesystem).await?)
266							.await
267							.map_err(NetworkError::IO)?;
268					}
269					SataRequestBody::OpenFile(ref file) => {
270						sink.send(
271							file.handle(
272								parsed_packet.header(),
273								parsed_packet.command_info(),
274								host_filesystem,
275							)
276							.await?,
277						)
278						.await
279						.map_err(NetworkError::IO)?;
280					}
281					SataRequestBody::OpenFolder(ref folder) => {
282						sink.send(
283							folder
284								.handle(parsed_packet.header(), host_filesystem)
285								.await?,
286						)
287						.await
288						.map_err(NetworkError::IO)?;
289					}
290					SataRequestBody::Ping(ref ping) => {
291						debug!(
292							client.packet.header = valuable(parsed_packet.header()),
293							client.packet.command_info = valuable(parsed_packet.command_info()),
294							client.packet.body = valuable(ping),
295							"received ping packet from client",
296						);
297
298						sink.send(ping.handle(
299							parsed_packet.header(),
300							parsed_packet.command_info(),
301							supports_ffio,
302							supports_csr,
303						)?)
304						.await
305						.map_err(NetworkError::IO)?;
306					}
307					SataRequestBody::ReadFile(ref rfr) => {
308						sink.send(
309							rfr.handle(parsed_packet.header(), host_filesystem, supports_ffio)
310								.await?,
311						)
312						.await
313						.map_err(NetworkError::IO)?;
314					}
315					SataRequestBody::ReadDirectory(ref rd) => {
316						sink.send(rd.handle(parsed_packet.header(), host_filesystem).await?)
317							.await
318							.map_err(NetworkError::IO)?;
319					}
320					SataRequestBody::Remove(ref rm) => {
321						sink.send(
322							rm.handle(
323								parsed_packet.header(),
324								!disable_real_removal,
325								host_filesystem,
326							)
327							.await?,
328						)
329						.await
330						.map_err(NetworkError::IO)?;
331					}
332					SataRequestBody::Rewind(ref rewind) => {
333						sink.send(
334							rewind
335								.handle(parsed_packet.header(), host_filesystem)
336								.await?,
337						)
338						.await
339						.map_err(NetworkError::IO)?;
340					}
341					SataRequestBody::StatFile(ref st) => {
342						sink.send(
343							SataGetInfoByQueryPacketBody::stat_fd(
344								parsed_packet.header(),
345								host_filesystem,
346								st.file_descriptor(),
347							)
348							.await?,
349						)
350						.await
351						.map_err(NetworkError::IO)?;
352					}
353					SataRequestBody::WriteFile(ref wf) => {
354						sink.send(
355							wf.handle(
356								parsed_packet.header(),
357								host_filesystem,
358								supports_ffio,
359								&mut stream,
360								&bypass_buff_to_read,
361							)
362							.await?,
363						)
364						.await
365						.map_err(NetworkError::IO)?;
366					}
367				}
368			}
369		}
370	}
371}
372
373impl PCFSSataServer<'static> {
374	/// Actually end up serving connections to clients.
375	///
376	/// This will continue serving for as long as it is able. This is the
377	/// significantly more efficient than just [`PCFSSataServer::serve`] as we can
378	/// confirm that host filesystem will ive long enough for all of our
379	/// connections.
380	pub async fn serve_concurrently(self) {
381		let host_filesystem: &'static HostFilesystem = self.host_filesystem;
382
383		loop {
384			match self.server.accept().await {
385				Ok((stream, address)) => {
386					let client = address;
387					let bound = self.bound_address;
388					let spawn_result = TaskBuilder::new()
389						.name("cat_dev::fsemul::pcfs::sata::serve_client_connection_concurrently")
390						.spawn(async move {
391							let result = Self::serve_connection_concurrently(
392								host_filesystem,
393								stream,
394								self.disable_real_removal,
395								self.should_support_ffio,
396								self.should_support_csr,
397							)
398							.instrument(error_span!(
399							  "cat_dev::fsemul::pcfs::sata::serve_connection_concurrently",
400							  server.address = %bound,
401							  client.address = %client,
402							))
403							.await;
404
405							if let Err(cause) = result {
406								error!(
407								  ?cause,
408								  server.address = %bound,
409								  client.address = %client,
410								  "Failed to actually handle packets from client connection",
411								);
412							}
413						});
414
415					if let Err(cause) = spawn_result {
416						error!(
417						  ?cause,
418						  server.address = %self.bound_address,
419						  client.address = %address,
420						  "Failed to spawn handler for PCFS Sata Connection, cannot serve task.",
421						);
422					}
423				}
424				Err(cause) => {
425					error!(
426					  ?cause,
427					  server.address = %self.bound_address,
428					  "Failed to accept PCFS SATA Connection from client, cannot serve itself.",
429					);
430				}
431			}
432		}
433	}
434
435	#[allow(
436		// This is okay as most of the logic outside of this function.
437		clippy::too_many_lines,
438		// Refactor one day maybe.
439		clippy::fn_params_excessive_bools,
440	)]
441	async fn serve_connection_concurrently(
442		host_filesystem: &'static HostFilesystem,
443		connection: TcpStream,
444		disable_real_removal: bool,
445		mut supports_ffio: bool,
446		mut supports_csr: bool,
447	) -> Result<(), CatBridgeError> {
448		connection.set_nodelay(true).map_err(NetworkError::IO)?;
449		let bypass_buff_to_read = Arc::new(AtomicUsize::new(0));
450		let (mut sink, mut stream) =
451			Framed::new(connection, SataProtoChunker(bypass_buff_to_read.clone())).split();
452		let mut first_packet = true;
453
454		loop {
455			while let Some(result) = stream.next().await {
456				let packet = result.map_err(NetworkError::IO)?.freeze();
457				let parsed_packet = SataRequest::try_from(packet)?;
458				parsed_packet.header().ensure_not_from_host()?;
459				if first_packet {
460					let flags = SataCapabilitiesFlags(parsed_packet.header().flags());
461					if !flags.intersects(SataCapabilitiesFlags::FAST_FILE_IO_SUPPORTED) {
462						supports_ffio = false;
463					}
464					if !flags.intersects(SataCapabilitiesFlags::COMBINED_SEND_RECV_SUPPORTED) {
465						supports_csr = false;
466					}
467				}
468				first_packet = false;
469
470				debug!("{}", parsed_packet.command_info());
471				match parsed_packet.body() {
472					SataRequestBody::ChangeMode(ref mode) => {
473						sink.send(mode.handle(parsed_packet.header(), host_filesystem)?)
474							.await
475							.map_err(NetworkError::IO)?;
476					}
477					SataRequestBody::ChangeOwner(ref co) => {
478						sink.send(co.handle(parsed_packet.header())?)
479							.await
480							.map_err(NetworkError::IO)?;
481					}
482					SataRequestBody::CloseFile(ref cf) => {
483						sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
484							.await
485							.map_err(NetworkError::IO)?;
486					}
487					SataRequestBody::CloseFolder(ref cf) => {
488						sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
489							.await
490							.map_err(NetworkError::IO)?;
491					}
492					SataRequestBody::CreateDirectory(ref cd) => {
493						sink.send(cd.handle(parsed_packet.header(), host_filesystem).await?)
494							.await
495							.map_err(NetworkError::IO)?;
496					}
497					SataRequestBody::GetInfoByQuery(ref info) => {
498						sink.send(info.handle(parsed_packet.header(), host_filesystem).await?)
499							.await
500							.map_err(NetworkError::IO)?;
501					}
502					SataRequestBody::OpenFile(ref file) => {
503						sink.send(
504							file.handle(
505								parsed_packet.header(),
506								parsed_packet.command_info(),
507								host_filesystem,
508							)
509							.await?,
510						)
511						.await
512						.map_err(NetworkError::IO)?;
513					}
514					SataRequestBody::OpenFolder(ref folder) => {
515						sink.send(
516							folder
517								.handle(parsed_packet.header(), host_filesystem)
518								.await?,
519						)
520						.await
521						.map_err(NetworkError::IO)?;
522					}
523					SataRequestBody::Ping(ref ping) => {
524						debug!(
525							client.packet.header = valuable(parsed_packet.header()),
526							client.packet.command_info = valuable(parsed_packet.command_info()),
527							client.packet.body = valuable(ping),
528							"received ping packet from client",
529						);
530
531						sink.send(ping.handle(
532							parsed_packet.header(),
533							parsed_packet.command_info(),
534							supports_ffio,
535							supports_csr,
536						)?)
537						.await
538						.map_err(NetworkError::IO)?;
539					}
540					SataRequestBody::ReadFile(ref rfr) => {
541						sink.send(
542							rfr.handle(parsed_packet.header(), host_filesystem, supports_ffio)
543								.await?,
544						)
545						.await
546						.map_err(NetworkError::IO)?;
547					}
548					SataRequestBody::ReadDirectory(ref rd) => {
549						sink.send(rd.handle(parsed_packet.header(), host_filesystem).await?)
550							.await
551							.map_err(NetworkError::IO)?;
552					}
553					SataRequestBody::Remove(ref rm) => {
554						sink.send(
555							rm.handle(
556								parsed_packet.header(),
557								!disable_real_removal,
558								host_filesystem,
559							)
560							.await?,
561						)
562						.await
563						.map_err(NetworkError::IO)?;
564					}
565					SataRequestBody::Rewind(ref rewind) => {
566						sink.send(
567							rewind
568								.handle(parsed_packet.header(), host_filesystem)
569								.await?,
570						)
571						.await
572						.map_err(NetworkError::IO)?;
573					}
574					SataRequestBody::StatFile(ref st) => {
575						sink.send(
576							SataGetInfoByQueryPacketBody::stat_fd(
577								parsed_packet.header(),
578								host_filesystem,
579								st.file_descriptor(),
580							)
581							.await?,
582						)
583						.await
584						.map_err(NetworkError::IO)?;
585					}
586					SataRequestBody::WriteFile(ref wf) => {
587						sink.send(
588							wf.handle(
589								parsed_packet.header(),
590								host_filesystem,
591								supports_ffio,
592								&mut stream,
593								&bypass_buff_to_read,
594							)
595							.await?,
596						)
597						.await
598						.map_err(NetworkError::IO)?;
599					}
600				}
601			}
602		}
603	}
604}
605
606#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Valuable)]
607pub struct SataCapabilitiesFlags(pub u32);
608
609bitflags::bitflags! {
610	impl SataCapabilitiesFlags: u32 {
611		const FAST_FILE_IO_SUPPORTED = 0b0000_0010;
612		const COMBINED_SEND_RECV_SUPPORTED = 0b0000_0100;
613	}
614}