Skip to main content

codetether_agent/a2a/
voice_grpc.rs

//! gRPC transport for the VoiceService.
//!
//! Implements the `VoiceService` trait generated by tonic-build from
//! `proto/a2a/v1/a2a.proto`. Bridges voice session lifecycle events
//! to/from the AgentBus so they flow through the same infrastructure
//! as task updates and agent messages.
7
8use crate::a2a::proto;
9use crate::a2a::proto::voice_service_server::{VoiceService, VoiceServiceServer};
10use crate::bus::{AgentBus, BusMessage};
11
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio_stream::Stream;
15use tonic::{Request, Response, Status};
16
17type StreamResult<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
18
19/// VoiceService gRPC implementation backed by the AgentBus.
20pub struct VoiceServiceImpl {
21    bus: Arc<AgentBus>,
22}
23
24impl VoiceServiceImpl {
25    pub fn new(bus: Arc<AgentBus>) -> Self {
26        Self { bus }
27    }
28
29    pub fn into_service(self) -> VoiceServiceServer<Self> {
30        VoiceServiceServer::new(self)
31    }
32}
33
34#[tonic::async_trait]
35impl VoiceService for VoiceServiceImpl {
36    // ── CreateVoiceSession ──────────────────────────────────────────────
37
38    async fn create_voice_session(
39        &self,
40        request: Request<proto::CreateVoiceSessionRequest>,
41    ) -> Result<Response<proto::VoiceSession>, Status> {
42        let req = request.into_inner();
43        let voice_id = if req.voice_id.is_empty() {
44            "960f89fc".to_string()
45        } else {
46            req.voice_id
47        };
48
49        let room_name = format!("voice-{}", uuid::Uuid::new_v4());
50
51        // Publish to bus
52        let handle = self.bus.handle("voice-grpc");
53        handle.send_voice_session_started(&room_name, &voice_id);
54
55        let session = proto::VoiceSession {
56            room_name,
57            voice_id,
58            state: proto::VoiceSessionState::Creating.into(),
59            agent_state: proto::VoiceAgentState::Initializing.into(),
60            access_token: String::new(), // populated by LiveKit bridge
61            livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
62            created_at: Some(prost_types::Timestamp {
63                seconds: chrono::Utc::now().timestamp(),
64                nanos: 0,
65            }),
66        };
67
68        Ok(Response::new(session))
69    }
70
71    // ── GetVoiceSession ─────────────────────────────────────────────────
72
73    async fn get_voice_session(
74        &self,
75        request: Request<proto::GetVoiceSessionRequest>,
76    ) -> Result<Response<proto::VoiceSession>, Status> {
77        let req = request.into_inner();
78        if req.room_name.is_empty() {
79            return Err(Status::invalid_argument("room_name is required"));
80        }
81
82        // Return a stub session — the Python LiveKit bridge is the source
83        // of truth. This endpoint is a pass-through for the dashboard to
84        // get the current state via gRPC instead of REST.
85        Ok(Response::new(proto::VoiceSession {
86            room_name: req.room_name,
87            voice_id: String::new(),
88            state: proto::VoiceSessionState::Active.into(),
89            agent_state: proto::VoiceAgentState::Listening.into(),
90            access_token: String::new(),
91            livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
92            created_at: None,
93        }))
94    }
95
96    // ── DeleteVoiceSession ──────────────────────────────────────────────
97
98    async fn delete_voice_session(
99        &self,
100        request: Request<proto::DeleteVoiceSessionRequest>,
101    ) -> Result<Response<()>, Status> {
102        let req = request.into_inner();
103        if req.room_name.is_empty() {
104            return Err(Status::invalid_argument("room_name is required"));
105        }
106
107        let handle = self.bus.handle("voice-grpc");
108        handle.send_voice_session_ended(&req.room_name, "user_ended");
109
110        Ok(Response::new(()))
111    }
112
113    // ── ListVoices ──────────────────────────────────────────────────────
114
115    async fn list_voices(
116        &self,
117        _request: Request<proto::ListVoicesRequest>,
118    ) -> Result<Response<proto::ListVoicesResponse>, Status> {
119        // Built-in voice profiles — matches the Python side's voice list
120        let voices = vec![
121            proto::VoiceProfile {
122                voice_id: "960f89fc".into(),
123                name: "Riley".into(),
124                language: "english".into(),
125                sample_duration_seconds: 10.0,
126                created_at: String::new(),
127            },
128            proto::VoiceProfile {
129                voice_id: "puck".into(),
130                name: "Puck".into(),
131                language: "english".into(),
132                sample_duration_seconds: 0.0,
133                created_at: String::new(),
134            },
135            proto::VoiceProfile {
136                voice_id: "charon".into(),
137                name: "Charon".into(),
138                language: "english".into(),
139                sample_duration_seconds: 0.0,
140                created_at: String::new(),
141            },
142            proto::VoiceProfile {
143                voice_id: "kore".into(),
144                name: "Kore".into(),
145                language: "english".into(),
146                sample_duration_seconds: 0.0,
147                created_at: String::new(),
148            },
149            proto::VoiceProfile {
150                voice_id: "fenrir".into(),
151                name: "Fenrir".into(),
152                language: "english".into(),
153                sample_duration_seconds: 0.0,
154                created_at: String::new(),
155            },
156            proto::VoiceProfile {
157                voice_id: "aoede".into(),
158                name: "Aoede".into(),
159                language: "english".into(),
160                sample_duration_seconds: 0.0,
161                created_at: String::new(),
162            },
163        ];
164
165        Ok(Response::new(proto::ListVoicesResponse { voices }))
166    }
167
168    // ── StreamVoiceEvents ───────────────────────────────────────────────
169
170    type StreamVoiceEventsStream = StreamResult<proto::VoiceEvent>;
171
172    async fn stream_voice_events(
173        &self,
174        request: Request<proto::StreamVoiceEventsRequest>,
175    ) -> Result<Response<Self::StreamVoiceEventsStream>, Status> {
176        let req = request.into_inner();
177        if req.room_name.is_empty() {
178            return Err(Status::invalid_argument("room_name is required"));
179        }
180
181        let room_name = req.room_name;
182        let bus_handle = self.bus.handle("voice-stream");
183        let rx = bus_handle.into_receiver();
184        let topic_prefix = format!("voice.{room_name}");
185
186        let stream = async_stream::try_stream! {
187            let mut rx = rx;
188            loop {
189                match rx.recv().await {
190                    Ok(envelope) => {
191                        if !envelope.topic.starts_with(&topic_prefix) {
192                            continue;
193                        }
194                        match &envelope.message {
195                            BusMessage::VoiceTranscript { room_name: _, text, role, is_final } => {
196                                let proto_role = match role.as_str() {
197                                    "user" => proto::Role::User,
198                                    _ => proto::Role::Agent,
199                                };
200                                yield proto::VoiceEvent {
201                                    event: Some(proto::voice_event::Event::Transcript(
202                                        proto::VoiceTranscriptEvent {
203                                            room_name: room_name.clone(),
204                                            text: text.clone(),
205                                            role: proto_role.into(),
206                                            is_final: *is_final,
207                                            timestamp: Some(prost_types::Timestamp {
208                                                seconds: envelope.timestamp.timestamp(),
209                                                nanos: envelope.timestamp.timestamp_subsec_nanos() as i32,
210                                            }),
211                                        },
212                                    )),
213                                };
214                            }
215                            BusMessage::VoiceAgentStateChanged { room_name: _, state } => {
216                                let proto_state = match state.as_str() {
217                                    "listening" => proto::VoiceAgentState::Listening,
218                                    "thinking" => proto::VoiceAgentState::Thinking,
219                                    "speaking" => proto::VoiceAgentState::Speaking,
220                                    _ => proto::VoiceAgentState::Initializing,
221                                };
222                                yield proto::VoiceEvent {
223                                    event: Some(proto::voice_event::Event::AgentStateChange(
224                                        proto::VoiceAgentStateEvent {
225                                            room_name: room_name.clone(),
226                                            state: proto_state.into(),
227                                        },
228                                    )),
229                                };
230                            }
231                            BusMessage::VoiceSessionEnded { room_name: _, reason } => {
232                                yield proto::VoiceEvent {
233                                    event: Some(proto::voice_event::Event::SessionEnded(
234                                        proto::VoiceSessionEndedEvent {
235                                            room_name: room_name.clone(),
236                                            reason: reason.clone(),
237                                        },
238                                    )),
239                                };
240                                break;
241                            }
242                            _ => {}
243                        }
244                    }
245                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
246                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
247                }
248            }
249        };
250
251        Ok(Response::new(
252            Box::pin(stream) as Self::StreamVoiceEventsStream
253        ))
254    }
255}