use nebulous::config::GlobalConfig;
use std::error::Error as StdError;
use futures::{SinkExt, StreamExt};
use reqwest::Client;
use serde::Deserialize;
use tokio_tungstenite::{
connect_async, tungstenite::http::Request, tungstenite::protocol::Message,
};
pub async fn fetch_container_logs(
name: String,
namespace: Option<String>,
follow: bool,
) -> Result<String, Box<dyn StdError>> {
let config = GlobalConfig::read()?;
let current_server = config.get_current_server_config().unwrap();
let server = current_server.server.as_ref().unwrap();
let api_key = current_server.api_key.as_ref().unwrap();
let namespace = namespace.unwrap_or("-".to_string());
if follow {
let ws_url = server
.replace("http://", "ws://")
.replace("https://", "wss://");
let ws_url = format!(
"{}/v1/containers/{}/{}/logs/stream",
ws_url, namespace, name
);
let request = Request::builder()
.uri(ws_url)
.header("Authorization", format!("Bearer {}", api_key))
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13") .body(())?;
let (ws_stream, response) = connect_async(request).await?;
if !response.status().is_success() {
return Err(format!("WebSocket handshake failed: Status {}", response.status()).into());
}
println!("WebSocket connection established");
let (mut write, mut read) = ws_stream.split();
while let Some(message) = read.next().await {
match message {
Ok(Message::Text(text)) => {
println!("{}", text);
}
Ok(Message::Close(_)) => {
println!("Connection closed by server");
break;
}
Ok(Message::Ping(ping_data)) => {
write.send(Message::Pong(ping_data)).await?;
}
Err(e) => {
eprintln!("WebSocket error: {}", e);
return Err(e.into());
}
_ => {}
}
}
Ok("Log streaming finished.".to_string())
} else {
let container_id = fetch_container_id_from_api(&namespace, &name).await?;
let _bearer_token = format!("Bearer {}", api_key);
let mut cmd = vec![
"cat".to_string(),
"$HOME/.logs/nebu_container.log".to_string(),
];
let output = nebulous::ssh::exec::run_ssh_command_ts(
&format!("container-{}", container_id),
cmd,
false,
false,
Some("root"), )?;
println!("{}", output);
Ok(output)
}
}
async fn fetch_container_id_from_api(
namespace: &str,
name: &str,
) -> Result<String, Box<dyn StdError>> {
let config = nebulous::config::GlobalConfig::read()?;
let current_server = config.get_current_server_config().unwrap();
let server = current_server.server.as_ref().unwrap();
let api_key = current_server.api_key.as_ref().unwrap();
let url = format!("{}/v1/containers/{}/{}", server, namespace, name);
let client = Client::new();
let response = client
.get(&url)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await?;
let container = response
.error_for_status()? .json::<V1Container>()
.await?;
Ok(container.metadata.id)
}
#[derive(Deserialize)]
struct V1Container {
metadata: V1ResourceMeta,
}
#[derive(Deserialize)]
struct V1ResourceMeta {
pub id: String,
}