1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
// Inline control tags the LLM may emit in its streamed text output. Each
// pattern is compiled once, lazily, and matched repeatedly against the
// accumulating stream buffer in `extract_streaming_commands`.
static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
// `<refer to="..."/>` — capture group 1 is the transfer target (e.g. a SIP URI).
static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
// `<play file="..."/>` — capture group 1 is the audio file/URL to play.
static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
// `<goto scene="..."/>` — capture group 1 is the target scene id.
static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
// Sentence boundary: ASCII or CJK terminators, or a newline, plus trailing
// whitespace. Used to flush complete sentences to TTS while still streaming.
static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
22static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
23 let mut s = std::collections::HashSet::new();
24 let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
25
26 if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
27 for line in content.lines() {
28 let trimmed = line.trim().to_lowercase();
29 if !trimmed.is_empty() {
30 s.insert(trimmed);
31 }
32 }
33 }
34
35 if s.is_empty() {
36 for f in default_fillers {
37 s.insert(f.to_string());
38 }
39 }
40 s
41});
42
43use super::ChatMessage;
44use super::InterruptionStrategy;
45use super::LlmConfig;
46use super::dialogue::DialogueHandler;
47
48pub mod provider;
49pub mod rag;
50pub mod types;
51
/// Kind of streamed control marker found in the LLM output buffer; used by
/// `extract_streaming_commands` to dispatch on the earliest match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    Hangup,   // <hangup/>
    Refer,    // <refer to="..."/>
    Sentence, // sentence terminator — flush accumulated text to TTS
    Play,     // <play file="..."/>
    Goto,     // <goto scene="..."/>
}
60
61pub use provider::*;
62pub use rag::*;
63pub use types::*;
64
// Upper bound on LLM round-trips when RAG/HTTP tool results require a re-run.
const MAX_RAG_ATTEMPTS: usize = 3;
66
/// LLM-driven dialogue handler: keeps the chat history, turns streamed model
/// output into telephony commands (TTS / hangup / refer / play / scene
/// switch), and applies interruption, silence follow-up, and DTMF policies.
pub struct LlmHandler {
    config: LlmConfig,
    interruption_config: super::InterruptionConfig,
    // Silence follow-up config used when the active scene has none of its own.
    global_follow_up_config: Option<super::FollowUpConfig>,
    // Global digit -> action map; a scene-level DTMF map takes precedence.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    // Conversation so far; index 0 always holds the system prompt.
    history: Vec<ChatMessage>,
    provider: Arc<dyn LlmProvider>,
    rag_retriever: Arc<dyn RagRetriever>,
    // True while our own TTS/track playback is active.
    is_speaking: bool,
    // Set once a hangup has been scheduled; suppresses interruptions until
    // the track ends.
    is_hanging_up: bool,
    // Consecutive silence follow-ups issued without any user input.
    consecutive_follow_ups: u32,
    last_interaction_at: std::time::Instant,
    event_sender: Option<crate::event::EventSender>,
    last_asr_final_at: Option<std::time::Instant>,
    last_tts_start_at: Option<std::time::Instant>,
    // When the assistant last produced a message (read by context repair).
    last_robot_msg_at: Option<std::time::Instant>,
    // Active call handle; when present, streamed commands are enqueued on it
    // directly instead of being returned.
    call: Option<crate::call::ActiveCallRef>,
    scenes: HashMap<String, super::Scene>,
    current_scene_id: Option<String>,
    client: Client,
}
88
89impl LlmHandler {
    /// Builds a handler with the default LLM provider and a no-op RAG
    /// retriever; see [`Self::with_provider`] for the fully parameterized
    /// constructor.
    pub fn new(
        config: LlmConfig,
        interruption: super::InterruptionConfig,
        global_follow_up_config: Option<super::FollowUpConfig>,
        scenes: HashMap<String, super::Scene>,
        dtmf: Option<HashMap<String, super::DtmfAction>>,
        initial_scene_id: Option<String>,
    ) -> Self {
        Self::with_provider(
            config,
            Arc::new(DefaultLlmProvider::new()),
            Arc::new(NoopRagRetriever),
            interruption,
            global_follow_up_config,
            scenes,
            dtmf,
            initial_scene_id,
        )
    }
109
    /// Fully parameterized constructor. Seeds the history with the system
    /// prompt built from `config` (no scene prompt applied yet — scene
    /// switches rewrite history slot 0 in place) and leaves runtime wiring
    /// (`set_call`, `set_event_sender`) to the setters.
    pub fn with_provider(
        config: LlmConfig,
        provider: Arc<dyn LlmProvider>,
        rag_retriever: Arc<dyn RagRetriever>,
        interruption: super::InterruptionConfig,
        global_follow_up_config: Option<super::FollowUpConfig>,
        scenes: HashMap<String, super::Scene>,
        dtmf: Option<HashMap<String, super::DtmfAction>>,
        initial_scene_id: Option<String>,
    ) -> Self {
        let mut history = Vec::new();
        let system_prompt = Self::build_system_prompt(&config, None);

        // History slot 0: the system prompt.
        history.push(ChatMessage {
            role: "system".to_string(),
            content: system_prompt,
        });

        Self {
            config,
            interruption_config: interruption,
            global_follow_up_config,
            dtmf_config: dtmf,
            history,
            provider,
            rag_retriever,
            is_speaking: false,
            is_hanging_up: false,
            consecutive_follow_ups: 0,
            last_interaction_at: std::time::Instant::now(),
            event_sender: None,
            last_asr_final_at: None,
            last_tts_start_at: None,
            last_robot_msg_at: None,
            call: None,
            scenes,
            current_scene_id: initial_scene_id,
            client: Client::new(),
        }
    }
150
151 fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
152 let base_prompt =
153 scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
154 let mut features_prompt = String::new();
155
156 if let Some(features) = &config.features {
157 let lang = config.language.as_deref().unwrap_or("zh");
158 for feature in features {
159 match Self::load_feature_snippet(feature, lang) {
160 Ok(snippet) => {
161 features_prompt.push_str(&format!("\n- {}", snippet));
162 }
163 Err(e) => {
164 warn!("Failed to load feature snippet {}: {}", feature, e);
165 }
166 }
167 }
168 }
169
170 let features_section = if features_prompt.is_empty() {
171 String::new()
172 } else {
173 format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
174 };
175
176 let tool_instructions = if let Some(custom) = &config.tool_instructions {
178 custom.clone()
179 } else {
180 let lang = config.language.as_deref().unwrap_or("zh");
181 Self::load_feature_snippet("tool_instructions", lang)
182 .unwrap_or_else(|_| {
183 Self::load_feature_snippet("tool_instructions", "en")
185 .unwrap_or_else(|_| {
186 "Tool usage instructions:\n\
188 - To hang up the call, output: <hangup/>\n\
189 - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
190 - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
191 - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
192 - To call an external HTTP API, output JSON:\n\
193 ```json\n\
194 {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
195 ```\n\
196 Please use XML tags for simple actions and JSON blocks for tool calls. \
197 Output your response in short sentences. Each sentence will be played as soon as it is finished."
198 .to_string()
199 })
200 })
201 };
202
203 format!(
204 "{}{}\n\n{}",
205 base_prompt, features_section, tool_instructions
206 )
207 }
208
209 fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
210 let path = format!("features/{}.{}.md", feature, lang);
211 let content = std::fs::read_to_string(path)?;
212 Ok(content.trim().to_string())
213 }
214
215 fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
216 if let Some(scene_id) = &self.current_scene_id {
217 if let Some(scene) = self.scenes.get(scene_id) {
218 if let Some(dtmf) = &scene.dtmf {
219 if let Some(action) = dtmf.get(digit) {
220 return Some(action.clone());
221 }
222 }
223 }
224 }
225
226 if let Some(dtmf) = &self.dtmf_config {
227 if let Some(action) = dtmf.get(digit) {
228 return Some(action.clone());
229 }
230 }
231
232 None
233 }
234
    /// Executes a configured DTMF action: scene switch (with an immediate LLM
    /// response), call transfer, or hangup.
    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
        match action {
            super::DtmfAction::Goto { scene } => {
                info!("DTMF action: switch to scene {}", scene);
                self.switch_to_scene(&scene, true).await
            }
            super::DtmfAction::Transfer { target } => {
                info!("DTMF action: transfer to {}", target);
                // NOTE(review): caller is sent empty here and in the stream
                // path — presumably filled in downstream; confirm.
                Ok(vec![Command::Refer {
                    caller: String::new(),
                    callee: target,
                    options: None,
                }])
            }
            super::DtmfAction::Hangup => {
                info!("DTMF action: hangup");
                Ok(vec![Command::Hangup {
                    reason: Some("DTMF Hangup".to_string()),
                    initiator: Some("ai".to_string()),
                }])
            }
        }
    }
258
    /// Activates `scene_id`: rewrites the system prompt (history slot 0) with
    /// the scene's prompt, queues the scene's entry audio when configured,
    /// and — when `trigger_response` is set — immediately generates an LLM
    /// response. An unknown scene id is logged and yields no commands.
    async fn switch_to_scene(
        &mut self,
        scene_id: &str,
        trigger_response: bool,
    ) -> Result<Vec<Command>> {
        if let Some(scene) = self.scenes.get(scene_id).cloned() {
            info!("Switching to scene: {}", scene_id);
            self.current_scene_id = Some(scene_id.to_string());
            let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
            // Replace the system prompt in place; the rest of the history is kept.
            if let Some(first_msg) = self.history.get_mut(0) {
                if first_msg.role == "system" {
                    first_msg.content = system_prompt;
                }
            }

            let mut commands = Vec::new();
            if let Some(url) = &scene.play {
                commands.push(Command::Play {
                    url: url.clone(),
                    play_id: None,
                    auto_hangup: None,
                    wait_input_timeout: None,
                });
            }

            if trigger_response {
                let response_cmds = self.generate_response().await?;
                commands.extend(response_cmds);
            }
            Ok(commands)
        } else {
            warn!("Scene not found: {}", scene_id);
            Ok(vec![])
        }
    }
294
    /// Borrowing view of the conversation history (system prompt included).
    pub fn get_history_ref(&self) -> &[ChatMessage] {
        &self.history
    }
298
    /// Id of the currently active scene, if any.
    pub fn get_current_scene_id(&self) -> Option<String> {
        self.current_scene_id.clone()
    }
302
    /// Attaches the active call so streamed commands can be enqueued on it
    /// directly instead of being returned from `generate_response`.
    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
        self.call = Some(call);
    }
306
    /// Attaches the session event channel. If a greeting is configured, it is
    /// mirrored into the call history immediately (the greeting itself is
    /// spoken from `on_start`). Send failures are ignored.
    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
        self.event_sender = Some(sender.clone());
        if let Some(greeting) = &self.config.greeting {
            let _ = sender.send(crate::event::SessionEvent::AddHistory {
                sender: Some("system".to_string()),
                timestamp: crate::media::get_timestamp(),
                speaker: "assistant".to_string(),
                text: greeting.clone(),
            });
        }
    }
318
    /// Publishes `data` as a `Metrics` event under `key`. For the special key
    /// `llm_response`, the `response` field is additionally mirrored into the
    /// call history as an assistant message. No-op when no event sender has
    /// been attached; send failures are ignored.
    fn send_debug_event(&self, key: &str, data: serde_json::Value) {
        if let Some(sender) = &self.event_sender {
            let timestamp = crate::media::get_timestamp();
            if key == "llm_response" {
                if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
                    let _ = sender.send(crate::event::SessionEvent::AddHistory {
                        sender: Some("llm".to_string()),
                        timestamp,
                        speaker: "assistant".to_string(),
                        text: text.to_string(),
                    });
                }
            }

            let event = crate::event::SessionEvent::Metrics {
                timestamp,
                key: key.to_string(),
                duration: 0,
                data,
            };
            let _ = sender.send(event);
        }
    }
342
    /// Non-streaming LLM completion over the current history.
    async fn call_llm(&self) -> Result<String> {
        self.provider.call(&self.config, &self.history).await
    }
346
    /// Builds a non-streaming TTS command with a freshly generated play id.
    /// The id→text mapping is published as a `tts_play_id_map` metrics event
    /// so later playback events can be correlated with the spoken text.
    /// `wait_input_timeout` defaults to 10000 (presumably milliseconds —
    /// confirm with the Command consumer).
    fn create_tts_command(
        &self,
        text: String,
        wait_input_timeout: Option<u32>,
        auto_hangup: Option<bool>,
    ) -> Command {
        let timeout = wait_input_timeout.unwrap_or(10000);
        let play_id = uuid::Uuid::new_v4().to_string();

        if let Some(sender) = &self.event_sender {
            let _ = sender.send(crate::event::SessionEvent::Metrics {
                timestamp: crate::media::get_timestamp(),
                key: "tts_play_id_map".to_string(),
                duration: 0,
                data: serde_json::json!({
                    "playId": play_id,
                    "text": text,
                }),
            });
        }

        Command::Tts {
            text,
            speaker: None,
            play_id: Some(play_id),
            auto_hangup,
            streaming: None,
            end_of_stream: Some(true),
            option: None,
            wait_input_timeout: Some(timeout),
            base64: None,
        }
    }
380
    /// Streams an LLM completion over the current history and converts it to
    /// commands.
    ///
    /// Two modes, decided from the first non-empty content: if the response
    /// opens with `{` or a backtick it is treated as a JSON/tool response and
    /// handed to `interpret_response` once complete; otherwise control tags
    /// and finished sentences are extracted incrementally and either enqueued
    /// on the attached call (low latency) or collected into the return value.
    /// Timing metrics (duration, time-to-first-token) are emitted as debug
    /// events.
    async fn generate_response(&mut self) -> Result<Vec<Command>> {
        let start_time = crate::media::get_timestamp();
        let play_id = uuid::Uuid::new_v4().to_string();

        self.send_debug_event(
            "llm_call_start",
            json!({
                "history_length": self.history.len(),
                "playId": play_id,
            }),
        );

        let mut stream = self
            .provider
            .call_stream(&self.config, &self.history)
            .await?;

        let mut full_content = String::new();
        let mut full_reasoning = String::new();
        // Working buffer for streaming extraction; drained as commands are cut.
        let mut buffer = String::new();
        let mut commands = Vec::new();
        let mut is_json_mode = false;
        let mut checked_json_mode = false;
        let mut first_token_time = None;

        while let Some(chunk_result) = stream.next().await {
            let event = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    // Keep whatever was received so far and finish normally.
                    warn!("LLM stream error: {}", e);
                    break;
                }
            };

            match event {
                LlmStreamEvent::Reasoning(text) => {
                    full_reasoning.push_str(&text);
                }
                LlmStreamEvent::Content(chunk) => {
                    if first_token_time.is_none() && !chunk.trim().is_empty() {
                        first_token_time = Some(crate::media::get_timestamp());
                    }

                    full_content.push_str(&chunk);
                    buffer.push_str(&chunk);

                    // Decide the mode once, from the first non-empty content.
                    if !checked_json_mode {
                        let trimmed = full_content.trim();
                        if !trimmed.is_empty() {
                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
                                is_json_mode = true;
                            }
                            checked_json_mode = true;
                        }
                    }

                    // Plain-text mode: extract commands as soon as they are
                    // complete, preferring direct enqueue on the live call.
                    if checked_json_mode && !is_json_mode {
                        let extracted =
                            self.extract_streaming_commands(&mut buffer, &play_id, false);
                        for cmd in extracted {
                            if let Some(call) = &self.call {
                                let _ = call.enqueue_command(cmd).await;
                            } else {
                                commands.push(cmd);
                            }
                        }
                    }
                }
            }
        }

        let end_time = crate::media::get_timestamp();
        self.send_debug_event(
            "llm_response",
            json!({
                "response": full_content,
                "reasoning": full_reasoning,
                "is_json_mode": is_json_mode,
                "duration": end_time - start_time,
                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
                "playId": play_id,
            }),
        );

        if is_json_mode {
            self.interpret_response(full_content).await
        } else {
            // Final pass: flush whatever remains and close the TTS stream.
            let extracted = self.extract_streaming_commands(&mut buffer, &play_id, true);
            for cmd in extracted {
                if let Some(call) = &self.call {
                    let _ = call.enqueue_command(cmd).await;
                } else {
                    commands.push(cmd);
                }
            }
            if !full_content.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: full_content,
                });
                self.last_robot_msg_at = Some(std::time::Instant::now());
                self.is_speaking = true;
                self.last_tts_start_at = Some(std::time::Instant::now());
            }
            Ok(commands)
        }
    }
490
491 fn extract_streaming_commands(
492 &mut self,
493 buffer: &mut String,
494 play_id: &str,
495 is_final: bool,
496 ) -> Vec<Command> {
497 let mut commands = Vec::new();
498
499 loop {
500 let hangup_pos = RE_HANGUP.find(buffer);
501 let refer_pos = RE_REFER.captures(buffer);
502 let play_pos = RE_PLAY.captures(buffer);
503 let goto_pos = RE_GOTO.captures(buffer);
504 let sentence_pos = RE_SENTENCE.find(buffer);
505
506 let mut positions: Vec<(usize, CommandKind)> = Vec::new();
508 if let Some(m) = hangup_pos {
509 positions.push((m.start(), CommandKind::Hangup));
510 }
511 if let Some(caps) = &refer_pos {
512 positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
513 }
514 if let Some(caps) = &play_pos {
515 positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
516 }
517 if let Some(caps) = &goto_pos {
518 positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
519 }
520 if let Some(m) = sentence_pos {
521 positions.push((m.start(), CommandKind::Sentence));
522 }
523
524 positions.sort_by_key(|p| p.0);
525
526 if let Some((pos, kind)) = positions.first() {
527 let pos = *pos;
528 match kind {
529 CommandKind::Hangup => {
530 let prefix = buffer[..pos].to_string();
531 if !prefix.trim().is_empty() {
532 let mut cmd = self.create_tts_command_with_id(
533 prefix,
534 play_id.to_string(),
535 Some(true),
536 );
537 if let Command::Tts { end_of_stream, .. } = &mut cmd {
538 *end_of_stream = Some(true);
539 }
540 self.is_hanging_up = true;
541 commands.push(cmd);
542 } else {
543 let mut cmd = self.create_tts_command_with_id(
544 "".to_string(),
545 play_id.to_string(),
546 Some(true),
547 );
548 if let Command::Tts { end_of_stream, .. } = &mut cmd {
549 *end_of_stream = Some(true);
550 }
551 self.is_hanging_up = true;
552 commands.push(cmd);
553 }
554 buffer.drain(..RE_HANGUP.find(buffer).unwrap().end());
555 return commands;
556 }
557 CommandKind::Refer => {
558 let caps = RE_REFER.captures(buffer).unwrap();
559 let mat = caps.get(0).unwrap();
560 let callee = caps.get(1).unwrap().as_str().to_string();
561
562 let prefix = buffer[..pos].to_string();
563 if !prefix.trim().is_empty() {
564 commands.push(self.create_tts_command_with_id(
565 prefix,
566 play_id.to_string(),
567 None,
568 ));
569 }
570 commands.push(Command::Refer {
571 caller: String::new(),
572 callee,
573 options: None,
574 });
575 buffer.drain(..mat.end());
576 }
577 CommandKind::Play => {
578 let caps = RE_PLAY.captures(buffer).unwrap();
580 let mat = caps.get(0).unwrap();
581 let url = caps.get(1).unwrap().as_str().to_string();
582
583 let prefix = buffer[..pos].to_string();
584 if !prefix.trim().is_empty() {
585 commands.push(self.create_tts_command_with_id(
586 prefix,
587 play_id.to_string(),
588 None,
589 ));
590 }
591 commands.push(Command::Play {
592 url,
593 play_id: None,
594 auto_hangup: None,
595 wait_input_timeout: None,
596 });
597 buffer.drain(..mat.end());
598 }
599 CommandKind::Goto => {
600 let caps = RE_GOTO.captures(buffer).unwrap();
602 let mat = caps.get(0).unwrap();
603 let scene_id = caps.get(1).unwrap().as_str().to_string();
604
605 let prefix = buffer[..pos].to_string();
606 if !prefix.trim().is_empty() {
607 commands.push(self.create_tts_command_with_id(
608 prefix,
609 play_id.to_string(),
610 None,
611 ));
612 }
613
614 info!("Switching to scene (from stream): {}", scene_id);
615 if let Some(scene) = self.scenes.get(&scene_id) {
616 self.current_scene_id = Some(scene_id);
617 let system_prompt =
619 Self::build_system_prompt(&self.config, Some(&scene.prompt));
620 if let Some(first_msg) = self.history.get_mut(0) {
621 if first_msg.role == "system" {
622 first_msg.content = system_prompt;
623 }
624 }
625 } else {
626 warn!("Scene not found: {}", scene_id);
627 }
628
629 buffer.drain(..mat.end());
630 }
631 CommandKind::Sentence => {
632 let mat = sentence_pos.unwrap();
634 let sentence = buffer[..mat.end()].to_string();
635 if !sentence.trim().is_empty() {
636 commands.push(self.create_tts_command_with_id(
637 sentence,
638 play_id.to_string(),
639 None,
640 ));
641 }
642 buffer.drain(..mat.end());
643 }
644 }
645 } else {
646 break;
647 }
648 }
649
650 if is_final {
651 let remaining = buffer.trim().to_string();
652 if !remaining.is_empty() {
653 commands.push(self.create_tts_command_with_id(
654 remaining,
655 play_id.to_string(),
656 None,
657 ));
658 }
659 buffer.clear();
660
661 if let Some(last) = commands.last_mut() {
662 if let Command::Tts { end_of_stream, .. } = last {
663 *end_of_stream = Some(true);
664 }
665 } else if !self.is_hanging_up {
666 commands.push(Command::Tts {
667 text: "".to_string(),
668 speaker: None,
669 play_id: Some(play_id.to_string()),
670 auto_hangup: None,
671 streaming: Some(true),
672 end_of_stream: Some(true),
673 option: None,
674 wait_input_timeout: None,
675 base64: None,
676 });
677 }
678 }
679
680 commands
681 }
682
    /// Builds a streaming TTS chunk that reuses an existing play id so all
    /// chunks of one response share the same playback session.
    /// `end_of_stream` is left unset here; the caller marks the final chunk.
    fn create_tts_command_with_id(
        &self,
        text: String,
        play_id: String,
        auto_hangup: Option<bool>,
    ) -> Command {
        Command::Tts {
            text,
            speaker: None,
            play_id: Some(play_id),
            auto_hangup,
            streaming: Some(true),
            end_of_stream: None,
            option: None,
            wait_input_timeout: Some(10000),
            base64: None,
        }
    }
701
    /// Executes one tool invocation from a structured (JSON-mode) response.
    ///
    /// Direct call-control tools (Hangup, Refer, Accept, Reject) are pushed
    /// onto `tool_commands` and return `Ok(false)`. Rag and Http append their
    /// results to the history and return `Ok(true)`, signalling the caller to
    /// re-query the LLM with the enriched context. Every invocation is
    /// mirrored as a `tool_invocation` debug event.
    async fn handle_tool_invocation(
        &mut self,
        tool: ToolInvocation,
        tool_commands: &mut Vec<Command>,
    ) -> Result<bool> {
        match tool {
            ToolInvocation::Hangup {
                ref reason,
                ref initiator,
            } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Hangup",
                        "params": {
                            "reason": reason,
                            "initiator": initiator,
                        }
                    }),
                );
                tool_commands.push(Command::Hangup {
                    reason: reason.clone(),
                    initiator: initiator.clone(),
                });
                Ok(false)
            }
            ToolInvocation::Refer {
                ref caller,
                ref callee,
                ref options,
            } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Refer",
                        "params": {
                            "caller": caller,
                            "callee": callee,
                        }
                    }),
                );
                tool_commands.push(Command::Refer {
                    caller: caller.clone(),
                    callee: callee.clone(),
                    options: options.clone(),
                });
                Ok(false)
            }
            ToolInvocation::Rag {
                ref query,
                ref source,
            } => {
                // Retrieval results go into the history; ask for a re-run.
                self.handle_rag_tool(query, source).await?;
                Ok(true)
            }
            ToolInvocation::Accept { ref options } => {
                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
                tool_commands.push(Command::Accept {
                    option: options.clone().unwrap_or_default(),
                });
                Ok(false)
            }
            ToolInvocation::Reject { ref reason, code } => {
                self.send_debug_event(
                    "tool_invocation",
                    json!({
                        "tool": "Reject",
                        "params": {
                            "reason": reason,
                            "code": code,
                        }
                    }),
                );
                tool_commands.push(Command::Reject {
                    reason: reason
                        .clone()
                        .unwrap_or_else(|| "Rejected by agent".to_string()),
                    code,
                });
                Ok(false)
            }
            ToolInvocation::Http {
                ref url,
                ref method,
                ref body,
                ref headers,
            } => {
                // HTTP results go into the history; ask for a re-run.
                self.handle_http_tool(url, method, body, headers).await?;
                Ok(true)
            }
        }
    }
794
795 async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
796 self.send_debug_event(
797 "tool_invocation",
798 json!({
799 "tool": "Rag",
800 "params": {
801 "query": query,
802 "source": source,
803 }
804 }),
805 );
806
807 let rag_result = self.rag_retriever.retrieve(query).await?;
808
809 self.send_debug_event(
810 "rag_result",
811 json!({
812 "query": query,
813 "result": rag_result,
814 }),
815 );
816
817 let summary = if let Some(source) = source {
818 format!("[{}] {}", source, rag_result)
819 } else {
820 rag_result
821 };
822
823 self.history.push(ChatMessage {
824 role: "system".to_string(),
825 content: format!("RAG result for {}: {}", query, summary),
826 });
827
828 Ok(())
829 }
830
    /// Performs the HTTP request described by an Http tool invocation and
    /// appends the outcome (status + body, or the error) to the history as a
    /// system message. Request failures are recorded, not propagated — the
    /// LLM gets to see and react to them. An unrecognized method string
    /// falls back to GET.
    async fn handle_http_tool(
        &mut self,
        url: &str,
        method: &Option<String>,
        body: &Option<serde_json::Value>,
        headers: &Option<HashMap<String, String>>,
    ) -> Result<()> {
        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
        let method =
            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);

        self.send_debug_event(
            "tool_invocation",
            json!({
                "tool": "Http",
                "params": {
                    "url": url,
                    "method": method_str,
                }
            }),
        );

        let mut req = self.client.request(method, url);
        if let Some(body) = body {
            req = req.json(body);
        }
        if let Some(headers) = headers {
            for (k, v) in headers {
                req = req.header(k, v);
            }
        }

        match req.send().await {
            Ok(res) => {
                let status = res.status();
                let text = res.text().await.unwrap_or_default();
                self.history.push(ChatMessage {
                    role: "system".to_string(),
                    content: format!("HTTP tool response ({}): {}", status, text),
                });
            }
            Err(e) => {
                warn!("HTTP tool failed: {}", e);
                self.history.push(ChatMessage {
                    role: "system".to_string(),
                    content: format!("HTTP tool failed: {}", e),
                });
            }
        }

        Ok(())
    }
883
    /// Handles a final ASR transcript: records it in the history (with
    /// optional context repair and rolling summarization), resets the
    /// silence/follow-up bookkeeping, and generates the next LLM response.
    /// Empty transcripts produce no commands.
    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
        if text.trim().is_empty() {
            return Ok(vec![]);
        }

        self.apply_context_repair(text);
        self.apply_rolling_summary().await;

        self.last_asr_final_at = Some(std::time::Instant::now());
        self.last_interaction_at = std::time::Instant::now();
        self.is_speaking = false;
        self.consecutive_follow_ups = 0;

        self.generate_response().await
    }
899
900 fn apply_context_repair(&mut self, text: &str) {
901 let enable_repair = self
902 .config
903 .features
904 .as_ref()
905 .map(|f| f.contains(&"context_repair".to_string()))
906 .unwrap_or(false);
907
908 if !enable_repair {
909 self.history.push(ChatMessage {
910 role: "user".to_string(),
911 content: text.to_string(),
912 });
913 return;
914 }
915
916 let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
917 let mut merged = false;
918
919 if let Some(last_robot_at) = self.last_robot_msg_at {
920 if last_robot_at.elapsed().as_millis() < repair_window_ms {
921 if let Some(last_msg) = self.history.last() {
922 if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
923 info!(
924 "Context Repair: Detected potential fragmentation. Triggering merge."
925 );
926 self.history.pop();
927 if let Some(prev_user) = self.history.last_mut() {
928 if prev_user.role == "user" {
929 prev_user.content.push_str(",");
930 prev_user.content.push_str(text);
931 merged = true;
932 }
933 }
934 }
935 }
936 }
937 }
938
939 if !merged {
940 self.history.push(ChatMessage {
941 role: "user".to_string(),
942 content: text.to_string(),
943 });
944 }
945 }
946
947 async fn apply_rolling_summary(&mut self) {
948 let enable_summary = self
949 .config
950 .features
951 .as_ref()
952 .map(|f| f.contains(&"rolling_summary".to_string()))
953 .unwrap_or(false);
954
955 if !enable_summary {
956 return;
957 }
958
959 let summary_limit = self.config.summary_limit.unwrap_or(20);
960 if self.history.len() <= summary_limit {
961 return;
962 }
963
964 info!("Rolling Summary: History limit reached. Triggering background summary.");
965 let keep_recent = 6;
966 if self.history.len() <= summary_limit + keep_recent
967 || self.history.len() <= keep_recent + 1
968 {
969 return;
970 }
971
972 let split_idx = self.history.len() - keep_recent;
973 let to_summarize = self.history[1..split_idx].to_vec();
974 let recent = self.history[split_idx..].to_vec();
975
976 let summary_prompt =
977 "Summarize the above conversation so far, focusing on key details and user intent.";
978 let mut summary_req_history = to_summarize;
979 summary_req_history.push(ChatMessage {
980 role: "user".to_string(),
981 content: summary_prompt.to_string(),
982 });
983
984 match self.provider.call(&self.config, &summary_req_history).await {
985 Ok(summary) => {
986 let mut new_history = Vec::new();
987 if let Some(sys) = self.history.first() {
988 let mut new_sys = sys.clone();
989 new_sys.content.push_str("\n\n[Previous Context Summary]: ");
990 new_sys.content.push_str(&summary);
991 new_history.push(new_sys);
992 }
993 new_history.extend(recent);
994 self.history = new_history;
995 info!(
996 "Rolling Summary: Applied summary. New history len: {}",
997 self.history.len()
998 );
999 }
1000 Err(e) => {
1001 warn!("Rolling Summary failed: {}", e);
1002 }
1003 }
1004 }
1005
    /// Decides whether a user-speech event should interrupt ongoing playback,
    /// returning an `Interrupt` command when it should.
    ///
    /// Gates, in order: the configured strategy must match the event type
    /// (Vad→Speaking, Asr→AsrDelta, Both→any); we must be speaking and not
    /// hanging up; a grace period after TTS start (`ignore_first_ms`,
    /// default 800) must have elapsed; optional filler-word filtering; and a
    /// 500 ms cooldown after the last final ASR result.
    fn check_interruption(
        &mut self,
        event: &SessionEvent,
        is_filler: &Option<bool>,
    ) -> Option<Command> {
        let strategy = self.interruption_config.strategy;
        let should_check = match (strategy, event) {
            (InterruptionStrategy::None, _) => false,
            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
            (InterruptionStrategy::Both, _) => true,
            _ => false,
        };

        if !self.is_speaking || self.is_hanging_up || !should_check {
            return None;
        }

        // Ignore speech detected right after our own TTS started (echo/startup).
        if let Some(last_start) = self.last_tts_start_at {
            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
            if last_start.elapsed().as_millis() < ignore_ms as u128 {
                return None;
            }
        }

        // Filler filtering: trust an upstream is_filler flag, else check the
        // partial transcript against the filler lexicon.
        if self.interruption_config.filler_word_filter.unwrap_or(false) {
            if let Some(true) = is_filler {
                return None;
            }
            if let SessionEvent::AsrDelta { text, .. } = event {
                if is_likely_filler(text) {
                    return None;
                }
            }
        }

        // Debounce: a final ASR result just arrived; this speech is likely
        // its tail end, not a new interruption.
        if let Some(last_final) = self.last_asr_final_at {
            if last_final.elapsed().as_millis() < 500 {
                return None;
            }
        }

        info!("Smart interruption detected, stopping playback");
        self.is_speaking = false;
        Some(Command::Interrupt {
            graceful: Some(true),
            fade_out_ms: self.interruption_config.volume_fade_ms,
        })
    }
1058
    /// Reacts to a silence event. Uses the active scene's follow-up config
    /// (falling back to the global one); does nothing when no config exists,
    /// while we are speaking, or before the timeout elapses. Hangs up after
    /// `max_count` consecutive follow-ups; otherwise generates one more LLM
    /// follow-up response.
    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
            self.scenes
                .get(scene_id)
                .and_then(|s| s.follow_up)
                .or(self.global_follow_up_config)
        } else {
            self.global_follow_up_config
        };

        let Some(config) = follow_up_config else {
            return Ok(vec![]);
        };

        if self.is_speaking
            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
        {
            return Ok(vec![]);
        }

        if self.consecutive_follow_ups >= config.max_count {
            info!("Max follow-up count reached, hanging up");
            return Ok(vec![Command::Hangup {
                reason: Some("Max follow-up reached".to_string()),
                initiator: Some("system".to_string()),
            }]);
        }

        info!(
            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
            self.last_interaction_at.elapsed().as_millis(),
            self.consecutive_follow_ups + 1,
            config.max_count
        );
        self.consecutive_follow_ups += 1;
        self.last_interaction_at = std::time::Instant::now();
        self.generate_response().await
    }
1097
    /// Maps a Realtime-API function call onto commands: `hangup_call`,
    /// `transfer_call`/`refer_call` (callee from `callee` or `callee_uri`),
    /// and `goto_scene`. Unknown names and malformed arguments are logged
    /// and yield no commands rather than failing the call.
    async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
        info!(
            "Function call from Realtime: {} with args {}",
            name, arguments
        );
        // Unparseable JSON degrades to Value::Null; the lookups below then
        // simply find no fields.
        let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();

        match name {
            "hangup_call" => Ok(vec![Command::Hangup {
                reason: args["reason"].as_str().map(|s| s.to_string()),
                initiator: Some("ai".to_string()),
            }]),
            "transfer_call" | "refer_call" => {
                if let Some(callee) = args["callee"]
                    .as_str()
                    .or_else(|| args["callee_uri"].as_str())
                {
                    Ok(vec![Command::Refer {
                        caller: String::new(),
                        callee: callee.to_string(),
                        options: None,
                    }])
                } else {
                    warn!("No callee provided for transfer_call");
                    Ok(vec![])
                }
            }
            "goto_scene" => {
                if let Some(scene) = args["scene"].as_str() {
                    // Switch without an immediate LLM response; the Realtime
                    // side drives the next turn.
                    self.switch_to_scene(scene, false).await
                } else {
                    Ok(vec![])
                }
            }
            _ => {
                warn!("Unhandled function call: {}", name);
                Ok(vec![])
            }
        }
    }
1138
    /// Interprets a JSON-mode LLM response.
    ///
    /// Parses the structured payload, executes its tool invocations, and —
    /// when a tool (RAG/HTTP) pushed results into the history — re-queries
    /// the LLM, up to `MAX_RAG_ATTEMPTS` passes. A payload that fails to
    /// parse is spoken verbatim. The final text becomes a TTS command placed
    /// before the accumulated tool commands; a Hangup tool is folded into
    /// that TTS command's `auto_hangup` so the text is spoken before the call
    /// ends.
    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
        let mut tool_commands = Vec::new();
        let mut wait_input_timeout = None;
        let mut attempts = 0;
        let mut raw = initial;

        let final_text = loop {
            attempts += 1;

            // Not structured JSON: treat the raw text as the spoken reply.
            let Some(structured) = parse_structured_response(&raw) else {
                break Some(raw);
            };

            // First pass wins for the wait-input timeout.
            if wait_input_timeout.is_none() {
                wait_input_timeout = structured.wait_input_timeout;
            }

            let mut rerun_for_rag = false;
            if let Some(tools) = structured.tools {
                for tool in tools {
                    let needs_rerun = self
                        .handle_tool_invocation(tool, &mut tool_commands)
                        .await?;
                    rerun_for_rag = rerun_for_rag || needs_rerun;
                }
            }

            if !rerun_for_rag {
                break structured.text;
            }

            if attempts >= MAX_RAG_ATTEMPTS {
                warn!("Reached RAG iteration limit, using last response");
                break structured.text.or(Some(raw));
            }

            // Tool results are now in the history; ask the LLM again.
            raw = self.call_llm().await?;
        };

        let has_hangup = tool_commands
            .iter()
            .any(|c| matches!(c, Command::Hangup { .. }));
        let mut commands = Vec::new();

        if let Some(text) = final_text {
            if !text.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: text.clone(),
                });
                self.last_tts_start_at = Some(std::time::Instant::now());
                self.is_speaking = true;

                let auto_hangup = has_hangup.then_some(true);
                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));

                // The hangup rides on the TTS command; drop the standalone one.
                if has_hangup {
                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
                    self.is_hanging_up = true;
                }
            }
        }

        commands.extend(tool_commands);
        Ok(commands)
    }
1205}
1206
1207fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1208 let payload = extract_json_block(raw)?;
1209 serde_json::from_str(payload).ok()
1210}
1211
1212fn is_likely_filler(text: &str) -> bool {
1213 let trimmed = text.trim().to_lowercase();
1214 FILLERS.contains(&trimmed)
1215}
1216
/// Extracts the JSON payload from an LLM response: either the whole trimmed
/// string when it starts with `{`/`[`, or the interior of a closed ```
/// code fence (an optional leading `json` language tag is stripped).
/// Returns `None` for plain text and for unclosed fences.
fn extract_json_block(raw: &str) -> Option<&str> {
    let s = raw.trim();

    // Bare JSON object or array.
    if s.starts_with('{') || s.starts_with('[') {
        return Some(s);
    }
    if !s.starts_with('`') {
        return None;
    }

    // Fenced block: need a closing ``` strictly after the opening one.
    let fence_end = s.rfind("```")?;
    if fence_end <= 3 {
        return None;
    }

    let mut body = s[3..fence_end].trim();
    if body.to_lowercase().starts_with("json") {
        // Drop the language tag: everything up to the first newline, or the
        // 4 tag bytes themselves when the JSON follows on the same line.
        body = match body.find('\n') {
            Some(nl) => body[nl + 1..].trim(),
            None if body.len() > 4 => body[4..].trim(),
            None => body.trim(),
        };
    }
    Some(body)
}
1242
#[async_trait]
impl DialogueHandler for LlmHandler {
    /// Opens the dialogue: queues the initial scene's entry audio (if any),
    /// then either speaks the configured greeting (skipping the LLM) or
    /// generates a first LLM response.
    async fn on_start(&mut self) -> Result<Vec<Command>> {
        self.last_tts_start_at = Some(std::time::Instant::now());

        let mut commands = Vec::new();

        if let Some(scene_id) = &self.current_scene_id {
            if let Some(scene) = self.scenes.get(scene_id) {
                if let Some(audio_file) = &scene.play {
                    commands.push(Command::Play {
                        url: audio_file.clone(),
                        play_id: None,
                        auto_hangup: None,
                        wait_input_timeout: None,
                    });
                }
            }
        }

        if let Some(greeting) = &self.config.greeting {
            self.is_speaking = true;
            commands.push(self.create_tts_command(greeting.clone(), None, None));
            return Ok(commands);
        }

        let response_commands = self.generate_response().await?;
        commands.extend(response_commands);
        Ok(commands)
    }

    /// Central event dispatch: DTMF actions, final/partial ASR, end-of-
    /// utterance, silence follow-ups, track start/end bookkeeping, and
    /// Realtime function calls. Unhandled events produce no commands.
    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
        match event {
            SessionEvent::Dtmf { digit, .. } => {
                info!("DTMF received: {}", digit);
                if let Some(action) = self.get_dtmf_action(digit) {
                    self.handle_dtmf_action(action).await
                } else {
                    Ok(vec![])
                }
            }
            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
            // Partial speech/transcript: possibly interrupt our playback.
            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
                Ok(self
                    .check_interruption(event, is_filler)
                    .into_iter()
                    .collect())
            }
            SessionEvent::Eou { completed, .. } => {
                if *completed && !self.is_speaking {
                    info!("EOU detected, triggering early response");
                    self.generate_response().await
                } else {
                    Ok(vec![])
                }
            }
            SessionEvent::Silence { .. } => self.handle_silence().await,
            SessionEvent::TrackStart { .. } => {
                self.is_speaking = true;
                Ok(vec![])
            }
            SessionEvent::TrackEnd { .. } => {
                // Playback finished: clear speaking/hangup flags and restart
                // the silence timer.
                self.is_speaking = false;
                self.is_hanging_up = false;
                self.last_interaction_at = std::time::Instant::now();
                Ok(vec![])
            }
            SessionEvent::FunctionCall {
                name, arguments, ..
            } => self.handle_function_call(name, arguments).await,
            _ => Ok(vec![]),
        }
    }

    /// Snapshot of the conversation history.
    async fn get_history(&self) -> Vec<ChatMessage> {
        self.history.clone()
    }

    /// One-shot summary of the conversation: appends `prompt` as a user turn
    /// to a copy of the history and runs a non-streaming completion.
    async fn summarize(&mut self, prompt: &str) -> Result<String> {
        info!("Generating summary with prompt: {}", prompt);
        let mut summary_history = self.history.clone();
        summary_history.push(ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        });

        self.provider.call(&self.config, &summary_history).await
    }
}
1332}