1pub mod errors;
12pub mod sata_proto;
13
14use crate::{
15 errors::{APIError, CatBridgeError, NetworkError},
16 fsemul::{
17 pcfs::sata_proto::{
18 SataGetInfoByQueryPacketBody, SataProtoChunker, SataRequest, SataRequestBody,
19 },
20 HostFilesystem,
21 },
22};
23use futures::{SinkExt, StreamExt};
24use local_ip_address::local_ip;
25use std::{
26 net::{IpAddr, Ipv4Addr, SocketAddrV4},
27 sync::{atomic::AtomicUsize, Arc},
28};
29use tokio::{
30 net::{TcpListener, TcpStream},
31 task::Builder as TaskBuilder,
32};
33use tokio_util::codec::Framed;
34use tracing::{debug, error, error_span, field::valuable, Instrument};
35use valuable::Valuable;
36
37pub const DEFAULT_PCFS_OVER_SATA_PORT: u16 = 7500_u16;
39
40#[derive(Debug)]
42pub struct PCFSSataServer<'fs> {
43 bound_address: SocketAddrV4,
45 disable_real_removal: bool,
47 host_filesystem: &'fs HostFilesystem,
51 server: TcpListener,
53 should_support_ffio: bool,
57 should_support_csr: bool,
63}
64
65impl<'fs> PCFSSataServer<'fs> {
66 #[allow(
72 clippy::fn_params_excessive_bools,
75 )]
76 pub async fn new(
77 host_filesystem: &'fs HostFilesystem,
78 address: Option<Ipv4Addr>,
79 port: Option<u16>,
80 disable_real_removal: bool,
81 should_support_ffio: bool,
82 should_support_csr: bool,
83 ) -> Result<Self, CatBridgeError> {
84 let Some(ip) = address.or_else(|| {
85 local_ip().ok().map(|ip| match ip {
88 IpAddr::V4(v4) => v4,
89 IpAddr::V6(_v6) => unreachable!(),
90 })
91 }) else {
92 return Err(APIError::NoHostIpFound.into());
93 };
94
95 let bound_address = SocketAddrV4::new(ip, port.unwrap_or(DEFAULT_PCFS_OVER_SATA_PORT));
96 let server = TcpListener::bind(bound_address)
97 .await
98 .map_err(NetworkError::IO)?;
99
100 Ok(Self {
101 bound_address,
102 disable_real_removal,
103 host_filesystem,
104 server,
105 should_support_ffio,
106 should_support_csr,
107 })
108 }
109
110 #[must_use]
112 pub const fn supports_ffio(&self) -> bool {
113 self.should_support_ffio
114 }
115
116 #[must_use]
118 pub const fn supports_combined_send_and_recv(&self) -> bool {
119 self.should_support_csr
120 }
121
122 #[must_use]
124 pub const fn port(&self) -> u16 {
125 self.bound_address.port()
126 }
127
128 pub async fn serve(self) {
140 let host_filesystem = self.host_filesystem;
141
142 loop {
143 match self.server.accept().await {
144 Ok((stream, address)) => {
145 let client = address;
146 let bound = self.bound_address;
147 if let Err(cause) = stream.set_nodelay(true) {
148 error!(
149 ?cause,
150 server.address = %bound,
151 client.address = %client,
152 "Failed to disable NAGLE on connection, disabling...",
153 );
154 continue;
155 }
156
157 let result = Self::serve_connection(
158 host_filesystem,
159 stream,
160 self.disable_real_removal,
161 self.should_support_ffio,
162 self.should_support_csr,
163 )
164 .instrument(error_span!(
165 "cat_dev::fsemul::pcfs::sata::serve_connection",
166 server.address = %bound,
167 client.address = %client,
168 ))
169 .await;
170
171 if let Err(cause) = result {
172 error!(
173 ?cause,
174 server.address = %bound,
175 client.address = %client,
176 "Failed to actually handle packets from client SATA connection.",
177 );
178 }
179 }
180 Err(cause) => {
181 error!(
182 ?cause,
183 server.address = %self.bound_address,
184 "Failed to accept PCFS SATA Connection from client, cannot serve.",
185 );
186 }
187 }
188 }
189 }
190
191 #[allow(
194 clippy::too_many_lines,
196 clippy::fn_params_excessive_bools,
199 )]
200 async fn serve_connection(
201 host_filesystem: &'fs HostFilesystem,
202 connection: TcpStream,
203 disable_real_removal: bool,
204 mut supports_ffio: bool,
205 mut supports_csr: bool,
206 ) -> Result<(), CatBridgeError> {
207 connection.set_nodelay(true).map_err(NetworkError::IO)?;
208 let bypass_buff_to_read = Arc::new(AtomicUsize::new(0));
209 let (mut sink, mut stream) =
210 Framed::new(connection, SataProtoChunker(bypass_buff_to_read.clone())).split();
211 let mut first_packet = true;
212
213 loop {
214 while let Some(result) = stream.next().await {
215 let packet = result.map_err(NetworkError::IO)?.freeze();
216 let parsed_packet = SataRequest::try_from(packet)?;
217 parsed_packet.header().ensure_not_from_host()?;
218 if first_packet {
219 let flags = SataCapabilitiesFlags(parsed_packet.header().flags());
220 if !flags.intersects(SataCapabilitiesFlags::FAST_FILE_IO_SUPPORTED) {
221 debug!(
222 flags = valuable(&flags),
223 "Disabling FFIO because first packet header requested it..."
224 );
225 supports_ffio = false;
226 }
227 if !flags.intersects(SataCapabilitiesFlags::COMBINED_SEND_RECV_SUPPORTED) {
228 debug!(
229 flags = valuable(&flags),
230 "Disabling CSR because first packet header requested it..."
231 );
232 supports_csr = false;
233 }
234 }
235 first_packet = false;
236
237 debug!("{}", parsed_packet.command_info());
238 match parsed_packet.body() {
239 SataRequestBody::ChangeMode(ref mode) => {
240 sink.send(mode.handle(parsed_packet.header(), host_filesystem)?)
241 .await
242 .map_err(NetworkError::IO)?;
243 }
244 SataRequestBody::ChangeOwner(ref co) => {
245 sink.send(co.handle(parsed_packet.header())?)
246 .await
247 .map_err(NetworkError::IO)?;
248 }
249 SataRequestBody::CloseFile(ref cf) => {
250 sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
251 .await
252 .map_err(NetworkError::IO)?;
253 }
254 SataRequestBody::CloseFolder(ref cf) => {
255 sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
256 .await
257 .map_err(NetworkError::IO)?;
258 }
259 SataRequestBody::CreateDirectory(ref cd) => {
260 sink.send(cd.handle(parsed_packet.header(), host_filesystem).await?)
261 .await
262 .map_err(NetworkError::IO)?;
263 }
264 SataRequestBody::GetInfoByQuery(ref info) => {
265 sink.send(info.handle(parsed_packet.header(), host_filesystem).await?)
266 .await
267 .map_err(NetworkError::IO)?;
268 }
269 SataRequestBody::OpenFile(ref file) => {
270 sink.send(
271 file.handle(
272 parsed_packet.header(),
273 parsed_packet.command_info(),
274 host_filesystem,
275 )
276 .await?,
277 )
278 .await
279 .map_err(NetworkError::IO)?;
280 }
281 SataRequestBody::OpenFolder(ref folder) => {
282 sink.send(
283 folder
284 .handle(parsed_packet.header(), host_filesystem)
285 .await?,
286 )
287 .await
288 .map_err(NetworkError::IO)?;
289 }
290 SataRequestBody::Ping(ref ping) => {
291 debug!(
292 client.packet.header = valuable(parsed_packet.header()),
293 client.packet.command_info = valuable(parsed_packet.command_info()),
294 client.packet.body = valuable(ping),
295 "received ping packet from client",
296 );
297
298 sink.send(ping.handle(
299 parsed_packet.header(),
300 parsed_packet.command_info(),
301 supports_ffio,
302 supports_csr,
303 )?)
304 .await
305 .map_err(NetworkError::IO)?;
306 }
307 SataRequestBody::ReadFile(ref rfr) => {
308 sink.send(
309 rfr.handle(parsed_packet.header(), host_filesystem, supports_ffio)
310 .await?,
311 )
312 .await
313 .map_err(NetworkError::IO)?;
314 }
315 SataRequestBody::ReadDirectory(ref rd) => {
316 sink.send(rd.handle(parsed_packet.header(), host_filesystem).await?)
317 .await
318 .map_err(NetworkError::IO)?;
319 }
320 SataRequestBody::Remove(ref rm) => {
321 sink.send(
322 rm.handle(
323 parsed_packet.header(),
324 !disable_real_removal,
325 host_filesystem,
326 )
327 .await?,
328 )
329 .await
330 .map_err(NetworkError::IO)?;
331 }
332 SataRequestBody::Rewind(ref rewind) => {
333 sink.send(
334 rewind
335 .handle(parsed_packet.header(), host_filesystem)
336 .await?,
337 )
338 .await
339 .map_err(NetworkError::IO)?;
340 }
341 SataRequestBody::StatFile(ref st) => {
342 sink.send(
343 SataGetInfoByQueryPacketBody::stat_fd(
344 parsed_packet.header(),
345 host_filesystem,
346 st.file_descriptor(),
347 )
348 .await?,
349 )
350 .await
351 .map_err(NetworkError::IO)?;
352 }
353 SataRequestBody::WriteFile(ref wf) => {
354 sink.send(
355 wf.handle(
356 parsed_packet.header(),
357 host_filesystem,
358 supports_ffio,
359 &mut stream,
360 &bypass_buff_to_read,
361 )
362 .await?,
363 )
364 .await
365 .map_err(NetworkError::IO)?;
366 }
367 }
368 }
369 }
370 }
371}
372
373impl PCFSSataServer<'static> {
374 pub async fn serve_concurrently(self) {
381 let host_filesystem: &'static HostFilesystem = self.host_filesystem;
382
383 loop {
384 match self.server.accept().await {
385 Ok((stream, address)) => {
386 let client = address;
387 let bound = self.bound_address;
388 let spawn_result = TaskBuilder::new()
389 .name("cat_dev::fsemul::pcfs::sata::serve_client_connection_concurrently")
390 .spawn(async move {
391 let result = Self::serve_connection_concurrently(
392 host_filesystem,
393 stream,
394 self.disable_real_removal,
395 self.should_support_ffio,
396 self.should_support_csr,
397 )
398 .instrument(error_span!(
399 "cat_dev::fsemul::pcfs::sata::serve_connection_concurrently",
400 server.address = %bound,
401 client.address = %client,
402 ))
403 .await;
404
405 if let Err(cause) = result {
406 error!(
407 ?cause,
408 server.address = %bound,
409 client.address = %client,
410 "Failed to actually handle packets from client connection",
411 );
412 }
413 });
414
415 if let Err(cause) = spawn_result {
416 error!(
417 ?cause,
418 server.address = %self.bound_address,
419 client.address = %address,
420 "Failed to spawn handler for PCFS Sata Connection, cannot serve task.",
421 );
422 }
423 }
424 Err(cause) => {
425 error!(
426 ?cause,
427 server.address = %self.bound_address,
428 "Failed to accept PCFS SATA Connection from client, cannot serve itself.",
429 );
430 }
431 }
432 }
433 }
434
435 #[allow(
436 clippy::too_many_lines,
438 clippy::fn_params_excessive_bools,
440 )]
441 async fn serve_connection_concurrently(
442 host_filesystem: &'static HostFilesystem,
443 connection: TcpStream,
444 disable_real_removal: bool,
445 mut supports_ffio: bool,
446 mut supports_csr: bool,
447 ) -> Result<(), CatBridgeError> {
448 connection.set_nodelay(true).map_err(NetworkError::IO)?;
449 let bypass_buff_to_read = Arc::new(AtomicUsize::new(0));
450 let (mut sink, mut stream) =
451 Framed::new(connection, SataProtoChunker(bypass_buff_to_read.clone())).split();
452 let mut first_packet = true;
453
454 loop {
455 while let Some(result) = stream.next().await {
456 let packet = result.map_err(NetworkError::IO)?.freeze();
457 let parsed_packet = SataRequest::try_from(packet)?;
458 parsed_packet.header().ensure_not_from_host()?;
459 if first_packet {
460 let flags = SataCapabilitiesFlags(parsed_packet.header().flags());
461 if !flags.intersects(SataCapabilitiesFlags::FAST_FILE_IO_SUPPORTED) {
462 supports_ffio = false;
463 }
464 if !flags.intersects(SataCapabilitiesFlags::COMBINED_SEND_RECV_SUPPORTED) {
465 supports_csr = false;
466 }
467 }
468 first_packet = false;
469
470 debug!("{}", parsed_packet.command_info());
471 match parsed_packet.body() {
472 SataRequestBody::ChangeMode(ref mode) => {
473 sink.send(mode.handle(parsed_packet.header(), host_filesystem)?)
474 .await
475 .map_err(NetworkError::IO)?;
476 }
477 SataRequestBody::ChangeOwner(ref co) => {
478 sink.send(co.handle(parsed_packet.header())?)
479 .await
480 .map_err(NetworkError::IO)?;
481 }
482 SataRequestBody::CloseFile(ref cf) => {
483 sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
484 .await
485 .map_err(NetworkError::IO)?;
486 }
487 SataRequestBody::CloseFolder(ref cf) => {
488 sink.send(cf.handle(parsed_packet.header(), host_filesystem).await?)
489 .await
490 .map_err(NetworkError::IO)?;
491 }
492 SataRequestBody::CreateDirectory(ref cd) => {
493 sink.send(cd.handle(parsed_packet.header(), host_filesystem).await?)
494 .await
495 .map_err(NetworkError::IO)?;
496 }
497 SataRequestBody::GetInfoByQuery(ref info) => {
498 sink.send(info.handle(parsed_packet.header(), host_filesystem).await?)
499 .await
500 .map_err(NetworkError::IO)?;
501 }
502 SataRequestBody::OpenFile(ref file) => {
503 sink.send(
504 file.handle(
505 parsed_packet.header(),
506 parsed_packet.command_info(),
507 host_filesystem,
508 )
509 .await?,
510 )
511 .await
512 .map_err(NetworkError::IO)?;
513 }
514 SataRequestBody::OpenFolder(ref folder) => {
515 sink.send(
516 folder
517 .handle(parsed_packet.header(), host_filesystem)
518 .await?,
519 )
520 .await
521 .map_err(NetworkError::IO)?;
522 }
523 SataRequestBody::Ping(ref ping) => {
524 debug!(
525 client.packet.header = valuable(parsed_packet.header()),
526 client.packet.command_info = valuable(parsed_packet.command_info()),
527 client.packet.body = valuable(ping),
528 "received ping packet from client",
529 );
530
531 sink.send(ping.handle(
532 parsed_packet.header(),
533 parsed_packet.command_info(),
534 supports_ffio,
535 supports_csr,
536 )?)
537 .await
538 .map_err(NetworkError::IO)?;
539 }
540 SataRequestBody::ReadFile(ref rfr) => {
541 sink.send(
542 rfr.handle(parsed_packet.header(), host_filesystem, supports_ffio)
543 .await?,
544 )
545 .await
546 .map_err(NetworkError::IO)?;
547 }
548 SataRequestBody::ReadDirectory(ref rd) => {
549 sink.send(rd.handle(parsed_packet.header(), host_filesystem).await?)
550 .await
551 .map_err(NetworkError::IO)?;
552 }
553 SataRequestBody::Remove(ref rm) => {
554 sink.send(
555 rm.handle(
556 parsed_packet.header(),
557 !disable_real_removal,
558 host_filesystem,
559 )
560 .await?,
561 )
562 .await
563 .map_err(NetworkError::IO)?;
564 }
565 SataRequestBody::Rewind(ref rewind) => {
566 sink.send(
567 rewind
568 .handle(parsed_packet.header(), host_filesystem)
569 .await?,
570 )
571 .await
572 .map_err(NetworkError::IO)?;
573 }
574 SataRequestBody::StatFile(ref st) => {
575 sink.send(
576 SataGetInfoByQueryPacketBody::stat_fd(
577 parsed_packet.header(),
578 host_filesystem,
579 st.file_descriptor(),
580 )
581 .await?,
582 )
583 .await
584 .map_err(NetworkError::IO)?;
585 }
586 SataRequestBody::WriteFile(ref wf) => {
587 sink.send(
588 wf.handle(
589 parsed_packet.header(),
590 host_filesystem,
591 supports_ffio,
592 &mut stream,
593 &bypass_buff_to_read,
594 )
595 .await?,
596 )
597 .await
598 .map_err(NetworkError::IO)?;
599 }
600 }
601 }
602 }
603 }
604}
605
606#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Valuable)]
607pub struct SataCapabilitiesFlags(pub u32);
608
609bitflags::bitflags! {
610 impl SataCapabilitiesFlags: u32 {
611 const FAST_FILE_IO_SUPPORTED = 0b0000_0010;
612 const COMBINED_SEND_RECV_SUPPORTED = 0b0000_0100;
613 }
614}