actflow_agent_sdk/
server.rs

1//! Agent gRPC server implementation.
2
3use crate::agent::{Agent, LogSender};
4use crate::proto::{self, agent_service_server};
5use crate::types::{Context, prost_value_to_json};
6use std::net::ToSocketAddrs;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::ReceiverStream;
10use tonic::{Request, Response, Status};
11
12/// Agent gRPC server.
13///
14/// Wraps an `Agent` implementation and exposes it as a gRPC service.
15pub struct AgentServer<A: Agent> {
16    agent: Arc<A>,
17}
18
19impl<A: Agent> AgentServer<A> {
20    /// Create a new agent server.
21    pub fn new(agent: A) -> Self {
22        Self {
23            agent: Arc::new(agent),
24        }
25    }
26
27    /// Start serving on the given address.
28    ///
29    /// # Example
30    ///
31    /// ```rust,ignore
32    /// AgentServer::new(MyAgent)
33    ///     .serve("0.0.0.0:50051")
34    ///     .await
35    ///     .unwrap();
36    /// ```
37    pub async fn serve(self, addr: impl ToSocketAddrs) -> Result<(), Box<dyn std::error::Error>> {
38        let addr = addr.to_socket_addrs()?.next().ok_or("Invalid address")?;
39
40        let svc =
41            agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent });
42
43        tonic::transport::Server::builder()
44            .add_service(svc)
45            .serve(addr)
46            .await?;
47
48        Ok(())
49    }
50
51    /// Get the gRPC service for custom server configuration.
52    pub fn into_service(self) -> agent_service_server::AgentServiceServer<AgentServiceImpl<A>> {
53        agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent })
54    }
55}
56
57/// Internal gRPC service implementation.
58pub struct AgentServiceImpl<A: Agent> {
59    agent: Arc<A>,
60}
61
62#[tonic::async_trait]
63impl<A: Agent> agent_service_server::AgentService for AgentServiceImpl<A> {
64    type RunStream = ReceiverStream<Result<proto::AgentUpdate, Status>>;
65
66    async fn run(
67        &self,
68        request: Request<proto::RunRequest>,
69    ) -> Result<Response<Self::RunStream>, Status> {
70        let req = request.into_inner();
71
72        let nid = req.nid;
73        let ctx: Context = req.ctx.unwrap_or_default().into();
74        let inputs = req
75            .inputs
76            .map(prost_value_to_json)
77            .unwrap_or(serde_json::Value::Null);
78
79        let (tx, rx) = mpsc::channel(32);
80        let (log_tx, mut log_rx) = mpsc::channel::<String>(1024);
81
82        let agent = self.agent.clone();
83        let log_sender = LogSender::new(log_tx);
84
85        // Spawn task to forward logs
86        let tx_clone = tx.clone();
87        tokio::spawn(async move {
88            while let Some(log) = log_rx.recv().await {
89                let update = proto::AgentUpdate {
90                    relay_message: Some(proto::agent_update::RelayMessage::Log(log)),
91                };
92                if tx_clone.send(Ok(update)).await.is_err() {
93                    break;
94                }
95            }
96        });
97
98        // Spawn task to run agent
99        tokio::spawn(async move {
100            let output = agent.run(nid, ctx, inputs, log_sender).await;
101            let update = proto::AgentUpdate {
102                relay_message: Some(proto::agent_update::RelayMessage::Output(output.into())),
103            };
104            let _ = tx.send(Ok(update)).await;
105        });
106
107        Ok(Response::new(ReceiverStream::new(rx)))
108    }
109
110    async fn shutdown(
111        &self,
112        _request: Request<proto::Empty>,
113    ) -> Result<Response<proto::Empty>, Status> {
114        self.agent.shutdown().await;
115        Ok(Response::new(proto::Empty {}))
116    }
117}