1mod message;
8mod read;
9
10use crate::{
11 errors::CatBridgeError,
12 fsemul::{
13 filesystem::host::HostFilesystem,
14 sdio::{
15 DEFAULT_SDIO_BLOCK_PORT, DEFAULT_SDIO_CONTROL_PORT, SDIO_DATA_STREAMS,
16 data_stream::DataStream,
17 proto::{SdioControlPacketType, message::SdioControlTelnetChannel},
18 },
19 },
20 mion::proto::control::MionBootType,
21 net::{
22 DEFAULT_CAT_DEV_CHUNK_SIZE, DEFAULT_CAT_DEV_SLOWDOWN,
23 additions::{RequestIDLayer, StreamIDLayer},
24 models::FromRef,
25 server::{Router, TCPServer, models::ResponseStreamEvent},
26 },
27};
28use scc::HashMap as ConcurrentHashMap;
29use std::{
30 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
31 sync::{Arc, LazyLock},
32 time::Duration,
33};
34use tokio::sync::{Mutex, mpsc::Sender as BoundedSender, oneshot::Sender as OneshotSender};
35use tower::ServiceBuilder;
36use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
37
38static SDIO_PRINTF_BUFFS: LazyLock<ConcurrentHashMap<(u64, SdioControlTelnetChannel), String>> =
47 LazyLock::new(|| ConcurrentHashMap::with_capacity(1));
48
49#[derive(Clone, Debug)]
57pub struct SdioServerBuilder {
58 active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
60 boot_type: MionBootType,
62 cat_dev_sleep_override: Option<Duration>,
68 chunk_override: Option<usize>,
74 control_port: Option<u16>,
76 data_port: Option<u16>,
78 fully_disable_cat_dev_sleep: bool,
80 fully_disable_chunk_override: bool,
82 host_filesystem: Option<HostFilesystem>,
84 mion_ip: Ipv4Addr,
86 sdio_telnet_message_hook: Option<BoundedSender<(SdioControlTelnetChannel, String)>>,
88 trace_during_debug: bool,
90}
91
92impl SdioServerBuilder {
93 #[must_use]
94 pub const fn new(
95 boot_type: MionBootType,
96 host_filesystem: Option<HostFilesystem>,
97 mion_ip: Ipv4Addr,
98 ) -> Self {
99 Self {
100 active_hook: None,
101 boot_type,
102 cat_dev_sleep_override: None,
103 chunk_override: None,
104 control_port: None,
105 data_port: None,
106 host_filesystem,
107 fully_disable_cat_dev_sleep: false,
108 fully_disable_chunk_override: false,
109 mion_ip,
110 sdio_telnet_message_hook: None,
111 trace_during_debug: false,
112 }
113 }
114
115 #[must_use]
116 pub fn set_active_hook(mut self, hook: Option<OneshotSender<()>>) -> Self {
117 self.active_hook = hook.map(|h| Arc::new(Mutex::new(Some(h))));
118 self
119 }
120 #[must_use]
121 pub fn raw_set_active_hook(
122 mut self,
123 hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
124 ) -> Self {
125 self.active_hook = hook;
126 self
127 }
128
129 #[must_use]
130 pub const fn boot_type(&self) -> MionBootType {
131 self.boot_type
132 }
133 #[must_use]
134 pub const fn set_boot_type(mut self, new_type: MionBootType) -> Self {
135 self.boot_type = new_type;
136 self
137 }
138
139 #[must_use]
140 pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
141 self.cat_dev_sleep_override
142 }
143 #[must_use]
144 pub const fn set_cat_dev_sleep_override(mut self, duration: Option<Duration>) -> Self {
145 self.cat_dev_sleep_override = duration;
146 self
147 }
148
149 #[must_use]
150 pub const fn chunk_override(&self) -> Option<usize> {
151 self.chunk_override
152 }
153 #[must_use]
154 pub const fn set_chunk_override(mut self, new: Option<usize>) -> Self {
155 self.chunk_override = new;
156 self
157 }
158
159 #[must_use]
160 pub const fn control_port(&self) -> Option<u16> {
161 self.control_port
162 }
163 #[must_use]
164 pub const fn set_control_port(mut self, new: Option<u16>) -> Self {
165 self.control_port = new;
166 self
167 }
168
169 #[must_use]
170 pub const fn data_port(&self) -> Option<u16> {
171 self.data_port
172 }
173 #[must_use]
174 pub const fn set_data_port(mut self, new: Option<u16>) -> Self {
175 self.data_port = new;
176 self
177 }
178
179 #[must_use]
180 pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
181 self.fully_disable_cat_dev_sleep
182 }
183 #[must_use]
184 pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
185 self.fully_disable_cat_dev_sleep = new;
186 self
187 }
188
189 #[must_use]
190 pub const fn fully_disable_chunk_override(&self) -> bool {
191 self.fully_disable_chunk_override
192 }
193 #[must_use]
194 pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
195 self.fully_disable_chunk_override = new;
196 self
197 }
198
199 #[must_use]
200 pub const fn host_filesystem(&self) -> Option<&HostFilesystem> {
201 self.host_filesystem.as_ref()
202 }
203 #[must_use]
204 pub fn set_host_filesystem(mut self, new: Option<HostFilesystem>) -> Self {
205 self.host_filesystem = new;
206 self
207 }
208
209 #[must_use]
210 pub const fn mion_ip(&self) -> Ipv4Addr {
211 self.mion_ip
212 }
213 #[must_use]
214 pub const fn set_mion_ip(mut self, new: Ipv4Addr) -> Self {
215 self.mion_ip = new;
216 self
217 }
218
219 #[must_use]
220 pub const fn sdio_telnet_message_hook(
221 &self,
222 ) -> Option<&BoundedSender<(SdioControlTelnetChannel, String)>> {
223 self.sdio_telnet_message_hook.as_ref()
224 }
225 #[must_use]
226 pub fn set_sdio_telnet_message_hook(
227 mut self,
228 new: Option<BoundedSender<(SdioControlTelnetChannel, String)>>,
229 ) -> Self {
230 self.sdio_telnet_message_hook = new;
231 self
232 }
233
234 #[must_use]
235 pub const fn trace_during_debug(&self) -> bool {
236 self.trace_during_debug
237 }
238 #[must_use]
239 pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
240 self.trace_during_debug = new;
241 self
242 }
243
244 pub async fn build(self) -> Result<TCPServer<SdioStreamState>, CatBridgeError> {
253 let control_port = self.control_port.unwrap_or(DEFAULT_SDIO_CONTROL_PORT);
254 let data_port = self.data_port.unwrap_or(DEFAULT_SDIO_BLOCK_PORT);
255
256 let mut router = Router::<SdioStreamState>::new();
257 router.add_route(
258 &[
259 u8::try_from(u16::from(SdioControlPacketType::Read)).unwrap_or(u8::MAX),
260 0,
261 ],
262 read::handle_read_request,
263 )?;
264 router.add_route(
265 &[
266 u8::try_from(u16::from(SdioControlPacketType::TelnetMessage)).unwrap_or(u8::MAX),
267 0,
268 ],
269 message::handle_telnet_message,
270 )?;
271
272 let mut control_server = TCPServer::new_with_state(
273 "sdio",
274 SocketAddr::V4(SocketAddrV4::new(self.mion_ip, control_port)),
275 router,
276 (None, None),
277 512_usize,
278 SdioStreamState::new(
279 self.active_hook,
280 self.boot_type,
281 self.chunk_override,
282 data_port,
283 self.host_filesystem,
284 if self.fully_disable_cat_dev_sleep {
285 None
286 } else if let Some(over_ride) = self.cat_dev_sleep_override {
287 Some(over_ride)
288 } else {
289 Some(DEFAULT_CAT_DEV_SLOWDOWN)
290 },
291 self.sdio_telnet_message_hook,
292 #[cfg(debug_assertions)]
293 self.trace_during_debug,
294 ),
295 self.trace_during_debug,
296 )
297 .await?;
298 control_server.set_on_stream_begin(on_sdio_stream_begin)?;
299 control_server.set_on_stream_end(on_sdio_stream_end)?;
300 if self.trace_during_debug {
301 control_server.layer_initial_service(
302 ServiceBuilder::new()
303 .layer(RequestIDLayer::new("sdio".to_owned()))
304 .layer(StreamIDLayer),
305 );
306 } else {
307 control_server.layer_initial_service(
308 ServiceBuilder::new().layer(RequestIDLayer::new("sdio".to_owned())),
309 );
310 }
311 control_server.set_cat_dev_slowdown(control_server.state().cat_dev_slowdown);
313 control_server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
314 None
315 } else if let Some(over_ride) = self.chunk_override {
316 Some(over_ride)
317 } else {
318 Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
319 });
320
321 Ok(control_server)
322 }
323}
324
325const SDIO_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
326 NamedField::new("boot_type"),
327 NamedField::new("cat_dev_sleep_override"),
328 NamedField::new("chunk_override"),
329 NamedField::new("control_port"),
330 NamedField::new("data_port"),
331 NamedField::new("fully_disable_cat_dev_sleep"),
332 NamedField::new("fully_disable_chunk_override"),
333 NamedField::new("host_filesystem"),
334 NamedField::new("mion_ip"),
335 NamedField::new("trace_during_debug"),
336];
337
338impl Structable for SdioServerBuilder {
339 fn definition(&self) -> StructDef<'_> {
340 StructDef::new_static(
341 "SdioServerBuilder",
342 Fields::Named(SDIO_SERVER_BUILDER_FIELDS),
343 )
344 }
345}
346
347impl Valuable for SdioServerBuilder {
348 fn as_value(&self) -> Value<'_> {
349 Value::Structable(self)
350 }
351
352 fn visit(&self, visitor: &mut dyn Visit) {
353 visitor.visit_named_fields(&NamedValues::new(
354 SDIO_SERVER_BUILDER_FIELDS,
355 &[
356 Valuable::as_value(&self.boot_type),
357 Valuable::as_value(
358 &self
359 .cat_dev_sleep_override
360 .map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
361 ),
362 Valuable::as_value(&self.chunk_override),
363 Valuable::as_value(&self.control_port),
364 Valuable::as_value(&self.data_port),
365 Valuable::as_value(&self.fully_disable_cat_dev_sleep),
366 Valuable::as_value(&self.fully_disable_chunk_override),
367 Valuable::as_value(&self.host_filesystem),
368 Valuable::as_value(&format!("{}", self.mion_ip)),
369 Valuable::as_value(&self.trace_during_debug),
370 ],
371 ));
372 }
373}
374
375async fn on_sdio_stream_begin(
389 event: ResponseStreamEvent<SdioStreamState>,
390) -> Result<bool, CatBridgeError> {
391 let mut addr = *event.source();
392 addr.set_port(event.state().data_port);
393 let stream = DataStream::connect(
394 addr,
395 event.state().cat_dev_slowdown,
396 event.state().chunk_size,
397 #[cfg(debug_assertions)]
398 event.state().trace_during_debug,
399 )
400 .await?;
401 let sid = event.stream_id();
402
403 _ = SDIO_DATA_STREAMS.insert_async(sid, stream).await;
404 _ = SDIO_PRINTF_BUFFS
405 .insert_async(
406 (sid, SdioControlTelnetChannel::SysConfigTool),
407 String::with_capacity(0),
408 )
409 .await;
410
411 Ok(true)
412}
413
414async fn on_sdio_stream_end(
425 event: ResponseStreamEvent<SdioStreamState>,
426) -> Result<(), CatBridgeError> {
427 let sid = event.stream_id();
428
429 _ = SDIO_DATA_STREAMS.remove_async(&sid).await;
430 _ = SDIO_PRINTF_BUFFS
431 .remove_async(&(sid, SdioControlTelnetChannel::SysConfigTool))
432 .await;
433
434 Ok(())
435}
436
437#[derive(Clone, Debug)]
443pub struct SdioStreamState {
444 active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
446 boot_type: MionBootType,
448 cat_dev_slowdown: Option<Duration>,
450 chunk_size: Option<usize>,
452 data_port: u16,
454 host_fs: Option<HostFilesystem>,
456 sdio_telnet_message_hook: Option<BoundedSender<(SdioControlTelnetChannel, String)>>,
459 #[cfg(debug_assertions)]
461 trace_during_debug: bool,
462}
463
464impl SdioStreamState {
465 #[allow(clippy::too_many_arguments)]
466 #[must_use]
467 pub fn new(
468 active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
469 boot_type: MionBootType,
470 chunk_size: Option<usize>,
471 data_port: u16,
472 host_fs: Option<HostFilesystem>,
473 cat_dev_sleep: Option<Duration>,
474 sdio_telnet_message_hook: Option<BoundedSender<(SdioControlTelnetChannel, String)>>,
475 #[cfg(debug_assertions)] trace_during_debug: bool,
476 ) -> Self {
477 Self {
478 active_hook,
479 boot_type,
480 chunk_size,
481 cat_dev_slowdown: cat_dev_sleep,
482 data_port,
483 host_fs,
484 sdio_telnet_message_hook,
485 #[cfg(debug_assertions)]
486 trace_during_debug,
487 }
488 }
489}
490
491impl FromRef<SdioStreamState> for Option<HostFilesystem> {
492 fn from_ref(input: &SdioStreamState) -> Self {
493 input.host_fs.clone()
494 }
495}
496
497impl FromRef<SdioStreamState> for MionBootType {
498 fn from_ref(input: &SdioStreamState) -> Self {
499 input.boot_type
500 }
501}
502
503const SDIO_STREAM_STATE_FIELDS: &[NamedField<'static>] = &[
504 NamedField::new("active_hook"),
505 NamedField::new("boot_type"),
506 NamedField::new("cat_dev_slowdown"),
507 NamedField::new("chunk_size"),
508 NamedField::new("data_port"),
509 NamedField::new("host_fs"),
510 NamedField::new("sdio_telnet_message_hook"),
511 #[cfg(debug_assertions)]
512 NamedField::new("trace_during_debug"),
513];
514
515impl Structable for SdioStreamState {
516 fn definition(&self) -> StructDef<'_> {
517 StructDef::new_static("SdioStreamState", Fields::Named(SDIO_STREAM_STATE_FIELDS))
518 }
519}
520
521impl Valuable for SdioStreamState {
522 fn as_value(&self) -> Value<'_> {
523 Value::Structable(self)
524 }
525
526 fn visit(&self, visitor: &mut dyn Visit) {
527 visitor.visit_named_fields(&NamedValues::new(
528 SDIO_STREAM_STATE_FIELDS,
529 &[
530 Valuable::as_value(&self.active_hook.is_some()),
531 Valuable::as_value(&self.boot_type),
532 Valuable::as_value(&if let Some(slowdown) = self.cat_dev_slowdown {
533 slowdown.as_secs()
534 } else {
535 0_u64
536 }),
537 Valuable::as_value(&self.chunk_size),
538 Valuable::as_value(&self.data_port),
539 Valuable::as_value(&self.host_fs),
540 Valuable::as_value(&self.sdio_telnet_message_hook.is_some()),
541 #[cfg(debug_assertions)]
542 Valuable::as_value(&self.trace_during_debug),
543 ],
544 ));
545 }
546}