autocore_std/
command_client.rs1use std::collections::HashMap;
30use std::time::{Duration, Instant};
31
32use mechutil::ipc::CommandMessage;
33use serde_json::Value;
34use tokio::sync::mpsc;
35
36struct PendingRequest {
37 topic: String,
38 sent_at: Instant,
39}
40
41pub struct CommandClient {
55 write_tx: mpsc::UnboundedSender<String>,
57 response_rx: mpsc::UnboundedReceiver<CommandMessage>,
59 pending: HashMap<u32, PendingRequest>,
61 responses: HashMap<u32, CommandMessage>,
63}
64
65impl CommandClient {
66 pub fn new(
68 write_tx: mpsc::UnboundedSender<String>,
69 response_rx: mpsc::UnboundedReceiver<CommandMessage>,
70 ) -> Self {
71 Self {
72 write_tx,
73 response_rx,
74 pending: HashMap::new(),
75 responses: HashMap::new(),
76 }
77 }
78
79 pub fn send(&mut self, topic: &str, data: Value) -> u32 {
91 let msg = CommandMessage::request(topic, data);
92 let transaction_id = msg.transaction_id;
93
94 if let Ok(json) = serde_json::to_string(&msg) {
95 let _ = self.write_tx.send(json);
96 }
97
98 self.pending.insert(transaction_id, PendingRequest {
99 topic: topic.to_string(),
100 sent_at: Instant::now(),
101 });
102
103 transaction_id
104 }
105
106 pub fn poll(&mut self) {
113 while let Ok(msg) = self.response_rx.try_recv() {
114 self.pending.remove(&msg.transaction_id);
115 self.responses.insert(msg.transaction_id, msg);
116 }
117 }
118
119 pub fn take_response(&mut self, transaction_id: u32) -> Option<CommandMessage> {
142 self.responses.remove(&transaction_id)
143 }
144
145 pub fn is_pending(&self, transaction_id: u32) -> bool {
152 self.pending.contains_key(&transaction_id)
153 }
154
155 pub fn pending_count(&self) -> usize {
157 self.pending.len()
158 }
159
160 pub fn response_count(&self) -> usize {
162 self.responses.len()
163 }
164
165 pub fn drain_stale(&mut self, timeout: Duration) -> Vec<u32> {
167 let now = Instant::now();
168 let stale: Vec<u32> = self.pending.iter()
169 .filter(|(_, req)| now.duration_since(req.sent_at) > timeout)
170 .map(|(&tid, _)| tid)
171 .collect();
172
173 for tid in &stale {
174 if let Some(req) = self.pending.remove(tid) {
175 log::warn!("Command request {} ('{}') timed out after {:?}",
176 tid, req.topic, timeout);
177 }
178 }
179
180 stale
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187 use mechutil::ipc::MessageType;
188 use serde_json::json;
189
190 #[test]
191 fn test_send_pushes_to_channel() {
192 let (write_tx, mut write_rx) = mpsc::unbounded_channel();
193 let (_response_tx, response_rx) = mpsc::unbounded_channel();
194 let mut client = CommandClient::new(write_tx, response_rx);
195
196 let tid = client.send("test.command", json!({"key": "value"}));
197
198 let msg_json = write_rx.try_recv().expect("should have a message");
200 let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();
201 assert_eq!(msg.transaction_id, tid);
202 assert_eq!(msg.topic, "test.command");
203 assert_eq!(msg.message_type, MessageType::Request);
204 assert_eq!(msg.data, json!({"key": "value"}));
205
206 assert!(client.is_pending(tid));
208 assert_eq!(client.pending_count(), 1);
209 }
210
211 #[test]
212 fn test_poll_and_take_response() {
213 let (write_tx, _write_rx) = mpsc::unbounded_channel();
214 let (response_tx, response_rx) = mpsc::unbounded_channel();
215 let mut client = CommandClient::new(write_tx, response_rx);
216
217 let tid = client.send("test.command", json!(null));
218 assert!(client.is_pending(tid));
219
220 response_tx.send(CommandMessage::response(tid, json!("ok"))).unwrap();
222
223 assert!(client.take_response(tid).is_none());
225
226 client.poll();
228 assert!(!client.is_pending(tid));
229 assert_eq!(client.response_count(), 1);
230
231 let recv = client.take_response(tid).unwrap();
232 assert_eq!(recv.transaction_id, tid);
233 assert_eq!(client.response_count(), 0);
234 }
235
236 #[test]
237 fn test_multi_consumer_isolation() {
238 let (write_tx, _write_rx) = mpsc::unbounded_channel();
239 let (response_tx, response_rx) = mpsc::unbounded_channel();
240 let mut client = CommandClient::new(write_tx, response_rx);
241
242 let tid_a = client.send("labelit.inspect", json!(null));
244 let tid_b = client.send("other.status", json!(null));
245
246 response_tx.send(CommandMessage::response(tid_b, json!("b_result"))).unwrap();
248 response_tx.send(CommandMessage::response(tid_a, json!("a_result"))).unwrap();
249
250 client.poll();
252 assert_eq!(client.response_count(), 2);
253
254 let resp_a = client.take_response(tid_a).unwrap();
256 assert_eq!(resp_a.data, json!("a_result"));
257
258 let resp_b = client.take_response(tid_b).unwrap();
259 assert_eq!(resp_b.data, json!("b_result"));
260
261 assert!(client.take_response(tid_a).is_none());
263 assert!(client.take_response(tid_b).is_none());
264 assert_eq!(client.response_count(), 0);
265 }
266
267 #[test]
268 fn test_drain_stale() {
269 let (write_tx, _write_rx) = mpsc::unbounded_channel();
270 let (_response_tx, response_rx) = mpsc::unbounded_channel();
271 let mut client = CommandClient::new(write_tx, response_rx);
272
273 let tid = client.send("test.command", json!(null));
274 assert_eq!(client.pending_count(), 1);
275
276 let stale = client.drain_stale(Duration::from_secs(0));
278 assert_eq!(stale, vec![tid]);
279 assert_eq!(client.pending_count(), 0);
280 }
281
282 #[test]
283 fn test_drain_stale_keeps_fresh() {
284 let (write_tx, _write_rx) = mpsc::unbounded_channel();
285 let (_response_tx, response_rx) = mpsc::unbounded_channel();
286 let mut client = CommandClient::new(write_tx, response_rx);
287
288 let tid = client.send("test.command", json!(null));
289
290 let stale = client.drain_stale(Duration::from_secs(3600));
292 assert!(stale.is_empty());
293 assert!(client.is_pending(tid));
294 }
295
296 #[test]
297 fn test_drain_stale_ignores_received() {
298 let (write_tx, _write_rx) = mpsc::unbounded_channel();
299 let (response_tx, response_rx) = mpsc::unbounded_channel();
300 let mut client = CommandClient::new(write_tx, response_rx);
301
302 let tid = client.send("test.command", json!(null));
303
304 response_tx.send(CommandMessage::response(tid, json!("ok"))).unwrap();
306 client.poll();
307
308 let stale = client.drain_stale(Duration::from_secs(0));
310 assert!(stale.is_empty());
311
312 assert!(client.take_response(tid).is_some());
314 }
315
316 #[test]
317 fn test_multiple_pending() {
318 let (write_tx, _write_rx) = mpsc::unbounded_channel();
319 let (response_tx, response_rx) = mpsc::unbounded_channel();
320 let mut client = CommandClient::new(write_tx, response_rx);
321
322 let tid1 = client.send("cmd.first", json!(1));
323 let tid2 = client.send("cmd.second", json!(2));
324 let tid3 = client.send("cmd.third", json!(3));
325 assert_eq!(client.pending_count(), 3);
326
327 response_tx.send(CommandMessage::response(tid2, json!("ok"))).unwrap();
329 client.poll();
330
331 assert_eq!(client.pending_count(), 2);
332 assert!(client.is_pending(tid1));
333 assert!(!client.is_pending(tid2));
334 assert!(client.is_pending(tid3));
335
336 let recv = client.take_response(tid2).unwrap();
337 assert_eq!(recv.transaction_id, tid2);
338 }
339}