1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
17static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
18static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
19static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
20static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
21static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
22static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
23 let mut s = std::collections::HashSet::new();
24 let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
25
26 if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
27 for line in content.lines() {
28 let trimmed = line.trim().to_lowercase();
29 if !trimmed.is_empty() {
30 s.insert(trimmed);
31 }
32 }
33 }
34
35 if s.is_empty() {
36 for f in default_fillers {
37 s.insert(f.to_string());
38 }
39 }
40 s
41});
42
43use super::ChatMessage;
44use super::InterruptionStrategy;
45use super::LlmConfig;
46use super::dialogue::DialogueHandler;
47
48pub mod provider;
49pub mod rag;
50pub mod types;
51
/// Kind of segment found while scanning streamed LLM output in
/// `LlmHandler::extract_streaming_commands`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    /// Plain text up to and including a sentence boundary.
    Sentence,
    /// A `<hangup/>` tag.
    Hangup,
    /// A `<refer to="..."/>` tag.
    Refer,
    /// A `<play file="..."/>` tag.
    Play,
    /// A `<goto scene="..."/>` tag.
    Goto,
}
60
61pub use provider::*;
62pub use rag::*;
63pub use types::*;
64
/// Upper bound on LLM round-trips in `LlmHandler::interpret_response` when tool
/// calls (RAG or HTTP) ask for the model to be re-run with their results.
const MAX_RAG_ATTEMPTS: usize = 3;
66
/// Dialogue handler that drives a call with an LLM: it owns the chat history,
/// turns streamed model output into TTS/action commands, and applies scene,
/// DTMF, interruption and silence follow-up policies.
pub struct LlmHandler {
    /// LLM settings (prompt, greeting, language, features, repair/summary knobs).
    config: LlmConfig,
    /// Barge-in policy consulted by `check_interruption`.
    interruption_config: super::InterruptionConfig,
    /// Silence follow-up policy used when the current scene defines none.
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Global DTMF digit -> action map; a scene's own map takes precedence.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// Messages sent to the provider; index 0 is the system prompt.
    history: Vec<ChatMessage>,
    /// Backend that performs (streaming) chat completions.
    provider: Arc<dyn LlmProvider>,
    /// Backend queried by the RAG tool.
    rag_retriever: Arc<dyn RagRetriever>,
    /// True while assistant audio is believed to be playing.
    is_speaking: bool,
    /// True once a hangup has been scheduled; suppresses interruption until TrackEnd.
    is_hanging_up: bool,
    /// Silence follow-ups issued since the last user utterance.
    consecutive_follow_ups: u32,
    /// Reference point for the silence follow-up timeout.
    last_interaction_at: std::time::Instant,
    /// Sink for history/metrics/debug events, once attached.
    event_sender: Option<crate::event::EventSender>,
    /// When the last ASR final arrived (interruption debounce).
    last_asr_final_at: Option<std::time::Instant>,
    /// When assistant speech last started (`ignore_first_ms` window).
    last_tts_start_at: Option<std::time::Instant>,
    /// When the last assistant message was recorded (context-repair window).
    last_robot_msg_at: Option<std::time::Instant>,
    /// Active call; when set, streamed commands are enqueued on it directly.
    call: Option<crate::call::ActiveCallRef>,
    /// Scene definitions keyed by scene id.
    scenes: HashMap<String, super::Scene>,
    /// Id of the scene currently in effect, if any.
    current_scene_id: Option<String>,
    /// HTTP client used by the `http` tool.
    client: Client,
}
88
89impl LlmHandler {
90 pub fn new(
91 config: LlmConfig,
92 interruption: super::InterruptionConfig,
93 global_follow_up_config: Option<super::FollowUpConfig>,
94 scenes: HashMap<String, super::Scene>,
95 dtmf: Option<HashMap<String, super::DtmfAction>>,
96 initial_scene_id: Option<String>,
97 ) -> Self {
98 Self::with_provider(
99 config,
100 Arc::new(DefaultLlmProvider::new()),
101 Arc::new(NoopRagRetriever),
102 interruption,
103 global_follow_up_config,
104 scenes,
105 dtmf,
106 initial_scene_id,
107 )
108 }
109
110 pub fn with_provider(
111 config: LlmConfig,
112 provider: Arc<dyn LlmProvider>,
113 rag_retriever: Arc<dyn RagRetriever>,
114 interruption: super::InterruptionConfig,
115 global_follow_up_config: Option<super::FollowUpConfig>,
116 scenes: HashMap<String, super::Scene>,
117 dtmf: Option<HashMap<String, super::DtmfAction>>,
118 initial_scene_id: Option<String>,
119 ) -> Self {
120 let mut history = Vec::new();
121 let system_prompt = Self::build_system_prompt(&config, None);
122
123 history.push(ChatMessage {
124 role: "system".to_string(),
125 content: system_prompt,
126 });
127
128 Self {
129 config,
130 interruption_config: interruption,
131 global_follow_up_config,
132 dtmf_config: dtmf,
133 history,
134 provider,
135 rag_retriever,
136 is_speaking: false,
137 is_hanging_up: false,
138 consecutive_follow_ups: 0,
139 last_interaction_at: std::time::Instant::now(),
140 event_sender: None,
141 last_asr_final_at: None,
142 last_tts_start_at: None,
143 last_robot_msg_at: None,
144 call: None,
145 scenes,
146 current_scene_id: initial_scene_id,
147 client: Client::new(),
148 }
149 }
150
151 fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
152 let base_prompt =
153 scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
154 let mut features_prompt = String::new();
155
156 if let Some(features) = &config.features {
157 let lang = config.language.as_deref().unwrap_or("zh");
158 for feature in features {
159 match Self::load_feature_snippet(feature, lang) {
160 Ok(snippet) => {
161 features_prompt.push_str(&format!("\n- {}", snippet));
162 }
163 Err(e) => {
164 warn!("Failed to load feature snippet {}: {}", feature, e);
165 }
166 }
167 }
168 }
169
170 let features_section = if features_prompt.is_empty() {
171 String::new()
172 } else {
173 format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
174 };
175
176 format!(
177 "{}{}\n\n\
178 Tool usage instructions:\n\
179 - To hang up the call, output: <hangup/>\n\
180 - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
181 - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
182 - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
183 - To call an external HTTP API, output JSON:\n\
184 ```json\n\
185 {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
186 ```\n\
187 Please use XML tags for simple actions and JSON blocks for tool calls. \
188 Output your response in short sentences. Each sentence will be played as soon as it is finished.",
189 base_prompt, features_section
190 )
191 }
192
193 fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
194 let path = format!("features/{}.{}.md", feature, lang);
195 let content = std::fs::read_to_string(path)?;
196 Ok(content.trim().to_string())
197 }
198
199 fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
200 if let Some(scene_id) = &self.current_scene_id {
201 if let Some(scene) = self.scenes.get(scene_id) {
202 if let Some(dtmf) = &scene.dtmf {
203 if let Some(action) = dtmf.get(digit) {
204 return Some(action.clone());
205 }
206 }
207 }
208 }
209
210 if let Some(dtmf) = &self.dtmf_config {
211 if let Some(action) = dtmf.get(digit) {
212 return Some(action.clone());
213 }
214 }
215
216 None
217 }
218
219 async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
220 match action {
221 super::DtmfAction::Goto { scene } => {
222 info!("DTMF action: switch to scene {}", scene);
223 self.switch_to_scene(&scene, true).await
224 }
225 super::DtmfAction::Transfer { target } => {
226 info!("DTMF action: transfer to {}", target);
227 Ok(vec![Command::Refer {
228 caller: String::new(),
229 callee: target,
230 options: None,
231 }])
232 }
233 super::DtmfAction::Hangup => {
234 info!("DTMF action: hangup");
235 Ok(vec![Command::Hangup {
236 reason: Some("DTMF Hangup".to_string()),
237 initiator: Some("ai".to_string()),
238 }])
239 }
240 }
241 }
242
243 async fn switch_to_scene(
244 &mut self,
245 scene_id: &str,
246 trigger_response: bool,
247 ) -> Result<Vec<Command>> {
248 if let Some(scene) = self.scenes.get(scene_id).cloned() {
249 info!("Switching to scene: {}", scene_id);
250 self.current_scene_id = Some(scene_id.to_string());
251 let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
252 if let Some(first_msg) = self.history.get_mut(0) {
253 if first_msg.role == "system" {
254 first_msg.content = system_prompt;
255 }
256 }
257
258 let mut commands = Vec::new();
259 if let Some(url) = &scene.play {
260 commands.push(Command::Play {
261 url: url.clone(),
262 play_id: None,
263 auto_hangup: None,
264 wait_input_timeout: None,
265 });
266 }
267
268 if trigger_response {
269 let response_cmds = self.generate_response().await?;
270 commands.extend(response_cmds);
271 }
272 Ok(commands)
273 } else {
274 warn!("Scene not found: {}", scene_id);
275 Ok(vec![])
276 }
277 }
278
279 pub fn get_history_ref(&self) -> &[ChatMessage] {
280 &self.history
281 }
282
283 pub fn get_current_scene_id(&self) -> Option<String> {
284 self.current_scene_id.clone()
285 }
286
287 pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
288 self.call = Some(call);
289 }
290
291 pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
292 self.event_sender = Some(sender.clone());
293 if let Some(greeting) = &self.config.greeting {
294 let _ = sender.send(crate::event::SessionEvent::AddHistory {
295 sender: Some("system".to_string()),
296 timestamp: crate::media::get_timestamp(),
297 speaker: "assistant".to_string(),
298 text: greeting.clone(),
299 });
300 }
301 }
302
303 fn send_debug_event(&self, key: &str, data: serde_json::Value) {
304 if let Some(sender) = &self.event_sender {
305 let timestamp = crate::media::get_timestamp();
306 if key == "llm_response" {
307 if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
308 let _ = sender.send(crate::event::SessionEvent::AddHistory {
309 sender: Some("llm".to_string()),
310 timestamp,
311 speaker: "assistant".to_string(),
312 text: text.to_string(),
313 });
314 }
315 }
316
317 let event = crate::event::SessionEvent::Metrics {
318 timestamp,
319 key: key.to_string(),
320 duration: 0,
321 data,
322 };
323 let _ = sender.send(event);
324 }
325 }
326
327 async fn call_llm(&self) -> Result<String> {
328 self.provider.call(&self.config, &self.history).await
329 }
330
331 fn create_tts_command(
332 &self,
333 text: String,
334 wait_input_timeout: Option<u32>,
335 auto_hangup: Option<bool>,
336 ) -> Command {
337 let timeout = wait_input_timeout.unwrap_or(10000);
338 let play_id = uuid::Uuid::new_v4().to_string();
339
340 if let Some(sender) = &self.event_sender {
341 let _ = sender.send(crate::event::SessionEvent::Metrics {
342 timestamp: crate::media::get_timestamp(),
343 key: "tts_play_id_map".to_string(),
344 duration: 0,
345 data: serde_json::json!({
346 "playId": play_id,
347 "text": text,
348 }),
349 });
350 }
351
352 Command::Tts {
353 text,
354 speaker: None,
355 play_id: Some(play_id),
356 auto_hangup,
357 streaming: None,
358 end_of_stream: Some(true),
359 option: None,
360 wait_input_timeout: Some(timeout),
361 base64: None,
362 }
363 }
364
365 async fn generate_response(&mut self) -> Result<Vec<Command>> {
366 let start_time = crate::media::get_timestamp();
367 let play_id = uuid::Uuid::new_v4().to_string();
368
369 self.send_debug_event(
371 "llm_call_start",
372 json!({
373 "history_length": self.history.len(),
374 "playId": play_id,
375 }),
376 );
377
378 let mut stream = self
379 .provider
380 .call_stream(&self.config, &self.history)
381 .await?;
382
383 let mut full_content = String::new();
384 let mut full_reasoning = String::new();
385 let mut buffer = String::new();
386 let mut commands = Vec::new();
387 let mut is_json_mode = false;
388 let mut checked_json_mode = false;
389 let mut first_token_time = None;
390
391 while let Some(chunk_result) = stream.next().await {
392 let event = match chunk_result {
393 Ok(c) => c,
394 Err(e) => {
395 warn!("LLM stream error: {}", e);
396 break;
397 }
398 };
399
400 match event {
401 LlmStreamEvent::Reasoning(text) => {
402 full_reasoning.push_str(&text);
403 }
404 LlmStreamEvent::Content(chunk) => {
405 if first_token_time.is_none() && !chunk.trim().is_empty() {
406 first_token_time = Some(crate::media::get_timestamp());
407 }
408
409 full_content.push_str(&chunk);
410 buffer.push_str(&chunk);
411
412 if !checked_json_mode {
413 let trimmed = full_content.trim();
414 if !trimmed.is_empty() {
415 if trimmed.starts_with('{') || trimmed.starts_with('`') {
416 is_json_mode = true;
417 }
418 checked_json_mode = true;
419 }
420 }
421
422 if checked_json_mode && !is_json_mode {
423 let extracted =
424 self.extract_streaming_commands(&mut buffer, &play_id, false);
425 for cmd in extracted {
426 if let Some(call) = &self.call {
427 let _ = call.enqueue_command(cmd).await;
428 } else {
429 commands.push(cmd);
430 }
431 }
432 }
433 }
434 }
435 }
436
437 let end_time = crate::media::get_timestamp();
439 self.send_debug_event(
440 "llm_response",
441 json!({
442 "response": full_content,
443 "reasoning": full_reasoning,
444 "is_json_mode": is_json_mode,
445 "duration": end_time - start_time,
446 "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
447 "playId": play_id,
448 }),
449 );
450
451 if is_json_mode {
452 self.interpret_response(full_content).await
453 } else {
454 let extracted = self.extract_streaming_commands(&mut buffer, &play_id, true);
455 for cmd in extracted {
456 if let Some(call) = &self.call {
457 let _ = call.enqueue_command(cmd).await;
458 } else {
459 commands.push(cmd);
460 }
461 }
462 if !full_content.trim().is_empty() {
463 self.history.push(ChatMessage {
464 role: "assistant".to_string(),
465 content: full_content,
466 });
467 self.last_robot_msg_at = Some(std::time::Instant::now());
468 self.is_speaking = true;
469 self.last_tts_start_at = Some(std::time::Instant::now());
470 }
471 Ok(commands)
472 }
473 }
474
475 fn extract_streaming_commands(
476 &mut self,
477 buffer: &mut String,
478 play_id: &str,
479 is_final: bool,
480 ) -> Vec<Command> {
481 let mut commands = Vec::new();
482
483 loop {
484 let hangup_pos = RE_HANGUP.find(buffer);
485 let refer_pos = RE_REFER.captures(buffer);
486 let play_pos = RE_PLAY.captures(buffer);
487 let goto_pos = RE_GOTO.captures(buffer);
488 let sentence_pos = RE_SENTENCE.find(buffer);
489
490 let mut positions: Vec<(usize, CommandKind)> = Vec::new();
492 if let Some(m) = hangup_pos {
493 positions.push((m.start(), CommandKind::Hangup));
494 }
495 if let Some(caps) = &refer_pos {
496 positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
497 }
498 if let Some(caps) = &play_pos {
499 positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
500 }
501 if let Some(caps) = &goto_pos {
502 positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
503 }
504 if let Some(m) = sentence_pos {
505 positions.push((m.start(), CommandKind::Sentence));
506 }
507
508 positions.sort_by_key(|p| p.0);
509
510 if let Some((pos, kind)) = positions.first() {
511 let pos = *pos;
512 match kind {
513 CommandKind::Hangup => {
514 let prefix = buffer[..pos].to_string();
515 if !prefix.trim().is_empty() {
516 let mut cmd = self.create_tts_command_with_id(
517 prefix,
518 play_id.to_string(),
519 Some(true),
520 );
521 if let Command::Tts { end_of_stream, .. } = &mut cmd {
522 *end_of_stream = Some(true);
523 }
524 self.is_hanging_up = true;
525 commands.push(cmd);
526 } else {
527 let mut cmd = self.create_tts_command_with_id(
528 "".to_string(),
529 play_id.to_string(),
530 Some(true),
531 );
532 if let Command::Tts { end_of_stream, .. } = &mut cmd {
533 *end_of_stream = Some(true);
534 }
535 self.is_hanging_up = true;
536 commands.push(cmd);
537 }
538 buffer.drain(..RE_HANGUP.find(buffer).unwrap().end());
539 return commands;
540 }
541 CommandKind::Refer => {
542 let caps = RE_REFER.captures(buffer).unwrap();
543 let mat = caps.get(0).unwrap();
544 let callee = caps.get(1).unwrap().as_str().to_string();
545
546 let prefix = buffer[..pos].to_string();
547 if !prefix.trim().is_empty() {
548 commands.push(self.create_tts_command_with_id(
549 prefix,
550 play_id.to_string(),
551 None,
552 ));
553 }
554 commands.push(Command::Refer {
555 caller: String::new(),
556 callee,
557 options: None,
558 });
559 buffer.drain(..mat.end());
560 }
561 CommandKind::Play => {
562 let caps = RE_PLAY.captures(buffer).unwrap();
564 let mat = caps.get(0).unwrap();
565 let url = caps.get(1).unwrap().as_str().to_string();
566
567 let prefix = buffer[..pos].to_string();
568 if !prefix.trim().is_empty() {
569 commands.push(self.create_tts_command_with_id(
570 prefix,
571 play_id.to_string(),
572 None,
573 ));
574 }
575 commands.push(Command::Play {
576 url,
577 play_id: None,
578 auto_hangup: None,
579 wait_input_timeout: None,
580 });
581 buffer.drain(..mat.end());
582 }
583 CommandKind::Goto => {
584 let caps = RE_GOTO.captures(buffer).unwrap();
586 let mat = caps.get(0).unwrap();
587 let scene_id = caps.get(1).unwrap().as_str().to_string();
588
589 let prefix = buffer[..pos].to_string();
590 if !prefix.trim().is_empty() {
591 commands.push(self.create_tts_command_with_id(
592 prefix,
593 play_id.to_string(),
594 None,
595 ));
596 }
597
598 info!("Switching to scene (from stream): {}", scene_id);
599 if let Some(scene) = self.scenes.get(&scene_id) {
600 self.current_scene_id = Some(scene_id);
601 let system_prompt =
603 Self::build_system_prompt(&self.config, Some(&scene.prompt));
604 if let Some(first_msg) = self.history.get_mut(0) {
605 if first_msg.role == "system" {
606 first_msg.content = system_prompt;
607 }
608 }
609 } else {
610 warn!("Scene not found: {}", scene_id);
611 }
612
613 buffer.drain(..mat.end());
614 }
615 CommandKind::Sentence => {
616 let mat = sentence_pos.unwrap();
618 let sentence = buffer[..mat.end()].to_string();
619 if !sentence.trim().is_empty() {
620 commands.push(self.create_tts_command_with_id(
621 sentence,
622 play_id.to_string(),
623 None,
624 ));
625 }
626 buffer.drain(..mat.end());
627 }
628 }
629 } else {
630 break;
631 }
632 }
633
634 if is_final {
635 let remaining = buffer.trim().to_string();
636 if !remaining.is_empty() {
637 commands.push(self.create_tts_command_with_id(
638 remaining,
639 play_id.to_string(),
640 None,
641 ));
642 }
643 buffer.clear();
644
645 if let Some(last) = commands.last_mut() {
646 if let Command::Tts { end_of_stream, .. } = last {
647 *end_of_stream = Some(true);
648 }
649 } else if !self.is_hanging_up {
650 commands.push(Command::Tts {
651 text: "".to_string(),
652 speaker: None,
653 play_id: Some(play_id.to_string()),
654 auto_hangup: None,
655 streaming: Some(true),
656 end_of_stream: Some(true),
657 option: None,
658 wait_input_timeout: None,
659 base64: None,
660 });
661 }
662 }
663
664 commands
665 }
666
667 fn create_tts_command_with_id(
668 &self,
669 text: String,
670 play_id: String,
671 auto_hangup: Option<bool>,
672 ) -> Command {
673 Command::Tts {
674 text,
675 speaker: None,
676 play_id: Some(play_id),
677 auto_hangup,
678 streaming: Some(true),
679 end_of_stream: None,
680 option: None,
681 wait_input_timeout: Some(10000),
682 base64: None,
683 }
684 }
685
686 async fn handle_tool_invocation(
687 &mut self,
688 tool: ToolInvocation,
689 tool_commands: &mut Vec<Command>,
690 ) -> Result<bool> {
691 match tool {
692 ToolInvocation::Hangup {
693 ref reason,
694 ref initiator,
695 } => {
696 self.send_debug_event(
697 "tool_invocation",
698 json!({
699 "tool": "Hangup",
700 "params": {
701 "reason": reason,
702 "initiator": initiator,
703 }
704 }),
705 );
706 tool_commands.push(Command::Hangup {
707 reason: reason.clone(),
708 initiator: initiator.clone(),
709 });
710 Ok(false)
711 }
712 ToolInvocation::Refer {
713 ref caller,
714 ref callee,
715 ref options,
716 } => {
717 self.send_debug_event(
718 "tool_invocation",
719 json!({
720 "tool": "Refer",
721 "params": {
722 "caller": caller,
723 "callee": callee,
724 }
725 }),
726 );
727 tool_commands.push(Command::Refer {
728 caller: caller.clone(),
729 callee: callee.clone(),
730 options: options.clone(),
731 });
732 Ok(false)
733 }
734 ToolInvocation::Rag {
735 ref query,
736 ref source,
737 } => {
738 self.handle_rag_tool(query, source).await?;
739 Ok(true)
740 }
741 ToolInvocation::Accept { ref options } => {
742 self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
743 tool_commands.push(Command::Accept {
744 option: options.clone().unwrap_or_default(),
745 });
746 Ok(false)
747 }
748 ToolInvocation::Reject { ref reason, code } => {
749 self.send_debug_event(
750 "tool_invocation",
751 json!({
752 "tool": "Reject",
753 "params": {
754 "reason": reason,
755 "code": code,
756 }
757 }),
758 );
759 tool_commands.push(Command::Reject {
760 reason: reason
761 .clone()
762 .unwrap_or_else(|| "Rejected by agent".to_string()),
763 code,
764 });
765 Ok(false)
766 }
767 ToolInvocation::Http {
768 ref url,
769 ref method,
770 ref body,
771 ref headers,
772 } => {
773 self.handle_http_tool(url, method, body, headers).await?;
774 Ok(true)
775 }
776 }
777 }
778
779 async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
780 self.send_debug_event(
781 "tool_invocation",
782 json!({
783 "tool": "Rag",
784 "params": {
785 "query": query,
786 "source": source,
787 }
788 }),
789 );
790
791 let rag_result = self.rag_retriever.retrieve(query).await?;
792
793 self.send_debug_event(
794 "rag_result",
795 json!({
796 "query": query,
797 "result": rag_result,
798 }),
799 );
800
801 let summary = if let Some(source) = source {
802 format!("[{}] {}", source, rag_result)
803 } else {
804 rag_result
805 };
806
807 self.history.push(ChatMessage {
808 role: "system".to_string(),
809 content: format!("RAG result for {}: {}", query, summary),
810 });
811
812 Ok(())
813 }
814
815 async fn handle_http_tool(
816 &mut self,
817 url: &str,
818 method: &Option<String>,
819 body: &Option<serde_json::Value>,
820 headers: &Option<HashMap<String, String>>,
821 ) -> Result<()> {
822 let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
823 let method =
824 reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
825
826 self.send_debug_event(
827 "tool_invocation",
828 json!({
829 "tool": "Http",
830 "params": {
831 "url": url,
832 "method": method_str,
833 }
834 }),
835 );
836
837 let mut req = self.client.request(method, url);
838 if let Some(body) = body {
839 req = req.json(body);
840 }
841 if let Some(headers) = headers {
842 for (k, v) in headers {
843 req = req.header(k, v);
844 }
845 }
846
847 match req.send().await {
848 Ok(res) => {
849 let status = res.status();
850 let text = res.text().await.unwrap_or_default();
851 self.history.push(ChatMessage {
852 role: "system".to_string(),
853 content: format!("HTTP tool response ({}): {}", status, text),
854 });
855 }
856 Err(e) => {
857 warn!("HTTP tool failed: {}", e);
858 self.history.push(ChatMessage {
859 role: "system".to_string(),
860 content: format!("HTTP tool failed: {}", e),
861 });
862 }
863 }
864
865 Ok(())
866 }
867
868 async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
869 if text.trim().is_empty() {
870 return Ok(vec![]);
871 }
872
873 self.apply_context_repair(text);
874 self.apply_rolling_summary().await;
875
876 self.last_asr_final_at = Some(std::time::Instant::now());
877 self.last_interaction_at = std::time::Instant::now();
878 self.is_speaking = false;
879 self.consecutive_follow_ups = 0;
880
881 self.generate_response().await
882 }
883
884 fn apply_context_repair(&mut self, text: &str) {
885 let enable_repair = self
886 .config
887 .features
888 .as_ref()
889 .map(|f| f.contains(&"context_repair".to_string()))
890 .unwrap_or(false);
891
892 if !enable_repair {
893 self.history.push(ChatMessage {
894 role: "user".to_string(),
895 content: text.to_string(),
896 });
897 return;
898 }
899
900 let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
901 let mut merged = false;
902
903 if let Some(last_robot_at) = self.last_robot_msg_at {
904 if last_robot_at.elapsed().as_millis() < repair_window_ms {
905 if let Some(last_msg) = self.history.last() {
906 if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
907 info!(
908 "Context Repair: Detected potential fragmentation. Triggering merge."
909 );
910 self.history.pop();
911 if let Some(prev_user) = self.history.last_mut() {
912 if prev_user.role == "user" {
913 prev_user.content.push_str(",");
914 prev_user.content.push_str(text);
915 merged = true;
916 }
917 }
918 }
919 }
920 }
921 }
922
923 if !merged {
924 self.history.push(ChatMessage {
925 role: "user".to_string(),
926 content: text.to_string(),
927 });
928 }
929 }
930
931 async fn apply_rolling_summary(&mut self) {
932 let enable_summary = self
933 .config
934 .features
935 .as_ref()
936 .map(|f| f.contains(&"rolling_summary".to_string()))
937 .unwrap_or(false);
938
939 if !enable_summary {
940 return;
941 }
942
943 let summary_limit = self.config.summary_limit.unwrap_or(20);
944 if self.history.len() <= summary_limit {
945 return;
946 }
947
948 info!("Rolling Summary: History limit reached. Triggering background summary.");
949 let keep_recent = 6;
950 if self.history.len() <= summary_limit + keep_recent
951 || self.history.len() <= keep_recent + 1
952 {
953 return;
954 }
955
956 let split_idx = self.history.len() - keep_recent;
957 let to_summarize = self.history[1..split_idx].to_vec();
958 let recent = self.history[split_idx..].to_vec();
959
960 let summary_prompt =
961 "Summarize the above conversation so far, focusing on key details and user intent.";
962 let mut summary_req_history = to_summarize;
963 summary_req_history.push(ChatMessage {
964 role: "user".to_string(),
965 content: summary_prompt.to_string(),
966 });
967
968 match self.provider.call(&self.config, &summary_req_history).await {
969 Ok(summary) => {
970 let mut new_history = Vec::new();
971 if let Some(sys) = self.history.first() {
972 let mut new_sys = sys.clone();
973 new_sys.content.push_str("\n\n[Previous Context Summary]: ");
974 new_sys.content.push_str(&summary);
975 new_history.push(new_sys);
976 }
977 new_history.extend(recent);
978 self.history = new_history;
979 info!(
980 "Rolling Summary: Applied summary. New history len: {}",
981 self.history.len()
982 );
983 }
984 Err(e) => {
985 warn!("Rolling Summary failed: {}", e);
986 }
987 }
988 }
989
990 fn check_interruption(
991 &mut self,
992 event: &SessionEvent,
993 is_filler: &Option<bool>,
994 ) -> Option<Command> {
995 let strategy = self.interruption_config.strategy;
996 let should_check = match (strategy, event) {
997 (InterruptionStrategy::None, _) => false,
998 (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
999 (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1000 (InterruptionStrategy::Both, _) => true,
1001 _ => false,
1002 };
1003
1004 if !self.is_speaking || self.is_hanging_up || !should_check {
1005 return None;
1006 }
1007
1008 if let Some(last_start) = self.last_tts_start_at {
1010 let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1011 if last_start.elapsed().as_millis() < ignore_ms as u128 {
1012 return None;
1013 }
1014 }
1015
1016 if self.interruption_config.filler_word_filter.unwrap_or(false) {
1018 if let Some(true) = is_filler {
1019 return None;
1020 }
1021 if let SessionEvent::AsrDelta { text, .. } = event {
1022 if is_likely_filler(text) {
1023 return None;
1024 }
1025 }
1026 }
1027
1028 if let Some(last_final) = self.last_asr_final_at {
1030 if last_final.elapsed().as_millis() < 500 {
1031 return None;
1032 }
1033 }
1034
1035 info!("Smart interruption detected, stopping playback");
1036 self.is_speaking = false;
1037 Some(Command::Interrupt {
1038 graceful: Some(true),
1039 fade_out_ms: self.interruption_config.volume_fade_ms,
1040 })
1041 }
1042
1043 async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1044 let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1045 self.scenes
1046 .get(scene_id)
1047 .and_then(|s| s.follow_up)
1048 .or(self.global_follow_up_config)
1049 } else {
1050 self.global_follow_up_config
1051 };
1052
1053 let Some(config) = follow_up_config else {
1054 return Ok(vec![]);
1055 };
1056
1057 if self.is_speaking
1058 || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1059 {
1060 return Ok(vec![]);
1061 }
1062
1063 if self.consecutive_follow_ups >= config.max_count {
1064 info!("Max follow-up count reached, hanging up");
1065 return Ok(vec![Command::Hangup {
1066 reason: Some("Max follow-up reached".to_string()),
1067 initiator: Some("system".to_string()),
1068 }]);
1069 }
1070
1071 info!(
1072 "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1073 self.last_interaction_at.elapsed().as_millis(),
1074 self.consecutive_follow_ups + 1,
1075 config.max_count
1076 );
1077 self.consecutive_follow_ups += 1;
1078 self.last_interaction_at = std::time::Instant::now();
1079 self.generate_response().await
1080 }
1081
1082 async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1083 info!(
1084 "Function call from Realtime: {} with args {}",
1085 name, arguments
1086 );
1087 let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1088
1089 match name {
1090 "hangup_call" => Ok(vec![Command::Hangup {
1091 reason: args["reason"].as_str().map(|s| s.to_string()),
1092 initiator: Some("ai".to_string()),
1093 }]),
1094 "transfer_call" | "refer_call" => {
1095 if let Some(callee) = args["callee"]
1096 .as_str()
1097 .or_else(|| args["callee_uri"].as_str())
1098 {
1099 Ok(vec![Command::Refer {
1100 caller: String::new(),
1101 callee: callee.to_string(),
1102 options: None,
1103 }])
1104 } else {
1105 warn!("No callee provided for transfer_call");
1106 Ok(vec![])
1107 }
1108 }
1109 "goto_scene" => {
1110 if let Some(scene) = args["scene"].as_str() {
1111 self.switch_to_scene(scene, false).await
1112 } else {
1113 Ok(vec![])
1114 }
1115 }
1116 _ => {
1117 warn!("Unhandled function call: {}", name);
1118 Ok(vec![])
1119 }
1120 }
1121 }
1122
1123 async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1124 let mut tool_commands = Vec::new();
1125 let mut wait_input_timeout = None;
1126 let mut attempts = 0;
1127 let mut raw = initial;
1128
1129 let final_text = loop {
1130 attempts += 1;
1131
1132 let Some(structured) = parse_structured_response(&raw) else {
1133 break Some(raw);
1134 };
1135
1136 if wait_input_timeout.is_none() {
1137 wait_input_timeout = structured.wait_input_timeout;
1138 }
1139
1140 let mut rerun_for_rag = false;
1141 if let Some(tools) = structured.tools {
1142 for tool in tools {
1143 let needs_rerun = self
1144 .handle_tool_invocation(tool, &mut tool_commands)
1145 .await?;
1146 rerun_for_rag = rerun_for_rag || needs_rerun;
1147 }
1148 }
1149
1150 if !rerun_for_rag {
1151 break structured.text;
1152 }
1153
1154 if attempts >= MAX_RAG_ATTEMPTS {
1155 warn!("Reached RAG iteration limit, using last response");
1156 break structured.text.or(Some(raw));
1157 }
1158
1159 raw = self.call_llm().await?;
1160 };
1161
1162 let has_hangup = tool_commands
1163 .iter()
1164 .any(|c| matches!(c, Command::Hangup { .. }));
1165 let mut commands = Vec::new();
1166
1167 if let Some(text) = final_text {
1168 if !text.trim().is_empty() {
1169 self.history.push(ChatMessage {
1170 role: "assistant".to_string(),
1171 content: text.clone(),
1172 });
1173 self.last_tts_start_at = Some(std::time::Instant::now());
1174 self.is_speaking = true;
1175
1176 let auto_hangup = has_hangup.then_some(true);
1177 commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1178
1179 if has_hangup {
1180 tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1181 self.is_hanging_up = true;
1182 }
1183 }
1184 }
1185
1186 commands.extend(tool_commands);
1187 Ok(commands)
1188 }
1189}
1190
1191fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1192 let payload = extract_json_block(raw)?;
1193 serde_json::from_str(payload).ok()
1194}
1195
1196fn is_likely_filler(text: &str) -> bool {
1197 let trimmed = text.trim().to_lowercase();
1198 FILLERS.contains(&trimmed)
1199}
1200
/// Extracts the JSON payload from an LLM response.
///
/// Accepts a fenced block (optionally tagged `json`, case-insensitively), or
/// a bare object/array. Returns `None` when the text is neither, or when a
/// fence has no separate closing fence.
///
/// Slicing goes through `strip_prefix`/`get`, so malformed input such as a
/// single backtick followed by multi-byte text yields `None`. Raw byte
/// slicing would panic on such input.
fn extract_json_block(raw: &str) -> Option<&str> {
    let trimmed = raw.trim();

    if trimmed.starts_with('`') {
        let fenced = trimmed.strip_prefix("```")?;
        let end = fenced.rfind("```")?;
        if end == 0 {
            // Only an opening fence directly followed by the closing one.
            return None;
        }

        let inner = fenced[..end].trim();
        let has_json_tag =
            matches!(inner.get(..4), Some(tag) if tag.eq_ignore_ascii_case("json"));
        if !has_json_tag {
            return Some(inner);
        }

        // Drop the language tag: up to the first newline, or the four tag
        // bytes when everything sits on one line.
        let body = match inner.find('\n') {
            Some(newline) => inner[newline + 1..].trim(),
            None if inner.len() > 4 => inner[4..].trim(),
            None => inner,
        };
        return Some(body);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1226
1227#[async_trait]
1228impl DialogueHandler for LlmHandler {
1229 async fn on_start(&mut self) -> Result<Vec<Command>> {
1230 self.last_tts_start_at = Some(std::time::Instant::now());
1231
1232 let mut commands = Vec::new();
1233
1234 if let Some(scene_id) = &self.current_scene_id {
1236 if let Some(scene) = self.scenes.get(scene_id) {
1237 if let Some(audio_file) = &scene.play {
1238 commands.push(Command::Play {
1239 url: audio_file.clone(),
1240 play_id: None,
1241 auto_hangup: None,
1242 wait_input_timeout: None,
1243 });
1244 }
1245 }
1246 }
1247
1248 if let Some(greeting) = &self.config.greeting {
1249 self.is_speaking = true;
1250 commands.push(self.create_tts_command(greeting.clone(), None, None));
1251 return Ok(commands);
1252 }
1253
1254 let response_commands = self.generate_response().await?;
1255 commands.extend(response_commands);
1256 Ok(commands)
1257 }
1258
1259 async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1260 match event {
1261 SessionEvent::Dtmf { digit, .. } => {
1262 info!("DTMF received: {}", digit);
1263 if let Some(action) = self.get_dtmf_action(digit) {
1264 self.handle_dtmf_action(action).await
1265 } else {
1266 Ok(vec![])
1267 }
1268 }
1269 SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1270 SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1271 Ok(self
1272 .check_interruption(event, is_filler)
1273 .into_iter()
1274 .collect())
1275 }
1276 SessionEvent::Eou { completed, .. } => {
1277 if *completed && !self.is_speaking {
1278 info!("EOU detected, triggering early response");
1279 self.generate_response().await
1280 } else {
1281 Ok(vec![])
1282 }
1283 }
1284 SessionEvent::Silence { .. } => self.handle_silence().await,
1285 SessionEvent::TrackStart { .. } => {
1286 self.is_speaking = true;
1287 Ok(vec![])
1288 }
1289 SessionEvent::TrackEnd { .. } => {
1290 self.is_speaking = false;
1291 self.is_hanging_up = false;
1292 self.last_interaction_at = std::time::Instant::now();
1293 Ok(vec![])
1294 }
1295 SessionEvent::FunctionCall {
1296 name, arguments, ..
1297 } => self.handle_function_call(name, arguments).await,
1298 _ => Ok(vec![]),
1299 }
1300 }
1301
1302 async fn get_history(&self) -> Vec<ChatMessage> {
1303 self.history.clone()
1304 }
1305
1306 async fn summarize(&mut self, prompt: &str) -> Result<String> {
1307 info!("Generating summary with prompt: {}", prompt);
1308 let mut summary_history = self.history.clone();
1309 summary_history.push(ChatMessage {
1310 role: "user".to_string(),
1311 content: prompt.to_string(),
1312 });
1313
1314 self.provider.call(&self.config, &summary_history).await
1315 }
1316}