1use std::collections::HashMap;
2use std::sync::Arc;
3
4#[cfg(not(feature = "tokio_mutex"))]
5use std::sync::Mutex as StdMutex;
6
7#[cfg(feature = "tokio_mutex")]
8use tokio::sync::Mutex as TokioMutex;
9
10use proto::message_parser::subscription::Subscription;
11use tokio::task::JoinHandle;
12use tokio::time::{Duration, sleep};
13use ftswarm_proto::command::direct::FtSwarmDirectCommand;
14use ftswarm_proto::command::FtSwarmCommand;
15use ftswarm_proto::message_parser::rpc::RPCReturnParam;
16use ftswarm_proto::message_parser::S2RMessage;
17use ftswarm_proto::Serialized;
18use ftswarm_serial::SwarmSerialPort;
19use ftswarm_serial::serial::SerialCommunication;
20use crate::message_queue::{ReturnQueue, SenderHandle, WriteQueue};
21
22pub use ftswarm_proto as proto;
23use ftswarm_proto::command::rpc::RpcFunction;
24use crate::direct::{parse_uptime, WhoamiResponse};
25
26mod message_queue;
27pub mod swarm_object;
28mod direct;
29pub mod prelude;
30
31#[cfg(test)]
32mod tests;
33
/// Mutex type guarding the shared swarm state.
///
/// With the `tokio_mutex` feature enabled this is `tokio::sync::Mutex`.
#[cfg(feature = "tokio_mutex")]
pub type Mutex<T> = TokioMutex<T>;

/// Mutex type guarding the shared swarm state.
///
/// Without the `tokio_mutex` feature this is `std::sync::Mutex`.
#[cfg(not(feature = "tokio_mutex"))]
pub type Mutex<T> = StdMutex<T>;
39
/// Acquires `mutex` and returns its guard (`tokio_mutex` build).
///
/// Awaits the Tokio mutex, so the calling task yields instead of blocking the
/// thread while the lock is contended.
#[cfg(feature = "tokio_mutex")]
async fn lock<T>(mutex: &Mutex<T>) -> tokio::sync::MutexGuard<'_, T> {
    mutex.lock().await
}
44
/// Acquires `mutex` and returns its guard (build without `tokio_mutex`).
///
/// The function is `async` only so that call sites look the same under both
/// feature settings; the body contains no await point and locks the standard
/// library mutex directly, blocking the current thread while it is contended.
///
/// # Panics
///
/// Panics if the mutex is poisoned, i.e. a previous holder panicked while
/// holding the guard.
#[cfg(not(feature = "tokio_mutex"))]
async fn lock<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    mutex.lock().unwrap()
}
49
/// Declares a unit-like type whose associated constants are string aliases.
///
/// `aliases! { Name { FOO = "foo", BAR = "bar" } }` expands to a
/// `#[derive(Debug)] pub struct Name {}` plus one
/// `pub const <IDENT>: &str = <expr>;` per entry inside `impl Name`.
///
/// Attributes (including `///` doc comments) may optionally be placed in
/// front of the type name and in front of each entry; they are forwarded to
/// the generated struct and constants respectively. A trailing comma after the
/// last entry is accepted, and the entry list may be empty.
#[macro_export]
macro_rules! aliases {
    (
        $(#[$type_meta:meta])*
        $enum_name:ident {
            $(
                $(#[$variant_meta:meta])*
                $variant:ident = $alias:expr
            ),* $(,)?
        }
    ) => {
        $(#[$type_meta])*
        #[derive(Debug)]
        pub struct $enum_name {}

        impl $enum_name {
            $(
                $(#[$variant_meta])*
                pub const $variant: &str = $alias;
            )*
        }
    };
}
89
90struct InnerFtSwarm {
91 objects: HashMap<String, Box<dyn Fn(RPCReturnParam) + Send>>,
92 message_queue: ReturnQueue,
93 write_queue: WriteQueue,
94}
95
96impl InnerFtSwarm {
97 fn new() -> Self {
98 InnerFtSwarm {
99 objects: HashMap::new(),
100 message_queue: ReturnQueue::new(),
101 write_queue: WriteQueue::new(),
102 }
103 }
104}
105
106pub struct FtSwarm {
108 inner: Arc<Mutex<InnerFtSwarm>>,
109 coro: Option<JoinHandle<()>>,
110}
111
112impl FtSwarm {
113 pub fn new<Serial: SwarmSerialPort + 'static>(mut serial: Serial) -> Self {
115 let inner = Arc::new(Mutex::new(InnerFtSwarm::new()));
116
117 let inner_for_thread = inner.clone();
118 serial.write_line(FtSwarmCommand::Direct(FtSwarmDirectCommand::StartCli).serialize()).expect("Write line failure");
121 serial.block_until("@@@".to_string()).expect("Block until failure");
122
123 let handle = tokio::spawn(async move {
124 FtSwarm::input_loop(inner_for_thread, serial).await;
125 });
126
127 FtSwarm {
128 inner,
129 coro: Some(handle),
130 }
131 }
132
133 async fn input_loop<Serial: SwarmSerialPort + 'static>(inner_ft_swarm: Arc<Mutex<InnerFtSwarm>>, mut serial_port: Serial) {
134 loop {
135 if serial_port.available().expect("Available check failure") {
136 let line = serial_port.read_line().expect("Readline failure").replace("\n", "").replace("\r", "");
137 let response = S2RMessage::from(line);
138 {
139 let mut inner = lock(&inner_ft_swarm).await;
140 if let S2RMessage::Subscription(subscription) = response {
141 if let Ok(subscription) = Subscription::try_from(subscription) {
142 if let Some(object) = inner.objects.get(&subscription.port_name) {
143 object(subscription.value.clone());
144 }
145 }
146 } else {
147 inner.message_queue.push(response);
148 }
149 }
150 }
151
152 {
153 let mut inner = lock(&inner_ft_swarm).await;
154
155 if let Some(data) = inner.write_queue.pop() {
157 serial_port.write_line(data).expect("Write line failure");
158 }
159 }
160
161 sleep(Duration::from_millis(15)).await;
162 }
163 }
164
165
166pub(crate) async fn push_cache(&self, object: Box<dyn Fn(RPCReturnParam) + Send>, name: &str) {
167 let mut inner = lock(&self.inner).await;
168 inner.objects.insert(name.to_string(), object);
169}
170
171pub async fn send_command(&self, command: FtSwarmCommand) {
173 let mut inner = lock(&self.inner).await;
174 inner.write_queue.push(command);
175}
176
177pub async fn read_response(&self) -> Result<RPCReturnParam, String> {
179 let (handle, mut recv) = SenderHandle::create();
180 {
181 let mut inner = lock(&self.inner).await;
182 inner.message_queue.push_sender(&handle);
183 }
184
185 let response = recv.recv().await.unwrap();
186
187 {
188 let mut inner = lock(&self.inner).await;
189 inner.message_queue.drop_sender(&handle);
190 }
191
192 match response {
193 S2RMessage::RPCResponse(data) => Ok(RPCReturnParam::from(data)),
194 S2RMessage::Error(data) => Err(data),
195 any => Err(format!("Received non-RPCResponse message, {:?}", any).to_string()),
196 }
197}
198
199
200pub async fn transact(&self, command: FtSwarmCommand) -> Result<RPCReturnParam, String> {
202 let is_subscription = match &command {
204 FtSwarmCommand::RPC(cmd) => cmd.function == RpcFunction::Subscribe,
205 _ => false,
206 };
207
208 self.send_command(command).await;
209
210 if is_subscription {
211 return Ok(RPCReturnParam::Ok);
212 }
213
214 self.read_response().await
215}
216
217pub async fn whoami(&self) -> Result<WhoamiResponse, String> {
219 let response = self.transact(FtSwarmCommand::Direct(FtSwarmDirectCommand::Whoami)).await?;
220 if let RPCReturnParam::String(str) = response {
221 Ok(WhoamiResponse::try_from(str)?)
222 } else {
223 Err("Received non-string response".to_string())
224 }
225}
226
227pub async fn halt(&self) {
229 self.send_command(FtSwarmCommand::Direct(FtSwarmDirectCommand::Halt)).await;
230}
231
232pub async fn uptime(&self) -> Result<Duration, String> {
234 let response = self.transact(FtSwarmCommand::Direct(FtSwarmDirectCommand::Uptime)).await?;
235
236 if let RPCReturnParam::String(str) = response {
237 Ok(parse_uptime(str)?)
238 } else {
239 Err("Received non-string response".to_string())
240 }
241}
242}
243
244impl Drop for FtSwarm {
245 fn drop(&mut self) {
246 if let Some(coro) = self.coro.take() {
247 coro.abort();
248 }
249 }
250}
251
252impl Clone for FtSwarm {
253 fn clone(&self) -> Self {
254 Self { inner: self.inner.clone(), coro: None }
255 }
256}
257
258impl Default for FtSwarm {
261 fn default() -> Self {
262 FtSwarm::new(SerialCommunication::default())
263 }
264}