1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
// Inline control tags the LLM may embed in streamed text output; matched
// incrementally by `extract_streaming_commands`.
static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
// Sentence boundary: ASCII/CJK terminators or a newline (plus trailing
// whitespace), used to split streamed text into TTS-sized chunks.
static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
// Filler words used by the interruption filter (`is_likely_filler`).
// Entries are loaded lowercased from `config/fillers.txt` (one per line);
// the built-in defaults apply only when the file is missing, unreadable,
// or contains no usable lines.
static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
    let mut s = std::collections::HashSet::new();
    let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];

    if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
        for line in content.lines() {
            let trimmed = line.trim().to_lowercase();
            if !trimmed.is_empty() {
                s.insert(trimmed);
            }
        }
    }

    // Fall back to the defaults when the config file produced nothing.
    if s.is_empty() {
        for f in default_fillers {
            s.insert(f.to_string());
        }
    }
    s
});
42
43use super::ChatMessage;
44use super::InterruptionStrategy;
45use super::LlmConfig;
46use super::dialogue::DialogueHandler;
47
48pub mod provider;
49pub mod rag;
50pub mod types;
51
/// Kind of inline marker found while scanning streamed LLM text; used only
/// to pick the earliest match in `extract_streaming_commands`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    Hangup,
    Refer,
    Sentence,
    Play,
    Goto,
}
60
61pub use provider::*;
62pub use rag::*;
63pub use types::*;
64
65const MAX_RAG_ATTEMPTS: usize = 3;
66
/// LLM-driven dialogue handler: owns the chat history, turns streamed model
/// output into call commands, and tracks speaking/interruption state.
pub struct LlmHandler {
    config: LlmConfig,
    // Barge-in (interruption) policy.
    interruption_config: super::InterruptionConfig,
    // Fallback follow-up policy when the active scene defines none.
    global_follow_up_config: Option<super::FollowUpConfig>,
    // Global DTMF digit -> action table (scene tables take precedence).
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    // Full conversation; index 0 is the system prompt.
    history: Vec<ChatMessage>,
    provider: Arc<dyn LlmProvider>,
    rag_retriever: Arc<dyn RagRetriever>,
    // True while TTS/audio playback is believed to be in progress.
    is_speaking: bool,
    // Set once a hangup has been queued; suppresses further interruptions.
    is_hanging_up: bool,
    // Follow-ups sent since the user last spoke.
    consecutive_follow_ups: u32,
    last_interaction_at: std::time::Instant,
    event_sender: Option<crate::event::EventSender>,
    last_asr_final_at: Option<std::time::Instant>,
    last_tts_start_at: Option<std::time::Instant>,
    // When the assistant last produced a message (used by context repair).
    last_robot_msg_at: Option<std::time::Instant>,
    // Active call, enabling direct command enqueueing during streaming.
    call: Option<crate::call::ActiveCallRef>,
    scenes: HashMap<String, super::Scene>,
    current_scene_id: Option<String>,
    // Shared HTTP client for the `http` tool.
    client: Client,
}
88
89impl LlmHandler {
    /// Construct a handler with the default LLM provider and a no-op RAG
    /// retriever. See `with_provider` for the fully-parameterized form.
    pub fn new(
        config: LlmConfig,
        interruption: super::InterruptionConfig,
        global_follow_up_config: Option<super::FollowUpConfig>,
        scenes: HashMap<String, super::Scene>,
        dtmf: Option<HashMap<String, super::DtmfAction>>,
        initial_scene_id: Option<String>,
    ) -> Self {
        Self::with_provider(
            config,
            Arc::new(DefaultLlmProvider::new()),
            Arc::new(NoopRagRetriever),
            interruption,
            global_follow_up_config,
            scenes,
            dtmf,
            initial_scene_id,
        )
    }
109
110 pub fn with_provider(
111 config: LlmConfig,
112 provider: Arc<dyn LlmProvider>,
113 rag_retriever: Arc<dyn RagRetriever>,
114 interruption: super::InterruptionConfig,
115 global_follow_up_config: Option<super::FollowUpConfig>,
116 scenes: HashMap<String, super::Scene>,
117 dtmf: Option<HashMap<String, super::DtmfAction>>,
118 initial_scene_id: Option<String>,
119 ) -> Self {
120 let mut history = Vec::new();
121 let system_prompt = Self::build_system_prompt(&config, None);
122
123 history.push(ChatMessage {
124 role: "system".to_string(),
125 content: system_prompt,
126 });
127
128 Self {
129 config,
130 interruption_config: interruption,
131 global_follow_up_config,
132 dtmf_config: dtmf,
133 history,
134 provider,
135 rag_retriever,
136 is_speaking: false,
137 is_hanging_up: false,
138 consecutive_follow_ups: 0,
139 last_interaction_at: std::time::Instant::now(),
140 event_sender: None,
141 last_asr_final_at: None,
142 last_tts_start_at: None,
143 last_robot_msg_at: None,
144 call: None,
145 scenes,
146 current_scene_id: initial_scene_id,
147 client: Client::new(),
148 }
149 }
150
151 fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
152 let base_prompt =
153 scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
154 let mut features_prompt = String::new();
155
156 if let Some(features) = &config.features {
157 let lang = config.language.as_deref().unwrap_or("zh");
158 for feature in features {
159 match Self::load_feature_snippet(feature, lang) {
160 Ok(snippet) => {
161 features_prompt.push_str(&format!("\n- {}", snippet));
162 }
163 Err(e) => {
164 warn!("Failed to load feature snippet {}: {}", feature, e);
165 }
166 }
167 }
168 }
169
170 let features_section = if features_prompt.is_empty() {
171 String::new()
172 } else {
173 format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
174 };
175
176 let tool_instructions = if let Some(custom) = &config.tool_instructions {
178 custom.clone()
179 } else {
180 let lang = config.language.as_deref().unwrap_or("zh");
181 Self::load_feature_snippet("tool_instructions", lang)
182 .unwrap_or_else(|_| {
183 Self::load_feature_snippet("tool_instructions", "en")
185 .unwrap_or_else(|_| {
186 "Tool usage instructions:\n\
188 - To hang up the call, output: <hangup/>\n\
189 - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
190 - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
191 - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
192 - To call an external HTTP API, output JSON:\n\
193 ```json\n\
194 {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
195 ```\n\
196 Please use XML tags for simple actions and JSON blocks for tool calls. \
197 Output your response in short sentences. Each sentence will be played as soon as it is finished."
198 .to_string()
199 })
200 })
201 };
202
203 format!(
204 "{}{}\n\n{}",
205 base_prompt, features_section, tool_instructions
206 )
207 }
208
209 fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
210 let path = format!("features/{}.{}.md", feature, lang);
211 let content = std::fs::read_to_string(path)?;
212 Ok(content.trim().to_string())
213 }
214
215 fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
216 if let Some(scene_id) = &self.current_scene_id {
217 if let Some(scene) = self.scenes.get(scene_id) {
218 if let Some(dtmf) = &scene.dtmf {
219 if let Some(action) = dtmf.get(digit) {
220 return Some(action.clone());
221 }
222 }
223 }
224 }
225
226 if let Some(dtmf) = &self.dtmf_config {
227 if let Some(action) = dtmf.get(digit) {
228 return Some(action.clone());
229 }
230 }
231
232 None
233 }
234
235 async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
236 match action {
237 super::DtmfAction::Goto { scene } => {
238 info!("DTMF action: switch to scene {}", scene);
239 self.switch_to_scene(&scene, true).await
240 }
241 super::DtmfAction::Transfer { target } => {
242 info!("DTMF action: transfer to {}", target);
243 Ok(vec![Command::Refer {
244 caller: String::new(),
245 callee: target,
246 options: None,
247 }])
248 }
249 super::DtmfAction::Hangup => {
250 info!("DTMF action: hangup");
251 Ok(vec![Command::Hangup {
252 reason: Some("DTMF Hangup".to_string()),
253 initiator: Some("ai".to_string()),
254 }])
255 }
256 }
257 }
258
259 async fn switch_to_scene(
260 &mut self,
261 scene_id: &str,
262 trigger_response: bool,
263 ) -> Result<Vec<Command>> {
264 if let Some(scene) = self.scenes.get(scene_id).cloned() {
265 info!("Switching to scene: {}", scene_id);
266 self.current_scene_id = Some(scene_id.to_string());
267 let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
268 if let Some(first_msg) = self.history.get_mut(0) {
269 if first_msg.role == "system" {
270 first_msg.content = system_prompt;
271 }
272 }
273
274 let mut commands = Vec::new();
275 if let Some(url) = &scene.play {
276 commands.push(Command::Play {
277 url: url.clone(),
278 play_id: None,
279 auto_hangup: None,
280 wait_input_timeout: None,
281 });
282 }
283
284 if trigger_response {
285 let response_cmds = self.generate_response().await?;
286 commands.extend(response_cmds);
287 }
288 Ok(commands)
289 } else {
290 warn!("Scene not found: {}", scene_id);
291 Ok(vec![])
292 }
293 }
294
    /// Borrow the full chat history (system message included).
    pub fn get_history_ref(&self) -> &[ChatMessage] {
        &self.history
    }

    /// Id of the scene currently driving the system prompt, if any.
    pub fn get_current_scene_id(&self) -> Option<String> {
        self.current_scene_id.clone()
    }

    /// Attach the active call so streamed commands can be enqueued directly.
    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
        self.call = Some(call);
    }
306
307 pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
308 self.event_sender = Some(sender.clone());
309 if let Some(greeting) = &self.config.greeting {
310 let _ = sender.send(crate::event::SessionEvent::AddHistory {
311 sender: Some("system".to_string()),
312 timestamp: crate::media::get_timestamp(),
313 speaker: "assistant".to_string(),
314 text: greeting.clone(),
315 });
316 }
317 }
318
319 fn send_debug_event(&self, key: &str, data: serde_json::Value) {
320 if let Some(sender) = &self.event_sender {
321 let timestamp = crate::media::get_timestamp();
322 if key == "llm_response" {
323 if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
324 let _ = sender.send(crate::event::SessionEvent::AddHistory {
325 sender: Some("llm".to_string()),
326 timestamp,
327 speaker: "assistant".to_string(),
328 text: text.to_string(),
329 });
330 }
331 }
332
333 let event = crate::event::SessionEvent::Metrics {
334 timestamp,
335 key: key.to_string(),
336 duration: 0,
337 data,
338 };
339 let _ = sender.send(event);
340 }
341 }
342
    /// One non-streaming completion over the current history.
    async fn call_llm(&self) -> Result<String> {
        self.provider.call(&self.config, &self.history).await
    }
346
347 fn create_tts_command(
348 &self,
349 text: String,
350 wait_input_timeout: Option<u32>,
351 auto_hangup: Option<bool>,
352 ) -> Command {
353 let timeout = wait_input_timeout.unwrap_or(10000);
354 let play_id = uuid::Uuid::new_v4().to_string();
355
356 if let Some(sender) = &self.event_sender {
357 let _ = sender.send(crate::event::SessionEvent::Metrics {
358 timestamp: crate::media::get_timestamp(),
359 key: "tts_play_id_map".to_string(),
360 duration: 0,
361 data: serde_json::json!({
362 "playId": play_id,
363 "text": text,
364 }),
365 });
366 }
367
368 Command::Tts {
369 text,
370 speaker: None,
371 play_id: Some(play_id),
372 auto_hangup,
373 streaming: None,
374 end_of_stream: Some(true),
375 option: None,
376 wait_input_timeout: Some(timeout),
377 base64: None,
378 cache_key: None,
379 }
380 }
381
    /// Run one streaming LLM turn over the current history.
    ///
    /// Text mode: sentences and inline control tags are converted to
    /// commands incrementally as chunks arrive (sent straight to the
    /// active call when one is attached, otherwise collected into the
    /// return value). JSON mode (response starts with `{` or a backtick
    /// fence): the full response is buffered and handed to
    /// `interpret_response`.
    async fn generate_response(&mut self) -> Result<Vec<Command>> {
        let start_time = crate::media::get_timestamp();
        // One play id groups every TTS chunk of this turn.
        let play_id = uuid::Uuid::new_v4().to_string();

        self.send_debug_event(
            "llm_call_start",
            json!({
                "history_length": self.history.len(),
                "playId": play_id,
            }),
        );

        let mut stream = self
            .provider
            .call_stream(&self.config, &self.history)
            .await?;

        let mut full_content = String::new();
        let mut full_reasoning = String::new();
        let mut buffer = String::new();
        let mut commands = Vec::new();
        let mut is_json_mode = false;
        let mut checked_json_mode = false;
        let mut first_token_time = None;

        while let Some(chunk_result) = stream.next().await {
            let event = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    // A stream error ends the turn; whatever was buffered
                    // so far is still flushed below.
                    warn!("LLM stream error: {}", e);
                    break;
                }
            };

            match event {
                LlmStreamEvent::Reasoning(text) => {
                    full_reasoning.push_str(&text);
                }
                LlmStreamEvent::Content(chunk) => {
                    if first_token_time.is_none() && !chunk.trim().is_empty() {
                        first_token_time = Some(crate::media::get_timestamp());
                    }

                    full_content.push_str(&chunk);
                    buffer.push_str(&chunk);

                    // Decide once, from the first non-empty content,
                    // whether this response is JSON/tool output or
                    // speakable text.
                    if !checked_json_mode {
                        let trimmed = full_content.trim();
                        if !trimmed.is_empty() {
                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
                                is_json_mode = true;
                            }
                            checked_json_mode = true;
                        }
                    }

                    if checked_json_mode && !is_json_mode {
                        let extracted =
                            self.extract_streaming_commands(&mut buffer, &play_id, false);
                        for cmd in extracted {
                            if let Some(call) = &self.call {
                                let _ = call.enqueue_command(cmd).await;
                            } else {
                                commands.push(cmd);
                            }
                        }
                    }
                }
            }
        }

        let end_time = crate::media::get_timestamp();
        self.send_debug_event(
            "llm_response",
            json!({
                "response": full_content,
                "reasoning": full_reasoning,
                "is_json_mode": is_json_mode,
                "duration": end_time - start_time,
                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
                "playId": play_id,
            }),
        );

        if is_json_mode {
            self.interpret_response(full_content).await
        } else {
            // Final flush: remaining text becomes the last TTS chunk(s),
            // marked end-of-stream inside the helper.
            let extracted = self.extract_streaming_commands(&mut buffer, &play_id, true);
            for cmd in extracted {
                if let Some(call) = &self.call {
                    let _ = call.enqueue_command(cmd).await;
                } else {
                    commands.push(cmd);
                }
            }
            if !full_content.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: full_content,
                });
                self.last_robot_msg_at = Some(std::time::Instant::now());
                self.is_speaking = true;
                self.last_tts_start_at = Some(std::time::Instant::now());
            }
            Ok(commands)
        }
    }
491
    /// Scan `buffer` for the earliest actionable marker — a control tag
    /// (`<hangup/>`, `<refer/>`, `<play/>`, `<goto/>`) or a sentence
    /// boundary — converting consumed text into commands, repeatedly until
    /// no marker remains. With `is_final`, leftover text is flushed and
    /// the last TTS chunk is marked end-of-stream.
    fn extract_streaming_commands(
        &mut self,
        buffer: &mut String,
        play_id: &str,
        is_final: bool,
    ) -> Vec<Command> {
        let mut commands = Vec::new();

        loop {
            let hangup_pos = RE_HANGUP.find(buffer);
            let refer_pos = RE_REFER.captures(buffer);
            let play_pos = RE_PLAY.captures(buffer);
            let goto_pos = RE_GOTO.captures(buffer);
            let sentence_pos = RE_SENTENCE.find(buffer);

            // Collect the start offset of every match so the earliest
            // marker wins regardless of kind.
            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
            if let Some(m) = hangup_pos {
                positions.push((m.start(), CommandKind::Hangup));
            }
            if let Some(caps) = &refer_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
            }
            if let Some(caps) = &play_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
            }
            if let Some(caps) = &goto_pos {
                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
            }
            if let Some(m) = sentence_pos {
                positions.push((m.start(), CommandKind::Sentence));
            }

            positions.sort_by_key(|p| p.0);

            if let Some((pos, kind)) = positions.first() {
                let pos = *pos;
                match kind {
                    CommandKind::Hangup => {
                        // Speak any text before the tag, flag the TTS chunk
                        // for auto-hangup, and return early: anything after
                        // <hangup/> in the buffer is discarded.
                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            let mut cmd = self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                Some(true),
                            );
                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
                                *end_of_stream = Some(true);
                            }
                            self.is_hanging_up = true;
                            commands.push(cmd);
                        } else {
                            // Empty chunk still carries the auto-hangup and
                            // end-of-stream flags.
                            let mut cmd = self.create_tts_command_with_id(
                                "".to_string(),
                                play_id.to_string(),
                                Some(true),
                            );
                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
                                *end_of_stream = Some(true);
                            }
                            self.is_hanging_up = true;
                            commands.push(cmd);
                        }
                        // Re-run the regex here: the earlier match borrowed
                        // `buffer` and cannot be held across the drain.
                        buffer.drain(..RE_HANGUP.find(buffer).unwrap().end());
                        return commands;
                    }
                    CommandKind::Refer => {
                        let caps = RE_REFER.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let callee = caps.get(1).unwrap().as_str().to_string();

                        // Speak the text preceding the tag first.
                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }
                        commands.push(Command::Refer {
                            caller: String::new(),
                            callee,
                            options: None,
                        });
                        buffer.drain(..mat.end());
                    }
                    CommandKind::Play => {
                        let caps = RE_PLAY.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let url = caps.get(1).unwrap().as_str().to_string();

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }
                        commands.push(Command::Play {
                            url,
                            play_id: None,
                            auto_hangup: None,
                            wait_input_timeout: None,
                        });
                        buffer.drain(..mat.end());
                    }
                    CommandKind::Goto => {
                        let caps = RE_GOTO.captures(buffer).unwrap();
                        let mat = caps.get(0).unwrap();
                        let scene_id = caps.get(1).unwrap().as_str().to_string();

                        let prefix = buffer[..pos].to_string();
                        if !prefix.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                prefix,
                                play_id.to_string(),
                                None,
                            ));
                        }

                        // Scene switch takes effect immediately: the system
                        // message is rewritten in place for the next turn.
                        info!("Switching to scene (from stream): {}", scene_id);
                        if let Some(scene) = self.scenes.get(&scene_id) {
                            self.current_scene_id = Some(scene_id);
                            let system_prompt =
                                Self::build_system_prompt(&self.config, Some(&scene.prompt));
                            if let Some(first_msg) = self.history.get_mut(0) {
                                if first_msg.role == "system" {
                                    first_msg.content = system_prompt;
                                }
                            }
                        } else {
                            warn!("Scene not found: {}", scene_id);
                        }

                        buffer.drain(..mat.end());
                    }
                    CommandKind::Sentence => {
                        // Emit the sentence (terminator included) as one
                        // TTS chunk.
                        let mat = sentence_pos.unwrap();
                        let sentence = buffer[..mat.end()].to_string();
                        if !sentence.trim().is_empty() {
                            commands.push(self.create_tts_command_with_id(
                                sentence,
                                play_id.to_string(),
                                None,
                            ));
                        }
                        buffer.drain(..mat.end());
                    }
                }
            } else {
                break;
            }
        }

        if is_final {
            // Flush whatever text is left after the last marker.
            let remaining = buffer.trim().to_string();
            if !remaining.is_empty() {
                commands.push(self.create_tts_command_with_id(
                    remaining,
                    play_id.to_string(),
                    None,
                ));
            }
            buffer.clear();

            // Mark the final TTS chunk end-of-stream; when no command was
            // produced at all (and we are not hanging up), send an empty
            // terminator chunk so the player knows the stream ended.
            // NOTE(review): if the last command is a Refer/Play rather than
            // Tts, no end-of-stream marker is emitted — confirm intended.
            if let Some(last) = commands.last_mut() {
                if let Command::Tts { end_of_stream, .. } = last {
                    *end_of_stream = Some(true);
                }
            } else if !self.is_hanging_up {
                commands.push(Command::Tts {
                    text: "".to_string(),
                    speaker: None,
                    play_id: Some(play_id.to_string()),
                    auto_hangup: None,
                    streaming: Some(true),
                    end_of_stream: Some(true),
                    option: None,
                    wait_input_timeout: None,
                    base64: None,
                    cache_key: None,
                });
            }
        }

        commands
    }
684
685 fn create_tts_command_with_id(
686 &self,
687 text: String,
688 play_id: String,
689 auto_hangup: Option<bool>,
690 ) -> Command {
691 Command::Tts {
692 text,
693 speaker: None,
694 play_id: Some(play_id),
695 auto_hangup,
696 streaming: Some(true),
697 end_of_stream: None,
698 option: None,
699 wait_input_timeout: Some(10000),
700 base64: None,
701 cache_key: None,
702 }
703 }
704
    /// Execute one tool requested by a structured LLM response.
    ///
    /// Returns `Ok(true)` when the tool injected new context into the
    /// history (RAG, HTTP) and the LLM should be called again;
    /// `Ok(false)` when the tool only appended call commands to
    /// `tool_commands`.
    async fn handle_tool_invocation(
        &mut self,
        tool: ToolInvocation,
        tool_commands: &mut Vec<Command>,
    ) -> Result<bool> {
        match tool {
            ToolInvocation::Hangup {
                ref reason,
                ref initiator,
            } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Hangup",
                        "params": {
                            "reason": reason,
                            "initiator": initiator,
                        }
                    }),
                );
                tool_commands.push(Command::Hangup {
                    reason: reason.clone(),
                    initiator: initiator.clone(),
                });
                Ok(false)
            }
            ToolInvocation::Refer {
                ref caller,
                ref callee,
                ref options,
            } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Refer",
                        "params": {
                            "caller": caller,
                            "callee": callee,
                        }
                    }),
                );
                tool_commands.push(Command::Refer {
                    caller: caller.clone(),
                    callee: callee.clone(),
                    options: options.clone(),
                });
                Ok(false)
            }
            ToolInvocation::Rag {
                ref query,
                ref source,
            } => {
                // Retrieval appends its result to history; rerun the LLM.
                self.handle_rag_tool(query, source).await?;
                Ok(true)
            }
            ToolInvocation::Accept { ref options } => {
                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
                tool_commands.push(Command::Accept {
                    option: options.clone().unwrap_or_default(),
                });
                Ok(false)
            }
            ToolInvocation::Reject { ref reason, code } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Reject",
                        "params": {
                            "reason": reason,
                            "code": code,
                        }
                    }),
                );
                tool_commands.push(Command::Reject {
                    reason: reason
                        .clone()
                        .unwrap_or_else(|| "Rejected by agent".to_string()),
                    code,
                });
                Ok(false)
            }
            ToolInvocation::Http {
                ref url,
                ref method,
                ref body,
                ref headers,
            } => {
                // The HTTP response lands in history; rerun the LLM.
                self.handle_http_tool(url, method, body, headers).await?;
                Ok(true)
            }
        }
    }
797
798 async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
799 self.send_debug_event(
800 "tool_invocation",
801 json!({
802 "tool": "Rag",
803 "params": {
804 "query": query,
805 "source": source,
806 }
807 }),
808 );
809
810 let rag_result = self.rag_retriever.retrieve(query).await?;
811
812 self.send_debug_event(
813 "rag_result",
814 json!({
815 "query": query,
816 "result": rag_result,
817 }),
818 );
819
820 let summary = if let Some(source) = source {
821 format!("[{}] {}", source, rag_result)
822 } else {
823 rag_result
824 };
825
826 self.history.push(ChatMessage {
827 role: "system".to_string(),
828 content: format!("RAG result for {}: {}", query, summary),
829 });
830
831 Ok(())
832 }
833
834 async fn handle_http_tool(
835 &mut self,
836 url: &str,
837 method: &Option<String>,
838 body: &Option<serde_json::Value>,
839 headers: &Option<HashMap<String, String>>,
840 ) -> Result<()> {
841 let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
842 let method =
843 reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
844
845 self.send_debug_event(
846 "tool_invocation",
847 json!({
848 "tool": "Http",
849 "params": {
850 "url": url,
851 "method": method_str,
852 }
853 }),
854 );
855
856 let mut req = self.client.request(method, url);
857 if let Some(body) = body {
858 req = req.json(body);
859 }
860 if let Some(headers) = headers {
861 for (k, v) in headers {
862 req = req.header(k, v);
863 }
864 }
865
866 match req.send().await {
867 Ok(res) => {
868 let status = res.status();
869 let text = res.text().await.unwrap_or_default();
870 self.history.push(ChatMessage {
871 role: "system".to_string(),
872 content: format!("HTTP tool response ({}): {}", status, text),
873 });
874 }
875 Err(e) => {
876 warn!("HTTP tool failed: {}", e);
877 self.history.push(ChatMessage {
878 role: "system".to_string(),
879 content: format!("HTTP tool failed: {}", e),
880 });
881 }
882 }
883
884 Ok(())
885 }
886
887 async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
888 if text.trim().is_empty() {
889 return Ok(vec![]);
890 }
891
892 self.apply_context_repair(text);
893 self.apply_rolling_summary().await;
894
895 self.last_asr_final_at = Some(std::time::Instant::now());
896 self.last_interaction_at = std::time::Instant::now();
897 self.is_speaking = false;
898 self.consecutive_follow_ups = 0;
899
900 self.generate_response().await
901 }
902
    /// Record a final user utterance, optionally merging it with the
    /// previous user turn ("context_repair" feature).
    ///
    /// If the assistant replied very recently with a short message, the
    /// new text is treated as a continuation of the user's previous turn:
    /// the short assistant reply is removed and the text is appended to
    /// the prior user message.
    fn apply_context_repair(&mut self, text: &str) {
        let enable_repair = self
            .config
            .features
            .as_ref()
            .map(|f| f.contains(&"context_repair".to_string()))
            .unwrap_or(false);

        if !enable_repair {
            self.history.push(ChatMessage {
                role: "user".to_string(),
                content: text.to_string(),
            });
            return;
        }

        // Only repair when the assistant's reply landed within this window
        // (default 3s), i.e. the user was likely cut off mid-sentence.
        let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
        let mut merged = false;

        if let Some(last_robot_at) = self.last_robot_msg_at {
            if last_robot_at.elapsed().as_millis() < repair_window_ms {
                if let Some(last_msg) = self.history.last() {
                    // Heuristic: < 15 chars suggests the model answered a
                    // fragment rather than a complete utterance.
                    if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
                        info!(
                            "Context Repair: Detected potential fragmentation. Triggering merge."
                        );
                        // NOTE(review): the short assistant reply is popped
                        // even when the preceding message is not a user
                        // turn (merge fails and the reply is lost) —
                        // confirm this is intended.
                        self.history.pop();
                        if let Some(prev_user) = self.history.last_mut() {
                            if prev_user.role == "user" {
                                prev_user.content.push_str(",");
                                prev_user.content.push_str(text);
                                merged = true;
                            }
                        }
                    }
                }
            }
        }

        if !merged {
            self.history.push(ChatMessage {
                role: "user".to_string(),
                content: text.to_string(),
            });
        }
    }
949
    /// Compact old history into a summary once it grows past the limit
    /// ("rolling_summary" feature).
    ///
    /// Keeps the system message plus the most recent `keep_recent` turns;
    /// everything in between is summarized by the provider and appended to
    /// the system message. On failure the history is left untouched.
    async fn apply_rolling_summary(&mut self) {
        let enable_summary = self
            .config
            .features
            .as_ref()
            .map(|f| f.contains(&"rolling_summary".to_string()))
            .unwrap_or(false);

        if !enable_summary {
            return;
        }

        let summary_limit = self.config.summary_limit.unwrap_or(20);
        if self.history.len() <= summary_limit {
            return;
        }

        // NOTE(review): despite the log text, the summary call below is
        // awaited inline (it delays this turn), not run in the background.
        info!("Rolling Summary: History limit reached. Triggering background summary.");
        let keep_recent = 6;
        // Require headroom beyond the limit before compacting, so the
        // summary is not re-triggered on every subsequent message.
        if self.history.len() <= summary_limit + keep_recent
            || self.history.len() <= keep_recent + 1
        {
            return;
        }

        // Index 0 is the system message; summarize [1..split), keep [split..).
        let split_idx = self.history.len() - keep_recent;
        let to_summarize = self.history[1..split_idx].to_vec();
        let recent = self.history[split_idx..].to_vec();

        let summary_prompt =
            "Summarize the above conversation so far, focusing on key details and user intent.";
        let mut summary_req_history = to_summarize;
        summary_req_history.push(ChatMessage {
            role: "user".to_string(),
            content: summary_prompt.to_string(),
        });

        match self.provider.call(&self.config, &summary_req_history).await {
            Ok(summary) => {
                // Rebuild: system message (with summary appended) + recent.
                let mut new_history = Vec::new();
                if let Some(sys) = self.history.first() {
                    let mut new_sys = sys.clone();
                    new_sys.content.push_str("\n\n[Previous Context Summary]: ");
                    new_sys.content.push_str(&summary);
                    new_history.push(new_sys);
                }
                new_history.extend(recent);
                self.history = new_history;
                info!(
                    "Rolling Summary: Applied summary. New history len: {}",
                    self.history.len()
                );
            }
            Err(e) => {
                warn!("Rolling Summary failed: {}", e);
            }
        }
    }
1008
    /// Decide whether user speech should interrupt ongoing TTS playback.
    ///
    /// Applies, in order: the configured strategy vs. event type, a grace
    /// window after TTS starts, an optional filler-word filter, and a
    /// short cooldown after the last final ASR result. Returns the
    /// interrupt command when playback should stop.
    fn check_interruption(
        &mut self,
        event: &SessionEvent,
        is_filler: &Option<bool>,
    ) -> Option<Command> {
        let strategy = self.interruption_config.strategy;
        let should_check = match (strategy, event) {
            (InterruptionStrategy::None, _) => false,
            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
            (InterruptionStrategy::Both, _) => true,
            _ => false,
        };

        if !self.is_speaking || self.is_hanging_up || !should_check {
            return None;
        }

        // Ignore events within `ignore_first_ms` (default 800ms) of the
        // TTS start.
        if let Some(last_start) = self.last_tts_start_at {
            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
            if last_start.elapsed().as_millis() < ignore_ms as u128 {
                return None;
            }
        }

        // Filler filter: trust the upstream flag when present, otherwise
        // check ASR text against the known filler list.
        if self.interruption_config.filler_word_filter.unwrap_or(false) {
            if let Some(true) = is_filler {
                return None;
            }
            if let SessionEvent::AsrDelta { text, .. } = event {
                if is_likely_filler(text) {
                    return None;
                }
            }
        }

        // Debounce: a final ASR result arrived < 500ms ago, so this is
        // likely the tail of that utterance, not a new interruption.
        if let Some(last_final) = self.last_asr_final_at {
            if last_final.elapsed().as_millis() < 500 {
                return None;
            }
        }

        info!("Smart interruption detected, stopping playback");
        self.is_speaking = false;
        Some(Command::Interrupt {
            graceful: Some(true),
            fade_out_ms: self.interruption_config.volume_fade_ms,
        })
    }
1061
    /// React to a silence event with a follow-up prompt or a hangup.
    ///
    /// Scene-level follow-up configuration overrides the global one. After
    /// `max_count` consecutive unanswered follow-ups the call is ended.
    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
        // Scene config wins; fall back to the global follow-up config.
        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
            self.scenes
                .get(scene_id)
                .and_then(|s| s.follow_up)
                .or(self.global_follow_up_config)
        } else {
            self.global_follow_up_config
        };

        let Some(config) = follow_up_config else {
            return Ok(vec![]);
        };

        // Not silent long enough (or it is our own speech).
        if self.is_speaking
            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
        {
            return Ok(vec![]);
        }

        if self.consecutive_follow_ups >= config.max_count {
            info!("Max follow-up count reached, hanging up");
            return Ok(vec![Command::Hangup {
                reason: Some("Max follow-up reached".to_string()),
                initiator: Some("system".to_string()),
            }]);
        }

        info!(
            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
            self.last_interaction_at.elapsed().as_millis(),
            self.consecutive_follow_ups + 1,
            config.max_count
        );
        self.consecutive_follow_ups += 1;
        self.last_interaction_at = std::time::Instant::now();
        self.generate_response().await
    }
1100
1101 async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1102 info!(
1103 "Function call from Realtime: {} with args {}",
1104 name, arguments
1105 );
1106 let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1107
1108 match name {
1109 "hangup_call" => Ok(vec![Command::Hangup {
1110 reason: args["reason"].as_str().map(|s| s.to_string()),
1111 initiator: Some("ai".to_string()),
1112 }]),
1113 "transfer_call" | "refer_call" => {
1114 if let Some(callee) = args["callee"]
1115 .as_str()
1116 .or_else(|| args["callee_uri"].as_str())
1117 {
1118 Ok(vec![Command::Refer {
1119 caller: String::new(),
1120 callee: callee.to_string(),
1121 options: None,
1122 }])
1123 } else {
1124 warn!("No callee provided for transfer_call");
1125 Ok(vec![])
1126 }
1127 }
1128 "goto_scene" => {
1129 if let Some(scene) = args["scene"].as_str() {
1130 self.switch_to_scene(scene, false).await
1131 } else {
1132 Ok(vec![])
1133 }
1134 }
1135 _ => {
1136 warn!("Unhandled function call: {}", name);
1137 Ok(vec![])
1138 }
1139 }
1140 }
1141
    /// Interpret a complete (non-streamed) LLM response, possibly looping
    /// through RAG/HTTP tool rounds.
    ///
    /// Each round parses the response as structured JSON and executes its
    /// tools; tools that add context (RAG, HTTP) trigger another LLM call,
    /// up to `MAX_RAG_ATTEMPTS`. The final text becomes a TTS command; a
    /// requested hangup is folded into that TTS via `auto_hangup`.
    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
        let mut tool_commands = Vec::new();
        let mut wait_input_timeout = None;
        let mut attempts = 0;
        let mut raw = initial;

        let final_text = loop {
            attempts += 1;

            // Not structured JSON: speak the raw text as-is.
            let Some(structured) = parse_structured_response(&raw) else {
                break Some(raw);
            };

            // The first round's timeout wins across tool iterations.
            if wait_input_timeout.is_none() {
                wait_input_timeout = structured.wait_input_timeout;
            }

            let mut rerun_for_rag = false;
            if let Some(tools) = structured.tools {
                for tool in tools {
                    let needs_rerun = self
                        .handle_tool_invocation(tool, &mut tool_commands)
                        .await?;
                    rerun_for_rag = rerun_for_rag || needs_rerun;
                }
            }

            if !rerun_for_rag {
                break structured.text;
            }

            if attempts >= MAX_RAG_ATTEMPTS {
                warn!("Reached RAG iteration limit, using last response");
                break structured.text.or(Some(raw));
            }

            // Context was added (RAG/HTTP result); ask the LLM again.
            raw = self.call_llm().await?;
        };

        let has_hangup = tool_commands
            .iter()
            .any(|c| matches!(c, Command::Hangup { .. }));
        let mut commands = Vec::new();

        if let Some(text) = final_text {
            if !text.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: text.clone(),
                });
                self.last_tts_start_at = Some(std::time::Instant::now());
                self.is_speaking = true;

                // Let the TTS itself trigger the hangup after playback
                // instead of sending a separate Hangup command.
                let auto_hangup = has_hangup.then_some(true);
                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));

                if has_hangup {
                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
                    self.is_hanging_up = true;
                }
            }
        }

        commands.extend(tool_commands);
        Ok(commands)
    }
1208}
1209
1210fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1211 let payload = extract_json_block(raw)?;
1212 serde_json::from_str(payload).ok()
1213}
1214
1215fn is_likely_filler(text: &str) -> bool {
1216 let trimmed = text.trim().to_lowercase();
1217 FILLERS.contains(&trimmed)
1218}
1219
/// Extract the JSON payload from a raw LLM reply.
///
/// Accepts either a bare JSON object/array or a ```-fenced block
/// (optionally tagged `json`, any case). Returns `None` when no JSON
/// payload can be located.
///
/// Fix: the previous version only checked `starts_with('`')` (one
/// backtick) before byte-slicing `trimmed[3..end]`, which could split a
/// multi-byte character and panic (e.g. a reply starting with a single
/// backtick followed by CJK text). Requiring a full "```" prefix keeps
/// the slice on a char boundary.
fn extract_json_block(raw: &str) -> Option<&str> {
    let trimmed = raw.trim();

    if trimmed.starts_with("```") {
        // Take everything between the opening fence and the LAST closing
        // fence; a lone fence (end <= 3) means no payload.
        let end = trimmed.rfind("```")?;
        if end <= 3 {
            return None;
        }
        let mut inner = trimmed[3..end].trim();
        // Drop an optional language tag ("json", any case). The tag is
        // 4 ASCII bytes, so `get(..4)` is only `Some` on a char boundary.
        if inner.get(..4).map_or(false, |tag| tag.eq_ignore_ascii_case("json")) {
            inner = match inner.find('\n') {
                Some(newline) => inner[newline + 1..].trim(),
                None => inner[4..].trim(),
            };
        }
        return Some(inner);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1245
1246#[async_trait]
1247impl DialogueHandler for LlmHandler {
1248 async fn on_start(&mut self) -> Result<Vec<Command>> {
1249 self.last_tts_start_at = Some(std::time::Instant::now());
1250
1251 let mut commands = Vec::new();
1252
1253 if let Some(scene_id) = &self.current_scene_id {
1255 if let Some(scene) = self.scenes.get(scene_id) {
1256 if let Some(audio_file) = &scene.play {
1257 commands.push(Command::Play {
1258 url: audio_file.clone(),
1259 play_id: None,
1260 auto_hangup: None,
1261 wait_input_timeout: None,
1262 });
1263 }
1264 }
1265 }
1266
1267 if let Some(greeting) = &self.config.greeting {
1268 self.is_speaking = true;
1269 commands.push(self.create_tts_command(greeting.clone(), None, None));
1270 return Ok(commands);
1271 }
1272
1273 let response_commands = self.generate_response().await?;
1274 commands.extend(response_commands);
1275 Ok(commands)
1276 }
1277
    /// Route a session event to the matching handler.
    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
        match event {
            SessionEvent::Dtmf { digit, .. } => {
                info!("DTMF received: {}", digit);
                if let Some(action) = self.get_dtmf_action(digit) {
                    self.handle_dtmf_action(action).await
                } else {
                    Ok(vec![])
                }
            }
            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
            // Possible barge-in while the assistant is speaking.
            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
                Ok(self
                    .check_interruption(event, is_filler)
                    .into_iter()
                    .collect())
            }
            // End-of-utterance: respond early without waiting for silence.
            SessionEvent::Eou { completed, .. } => {
                if *completed && !self.is_speaking {
                    info!("EOU detected, triggering early response");
                    self.generate_response().await
                } else {
                    Ok(vec![])
                }
            }
            SessionEvent::Silence { .. } => self.handle_silence().await,
            SessionEvent::TrackStart { .. } => {
                self.is_speaking = true;
                Ok(vec![])
            }
            // Playback finished: clear speaking/hangup state and restart
            // the silence clock.
            SessionEvent::TrackEnd { .. } => {
                self.is_speaking = false;
                self.is_hanging_up = false;
                self.last_interaction_at = std::time::Instant::now();
                Ok(vec![])
            }
            SessionEvent::FunctionCall {
                name, arguments, ..
            } => self.handle_function_call(name, arguments).await,
            _ => Ok(vec![]),
        }
    }
1320
    /// Cloned snapshot of the full chat history.
    async fn get_history(&self) -> Vec<ChatMessage> {
        self.history.clone()
    }

    /// One-shot conversation summary with `prompt` as the final user
    /// instruction; does not mutate the stored history.
    async fn summarize(&mut self, prompt: &str) -> Result<String> {
        info!("Generating summary with prompt: {}", prompt);
        let mut summary_history = self.history.clone();
        summary_history.push(ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        });

        self.provider.call(&self.config, &summary_history).await
    }
1335}