1use std::collections::HashMap;
9use std::fmt;
10use std::sync::Arc;
11
12use anyhow::{Result, bail};
13use schemars::JsonSchema;
14use serde::{Deserialize, Serialize};
15use tokio::sync::{Mutex, mpsc};
16use tracing::{debug, info, warn};
17
18use super::traits::{Channel, ChannelMessage, SendMessage};
19
20#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
24#[serde(rename_all = "lowercase")]
25pub enum VoiceProvider {
26 #[default]
27 Twilio,
28 Telnyx,
29 Plivo,
30}
31
32impl fmt::Display for VoiceProvider {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 match self {
35 Self::Twilio => write!(f, "twilio"),
36 Self::Telnyx => write!(f, "telnyx"),
37 Self::Plivo => write!(f, "plivo"),
38 }
39 }
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
44pub struct VoiceCallConfig {
45 #[serde(default)]
47 pub provider: VoiceProvider,
48 pub account_id: String,
50 pub auth_token: String,
52 pub from_number: String,
54 #[serde(default = "default_webhook_port")]
56 pub webhook_port: u16,
57 #[serde(default = "default_true")]
59 pub require_outbound_approval: bool,
60 #[serde(default = "default_true")]
62 pub transcription_logging: bool,
63 #[serde(default)]
65 pub tts_voice: Option<String>,
66 #[serde(default = "default_max_call_duration")]
68 pub max_call_duration_secs: u64,
69 #[serde(default)]
72 pub webhook_base_url: Option<String>,
73}
74
75fn default_webhook_port() -> u16 {
76 8090
77}
78
79fn default_true() -> bool {
80 true
81}
82
83fn default_max_call_duration() -> u64 {
84 3600
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub enum CallState {
93 Ringing,
95 InProgress,
97 Completed,
99 Failed,
101 HungUp,
103 PendingApproval,
105}
106
107impl fmt::Display for CallState {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 match self {
110 Self::Ringing => write!(f, "ringing"),
111 Self::InProgress => write!(f, "in_progress"),
112 Self::Completed => write!(f, "completed"),
113 Self::Failed => write!(f, "failed"),
114 Self::HungUp => write!(f, "hung_up"),
115 Self::PendingApproval => write!(f, "pending_approval"),
116 }
117 }
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
122#[serde(rename_all = "lowercase")]
123pub enum CallDirection {
124 Inbound,
125 Outbound,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct CallRecord {
131 pub call_id: String,
133 pub direction: CallDirection,
135 pub remote_number: String,
137 pub local_number: String,
139 pub state: CallState,
141 pub started_at: String,
143 pub ended_at: Option<String>,
145 pub duration_secs: u64,
147 pub transcript: Vec<TranscriptEntry>,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct TranscriptEntry {
154 pub speaker: String,
156 pub text: String,
158 pub timestamp: String,
160}
161
162pub struct VoiceCallChannel {
166 config: VoiceCallConfig,
167 active_calls: Arc<Mutex<HashMap<String, CallRecord>>>,
168 client: reqwest::Client,
169}
170
171impl VoiceCallChannel {
172 pub fn new(config: VoiceCallConfig) -> Self {
173 Self {
174 config,
175 active_calls: Arc::new(Mutex::new(HashMap::new())),
176 client: reqwest::Client::new(),
177 }
178 }
179
180 fn api_base_url(&self) -> &str {
182 match self.config.provider {
183 VoiceProvider::Twilio => "https://api.twilio.com/2010-04-01",
184 VoiceProvider::Telnyx => "https://api.telnyx.com/v2",
185 VoiceProvider::Plivo => "https://api.plivo.com/v1",
186 }
187 }
188
189 pub async fn place_call(&self, to_number: &str) -> Result<String> {
191 if self.config.require_outbound_approval {
192 info!(to = to_number, "outbound call requires approval");
193 return Ok(format!("PENDING_APPROVAL:{to_number}"));
194 }
195 self.execute_outbound_call(to_number).await
196 }
197
198 async fn execute_outbound_call(&self, to_number: &str) -> Result<String> {
199 let webhook_url = self.webhook_url("/voice/status");
200
201 match self.config.provider {
202 VoiceProvider::Twilio => {
203 let url = format!(
204 "{}/Accounts/{}/Calls.json",
205 self.api_base_url(),
206 self.config.account_id
207 );
208 let resp = self
209 .client
210 .post(&url)
211 .basic_auth(&self.config.account_id, Some(&self.config.auth_token))
212 .form(&[
213 ("To", to_number),
214 ("From", &self.config.from_number),
215 ("StatusCallback", &webhook_url),
216 ("Timeout", &self.config.max_call_duration_secs.to_string()),
217 ])
218 .send()
219 .await?;
220
221 if !resp.status().is_success() {
222 let body = resp.text().await.unwrap_or_default();
223 bail!("Twilio call failed: {body}");
224 }
225
226 let json: serde_json::Value = serde_json::from_str(&resp.text().await?)?;
227 let call_sid = json["sid"].as_str().unwrap_or("unknown").to_string();
228 info!(call_sid = %call_sid, to = to_number, "outbound call placed via Twilio");
229 Ok(call_sid)
230 }
231 VoiceProvider::Telnyx => {
232 let url = format!("{}/calls", self.api_base_url());
233 let resp = self
234 .client
235 .post(&url)
236 .bearer_auth(&self.config.auth_token)
237 .json(&serde_json::json!({
238 "connection_id": self.config.account_id,
239 "to": to_number,
240 "from": self.config.from_number,
241 "webhook_url": webhook_url,
242 "timeout_secs": self.config.max_call_duration_secs,
243 }))
244 .send()
245 .await?;
246
247 if !resp.status().is_success() {
248 let body = resp.text().await.unwrap_or_default();
249 bail!("Telnyx call failed: {body}");
250 }
251
252 let json: serde_json::Value = serde_json::from_str(&resp.text().await?)?;
253 let call_id = json["data"]["call_control_id"]
254 .as_str()
255 .unwrap_or("unknown")
256 .to_string();
257 info!(call_id = %call_id, to = to_number, "outbound call placed via Telnyx");
258 Ok(call_id)
259 }
260 VoiceProvider::Plivo => {
261 let url = format!(
262 "{}/Account/{}/Call/",
263 self.api_base_url(),
264 self.config.account_id
265 );
266 let resp = self
267 .client
268 .post(&url)
269 .basic_auth(&self.config.account_id, Some(&self.config.auth_token))
270 .json(&serde_json::json!({
271 "to": to_number,
272 "from": self.config.from_number,
273 "answer_url": self.webhook_url("/voice/answer"),
274 "hangup_url": self.webhook_url("/voice/hangup"),
275 "time_limit": self.config.max_call_duration_secs,
276 }))
277 .send()
278 .await?;
279
280 if !resp.status().is_success() {
281 let body = resp.text().await.unwrap_or_default();
282 bail!("Plivo call failed: {body}");
283 }
284
285 let json: serde_json::Value = serde_json::from_str(&resp.text().await?)?;
286 let call_uuid = json["request_uuid"]
287 .as_str()
288 .unwrap_or("unknown")
289 .to_string();
290 info!(call_uuid = %call_uuid, to = to_number, "outbound call placed via Plivo");
291 Ok(call_uuid)
292 }
293 }
294 }
295
296 fn webhook_url(&self, path: &str) -> String {
298 if let Some(ref base) = self.config.webhook_base_url {
299 format!("{}{}", base.trim_end_matches('/'), path)
300 } else {
301 format!("http://localhost:{}{}", self.config.webhook_port, path)
302 }
303 }
304
305 pub async fn add_transcript_entry(&self, call_id: &str, speaker: &str, text: &str) {
307 let mut calls = self.active_calls.lock().await;
308 if let Some(record) = calls.get_mut(call_id) {
309 record.transcript.push(TranscriptEntry {
310 speaker: speaker.to_string(),
311 text: text.to_string(),
312 timestamp: chrono::Utc::now().to_rfc3339(),
313 });
314 }
315 }
316
317 pub async fn get_call(&self, call_id: &str) -> Option<CallRecord> {
319 let calls = self.active_calls.lock().await;
320 calls.get(call_id).cloned()
321 }
322
323 pub async fn active_calls(&self) -> Vec<CallRecord> {
325 let calls = self.active_calls.lock().await;
326 calls.values().cloned().collect()
327 }
328
329 pub async fn handle_inbound_call(
331 &self,
332 call_id: &str,
333 from_number: &str,
334 tx: &mpsc::Sender<ChannelMessage>,
335 ) -> Result<()> {
336 let record = CallRecord {
337 call_id: call_id.to_string(),
338 direction: CallDirection::Inbound,
339 remote_number: from_number.to_string(),
340 local_number: self.config.from_number.clone(),
341 state: CallState::Ringing,
342 started_at: chrono::Utc::now().to_rfc3339(),
343 ended_at: None,
344 duration_secs: 0,
345 transcript: Vec::new(),
346 };
347
348 {
349 let mut calls = self.active_calls.lock().await;
350 calls.insert(call_id.to_string(), record);
351 }
352
353 info!(
354 call_id = call_id,
355 from = from_number,
356 "inbound call received"
357 );
358
359 let msg = ChannelMessage {
361 id: call_id.to_string(),
362 sender: from_number.to_string(),
363 reply_target: from_number.to_string(),
364 content: format!("[Voice Call] Incoming call from {from_number} (call_id: {call_id})"),
365 channel: "voice_call".to_string(),
366 timestamp: chrono::Utc::now().timestamp().unsigned_abs(),
367 thread_ts: Some(call_id.to_string()),
368 interruption_scope_id: Some(call_id.to_string()),
369 attachments: vec![],
370 };
371 tx.send(msg)
372 .await
373 .map_err(|e| anyhow::anyhow!("Failed to send call event: {e}"))?;
374 Ok(())
375 }
376
377 pub async fn handle_status_update(&self, call_id: &str, new_state: CallState) {
379 let mut calls = self.active_calls.lock().await;
380 if let Some(record) = calls.get_mut(call_id) {
381 let old_state = record.state;
382 record.state = new_state;
383
384 if matches!(
385 new_state,
386 CallState::Completed | CallState::Failed | CallState::HungUp
387 ) {
388 record.ended_at = Some(chrono::Utc::now().to_rfc3339());
389 }
390
391 debug!(
392 call_id = call_id,
393 old_state = %old_state,
394 new_state = %new_state,
395 "call state transition"
396 );
397 }
398 }
399
400 pub async fn save_transcript(
402 &self,
403 call_id: &str,
404 workspace_dir: &std::path::Path,
405 ) -> Result<()> {
406 if !self.config.transcription_logging {
407 return Ok(());
408 }
409
410 let calls = self.active_calls.lock().await;
411 let Some(record) = calls.get(call_id) else {
412 bail!("Call not found: {call_id}");
413 };
414
415 let logs_dir = workspace_dir.join("logs").join("calls");
416 std::fs::create_dir_all(&logs_dir)?;
417
418 let filename = format!("{}_{}.json", record.started_at.replace(':', "-"), call_id);
419 let path = logs_dir.join(filename);
420 let json = serde_json::to_string_pretty(record)?;
421 std::fs::write(&path, json)?;
422
423 info!(call_id = call_id, path = %path.display(), "call transcript saved");
424 Ok(())
425 }
426}
427
428#[async_trait::async_trait]
431impl Channel for VoiceCallChannel {
432 fn name(&self) -> &str {
433 "voice_call"
434 }
435
436 async fn send(&self, message: &SendMessage) -> Result<()> {
437 if let Some(ref thread_ts) = message.thread_ts {
439 let calls = self.active_calls.lock().await;
440 if let Some(record) = calls.get(thread_ts) {
441 if record.state == CallState::InProgress {
442 debug!(
443 call_id = thread_ts,
444 "would TTS message to active call: {}", message.content
445 );
446 return Ok(());
449 }
450 }
451 }
452
453 debug!("voice_call send (no active call): {}", message.content);
454 Ok(())
455 }
456
457 async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
458 let port = self.config.webhook_port;
459 let active_calls = self.active_calls.clone();
460 let _tx = tx.clone();
461
462 info!(port = port, provider = %self.config.provider, "voice call webhook server starting");
463
464 let app = axum::Router::new()
475 .route("/voice/health", axum::routing::get(|| async { "ok" }))
476 .with_state(active_calls);
477
478 let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}"))
479 .await
480 .map_err(|e| anyhow::anyhow!("Failed to bind voice webhook server: {e}"))?;
481
482 axum::serve(listener, app)
483 .await
484 .map_err(|e| anyhow::anyhow!("Voice webhook server error: {e}"))?;
485
486 Ok(())
487 }
488
489 async fn health_check(&self) -> bool {
490 let test_url = match self.config.provider {
492 VoiceProvider::Twilio => {
493 format!(
494 "{}/Accounts/{}.json",
495 self.api_base_url(),
496 self.config.account_id
497 )
498 }
499 VoiceProvider::Telnyx => format!("{}/connections", self.api_base_url()),
500 VoiceProvider::Plivo => {
501 format!(
502 "{}/Account/{}/",
503 self.api_base_url(),
504 self.config.account_id
505 )
506 }
507 };
508
509 match self.client.get(&test_url).send().await {
510 Ok(resp) => {
511 resp.status().is_success() || resp.status().as_u16() == 401
513 }
514 Err(e) => {
515 warn!(provider = %self.config.provider, "voice call health check failed: {e}");
516 false
517 }
518 }
519 }
520
521 async fn start_typing(&self, _recipient: &str) -> Result<()> {
522 Ok(()) }
524
525 async fn stop_typing(&self, _recipient: &str) -> Result<()> {
526 Ok(()) }
528
529 fn supports_draft_updates(&self) -> bool {
530 false
531 }
532
533 async fn send_draft(&self, _message: &SendMessage) -> Result<Option<String>> {
534 Ok(None)
535 }
536
537 async fn update_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> Result<()> {
538 Ok(())
539 }
540
541 async fn finalize_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> Result<()> {
542 Ok(())
543 }
544
545 async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> Result<()> {
546 Ok(())
547 }
548
549 async fn add_reaction(&self, _channel_id: &str, _message_id: &str, _emoji: &str) -> Result<()> {
550 Ok(())
551 }
552
553 async fn remove_reaction(
554 &self,
555 _channel_id: &str,
556 _message_id: &str,
557 _emoji: &str,
558 ) -> Result<()> {
559 Ok(())
560 }
561
562 async fn pin_message(&self, _channel_id: &str, _message_id: &str) -> Result<()> {
563 Ok(())
564 }
565
566 async fn unpin_message(&self, _channel_id: &str, _message_id: &str) -> Result<()> {
567 Ok(())
568 }
569
570 async fn redact_message(
571 &self,
572 _channel_id: &str,
573 _message_id: &str,
574 _reason: Option<String>,
575 ) -> Result<()> {
576 Ok(())
577 }
578}
579
#[cfg(test)]
mod tests {
    use super::*;

    /// Twilio config with approval required, logging on and a public webhook base URL.
    fn test_config() -> VoiceCallConfig {
        VoiceCallConfig {
            provider: VoiceProvider::Twilio,
            account_id: "AC_TEST_ACCOUNT".into(),
            auth_token: "test_token".into(),
            from_number: "+15551234567".into(),
            webhook_port: 8090,
            require_outbound_approval: true,
            transcription_logging: true,
            tts_voice: None,
            max_call_duration_secs: 3600,
            webhook_base_url: Some("https://tunnel.example.com".into()),
        }
    }

    #[test]
    fn provider_display() {
        assert_eq!(VoiceProvider::Twilio.to_string(), "twilio");
        assert_eq!(VoiceProvider::Telnyx.to_string(), "telnyx");
        assert_eq!(VoiceProvider::Plivo.to_string(), "plivo");
    }

    #[test]
    fn call_state_display() {
        assert_eq!(CallState::Ringing.to_string(), "ringing");
        assert_eq!(CallState::InProgress.to_string(), "in_progress");
        assert_eq!(CallState::Completed.to_string(), "completed");
        assert_eq!(CallState::Failed.to_string(), "failed");
        assert_eq!(CallState::HungUp.to_string(), "hung_up");
        assert_eq!(CallState::PendingApproval.to_string(), "pending_approval");
    }

    #[test]
    fn call_state_display_matches_serde_names() {
        let states = [
            CallState::Ringing,
            CallState::InProgress,
            CallState::Completed,
            CallState::Failed,
            CallState::HungUp,
            CallState::PendingApproval,
        ];
        for state in states {
            let json = serde_json::to_string(&state).unwrap();
            assert_eq!(json, format!("\"{state}\""));
            let parsed: CallState = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, state);
        }
    }

    #[test]
    fn call_direction_serializes_lowercase() {
        assert_eq!(
            serde_json::to_string(&CallDirection::Inbound).unwrap(),
            "\"inbound\""
        );
        assert_eq!(
            serde_json::to_string(&CallDirection::Outbound).unwrap(),
            "\"outbound\""
        );
    }

    #[test]
    fn webhook_url_with_base() {
        let channel = VoiceCallChannel::new(test_config());
        assert_eq!(
            channel.webhook_url("/voice/status"),
            "https://tunnel.example.com/voice/status"
        );
    }

    #[test]
    fn webhook_url_trims_trailing_slash() {
        let mut config = test_config();
        config.webhook_base_url = Some("https://tunnel.example.com/".into());
        let channel = VoiceCallChannel::new(config);
        assert_eq!(
            channel.webhook_url("/voice/status"),
            "https://tunnel.example.com/voice/status"
        );
    }

    #[test]
    fn webhook_url_without_base() {
        let mut config = test_config();
        config.webhook_base_url = None;
        let channel = VoiceCallChannel::new(config);
        assert_eq!(
            channel.webhook_url("/voice/status"),
            "http://localhost:8090/voice/status"
        );
    }

    #[test]
    fn channel_name() {
        let channel = VoiceCallChannel::new(test_config());
        assert_eq!(channel.name(), "voice_call");
    }

    #[tokio::test]
    async fn handle_inbound_call_creates_record() {
        let channel = VoiceCallChannel::new(test_config());
        let (tx, mut rx) = mpsc::channel(10);

        channel
            .handle_inbound_call("call-123", "+15559876543", &tx)
            .await
            .unwrap();

        let record = channel.get_call("call-123").await.unwrap();
        assert_eq!(record.call_id, "call-123");
        assert_eq!(record.remote_number, "+15559876543");
        assert_eq!(record.local_number, "+15551234567");
        assert_eq!(record.state, CallState::Ringing);
        assert_eq!(record.direction, CallDirection::Inbound);

        let msg = rx.recv().await.unwrap();
        assert!(msg.content.contains("Incoming call"));
        assert!(msg.content.contains("+15559876543"));
    }

    #[tokio::test]
    async fn handle_status_update_transitions_state() {
        let channel = VoiceCallChannel::new(test_config());
        let (tx, _rx) = mpsc::channel(10);

        channel
            .handle_inbound_call("call-456", "+15559876543", &tx)
            .await
            .unwrap();

        channel
            .handle_status_update("call-456", CallState::InProgress)
            .await;

        let record = channel.get_call("call-456").await.unwrap();
        assert_eq!(record.state, CallState::InProgress);
        assert!(record.ended_at.is_none());

        channel
            .handle_status_update("call-456", CallState::Completed)
            .await;

        let record = channel.get_call("call-456").await.unwrap();
        assert_eq!(record.state, CallState::Completed);
        assert!(record.ended_at.is_some());
    }

    #[tokio::test]
    async fn status_update_for_unknown_call_is_ignored() {
        let channel = VoiceCallChannel::new(test_config());
        channel
            .handle_status_update("missing", CallState::Completed)
            .await;
        assert!(channel.get_call("missing").await.is_none());
        assert!(channel.active_calls().await.is_empty());
    }

    #[tokio::test]
    async fn add_transcript_entry_records_entries() {
        let channel = VoiceCallChannel::new(test_config());
        let (tx, _rx) = mpsc::channel(10);

        channel
            .handle_inbound_call("call-789", "+15559876543", &tx)
            .await
            .unwrap();

        channel
            .add_transcript_entry("call-789", "caller", "Hello, I need help")
            .await;
        channel
            .add_transcript_entry("call-789", "agent", "Hi, how can I assist you?")
            .await;

        let record = channel.get_call("call-789").await.unwrap();
        assert_eq!(record.transcript.len(), 2);
        assert_eq!(record.transcript[0].speaker, "caller");
        assert_eq!(record.transcript[0].text, "Hello, I need help");
        assert_eq!(record.transcript[1].speaker, "agent");
    }

    #[tokio::test]
    async fn save_transcript_creates_file() {
        let channel = VoiceCallChannel::new(test_config());
        let (tx, _rx) = mpsc::channel(10);
        let workspace = tempfile::tempdir().unwrap();

        channel
            .handle_inbound_call("call-save", "+15559876543", &tx)
            .await
            .unwrap();

        channel
            .add_transcript_entry("call-save", "caller", "Test message")
            .await;

        channel
            .save_transcript("call-save", workspace.path())
            .await
            .unwrap();

        let logs_dir = workspace.path().join("logs").join("calls");
        assert!(logs_dir.exists());

        let entries: Vec<_> = std::fs::read_dir(&logs_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .collect();
        assert_eq!(entries.len(), 1);

        let content = std::fs::read_to_string(entries[0].path()).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&content).unwrap();
        assert_eq!(parsed["call_id"], "call-save");
        assert_eq!(parsed["transcript"][0]["text"], "Test message");
    }

    #[tokio::test]
    async fn save_transcript_unknown_call_errors() {
        let channel = VoiceCallChannel::new(test_config());
        let workspace = tempfile::tempdir().unwrap();
        assert!(channel
            .save_transcript("missing", workspace.path())
            .await
            .is_err());
    }

    #[tokio::test]
    async fn active_calls_lists_all() {
        let channel = VoiceCallChannel::new(test_config());
        let (tx, _rx) = mpsc::channel(10);

        channel
            .handle_inbound_call("call-a", "+15551111111", &tx)
            .await
            .unwrap();
        channel
            .handle_inbound_call("call-b", "+15552222222", &tx)
            .await
            .unwrap();

        let calls = channel.active_calls().await;
        assert_eq!(calls.len(), 2);
    }

    #[tokio::test]
    async fn place_call_requires_approval() {
        let channel = VoiceCallChannel::new(test_config());
        let result = channel.place_call("+15559876543").await.unwrap();
        assert_eq!(result, "PENDING_APPROVAL:+15559876543");
        // Nothing is dialed, so nothing is tracked.
        assert!(channel.active_calls().await.is_empty());
    }

    #[test]
    fn config_serde_roundtrip() {
        let config = test_config();
        let json = serde_json::to_string(&config).unwrap();
        let parsed: VoiceCallConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.provider, VoiceProvider::Twilio);
        assert_eq!(parsed.from_number, "+15551234567");
        assert_eq!(parsed.webhook_port, 8090);
    }

    #[test]
    fn config_defaults_applied_when_omitted() {
        let parsed: VoiceCallConfig = serde_json::from_str(
            r#"{"account_id":"AC","auth_token":"t","from_number":"+15551234567"}"#,
        )
        .unwrap();
        assert_eq!(parsed.provider, VoiceProvider::Twilio);
        assert_eq!(parsed.webhook_port, 8090);
        assert!(parsed.require_outbound_approval);
        assert!(parsed.transcription_logging);
        assert_eq!(parsed.max_call_duration_secs, 3600);
        assert!(parsed.tts_voice.is_none());
        assert!(parsed.webhook_base_url.is_none());
    }

    #[test]
    fn call_record_serde_roundtrip() {
        let record = CallRecord {
            call_id: "call-001".into(),
            direction: CallDirection::Inbound,
            remote_number: "+15559876543".into(),
            local_number: "+15551234567".into(),
            state: CallState::InProgress,
            started_at: "2026-03-24T12:00:00Z".into(),
            ended_at: None,
            duration_secs: 0,
            transcript: vec![TranscriptEntry {
                speaker: "caller".into(),
                text: "Hello".into(),
                timestamp: "2026-03-24T12:00:01Z".into(),
            }],
        };
        let json = serde_json::to_string(&record).unwrap();
        let parsed: CallRecord = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.call_id, "call-001");
        assert_eq!(parsed.transcript.len(), 1);
    }

    #[test]
    fn default_provider_is_twilio() {
        assert_eq!(VoiceProvider::default(), VoiceProvider::Twilio);
    }

    #[test]
    fn provider_serde_roundtrip() {
        let json = serde_json::to_string(&VoiceProvider::Telnyx).unwrap();
        assert_eq!(json, "\"telnyx\"");
        let parsed: VoiceProvider = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, VoiceProvider::Telnyx);
    }

    #[tokio::test]
    async fn transcript_logging_disabled_skips_save() {
        let mut config = test_config();
        config.transcription_logging = false;
        let channel = VoiceCallChannel::new(config);
        let (tx, _rx) = mpsc::channel(10);
        let workspace = tempfile::tempdir().unwrap();

        channel
            .handle_inbound_call("call-nolog", "+15559876543", &tx)
            .await
            .unwrap();

        channel
            .save_transcript("call-nolog", workspace.path())
            .await
            .unwrap();

        let logs_dir = workspace.path().join("logs").join("calls");
        assert!(!logs_dir.exists());
    }
}