kratad/control/
attach_zone_console.rs

1use std::pin::Pin;
2use std::str::FromStr;
3
4use anyhow::{anyhow, Result};
5use async_stream::try_stream;
6use tokio::select;
7use tokio::sync::mpsc::channel;
8use tokio_stream::{Stream, StreamExt};
9use tonic::{Status, Streaming};
10use uuid::Uuid;
11
12use krata::v1::control::{ZoneConsoleReply, ZoneConsoleRequest};
13
14use crate::console::DaemonConsoleHandle;
15use crate::control::ApiError;
16
17enum ConsoleDataSelect {
18    Read(Option<Vec<u8>>),
19    Write(Option<Result<ZoneConsoleRequest, Status>>),
20}
21
22pub struct AttachZoneConsoleRpc {
23    console: DaemonConsoleHandle,
24}
25
26impl AttachZoneConsoleRpc {
27    pub fn new(console: DaemonConsoleHandle) -> Self {
28        Self { console }
29    }
30
31    pub async fn process(
32        self,
33        mut input: Streaming<ZoneConsoleRequest>,
34    ) -> Result<Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>>
35    {
36        let Some(request) = input.next().await else {
37            return Err(anyhow!("expected to have at least one request"));
38        };
39        let request = request?;
40        let uuid = Uuid::from_str(&request.zone_id)?;
41        let (sender, mut receiver) = channel(100);
42        let console = self
43            .console
44            .attach(uuid, sender)
45            .await
46            .map_err(|error| anyhow!("failed to attach to console: {}", error))?;
47
48        let output = try_stream! {
49            if request.replay_history {
50                yield ZoneConsoleReply { data: console.initial.clone(), };
51            }
52            loop {
53                let what = select! {
54                    x = receiver.recv() => ConsoleDataSelect::Read(x),
55                    x = input.next() => ConsoleDataSelect::Write(x),
56                };
57
58                match what {
59                    ConsoleDataSelect::Read(Some(data)) => {
60                        yield ZoneConsoleReply { data, };
61                    },
62
63                    ConsoleDataSelect::Read(None) => {
64                        break;
65                    }
66
67                    ConsoleDataSelect::Write(Some(request)) => {
68                        let request = request?;
69                        if !request.data.is_empty() {
70                            console.send(request.data).await.map_err(|error| ApiError {
71                                message: error.to_string(),
72                            })?;
73                        }
74                    },
75
76                    ConsoleDataSelect::Write(None) => {
77                        break;
78                    }
79                }
80            }
81        };
82        Ok(Box::pin(output))
83    }
84}