use anyhow::Result;
use async_trait::async_trait;
use oxios_gateway::GatewayInbox;
use oxios_gateway::channel::Channel;
use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
use oxios_gateway::{ReliabilityLayer, ReplayResult};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot, watch};
/// Failure modes of a request/response round trip through the web bridge.
#[derive(Debug)]
pub enum BridgeSendError {
/// The message could not be placed on the incoming channel; carries the
/// text of the underlying send error.
SendFailed(String),
/// The oneshot sender registered for the response was dropped before any
/// response was delivered.
ChannelDropped,
/// No response arrived within the allowed time.
Timeout,
}
impl std::fmt::Display for BridgeSendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BridgeSendError::SendFailed(msg) => {
write!(f, "incoming channel send failed: {msg}")
}
BridgeSendError::ChannelDropped => write!(f, "gateway response channel dropped"),
BridgeSendError::Timeout => write!(f, "gateway response timeout"),
}
}
}
// Marker impl: relies on the `Debug` and `Display` impls above and keeps the
// trait's default methods.
impl std::error::Error for BridgeSendError {}
/// Gateway channel named "web": messages pushed into `incoming_tx` are
/// forwarded to the gateway by `start`, and outgoing messages are both
/// correlated to a waiting caller (by message id) and broadcast.
pub struct WebBridge {
/// Receiving half of the incoming channel. Wrapped in `Option` so that
/// `start` can take it exactly once; a second `start` finds `None`.
incoming_rx: Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
/// Sending half of the incoming channel; cloned out via `sender()`.
incoming_tx: mpsc::Sender<IncomingMessage>,
/// Broadcast sender on which every outgoing message is published.
outgoing_tx: broadcast::Sender<OutgoingMessage>,
/// Pending waiters keyed by message id. An entry is removed when an outgoing
/// message with the same id is delivered.
responses: Arc<RwLock<HashMap<uuid::Uuid, oneshot::Sender<OutgoingMessage>>>>,
/// Shared reliability layer; not used by the bridge itself in this file, only
/// handed on to `WebBridgeHandle::from_bridge`.
reliability: Arc<ReliabilityLayer>,
}
impl WebBridge {
    /// Builds a bridge whose incoming mpsc channel and outgoing broadcast
    /// channel are both created with capacity `buffer`.
    ///
    /// The pending-response map starts empty and `reliability` is stored as-is.
    pub fn new(buffer: usize, reliability: Arc<ReliabilityLayer>) -> Self {
        let (incoming_tx, receiver) = mpsc::channel(buffer);
        // The receiver created alongside the broadcast sender is discarded;
        // consumers obtain their own through `subscribe_outgoing`.
        let (outgoing_tx, _) = broadcast::channel(buffer);
        let incoming_rx = Mutex::new(Some(receiver));
        let responses = Arc::new(RwLock::new(HashMap::new()));
        Self {
            incoming_rx,
            incoming_tx,
            outgoing_tx,
            responses,
            reliability,
        }
    }

    /// Returns a clone of the sender feeding the incoming channel.
    pub fn sender(&self) -> mpsc::Sender<IncomingMessage> {
        mpsc::Sender::clone(&self.incoming_tx)
    }

    /// Creates a new receiver on the outgoing broadcast channel.
    #[allow(dead_code)]
    pub fn subscribe_outgoing(&self) -> broadcast::Receiver<OutgoingMessage> {
        self.outgoing_tx.subscribe()
    }

    /// Publishes `msg` on the outgoing broadcast channel.
    ///
    /// Best-effort: the result of the broadcast send is ignored and this
    /// method always returns `Ok(())`.
    #[allow(dead_code)]
    pub fn broadcast_outgoing(&self, msg: OutgoingMessage) -> Result<()> {
        let _ignored = self.outgoing_tx.send(msg);
        Ok(())
    }

    /// Hands `msg` to the caller waiting on its id (if one is registered) and
    /// then publishes it on the outgoing broadcast channel.
    ///
    /// Both deliveries are best-effort; this method always returns `Ok(())`.
    #[allow(dead_code)]
    pub async fn deliver_response(&self, msg: OutgoingMessage) -> Result<()> {
        let id = msg.id;
        // Taking the waiter out of the map also unregisters it.
        let waiter = self.responses.write().await.remove(&id);
        if let Some(waiter) = waiter {
            // The waiter gets its own copy; the original goes to the broadcast.
            let _ignored = waiter.send(msg.clone());
        }
        let _ignored = self.outgoing_tx.send(msg);
        tracing::debug!(msg_id = %id, "Delivering response");
        Ok(())
    }
}
#[async_trait]
impl Channel for WebBridge {
    /// Fixed channel name used to tag every forwarded message.
    fn name(&self) -> &str {
        "web"
    }

    /// Spawns the task that forwards messages from the internal incoming
    /// channel to the gateway inbox `tx`, tagging each with the channel name.
    ///
    /// The task ends when the internal channel is closed, when `tx` rejects a
    /// send, or when `shutdown.changed()` resolves.
    ///
    /// # Errors
    ///
    /// Fails if the internal receiver has already been taken by an earlier
    /// call to `start`.
    async fn start(
        &self,
        tx: mpsc::Sender<GatewayInbox>,
        mut shutdown: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        let taken = self.incoming_rx.lock().await.take();
        let mut inbound = match taken {
            Some(rx) => rx,
            None => anyhow::bail!("Web bridge already started (no receiver)"),
        };
        let channel_name = String::from(self.name());
        let handle = tokio::spawn(async move {
            loop {
                // `None` means "stop": either the channel closed or the
                // shutdown watch fired.
                let next = tokio::select! {
                    received = inbound.recv() => received,
                    _ = shutdown.changed() => None,
                };
                let Some(incoming) = next else {
                    break;
                };
                if tx.send((channel_name.clone(), incoming)).await.is_err() {
                    break;
                }
            }
            tracing::info!(channel = %channel_name, "Web bridge stopped");
        });
        Ok(handle)
    }

    /// Routes an outgoing message: first to the caller waiting on its id, if
    /// any, then to the outgoing broadcast channel.
    ///
    /// Both deliveries are best-effort; this method always returns `Ok(())`.
    async fn send(&self, msg: OutgoingMessage) -> Result<()> {
        let waiter = self.responses.write().await.remove(&msg.id);
        if let Some(waiter) = waiter {
            let _ignored = waiter.send(msg.clone());
            tracing::debug!(msg_id = %msg.id, "Correlated response to HTTP handler");
        }
        let _ignored = self.outgoing_tx.send(msg);
        Ok(())
    }
}
impl std::fmt::Debug for WebBridge {
    /// Prints only the type name; none of the fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("WebBridge")
    }
}
/// Cloneable handle onto a `WebBridge`: it shares the bridge's channel
/// senders, pending-response map and reliability layer.
#[derive(Debug, Clone)]
pub struct WebBridgeHandle {
/// Sender feeding the bridge's incoming channel.
pub incoming_tx: mpsc::Sender<IncomingMessage>,
/// Broadcast sender for outgoing messages; `subscribe` creates receivers from it.
pub outgoing_tx: broadcast::Sender<OutgoingMessage>,
/// Pending waiters keyed by message id, shared with the bridge.
responses: Arc<RwLock<HashMap<uuid::Uuid, oneshot::Sender<OutgoingMessage>>>>,
/// Reliability layer queried by `replay_after`.
reliability: Arc<ReliabilityLayer>,
/// Timeout applied by `send_and_wait`; set to 120 seconds by `from_bridge`.
response_timeout: std::time::Duration,
}
impl WebBridgeHandle {
    /// Creates a handle that shares `channel`'s senders, pending-response map
    /// and reliability layer. The response timeout starts at 120 seconds.
    pub fn from_bridge(channel: &WebBridge) -> Self {
        Self {
            incoming_tx: channel.sender(),
            outgoing_tx: channel.outgoing_tx.clone(),
            responses: channel.responses.clone(),
            reliability: channel.reliability.clone(),
            response_timeout: std::time::Duration::from_secs(120),
        }
    }

    /// Builder-style override of the timeout used by [`Self::send_and_wait`].
    pub fn with_response_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.response_timeout = timeout;
        self
    }

    /// Asks the reliability layer to replay from `last_seq` and publishes the
    /// result on the outgoing broadcast channel.
    ///
    /// On `Replay`, each returned message is re-sent in order. On `Resync`, a
    /// single metadata-only message with `type = "resync"` is sent instead.
    /// Because the broadcast channel is shared, every current subscriber
    /// receives these messages. Send results are ignored (best-effort).
    pub fn replay_after(&self, last_seq: u64) {
        let metrics = oxios_kernel::metrics::get_metrics();
        match self.reliability.replay(last_seq) {
            ReplayResult::Replay(msgs) => {
                metrics.gateway_replay_replay.inc();
                for replayed in msgs {
                    let _ = self.outgoing_tx.send(replayed);
                }
            }
            ReplayResult::Resync => {
                metrics.gateway_replay_resync.inc();
                let mut meta = HashMap::new();
                meta.insert("type".into(), "resync".into());
                let resync = OutgoingMessage::with_id(uuid::Uuid::new_v4(), "web", "system", "")
                    .with_metadata_only(meta);
                let _ = self.outgoing_tx.send(resync);
            }
        }
    }

    /// Creates a new receiver on the outgoing broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<OutgoingMessage> {
        self.outgoing_tx.subscribe()
    }

    /// Pushes `msg` onto the incoming channel without waiting for a response.
    ///
    /// # Errors
    ///
    /// Returns the send error's text wrapped in an `anyhow` error if the
    /// channel rejects the message.
    #[allow(dead_code)]
    pub async fn send_incoming(&self, msg: IncomingMessage) -> Result<()> {
        self.incoming_tx
            .send(msg)
            .await
            .map_err(|e| anyhow::anyhow!("{e}"))
    }

    /// Sends `msg` and waits for the correlated response, using the handle's
    /// configured response timeout.
    ///
    /// # Errors
    ///
    /// See [`Self::send_and_wait_with_timeout`].
    pub async fn send_and_wait(
        &self,
        msg: IncomingMessage,
    ) -> std::result::Result<OutgoingMessage, BridgeSendError> {
        self.send_and_wait_with_timeout(msg, self.response_timeout)
            .await
    }

    /// Sends `msg` and waits up to `timeout` for the outgoing message that
    /// carries the same id.
    ///
    /// A oneshot waiter is registered under `msg.id` before the message is
    /// sent, so a response cannot be missed. Whatever the outcome, the elapsed
    /// time is recorded in `gateway_response_duration`, and failures increment
    /// either `gateway_messages_timed_out` or `gateway_messages_dropped`.
    ///
    /// # Errors
    ///
    /// * [`BridgeSendError::SendFailed`] if the incoming channel rejects the message.
    /// * [`BridgeSendError::ChannelDropped`] if the waiter's sender is dropped
    ///   without a response being delivered.
    /// * [`BridgeSendError::Timeout`] if no response arrives within `timeout`.
    pub async fn send_and_wait_with_timeout(
        &self,
        msg: IncomingMessage,
        timeout: std::time::Duration,
    ) -> std::result::Result<OutgoingMessage, BridgeSendError> {
        let (tx, rx) = oneshot::channel::<OutgoingMessage>();
        let msg_id = msg.id;
        self.responses.write().await.insert(msg_id, tx);

        let start = std::time::Instant::now();
        // A send failure is folded into `outcome` instead of returning early,
        // so that it reaches the metrics below and is counted as dropped.
        let outcome = match self.incoming_tx.send(msg).await {
            Err(e) => Err(BridgeSendError::SendFailed(e.to_string())),
            Ok(()) => match tokio::time::timeout(timeout, rx).await {
                Ok(Ok(resp)) => Ok(resp),
                Ok(Err(_)) => Err(BridgeSendError::ChannelDropped),
                Err(_) => Err(BridgeSendError::Timeout),
            },
        };

        // On success the delivering side already removed the entry; on any
        // failure, unregister the waiter so the map does not keep it.
        // NOTE(review): if this future is dropped while awaiting, this cleanup
        // does not run and the entry stays until a response with this id is
        // delivered.
        if outcome.is_err() {
            self.responses.write().await.remove(&msg_id);
        }

        let metrics = oxios_kernel::metrics::get_metrics();
        metrics
            .gateway_response_duration
            .observe(start.elapsed().as_secs_f64());
        match &outcome {
            Ok(_) => {}
            Err(BridgeSendError::Timeout) => metrics.gateway_messages_timed_out.inc(),
            Err(BridgeSendError::ChannelDropped | BridgeSendError::SendFailed(_)) => {
                metrics.gateway_messages_dropped.inc()
            }
        }
        outcome
    }
}