// codetether-agent 4.0.0
//
// A2A-native AI coding agent for the CodeTether ecosystem.
// Documentation:
//! gRPC transport for the VoiceService.
//!
//! Implements the `VoiceService` trait generated by tonic-build from
//! `proto/a2a/v1/a2a.proto`.  Bridges voice session lifecycle events
//! to/from the AgentBus so they flow through the same infrastructure
//! as task updates and agent messages.

use crate::a2a::proto;
use crate::a2a::proto::voice_service_server::{VoiceService, VoiceServiceServer};
use crate::bus::{AgentBus, BusMessage};

use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};

/// Boxed, pinned, `Send` stream yielding `Result<T, Status>` items.
///
/// Used in this file as the concrete response-stream type for the
/// server-streaming `stream_voice_events` RPC.
type StreamResult<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;

/// VoiceService gRPC implementation backed by the AgentBus.
///
/// Construct it with [`VoiceServiceImpl::new`] and turn it into a tonic
/// service with [`VoiceServiceImpl::into_service`].
pub struct VoiceServiceImpl {
    /// Shared bus; each RPC obtains its own handle from it via `handle(..)`.
    bus: Arc<AgentBus>,
}

impl VoiceServiceImpl {
    pub fn new(bus: Arc<AgentBus>) -> Self {
        Self { bus }
    }

    pub fn into_service(self) -> VoiceServiceServer<Self> {
        VoiceServiceServer::new(self)
    }
}

#[tonic::async_trait]
impl VoiceService for VoiceServiceImpl {
    // ── CreateVoiceSession ──────────────────────────────────────────────

    /// Creates a new voice session and announces it on the bus.
    ///
    /// * An empty `voice_id` in the request falls back to the built-in
    ///   default voice.
    /// * The room name is freshly generated as `voice-<uuid v4>`.
    /// * `send_voice_session_started` is called on a `voice-grpc` bus handle
    ///   before the response is built.
    ///
    /// The returned session is in the `Creating` / `Initializing` state.
    /// `access_token` is left empty, and `livekit_url` is read from the
    /// `LIVEKIT_URL` environment variable (empty when it cannot be read).
    /// `created_at` is the current UTC time including sub-second precision.
    async fn create_voice_session(
        &self,
        request: Request<proto::CreateVoiceSessionRequest>,
    ) -> Result<Response<proto::VoiceSession>, Status> {
        // Voice used when the caller does not pick one; this is the same id
        // as the first profile returned by `list_voices`.
        const DEFAULT_VOICE_ID: &str = "960f89fc";
        // A protobuf `Timestamp` only allows `nanos` in 0..=999_999_999.
        const MAX_PROTO_NANOS: u32 = 999_999_999;

        let req = request.into_inner();
        let voice_id = if req.voice_id.is_empty() {
            DEFAULT_VOICE_ID.to_string()
        } else {
            req.voice_id
        };

        let room_name = format!("voice-{}", uuid::Uuid::new_v4());

        // Publish to bus
        let handle = self.bus.handle("voice-grpc");
        handle.send_voice_session_started(&room_name, &voice_id);

        // Take a single clock reading so `seconds` and `nanos` describe the
        // same instant.
        let now = chrono::Utc::now();
        let created_at = prost_types::Timestamp {
            seconds: now.timestamp(),
            // Clamped to the protobuf range first, so the cast is lossless.
            nanos: now.timestamp_subsec_nanos().min(MAX_PROTO_NANOS) as i32,
        };

        let session = proto::VoiceSession {
            room_name,
            voice_id,
            state: proto::VoiceSessionState::Creating.into(),
            agent_state: proto::VoiceAgentState::Initializing.into(),
            access_token: String::new(), // populated by LiveKit bridge
            livekit_url: std::env::var("LIVEKIT_URL").unwrap_or_default(),
            created_at: Some(created_at),
        };

        Ok(Response::new(session))
    }

    // ── GetVoiceSession ─────────────────────────────────────────────────

    /// Returns a placeholder description of the session for `room_name`.
    ///
    /// No state is looked up here: the reply always reports `Active` /
    /// `Listening`, with empty `voice_id` and `access_token` and no
    /// `created_at`. `livekit_url` is read from the `LIVEKIT_URL`
    /// environment variable (empty when it cannot be read).
    ///
    /// # Errors
    ///
    /// Returns `invalid_argument` when `room_name` is empty.
    async fn get_voice_session(
        &self,
        request: Request<proto::GetVoiceSessionRequest>,
    ) -> Result<Response<proto::VoiceSession>, Status> {
        let room_name = request.into_inner().room_name;
        if room_name.is_empty() {
            return Err(Status::invalid_argument("room_name is required"));
        }

        // Stub reply: the Python LiveKit bridge is the source of truth, and
        // this endpoint only lets the dashboard ask over gRPC instead of REST.
        let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
        let placeholder = proto::VoiceSession {
            room_name,
            voice_id: String::new(),
            state: proto::VoiceSessionState::Active.into(),
            agent_state: proto::VoiceAgentState::Listening.into(),
            access_token: String::new(),
            livekit_url,
            created_at: None,
        };

        Ok(Response::new(placeholder))
    }

    // ── DeleteVoiceSession ──────────────────────────────────────────────

    /// Ends the voice session for `room_name`.
    ///
    /// Calls `send_voice_session_ended` on a `voice-grpc` bus handle with the
    /// reason `"user_ended"` and replies with an empty message.
    ///
    /// # Errors
    ///
    /// Returns `invalid_argument` when `room_name` is empty.
    async fn delete_voice_session(
        &self,
        request: Request<proto::DeleteVoiceSessionRequest>,
    ) -> Result<Response<()>, Status> {
        let room_name = request.into_inner().room_name;
        if room_name.is_empty() {
            return Err(Status::invalid_argument("room_name is required"));
        }

        let bus_handle = self.bus.handle("voice-grpc");
        bus_handle.send_voice_session_ended(&room_name, "user_ended");

        Ok(Response::new(()))
    }

    // ── ListVoices ──────────────────────────────────────────────────────

    /// Returns the fixed catalogue of built-in voice profiles.
    ///
    /// The request carries no filters that are consulted here; the same six
    /// profiles are returned on every call, all with language `"english"`
    /// and an empty `created_at`.
    async fn list_voices(
        &self,
        _request: Request<proto::ListVoicesRequest>,
    ) -> Result<Response<proto::ListVoicesResponse>, Status> {
        // Built-in voice profiles — matches the Python side's voice list.
        // Columns: (voice_id, display name, sample duration in seconds).
        let catalogue = [
            ("960f89fc", "Riley", 10.0),
            ("puck", "Puck", 0.0),
            ("charon", "Charon", 0.0),
            ("kore", "Kore", 0.0),
            ("fenrir", "Fenrir", 0.0),
            ("aoede", "Aoede", 0.0),
        ];

        let voices = catalogue
            .iter()
            .map(|&(voice_id, name, sample_duration_seconds)| proto::VoiceProfile {
                voice_id: voice_id.into(),
                name: name.into(),
                language: "english".into(),
                sample_duration_seconds,
                created_at: String::new(),
            })
            .collect();

        Ok(Response::new(proto::ListVoicesResponse { voices }))
    }

    // ── StreamVoiceEvents ───────────────────────────────────────────────

    /// Response stream type of `stream_voice_events`: a boxed stream whose
    /// items are `Result<proto::VoiceEvent, Status>`.
    type StreamVoiceEventsStream = StreamResult<proto::VoiceEvent>;

    /// Streams voice events for a single room to the caller.
    ///
    /// Obtains a receiver from a `voice-stream` bus handle and forwards, as
    /// `proto::VoiceEvent`s, the bus messages that belong to `room_name`:
    ///
    /// * `VoiceTranscript` → `Transcript` (role `"user"` maps to `User`,
    ///   anything else to `Agent`), stamped with the envelope timestamp.
    /// * `VoiceAgentStateChanged` → `AgentStateChange` (`"listening"`,
    ///   `"thinking"`, `"speaking"`; any other value maps to `Initializing`).
    /// * `VoiceSessionEnded` → `SessionEnded`, after which the stream ends.
    ///
    /// A message is forwarded only when its topic starts with
    /// `voice.<room_name>` **and** its own `room_name` field equals the
    /// requested room. The prefix test alone would also match rooms whose
    /// name merely begins with the requested one, so it cannot be relied on.
    ///
    /// Lag notifications from the receiver are skipped; the stream also ends
    /// when the receiver reports that the channel is closed.
    ///
    /// # Errors
    ///
    /// Returns `invalid_argument` when `room_name` is empty.
    async fn stream_voice_events(
        &self,
        request: Request<proto::StreamVoiceEventsRequest>,
    ) -> Result<Response<Self::StreamVoiceEventsStream>, Status> {
        // A protobuf `Timestamp` only allows `nanos` in 0..=999_999_999, but
        // chrono can report larger values while representing a leap second.
        const MAX_PROTO_NANOS: u32 = 999_999_999;

        let req = request.into_inner();
        if req.room_name.is_empty() {
            return Err(Status::invalid_argument("room_name is required"));
        }

        let room_name = req.room_name;
        let bus_handle = self.bus.handle("voice-stream");
        let rx = bus_handle.into_receiver();
        let topic_prefix = format!("voice.{room_name}");

        let stream = async_stream::try_stream! {
            let mut rx = rx;
            loop {
                match rx.recv().await {
                    Ok(envelope) => {
                        // Cheap pre-filter; the exact room check happens in
                        // the match guards below.
                        if !envelope.topic.starts_with(&topic_prefix) {
                            continue;
                        }
                        match &envelope.message {
                            BusMessage::VoiceTranscript { room_name: msg_room, text, role, is_final }
                                if *msg_room == room_name =>
                            {
                                let proto_role = match role.as_str() {
                                    "user" => proto::Role::User,
                                    _ => proto::Role::Agent,
                                };
                                // Clamped to the protobuf range first, so the
                                // cast below is lossless.
                                let nanos = envelope
                                    .timestamp
                                    .timestamp_subsec_nanos()
                                    .min(MAX_PROTO_NANOS);
                                yield proto::VoiceEvent {
                                    event: Some(proto::voice_event::Event::Transcript(
                                        proto::VoiceTranscriptEvent {
                                            room_name: room_name.clone(),
                                            text: text.clone(),
                                            role: proto_role.into(),
                                            is_final: *is_final,
                                            timestamp: Some(prost_types::Timestamp {
                                                seconds: envelope.timestamp.timestamp(),
                                                nanos: nanos as i32,
                                            }),
                                        },
                                    )),
                                };
                            }
                            BusMessage::VoiceAgentStateChanged { room_name: msg_room, state }
                                if *msg_room == room_name =>
                            {
                                let proto_state = match state.as_str() {
                                    "listening" => proto::VoiceAgentState::Listening,
                                    "thinking" => proto::VoiceAgentState::Thinking,
                                    "speaking" => proto::VoiceAgentState::Speaking,
                                    _ => proto::VoiceAgentState::Initializing,
                                };
                                yield proto::VoiceEvent {
                                    event: Some(proto::voice_event::Event::AgentStateChange(
                                        proto::VoiceAgentStateEvent {
                                            room_name: room_name.clone(),
                                            state: proto_state.into(),
                                        },
                                    )),
                                };
                            }
                            BusMessage::VoiceSessionEnded { room_name: msg_room, reason }
                                if *msg_room == room_name =>
                            {
                                yield proto::VoiceEvent {
                                    event: Some(proto::voice_event::Event::SessionEnded(
                                        proto::VoiceSessionEndedEvent {
                                            room_name: room_name.clone(),
                                            reason: reason.clone(),
                                        },
                                    )),
                                };
                                // The session is over; nothing more to send.
                                break;
                            }
                            // Non-voice messages and voice messages that
                            // belong to a different room are ignored.
                            _ => {}
                        }
                    }
                    // Falling behind loses messages but is not fatal.
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                }
            }
        };

        Ok(Response::new(
            Box::pin(stream) as Self::StreamVoiceEventsStream
        ))
    }
}