Skip to main content

brainos_grpcadapter/handlers/
agent.rs

1//! `AgentService` RPC handlers — connect, send_signal, receive_signals,
2//! brain_events.
3
4use std::pin::Pin;
5
6use tokio_stream::Stream;
7use tonic::{Request, Response, Status};
8use uuid::Uuid;
9
10use signal::{Signal, SignalSource};
11
12use crate::agent_proto::{
13    agent_service_server::AgentService, BrainEventMessage, BrainEventsRequest, ConnectRequest,
14    ConnectResponse, ReceiveRequest, SignalRequest as AgentSignalRequest,
15    SignalResponse as AgentSignalResponse, SignalUpdate,
16};
17use crate::errors::public_status;
18use crate::events::brain_event_matches;
19use crate::helpers::{non_empty, response_to_string};
20use crate::state::AgentServiceImpl;
21
22/// Stream type alias for the server-streaming `ReceiveSignals` RPC.
23type SignalUpdateStream =
24    Pin<Box<dyn Stream<Item = Result<SignalUpdate, Status>> + Send + 'static>>;
25
26/// Stream type alias for the server-streaming `BrainEvents` RPC.
27type BrainEventStream =
28    Pin<Box<dyn Stream<Item = Result<BrainEventMessage, Status>> + Send + 'static>>;
29
30#[tonic::async_trait]
31impl AgentService for AgentServiceImpl {
32    /// Establish a session and return a session ID.
33    async fn connect(
34        &self,
35        request: Request<ConnectRequest>,
36    ) -> Result<Response<ConnectResponse>, Status> {
37        let req = request.into_inner();
38        let session_id = Uuid::new_v4().to_string();
39
40        tracing::info!(
41            agent_id = %req.agent_id,
42            agent_type = %req.agent_type,
43            session_id = %session_id,
44            "gRPC agent connected"
45        );
46
47        Ok(Response::new(ConnectResponse {
48            session_id,
49            accepted: true,
50            message: format!(
51                "Synapse established — welcome, {} ({}).",
52                req.agent_id, req.agent_type
53            ),
54        }))
55    }
56
57    /// Send a signal and receive a single response.
58    async fn send_signal(
59        &self,
60        request: Request<AgentSignalRequest>,
61    ) -> Result<Response<AgentSignalResponse>, Status> {
62        let principal = self.resolve_principal(&request).await;
63        let req = request.into_inner();
64        let source = SignalSource::parse(Some(&req.source), SignalSource::Grpc);
65
66        let sig = Signal::from_adapter_request(signal::AdapterRequest {
67            source,
68            content: req.content,
69            channel: non_empty(req.channel),
70            sender: non_empty(req.sender),
71            metadata: Some(req.metadata),
72            namespace: non_empty(req.namespace),
73            agent: non_empty(req.agent),
74            session_id: non_empty(req.session_id),
75            default_channel: "grpc".to_string(),
76            default_sender: "agent".to_string(),
77        })
78        .with_principal_opt(principal);
79
80        match self.processor.process(sig).await {
81            Ok(resp) => Ok(Response::new(AgentSignalResponse {
82                signal_id: resp.signal_id.to_string(),
83                status: format!("{:?}", resp.status),
84                response: response_to_string(resp.response),
85                facts_used: resp.memory_context.facts_used as u32,
86                episodes_used: resp.memory_context.episodes_used as u32,
87                session_id: resp.session_id.unwrap_or_default(),
88            })),
89            Err(e) => {
90                tracing::error!(error = %e, "gRPC send_signal processing failed");
91                Err(public_status(&e))
92            }
93        }
94    }
95
96    type ReceiveSignalsStream = SignalUpdateStream;
97
98    /// Subscribe to a stream of updates for a session.
99    async fn receive_signals(
100        &self,
101        request: Request<ReceiveRequest>,
102    ) -> Result<Response<Self::ReceiveSignalsStream>, Status> {
103        let req = request.into_inner();
104        let session_id = req.session_id.clone();
105        let mut events = self.processor.subscribe_events();
106
107        tracing::debug!(session_id = %session_id, "ReceiveSignals stream opened");
108
109        let (tx, rx) = tokio::sync::mpsc::channel(32);
110        let now = chrono::Utc::now().to_rfc3339();
111
112        tokio::spawn(async move {
113            // Send an initial "connected" event
114            if tx
115                .send(Ok(SignalUpdate {
116                    event_type: "connected".to_string(),
117                    content: format!("Session {session_id} active"),
118                    timestamp: now,
119                }))
120                .await
121                .is_err()
122            {
123                return;
124            }
125
126            loop {
127                match events.recv().await {
128                    Ok(event) => {
129                        let content = format!(
130                            "[{}:{}] {}",
131                            event.namespace, event.signal_id, event.response
132                        );
133                        if tx
134                            .send(Ok(SignalUpdate {
135                                event_type: "processed".to_string(),
136                                content,
137                                timestamp: event.timestamp.to_rfc3339(),
138                            }))
139                            .await
140                            .is_err()
141                        {
142                            break;
143                        }
144                    }
145                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
146                        if tx
147                            .send(Ok(SignalUpdate {
148                                event_type: "lagged".to_string(),
149                                content: format!("Dropped {skipped} events"),
150                                timestamp: chrono::Utc::now().to_rfc3339(),
151                            }))
152                            .await
153                            .is_err()
154                        {
155                            break;
156                        }
157                    }
158                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
159                }
160            }
161        });
162
163        let stream: SignalUpdateStream = Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx));
164        Ok(Response::new(stream))
165    }
166
167    type BrainEventsStream = BrainEventStream;
168
169    /// Subscribe to the v1.0.0 BrainEvent bus, mirroring the SSE/WS surfaces.
170    /// Each emitted message carries the BrainEvent as JSON in `event_json`.
171    async fn brain_events(
172        &self,
173        request: Request<BrainEventsRequest>,
174    ) -> Result<Response<Self::BrainEventsStream>, Status> {
175        let filter = request.into_inner();
176        let Some(mut rx) = self.processor.subscribe_brain_events() else {
177            return Err(Status::failed_precondition(
178                "observability bus not wired on this SignalProcessor",
179            ));
180        };
181
182        tracing::debug!(?filter.kind, ?filter.tool_id, "BrainEvents stream opened");
183
184        let (tx, out) = tokio::sync::mpsc::channel::<Result<BrainEventMessage, Status>>(64);
185        tokio::spawn(async move {
186            loop {
187                match rx.recv().await {
188                    Ok(ev) => {
189                        if !brain_event_matches(&ev, &filter) {
190                            continue;
191                        }
192                        let event_json = match serde_json::to_string(&ev) {
193                            Ok(s) => s,
194                            Err(e) => {
195                                tracing::warn!("BrainEvents serialise failed: {e}");
196                                continue;
197                            }
198                        };
199                        if tx.send(Ok(BrainEventMessage { event_json })).await.is_err() {
200                            break;
201                        }
202                    }
203                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_n)) => {
204                        // Client is slow; skip silently. (HTTP SSE / WS surfaces
205                        // emit a Lagged marker; gRPC's strongly-typed stream has
206                        // no idle frame — silent drop keeps the protocol clean.)
207                        continue;
208                    }
209                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
210                }
211            }
212        });
213
214        let stream: BrainEventStream = Box::pin(tokio_stream::wrappers::ReceiverStream::new(out));
215        Ok(Response::new(stream))
216    }
217}