use crate::a2a::proto;
use crate::a2a::proto::voice_service_server::{VoiceService, VoiceServiceServer};
use crate::bus::{AgentBus, BusMessage};
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
/// Boxed, pinned, `Send` stream yielding `Result<T, Status>` items.
///
/// Used below as the response stream type for the server-streaming RPC.
type StreamResult<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
/// gRPC `VoiceService` implementation that talks to the rest of the system
/// through a shared [`AgentBus`].
pub struct VoiceServiceImpl {
/// Shared bus; each RPC handler obtains its own handle from it via `handle(..)`.
bus: Arc<AgentBus>,
}
impl VoiceServiceImpl {
pub fn new(bus: Arc<AgentBus>) -> Self {
Self { bus }
}
pub fn into_service(self) -> VoiceServiceServer<Self> {
VoiceServiceServer::new(self)
}
}
#[tonic::async_trait]
impl VoiceService for VoiceServiceImpl {
/// Creates a new voice session and reports it on the bus.
///
/// The room name is generated as `voice-<uuid v4>`. When the request leaves
/// `voice_id` empty, the id `960f89fc` is used (the same id `list_voices`
/// advertises for "Riley").
///
/// # Returns
/// A [`proto::VoiceSession`] in the `Creating` / `Initializing` states with
/// `created_at` set to the current UTC time, including sub-second precision.
/// `access_token` is returned empty; `livekit_url` is read from the
/// `LIVEKIT_URL` environment variable and is empty when that is unset.
///
/// # Errors
/// This handler has no failure path of its own; it always returns `Ok`.
async fn create_voice_session(
    &self,
    request: Request<proto::CreateVoiceSessionRequest>,
) -> Result<Response<proto::VoiceSession>, Status> {
    let req = request.into_inner();
    let voice_id = if req.voice_id.is_empty() {
        "960f89fc".to_string()
    } else {
        req.voice_id
    };
    let room_name = format!("voice-{}", uuid::Uuid::new_v4());

    let handle = self.bus.handle("voice-grpc");
    handle.send_voice_session_started(&room_name, &voice_id);

    // Read the clock once so `seconds` and `nanos` describe the same instant.
    // Previously `nanos` was hard-coded to 0, truncating the creation time to
    // whole seconds, unlike the timestamps emitted by `stream_voice_events`.
    let now = chrono::Utc::now();
    let session = proto::VoiceSession {
        room_name,
        voice_id,
        state: proto::VoiceSessionState::Creating.into(),
        agent_state: proto::VoiceAgentState::Initializing.into(),
        // NOTE(review): the token is left empty here; presumably it is
        // issued elsewhere. Confirm against the client flow.
        access_token: String::new(),
        livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
        created_at: Some(prost_types::Timestamp {
            seconds: now.timestamp(),
            // Same u32 -> i32 conversion as used for streamed event timestamps.
            nanos: now.timestamp_subsec_nanos() as i32,
        }),
    };
    Ok(Response::new(session))
}
/// Returns a session description for the requested room.
///
/// No session store is consulted: any non-empty `room_name` is reported as
/// `Active` with the agent `Listening`, an empty `voice_id` and
/// `access_token`, and no `created_at`. `livekit_url` comes from the
/// `LIVEKIT_URL` environment variable (empty when unset).
///
/// # Errors
/// `invalid_argument` when `room_name` is empty.
async fn get_voice_session(
    &self,
    request: Request<proto::GetVoiceSessionRequest>,
) -> Result<Response<proto::VoiceSession>, Status> {
    let proto::GetVoiceSessionRequest { room_name, .. } = request.into_inner();
    if room_name.is_empty() {
        return Err(Status::invalid_argument("room_name is required"));
    }

    let session = proto::VoiceSession {
        room_name,
        voice_id: String::new(),
        state: proto::VoiceSessionState::Active.into(),
        agent_state: proto::VoiceAgentState::Listening.into(),
        access_token: String::new(),
        livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
        created_at: None,
    };
    Ok(Response::new(session))
}
/// Ends the session for the given room by reporting it on the bus with the
/// reason `"user_ended"`.
///
/// # Errors
/// `invalid_argument` when `room_name` is empty.
async fn delete_voice_session(
    &self,
    request: Request<proto::DeleteVoiceSessionRequest>,
) -> Result<Response<()>, Status> {
    let proto::DeleteVoiceSessionRequest { room_name, .. } = request.into_inner();
    if room_name.is_empty() {
        return Err(Status::invalid_argument("room_name is required"));
    }

    let bus_handle = self.bus.handle("voice-grpc");
    bus_handle.send_voice_session_ended(&room_name, "user_ended");

    let reply = Response::new(());
    Ok(reply)
}
/// Returns the fixed, built-in list of voice profiles.
///
/// The request is ignored. Every profile is reported with language
/// `"english"` and an empty `created_at`; only the id, display name and
/// sample duration differ between entries.
async fn list_voices(
    &self,
    _request: Request<proto::ListVoicesRequest>,
) -> Result<Response<proto::ListVoicesResponse>, Status> {
    // Local constructor for the fields that vary per entry.
    let profile = |voice_id: &str, name: &str, sample_duration_seconds| proto::VoiceProfile {
        voice_id: voice_id.into(),
        name: name.into(),
        language: "english".into(),
        sample_duration_seconds,
        created_at: String::new(),
    };

    let voices = vec![
        profile("960f89fc", "Riley", 10.0),
        profile("puck", "Puck", 0.0),
        profile("charon", "Charon", 0.0),
        profile("kore", "Kore", 0.0),
        profile("fenrir", "Fenrir", 0.0),
        profile("aoede", "Aoede", 0.0),
    ];

    let reply = proto::ListVoicesResponse { voices };
    Ok(Response::new(reply))
}
/// Response stream type for `stream_voice_events`: a boxed stream of
/// `Result<proto::VoiceEvent, Status>` items.
type StreamVoiceEventsStream = StreamResult<proto::VoiceEvent>;
/// Streams voice events for a single room to the caller.
///
/// Subscribes to the bus through a handle named `"voice-stream"` and
/// forwards three kinds of bus messages as [`proto::VoiceEvent`]s:
/// transcripts, agent state changes, and session-ended notifications. The
/// stream finishes after a session-ended event for the room has been
/// forwarded, or when the bus channel is closed.
///
/// An event is forwarded only when its topic starts with
/// `voice.<room_name>` and the message's own `room_name` equals the
/// requested room. The exact match is needed because a bare prefix test
/// would let a subscriber for `voice-1` also receive events for `voice-10`,
/// and would let a very short `room_name` match every room.
///
/// Unknown role strings map to `Role::Agent`; unknown agent state strings
/// map to `VoiceAgentState::Initializing`.
///
/// # Errors
/// `invalid_argument` when `room_name` is empty.
async fn stream_voice_events(
    &self,
    request: Request<proto::StreamVoiceEventsRequest>,
) -> Result<Response<Self::StreamVoiceEventsStream>, Status> {
    let req = request.into_inner();
    if req.room_name.is_empty() {
        return Err(Status::invalid_argument("room_name is required"));
    }
    let room_name = req.room_name;

    // Subscribe before the stream is first polled, not lazily inside it.
    let bus_handle = self.bus.handle("voice-stream");
    let rx = bus_handle.into_receiver();
    let topic_prefix = format!("voice.{room_name}");

    let stream = async_stream::try_stream! {
        let mut rx = rx;
        loop {
            match rx.recv().await {
                Ok(envelope) => {
                    // Cheap pre-filter on the topic. It is not sufficient on
                    // its own because other rooms can share this prefix, so
                    // each arm below also checks the payload's room name.
                    if !envelope.topic.starts_with(&topic_prefix) {
                        continue;
                    }
                    match &envelope.message {
                        BusMessage::VoiceTranscript { room_name: msg_room, text, role, is_final }
                            if msg_room.as_str() == room_name.as_str() =>
                        {
                            let proto_role = match role.as_str() {
                                "user" => proto::Role::User,
                                _ => proto::Role::Agent,
                            };
                            yield proto::VoiceEvent {
                                event: Some(proto::voice_event::Event::Transcript(
                                    proto::VoiceTranscriptEvent {
                                        room_name: room_name.clone(),
                                        text: text.clone(),
                                        role: proto_role.into(),
                                        is_final: *is_final,
                                        timestamp: Some(prost_types::Timestamp {
                                            seconds: envelope.timestamp.timestamp(),
                                            nanos: envelope.timestamp.timestamp_subsec_nanos() as i32,
                                        }),
                                    },
                                )),
                            };
                        }
                        BusMessage::VoiceAgentStateChanged { room_name: msg_room, state }
                            if msg_room.as_str() == room_name.as_str() =>
                        {
                            let proto_state = match state.as_str() {
                                "listening" => proto::VoiceAgentState::Listening,
                                "thinking" => proto::VoiceAgentState::Thinking,
                                "speaking" => proto::VoiceAgentState::Speaking,
                                _ => proto::VoiceAgentState::Initializing,
                            };
                            yield proto::VoiceEvent {
                                event: Some(proto::voice_event::Event::AgentStateChange(
                                    proto::VoiceAgentStateEvent {
                                        room_name: room_name.clone(),
                                        state: proto_state.into(),
                                    },
                                )),
                            };
                        }
                        BusMessage::VoiceSessionEnded { room_name: msg_room, reason }
                            if msg_room.as_str() == room_name.as_str() =>
                        {
                            yield proto::VoiceEvent {
                                event: Some(proto::voice_event::Event::SessionEnded(
                                    proto::VoiceSessionEndedEvent {
                                        room_name: room_name.clone(),
                                        reason: reason.clone(),
                                    },
                                )),
                            };
                            // Stop streaming once this room's session has ended.
                            break;
                        }
                        // Other message kinds, and voice messages that belong
                        // to a different room, are not forwarded.
                        _ => {}
                    }
                }
                // Best effort: after a lag notification, keep receiving
                // instead of failing the stream.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Ok(Response::new(
        Box::pin(stream) as Self::StreamVoiceEventsStream
    ))
}
}