use std::io::Write;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::io::AsyncReadExt;
use tokio_tungstenite::tungstenite::Message;
use crate::daemon_client;
/// Opens a new shell for `project_name` through the hub and attaches the local
/// terminal to it.
///
/// Sends a `shell.open` request carrying the project name and, when provided,
/// the `ws` (workstream) and `host` parameters. On success the returned
/// `shell_id` is printed and the terminal is attached via `attach_async`.
///
/// Exits the process with status 1 if the request fails, if the reply does not
/// contain a non-empty `shell_id`, or if the async runtime cannot be created.
pub fn open(project_name: &str, ws_name: Option<&str>, host: Option<&str>) {
    let mut client = daemon_client::connect_to_hub_or_exit();
    let mut params = json!({"project": project_name});
    if let Some(ws) = ws_name {
        params["ws"] = json!(ws);
    }
    if let Some(h) = host {
        params["host"] = json!(h);
    }
    let result = match client.request("shell.open", params) {
        Ok(result) => result,
        Err(e) => {
            eprintln!("Error opening shell: {e}");
            std::process::exit(1);
        }
    };
    // A reply without a usable `shell_id` cannot be attached to. Fail loudly
    // instead of continuing with an empty id and subscribing to shell ''.
    let Some(shell_id) = result
        .get("shell_id")
        .and_then(|v| v.as_str())
        .filter(|id| !id.is_empty())
        .map(str::to_owned)
    else {
        eprintln!("Error opening shell: hub response did not include a shell_id");
        std::process::exit(1);
    };
    println!("Opened shell '{shell_id}'");
    // The request client is no longer needed once the shell exists; the
    // interactive session uses its own WebSocket connection.
    drop(client);
    let rt = match tokio::runtime::Runtime::new() {
        Ok(rt) => rt,
        Err(e) => {
            eprintln!("Error starting async runtime: {e}");
            std::process::exit(1);
        }
    };
    rt.block_on(attach_async(&shell_id, host));
    // `attach_async` reads stdin through tokio, which performs the read on a
    // blocking thread that cannot be cancelled. Dropping the runtime would wait
    // for that read to finish (i.e. for another key press), so shut down
    // without waiting.
    rt.shutdown_background();
}
/// Attaches the local terminal to the existing shell `shell_id`, optionally on
/// a specific `host`, blocking until the session ends or the user detaches.
///
/// Exits the process with status 1 if the async runtime cannot be created.
pub fn attach(shell_id: &str, host: Option<&str>) {
    let rt = match tokio::runtime::Runtime::new() {
        Ok(rt) => rt,
        Err(e) => {
            eprintln!("Error starting async runtime: {e}");
            std::process::exit(1);
        }
    };
    rt.block_on(attach_async(shell_id, host));
    // tokio's stdin read runs on a blocking thread that cannot be cancelled.
    // Dropping the runtime would wait for it (until another key is pressed),
    // so shut down without waiting for outstanding blocking work.
    rt.shutdown_background();
}
/// Prints a table of the shells reported by the hub's `shell.list` request.
///
/// When `host` is given it is forwarded as the `host` parameter. A reply that
/// is not an array, or is an empty array, prints "No active shells.". A failed
/// request prints the error and exits with status 1.
pub fn list(host: Option<&str>) {
    let mut hub = daemon_client::connect_to_hub_or_exit();
    let mut request = json!({});
    if let Some(target) = host {
        request["host"] = json!(target);
    }

    let reply = match hub.request("shell.list", request) {
        Ok(reply) => reply,
        Err(e) => {
            eprintln!("Error: {e}");
            std::process::exit(1);
        }
    };

    let entries = reply.as_array().cloned().unwrap_or_default();
    if entries.is_empty() {
        println!("No active shells.");
        return;
    }

    println!(
        "{:<12} {:<12} {:<20} {:<10} {:<24} TITLE",
        "HOST", "ID", "WORKSTREAM", "STATUS", "CREATED"
    );
    println!("{}", "─".repeat(95));

    for entry in &entries {
        // String field lookup; `None` when the key is missing or not a string.
        let text = |key: &str| entry.get(key).and_then(|v| v.as_str());

        let status = match entry.get("is_alive").and_then(|v| v.as_bool()) {
            Some(true) => "alive",
            _ => "dead",
        };
        let host_col = text("host").unwrap_or("-");
        let id_col = text("id").unwrap_or("-");
        let workstream_col = text("workstream_id").unwrap_or("-");
        let created_col = text("created_at").unwrap_or("-");
        // Unlike the other columns, a missing title is shown as empty, not "-".
        let title_col = text("title").unwrap_or("");

        println!(
            "{:<12} {:<12} {:<20} {:<10} {:<24} {}",
            host_col, id_col, workstream_col, status, created_col, title_col,
        );
    }
}
/// Asks the hub to terminate the shell `shell_id` via a `shell.kill` request.
///
/// `host`, when present, is forwarded as the `host` parameter. Prints a
/// confirmation on success; on failure prints the error and exits with
/// status 1.
pub fn kill(shell_id: &str, host: Option<&str>) {
    let mut hub = daemon_client::connect_to_hub_or_exit();

    let mut request = json!({"shell_id": shell_id});
    if let Some(target) = host {
        request["host"] = json!(target);
    }

    if let Err(e) = hub.request("shell.kill", request) {
        eprintln!("Error: {e}");
        std::process::exit(1);
    }
    println!("Killed shell '{shell_id}'");
}
/// Attaches the local terminal to `shell_id` over the hub's WebSocket endpoint.
///
/// Steps, in order:
/// 1. Load the hub config and auth token from the Vex home directory and
///    connect to `ws://127.0.0.1:<web_port>/ws`.
/// 2. Send an `auth` message, then a `shell.subscribe` request (with `host`
///    when given), echoing any `shell.output` events received while waiting
///    for the subscribe reply.
/// 3. On unix, put the terminal into raw mode for the rest of the session.
/// 4. Relay stdin to the hub as base64-encoded `shell.write` requests and
///    write decoded `shell.output` data for this shell to stdout.
///
/// The session ends when the server closes the connection, a WebSocket error
/// occurs, or stdin delivers byte `0x1D` (Ctrl-]), which detaches.
///
/// Any failure before the interactive phase prints an error and exits the
/// process with status 1.
async fn attach_async(shell_id: &str, host: Option<&str>) {
    let vex_home = match vex_app::VexHome::new(None) {
        Ok(h) => h,
        Err(e) => {
            eprintln!("Error: {e}");
            std::process::exit(1);
        }
    };
    // A missing or unreadable hub config falls back to the default config.
    let hub_config = vex_home.load_hub_config().unwrap_or_default();
    let port = hub_config.web_port;
    let url = format!("ws://127.0.0.1:{port}/ws");
    let token = match vex_home.load_or_create_token() {
        Ok(t) => t,
        Err(e) => {
            eprintln!("Error reading token: {e}");
            std::process::exit(1);
        }
    };
    let (ws_stream, _) = match tokio_tungstenite::connect_async(&url).await {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Error connecting to hub: {e}");
            eprintln!("Is the hub running? Start it with: vex hub start");
            std::process::exit(1);
        }
    };
    let (mut ws_write, mut ws_read) = ws_stream.split();

    // Authenticate. A failed send is reported instead of panicking.
    let auth_msg = json!({"type": "auth", "token": token});
    if let Err(e) = ws_write
        .send(Message::Text(auth_msg.to_string().into()))
        .await
    {
        eprintln!("Error sending auth to hub: {e}");
        std::process::exit(1);
    }
    // The first message after auth is read and discarded without inspection.
    // NOTE(review): presumably the auth acknowledgement — confirm against the hub protocol.
    let _ = ws_read.next().await;

    // Subscribe to the shell's output stream.
    let mut sub_params = json!({"shell_id": shell_id});
    if let Some(h) = host {
        sub_params["host"] = json!(h);
    }
    let sub_msg = json!({
        "type": "request",
        "id": "sub-1",
        "method": "shell.subscribe",
        "params": sub_params,
    });
    if let Err(e) = ws_write
        .send(Message::Text(sub_msg.to_string().into()))
        .await
    {
        eprintln!("Error sending subscribe request to hub: {e}");
        std::process::exit(1);
    }

    // Wait for the subscribe reply (any message carrying an `id`), printing
    // shell output that arrives in the meantime.
    loop {
        match ws_read.next().await {
            Some(Ok(Message::Text(text))) => {
                if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text) {
                    if let Some(err) = val.get("error").and_then(|v| v.as_str()) {
                        eprintln!("Error: {err}");
                        std::process::exit(1);
                    }
                    if val.get("id").is_some() {
                        break;
                    }
                    if val.get("event").and_then(|v| v.as_str()) == Some("shell.output")
                        && let Some(data) = val.get("data")
                        && let Some(b64) = data.get("data").and_then(|v| v.as_str())
                        && let Ok(bytes) = BASE64.decode(b64)
                    {
                        let mut stdout = std::io::stdout().lock();
                        let _ = stdout.write_all(&bytes);
                        let _ = stdout.flush();
                    }
                }
            }
            // Only a close frame, a transport error, or end-of-stream means the
            // connection is gone.
            Some(Ok(Message::Close(_))) | Some(Err(_)) | None => {
                eprintln!("Error: connection lost during subscribe");
                std::process::exit(1);
            }
            // Other frame kinds (ping/pong/binary) are not a lost connection;
            // keep waiting for the subscribe reply.
            Some(Ok(_)) => {}
        }
    }

    // Raw mode lasts until this guard is dropped at the end of the function.
    #[cfg(unix)]
    let _raw_guard = RawModeGuard::enter();

    // Pump stdin into a channel from a separate task so it can be selected
    // against incoming WebSocket messages.
    let (stdin_tx, mut stdin_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(64);
    let stdin_handle = tokio::spawn(async move {
        let mut stdin = tokio::io::stdin();
        let mut buf = [0u8; 1024];
        loop {
            match stdin.read(&mut buf).await {
                Ok(0) => break,
                Ok(n) => {
                    if stdin_tx.send(buf[..n].to_vec()).await.is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    });

    let shell_id_owned = shell_id.to_string();
    let host_owned = host.map(|h| h.to_string());
    loop {
        tokio::select! {
            msg = ws_read.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        // Only output events addressed to this shell are printed.
                        if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text)
                            && val.get("event").and_then(|v| v.as_str()) == Some("shell.output")
                            && let Some(data) = val.get("data")
                            && data.get("shell_id").and_then(|v| v.as_str())
                                .is_some_and(|id| id == shell_id_owned)
                            && let Some(b64) = data.get("data").and_then(|v| v.as_str())
                            && let Ok(bytes) = BASE64.decode(b64)
                        {
                            let mut stdout = std::io::stdout().lock();
                            let _ = stdout.write_all(&bytes);
                            let _ = stdout.flush();
                        }
                    }
                    Some(Ok(Message::Close(_))) | None => {
                        // Explicit "\r\n": the terminal may be in raw mode here.
                        eprintln!("\r\n[server disconnected]\r");
                        break;
                    }
                    Some(Err(_)) => break,
                    _ => {}
                }
            }
            Some(data) = stdin_rx.recv() => {
                // 0x1D (Ctrl-]) anywhere in the chunk detaches; the chunk is not forwarded.
                if data.contains(&0x1D) {
                    eprintln!("\r\n[detached from {shell_id_owned}]\r");
                    break;
                }
                let encoded = BASE64.encode(&data);
                let mut write_params = json!({"shell_id": &shell_id_owned, "data": encoded});
                if let Some(ref h) = host_owned {
                    write_params["host"] = json!(h);
                }
                let msg = json!({
                    "type": "request",
                    "id": "stdin",
                    "method": "shell.write",
                    "params": write_params,
                });
                if ws_write.send(Message::Text(msg.to_string().into())).await.is_err() {
                    break;
                }
            }
        }
    }

    // Best-effort close frame, then stop the stdin pump task.
    let _ = ws_write.send(Message::Close(None)).await;
    stdin_handle.abort();
}
/// Guard for the terminal settings of stdin (unix only).
///
/// `enter` captures the current attributes and applies raw mode; dropping the
/// guard writes the captured attributes back.
#[cfg(unix)]
struct RawModeGuard {
// Terminal attributes read from stdin before raw mode was applied.
original: termios::Termios,
}
#[cfg(unix)]
impl RawModeGuard {
    /// Switches stdin's terminal to raw mode and returns a guard holding the
    /// previous attributes.
    ///
    /// Returns `None` when the current attributes cannot be read from stdin's
    /// file descriptor or when applying the raw attributes fails.
    fn enter() -> Option<Self> {
        use std::os::fd::AsRawFd;
        use termios::*;

        let stdin_fd = std::io::stdin().as_raw_fd();
        let original = Termios::from_fd(stdin_fd).ok()?;

        // Work on a copy so the untouched attributes can be stored in the guard.
        let mut raw_attrs = original;
        cfmakeraw(&mut raw_attrs);
        tcsetattr(stdin_fd, TCSANOW, &raw_attrs).ok()?;

        Some(Self { original })
    }
}
#[cfg(unix)]
impl Drop for RawModeGuard {
    /// Writes the saved terminal attributes back to stdin. A failure here is
    /// ignored.
    fn drop(&mut self) {
        use std::os::fd::AsRawFd;
        use termios::*;

        let stdin_fd = std::io::stdin().as_raw_fd();
        let _ = tcsetattr(stdin_fd, TCSANOW, &self.original);
    }
}