actflow_agent_sdk/
server.rs1use std::net::ToSocketAddrs;
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Request, Response, Status};
8
9use crate::{
10 agent::{Agent, LogSender},
11 proto::{self, agent_service_server},
12 types::prost_value_to_json,
13};
14
15pub struct AgentServer<A: Agent> {
19 agent: Arc<A>,
20}
21
22impl<A: Agent> AgentServer<A> {
23 pub fn new(agent: A) -> Self {
25 Self {
26 agent: Arc::new(agent),
27 }
28 }
29
30 pub async fn serve(self, addr: impl ToSocketAddrs) -> Result<(), Box<dyn std::error::Error>> {
41 let addr = addr.to_socket_addrs()?.next().ok_or("Invalid address")?;
42
43 let svc =
44 agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent });
45
46 tonic::transport::Server::builder()
47 .add_service(svc)
48 .serve(addr)
49 .await?;
50
51 Ok(())
52 }
53
54 pub fn into_service(self) -> agent_service_server::AgentServiceServer<AgentServiceImpl<A>> {
56 agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent })
57 }
58}
59
60pub struct AgentServiceImpl<A: Agent> {
62 agent: Arc<A>,
63}
64
65#[tonic::async_trait]
66impl<A: Agent> agent_service_server::AgentService for AgentServiceImpl<A> {
67 type RunStream = ReceiverStream<Result<proto::AgentUpdate, Status>>;
68
69 async fn run(
70 &self,
71 request: Request<proto::RunRequest>,
72 ) -> Result<Response<Self::RunStream>, Status> {
73 let req = request.into_inner();
74
75 let pid = req.pid;
76 let nid = req.nid;
77 let inputs = req
78 .inputs
79 .map(prost_value_to_json)
80 .unwrap_or(serde_json::Value::Null);
81
82 let (tx, rx) = mpsc::channel(32);
83 let (log_tx, mut log_rx) = mpsc::channel::<String>(1024);
84
85 let agent = self.agent.clone();
86 let log_sender = LogSender::new(log_tx);
87
88 let tx_clone = tx.clone();
90 tokio::spawn(async move {
91 while let Some(log) = log_rx.recv().await {
92 let update = proto::AgentUpdate {
93 relay_message: Some(proto::agent_update::RelayMessage::Log(log)),
94 };
95 if tx_clone.send(Ok(update)).await.is_err() {
96 break;
97 }
98 }
99 });
100
101 tokio::spawn(async move {
103 let output = agent.run(pid, nid, inputs, log_sender).await;
104 let update = proto::AgentUpdate {
105 relay_message: Some(proto::agent_update::RelayMessage::Output(output.into())),
106 };
107 let _ = tx.send(Ok(update)).await;
108 });
109
110 Ok(Response::new(ReceiverStream::new(rx)))
111 }
112
113 async fn shutdown(
114 &self,
115 _request: Request<proto::Empty>,
116 ) -> Result<Response<proto::Empty>, Status> {
117 self.agent.shutdown().await;
118 Ok(Response::new(proto::Empty {}))
119 }
120}