kratad/control/
exec_inside_zone.rs1use std::pin::Pin;
2use std::str::FromStr;
3
4use anyhow::{anyhow, Result};
5use async_stream::try_stream;
6use tokio::select;
7use tokio_stream::{Stream, StreamExt};
8use tonic::{Status, Streaming};
9use uuid::Uuid;
10
11use krata::idm::internal::Request;
12use krata::{
13 idm::internal::{
14 exec_stream_request_update::Update, request::Request as IdmRequestType,
15 response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
16 ExecStreamRequestStdin, ExecStreamRequestTerminalSize, ExecStreamRequestUpdate,
17 Request as IdmRequest,
18 },
19 v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest},
20};
21
22use crate::control::ApiError;
23use crate::idm::DaemonIdmHandle;
24
25pub struct ExecInsideZoneRpc {
26 idm: DaemonIdmHandle,
27}
28
29impl ExecInsideZoneRpc {
30 pub fn new(idm: DaemonIdmHandle) -> Self {
31 Self { idm }
32 }
33
34 pub async fn process(
35 self,
36 mut input: Streaming<ExecInsideZoneRequest>,
37 ) -> Result<Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>>
38 {
39 let Some(request) = input.next().await else {
40 return Err(anyhow!("expected to have at least one request"));
41 };
42 let request = request?;
43
44 let Some(task) = request.task else {
45 return Err(anyhow!("task is missing"));
46 };
47
48 let uuid = Uuid::from_str(&request.zone_id)?;
49 let idm = self.idm.client(uuid).await?;
50
51 let idm_request = Request {
52 request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
53 update: Some(Update::Start(ExecStreamRequestStart {
54 environment: task
55 .environment
56 .into_iter()
57 .map(|x| ExecEnvVar {
58 key: x.key,
59 value: x.value,
60 })
61 .collect(),
62 command: task.command,
63 working_directory: task.working_directory,
64 tty: task.tty,
65 terminal_size: request.terminal_size.map(|size| {
66 ExecStreamRequestTerminalSize {
67 rows: size.rows,
68 columns: size.columns,
69 }
70 }),
71 })),
72 })),
73 };
74
75 let output = try_stream! {
76 let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError {
77 message: x.to_string(),
78 })?;
79
80 loop {
81 select! {
82 x = input.next() => if let Some(update) = x {
83 let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
84 message: error.to_string()
85 }.into());
86
87 if let Ok(update) = update {
88 if !update.stdin.is_empty() {
89 let _ = handle.update(IdmRequest {
90 request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
91 update: Some(Update::Stdin(ExecStreamRequestStdin {
92 data: update.stdin,
93 closed: update.stdin_closed,
94 })),
95 }))}).await;
96 }
97
98 if let Some(ref terminal_size) = update.terminal_size {
99 let _ = handle.update(IdmRequest {
100 request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
101 update: Some(Update::TerminalResize(ExecStreamRequestTerminalSize {
102 rows: terminal_size.rows,
103 columns: terminal_size.columns,
104 })),
105 }))}).await;
106 }
107 }
108 },
109 x = handle.receiver.recv() => match x {
110 Some(response) => {
111 let Some(IdmResponseType::ExecStream(update)) = response.response else {
112 break;
113 };
114 let reply = ExecInsideZoneReply {
115 exited: update.exited,
116 error: update.error,
117 exit_code: update.exit_code,
118 stdout: update.stdout,
119 stderr: update.stderr,
120 };
121 yield reply;
122 },
123 None => {
124 break;
125 }
126 }
127 }
128 }
129 };
130
131 Ok(Box::pin(output))
132 }
133}