actflow_agent_sdk/
server.rs1use crate::agent::{Agent, LogSender};
4use crate::proto::{self, agent_service_server};
5use crate::types::{Context, prost_value_to_json};
6use std::net::ToSocketAddrs;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::ReceiverStream;
10use tonic::{Request, Response, Status};
11
12pub struct AgentServer<A: Agent> {
16 agent: Arc<A>,
17}
18
19impl<A: Agent> AgentServer<A> {
20 pub fn new(agent: A) -> Self {
22 Self {
23 agent: Arc::new(agent),
24 }
25 }
26
27 pub async fn serve(self, addr: impl ToSocketAddrs) -> Result<(), Box<dyn std::error::Error>> {
38 let addr = addr.to_socket_addrs()?.next().ok_or("Invalid address")?;
39
40 let svc =
41 agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent });
42
43 tonic::transport::Server::builder()
44 .add_service(svc)
45 .serve(addr)
46 .await?;
47
48 Ok(())
49 }
50
51 pub fn into_service(self) -> agent_service_server::AgentServiceServer<AgentServiceImpl<A>> {
53 agent_service_server::AgentServiceServer::new(AgentServiceImpl { agent: self.agent })
54 }
55}
56
57pub struct AgentServiceImpl<A: Agent> {
59 agent: Arc<A>,
60}
61
62#[tonic::async_trait]
63impl<A: Agent> agent_service_server::AgentService for AgentServiceImpl<A> {
64 type RunStream = ReceiverStream<Result<proto::AgentUpdate, Status>>;
65
66 async fn run(
67 &self,
68 request: Request<proto::RunRequest>,
69 ) -> Result<Response<Self::RunStream>, Status> {
70 let req = request.into_inner();
71
72 let nid = req.nid;
73 let ctx: Context = req.ctx.unwrap_or_default().into();
74 let inputs = req
75 .inputs
76 .map(prost_value_to_json)
77 .unwrap_or(serde_json::Value::Null);
78
79 let (tx, rx) = mpsc::channel(32);
80 let (log_tx, mut log_rx) = mpsc::channel::<String>(1024);
81
82 let agent = self.agent.clone();
83 let log_sender = LogSender::new(log_tx);
84
85 let tx_clone = tx.clone();
87 tokio::spawn(async move {
88 while let Some(log) = log_rx.recv().await {
89 let update = proto::AgentUpdate {
90 relay_message: Some(proto::agent_update::RelayMessage::Log(log)),
91 };
92 if tx_clone.send(Ok(update)).await.is_err() {
93 break;
94 }
95 }
96 });
97
98 tokio::spawn(async move {
100 let output = agent.run(nid, ctx, inputs, log_sender).await;
101 let update = proto::AgentUpdate {
102 relay_message: Some(proto::agent_update::RelayMessage::Output(output.into())),
103 };
104 let _ = tx.send(Ok(update)).await;
105 });
106
107 Ok(Response::new(ReceiverStream::new(rx)))
108 }
109
110 async fn shutdown(
111 &self,
112 _request: Request<proto::Empty>,
113 ) -> Result<Response<proto::Empty>, Status> {
114 self.agent.shutdown().await;
115 Ok(Response::new(proto::Empty {}))
116 }
117}