use crate::mailbox::client::{MailboxClient, MailboxError};
use crate::mailbox::types::ServerMsg;
use crate::journal::{Journal, JournalEvent, NoopJournal};
use std::time::{Duration, Instant};
use tracing::{info, warn};
/// Wraps a [`MailboxClient`] with keep-alive pings, heartbeat-based
/// reconnection and journaling of mailbox lifecycle events.
///
/// The journal type defaults to [`NoopJournal`].
pub struct MailboxSupervisor<J = NoopJournal>
where
    J: Journal,
{
    /// Client that every protocol operation is delegated to.
    client: MailboxClient,
    /// Longest tolerated age of the client's last pong before `poll_next`
    /// forces a reconnect.
    hb_timeout: Duration,
    /// Sink that receives a [`JournalEvent`] for each lifecycle step.
    journal: J,
    /// When `poll_next` last attempted a ping; `None` until the first attempt.
    last_ping: Option<Instant>,
}
impl<J: Journal> MailboxSupervisor<J> {
pub fn new(client: MailboxClient, journal: J) -> Self {
Self { client, hb_timeout: Duration::from_secs(90), journal, last_ping: None }
}
pub fn with_heartbeat_timeout(mut self, d: Duration) -> Self { self.hb_timeout = d; self }
pub fn client_mut(&mut self) -> &mut MailboxClient { &mut self.client }
pub async fn ensure_connected(&mut self) -> Result<(), MailboxError> {
self.client.connect_with_backoff().await
}
pub async fn poll_next(&mut self) -> Option<ServerMsg> {
let start = Instant::now();
loop {
let now = Instant::now();
if self.last_ping.map(|p| now.duration_since(p) > Duration::from_secs(30)).unwrap_or(true) {
let _ = self.client.ping().await; self.last_ping = Some(now);
info!(target: "anubis.mailbox", "tx: ping");
}
if let Some(last) = self.client.last_pong {
if last.elapsed() > self.hb_timeout {
warn!(target: "anubis.mailbox", "heartbeat timeout; reconnecting");
let _ = self.client.connect_with_backoff().await;
self.client.reopen_last().await;
}
}
if let Some(msg) = self.client.next().await {
if let ServerMsg::Error { error } = &msg {
if error == "stream_closed" {
warn!(target: "anubis.mailbox", "stream closed; reconnecting");
let _ = self.client.connect_with_backoff().await;
self.client.reopen_last().await;
continue;
}
}
return Some(msg);
}
if start.elapsed() > Duration::from_secs(5) {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
pub async fn ensure_session_allocate_claim_open(&mut self) -> Result<(String, String), MailboxError> {
self.ensure_connected().await?;
self.client.allocate().await?;
self.journal.record(&JournalEvent::MailboxAllocated { nameplate: "".to_string() });
let nameplate = loop {
if let Some(ServerMsg::NameplateAllocated { nameplate }) = self.poll_next().await { break nameplate; }
};
self.client.claim(&nameplate).await?;
self.journal.record(&JournalEvent::MailboxClaimed { nameplate: nameplate.clone() });
let mailbox = loop {
if let Some(ServerMsg::Claimed { mailbox }) = self.poll_next().await { break mailbox; }
};
self.client.open(&mailbox).await?;
self.journal.record(&JournalEvent::MailboxOpened { mailbox: mailbox.clone() });
Ok((nameplate, mailbox))
}
pub async fn close_and_release(&mut self, mood: &str) {
if let Some(np) = self.client.last_nameplate.clone() {
let _ = self.client.release(&np).await;
self.journal.record(&JournalEvent::MailboxReleased { nameplate: np });
self.client.last_nameplate = None;
}
if let Some(mb) = self.client.last_mailbox.clone() {
let _ = self.client.close(&mb, mood).await;
self.journal.record(&JournalEvent::MailboxClosed { mailbox: mb, mood: mood.to_string() });
self.client.last_mailbox = None;
}
}
pub async fn replay_from_events(&mut self, events: &[JournalEvent]) -> Result<(), MailboxError> {
let mut last_np: Option<String> = None;
let mut last_mb: Option<String> = None;
for ev in events {
match ev {
JournalEvent::MailboxClaimed { nameplate } => last_np = Some(nameplate.clone()),
JournalEvent::MailboxOpened { mailbox } => last_mb = Some(mailbox.clone()),
_ => {}
}
}
if let Some(np) = last_np { self.client.last_nameplate = Some(np); }
if let Some(mb) = last_mb { self.client.last_mailbox = Some(mb); }
self.ensure_connected().await?;
self.client.reopen_last().await;
Ok(())
}
}