anubis-wormhole 1.0.0

A post-quantum secure file transfer tool based on the Magic Wormhole protocol.
Documentation
use crate::mailbox::types::{ServerMsg, random_side};
use futures_util::{SinkExt, StreamExt};
use thiserror::Error;
use tokio_tungstenite::{connect_async};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
use url::Url;
use tokio_socks::tcp::Socks5Stream;
use tracing::{debug, info, warn};

#[derive(Debug, Error)]
pub enum MailboxError {
    #[error("websocket error")] Ws,
    #[error("protocol error: {0}")] Proto(String),
}

pub struct MailboxClient {
    url: String,
    appid: String,
    side: String,
    tx: Option<futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, Message>>,
    rx: Option<mpsc::UnboundedReceiver<ServerMsg>>,
    pub(crate) last_nameplate: Option<String>,
    pub(crate) last_mailbox: Option<String>,
    pub(crate) last_pong: Option<std::time::Instant>,
    socks: Option<(String, u16)>,
    ping_counter: u64,
}

impl MailboxClient {
    pub fn new(url: String, appid: String) -> Self {
        let side = random_side();
        let url = normalize_relay_url(url);
        Self { url, appid, side, tx: None, rx: None, last_nameplate: None, last_mailbox: None, last_pong: None, socks: None, ping_counter: 0 }
    }

    pub fn set_socks(&mut self, socks: Option<(String, u16)>) { self.socks = socks; }
    pub fn socks_endpoint(&self) -> Option<(String,u16)> { self.socks.clone() }

    pub async fn connect(&mut self) -> Result<(), MailboxError> {
        info!(target: "anubis.mailbox", url=%self.url, side=%self.side, "connecting");
        // If socks is configured, tunnel via SOCKS5 and perform WS handshake over that stream (supports ws and wss)
        let (ws, _resp) = if let Some((shost, sport)) = &self.socks {
            let url = Url::parse(&self.url).map_err(|e| MailboxError::Proto(format!("bad url: {e}")))?;
            let host = url.host_str().ok_or_else(|| MailboxError::Proto("missing host".into()))?.to_string();
            let is_wss = url.scheme() == "wss";
            let port = url.port().unwrap_or(if is_wss { 443 } else { 80 });
            // SOCKS5 connect to target host:port
            let tcp = Socks5Stream::connect((shost.as_str(), *sport), (host.as_str(), port)).await
                .map_err(|_| MailboxError::Ws)?
                .into_inner();

            // Use TLS-capable client to handle ws:// or wss:// appropriately over the preconnected stream
            let req_url = self.url.as_str();
            tokio_tungstenite::client_async_tls_with_config(req_url, tcp, None, None).await.map_err(|_| MailboxError::Ws)?
        } else {
            connect_async(&self.url).await.map_err(|_| MailboxError::Ws)?
        };
        info!(target: "anubis.mailbox", url=%self.url, side=%self.side, "connected");
        let (mut sink, mut stream) = ws.split();
        let bind = serde_json::json!({
            "type": "bind",
            "appid": self.appid,
            "side": self.side,
            "client_version": [0,0,1],
        });
        sink.send(Message::Text(bind.to_string())).await.map_err(|_| MailboxError::Ws)?;
        let (tx_srv, rx_srv) = mpsc::unbounded_channel();
        tokio::spawn(async move {
            while let Some(msg) = stream.next().await {
                match msg {
                    Ok(Message::Text(txt)) => {
                        let v: serde_json::Value = match serde_json::from_str(&txt) { Ok(v) => v, Err(_) => continue };
                        let mtype = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
                        let parsed = match mtype {
                            "welcome" => ServerMsg::Welcome { motd: v["welcome"]["motd"].as_str().map(|s| s.to_string()), current_cli_version: None, error: v["welcome"]["error"].as_str().map(|s| s.to_string()) },
                            "allocated" => ServerMsg::NameplateAllocated { nameplate: v["nameplate"].as_str().unwrap_or("").to_string() },
                            "claimed" => ServerMsg::Claimed { mailbox: v["mailbox"].as_str().unwrap_or("").to_string() },
                            "message" => ServerMsg::Message { side: v["side"].as_str().unwrap_or("").to_string(), phase: v["phase"].as_str().unwrap_or("").to_string(), body: v["body"].as_str().unwrap_or("").to_string() },
                            "nameplates" => {
                                let ids = v["nameplates"].as_array().map(|arr| arr.iter().filter_map(|n| n["id"].as_str().map(|s| s.to_string())).collect()).unwrap_or_else(|| vec![]);
                                ServerMsg::Nameplates { ids }
                            }
                            "released" => ServerMsg::Released { },
                            "closed" => ServerMsg::Closed { },
                            "pong" => ServerMsg::Pong,
                            "error" => ServerMsg::Error { error: v["error"].as_str().unwrap_or("").to_string() },
                            _ => continue,
                        };
                        match &parsed {
                            ServerMsg::Welcome { .. } => info!(target: "anubis.mailbox", "rx: welcome"),
                            ServerMsg::NameplateAllocated { nameplate } => info!(target: "anubis.mailbox", nameplate=%nameplate, "rx: allocated"),
                            ServerMsg::Claimed { mailbox } => info!(target: "anubis.mailbox", mailbox=%mailbox, "rx: claimed"),
                            ServerMsg::Message { phase, side, .. } => debug!(target: "anubis.mailbox", phase=%phase, side=%side, "rx: message"),
                            ServerMsg::Nameplates { ids } => info!(target: "anubis.mailbox", count=%ids.len(), "rx: nameplates"),
                            ServerMsg::Pong => debug!(target: "anubis.mailbox", "rx: pong"),
                            ServerMsg::Error { error } => warn!(target: "anubis.mailbox", error=%error, "rx: error"),
                            ServerMsg::Released { .. } => info!(target: "anubis.mailbox", "rx: released"),
                            ServerMsg::Closed { .. } => info!(target: "anubis.mailbox", "rx: closed"),
                        }
                        let _ = tx_srv.send(parsed);
                    }
                    Ok(Message::Ping(_)) => { let _ = tx_srv.send(ServerMsg::Pong); }
                    Ok(Message::Pong(_)) => { let _ = tx_srv.send(ServerMsg::Pong); }
                    Err(_) => { let _ = tx_srv.send(ServerMsg::Error { error: "stream_closed".to_string() }); break; }
                    _ => {}
                }
            }
        });
        self.tx = Some(sink);
        self.rx = Some(rx_srv);
        // (Optional heartbeat disabled in this minimal client; Supervisor drives pings.)
        Ok(())
    }

    pub fn side(&self) -> &str { &self.side }

    async fn send_json(&mut self, v: serde_json::Value) -> Result<(), MailboxError> {
        if let Some(sink) = &mut self.tx {
            debug!(target: "anubis.mailbox", msg=%v["type"].as_str().unwrap_or(""), "tx: json");
            sink.send(Message::Text(v.to_string())).await.map_err(|_| MailboxError::Ws)?;
        }
        Ok(())
    }

    pub async fn allocate(&mut self) -> Result<(), MailboxError> {
        self.send_json(serde_json::json!({"type":"allocate"})).await
    }
    pub async fn claim(&mut self, nameplate: &str) -> Result<(), MailboxError> {
        self.last_nameplate = Some(nameplate.to_string());
        self.send_json(serde_json::json!({"type":"claim","nameplate":nameplate})).await
    }
    pub async fn open(&mut self, mailbox: &str) -> Result<(), MailboxError> {
        self.last_mailbox = Some(mailbox.to_string());
        self.send_json(serde_json::json!({"type":"open","mailbox":mailbox})).await
    }
    pub async fn add(&mut self, phase: &str, body_hex: &str) -> Result<(), MailboxError> {
        self.send_json(serde_json::json!({"type":"add","phase":phase,"body":body_hex})).await
    }
    pub async fn release(&mut self, nameplate: &str) -> Result<(), MailboxError> {
        self.send_json(serde_json::json!({"type":"release","nameplate":nameplate})).await
    }
    pub async fn close(&mut self, mailbox: &str, mood: &str) -> Result<(), MailboxError> {
        self.send_json(serde_json::json!({"type":"close","mailbox":mailbox,"mood":mood})).await
    }

    pub async fn next(&mut self) -> Option<ServerMsg> {
        if let Some(rx) = &mut self.rx {
            let msg = rx.recv().await;
            match &msg {
                Some(ServerMsg::Message { .. }) => {
                    if let Some(np) = self.last_nameplate.clone() {
                        let _ = self.release(&np).await; // release on first peer
                        self.last_nameplate = None;
                    }
                }
                Some(ServerMsg::Pong) => { self.last_pong = Some(std::time::Instant::now()); }
                Some(ServerMsg::Error { error }) if error == "stream_closed" => {
                    // surface to caller; they may reconnect
                }
                _ => {}
            }
            msg
        } else { None }
    }

    pub async fn reopen_last(&mut self) {
        if let Some(mb) = self.last_mailbox.clone() {
            let _ = self.open(&mb).await;
        }
    }

    pub async fn connect_with_backoff(&mut self) -> Result<(), MailboxError> {
        let mut delay = 1u64;
        loop {
            match self.connect().await {
                Ok(()) => return Ok(()),
                Err(_) => {
                    warn!(target: "anubis.mailbox", secs=delay, "connect failed; backing off");
                    tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
                    delay = (delay * 2).min(30);
                }
            }
        }
    }

    pub async fn list(&mut self) -> Result<(), MailboxError> {
        self.send_json(serde_json::json!({"type":"list"})).await
    }

    pub async fn ping(&mut self) -> Result<(), MailboxError> {
        self.ping_counter = self.ping_counter.wrapping_add(1);
        self.send_json(serde_json::json!({"type":"ping","ping": self.ping_counter})).await
    }
}

fn normalize_relay_url(s: String) -> String {
    // Ensure path ends with /v1 if empty
    if let Ok(mut u) = Url::parse(&s) {
        let p = u.path();
        if p.is_empty() || p == "/" {
            u.set_path("/v1");
            return u.to_string();
        }
        return s;
    }
    s
}