mod change_mode;
mod change_owner;
mod close_file;
mod close_folder;
pub mod connection_flags;
mod create_folder;
mod info_by_query;
mod open_file;
mod open_folder;
mod ping;
mod read_file;
mod read_folder;
mod remove;
mod rename;
mod rewind_folder;
mod set_file_position;
pub mod wal;
mod write_file;
use crate::{
errors::{APIError, CatBridgeError},
fsemul::{
HostFilesystem,
pcfs::sata::{
proto::SataRequest,
server::{
connection_flags::{
SATA_CONNECTION_FLAGS, SataConnectionFlags, SataConnectionFlagsLayer,
},
wal::{
WriteAheadLog,
layer::{WALBeginStreamLayer, WALEndStreamLayer, WALMessageLayer},
},
},
},
},
net::{
DEFAULT_CAT_DEV_CHUNK_SIZE, DEFAULT_CAT_DEV_SLOWDOWN,
additions::{RequestIDLayer, StreamIDLayer},
models::{Endianness, FromRef, NagleGuard, Response},
server::{Router, TCPServer, models::ResponseStreamEvent, requestable::Body},
},
};
use bytes::Bytes;
use local_ip_address::local_ip;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddrV4},
path::PathBuf,
time::Duration,
};
use tower::ServiceBuilder;
use tracing::{field::valuable, warn};
use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
pub const DEFAULT_SATA_PORT: u16 = 7500_u16;
#[derive(Clone, Debug, Valuable)]
pub struct PcfsServerState {
disable_real_removal: bool,
host_filesystem: HostFilesystem,
pid: u32,
}
impl PcfsServerState {
#[must_use]
pub const fn new(
disable_real_removal: bool,
host_filesystem: HostFilesystem,
pid: u32,
) -> Self {
PcfsServerState {
disable_real_removal,
host_filesystem,
pid,
}
}
#[must_use]
pub const fn disable_real_removal(&self) -> bool {
self.disable_real_removal
}
#[must_use]
pub const fn host_filesystem(&self) -> &HostFilesystem {
&self.host_filesystem
}
#[must_use]
pub const fn pid(&self) -> u32 {
self.pid
}
}
impl FromRef<PcfsServerState> for HostFilesystem {
fn from_ref(input: &PcfsServerState) -> Self {
input.host_filesystem.clone()
}
}
impl FromRef<PcfsServerState> for u32 {
fn from_ref(input: &PcfsServerState) -> Self {
input.pid
}
}
#[allow(
// TODO(mythra): maybe refactor these into two variant enums.
clippy::struct_excessive_bools,
)]
#[derive(Clone, Debug)]
pub struct PcfsSataServerBuilder {
address: Option<Ipv4Addr>,
cat_dev_sleep_override: Option<Duration>,
chunk_override: Option<usize>,
disable_csr: bool,
disable_ffio: bool,
disable_real_removal: bool,
fully_disable_cat_dev_sleep: bool,
fully_disable_chunk_override: bool,
host_filesystem: HostFilesystem,
port: Option<u16>,
sata_wal_location: Option<PathBuf>,
trace_during_debug: bool,
}
impl PcfsSataServerBuilder {
#[must_use]
pub const fn new(host_filesystem: HostFilesystem) -> Self {
Self {
address: None,
cat_dev_sleep_override: None,
chunk_override: None,
disable_csr: false,
disable_ffio: false,
disable_real_removal: false,
fully_disable_cat_dev_sleep: false,
fully_disable_chunk_override: false,
host_filesystem,
port: None,
sata_wal_location: None,
trace_during_debug: false,
}
}
#[must_use]
pub const fn address(&self) -> Option<Ipv4Addr> {
self.address
}
#[must_use]
pub const fn set_address(mut self, new_address: Option<Ipv4Addr>) -> Self {
self.address = new_address;
self
}
#[must_use]
pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
self.cat_dev_sleep_override
}
#[must_use]
pub const fn set_cat_dev_sleep_override(mut self, new: Option<Duration>) -> Self {
self.cat_dev_sleep_override = new;
self
}
#[must_use]
pub const fn chunk_override(&self) -> Option<usize> {
self.chunk_override
}
#[must_use]
pub const fn set_chunk_override(mut self, chunk: Option<usize>) -> Self {
self.chunk_override = chunk;
self
}
#[must_use]
pub const fn disable_csr(&self) -> bool {
self.disable_csr
}
#[must_use]
pub const fn set_disable_csr(mut self, new: bool) -> Self {
self.disable_csr = new;
self
}
#[must_use]
pub const fn disable_ffio(&self) -> bool {
self.disable_ffio
}
#[must_use]
pub const fn set_disable_ffio(mut self, new: bool) -> Self {
self.disable_ffio = new;
self
}
#[must_use]
pub const fn disable_real_removal(&self) -> bool {
self.disable_real_removal
}
#[must_use]
pub const fn set_disable_real_removal(mut self, new: bool) -> Self {
self.disable_real_removal = new;
self
}
#[must_use]
pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
self.fully_disable_cat_dev_sleep
}
#[must_use]
pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
self.fully_disable_cat_dev_sleep = new;
self
}
#[must_use]
pub const fn fully_disable_chunk_override(&self) -> bool {
self.fully_disable_chunk_override
}
#[must_use]
pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
self.fully_disable_chunk_override = new;
self
}
#[must_use]
pub const fn host_filesystem(&self) -> &HostFilesystem {
&self.host_filesystem
}
#[must_use]
pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
self.host_filesystem = new;
self
}
#[must_use]
pub const fn port(&self) -> Option<u16> {
self.port
}
#[must_use]
pub const fn set_port(mut self, new: Option<u16>) -> Self {
self.port = new;
self
}
#[must_use]
pub fn sata_wal_location(&self) -> Option<&PathBuf> {
self.sata_wal_location.as_ref()
}
#[must_use]
pub fn set_sata_wal_location(mut self, new_location: Option<PathBuf>) -> Self {
self.sata_wal_location = new_location;
self
}
#[must_use]
pub const fn trace_during_debug(&self) -> bool {
self.trace_during_debug
}
#[must_use]
pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
self.trace_during_debug = new;
self
}
pub async fn build(self) -> Result<TCPServer<PcfsServerState>, CatBridgeError> {
let ip = self
.address
.or_else(|| {
local_ip().ok().map(|ip| match ip {
IpAddr::V4(v4) => v4,
IpAddr::V6(_v6) => unreachable!(),
})
})
.ok_or(APIError::NoHostIpFound)?;
let bound_address = SocketAddrV4::new(ip, self.port.unwrap_or(DEFAULT_SATA_PORT));
let mut router = Router::<PcfsServerState>::new_with_offset(0x30);
router.add_route(&0x0_u32.to_be_bytes(), create_folder::handle_create_folder)?;
router.add_route(&0x1_u32.to_be_bytes(), open_folder::handle_open_folder)?;
router.add_route(&0x2_u32.to_be_bytes(), read_folder::handle_read_folder)?;
router.add_route(&0x3_u32.to_be_bytes(), rewind_folder::handle_rewind_folder)?;
router.add_route(&0x4_u32.to_be_bytes(), close_folder::handle_close_folder)?;
router.add_route(&0x5_u32.to_be_bytes(), open_file::handle_open_file)?;
router.add_route(&0x6_u32.to_be_bytes(), read_file::handle_read_file)?;
router.add_route(&0x7_u32.to_be_bytes(), write_file::handle_write_file)?;
router.add_route(
&0x9_u32.to_be_bytes(),
set_file_position::handle_set_file_position,
)?;
router.add_route(&0xB_u32.to_be_bytes(), info_by_query::stat_fd)?;
router.add_route(&0xD_u32.to_be_bytes(), close_file::handle_close_file)?;
router.add_route(&0xE_u32.to_be_bytes(), remove::handle_removal)?;
router.add_route(&0xF_u32.to_be_bytes(), rename::handle_rename)?;
router.add_route(
&0x10_u32.to_be_bytes(),
info_by_query::handle_get_info_by_query,
)?;
router.add_route(&0x12_u32.to_be_bytes(), change_owner::handle_change_owner)?;
router.add_route(&0x13_u32.to_be_bytes(), change_mode::handle_change_mode)?;
router.add_route(&0x14_u32.to_be_bytes(), ping::handle_ping)?;
router.fallback_handler(unknown_packet_handler)?;
let mut server = TCPServer::new_with_state(
"pcfs-sata",
bound_address,
router,
(None, None),
NagleGuard::U32LengthPrefixed(Endianness::Big, Some(0x20)),
PcfsServerState::new(
self.disable_real_removal,
self.host_filesystem,
std::process::id(),
),
self.trace_during_debug,
)
.await?;
let wal = self
.sata_wal_location
.and_then(|path| WriteAheadLog::new(path).ok());
server.set_on_stream_begin(async move |event: ResponseStreamEvent<PcfsServerState>| {
let sid = event.stream_id();
_ = SATA_CONNECTION_FLAGS
.insert_async(
sid,
SataConnectionFlags::new_with_flags(!self.disable_ffio, !self.disable_csr),
)
.await;
Ok(true)
})?;
if let Some(w) = wal.as_ref() {
server.layer_on_stream_begin(WALBeginStreamLayer(w.clone()))?;
}
server.set_on_stream_end(on_sata_stream_end)?;
if let Some(w) = wal.as_ref() {
server.layer_on_stream_end(WALEndStreamLayer(w.clone()))?;
}
create_initial_server_layer(&mut server, wal, self.trace_during_debug);
server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
None
} else if let Some(over_ride) = self.chunk_override {
Some(over_ride)
} else {
Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
});
server.set_cat_dev_slowdown(if self.fully_disable_cat_dev_sleep {
None
} else if let Some(over_ride) = self.cat_dev_sleep_override {
Some(over_ride)
} else {
Some(DEFAULT_CAT_DEV_SLOWDOWN)
});
Ok(server)
}
}
const PCFS_SATA_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
NamedField::new("address"),
NamedField::new("cat_dev_sleep_override"),
NamedField::new("chunk_override"),
NamedField::new("disable_csr"),
NamedField::new("disable_ffio"),
NamedField::new("disable_real_removal"),
NamedField::new("fully_disable_cat_dev_sleep"),
NamedField::new("fully_disable_chunk_override"),
NamedField::new("host_filesystem"),
NamedField::new("port"),
NamedField::new("sata_wal_location"),
NamedField::new("trace_during_debug"),
];
impl Structable for PcfsSataServerBuilder {
fn definition(&self) -> StructDef<'_> {
StructDef::new_static(
"PcfsSataServerBuilder",
Fields::Named(PCFS_SATA_SERVER_BUILDER_FIELDS),
)
}
}
impl Valuable for PcfsSataServerBuilder {
fn as_value(&self) -> Value<'_> {
Value::Structable(self)
}
fn visit(&self, visitor: &mut dyn Visit) {
visitor.visit_named_fields(&NamedValues::new(
PCFS_SATA_SERVER_BUILDER_FIELDS,
&[
Valuable::as_value(
&self
.address
.map_or_else(|| "<none>".to_owned(), |ip| format!("{ip}")),
),
Valuable::as_value(
&self
.cat_dev_sleep_override
.map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
),
Valuable::as_value(&self.chunk_override),
Valuable::as_value(&self.disable_csr),
Valuable::as_value(&self.disable_ffio),
Valuable::as_value(&self.disable_real_removal),
Valuable::as_value(&self.fully_disable_cat_dev_sleep),
Valuable::as_value(&self.fully_disable_chunk_override),
Valuable::as_value(&self.host_filesystem),
Valuable::as_value(&self.port),
Valuable::as_value(&self.sata_wal_location),
Valuable::as_value(&self.trace_during_debug),
],
));
}
}
fn create_initial_server_layer(
server: &mut TCPServer<PcfsServerState>,
mut wal: Option<WriteAheadLog>,
trace_during_debug: bool,
) {
if let Some(w) = wal.take() {
if trace_during_debug {
server.layer_initial_service(
ServiceBuilder::new()
.layer(RequestIDLayer::new("sata".to_owned()))
.layer(StreamIDLayer)
.layer(SataConnectionFlagsLayer)
.layer(WALMessageLayer(w)),
);
} else {
server.layer_initial_service(
ServiceBuilder::new()
.layer(RequestIDLayer::new("sata".to_owned()))
.layer(SataConnectionFlagsLayer)
.layer(WALMessageLayer(w)),
);
}
} else if trace_during_debug {
server.layer_initial_service(
ServiceBuilder::new()
.layer(RequestIDLayer::new("sata".to_owned()))
.layer(StreamIDLayer)
.layer(SataConnectionFlagsLayer),
);
} else {
server.layer_initial_service(
ServiceBuilder::new()
.layer(RequestIDLayer::new("sata".to_owned()))
.layer(SataConnectionFlagsLayer),
);
}
}
async fn unknown_packet_handler(Body(request): Body<Bytes>) -> Response {
if let Ok(req) = SataRequest::<Bytes>::parse_opaque(request.clone()) {
warn!(
header = valuable(req.header()),
command_info = valuable(req.command_info()),
body = format!("{:02X?}", req.body()),
"Unknown Pcfs Sata packet!",
);
} else {
warn!(
packet = format!("{:02X?}", request),
"Unknown Unparsable Pcfs Sata Packet!",
);
}
Response::empty_close()
}
async fn on_sata_stream_end(
event: ResponseStreamEvent<PcfsServerState>,
) -> Result<(), CatBridgeError> {
let sid = event.stream_id();
_ = SATA_CONNECTION_FLAGS.remove_async(&sid).await;
Ok(())
}