use tokio::sync::mpsc;
use tokio::time::Instant;
use crate::error::TransportError;
use crate::transport::{ConnectionContext, JsonRpcMessage, Transport, TransportType};
/// Bundle of channel endpoints and metadata describing one server actor.
///
/// NOTE(review): this type is only declared in the visible code, not used;
/// the field descriptions below follow the field types and names and should
/// be confirmed against the code that builds and reads these entries.
pub struct ServerActorEntry {
/// Bounded sender for `JsonRpcMessage` values; presumably feeds the actor's inbox — confirm.
pub tx: mpsc::Sender<JsonRpcMessage>,
/// Free-form mode label. The set of accepted values is not visible here.
pub mode: String,
/// Optional watch receiver carrying an optional string; presumably the
/// actor's current A2A skill, absent when the actor has none — TODO confirm.
pub a2a_skill_rx: Option<tokio::sync::watch::Receiver<Option<String>>>,
}
/// A request message tagged with the name of the actor that issued it.
///
/// `ServerHandle::send_message` builds one of these for every
/// `JsonRpcMessage::Request` it is asked to send.
pub struct ServerRequest {
/// Name of the originating actor (copied from the sending `ServerHandle`).
pub actor_name: String,
/// The request message itself, cloned from the caller's message.
pub request: JsonRpcMessage,
}
/// Channel-backed `Transport` implementation built on unbounded channels.
///
/// Incoming messages are read from `rx`; every outgoing message is pushed
/// onto `response_tx`.
pub struct AgUiHandle {
/// Inbound message queue. Wrapped in an async mutex so that
/// `receive_message` can poll it through a shared `&self` reference.
rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<JsonRpcMessage>>,
/// Outbound queue used by `send_message`.
response_tx: mpsc::UnboundedSender<JsonRpcMessage>,
/// Instant captured at construction; reported as `connected_at` by
/// `connection_context`.
created_at: Instant,
}
impl AgUiHandle {
    /// Builds a handle from the two channel endpoints it will own.
    ///
    /// * `rx` - source of the messages later returned by `receive_message`;
    ///   it is placed behind an async mutex so it can be polled via `&self`.
    /// * `response_tx` - destination for messages passed to `send_message`.
    ///
    /// The creation instant is captured here and later surfaced through
    /// `connection_context`.
    #[must_use]
    pub fn new(
        rx: mpsc::UnboundedReceiver<JsonRpcMessage>,
        response_tx: mpsc::UnboundedSender<JsonRpcMessage>,
    ) -> Self {
        let rx = tokio::sync::Mutex::new(rx);
        let created_at = Instant::now();
        Self {
            rx,
            response_tx,
            created_at,
        }
    }
}
#[async_trait::async_trait]
impl Transport for AgUiHandle {
    /// Clones `message` and pushes it onto the unbounded response channel.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::ConnectionClosed` when the channel rejects
    /// the message, i.e. its receiving half is no longer available.
    async fn send_message(&self, message: &JsonRpcMessage) -> crate::transport::Result<()> {
        let outgoing = message.clone();
        self.response_tx
            .send(outgoing)
            .map_err(|_| TransportError::ConnectionClosed("drive loop closed".into()))
    }

    /// Raw byte output is not available on this transport.
    ///
    /// # Errors
    ///
    /// Always returns `TransportError::ConnectionClosed`.
    async fn send_raw(&self, _bytes: &[u8]) -> crate::transport::Result<()> {
        let unsupported =
            TransportError::ConnectionClosed("send_raw not supported in context-mode".into());
        Err(unsupported)
    }

    /// Waits for the next inbound message.
    ///
    /// The receiver lock is held for the whole wait. Yields `Ok(None)` once
    /// the inbound channel has been closed and drained; this method itself
    /// never produces an error.
    async fn receive_message(&self) -> crate::transport::Result<Option<JsonRpcMessage>> {
        let mut inbox = self.rx.lock().await;
        let next = inbox.recv().await;
        Ok(next)
    }

    /// Identifies this transport as `TransportType::Context`.
    fn transport_type(&self) -> TransportType {
        TransportType::Context
    }

    /// Nothing to flush or finalize; always succeeds.
    async fn finalize_response(&self) -> crate::transport::Result<()> {
        Ok(())
    }

    /// Describes the connection: fixed id `0`, no remote address, marked
    /// exclusive, with the handle's creation instant as the connect time.
    fn connection_context(&self) -> ConnectionContext {
        let connected_at = self.created_at;
        ConnectionContext {
            connected_at,
            is_exclusive: true,
            remote_addr: None,
            connection_id: 0,
        }
    }

    /// Exposes the handle as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
/// Channel-backed `Transport` implementation built on bounded channels.
///
/// Outgoing traffic is split by message kind: `JsonRpcMessage::Request`
/// values go to `server_request_tx` (tagged with `actor_name`), everything
/// else goes to `result_tx`.
pub struct ServerHandle {
/// Inbound message queue. Wrapped in an async mutex so that
/// `receive_message` can poll it through a shared `&self` reference.
rx: tokio::sync::Mutex<mpsc::Receiver<JsonRpcMessage>>,
/// Destination for every outgoing message that is not a request.
result_tx: mpsc::Sender<JsonRpcMessage>,
/// Destination for outgoing requests, wrapped in `ServerRequest`.
server_request_tx: mpsc::Sender<ServerRequest>,
/// Name attached to each `ServerRequest` this handle emits.
actor_name: String,
/// Instant captured at construction; reported as `connected_at` by
/// `connection_context`.
created_at: Instant,
}
impl ServerHandle {
    /// Builds a handle from its channel endpoints and the owning actor's name.
    ///
    /// * `rx` - source of the messages later returned by `receive_message`;
    ///   it is placed behind an async mutex so it can be polled via `&self`.
    /// * `result_tx` - destination for outgoing non-request messages.
    /// * `server_request_tx` - destination for outgoing requests.
    /// * `actor_name` - name stamped onto every emitted `ServerRequest`.
    ///
    /// The creation instant is captured here and later surfaced through
    /// `connection_context`.
    #[must_use]
    pub fn new(
        rx: mpsc::Receiver<JsonRpcMessage>,
        result_tx: mpsc::Sender<JsonRpcMessage>,
        server_request_tx: mpsc::Sender<ServerRequest>,
        actor_name: String,
    ) -> Self {
        let rx = tokio::sync::Mutex::new(rx);
        let created_at = Instant::now();
        Self {
            rx,
            result_tx,
            server_request_tx,
            actor_name,
            created_at,
        }
    }
}
#[async_trait::async_trait]
impl Transport for ServerHandle {
    /// Routes `message` to one of two bounded channels depending on its kind.
    ///
    /// A `JsonRpcMessage::Request` is wrapped in a `ServerRequest` carrying
    /// this handle's actor name and sent on the request channel. Any other
    /// message is cloned onto the result channel. Both sends wait for
    /// channel capacity.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::ConnectionClosed` when the chosen channel
    /// rejects the message, i.e. its receiving half is no longer available.
    async fn send_message(&self, message: &JsonRpcMessage) -> crate::transport::Result<()> {
        if matches!(message, JsonRpcMessage::Request(_)) {
            let outbound = ServerRequest {
                actor_name: self.actor_name.clone(),
                request: message.clone(),
            };
            return self
                .server_request_tx
                .send(outbound)
                .await
                .map_err(|_| TransportError::ConnectionClosed("drive loop closed".into()));
        }

        let outgoing = message.clone();
        self.result_tx
            .send(outgoing)
            .await
            .map_err(|_| TransportError::ConnectionClosed("context transport closed".into()))
    }

    /// Raw byte output is not available on this transport.
    ///
    /// # Errors
    ///
    /// Always returns `TransportError::ConnectionClosed`.
    async fn send_raw(&self, _bytes: &[u8]) -> crate::transport::Result<()> {
        let unsupported =
            TransportError::ConnectionClosed("send_raw not supported in context-mode".into());
        Err(unsupported)
    }

    /// Waits for the next inbound message.
    ///
    /// The receiver lock is held for the whole wait. Yields `Ok(None)` once
    /// the inbound channel has been closed and drained; this method itself
    /// never produces an error.
    async fn receive_message(&self) -> crate::transport::Result<Option<JsonRpcMessage>> {
        let mut inbox = self.rx.lock().await;
        let next = inbox.recv().await;
        Ok(next)
    }

    /// Identifies this transport as `TransportType::Context`.
    fn transport_type(&self) -> TransportType {
        TransportType::Context
    }

    /// Nothing to flush or finalize; always succeeds.
    async fn finalize_response(&self) -> crate::transport::Result<()> {
        Ok(())
    }

    /// Describes the connection: fixed id `0`, no remote address, marked
    /// exclusive, with the handle's creation instant as the connect time.
    fn connection_context(&self) -> ConnectionContext {
        let connected_at = self.created_at;
        ConnectionContext {
            connected_at,
            is_exclusive: true,
            remote_addr: None,
            connection_id: 0,
        }
    }

    /// Exposes the handle as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}