1mod change_mode;
4mod change_owner;
5mod close_file;
6mod close_folder;
7pub mod connection_flags;
8mod create_folder;
9mod get_file_position;
10mod info_by_query;
11mod open_file;
12mod open_folder;
13mod ping;
14mod read_file;
15mod read_folder;
16mod remove;
17mod rename;
18mod rewind_folder;
19mod set_file_position;
20pub mod wal;
21mod write_file;
22
23use crate::{
24 errors::{APIError, CatBridgeError},
25 fsemul::{
26 filesystem::host::HostFilesystem,
27 pcfs::sata::{
28 proto::SataRequest,
29 server::{
30 connection_flags::{
31 SATA_CONNECTION_FLAGS, SataConnectionFlags, SataConnectionFlagsLayer,
32 },
33 wal::{
34 WriteAheadLog,
35 layer::{WALBeginStreamLayer, WALEndStreamLayer, WALMessageLayer},
36 },
37 },
38 },
39 },
40 net::{
41 DEFAULT_CAT_DEV_CHUNK_SIZE,
42 additions::{RequestIDLayer, StreamIDLayer},
43 models::{Endianness, FromRef, NagleGuard, Response},
44 server::{Router, TCPServer, models::ResponseStreamEvent, requestable::Body},
45 },
46};
47use bytes::Bytes;
48use local_ip_address::local_ip;
49use std::{
50 net::{IpAddr, Ipv4Addr, SocketAddrV4},
51 path::PathBuf,
52 time::Duration,
53};
54use tower::ServiceBuilder;
55use tracing::{field::valuable, warn};
56use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
57
58pub const DEFAULT_SATA_PORT: u16 = 7500_u16;
60
61#[derive(Clone, Debug, Valuable)]
63pub struct PcfsServerState {
64 disable_real_removal: bool,
66 host_filesystem: HostFilesystem,
68 pid: u32,
70}
71
72impl PcfsServerState {
73 #[must_use]
74 pub const fn new(
75 disable_real_removal: bool,
76 host_filesystem: HostFilesystem,
77 pid: u32,
78 ) -> Self {
79 PcfsServerState {
80 disable_real_removal,
81 host_filesystem,
82 pid,
83 }
84 }
85
86 #[must_use]
87 pub const fn disable_real_removal(&self) -> bool {
88 self.disable_real_removal
89 }
90
91 #[must_use]
92 pub const fn host_filesystem(&self) -> &HostFilesystem {
93 &self.host_filesystem
94 }
95
96 #[must_use]
97 pub const fn pid(&self) -> u32 {
98 self.pid
99 }
100}
101
102impl FromRef<PcfsServerState> for HostFilesystem {
103 fn from_ref(input: &PcfsServerState) -> Self {
104 input.host_filesystem.clone()
105 }
106}
107
108impl FromRef<PcfsServerState> for u32 {
109 fn from_ref(input: &PcfsServerState) -> Self {
110 input.pid
111 }
112}
113
114#[allow(
115 clippy::struct_excessive_bools,
117)]
118#[derive(Clone, Debug)]
119pub struct PcfsSataServerBuilder {
120 address: Option<Ipv4Addr>,
122 cat_dev_sleep_override: Option<Duration>,
128 chunk_override: Option<usize>,
134 disable_csr: bool,
136 disable_ffio: bool,
138 disable_real_removal: bool,
140 fully_disable_cat_dev_sleep: bool,
142 fully_disable_chunk_override: bool,
144 host_filesystem: HostFilesystem,
146 port: Option<u16>,
148 sata_wal_location: Option<PathBuf>,
150 trace_during_debug: bool,
152}
153
154impl PcfsSataServerBuilder {
155 #[must_use]
157 pub const fn new(host_filesystem: HostFilesystem) -> Self {
158 Self {
159 address: None,
160 cat_dev_sleep_override: None,
161 chunk_override: None,
162 disable_csr: false,
163 disable_ffio: false,
164 disable_real_removal: false,
165 fully_disable_cat_dev_sleep: false,
166 fully_disable_chunk_override: false,
167 host_filesystem,
168 port: None,
169 sata_wal_location: None,
170 trace_during_debug: false,
171 }
172 }
173
174 #[must_use]
175 pub const fn address(&self) -> Option<Ipv4Addr> {
176 self.address
177 }
178 #[must_use]
179 pub const fn set_address(mut self, new_address: Option<Ipv4Addr>) -> Self {
180 self.address = new_address;
181 self
182 }
183
184 #[must_use]
185 pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
186 self.cat_dev_sleep_override
187 }
188 #[must_use]
189 pub const fn set_cat_dev_sleep_override(mut self, new: Option<Duration>) -> Self {
190 self.cat_dev_sleep_override = new;
191 self
192 }
193
194 #[must_use]
195 pub const fn chunk_override(&self) -> Option<usize> {
196 self.chunk_override
197 }
198 #[must_use]
199 pub const fn set_chunk_override(mut self, chunk: Option<usize>) -> Self {
200 self.chunk_override = chunk;
201 self
202 }
203
204 #[must_use]
205 pub const fn disable_csr(&self) -> bool {
206 self.disable_csr
207 }
208 #[must_use]
209 pub const fn set_disable_csr(mut self, new: bool) -> Self {
210 self.disable_csr = new;
211 self
212 }
213
214 #[must_use]
215 pub const fn disable_ffio(&self) -> bool {
216 self.disable_ffio
217 }
218 #[must_use]
219 pub const fn set_disable_ffio(mut self, new: bool) -> Self {
220 self.disable_ffio = new;
221 self
222 }
223
224 #[must_use]
225 pub const fn disable_real_removal(&self) -> bool {
226 self.disable_real_removal
227 }
228 #[must_use]
229 pub const fn set_disable_real_removal(mut self, new: bool) -> Self {
230 self.disable_real_removal = new;
231 self
232 }
233
234 #[must_use]
235 pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
236 self.fully_disable_cat_dev_sleep
237 }
238 #[must_use]
239 pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
240 self.fully_disable_cat_dev_sleep = new;
241 self
242 }
243
244 #[must_use]
245 pub const fn fully_disable_chunk_override(&self) -> bool {
246 self.fully_disable_chunk_override
247 }
248 #[must_use]
249 pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
250 self.fully_disable_chunk_override = new;
251 self
252 }
253
254 #[must_use]
255 pub const fn host_filesystem(&self) -> &HostFilesystem {
256 &self.host_filesystem
257 }
258 #[must_use]
259 pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
260 self.host_filesystem = new;
261 self
262 }
263
264 #[must_use]
265 pub const fn port(&self) -> Option<u16> {
266 self.port
267 }
268 #[must_use]
269 pub const fn set_port(mut self, new: Option<u16>) -> Self {
270 self.port = new;
271 self
272 }
273
274 #[must_use]
275 pub fn sata_wal_location(&self) -> Option<&PathBuf> {
276 self.sata_wal_location.as_ref()
277 }
278 #[must_use]
279 pub fn set_sata_wal_location(mut self, new_location: Option<PathBuf>) -> Self {
280 self.sata_wal_location = new_location;
281 self
282 }
283
284 #[must_use]
285 pub const fn trace_during_debug(&self) -> bool {
286 self.trace_during_debug
287 }
288 #[must_use]
289 pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
290 self.trace_during_debug = new;
291 self
292 }
293
294 #[allow(
303 clippy::manual_map,
306 )]
307 pub async fn build(self) -> Result<TCPServer<PcfsServerState>, CatBridgeError> {
308 let ip = self
309 .address
310 .or_else(|| {
311 local_ip().ok().map(|ip| match ip {
314 IpAddr::V4(v4) => v4,
315 IpAddr::V6(_v6) => unreachable!(),
316 })
317 })
318 .ok_or(APIError::NoHostIpFound)?;
319 let bound_address = SocketAddrV4::new(ip, self.port.unwrap_or(DEFAULT_SATA_PORT));
320
321 let mut router = Router::<PcfsServerState>::new_with_offset(0x30);
322 router.add_route(&0x0_u32.to_be_bytes(), create_folder::handle_create_folder)?;
323 router.add_route(&0x1_u32.to_be_bytes(), open_folder::handle_open_folder)?;
324 router.add_route(&0x2_u32.to_be_bytes(), read_folder::handle_read_folder)?;
325 router.add_route(&0x3_u32.to_be_bytes(), rewind_folder::handle_rewind_folder)?;
326 router.add_route(&0x4_u32.to_be_bytes(), close_folder::handle_close_folder)?;
327 router.add_route(&0x5_u32.to_be_bytes(), open_file::handle_open_file)?;
328 router.add_route(&0x6_u32.to_be_bytes(), read_file::handle_read_file)?;
329 router.add_route(&0x7_u32.to_be_bytes(), write_file::handle_write_file)?;
330 router.add_route(
331 &0x8_u32.to_be_bytes(),
332 get_file_position::handle_get_file_position,
333 )?;
334 router.add_route(
335 &0x9_u32.to_be_bytes(),
336 set_file_position::handle_set_file_position,
337 )?;
338 router.add_route(&0xB_u32.to_be_bytes(), info_by_query::stat_fd)?;
339 router.add_route(&0xD_u32.to_be_bytes(), close_file::handle_close_file)?;
340 router.add_route(&0xE_u32.to_be_bytes(), remove::handle_removal)?;
341 router.add_route(&0xF_u32.to_be_bytes(), rename::handle_rename)?;
342 router.add_route(
343 &0x10_u32.to_be_bytes(),
344 info_by_query::handle_get_info_by_query,
345 )?;
346 router.add_route(&0x12_u32.to_be_bytes(), change_owner::handle_change_owner)?;
347 router.add_route(&0x13_u32.to_be_bytes(), change_mode::handle_change_mode)?;
348 router.add_route(&0x14_u32.to_be_bytes(), ping::handle_ping)?;
349 router.fallback_handler(unknown_packet_handler)?;
350
351 let mut server = TCPServer::new_with_state(
352 "pcfs-sata",
353 bound_address,
354 router,
355 (None, None),
356 NagleGuard::U32LengthPrefixed(Endianness::Big, Some(0x20)),
357 PcfsServerState::new(
358 self.disable_real_removal,
359 self.host_filesystem,
360 std::process::id(),
361 ),
362 self.trace_during_debug,
363 )
364 .await?;
365 let wal = self
366 .sata_wal_location
367 .and_then(|path| WriteAheadLog::new(path).ok());
368
369 server.set_on_stream_begin(async move |event: ResponseStreamEvent<PcfsServerState>| {
370 let sid = event.stream_id();
371
372 _ = SATA_CONNECTION_FLAGS
373 .insert_async(
374 sid,
375 SataConnectionFlags::new_with_flags(!self.disable_ffio, !self.disable_csr),
376 )
377 .await;
378
379 Ok(true)
380 })?;
381 if let Some(w) = wal.as_ref() {
382 server.layer_on_stream_begin(WALBeginStreamLayer(w.clone()))?;
383 }
384 server.set_on_stream_end(on_sata_stream_end)?;
385 if let Some(w) = wal.as_ref() {
386 server.layer_on_stream_end(WALEndStreamLayer(w.clone()))?;
387 }
388
389 create_initial_server_layer(&mut server, wal, self.trace_during_debug);
390
391 server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
392 None
393 } else if let Some(over_ride) = self.chunk_override {
394 Some(over_ride)
395 } else {
396 Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
397 });
398 server.set_cat_dev_slowdown(if self.fully_disable_cat_dev_sleep {
399 None
400 } else if let Some(over_ride) = self.cat_dev_sleep_override {
401 Some(over_ride)
402 } else {
403 None
404 });
405
406 Ok(server)
407 }
408}
409
410const PCFS_SATA_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
411 NamedField::new("address"),
412 NamedField::new("cat_dev_sleep_override"),
413 NamedField::new("chunk_override"),
414 NamedField::new("disable_csr"),
415 NamedField::new("disable_ffio"),
416 NamedField::new("disable_real_removal"),
417 NamedField::new("fully_disable_cat_dev_sleep"),
418 NamedField::new("fully_disable_chunk_override"),
419 NamedField::new("host_filesystem"),
420 NamedField::new("port"),
421 NamedField::new("sata_wal_location"),
422 NamedField::new("trace_during_debug"),
423];
424
425impl Structable for PcfsSataServerBuilder {
426 fn definition(&self) -> StructDef<'_> {
427 StructDef::new_static(
428 "PcfsSataServerBuilder",
429 Fields::Named(PCFS_SATA_SERVER_BUILDER_FIELDS),
430 )
431 }
432}
433
434impl Valuable for PcfsSataServerBuilder {
435 fn as_value(&self) -> Value<'_> {
436 Value::Structable(self)
437 }
438
439 fn visit(&self, visitor: &mut dyn Visit) {
440 visitor.visit_named_fields(&NamedValues::new(
441 PCFS_SATA_SERVER_BUILDER_FIELDS,
442 &[
443 Valuable::as_value(
444 &self
445 .address
446 .map_or_else(|| "<none>".to_owned(), |ip| format!("{ip}")),
447 ),
448 Valuable::as_value(
449 &self
450 .cat_dev_sleep_override
451 .map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
452 ),
453 Valuable::as_value(&self.chunk_override),
454 Valuable::as_value(&self.disable_csr),
455 Valuable::as_value(&self.disable_ffio),
456 Valuable::as_value(&self.disable_real_removal),
457 Valuable::as_value(&self.fully_disable_cat_dev_sleep),
458 Valuable::as_value(&self.fully_disable_chunk_override),
459 Valuable::as_value(&self.host_filesystem),
460 Valuable::as_value(&self.port),
461 Valuable::as_value(&self.sata_wal_location),
462 Valuable::as_value(&self.trace_during_debug),
463 ],
464 ));
465 }
466}
467
468fn create_initial_server_layer(
469 server: &mut TCPServer<PcfsServerState>,
470 mut wal: Option<WriteAheadLog>,
471 trace_during_debug: bool,
472) {
473 if let Some(w) = wal.take() {
474 if trace_during_debug {
475 server.layer_initial_service(
476 ServiceBuilder::new()
477 .layer(RequestIDLayer::new("sata".to_owned()))
478 .layer(StreamIDLayer)
479 .layer(SataConnectionFlagsLayer)
480 .layer(WALMessageLayer(w)),
481 );
482 } else {
483 server.layer_initial_service(
484 ServiceBuilder::new()
485 .layer(RequestIDLayer::new("sata".to_owned()))
486 .layer(SataConnectionFlagsLayer)
487 .layer(WALMessageLayer(w)),
488 );
489 }
490 } else if trace_during_debug {
491 server.layer_initial_service(
492 ServiceBuilder::new()
493 .layer(RequestIDLayer::new("sata".to_owned()))
494 .layer(StreamIDLayer)
495 .layer(SataConnectionFlagsLayer),
496 );
497 } else {
498 server.layer_initial_service(
499 ServiceBuilder::new()
500 .layer(RequestIDLayer::new("sata".to_owned()))
501 .layer(SataConnectionFlagsLayer),
502 );
503 }
504}
505
506async fn unknown_packet_handler(Body(request): Body<Bytes>) -> Response {
507 if let Ok(req) = SataRequest::<Bytes>::parse_opaque(request.clone()) {
508 warn!(
509 header = valuable(req.header()),
510 command_info = valuable(req.command_info()),
511 body = format!("{:02X?}", req.body()),
512 "Unknown Pcfs Sata packet!",
513 );
514 } else {
515 warn!(
516 packet = format!("{:02X?}", request),
517 "Unknown Unparsable Pcfs Sata Packet!",
518 );
519 }
520
521 Response::empty_close()
522}
523
524async fn on_sata_stream_end(
535 event: ResponseStreamEvent<PcfsServerState>,
536) -> Result<(), CatBridgeError> {
537 let sid = event.stream_id();
538 _ = SATA_CONNECTION_FLAGS.remove_async(&sid).await;
539 Ok(())
540}