kratad/control/
exec_inside_zone.rs

1use std::pin::Pin;
2use std::str::FromStr;
3
4use anyhow::{anyhow, Result};
5use async_stream::try_stream;
6use tokio::select;
7use tokio_stream::{Stream, StreamExt};
8use tonic::{Status, Streaming};
9use uuid::Uuid;
10
11use krata::idm::internal::Request;
12use krata::{
13    idm::internal::{
14        exec_stream_request_update::Update, request::Request as IdmRequestType,
15        response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
16        ExecStreamRequestStdin, ExecStreamRequestTerminalSize, ExecStreamRequestUpdate,
17        Request as IdmRequest,
18    },
19    v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest},
20};
21
22use crate::control::ApiError;
23use crate::idm::DaemonIdmHandle;
24
/// Handler for the exec-inside-zone streaming RPC.
///
/// Holds the daemon IDM handle from which `process` obtains the IDM client
/// for the zone named in the first request.
pub struct ExecInsideZoneRpc {
    // Consumed by `process` to look up the client for the requested zone.
    idm: DaemonIdmHandle,
}
28
29impl ExecInsideZoneRpc {
30    pub fn new(idm: DaemonIdmHandle) -> Self {
31        Self { idm }
32    }
33
34    pub async fn process(
35        self,
36        mut input: Streaming<ExecInsideZoneRequest>,
37    ) -> Result<Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>>
38    {
39        let Some(request) = input.next().await else {
40            return Err(anyhow!("expected to have at least one request"));
41        };
42        let request = request?;
43
44        let Some(task) = request.task else {
45            return Err(anyhow!("task is missing"));
46        };
47
48        let uuid = Uuid::from_str(&request.zone_id)?;
49        let idm = self.idm.client(uuid).await?;
50
51        let idm_request = Request {
52            request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
53                update: Some(Update::Start(ExecStreamRequestStart {
54                    environment: task
55                        .environment
56                        .into_iter()
57                        .map(|x| ExecEnvVar {
58                            key: x.key,
59                            value: x.value,
60                        })
61                        .collect(),
62                    command: task.command,
63                    working_directory: task.working_directory,
64                    tty: task.tty,
65                    terminal_size: request.terminal_size.map(|size| {
66                        ExecStreamRequestTerminalSize {
67                            rows: size.rows,
68                            columns: size.columns,
69                        }
70                    }),
71                })),
72            })),
73        };
74
75        let output = try_stream! {
76            let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError {
77                message: x.to_string(),
78            })?;
79
80            loop {
81                select! {
82                    x = input.next() => if let Some(update) = x {
83                        let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
84                            message: error.to_string()
85                        }.into());
86
87                        if let Ok(update) = update {
88                            if !update.stdin.is_empty() {
89                                let _ = handle.update(IdmRequest {
90                                    request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
91                                        update: Some(Update::Stdin(ExecStreamRequestStdin {
92                                            data: update.stdin,
93                                            closed: update.stdin_closed,
94                                        })),
95                                    }))}).await;
96                            }
97
98                            if let Some(ref terminal_size) = update.terminal_size {
99                                let _ = handle.update(IdmRequest {
100                                    request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
101                                        update: Some(Update::TerminalResize(ExecStreamRequestTerminalSize {
102                                            rows: terminal_size.rows,
103                                            columns: terminal_size.columns,
104                                        })),
105                                    }))}).await;
106                            }
107                        }
108                    },
109                    x = handle.receiver.recv() => match x {
110                        Some(response) => {
111                            let Some(IdmResponseType::ExecStream(update)) = response.response else {
112                                break;
113                            };
114                            let reply = ExecInsideZoneReply {
115                                exited: update.exited,
116                                error: update.error,
117                                exit_code: update.exit_code,
118                                stdout: update.stdout,
119                                stderr: update.stderr,
120                            };
121                            yield reply;
122                        },
123                        None => {
124                            break;
125                        }
126                    }
127                }
128            }
129        };
130
131        Ok(Box::pin(output))
132    }
133}