tab_daemon/service/
cli.rs

1// mod session;
2use crate::message::cli::{
3    CliRecv, CliSend, CliShutdown, CliSubscriptionRecv, CliSubscriptionSend,
4};
5use crate::prelude::*;
6use crate::state::tab::TabsState;
7use anyhow::Context;
8use tab_api::client::InitResponse;
9
10use tokio::stream::StreamExt;
11
12pub mod subscription;
13
14/// Drives an active connection from the tab-command client, and forwards messages between the websocket and the daemon.
15/// Tracks the client tab subscriptions, and filters messages received from the daemon.
16pub struct CliService {
17    _init: Lifeline,
18    _rx_websocket: Lifeline,
19    _rx_daemon: Lifeline,
20    _rx_subscription: Lifeline,
21}
22
23impl Service for CliService {
24    type Bus = CliBus;
25    type Lifeline = anyhow::Result<Self>;
26
27    fn spawn(bus: &Self::Bus) -> Self::Lifeline {
28        let _init = {
29            let mut tx_websocket = bus.tx::<Response>()?;
30            let mut rx_tabs_state = bus.rx::<TabsState>()?;
31
32            Self::try_task("init", async move {
33                let tabs = rx_tabs_state
34                    .recv()
35                    .await
36                    .ok_or_else(|| anyhow::Error::msg("rx TabsState closed"))?;
37
38                let init = InitResponse {
39                    tabs: tabs.tabs.clone(),
40                };
41
42                let init = Response::Init(init);
43                tx_websocket.send(init).await?;
44
45                for tab in tabs.tabs.values() {
46                    debug!("notifying client of existing tab {}", &tab.name);
47                    let message = Response::TabUpdate(tab.clone());
48                    tx_websocket.send(message).await?;
49                }
50
51                Ok(())
52            })
53        };
54
55        let _rx_websocket = {
56            let mut rx = bus.rx::<Request>()?;
57
58            let mut tx_daemon = bus.tx::<CliSend>()?;
59            let mut tx_subscription = bus.tx::<CliSubscriptionRecv>()?;
60            let mut tx_shutdown = bus.tx::<CliShutdown>()?;
61
62            Self::try_task("run", async move {
63                debug!("cli connection waiting for messages");
64
65                while let Some(msg) = rx.recv().await {
66                    Self::recv_websocket(msg, &mut tx_subscription, &mut tx_daemon).await?
67                }
68
69                tx_shutdown.send(CliShutdown {}).await?;
70
71                Ok(())
72            })
73        };
74
75        let _rx_daemon = {
76            let mut rx = bus.rx::<CliRecv>()?;
77
78            let mut tx_websocket = bus.tx::<Response>()?;
79
80            Self::try_task("run", async move {
81                while let Some(msg) = rx.next().await {
82                    Self::recv_daemon(msg, &mut tx_websocket).await?
83                }
84
85                Ok(())
86            })
87        };
88
89        let _rx_subscription = {
90            let mut rx = bus.rx::<CliSubscriptionSend>()?;
91
92            let mut tx = bus.tx::<Response>()?;
93
94            Self::try_task("run", async move {
95                debug!("cli connection waiting for messages");
96
97                while let Some(msg) = rx.recv().await {
98                    match msg {
99                        CliSubscriptionSend::Retask(id) => {
100                            tx.send(Response::Retask(id)).await?;
101                        }
102                        CliSubscriptionSend::Output(id, chunk) => {
103                            tx.send(Response::Output(id, chunk)).await?;
104                        }
105                        CliSubscriptionSend::Stopped(id) => {
106                            tx.send(Response::TabTerminated(id)).await?;
107                        }
108                        CliSubscriptionSend::Disconnect => {
109                            tx.send(Response::Disconnect).await?;
110                        }
111                    }
112                }
113
114                Ok(())
115            })
116        };
117
118        Ok(CliService {
119            _init,
120            _rx_websocket,
121            _rx_daemon,
122            _rx_subscription,
123        })
124    }
125}
126
127impl CliService {
128    async fn recv_websocket(
129        request: Request,
130        tx_subscription: &mut impl Sender<CliSubscriptionRecv>,
131        tx_daemon: &mut impl Sender<CliSend>,
132    ) -> anyhow::Result<()> {
133        debug!("received Request: {:?}", &request);
134
135        match request {
136            Request::Subscribe(id) => {
137                debug!("client subscribing to tab {}", id);
138                tx_subscription
139                    .send(CliSubscriptionRecv::Subscribe(id))
140                    .await
141                    .context("tx_subscription closed")?;
142            }
143            Request::Unsubscribe(id) => {
144                debug!("client subscribing from tab {}", id);
145                tx_subscription
146                    .send(CliSubscriptionRecv::Unsubscribe(id))
147                    .await
148                    .context("tx_subscription closed")?;
149            }
150            Request::Input(id, stdin) => {
151                debug!("rx input on tab {}, data: {}", id.0, stdin.to_string());
152                let message = CliSend::Input(id, stdin);
153                tx_daemon.send(message).await.context("tx_daemon closed")?;
154            }
155            Request::CreateTab(create) => {
156                let message = CliSend::CreateTab(create);
157                tx_daemon.send(message).await.context("tx_daemon closed")?;
158            }
159            Request::ResizeTab(id, dimensions) => {
160                info!("Resizing tab {} to {:?}", id.0, dimensions);
161                tx_daemon.send(CliSend::ResizeTab(id, dimensions)).await?;
162            }
163            Request::CloseTab(id) => {
164                let message = CliSend::CloseTab(id);
165                tx_daemon.send(message).await.context("tx_daemon closed")?;
166            }
167            Request::DisconnectTab(id) => {
168                let message = CliSend::DisconnectTab(id);
169                tx_daemon.send(message).await.context("tx_daemon closed")?;
170            }
171            Request::Retask(id, name) => {
172                // we need to send this along so other attached tabs get retasked
173                let message = CliSend::Retask(id, name);
174                tx_daemon.send(message).await?;
175            }
176            Request::GlobalShutdown => {
177                tx_daemon.send(CliSend::GlobalShutdown).await?;
178            }
179        }
180
181        Ok(())
182    }
183
184    async fn recv_daemon(
185        msg: CliRecv,
186        tx_websocket: &mut impl Sender<Response>,
187    ) -> anyhow::Result<()> {
188        debug!("message from daemon: {:?}", &msg);
189        match msg {
190            CliRecv::TabStarted(metadata) => {
191                tx_websocket
192                    .send(Response::TabUpdate(metadata))
193                    .await
194                    .context("tx_websocket closed")?;
195            }
196        }
197        Ok(())
198    }
199}
200
201#[cfg(test)]
202mod request_tests {
203    use super::CliService;
204    use crate::{
205        bus::CliBus, message::cli::CliSend, message::cli::CliSubscriptionRecv,
206        state::tab::TabsState,
207    };
208    use lifeline::{assert_completes, Bus, Receiver, Sender, Service};
209    use std::collections::HashMap;
210    use tab_api::{
211        chunk::InputChunk,
212        client::{InitResponse, Request, Response},
213        tab::{CreateTabMetadata, TabId, TabMetadata},
214    };
215
216    #[tokio::test]
217    async fn init() -> anyhow::Result<()> {
218        let cli_bus = CliBus::default();
219
220        // create an existing tab, then spawn the connection
221        let mut tx = cli_bus.tx::<TabsState>()?;
222        let mut tabs = TabsState::default();
223        let tab_id = TabId(0);
224        let tab_metadata = TabMetadata {
225            id: TabId(0),
226            name: "name".into(),
227            doc: Some("doc".into()),
228            dimensions: (1, 2),
229            env: HashMap::new(),
230            shell: "bash".into(),
231            dir: "/".into(),
232        };
233        tabs.tabs.insert(tab_id, tab_metadata.clone());
234        tx.send(tabs).await?;
235
236        let _service = CliService::spawn(&cli_bus)?;
237        let mut rx = cli_bus.rx::<Response>()?;
238
239        assert_completes!(async move {
240            let init = rx.recv().await;
241
242            let mut expect_tabs = InitResponse {
243                tabs: HashMap::new(),
244            };
245            expect_tabs.tabs.insert(tab_id, tab_metadata.clone());
246            assert_eq!(Some(Response::Init(expect_tabs)), init);
247
248            let tab_update = rx.recv().await;
249            assert_eq!(Some(Response::TabUpdate(tab_metadata)), tab_update);
250        });
251
252        Ok(())
253    }
254
255    #[tokio::test]
256    async fn subscribe() -> anyhow::Result<()> {
257        let cli_bus = CliBus::default();
258        let _service = CliService::spawn(&cli_bus)?;
259
260        let mut tx = cli_bus.tx::<Request>()?;
261        let mut rx = cli_bus.rx::<CliSubscriptionRecv>()?;
262
263        tx.send(Request::Subscribe(TabId(0))).await?;
264
265        assert_completes!(async move {
266            let msg = rx.recv().await;
267            assert_eq!(Some(CliSubscriptionRecv::Subscribe(TabId(0))), msg);
268        });
269
270        Ok(())
271    }
272
273    #[tokio::test]
274    async fn unsubscribe() -> anyhow::Result<()> {
275        let cli_bus = CliBus::default();
276        let _service = CliService::spawn(&cli_bus)?;
277
278        let mut tx = cli_bus.tx::<Request>()?;
279        let mut rx = cli_bus.rx::<CliSubscriptionRecv>()?;
280
281        tx.send(Request::Unsubscribe(TabId(0))).await?;
282
283        assert_completes!(async move {
284            let msg = rx.recv().await;
285            assert_eq!(Some(CliSubscriptionRecv::Unsubscribe(TabId(0))), msg);
286        });
287
288        Ok(())
289    }
290
291    #[tokio::test]
292    async fn input() -> anyhow::Result<()> {
293        let cli_bus = CliBus::default();
294        let _service = CliService::spawn(&cli_bus)?;
295
296        let mut tx = cli_bus.tx::<Request>()?;
297        let mut rx = cli_bus.rx::<CliSend>()?;
298
299        let input = InputChunk { data: vec![1u8] };
300        tx.send(Request::Input(TabId(0), input.clone())).await?;
301
302        assert_completes!(async move {
303            let msg = rx.recv().await;
304            assert_eq!(Some(CliSend::Input(TabId(0), input)), msg);
305        });
306
307        Ok(())
308    }
309
310    #[tokio::test]
311    async fn create_tab() -> anyhow::Result<()> {
312        let cli_bus = CliBus::default();
313        let _service = CliService::spawn(&cli_bus)?;
314
315        let mut tx = cli_bus.tx::<Request>()?;
316        let mut rx = cli_bus.rx::<CliSend>()?;
317
318        let mut env = HashMap::new();
319        env.insert("foo".into(), "bar".into());
320
321        let tab = CreateTabMetadata {
322            name: "name".into(),
323            doc: Some("doc".into()),
324            dimensions: (1, 2),
325            shell: "shell".into(),
326            dir: "/".into(),
327            env,
328        };
329        tx.send(Request::CreateTab(tab.clone())).await?;
330
331        assert_completes!(async move {
332            let msg = rx.recv().await;
333            assert_eq!(Some(CliSend::CreateTab(tab)), msg);
334        });
335
336        Ok(())
337    }
338
339    #[tokio::test]
340    async fn resize_tab() -> anyhow::Result<()> {
341        let cli_bus = CliBus::default();
342        let _service = CliService::spawn(&cli_bus)?;
343
344        let mut tx = cli_bus.tx::<Request>()?;
345        let mut rx = cli_bus.rx::<CliSend>()?;
346
347        tx.send(Request::ResizeTab(TabId(0), (1, 2))).await?;
348
349        assert_completes!(async move {
350            let msg = rx.recv().await;
351            assert_eq!(Some(CliSend::ResizeTab(TabId(0), (1, 2))), msg);
352        });
353
354        Ok(())
355    }
356
357    #[tokio::test]
358    async fn close_tab() -> anyhow::Result<()> {
359        let cli_bus = CliBus::default();
360        let _service = CliService::spawn(&cli_bus)?;
361
362        let mut tx = cli_bus.tx::<Request>()?;
363        let mut rx = cli_bus.rx::<CliSend>()?;
364
365        tx.send(Request::CloseTab(TabId(0))).await?;
366
367        assert_completes!(async move {
368            let msg = rx.recv().await;
369            assert_eq!(Some(CliSend::CloseTab(TabId(0))), msg);
370        });
371
372        Ok(())
373    }
374
375    #[tokio::test]
376    async fn disconnect_tab() -> anyhow::Result<()> {
377        let cli_bus = CliBus::default();
378        let _service = CliService::spawn(&cli_bus)?;
379
380        let mut tx = cli_bus.tx::<Request>()?;
381        let mut rx = cli_bus.rx::<CliSend>()?;
382
383        tx.send(Request::DisconnectTab(TabId(0))).await?;
384
385        assert_completes!(async move {
386            let msg = rx.recv().await;
387            assert_eq!(Some(CliSend::DisconnectTab(TabId(0))), msg);
388        });
389
390        Ok(())
391    }
392
393    #[tokio::test]
394    async fn retask() -> anyhow::Result<()> {
395        let cli_bus = CliBus::default();
396        let _service = CliService::spawn(&cli_bus)?;
397
398        let mut tx = cli_bus.tx::<Request>()?;
399        let mut rx = cli_bus.rx::<CliSend>()?;
400
401        tx.send(Request::Retask(TabId(0), TabId(1))).await?;
402
403        assert_completes!(async move {
404            let msg = rx.recv().await;
405            assert_eq!(Some(CliSend::Retask(TabId(0), TabId(1))), msg);
406        });
407
408        Ok(())
409    }
410
411    #[tokio::test]
412    async fn global_shutdown() -> anyhow::Result<()> {
413        let cli_bus = CliBus::default();
414        let _service = CliService::spawn(&cli_bus)?;
415
416        let mut tx = cli_bus.tx::<Request>()?;
417        let mut rx = cli_bus.rx::<CliSend>()?;
418
419        tx.send(Request::GlobalShutdown).await?;
420
421        assert_completes!(async move {
422            let msg = rx.recv().await;
423            assert_eq!(Some(CliSend::GlobalShutdown), msg);
424        });
425
426        Ok(())
427    }
428}
429
430#[cfg(test)]
431mod recv_tests {
432    use super::CliService;
433    use crate::{bus::CliBus, message::cli::CliRecv, message::cli::CliSubscriptionSend};
434    use lifeline::{assert_completes, Bus, Receiver, Sender, Service};
435    use std::collections::HashMap;
436    use tab_api::{
437        client::Response,
438        tab::{TabId, TabMetadata},
439    };
440
441    #[tokio::test]
442    async fn tab_started() -> anyhow::Result<()> {
443        let bus = CliBus::default();
444        let _service = CliService::spawn(&bus)?;
445
446        let mut tx = bus.tx::<CliRecv>()?;
447        let mut rx = bus.rx::<Response>()?;
448
449        let metadata = TabMetadata {
450            id: TabId(0),
451            name: "name".into(),
452            doc: Some("doc".into()),
453            dimensions: (1, 2),
454            env: HashMap::new(),
455            shell: "shell".into(),
456            dir: "/".into(),
457        };
458
459        tx.send(CliRecv::TabStarted(metadata.clone())).await?;
460
461        assert_completes!(async move {
462            let msg = rx.recv().await;
463            assert_eq!(Some(Response::TabUpdate(metadata)), msg);
464        });
465
466        Ok(())
467    }
468
469    #[tokio::test]
470    async fn tab_stopped() -> anyhow::Result<()> {
471        let bus = CliBus::default();
472        let _service = CliService::spawn(&bus)?;
473
474        let mut tx = bus.tx::<CliSubscriptionSend>()?;
475        let mut rx = bus.rx::<Response>()?;
476
477        tx.send(CliSubscriptionSend::Stopped(TabId(0))).await?;
478
479        assert_completes!(async move {
480            let msg = rx.recv().await;
481            assert_eq!(Some(Response::TabTerminated(TabId(0))), msg);
482        });
483
484        Ok(())
485    }
486
487    #[tokio::test]
488    async fn disconnect() -> anyhow::Result<()> {
489        let bus = CliBus::default();
490        let _service = CliService::spawn(&bus)?;
491
492        let mut tx = bus.tx::<CliSubscriptionSend>()?;
493        let mut rx = bus.rx::<Response>()?;
494
495        tx.send(CliSubscriptionSend::Disconnect).await?;
496
497        assert_completes!(async move {
498            let msg = rx.recv().await;
499            assert_eq!(Some(Response::Disconnect), msg);
500        });
501
502        Ok(())
503    }
504}