1use anyhow::Result;
2use async_stream::stream;
3use crossterm::{
4 terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled},
5 tty::IsTty,
6};
7use krata::v1::common::ZoneState;
8use krata::{
9 events::EventStream,
10 v1::common::TerminalSize,
11 v1::control::{
12 watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply,
13 ZoneConsoleRequest,
14 },
15};
16use log::debug;
17use tokio::{
18 io::{stderr, stdin, stdout, AsyncReadExt, AsyncWriteExt},
19 select,
20 task::JoinHandle,
21};
22use tokio_stream::{Stream, StreamExt};
23use tonic::Streaming;
24
25pub struct StdioConsoleStream;
26
27enum ExecStdinSelect {
28 DataRead(std::io::Result<usize>),
29 TerminalResize,
30}
31
32impl StdioConsoleStream {
33 pub async fn stdin_stream(
34 zone: String,
35 replay_history: bool,
36 ) -> impl Stream<Item = ZoneConsoleRequest> {
37 let mut stdin = stdin();
38 stream! {
39 yield ZoneConsoleRequest { zone_id: zone, replay_history, data: vec![] };
40
41 let mut buffer = vec![0u8; 60];
42 loop {
43 let size = match stdin.read(&mut buffer).await {
44 Ok(size) => size,
45 Err(error) => {
46 debug!("failed to read stdin: {}", error);
47 break;
48 }
49 };
50 let data = buffer[0..size].to_vec();
51 if size == 1 && buffer[0] == 0x1d {
52 break;
53 }
54 yield ZoneConsoleRequest { zone_id: String::default(), replay_history, data };
55 }
56 }
57 }
58
59 #[cfg(unix)]
60 pub async fn input_stream_exec(
61 initial: ExecInsideZoneRequest,
62 tty: bool,
63 ) -> impl Stream<Item = ExecInsideZoneRequest> {
64 let mut stdin = stdin();
65 stream! {
66 yield initial;
67
68 let mut buffer = vec![0u8; 60];
69 let mut terminal_size_change = if tty {
70 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()).ok()
71 } else {
72 None
73 };
74 let mut stdin_closed = false;
75 loop {
76 let selected = if let Some(ref mut terminal_size_change) = terminal_size_change {
77 if stdin_closed {
78 select! {
79 _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
80 }
81 } else {
82 select! {
83 result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
84 _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
85 }
86 }
87 } else {
88 select! {
89 result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
90 }
91 };
92
93 match selected {
94 ExecStdinSelect::DataRead(result) => {
95 match result {
96 Ok(size) => {
97 let stdin = buffer[0..size].to_vec();
98 if size == 1 && buffer[0] == 0x1d {
99 break;
100 }
101 stdin_closed = size == 0;
102 yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
103 },
104 Err(error) => {
105 debug!("failed to read stdin: {}", error);
106 break;
107 }
108 }
109 },
110 ExecStdinSelect::TerminalResize => {
111 if let Ok((columns, rows)) = crossterm::terminal::size() {
112 yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: Some(TerminalSize {
113 rows: rows as u32,
114 columns: columns as u32,
115 }), stdin: vec![], stdin_closed: false, };
116 }
117 }
118 }
119 }
120 }
121 }
122
123 #[cfg(not(unix))]
124 pub async fn input_stream_exec(
125 initial: ExecInsideZoneRequest,
126 _tty: bool,
127 ) -> impl Stream<Item = ExecInsideZoneRequest> {
128 let mut stdin = stdin();
129 stream! {
130 yield initial;
131
132 let mut buffer = vec![0u8; 60];
133 let mut stdin_closed = false;
134 loop {
135 let selected = select! {
136 result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
137 };
138
139 match selected {
140 ExecStdinSelect::DataRead(result) => {
141 match result {
142 Ok(size) => {
143 let stdin = buffer[0..size].to_vec();
144 if size == 1 && buffer[0] == 0x1d {
145 break;
146 }
147 stdin_closed = size == 0;
148 yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
149 },
150 Err(error) => {
151 debug!("failed to read stdin: {}", error);
152 break;
153 }
154 }
155 },
156 _ => {
157 continue;
158 }
159 }
160 }
161 }
162 }
163
164 pub async fn stdout(mut stream: Streaming<ZoneConsoleReply>, raw: bool) -> Result<()> {
165 if raw && stdin().is_tty() {
166 enable_raw_mode()?;
167 StdioConsoleStream::register_terminal_restore_hook()?;
168 }
169 let mut stdout = stdout();
170 while let Some(reply) = stream.next().await {
171 let reply = reply?;
172 if reply.data.is_empty() {
173 continue;
174 }
175 stdout.write_all(&reply.data).await?;
176 stdout.flush().await?;
177 }
178 Ok(())
179 }
180
181 pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>, raw: bool) -> Result<i32> {
182 if raw {
183 enable_raw_mode()?;
184 StdioConsoleStream::register_terminal_restore_hook()?;
185 }
186 let mut stdout = stdout();
187 let mut stderr = stderr();
188 while let Some(reply) = stream.next().await {
189 let reply = reply?;
190 if !reply.stdout.is_empty() {
191 stdout.write_all(&reply.stdout).await?;
192 stdout.flush().await?;
193 }
194
195 if !reply.stderr.is_empty() {
196 stderr.write_all(&reply.stderr).await?;
197 stderr.flush().await?;
198 }
199
200 if reply.exited {
201 return if reply.error.is_empty() {
202 Ok(reply.exit_code)
203 } else {
204 StdioConsoleStream::restore_terminal_mode();
205 stderr
206 .write_all(format!("Error: exec failed: {}\n", reply.error).as_bytes())
207 .await?;
208 stderr.flush().await?;
209 Ok(-1)
210 };
211 }
212 }
213 Ok(-1)
214 }
215
216 pub async fn zone_exit_hook(
217 id: String,
218 events: EventStream,
219 ) -> Result<JoinHandle<Option<i32>>> {
220 Ok(tokio::task::spawn(async move {
221 let mut stream = events.subscribe();
222 while let Ok(event) = stream.recv().await {
223 let Event::ZoneChanged(changed) = event;
224 let Some(zone) = changed.zone else {
225 continue;
226 };
227
228 let Some(status) = zone.status else {
229 continue;
230 };
231
232 if zone.id != id {
233 continue;
234 }
235
236 if let Some(exit_status) = status.exit_status {
237 return Some(exit_status.code);
238 }
239
240 let state = status.state();
241 if state == ZoneState::Destroying || state == ZoneState::Destroyed {
242 return Some(10);
243 }
244 }
245 None
246 }))
247 }
248
249 fn register_terminal_restore_hook() -> Result<()> {
250 if stdin().is_tty() {
251 ctrlc::set_handler(move || {
252 StdioConsoleStream::restore_terminal_mode();
253 })?;
254 }
255 Ok(())
256 }
257
258 pub fn restore_terminal_mode() {
259 if is_raw_mode_enabled().unwrap_or(false) {
260 let _ = disable_raw_mode();
261 }
262 }
263}