1mod change_mode;
4mod change_owner;
5mod close_file;
6mod close_folder;
7pub mod connection_flags;
8mod create_folder;
9mod info_by_query;
10mod open_file;
11mod open_folder;
12mod ping;
13mod read_file;
14mod read_folder;
15mod remove;
16mod rename;
17mod rewind_folder;
18mod set_file_position;
19pub mod wal;
20mod write_file;
21
22use crate::{
23 errors::{APIError, CatBridgeError},
24 fsemul::{
25 HostFilesystem,
26 pcfs::sata::{
27 proto::SataRequest,
28 server::{
29 connection_flags::{
30 SATA_CONNECTION_FLAGS, SataConnectionFlags, SataConnectionFlagsLayer,
31 },
32 wal::{
33 WriteAheadLog,
34 layer::{WALBeginStreamLayer, WALEndStreamLayer, WALMessageLayer},
35 },
36 },
37 },
38 },
39 net::{
40 DEFAULT_CAT_DEV_CHUNK_SIZE, DEFAULT_CAT_DEV_SLOWDOWN,
41 additions::{RequestIDLayer, StreamIDLayer},
42 models::{Endianness, FromRef, NagleGuard, Response},
43 server::{Router, TCPServer, models::ResponseStreamEvent, requestable::Body},
44 },
45};
46use bytes::Bytes;
47use local_ip_address::local_ip;
48use std::{
49 net::{IpAddr, Ipv4Addr, SocketAddrV4},
50 path::PathBuf,
51 time::Duration,
52};
53use tower::ServiceBuilder;
54use tracing::{field::valuable, warn};
55use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
56
57pub const DEFAULT_SATA_PORT: u16 = 7500_u16;
59
60#[derive(Clone, Debug, Valuable)]
62pub struct PcfsServerState {
63 disable_real_removal: bool,
65 host_filesystem: HostFilesystem,
67 pid: u32,
69}
70
71impl PcfsServerState {
72 #[must_use]
73 pub const fn new(
74 disable_real_removal: bool,
75 host_filesystem: HostFilesystem,
76 pid: u32,
77 ) -> Self {
78 PcfsServerState {
79 disable_real_removal,
80 host_filesystem,
81 pid,
82 }
83 }
84
85 #[must_use]
86 pub const fn disable_real_removal(&self) -> bool {
87 self.disable_real_removal
88 }
89
90 #[must_use]
91 pub const fn host_filesystem(&self) -> &HostFilesystem {
92 &self.host_filesystem
93 }
94
95 #[must_use]
96 pub const fn pid(&self) -> u32 {
97 self.pid
98 }
99}
100
101impl FromRef<PcfsServerState> for HostFilesystem {
102 fn from_ref(input: &PcfsServerState) -> Self {
103 input.host_filesystem.clone()
104 }
105}
106
107impl FromRef<PcfsServerState> for u32 {
108 fn from_ref(input: &PcfsServerState) -> Self {
109 input.pid
110 }
111}
112
113#[allow(
114 clippy::struct_excessive_bools,
116)]
117#[derive(Clone, Debug)]
118pub struct PcfsSataServerBuilder {
119 address: Option<Ipv4Addr>,
121 cat_dev_sleep_override: Option<Duration>,
127 chunk_override: Option<usize>,
133 disable_csr: bool,
135 disable_ffio: bool,
137 disable_real_removal: bool,
139 fully_disable_cat_dev_sleep: bool,
141 fully_disable_chunk_override: bool,
143 host_filesystem: HostFilesystem,
145 port: Option<u16>,
147 sata_wal_location: Option<PathBuf>,
149 trace_during_debug: bool,
151}
152
153impl PcfsSataServerBuilder {
154 #[must_use]
156 pub const fn new(host_filesystem: HostFilesystem) -> Self {
157 Self {
158 address: None,
159 cat_dev_sleep_override: None,
160 chunk_override: None,
161 disable_csr: false,
162 disable_ffio: false,
163 disable_real_removal: false,
164 fully_disable_cat_dev_sleep: false,
165 fully_disable_chunk_override: false,
166 host_filesystem,
167 port: None,
168 sata_wal_location: None,
169 trace_during_debug: false,
170 }
171 }
172
173 #[must_use]
174 pub const fn address(&self) -> Option<Ipv4Addr> {
175 self.address
176 }
177 #[must_use]
178 pub const fn set_address(mut self, new_address: Option<Ipv4Addr>) -> Self {
179 self.address = new_address;
180 self
181 }
182
183 #[must_use]
184 pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
185 self.cat_dev_sleep_override
186 }
187 #[must_use]
188 pub const fn set_cat_dev_sleep_override(mut self, new: Option<Duration>) -> Self {
189 self.cat_dev_sleep_override = new;
190 self
191 }
192
193 #[must_use]
194 pub const fn chunk_override(&self) -> Option<usize> {
195 self.chunk_override
196 }
197 #[must_use]
198 pub const fn set_chunk_override(mut self, chunk: Option<usize>) -> Self {
199 self.chunk_override = chunk;
200 self
201 }
202
203 #[must_use]
204 pub const fn disable_csr(&self) -> bool {
205 self.disable_csr
206 }
207 #[must_use]
208 pub const fn set_disable_csr(mut self, new: bool) -> Self {
209 self.disable_csr = new;
210 self
211 }
212
213 #[must_use]
214 pub const fn disable_ffio(&self) -> bool {
215 self.disable_ffio
216 }
217 #[must_use]
218 pub const fn set_disable_ffio(mut self, new: bool) -> Self {
219 self.disable_ffio = new;
220 self
221 }
222
223 #[must_use]
224 pub const fn disable_real_removal(&self) -> bool {
225 self.disable_real_removal
226 }
227 #[must_use]
228 pub const fn set_disable_real_removal(mut self, new: bool) -> Self {
229 self.disable_real_removal = new;
230 self
231 }
232
233 #[must_use]
234 pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
235 self.fully_disable_cat_dev_sleep
236 }
237 #[must_use]
238 pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
239 self.fully_disable_cat_dev_sleep = new;
240 self
241 }
242
243 #[must_use]
244 pub const fn fully_disable_chunk_override(&self) -> bool {
245 self.fully_disable_chunk_override
246 }
247 #[must_use]
248 pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
249 self.fully_disable_chunk_override = new;
250 self
251 }
252
253 #[must_use]
254 pub const fn host_filesystem(&self) -> &HostFilesystem {
255 &self.host_filesystem
256 }
257 #[must_use]
258 pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
259 self.host_filesystem = new;
260 self
261 }
262
263 #[must_use]
264 pub const fn port(&self) -> Option<u16> {
265 self.port
266 }
267 #[must_use]
268 pub const fn set_port(mut self, new: Option<u16>) -> Self {
269 self.port = new;
270 self
271 }
272
273 #[must_use]
274 pub fn sata_wal_location(&self) -> Option<&PathBuf> {
275 self.sata_wal_location.as_ref()
276 }
277 #[must_use]
278 pub fn set_sata_wal_location(mut self, new_location: Option<PathBuf>) -> Self {
279 self.sata_wal_location = new_location;
280 self
281 }
282
283 #[must_use]
284 pub const fn trace_during_debug(&self) -> bool {
285 self.trace_during_debug
286 }
287 #[must_use]
288 pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
289 self.trace_during_debug = new;
290 self
291 }
292
293 pub async fn build(self) -> Result<TCPServer<PcfsServerState>, CatBridgeError> {
302 let ip = self
303 .address
304 .or_else(|| {
305 local_ip().ok().map(|ip| match ip {
308 IpAddr::V4(v4) => v4,
309 IpAddr::V6(_v6) => unreachable!(),
310 })
311 })
312 .ok_or(APIError::NoHostIpFound)?;
313 let bound_address = SocketAddrV4::new(ip, self.port.unwrap_or(DEFAULT_SATA_PORT));
314
315 let mut router = Router::<PcfsServerState>::new_with_offset(0x30);
316 router.add_route(&0x0_u32.to_be_bytes(), create_folder::handle_create_folder)?;
317 router.add_route(&0x1_u32.to_be_bytes(), open_folder::handle_open_folder)?;
318 router.add_route(&0x2_u32.to_be_bytes(), read_folder::handle_read_folder)?;
319 router.add_route(&0x3_u32.to_be_bytes(), rewind_folder::handle_rewind_folder)?;
320 router.add_route(&0x4_u32.to_be_bytes(), close_folder::handle_close_folder)?;
321 router.add_route(&0x5_u32.to_be_bytes(), open_file::handle_open_file)?;
322 router.add_route(&0x6_u32.to_be_bytes(), read_file::handle_read_file)?;
323 router.add_route(&0x7_u32.to_be_bytes(), write_file::handle_write_file)?;
324 router.add_route(
325 &0x9_u32.to_be_bytes(),
326 set_file_position::handle_set_file_position,
327 )?;
328 router.add_route(&0xB_u32.to_be_bytes(), info_by_query::stat_fd)?;
329 router.add_route(&0xD_u32.to_be_bytes(), close_file::handle_close_file)?;
330 router.add_route(&0xE_u32.to_be_bytes(), remove::handle_removal)?;
331 router.add_route(&0xF_u32.to_be_bytes(), rename::handle_rename)?;
332 router.add_route(
333 &0x10_u32.to_be_bytes(),
334 info_by_query::handle_get_info_by_query,
335 )?;
336 router.add_route(&0x12_u32.to_be_bytes(), change_owner::handle_change_owner)?;
337 router.add_route(&0x13_u32.to_be_bytes(), change_mode::handle_change_mode)?;
338 router.add_route(&0x14_u32.to_be_bytes(), ping::handle_ping)?;
339 router.fallback_handler(unknown_packet_handler)?;
340
341 let mut server = TCPServer::new_with_state(
342 "pcfs-sata",
343 bound_address,
344 router,
345 (None, None),
346 NagleGuard::U32LengthPrefixed(Endianness::Big, Some(0x20)),
347 PcfsServerState::new(
348 self.disable_real_removal,
349 self.host_filesystem,
350 std::process::id(),
351 ),
352 self.trace_during_debug,
353 )
354 .await?;
355 let wal = self
356 .sata_wal_location
357 .and_then(|path| WriteAheadLog::new(path).ok());
358
359 server.set_on_stream_begin(async move |event: ResponseStreamEvent<PcfsServerState>| {
360 let sid = event.stream_id();
361
362 _ = SATA_CONNECTION_FLAGS
363 .insert_async(
364 sid,
365 SataConnectionFlags::new_with_flags(!self.disable_ffio, !self.disable_csr),
366 )
367 .await;
368
369 Ok(true)
370 })?;
371 if let Some(w) = wal.as_ref() {
372 server.layer_on_stream_begin(WALBeginStreamLayer(w.clone()))?;
373 }
374 server.set_on_stream_end(on_sata_stream_end)?;
375 if let Some(w) = wal.as_ref() {
376 server.layer_on_stream_end(WALEndStreamLayer(w.clone()))?;
377 }
378
379 create_initial_server_layer(&mut server, wal, self.trace_during_debug);
380
381 server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
382 None
383 } else if let Some(over_ride) = self.chunk_override {
384 Some(over_ride)
385 } else {
386 Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
387 });
388 server.set_cat_dev_slowdown(if self.fully_disable_cat_dev_sleep {
389 None
390 } else if let Some(over_ride) = self.cat_dev_sleep_override {
391 Some(over_ride)
392 } else {
393 Some(DEFAULT_CAT_DEV_SLOWDOWN)
394 });
395
396 Ok(server)
397 }
398}
399
400const PCFS_SATA_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
401 NamedField::new("address"),
402 NamedField::new("cat_dev_sleep_override"),
403 NamedField::new("chunk_override"),
404 NamedField::new("disable_csr"),
405 NamedField::new("disable_ffio"),
406 NamedField::new("disable_real_removal"),
407 NamedField::new("fully_disable_cat_dev_sleep"),
408 NamedField::new("fully_disable_chunk_override"),
409 NamedField::new("host_filesystem"),
410 NamedField::new("port"),
411 NamedField::new("sata_wal_location"),
412 NamedField::new("trace_during_debug"),
413];
414
415impl Structable for PcfsSataServerBuilder {
416 fn definition(&self) -> StructDef<'_> {
417 StructDef::new_static(
418 "PcfsSataServerBuilder",
419 Fields::Named(PCFS_SATA_SERVER_BUILDER_FIELDS),
420 )
421 }
422}
423
424impl Valuable for PcfsSataServerBuilder {
425 fn as_value(&self) -> Value<'_> {
426 Value::Structable(self)
427 }
428
429 fn visit(&self, visitor: &mut dyn Visit) {
430 visitor.visit_named_fields(&NamedValues::new(
431 PCFS_SATA_SERVER_BUILDER_FIELDS,
432 &[
433 Valuable::as_value(
434 &self
435 .address
436 .map_or_else(|| "<none>".to_owned(), |ip| format!("{ip}")),
437 ),
438 Valuable::as_value(
439 &self
440 .cat_dev_sleep_override
441 .map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
442 ),
443 Valuable::as_value(&self.chunk_override),
444 Valuable::as_value(&self.disable_csr),
445 Valuable::as_value(&self.disable_ffio),
446 Valuable::as_value(&self.disable_real_removal),
447 Valuable::as_value(&self.fully_disable_cat_dev_sleep),
448 Valuable::as_value(&self.fully_disable_chunk_override),
449 Valuable::as_value(&self.host_filesystem),
450 Valuable::as_value(&self.port),
451 Valuable::as_value(&self.sata_wal_location),
452 Valuable::as_value(&self.trace_during_debug),
453 ],
454 ));
455 }
456}
457
458fn create_initial_server_layer(
459 server: &mut TCPServer<PcfsServerState>,
460 mut wal: Option<WriteAheadLog>,
461 trace_during_debug: bool,
462) {
463 if let Some(w) = wal.take() {
464 if trace_during_debug {
465 server.layer_initial_service(
466 ServiceBuilder::new()
467 .layer(RequestIDLayer::new("sata".to_owned()))
468 .layer(StreamIDLayer)
469 .layer(SataConnectionFlagsLayer)
470 .layer(WALMessageLayer(w)),
471 );
472 } else {
473 server.layer_initial_service(
474 ServiceBuilder::new()
475 .layer(RequestIDLayer::new("sata".to_owned()))
476 .layer(SataConnectionFlagsLayer)
477 .layer(WALMessageLayer(w)),
478 );
479 }
480 } else if trace_during_debug {
481 server.layer_initial_service(
482 ServiceBuilder::new()
483 .layer(RequestIDLayer::new("sata".to_owned()))
484 .layer(StreamIDLayer)
485 .layer(SataConnectionFlagsLayer),
486 );
487 } else {
488 server.layer_initial_service(
489 ServiceBuilder::new()
490 .layer(RequestIDLayer::new("sata".to_owned()))
491 .layer(SataConnectionFlagsLayer),
492 );
493 }
494}
495
496async fn unknown_packet_handler(Body(request): Body<Bytes>) -> Response {
497 if let Ok(req) = SataRequest::<Bytes>::parse_opaque(request.clone()) {
498 warn!(
499 header = valuable(req.header()),
500 command_info = valuable(req.command_info()),
501 body = format!("{:02X?}", req.body()),
502 "Unknown Pcfs Sata packet!",
503 );
504 } else {
505 warn!(
506 packet = format!("{:02X?}", request),
507 "Unknown Unparsable Pcfs Sata Packet!",
508 );
509 }
510
511 Response::empty_close()
512}
513
514async fn on_sata_stream_end(
525 event: ResponseStreamEvent<PcfsServerState>,
526) -> Result<(), CatBridgeError> {
527 let sid = event.stream_id();
528 _ = SATA_CONNECTION_FLAGS.remove_async(&sid).await;
529 Ok(())
530}