1pub mod api;
9pub mod config;
10pub mod discord;
11pub mod greeting;
12pub mod pipeline;
13pub mod registry;
14pub mod twilio;
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::future::Future;
19use std::net::SocketAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use axum::routing::{get, post};
24use axum::Router;
25use pulse_system_types::llm::LmProvider;
26use pulse_system_types::plugin::{Plugin, PluginContext, PluginResult, PluginRole};
27use pulse_system_types::{HealthStatus, PluginMeta, SetupPrompt};
28use tokio::sync::Mutex;
29use tower_http::trace::TraceLayer;
30
31use config::Config;
32use pipeline::audio;
33use pipeline::bridge::BridgeClient;
34use pipeline::conversation::ConversationManager;
35use pipeline::stt::SttClient;
36use pipeline::tts::TtsClient;
37use registry::CallRegistry;
38use twilio::outbound::TwilioClient;
39
40#[derive(Clone)]
42pub enum Brain {
43 Local(Arc<ConversationManager>),
45 Bridge(Arc<BridgeClient>),
47}
48
49pub struct CallMeta {
51 pub context: Option<String>,
52 pub reason: Option<String>,
53}
54
55#[derive(Clone)]
57pub struct AppState {
58 pub config: Config,
59 pub stt: Arc<SttClient>,
60 pub tts: Arc<TtsClient>,
61 pub brain: Brain,
62 pub twilio: Arc<TwilioClient>,
63 pub call_registry: CallRegistry,
64 pub hold_music: Option<Arc<Vec<u8>>>,
66 pub call_metas: Arc<Mutex<HashMap<String, CallMeta>>>,
69}
70
71pub struct VoiceEcho {
73 config: Config,
74 provider: Option<Arc<dyn LmProvider>>,
75 state: Option<AppState>,
76 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
77}
78
79impl VoiceEcho {
80 pub fn new(config: Config) -> Self {
82 Self {
83 config,
84 provider: None,
85 state: None,
86 shutdown_tx: None,
87 }
88 }
89
90 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
93 let config = &self.config;
94
95 let hold_music = config.hold_music.as_ref().and_then(|hm| {
97 let path = std::path::Path::new(&hm.file);
98 match audio::load_wav_as_mulaw(path, hm.volume) {
99 Ok(data) => {
100 tracing::info!(
101 path = %hm.file,
102 volume = hm.volume,
103 mulaw_bytes = data.len(),
104 "Loaded hold music"
105 );
106 Some(Arc::new(data))
107 }
108 Err(e) => {
109 tracing::warn!(path = %hm.file, "Failed to load hold music: {e}");
110 None
111 }
112 }
113 });
114
115 let system_prompt = config
117 .llm
118 .self_path
119 .as_ref()
120 .and_then(|path| std::fs::read_to_string(path).ok())
121 .unwrap_or_default();
122
123 let brain = if let Some(ref bridge_url) = config.llm.bridge_url {
125 Brain::Bridge(Arc::new(BridgeClient::new(
126 bridge_url,
127 config.identity.caller_name.clone(),
128 )))
129 } else if let Some(ref provider) = self.provider {
130 Brain::Local(Arc::new(ConversationManager::new(
131 Arc::clone(provider),
132 system_prompt,
133 config.llm.session_timeout_secs,
134 config.llm.max_response_tokens,
135 )))
136 } else {
137 return Err("No LLM provider available. Set bridge_url or run as a plugin.".into());
138 };
139
140 let state = AppState {
142 stt: Arc::new(SttClient::new(
143 config.groq.api_key.clone(),
144 config.groq.model.clone(),
145 )),
146 tts: Arc::new(TtsClient::new(
147 config.inworld.api_key.clone(),
148 config.inworld.voice_id.clone(),
149 config.inworld.model.clone(),
150 )),
151 brain,
152 twilio: Arc::new(TwilioClient::new(
153 &config.twilio,
154 &config.server.external_url,
155 )),
156 call_registry: CallRegistry::new(),
157 config: config.clone(),
158 hold_music,
159 call_metas: Arc::new(Mutex::new(HashMap::new())),
160 };
161
162 self.state = Some(state.clone());
163
164 let app = self.build_router(state);
165
166 let addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port)
167 .parse()
168 .map_err(|e| format!("Invalid server address: {e}"))?;
169
170 tracing::info!(%addr, "Listening");
171
172 let listener = tokio::net::TcpListener::bind(addr).await?;
173
174 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
175 self.shutdown_tx = Some(shutdown_tx);
176
177 axum::serve(listener, app)
178 .with_graceful_shutdown(async {
179 let _ = shutdown_rx.await;
180 })
181 .await?;
182
183 Ok(())
184 }
185
186 pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
188 if let Some(tx) = self.shutdown_tx.take() {
189 let _ = tx.send(());
190 }
191 self.state = None;
192 Ok(())
193 }
194
195 fn health_check(&self) -> HealthStatus {
197 match &self.state {
198 Some(_) => HealthStatus::Healthy,
199 None => HealthStatus::Down("not started".into()),
200 }
201 }
202
203 pub fn routes(&self) -> Option<Router> {
206 let state = self.state.as_ref()?;
207 Some(self.build_router(state.clone()))
208 }
209
210 fn get_setup_prompts() -> Vec<SetupPrompt> {
212 vec![
213 SetupPrompt {
214 key: "external_url".into(),
215 question: "External URL (where Twilio can reach this server):".into(),
216 required: true,
217 secret: false,
218 default: None,
219 },
220 SetupPrompt {
221 key: "twilio_account_sid".into(),
222 question: "Twilio Account SID:".into(),
223 required: true,
224 secret: false,
225 default: None,
226 },
227 SetupPrompt {
228 key: "twilio_auth_token".into(),
229 question: "Twilio Auth Token:".into(),
230 required: true,
231 secret: true,
232 default: None,
233 },
234 SetupPrompt {
235 key: "twilio_phone_number".into(),
236 question: "Twilio Phone Number (E.164):".into(),
237 required: true,
238 secret: false,
239 default: None,
240 },
241 SetupPrompt {
242 key: "groq_api_key".into(),
243 question: "Groq API Key (for Whisper STT):".into(),
244 required: true,
245 secret: true,
246 default: None,
247 },
248 SetupPrompt {
249 key: "inworld_api_key".into(),
250 question: "Inworld API Key (for TTS):".into(),
251 required: true,
252 secret: true,
253 default: None,
254 },
255 SetupPrompt {
256 key: "inworld_voice_id".into(),
257 question: "Inworld Voice ID:".into(),
258 required: false,
259 secret: false,
260 default: Some("Olivia".into()),
261 },
262 SetupPrompt {
263 key: "api_token".into(),
264 question: "API Token (for outbound call auth):".into(),
265 required: false,
266 secret: true,
267 default: None,
268 },
269 ]
270 }
271
272 fn build_router(&self, state: AppState) -> Router {
273 Router::new()
274 .route("/twilio/voice", post(twilio::webhook::handle_voice))
275 .route(
276 "/twilio/voice/outbound",
277 post(twilio::webhook::handle_voice_outbound),
278 )
279 .route("/twilio/media", get(twilio::media::handle_media_upgrade))
280 .route("/api/call", post(api::outbound::handle_call))
281 .route("/api/inject", post(api::inject::handle_inject))
282 .route(
283 "/discord-stream",
284 get(discord::stream::handle_discord_upgrade),
285 )
286 .route("/health", get(health_handler))
287 .layer(TraceLayer::new_for_http())
288 .with_state(state)
289 }
290}
291
292pub async fn create(
294 config: &serde_json::Value,
295 ctx: &PluginContext,
296) -> Result<Box<dyn Plugin>, Box<dyn std::error::Error + Send + Sync>> {
297 let cfg: Config = serde_json::from_value(config.clone())?;
298 let mut voice = VoiceEcho::new(cfg);
299 voice.provider = Some(Arc::clone(&ctx.provider));
300 Ok(Box::new(voice))
301}
302
303impl Plugin for VoiceEcho {
304 fn meta(&self) -> PluginMeta {
305 PluginMeta {
306 name: "voice-echo".into(),
307 version: env!("CARGO_PKG_VERSION").into(),
308 description: "Voice interface via Twilio".into(),
309 }
310 }
311
312 fn role(&self) -> PluginRole {
313 PluginRole::Interface
314 }
315
316 fn start(&mut self) -> PluginResult<'_> {
317 Box::pin(async move { self.start().await })
318 }
319
320 fn stop(&mut self) -> PluginResult<'_> {
321 Box::pin(async move { self.stop().await })
322 }
323
324 fn health(&self) -> Pin<Box<dyn Future<Output = HealthStatus> + Send + '_>> {
325 Box::pin(async move { self.health_check() })
326 }
327
328 fn setup_prompts(&self) -> Vec<SetupPrompt> {
329 Self::get_setup_prompts()
330 }
331
332 fn as_any(&self) -> &dyn Any {
333 self
334 }
335}
336
337async fn health_handler() -> &'static str {
338 "ok"
339}