kratad/control/
attach_zone_console.rs1use std::pin::Pin;
2use std::str::FromStr;
3
4use anyhow::{anyhow, Result};
5use async_stream::try_stream;
6use tokio::select;
7use tokio::sync::mpsc::channel;
8use tokio_stream::{Stream, StreamExt};
9use tonic::{Status, Streaming};
10use uuid::Uuid;
11
12use krata::v1::control::{ZoneConsoleReply, ZoneConsoleRequest};
13
14use crate::console::DaemonConsoleHandle;
15use crate::control::ApiError;
16
17enum ConsoleDataSelect {
18 Read(Option<Vec<u8>>),
19 Write(Option<Result<ZoneConsoleRequest, Status>>),
20}
21
22pub struct AttachZoneConsoleRpc {
23 console: DaemonConsoleHandle,
24}
25
26impl AttachZoneConsoleRpc {
27 pub fn new(console: DaemonConsoleHandle) -> Self {
28 Self { console }
29 }
30
31 pub async fn process(
32 self,
33 mut input: Streaming<ZoneConsoleRequest>,
34 ) -> Result<Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>>
35 {
36 let Some(request) = input.next().await else {
37 return Err(anyhow!("expected to have at least one request"));
38 };
39 let request = request?;
40 let uuid = Uuid::from_str(&request.zone_id)?;
41 let (sender, mut receiver) = channel(100);
42 let console = self
43 .console
44 .attach(uuid, sender)
45 .await
46 .map_err(|error| anyhow!("failed to attach to console: {}", error))?;
47
48 let output = try_stream! {
49 if request.replay_history {
50 yield ZoneConsoleReply { data: console.initial.clone(), };
51 }
52 loop {
53 let what = select! {
54 x = receiver.recv() => ConsoleDataSelect::Read(x),
55 x = input.next() => ConsoleDataSelect::Write(x),
56 };
57
58 match what {
59 ConsoleDataSelect::Read(Some(data)) => {
60 yield ZoneConsoleReply { data, };
61 },
62
63 ConsoleDataSelect::Read(None) => {
64 break;
65 }
66
67 ConsoleDataSelect::Write(Some(request)) => {
68 let request = request?;
69 if !request.data.is_empty() {
70 console.send(request.data).await.map_err(|error| ApiError {
71 message: error.to_string(),
72 })?;
73 }
74 },
75
76 ConsoleDataSelect::Write(None) => {
77 break;
78 }
79 }
80 }
81 };
82 Ok(Box::pin(output))
83 }
84}