1use crate::message::cli::{
3 CliRecv, CliSend, CliShutdown, CliSubscriptionRecv, CliSubscriptionSend,
4};
5use crate::prelude::*;
6use crate::state::tab::TabsState;
7use anyhow::Context;
8use tab_api::client::InitResponse;
9
10use tokio::stream::StreamExt;
11
12pub mod subscription;
13
/// Service that bridges one CLI client connection with the rest of the daemon.
///
/// Each field holds the `Lifeline` returned by one of the tasks started in
/// `Service::spawn`. The fields are never read (hence the leading
/// underscores); they exist only so the struct owns the task handles.
// NOTE(review): presumably dropping a `Lifeline` cancels its task (lifeline
// convention), which would make this struct the owner of the tasks' lifetime
// — confirm against the lifeline documentation.
pub struct CliService {
    // Task that sends the initial `Response::Init` snapshot and one
    // `Response::TabUpdate` per tab that already exists.
    _init: Lifeline,
    // Task that routes incoming `Request` messages and emits `CliShutdown`
    // once the request channel closes.
    _rx_websocket: Lifeline,
    // Task that forwards `CliRecv` messages to the client as `Response`s.
    _rx_daemon: Lifeline,
    // Task that translates `CliSubscriptionSend` messages into `Response`s.
    _rx_subscription: Lifeline,
}
22
23impl Service for CliService {
24 type Bus = CliBus;
25 type Lifeline = anyhow::Result<Self>;
26
27 fn spawn(bus: &Self::Bus) -> Self::Lifeline {
28 let _init = {
29 let mut tx_websocket = bus.tx::<Response>()?;
30 let mut rx_tabs_state = bus.rx::<TabsState>()?;
31
32 Self::try_task("init", async move {
33 let tabs = rx_tabs_state
34 .recv()
35 .await
36 .ok_or_else(|| anyhow::Error::msg("rx TabsState closed"))?;
37
38 let init = InitResponse {
39 tabs: tabs.tabs.clone(),
40 };
41
42 let init = Response::Init(init);
43 tx_websocket.send(init).await?;
44
45 for tab in tabs.tabs.values() {
46 debug!("notifying client of existing tab {}", &tab.name);
47 let message = Response::TabUpdate(tab.clone());
48 tx_websocket.send(message).await?;
49 }
50
51 Ok(())
52 })
53 };
54
55 let _rx_websocket = {
56 let mut rx = bus.rx::<Request>()?;
57
58 let mut tx_daemon = bus.tx::<CliSend>()?;
59 let mut tx_subscription = bus.tx::<CliSubscriptionRecv>()?;
60 let mut tx_shutdown = bus.tx::<CliShutdown>()?;
61
62 Self::try_task("run", async move {
63 debug!("cli connection waiting for messages");
64
65 while let Some(msg) = rx.recv().await {
66 Self::recv_websocket(msg, &mut tx_subscription, &mut tx_daemon).await?
67 }
68
69 tx_shutdown.send(CliShutdown {}).await?;
70
71 Ok(())
72 })
73 };
74
75 let _rx_daemon = {
76 let mut rx = bus.rx::<CliRecv>()?;
77
78 let mut tx_websocket = bus.tx::<Response>()?;
79
80 Self::try_task("run", async move {
81 while let Some(msg) = rx.next().await {
82 Self::recv_daemon(msg, &mut tx_websocket).await?
83 }
84
85 Ok(())
86 })
87 };
88
89 let _rx_subscription = {
90 let mut rx = bus.rx::<CliSubscriptionSend>()?;
91
92 let mut tx = bus.tx::<Response>()?;
93
94 Self::try_task("run", async move {
95 debug!("cli connection waiting for messages");
96
97 while let Some(msg) = rx.recv().await {
98 match msg {
99 CliSubscriptionSend::Retask(id) => {
100 tx.send(Response::Retask(id)).await?;
101 }
102 CliSubscriptionSend::Output(id, chunk) => {
103 tx.send(Response::Output(id, chunk)).await?;
104 }
105 CliSubscriptionSend::Stopped(id) => {
106 tx.send(Response::TabTerminated(id)).await?;
107 }
108 CliSubscriptionSend::Disconnect => {
109 tx.send(Response::Disconnect).await?;
110 }
111 }
112 }
113
114 Ok(())
115 })
116 };
117
118 Ok(CliService {
119 _init,
120 _rx_websocket,
121 _rx_daemon,
122 _rx_subscription,
123 })
124 }
125}
126
127impl CliService {
128 async fn recv_websocket(
129 request: Request,
130 tx_subscription: &mut impl Sender<CliSubscriptionRecv>,
131 tx_daemon: &mut impl Sender<CliSend>,
132 ) -> anyhow::Result<()> {
133 debug!("received Request: {:?}", &request);
134
135 match request {
136 Request::Subscribe(id) => {
137 debug!("client subscribing to tab {}", id);
138 tx_subscription
139 .send(CliSubscriptionRecv::Subscribe(id))
140 .await
141 .context("tx_subscription closed")?;
142 }
143 Request::Unsubscribe(id) => {
144 debug!("client subscribing from tab {}", id);
145 tx_subscription
146 .send(CliSubscriptionRecv::Unsubscribe(id))
147 .await
148 .context("tx_subscription closed")?;
149 }
150 Request::Input(id, stdin) => {
151 debug!("rx input on tab {}, data: {}", id.0, stdin.to_string());
152 let message = CliSend::Input(id, stdin);
153 tx_daemon.send(message).await.context("tx_daemon closed")?;
154 }
155 Request::CreateTab(create) => {
156 let message = CliSend::CreateTab(create);
157 tx_daemon.send(message).await.context("tx_daemon closed")?;
158 }
159 Request::ResizeTab(id, dimensions) => {
160 info!("Resizing tab {} to {:?}", id.0, dimensions);
161 tx_daemon.send(CliSend::ResizeTab(id, dimensions)).await?;
162 }
163 Request::CloseTab(id) => {
164 let message = CliSend::CloseTab(id);
165 tx_daemon.send(message).await.context("tx_daemon closed")?;
166 }
167 Request::DisconnectTab(id) => {
168 let message = CliSend::DisconnectTab(id);
169 tx_daemon.send(message).await.context("tx_daemon closed")?;
170 }
171 Request::Retask(id, name) => {
172 let message = CliSend::Retask(id, name);
174 tx_daemon.send(message).await?;
175 }
176 Request::GlobalShutdown => {
177 tx_daemon.send(CliSend::GlobalShutdown).await?;
178 }
179 }
180
181 Ok(())
182 }
183
184 async fn recv_daemon(
185 msg: CliRecv,
186 tx_websocket: &mut impl Sender<Response>,
187 ) -> anyhow::Result<()> {
188 debug!("message from daemon: {:?}", &msg);
189 match msg {
190 CliRecv::TabStarted(metadata) => {
191 tx_websocket
192 .send(Response::TabUpdate(metadata))
193 .await
194 .context("tx_websocket closed")?;
195 }
196 }
197 Ok(())
198 }
199}
200
/// Tests for the client -> daemon direction: every `Request` variant must be
/// routed to the right channel, and the initial tab snapshot must be sent.
#[cfg(test)]
mod request_tests {
    use super::CliService;
    use crate::{
        bus::CliBus, message::cli::CliSend, message::cli::CliSubscriptionRecv,
        state::tab::TabsState,
    };
    use lifeline::{assert_completes, Bus, Receiver, Sender, Service};
    use std::collections::HashMap;
    use tab_api::{
        chunk::InputChunk,
        client::{InitResponse, Request, Response},
        tab::{CreateTabMetadata, TabId, TabMetadata},
    };

    /// Spawns a fresh service, sends `request`, and asserts that `expected`
    /// is the next message on the `CliSend` channel.
    async fn assert_daemon_receives(request: Request, expected: CliSend) -> anyhow::Result<()> {
        let cli_bus = CliBus::default();
        // Bound to a name so the service stays alive until the end of the check.
        let _service = CliService::spawn(&cli_bus)?;

        let mut tx = cli_bus.tx::<Request>()?;
        let mut rx = cli_bus.rx::<CliSend>()?;

        tx.send(request).await?;

        assert_completes!(async move {
            let msg = rx.recv().await;
            assert_eq!(Some(expected), msg);
        });

        Ok(())
    }

    /// Spawns a fresh service, sends `request`, and asserts that `expected`
    /// is the next message on the `CliSubscriptionRecv` channel.
    async fn assert_subscription_receives(
        request: Request,
        expected: CliSubscriptionRecv,
    ) -> anyhow::Result<()> {
        let cli_bus = CliBus::default();
        // Bound to a name so the service stays alive until the end of the check.
        let _service = CliService::spawn(&cli_bus)?;

        let mut tx = cli_bus.tx::<Request>()?;
        let mut rx = cli_bus.rx::<CliSubscriptionRecv>()?;

        tx.send(request).await?;

        assert_completes!(async move {
            let msg = rx.recv().await;
            assert_eq!(Some(expected), msg);
        });

        Ok(())
    }

    #[tokio::test]
    async fn init() -> anyhow::Result<()> {
        let cli_bus = CliBus::default();

        // The tabs state is sent before the service is spawned.
        let mut tx = cli_bus.tx::<TabsState>()?;
        let mut tabs = TabsState::default();
        let tab_id = TabId(0);
        let tab_metadata = TabMetadata {
            id: tab_id,
            name: "name".into(),
            doc: Some("doc".into()),
            dimensions: (1, 2),
            env: HashMap::new(),
            shell: "bash".into(),
            dir: "/".into(),
        };
        tabs.tabs.insert(tab_id, tab_metadata.clone());
        tx.send(tabs).await?;

        let _service = CliService::spawn(&cli_bus)?;
        let mut rx = cli_bus.rx::<Response>()?;

        assert_completes!(async move {
            // First the snapshot of all tabs...
            let init = rx.recv().await;

            let mut expect_tabs = InitResponse {
                tabs: HashMap::new(),
            };
            expect_tabs.tabs.insert(tab_id, tab_metadata.clone());
            assert_eq!(Some(Response::Init(expect_tabs)), init);

            // ...then one update for the single existing tab.
            let tab_update = rx.recv().await;
            assert_eq!(Some(Response::TabUpdate(tab_metadata)), tab_update);
        });

        Ok(())
    }

    #[tokio::test]
    async fn subscribe() -> anyhow::Result<()> {
        assert_subscription_receives(
            Request::Subscribe(TabId(0)),
            CliSubscriptionRecv::Subscribe(TabId(0)),
        )
        .await
    }

    #[tokio::test]
    async fn unsubscribe() -> anyhow::Result<()> {
        assert_subscription_receives(
            Request::Unsubscribe(TabId(0)),
            CliSubscriptionRecv::Unsubscribe(TabId(0)),
        )
        .await
    }

    #[tokio::test]
    async fn input() -> anyhow::Result<()> {
        let input = InputChunk { data: vec![1u8] };

        assert_daemon_receives(
            Request::Input(TabId(0), input.clone()),
            CliSend::Input(TabId(0), input),
        )
        .await
    }

    #[tokio::test]
    async fn create_tab() -> anyhow::Result<()> {
        let mut env = HashMap::new();
        env.insert("foo".into(), "bar".into());

        let tab = CreateTabMetadata {
            name: "name".into(),
            doc: Some("doc".into()),
            dimensions: (1, 2),
            shell: "shell".into(),
            dir: "/".into(),
            env,
        };

        assert_daemon_receives(Request::CreateTab(tab.clone()), CliSend::CreateTab(tab)).await
    }

    #[tokio::test]
    async fn resize_tab() -> anyhow::Result<()> {
        assert_daemon_receives(
            Request::ResizeTab(TabId(0), (1, 2)),
            CliSend::ResizeTab(TabId(0), (1, 2)),
        )
        .await
    }

    #[tokio::test]
    async fn close_tab() -> anyhow::Result<()> {
        assert_daemon_receives(Request::CloseTab(TabId(0)), CliSend::CloseTab(TabId(0))).await
    }

    #[tokio::test]
    async fn disconnect_tab() -> anyhow::Result<()> {
        assert_daemon_receives(
            Request::DisconnectTab(TabId(0)),
            CliSend::DisconnectTab(TabId(0)),
        )
        .await
    }

    #[tokio::test]
    async fn retask() -> anyhow::Result<()> {
        assert_daemon_receives(
            Request::Retask(TabId(0), TabId(1)),
            CliSend::Retask(TabId(0), TabId(1)),
        )
        .await
    }

    #[tokio::test]
    async fn global_shutdown() -> anyhow::Result<()> {
        assert_daemon_receives(Request::GlobalShutdown, CliSend::GlobalShutdown).await
    }
}
429
/// Tests for the daemon -> client direction: messages arriving on the
/// `CliRecv` and `CliSubscriptionSend` channels must reach the client as the
/// matching `Response`.
#[cfg(test)]
mod recv_tests {
    use super::CliService;
    use crate::{bus::CliBus, message::cli::CliRecv, message::cli::CliSubscriptionSend};
    use lifeline::{assert_completes, Bus, Receiver, Sender, Service};
    use std::collections::HashMap;
    use tab_api::{
        client::Response,
        tab::{TabId, TabMetadata},
    };

    #[tokio::test]
    async fn tab_started() -> anyhow::Result<()> {
        let cli_bus = CliBus::default();
        let _service = CliService::spawn(&cli_bus)?;

        let mut tx_daemon = cli_bus.tx::<CliRecv>()?;
        let mut rx_client = cli_bus.rx::<Response>()?;

        let started = TabMetadata {
            id: TabId(0),
            name: "name".into(),
            doc: Some("doc".into()),
            dimensions: (1, 2),
            env: HashMap::new(),
            shell: "shell".into(),
            dir: "/".into(),
        };

        // A started tab is reported to the client as a tab update.
        let notification = CliRecv::TabStarted(started.clone());
        tx_daemon.send(notification).await?;

        assert_completes!(async move {
            let received = rx_client.recv().await;
            assert_eq!(Some(Response::TabUpdate(started)), received);
        });

        Ok(())
    }

    #[tokio::test]
    async fn tab_stopped() -> anyhow::Result<()> {
        let cli_bus = CliBus::default();
        let _service = CliService::spawn(&cli_bus)?;

        let mut tx_subscription = cli_bus.tx::<CliSubscriptionSend>()?;
        let mut rx_client = cli_bus.rx::<Response>()?;

        // A stopped tab is reported to the client as a termination.
        let notification = CliSubscriptionSend::Stopped(TabId(0));
        tx_subscription.send(notification).await?;

        assert_completes!(async move {
            let received = rx_client.recv().await;
            assert_eq!(Some(Response::TabTerminated(TabId(0))), received);
        });

        Ok(())
    }

    #[tokio::test]
    async fn disconnect() -> anyhow::Result<()> {
        let cli_bus = CliBus::default();
        let _service = CliService::spawn(&cli_bus)?;

        let mut tx_subscription = cli_bus.tx::<CliSubscriptionSend>()?;
        let mut rx_client = cli_bus.rx::<Response>()?;

        // A disconnect is passed through to the client unchanged.
        let notification = CliSubscriptionSend::Disconnect;
        tx_subscription.send(notification).await?;

        assert_completes!(async move {
            let received = rx_client.recv().await;
            assert_eq!(Some(Response::Disconnect), received);
        });

        Ok(())
    }
}