1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
17#[cfg(test)]
18mod dtmf_collector_tests;
19
/// Matches a self-closing `<hangup/>` tag (whitespace allowed before `/>`).
static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
/// Matches `<refer to="..."/>`; capture group 1 is the `to` attribute value.
static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
/// Matches `<play file="..."/>`; capture group 1 is the `file` attribute value.
static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
/// Matches `<goto scene="..."/>`; capture group 1 is the `scene` attribute value.
static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
/// Matches `<set_var key="..." value="..."/>`; group 1 is the key, group 2 the value.
/// The value may be wrapped in either single or double quotes and is matched lazily.
static RE_SET_VAR: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"<set_var\s+key="([^"]+)"\s+value=["'](.+?)["']\s*/>"#).unwrap());
/// Matches `<http url="..." [method="..."] [body="..."]/>`.
/// Group 1 is the URL; groups 2 (method) and 3 (body) are optional.
static RE_HTTP: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r#"<http\s+url="([^"]+)"(?:\s+method="([^"]+)")?(?:\s+body="([^"]+)")?\s*/>"#)
        .unwrap()
});
/// Matches `<collect type="..." var="..." [prompt="..."]/>`.
/// Group 1 is the collector type, group 2 the variable name, group 3 the optional
/// (possibly empty) prompt.
static RE_COLLECT: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r#"<collect\s+type="([^"]+)"\s+var="([^"]+)"(?:\s+prompt="([^"]*)")?\s*/>"#).unwrap()
});
/// Matches one sentence terminator (listed punctuation or a newline) together with
/// any whitespace that follows it.
static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
34static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
35 let mut s = std::collections::HashSet::new();
36 let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
37
38 if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
39 for line in content.lines() {
40 let trimmed = line.trim().to_lowercase();
41 if !trimmed.is_empty() {
42 s.insert(trimmed);
43 }
44 }
45 }
46
47 if s.is_empty() {
48 for f in default_fillers {
49 s.insert(f.to_string());
50 }
51 }
52 s
53});
54
55use super::ChatMessage;
56use super::InterruptionStrategy;
57use super::LlmConfig;
58use super::dialogue::DialogueHandler;
59
60pub mod provider;
61pub mod rag;
62pub mod types;
63
/// Kind of item located in the streamed LLM output buffer: one variant per
/// recognised XML-style tag, plus `Sentence` for a sentence boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    /// A `<hangup/>` tag (`RE_HANGUP`).
    Hangup,
    /// A `<refer to="..."/>` tag (`RE_REFER`).
    Refer,
    /// A sentence terminator (`RE_SENTENCE`).
    Sentence,
    /// A `<play file="..."/>` tag (`RE_PLAY`).
    Play,
    /// A `<goto scene="..."/>` tag (`RE_GOTO`).
    Goto,
    /// A `<set_var key="..." value="..."/>` tag (`RE_SET_VAR`).
    SetVar,
    /// An `<http url="..." .../>` tag (`RE_HTTP`).
    Http,
    /// A `<collect type="..." var="..." .../>` tag (`RE_COLLECT`).
    Collect,
}
75
76pub use provider::*;
77pub use rag::*;
78pub use types::*;
79
// NOTE(review): no use site is visible in this part of the file; presumably an upper
// bound on retrieval (RAG) attempts per response — confirm against the caller.
const MAX_RAG_ATTEMPTS: usize = 3;
81
/// State of the DTMF digit collection currently in progress.
#[derive(Debug, Clone)]
pub struct CollectorState {
    /// Name of the collector definition this collection was started from.
    pub collector_type: String,
    /// Name under which the collected digits are stored in the call extras.
    pub var_name: String,
    /// Copy of the collector definition (limits, timeouts, validation, retries).
    pub config: super::DtmfCollectorConfig,
    /// Digits received so far in this attempt.
    pub buffer: String,
    /// When this attempt started; compared against the overall timeout.
    pub start_time: std::time::Instant,
    /// When the most recent digit arrived; compared against the inter-digit timeout.
    pub last_digit_time: std::time::Instant,
    /// Number of retries already performed (0 for the first attempt).
    pub retry_count: u32,
}
100
/// Dialogue handler backed by an LLM: holds the conversation history, the active
/// scene, DTMF configuration and per-call bookkeeping.
pub struct LlmHandler {
    /// LLM settings; also supplies the base prompt, features, language and greeting.
    config: LlmConfig,
    /// Interruption settings passed in at construction (consumers are outside this view).
    interruption_config: super::InterruptionConfig,
    /// Optional follow-up settings passed in at construction (consumers are outside this view).
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Global digit-to-action map, consulted after the current scene's own map.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// Named DTMF collector definitions available to `<collect .../>`.
    dtmf_collectors: Option<HashMap<String, super::DtmfCollectorConfig>>,
    /// Conversation history; index 0 is the system prompt created at construction.
    history: Vec<ChatMessage>,
    /// Backend used to obtain LLM completions.
    provider: Arc<dyn LlmProvider>,
    /// Retriever for RAG lookups (not used in the visible part of the file).
    rag_retriever: Arc<dyn RagRetriever>,
    /// Set to `true` after a non-empty streamed response has been recorded.
    is_speaking: bool,
    /// Initialised to `false`; NOTE(review): presumably set once a hangup is under way — confirm.
    is_hanging_up: bool,
    /// Initialised to 0; NOTE(review): presumably counts follow-ups without user input — confirm.
    consecutive_follow_ups: u32,
    /// Initialised to the construction time; updated outside this view.
    last_interaction_at: std::time::Instant,
    /// Channel for history and metrics events, once `set_event_sender` has been called.
    event_sender: Option<crate::event::EventSender>,
    /// Initialised to `None`; NOTE(review): presumably the time of the last final ASR result — confirm.
    last_asr_final_at: Option<std::time::Instant>,
    /// Time at which the most recent non-empty streamed response was recorded.
    last_tts_start_at: Option<std::time::Instant>,
    /// Time at which the most recent assistant message was appended to the history.
    last_robot_msg_at: Option<std::time::Instant>,
    /// Handle to the active call, once `set_call` has been called.
    call: Option<crate::call::ActiveCallRef>,
    /// All known scenes, keyed by scene id.
    scenes: HashMap<String, super::Scene>,
    /// Id of the scene currently in effect, if any.
    current_scene_id: Option<String>,
    /// HTTP client used for `<http .../>` commands.
    client: Client,
    /// Optional SIP settings passed in at construction (consumers are outside this view).
    sip_config: Option<crate::SipOption>,
    /// Digit collection in progress, or `None` when not collecting.
    collector_state: Option<CollectorState>,
}
126
127impl LlmHandler {
128 pub fn new(
129 config: LlmConfig,
130 interruption: super::InterruptionConfig,
131 global_follow_up_config: Option<super::FollowUpConfig>,
132 scenes: HashMap<String, super::Scene>,
133 dtmf: Option<HashMap<String, super::DtmfAction>>,
134 dtmf_collectors: Option<HashMap<String, super::DtmfCollectorConfig>>,
135 initial_scene_id: Option<String>,
136 sip_config: Option<crate::SipOption>,
137 ) -> Self {
138 Self::with_provider(
139 config,
140 Arc::new(DefaultLlmProvider::new()),
141 Arc::new(NoopRagRetriever),
142 interruption,
143 global_follow_up_config,
144 scenes,
145 dtmf,
146 dtmf_collectors,
147 initial_scene_id,
148 sip_config,
149 )
150 }
151
152 pub fn with_provider(
153 config: LlmConfig,
154 provider: Arc<dyn LlmProvider>,
155 rag_retriever: Arc<dyn RagRetriever>,
156 interruption: super::InterruptionConfig,
157 global_follow_up_config: Option<super::FollowUpConfig>,
158 scenes: HashMap<String, super::Scene>,
159 dtmf: Option<HashMap<String, super::DtmfAction>>,
160 dtmf_collectors: Option<HashMap<String, super::DtmfCollectorConfig>>,
161 initial_scene_id: Option<String>,
162 sip_config: Option<crate::SipOption>,
163 ) -> Self {
164 let mut history = Vec::new();
165 let system_prompt = Self::build_system_prompt(&config, None, dtmf_collectors.as_ref());
166
167 history.push(ChatMessage {
168 role: "system".to_string(),
169 content: system_prompt,
170 });
171
172 Self {
173 config,
174 interruption_config: interruption,
175 global_follow_up_config,
176 dtmf_config: dtmf,
177 dtmf_collectors,
178 history,
179 provider,
180 rag_retriever,
181 is_speaking: false,
182 is_hanging_up: false,
183 consecutive_follow_ups: 0,
184 last_interaction_at: std::time::Instant::now(),
185 event_sender: None,
186 last_asr_final_at: None,
187 last_tts_start_at: None,
188 last_robot_msg_at: None,
189 call: None,
190 scenes,
191 current_scene_id: initial_scene_id,
192 client: Client::new(),
193 sip_config,
194 collector_state: None,
195 }
196 }
197
198 fn build_system_prompt(
199 config: &LlmConfig,
200 scene_prompt: Option<&str>,
201 dtmf_collectors: Option<&HashMap<String, super::DtmfCollectorConfig>>,
202 ) -> String {
203 let base_prompt =
204 scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
205 let mut features_prompt = String::new();
206
207 if let Some(features) = &config.features {
208 let lang = config.language.as_deref().unwrap_or("zh");
209 for feature in features {
210 match Self::load_feature_snippet(feature, lang) {
211 Ok(snippet) => {
212 features_prompt.push_str(&format!("\n- {}", snippet));
213 }
214 Err(e) => {
215 warn!("Failed to load feature snippet {}: {}", feature, e);
216 }
217 }
218 }
219 }
220
221 let features_section = if features_prompt.is_empty() {
222 String::new()
223 } else {
224 format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
225 };
226
227 let tool_instructions = if let Some(custom) = &config.tool_instructions {
229 custom.clone()
230 } else {
231 let lang = config.language.as_deref().unwrap_or("zh");
232 Self::load_feature_snippet("tool_instructions", lang)
233 .unwrap_or_else(|_| {
234 Self::load_feature_snippet("tool_instructions", "en")
236 .unwrap_or_else(|_| {
237 "Tool usage instructions:\n\
239 - To hang up the call, output: <hangup/>\n\
240 - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
241 - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
242 - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
243 - To call an external HTTP API, output JSON:\n\
244 ```json\n\
245 {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
246 ```\n\
247 Please use XML tags for simple actions and JSON blocks for tool calls. \
248 Output your response in short sentences. Each sentence will be played as soon as it is finished."
249 .to_string()
250 })
251 })
252 };
253
254 let collector_section = Self::generate_collector_instructions(dtmf_collectors);
255
256 format!(
257 "{}{}\n\n{}\n{}",
258 base_prompt, features_section, tool_instructions, collector_section
259 )
260 }
261
262 fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
263 let path = format!("features/{}.{}.md", feature, lang);
264 let content = std::fs::read_to_string(path)?;
265 Ok(content.trim().to_string())
266 }
267
268 fn generate_collector_instructions(
270 collectors: Option<&HashMap<String, super::DtmfCollectorConfig>>,
271 ) -> String {
272 let collectors = match collectors {
273 Some(c) if !c.is_empty() => c,
274 _ => return String::new(),
275 };
276
277 let mut doc = String::from("\n### DTMF Digit Collection\n\n");
278 doc.push_str(
279 "When you need to collect numeric input from the user (such as phone numbers, \
280 verification codes, ID numbers, etc.), use the DTMF digit collection command. \
281 This is more accurate than voice recognition for numeric input.\n\n",
282 );
283 doc.push_str("**Usage:** Output the following XML tag to start collecting:\n");
284 doc.push_str(
285 "```\n<collect type=\"TYPE\" var=\"VAR_NAME\" prompt=\"PROMPT_TEXT\" />\n```\n\n",
286 );
287 doc.push_str("- `type`: The collector type (see available types below)\n");
288 doc.push_str("- `var`: Variable name to store the collected digits\n");
289 doc.push_str("- `prompt`: The voice prompt to play before collecting (tell the user what to input)\n\n");
290 doc.push_str("**Available collector types:**\n\n");
291
292 let mut sorted: Vec<_> = collectors.iter().collect();
294 sorted.sort_by_key(|(k, _)| (*k).clone());
295
296 for (name, config) in &sorted {
297 let desc = config.description.as_deref().unwrap_or("No description");
298 let mut details = Vec::new();
299 if let Some(d) = config.digits {
300 details.push(format!("{} digits", d));
301 } else {
302 if let Some(min) = config.min_digits {
303 details.push(format!("min {} digits", min));
304 }
305 if let Some(max) = config.max_digits {
306 details.push(format!("max {} digits", max));
307 }
308 }
309 if let Some(fk) = &config.finish_key {
310 details.push(format!("press {} to finish", fk));
311 }
312 let detail_str = if details.is_empty() {
313 String::new()
314 } else {
315 format!(" ({})", details.join(", "))
316 };
317 doc.push_str(&format!("- `{}`: {}{}\n", name, desc, detail_str));
318 }
319
320 doc.push_str("\n**Flow:**\n");
321 doc.push_str("1. You output `<collect .../>` with a voice prompt\n");
322 doc.push_str("2. The system plays your prompt, then enters digit collection mode (voice input is ignored)\n");
323 doc.push_str("3. When collection completes, the system notifies you with the result\n");
324 doc.push_str("4. You can access the collected value via `{{ var_name }}` in subsequent responses\n\n");
325 doc.push_str(
326 "**Important:** During collection the user can only input digits, not speak. ",
327 );
328 doc.push_str("If validation fails, the system will automatically retry. ");
329 doc.push_str("After collection success or failure, continue the conversation naturally.\n");
330
331 doc
332 }
333
334 pub async fn check_collector_timeout(&mut self) -> Result<Vec<Command>> {
337 let state = match &self.collector_state {
338 Some(s) => s,
339 None => return Ok(vec![]),
340 };
341
342 let timeout_secs = state.config.timeout.unwrap_or(15) as u64;
343 let inter_digit_timeout_secs = state.config.inter_digit_timeout.unwrap_or(5) as u64;
344
345 if state.start_time.elapsed().as_secs() >= timeout_secs {
347 info!(
348 "DTMF collector overall timeout ({}s) for var={}",
349 timeout_secs, state.var_name
350 );
351 let var_name = state.var_name.clone();
352 let buffer = state.buffer.clone();
353 let collector_type = state.collector_type.clone();
354 let config = state.config.clone();
355 let retry_count = state.retry_count;
356 self.collector_state = None;
357
358 if !buffer.is_empty() {
359 return self
361 .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
362 .await;
363 }
364
365 self.history.push(ChatMessage {
367 role: "system".to_string(),
368 content: format!(
369 "[DTMF collection timed out for '{}'. No digits were entered. Please guide the user.]",
370 var_name
371 ),
372 });
373 return self.generate_response().await;
374 }
375
376 if !state.buffer.is_empty()
378 && state.last_digit_time.elapsed().as_secs() >= inter_digit_timeout_secs
379 {
380 info!(
381 "DTMF collector inter-digit timeout ({}s) for var={}, buffer={}",
382 inter_digit_timeout_secs, state.var_name, state.buffer
383 );
384 let buffer = state.buffer.clone();
385 let var_name = state.var_name.clone();
386 let collector_type = state.collector_type.clone();
387 let config = state.config.clone();
388 let retry_count = state.retry_count;
389 self.collector_state = None;
390 return self
391 .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
392 .await;
393 }
394
395 Ok(vec![])
396 }
397
398 async fn handle_collector_digit(&mut self, digit: &str) -> Result<Vec<Command>> {
400 let state = self.collector_state.as_mut().unwrap();
401
402 if let Some(ref finish_key) = state.config.finish_key.clone() {
404 if digit == finish_key {
405 info!("DTMF collector: finish key '{}' received", digit);
406 let buffer = state.buffer.clone();
407 let var_name = state.var_name.clone();
408 let collector_type = state.collector_type.clone();
409 let config = state.config.clone();
410 let retry_count = state.retry_count;
411 self.collector_state = None;
412 return self
413 .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
414 .await;
415 }
416 }
417
418 state.buffer.push_str(digit);
420 state.last_digit_time = std::time::Instant::now();
421
422 info!(
423 "DTMF collector: digit '{}', buffer now '{}'",
424 digit, state.buffer
425 );
426
427 let effective_max = state.config.digits.or(state.config.max_digits);
429
430 if let Some(max) = effective_max {
431 if state.buffer.len() >= max as usize {
432 if state.config.finish_key.is_none() {
434 info!("DTMF collector: reached max digits ({})", max);
435 let buffer = state.buffer.clone();
436 let var_name = state.var_name.clone();
437 let collector_type = state.collector_type.clone();
438 let config = state.config.clone();
439 let retry_count = state.retry_count;
440 self.collector_state = None;
441 return self
442 .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
443 .await;
444 }
445 }
446 }
447
448 Ok(vec![])
449 }
450
451 async fn do_finish_collection(
453 &mut self,
454 buffer: String,
455 var_name: String,
456 collector_type: String,
457 config: super::DtmfCollectorConfig,
458 retry_count: u32,
459 ) -> Result<Vec<Command>> {
460 let min = config.digits.or(config.min_digits).unwrap_or(0);
462 if min > 0 && (buffer.len() as u32) < min {
463 return self
464 .retry_or_fail(
465 collector_type,
466 config,
467 retry_count,
468 var_name,
469 &format!("Expected at least {} digits, got {}", min, buffer.len()),
470 )
471 .await;
472 }
473
474 if let Some(validation) = &config.validation {
476 if let Ok(re) = regex::Regex::new(&validation.pattern) {
477 if !re.is_match(&buffer) {
478 let msg = validation
479 .error_message
480 .clone()
481 .unwrap_or_else(|| "Input format is incorrect".to_string());
482 return self
483 .retry_or_fail(collector_type, config, retry_count, var_name, &msg)
484 .await;
485 }
486 }
487 }
488
489 info!(
491 "DTMF collector: successfully collected '{}' for var '{}'",
492 buffer, var_name
493 );
494
495 if let Some(call) = &self.call {
496 let mut state = call.call_state.write().await;
497 let mut extras = state.extras.take().unwrap_or_default();
498 extras.insert(var_name.clone(), serde_json::Value::String(buffer.clone()));
499 state.extras = Some(extras);
500 }
501
502 self.history.push(ChatMessage {
504 role: "system".to_string(),
505 content: format!("[DTMF collection completed for '{}': {}]", var_name, buffer),
506 });
507
508 self.generate_response().await
510 }
511
512 async fn retry_or_fail(
514 &mut self,
515 collector_type: String,
516 config: super::DtmfCollectorConfig,
517 retry_count: u32,
518 var_name: String,
519 reason: &str,
520 ) -> Result<Vec<Command>> {
521 let max_retries = config.retry_times.unwrap_or(3);
522
523 if retry_count >= max_retries {
524 info!(
525 "DTMF collector: max retries ({}) reached for var '{}'",
526 max_retries, var_name
527 );
528 self.history.push(ChatMessage {
529 role: "system".to_string(),
530 content: format!(
531 "[DTMF collection failed for '{}' after {} retries: {}. Please guide the user to try again or use an alternative method.]",
532 var_name, max_retries, reason
533 ),
534 });
535 return self.generate_response().await;
536 }
537
538 info!(
539 "DTMF collector: retry {}/{} for var '{}': {}",
540 retry_count + 1,
541 max_retries,
542 var_name,
543 reason
544 );
545
546 let now = std::time::Instant::now();
548 self.collector_state = Some(CollectorState {
549 collector_type,
550 var_name,
551 config: config.clone(),
552 buffer: String::new(),
553 start_time: now,
554 last_digit_time: now,
555 retry_count: retry_count + 1,
556 });
557
558 let error_msg = config
560 .validation
561 .as_ref()
562 .and_then(|v| v.error_message.clone())
563 .unwrap_or_else(|| reason.to_string());
564
565 Ok(vec![self.create_tts_command(error_msg, None, None)])
566 }
567
568 fn start_collector(&mut self, collector_type: &str, var_name: &str) -> bool {
570 let config = match &self.dtmf_collectors {
571 Some(collectors) => match collectors.get(collector_type) {
572 Some(c) => c.clone(),
573 None => {
574 warn!("Unknown DTMF collector type: {}", collector_type);
575 return false;
576 }
577 },
578 None => {
579 warn!("No DTMF collectors configured");
580 return false;
581 }
582 };
583
584 let now = std::time::Instant::now();
585 self.collector_state = Some(CollectorState {
586 collector_type: collector_type.to_string(),
587 var_name: var_name.to_string(),
588 config,
589 buffer: String::new(),
590 start_time: now,
591 last_digit_time: now,
592 retry_count: 0,
593 });
594
595 info!(
596 "DTMF collector started: type={}, var={}",
597 collector_type, var_name
598 );
599 true
600 }
601
602 pub fn is_collecting(&self) -> bool {
604 self.collector_state.is_some()
605 }
606
607 fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
608 if let Some(scene_id) = &self.current_scene_id {
609 if let Some(scene) = self.scenes.get(scene_id) {
610 if let Some(dtmf) = &scene.dtmf {
611 if let Some(action) = dtmf.get(digit) {
612 return Some(action.clone());
613 }
614 }
615 }
616 }
617
618 if let Some(dtmf) = &self.dtmf_config {
619 if let Some(action) = dtmf.get(digit) {
620 return Some(action.clone());
621 }
622 }
623
624 None
625 }
626
627 async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
628 match action {
629 super::DtmfAction::Goto { scene } => {
630 info!("DTMF action: switch to scene {}", scene);
631 self.switch_to_scene(&scene, true).await
632 }
633 super::DtmfAction::Transfer { target } => {
634 info!("DTMF action: transfer to {}", target);
635 Ok(vec![Command::Refer {
636 caller: String::new(),
637 callee: target,
638 options: None,
639 }])
640 }
641 super::DtmfAction::Hangup => {
642 info!("DTMF action: hangup");
643 let headers = self.render_sip_headers().await;
644 Ok(vec![Command::Hangup {
645 reason: Some("DTMF Hangup".to_string()),
646 initiator: Some("ai".to_string()),
647 headers,
648 }])
649 }
650 }
651 }
652
653 async fn get_current_extras(&self) -> HashMap<String, serde_json::Value> {
655 if let Some(call) = &self.call {
656 let state = call.call_state.read().await;
657 state.extras.clone().unwrap_or_default()
658 } else {
659 HashMap::new()
660 }
661 }
662
663 async fn render_scene_prompt(&self, scene: &super::Scene) -> String {
666 let extras = self.get_current_extras().await;
667 super::render_scene_prompt(scene, &extras)
668 }
669
670 async fn switch_to_scene(
671 &mut self,
672 scene_id: &str,
673 trigger_response: bool,
674 ) -> Result<Vec<Command>> {
675 if let Some(scene) = self.scenes.get(scene_id).cloned() {
676 info!("Switching to scene: {}", scene_id);
677 self.current_scene_id = Some(scene_id.to_string());
678 let rendered_prompt = self.render_scene_prompt(&scene).await;
680 let system_prompt = Self::build_system_prompt(
681 &self.config,
682 Some(&rendered_prompt),
683 self.dtmf_collectors.as_ref(),
684 );
685 if let Some(first_msg) = self.history.get_mut(0) {
686 if first_msg.role == "system" {
687 first_msg.content = system_prompt;
688 }
689 }
690
691 let mut commands = Vec::new();
692 if let Some(url) = &scene.play {
693 commands.push(Command::Play {
694 url: url.clone(),
695 play_id: None,
696 auto_hangup: None,
697 wait_input_timeout: None,
698 });
699 }
700
701 if trigger_response {
702 let response_cmds = self.generate_response().await?;
703 commands.extend(response_cmds);
704 }
705 Ok(commands)
706 } else {
707 warn!("Scene not found: {}", scene_id);
708 Ok(vec![])
709 }
710 }
711
712 pub fn get_history_ref(&self) -> &[ChatMessage] {
713 &self.history
714 }
715
716 pub fn get_current_scene_id(&self) -> Option<String> {
717 self.current_scene_id.clone()
718 }
719
720 pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
721 self.call = Some(call);
722 }
723
724 pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
725 self.event_sender = Some(sender.clone());
726 if let Some(greeting) = &self.config.greeting {
727 let _ = sender.send(crate::event::SessionEvent::AddHistory {
728 sender: Some("system".to_string()),
729 timestamp: crate::media::get_timestamp(),
730 speaker: "assistant".to_string(),
731 text: greeting.clone(),
732 });
733 }
734 }
735
736 fn send_debug_event(&self, key: &str, data: serde_json::Value) {
737 if let Some(sender) = &self.event_sender {
738 let timestamp = crate::media::get_timestamp();
739 if key == "llm_response" {
740 if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
741 let _ = sender.send(crate::event::SessionEvent::AddHistory {
742 sender: Some("llm".to_string()),
743 timestamp,
744 speaker: "assistant".to_string(),
745 text: text.to_string(),
746 });
747 }
748 }
749
750 let event = crate::event::SessionEvent::Metrics {
751 timestamp,
752 key: key.to_string(),
753 duration: 0,
754 data,
755 };
756 let _ = sender.send(event);
757 }
758 }
759
760 async fn call_llm(&self) -> Result<String> {
761 self.provider.call(&self.config, &self.history).await
762 }
763
764 fn create_tts_command(
765 &self,
766 text: String,
767 wait_input_timeout: Option<u32>,
768 auto_hangup: Option<bool>,
769 ) -> Command {
770 let timeout = wait_input_timeout.unwrap_or(10000);
771 let play_id = uuid::Uuid::new_v4().to_string();
772
773 if let Some(sender) = &self.event_sender {
774 let _ = sender.send(crate::event::SessionEvent::Metrics {
775 timestamp: crate::media::get_timestamp(),
776 key: "tts_play_id_map".to_string(),
777 duration: 0,
778 data: serde_json::json!({
779 "playId": play_id,
780 "text": text,
781 }),
782 });
783 }
784
785 Command::Tts {
786 text,
787 speaker: None,
788 play_id: Some(play_id),
789 auto_hangup,
790 streaming: None,
791 end_of_stream: Some(true),
792 option: None,
793 wait_input_timeout: Some(timeout),
794 base64: None,
795 cache_key: None,
796 }
797 }
798
799 async fn generate_response(&mut self) -> Result<Vec<Command>> {
800 let start_time = crate::media::get_timestamp();
801 let play_id = uuid::Uuid::new_v4().to_string();
802
803 self.send_debug_event(
805 "llm_call_start",
806 json!({
807 "history_length": self.history.len(),
808 "playId": play_id,
809 }),
810 );
811
812 let mut stream = self
813 .provider
814 .call_stream(&self.config, &self.history)
815 .await?;
816
817 let mut full_content = String::new();
818 let mut full_reasoning = String::new();
819 let mut buffer = String::new();
820 let mut commands = Vec::new();
821 let mut is_json_mode = false;
822 let mut checked_json_mode = false;
823 let mut first_token_time = None;
824
825 while let Some(chunk_result) = stream.next().await {
826 let event = match chunk_result {
827 Ok(c) => c,
828 Err(e) => {
829 warn!("LLM stream error: {}", e);
830 break;
831 }
832 };
833
834 match event {
835 LlmStreamEvent::Reasoning(text) => {
836 full_reasoning.push_str(&text);
837 }
838 LlmStreamEvent::Content(chunk) => {
839 if first_token_time.is_none() && !chunk.trim().is_empty() {
840 first_token_time = Some(crate::media::get_timestamp());
841 }
842
843 full_content.push_str(&chunk);
844 buffer.push_str(&chunk);
845
846 if !checked_json_mode {
847 let trimmed = full_content.trim();
848 if !trimmed.is_empty() {
849 if trimmed.starts_with('{') || trimmed.starts_with('`') {
850 is_json_mode = true;
851 }
852 checked_json_mode = true;
853 }
854 }
855
856 if checked_json_mode && !is_json_mode {
857 let extracted = self
858 .extract_streaming_commands(&mut buffer, &play_id, false)
859 .await;
860 for cmd in extracted {
861 if let Some(call) = &self.call {
862 let _ = call.enqueue_command(cmd).await;
863 } else {
864 commands.push(cmd);
865 }
866 }
867 }
868 }
869 }
870 }
871
872 let end_time = crate::media::get_timestamp();
874 self.send_debug_event(
875 "llm_response",
876 json!({
877 "response": full_content,
878 "reasoning": full_reasoning,
879 "is_json_mode": is_json_mode,
880 "duration": end_time - start_time,
881 "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
882 "playId": play_id,
883 }),
884 );
885
886 if is_json_mode {
887 self.interpret_response(full_content).await
888 } else {
889 let extracted = self
890 .extract_streaming_commands(&mut buffer, &play_id, true)
891 .await;
892 for cmd in extracted {
893 if let Some(call) = &self.call {
894 let _ = call.enqueue_command(cmd).await;
895 } else {
896 commands.push(cmd);
897 }
898 }
899 if !full_content.trim().is_empty() {
900 self.history.push(ChatMessage {
901 role: "assistant".to_string(),
902 content: full_content,
903 });
904 self.last_robot_msg_at = Some(std::time::Instant::now());
905 self.is_speaking = true;
906 self.last_tts_start_at = Some(std::time::Instant::now());
907 }
908 Ok(commands)
909 }
910 }
911
912 async fn extract_streaming_commands(
913 &mut self,
914 buffer: &mut String,
915 play_id: &str,
916 is_final: bool,
917 ) -> Vec<Command> {
918 let mut commands = Vec::new();
919 let mut pending_hangup: Option<(String, usize)> = None; loop {
922 let hangup_pos = RE_HANGUP.find(buffer);
923 let refer_pos = RE_REFER.captures(buffer);
924 let play_pos = RE_PLAY.captures(buffer);
925 let goto_pos = RE_GOTO.captures(buffer);
926 let set_var_pos = RE_SET_VAR.captures(buffer);
927 let http_pos = RE_HTTP.captures(buffer);
928 let collect_pos = RE_COLLECT.captures(buffer);
929 let sentence_pos = RE_SENTENCE.find(buffer);
930
931 let mut positions: Vec<(usize, CommandKind)> = Vec::new();
933 if let Some(m) = hangup_pos {
934 positions.push((m.start(), CommandKind::Hangup));
935 }
936 if let Some(caps) = &refer_pos {
937 positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
938 }
939 if let Some(caps) = &play_pos {
940 positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
941 }
942 if let Some(caps) = &goto_pos {
943 positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
944 }
945 if let Some(caps) = &set_var_pos {
946 positions.push((caps.get(0).unwrap().start(), CommandKind::SetVar));
947 }
948 if let Some(caps) = &http_pos {
949 positions.push((caps.get(0).unwrap().start(), CommandKind::Http));
950 }
951 if let Some(caps) = &collect_pos {
952 positions.push((caps.get(0).unwrap().start(), CommandKind::Collect));
953 }
954 if let Some(m) = sentence_pos {
955 positions.push((m.start(), CommandKind::Sentence));
956 }
957
958 positions.sort_by_key(|p| p.0);
959
960 if let Some((pos, kind)) = positions.first() {
961 let pos = *pos;
962 match kind {
963 CommandKind::SetVar => {
964 let caps = RE_SET_VAR.captures(buffer).unwrap();
965 let mat = caps.get(0).unwrap();
966 let key = caps.get(1).unwrap().as_str().to_string();
967 let value = caps.get(2).unwrap().as_str().to_string();
968
969 let prefix = buffer[..pos].to_string();
970 if !prefix.trim().is_empty() {
971 commands.push(self.create_tts_command_with_id(
972 prefix,
973 play_id.to_string(),
974 None,
975 ));
976 }
977
978 if let Some(call) = &self.call {
979 let mut state = call.call_state.write().await;
980 let mut extras = state.extras.take().unwrap_or_default();
981 extras.insert(key, serde_json::Value::String(value));
982 state.extras = Some(extras);
983 }
984
985 buffer.drain(..mat.end());
986 }
987 CommandKind::Http => {
988 let caps = RE_HTTP.captures(buffer).unwrap();
989 let mat = caps.get(0).unwrap();
990 let url = caps.get(1).unwrap().as_str().to_string();
991 let method = caps
992 .get(2)
993 .map(|m| m.as_str().to_string())
994 .unwrap_or("GET".to_string());
995 let body = caps.get(3).map(|m| m.as_str().to_string());
996
997 let prefix = buffer[..pos].to_string();
999 if !prefix.trim().is_empty() {
1000 commands.push(self.create_tts_command_with_id(
1001 prefix,
1002 play_id.to_string(),
1003 None,
1004 ));
1005 }
1006
1007 let client = self.client.clone();
1009 let mut req = match method.to_uppercase().as_str() {
1010 "POST" => client.post(&url),
1011 "PUT" => client.put(&url),
1012 _ => client.get(&url),
1013 };
1014
1015 if let Some(b) = body {
1016 req = req.body(b);
1017 }
1018
1019 match req.send().await {
1021 Ok(res) => {
1022 let status = res.status();
1023 let text = res.text().await.unwrap_or_default();
1024 info!(url, method, status=?status, "HTTP command executed from stream");
1025
1026 self.history.push(ChatMessage {
1028 role: "system".to_string(),
1029 content: format!(
1030 "HTTP {} {} returned ({}): {}",
1031 method, url, status, text
1032 ),
1033 });
1034 }
1035 Err(e) => {
1036 warn!(
1037 url,
1038 method, "Failed to execute HTTP command from stream: {}", e
1039 );
1040
1041 self.history.push(ChatMessage {
1043 role: "system".to_string(),
1044 content: format!("HTTP {} {} failed: {}", method, url, e),
1045 });
1046 }
1047 }
1048
1049 buffer.drain(..mat.end());
1050 }
1051 CommandKind::Hangup => {
1052 let prefix = buffer[..pos].to_string();
1055 let hangup_match = RE_HANGUP.find(buffer).unwrap();
1056 pending_hangup = Some((prefix, hangup_match.end()));
1057 buffer.drain(..hangup_match.end());
1058
1059 }
1062 CommandKind::Refer => {
1063 let caps = RE_REFER.captures(buffer).unwrap();
1064 let mat = caps.get(0).unwrap();
1065 let callee = caps.get(1).unwrap().as_str().to_string();
1066
1067 let prefix = buffer[..pos].to_string();
1068 if !prefix.trim().is_empty() {
1069 commands.push(self.create_tts_command_with_id(
1070 prefix,
1071 play_id.to_string(),
1072 None,
1073 ));
1074 }
1075 commands.push(Command::Refer {
1076 caller: String::new(),
1077 callee,
1078 options: None,
1079 });
1080 buffer.drain(..mat.end());
1081 }
1082 CommandKind::Play => {
1083 let caps = RE_PLAY.captures(buffer).unwrap();
1085 let mat = caps.get(0).unwrap();
1086 let url = caps.get(1).unwrap().as_str().to_string();
1087
1088 let prefix = buffer[..pos].to_string();
1089 if !prefix.trim().is_empty() {
1090 commands.push(self.create_tts_command_with_id(
1091 prefix,
1092 play_id.to_string(),
1093 None,
1094 ));
1095 }
1096 commands.push(Command::Play {
1097 url,
1098 play_id: None,
1099 auto_hangup: None,
1100 wait_input_timeout: None,
1101 });
1102 buffer.drain(..mat.end());
1103 }
1104 CommandKind::Goto => {
1105 let caps = RE_GOTO.captures(buffer).unwrap();
1107 let mat = caps.get(0).unwrap();
1108 let scene_id = caps.get(1).unwrap().as_str().to_string();
1109
1110 let prefix = buffer[..pos].to_string();
1111 if !prefix.trim().is_empty() {
1112 commands.push(self.create_tts_command_with_id(
1113 prefix,
1114 play_id.to_string(),
1115 None,
1116 ));
1117 }
1118
1119 info!("Switching to scene (from stream): {}", scene_id);
1120 if let Some(scene) = self.scenes.get(&scene_id).cloned() {
1121 self.current_scene_id = Some(scene_id);
1122 let rendered_prompt = self.render_scene_prompt(&scene).await;
1124 let system_prompt = Self::build_system_prompt(
1126 &self.config,
1127 Some(&rendered_prompt),
1128 self.dtmf_collectors.as_ref(),
1129 );
1130 if let Some(first_msg) = self.history.get_mut(0) {
1131 if first_msg.role == "system" {
1132 first_msg.content = system_prompt;
1133 }
1134 }
1135 } else {
1136 warn!("Scene not found: {}", scene_id);
1137 }
1138
1139 buffer.drain(..mat.end());
1140 }
1141 CommandKind::Collect => {
1142 let caps = RE_COLLECT.captures(buffer).unwrap();
1143 let mat = caps.get(0).unwrap();
1144 let collector_type = caps.get(1).unwrap().as_str().to_string();
1145 let var_name = caps.get(2).unwrap().as_str().to_string();
1146 let prompt = caps.get(3).map(|m| m.as_str().to_string());
1147
1148 let prefix = buffer[..pos].to_string();
1150 if !prefix.trim().is_empty() {
1151 commands.push(self.create_tts_command_with_id(
1152 prefix,
1153 play_id.to_string(),
1154 None,
1155 ));
1156 }
1157
1158 if let Some(p) = prompt {
1160 if !p.trim().is_empty() {
1161 commands.push(self.create_tts_command(p, None, None));
1162 }
1163 }
1164
1165 if !self.start_collector(&collector_type, &var_name) {
1167 self.history.push(ChatMessage {
1169 role: "system".to_string(),
1170 content: format!(
1171 "[Unknown DTMF collector type '{}'. Available types: {}]",
1172 collector_type,
1173 self.dtmf_collectors
1174 .as_ref()
1175 .map(|c| c.keys().cloned().collect::<Vec<_>>().join(", "))
1176 .unwrap_or_default()
1177 ),
1178 });
1179 }
1180
1181 buffer.drain(..mat.end());
1182 }
1183 CommandKind::Sentence => {
1184 let mat = sentence_pos.unwrap();
1186 let sentence = buffer[..mat.end()].to_string();
1187 if !sentence.trim().is_empty() {
1188 commands.push(self.create_tts_command_with_id(
1189 sentence,
1190 play_id.to_string(),
1191 None,
1192 ));
1193 }
1194 buffer.drain(..mat.end());
1195 }
1196 }
1197 } else {
1198 break;
1199 }
1200 }
1201
1202 if let Some((prefix, _)) = pending_hangup {
1204 let headers = self.render_sip_headers().await;
1205
1206 if let Some(call) = &self.call {
1207 let h_val = serde_json::to_value(&headers).unwrap_or_default();
1208 let mut state = call.call_state.write().await;
1209 let mut extras = state.extras.take().unwrap_or_default();
1210 extras.insert("_hangup_headers".to_string(), h_val);
1211 state.extras = Some(extras);
1212 }
1213
1214 if !prefix.trim().is_empty() {
1215 let mut cmd =
1216 self.create_tts_command_with_id(prefix, play_id.to_string(), Some(true));
1217 if let Command::Tts { end_of_stream, .. } = &mut cmd {
1218 *end_of_stream = Some(true);
1219 }
1220 self.is_hanging_up = true;
1221 commands.push(cmd);
1222 } else {
1223 let mut cmd = self.create_tts_command_with_id(
1224 "".to_string(),
1225 play_id.to_string(),
1226 Some(true),
1227 );
1228 if let Command::Tts { end_of_stream, .. } = &mut cmd {
1229 *end_of_stream = Some(true);
1230 }
1231 self.is_hanging_up = true;
1232 commands.push(cmd);
1233 }
1234
1235 return commands;
1236 }
1237
1238 if is_final {
1239 let remaining = buffer.trim().to_string();
1240 if !remaining.is_empty() {
1241 commands.push(self.create_tts_command_with_id(
1242 remaining,
1243 play_id.to_string(),
1244 None,
1245 ));
1246 }
1247 buffer.clear();
1248
1249 if let Some(last) = commands.last_mut() {
1250 if let Command::Tts { end_of_stream, .. } = last {
1251 *end_of_stream = Some(true);
1252 }
1253 } else if !self.is_hanging_up {
1254 commands.push(Command::Tts {
1255 text: "".to_string(),
1256 speaker: None,
1257 play_id: Some(play_id.to_string()),
1258 auto_hangup: None,
1259 streaming: Some(true),
1260 end_of_stream: Some(true),
1261 option: None,
1262 wait_input_timeout: None,
1263 base64: None,
1264 cache_key: None,
1265 });
1266 }
1267 }
1268
1269 commands
1270 }
1271
1272 fn create_tts_command_with_id(
1273 &self,
1274 text: String,
1275 play_id: String,
1276 auto_hangup: Option<bool>,
1277 ) -> Command {
1278 Command::Tts {
1279 text,
1280 speaker: None,
1281 play_id: Some(play_id),
1282 auto_hangup,
1283 streaming: Some(true),
1284 end_of_stream: None,
1285 option: None,
1286 wait_input_timeout: Some(10000),
1287 base64: None,
1288 cache_key: None,
1289 }
1290 }
1291
1292 async fn handle_tool_invocation(
1293 &mut self,
1294 tool: ToolInvocation,
1295 tool_commands: &mut Vec<Command>,
1296 ) -> Result<bool> {
1297 match tool {
1298 ToolInvocation::Hangup {
1299 ref reason,
1300 ref initiator,
1301 } => {
1302 self.send_debug_event(
1303 "tool_invocation",
1304 json!({
1305 "tool": "Hangup",
1306 "params": {
1307 "reason": reason,
1308 "initiator": initiator,
1309 }
1310 }),
1311 );
1312
1313 let headers = self.render_sip_headers().await;
1314
1315 tool_commands.push(Command::Hangup {
1316 reason: reason.clone(),
1317 initiator: initiator.clone(),
1318 headers,
1319 });
1320 Ok(false)
1321 }
1322 ToolInvocation::Refer {
1323 ref caller,
1324 ref callee,
1325 ref options,
1326 } => {
1327 self.send_debug_event(
1328 "tool_invocation",
1329 json!({
1330 "tool": "Refer",
1331 "params": {
1332 "caller": caller,
1333 "callee": callee,
1334 }
1335 }),
1336 );
1337 tool_commands.push(Command::Refer {
1338 caller: caller.clone(),
1339 callee: callee.clone(),
1340 options: options.clone(),
1341 });
1342 Ok(false)
1343 }
1344 ToolInvocation::Rag {
1345 ref query,
1346 ref source,
1347 } => {
1348 self.handle_rag_tool(query, source).await?;
1349 Ok(true)
1350 }
1351 ToolInvocation::Accept { ref options } => {
1352 self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
1353 tool_commands.push(Command::Accept {
1354 option: options.clone().unwrap_or_default(),
1355 });
1356 Ok(false)
1357 }
1358 ToolInvocation::Reject { ref reason, code } => {
1359 self.send_debug_event(
1360 "tool_invocation",
1361 json!({
1362 "tool": "Reject",
1363 "params": {
1364 "reason": reason,
1365 "code": code,
1366 }
1367 }),
1368 );
1369 tool_commands.push(Command::Reject {
1370 reason: reason
1371 .clone()
1372 .unwrap_or_else(|| "Rejected by agent".to_string()),
1373 code,
1374 });
1375 Ok(false)
1376 }
1377 ToolInvocation::Http {
1378 ref url,
1379 ref method,
1380 ref body,
1381 ref headers,
1382 } => {
1383 self.handle_http_tool(url, method, body, headers).await?;
1384 Ok(true)
1385 }
1386 }
1387 }
1388
1389 async fn render_sip_headers(&self) -> Option<HashMap<String, String>> {
1390 let hangup_template = self.sip_config.as_ref()?.hangup_headers.as_ref()?;
1391 let call = self.call.as_ref()?;
1392 let state = call.call_state.read().await;
1393
1394 let mut context = HashMap::new();
1395 let mut sip_headers = HashMap::new();
1396
1397 let sip_header_keys: Vec<String> = state
1400 .extras
1401 .as_ref()
1402 .and_then(|e| e.get("_sip_header_keys"))
1403 .and_then(|v| serde_json::from_value(v.clone()).ok())
1404 .unwrap_or_default();
1405
1406 if let Some(extras) = &state.extras {
1407 for (k, v) in extras {
1408 if k.starts_with('_') {
1410 continue;
1411 }
1412 context.insert(k.clone(), v.clone());
1413 if sip_header_keys.contains(k) {
1415 sip_headers.insert(k.clone(), v.clone());
1416 }
1417 }
1418 }
1419
1420 context.insert(
1422 "sip".to_string(),
1423 serde_json::to_value(&sip_headers).unwrap_or(serde_json::Value::Null),
1424 );
1425
1426 let env = minijinja::Environment::new();
1427 let mut rendered_headers = HashMap::new();
1428 for (k, v) in hangup_template {
1429 if let Ok(rendered) = env.render_str(v, &context) {
1430 rendered_headers.insert(k.clone(), rendered);
1431 } else {
1432 rendered_headers.insert(k.clone(), v.clone());
1433 }
1434 }
1435 Some(rendered_headers)
1436 }
1437
1438 async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
1439 self.send_debug_event(
1440 "tool_invocation",
1441 json!({
1442 "tool": "Rag",
1443 "params": {
1444 "query": query,
1445 "source": source,
1446 }
1447 }),
1448 );
1449
1450 let rag_result = self.rag_retriever.retrieve(query).await?;
1451
1452 self.send_debug_event(
1453 "rag_result",
1454 json!({
1455 "query": query,
1456 "result": rag_result,
1457 }),
1458 );
1459
1460 let summary = if let Some(source) = source {
1461 format!("[{}] {}", source, rag_result)
1462 } else {
1463 rag_result
1464 };
1465
1466 self.history.push(ChatMessage {
1467 role: "system".to_string(),
1468 content: format!("RAG result for {}: {}", query, summary),
1469 });
1470
1471 Ok(())
1472 }
1473
1474 async fn handle_http_tool(
1475 &mut self,
1476 url: &str,
1477 method: &Option<String>,
1478 body: &Option<serde_json::Value>,
1479 headers: &Option<HashMap<String, String>>,
1480 ) -> Result<()> {
1481 let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
1482 let method =
1483 reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
1484
1485 self.send_debug_event(
1486 "tool_invocation",
1487 json!({
1488 "tool": "Http",
1489 "params": {
1490 "url": url,
1491 "method": method_str,
1492 }
1493 }),
1494 );
1495
1496 let mut req = self.client.request(method, url);
1497 if let Some(body) = body {
1498 req = req.json(body);
1499 }
1500 if let Some(headers) = headers {
1501 for (k, v) in headers {
1502 req = req.header(k, v);
1503 }
1504 }
1505
1506 match req.send().await {
1507 Ok(res) => {
1508 let status = res.status();
1509 let text = res.text().await.unwrap_or_default();
1510 self.history.push(ChatMessage {
1511 role: "system".to_string(),
1512 content: format!("HTTP tool response ({}): {}", status, text),
1513 });
1514 }
1515 Err(e) => {
1516 warn!("HTTP tool failed: {}", e);
1517 self.history.push(ChatMessage {
1518 role: "system".to_string(),
1519 content: format!("HTTP tool failed: {}", e),
1520 });
1521 }
1522 }
1523
1524 Ok(())
1525 }
1526
1527 async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
1528 if text.trim().is_empty() {
1529 return Ok(vec![]);
1530 }
1531
1532 self.apply_context_repair(text);
1533 self.apply_rolling_summary().await;
1534
1535 self.last_asr_final_at = Some(std::time::Instant::now());
1536 self.last_interaction_at = std::time::Instant::now();
1537 self.is_speaking = false;
1538 self.consecutive_follow_ups = 0;
1539
1540 self.generate_response().await
1541 }
1542
1543 fn apply_context_repair(&mut self, text: &str) {
1544 let enable_repair = self
1545 .config
1546 .features
1547 .as_ref()
1548 .map(|f| f.contains(&"context_repair".to_string()))
1549 .unwrap_or(false);
1550
1551 if !enable_repair {
1552 self.history.push(ChatMessage {
1553 role: "user".to_string(),
1554 content: text.to_string(),
1555 });
1556 return;
1557 }
1558
1559 let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
1560 let mut merged = false;
1561
1562 if let Some(last_robot_at) = self.last_robot_msg_at {
1563 if last_robot_at.elapsed().as_millis() < repair_window_ms {
1564 if let Some(last_msg) = self.history.last() {
1565 if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
1566 info!(
1567 "Context Repair: Detected potential fragmentation. Triggering merge."
1568 );
1569 self.history.pop();
1570 if let Some(prev_user) = self.history.last_mut() {
1571 if prev_user.role == "user" {
1572 prev_user.content.push_str(",");
1573 prev_user.content.push_str(text);
1574 merged = true;
1575 }
1576 }
1577 }
1578 }
1579 }
1580 }
1581
1582 if !merged {
1583 self.history.push(ChatMessage {
1584 role: "user".to_string(),
1585 content: text.to_string(),
1586 });
1587 }
1588 }
1589
1590 async fn apply_rolling_summary(&mut self) {
1591 let enable_summary = self
1592 .config
1593 .features
1594 .as_ref()
1595 .map(|f| f.contains(&"rolling_summary".to_string()))
1596 .unwrap_or(false);
1597
1598 if !enable_summary {
1599 return;
1600 }
1601
1602 let summary_limit = self.config.summary_limit.unwrap_or(20);
1603 if self.history.len() <= summary_limit {
1604 return;
1605 }
1606
1607 info!("Rolling Summary: History limit reached. Triggering background summary.");
1608 let keep_recent = 6;
1609 if self.history.len() <= summary_limit + keep_recent
1610 || self.history.len() <= keep_recent + 1
1611 {
1612 return;
1613 }
1614
1615 let split_idx = self.history.len() - keep_recent;
1616 let to_summarize = self.history[1..split_idx].to_vec();
1617 let recent = self.history[split_idx..].to_vec();
1618
1619 let summary_prompt =
1620 "Summarize the above conversation so far, focusing on key details and user intent.";
1621 let mut summary_req_history = to_summarize;
1622 summary_req_history.push(ChatMessage {
1623 role: "user".to_string(),
1624 content: summary_prompt.to_string(),
1625 });
1626
1627 match self.provider.call(&self.config, &summary_req_history).await {
1628 Ok(summary) => {
1629 let mut new_history = Vec::new();
1630 if let Some(sys) = self.history.first() {
1631 let mut new_sys = sys.clone();
1632 new_sys.content.push_str("\n\n[Previous Context Summary]: ");
1633 new_sys.content.push_str(&summary);
1634 new_history.push(new_sys);
1635 }
1636 new_history.extend(recent);
1637 self.history = new_history;
1638 info!(
1639 "Rolling Summary: Applied summary. New history len: {}",
1640 self.history.len()
1641 );
1642 }
1643 Err(e) => {
1644 warn!("Rolling Summary failed: {}", e);
1645 }
1646 }
1647 }
1648
1649 fn check_interruption(
1650 &mut self,
1651 event: &SessionEvent,
1652 is_filler: &Option<bool>,
1653 ) -> Option<Command> {
1654 let strategy = self.interruption_config.strategy;
1655 let should_check = match (strategy, event) {
1656 (InterruptionStrategy::None, _) => false,
1657 (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
1658 (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1659 (InterruptionStrategy::Both, _) => true,
1660 _ => false,
1661 };
1662
1663 if !self.is_speaking || self.is_hanging_up || !should_check {
1664 return None;
1665 }
1666
1667 if let Some(last_start) = self.last_tts_start_at {
1669 let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1670 if last_start.elapsed().as_millis() < ignore_ms as u128 {
1671 return None;
1672 }
1673 }
1674
1675 if self.interruption_config.filler_word_filter.unwrap_or(false) {
1677 if let Some(true) = is_filler {
1678 return None;
1679 }
1680 if let SessionEvent::AsrDelta { text, .. } = event {
1681 if is_likely_filler(text) {
1682 return None;
1683 }
1684 }
1685 }
1686
1687 if let Some(last_final) = self.last_asr_final_at {
1689 if last_final.elapsed().as_millis() < 500 {
1690 return None;
1691 }
1692 }
1693
1694 info!("Smart interruption detected, stopping playback");
1695 self.is_speaking = false;
1696 Some(Command::Interrupt {
1697 graceful: Some(true),
1698 fade_out_ms: self.interruption_config.volume_fade_ms,
1699 })
1700 }
1701
1702 async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1703 let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1704 self.scenes
1705 .get(scene_id)
1706 .and_then(|s| s.follow_up)
1707 .or(self.global_follow_up_config)
1708 } else {
1709 self.global_follow_up_config
1710 };
1711
1712 let Some(config) = follow_up_config else {
1713 return Ok(vec![]);
1714 };
1715
1716 if self.is_speaking
1717 || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1718 {
1719 return Ok(vec![]);
1720 }
1721
1722 if self.consecutive_follow_ups >= config.max_count {
1723 info!("Max follow-up count reached, hanging up");
1724 let headers = self.render_sip_headers().await;
1725 return Ok(vec![Command::Hangup {
1726 reason: Some("Max follow-up reached".to_string()),
1727 initiator: Some("system".to_string()),
1728 headers,
1729 }]);
1730 }
1731
1732 info!(
1733 "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1734 self.last_interaction_at.elapsed().as_millis(),
1735 self.consecutive_follow_ups + 1,
1736 config.max_count
1737 );
1738 self.consecutive_follow_ups += 1;
1739 self.last_interaction_at = std::time::Instant::now();
1740 self.generate_response().await
1741 }
1742
1743 async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1744 info!(
1745 "Function call from Realtime: {} with args {}",
1746 name, arguments
1747 );
1748 let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1749
1750 match name {
1751 "hangup_call" => {
1752 let headers = self.render_sip_headers().await;
1753 Ok(vec![Command::Hangup {
1754 reason: args["reason"].as_str().map(|s| s.to_string()),
1755 initiator: Some("ai".to_string()),
1756 headers,
1757 }])
1758 }
1759 "transfer_call" | "refer_call" => {
1760 if let Some(callee) = args["callee"]
1761 .as_str()
1762 .or_else(|| args["callee_uri"].as_str())
1763 {
1764 Ok(vec![Command::Refer {
1765 caller: String::new(),
1766 callee: callee.to_string(),
1767 options: None,
1768 }])
1769 } else {
1770 warn!("No callee provided for transfer_call");
1771 Ok(vec![])
1772 }
1773 }
1774 "goto_scene" => {
1775 if let Some(scene) = args["scene"].as_str() {
1776 self.switch_to_scene(scene, false).await
1777 } else {
1778 Ok(vec![])
1779 }
1780 }
1781 _ => {
1782 warn!("Unhandled function call: {}", name);
1783 Ok(vec![])
1784 }
1785 }
1786 }
1787
1788 async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1789 let mut tool_commands = Vec::new();
1790 let mut wait_input_timeout = None;
1791 let mut attempts = 0;
1792 let mut raw = initial;
1793
1794 let final_text = loop {
1795 attempts += 1;
1796
1797 let Some(structured) = parse_structured_response(&raw) else {
1798 break Some(raw);
1799 };
1800
1801 if wait_input_timeout.is_none() {
1802 wait_input_timeout = structured.wait_input_timeout;
1803 }
1804
1805 let mut rerun_for_rag = false;
1806 if let Some(tools) = structured.tools {
1807 for tool in tools {
1808 let needs_rerun = self
1809 .handle_tool_invocation(tool, &mut tool_commands)
1810 .await?;
1811 rerun_for_rag = rerun_for_rag || needs_rerun;
1812 }
1813 }
1814
1815 if !rerun_for_rag {
1816 break structured.text;
1817 }
1818
1819 if attempts >= MAX_RAG_ATTEMPTS {
1820 warn!("Reached RAG iteration limit, using last response");
1821 break structured.text.or(Some(raw));
1822 }
1823
1824 raw = self.call_llm().await?;
1825 };
1826
1827 let has_hangup = tool_commands
1828 .iter()
1829 .any(|c| matches!(c, Command::Hangup { .. }));
1830 let mut commands = Vec::new();
1831
1832 if let Some(text) = final_text {
1833 if !text.trim().is_empty() {
1834 self.history.push(ChatMessage {
1835 role: "assistant".to_string(),
1836 content: text.clone(),
1837 });
1838 self.last_tts_start_at = Some(std::time::Instant::now());
1839 self.is_speaking = true;
1840
1841 let auto_hangup = has_hangup.then_some(true);
1842 commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1843
1844 if has_hangup {
1845 tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1846 self.is_hanging_up = true;
1847 }
1848 }
1849 }
1850
1851 commands.extend(tool_commands);
1852 Ok(commands)
1853 }
1854}
1855
1856fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1857 let payload = extract_json_block(raw)?;
1858 serde_json::from_str(payload).ok()
1859}
1860
1861fn is_likely_filler(text: &str) -> bool {
1862 let trimmed = text.trim().to_lowercase();
1863 FILLERS.contains(&trimmed)
1864}
1865
/// Locates the JSON payload inside a model reply.
///
/// Two shapes are recognized after trimming surrounding whitespace:
/// * a fenced block delimited by triple backticks: the text between the
///   opening fence and the last closing fence is returned, trimmed. A leading
///   `json` tag (any ASCII case) is dropped, together with the rest of its
///   line when the block spans several lines;
/// * a bare payload starting with `{` or `[`, returned as is.
///
/// Returns `None` for anything else, including an unterminated or empty fence.
/// The returned slice is not validated as JSON.
///
/// The opening fence must be a full triple backtick. Accepting a single
/// backtick and then slicing at byte 3 could split a multi-byte character
/// and panic.
fn extract_json_block(raw: &str) -> Option<&str> {
    const FENCE: &str = "```";
    const TAG: &str = "json";

    let trimmed = raw.trim();

    if let Some(rest) = trimmed.strip_prefix(FENCE) {
        // A closing fence at offset 0 means there is nothing between the fences.
        let end = rest.rfind(FENCE).filter(|&idx| idx > 0)?;
        let mut inner = rest[..end].trim();

        let has_tag = inner
            .get(..TAG.len())
            .map_or(false, |prefix| prefix.eq_ignore_ascii_case(TAG));
        if has_tag {
            if let Some(newline) = inner.find('\n') {
                // Multi-line block: discard the whole tag line.
                inner = inner[newline + 1..].trim();
            } else if inner.len() > TAG.len() {
                inner = inner[TAG.len()..].trim();
            }
        }
        return Some(inner);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1891
1892#[async_trait]
1893impl DialogueHandler for LlmHandler {
1894 async fn on_start(&mut self) -> Result<Vec<Command>> {
1895 self.last_tts_start_at = Some(std::time::Instant::now());
1896
1897 let mut commands = Vec::new();
1898
1899 if let Some(scene_id) = &self.current_scene_id {
1901 if let Some(scene) = self.scenes.get(scene_id) {
1902 if let Some(audio_file) = &scene.play {
1903 commands.push(Command::Play {
1904 url: audio_file.clone(),
1905 play_id: None,
1906 auto_hangup: None,
1907 wait_input_timeout: None,
1908 });
1909 }
1910 }
1911 }
1912
1913 if let Some(greeting) = &self.config.greeting {
1914 self.is_speaking = true;
1915 commands.push(self.create_tts_command(greeting.clone(), None, None));
1916 return Ok(commands);
1917 }
1918
1919 let response_commands = self.generate_response().await?;
1920 commands.extend(response_commands);
1921 Ok(commands)
1922 }
1923
1924 async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1925 if self.collector_state.is_some() {
1927 match event {
1928 SessionEvent::Dtmf { digit, .. } => {
1929 info!("DTMF received (collecting): {}", digit);
1930 return self.handle_collector_digit(digit).await;
1931 }
1932 SessionEvent::Silence { .. } => {
1933 return self.check_collector_timeout().await;
1935 }
1936 SessionEvent::TrackEnd { .. } => {
1937 self.is_speaking = false;
1938 return Ok(vec![]);
1939 }
1940 SessionEvent::TrackStart { .. } => {
1941 self.is_speaking = true;
1942 return Ok(vec![]);
1943 }
1944 SessionEvent::Hangup { .. } => {
1945 self.collector_state = None;
1947 }
1948 SessionEvent::AsrFinal { .. }
1950 | SessionEvent::AsrDelta { .. }
1951 | SessionEvent::Speaking { .. }
1952 | SessionEvent::Eou { .. } => {
1953 let interruptible = self
1954 .collector_state
1955 .as_ref()
1956 .and_then(|s| s.config.interruptible)
1957 .unwrap_or(false);
1958 if !interruptible {
1959 return Ok(vec![]);
1960 }
1961 }
1963 _ => return Ok(vec![]),
1964 }
1965 }
1966
1967 match event {
1968 SessionEvent::Dtmf { digit, .. } => {
1969 info!("DTMF received: {}", digit);
1970 if let Some(action) = self.get_dtmf_action(digit) {
1971 self.handle_dtmf_action(action).await
1972 } else {
1973 Ok(vec![])
1974 }
1975 }
1976 SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1977 SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1978 Ok(self
1979 .check_interruption(event, is_filler)
1980 .into_iter()
1981 .collect())
1982 }
1983 SessionEvent::Eou { completed, .. } => {
1984 if *completed && !self.is_speaking {
1985 info!("EOU detected, triggering early response");
1986 self.generate_response().await
1987 } else {
1988 Ok(vec![])
1989 }
1990 }
1991 SessionEvent::Silence { .. } => self.handle_silence().await,
1992 SessionEvent::TrackStart { .. } => {
1993 self.is_speaking = true;
1994 Ok(vec![])
1995 }
1996 SessionEvent::TrackEnd { .. } => {
1997 self.is_speaking = false;
1998 self.is_hanging_up = false;
1999 self.last_interaction_at = std::time::Instant::now();
2000 Ok(vec![])
2001 }
2002 SessionEvent::FunctionCall {
2003 name, arguments, ..
2004 } => self.handle_function_call(name, arguments).await,
2005 _ => Ok(vec![]),
2006 }
2007 }
2008
2009 async fn get_history(&self) -> Vec<ChatMessage> {
2010 self.history.clone()
2011 }
2012
2013 async fn summarize(&mut self, prompt: &str) -> Result<String> {
2014 info!("Generating summary with prompt: {}", prompt);
2015 let mut summary_history = self.history.clone();
2016 summary_history.push(ChatMessage {
2017 role: "user".to_string(),
2018 content: prompt.to_string(),
2019 });
2020
2021 self.provider.call(&self.config, &summary_history).await
2022 }
2023}