1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
// Inline action tags emitted by the LLM inside its streamed text output.
// Each regex matches one self-closing XML-style tag; capture group 1 (and 2/3
// for <http/>) holds the attribute value(s). Compiled once, lazily.
static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
// `value` accepts either single or double quotes; `.+?` is non-greedy so it
// stops at the first closing quote.
static RE_SET_VAR: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"<set_var\s+key="([^"]+)"\s+value=["'](.+?)["']\s*/>"#).unwrap());
// `method` and `body` attributes are optional (groups 2 and 3).
static RE_HTTP: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r#"<http\s+url="([^"]+)"(?:\s+method="([^"]+)")?(?:\s+body="([^"]+)")?\s*/>"#)
        .unwrap()
});
// Sentence boundary used to flush partial text to TTS: ASCII or CJK sentence
// terminators, or a newline, plus any trailing whitespace.
static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
28static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
29 let mut s = std::collections::HashSet::new();
30 let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
31
32 if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
33 for line in content.lines() {
34 let trimmed = line.trim().to_lowercase();
35 if !trimmed.is_empty() {
36 s.insert(trimmed);
37 }
38 }
39 }
40
41 if s.is_empty() {
42 for f in default_fillers {
43 s.insert(f.to_string());
44 }
45 }
46 s
47});
48
49use super::ChatMessage;
50use super::InterruptionStrategy;
51use super::LlmConfig;
52use super::dialogue::DialogueHandler;
53
54pub mod provider;
55pub mod rag;
56pub mod types;
57
/// Kind of inline marker found in streamed LLM output. Used only to sort
/// concurrent matches by byte offset and dispatch to the matching handler in
/// `extract_streaming_commands`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    /// `<hangup/>` tag — end the call after speaking preceding text.
    Hangup,
    /// `<refer to="…"/>` tag — transfer the call.
    Refer,
    /// A sentence boundary (no tag) — flush text to TTS.
    Sentence,
    /// `<play file="…"/>` tag — play an audio file.
    Play,
    /// `<goto scene="…"/>` tag — switch conversation scene.
    Goto,
    /// `<set_var key="…" value="…"/>` tag — store a value in call extras.
    SetVar,
    /// `<http url="…" …/>` tag — perform an HTTP request mid-stream.
    Http,
}
68
69pub use provider::*;
70pub use rag::*;
71pub use types::*;
72
// Upper bound on RAG retrieval attempts per turn.
// NOTE(review): not referenced anywhere in this chunk — presumably consumed by
// the dialogue/tool loop elsewhere in the module; confirm before removing.
const MAX_RAG_ATTEMPTS: usize = 3;
74
/// Per-call conversation driver: forwards ASR transcripts to an LLM provider,
/// parses the streamed reply into call `Command`s (TTS, play, refer, hangup,
/// scene switch, HTTP), and tracks interruption, silence follow-up, and scene
/// state.
pub struct LlmHandler {
    /// Provider/model settings, prompt, language, and feature flags.
    config: LlmConfig,
    /// Barge-in strategy and tuning (ignore window, filler filter, fade).
    interruption_config: super::InterruptionConfig,
    /// Fallback follow-up config when the active scene defines none.
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Global DTMF digit -> action map; scene-level maps take precedence.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// Chat transcript; index 0 is expected to hold the system prompt.
    history: Vec<ChatMessage>,
    /// Backend that performs the actual LLM calls (swappable for tests).
    provider: Arc<dyn LlmProvider>,
    /// Knowledge retriever backing the RAG tool.
    rag_retriever: Arc<dyn RagRetriever>,
    /// True while TTS playback is (believed to be) in progress.
    is_speaking: bool,
    /// Set once an AI-initiated hangup has been queued; suppresses barge-in.
    is_hanging_up: bool,
    /// Follow-ups issued in a row without user speech; capped by config.
    consecutive_follow_ups: u32,
    /// Last time the user interacted (or a follow-up was issued).
    last_interaction_at: std::time::Instant,
    /// Channel for emitting session events (history, metrics).
    event_sender: Option<crate::event::EventSender>,
    /// When the last final ASR result arrived (debounces interruption).
    last_asr_final_at: Option<std::time::Instant>,
    /// When the current TTS playback started (barge-in grace window).
    last_tts_start_at: Option<std::time::Instant>,
    /// When the assistant last produced a message (context-repair window).
    last_robot_msg_at: Option<std::time::Instant>,
    /// Active call handle; when set, commands are enqueued directly to it.
    call: Option<crate::call::ActiveCallRef>,
    /// All configured scenes, keyed by scene id.
    scenes: HashMap<String, super::Scene>,
    /// Id of the scene currently driving the system prompt, if any.
    current_scene_id: Option<String>,
    /// Shared HTTP client for the <http/> tag and the Http tool.
    client: Client,
    /// SIP options; provides the hangup-header templates.
    sip_config: Option<crate::SipOption>,
}
97
98impl LlmHandler {
    /// Build a handler with the default LLM provider and a no-op RAG
    /// retriever. Thin convenience wrapper over [`Self::with_provider`];
    /// see that constructor for parameter semantics.
    pub fn new(
        config: LlmConfig,
        interruption: super::InterruptionConfig,
        global_follow_up_config: Option<super::FollowUpConfig>,
        scenes: HashMap<String, super::Scene>,
        dtmf: Option<HashMap<String, super::DtmfAction>>,
        initial_scene_id: Option<String>,
        sip_config: Option<crate::SipOption>,
    ) -> Self {
        Self::with_provider(
            config,
            Arc::new(DefaultLlmProvider::new()),
            Arc::new(NoopRagRetriever),
            interruption,
            global_follow_up_config,
            scenes,
            dtmf,
            initial_scene_id,
            sip_config,
        )
    }
120
121 pub fn with_provider(
122 config: LlmConfig,
123 provider: Arc<dyn LlmProvider>,
124 rag_retriever: Arc<dyn RagRetriever>,
125 interruption: super::InterruptionConfig,
126 global_follow_up_config: Option<super::FollowUpConfig>,
127 scenes: HashMap<String, super::Scene>,
128 dtmf: Option<HashMap<String, super::DtmfAction>>,
129 initial_scene_id: Option<String>,
130 sip_config: Option<crate::SipOption>,
131 ) -> Self {
132 let mut history = Vec::new();
133 let system_prompt = Self::build_system_prompt(&config, None);
134
135 history.push(ChatMessage {
136 role: "system".to_string(),
137 content: system_prompt,
138 });
139
140 Self {
141 config,
142 interruption_config: interruption,
143 global_follow_up_config,
144 dtmf_config: dtmf,
145 history,
146 provider,
147 rag_retriever,
148 is_speaking: false,
149 is_hanging_up: false,
150 consecutive_follow_ups: 0,
151 last_interaction_at: std::time::Instant::now(),
152 event_sender: None,
153 last_asr_final_at: None,
154 last_tts_start_at: None,
155 last_robot_msg_at: None,
156 call: None,
157 scenes,
158 current_scene_id: initial_scene_id,
159 client: Client::new(),
160 sip_config,
161 }
162 }
163
164 fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
165 let base_prompt =
166 scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
167 let mut features_prompt = String::new();
168
169 if let Some(features) = &config.features {
170 let lang = config.language.as_deref().unwrap_or("zh");
171 for feature in features {
172 match Self::load_feature_snippet(feature, lang) {
173 Ok(snippet) => {
174 features_prompt.push_str(&format!("\n- {}", snippet));
175 }
176 Err(e) => {
177 warn!("Failed to load feature snippet {}: {}", feature, e);
178 }
179 }
180 }
181 }
182
183 let features_section = if features_prompt.is_empty() {
184 String::new()
185 } else {
186 format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
187 };
188
189 let tool_instructions = if let Some(custom) = &config.tool_instructions {
191 custom.clone()
192 } else {
193 let lang = config.language.as_deref().unwrap_or("zh");
194 Self::load_feature_snippet("tool_instructions", lang)
195 .unwrap_or_else(|_| {
196 Self::load_feature_snippet("tool_instructions", "en")
198 .unwrap_or_else(|_| {
199 "Tool usage instructions:\n\
201 - To hang up the call, output: <hangup/>\n\
202 - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
203 - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
204 - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
205 - To call an external HTTP API, output JSON:\n\
206 ```json\n\
207 {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
208 ```\n\
209 Please use XML tags for simple actions and JSON blocks for tool calls. \
210 Output your response in short sentences. Each sentence will be played as soon as it is finished."
211 .to_string()
212 })
213 })
214 };
215
216 format!(
217 "{}{}\n\n{}",
218 base_prompt, features_section, tool_instructions
219 )
220 }
221
222 fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
223 let path = format!("features/{}.{}.md", feature, lang);
224 let content = std::fs::read_to_string(path)?;
225 Ok(content.trim().to_string())
226 }
227
228 fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
229 if let Some(scene_id) = &self.current_scene_id {
230 if let Some(scene) = self.scenes.get(scene_id) {
231 if let Some(dtmf) = &scene.dtmf {
232 if let Some(action) = dtmf.get(digit) {
233 return Some(action.clone());
234 }
235 }
236 }
237 }
238
239 if let Some(dtmf) = &self.dtmf_config {
240 if let Some(action) = dtmf.get(digit) {
241 return Some(action.clone());
242 }
243 }
244
245 None
246 }
247
    /// Execute a resolved DTMF action, returning the commands it produces:
    /// a scene switch (with an immediate LLM response), a transfer, or an
    /// AI-initiated hangup with rendered SIP headers.
    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
        match action {
            super::DtmfAction::Goto { scene } => {
                info!("DTMF action: switch to scene {}", scene);
                // `true` => generate an LLM response for the new scene right away.
                self.switch_to_scene(&scene, true).await
            }
            super::DtmfAction::Transfer { target } => {
                info!("DTMF action: transfer to {}", target);
                // Caller left empty; presumably filled downstream — confirm.
                Ok(vec![Command::Refer {
                    caller: String::new(),
                    callee: target,
                    options: None,
                }])
            }
            super::DtmfAction::Hangup => {
                info!("DTMF action: hangup");
                let headers = self.render_sip_headers().await;
                Ok(vec![Command::Hangup {
                    reason: Some("DTMF Hangup".to_string()),
                    initiator: Some("ai".to_string()),
                    headers,
                }])
            }
        }
    }
273
    /// Switch the active scene: rewrite the system prompt in-place (history
    /// slot 0), optionally queue the scene's entry audio, and — when
    /// `trigger_response` is set — immediately generate an LLM response under
    /// the new prompt. Unknown scene ids are logged and ignored (no commands).
    async fn switch_to_scene(
        &mut self,
        scene_id: &str,
        trigger_response: bool,
    ) -> Result<Vec<Command>> {
        if let Some(scene) = self.scenes.get(scene_id).cloned() {
            info!("Switching to scene: {}", scene_id);
            self.current_scene_id = Some(scene_id.to_string());
            let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
            // Only replace slot 0 if it actually is the system message.
            if let Some(first_msg) = self.history.get_mut(0) {
                if first_msg.role == "system" {
                    first_msg.content = system_prompt;
                }
            }

            let mut commands = Vec::new();
            // Scene entry audio plays before any generated response.
            if let Some(url) = &scene.play {
                commands.push(Command::Play {
                    url: url.clone(),
                    play_id: None,
                    auto_hangup: None,
                    wait_input_timeout: None,
                });
            }

            if trigger_response {
                let response_cmds = self.generate_response().await?;
                commands.extend(response_cmds);
            }
            Ok(commands)
        } else {
            warn!("Scene not found: {}", scene_id);
            Ok(vec![])
        }
    }
309
310 pub fn get_history_ref(&self) -> &[ChatMessage] {
311 &self.history
312 }
313
314 pub fn get_current_scene_id(&self) -> Option<String> {
315 self.current_scene_id.clone()
316 }
317
    /// Attach the active call handle. Once set, streaming commands are
    /// enqueued straight to the call instead of being returned to the caller.
    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
        self.call = Some(call);
    }
321
    /// Attach the session event channel. If a greeting is configured, it is
    /// recorded in the visible history as an assistant message (this only logs
    /// it — playback of the greeting is presumably handled elsewhere; confirm).
    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
        self.event_sender = Some(sender.clone());
        if let Some(greeting) = &self.config.greeting {
            // Send errors are ignored: event delivery is best-effort.
            let _ = sender.send(crate::event::SessionEvent::AddHistory {
                sender: Some("system".to_string()),
                timestamp: crate::media::get_timestamp(),
                speaker: "assistant".to_string(),
                text: greeting.clone(),
            });
        }
    }
333
334 fn send_debug_event(&self, key: &str, data: serde_json::Value) {
335 if let Some(sender) = &self.event_sender {
336 let timestamp = crate::media::get_timestamp();
337 if key == "llm_response" {
338 if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
339 let _ = sender.send(crate::event::SessionEvent::AddHistory {
340 sender: Some("llm".to_string()),
341 timestamp,
342 speaker: "assistant".to_string(),
343 text: text.to_string(),
344 });
345 }
346 }
347
348 let event = crate::event::SessionEvent::Metrics {
349 timestamp,
350 key: key.to_string(),
351 duration: 0,
352 data,
353 };
354 let _ = sender.send(event);
355 }
356 }
357
    /// One-shot (non-streaming) LLM completion over the current history.
    async fn call_llm(&self) -> Result<String> {
        self.provider.call(&self.config, &self.history).await
    }
361
    /// Build a non-streaming TTS command for `text` with a fresh play id.
    /// The play-id→text mapping is also published as a metrics event so the
    /// UI can correlate playback with transcript entries.
    /// `wait_input_timeout` defaults to 10000 ms when not given.
    fn create_tts_command(
        &self,
        text: String,
        wait_input_timeout: Option<u32>,
        auto_hangup: Option<bool>,
    ) -> Command {
        let timeout = wait_input_timeout.unwrap_or(10000);
        let play_id = uuid::Uuid::new_v4().to_string();

        if let Some(sender) = &self.event_sender {
            // Best-effort: send errors are ignored.
            let _ = sender.send(crate::event::SessionEvent::Metrics {
                timestamp: crate::media::get_timestamp(),
                key: "tts_play_id_map".to_string(),
                duration: 0,
                data: serde_json::json!({
                    "playId": play_id,
                    "text": text,
                }),
            });
        }

        Command::Tts {
            text,
            speaker: None,
            play_id: Some(play_id),
            auto_hangup,
            // Non-streaming: the whole utterance is one unit.
            streaming: None,
            end_of_stream: Some(true),
            option: None,
            wait_input_timeout: Some(timeout),
            base64: None,
            cache_key: None,
        }
    }
396
    /// Run one streamed LLM turn over the current history.
    ///
    /// Content chunks are accumulated; once the first non-empty text arrives,
    /// the response is classified: if it starts with `{` or a backtick it is
    /// treated as a JSON tool-call block and handed whole to
    /// `interpret_response`; otherwise tags/sentences are extracted
    /// incrementally and either enqueued on the active call (when attached) or
    /// collected into the returned command list. On plain-text responses the
    /// assistant message is appended to history and speaking state is marked.
    async fn generate_response(&mut self) -> Result<Vec<Command>> {
        let start_time = crate::media::get_timestamp();
        // One play id is shared by every TTS chunk of this turn.
        let play_id = uuid::Uuid::new_v4().to_string();

        self.send_debug_event(
            "llm_call_start",
            json!({
                "history_length": self.history.len(),
                "playId": play_id,
            }),
        );

        let mut stream = self
            .provider
            .call_stream(&self.config, &self.history)
            .await?;

        let mut full_content = String::new();
        let mut full_reasoning = String::new();
        // Working buffer consumed by extract_streaming_commands.
        let mut buffer = String::new();
        let mut commands = Vec::new();
        let mut is_json_mode = false;
        let mut checked_json_mode = false;
        let mut first_token_time = None;

        while let Some(chunk_result) = stream.next().await {
            let event = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    // A mid-stream error ends the turn but keeps what we have.
                    warn!("LLM stream error: {}", e);
                    break;
                }
            };

            match event {
                LlmStreamEvent::Reasoning(text) => {
                    full_reasoning.push_str(&text);
                }
                LlmStreamEvent::Content(chunk) => {
                    // TTFB is measured at the first non-whitespace content.
                    if first_token_time.is_none() && !chunk.trim().is_empty() {
                        first_token_time = Some(crate::media::get_timestamp());
                    }

                    full_content.push_str(&chunk);
                    buffer.push_str(&chunk);

                    // Classify once, on the first non-empty accumulated text.
                    if !checked_json_mode {
                        let trimmed = full_content.trim();
                        if !trimmed.is_empty() {
                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
                                is_json_mode = true;
                            }
                            checked_json_mode = true;
                        }
                    }

                    // Plain-text mode: extract and dispatch incrementally.
                    if checked_json_mode && !is_json_mode {
                        let extracted = self
                            .extract_streaming_commands(&mut buffer, &play_id, false)
                            .await;
                        for cmd in extracted {
                            if let Some(call) = &self.call {
                                let _ = call.enqueue_command(cmd).await;
                            } else {
                                commands.push(cmd);
                            }
                        }
                    }
                }
            }
        }

        let end_time = crate::media::get_timestamp();
        self.send_debug_event(
            "llm_response",
            json!({
                "response": full_content,
                "reasoning": full_reasoning,
                "is_json_mode": is_json_mode,
                "duration": end_time - start_time,
                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
                "playId": play_id,
            }),
        );

        if is_json_mode {
            self.interpret_response(full_content).await
        } else {
            // Final pass: flush any leftover buffer and mark end-of-stream.
            let extracted = self
                .extract_streaming_commands(&mut buffer, &play_id, true)
                .await;
            for cmd in extracted {
                if let Some(call) = &self.call {
                    let _ = call.enqueue_command(cmd).await;
                } else {
                    commands.push(cmd);
                }
            }
            if !full_content.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: full_content,
                });
                self.last_robot_msg_at = Some(std::time::Instant::now());
                self.is_speaking = true;
                self.last_tts_start_at = Some(std::time::Instant::now());
            }
            Ok(commands)
        }
    }
509
    /// Scan `buffer` (accumulated streamed text) for inline action tags and
    /// sentence boundaries, converting them — in order of appearance — into
    /// commands. Plain text preceding each marker is flushed as a streaming
    /// TTS chunk sharing `play_id`; consumed text is drained from `buffer`.
    ///
    /// A `<hangup/>` is deferred until after the scan so its preceding text is
    /// still spoken; it then appends a final TTS (auto_hangup) plus a Hangup
    /// command and returns immediately. When `is_final` is set, any leftover
    /// buffer is flushed and `end_of_stream` is set on the last TTS command
    /// (or an empty end-of-stream TTS is emitted if none was produced).
    async fn extract_streaming_commands(
        &mut self,
        buffer: &mut String,
        play_id: &str,
        is_final: bool,
    ) -> Vec<Command> {
        let mut commands = Vec::new();
        // (text before the tag, match end); the end offset is currently unused.
        let mut pending_hangup: Option<(String, usize)> = None;
        loop {
            // Find every candidate marker, then act on the earliest one only;
            // the loop re-scans after each drain.
            let hangup_pos = RE_HANGUP.find(buffer);
            let refer_pos = RE_REFER.captures(buffer);
            let play_pos = RE_PLAY.captures(buffer);
            let goto_pos = RE_GOTO.captures(buffer);
            let set_var_pos = RE_SET_VAR.captures(buffer);
            let http_pos = RE_HTTP.captures(buffer);
            let sentence_pos = RE_SENTENCE.find(buffer);

            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
            if let Some(m) = hangup_pos {
                positions.push((m.start(), CommandKind::Hangup));
            }
            if let Some(caps) = &refer_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
            }
            if let Some(caps) = &play_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
            }
            if let Some(caps) = &goto_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
            }
            if let Some(caps) = &set_var_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::SetVar));
            }
            if let Some(caps) = &http_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Http));
            }
            if let Some(m) = sentence_pos {
                positions.push((m.start(), CommandKind::Sentence));
            }

            positions.sort_by_key(|p| p.0);

            if let Some((pos, kind)) = positions.first() {
                let pos = *pos;
                match kind {
                    CommandKind::SetVar => {
                        // Store key/value into the call's shared extras map.
                        let caps = RE_SET_VAR.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let key = caps.get(1).unwrap().as_str().to_string();
                        let value = caps.get(2).unwrap().as_str().to_string();

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }

                        if let Some(call) = &self.call {
                            let mut state = call.call_state.write().await;
                            let mut extras = state.extras.take().unwrap_or_default();
                            extras.insert(key, serde_json::Value::String(value));
                            state.extras = Some(extras);
                        }

                        buffer.drain(..mat.end());
                    }
                    CommandKind::Http => {
                        // Perform the HTTP call inline (blocking this turn) and
                        // feed the outcome back to the LLM as a system message.
                        let caps = RE_HTTP.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let url = caps.get(1).unwrap().as_str().to_string();
                        let method = caps
                            .get(2)
                            .map(|m| m.as_str().to_string())
                            .unwrap_or("GET".to_string());
                        let body = caps.get(3).map(|m| m.as_str().to_string());

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }

                        let client = self.client.clone();
                        // Only GET/POST/PUT are recognized; anything else
                        // falls back to GET.
                        let mut req = match method.to_uppercase().as_str() {
                            "POST" => client.post(&url),
                            "PUT" => client.put(&url),
                            _ => client.get(&url),
                        };

                        if let Some(b) = body {
                            req = req.body(b);
                        }

                        match req.send().await {
                            Ok(res) => {
                                let status = res.status();
                                let text = res.text().await.unwrap_or_default();
                                info!(url, method, status=?status, "HTTP command executed from stream");

                                self.history.push(ChatMessage {
                                    role: "system".to_string(),
                                    content: format!(
                                        "HTTP {} {} returned ({}): {}",
                                        method, url, status, text
                                    ),
                                });
                            }
                            Err(e) => {
                                warn!(
                                    url,
                                    method, "Failed to execute HTTP command from stream: {}", e
                                );

                                self.history.push(ChatMessage {
                                    role: "system".to_string(),
                                    content: format!("HTTP {} {} failed: {}", method, url, e),
                                });
                            }
                        }

                        buffer.drain(..mat.end());
                    }
                    CommandKind::Hangup => {
                        // Defer: remember the prefix text, keep scanning the
                        // remainder. A later <hangup/> would overwrite this.
                        let prefix = buffer[..pos].to_string();
                        let hangup_match = RE_HANGUP.find(buffer).unwrap();
                        pending_hangup = Some((prefix, hangup_match.end()));
                        buffer.drain(..hangup_match.end());
                    }
                    CommandKind::Refer => {
                        let caps = RE_REFER.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let callee = caps.get(1).unwrap().as_str().to_string();

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }
                        commands.push(Command::Refer {
                            caller: String::new(),
                            callee,
                            options: None,
                        });
                        buffer.drain(..mat.end());
                    }
                    CommandKind::Play => {
                        let caps = RE_PLAY.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let url = caps.get(1).unwrap().as_str().to_string();

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }
                        commands.push(Command::Play {
                            url,
                            play_id: None,
                            auto_hangup: None,
                            wait_input_timeout: None,
                        });
                        buffer.drain(..mat.end());
                    }
                    CommandKind::Goto => {
                        // Swap the system prompt in place; unlike
                        // switch_to_scene this does not play scene audio or
                        // trigger a fresh response.
                        let caps = RE_GOTO.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let scene_id = caps.get(1).unwrap().as_str().to_string();

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }

                        info!("Switching to scene (from stream): {}", scene_id);
                        if let Some(scene) = self.scenes.get(&scene_id) {
                            self.current_scene_id = Some(scene_id);
                            let system_prompt =
                                Self::build_system_prompt(&self.config, Some(&scene.prompt));
                            if let Some(first_msg) = self.history.get_mut(0) {
                                if first_msg.role == "system" {
                                    first_msg.content = system_prompt;
                                }
                            }
                        } else {
                            warn!("Scene not found: {}", scene_id);
                        }

                        buffer.drain(..mat.end());
                    }
                    CommandKind::Sentence => {
                        // No tag before the boundary: flush the sentence to TTS.
                        let mat = sentence_pos.unwrap();
                        let sentence = buffer[..mat.end()].to_string();
                        if !sentence.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                sentence,
                                play_id.to_string(),
                                None,
                            ));
                        }
                        buffer.drain(..mat.end());
                    }
                }
            } else {
                // No marker left; wait for more streamed text (or is_final).
                break;
            }
        }

        if let Some((prefix, _)) = pending_hangup {
            // Speak the final text (or an empty end-of-stream marker), with
            // auto_hangup so playback completion tears the call down, then
            // queue the explicit Hangup.
            if !prefix.trim().is_empty() {
                let mut cmd =
                    self.create_tts_command_with_id(prefix, play_id.to_string(), Some(true));
                if let Command::Tts { end_of_stream, .. } = &mut cmd {
                    *end_of_stream = Some(true);
                }
                self.is_hanging_up = true;
                commands.push(cmd);
            } else {
                let mut cmd = self.create_tts_command_with_id(
                    "".to_string(),
                    play_id.to_string(),
                    Some(true),
                );
                if let Command::Tts { end_of_stream, .. } = &mut cmd {
                    *end_of_stream = Some(true);
                }
                self.is_hanging_up = true;
                commands.push(cmd);
            }

            let headers = self.render_sip_headers().await;
            commands.push(Command::Hangup {
                reason: Some("AI Hangup".to_string()),
                initiator: Some("ai".to_string()),
                headers,
            });

            return commands;
        }

        if is_final {
            // Flush the tail and terminate the TTS stream.
            let remaining = buffer.trim().to_string();
            if !remaining.is_empty() {
                commands.push(self.create_tts_command_with_id(
                    remaining,
                    play_id.to_string(),
                    None,
                ));
            }
            buffer.clear();

            if let Some(last) = commands.last_mut() {
                if let Command::Tts { end_of_stream, .. } = last {
                    *end_of_stream = Some(true);
                }
            } else if !self.is_hanging_up {
                // Nothing was produced this call: emit an empty chunk purely
                // to close the stream.
                commands.push(Command::Tts {
                    text: "".to_string(),
                    speaker: None,
                    play_id: Some(play_id.to_string()),
                    auto_hangup: None,
                    streaming: Some(true),
                    end_of_stream: Some(true),
                    option: None,
                    wait_input_timeout: None,
                    base64: None,
                    cache_key: None,
                });
            }
        }

        commands
    }
815
    /// Build a streaming TTS chunk that reuses an existing `play_id` so all
    /// chunks of one LLM turn are grouped into a single playback.
    /// `end_of_stream` is left unset; callers flip it on the final chunk.
    fn create_tts_command_with_id(
        &self,
        text: String,
        play_id: String,
        auto_hangup: Option<bool>,
    ) -> Command {
        Command::Tts {
            text,
            speaker: None,
            play_id: Some(play_id),
            auto_hangup,
            streaming: Some(true),
            end_of_stream: None,
            option: None,
            // Fixed 10s input wait, matching create_tts_command's default.
            wait_input_timeout: Some(10000),
            base64: None,
            cache_key: None,
        }
    }
835
    /// Dispatch one parsed tool invocation from a JSON-mode response.
    ///
    /// Returns `Ok(true)` when the tool injected information into history
    /// (RAG, HTTP) and the conversation should continue with another LLM
    /// call; `Ok(false)` when a terminal/call-control command was pushed into
    /// `tool_commands`. Every invocation is also reported as a
    /// `tool_invocation` debug event.
    async fn handle_tool_invocation(
        &mut self,
        tool: ToolInvocation,
        tool_commands: &mut Vec<Command>,
    ) -> Result<bool> {
        match tool {
            ToolInvocation::Hangup {
                ref reason,
                ref initiator,
            } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Hangup",
                        "params": {
                            "reason": reason,
                            "initiator": initiator,
                        }
                    }),
                );

                let headers = self.render_sip_headers().await;

                tool_commands.push(Command::Hangup {
                    reason: reason.clone(),
                    initiator: initiator.clone(),
                    headers,
                });
                Ok(false)
            }
            ToolInvocation::Refer {
                ref caller,
                ref callee,
                ref options,
            } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Refer",
                        "params": {
                            "caller": caller,
                            "callee": callee,
                        }
                    }),
                );
                tool_commands.push(Command::Refer {
                    caller: caller.clone(),
                    callee: callee.clone(),
                    options: options.clone(),
                });
                Ok(false)
            }
            ToolInvocation::Rag {
                ref query,
                ref source,
            } => {
                // Appends the retrieval result to history; caller should loop.
                self.handle_rag_tool(query, source).await?;
                Ok(true)
            }
            ToolInvocation::Accept { ref options } => {
                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
                tool_commands.push(Command::Accept {
                    option: options.clone().unwrap_or_default(),
                });
                Ok(false)
            }
            ToolInvocation::Reject { ref reason, code } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Reject",
                        "params": {
                            "reason": reason,
                            "code": code,
                        }
                    }),
                );
                tool_commands.push(Command::Reject {
                    reason: reason
                        .clone()
                        .unwrap_or_else(|| "Rejected by agent".to_string()),
                    code,
                });
                Ok(false)
            }
            ToolInvocation::Http {
                ref url,
                ref method,
                ref body,
                ref headers,
            } => {
                // Appends the HTTP outcome to history; caller should loop.
                self.handle_http_tool(url, method, body, headers).await?;
                Ok(true)
            }
        }
    }
932
    /// Render the configured hangup SIP header templates against the call's
    /// extras using minijinja. Returns `None` when there is no SIP config, no
    /// hangup-header template, or no active call.
    ///
    /// Template context: every non-underscore extras key, plus a `sip` object
    /// holding only the extras whose keys are listed in the special
    /// `_sip_header_keys` extra. A template that fails to render falls back to
    /// its raw (unrendered) value.
    async fn render_sip_headers(&self) -> Option<HashMap<String, String>> {
        let hangup_template = self.sip_config.as_ref()?.hangup_headers.as_ref()?;
        let call = self.call.as_ref()?;
        let state = call.call_state.read().await;

        let mut context = HashMap::new();
        let mut sip_headers = HashMap::new();

        // Keys (if any) that should be exposed under the `sip` namespace.
        let sip_header_keys: Vec<String> = state
            .extras
            .as_ref()
            .and_then(|e| e.get("_sip_header_keys"))
            .and_then(|v| serde_json::from_value(v.clone()).ok())
            .unwrap_or_default();

        if let Some(extras) = &state.extras {
            for (k, v) in extras {
                // Underscore-prefixed keys are internal bookkeeping.
                if k.starts_with('_') {
                    continue;
                }
                context.insert(k.clone(), v.clone());
                if sip_header_keys.contains(k) {
                    sip_headers.insert(k.clone(), v.clone());
                }
            }
        }

        context.insert(
            "sip".to_string(),
            serde_json::to_value(&sip_headers).unwrap_or(serde_json::Value::Null),
        );

        let env = minijinja::Environment::new();
        let mut rendered_headers = HashMap::new();
        for (k, v) in hangup_template {
            if let Ok(rendered) = env.render_str(v, &context) {
                rendered_headers.insert(k.clone(), rendered);
            } else {
                // Render failure: keep the raw template text rather than drop
                // the header.
                rendered_headers.insert(k.clone(), v.clone());
            }
        }
        Some(rendered_headers)
    }
981
    /// Execute the RAG tool: retrieve context for `query`, report invocation
    /// and result as debug events, and append the (optionally source-tagged)
    /// result to history as a system message for the next LLM call.
    async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
        self.send_debug_event(
            "tool_invocation",
            json!({
                "tool": "Rag",
                "params": {
                    "query": query,
                    "source": source,
                }
            }),
        );

        let rag_result = self.rag_retriever.retrieve(query).await?;

        self.send_debug_event(
            "rag_result",
            json!({
                "query": query,
                "result": rag_result,
            }),
        );

        // Prefix the result with its source label when one was requested.
        let summary = if let Some(source) = source {
            format!("[{}] {}", source, rag_result)
        } else {
            rag_result
        };

        self.history.push(ChatMessage {
            role: "system".to_string(),
            content: format!("RAG result for {}: {}", query, summary),
        });

        Ok(())
    }
1017
    /// Execute the HTTP tool: send the request (JSON body and extra headers
    /// optional; method defaults to GET, unknown method strings also fall back
    /// to GET) and append the response — or the failure — to history as a
    /// system message. Network errors are recorded, not propagated.
    async fn handle_http_tool(
        &mut self,
        url: &str,
        method: &Option<String>,
        body: &Option<serde_json::Value>,
        headers: &Option<HashMap<String, String>>,
    ) -> Result<()> {
        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
        let method =
            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);

        self.send_debug_event(
            "tool_invocation",
            json!({
                "tool": "Http",
                "params": {
                    "url": url,
                    "method": method_str,
                }
            }),
        );

        let mut req = self.client.request(method, url);
        if let Some(body) = body {
            req = req.json(body);
        }
        if let Some(headers) = headers {
            for (k, v) in headers {
                req = req.header(k, v);
            }
        }

        match req.send().await {
            Ok(res) => {
                let status = res.status();
                let text = res.text().await.unwrap_or_default();
                self.history.push(ChatMessage {
                    role: "system".to_string(),
                    content: format!("HTTP tool response ({}): {}", status, text),
                });
            }
            Err(e) => {
                warn!("HTTP tool failed: {}", e);
                self.history.push(ChatMessage {
                    role: "system".to_string(),
                    content: format!("HTTP tool failed: {}", e),
                });
            }
        }

        Ok(())
    }
1070
    /// Process a final ASR transcript: record it in history (with optional
    /// context repair / rolling summary), reset interaction timers and
    /// follow-up counters, then generate the LLM response. Empty transcripts
    /// are ignored.
    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
        if text.trim().is_empty() {
            return Ok(vec![]);
        }

        self.apply_context_repair(text);
        self.apply_rolling_summary().await;

        self.last_asr_final_at = Some(std::time::Instant::now());
        self.last_interaction_at = std::time::Instant::now();
        self.is_speaking = false;
        self.consecutive_follow_ups = 0;

        self.generate_response().await
    }
1086
1087 fn apply_context_repair(&mut self, text: &str) {
1088 let enable_repair = self
1089 .config
1090 .features
1091 .as_ref()
1092 .map(|f| f.contains(&"context_repair".to_string()))
1093 .unwrap_or(false);
1094
1095 if !enable_repair {
1096 self.history.push(ChatMessage {
1097 role: "user".to_string(),
1098 content: text.to_string(),
1099 });
1100 return;
1101 }
1102
1103 let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
1104 let mut merged = false;
1105
1106 if let Some(last_robot_at) = self.last_robot_msg_at {
1107 if last_robot_at.elapsed().as_millis() < repair_window_ms {
1108 if let Some(last_msg) = self.history.last() {
1109 if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
1110 info!(
1111 "Context Repair: Detected potential fragmentation. Triggering merge."
1112 );
1113 self.history.pop();
1114 if let Some(prev_user) = self.history.last_mut() {
1115 if prev_user.role == "user" {
1116 prev_user.content.push_str(",");
1117 prev_user.content.push_str(text);
1118 merged = true;
1119 }
1120 }
1121 }
1122 }
1123 }
1124 }
1125
1126 if !merged {
1127 self.history.push(ChatMessage {
1128 role: "user".to_string(),
1129 content: text.to_string(),
1130 });
1131 }
1132 }
1133
1134 async fn apply_rolling_summary(&mut self) {
1135 let enable_summary = self
1136 .config
1137 .features
1138 .as_ref()
1139 .map(|f| f.contains(&"rolling_summary".to_string()))
1140 .unwrap_or(false);
1141
1142 if !enable_summary {
1143 return;
1144 }
1145
1146 let summary_limit = self.config.summary_limit.unwrap_or(20);
1147 if self.history.len() <= summary_limit {
1148 return;
1149 }
1150
1151 info!("Rolling Summary: History limit reached. Triggering background summary.");
1152 let keep_recent = 6;
1153 if self.history.len() <= summary_limit + keep_recent
1154 || self.history.len() <= keep_recent + 1
1155 {
1156 return;
1157 }
1158
1159 let split_idx = self.history.len() - keep_recent;
1160 let to_summarize = self.history[1..split_idx].to_vec();
1161 let recent = self.history[split_idx..].to_vec();
1162
1163 let summary_prompt =
1164 "Summarize the above conversation so far, focusing on key details and user intent.";
1165 let mut summary_req_history = to_summarize;
1166 summary_req_history.push(ChatMessage {
1167 role: "user".to_string(),
1168 content: summary_prompt.to_string(),
1169 });
1170
1171 match self.provider.call(&self.config, &summary_req_history).await {
1172 Ok(summary) => {
1173 let mut new_history = Vec::new();
1174 if let Some(sys) = self.history.first() {
1175 let mut new_sys = sys.clone();
1176 new_sys.content.push_str("\n\n[Previous Context Summary]: ");
1177 new_sys.content.push_str(&summary);
1178 new_history.push(new_sys);
1179 }
1180 new_history.extend(recent);
1181 self.history = new_history;
1182 info!(
1183 "Rolling Summary: Applied summary. New history len: {}",
1184 self.history.len()
1185 );
1186 }
1187 Err(e) => {
1188 warn!("Rolling Summary failed: {}", e);
1189 }
1190 }
1191 }
1192
    /// Decide whether the current user activity should barge in on playback.
    ///
    /// Returns an `Interrupt` command when all of these hold: the configured
    /// strategy matches the event type (Vad→Speaking, Asr→AsrDelta, Both→any);
    /// we are speaking and not hanging up; the grace window after TTS start
    /// (`ignore_first_ms`, default 800 ms) has elapsed; the text is not a
    /// filler word (when the filler filter is enabled); and at least 500 ms
    /// have passed since the last final ASR (debounces echo of our own turn).
    fn check_interruption(
        &mut self,
        event: &SessionEvent,
        is_filler: &Option<bool>,
    ) -> Option<Command> {
        let strategy = self.interruption_config.strategy;
        let should_check = match (strategy, event) {
            (InterruptionStrategy::None, _) => false,
            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
            (InterruptionStrategy::Both, _) => true,
            _ => false,
        };

        if !self.is_speaking || self.is_hanging_up || !should_check {
            return None;
        }

        // Grace period right after TTS starts, to avoid self-interruption.
        if let Some(last_start) = self.last_tts_start_at {
            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
            if last_start.elapsed().as_millis() < ignore_ms as u128 {
                return None;
            }
        }

        if self.interruption_config.filler_word_filter.unwrap_or(false) {
            // Upstream may have already classified the utterance as filler.
            if let Some(true) = is_filler {
                return None;
            }
            if let SessionEvent::AsrDelta { text, .. } = event {
                if is_likely_filler(text) {
                    return None;
                }
            }
        }

        // Debounce: a final ASR result just arrived; its response is coming.
        if let Some(last_final) = self.last_asr_final_at {
            if last_final.elapsed().as_millis() < 500 {
                return None;
            }
        }

        info!("Smart interruption detected, stopping playback");
        self.is_speaking = false;
        Some(Command::Interrupt {
            graceful: Some(true),
            fade_out_ms: self.interruption_config.volume_fade_ms,
        })
    }
1245
1246 async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1247 let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1248 self.scenes
1249 .get(scene_id)
1250 .and_then(|s| s.follow_up)
1251 .or(self.global_follow_up_config)
1252 } else {
1253 self.global_follow_up_config
1254 };
1255
1256 let Some(config) = follow_up_config else {
1257 return Ok(vec![]);
1258 };
1259
1260 if self.is_speaking
1261 || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1262 {
1263 return Ok(vec![]);
1264 }
1265
1266 if self.consecutive_follow_ups >= config.max_count {
1267 info!("Max follow-up count reached, hanging up");
1268 let headers = self.render_sip_headers().await;
1269 return Ok(vec![Command::Hangup {
1270 reason: Some("Max follow-up reached".to_string()),
1271 initiator: Some("system".to_string()),
1272 headers,
1273 }]);
1274 }
1275
1276 info!(
1277 "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1278 self.last_interaction_at.elapsed().as_millis(),
1279 self.consecutive_follow_ups + 1,
1280 config.max_count
1281 );
1282 self.consecutive_follow_ups += 1;
1283 self.last_interaction_at = std::time::Instant::now();
1284 self.generate_response().await
1285 }
1286
1287 async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1288 info!(
1289 "Function call from Realtime: {} with args {}",
1290 name, arguments
1291 );
1292 let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1293
1294 match name {
1295 "hangup_call" => {
1296 let headers = self.render_sip_headers().await;
1297 Ok(vec![Command::Hangup {
1298 reason: args["reason"].as_str().map(|s| s.to_string()),
1299 initiator: Some("ai".to_string()),
1300 headers,
1301 }])
1302 }
1303 "transfer_call" | "refer_call" => {
1304 if let Some(callee) = args["callee"]
1305 .as_str()
1306 .or_else(|| args["callee_uri"].as_str())
1307 {
1308 Ok(vec![Command::Refer {
1309 caller: String::new(),
1310 callee: callee.to_string(),
1311 options: None,
1312 }])
1313 } else {
1314 warn!("No callee provided for transfer_call");
1315 Ok(vec![])
1316 }
1317 }
1318 "goto_scene" => {
1319 if let Some(scene) = args["scene"].as_str() {
1320 self.switch_to_scene(scene, false).await
1321 } else {
1322 Ok(vec![])
1323 }
1324 }
1325 _ => {
1326 warn!("Unhandled function call: {}", name);
1327 Ok(vec![])
1328 }
1329 }
1330 }
1331
    /// Turns a raw LLM reply into session commands.
    ///
    /// The reply may be plain text or a structured JSON envelope carrying
    /// text, tool invocations and an input timeout. Tool invocations that
    /// request retrieval trigger another LLM call, bounded by
    /// `MAX_RAG_ATTEMPTS`. The surviving text becomes a TTS command; the
    /// accumulated tool commands are appended after it.
    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
        let mut tool_commands = Vec::new();
        let mut wait_input_timeout = None;
        let mut attempts = 0;
        let mut raw = initial;

        // Resolve the final spoken text, iterating while a tool asks for a
        // RAG-augmented re-run of the LLM.
        let final_text = loop {
            attempts += 1;

            // Not structured JSON: speak the raw reply verbatim.
            let Some(structured) = parse_structured_response(&raw) else {
                break Some(raw);
            };

            // Keep the first wait_input_timeout seen across iterations.
            if wait_input_timeout.is_none() {
                wait_input_timeout = structured.wait_input_timeout;
            }

            let mut rerun_for_rag = false;
            if let Some(tools) = structured.tools {
                for tool in tools {
                    // Each invocation may push commands and/or request a re-run.
                    let needs_rerun = self
                        .handle_tool_invocation(tool, &mut tool_commands)
                        .await?;
                    rerun_for_rag = rerun_for_rag || needs_rerun;
                }
            }

            if !rerun_for_rag {
                break structured.text;
            }

            // Bound the loop so a misbehaving model cannot spin forever.
            if attempts >= MAX_RAG_ATTEMPTS {
                warn!("Reached RAG iteration limit, using last response");
                break structured.text.or(Some(raw));
            }

            raw = self.call_llm().await?;
        };

        let has_hangup = tool_commands
            .iter()
            .any(|c| matches!(c, Command::Hangup { .. }));
        let mut commands = Vec::new();

        if let Some(text) = final_text {
            if !text.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: text.clone(),
                });
                self.last_tts_start_at = Some(std::time::Instant::now());
                self.is_speaking = true;

                // A pending hangup rides on the TTS command (auto_hangup) so
                // the farewell is spoken before the call ends.
                let auto_hangup = has_hangup.then_some(true);
                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));

                if has_hangup {
                    // Drop the standalone hangup; the TTS command owns it now.
                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
                    self.is_hanging_up = true;
                }
            }
        }

        commands.extend(tool_commands);
        Ok(commands)
    }
1398}
1399
1400fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1401 let payload = extract_json_block(raw)?;
1402 serde_json::from_str(payload).ok()
1403}
1404
1405fn is_likely_filler(text: &str) -> bool {
1406 let trimmed = text.trim().to_lowercase();
1407 FILLERS.contains(&trimmed)
1408}
1409
/// Extracts a JSON payload from a raw LLM reply.
///
/// Accepts either a fenced Markdown code block (``` … ```, optionally
/// tagged `json`) or a bare JSON object/array. Returns `None` when no
/// JSON-looking payload is present.
fn extract_json_block(raw: &str) -> Option<&str> {
    let trimmed = raw.trim();
    // Require a full opening fence. The previous single-backtick check
    // combined with the byte slice `trimmed[3..end]` could panic on inputs
    // like "`日…" where byte offset 3 is not a char boundary.
    if trimmed.starts_with("```") {
        let end = trimmed.rfind("```")?;
        if end <= 3 {
            // Closing fence overlaps the opening one: no content between.
            return None;
        }
        let mut inner = trimmed[3..end].trim();
        // Strip an optional `json` language tag after the opening fence.
        // `get(..4)` returns None on a non-boundary offset instead of panicking.
        let tagged = inner
            .get(..4)
            .map_or(false, |tag| tag.eq_ignore_ascii_case("json"));
        if tagged {
            inner = match inner.find('\n') {
                // Tag on its own line: keep everything after the newline.
                Some(nl) => inner[nl + 1..].trim(),
                // Tag glued to the payload (or alone): drop the 4 tag bytes.
                None => inner[4..].trim(),
            };
        }
        return Some(inner);
    }
    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1435
1436#[async_trait]
1437impl DialogueHandler for LlmHandler {
1438 async fn on_start(&mut self) -> Result<Vec<Command>> {
1439 self.last_tts_start_at = Some(std::time::Instant::now());
1440
1441 let mut commands = Vec::new();
1442
1443 if let Some(scene_id) = &self.current_scene_id {
1445 if let Some(scene) = self.scenes.get(scene_id) {
1446 if let Some(audio_file) = &scene.play {
1447 commands.push(Command::Play {
1448 url: audio_file.clone(),
1449 play_id: None,
1450 auto_hangup: None,
1451 wait_input_timeout: None,
1452 });
1453 }
1454 }
1455 }
1456
1457 if let Some(greeting) = &self.config.greeting {
1458 self.is_speaking = true;
1459 commands.push(self.create_tts_command(greeting.clone(), None, None));
1460 return Ok(commands);
1461 }
1462
1463 let response_commands = self.generate_response().await?;
1464 commands.extend(response_commands);
1465 Ok(commands)
1466 }
1467
1468 async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1469 match event {
1470 SessionEvent::Dtmf { digit, .. } => {
1471 info!("DTMF received: {}", digit);
1472 if let Some(action) = self.get_dtmf_action(digit) {
1473 self.handle_dtmf_action(action).await
1474 } else {
1475 Ok(vec![])
1476 }
1477 }
1478 SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1479 SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1480 Ok(self
1481 .check_interruption(event, is_filler)
1482 .into_iter()
1483 .collect())
1484 }
1485 SessionEvent::Eou { completed, .. } => {
1486 if *completed && !self.is_speaking {
1487 info!("EOU detected, triggering early response");
1488 self.generate_response().await
1489 } else {
1490 Ok(vec![])
1491 }
1492 }
1493 SessionEvent::Silence { .. } => self.handle_silence().await,
1494 SessionEvent::TrackStart { .. } => {
1495 self.is_speaking = true;
1496 Ok(vec![])
1497 }
1498 SessionEvent::TrackEnd { .. } => {
1499 self.is_speaking = false;
1500 self.is_hanging_up = false;
1501 self.last_interaction_at = std::time::Instant::now();
1502 Ok(vec![])
1503 }
1504 SessionEvent::FunctionCall {
1505 name, arguments, ..
1506 } => self.handle_function_call(name, arguments).await,
1507 _ => Ok(vec![]),
1508 }
1509 }
1510
1511 async fn get_history(&self) -> Vec<ChatMessage> {
1512 self.history.clone()
1513 }
1514
1515 async fn summarize(&mut self, prompt: &str) -> Result<String> {
1516 info!("Generating summary with prompt: {}", prompt);
1517 let mut summary_history = self.history.clone();
1518 summary_history.push(ChatMessage {
1519 role: "user".to_string(),
1520 content: prompt.to_string(),
1521 });
1522
1523 self.provider.call(&self.config, &summary_history).await
1524 }
1525}