use crate::mailbox::types::{ServerMsg, random_side};
use futures_util::{SinkExt, StreamExt};
use thiserror::Error;
use tokio_tungstenite::{connect_async};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
use url::Url;
use tokio_socks::tcp::Socks5Stream;
use tracing::{debug, info, warn};
#[derive(Debug, Error)]
pub enum MailboxError {
#[error("websocket error")] Ws,
#[error("protocol error: {0}")] Proto(String),
}
/// Write half of the relay WebSocket, as produced by splitting the stream
/// returned from the tokio-tungstenite connect functions.
type WsSink = futures::stream::SplitSink<
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
    Message,
>;

/// Client for a mailbox relay reached over a WebSocket.
///
/// Outgoing commands are written through `tx`; incoming server messages are
/// decoded by a background task and delivered through `rx`.
pub struct MailboxClient {
    /// Relay URL after normalization in `new`.
    url: String,
    /// Application id sent in the `bind` message.
    appid: String,
    /// This client's side identifier, generated once in `new`.
    side: String,
    /// Write half of the socket; `None` until `connect` succeeds.
    tx: Option<WsSink>,
    /// Receiver for decoded server messages; `None` until `connect` succeeds.
    rx: Option<mpsc::UnboundedReceiver<ServerMsg>>,
    /// Nameplate most recently passed to `claim`; cleared once it has been
    /// released from `next`.
    pub(crate) last_nameplate: Option<String>,
    /// Mailbox most recently passed to `open`; used by `reopen_last`.
    pub(crate) last_mailbox: Option<String>,
    /// Time at which `next` last observed a pong.
    pub(crate) last_pong: Option<std::time::Instant>,
    /// Optional SOCKS5 proxy as `(host, port)`.
    socks: Option<(String, u16)>,
    /// Monotonic (wrapping) counter used as the payload of `ping`.
    ping_counter: u64,
}
impl MailboxClient {
pub fn new(url: String, appid: String) -> Self {
let side = random_side();
let url = normalize_relay_url(url);
Self { url, appid, side, tx: None, rx: None, last_nameplate: None, last_mailbox: None, last_pong: None, socks: None, ping_counter: 0 }
}
pub fn set_socks(&mut self, socks: Option<(String, u16)>) { self.socks = socks; }
pub fn socks_endpoint(&self) -> Option<(String,u16)> { self.socks.clone() }
pub async fn connect(&mut self) -> Result<(), MailboxError> {
info!(target: "anubis.mailbox", url=%self.url, side=%self.side, "connecting");
let (ws, _resp) = if let Some((shost, sport)) = &self.socks {
let url = Url::parse(&self.url).map_err(|e| MailboxError::Proto(format!("bad url: {e}")))?;
let host = url.host_str().ok_or_else(|| MailboxError::Proto("missing host".into()))?.to_string();
let is_wss = url.scheme() == "wss";
let port = url.port().unwrap_or(if is_wss { 443 } else { 80 });
let tcp = Socks5Stream::connect((shost.as_str(), *sport), (host.as_str(), port)).await
.map_err(|_| MailboxError::Ws)?
.into_inner();
let req_url = self.url.as_str();
tokio_tungstenite::client_async_tls_with_config(req_url, tcp, None, None).await.map_err(|_| MailboxError::Ws)?
} else {
connect_async(&self.url).await.map_err(|_| MailboxError::Ws)?
};
info!(target: "anubis.mailbox", url=%self.url, side=%self.side, "connected");
let (mut sink, mut stream) = ws.split();
let bind = serde_json::json!({
"type": "bind",
"appid": self.appid,
"side": self.side,
"client_version": [0,0,1],
});
sink.send(Message::Text(bind.to_string())).await.map_err(|_| MailboxError::Ws)?;
let (tx_srv, rx_srv) = mpsc::unbounded_channel();
tokio::spawn(async move {
while let Some(msg) = stream.next().await {
match msg {
Ok(Message::Text(txt)) => {
let v: serde_json::Value = match serde_json::from_str(&txt) { Ok(v) => v, Err(_) => continue };
let mtype = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
let parsed = match mtype {
"welcome" => ServerMsg::Welcome { motd: v["welcome"]["motd"].as_str().map(|s| s.to_string()), current_cli_version: None, error: v["welcome"]["error"].as_str().map(|s| s.to_string()) },
"allocated" => ServerMsg::NameplateAllocated { nameplate: v["nameplate"].as_str().unwrap_or("").to_string() },
"claimed" => ServerMsg::Claimed { mailbox: v["mailbox"].as_str().unwrap_or("").to_string() },
"message" => ServerMsg::Message { side: v["side"].as_str().unwrap_or("").to_string(), phase: v["phase"].as_str().unwrap_or("").to_string(), body: v["body"].as_str().unwrap_or("").to_string() },
"nameplates" => {
let ids = v["nameplates"].as_array().map(|arr| arr.iter().filter_map(|n| n["id"].as_str().map(|s| s.to_string())).collect()).unwrap_or_else(|| vec![]);
ServerMsg::Nameplates { ids }
}
"released" => ServerMsg::Released { },
"closed" => ServerMsg::Closed { },
"pong" => ServerMsg::Pong,
"error" => ServerMsg::Error { error: v["error"].as_str().unwrap_or("").to_string() },
_ => continue,
};
match &parsed {
ServerMsg::Welcome { .. } => info!(target: "anubis.mailbox", "rx: welcome"),
ServerMsg::NameplateAllocated { nameplate } => info!(target: "anubis.mailbox", nameplate=%nameplate, "rx: allocated"),
ServerMsg::Claimed { mailbox } => info!(target: "anubis.mailbox", mailbox=%mailbox, "rx: claimed"),
ServerMsg::Message { phase, side, .. } => debug!(target: "anubis.mailbox", phase=%phase, side=%side, "rx: message"),
ServerMsg::Nameplates { ids } => info!(target: "anubis.mailbox", count=%ids.len(), "rx: nameplates"),
ServerMsg::Pong => debug!(target: "anubis.mailbox", "rx: pong"),
ServerMsg::Error { error } => warn!(target: "anubis.mailbox", error=%error, "rx: error"),
ServerMsg::Released { .. } => info!(target: "anubis.mailbox", "rx: released"),
ServerMsg::Closed { .. } => info!(target: "anubis.mailbox", "rx: closed"),
}
let _ = tx_srv.send(parsed);
}
Ok(Message::Ping(_)) => { let _ = tx_srv.send(ServerMsg::Pong); }
Ok(Message::Pong(_)) => { let _ = tx_srv.send(ServerMsg::Pong); }
Err(_) => { let _ = tx_srv.send(ServerMsg::Error { error: "stream_closed".to_string() }); break; }
_ => {}
}
}
});
self.tx = Some(sink);
self.rx = Some(rx_srv);
Ok(())
}
pub fn side(&self) -> &str { &self.side }
async fn send_json(&mut self, v: serde_json::Value) -> Result<(), MailboxError> {
if let Some(sink) = &mut self.tx {
debug!(target: "anubis.mailbox", msg=%v["type"].as_str().unwrap_or(""), "tx: json");
sink.send(Message::Text(v.to_string())).await.map_err(|_| MailboxError::Ws)?;
}
Ok(())
}
pub async fn allocate(&mut self) -> Result<(), MailboxError> {
self.send_json(serde_json::json!({"type":"allocate"})).await
}
pub async fn claim(&mut self, nameplate: &str) -> Result<(), MailboxError> {
self.last_nameplate = Some(nameplate.to_string());
self.send_json(serde_json::json!({"type":"claim","nameplate":nameplate})).await
}
pub async fn open(&mut self, mailbox: &str) -> Result<(), MailboxError> {
self.last_mailbox = Some(mailbox.to_string());
self.send_json(serde_json::json!({"type":"open","mailbox":mailbox})).await
}
pub async fn add(&mut self, phase: &str, body_hex: &str) -> Result<(), MailboxError> {
self.send_json(serde_json::json!({"type":"add","phase":phase,"body":body_hex})).await
}
pub async fn release(&mut self, nameplate: &str) -> Result<(), MailboxError> {
self.send_json(serde_json::json!({"type":"release","nameplate":nameplate})).await
}
pub async fn close(&mut self, mailbox: &str, mood: &str) -> Result<(), MailboxError> {
self.send_json(serde_json::json!({"type":"close","mailbox":mailbox,"mood":mood})).await
}
pub async fn next(&mut self) -> Option<ServerMsg> {
if let Some(rx) = &mut self.rx {
let msg = rx.recv().await;
match &msg {
Some(ServerMsg::Message { .. }) => {
if let Some(np) = self.last_nameplate.clone() {
let _ = self.release(&np).await; self.last_nameplate = None;
}
}
Some(ServerMsg::Pong) => { self.last_pong = Some(std::time::Instant::now()); }
Some(ServerMsg::Error { error }) if error == "stream_closed" => {
}
_ => {}
}
msg
} else { None }
}
pub async fn reopen_last(&mut self) {
if let Some(mb) = self.last_mailbox.clone() {
let _ = self.open(&mb).await;
}
}
pub async fn connect_with_backoff(&mut self) -> Result<(), MailboxError> {
let mut delay = 1u64;
loop {
match self.connect().await {
Ok(()) => return Ok(()),
Err(_) => {
warn!(target: "anubis.mailbox", secs=delay, "connect failed; backing off");
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
delay = (delay * 2).min(30);
}
}
}
}
pub async fn list(&mut self) -> Result<(), MailboxError> {
self.send_json(serde_json::json!({"type":"list"})).await
}
pub async fn ping(&mut self) -> Result<(), MailboxError> {
self.ping_counter = self.ping_counter.wrapping_add(1);
self.send_json(serde_json::json!({"type":"ping","ping": self.ping_counter})).await
}
}
/// Ensures a relay URL has a path.
///
/// If `s` parses as a URL whose path is empty or `/`, the path is replaced
/// with `/v1` and the re-serialized URL is returned. In every other case
/// (an explicit path, or a string that does not parse) `s` is returned
/// unchanged.
fn normalize_relay_url(s: String) -> String {
    match Url::parse(&s) {
        Ok(mut parsed) if matches!(parsed.path(), "" | "/") => {
            parsed.set_path("/v1");
            parsed.to_string()
        }
        _ => s,
    }
}