active_call/playbook/handler/
mod.rs

1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
/// Matches a self-closing `<hangup/>` tag (whitespace is allowed before `/>`).
static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
/// Matches `<refer to="..."/>`; capture group 1 is the value of the `to` attribute.
static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
/// Matches `<play file="..."/>`; capture group 1 is the value of the `file` attribute.
static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
/// Matches `<goto scene="..."/>`; capture group 1 is the value of the `scene` attribute.
static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
/// Matches one sentence terminator (`.`, `!`, `?`, `。` or a newline) together with
/// any whitespace that follows it.
// NOTE(review): `!` and `?` are listed twice in the character class; the second
// pair may have been meant to be the full-width `！`/`？` — confirm against the
// committed file before relying on full-width punctuation being split.
static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
22static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
23    let mut s = std::collections::HashSet::new();
24    let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
25
26    if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
27        for line in content.lines() {
28            let trimmed = line.trim().to_lowercase();
29            if !trimmed.is_empty() {
30                s.insert(trimmed);
31            }
32        }
33    }
34
35    if s.is_empty() {
36        for f in default_fillers {
37            s.insert(f.to_string());
38        }
39    }
40    s
41});
42
43use super::ChatMessage;
44use super::InterruptionStrategy;
45use super::LlmConfig;
46use super::dialogue::DialogueHandler;
47
48pub mod provider;
49pub mod rag;
50pub mod types;
51
/// Kind of unit recognized in streamed LLM output by
/// `LlmHandler::extract_streaming_commands`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    /// A `<hangup/>` tag.
    Hangup,
    /// A `<refer to="..."/>` tag.
    Refer,
    /// A sentence terminator (end of a speakable sentence).
    Sentence,
    /// A `<play file="..."/>` tag.
    Play,
    /// A `<goto scene="..."/>` tag.
    Goto,
}
60
61pub use provider::*;
62pub use rag::*;
63pub use types::*;
64
// NOTE(review): presumably an upper bound on RAG tool rounds per response; the
// use site is outside this part of the file — confirm there.
const MAX_RAG_ATTEMPTS: usize = 3;
66
/// Dialogue handler that drives a call from LLM output: it keeps the chat
/// history, streams completions from a provider and converts them into call
/// `Command`s.
pub struct LlmHandler {
    /// LLM settings; passed to the provider on every call and used to build
    /// the system prompt.
    config: LlmConfig,
    /// Interruption settings as given to the constructor.
    interruption_config: super::InterruptionConfig,
    /// Optional follow-up settings as given to the constructor.
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Global DTMF mapping, consulted when the current scene has no action
    /// for a digit.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// Chat history; the constructor puts the system prompt at index 0.
    history: Vec<ChatMessage>,
    /// Backend used for (streaming) LLM completions.
    provider: Arc<dyn LlmProvider>,
    /// Retriever used by the RAG tool.
    rag_retriever: Arc<dyn RagRetriever>,
    /// Set after a non-empty assistant reply is recorded; cleared when a final
    /// ASR result arrives.
    is_speaking: bool,
    /// Set once a `<hangup/>` tag has been seen in streamed output.
    is_hanging_up: bool,
    /// Reset to 0 whenever a final ASR result arrives.
    consecutive_follow_ups: u32,
    /// Refreshed whenever a final ASR result arrives.
    last_interaction_at: std::time::Instant,
    /// Optional sink for session events (history entries and metrics).
    event_sender: Option<crate::event::EventSender>,
    /// Time of the most recent final ASR result, if any.
    last_asr_final_at: Option<std::time::Instant>,
    /// Set together with `last_robot_msg_at` when an assistant reply is recorded.
    last_tts_start_at: Option<std::time::Instant>,
    /// Time the most recent assistant reply was recorded, if any.
    last_robot_msg_at: Option<std::time::Instant>,
    /// When set, streamed commands are enqueued on this call instead of being
    /// returned to the caller.
    call: Option<crate::call::ActiveCallRef>,
    /// Scenes by id.
    scenes: HashMap<String, super::Scene>,
    /// Id of the active scene, if any.
    current_scene_id: Option<String>,
    /// HTTP client used by the `http` tool.
    client: Client,
}
88
89impl LlmHandler {
90    pub fn new(
91        config: LlmConfig,
92        interruption: super::InterruptionConfig,
93        global_follow_up_config: Option<super::FollowUpConfig>,
94        scenes: HashMap<String, super::Scene>,
95        dtmf: Option<HashMap<String, super::DtmfAction>>,
96        initial_scene_id: Option<String>,
97    ) -> Self {
98        Self::with_provider(
99            config,
100            Arc::new(DefaultLlmProvider::new()),
101            Arc::new(NoopRagRetriever),
102            interruption,
103            global_follow_up_config,
104            scenes,
105            dtmf,
106            initial_scene_id,
107        )
108    }
109
110    pub fn with_provider(
111        config: LlmConfig,
112        provider: Arc<dyn LlmProvider>,
113        rag_retriever: Arc<dyn RagRetriever>,
114        interruption: super::InterruptionConfig,
115        global_follow_up_config: Option<super::FollowUpConfig>,
116        scenes: HashMap<String, super::Scene>,
117        dtmf: Option<HashMap<String, super::DtmfAction>>,
118        initial_scene_id: Option<String>,
119    ) -> Self {
120        let mut history = Vec::new();
121        let system_prompt = Self::build_system_prompt(&config, None);
122
123        history.push(ChatMessage {
124            role: "system".to_string(),
125            content: system_prompt,
126        });
127
128        Self {
129            config,
130            interruption_config: interruption,
131            global_follow_up_config,
132            dtmf_config: dtmf,
133            history,
134            provider,
135            rag_retriever,
136            is_speaking: false,
137            is_hanging_up: false,
138            consecutive_follow_ups: 0,
139            last_interaction_at: std::time::Instant::now(),
140            event_sender: None,
141            last_asr_final_at: None,
142            last_tts_start_at: None,
143            last_robot_msg_at: None,
144            call: None,
145            scenes,
146            current_scene_id: initial_scene_id,
147            client: Client::new(),
148        }
149    }
150
151    fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
152        let base_prompt =
153            scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
154        let mut features_prompt = String::new();
155
156        if let Some(features) = &config.features {
157            let lang = config.language.as_deref().unwrap_or("zh");
158            for feature in features {
159                match Self::load_feature_snippet(feature, lang) {
160                    Ok(snippet) => {
161                        features_prompt.push_str(&format!("\n- {}", snippet));
162                    }
163                    Err(e) => {
164                        warn!("Failed to load feature snippet {}: {}", feature, e);
165                    }
166                }
167            }
168        }
169
170        let features_section = if features_prompt.is_empty() {
171            String::new()
172        } else {
173            format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
174        };
175
176        format!(
177            "{}{}\n\n\
178            Tool usage instructions:\n\
179            - To hang up the call, output: <hangup/>\n\
180            - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
181            - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
182            - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
183            - To call an external HTTP API, output JSON:\n\
184              ```json\n\
185              {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
186              ```\n\
187            Please use XML tags for simple actions and JSON blocks for tool calls. \
188            Output your response in short sentences. Each sentence will be played as soon as it is finished.",
189            base_prompt, features_section
190        )
191    }
192
193    fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
194        let path = format!("features/{}.{}.md", feature, lang);
195        let content = std::fs::read_to_string(path)?;
196        Ok(content.trim().to_string())
197    }
198
199    fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
200        if let Some(scene_id) = &self.current_scene_id {
201            if let Some(scene) = self.scenes.get(scene_id) {
202                if let Some(dtmf) = &scene.dtmf {
203                    if let Some(action) = dtmf.get(digit) {
204                        return Some(action.clone());
205                    }
206                }
207            }
208        }
209
210        if let Some(dtmf) = &self.dtmf_config {
211            if let Some(action) = dtmf.get(digit) {
212                return Some(action.clone());
213            }
214        }
215
216        None
217    }
218
219    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
220        match action {
221            super::DtmfAction::Goto { scene } => {
222                info!("DTMF action: switch to scene {}", scene);
223                self.switch_to_scene(&scene, true).await
224            }
225            super::DtmfAction::Transfer { target } => {
226                info!("DTMF action: transfer to {}", target);
227                Ok(vec![Command::Refer {
228                    caller: String::new(),
229                    callee: target,
230                    options: None,
231                }])
232            }
233            super::DtmfAction::Hangup => {
234                info!("DTMF action: hangup");
235                Ok(vec![Command::Hangup {
236                    reason: Some("DTMF Hangup".to_string()),
237                    initiator: Some("ai".to_string()),
238                }])
239            }
240        }
241    }
242
243    async fn switch_to_scene(
244        &mut self,
245        scene_id: &str,
246        trigger_response: bool,
247    ) -> Result<Vec<Command>> {
248        if let Some(scene) = self.scenes.get(scene_id).cloned() {
249            info!("Switching to scene: {}", scene_id);
250            self.current_scene_id = Some(scene_id.to_string());
251            let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
252            if let Some(first_msg) = self.history.get_mut(0) {
253                if first_msg.role == "system" {
254                    first_msg.content = system_prompt;
255                }
256            }
257
258            let mut commands = Vec::new();
259            if let Some(url) = &scene.play {
260                commands.push(Command::Play {
261                    url: url.clone(),
262                    play_id: None,
263                    auto_hangup: None,
264                    wait_input_timeout: None,
265                });
266            }
267
268            if trigger_response {
269                let response_cmds = self.generate_response().await?;
270                commands.extend(response_cmds);
271            }
272            Ok(commands)
273        } else {
274            warn!("Scene not found: {}", scene_id);
275            Ok(vec![])
276        }
277    }
278
279    pub fn get_history_ref(&self) -> &[ChatMessage] {
280        &self.history
281    }
282
283    pub fn get_current_scene_id(&self) -> Option<String> {
284        self.current_scene_id.clone()
285    }
286
    /// Attaches the active call. Once set, commands extracted while streaming
    /// an LLM response are enqueued on this call instead of being returned.
    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
        self.call = Some(call);
    }
290
291    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
292        self.event_sender = Some(sender.clone());
293        if let Some(greeting) = &self.config.greeting {
294            let _ = sender.send(crate::event::SessionEvent::AddHistory {
295                sender: Some("system".to_string()),
296                timestamp: crate::media::get_timestamp(),
297                speaker: "assistant".to_string(),
298                text: greeting.clone(),
299            });
300        }
301    }
302
303    fn send_debug_event(&self, key: &str, data: serde_json::Value) {
304        if let Some(sender) = &self.event_sender {
305            let timestamp = crate::media::get_timestamp();
306            if key == "llm_response" {
307                if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
308                    let _ = sender.send(crate::event::SessionEvent::AddHistory {
309                        sender: Some("llm".to_string()),
310                        timestamp,
311                        speaker: "assistant".to_string(),
312                        text: text.to_string(),
313                    });
314                }
315            }
316
317            let event = crate::event::SessionEvent::Metrics {
318                timestamp,
319                key: key.to_string(),
320                duration: 0,
321                data,
322            };
323            let _ = sender.send(event);
324        }
325    }
326
    /// Runs a non-streaming completion over the current history and returns
    /// the provider's reply text.
    async fn call_llm(&self) -> Result<String> {
        self.provider.call(&self.config, &self.history).await
    }
330
331    fn create_tts_command(
332        &self,
333        text: String,
334        wait_input_timeout: Option<u32>,
335        auto_hangup: Option<bool>,
336    ) -> Command {
337        let timeout = wait_input_timeout.unwrap_or(10000);
338        let play_id = uuid::Uuid::new_v4().to_string();
339
340        if let Some(sender) = &self.event_sender {
341            let _ = sender.send(crate::event::SessionEvent::Metrics {
342                timestamp: crate::media::get_timestamp(),
343                key: "tts_play_id_map".to_string(),
344                duration: 0,
345                data: serde_json::json!({
346                    "playId": play_id,
347                    "text": text,
348                }),
349            });
350        }
351
352        Command::Tts {
353            text,
354            speaker: None,
355            play_id: Some(play_id),
356            auto_hangup,
357            streaming: None,
358            end_of_stream: Some(true),
359            option: None,
360            wait_input_timeout: Some(timeout),
361            base64: None,
362        }
363    }
364
    /// Streams one LLM completion over the current history and turns it into
    /// call commands.
    ///
    /// Content chunks are accumulated; the first non-whitespace content decides
    /// the mode. Output starting with `{` or a backtick is treated as a JSON
    /// tool call and handed as a whole to `interpret_response` once the stream
    /// ends. Any other output is split incrementally by
    /// `extract_streaming_commands`, and the resulting commands are enqueued on
    /// the attached call when there is one, or collected and returned otherwise.
    ///
    /// A stream error stops reading but is not propagated: whatever was
    /// received so far is still processed.
    ///
    /// # Errors
    /// Fails when the stream cannot be opened, or with whatever
    /// `interpret_response` returns in JSON mode.
    async fn generate_response(&mut self) -> Result<Vec<Command>> {
        let start_time = crate::media::get_timestamp();
        // One play id is shared by every TTS chunk of this response.
        let play_id = uuid::Uuid::new_v4().to_string();

        // Send debug event - LLM call started
        self.send_debug_event(
            "llm_call_start",
            json!({
                "history_length": self.history.len(),
                "playId": play_id,
            }),
        );

        let mut stream = self
            .provider
            .call_stream(&self.config, &self.history)
            .await?;

        // `full_content` keeps the whole reply; `buffer` only the part not yet
        // converted into commands.
        let mut full_content = String::new();
        let mut full_reasoning = String::new();
        let mut buffer = String::new();
        let mut commands = Vec::new();
        let mut is_json_mode = false;
        let mut checked_json_mode = false;
        let mut first_token_time = None;

        while let Some(chunk_result) = stream.next().await {
            let event = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    // Keep what has been received so far and stop reading.
                    warn!("LLM stream error: {}", e);
                    break;
                }
            };

            match event {
                LlmStreamEvent::Reasoning(text) => {
                    full_reasoning.push_str(&text);
                }
                LlmStreamEvent::Content(chunk) => {
                    // Time-to-first-token is taken at the first non-blank chunk.
                    if first_token_time.is_none() && !chunk.trim().is_empty() {
                        first_token_time = Some(crate::media::get_timestamp());
                    }

                    full_content.push_str(&chunk);
                    buffer.push_str(&chunk);

                    // The mode is decided once, on the first non-whitespace content.
                    if !checked_json_mode {
                        let trimmed = full_content.trim();
                        if !trimmed.is_empty() {
                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
                                is_json_mode = true;
                            }
                            checked_json_mode = true;
                        }
                    }

                    if checked_json_mode && !is_json_mode {
                        let extracted =
                            self.extract_streaming_commands(&mut buffer, &play_id, false);
                        for cmd in extracted {
                            if let Some(call) = &self.call {
                                // Enqueue failures are deliberately ignored.
                                let _ = call.enqueue_command(cmd).await;
                            } else {
                                commands.push(cmd);
                            }
                        }
                    }
                }
            }
        }

        // Send debug event - LLM response received
        let end_time = crate::media::get_timestamp();
        self.send_debug_event(
            "llm_response",
            json!({
                "response": full_content,
                "reasoning": full_reasoning,
                "is_json_mode": is_json_mode,
                "duration": end_time - start_time,
                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
                "playId": play_id,
            }),
        );

        if is_json_mode {
            self.interpret_response(full_content).await
        } else {
            // Flush whatever is left in the buffer and close the TTS stream.
            let extracted = self.extract_streaming_commands(&mut buffer, &play_id, true);
            for cmd in extracted {
                if let Some(call) = &self.call {
                    let _ = call.enqueue_command(cmd).await;
                } else {
                    commands.push(cmd);
                }
            }
            if !full_content.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: full_content,
                });
                self.last_robot_msg_at = Some(std::time::Instant::now());
                self.is_speaking = true;
                self.last_tts_start_at = Some(std::time::Instant::now());
            }
            Ok(commands)
        }
    }
474
475    fn extract_streaming_commands(
476        &mut self,
477        buffer: &mut String,
478        play_id: &str,
479        is_final: bool,
480    ) -> Vec<Command> {
481        let mut commands = Vec::new();
482
483        loop {
484            let hangup_pos = RE_HANGUP.find(buffer);
485            let refer_pos = RE_REFER.captures(buffer);
486            let play_pos = RE_PLAY.captures(buffer);
487            let goto_pos = RE_GOTO.captures(buffer);
488            let sentence_pos = RE_SENTENCE.find(buffer);
489
490            // Find the first occurrence
491            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
492            if let Some(m) = hangup_pos {
493                positions.push((m.start(), CommandKind::Hangup));
494            }
495            if let Some(caps) = &refer_pos {
496                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
497            }
498            if let Some(caps) = &play_pos {
499                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
500            }
501            if let Some(caps) = &goto_pos {
502                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
503            }
504            if let Some(m) = sentence_pos {
505                positions.push((m.start(), CommandKind::Sentence));
506            }
507
508            positions.sort_by_key(|p| p.0);
509
510            if let Some((pos, kind)) = positions.first() {
511                let pos = *pos;
512                match kind {
513                    CommandKind::Hangup => {
514                        let prefix = buffer[..pos].to_string();
515                        if !prefix.trim().is_empty() {
516                            let mut cmd = self.create_tts_command_with_id(
517                                prefix,
518                                play_id.to_string(),
519                                Some(true),
520                            );
521                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
522                                *end_of_stream = Some(true);
523                            }
524                            self.is_hanging_up = true;
525                            commands.push(cmd);
526                        } else {
527                            let mut cmd = self.create_tts_command_with_id(
528                                "".to_string(),
529                                play_id.to_string(),
530                                Some(true),
531                            );
532                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
533                                *end_of_stream = Some(true);
534                            }
535                            self.is_hanging_up = true;
536                            commands.push(cmd);
537                        }
538                        buffer.drain(..RE_HANGUP.find(buffer).unwrap().end());
539                        return commands;
540                    }
541                    CommandKind::Refer => {
542                        let caps = RE_REFER.captures(buffer).unwrap();
543                        let mat = caps.get(0).unwrap();
544                        let callee = caps.get(1).unwrap().as_str().to_string();
545
546                        let prefix = buffer[..pos].to_string();
547                        if !prefix.trim().is_empty() {
548                            commands.push(self.create_tts_command_with_id(
549                                prefix,
550                                play_id.to_string(),
551                                None,
552                            ));
553                        }
554                        commands.push(Command::Refer {
555                            caller: String::new(),
556                            callee,
557                            options: None,
558                        });
559                        buffer.drain(..mat.end());
560                    }
561                    CommandKind::Play => {
562                        // Play audio
563                        let caps = RE_PLAY.captures(buffer).unwrap();
564                        let mat = caps.get(0).unwrap();
565                        let url = caps.get(1).unwrap().as_str().to_string();
566
567                        let prefix = buffer[..pos].to_string();
568                        if !prefix.trim().is_empty() {
569                            commands.push(self.create_tts_command_with_id(
570                                prefix,
571                                play_id.to_string(),
572                                None,
573                            ));
574                        }
575                        commands.push(Command::Play {
576                            url,
577                            play_id: None,
578                            auto_hangup: None,
579                            wait_input_timeout: None,
580                        });
581                        buffer.drain(..mat.end());
582                    }
583                    CommandKind::Goto => {
584                        // Goto Scene
585                        let caps = RE_GOTO.captures(buffer).unwrap();
586                        let mat = caps.get(0).unwrap();
587                        let scene_id = caps.get(1).unwrap().as_str().to_string();
588
589                        let prefix = buffer[..pos].to_string();
590                        if !prefix.trim().is_empty() {
591                            commands.push(self.create_tts_command_with_id(
592                                prefix,
593                                play_id.to_string(),
594                                None,
595                            ));
596                        }
597
598                        info!("Switching to scene (from stream): {}", scene_id);
599                        if let Some(scene) = self.scenes.get(&scene_id) {
600                            self.current_scene_id = Some(scene_id);
601                            // Update system prompt in history
602                            let system_prompt =
603                                Self::build_system_prompt(&self.config, Some(&scene.prompt));
604                            if let Some(first_msg) = self.history.get_mut(0) {
605                                if first_msg.role == "system" {
606                                    first_msg.content = system_prompt;
607                                }
608                            }
609                        } else {
610                            warn!("Scene not found: {}", scene_id);
611                        }
612
613                        buffer.drain(..mat.end());
614                    }
615                    CommandKind::Sentence => {
616                        // Sentence
617                        let mat = sentence_pos.unwrap();
618                        let sentence = buffer[..mat.end()].to_string();
619                        if !sentence.trim().is_empty() {
620                            commands.push(self.create_tts_command_with_id(
621                                sentence,
622                                play_id.to_string(),
623                                None,
624                            ));
625                        }
626                        buffer.drain(..mat.end());
627                    }
628                }
629            } else {
630                break;
631            }
632        }
633
634        if is_final {
635            let remaining = buffer.trim().to_string();
636            if !remaining.is_empty() {
637                commands.push(self.create_tts_command_with_id(
638                    remaining,
639                    play_id.to_string(),
640                    None,
641                ));
642            }
643            buffer.clear();
644
645            if let Some(last) = commands.last_mut() {
646                if let Command::Tts { end_of_stream, .. } = last {
647                    *end_of_stream = Some(true);
648                }
649            } else if !self.is_hanging_up {
650                commands.push(Command::Tts {
651                    text: "".to_string(),
652                    speaker: None,
653                    play_id: Some(play_id.to_string()),
654                    auto_hangup: None,
655                    streaming: Some(true),
656                    end_of_stream: Some(true),
657                    option: None,
658                    wait_input_timeout: None,
659                    base64: None,
660                });
661            }
662        }
663
664        commands
665    }
666
667    fn create_tts_command_with_id(
668        &self,
669        text: String,
670        play_id: String,
671        auto_hangup: Option<bool>,
672    ) -> Command {
673        Command::Tts {
674            text,
675            speaker: None,
676            play_id: Some(play_id),
677            auto_hangup,
678            streaming: Some(true),
679            end_of_stream: None,
680            option: None,
681            wait_input_timeout: Some(10000),
682            base64: None,
683        }
684    }
685
686    async fn handle_tool_invocation(
687        &mut self,
688        tool: ToolInvocation,
689        tool_commands: &mut Vec<Command>,
690    ) -> Result<bool> {
691        match tool {
692            ToolInvocation::Hangup {
693                ref reason,
694                ref initiator,
695            } => {
696                self.send_debug_event(
697                    "tool_invocation",
698                    json!({
699                        "tool": "Hangup",
700                        "params": {
701                            "reason": reason,
702                            "initiator": initiator,
703                        }
704                    }),
705                );
706                tool_commands.push(Command::Hangup {
707                    reason: reason.clone(),
708                    initiator: initiator.clone(),
709                });
710                Ok(false)
711            }
712            ToolInvocation::Refer {
713                ref caller,
714                ref callee,
715                ref options,
716            } => {
717                self.send_debug_event(
718                    "tool_invocation",
719                    json!({
720                        "tool": "Refer",
721                        "params": {
722                            "caller": caller,
723                            "callee": callee,
724                        }
725                    }),
726                );
727                tool_commands.push(Command::Refer {
728                    caller: caller.clone(),
729                    callee: callee.clone(),
730                    options: options.clone(),
731                });
732                Ok(false)
733            }
734            ToolInvocation::Rag {
735                ref query,
736                ref source,
737            } => {
738                self.handle_rag_tool(query, source).await?;
739                Ok(true)
740            }
741            ToolInvocation::Accept { ref options } => {
742                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
743                tool_commands.push(Command::Accept {
744                    option: options.clone().unwrap_or_default(),
745                });
746                Ok(false)
747            }
748            ToolInvocation::Reject { ref reason, code } => {
749                self.send_debug_event(
750                    "tool_invocation",
751                    json!({
752                        "tool": "Reject",
753                        "params": {
754                            "reason": reason,
755                            "code": code,
756                        }
757                    }),
758                );
759                tool_commands.push(Command::Reject {
760                    reason: reason
761                        .clone()
762                        .unwrap_or_else(|| "Rejected by agent".to_string()),
763                    code,
764                });
765                Ok(false)
766            }
767            ToolInvocation::Http {
768                ref url,
769                ref method,
770                ref body,
771                ref headers,
772            } => {
773                self.handle_http_tool(url, method, body, headers).await?;
774                Ok(true)
775            }
776        }
777    }
778
779    async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
780        self.send_debug_event(
781            "tool_invocation",
782            json!({
783                "tool": "Rag",
784                "params": {
785                    "query": query,
786                    "source": source,
787                }
788            }),
789        );
790
791        let rag_result = self.rag_retriever.retrieve(query).await?;
792
793        self.send_debug_event(
794            "rag_result",
795            json!({
796                "query": query,
797                "result": rag_result,
798            }),
799        );
800
801        let summary = if let Some(source) = source {
802            format!("[{}] {}", source, rag_result)
803        } else {
804            rag_result
805        };
806
807        self.history.push(ChatMessage {
808            role: "system".to_string(),
809            content: format!("RAG result for {}: {}", query, summary),
810        });
811
812        Ok(())
813    }
814
815    async fn handle_http_tool(
816        &mut self,
817        url: &str,
818        method: &Option<String>,
819        body: &Option<serde_json::Value>,
820        headers: &Option<HashMap<String, String>>,
821    ) -> Result<()> {
822        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
823        let method =
824            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
825
826        self.send_debug_event(
827            "tool_invocation",
828            json!({
829                "tool": "Http",
830                "params": {
831                    "url": url,
832                    "method": method_str,
833                }
834            }),
835        );
836
837        let mut req = self.client.request(method, url);
838        if let Some(body) = body {
839            req = req.json(body);
840        }
841        if let Some(headers) = headers {
842            for (k, v) in headers {
843                req = req.header(k, v);
844            }
845        }
846
847        match req.send().await {
848            Ok(res) => {
849                let status = res.status();
850                let text = res.text().await.unwrap_or_default();
851                self.history.push(ChatMessage {
852                    role: "system".to_string(),
853                    content: format!("HTTP tool response ({}): {}", status, text),
854                });
855            }
856            Err(e) => {
857                warn!("HTTP tool failed: {}", e);
858                self.history.push(ChatMessage {
859                    role: "system".to_string(),
860                    content: format!("HTTP tool failed: {}", e),
861                });
862            }
863        }
864
865        Ok(())
866    }
867
868    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
869        if text.trim().is_empty() {
870            return Ok(vec![]);
871        }
872
873        self.apply_context_repair(text);
874        self.apply_rolling_summary().await;
875
876        self.last_asr_final_at = Some(std::time::Instant::now());
877        self.last_interaction_at = std::time::Instant::now();
878        self.is_speaking = false;
879        self.consecutive_follow_ups = 0;
880
881        self.generate_response().await
882    }
883
884    fn apply_context_repair(&mut self, text: &str) {
885        let enable_repair = self
886            .config
887            .features
888            .as_ref()
889            .map(|f| f.contains(&"context_repair".to_string()))
890            .unwrap_or(false);
891
892        if !enable_repair {
893            self.history.push(ChatMessage {
894                role: "user".to_string(),
895                content: text.to_string(),
896            });
897            return;
898        }
899
900        let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
901        let mut merged = false;
902
903        if let Some(last_robot_at) = self.last_robot_msg_at {
904            if last_robot_at.elapsed().as_millis() < repair_window_ms {
905                if let Some(last_msg) = self.history.last() {
906                    if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
907                        info!(
908                            "Context Repair: Detected potential fragmentation. Triggering merge."
909                        );
910                        self.history.pop();
911                        if let Some(prev_user) = self.history.last_mut() {
912                            if prev_user.role == "user" {
913                                prev_user.content.push_str(",");
914                                prev_user.content.push_str(text);
915                                merged = true;
916                            }
917                        }
918                    }
919                }
920            }
921        }
922
923        if !merged {
924            self.history.push(ChatMessage {
925                role: "user".to_string(),
926                content: text.to_string(),
927            });
928        }
929    }
930
931    async fn apply_rolling_summary(&mut self) {
932        let enable_summary = self
933            .config
934            .features
935            .as_ref()
936            .map(|f| f.contains(&"rolling_summary".to_string()))
937            .unwrap_or(false);
938
939        if !enable_summary {
940            return;
941        }
942
943        let summary_limit = self.config.summary_limit.unwrap_or(20);
944        if self.history.len() <= summary_limit {
945            return;
946        }
947
948        info!("Rolling Summary: History limit reached. Triggering background summary.");
949        let keep_recent = 6;
950        if self.history.len() <= summary_limit + keep_recent
951            || self.history.len() <= keep_recent + 1
952        {
953            return;
954        }
955
956        let split_idx = self.history.len() - keep_recent;
957        let to_summarize = self.history[1..split_idx].to_vec();
958        let recent = self.history[split_idx..].to_vec();
959
960        let summary_prompt =
961            "Summarize the above conversation so far, focusing on key details and user intent.";
962        let mut summary_req_history = to_summarize;
963        summary_req_history.push(ChatMessage {
964            role: "user".to_string(),
965            content: summary_prompt.to_string(),
966        });
967
968        match self.provider.call(&self.config, &summary_req_history).await {
969            Ok(summary) => {
970                let mut new_history = Vec::new();
971                if let Some(sys) = self.history.first() {
972                    let mut new_sys = sys.clone();
973                    new_sys.content.push_str("\n\n[Previous Context Summary]: ");
974                    new_sys.content.push_str(&summary);
975                    new_history.push(new_sys);
976                }
977                new_history.extend(recent);
978                self.history = new_history;
979                info!(
980                    "Rolling Summary: Applied summary. New history len: {}",
981                    self.history.len()
982                );
983            }
984            Err(e) => {
985                warn!("Rolling Summary failed: {}", e);
986            }
987        }
988    }
989
990    fn check_interruption(
991        &mut self,
992        event: &SessionEvent,
993        is_filler: &Option<bool>,
994    ) -> Option<Command> {
995        let strategy = self.interruption_config.strategy;
996        let should_check = match (strategy, event) {
997            (InterruptionStrategy::None, _) => false,
998            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
999            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1000            (InterruptionStrategy::Both, _) => true,
1001            _ => false,
1002        };
1003
1004        if !self.is_speaking || self.is_hanging_up || !should_check {
1005            return None;
1006        }
1007
1008        // Protection period check
1009        if let Some(last_start) = self.last_tts_start_at {
1010            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1011            if last_start.elapsed().as_millis() < ignore_ms as u128 {
1012                return None;
1013            }
1014        }
1015
1016        // Filler word filter
1017        if self.interruption_config.filler_word_filter.unwrap_or(false) {
1018            if let Some(true) = is_filler {
1019                return None;
1020            }
1021            if let SessionEvent::AsrDelta { text, .. } = event {
1022                if is_likely_filler(text) {
1023                    return None;
1024                }
1025            }
1026        }
1027
1028        // Stale event check
1029        if let Some(last_final) = self.last_asr_final_at {
1030            if last_final.elapsed().as_millis() < 500 {
1031                return None;
1032            }
1033        }
1034
1035        info!("Smart interruption detected, stopping playback");
1036        self.is_speaking = false;
1037        Some(Command::Interrupt {
1038            graceful: Some(true),
1039            fade_out_ms: self.interruption_config.volume_fade_ms,
1040        })
1041    }
1042
1043    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1044        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1045            self.scenes
1046                .get(scene_id)
1047                .and_then(|s| s.follow_up)
1048                .or(self.global_follow_up_config)
1049        } else {
1050            self.global_follow_up_config
1051        };
1052
1053        let Some(config) = follow_up_config else {
1054            return Ok(vec![]);
1055        };
1056
1057        if self.is_speaking
1058            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1059        {
1060            return Ok(vec![]);
1061        }
1062
1063        if self.consecutive_follow_ups >= config.max_count {
1064            info!("Max follow-up count reached, hanging up");
1065            return Ok(vec![Command::Hangup {
1066                reason: Some("Max follow-up reached".to_string()),
1067                initiator: Some("system".to_string()),
1068            }]);
1069        }
1070
1071        info!(
1072            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1073            self.last_interaction_at.elapsed().as_millis(),
1074            self.consecutive_follow_ups + 1,
1075            config.max_count
1076        );
1077        self.consecutive_follow_ups += 1;
1078        self.last_interaction_at = std::time::Instant::now();
1079        self.generate_response().await
1080    }
1081
1082    async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1083        info!(
1084            "Function call from Realtime: {} with args {}",
1085            name, arguments
1086        );
1087        let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1088
1089        match name {
1090            "hangup_call" => Ok(vec![Command::Hangup {
1091                reason: args["reason"].as_str().map(|s| s.to_string()),
1092                initiator: Some("ai".to_string()),
1093            }]),
1094            "transfer_call" | "refer_call" => {
1095                if let Some(callee) = args["callee"]
1096                    .as_str()
1097                    .or_else(|| args["callee_uri"].as_str())
1098                {
1099                    Ok(vec![Command::Refer {
1100                        caller: String::new(),
1101                        callee: callee.to_string(),
1102                        options: None,
1103                    }])
1104                } else {
1105                    warn!("No callee provided for transfer_call");
1106                    Ok(vec![])
1107                }
1108            }
1109            "goto_scene" => {
1110                if let Some(scene) = args["scene"].as_str() {
1111                    self.switch_to_scene(scene, false).await
1112                } else {
1113                    Ok(vec![])
1114                }
1115            }
1116            _ => {
1117                warn!("Unhandled function call: {}", name);
1118                Ok(vec![])
1119            }
1120        }
1121    }
1122
1123    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1124        let mut tool_commands = Vec::new();
1125        let mut wait_input_timeout = None;
1126        let mut attempts = 0;
1127        let mut raw = initial;
1128
1129        let final_text = loop {
1130            attempts += 1;
1131
1132            let Some(structured) = parse_structured_response(&raw) else {
1133                break Some(raw);
1134            };
1135
1136            if wait_input_timeout.is_none() {
1137                wait_input_timeout = structured.wait_input_timeout;
1138            }
1139
1140            let mut rerun_for_rag = false;
1141            if let Some(tools) = structured.tools {
1142                for tool in tools {
1143                    let needs_rerun = self
1144                        .handle_tool_invocation(tool, &mut tool_commands)
1145                        .await?;
1146                    rerun_for_rag = rerun_for_rag || needs_rerun;
1147                }
1148            }
1149
1150            if !rerun_for_rag {
1151                break structured.text;
1152            }
1153
1154            if attempts >= MAX_RAG_ATTEMPTS {
1155                warn!("Reached RAG iteration limit, using last response");
1156                break structured.text.or(Some(raw));
1157            }
1158
1159            raw = self.call_llm().await?;
1160        };
1161
1162        let has_hangup = tool_commands
1163            .iter()
1164            .any(|c| matches!(c, Command::Hangup { .. }));
1165        let mut commands = Vec::new();
1166
1167        if let Some(text) = final_text {
1168            if !text.trim().is_empty() {
1169                self.history.push(ChatMessage {
1170                    role: "assistant".to_string(),
1171                    content: text.clone(),
1172                });
1173                self.last_tts_start_at = Some(std::time::Instant::now());
1174                self.is_speaking = true;
1175
1176                let auto_hangup = has_hangup.then_some(true);
1177                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1178
1179                if has_hangup {
1180                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1181                    self.is_hanging_up = true;
1182                }
1183            }
1184        }
1185
1186        commands.extend(tool_commands);
1187        Ok(commands)
1188    }
1189}
1190
1191fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1192    let payload = extract_json_block(raw)?;
1193    serde_json::from_str(payload).ok()
1194}
1195
1196fn is_likely_filler(text: &str) -> bool {
1197    let trimmed = text.trim().to_lowercase();
1198    FILLERS.contains(&trimmed)
1199}
1200
/// Extracts the JSON payload from a raw LLM reply.
///
/// Two shapes are recognised after trimming surrounding whitespace:
/// * a fenced block opened and closed by three backticks: the trimmed text
///   between the opening fence and the last closing fence is returned. A
///   leading `json` tag (any ASCII case) is removed: when the block contains
///   a newline, everything up to the first newline is dropped; otherwise only
///   the four tag bytes are. An empty or unterminated fence yields `None`.
/// * text starting with `{` or `[`: returned as-is.
///
/// Anything else, including text that starts with fewer than three
/// backticks, yields `None`.
fn extract_json_block(raw: &str) -> Option<&str> {
    let trimmed = raw.trim();

    // Require a real three-backtick fence; slicing by a fixed byte offset
    // after a lone backtick could land inside a multi-byte character.
    if let Some(rest) = trimmed.strip_prefix("```") {
        // A closing fence at offset 0 means there is nothing between the fences.
        let end = rest.rfind("```").filter(|&idx| idx > 0)?;
        let mut inner = rest[..end].trim();

        // `get` returns `None` instead of panicking on a non-char boundary.
        let has_json_tag = inner
            .get(..4)
            .map_or(false, |tag| tag.eq_ignore_ascii_case("json"));
        if has_json_tag {
            inner = match inner.find('\n') {
                Some(newline) => inner[newline + 1..].trim(),
                None => inner[4..].trim(),
            };
        }
        return Some(inner);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1226
1227#[async_trait]
1228impl DialogueHandler for LlmHandler {
1229    async fn on_start(&mut self) -> Result<Vec<Command>> {
1230        self.last_tts_start_at = Some(std::time::Instant::now());
1231
1232        let mut commands = Vec::new();
1233
1234        // Check if current scene has an audio file to play
1235        if let Some(scene_id) = &self.current_scene_id {
1236            if let Some(scene) = self.scenes.get(scene_id) {
1237                if let Some(audio_file) = &scene.play {
1238                    commands.push(Command::Play {
1239                        url: audio_file.clone(),
1240                        play_id: None,
1241                        auto_hangup: None,
1242                        wait_input_timeout: None,
1243                    });
1244                }
1245            }
1246        }
1247
1248        if let Some(greeting) = &self.config.greeting {
1249            self.is_speaking = true;
1250            commands.push(self.create_tts_command(greeting.clone(), None, None));
1251            return Ok(commands);
1252        }
1253
1254        let response_commands = self.generate_response().await?;
1255        commands.extend(response_commands);
1256        Ok(commands)
1257    }
1258
1259    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1260        match event {
1261            SessionEvent::Dtmf { digit, .. } => {
1262                info!("DTMF received: {}", digit);
1263                if let Some(action) = self.get_dtmf_action(digit) {
1264                    self.handle_dtmf_action(action).await
1265                } else {
1266                    Ok(vec![])
1267                }
1268            }
1269            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1270            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1271                Ok(self
1272                    .check_interruption(event, is_filler)
1273                    .into_iter()
1274                    .collect())
1275            }
1276            SessionEvent::Eou { completed, .. } => {
1277                if *completed && !self.is_speaking {
1278                    info!("EOU detected, triggering early response");
1279                    self.generate_response().await
1280                } else {
1281                    Ok(vec![])
1282                }
1283            }
1284            SessionEvent::Silence { .. } => self.handle_silence().await,
1285            SessionEvent::TrackStart { .. } => {
1286                self.is_speaking = true;
1287                Ok(vec![])
1288            }
1289            SessionEvent::TrackEnd { .. } => {
1290                self.is_speaking = false;
1291                self.is_hanging_up = false;
1292                self.last_interaction_at = std::time::Instant::now();
1293                Ok(vec![])
1294            }
1295            SessionEvent::FunctionCall {
1296                name, arguments, ..
1297            } => self.handle_function_call(name, arguments).await,
1298            _ => Ok(vec![]),
1299        }
1300    }
1301
1302    async fn get_history(&self) -> Vec<ChatMessage> {
1303        self.history.clone()
1304    }
1305
1306    async fn summarize(&mut self, prompt: &str) -> Result<String> {
1307        info!("Generating summary with prompt: {}", prompt);
1308        let mut summary_history = self.history.clone();
1309        summary_history.push(ChatMessage {
1310            role: "user".to_string(),
1311            content: prompt.to_string(),
1312        });
1313
1314        self.provider.call(&self.config, &summary_history).await
1315    }
1316}