use std::time::SystemTime;
use axum::extract::ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade, close_code};
use axum::extract::{Path, State};
use axum::response::Response;
use futures::StreamExt;
use lightshuttle_runtime::{
LifecycleHandle, LifecycleHandleError, LogChunk, LogChunkStream, LogStream,
};
use serde::Serialize;
use tracing::warn;
use crate::state::ControlState;
#[derive(Debug, Serialize)]
struct LogFrame<'a> {
stream: &'static str,
ts_secs: u64,
ts_nanos: u32,
data: &'a str,
}
pub(crate) async fn logs_ws<H>(
ws: WebSocketUpgrade,
State(state): State<ControlState<H>>,
Path(name): Path<String>,
) -> Response
where
H: LifecycleHandle + Clone + Send + Sync + 'static,
{
ws.on_upgrade(move |socket| handle_socket(socket, state.handle, name))
}
async fn handle_socket<H>(mut socket: WebSocket, handle: H, name: String)
where
H: LifecycleHandle + Clone + Send + Sync + 'static,
{
let mut stream = match handle.logs(&name, true).await {
Ok(s) => s,
Err(LifecycleHandleError::UnknownResource(_)) => {
send_close(
&mut socket,
close_code::UNSUPPORTED,
format!("unknown resource: {name}"),
)
.await;
return;
}
Err(err) => {
send_close(&mut socket, close_code::ERROR, err.to_string()).await;
return;
}
};
forward(&mut socket, &mut stream).await;
let _ = socket.send(Message::Close(None)).await;
}
async fn forward(socket: &mut WebSocket, stream: &mut LogChunkStream) {
loop {
tokio::select! {
chunk = stream.next() => match chunk {
Some(Ok(chunk)) => {
let text = encode_frame(&chunk);
if socket.send(Message::Text(text.into())).await.is_err() {
return; }
}
Some(Err(err)) => {
warn!(error = %err, "upstream log stream error");
return;
}
None => return, },
msg = socket.recv() => match msg {
Some(Ok(Message::Close(_)) | Err(_)) | None => return,
Some(Ok(_)) => {} }
}
}
}
fn encode_frame(chunk: &LogChunk) -> String {
let (secs, nanos) = chunk
.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| (d.as_secs(), d.subsec_nanos()))
.unwrap_or((0, 0));
let data = String::from_utf8_lossy(&chunk.bytes);
let frame = LogFrame {
stream: match chunk.stream {
LogStream::Stdout => "stdout",
LogStream::Stderr => "stderr",
},
ts_secs: secs,
ts_nanos: nanos,
data: data.as_ref(),
};
serde_json::to_string(&frame).unwrap_or_else(|_| String::from("{}"))
}
async fn send_close(socket: &mut WebSocket, code: u16, reason: String) {
let frame = Message::Close(Some(CloseFrame {
code,
reason: reason.into(),
}));
let _ = socket.send(frame).await;
}