Skip to main content

voice_echo/
lib.rs

//! voice-echo — Voice interface for AI entities via Twilio.
//!
//! This crate provides a complete voice pipeline: Twilio WebSocket audio streaming,
//! voice activity detection, speech-to-text (Groq Whisper), response generation by an LLM
//! (either an in-process provider or a bridge-echo multiplexer), and text-to-speech (Inworld).
//! It can be used as a standalone binary or as a library dependency in echo-system.
7
8pub mod api;
9pub mod config;
10pub mod discord;
11pub mod greeting;
12pub mod pipeline;
13pub mod registry;
14pub mod twilio;
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::future::Future;
19use std::net::SocketAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use axum::routing::{get, post};
24use axum::Router;
25use pulse_system_types::llm::LmProvider;
26use pulse_system_types::plugin::{Plugin, PluginContext, PluginResult, PluginRole};
27use pulse_system_types::{HealthStatus, PluginMeta, SetupPrompt};
28use tokio::sync::Mutex;
29use tower_http::trace::TraceLayer;
30
31use config::Config;
32use pipeline::audio;
33use pipeline::bridge::BridgeClient;
34use pipeline::conversation::ConversationManager;
35use pipeline::stt::SttClient;
36use pipeline::tts::TtsClient;
37use registry::CallRegistry;
38use twilio::outbound::TwilioClient;
39
40/// How LLM communication is routed for a call.
41#[derive(Clone)]
42pub enum Brain {
43    /// Direct LLM invocation via provider (plugin mode).
44    Local(Arc<ConversationManager>),
45    /// Forwarded to bridge-echo multiplexer.
46    Bridge(Arc<BridgeClient>),
47}
48
/// Metadata for an outbound call: optional context and reason injected into the first prompt.
///
/// Derives `Debug` so the metadata can be logged, `Clone`, and `Default` (both fields `None`).
/// It also derives equality so values can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CallMeta {
    /// Free-form background for the call, if any.
    pub context: Option<String>,
    /// Why the call is being placed, if given.
    pub reason: Option<String>,
}
54
55/// Shared application state accessible from all handlers.
56#[derive(Clone)]
57pub struct AppState {
58    pub config: Config,
59    pub stt: Arc<SttClient>,
60    pub tts: Arc<TtsClient>,
61    pub brain: Brain,
62    pub twilio: Arc<TwilioClient>,
63    pub call_registry: CallRegistry,
64    /// Pre-converted mu-law hold music data, if configured.
65    pub hold_music: Option<Arc<Vec<u8>>>,
66    /// Metadata for outbound calls, keyed by call_sid.
67    /// Consumed on first utterance so the LLM knows why it called.
68    pub call_metas: Arc<Mutex<HashMap<String, CallMeta>>>,
69}
70
71/// The voice-echo plugin. Manages the voice pipeline lifecycle.
72pub struct VoiceEcho {
73    config: Config,
74    provider: Option<Arc<dyn LmProvider>>,
75    state: Option<AppState>,
76    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
77}
78
79impl VoiceEcho {
80    /// Create a new VoiceEcho instance from config.
81    pub fn new(config: Config) -> Self {
82        Self {
83            config,
84            provider: None,
85            state: None,
86            shutdown_tx: None,
87        }
88    }
89
90    /// Start the voice server. Builds state, binds the listener, and serves.
91    /// This blocks until the server is shut down via `stop()`.
92    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
93        let config = &self.config;
94
95        // Load hold music if configured
96        let hold_music = config.hold_music.as_ref().and_then(|hm| {
97            let path = std::path::Path::new(&hm.file);
98            match audio::load_wav_as_mulaw(path, hm.volume) {
99                Ok(data) => {
100                    tracing::info!(
101                        path = %hm.file,
102                        volume = hm.volume,
103                        mulaw_bytes = data.len(),
104                        "Loaded hold music"
105                    );
106                    Some(Arc::new(data))
107                }
108                Err(e) => {
109                    tracing::warn!(path = %hm.file, "Failed to load hold music: {e}");
110                    None
111                }
112            }
113        });
114
115        // Build system prompt from SELF.md if configured
116        let system_prompt = config
117            .llm
118            .self_path
119            .as_ref()
120            .and_then(|path| std::fs::read_to_string(path).ok())
121            .unwrap_or_default();
122
123        // Determine brain mode
124        let brain = if let Some(ref bridge_url) = config.llm.bridge_url {
125            Brain::Bridge(Arc::new(BridgeClient::new(
126                bridge_url,
127                config.identity.caller_name.clone(),
128            )))
129        } else if let Some(ref provider) = self.provider {
130            Brain::Local(Arc::new(ConversationManager::new(
131                Arc::clone(provider),
132                system_prompt,
133                config.llm.session_timeout_secs,
134                config.llm.max_response_tokens,
135            )))
136        } else {
137            return Err("No LLM provider available. Set bridge_url or run as a plugin.".into());
138        };
139
140        // Build shared state
141        let state = AppState {
142            stt: Arc::new(SttClient::new(
143                config.groq.api_key.clone(),
144                config.groq.model.clone(),
145            )),
146            tts: Arc::new(TtsClient::new(
147                config.inworld.api_key.clone(),
148                config.inworld.voice_id.clone(),
149                config.inworld.model.clone(),
150            )),
151            brain,
152            twilio: Arc::new(TwilioClient::new(
153                &config.twilio,
154                &config.server.external_url,
155            )),
156            call_registry: CallRegistry::new(),
157            config: config.clone(),
158            hold_music,
159            call_metas: Arc::new(Mutex::new(HashMap::new())),
160        };
161
162        self.state = Some(state.clone());
163
164        let app = self.build_router(state);
165
166        let addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port)
167            .parse()
168            .map_err(|e| format!("Invalid server address: {e}"))?;
169
170        tracing::info!(%addr, "Listening");
171
172        let listener = tokio::net::TcpListener::bind(addr).await?;
173
174        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
175        self.shutdown_tx = Some(shutdown_tx);
176
177        axum::serve(listener, app)
178            .with_graceful_shutdown(async {
179                let _ = shutdown_rx.await;
180            })
181            .await?;
182
183        Ok(())
184    }
185
186    /// Stop the voice server gracefully.
187    pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
188        if let Some(tx) = self.shutdown_tx.take() {
189            let _ = tx.send(());
190        }
191        self.state = None;
192        Ok(())
193    }
194
195    /// Report health status.
196    fn health_check(&self) -> HealthStatus {
197        match &self.state {
198            Some(_) => HealthStatus::Healthy,
199            None => HealthStatus::Down("not started".into()),
200        }
201    }
202
203    /// Return the Axum router with all voice-echo routes.
204    /// Returns `None` if the server hasn't been started (no state).
205    pub fn routes(&self) -> Option<Router> {
206        let state = self.state.as_ref()?;
207        Some(self.build_router(state.clone()))
208    }
209
210    /// Configuration prompts for the echo-system init wizard.
211    fn get_setup_prompts() -> Vec<SetupPrompt> {
212        vec![
213            SetupPrompt {
214                key: "external_url".into(),
215                question: "External URL (where Twilio can reach this server):".into(),
216                required: true,
217                secret: false,
218                default: None,
219            },
220            SetupPrompt {
221                key: "twilio_account_sid".into(),
222                question: "Twilio Account SID:".into(),
223                required: true,
224                secret: false,
225                default: None,
226            },
227            SetupPrompt {
228                key: "twilio_auth_token".into(),
229                question: "Twilio Auth Token:".into(),
230                required: true,
231                secret: true,
232                default: None,
233            },
234            SetupPrompt {
235                key: "twilio_phone_number".into(),
236                question: "Twilio Phone Number (E.164):".into(),
237                required: true,
238                secret: false,
239                default: None,
240            },
241            SetupPrompt {
242                key: "groq_api_key".into(),
243                question: "Groq API Key (for Whisper STT):".into(),
244                required: true,
245                secret: true,
246                default: None,
247            },
248            SetupPrompt {
249                key: "inworld_api_key".into(),
250                question: "Inworld API Key (for TTS):".into(),
251                required: true,
252                secret: true,
253                default: None,
254            },
255            SetupPrompt {
256                key: "inworld_voice_id".into(),
257                question: "Inworld Voice ID:".into(),
258                required: false,
259                secret: false,
260                default: Some("Olivia".into()),
261            },
262            SetupPrompt {
263                key: "api_token".into(),
264                question: "API Token (for outbound call auth):".into(),
265                required: false,
266                secret: true,
267                default: None,
268            },
269        ]
270    }
271
272    fn build_router(&self, state: AppState) -> Router {
273        Router::new()
274            .route("/twilio/voice", post(twilio::webhook::handle_voice))
275            .route(
276                "/twilio/voice/outbound",
277                post(twilio::webhook::handle_voice_outbound),
278            )
279            .route("/twilio/media", get(twilio::media::handle_media_upgrade))
280            .route("/api/call", post(api::outbound::handle_call))
281            .route("/api/inject", post(api::inject::handle_inject))
282            .route(
283                "/discord-stream",
284                get(discord::stream::handle_discord_upgrade),
285            )
286            .route("/health", get(health_handler))
287            .layer(TraceLayer::new_for_http())
288            .with_state(state)
289    }
290}
291
292/// Factory function — creates a fully initialized voice-echo plugin.
293pub async fn create(
294    config: &serde_json::Value,
295    ctx: &PluginContext,
296) -> Result<Box<dyn Plugin>, Box<dyn std::error::Error + Send + Sync>> {
297    let cfg: Config = serde_json::from_value(config.clone())?;
298    let mut voice = VoiceEcho::new(cfg);
299    voice.provider = Some(Arc::clone(&ctx.provider));
300    Ok(Box::new(voice))
301}
302
303impl Plugin for VoiceEcho {
304    fn meta(&self) -> PluginMeta {
305        PluginMeta {
306            name: "voice-echo".into(),
307            version: env!("CARGO_PKG_VERSION").into(),
308            description: "Voice interface via Twilio".into(),
309        }
310    }
311
312    fn role(&self) -> PluginRole {
313        PluginRole::Interface
314    }
315
316    fn start(&mut self) -> PluginResult<'_> {
317        Box::pin(async move { self.start().await })
318    }
319
320    fn stop(&mut self) -> PluginResult<'_> {
321        Box::pin(async move { self.stop().await })
322    }
323
324    fn health(&self) -> Pin<Box<dyn Future<Output = HealthStatus> + Send + '_>> {
325        Box::pin(async move { self.health_check() })
326    }
327
328    fn setup_prompts(&self) -> Vec<SetupPrompt> {
329        Self::get_setup_prompts()
330    }
331
332    fn as_any(&self) -> &dyn Any {
333        self
334    }
335}
336
/// Liveness probe for `GET /health`: always answers with the plain-text body `ok`.
async fn health_handler() -> &'static str {
    const BODY: &str = "ok";
    BODY
}