mod message;
mod read;
use crate::{
errors::CatBridgeError,
fsemul::{
HostFilesystem,
sdio::{
DEFAULT_SDIO_BLOCK_PORT, DEFAULT_SDIO_CONTROL_PORT, SDIO_DATA_STREAMS,
data_stream::DataStream,
proto::{SdioControlPacketType, message::SdioControlTelnetChannel},
},
},
mion::proto::control::MionBootType,
net::{
DEFAULT_CAT_DEV_CHUNK_SIZE, DEFAULT_CAT_DEV_SLOWDOWN,
additions::{RequestIDLayer, StreamIDLayer},
models::FromRef,
server::{Router, TCPServer, models::ResponseStreamEvent},
},
};
use scc::HashMap as ConcurrentHashMap;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::{Arc, LazyLock},
time::Duration,
};
use tokio::sync::{Mutex, oneshot::Sender as OneshotSender};
use tower::ServiceBuilder;
use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
static SDIO_PRINTF_BUFFS: LazyLock<ConcurrentHashMap<(u64, SdioControlTelnetChannel), String>> =
LazyLock::new(|| ConcurrentHashMap::with_capacity(1));
#[derive(Clone, Debug)]
pub struct SdioServerBuilder {
active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
boot_type: MionBootType,
cat_dev_sleep_override: Option<Duration>,
chunk_override: Option<usize>,
control_port: Option<u16>,
data_port: Option<u16>,
fully_disable_cat_dev_sleep: bool,
fully_disable_chunk_override: bool,
host_filesystem: HostFilesystem,
mion_ip: Ipv4Addr,
trace_during_debug: bool,
}
impl SdioServerBuilder {
#[must_use]
pub const fn new(
boot_type: MionBootType,
host_filesystem: HostFilesystem,
mion_ip: Ipv4Addr,
) -> Self {
Self {
active_hook: None,
boot_type,
cat_dev_sleep_override: None,
chunk_override: None,
control_port: None,
data_port: None,
host_filesystem,
fully_disable_cat_dev_sleep: false,
fully_disable_chunk_override: false,
mion_ip,
trace_during_debug: false,
}
}
#[must_use]
pub fn set_active_hook(mut self, hook: Option<OneshotSender<()>>) -> Self {
self.active_hook = hook.map(|h| Arc::new(Mutex::new(Some(h))));
self
}
#[must_use]
pub fn raw_set_active_hook(
mut self,
hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
) -> Self {
self.active_hook = hook;
self
}
#[must_use]
pub const fn boot_type(&self) -> MionBootType {
self.boot_type
}
#[must_use]
pub const fn set_boot_type(mut self, new_type: MionBootType) -> Self {
self.boot_type = new_type;
self
}
#[must_use]
pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
self.cat_dev_sleep_override
}
#[must_use]
pub const fn set_cat_dev_sleep_override(mut self, duration: Option<Duration>) -> Self {
self.cat_dev_sleep_override = duration;
self
}
#[must_use]
pub const fn chunk_override(&self) -> Option<usize> {
self.chunk_override
}
#[must_use]
pub const fn set_chunk_override(mut self, new: Option<usize>) -> Self {
self.chunk_override = new;
self
}
#[must_use]
pub const fn control_port(&self) -> Option<u16> {
self.control_port
}
#[must_use]
pub const fn set_control_port(mut self, new: Option<u16>) -> Self {
self.control_port = new;
self
}
#[must_use]
pub const fn data_port(&self) -> Option<u16> {
self.data_port
}
#[must_use]
pub const fn set_data_port(mut self, new: Option<u16>) -> Self {
self.data_port = new;
self
}
#[must_use]
pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
self.fully_disable_cat_dev_sleep
}
#[must_use]
pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
self.fully_disable_cat_dev_sleep = new;
self
}
#[must_use]
pub const fn fully_disable_chunk_override(&self) -> bool {
self.fully_disable_chunk_override
}
#[must_use]
pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
self.fully_disable_chunk_override = new;
self
}
#[must_use]
pub const fn host_filesystem(&self) -> &HostFilesystem {
&self.host_filesystem
}
#[must_use]
pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
self.host_filesystem = new;
self
}
#[must_use]
pub const fn mion_ip(&self) -> Ipv4Addr {
self.mion_ip
}
#[must_use]
pub const fn set_mion_ip(mut self, new: Ipv4Addr) -> Self {
self.mion_ip = new;
self
}
#[must_use]
pub const fn trace_during_debug(&self) -> bool {
self.trace_during_debug
}
#[must_use]
pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
self.trace_during_debug = new;
self
}
pub async fn build(self) -> Result<TCPServer<SdioStreamState>, CatBridgeError> {
let control_port = self.control_port.unwrap_or(DEFAULT_SDIO_CONTROL_PORT);
let data_port = self.data_port.unwrap_or(DEFAULT_SDIO_BLOCK_PORT);
let mut router = Router::<SdioStreamState>::new();
router.add_route(
&[
u8::try_from(u16::from(SdioControlPacketType::Read)).unwrap_or(u8::MAX),
0,
],
read::handle_read_request,
)?;
router.add_route(
&[
u8::try_from(u16::from(SdioControlPacketType::TelnetMessage)).unwrap_or(u8::MAX),
0,
],
message::handle_telnet_message,
)?;
let mut control_server = TCPServer::new_with_state(
"sdio",
SocketAddr::V4(SocketAddrV4::new(self.mion_ip, control_port)),
router,
(None, None),
512_usize,
SdioStreamState::new(
self.active_hook,
self.boot_type,
self.chunk_override,
data_port,
self.host_filesystem,
if self.fully_disable_cat_dev_sleep {
None
} else if let Some(over_ride) = self.cat_dev_sleep_override {
Some(over_ride)
} else {
Some(DEFAULT_CAT_DEV_SLOWDOWN)
},
#[cfg(debug_assertions)]
self.trace_during_debug,
),
self.trace_during_debug,
)
.await?;
control_server.set_on_stream_begin(on_sdio_stream_begin)?;
control_server.set_on_stream_end(on_sdio_stream_end)?;
if self.trace_during_debug {
control_server.layer_initial_service(
ServiceBuilder::new()
.layer(RequestIDLayer::new("sdio".to_owned()))
.layer(StreamIDLayer),
);
} else {
control_server.layer_initial_service(
ServiceBuilder::new().layer(RequestIDLayer::new("sdio".to_owned())),
);
}
control_server.set_cat_dev_slowdown(control_server.state().cat_dev_slowdown);
control_server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
None
} else if let Some(over_ride) = self.chunk_override {
Some(over_ride)
} else {
Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
});
Ok(control_server)
}
}
const SDIO_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
NamedField::new("boot_type"),
NamedField::new("cat_dev_sleep_override"),
NamedField::new("chunk_override"),
NamedField::new("control_port"),
NamedField::new("data_port"),
NamedField::new("fully_disable_cat_dev_sleep"),
NamedField::new("fully_disable_chunk_override"),
NamedField::new("host_filesystem"),
NamedField::new("mion_ip"),
NamedField::new("trace_during_debug"),
];
impl Structable for SdioServerBuilder {
fn definition(&self) -> StructDef<'_> {
StructDef::new_static(
"SdioServerBuilder",
Fields::Named(SDIO_SERVER_BUILDER_FIELDS),
)
}
}
impl Valuable for SdioServerBuilder {
fn as_value(&self) -> Value<'_> {
Value::Structable(self)
}
fn visit(&self, visitor: &mut dyn Visit) {
visitor.visit_named_fields(&NamedValues::new(
SDIO_SERVER_BUILDER_FIELDS,
&[
Valuable::as_value(&self.boot_type),
Valuable::as_value(
&self
.cat_dev_sleep_override
.map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
),
Valuable::as_value(&self.chunk_override),
Valuable::as_value(&self.control_port),
Valuable::as_value(&self.data_port),
Valuable::as_value(&self.fully_disable_cat_dev_sleep),
Valuable::as_value(&self.fully_disable_chunk_override),
Valuable::as_value(&self.host_filesystem),
Valuable::as_value(&format!("{}", self.mion_ip)),
Valuable::as_value(&self.trace_during_debug),
],
));
}
}
async fn on_sdio_stream_begin(
event: ResponseStreamEvent<SdioStreamState>,
) -> Result<bool, CatBridgeError> {
let mut addr = *event.source();
addr.set_port(event.state().data_port);
let stream = DataStream::connect(
addr,
event.state().cat_dev_slowdown,
event.state().chunk_size,
#[cfg(debug_assertions)]
event.state().trace_during_debug,
)
.await?;
let sid = event.stream_id();
_ = SDIO_DATA_STREAMS.insert_async(sid, stream).await;
_ = SDIO_PRINTF_BUFFS
.insert_async(
(sid, SdioControlTelnetChannel::SysConfigTool),
String::with_capacity(0),
)
.await;
Ok(true)
}
async fn on_sdio_stream_end(
event: ResponseStreamEvent<SdioStreamState>,
) -> Result<(), CatBridgeError> {
let sid = event.stream_id();
_ = SDIO_DATA_STREAMS.remove_async(&sid).await;
_ = SDIO_PRINTF_BUFFS
.remove_async(&(sid, SdioControlTelnetChannel::SysConfigTool))
.await;
Ok(())
}
#[derive(Clone, Debug)]
pub struct SdioStreamState {
active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
boot_type: MionBootType,
cat_dev_slowdown: Option<Duration>,
chunk_size: Option<usize>,
data_port: u16,
host_fs: HostFilesystem,
#[cfg(debug_assertions)]
trace_during_debug: bool,
}
impl SdioStreamState {
#[must_use]
pub fn new(
active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
boot_type: MionBootType,
chunk_size: Option<usize>,
data_port: u16,
host_fs: HostFilesystem,
cat_dev_sleep: Option<Duration>,
#[cfg(debug_assertions)] trace_during_debug: bool,
) -> Self {
Self {
active_hook,
boot_type,
chunk_size,
cat_dev_slowdown: cat_dev_sleep,
data_port,
host_fs,
#[cfg(debug_assertions)]
trace_during_debug,
}
}
}
impl FromRef<SdioStreamState> for HostFilesystem {
fn from_ref(input: &SdioStreamState) -> Self {
input.host_fs.clone()
}
}
impl FromRef<SdioStreamState> for MionBootType {
fn from_ref(input: &SdioStreamState) -> Self {
input.boot_type
}
}
const SDIO_STREAM_STATE_FIELDS: &[NamedField<'static>] = &[
NamedField::new("active_hook"),
NamedField::new("boot_type"),
NamedField::new("cat_dev_slowdown"),
NamedField::new("chunk_size"),
NamedField::new("data_port"),
NamedField::new("host_fs"),
#[cfg(debug_assertions)]
NamedField::new("trace_during_debug"),
];
impl Structable for SdioStreamState {
fn definition(&self) -> StructDef<'_> {
StructDef::new_static("SdioStreamState", Fields::Named(SDIO_STREAM_STATE_FIELDS))
}
}
impl Valuable for SdioStreamState {
fn as_value(&self) -> Value<'_> {
Value::Structable(self)
}
fn visit(&self, visitor: &mut dyn Visit) {
visitor.visit_named_fields(&NamedValues::new(
SDIO_STREAM_STATE_FIELDS,
&[
Valuable::as_value(&self.active_hook.is_some()),
Valuable::as_value(&self.boot_type),
Valuable::as_value(&if let Some(slowdown) = self.cat_dev_slowdown {
slowdown.as_secs()
} else {
0_u64
}),
Valuable::as_value(&self.chunk_size),
Valuable::as_value(&self.data_port),
Valuable::as_value(&self.host_fs),
#[cfg(debug_assertions)]
Valuable::as_value(&self.trace_during_debug),
],
));
}
}