use anyhow::Context;
use super::{MantaClient, ws_base_url};
impl MantaClient {
/// Connects to the console endpoint of the node identified by `xname`.
///
/// The target URL is `<ws-base>/nodes/<xname>/console?cols=<cols>&rows=<rows>`,
/// where `<ws-base>` is `ws_base_url` applied to this client's base URL.
/// `xname`, `cols` and `rows` are interpolated into the URL as given.
///
/// The connection itself is delegated to `connect_console_ws`, which receives
/// `token` unchanged.
///
/// # Returns
///
/// The pair produced by `connect_console_ws`: a writer first, a reader second.
///
/// # Errors
///
/// Propagates any error returned by `connect_console_ws`.
pub async fn console_node(
    &self,
    token: &str,
    xname: &str,
    cols: u16,
    rows: u16,
) -> anyhow::Result<(
    Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
    Box<dyn tokio::io::AsyncRead + Unpin + Send>,
)> {
    let endpoint = format!(
        "{base}/nodes/{xname}/console?cols={cols}&rows={rows}",
        base = ws_base_url(self.base_url()),
    );
    self.connect_console_ws(token, &endpoint).await
}
/// Connects to the console endpoint of the session named `session_name`.
///
/// The target URL is
/// `<ws-base>/sessions/<session_name>/console?cols=<cols>&rows=<rows>`, where
/// `<ws-base>` is `ws_base_url` applied to this client's base URL.
/// `session_name`, `cols` and `rows` are interpolated into the URL as given.
///
/// The connection itself is delegated to `connect_console_ws`, which receives
/// `token` unchanged.
///
/// # Returns
///
/// The pair produced by `connect_console_ws`: a writer first, a reader second.
///
/// # Errors
///
/// Propagates any error returned by `connect_console_ws`.
pub async fn console_session(
    &self,
    token: &str,
    session_name: &str,
    cols: u16,
    rows: u16,
) -> anyhow::Result<(
    Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
    Box<dyn tokio::io::AsyncRead + Unpin + Send>,
)> {
    let endpoint = format!(
        "{base}/sessions/{session_name}/console?cols={cols}&rows={rows}",
        base = ws_base_url(self.base_url()),
    );
    self.connect_console_ws(token, &endpoint).await
}
async fn connect_console_ws(
&self,
token: &str,
url: &str,
) -> anyhow::Result<(
Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
Box<dyn tokio::io::AsyncRead + Unpin + Send>,
)> {
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderValue;
let mut req = url.into_client_request().context("Invalid WebSocket URL")?;
req.headers_mut().insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {token}"))
.context("Invalid token header value")?,
);
req.headers_mut().insert(
"X-Manta-Site",
HeaderValue::from_str(self.site_name())
.context("Invalid site-name header value")?,
);
let (ws_stream, _) = tokio_tungstenite::connect_async(req)
.await
.context("WebSocket connection failed")?;
let (mut ws_sink, mut ws_source) = ws_stream.split();
let (stdin_cli_end, mut stdin_bridge_end) = tokio::io::duplex(65536);
let (mut stdout_bridge_end, stdout_cli_end) = tokio::io::duplex(65536);
tokio::spawn(async move {
let mut buf = vec![0u8; 4096];
loop {
tokio::select! {
n = stdin_bridge_end.read(&mut buf) => {
match n {
Ok(0) | Err(_) => break,
Ok(n) => {
let data = tokio_util::bytes::Bytes::copy_from_slice(&buf[..n]);
if ws_sink.send(Message::Binary(data)).await.is_err() {
break;
}
}
}
}
frame = ws_source.next() => {
match frame {
Some(Ok(Message::Binary(data))) => {
if stdout_bridge_end.write_all(&data).await.is_err() { break; }
}
Some(Ok(Message::Text(text))) => {
if stdout_bridge_end.write_all(text.as_bytes()).await.is_err() { break; }
}
Some(Ok(Message::Close(_))) | None => break,
Some(Err(_)) => break,
Some(Ok(_)) => {} }
}
}
}
});
Ok((Box::new(stdin_cli_end), Box::new(stdout_cli_end)))
}
}