use async_trait::async_trait;
use awaken_contract::contract::event::AgentEvent;
use awaken_contract::contract::event_sink::EventSink;
use tokio::sync::mpsc;
pub struct ChannelEventSink {
tx: mpsc::UnboundedSender<AgentEvent>,
}
impl ChannelEventSink {
pub fn new(tx: mpsc::UnboundedSender<AgentEvent>) -> Self {
Self { tx }
}
}
#[async_trait]
impl EventSink for ChannelEventSink {
async fn emit(&self, event: AgentEvent) {
let _ = self.tx.send(event);
}
async fn close(&self) {
}
}
pub struct ReconnectableEventSink {
inner: arc_swap::ArcSwap<mpsc::Sender<AgentEvent>>,
}
impl ReconnectableEventSink {
pub fn new(tx: mpsc::Sender<AgentEvent>) -> Self {
Self {
inner: arc_swap::ArcSwap::from_pointee(tx),
}
}
pub fn reconnect(&self, new_tx: mpsc::Sender<AgentEvent>) {
self.inner.store(std::sync::Arc::new(new_tx));
}
}
#[async_trait]
impl EventSink for ReconnectableEventSink {
async fn emit(&self, event: AgentEvent) {
let sender = self.inner.load();
let _ = sender.send(event).await;
}
async fn close(&self) {}
}
pub struct BoundedChannelEventSink {
tx: mpsc::Sender<AgentEvent>,
}
impl BoundedChannelEventSink {
pub fn new(tx: mpsc::Sender<AgentEvent>) -> Self {
Self { tx }
}
}
#[async_trait]
impl EventSink for BoundedChannelEventSink {
async fn emit(&self, event: AgentEvent) {
let _ = self.tx.send(event).await;
}
async fn close(&self) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use awaken_contract::contract::event::AgentEvent;
#[tokio::test]
async fn channel_sink_forwards_events() {
let (tx, mut rx) = mpsc::unbounded_channel();
let sink = ChannelEventSink::new(tx);
sink.emit(AgentEvent::TextDelta {
delta: "hello".into(),
})
.await;
sink.emit(AgentEvent::StepEnd).await;
let event1 = rx.recv().await.unwrap();
assert!(matches!(event1, AgentEvent::TextDelta { delta } if delta == "hello"));
let event2 = rx.recv().await.unwrap();
assert!(matches!(event2, AgentEvent::StepEnd));
}
#[tokio::test]
async fn channel_sink_drops_silently_on_closed_receiver() {
let (tx, rx) = mpsc::unbounded_channel();
let sink = ChannelEventSink::new(tx);
drop(rx);
sink.emit(AgentEvent::TextDelta {
delta: "ignored".into(),
})
.await;
}
#[tokio::test]
async fn channel_sink_close_is_noop() {
let (tx, _rx) = mpsc::unbounded_channel();
let sink = ChannelEventSink::new(tx);
sink.close().await; }
#[tokio::test]
async fn reconnectable_sink_forwards_events() {
let (tx, mut rx) = mpsc::channel(16);
let sink = ReconnectableEventSink::new(tx);
sink.emit(AgentEvent::TextDelta {
delta: "hello".into(),
})
.await;
let event = rx.recv().await.unwrap();
assert!(matches!(event, AgentEvent::TextDelta { delta } if delta == "hello"));
}
#[tokio::test]
async fn reconnectable_sink_switches_channel() {
let (tx1, mut rx1) = mpsc::channel(16);
let (tx2, mut rx2) = mpsc::channel(16);
let sink = ReconnectableEventSink::new(tx1);
sink.emit(AgentEvent::TextDelta {
delta: "before".into(),
})
.await;
sink.reconnect(tx2);
sink.emit(AgentEvent::TextDelta {
delta: "after".into(),
})
.await;
let e1 = rx1.recv().await.unwrap();
assert!(matches!(e1, AgentEvent::TextDelta { delta } if delta == "before"));
let e2 = rx2.recv().await.unwrap();
assert!(matches!(e2, AgentEvent::TextDelta { delta } if delta == "after"));
assert!(rx1.try_recv().is_err());
}
#[tokio::test]
async fn reconnectable_sink_survives_dropped_receiver() {
let (tx, rx) = mpsc::channel(16);
let sink = ReconnectableEventSink::new(tx);
drop(rx);
sink.emit(AgentEvent::StepEnd).await;
let (tx2, mut rx2) = mpsc::channel(16);
sink.reconnect(tx2);
sink.emit(AgentEvent::StepEnd).await;
assert!(rx2.recv().await.is_some());
}
#[tokio::test]
async fn reconnectable_sink_multiple_reconnects() {
let (tx1, mut rx1) = mpsc::channel(16);
let sink = ReconnectableEventSink::new(tx1);
sink.emit(AgentEvent::StepEnd).await;
assert!(rx1.recv().await.is_some());
let (tx2, mut rx2) = mpsc::channel(16);
sink.reconnect(tx2);
sink.emit(AgentEvent::StepEnd).await;
assert!(rx2.recv().await.is_some());
assert!(rx1.try_recv().is_err());
let (tx3, mut rx3) = mpsc::channel(16);
sink.reconnect(tx3);
sink.emit(AgentEvent::StepEnd).await;
assert!(rx3.recv().await.is_some());
assert!(rx2.try_recv().is_err()); }
#[tokio::test]
async fn reconnectable_sink_shared_via_arc() {
use std::sync::Arc;
let (tx1, mut rx1) = mpsc::channel(16);
let sink = Arc::new(ReconnectableEventSink::new(tx1));
let s1 = Arc::clone(&sink);
s1.emit(AgentEvent::StepEnd).await;
assert!(rx1.recv().await.is_some());
let (tx2, mut rx2) = mpsc::channel(16);
sink.reconnect(tx2);
s1.emit(AgentEvent::StepEnd).await;
assert!(rx2.recv().await.is_some());
}
}