brainos_grpcadapter/handlers/
agent.rs1use std::pin::Pin;
5
6use tokio_stream::Stream;
7use tonic::{Request, Response, Status};
8use uuid::Uuid;
9
10use signal::{Signal, SignalSource};
11
12use crate::agent_proto::{
13 agent_service_server::AgentService, BrainEventMessage, BrainEventsRequest, ConnectRequest,
14 ConnectResponse, ReceiveRequest, SignalRequest as AgentSignalRequest,
15 SignalResponse as AgentSignalResponse, SignalUpdate,
16};
17use crate::errors::public_status;
18use crate::events::brain_event_matches;
19use crate::helpers::{non_empty, response_to_string};
20use crate::state::AgentServiceImpl;
21
22type SignalUpdateStream =
24 Pin<Box<dyn Stream<Item = Result<SignalUpdate, Status>> + Send + 'static>>;
25
26type BrainEventStream =
28 Pin<Box<dyn Stream<Item = Result<BrainEventMessage, Status>> + Send + 'static>>;
29
30#[tonic::async_trait]
31impl AgentService for AgentServiceImpl {
32 async fn connect(
34 &self,
35 request: Request<ConnectRequest>,
36 ) -> Result<Response<ConnectResponse>, Status> {
37 let req = request.into_inner();
38 let session_id = Uuid::new_v4().to_string();
39
40 tracing::info!(
41 agent_id = %req.agent_id,
42 agent_type = %req.agent_type,
43 session_id = %session_id,
44 "gRPC agent connected"
45 );
46
47 Ok(Response::new(ConnectResponse {
48 session_id,
49 accepted: true,
50 message: format!(
51 "Synapse established — welcome, {} ({}).",
52 req.agent_id, req.agent_type
53 ),
54 }))
55 }
56
57 async fn send_signal(
59 &self,
60 request: Request<AgentSignalRequest>,
61 ) -> Result<Response<AgentSignalResponse>, Status> {
62 let principal = self.resolve_principal(&request).await;
63 let req = request.into_inner();
64 let source = SignalSource::parse(Some(&req.source), SignalSource::Grpc);
65
66 let sig = Signal::from_adapter_request(signal::AdapterRequest {
67 source,
68 content: req.content,
69 channel: non_empty(req.channel),
70 sender: non_empty(req.sender),
71 metadata: Some(req.metadata),
72 namespace: non_empty(req.namespace),
73 agent: non_empty(req.agent),
74 session_id: non_empty(req.session_id),
75 default_channel: "grpc".to_string(),
76 default_sender: "agent".to_string(),
77 })
78 .with_principal_opt(principal);
79
80 match self.processor.process(sig).await {
81 Ok(resp) => Ok(Response::new(AgentSignalResponse {
82 signal_id: resp.signal_id.to_string(),
83 status: format!("{:?}", resp.status),
84 response: response_to_string(resp.response),
85 facts_used: resp.memory_context.facts_used as u32,
86 episodes_used: resp.memory_context.episodes_used as u32,
87 session_id: resp.session_id.unwrap_or_default(),
88 })),
89 Err(e) => {
90 tracing::error!(error = %e, "gRPC send_signal processing failed");
91 Err(public_status(&e))
92 }
93 }
94 }
95
96 type ReceiveSignalsStream = SignalUpdateStream;
97
98 async fn receive_signals(
100 &self,
101 request: Request<ReceiveRequest>,
102 ) -> Result<Response<Self::ReceiveSignalsStream>, Status> {
103 let req = request.into_inner();
104 let session_id = req.session_id.clone();
105 let mut events = self.processor.subscribe_events();
106
107 tracing::debug!(session_id = %session_id, "ReceiveSignals stream opened");
108
109 let (tx, rx) = tokio::sync::mpsc::channel(32);
110 let now = chrono::Utc::now().to_rfc3339();
111
112 tokio::spawn(async move {
113 if tx
115 .send(Ok(SignalUpdate {
116 event_type: "connected".to_string(),
117 content: format!("Session {session_id} active"),
118 timestamp: now,
119 }))
120 .await
121 .is_err()
122 {
123 return;
124 }
125
126 loop {
127 match events.recv().await {
128 Ok(event) => {
129 let content = format!(
130 "[{}:{}] {}",
131 event.namespace, event.signal_id, event.response
132 );
133 if tx
134 .send(Ok(SignalUpdate {
135 event_type: "processed".to_string(),
136 content,
137 timestamp: event.timestamp.to_rfc3339(),
138 }))
139 .await
140 .is_err()
141 {
142 break;
143 }
144 }
145 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
146 if tx
147 .send(Ok(SignalUpdate {
148 event_type: "lagged".to_string(),
149 content: format!("Dropped {skipped} events"),
150 timestamp: chrono::Utc::now().to_rfc3339(),
151 }))
152 .await
153 .is_err()
154 {
155 break;
156 }
157 }
158 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
159 }
160 }
161 });
162
163 let stream: SignalUpdateStream = Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx));
164 Ok(Response::new(stream))
165 }
166
167 type BrainEventsStream = BrainEventStream;
168
169 async fn brain_events(
172 &self,
173 request: Request<BrainEventsRequest>,
174 ) -> Result<Response<Self::BrainEventsStream>, Status> {
175 let filter = request.into_inner();
176 let Some(mut rx) = self.processor.subscribe_brain_events() else {
177 return Err(Status::failed_precondition(
178 "observability bus not wired on this SignalProcessor",
179 ));
180 };
181
182 tracing::debug!(?filter.kind, ?filter.tool_id, "BrainEvents stream opened");
183
184 let (tx, out) = tokio::sync::mpsc::channel::<Result<BrainEventMessage, Status>>(64);
185 tokio::spawn(async move {
186 loop {
187 match rx.recv().await {
188 Ok(ev) => {
189 if !brain_event_matches(&ev, &filter) {
190 continue;
191 }
192 let event_json = match serde_json::to_string(&ev) {
193 Ok(s) => s,
194 Err(e) => {
195 tracing::warn!("BrainEvents serialise failed: {e}");
196 continue;
197 }
198 };
199 if tx.send(Ok(BrainEventMessage { event_json })).await.is_err() {
200 break;
201 }
202 }
203 Err(tokio::sync::broadcast::error::RecvError::Lagged(_n)) => {
204 continue;
208 }
209 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
210 }
211 }
212 });
213
214 let stream: BrainEventStream = Box::pin(tokio_stream::wrappers::ReceiverStream::new(out));
215 Ok(Response::new(stream))
216 }
217}