loco_protocol/command/
client.rs

1/*
2 * Created on Sat Sep 09 2023
3 *
4 * Copyright (c) storycraft. Licensed under the MIT Licence.
5 */
6
7use alloc::{boxed::Box, collections::VecDeque};
8use core::mem;
9
10use arrayvec::ArrayVec;
11use serde::{Deserialize, Serialize};
12
13use crate::command::Header;
14
15use super::Command;
16
/// Sans-IO sending half of the loco protocol.
///
/// Encoded commands are only queued into [`LocoSink::write_buffer`];
/// moving those bytes to an actual transport is left to the caller.
#[non_exhaustive]
#[derive(Debug)]
pub struct LocoSink {
    /// Encoded bytes queued by [`LocoSink::send`], oldest first
    pub write_buffer: VecDeque<u8>,
}
24
25impl LocoSink {
26    /// Create new [`LocoSink`]
27    pub const fn new() -> Self {
28        Self {
29            write_buffer: VecDeque::new(),
30        }
31    }
32
33    /// Write single [`Command`] to [`LocoSink::write_buffer`]
34    pub fn send(&mut self, command: Command<impl AsRef<[u8]>>) {
35        let data = command.data.as_ref();
36
37        bincode::serialize_into(
38            &mut self.write_buffer,
39            &RawHeader {
40                header: command.header,
41                data_size: data.len() as u32,
42            },
43        )
44        .unwrap();
45
46        self.write_buffer.extend(data);
47    }
48}
49
50impl Default for LocoSink {
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
56#[derive(Debug)]
57/// IO-free loco protocol stream
58pub struct LocoStream {
59    state: StreamState,
60
61    /// Read buffer for stream
62    pub read_buffer: VecDeque<u8>,
63}
64
65impl LocoStream {
66    /// Create new [`LocoStream`]
67    pub const fn new() -> Self {
68        Self {
69            state: StreamState::Pending,
70            read_buffer: VecDeque::new(),
71        }
72    }
73
74    pub const fn state(&self) -> &StreamState {
75        &self.state
76    }
77
78    /// Try reading single [`Command`] from [`LocoClient::read_buffer`]
79    pub fn read(&mut self) -> Option<Command<Box<[u8]>>> {
80        loop {
81            match mem::replace(&mut self.state, StreamState::Corrupted) {
82                StreamState::Pending => {
83                    if self.read_buffer.len() < 22 {
84                        self.state = StreamState::Pending;
85                        return None;
86                    }
87
88                    let raw_header = {
89                        let buf = self.read_buffer.drain(..22).collect::<ArrayVec<u8, 22>>();
90
91                        bincode::deserialize::<RawHeader>(&buf).unwrap()
92                    };
93
94                    self.state = StreamState::Header(raw_header);
95                }
96
97                StreamState::Header(raw_header) => {
98                    if self.read_buffer.len() < raw_header.data_size as usize {
99                        self.state = StreamState::Header(raw_header);
100                        return None;
101                    }
102
103                    let data = self
104                        .read_buffer
105                        .drain(..raw_header.data_size as usize)
106                        .collect::<Box<[u8]>>();
107
108                    self.state = StreamState::Pending;
109                    return Some(Command {
110                        header: raw_header.header,
111                        data,
112                    });
113                }
114
115                StreamState::Corrupted => unreachable!(),
116            }
117        }
118    }
119}
120
121impl Default for LocoStream {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127#[derive(Debug, Clone, PartialEq)]
128pub enum StreamState {
129    /// Stream is waiting for packet
130    Pending,
131
132    /// Stream read header and wait for data
133    Header(RawHeader),
134
135    /// Client corrupted and cannot continue
136    Corrupted,
137}
138
139#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
140pub struct RawHeader {
141    /// Packet header
142    pub header: Header,
143
144    /// Data size
145    pub data_size: u32,
146}