1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
//! Web bridge implementation.
//!
//! Implements the [`Channel`] trait for the web interface, allowing
//! the gateway to route messages to and from the HTTP API.
//!
//! Uses Tokio channels to bridge:
//! - **Incoming**: HTTP POST /api/chat → mpsc → Gateway → Kernel
//! - **Outgoing**: Kernel → Gateway → broadcast → WebSocket/SSE clients
use anyhow::Result;
use async_trait::async_trait;
use oxios_gateway::GatewayInbox;
use oxios_gateway::channel::Channel;
use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
use oxios_gateway::{ReliabilityLayer, ReplayResult};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot, watch};
/// The web bridge adapter.
///
/// Bridges the axum HTTP server with the gateway's channel interface.
/// Inbound messages travel through a bounded mpsc queue, outbound messages
/// are fanned out over a broadcast channel, and a correlation map hands a
/// reply to the single HTTP handler waiting for it (when there is one).
pub struct WebBridge {
/// Receiver for incoming messages from the HTTP layer.
/// `Option` so `start()` can take ownership via `take()`; the `Mutex`
/// lets it do so through `&self`. Once taken this stays `None`, which is
/// how a second `start()` call is detected and rejected.
incoming_rx: Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
/// Sender to pass to the HTTP layer for injecting messages.
/// Cloned out by `sender()`; feeds the queue drained by `start()`.
incoming_tx: mpsc::Sender<IncomingMessage>,
/// Broadcaster for outgoing messages to WebSocket/SSE clients.
/// Receivers are created on demand via `subscribe_outgoing()`.
outgoing_tx: broadcast::Sender<OutgoingMessage>,
/// Correlation map for HTTP request-response matching.
/// Keyed by message id: `WebBridgeHandle::send_and_wait_with_timeout`
/// inserts the oneshot sender, and `send()` / `deliver_response()` remove
/// it when an outgoing message with the same id arrives.
responses: Arc<RwLock<HashMap<uuid::Uuid, oneshot::Sender<OutgoingMessage>>>>,
/// RFC-024 SP2: per-bridge reliability layer (independent of the
/// gateway's global one) so WS resume replays go through the same
/// broadcast channel that live messages use.
///
/// NOTE(review): in the code visible here the bridge only stores this and
/// shares it with `WebBridgeHandle::from_bridge`; nothing in this file
/// records outgoing messages into it — confirm where that happens.
reliability: Arc<ReliabilityLayer>,
}
impl WebBridge {
    /// Builds a bridge around the supplied reliability layer.
    ///
    /// `buffer` is used as the capacity of both the inbound mpsc queue and
    /// the outbound broadcast channel.
    ///
    /// NOTE(review): Tokio's channel constructors reject a zero capacity by
    /// panicking, so callers presumably always pass `buffer > 0` — confirm.
    pub fn new(buffer: usize, reliability: Arc<ReliabilityLayer>) -> Self {
        let (incoming_tx, inbound) = mpsc::channel(buffer);
        // The receiver created alongside the broadcast sender is discarded;
        // consumers obtain their own through `subscribe_outgoing()`.
        let (outgoing_tx, _) = broadcast::channel(buffer);
        let responses = Arc::new(RwLock::new(HashMap::new()));

        Self {
            incoming_rx: Mutex::new(Some(inbound)),
            incoming_tx,
            outgoing_tx,
            responses,
            reliability,
        }
    }

    /// Hands out a clone of the inbound sender so HTTP handlers can inject
    /// messages into the gateway pipeline.
    pub fn sender(&self) -> mpsc::Sender<IncomingMessage> {
        mpsc::Sender::clone(&self.incoming_tx)
    }

    /// Opens a fresh subscription to outgoing messages (for WebSocket/SSE
    /// handlers).
    pub fn subscribe_outgoing(&self) -> broadcast::Receiver<OutgoingMessage> {
        self.outgoing_tx.subscribe()
    }

    /// Pushes `msg` straight onto the outgoing broadcast channel (tests and
    /// direct API responses).
    ///
    /// The broadcast send result is deliberately ignored, so this always
    /// returns `Ok(())`.
    pub fn broadcast_outgoing(&self, msg: OutgoingMessage) -> Result<()> {
        let _ = self.outgoing_tx.send(msg);
        Ok(())
    }

    /// Routes a response to the HTTP handler registered under `msg.id`, if
    /// there is one, and then broadcasts it to WebSocket/SSE subscribers.
    ///
    /// Neither delivery is allowed to fail the call: a vanished handler or a
    /// broadcast without receivers is silently ignored.
    pub async fn deliver_response(&self, msg: OutgoingMessage) -> Result<()> {
        let msg_id = msg.id;

        // Claim the waiter (if any); the write guard is released at the end
        // of this statement.
        let waiter = self.responses.write().await.remove(&msg_id);
        if let Some(reply_to) = waiter {
            let _ = reply_to.send(msg.clone());
        }

        // WebSocket/SSE clients get every response, correlated or not.
        let _ = self.outgoing_tx.send(msg);
        tracing::debug!(msg_id = %msg_id, "Delivering response");
        Ok(())
    }
}
#[async_trait]
impl Channel for WebBridge {
    /// Channel identifier; also used to tag every message forwarded to the
    /// gateway.
    fn name(&self) -> &str {
        "web"
    }

    /// Spawns the task that forwards injected HTTP messages to the gateway.
    ///
    /// The inbound receiver is taken out of the bridge, so this succeeds only
    /// once; later calls fail with "Web bridge already started (no receiver)".
    ///
    /// The task ends when the inbound queue is closed, when the gateway side
    /// of `tx` is gone, or as soon as `shutdown.changed()` completes (the
    /// flag's value itself is not inspected).
    async fn start(
        &self,
        tx: mpsc::Sender<GatewayInbox>,
        mut shutdown: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        // Claim the receiver inside its own scope so the mutex is released
        // before anything else happens.
        let claimed = {
            let mut slot = self.incoming_rx.lock().await;
            slot.take()
        };
        let Some(mut inbound) = claimed else {
            anyhow::bail!("Web bridge already started (no receiver)");
        };
        let label = self.name().to_owned();

        let worker = tokio::spawn(async move {
            loop {
                // `None` means "stop": either the queue closed or shutdown
                // was signalled.
                let next = tokio::select! {
                    received = inbound.recv() => received,
                    _ = shutdown.changed() => None,
                };
                let Some(message) = next else { break };

                if tx.send((label.clone(), message)).await.is_err() {
                    // Gateway receiver closed.
                    break;
                }
            }
            tracing::info!(channel = %label, "Web bridge stopped");
        });

        Ok(worker)
    }

    /// Delivers an outgoing message from the gateway.
    ///
    /// `OutgoingMessage.id` matches the id of the originating
    /// `IncomingMessage`, which is the key registered by `send_and_wait()`;
    /// a handler waiting under that key receives a copy. The message is then
    /// broadcast to WebSocket/SSE subscribers regardless.
    async fn send(&self, msg: OutgoingMessage) -> Result<()> {
        // Claim the waiter; the write guard is gone after this statement.
        let waiter = self.responses.write().await.remove(&msg.id);
        if let Some(reply_to) = waiter {
            let _ = reply_to.send(msg.clone());
            tracing::debug!(msg_id = %msg.id, "Correlated response to HTTP handler");
        }

        // Always broadcast for WebSocket/SSE clients.
        let _ = self.outgoing_tx.send(msg);
        Ok(())
    }
}
impl std::fmt::Debug for WebBridge {
    /// Formats the bridge as `WebBridge { .. }`.
    ///
    /// The fields (channel endpoints, correlation map, reliability layer) are
    /// intentionally omitted; `finish_non_exhaustive` makes that omission
    /// visible instead of printing the type as if it had no fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("WebBridge").finish_non_exhaustive()
    }
}
/// Shared handle to the web bridge, used by route handlers.
///
/// Cheap to clone: every field is either a channel sender, an `Arc`, or a
/// `Duration`. All clones talk to the same underlying bridge state.
#[derive(Debug, Clone)]
pub struct WebBridgeHandle {
/// Sender for injecting incoming messages into the gateway pipeline.
pub incoming_tx: mpsc::Sender<IncomingMessage>,
/// Broadcast sender for pushing outgoing messages to WebSocket/SSE.
/// Also used by `replay_after` to re-emit buffered messages.
pub outgoing_tx: broadcast::Sender<OutgoingMessage>,
/// Correlation map for HTTP request-response matching.
/// This is the same map the originating `WebBridge` holds (the `Arc` is
/// cloned in `from_bridge`): `send_and_wait_with_timeout` inserts a oneshot
/// sender keyed by message id and the bridge removes it when it delivers
/// the matching response.
responses: Arc<RwLock<HashMap<uuid::Uuid, oneshot::Sender<OutgoingMessage>>>>,
/// RFC-024 SP2: per-bridge reliability layer shared with [`WebBridge`].
/// Queried by `replay_after`.
reliability: Arc<ReliabilityLayer>,
/// RFC-024 SP1: ceiling on `send_and_wait`. When the gateway does not
/// respond within this duration, the wait is abandoned (the message itself
/// has already been enqueued) and the HTTP layer is expected to return
/// 504 Gateway Timeout. Set to 120 s by `from_bridge`; override with
/// `with_response_timeout`.
response_timeout: std::time::Duration,
}
impl WebBridgeHandle {
/// Creates a new handle from a WebBridge.
pub fn from_bridge(channel: &WebBridge) -> Self {
Self {
incoming_tx: channel.sender(),
outgoing_tx: channel.outgoing_tx.clone(),
responses: channel.responses.clone(),
reliability: channel.reliability.clone(),
response_timeout: std::time::Duration::from_secs(120),
}
}
/// Override the default `send_and_wait` timeout.
pub fn with_response_timeout(mut self, timeout: std::time::Duration) -> Self {
self.response_timeout = timeout;
self
}
/// RFC-024 SP2 / C2 (replay): look up messages newer than `last_seq` in
/// the per-bridge reliability layer and broadcast them through the
/// outgoing channel so a WebSocket handler forwarding `outgoing_rx` to
/// the client sees them as if they had just been delivered.
///
/// If the cursor is older than the buffer's oldest surviving message,
/// a synthetic `type: "resync"` message is broadcast instead and the
/// client is expected to pull state via the regular HTTP API.
pub fn replay_after(&self, last_seq: u64) {
match self.reliability.replay(last_seq) {
ReplayResult::Replay(msgs) => {
for m in msgs {
let _ = self.outgoing_tx.send(m);
}
}
ReplayResult::Resync => {
let mut meta = HashMap::new();
meta.insert("type".into(), "resync".into());
let resync = OutgoingMessage::with_id(uuid::Uuid::new_v4(), "web", "system", "")
.with_metadata_only(meta);
let _ = self.outgoing_tx.send(resync);
}
}
}
/// Subscribe to outgoing messages.
pub fn subscribe(&self) -> broadcast::Receiver<OutgoingMessage> {
self.outgoing_tx.subscribe()
}
/// Send an incoming message to the gateway pipeline.
pub async fn send_incoming(&self, msg: IncomingMessage) -> Result<()> {
self.incoming_tx
.send(msg)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
}
/// Send a message and wait for a response.
///
/// This registers a oneshot receiver for the response and waits for it.
/// Used by the HTTP chat endpoint to get the orchestrator's response.
///
/// RFC-024 SP1 / C1 (response guarantee): the wait is bounded by
/// `response_timeout` (default 120 s). On timeout the correlation map
/// entry is removed (no leak) and the caller receives a `Timeout` error
/// so the HTTP layer can map it to a 504 Gateway Timeout.
pub async fn send_and_wait(&self, msg: IncomingMessage) -> Result<OutgoingMessage> {
self.send_and_wait_with_timeout(msg, self.response_timeout)
.await
}
/// Like [`send_and_wait`] but with an explicit timeout. Exposed for
/// tests and for callers that want a different ceiling (e.g. health
/// probes with a 1 s deadline).
pub async fn send_and_wait_with_timeout(
&self,
msg: IncomingMessage,
timeout: std::time::Duration,
) -> Result<OutgoingMessage> {
let (tx, rx) = oneshot::channel::<OutgoingMessage>();
let msg_id = msg.id;
// Register the response handler before sending.
{
let mut responses = self.responses.write().await;
responses.insert(msg_id, tx);
}
// Send the message.
if let Err(e) = self.incoming_tx.send(msg).await {
// Could not even enqueue — drop our correlation entry.
self.responses.write().await.remove(&msg_id);
return Err(anyhow::anyhow!("incoming channel send failed: {e}"));
}
match tokio::time::timeout(timeout, rx).await {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(_)) => {
// Gateway gave up; remove the entry.
self.responses.write().await.remove(&msg_id);
Err(anyhow::anyhow!("response channel dropped"))
}
Err(_) => {
// Deadline elapsed; remove the entry to prevent a leak.
self.responses.write().await.remove(&msg_id);
Err(anyhow::anyhow!("gateway response timeout"))
}
}
}
}