// anubis-wormhole 1.0.0
//
// A post-quantum secure file transfer tool based on the Magic Wormhole protocol.
// Documentation
use crate::mailbox::client::{MailboxClient, MailboxError};
use crate::mailbox::types::ServerMsg;
use crate::journal::{Journal, JournalEvent, NoopJournal};
use std::time::{Duration, Instant};
use tracing::{info, warn};

/// Wraps a [`MailboxClient`] with heartbeat handling, automatic reconnection
/// and journaling of mailbox lifecycle events.
///
/// The journal type defaults to [`NoopJournal`].
pub struct MailboxSupervisor<J: Journal = NoopJournal> {
    /// The underlying mailbox connection being supervised.
    client: MailboxClient,
    /// How old the client's last pong may get before `poll_next` reconnects.
    /// Set to 90 seconds by `new`; override with `with_heartbeat_timeout`.
    hb_timeout: Duration,
    /// Receives a `JournalEvent` for each allocate/claim/open/release/close step.
    journal: J,
    /// When `poll_next` last attempted a ping; `None` until the first attempt.
    last_ping: Option<Instant>,
}

impl<J: Journal> MailboxSupervisor<J> {
    /// Builds a supervisor around `client` that records lifecycle events in `journal`.
    ///
    /// The heartbeat timeout starts at 90 seconds and no ping has been sent yet.
    pub fn new(client: MailboxClient, journal: J) -> Self {
        let hb_timeout = Duration::from_secs(90);
        let last_ping = None;
        Self {
            client,
            hb_timeout,
            journal,
            last_ping,
        }
    }
    /// Builder-style setter: consumes the supervisor and returns it with the
    /// heartbeat timeout replaced by `d`.
    pub fn with_heartbeat_timeout(self, d: Duration) -> Self {
        let mut supervisor = self;
        supervisor.hb_timeout = d;
        supervisor
    }
    pub fn client_mut(&mut self) -> &mut MailboxClient { &mut self.client }

    /// Connects the underlying client by delegating to its
    /// `connect_with_backoff`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`MailboxError`] `connect_with_backoff` reports.
    pub async fn ensure_connected(&mut self) -> Result<(), MailboxError> {
        let client = self.client_mut();
        client.connect_with_backoff().await
    }

    /// Drives the mailbox connection until the server delivers a message.
    ///
    /// Each loop iteration:
    /// 1. attempts a ping if none was attempted in the last 30 seconds;
    /// 2. reconnects (and re-opens the last mailbox) when the client's most
    ///    recent pong is older than the configured heartbeat timeout — this
    ///    check is skipped while the client's `last_pong` is `None`;
    /// 3. asks the client for the next message, transparently reconnecting on
    ///    a `stream_closed` error instead of surfacing it.
    ///
    /// When the client has nothing to deliver, control is always handed back
    /// to the executor before the next iteration: a cooperative yield during
    /// the first 5 seconds of the call, a 100 ms sleep afterwards.
    ///
    /// # Returns
    ///
    /// `Some(msg)` for the first message that is not a `stream_closed` error.
    /// The loop has no other exit, so `None` is not produced by this
    /// implementation.
    pub async fn poll_next(&mut self) -> Option<ServerMsg> {
        let start = Instant::now();
        loop {
            // heartbeat: send ping every 30s
            let now = Instant::now();
            let ping_due = match self.last_ping {
                Some(sent) => now.duration_since(sent) > Duration::from_secs(30),
                None => true,
            };
            if ping_due {
                // A failed ping is only logged here; a dead link is detected by
                // the heartbeat-timeout / stream_closed handling below.
                let ping_result = self.client.ping().await;
                // Record the attempt even on failure so a broken link is not
                // pinged on every single iteration.
                self.last_ping = Some(now);
                match ping_result {
                    Ok(_) => info!(target: "anubis.mailbox", "tx: ping"),
                    Err(_) => warn!(target: "anubis.mailbox", "tx: ping failed"),
                }
            }

            // heartbeat timeout
            if let Some(last) = self.client.last_pong {
                if last.elapsed() > self.hb_timeout {
                    warn!(target: "anubis.mailbox", "heartbeat timeout; reconnecting");
                    self.reconnect_and_reopen().await;
                }
            }

            if let Some(msg) = self.client.next().await {
                // auto-handle stream_closed errors by reconnecting and re-opening
                if let ServerMsg::Error { error } = &msg {
                    if error == "stream_closed" {
                        warn!(target: "anubis.mailbox", "stream closed; reconnecting");
                        self.reconnect_and_reopen().await;
                        continue;
                    }
                }
                return Some(msg);
            }

            // Nothing was delivered. Never loop straight back without giving the
            // executor a chance to run other tasks: stay responsive early on,
            // then back off to a 100 ms poll.
            if start.elapsed() > Duration::from_secs(5) {
                tokio::time::sleep(Duration::from_millis(100)).await;
            } else {
                tokio::task::yield_now().await;
            }
        }
    }

    /// Reconnects with backoff and, only if that succeeded, re-opens the last
    /// mailbox. A failed reconnect is logged and the re-open is skipped, since
    /// there is no connection to re-open on.
    async fn reconnect_and_reopen(&mut self) {
        match self.client.connect_with_backoff().await {
            Ok(()) => {
                self.client.reopen_last().await;
            }
            Err(_) => {
                warn!(target: "anubis.mailbox", "reconnect failed");
            }
        }
    }

    pub async fn ensure_session_allocate_claim_open(&mut self) -> Result<(String, String), MailboxError> {
        self.ensure_connected().await?;
        self.client.allocate().await?;
        self.journal.record(&JournalEvent::MailboxAllocated { nameplate: "".to_string() });
        let nameplate = loop {
            if let Some(ServerMsg::NameplateAllocated { nameplate }) = self.poll_next().await { break nameplate; }
        };
        self.client.claim(&nameplate).await?;
        self.journal.record(&JournalEvent::MailboxClaimed { nameplate: nameplate.clone() });
        let mailbox = loop {
            if let Some(ServerMsg::Claimed { mailbox }) = self.poll_next().await { break mailbox; }
        };
        self.client.open(&mailbox).await?;
        self.journal.record(&JournalEvent::MailboxOpened { mailbox: mailbox.clone() });
        Ok((nameplate, mailbox))
    }

    /// Best-effort teardown of the current session.
    ///
    /// If the client remembers a nameplate it is released first; then, if the
    /// client remembers a mailbox, that mailbox is closed with `mood`. Errors
    /// from either request are ignored. In both cases the matching journal
    /// event is recorded and the remembered value on the client is cleared.
    pub async fn close_and_release(&mut self, mood: &str) {
        // Nameplate first.
        let held_nameplate = self.client.last_nameplate.clone();
        if let Some(nameplate) = held_nameplate {
            let _ = self.client.release(&nameplate).await;
            let released = JournalEvent::MailboxReleased { nameplate };
            self.journal.record(&released);
            self.client.last_nameplate = None;
        }

        // The mailbox is read only after the release step has finished.
        let open_mailbox = self.client.last_mailbox.clone();
        if let Some(mailbox) = open_mailbox {
            let _ = self.client.close(&mailbox, mood).await;
            let closed = JournalEvent::MailboxClosed { mailbox, mood: mood.to_string() };
            self.journal.record(&closed);
            self.client.last_mailbox = None;
        }
    }

    /// Replay mailbox state from journal events, then reconnect/reopen.
    ///
    /// The events are scanned in order to find the nameplate and mailbox that
    /// were still held at the end of the journal:
    /// - `MailboxClaimed` / `MailboxOpened` set the current nameplate / mailbox;
    /// - `MailboxReleased` / `MailboxClosed` for that same nameplate / mailbox
    ///   clear it again, so a session that was already torn down is not
    ///   restored and re-opened.
    ///
    /// Whatever is still held is written into the client's `last_nameplate` /
    /// `last_mailbox`; values for which the journal holds nothing are left
    /// untouched on the client. The client is then connected and asked to
    /// re-open its last mailbox.
    ///
    /// # Errors
    ///
    /// Propagates the [`MailboxError`] from connecting.
    pub async fn replay_from_events(&mut self, events: &[JournalEvent]) -> Result<(), MailboxError> {
        let mut last_np: Option<String> = None;
        let mut last_mb: Option<String> = None;
        for ev in events {
            match ev {
                JournalEvent::MailboxClaimed { nameplate } => last_np = Some(nameplate.clone()),
                JournalEvent::MailboxOpened { mailbox } => last_mb = Some(mailbox.clone()),
                // Mirror `close_and_release`: once released/closed, the value is
                // no longer part of the live state.
                JournalEvent::MailboxReleased { nameplate } => {
                    if last_np.as_ref() == Some(nameplate) {
                        last_np = None;
                    }
                }
                JournalEvent::MailboxClosed { mailbox, .. } => {
                    if last_mb.as_ref() == Some(mailbox) {
                        last_mb = None;
                    }
                }
                _ => {}
            }
        }
        if let Some(np) = last_np {
            self.client.last_nameplate = Some(np);
        }
        if let Some(mb) = last_mb {
            self.client.last_mailbox = Some(mb);
        }
        self.ensure_connected().await?;
        self.client.reopen_last().await;
        Ok(())
    }
}