// active_call/playbook/handler/mod.rs

1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
17static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
18static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
19static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
20static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
21static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
22static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
23    let mut s = std::collections::HashSet::new();
24    let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
25
26    if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
27        for line in content.lines() {
28            let trimmed = line.trim().to_lowercase();
29            if !trimmed.is_empty() {
30                s.insert(trimmed);
31            }
32        }
33    }
34
35    if s.is_empty() {
36        for f in default_fillers {
37            s.insert(f.to_string());
38        }
39    }
40    s
41});
42
43use super::ChatMessage;
44use super::InterruptionStrategy;
45use super::LlmConfig;
46use super::dialogue::DialogueHandler;
47
48pub mod provider;
49pub mod rag;
50pub mod types;
51
/// Kind of marker found while scanning streamed LLM output: one of the
/// inline control tags, or a plain sentence boundary.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CommandKind {
    /// `<hangup/>` tag.
    Hangup,
    /// `<refer to="..."/>` tag.
    Refer,
    /// Sentence terminator (punctuation or newline).
    Sentence,
    /// `<play file="..."/>` tag.
    Play,
    /// `<goto scene="..."/>` tag.
    Goto,
}
60
61pub use provider::*;
62pub use rag::*;
63pub use types::*;
64
/// Upper bound on RAG attempts.
// NOTE(review): not referenced in the visible part of this file; presumably
// limits tool/RAG follow-up rounds per response — confirm against its use site.
const MAX_RAG_ATTEMPTS: usize = 3;
66
67pub struct LlmHandler {
68    config: LlmConfig,
69    interruption_config: super::InterruptionConfig,
70    global_follow_up_config: Option<super::FollowUpConfig>,
71    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
72    history: Vec<ChatMessage>,
73    provider: Arc<dyn LlmProvider>,
74    rag_retriever: Arc<dyn RagRetriever>,
75    is_speaking: bool,
76    is_hanging_up: bool,
77    consecutive_follow_ups: u32,
78    last_interaction_at: std::time::Instant,
79    event_sender: Option<crate::event::EventSender>,
80    last_asr_final_at: Option<std::time::Instant>,
81    last_tts_start_at: Option<std::time::Instant>,
82    last_robot_msg_at: Option<std::time::Instant>,
83    call: Option<crate::call::ActiveCallRef>,
84    scenes: HashMap<String, super::Scene>,
85    current_scene_id: Option<String>,
86    client: Client,
87}
88
89impl LlmHandler {
90    pub fn new(
91        config: LlmConfig,
92        interruption: super::InterruptionConfig,
93        global_follow_up_config: Option<super::FollowUpConfig>,
94        scenes: HashMap<String, super::Scene>,
95        dtmf: Option<HashMap<String, super::DtmfAction>>,
96        initial_scene_id: Option<String>,
97    ) -> Self {
98        Self::with_provider(
99            config,
100            Arc::new(DefaultLlmProvider::new()),
101            Arc::new(NoopRagRetriever),
102            interruption,
103            global_follow_up_config,
104            scenes,
105            dtmf,
106            initial_scene_id,
107        )
108    }
109
110    pub fn with_provider(
111        config: LlmConfig,
112        provider: Arc<dyn LlmProvider>,
113        rag_retriever: Arc<dyn RagRetriever>,
114        interruption: super::InterruptionConfig,
115        global_follow_up_config: Option<super::FollowUpConfig>,
116        scenes: HashMap<String, super::Scene>,
117        dtmf: Option<HashMap<String, super::DtmfAction>>,
118        initial_scene_id: Option<String>,
119    ) -> Self {
120        let mut history = Vec::new();
121        let system_prompt = Self::build_system_prompt(&config, None);
122
123        history.push(ChatMessage {
124            role: "system".to_string(),
125            content: system_prompt,
126        });
127
128        Self {
129            config,
130            interruption_config: interruption,
131            global_follow_up_config,
132            dtmf_config: dtmf,
133            history,
134            provider,
135            rag_retriever,
136            is_speaking: false,
137            is_hanging_up: false,
138            consecutive_follow_ups: 0,
139            last_interaction_at: std::time::Instant::now(),
140            event_sender: None,
141            last_asr_final_at: None,
142            last_tts_start_at: None,
143            last_robot_msg_at: None,
144            call: None,
145            scenes,
146            current_scene_id: initial_scene_id,
147            client: Client::new(),
148        }
149    }
150
151    fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
152        let base_prompt =
153            scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
154        let mut features_prompt = String::new();
155
156        if let Some(features) = &config.features {
157            let lang = config.language.as_deref().unwrap_or("zh");
158            for feature in features {
159                match Self::load_feature_snippet(feature, lang) {
160                    Ok(snippet) => {
161                        features_prompt.push_str(&format!("\n- {}", snippet));
162                    }
163                    Err(e) => {
164                        warn!("Failed to load feature snippet {}: {}", feature, e);
165                    }
166                }
167            }
168        }
169
170        let features_section = if features_prompt.is_empty() {
171            String::new()
172        } else {
173            format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
174        };
175
176        // Load tool instructions - either custom or language-specific default
177        let tool_instructions = if let Some(custom) = &config.tool_instructions {
178            custom.clone()
179        } else {
180            let lang = config.language.as_deref().unwrap_or("zh");
181            Self::load_feature_snippet("tool_instructions", lang)
182                .unwrap_or_else(|_| {
183                    // Fallback to English if loading fails
184                    Self::load_feature_snippet("tool_instructions", "en")
185                        .unwrap_or_else(|_| {
186                            // Ultimate fallback to hardcoded English
187                            "Tool usage instructions:\n\
188                            - To hang up the call, output: <hangup/>\n\
189                            - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
190                            - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
191                            - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
192                            - To call an external HTTP API, output JSON:\n\
193                              ```json\n\
194                              {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
195                              ```\n\
196                            Please use XML tags for simple actions and JSON blocks for tool calls. \
197                            Output your response in short sentences. Each sentence will be played as soon as it is finished."
198                                .to_string()
199                        })
200                })
201        };
202
203        format!(
204            "{}{}\n\n{}",
205            base_prompt, features_section, tool_instructions
206        )
207    }
208
209    fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
210        let path = format!("features/{}.{}.md", feature, lang);
211        let content = std::fs::read_to_string(path)?;
212        Ok(content.trim().to_string())
213    }
214
215    fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
216        if let Some(scene_id) = &self.current_scene_id {
217            if let Some(scene) = self.scenes.get(scene_id) {
218                if let Some(dtmf) = &scene.dtmf {
219                    if let Some(action) = dtmf.get(digit) {
220                        return Some(action.clone());
221                    }
222                }
223            }
224        }
225
226        if let Some(dtmf) = &self.dtmf_config {
227            if let Some(action) = dtmf.get(digit) {
228                return Some(action.clone());
229            }
230        }
231
232        None
233    }
234
235    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
236        match action {
237            super::DtmfAction::Goto { scene } => {
238                info!("DTMF action: switch to scene {}", scene);
239                self.switch_to_scene(&scene, true).await
240            }
241            super::DtmfAction::Transfer { target } => {
242                info!("DTMF action: transfer to {}", target);
243                Ok(vec![Command::Refer {
244                    caller: String::new(),
245                    callee: target,
246                    options: None,
247                }])
248            }
249            super::DtmfAction::Hangup => {
250                info!("DTMF action: hangup");
251                Ok(vec![Command::Hangup {
252                    reason: Some("DTMF Hangup".to_string()),
253                    initiator: Some("ai".to_string()),
254                }])
255            }
256        }
257    }
258
259    async fn switch_to_scene(
260        &mut self,
261        scene_id: &str,
262        trigger_response: bool,
263    ) -> Result<Vec<Command>> {
264        if let Some(scene) = self.scenes.get(scene_id).cloned() {
265            info!("Switching to scene: {}", scene_id);
266            self.current_scene_id = Some(scene_id.to_string());
267            let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
268            if let Some(first_msg) = self.history.get_mut(0) {
269                if first_msg.role == "system" {
270                    first_msg.content = system_prompt;
271                }
272            }
273
274            let mut commands = Vec::new();
275            if let Some(url) = &scene.play {
276                commands.push(Command::Play {
277                    url: url.clone(),
278                    play_id: None,
279                    auto_hangup: None,
280                    wait_input_timeout: None,
281                });
282            }
283
284            if trigger_response {
285                let response_cmds = self.generate_response().await?;
286                commands.extend(response_cmds);
287            }
288            Ok(commands)
289        } else {
290            warn!("Scene not found: {}", scene_id);
291            Ok(vec![])
292        }
293    }
294
295    pub fn get_history_ref(&self) -> &[ChatMessage] {
296        &self.history
297    }
298
299    pub fn get_current_scene_id(&self) -> Option<String> {
300        self.current_scene_id.clone()
301    }
302
303    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
304        self.call = Some(call);
305    }
306
307    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
308        self.event_sender = Some(sender.clone());
309        if let Some(greeting) = &self.config.greeting {
310            let _ = sender.send(crate::event::SessionEvent::AddHistory {
311                sender: Some("system".to_string()),
312                timestamp: crate::media::get_timestamp(),
313                speaker: "assistant".to_string(),
314                text: greeting.clone(),
315            });
316        }
317    }
318
319    fn send_debug_event(&self, key: &str, data: serde_json::Value) {
320        if let Some(sender) = &self.event_sender {
321            let timestamp = crate::media::get_timestamp();
322            if key == "llm_response" {
323                if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
324                    let _ = sender.send(crate::event::SessionEvent::AddHistory {
325                        sender: Some("llm".to_string()),
326                        timestamp,
327                        speaker: "assistant".to_string(),
328                        text: text.to_string(),
329                    });
330                }
331            }
332
333            let event = crate::event::SessionEvent::Metrics {
334                timestamp,
335                key: key.to_string(),
336                duration: 0,
337                data,
338            };
339            let _ = sender.send(event);
340        }
341    }
342
343    async fn call_llm(&self) -> Result<String> {
344        self.provider.call(&self.config, &self.history).await
345    }
346
347    fn create_tts_command(
348        &self,
349        text: String,
350        wait_input_timeout: Option<u32>,
351        auto_hangup: Option<bool>,
352    ) -> Command {
353        let timeout = wait_input_timeout.unwrap_or(10000);
354        let play_id = uuid::Uuid::new_v4().to_string();
355
356        if let Some(sender) = &self.event_sender {
357            let _ = sender.send(crate::event::SessionEvent::Metrics {
358                timestamp: crate::media::get_timestamp(),
359                key: "tts_play_id_map".to_string(),
360                duration: 0,
361                data: serde_json::json!({
362                    "playId": play_id,
363                    "text": text,
364                }),
365            });
366        }
367
368        Command::Tts {
369            text,
370            speaker: None,
371            play_id: Some(play_id),
372            auto_hangup,
373            streaming: None,
374            end_of_stream: Some(true),
375            option: None,
376            wait_input_timeout: Some(timeout),
377            base64: None,
378        }
379    }
380
381    async fn generate_response(&mut self) -> Result<Vec<Command>> {
382        let start_time = crate::media::get_timestamp();
383        let play_id = uuid::Uuid::new_v4().to_string();
384
385        // Send debug event - LLM call started
386        self.send_debug_event(
387            "llm_call_start",
388            json!({
389                "history_length": self.history.len(),
390                "playId": play_id,
391            }),
392        );
393
394        let mut stream = self
395            .provider
396            .call_stream(&self.config, &self.history)
397            .await?;
398
399        let mut full_content = String::new();
400        let mut full_reasoning = String::new();
401        let mut buffer = String::new();
402        let mut commands = Vec::new();
403        let mut is_json_mode = false;
404        let mut checked_json_mode = false;
405        let mut first_token_time = None;
406
407        while let Some(chunk_result) = stream.next().await {
408            let event = match chunk_result {
409                Ok(c) => c,
410                Err(e) => {
411                    warn!("LLM stream error: {}", e);
412                    break;
413                }
414            };
415
416            match event {
417                LlmStreamEvent::Reasoning(text) => {
418                    full_reasoning.push_str(&text);
419                }
420                LlmStreamEvent::Content(chunk) => {
421                    if first_token_time.is_none() && !chunk.trim().is_empty() {
422                        first_token_time = Some(crate::media::get_timestamp());
423                    }
424
425                    full_content.push_str(&chunk);
426                    buffer.push_str(&chunk);
427
428                    if !checked_json_mode {
429                        let trimmed = full_content.trim();
430                        if !trimmed.is_empty() {
431                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
432                                is_json_mode = true;
433                            }
434                            checked_json_mode = true;
435                        }
436                    }
437
438                    if checked_json_mode && !is_json_mode {
439                        let extracted =
440                            self.extract_streaming_commands(&mut buffer, &play_id, false);
441                        for cmd in extracted {
442                            if let Some(call) = &self.call {
443                                let _ = call.enqueue_command(cmd).await;
444                            } else {
445                                commands.push(cmd);
446                            }
447                        }
448                    }
449                }
450            }
451        }
452
453        // Send debug event - LLM response received
454        let end_time = crate::media::get_timestamp();
455        self.send_debug_event(
456            "llm_response",
457            json!({
458                "response": full_content,
459                "reasoning": full_reasoning,
460                "is_json_mode": is_json_mode,
461                "duration": end_time - start_time,
462                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
463                "playId": play_id,
464            }),
465        );
466
467        if is_json_mode {
468            self.interpret_response(full_content).await
469        } else {
470            let extracted = self.extract_streaming_commands(&mut buffer, &play_id, true);
471            for cmd in extracted {
472                if let Some(call) = &self.call {
473                    let _ = call.enqueue_command(cmd).await;
474                } else {
475                    commands.push(cmd);
476                }
477            }
478            if !full_content.trim().is_empty() {
479                self.history.push(ChatMessage {
480                    role: "assistant".to_string(),
481                    content: full_content,
482                });
483                self.last_robot_msg_at = Some(std::time::Instant::now());
484                self.is_speaking = true;
485                self.last_tts_start_at = Some(std::time::Instant::now());
486            }
487            Ok(commands)
488        }
489    }
490
491    fn extract_streaming_commands(
492        &mut self,
493        buffer: &mut String,
494        play_id: &str,
495        is_final: bool,
496    ) -> Vec<Command> {
497        let mut commands = Vec::new();
498
499        loop {
500            let hangup_pos = RE_HANGUP.find(buffer);
501            let refer_pos = RE_REFER.captures(buffer);
502            let play_pos = RE_PLAY.captures(buffer);
503            let goto_pos = RE_GOTO.captures(buffer);
504            let sentence_pos = RE_SENTENCE.find(buffer);
505
506            // Find the first occurrence
507            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
508            if let Some(m) = hangup_pos {
509                positions.push((m.start(), CommandKind::Hangup));
510            }
511            if let Some(caps) = &refer_pos {
512                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
513            }
514            if let Some(caps) = &play_pos {
515                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
516            }
517            if let Some(caps) = &goto_pos {
518                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
519            }
520            if let Some(m) = sentence_pos {
521                positions.push((m.start(), CommandKind::Sentence));
522            }
523
524            positions.sort_by_key(|p| p.0);
525
526            if let Some((pos, kind)) = positions.first() {
527                let pos = *pos;
528                match kind {
529                    CommandKind::Hangup => {
530                        let prefix = buffer[..pos].to_string();
531                        if !prefix.trim().is_empty() {
532                            let mut cmd = self.create_tts_command_with_id(
533                                prefix,
534                                play_id.to_string(),
535                                Some(true),
536                            );
537                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
538                                *end_of_stream = Some(true);
539                            }
540                            self.is_hanging_up = true;
541                            commands.push(cmd);
542                        } else {
543                            let mut cmd = self.create_tts_command_with_id(
544                                "".to_string(),
545                                play_id.to_string(),
546                                Some(true),
547                            );
548                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
549                                *end_of_stream = Some(true);
550                            }
551                            self.is_hanging_up = true;
552                            commands.push(cmd);
553                        }
554                        buffer.drain(..RE_HANGUP.find(buffer).unwrap().end());
555                        return commands;
556                    }
557                    CommandKind::Refer => {
558                        let caps = RE_REFER.captures(buffer).unwrap();
559                        let mat = caps.get(0).unwrap();
560                        let callee = caps.get(1).unwrap().as_str().to_string();
561
562                        let prefix = buffer[..pos].to_string();
563                        if !prefix.trim().is_empty() {
564                            commands.push(self.create_tts_command_with_id(
565                                prefix,
566                                play_id.to_string(),
567                                None,
568                            ));
569                        }
570                        commands.push(Command::Refer {
571                            caller: String::new(),
572                            callee,
573                            options: None,
574                        });
575                        buffer.drain(..mat.end());
576                    }
577                    CommandKind::Play => {
578                        // Play audio
579                        let caps = RE_PLAY.captures(buffer).unwrap();
580                        let mat = caps.get(0).unwrap();
581                        let url = caps.get(1).unwrap().as_str().to_string();
582
583                        let prefix = buffer[..pos].to_string();
584                        if !prefix.trim().is_empty() {
585                            commands.push(self.create_tts_command_with_id(
586                                prefix,
587                                play_id.to_string(),
588                                None,
589                            ));
590                        }
591                        commands.push(Command::Play {
592                            url,
593                            play_id: None,
594                            auto_hangup: None,
595                            wait_input_timeout: None,
596                        });
597                        buffer.drain(..mat.end());
598                    }
599                    CommandKind::Goto => {
600                        // Goto Scene
601                        let caps = RE_GOTO.captures(buffer).unwrap();
602                        let mat = caps.get(0).unwrap();
603                        let scene_id = caps.get(1).unwrap().as_str().to_string();
604
605                        let prefix = buffer[..pos].to_string();
606                        if !prefix.trim().is_empty() {
607                            commands.push(self.create_tts_command_with_id(
608                                prefix,
609                                play_id.to_string(),
610                                None,
611                            ));
612                        }
613
614                        info!("Switching to scene (from stream): {}", scene_id);
615                        if let Some(scene) = self.scenes.get(&scene_id) {
616                            self.current_scene_id = Some(scene_id);
617                            // Update system prompt in history
618                            let system_prompt =
619                                Self::build_system_prompt(&self.config, Some(&scene.prompt));
620                            if let Some(first_msg) = self.history.get_mut(0) {
621                                if first_msg.role == "system" {
622                                    first_msg.content = system_prompt;
623                                }
624                            }
625                        } else {
626                            warn!("Scene not found: {}", scene_id);
627                        }
628
629                        buffer.drain(..mat.end());
630                    }
631                    CommandKind::Sentence => {
632                        // Sentence
633                        let mat = sentence_pos.unwrap();
634                        let sentence = buffer[..mat.end()].to_string();
635                        if !sentence.trim().is_empty() {
636                            commands.push(self.create_tts_command_with_id(
637                                sentence,
638                                play_id.to_string(),
639                                None,
640                            ));
641                        }
642                        buffer.drain(..mat.end());
643                    }
644                }
645            } else {
646                break;
647            }
648        }
649
650        if is_final {
651            let remaining = buffer.trim().to_string();
652            if !remaining.is_empty() {
653                commands.push(self.create_tts_command_with_id(
654                    remaining,
655                    play_id.to_string(),
656                    None,
657                ));
658            }
659            buffer.clear();
660
661            if let Some(last) = commands.last_mut() {
662                if let Command::Tts { end_of_stream, .. } = last {
663                    *end_of_stream = Some(true);
664                }
665            } else if !self.is_hanging_up {
666                commands.push(Command::Tts {
667                    text: "".to_string(),
668                    speaker: None,
669                    play_id: Some(play_id.to_string()),
670                    auto_hangup: None,
671                    streaming: Some(true),
672                    end_of_stream: Some(true),
673                    option: None,
674                    wait_input_timeout: None,
675                    base64: None,
676                });
677            }
678        }
679
680        commands
681    }
682
683    fn create_tts_command_with_id(
684        &self,
685        text: String,
686        play_id: String,
687        auto_hangup: Option<bool>,
688    ) -> Command {
689        Command::Tts {
690            text,
691            speaker: None,
692            play_id: Some(play_id),
693            auto_hangup,
694            streaming: Some(true),
695            end_of_stream: None,
696            option: None,
697            wait_input_timeout: Some(10000),
698            base64: None,
699        }
700    }
701
702    async fn handle_tool_invocation(
703        &mut self,
704        tool: ToolInvocation,
705        tool_commands: &mut Vec<Command>,
706    ) -> Result<bool> {
707        match tool {
708            ToolInvocation::Hangup {
709                ref reason,
710                ref initiator,
711            } => {
712                self.send_debug_event(
713                    "tool_invocation",
714                    json!({
715                        "tool": "Hangup",
716                        "params": {
717                            "reason": reason,
718                            "initiator": initiator,
719                        }
720                    }),
721                );
722                tool_commands.push(Command::Hangup {
723                    reason: reason.clone(),
724                    initiator: initiator.clone(),
725                });
726                Ok(false)
727            }
728            ToolInvocation::Refer {
729                ref caller,
730                ref callee,
731                ref options,
732            } => {
733                self.send_debug_event(
734                    "tool_invocation",
735                    json!({
736                        "tool": "Refer",
737                        "params": {
738                            "caller": caller,
739                            "callee": callee,
740                        }
741                    }),
742                );
743                tool_commands.push(Command::Refer {
744                    caller: caller.clone(),
745                    callee: callee.clone(),
746                    options: options.clone(),
747                });
748                Ok(false)
749            }
750            ToolInvocation::Rag {
751                ref query,
752                ref source,
753            } => {
754                self.handle_rag_tool(query, source).await?;
755                Ok(true)
756            }
757            ToolInvocation::Accept { ref options } => {
758                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
759                tool_commands.push(Command::Accept {
760                    option: options.clone().unwrap_or_default(),
761                });
762                Ok(false)
763            }
764            ToolInvocation::Reject { ref reason, code } => {
765                self.send_debug_event(
766                    "tool_invocation",
767                    json!({
768                        "tool": "Reject",
769                        "params": {
770                            "reason": reason,
771                            "code": code,
772                        }
773                    }),
774                );
775                tool_commands.push(Command::Reject {
776                    reason: reason
777                        .clone()
778                        .unwrap_or_else(|| "Rejected by agent".to_string()),
779                    code,
780                });
781                Ok(false)
782            }
783            ToolInvocation::Http {
784                ref url,
785                ref method,
786                ref body,
787                ref headers,
788            } => {
789                self.handle_http_tool(url, method, body, headers).await?;
790                Ok(true)
791            }
792        }
793    }
794
795    async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
796        self.send_debug_event(
797            "tool_invocation",
798            json!({
799                "tool": "Rag",
800                "params": {
801                    "query": query,
802                    "source": source,
803                }
804            }),
805        );
806
807        let rag_result = self.rag_retriever.retrieve(query).await?;
808
809        self.send_debug_event(
810            "rag_result",
811            json!({
812                "query": query,
813                "result": rag_result,
814            }),
815        );
816
817        let summary = if let Some(source) = source {
818            format!("[{}] {}", source, rag_result)
819        } else {
820            rag_result
821        };
822
823        self.history.push(ChatMessage {
824            role: "system".to_string(),
825            content: format!("RAG result for {}: {}", query, summary),
826        });
827
828        Ok(())
829    }
830
831    async fn handle_http_tool(
832        &mut self,
833        url: &str,
834        method: &Option<String>,
835        body: &Option<serde_json::Value>,
836        headers: &Option<HashMap<String, String>>,
837    ) -> Result<()> {
838        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
839        let method =
840            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
841
842        self.send_debug_event(
843            "tool_invocation",
844            json!({
845                "tool": "Http",
846                "params": {
847                    "url": url,
848                    "method": method_str,
849                }
850            }),
851        );
852
853        let mut req = self.client.request(method, url);
854        if let Some(body) = body {
855            req = req.json(body);
856        }
857        if let Some(headers) = headers {
858            for (k, v) in headers {
859                req = req.header(k, v);
860            }
861        }
862
863        match req.send().await {
864            Ok(res) => {
865                let status = res.status();
866                let text = res.text().await.unwrap_or_default();
867                self.history.push(ChatMessage {
868                    role: "system".to_string(),
869                    content: format!("HTTP tool response ({}): {}", status, text),
870                });
871            }
872            Err(e) => {
873                warn!("HTTP tool failed: {}", e);
874                self.history.push(ChatMessage {
875                    role: "system".to_string(),
876                    content: format!("HTTP tool failed: {}", e),
877                });
878            }
879        }
880
881        Ok(())
882    }
883
884    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
885        if text.trim().is_empty() {
886            return Ok(vec![]);
887        }
888
889        self.apply_context_repair(text);
890        self.apply_rolling_summary().await;
891
892        self.last_asr_final_at = Some(std::time::Instant::now());
893        self.last_interaction_at = std::time::Instant::now();
894        self.is_speaking = false;
895        self.consecutive_follow_ups = 0;
896
897        self.generate_response().await
898    }
899
900    fn apply_context_repair(&mut self, text: &str) {
901        let enable_repair = self
902            .config
903            .features
904            .as_ref()
905            .map(|f| f.contains(&"context_repair".to_string()))
906            .unwrap_or(false);
907
908        if !enable_repair {
909            self.history.push(ChatMessage {
910                role: "user".to_string(),
911                content: text.to_string(),
912            });
913            return;
914        }
915
916        let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
917        let mut merged = false;
918
919        if let Some(last_robot_at) = self.last_robot_msg_at {
920            if last_robot_at.elapsed().as_millis() < repair_window_ms {
921                if let Some(last_msg) = self.history.last() {
922                    if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
923                        info!(
924                            "Context Repair: Detected potential fragmentation. Triggering merge."
925                        );
926                        self.history.pop();
927                        if let Some(prev_user) = self.history.last_mut() {
928                            if prev_user.role == "user" {
929                                prev_user.content.push_str(",");
930                                prev_user.content.push_str(text);
931                                merged = true;
932                            }
933                        }
934                    }
935                }
936            }
937        }
938
939        if !merged {
940            self.history.push(ChatMessage {
941                role: "user".to_string(),
942                content: text.to_string(),
943            });
944        }
945    }
946
947    async fn apply_rolling_summary(&mut self) {
948        let enable_summary = self
949            .config
950            .features
951            .as_ref()
952            .map(|f| f.contains(&"rolling_summary".to_string()))
953            .unwrap_or(false);
954
955        if !enable_summary {
956            return;
957        }
958
959        let summary_limit = self.config.summary_limit.unwrap_or(20);
960        if self.history.len() <= summary_limit {
961            return;
962        }
963
964        info!("Rolling Summary: History limit reached. Triggering background summary.");
965        let keep_recent = 6;
966        if self.history.len() <= summary_limit + keep_recent
967            || self.history.len() <= keep_recent + 1
968        {
969            return;
970        }
971
972        let split_idx = self.history.len() - keep_recent;
973        let to_summarize = self.history[1..split_idx].to_vec();
974        let recent = self.history[split_idx..].to_vec();
975
976        let summary_prompt =
977            "Summarize the above conversation so far, focusing on key details and user intent.";
978        let mut summary_req_history = to_summarize;
979        summary_req_history.push(ChatMessage {
980            role: "user".to_string(),
981            content: summary_prompt.to_string(),
982        });
983
984        match self.provider.call(&self.config, &summary_req_history).await {
985            Ok(summary) => {
986                let mut new_history = Vec::new();
987                if let Some(sys) = self.history.first() {
988                    let mut new_sys = sys.clone();
989                    new_sys.content.push_str("\n\n[Previous Context Summary]: ");
990                    new_sys.content.push_str(&summary);
991                    new_history.push(new_sys);
992                }
993                new_history.extend(recent);
994                self.history = new_history;
995                info!(
996                    "Rolling Summary: Applied summary. New history len: {}",
997                    self.history.len()
998                );
999            }
1000            Err(e) => {
1001                warn!("Rolling Summary failed: {}", e);
1002            }
1003        }
1004    }
1005
1006    fn check_interruption(
1007        &mut self,
1008        event: &SessionEvent,
1009        is_filler: &Option<bool>,
1010    ) -> Option<Command> {
1011        let strategy = self.interruption_config.strategy;
1012        let should_check = match (strategy, event) {
1013            (InterruptionStrategy::None, _) => false,
1014            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
1015            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1016            (InterruptionStrategy::Both, _) => true,
1017            _ => false,
1018        };
1019
1020        if !self.is_speaking || self.is_hanging_up || !should_check {
1021            return None;
1022        }
1023
1024        // Protection period check
1025        if let Some(last_start) = self.last_tts_start_at {
1026            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1027            if last_start.elapsed().as_millis() < ignore_ms as u128 {
1028                return None;
1029            }
1030        }
1031
1032        // Filler word filter
1033        if self.interruption_config.filler_word_filter.unwrap_or(false) {
1034            if let Some(true) = is_filler {
1035                return None;
1036            }
1037            if let SessionEvent::AsrDelta { text, .. } = event {
1038                if is_likely_filler(text) {
1039                    return None;
1040                }
1041            }
1042        }
1043
1044        // Stale event check
1045        if let Some(last_final) = self.last_asr_final_at {
1046            if last_final.elapsed().as_millis() < 500 {
1047                return None;
1048            }
1049        }
1050
1051        info!("Smart interruption detected, stopping playback");
1052        self.is_speaking = false;
1053        Some(Command::Interrupt {
1054            graceful: Some(true),
1055            fade_out_ms: self.interruption_config.volume_fade_ms,
1056        })
1057    }
1058
1059    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1060        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1061            self.scenes
1062                .get(scene_id)
1063                .and_then(|s| s.follow_up)
1064                .or(self.global_follow_up_config)
1065        } else {
1066            self.global_follow_up_config
1067        };
1068
1069        let Some(config) = follow_up_config else {
1070            return Ok(vec![]);
1071        };
1072
1073        if self.is_speaking
1074            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1075        {
1076            return Ok(vec![]);
1077        }
1078
1079        if self.consecutive_follow_ups >= config.max_count {
1080            info!("Max follow-up count reached, hanging up");
1081            return Ok(vec![Command::Hangup {
1082                reason: Some("Max follow-up reached".to_string()),
1083                initiator: Some("system".to_string()),
1084            }]);
1085        }
1086
1087        info!(
1088            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1089            self.last_interaction_at.elapsed().as_millis(),
1090            self.consecutive_follow_ups + 1,
1091            config.max_count
1092        );
1093        self.consecutive_follow_ups += 1;
1094        self.last_interaction_at = std::time::Instant::now();
1095        self.generate_response().await
1096    }
1097
1098    async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1099        info!(
1100            "Function call from Realtime: {} with args {}",
1101            name, arguments
1102        );
1103        let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1104
1105        match name {
1106            "hangup_call" => Ok(vec![Command::Hangup {
1107                reason: args["reason"].as_str().map(|s| s.to_string()),
1108                initiator: Some("ai".to_string()),
1109            }]),
1110            "transfer_call" | "refer_call" => {
1111                if let Some(callee) = args["callee"]
1112                    .as_str()
1113                    .or_else(|| args["callee_uri"].as_str())
1114                {
1115                    Ok(vec![Command::Refer {
1116                        caller: String::new(),
1117                        callee: callee.to_string(),
1118                        options: None,
1119                    }])
1120                } else {
1121                    warn!("No callee provided for transfer_call");
1122                    Ok(vec![])
1123                }
1124            }
1125            "goto_scene" => {
1126                if let Some(scene) = args["scene"].as_str() {
1127                    self.switch_to_scene(scene, false).await
1128                } else {
1129                    Ok(vec![])
1130                }
1131            }
1132            _ => {
1133                warn!("Unhandled function call: {}", name);
1134                Ok(vec![])
1135            }
1136        }
1137    }
1138
1139    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1140        let mut tool_commands = Vec::new();
1141        let mut wait_input_timeout = None;
1142        let mut attempts = 0;
1143        let mut raw = initial;
1144
1145        let final_text = loop {
1146            attempts += 1;
1147
1148            let Some(structured) = parse_structured_response(&raw) else {
1149                break Some(raw);
1150            };
1151
1152            if wait_input_timeout.is_none() {
1153                wait_input_timeout = structured.wait_input_timeout;
1154            }
1155
1156            let mut rerun_for_rag = false;
1157            if let Some(tools) = structured.tools {
1158                for tool in tools {
1159                    let needs_rerun = self
1160                        .handle_tool_invocation(tool, &mut tool_commands)
1161                        .await?;
1162                    rerun_for_rag = rerun_for_rag || needs_rerun;
1163                }
1164            }
1165
1166            if !rerun_for_rag {
1167                break structured.text;
1168            }
1169
1170            if attempts >= MAX_RAG_ATTEMPTS {
1171                warn!("Reached RAG iteration limit, using last response");
1172                break structured.text.or(Some(raw));
1173            }
1174
1175            raw = self.call_llm().await?;
1176        };
1177
1178        let has_hangup = tool_commands
1179            .iter()
1180            .any(|c| matches!(c, Command::Hangup { .. }));
1181        let mut commands = Vec::new();
1182
1183        if let Some(text) = final_text {
1184            if !text.trim().is_empty() {
1185                self.history.push(ChatMessage {
1186                    role: "assistant".to_string(),
1187                    content: text.clone(),
1188                });
1189                self.last_tts_start_at = Some(std::time::Instant::now());
1190                self.is_speaking = true;
1191
1192                let auto_hangup = has_hangup.then_some(true);
1193                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1194
1195                if has_hangup {
1196                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1197                    self.is_hanging_up = true;
1198                }
1199            }
1200        }
1201
1202        commands.extend(tool_commands);
1203        Ok(commands)
1204    }
1205}
1206
1207fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1208    let payload = extract_json_block(raw)?;
1209    serde_json::from_str(payload).ok()
1210}
1211
1212fn is_likely_filler(text: &str) -> bool {
1213    let trimmed = text.trim().to_lowercase();
1214    FILLERS.contains(&trimmed)
1215}
1216
/// Extracts the JSON payload from a raw LLM reply.
///
/// After trimming surrounding whitespace, two shapes are recognised:
/// * a block wrapped in triple-backtick fences. The text between the opening
///   fence and the last closing fence is returned, trimmed. When that text
///   starts with a `json` tag (any ASCII case), the tag is dropped: everything
///   up to the first newline is skipped, or just the tag itself when the
///   block has no newline;
/// * bare text starting with `{` or `[`, returned as is.
///
/// Returns `None` for anything else, including input that merely starts with
/// one or two backticks, and a fenced block with no closing fence. The
/// returned slice is not validated as JSON.
fn extract_json_block(raw: &str) -> Option<&str> {
    const FENCE: &str = "```";
    const TAG: &str = "json";

    let trimmed = raw.trim();

    // Require a real opening fence: slicing a fixed three bytes off text that
    // only starts with a single backtick could split a multi-byte character.
    if let Some(after_open) = trimmed.strip_prefix(FENCE) {
        // A closing fence at offset 0 leaves no room for content.
        let close = after_open.rfind(FENCE).filter(|&idx| idx > 0)?;
        let mut inner = after_open[..close].trim();

        let tagged = inner
            .get(..TAG.len())
            .map_or(false, |prefix| prefix.eq_ignore_ascii_case(TAG));
        if tagged {
            if let Some(newline) = inner.find('\n') {
                inner = inner[newline + 1..].trim();
            } else if inner.len() > TAG.len() {
                inner = inner[TAG.len()..].trim();
            }
        }
        return Some(inner);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1242
1243#[async_trait]
1244impl DialogueHandler for LlmHandler {
1245    async fn on_start(&mut self) -> Result<Vec<Command>> {
1246        self.last_tts_start_at = Some(std::time::Instant::now());
1247
1248        let mut commands = Vec::new();
1249
1250        // Check if current scene has an audio file to play
1251        if let Some(scene_id) = &self.current_scene_id {
1252            if let Some(scene) = self.scenes.get(scene_id) {
1253                if let Some(audio_file) = &scene.play {
1254                    commands.push(Command::Play {
1255                        url: audio_file.clone(),
1256                        play_id: None,
1257                        auto_hangup: None,
1258                        wait_input_timeout: None,
1259                    });
1260                }
1261            }
1262        }
1263
1264        if let Some(greeting) = &self.config.greeting {
1265            self.is_speaking = true;
1266            commands.push(self.create_tts_command(greeting.clone(), None, None));
1267            return Ok(commands);
1268        }
1269
1270        let response_commands = self.generate_response().await?;
1271        commands.extend(response_commands);
1272        Ok(commands)
1273    }
1274
1275    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1276        match event {
1277            SessionEvent::Dtmf { digit, .. } => {
1278                info!("DTMF received: {}", digit);
1279                if let Some(action) = self.get_dtmf_action(digit) {
1280                    self.handle_dtmf_action(action).await
1281                } else {
1282                    Ok(vec![])
1283                }
1284            }
1285            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1286            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1287                Ok(self
1288                    .check_interruption(event, is_filler)
1289                    .into_iter()
1290                    .collect())
1291            }
1292            SessionEvent::Eou { completed, .. } => {
1293                if *completed && !self.is_speaking {
1294                    info!("EOU detected, triggering early response");
1295                    self.generate_response().await
1296                } else {
1297                    Ok(vec![])
1298                }
1299            }
1300            SessionEvent::Silence { .. } => self.handle_silence().await,
1301            SessionEvent::TrackStart { .. } => {
1302                self.is_speaking = true;
1303                Ok(vec![])
1304            }
1305            SessionEvent::TrackEnd { .. } => {
1306                self.is_speaking = false;
1307                self.is_hanging_up = false;
1308                self.last_interaction_at = std::time::Instant::now();
1309                Ok(vec![])
1310            }
1311            SessionEvent::FunctionCall {
1312                name, arguments, ..
1313            } => self.handle_function_call(name, arguments).await,
1314            _ => Ok(vec![]),
1315        }
1316    }
1317
1318    async fn get_history(&self) -> Vec<ChatMessage> {
1319        self.history.clone()
1320    }
1321
1322    async fn summarize(&mut self, prompt: &str) -> Result<String> {
1323        info!("Generating summary with prompt: {}", prompt);
1324        let mut summary_history = self.history.clone();
1325        summary_history.push(ChatMessage {
1326            role: "user".to_string(),
1327            content: prompt.to_string(),
1328        });
1329
1330        self.provider.call(&self.config, &summary_history).await
1331    }
1332}