1mod message;
8mod read;
9
10use crate::{
11 errors::CatBridgeError,
12 fsemul::{
13 HostFilesystem,
14 sdio::{
15 DEFAULT_SDIO_BLOCK_PORT, DEFAULT_SDIO_CONTROL_PORT, SDIO_DATA_STREAMS,
16 data_stream::DataStream,
17 proto::{SdioControlPacketType, message::SdioControlTelnetChannel},
18 },
19 },
20 mion::proto::control::MionBootType,
21 net::{
22 DEFAULT_CAT_DEV_CHUNK_SIZE, DEFAULT_CAT_DEV_SLOWDOWN,
23 additions::{RequestIDLayer, StreamIDLayer},
24 models::FromRef,
25 server::{Router, TCPServer, models::ResponseStreamEvent},
26 },
27};
28use scc::HashMap as ConcurrentHashMap;
29use std::{
30 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
31 sync::{Arc, LazyLock},
32 time::Duration,
33};
34use tokio::sync::{Mutex, oneshot::Sender as OneshotSender};
35use tower::ServiceBuilder;
36use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
37
38static SDIO_PRINTF_BUFFS: LazyLock<ConcurrentHashMap<(u64, SdioControlTelnetChannel), String>> =
47 LazyLock::new(|| ConcurrentHashMap::with_capacity(1));
48
49#[derive(Clone, Debug)]
57pub struct SdioServerBuilder {
58 active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
60 boot_type: MionBootType,
62 cat_dev_sleep_override: Option<Duration>,
68 chunk_override: Option<usize>,
74 control_port: Option<u16>,
76 data_port: Option<u16>,
78 fully_disable_cat_dev_sleep: bool,
80 fully_disable_chunk_override: bool,
82 host_filesystem: HostFilesystem,
84 mion_ip: Ipv4Addr,
86 trace_during_debug: bool,
88}
89
90impl SdioServerBuilder {
91 #[must_use]
92 pub const fn new(
93 boot_type: MionBootType,
94 host_filesystem: HostFilesystem,
95 mion_ip: Ipv4Addr,
96 ) -> Self {
97 Self {
98 active_hook: None,
99 boot_type,
100 cat_dev_sleep_override: None,
101 chunk_override: None,
102 control_port: None,
103 data_port: None,
104 host_filesystem,
105 fully_disable_cat_dev_sleep: false,
106 fully_disable_chunk_override: false,
107 mion_ip,
108 trace_during_debug: false,
109 }
110 }
111
112 #[must_use]
113 pub fn set_active_hook(mut self, hook: Option<OneshotSender<()>>) -> Self {
114 self.active_hook = hook.map(|h| Arc::new(Mutex::new(Some(h))));
115 self
116 }
117 #[must_use]
118 pub fn raw_set_active_hook(
119 mut self,
120 hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
121 ) -> Self {
122 self.active_hook = hook;
123 self
124 }
125
126 #[must_use]
127 pub const fn boot_type(&self) -> MionBootType {
128 self.boot_type
129 }
130 #[must_use]
131 pub const fn set_boot_type(mut self, new_type: MionBootType) -> Self {
132 self.boot_type = new_type;
133 self
134 }
135
136 #[must_use]
137 pub const fn cat_dev_sleep_override(&self) -> Option<Duration> {
138 self.cat_dev_sleep_override
139 }
140 #[must_use]
141 pub const fn set_cat_dev_sleep_override(mut self, duration: Option<Duration>) -> Self {
142 self.cat_dev_sleep_override = duration;
143 self
144 }
145
146 #[must_use]
147 pub const fn chunk_override(&self) -> Option<usize> {
148 self.chunk_override
149 }
150 #[must_use]
151 pub const fn set_chunk_override(mut self, new: Option<usize>) -> Self {
152 self.chunk_override = new;
153 self
154 }
155
156 #[must_use]
157 pub const fn control_port(&self) -> Option<u16> {
158 self.control_port
159 }
160 #[must_use]
161 pub const fn set_control_port(mut self, new: Option<u16>) -> Self {
162 self.control_port = new;
163 self
164 }
165
166 #[must_use]
167 pub const fn data_port(&self) -> Option<u16> {
168 self.data_port
169 }
170 #[must_use]
171 pub const fn set_data_port(mut self, new: Option<u16>) -> Self {
172 self.data_port = new;
173 self
174 }
175
176 #[must_use]
177 pub const fn fully_disable_cat_dev_sleep(&self) -> bool {
178 self.fully_disable_cat_dev_sleep
179 }
180 #[must_use]
181 pub const fn set_fully_disable_cat_dev_sleep(mut self, new: bool) -> Self {
182 self.fully_disable_cat_dev_sleep = new;
183 self
184 }
185
186 #[must_use]
187 pub const fn fully_disable_chunk_override(&self) -> bool {
188 self.fully_disable_chunk_override
189 }
190 #[must_use]
191 pub const fn set_fully_disable_chunk_override(mut self, new: bool) -> Self {
192 self.fully_disable_chunk_override = new;
193 self
194 }
195
196 #[must_use]
197 pub const fn host_filesystem(&self) -> &HostFilesystem {
198 &self.host_filesystem
199 }
200 #[must_use]
201 pub fn set_host_filesystem(mut self, new: HostFilesystem) -> Self {
202 self.host_filesystem = new;
203 self
204 }
205
206 #[must_use]
207 pub const fn mion_ip(&self) -> Ipv4Addr {
208 self.mion_ip
209 }
210 #[must_use]
211 pub const fn set_mion_ip(mut self, new: Ipv4Addr) -> Self {
212 self.mion_ip = new;
213 self
214 }
215
216 #[must_use]
217 pub const fn trace_during_debug(&self) -> bool {
218 self.trace_during_debug
219 }
220 #[must_use]
221 pub const fn set_trace_during_debug(mut self, new: bool) -> Self {
222 self.trace_during_debug = new;
223 self
224 }
225
226 pub async fn build(self) -> Result<TCPServer<SdioStreamState>, CatBridgeError> {
235 let control_port = self.control_port.unwrap_or(DEFAULT_SDIO_CONTROL_PORT);
236 let data_port = self.data_port.unwrap_or(DEFAULT_SDIO_BLOCK_PORT);
237
238 let mut router = Router::<SdioStreamState>::new();
239 router.add_route(
240 &[
241 u8::try_from(u16::from(SdioControlPacketType::Read)).unwrap_or(u8::MAX),
242 0,
243 ],
244 read::handle_read_request,
245 )?;
246 router.add_route(
247 &[
248 u8::try_from(u16::from(SdioControlPacketType::TelnetMessage)).unwrap_or(u8::MAX),
249 0,
250 ],
251 message::handle_telnet_message,
252 )?;
253
254 let mut control_server = TCPServer::new_with_state(
255 "sdio",
256 SocketAddr::V4(SocketAddrV4::new(self.mion_ip, control_port)),
257 router,
258 (None, None),
259 512_usize,
260 SdioStreamState::new(
261 self.active_hook,
262 self.boot_type,
263 self.chunk_override,
264 data_port,
265 self.host_filesystem,
266 if self.fully_disable_cat_dev_sleep {
267 None
268 } else if let Some(over_ride) = self.cat_dev_sleep_override {
269 Some(over_ride)
270 } else {
271 Some(DEFAULT_CAT_DEV_SLOWDOWN)
272 },
273 #[cfg(debug_assertions)]
274 self.trace_during_debug,
275 ),
276 self.trace_during_debug,
277 )
278 .await?;
279 control_server.set_on_stream_begin(on_sdio_stream_begin)?;
280 control_server.set_on_stream_end(on_sdio_stream_end)?;
281 if self.trace_during_debug {
282 control_server.layer_initial_service(
283 ServiceBuilder::new()
284 .layer(RequestIDLayer::new("sdio".to_owned()))
285 .layer(StreamIDLayer),
286 );
287 } else {
288 control_server.layer_initial_service(
289 ServiceBuilder::new().layer(RequestIDLayer::new("sdio".to_owned())),
290 );
291 }
292 control_server.set_cat_dev_slowdown(control_server.state().cat_dev_slowdown);
294 control_server.set_chunk_output_at_size(if self.fully_disable_chunk_override {
295 None
296 } else if let Some(over_ride) = self.chunk_override {
297 Some(over_ride)
298 } else {
299 Some(DEFAULT_CAT_DEV_CHUNK_SIZE)
300 });
301
302 Ok(control_server)
303 }
304}
305
306const SDIO_SERVER_BUILDER_FIELDS: &[NamedField<'static>] = &[
307 NamedField::new("boot_type"),
308 NamedField::new("cat_dev_sleep_override"),
309 NamedField::new("chunk_override"),
310 NamedField::new("control_port"),
311 NamedField::new("data_port"),
312 NamedField::new("fully_disable_cat_dev_sleep"),
313 NamedField::new("fully_disable_chunk_override"),
314 NamedField::new("host_filesystem"),
315 NamedField::new("mion_ip"),
316 NamedField::new("trace_during_debug"),
317];
318
319impl Structable for SdioServerBuilder {
320 fn definition(&self) -> StructDef<'_> {
321 StructDef::new_static(
322 "SdioServerBuilder",
323 Fields::Named(SDIO_SERVER_BUILDER_FIELDS),
324 )
325 }
326}
327
328impl Valuable for SdioServerBuilder {
329 fn as_value(&self) -> Value<'_> {
330 Value::Structable(self)
331 }
332
333 fn visit(&self, visitor: &mut dyn Visit) {
334 visitor.visit_named_fields(&NamedValues::new(
335 SDIO_SERVER_BUILDER_FIELDS,
336 &[
337 Valuable::as_value(&self.boot_type),
338 Valuable::as_value(
339 &self
340 .cat_dev_sleep_override
341 .map_or_else(|| "<none>".to_owned(), |dur| format!("{}s", dur.as_secs())),
342 ),
343 Valuable::as_value(&self.chunk_override),
344 Valuable::as_value(&self.control_port),
345 Valuable::as_value(&self.data_port),
346 Valuable::as_value(&self.fully_disable_cat_dev_sleep),
347 Valuable::as_value(&self.fully_disable_chunk_override),
348 Valuable::as_value(&self.host_filesystem),
349 Valuable::as_value(&format!("{}", self.mion_ip)),
350 Valuable::as_value(&self.trace_during_debug),
351 ],
352 ));
353 }
354}
355
356async fn on_sdio_stream_begin(
370 event: ResponseStreamEvent<SdioStreamState>,
371) -> Result<bool, CatBridgeError> {
372 let mut addr = *event.source();
373 addr.set_port(event.state().data_port);
374 let stream = DataStream::connect(
375 addr,
376 event.state().cat_dev_slowdown,
377 event.state().chunk_size,
378 #[cfg(debug_assertions)]
379 event.state().trace_during_debug,
380 )
381 .await?;
382 let sid = event.stream_id();
383
384 _ = SDIO_DATA_STREAMS.insert_async(sid, stream).await;
385 _ = SDIO_PRINTF_BUFFS
386 .insert_async(
387 (sid, SdioControlTelnetChannel::SysConfigTool),
388 String::with_capacity(0),
389 )
390 .await;
391
392 Ok(true)
393}
394
395async fn on_sdio_stream_end(
406 event: ResponseStreamEvent<SdioStreamState>,
407) -> Result<(), CatBridgeError> {
408 let sid = event.stream_id();
409
410 _ = SDIO_DATA_STREAMS.remove_async(&sid).await;
411 _ = SDIO_PRINTF_BUFFS
412 .remove_async(&(sid, SdioControlTelnetChannel::SysConfigTool))
413 .await;
414
415 Ok(())
416}
417
418#[derive(Clone, Debug)]
424pub struct SdioStreamState {
425 active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
427 boot_type: MionBootType,
429 cat_dev_slowdown: Option<Duration>,
431 chunk_size: Option<usize>,
433 data_port: u16,
435 host_fs: HostFilesystem,
437 #[cfg(debug_assertions)]
439 trace_during_debug: bool,
440}
441
442impl SdioStreamState {
443 #[must_use]
444 pub fn new(
445 active_hook: Option<Arc<Mutex<Option<OneshotSender<()>>>>>,
446 boot_type: MionBootType,
447 chunk_size: Option<usize>,
448 data_port: u16,
449 host_fs: HostFilesystem,
450 cat_dev_sleep: Option<Duration>,
451 #[cfg(debug_assertions)] trace_during_debug: bool,
452 ) -> Self {
453 Self {
454 active_hook,
455 boot_type,
456 chunk_size,
457 cat_dev_slowdown: cat_dev_sleep,
458 data_port,
459 host_fs,
460 #[cfg(debug_assertions)]
461 trace_during_debug,
462 }
463 }
464}
465
466impl FromRef<SdioStreamState> for HostFilesystem {
467 fn from_ref(input: &SdioStreamState) -> Self {
468 input.host_fs.clone()
469 }
470}
471
472impl FromRef<SdioStreamState> for MionBootType {
473 fn from_ref(input: &SdioStreamState) -> Self {
474 input.boot_type
475 }
476}
477
478const SDIO_STREAM_STATE_FIELDS: &[NamedField<'static>] = &[
479 NamedField::new("active_hook"),
480 NamedField::new("boot_type"),
481 NamedField::new("cat_dev_slowdown"),
482 NamedField::new("chunk_size"),
483 NamedField::new("data_port"),
484 NamedField::new("host_fs"),
485 #[cfg(debug_assertions)]
486 NamedField::new("trace_during_debug"),
487];
488
489impl Structable for SdioStreamState {
490 fn definition(&self) -> StructDef<'_> {
491 StructDef::new_static("SdioStreamState", Fields::Named(SDIO_STREAM_STATE_FIELDS))
492 }
493}
494
495impl Valuable for SdioStreamState {
496 fn as_value(&self) -> Value<'_> {
497 Value::Structable(self)
498 }
499
500 fn visit(&self, visitor: &mut dyn Visit) {
501 visitor.visit_named_fields(&NamedValues::new(
502 SDIO_STREAM_STATE_FIELDS,
503 &[
504 Valuable::as_value(&self.active_hook.is_some()),
505 Valuable::as_value(&self.boot_type),
506 Valuable::as_value(&if let Some(slowdown) = self.cat_dev_slowdown {
507 slowdown.as_secs()
508 } else {
509 0_u64
510 }),
511 Valuable::as_value(&self.chunk_size),
512 Valuable::as_value(&self.data_port),
513 Valuable::as_value(&self.host_fs),
514 #[cfg(debug_assertions)]
515 Valuable::as_value(&self.trace_during_debug),
516 ],
517 ));
518 }
519}