1pub mod errors;
13pub mod proto;
14mod reads;
15
16use crate::{
17 errors::{CatBridgeError, NetworkError},
18 fsemul::{
19 sdio::{
20 proto::{
21 ChunkSDIOControlCodec, SdioControlMessage, SdioControlMessageRequest,
22 SdioControlPacketType, SdioControlReadRequest, SdioControlWriteRequest,
23 },
24 reads::serve_read_request,
25 },
26 HostFilesystem,
27 },
28};
29use bytes::Bytes;
30use futures::{stream::SplitSink, SinkExt, StreamExt};
31use std::{net::Ipv4Addr, time::Duration};
32use tokio::{
33 io::AsyncWriteExt,
34 net::{tcp::OwnedWriteHalf, TcpStream},
35 sync::{
36 mpsc::{channel, Sender},
37 Mutex,
38 },
39 task::Builder as TaskBuilder,
40 time::{sleep, timeout},
41};
42use tokio_util::codec::Framed;
43use tracing::{error, info, trace, warn};
44
45pub const DEFAULT_SDIO_CONTROL_PORT: u16 = 7975;
51pub const DEFAULT_SDIO_BLOCK_PORT: u16 = 7976;
57
58pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
60
61const SDIO_TCP_PACKET_BUFFER_SIZE: usize = 8192_usize;
66
67static SDIO_DATA_LOCK: Mutex<()> = Mutex::const_new(());
73
74#[derive(Debug)]
86pub struct SdioClient<'fs> {
87 control_port: u16,
88 control_stream: TcpStream,
89 data_port: u16,
90 data_stream: TcpStream,
91 host_ip: Ipv4Addr,
92 host_filesystem: &'fs HostFilesystem,
93 no_load_bearing_sleep: bool,
94 printf_control_buff: String,
95}
96impl<'fs> SdioClient<'fs> {
97 pub async fn connect(
106 mion_ip: Ipv4Addr,
107 control_port: Option<u16>,
108 data_port: Option<u16>,
109 connect_timeout: Option<Duration>,
110 host_filesystem: &'fs HostFilesystem,
111 no_load_bearing_sleep: bool,
112 ) -> Result<Self, CatBridgeError> {
113 let control_port = control_port.unwrap_or(DEFAULT_SDIO_CONTROL_PORT);
114 let data_port = data_port.unwrap_or(DEFAULT_SDIO_BLOCK_PORT);
115 let timeout_duration = connect_timeout.unwrap_or(CONNECT_TIMEOUT);
116
117 let control_stream_future = timeout(
120 timeout_duration,
121 TcpStream::connect((mion_ip, control_port)),
122 );
123 let data_stream_future =
124 timeout(timeout_duration, TcpStream::connect((mion_ip, data_port)));
125
126 if no_load_bearing_sleep {
127 warn!(
128 bridge.ip = %mion_ip,
129 bridge.control_port = control_port,
130 bridge.data_port = data_port,
131 "You have disabled our LOAD-BEARING SLEEP for our SDIO connection, if talking to a REAL MION, THIS WILL CAUSE ERRORS.",
132 );
133 }
134
135 Ok(Self {
136 control_port,
137 control_stream: control_stream_future
138 .await
139 .map_err(|_| NetworkError::Timeout(timeout_duration))?
140 .map_err(NetworkError::IO)?,
141 data_port,
142 data_stream: data_stream_future
143 .await
144 .map_err(|_| NetworkError::Timeout(timeout_duration))?
145 .map_err(NetworkError::IO)?,
146 host_ip: mion_ip,
147 host_filesystem,
148 no_load_bearing_sleep,
149 printf_control_buff: String::with_capacity(0),
150 })
151 }
152
153 pub async fn serve(self) -> Result<(), CatBridgeError> {
165 let mut printf_buff = self.printf_control_buff;
166 let (sink, mut stream) = Framed::new(self.control_stream, ChunkSDIOControlCodec).split();
167 self.data_stream
168 .set_nodelay(true)
169 .map_err(NetworkError::IO)?;
170 let (_data_stream, data_sink) = self.data_stream.into_split();
171
172 let _control_sender = Self::spawn_control_write_task(sink)?;
173 let data_sender = Self::spawn_data_write_task(self.no_load_bearing_sleep, data_sink)?;
174
175 while let Some(packet_result) = stream.next().await {
176 let packet = match packet_result {
177 Ok(packet) => packet.freeze(),
178 Err(cause) => return Err(NetworkError::IO(cause).into()),
179 };
180 if packet.is_empty() {
182 break;
183 }
184
185 match SdioControlPacketType::try_from(packet[0])? {
186 SdioControlPacketType::Message => {
187 let message_request =
188 SdioControlMessageRequest::try_from(packet).map_err(NetworkError::Parse)?;
189 for message in message_request.messages_owned() {
190 match message {
191 SdioControlMessage::Printf(to_print) => {
192 printf_buff.push_str(&to_print);
193 }
194 }
195 }
196
197 printf_buff = Self::process_log_messages(
198 printf_buff,
199 self.host_ip,
200 self.control_port,
201 self.data_port,
202 );
203 }
204 SdioControlPacketType::Read => {
205 let read_request = SdioControlReadRequest::try_from(packet)?;
206 let guard = SDIO_DATA_LOCK.lock().await;
207 serve_read_request(self.host_filesystem, &read_request, &data_sender).await?;
208 std::mem::drop(guard);
209 }
210 SdioControlPacketType::Write => {
211 let _write_request = SdioControlWriteRequest::try_from(packet)?;
212 }
213 SdioControlPacketType::StartBlockChannel => {
214 info!(
215 "Got request to start PCFS Block Channel, but we've already started it..."
216 );
217 }
218 SdioControlPacketType::StartControlListeningChannel => {
219 info!("Got request to start CTRL Character Channel, but we've already started it...");
220 }
221 }
222 }
223
224 Ok(())
225 }
226
227 fn process_log_messages(
228 mut printf_buff: String,
229 host_ip: Ipv4Addr,
230 control_port: u16,
231 data_port: u16,
232 ) -> String {
233 let mut used_one = false;
234
235 loop {
236 while let Some(line_ending) = printf_buff.find('\n') {
237 used_one = true;
238 let remaining = printf_buff.split_off(line_ending + 1);
239 let actual_line: String = printf_buff;
240 printf_buff = remaining;
241
242 if !actual_line.trim().is_empty() {
244 info!(
245 sdio.host_ip = %host_ip,
246 sdio.host_control_port = control_port,
247 sdio.host_data_port = data_port,
248 sdio.data.printf = %actual_line.trim(),
249 "Received SDIO message.",
250 );
251 }
252 }
253 while let Some(line_ending) = printf_buff.find('\r') {
254 used_one = true;
255 let remaining = printf_buff.split_off(line_ending + 1);
256 let actual_line: String = printf_buff;
257 printf_buff = remaining;
258
259 if !actual_line.trim().is_empty() {
261 info!(
262 sdio.host_ip = %host_ip,
263 sdio.host_control_port = control_port,
264 sdio.host_data_port = data_port,
265 sdio.data.printf = %actual_line.trim(),
266 "Received SDIO message.",
267 );
268 }
269 }
270
271 if !used_one {
272 break;
273 }
274 used_one = false;
275 }
276
277 printf_buff
278 }
279
280 fn spawn_control_write_task(
282 mut sink: SplitSink<Framed<TcpStream, ChunkSDIOControlCodec>, Bytes>,
283 ) -> Result<Sender<Bytes>, CatBridgeError> {
284 let (sender, mut receiver) = channel::<Bytes>(SDIO_TCP_PACKET_BUFFER_SIZE);
285
286 TaskBuilder::new()
287 .name("cat_dev::fsemul::sdio_control::write_task")
288 .spawn(async move {
289 while let Some(packet) = receiver.recv().await {
290 if let Err(cause) = sink.send(packet).await {
291 error!(
292 ?cause,
293 "Failed to send packet over SDIO Control, error in write channel, shutting down",
294 );
295 break;
296 }
297 }
298 })
299 .map_err(CatBridgeError::SpawnFailure)?;
300
301 Ok(sender)
302 }
303
304 fn spawn_data_write_task(
306 disable_load_bearing_sleep: bool,
307 mut sink: OwnedWriteHalf,
308 ) -> Result<Sender<Bytes>, CatBridgeError> {
309 let (sender, mut receiver) = channel::<Bytes>(SDIO_TCP_PACKET_BUFFER_SIZE);
310
311 TaskBuilder::new()
312 .name("cat_dev::fsemul::sdio_data::write_task")
313 .spawn(async move {
314 while let Some(packet) = receiver.recv().await {
315 if let Err(cause) = sink.write(&packet).await {
316 error!(
317 ?cause,
318 "Failed to send packet over SDIO Data, error in write channel, shutting down",
319 );
320 break;
321 }
322
323 if !disable_load_bearing_sleep {
324 trace!("sleeping to work around MION SDIO buffer-bug...");
339 sleep(Duration::from_millis(25)).await;
340 }
341 }
342 })
343 .map_err(CatBridgeError::SpawnFailure)?;
344
345 Ok(sender)
346 }
347}
348
349impl SdioClient<'static> {
350 pub async fn serve_concurrently(self) -> Result<(), CatBridgeError> {
368 let mut printf_buff = self.printf_control_buff;
369 let (sink, mut stream) = Framed::new(self.control_stream, ChunkSDIOControlCodec).split();
370 let (_data_stream, data_sink) = self.data_stream.into_split();
371
372 let _control_sender = Self::spawn_control_write_task(sink)?;
373 let data_sender = Self::spawn_data_write_task(self.no_load_bearing_sleep, data_sink)?;
374
375 while let Some(packet_result) = stream.next().await {
376 let packet = match packet_result {
377 Ok(packet) => packet.freeze(),
378 Err(cause) => return Err(NetworkError::IO(cause).into()),
379 };
380 if packet.is_empty() {
382 break;
383 }
384
385 match SdioControlPacketType::try_from(packet[0])? {
386 SdioControlPacketType::Message => {
387 let message_request = SdioControlMessageRequest::try_from(packet)?;
388 for message in message_request.messages_owned() {
389 match message {
390 SdioControlMessage::Printf(to_print) => {
391 printf_buff.push_str(&to_print);
392 }
393 }
394 }
395
396 printf_buff = Self::process_log_messages(
397 printf_buff,
398 self.host_ip,
399 self.control_port,
400 self.data_port,
401 );
402 }
403 SdioControlPacketType::Read => {
404 let read_request = SdioControlReadRequest::try_from(packet)?;
405
406 let host_fs: &'static HostFilesystem = self.host_filesystem;
407 let cloned_sender = data_sender.clone();
408 TaskBuilder::new()
409 .name("cat_dev::fsemul::sdio::serve_read_concurrently")
410 .spawn(async move {
411 let guard = SDIO_DATA_LOCK.lock().await;
412 if let Err(cause) =
413 serve_read_request(host_fs, &read_request, &cloned_sender).await
414 {
415 error!(?cause, "Failed to respond to read request, ignoring!");
416 }
417 std::mem::drop(guard);
418 })
419 .map_err(CatBridgeError::SpawnFailure)?;
420 }
421 SdioControlPacketType::Write => {
422 let _write_request = SdioControlWriteRequest::try_from(packet)?;
423 }
424 SdioControlPacketType::StartBlockChannel => {
425 info!(
426 "Got request to start PCFS Block Channel, but we've already started it..."
427 );
428 }
429 SdioControlPacketType::StartControlListeningChannel => {
430 info!("Got request to start CTRL Character Channel, but we've already started it...");
431 }
432 }
433 }
434
435 Ok(())
436 }
437}