1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4
5mod protocol;
6
7use alloc::format;
8use core::fmt;
9
10#[cfg(feature = "std")]
11use cu_linux_resources::LinuxSerialPort;
12use cu_sensor_payloads::PointCloudSoa;
13use cu29::prelude::*;
14use cu29::resource::{Owned, ResourceBindingMap, ResourceBindings, ResourceManager};
15use embedded_io::{ErrorKind, ErrorType, Read, Write};
16
17pub use protocol::{MAX_FRAME_BYTES, MAX_POINTS};
18
19const SERIAL_BUFFER_BYTES: usize = protocol::MAX_FRAME_BYTES * 2;
20const DEFAULT_MIN_RANGE_M: f32 = 0.05;
21const DEFAULT_ROW_ID: u8 = 0;
22const DEFAULT_START_COLUMN: u8 = 1;
23const DEFAULT_END_COLUMN: u8 = 64;
24
25#[derive(Clone, Copy, Debug, PartialEq)]
26pub struct Sen0682ReadoutConfig {
27 pub configure_device: bool,
28 pub row_id: u8,
29 pub start_column: u8,
30 pub end_column: u8,
31}
32
33impl Sen0682ReadoutConfig {
34 fn from_component_config(config: Option<&ComponentConfig>) -> CuResult<Self> {
35 let configure_device = cfg_bool(config, "configure_device", true)?;
36 let row_id = cfg_u8(config, "row_id", DEFAULT_ROW_ID)?;
37 let start_column = cfg_u8(config, "start_column", DEFAULT_START_COLUMN)?;
38 let end_column = cfg_u8(config, "end_column", DEFAULT_END_COLUMN)?;
39
40 if row_id > 8 {
41 return Err(CuError::from(format!(
42 "sen0682 row_id must be between 0 and 8, got {row_id}"
43 )));
44 }
45 if !(1..=64).contains(&start_column) {
46 return Err(CuError::from(format!(
47 "sen0682 start_column must be between 1 and 64, got {start_column}"
48 )));
49 }
50 if !(1..=64).contains(&end_column) {
51 return Err(CuError::from(format!(
52 "sen0682 end_column must be between 1 and 64, got {end_column}"
53 )));
54 }
55 if start_column > end_column {
56 return Err(CuError::from(format!(
57 "sen0682 start_column ({start_column}) must be <= end_column ({end_column})"
58 )));
59 }
60
61 Ok(Self {
62 configure_device,
63 row_id,
64 start_column,
65 end_column,
66 })
67 }
68}
69
70fn cfg_bool(config: Option<&ComponentConfig>, key: &str, default: bool) -> CuResult<bool> {
71 Ok(match config {
72 Some(cfg) => cfg.get::<bool>(key)?.unwrap_or(default),
73 None => default,
74 })
75}
76
77fn cfg_u8(config: Option<&ComponentConfig>, key: &str, default: u8) -> CuResult<u8> {
78 let raw = match config {
79 Some(cfg) => cfg.get::<u64>(key)?.unwrap_or(default as u64),
80 None => default as u64,
81 };
82 u8::try_from(raw).map_err(|_| {
83 CuError::from(format!(
84 "sen0682 config key `{key}` must fit in u8, got {raw}"
85 ))
86 })
87}
88
89fn cfg_f32(config: Option<&ComponentConfig>, key: &str, default: f32) -> CuResult<f32> {
90 let raw = match config {
91 Some(cfg) => cfg.get::<f64>(key)?.unwrap_or(default as f64),
92 None => default as f64,
93 };
94 Ok(raw as f32)
95}
96
97trait FrameTransport {
98 fn start(&mut self, readout: &Sen0682ReadoutConfig) -> CuResult<()>;
99 fn read_frame(&mut self, out: &mut [u8]) -> CuResult<Option<usize>>;
100 fn stop(&mut self) -> CuResult<()> {
101 Ok(())
102 }
103}
104
105struct Sen0682SourceCore<T> {
106 transport: T,
107 frame_buffer: [u8; protocol::MAX_FRAME_BYTES],
111 min_range_m: f32,
112}
113
114impl<T> Sen0682SourceCore<T>
115where
116 T: FrameTransport,
117{
118 fn new(transport: T, min_range_m: f32) -> Self {
119 Self {
120 transport,
121 frame_buffer: [0u8; protocol::MAX_FRAME_BYTES],
122 min_range_m,
123 }
124 }
125
126 fn start(&mut self, readout: &Sen0682ReadoutConfig) -> CuResult<()> {
127 self.transport.start(readout)
128 }
129
130 fn process(
131 &mut self,
132 ctx: &CuContext,
133 output: &mut CuMsg<PointCloudSoa<MAX_POINTS>>,
134 ) -> CuResult<()> {
135 let Some(frame_len) = self.transport.read_frame(&mut self.frame_buffer)? else {
136 output.metadata.set_status("waiting");
137 output.clear_payload();
138 return Ok(());
139 };
140
141 let payload = output
144 .payload_mut()
145 .get_or_insert_with(PointCloudSoa::<MAX_POINTS>::default);
146
147 let stats = protocol::decode_frame_into(
148 &self.frame_buffer[..frame_len],
149 ctx.now(),
150 self.min_range_m,
151 payload,
152 )
153 .map_err(|err| CuError::from(format!("sen0682 frame decode failed: {err}")))?;
154
155 if payload.len == 0 {
156 output.metadata.set_status("filtered");
157 output.clear_payload();
158 return Ok(());
159 }
160
161 output.metadata.set_status("streaming");
162 output.tov = Tov::Time(ctx.now());
163
164 if stats.frame_idx == 0 {
165 debug!(
166 "sen0682: first frame width={} height={} points={} device_index={}",
167 stats.width, stats.height, stats.valid_points, stats.device_index
168 );
169 }
170
171 Ok(())
172 }
173
174 fn stop(&mut self) -> CuResult<()> {
175 self.transport.stop()
176 }
177}
178
179struct SerialTransport<S> {
180 serial: S,
181 buffer: [u8; SERIAL_BUFFER_BYTES],
182 buffered: usize,
183 configured_by_driver: bool,
184}
185
186impl<S> SerialTransport<S> {
187 fn new(serial: S) -> Self {
188 Self {
189 serial,
190 buffer: [0u8; SERIAL_BUFFER_BYTES],
191 buffered: 0,
192 configured_by_driver: false,
193 }
194 }
195
196 fn discard_prefix(&mut self, count: usize) {
197 if count >= self.buffered {
198 self.buffered = 0;
199 return;
200 }
201 self.buffer.copy_within(count..self.buffered, 0);
202 self.buffered -= count;
203 }
204
205 fn keep_tail(&mut self, count: usize) {
206 if count >= self.buffered {
207 return;
208 }
209 let start = self.buffered - count;
210 self.buffer.copy_within(start..self.buffered, 0);
211 self.buffered = count;
212 }
213
214 fn try_extract_frame(&mut self, out: &mut [u8]) -> CuResult<Option<usize>> {
215 loop {
216 if self.buffered < protocol::TAG.len() {
217 return Ok(None);
218 }
219
220 let Some(tag_pos) = protocol::find_tag(&self.buffer[..self.buffered]) else {
221 self.keep_tail(self.buffered.min(protocol::TAG.len() - 1));
222 return Ok(None);
223 };
224
225 if tag_pos > 0 {
226 self.discard_prefix(tag_pos);
227 }
228
229 if self.buffered < protocol::HEADER_BYTES {
230 return Ok(None);
231 }
232
233 match protocol::frame_total_bytes_from_prefix(&self.buffer[..protocol::HEADER_BYTES]) {
234 Ok(Some(total_bytes)) => {
235 if total_bytes > out.len() {
236 return Err(CuError::from(format!(
237 "sen0682 frame length {total_bytes} exceeds parser buffer {}",
238 out.len()
239 )));
240 }
241 if self.buffered < total_bytes {
242 return Ok(None);
243 }
244 out[..total_bytes].copy_from_slice(&self.buffer[..total_bytes]);
245 self.discard_prefix(total_bytes);
246 return Ok(Some(total_bytes));
247 }
248 Ok(None) => return Ok(None),
249 Err(_) => {
250 self.discard_prefix(1);
251 }
252 }
253 }
254 }
255}
256
257impl<S> SerialTransport<S>
258where
259 S: Read + Write + ErrorType,
260 <S as ErrorType>::Error: embedded_io::Error + fmt::Debug,
261{
262 fn send_command(&mut self, command: &str) -> CuResult<()> {
263 write_all(&mut self.serial, command.as_bytes())?;
264 self.serial
265 .flush()
266 .map_err(|err| CuError::from(format!("sen0682 command flush failed: {err:?}")))?;
267
268 let mut saw_error = false;
269 let mut scratch = [0u8; 256];
270 loop {
271 match self.serial.read(&mut scratch) {
272 Ok(0) => break,
273 Ok(n) => {
274 saw_error |= contains_ascii_token(&scratch[..n], b"ERROR");
275 }
276 Err(err) if is_idle_io_error(&err) => break,
277 Err(err) => {
278 return Err(CuError::from(format!(
279 "sen0682 response read failed after `{}`: {err:?}",
280 command.trim()
281 )));
282 }
283 }
284 }
285
286 if saw_error {
287 return Err(CuError::from(format!(
288 "sen0682 rejected command `{}`",
289 command.trim()
290 )));
291 }
292
293 Ok(())
294 }
295
296 fn configure_streaming(&mut self, readout: &Sen0682ReadoutConfig) -> CuResult<()> {
297 if !readout.configure_device {
298 self.configured_by_driver = false;
299 return Ok(());
300 }
301
302 self.send_command("AT+STREAM_CONTROL=0\n")?;
303 self.send_command("AT+STREAM_DATA_TYPE=3\n")?;
304 self.send_command("AT+SPAD_FRAME_MODE=0\n")?;
305 self.send_command(
306 format!(
307 "AT+SPAD_OUTPUT_LINE_DATA={},{},{}\n",
308 readout.row_id, readout.start_column, readout.end_column
309 )
310 .as_str(),
311 )?;
312 self.send_command("AT+STREAM_CONTROL=1\n")?;
315 self.buffered = 0;
316 self.configured_by_driver = true;
317 Ok(())
318 }
319}
320
321impl<S> FrameTransport for SerialTransport<S>
322where
323 S: Read + Write + ErrorType + Send + Sync + 'static,
324 <S as ErrorType>::Error: embedded_io::Error + fmt::Debug + 'static,
325{
326 fn start(&mut self, readout: &Sen0682ReadoutConfig) -> CuResult<()> {
327 self.configure_streaming(readout)
328 }
329
330 fn read_frame(&mut self, out: &mut [u8]) -> CuResult<Option<usize>> {
331 if let Some(frame) = self.try_extract_frame(out)? {
332 return Ok(Some(frame));
333 }
334
335 if self.buffered == self.buffer.len() {
336 return Err(CuError::from(
337 "sen0682 serial framing buffer saturated before a valid frame was found",
338 ));
339 }
340
341 match self.serial.read(&mut self.buffer[self.buffered..]) {
342 Ok(0) => Ok(None),
343 Ok(n) => {
344 self.buffered += n;
345 self.try_extract_frame(out)
346 }
347 Err(err) if is_idle_io_error(&err) => Ok(None),
348 Err(err) => Err(CuError::from(format!(
349 "sen0682 serial read failed: {err:?}"
350 ))),
351 }
352 }
353
354 fn stop(&mut self) -> CuResult<()> {
355 if self.configured_by_driver {
356 let _ = self.send_command("AT+STREAM_CONTROL=0\n");
357 self.configured_by_driver = false;
358 }
359 Ok(())
360 }
361}
362
363fn write_all<S>(serial: &mut S, bytes: &[u8]) -> CuResult<()>
364where
365 S: Write + ErrorType,
366 <S as ErrorType>::Error: fmt::Debug,
367{
368 let mut written = 0;
369 while written < bytes.len() {
370 let n = serial
371 .write(&bytes[written..])
372 .map_err(|err| CuError::from(format!("sen0682 command write failed: {err:?}")))?;
373 if n == 0 {
374 return Err(CuError::from(
375 "sen0682 command write returned zero bytes before completion",
376 ));
377 }
378 written += n;
379 }
380 Ok(())
381}
382
383fn contains_ascii_token(haystack: &[u8], token: &[u8]) -> bool {
384 haystack.windows(token.len()).any(|window| window == token)
385}
386
387fn is_idle_io_error<E>(err: &E) -> bool
388where
389 E: embedded_io::Error,
390{
391 matches!(err.kind(), ErrorKind::TimedOut | ErrorKind::Interrupted)
392}
393
394pub trait Sen0682I2cBus: Send + Sync + 'static {
400 type Error: fmt::Debug + Send + 'static;
401
402 fn configure_stream(&mut self, readout: &Sen0682ReadoutConfig) -> Result<(), Self::Error>;
403 fn read_frame(&mut self, out: &mut [u8]) -> Result<Option<usize>, Self::Error>;
404 fn stop_stream(&mut self) -> Result<(), Self::Error> {
405 Ok(())
406 }
407}
408
409struct I2cTransport<B> {
410 bus: B,
411}
412
413impl<B> I2cTransport<B> {
414 fn new(bus: B) -> Self {
415 Self { bus }
416 }
417}
418
419impl<B> FrameTransport for I2cTransport<B>
420where
421 B: Sen0682I2cBus,
422{
423 fn start(&mut self, readout: &Sen0682ReadoutConfig) -> CuResult<()> {
424 self.bus
425 .configure_stream(readout)
426 .map_err(|err| CuError::from(format!("sen0682 i2c configure failed: {err:?}")))
427 }
428
429 fn read_frame(&mut self, out: &mut [u8]) -> CuResult<Option<usize>> {
430 self.bus
431 .read_frame(out)
432 .map_err(|err| CuError::from(format!("sen0682 i2c read failed: {err:?}")))
433 }
434
435 fn stop(&mut self) -> CuResult<()> {
436 self.bus
437 .stop_stream()
438 .map_err(|err| CuError::from(format!("sen0682 i2c stop failed: {err:?}")))
439 }
440}
441
442#[derive(Copy, Clone, Debug, Eq, PartialEq)]
443pub enum SerialBinding {
444 Serial,
445}
446
447pub struct Sen0682SerialResourcesT<S> {
448 pub serial: Owned<S>,
449}
450
451#[cfg(feature = "std")]
452pub type Sen0682SerialResources = Sen0682SerialResourcesT<LinuxSerialPort>;
453
454impl<'r, S: 'static + Send + Sync> ResourceBindings<'r> for Sen0682SerialResourcesT<S> {
455 type Binding = SerialBinding;
456
457 fn from_bindings(
458 manager: &'r mut ResourceManager,
459 mapping: Option<&ResourceBindingMap<Self::Binding>>,
460 ) -> CuResult<Self> {
461 let mapping = mapping.ok_or_else(|| {
462 CuError::from("Sen0682SerialSourceTask requires a `serial` resource mapping")
463 })?;
464 let path = mapping.get(SerialBinding::Serial).ok_or_else(|| {
465 CuError::from(
466 "Sen0682SerialSourceTask resources must include `serial: <bundle.resource>`",
467 )
468 })?;
469 let serial = manager
470 .take::<S>(path.typed())
471 .map_err(|e| e.add_cause("Failed to fetch SEN0682 serial resource"))?;
472 Ok(Self { serial })
473 }
474}
475
476#[derive(Copy, Clone, Debug, Eq, PartialEq)]
477pub enum I2cBinding {
478 I2c,
479}
480
481pub struct Sen0682I2cResourcesT<B> {
482 pub i2c: Owned<B>,
483}
484
485impl<'r, B: 'static + Send + Sync> ResourceBindings<'r> for Sen0682I2cResourcesT<B> {
486 type Binding = I2cBinding;
487
488 fn from_bindings(
489 manager: &'r mut ResourceManager,
490 mapping: Option<&ResourceBindingMap<Self::Binding>>,
491 ) -> CuResult<Self> {
492 let mapping = mapping.ok_or_else(|| {
493 CuError::from("Sen0682I2cSourceTask requires an `i2c` resource mapping")
494 })?;
495 let path = mapping.get(I2cBinding::I2c).ok_or_else(|| {
496 CuError::from("Sen0682I2cSourceTask resources must include `i2c: <bundle.resource>`")
497 })?;
498 let i2c = manager
499 .take::<B>(path.typed())
500 .map_err(|e| e.add_cause("Failed to fetch SEN0682 I2C resource"))?;
501 Ok(Self { i2c })
502 }
503}
504
505#[derive(Reflect)]
506#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
507pub struct Sen0682SerialSourceTask<S> {
508 #[reflect(ignore)]
509 core: Sen0682SourceCore<SerialTransport<S>>,
510 configure_device: bool,
511 row_id: u8,
512 start_column: u8,
513 end_column: u8,
514 min_range_m: f32,
515}
516
517#[cfg(feature = "std")]
518pub type Sen0682SerialSource = Sen0682SerialSourceTask<LinuxSerialPort>;
519
520impl<S: 'static> TypePath for Sen0682SerialSourceTask<S> {
521 fn type_path() -> &'static str {
522 "cu_sen0682::Sen0682SerialSourceTask"
523 }
524
525 fn short_type_path() -> &'static str {
526 "Sen0682SerialSourceTask"
527 }
528
529 fn type_ident() -> Option<&'static str> {
530 Some("Sen0682SerialSourceTask")
531 }
532
533 fn crate_name() -> Option<&'static str> {
534 Some("cu_sen0682")
535 }
536
537 fn module_path() -> Option<&'static str> {
538 Some("cu_sen0682")
539 }
540}
541
542impl<S> fmt::Debug for Sen0682SerialSourceTask<S> {
543 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544 f.debug_struct("Sen0682SerialSourceTask")
545 .field("configure_device", &self.configure_device)
546 .field("row_id", &self.row_id)
547 .field("start_column", &self.start_column)
548 .field("end_column", &self.end_column)
549 .field("min_range_m", &self.min_range_m)
550 .finish()
551 }
552}
553
554impl<S> Freezable for Sen0682SerialSourceTask<S> {}
555
556impl<S> Sen0682SerialSourceTask<S> {
557 fn readout_config(&self) -> Sen0682ReadoutConfig {
558 Sen0682ReadoutConfig {
559 configure_device: self.configure_device,
560 row_id: self.row_id,
561 start_column: self.start_column,
562 end_column: self.end_column,
563 }
564 }
565}
566
567impl<S> CuSrcTask for Sen0682SerialSourceTask<S>
568where
569 S: Read + Write + ErrorType + Send + Sync + 'static,
570 <S as ErrorType>::Error: embedded_io::Error + fmt::Debug + 'static,
571{
572 type Resources<'r> = Sen0682SerialResourcesT<S>;
573 type Output<'m> = output_msg!(PointCloudSoa<MAX_POINTS>);
574
575 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
576 where
577 Self: Sized,
578 {
579 let readout = Sen0682ReadoutConfig::from_component_config(config)?;
580 let min_range_m = cfg_f32(config, "min_range_m", DEFAULT_MIN_RANGE_M)?;
581
582 Ok(Self {
583 core: Sen0682SourceCore::new(SerialTransport::new(resources.serial.0), min_range_m),
584 configure_device: readout.configure_device,
585 row_id: readout.row_id,
586 start_column: readout.start_column,
587 end_column: readout.end_column,
588 min_range_m,
589 })
590 }
591
592 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
593 self.core.start(&self.readout_config())
594 }
595
596 fn process(&mut self, ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
597 self.core.process(ctx, output)
598 }
599
600 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
601 self.core.stop()
602 }
603}
604
605#[derive(Reflect)]
606#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
607pub struct Sen0682I2cSourceTask<B> {
608 #[reflect(ignore)]
609 core: Sen0682SourceCore<I2cTransport<B>>,
610 configure_device: bool,
611 row_id: u8,
612 start_column: u8,
613 end_column: u8,
614 min_range_m: f32,
615}
616
617impl<B: 'static> TypePath for Sen0682I2cSourceTask<B> {
618 fn type_path() -> &'static str {
619 "cu_sen0682::Sen0682I2cSourceTask"
620 }
621
622 fn short_type_path() -> &'static str {
623 "Sen0682I2cSourceTask"
624 }
625
626 fn type_ident() -> Option<&'static str> {
627 Some("Sen0682I2cSourceTask")
628 }
629
630 fn crate_name() -> Option<&'static str> {
631 Some("cu_sen0682")
632 }
633
634 fn module_path() -> Option<&'static str> {
635 Some("cu_sen0682")
636 }
637}
638
639impl<B> fmt::Debug for Sen0682I2cSourceTask<B> {
640 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641 f.debug_struct("Sen0682I2cSourceTask")
642 .field("configure_device", &self.configure_device)
643 .field("row_id", &self.row_id)
644 .field("start_column", &self.start_column)
645 .field("end_column", &self.end_column)
646 .field("min_range_m", &self.min_range_m)
647 .finish()
648 }
649}
650
651impl<B> Freezable for Sen0682I2cSourceTask<B> {}
652
653impl<B> Sen0682I2cSourceTask<B> {
654 fn readout_config(&self) -> Sen0682ReadoutConfig {
655 Sen0682ReadoutConfig {
656 configure_device: self.configure_device,
657 row_id: self.row_id,
658 start_column: self.start_column,
659 end_column: self.end_column,
660 }
661 }
662}
663
664impl<B> CuSrcTask for Sen0682I2cSourceTask<B>
665where
666 B: Sen0682I2cBus,
667{
668 type Resources<'r> = Sen0682I2cResourcesT<B>;
669 type Output<'m> = output_msg!(PointCloudSoa<MAX_POINTS>);
670
671 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
672 where
673 Self: Sized,
674 {
675 let readout = Sen0682ReadoutConfig::from_component_config(config)?;
676 let min_range_m = cfg_f32(config, "min_range_m", DEFAULT_MIN_RANGE_M)?;
677
678 Ok(Self {
679 core: Sen0682SourceCore::new(I2cTransport::new(resources.i2c.0), min_range_m),
680 configure_device: readout.configure_device,
681 row_id: readout.row_id,
682 start_column: readout.start_column,
683 end_column: readout.end_column,
684 min_range_m,
685 })
686 }
687
688 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
689 self.core.start(&self.readout_config())
690 }
691
692 fn process(&mut self, ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
693 self.core.process(ctx, output)
694 }
695
696 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
697 self.core.stop()
698 }
699}
700
701#[cfg(test)]
702mod tests {
703 use super::*;
704 use alloc::collections::VecDeque;
705 use alloc::vec::Vec;
706
707 fn build_test_frame() -> Vec<u8> {
708 let mut bytes = Vec::new();
709 bytes.extend_from_slice(b"wyld");
710 bytes.extend_from_slice(&2u16.to_le_bytes());
711 bytes.extend_from_slice(&1u16.to_le_bytes());
712 bytes.extend_from_slice(&16u32.to_le_bytes());
713 bytes.extend_from_slice(&16u16.to_le_bytes());
714 bytes.extend_from_slice(&3u16.to_le_bytes());
715 bytes.extend_from_slice(&7u32.to_le_bytes());
716 bytes.extend_from_slice(&0u32.to_le_bytes());
717 bytes.extend_from_slice(&0u32.to_le_bytes());
718 bytes.extend_from_slice(&0u32.to_le_bytes());
719 bytes.extend_from_slice(&1000i16.to_le_bytes());
720 bytes.extend_from_slice(&0i16.to_le_bytes());
721 bytes.extend_from_slice(&2000i16.to_le_bytes());
722 bytes.extend_from_slice(&123u16.to_le_bytes());
723 bytes.extend_from_slice(&1500i16.to_le_bytes());
724 bytes.extend_from_slice(&0i16.to_le_bytes());
725 bytes.extend_from_slice(&2500i16.to_le_bytes());
726 bytes.extend_from_slice(&456u16.to_le_bytes());
727 bytes
728 }
729
730 #[derive(Default)]
731 struct FakeSerial {
732 reads: VecDeque<Result<Vec<u8>, std::io::Error>>,
733 writes: Vec<Vec<u8>>,
734 }
735
736 impl FakeSerial {
737 fn with_reads(reads: impl IntoIterator<Item = Result<Vec<u8>, std::io::Error>>) -> Self {
738 Self {
739 reads: reads.into_iter().collect(),
740 writes: Vec::new(),
741 }
742 }
743 }
744
745 impl ErrorType for FakeSerial {
746 type Error = std::io::Error;
747 }
748
749 impl Read for FakeSerial {
750 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
751 let Some(next) = self.reads.pop_front() else {
752 return Err(std::io::Error::from(std::io::ErrorKind::TimedOut));
753 };
754 match next {
755 Ok(chunk) => {
756 let len = chunk.len().min(buf.len());
757 buf[..len].copy_from_slice(&chunk[..len]);
758 Ok(len)
759 }
760 Err(err) => Err(err),
761 }
762 }
763 }
764
765 impl Write for FakeSerial {
766 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
767 self.writes.push(buf.to_vec());
768 Ok(buf.len())
769 }
770
771 fn flush(&mut self) -> Result<(), Self::Error> {
772 Ok(())
773 }
774 }
775
776 #[test]
777 fn serial_transport_resynchronizes_after_text_noise() {
778 let frame = build_test_frame();
779 let serial = FakeSerial::with_reads([
780 Ok(b"OK\r\n".to_vec()),
781 Ok(frame[..11].to_vec()),
782 Ok(frame[11..].to_vec()),
783 ]);
784 let mut transport = SerialTransport::new(serial);
785 let mut out = [0u8; protocol::MAX_FRAME_BYTES];
786
787 let mut extracted = None;
788 for _ in 0..3 {
789 extracted = transport.read_frame(&mut out).expect("read should succeed");
790 if extracted.is_some() {
791 break;
792 }
793 }
794
795 assert!(extracted.is_some());
796 assert_eq!(extracted.unwrap(), frame.len());
797 assert_eq!(&out[..frame.len()], frame.as_slice());
798 }
799}