kratactl/
console.rs

1use anyhow::Result;
2use async_stream::stream;
3use crossterm::{
4    terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled},
5    tty::IsTty,
6};
7use krata::v1::common::ZoneState;
8use krata::{
9    events::EventStream,
10    v1::common::TerminalSize,
11    v1::control::{
12        watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply,
13        ZoneConsoleRequest,
14    },
15};
16use log::debug;
17use tokio::{
18    io::{stderr, stdin, stdout, AsyncReadExt, AsyncWriteExt},
19    select,
20    task::JoinHandle,
21};
22use tokio_stream::{Stream, StreamExt};
23use tonic::Streaming;
24
25pub struct StdioConsoleStream;
26
27enum ExecStdinSelect {
28    DataRead(std::io::Result<usize>),
29    TerminalResize,
30}
31
32impl StdioConsoleStream {
33    pub async fn stdin_stream(
34        zone: String,
35        replay_history: bool,
36    ) -> impl Stream<Item = ZoneConsoleRequest> {
37        let mut stdin = stdin();
38        stream! {
39            yield ZoneConsoleRequest { zone_id: zone, replay_history, data: vec![] };
40
41            let mut buffer = vec![0u8; 60];
42            loop {
43                let size = match stdin.read(&mut buffer).await {
44                    Ok(size) => size,
45                    Err(error) => {
46                        debug!("failed to read stdin: {}", error);
47                        break;
48                    }
49                };
50                let data = buffer[0..size].to_vec();
51                if size == 1 && buffer[0] == 0x1d {
52                    break;
53                }
54                yield ZoneConsoleRequest { zone_id: String::default(), replay_history, data };
55            }
56        }
57    }
58
59    #[cfg(unix)]
60    pub async fn input_stream_exec(
61        initial: ExecInsideZoneRequest,
62        tty: bool,
63    ) -> impl Stream<Item = ExecInsideZoneRequest> {
64        let mut stdin = stdin();
65        stream! {
66            yield initial;
67
68            let mut buffer = vec![0u8; 60];
69            let mut terminal_size_change = if tty {
70                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()).ok()
71            } else {
72                None
73            };
74            let mut stdin_closed = false;
75            loop {
76                let selected = if let Some(ref mut terminal_size_change) = terminal_size_change {
77                    if stdin_closed {
78                        select! {
79                            _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
80                        }
81                    } else {
82                        select! {
83                            result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
84                            _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
85                        }
86                    }
87                } else {
88                    select! {
89                        result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
90                    }
91                };
92
93                match selected {
94                    ExecStdinSelect::DataRead(result) => {
95                        match result {
96                            Ok(size) => {
97                                let stdin = buffer[0..size].to_vec();
98                                if size == 1 && buffer[0] == 0x1d {
99                                    break;
100                                }
101                                stdin_closed = size == 0;
102                                yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
103                            },
104                            Err(error) => {
105                                debug!("failed to read stdin: {}", error);
106                                break;
107                            }
108                        }
109                    },
110                    ExecStdinSelect::TerminalResize => {
111                        if let Ok((columns, rows)) = crossterm::terminal::size() {
112                            yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: Some(TerminalSize {
113                                rows: rows as u32,
114                                columns: columns as u32,
115                            }), stdin: vec![], stdin_closed: false, };
116                        }
117                    }
118                }
119            }
120        }
121    }
122
123    #[cfg(not(unix))]
124    pub async fn input_stream_exec(
125        initial: ExecInsideZoneRequest,
126        _tty: bool,
127    ) -> impl Stream<Item = ExecInsideZoneRequest> {
128        let mut stdin = stdin();
129        stream! {
130            yield initial;
131
132            let mut buffer = vec![0u8; 60];
133            let mut stdin_closed = false;
134            loop {
135                let selected = select! {
136                    result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
137                };
138
139                match selected {
140                    ExecStdinSelect::DataRead(result) => {
141                        match result {
142                            Ok(size) => {
143                                let stdin = buffer[0..size].to_vec();
144                                if size == 1 && buffer[0] == 0x1d {
145                                    break;
146                                }
147                                stdin_closed = size == 0;
148                                yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
149                            },
150                            Err(error) => {
151                                debug!("failed to read stdin: {}", error);
152                                break;
153                            }
154                        }
155                    },
156                    _ => {
157                        continue;
158                    }
159                }
160            }
161        }
162    }
163
164    pub async fn stdout(mut stream: Streaming<ZoneConsoleReply>, raw: bool) -> Result<()> {
165        if raw && stdin().is_tty() {
166            enable_raw_mode()?;
167            StdioConsoleStream::register_terminal_restore_hook()?;
168        }
169        let mut stdout = stdout();
170        while let Some(reply) = stream.next().await {
171            let reply = reply?;
172            if reply.data.is_empty() {
173                continue;
174            }
175            stdout.write_all(&reply.data).await?;
176            stdout.flush().await?;
177        }
178        Ok(())
179    }
180
181    pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>, raw: bool) -> Result<i32> {
182        if raw {
183            enable_raw_mode()?;
184            StdioConsoleStream::register_terminal_restore_hook()?;
185        }
186        let mut stdout = stdout();
187        let mut stderr = stderr();
188        while let Some(reply) = stream.next().await {
189            let reply = reply?;
190            if !reply.stdout.is_empty() {
191                stdout.write_all(&reply.stdout).await?;
192                stdout.flush().await?;
193            }
194
195            if !reply.stderr.is_empty() {
196                stderr.write_all(&reply.stderr).await?;
197                stderr.flush().await?;
198            }
199
200            if reply.exited {
201                return if reply.error.is_empty() {
202                    Ok(reply.exit_code)
203                } else {
204                    StdioConsoleStream::restore_terminal_mode();
205                    stderr
206                        .write_all(format!("Error: exec failed: {}\n", reply.error).as_bytes())
207                        .await?;
208                    stderr.flush().await?;
209                    Ok(-1)
210                };
211            }
212        }
213        Ok(-1)
214    }
215
216    pub async fn zone_exit_hook(
217        id: String,
218        events: EventStream,
219    ) -> Result<JoinHandle<Option<i32>>> {
220        Ok(tokio::task::spawn(async move {
221            let mut stream = events.subscribe();
222            while let Ok(event) = stream.recv().await {
223                let Event::ZoneChanged(changed) = event;
224                let Some(zone) = changed.zone else {
225                    continue;
226                };
227
228                let Some(status) = zone.status else {
229                    continue;
230                };
231
232                if zone.id != id {
233                    continue;
234                }
235
236                if let Some(exit_status) = status.exit_status {
237                    return Some(exit_status.code);
238                }
239
240                let state = status.state();
241                if state == ZoneState::Destroying || state == ZoneState::Destroyed {
242                    return Some(10);
243                }
244            }
245            None
246        }))
247    }
248
249    fn register_terminal_restore_hook() -> Result<()> {
250        if stdin().is_tty() {
251            ctrlc::set_handler(move || {
252                StdioConsoleStream::restore_terminal_mode();
253            })?;
254        }
255        Ok(())
256    }
257
258    pub fn restore_terminal_mode() {
259        if is_raw_mode_enabled().unwrap_or(false) {
260            let _ = disable_raw_mode();
261        }
262    }
263}