1use crate::a2a::proto;
9use crate::a2a::proto::voice_service_server::{VoiceService, VoiceServiceServer};
10use crate::bus::{AgentBus, BusMessage};
11
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio_stream::Stream;
15use tonic::{Request, Response, Status};
16
17type StreamResult<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
18
19pub struct VoiceServiceImpl {
21 bus: Arc<AgentBus>,
22}
23
24impl VoiceServiceImpl {
25 pub fn new(bus: Arc<AgentBus>) -> Self {
26 Self { bus }
27 }
28
29 pub fn into_service(self) -> VoiceServiceServer<Self> {
30 VoiceServiceServer::new(self)
31 }
32}
33
34#[tonic::async_trait]
35impl VoiceService for VoiceServiceImpl {
36 async fn create_voice_session(
39 &self,
40 request: Request<proto::CreateVoiceSessionRequest>,
41 ) -> Result<Response<proto::VoiceSession>, Status> {
42 let req = request.into_inner();
43 let voice_id = if req.voice_id.is_empty() {
44 "960f89fc".to_string()
45 } else {
46 req.voice_id
47 };
48
49 let room_name = format!("voice-{}", uuid::Uuid::new_v4());
50
51 let handle = self.bus.handle("voice-grpc");
53 handle.send_voice_session_started(&room_name, &voice_id);
54
55 let session = proto::VoiceSession {
56 room_name,
57 voice_id,
58 state: proto::VoiceSessionState::Creating.into(),
59 agent_state: proto::VoiceAgentState::Initializing.into(),
60 access_token: String::new(), livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
62 created_at: Some(prost_types::Timestamp {
63 seconds: chrono::Utc::now().timestamp(),
64 nanos: 0,
65 }),
66 };
67
68 Ok(Response::new(session))
69 }
70
71 async fn get_voice_session(
74 &self,
75 request: Request<proto::GetVoiceSessionRequest>,
76 ) -> Result<Response<proto::VoiceSession>, Status> {
77 let req = request.into_inner();
78 if req.room_name.is_empty() {
79 return Err(Status::invalid_argument("room_name is required"));
80 }
81
82 Ok(Response::new(proto::VoiceSession {
86 room_name: req.room_name,
87 voice_id: String::new(),
88 state: proto::VoiceSessionState::Active.into(),
89 agent_state: proto::VoiceAgentState::Listening.into(),
90 access_token: String::new(),
91 livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
92 created_at: None,
93 }))
94 }
95
96 async fn delete_voice_session(
99 &self,
100 request: Request<proto::DeleteVoiceSessionRequest>,
101 ) -> Result<Response<()>, Status> {
102 let req = request.into_inner();
103 if req.room_name.is_empty() {
104 return Err(Status::invalid_argument("room_name is required"));
105 }
106
107 let handle = self.bus.handle("voice-grpc");
108 handle.send_voice_session_ended(&req.room_name, "user_ended");
109
110 Ok(Response::new(()))
111 }
112
113 async fn list_voices(
116 &self,
117 _request: Request<proto::ListVoicesRequest>,
118 ) -> Result<Response<proto::ListVoicesResponse>, Status> {
119 let voices = vec![
121 proto::VoiceProfile {
122 voice_id: "960f89fc".into(),
123 name: "Riley".into(),
124 language: "english".into(),
125 sample_duration_seconds: 10.0,
126 created_at: String::new(),
127 },
128 proto::VoiceProfile {
129 voice_id: "puck".into(),
130 name: "Puck".into(),
131 language: "english".into(),
132 sample_duration_seconds: 0.0,
133 created_at: String::new(),
134 },
135 proto::VoiceProfile {
136 voice_id: "charon".into(),
137 name: "Charon".into(),
138 language: "english".into(),
139 sample_duration_seconds: 0.0,
140 created_at: String::new(),
141 },
142 proto::VoiceProfile {
143 voice_id: "kore".into(),
144 name: "Kore".into(),
145 language: "english".into(),
146 sample_duration_seconds: 0.0,
147 created_at: String::new(),
148 },
149 proto::VoiceProfile {
150 voice_id: "fenrir".into(),
151 name: "Fenrir".into(),
152 language: "english".into(),
153 sample_duration_seconds: 0.0,
154 created_at: String::new(),
155 },
156 proto::VoiceProfile {
157 voice_id: "aoede".into(),
158 name: "Aoede".into(),
159 language: "english".into(),
160 sample_duration_seconds: 0.0,
161 created_at: String::new(),
162 },
163 ];
164
165 Ok(Response::new(proto::ListVoicesResponse { voices }))
166 }
167
168 type StreamVoiceEventsStream = StreamResult<proto::VoiceEvent>;
171
172 async fn stream_voice_events(
173 &self,
174 request: Request<proto::StreamVoiceEventsRequest>,
175 ) -> Result<Response<Self::StreamVoiceEventsStream>, Status> {
176 let req = request.into_inner();
177 if req.room_name.is_empty() {
178 return Err(Status::invalid_argument("room_name is required"));
179 }
180
181 let room_name = req.room_name;
182 let bus_handle = self.bus.handle("voice-stream");
183 let rx = bus_handle.into_receiver();
184 let topic_prefix = format!("voice.{room_name}");
185
186 let stream = async_stream::try_stream! {
187 let mut rx = rx;
188 loop {
189 match rx.recv().await {
190 Ok(envelope) => {
191 if !envelope.topic.starts_with(&topic_prefix) {
192 continue;
193 }
194 match &envelope.message {
195 BusMessage::VoiceTranscript { room_name: _, text, role, is_final } => {
196 let proto_role = match role.as_str() {
197 "user" => proto::Role::User,
198 _ => proto::Role::Agent,
199 };
200 yield proto::VoiceEvent {
201 event: Some(proto::voice_event::Event::Transcript(
202 proto::VoiceTranscriptEvent {
203 room_name: room_name.clone(),
204 text: text.clone(),
205 role: proto_role.into(),
206 is_final: *is_final,
207 timestamp: Some(prost_types::Timestamp {
208 seconds: envelope.timestamp.timestamp(),
209 nanos: envelope.timestamp.timestamp_subsec_nanos() as i32,
210 }),
211 },
212 )),
213 };
214 }
215 BusMessage::VoiceAgentStateChanged { room_name: _, state } => {
216 let proto_state = match state.as_str() {
217 "listening" => proto::VoiceAgentState::Listening,
218 "thinking" => proto::VoiceAgentState::Thinking,
219 "speaking" => proto::VoiceAgentState::Speaking,
220 _ => proto::VoiceAgentState::Initializing,
221 };
222 yield proto::VoiceEvent {
223 event: Some(proto::voice_event::Event::AgentStateChange(
224 proto::VoiceAgentStateEvent {
225 room_name: room_name.clone(),
226 state: proto_state.into(),
227 },
228 )),
229 };
230 }
231 BusMessage::VoiceSessionEnded { room_name: _, reason } => {
232 yield proto::VoiceEvent {
233 event: Some(proto::voice_event::Event::SessionEnded(
234 proto::VoiceSessionEndedEvent {
235 room_name: room_name.clone(),
236 reason: reason.clone(),
237 },
238 )),
239 };
240 break;
241 }
242 _ => {}
243 }
244 }
245 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
246 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
247 }
248 }
249 };
250
251 Ok(Response::new(
252 Box::pin(stream) as Self::StreamVoiceEventsStream
253 ))
254 }
255}