actflow_agent_sdk/
server.rs

1//! Agent gRPC server implementation.
2use std::net::ToSocketAddrs;
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Request, Response, Status};
8
9use crate::{
10    agent::{Agent, LogSender},
11    proto::{self, agent_service_server},
12    types::prost_value_to_json,
13};
14
15/// Agent gRPC server.
16///
17/// Wraps an `Agent` implementation and exposes it as a gRPC service.
18pub struct AgentServer<A: Agent> {
19    agent: Arc<A>,
20}
21
22impl<A: Agent> AgentServer<A> {
23    /// Create a new agent server.
24    pub fn new(agent: A) -> Self {
25        Self {
26            agent: Arc::new(agent),
27        }
28    }
29
30    /// Start serving on the given address.
31    ///
32    /// # Example
33    ///
34    /// ```rust,ignore
35    /// AgentServer::new(MyAgent)
36    ///     .serve("0.0.0.0:50051")
37    ///     .await
38    ///     .unwrap();
39    /// ```
40    pub async fn serve(self, addr: impl ToSocketAddrs) -> Result<(), Box<dyn std::error::Error>> {
41        let addr = addr.to_socket_addrs()?.next().ok_or("Invalid address")?;
42
43        let svc =
44            agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent });
45
46        tonic::transport::Server::builder()
47            .add_service(svc)
48            .serve(addr)
49            .await?;
50
51        Ok(())
52    }
53
54    /// Get the gRPC service for custom server configuration.
55    pub fn into_service(self) -> agent_service_server::AgentServiceServer<AgentServiceImpl<A>> {
56        agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent })
57    }
58}
59
60/// Internal gRPC service implementation.
61pub struct AgentServiceImpl<A: Agent> {
62    agent: Arc<A>,
63}
64
65#[tonic::async_trait]
66impl<A: Agent> agent_service_server::AgentService for AgentServiceImpl<A> {
67    type RunStream = ReceiverStream<Result<proto::AgentUpdate, Status>>;
68
69    async fn run(
70        &self,
71        request: Request<proto::RunRequest>,
72    ) -> Result<Response<Self::RunStream>, Status> {
73        let req = request.into_inner();
74
75        let pid = req.pid;
76        let nid = req.nid;
77        let inputs = req
78            .inputs
79            .map(prost_value_to_json)
80            .unwrap_or(serde_json::Value::Null);
81
82        let (tx, rx) = mpsc::channel(32);
83        let (log_tx, mut log_rx) = mpsc::channel::<String>(1024);
84
85        let agent = self.agent.clone();
86        let log_sender = LogSender::new(log_tx);
87
88        // Spawn task to forward logs
89        let tx_clone = tx.clone();
90        tokio::spawn(async move {
91            while let Some(log) = log_rx.recv().await {
92                let update = proto::AgentUpdate {
93                    relay_message: Some(proto::agent_update::RelayMessage::Log(log)),
94                };
95                if tx_clone.send(Ok(update)).await.is_err() {
96                    break;
97                }
98            }
99        });
100
101        // Spawn task to run agent
102        tokio::spawn(async move {
103            let output = agent.run(pid, nid, inputs, log_sender).await;
104            let update = proto::AgentUpdate {
105                relay_message: Some(proto::agent_update::RelayMessage::Output(output.into())),
106            };
107            let _ = tx.send(Ok(update)).await;
108        });
109
110        Ok(Response::new(ReceiverStream::new(rx)))
111    }
112
113    async fn shutdown(
114        &self,
115        _request: Request<proto::Empty>,
116    ) -> Result<Response<proto::Empty>, Status> {
117        self.agent.shutdown().await;
118        Ok(Response::new(proto::Empty {}))
119    }
120}