use crate::apis::configuration::Configuration;
use crate::apis::{lxc_api, nodes_api, qemu_api};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use std::sync::Arc;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
/// WebSocket stream type used by the sessions in this file: a tungstenite
/// `WebSocketStream` layered over `MaybeTlsStream<TcpStream>`.
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Selects what a terminal session is opened against.
///
/// Each variant is mapped to a matching ticket issuer by `terminal_issuer_for`.
#[derive(Debug, Clone)]
pub enum TerminalTarget {
/// Terminal on the node named `node` (ticket issued via `nodes_api::nodes_termproxy`).
Node { node: String },
/// Terminal of guest `vmid` on `node` (ticket issued via `qemu_api::qemu_termproxy`).
Qemu { node: String, vmid: i32 },
/// Terminal of guest `vmid` on `node` (ticket issued via `lxc_api::lxc_termproxy`).
Lxc { node: String, vmid: i32 },
}
/// Selects what a VNC session is opened against.
///
/// Each variant is mapped to a matching ticket issuer by `vnc_issuer_for`.
#[derive(Debug, Clone)]
pub enum VncTarget {
/// Shell of the node named `node` (ticket issued via `nodes_api::nodes_vncshell`).
NodeShell { node: String },
/// Console of guest `vmid` on `node` (ticket issued via `qemu_api::qemu_vncproxy`).
Qemu { node: String, vmid: i32 },
/// Console of guest `vmid` on `node` (ticket issued via `lxc_api::lxc_vncproxy`).
Lxc { node: String, vmid: i32 },
}
/// Ticket data extracted from a proxy API response by `parse_ticket`.
#[derive(Debug, Clone)]
pub struct ConsoleTicket {
/// Ticket string; required in the response. Sent as the `vncticket` query
/// parameter and, for terminals, in the initial auth frame.
pub ticket: String,
/// Port reported by the API; required. Sent as the `port` query parameter.
pub port: u16,
/// User name from the response; empty when the response has no string `user` field.
pub user: String,
/// Optional `upid` field of the response, if present as a string.
pub upid: Option<String>,
/// Optional `cert` field of the response, if present as a string.
pub cert: Option<String>,
}
/// Errors produced while issuing tickets, building the upgrade URL, or talking
/// over the WebSocket.
#[derive(Debug, Error)]
pub enum WsError {
/// A REST API call failed; carries the stringified underlying error.
#[error("api call failed: {0}")]
Api(String),
/// The ticket response was absent, not an object, or lacked `ticket`/`port`.
#[error("ticket payload missing required fields")]
BadTicket,
/// The configured base path could not be parsed or has an unsupported scheme.
#[error("invalid base_path: {0}")]
BadBasePath(String),
/// Error reported by tungstenite (handshake, send, receive, or close).
#[error("websocket protocol error: {0}")]
Tungstenite(#[from] tokio_tungstenite::tungstenite::Error),
/// A credential could not be turned into a valid HTTP header value.
#[error("invalid request header: {0}")]
Header(#[from] tokio_tungstenite::tungstenite::http::header::InvalidHeaderValue),
/// No issuer exists for the requested flow.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("no issuer registered for the requested flow")]
UnsupportedTarget,
}
/// Converts between application payloads and the raw bytes carried in
/// WebSocket messages.
pub trait FrameCodec: Send {
/// Payload type accepted by `encode`.
type In;
/// Item type produced by `decode`.
type Out;
/// Encodes one payload into the bytes to put on the wire.
fn encode(&self, payload: Self::In) -> Vec<u8>;
/// Decodes one received chunk, appending any resulting items to `out`.
/// Takes `&mut self` so implementations may keep state between chunks.
fn decode(&mut self, chunk: &[u8], out: &mut Vec<Self::Out>);
}
/// Codec for the text terminal stream.
///
/// Outgoing input is wrapped in a `0:<len>:<payload>\n` data frame; incoming
/// bytes are passed through as text. The decoder is stateful: a multi-byte
/// UTF-8 character that is cut off at the end of one chunk is held back and
/// completed with the next chunk instead of being replaced by U+FFFD.
#[derive(Default)]
pub struct TextFrameCodec {
    /// Bytes of an incomplete trailing UTF-8 sequence carried over from the
    /// previous chunk (at most three bytes).
    pending: Vec<u8>,
}

impl TextFrameCodec {
    /// Frame sent as a keep-alive ping.
    pub const PING_FRAME: &'static str = "2";

    /// Builds a resize frame of the form `1:<cols>:<rows>:`.
    pub fn encode_resize(cols: u32, rows: u32) -> String {
        format!("1:{}:{}:", cols, rows)
    }

    /// Returns how many leading bytes of `bytes` can be decoded now.
    ///
    /// Everything is decodable except a trailing sequence that is a valid
    /// *prefix* of a UTF-8 character; definitely-invalid bytes count as
    /// decodable because waiting for more input cannot repair them.
    fn utf8_complete_len(bytes: &[u8]) -> usize {
        let mut checked = 0;
        while checked < bytes.len() {
            match std::str::from_utf8(&bytes[checked..]) {
                Ok(_) => break,
                Err(err) => match err.error_len() {
                    // Input ended in the middle of a character: hold the tail back.
                    None => return checked + err.valid_up_to(),
                    // Invalid bytes: skip past them and keep scanning.
                    Some(bad) => checked += err.valid_up_to() + bad,
                },
            }
        }
        bytes.len()
    }
}

impl FrameCodec for TextFrameCodec {
    type In = String;
    type Out = String;

    /// Wraps `payload` in a data frame: `0:<byte length>:<payload>\n`.
    fn encode(&self, payload: Self::In) -> Vec<u8> {
        format!("0:{}:{}\n", payload.len(), payload).into_bytes()
    }

    /// Appends the text contained in `chunk` to `out`.
    ///
    /// Empty chunks are ignored. Invalid UTF-8 is replaced lossily, but an
    /// incomplete character at the end of the chunk is buffered until the
    /// next call, so a chunk may produce no output at all.
    fn decode(&mut self, chunk: &[u8], out: &mut Vec<Self::Out>) {
        if chunk.is_empty() {
            return;
        }
        let mut bytes = std::mem::take(&mut self.pending);
        bytes.extend_from_slice(chunk);
        let complete = Self::utf8_complete_len(&bytes);
        self.pending = bytes.split_off(complete);
        if !bytes.is_empty() {
            out.push(String::from_utf8_lossy(&bytes).into_owned());
        }
    }
}
/// Pass-through codec for binary streams: bytes go out and come in unchanged.
#[derive(Default)]
pub struct BinaryFrameCodec;

impl FrameCodec for BinaryFrameCodec {
    type In = Vec<u8>;
    type Out = Vec<u8>;

    /// Returns `payload` as-is.
    fn encode(&self, payload: Self::In) -> Vec<u8> {
        payload
    }

    /// Appends a copy of `chunk` to `out`; empty chunks are dropped.
    fn decode(&mut self, chunk: &[u8], out: &mut Vec<Self::Out>) {
        if chunk.is_empty() {
            return;
        }
        out.push(Vec::from(chunk));
    }
}
/// Obtains a ticket for a terminal session.
#[async_trait]
pub trait TerminalTicketIssuer: Send + Sync {
/// Requests a ticket using `cfg`.
///
/// Returns the parsed ticket together with the API path of the target
/// (e.g. `/nodes/<node>`), which callers pass to `build_upgrade_url`.
async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError>;
}
/// Obtains a ticket for a VNC session.
#[async_trait]
pub trait VncTicketIssuer: Send + Sync {
/// Requests a ticket using `cfg`.
///
/// Returns the parsed ticket together with the API path of the target
/// (e.g. `/nodes/<node>/qemu/<vmid>`), which callers pass to `build_upgrade_url`.
async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError>;
}
/// Issues terminal tickets for a node via `nodes_api::nodes_termproxy`.
struct NodeTermproxyIssuer { node: String }
/// Issues terminal tickets for guest `vmid` on `node` via `qemu_api::qemu_termproxy`.
struct QemuTermproxyIssuer { node: String, vmid: i32 }
/// Issues terminal tickets for guest `vmid` on `node` via `lxc_api::lxc_termproxy`.
struct LxcTermproxyIssuer { node: String, vmid: i32 }
/// Issues VNC tickets for a node shell via `nodes_api::nodes_vncshell`.
struct NodeVncShellIssuer { node: String }
/// Issues VNC tickets for guest `vmid` on `node` via `qemu_api::qemu_vncproxy`.
struct QemuVncproxyIssuer { node: String, vmid: i32 }
/// Issues VNC tickets for guest `vmid` on `node` via `lxc_api::lxc_vncproxy`.
struct LxcVncproxyIssuer { node: String, vmid: i32 }
#[async_trait]
impl TerminalTicketIssuer for NodeTermproxyIssuer {
    /// Calls `nodes_termproxy` for the node and returns the parsed ticket
    /// together with the node's API path, `/nodes/<node>` (node name URL-encoded).
    ///
    /// API failures are reported as `WsError::Api` carrying the error text.
    async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError> {
        let request = crate::models::NodesTermproxyRequest::new();
        let response = match nodes_api::nodes_termproxy(cfg, &self.node, Some(request)).await {
            Ok(response) => response,
            Err(err) => return Err(WsError::Api(err.to_string())),
        };
        let ticket = parse_ticket(&response.data)?;
        let api_path = format!("/nodes/{}", urlencoding::encode(&self.node));
        Ok((ticket, api_path))
    }
}
#[async_trait]
impl TerminalTicketIssuer for QemuTermproxyIssuer {
    /// Calls `qemu_termproxy` for the guest and returns the parsed ticket
    /// together with the guest's API path, `/nodes/<node>/qemu/<vmid>`
    /// (node name URL-encoded).
    ///
    /// API failures are reported as `WsError::Api` carrying the error text.
    async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError> {
        let request = crate::models::QemuTermproxyRequest::new();
        let response = match qemu_api::qemu_termproxy(cfg, &self.node, self.vmid, Some(request)).await {
            Ok(response) => response,
            Err(err) => return Err(WsError::Api(err.to_string())),
        };
        let ticket = parse_ticket(&response.data)?;
        let api_path = format!("/nodes/{}/qemu/{}", urlencoding::encode(&self.node), self.vmid);
        Ok((ticket, api_path))
    }
}
#[async_trait]
impl TerminalTicketIssuer for LxcTermproxyIssuer {
    /// Calls `lxc_termproxy` for the guest and returns the parsed ticket
    /// together with the guest's API path, `/nodes/<node>/lxc/<vmid>`
    /// (node name URL-encoded).
    ///
    /// API failures are reported as `WsError::Api` carrying the error text.
    async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError> {
        let response = match lxc_api::lxc_termproxy(cfg, &self.node, self.vmid).await {
            Ok(response) => response,
            Err(err) => return Err(WsError::Api(err.to_string())),
        };
        let ticket = parse_ticket(&response.data)?;
        let api_path = format!("/nodes/{}/lxc/{}", urlencoding::encode(&self.node), self.vmid);
        Ok((ticket, api_path))
    }
}
#[async_trait]
impl VncTicketIssuer for NodeVncShellIssuer {
    /// Calls `nodes_vncshell` (no request body) for the node and returns the
    /// parsed ticket together with the node's API path, `/nodes/<node>`
    /// (node name URL-encoded).
    ///
    /// API failures are reported as `WsError::Api` carrying the error text.
    async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError> {
        let response = match nodes_api::nodes_vncshell(cfg, &self.node, None).await {
            Ok(response) => response,
            Err(err) => return Err(WsError::Api(err.to_string())),
        };
        let ticket = parse_ticket(&response.data)?;
        let api_path = format!("/nodes/{}", urlencoding::encode(&self.node));
        Ok((ticket, api_path))
    }
}
#[async_trait]
impl VncTicketIssuer for QemuVncproxyIssuer {
    /// Calls `qemu_vncproxy` (no request body) for the guest and returns the
    /// parsed ticket together with the guest's API path,
    /// `/nodes/<node>/qemu/<vmid>` (node name URL-encoded).
    ///
    /// API failures are reported as `WsError::Api` carrying the error text.
    async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError> {
        let response = match qemu_api::qemu_vncproxy(cfg, &self.node, self.vmid, None).await {
            Ok(response) => response,
            Err(err) => return Err(WsError::Api(err.to_string())),
        };
        let ticket = parse_ticket(&response.data)?;
        let api_path = format!("/nodes/{}/qemu/{}", urlencoding::encode(&self.node), self.vmid);
        Ok((ticket, api_path))
    }
}
#[async_trait]
impl VncTicketIssuer for LxcVncproxyIssuer {
    /// Calls `lxc_vncproxy` (no request body) for the guest and returns the
    /// parsed ticket together with the guest's API path,
    /// `/nodes/<node>/lxc/<vmid>` (node name URL-encoded).
    ///
    /// API failures are reported as `WsError::Api` carrying the error text.
    async fn issue(&self, cfg: &Configuration) -> Result<(ConsoleTicket, String), WsError> {
        let response = match lxc_api::lxc_vncproxy(cfg, &self.node, self.vmid, None).await {
            Ok(response) => response,
            Err(err) => return Err(WsError::Api(err.to_string())),
        };
        let ticket = parse_ticket(&response.data)?;
        let api_path = format!("/nodes/{}/lxc/{}", urlencoding::encode(&self.node), self.vmid);
        Ok((ticket, api_path))
    }
}
fn terminal_issuer_for(target: TerminalTarget) -> Box<dyn TerminalTicketIssuer> {
match target {
TerminalTarget::Node { node } => Box::new(NodeTermproxyIssuer { node }),
TerminalTarget::Qemu { node, vmid } => Box::new(QemuTermproxyIssuer { node, vmid }),
TerminalTarget::Lxc { node, vmid } => Box::new(LxcTermproxyIssuer { node, vmid }),
}
}
fn vnc_issuer_for(target: VncTarget) -> Box<dyn VncTicketIssuer> {
match target {
VncTarget::NodeShell { node } => Box::new(NodeVncShellIssuer { node }),
VncTarget::Qemu { node, vmid } => Box::new(QemuVncproxyIssuer { node, vmid }),
VncTarget::Lxc { node, vmid } => Box::new(LxcVncproxyIssuer { node, vmid }),
}
}
/// Extracts a [`ConsoleTicket`] from the `data` payload of a proxy API response.
///
/// * `ticket` must be a string.
/// * `port` may be a JSON number or a numeric string and must fit in `u16`.
/// * `user` defaults to the empty string when absent or not a string.
/// * `upid` and `cert` are optional strings.
///
/// # Errors
///
/// Returns [`WsError::BadTicket`] when the payload is missing, is not an
/// object, lacks `ticket`/`port`, or carries a port outside `0..=65535`.
fn parse_ticket(value: &Option<serde_json::Value>) -> Result<ConsoleTicket, WsError> {
    let obj = value
        .as_ref()
        .and_then(|v| v.as_object())
        .ok_or(WsError::BadTicket)?;
    let text_field = |key: &str| obj.get(key).and_then(|x| x.as_str()).map(String::from);

    let ticket = text_field("ticket").ok_or(WsError::BadTicket)?;
    let port = obj
        .get("port")
        .and_then(|p| p.as_u64().or_else(|| p.as_str().and_then(|s| s.parse::<u64>().ok())))
        // A plain `as u16` cast would silently wrap values above 65535 into a
        // different, valid-looking port; reject them instead.
        .and_then(|p| u16::try_from(p).ok())
        .ok_or(WsError::BadTicket)?;
    let user = text_field("user").unwrap_or_default();
    let upid = text_field("upid");
    let cert = text_field("cert");

    Ok(ConsoleTicket { ticket, port, user, upid, cert })
}
/// Adds credentials from a `Configuration` to an outgoing WebSocket handshake request.
pub trait AuthAttacher: Send + Sync {
/// Mutates `request` (typically its headers) using credentials found in `cfg`.
/// Implementations in this file do nothing when the relevant credential is absent.
fn apply(&self, request: &mut tokio_tungstenite::tungstenite::http::Request<()>, cfg: &Configuration) -> Result<(), WsError>;
}
/// Runs several [`AuthAttacher`]s one after another on the same request.
pub struct CompositeAuthAttacher {
    /// Attachers in the order they are applied.
    inner: Vec<Box<dyn AuthAttacher>>,
}

impl CompositeAuthAttacher {
    /// Creates a composite that applies `inner` in the given order.
    pub fn new(inner: Vec<Box<dyn AuthAttacher>>) -> Self {
        CompositeAuthAttacher { inner }
    }
}

impl AuthAttacher for CompositeAuthAttacher {
    /// Applies every wrapped attacher in order, stopping at the first error.
    fn apply(&self, req: &mut tokio_tungstenite::tungstenite::http::Request<()>, cfg: &Configuration) -> Result<(), WsError> {
        self.inner
            .iter()
            .try_for_each(|attacher| attacher.apply(req, cfg))
    }
}
/// Attaches `cfg.bearer_access_token`, when set, as an
/// `authorization: PVEAPIToken=<token>` header.
struct TokenAuthAttacher;

impl AuthAttacher for TokenAuthAttacher {
    /// Sets (replacing any existing value) the `authorization` header from the
    /// bearer token; does nothing when no token is configured.
    fn apply(&self, req: &mut tokio_tungstenite::tungstenite::http::Request<()>, cfg: &Configuration) -> Result<(), WsError> {
        let header = match cfg.bearer_access_token.as_ref() {
            Some(token) => HeaderValue::from_str(&format!("PVEAPIToken={token}"))?,
            None => return Ok(()),
        };
        req.headers_mut().insert("authorization", header);
        Ok(())
    }
}
/// Attaches `cfg.api_key`, when set, either as a cookie or as an
/// `authorization` header.
struct ApiKeyAuthAttacher;

impl AuthAttacher for ApiKeyAuthAttacher {
    /// Builds the credential as `<prefix> <key>` (or just `<key>` when the
    /// prefix is missing or empty) and attaches it:
    ///
    /// * values starting with `PVEAuthCookie=` go into the `cookie` header;
    /// * anything else goes into `authorization`, but only if that header is
    ///   not already present, so an earlier attacher takes precedence.
    ///
    /// The value is validated as a header even when it ends up not being inserted.
    fn apply(&self, req: &mut tokio_tungstenite::tungstenite::http::Request<()>, cfg: &Configuration) -> Result<(), WsError> {
        let api_key = match cfg.api_key.as_ref() {
            Some(api_key) => api_key,
            None => return Ok(()),
        };
        let value = match api_key.prefix.as_deref() {
            Some(prefix) if !prefix.is_empty() => format!("{} {}", prefix, api_key.key),
            _ => api_key.key.clone(),
        };
        let header = HeaderValue::from_str(&value)?;

        let headers = req.headers_mut();
        if value.starts_with("PVEAuthCookie=") {
            headers.insert("cookie", header);
        } else if !headers.contains_key("authorization") {
            headers.insert("authorization", header);
        }
        Ok(())
    }
}
/// Builds the default auth chain: the token attacher first, then the API-key
/// attacher.
fn default_auth() -> Box<dyn AuthAttacher> {
    let chain: Vec<Box<dyn AuthAttacher>> = vec![
        Box::new(TokenAuthAttacher),
        Box::new(ApiKeyAuthAttacher),
    ];
    Box::new(CompositeAuthAttacher::new(chain))
}
/// Opens an authenticated WebSocket connection.
#[async_trait]
pub trait WebSocketTransport: Send + Sync {
/// Connects to `url`, letting `auth` add credentials from `cfg` to the
/// handshake request, and returns the established stream.
async fn open(&self, url: &str, auth: &dyn AuthAttacher, cfg: &Configuration) -> Result<WsStream, WsError>;
}
/// [`WebSocketTransport`] backed by `tokio_tungstenite::connect_async`.
struct TungsteniteTransport;

#[async_trait]
impl WebSocketTransport for TungsteniteTransport {
    /// Turns `url` into a client handshake request, lets `auth` decorate it,
    /// connects, and returns the stream (the handshake response is discarded).
    async fn open(&self, url: &str, auth: &dyn AuthAttacher, cfg: &Configuration) -> Result<WsStream, WsError> {
        let mut request = url.into_client_request()?;
        auth.apply(&mut request, cfg)?;
        let connected = tokio_tungstenite::connect_async(request).await?;
        Ok(connected.0)
    }
}
/// Returns the default transport, a shared [`TungsteniteTransport`].
fn default_transport() -> Arc<dyn WebSocketTransport> {
    let transport: Arc<dyn WebSocketTransport> = Arc::new(TungsteniteTransport);
    transport
}
/// Builds the WebSocket upgrade URL for a console ticket.
///
/// The configured `cfg.base_path` is parsed, its scheme is switched
/// (`https` -> `wss`, `http` -> `ws`), a single trailing `/` is dropped, and
/// `<base_path>/vncwebsocket?port=<port>&vncticket=<url-encoded ticket>` is
/// appended, where `base_path` is the target's API path.
///
/// # Errors
///
/// Returns [`WsError::BadBasePath`] when `cfg.base_path` does not parse, uses
/// a scheme other than `http`/`https`, or the scheme cannot be replaced.
fn build_upgrade_url(cfg: &Configuration, base_path: &str, ticket: &ConsoleTicket) -> Result<String, WsError> {
    let mut base = match url::Url::parse(&cfg.base_path) {
        Ok(parsed) => parsed,
        Err(e) => return Err(WsError::BadBasePath(e.to_string())),
    };

    let ws_scheme = if base.scheme() == "https" {
        "wss"
    } else if base.scheme() == "http" {
        "ws"
    } else {
        return Err(WsError::BadBasePath(format!("unsupported scheme {}", base.scheme())));
    };
    if base.set_scheme(ws_scheme).is_err() {
        return Err(WsError::BadBasePath("scheme set failed".into()));
    }

    let rendered = base.to_string();
    let prefix = rendered.strip_suffix('/').unwrap_or(&rendered);
    let encoded_ticket = urlencoding::encode(&ticket.ticket);
    Ok(format!(
        "{prefix}{base_path}/vncwebsocket?port={}&vncticket={encoded_ticket}",
        ticket.port,
    ))
}
/// Time that must have passed since the last ping before `TerminalSession::recv`
/// sends another keep-alive frame (30 seconds).
const PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
/// An open terminal session over a WebSocket.
pub struct TerminalSession {
// Underlying WebSocket connection.
stream: WsStream,
// Frames outgoing input and decodes incoming chunks.
codec: TextFrameCodec,
// When the last keep-alive ping was sent (initialised at session creation).
last_ping: std::time::Instant,
}
impl TerminalSession {
pub async fn send(&mut self, text: &str) -> Result<(), WsError> {
let frame = self.codec.encode(text.to_string());
self.stream.send(Message::Binary(frame.into())).await?;
Ok(())
}
pub async fn resize(&mut self, cols: u32, rows: u32) -> Result<(), WsError> {
self.stream.send(Message::Text(TextFrameCodec::encode_resize(cols, rows).into())).await?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Option<String>, WsError> {
let mut decoded: Vec<String> = Vec::new();
loop {
if let Some(msg) = decoded.pop() { return Ok(Some(msg)); }
if self.last_ping.elapsed() >= PING_INTERVAL {
self.stream.send(Message::Text(TextFrameCodec::PING_FRAME.into())).await?;
self.last_ping = std::time::Instant::now();
}
match self.stream.next().await {
None => return Ok(None),
Some(Err(e)) => return Err(WsError::Tungstenite(e)),
Some(Ok(Message::Text(t))) => self.codec.decode(t.as_bytes(), &mut decoded),
Some(Ok(Message::Binary(b))) => self.codec.decode(b.as_ref(), &mut decoded),
Some(Ok(Message::Close(_))) => return Ok(None),
Some(Ok(_)) => {}
}
}
}
pub async fn close(mut self) -> Result<(), WsError> {
self.stream.close(None).await?;
Ok(())
}
}
/// An open VNC session over a WebSocket.
pub struct VncSession {
// Underlying WebSocket connection.
stream: WsStream,
// Pass-through codec applied to outgoing data in `send`.
codec: BinaryFrameCodec,
}
impl VncSession {
    /// Sends `data` as one binary WebSocket message.
    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), WsError> {
        let frame = self.codec.encode(data);
        self.stream.send(Message::Binary(frame.into())).await?;
        Ok(())
    }

    /// Waits for the next data message and returns its bytes.
    ///
    /// Binary and text messages are both returned as raw bytes. Returns
    /// `Ok(None)` when the stream ends or a close frame arrives; other message
    /// kinds are skipped.
    pub async fn recv(&mut self) -> Result<Option<Vec<u8>>, WsError> {
        while let Some(incoming) = self.stream.next().await {
            let payload = match incoming? {
                Message::Binary(bytes) => bytes.to_vec(),
                Message::Text(text) => text.as_bytes().to_vec(),
                Message::Close(_) => return Ok(None),
                _ => continue,
            };
            return Ok(Some(payload));
        }
        Ok(None)
    }

    /// Closes the WebSocket without a close frame payload, consuming the session.
    pub async fn close(mut self) -> Result<(), WsError> {
        self.stream.close(None).await?;
        Ok(())
    }
}
/// Opens terminal and VNC sessions using a pluggable auth attacher and transport.
pub struct ConsoleConnector {
// Adds credentials to the WebSocket handshake request.
auth: Box<dyn AuthAttacher>,
// Establishes the WebSocket connection.
transport: Arc<dyn WebSocketTransport>,
}
impl ConsoleConnector {
    /// Creates a connector from an explicit auth attacher and transport.
    pub fn new(auth: Box<dyn AuthAttacher>, transport: Arc<dyn WebSocketTransport>) -> Self {
        Self { auth, transport }
    }

    /// Opens a terminal session for `target`.
    ///
    /// Issues a ticket, builds the upgrade URL, connects through the
    /// transport, and then sends the initial `<user>:<ticket>\n` auth frame
    /// as a text message before handing the session back.
    pub async fn open_terminal(&self, cfg: &Configuration, target: TerminalTarget) -> Result<TerminalSession, WsError> {
        let issuer = terminal_issuer_for(target);
        let (ticket, base_path) = issuer.issue(cfg).await?;
        let url = build_upgrade_url(cfg, &base_path, &ticket)?;
        let mut stream = self.transport.open(&url, self.auth.as_ref(), cfg).await?;
        let auth_frame = format!("{}:{}\n", ticket.user, ticket.ticket);
        SinkExt::send(&mut stream, Message::Text(auth_frame.into())).await?;
        Ok(TerminalSession {
            stream,
            codec: TextFrameCodec::default(),
            last_ping: std::time::Instant::now(),
        })
    }

    /// Opens a VNC session for `target`.
    ///
    /// Issues a ticket, builds the upgrade URL, and connects through the
    /// transport; no initial frame is sent.
    pub async fn open_vnc(&self, cfg: &Configuration, target: VncTarget) -> Result<VncSession, WsError> {
        let issuer = vnc_issuer_for(target);
        let (ticket, base_path) = issuer.issue(cfg).await?;
        let url = build_upgrade_url(cfg, &base_path, &ticket)?;
        let stream = self.transport.open(&url, self.auth.as_ref(), cfg).await?;
        Ok(VncSession {
            stream,
            codec: BinaryFrameCodec,
        })
    }
}

/// Provided through the standard trait rather than an inherent `default`
/// method, so `ConsoleConnector::default()` keeps working and the type is also
/// usable wherever `T: Default` is required.
impl Default for ConsoleConnector {
    /// Creates a connector with the default auth chain and transport.
    fn default() -> Self {
        Self::new(default_auth(), default_transport())
    }
}
/// Opens a terminal session for `target` using a default [`ConsoleConnector`].
pub async fn connect_terminal(cfg: &Configuration, target: TerminalTarget) -> Result<TerminalSession, WsError> {
    let connector = ConsoleConnector::default();
    connector.open_terminal(cfg, target).await
}
/// Opens a VNC session for `target` using a default [`ConsoleConnector`].
pub async fn connect_vnc(cfg: &Configuration, target: VncTarget) -> Result<VncSession, WsError> {
    let connector = ConsoleConnector::default();
    connector.open_vnc(cfg, target).await
}
// Unit tests for the frame codecs. Only pure encode/decode behaviour is
// covered; nothing here opens a network connection.
#[cfg(test)]
mod tests {
use super::*;
// Feeds each chunk to the codec in order and collects everything it emits.
fn decode_text(codec: &mut TextFrameCodec, chunks: &[&str]) -> Vec<String> {
let mut out = Vec::new();
for c in chunks {
codec.decode(c.as_bytes(), &mut out);
}
out
}
// A single chunk comes back unchanged.
#[test]
fn text_codec_passes_chunk_through_verbatim() {
let mut c = TextFrameCodec::default();
assert_eq!(decode_text(&mut c, &["hello"]), vec!["hello".to_string()]);
}
// Chunks are not merged: each produces its own output item.
#[test]
fn text_codec_emits_each_chunk_separately() {
let mut c = TextFrameCodec::default();
assert_eq!(
decode_text(&mut c, &["foo", "bar"]),
vec!["foo".to_string(), "bar".to_string()],
);
}
// Escape sequences are not interpreted or stripped.
#[test]
fn text_codec_passes_raw_tty_bytes_through() {
let mut c = TextFrameCodec::default();
assert_eq!(
decode_text(&mut c, &["\x1b[Kprompt$ "]),
vec!["\x1b[Kprompt$ ".to_string()],
);
}
// An empty chunk yields no output item.
#[test]
fn text_codec_ignores_empty_chunks() {
let mut c = TextFrameCodec::default();
assert!(decode_text(&mut c, &[""]).is_empty());
}
// With no input at all, nothing is emitted.
#[test]
fn text_codec_returns_empty_when_no_chunks_arrived() {
let mut c = TextFrameCodec::default();
assert!(decode_text(&mut c, &[]).is_empty());
}
// Incoming data that looks like an outgoing data frame is not parsed as one.
#[test]
fn text_codec_preserves_bytes_that_contain_colons() {
let mut c = TextFrameCodec::default();
assert_eq!(
decode_text(&mut c, &["0:5:a:b:c"]),
vec!["0:5:a:b:c".to_string()],
);
}
// Outgoing input is framed as `0:<len>:<payload>\n`.
#[test]
fn text_codec_encode_produces_data_frame_format() {
let c = TextFrameCodec::default();
let frame = c.encode("hello".to_string());
assert_eq!(String::from_utf8(frame).unwrap(), "0:5:hello\n");
}
// Resize frames are `1:<cols>:<rows>:`.
#[test]
fn text_codec_encode_resize_produces_resize_frame_format() {
assert_eq!(TextFrameCodec::encode_resize(120, 32), "1:120:32:");
}
// The ping frame is the single character "2".
#[test]
fn text_codec_ping_frame_constant() {
assert_eq!(TextFrameCodec::PING_FRAME, "2");
}
// Binary decode copies the chunk into one output item.
#[test]
fn binary_codec_passes_bytes_through_unchanged() {
let mut c = BinaryFrameCodec;
let mut out: Vec<Vec<u8>> = Vec::new();
c.decode(&[1, 2, 3, 4, 5], &mut out);
assert_eq!(out, vec![vec![1u8, 2, 3, 4, 5]]);
}
// Binary encode returns the payload as-is.
#[test]
fn binary_codec_encode_passthrough() {
let c = BinaryFrameCodec;
assert_eq!(c.encode(vec![7u8, 8, 9]), vec![7u8, 8, 9]);
}
}