loco_protocol/command/
client.rs1use alloc::{boxed::Box, collections::VecDeque};
8use core::mem;
9
10use arrayvec::ArrayVec;
11use serde::{Deserialize, Serialize};
12
13use crate::command::Header;
14
15use super::Command;
16
/// Outgoing side of the command codec.
///
/// [`LocoSink::send`] serializes commands into [`LocoSink::write_buffer`];
/// nothing in this type removes bytes from the buffer, so draining it
/// (presumably into a transport) is left to the caller.
#[derive(Debug)]
#[non_exhaustive]
pub struct LocoSink {
    /// Encoded bytes queued by `send`, oldest first.
    pub write_buffer: VecDeque<u8>,
}
24
25impl LocoSink {
26 pub const fn new() -> Self {
28 Self {
29 write_buffer: VecDeque::new(),
30 }
31 }
32
33 pub fn send(&mut self, command: Command<impl AsRef<[u8]>>) {
35 let data = command.data.as_ref();
36
37 bincode::serialize_into(
38 &mut self.write_buffer,
39 &RawHeader {
40 header: command.header,
41 data_size: data.len() as u32,
42 },
43 )
44 .unwrap();
45
46 self.write_buffer.extend(data);
47 }
48}
49
50impl Default for LocoSink {
51 fn default() -> Self {
52 Self::new()
53 }
54}
55
56#[derive(Debug)]
57pub struct LocoStream {
59 state: StreamState,
60
61 pub read_buffer: VecDeque<u8>,
63}
64
65impl LocoStream {
66 pub const fn new() -> Self {
68 Self {
69 state: StreamState::Pending,
70 read_buffer: VecDeque::new(),
71 }
72 }
73
74 pub const fn state(&self) -> &StreamState {
75 &self.state
76 }
77
78 pub fn read(&mut self) -> Option<Command<Box<[u8]>>> {
80 loop {
81 match mem::replace(&mut self.state, StreamState::Corrupted) {
82 StreamState::Pending => {
83 if self.read_buffer.len() < 22 {
84 self.state = StreamState::Pending;
85 return None;
86 }
87
88 let raw_header = {
89 let buf = self.read_buffer.drain(..22).collect::<ArrayVec<u8, 22>>();
90
91 bincode::deserialize::<RawHeader>(&buf).unwrap()
92 };
93
94 self.state = StreamState::Header(raw_header);
95 }
96
97 StreamState::Header(raw_header) => {
98 if self.read_buffer.len() < raw_header.data_size as usize {
99 self.state = StreamState::Header(raw_header);
100 return None;
101 }
102
103 let data = self
104 .read_buffer
105 .drain(..raw_header.data_size as usize)
106 .collect::<Box<[u8]>>();
107
108 self.state = StreamState::Pending;
109 return Some(Command {
110 header: raw_header.header,
111 data,
112 });
113 }
114
115 StreamState::Corrupted => unreachable!(),
116 }
117 }
118 }
119}
120
121impl Default for LocoStream {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
127#[derive(Debug, Clone, PartialEq)]
128pub enum StreamState {
129 Pending,
131
132 Header(RawHeader),
134
135 Corrupted,
137}
138
139#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
140pub struct RawHeader {
141 pub header: Header,
143
144 pub data_size: u32,
146}