Skip to main content

active_call/playbook/handler/
mod.rs

1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
17static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
18static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
19static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
20static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
21static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
22static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
23    let mut s = std::collections::HashSet::new();
24    let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
25
26    if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
27        for line in content.lines() {
28            let trimmed = line.trim().to_lowercase();
29            if !trimmed.is_empty() {
30                s.insert(trimmed);
31            }
32        }
33    }
34
35    if s.is_empty() {
36        for f in default_fillers {
37            s.insert(f.to_string());
38        }
39    }
40    s
41});
42
43use super::ChatMessage;
44use super::InterruptionStrategy;
45use super::LlmConfig;
46use super::dialogue::DialogueHandler;
47
48pub mod provider;
49pub mod rag;
50pub mod types;
51
/// What the streaming extractor found first in the pending response text.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CommandKind {
    /// A `<hangup/>` tag.
    Hangup,
    /// A `<refer to="..."/>` tag.
    Refer,
    /// A sentence terminator (end of a speakable chunk).
    Sentence,
    /// A `<play file="..."/>` tag.
    Play,
    /// A `<goto scene="..."/>` tag.
    Goto,
}
60
61pub use provider::*;
62pub use rag::*;
63pub use types::*;
64
/// Maximum number of RAG attempts, going by the name.
/// NOTE(review): not referenced by the code visible here; confirm where (and per what unit —
/// turn? tool call?) this limit is enforced.
const MAX_RAG_ATTEMPTS: usize = 3;
66
/// Dialogue handler that drives a call with an LLM: it keeps the chat history, turns streamed
/// model output into TTS / call-control commands, and tracks the active scene.
pub struct LlmHandler {
    /// LLM settings; prompt, language, features, tool instructions and greeting are read here.
    config: LlmConfig,
    // NOTE(review): not read by the methods visible in this chunk.
    interruption_config: super::InterruptionConfig,
    // NOTE(review): not read by the methods visible in this chunk.
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Handler-wide DTMF digit -> action map, consulted after the current scene's own map.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// Chat history sent to the provider; entry 0 is the system prompt, rewritten on scene
    /// switches.
    history: Vec<ChatMessage>,
    /// Backend used for both single and streaming completions.
    provider: Arc<dyn LlmProvider>,
    /// Backend used by the RAG tool.
    rag_retriever: Arc<dyn RagRetriever>,
    /// Set to true once an assistant reply has been recorded in the history.
    is_speaking: bool,
    /// Set once a `<hangup/>` tag has been extracted from the model output.
    is_hanging_up: bool,
    // NOTE(review): initialised to 0; updates are not visible in this chunk.
    consecutive_follow_ups: u32,
    // NOTE(review): initialised at construction; updates are not visible in this chunk.
    last_interaction_at: std::time::Instant,
    /// Destination for debug metrics and session-history events, if attached.
    event_sender: Option<crate::event::EventSender>,
    // NOTE(review): not written by the methods visible in this chunk.
    last_asr_final_at: Option<std::time::Instant>,
    /// When the latest assistant reply was recorded (set together with `is_speaking`).
    last_tts_start_at: Option<std::time::Instant>,
    /// When the latest assistant message was appended to the history.
    last_robot_msg_at: Option<std::time::Instant>,
    /// Active call; when set, streamed commands are enqueued on it instead of being returned.
    call: Option<crate::call::ActiveCallRef>,
    /// Scene definitions keyed by scene id.
    scenes: HashMap<String, super::Scene>,
    /// Id of the active scene, if any.
    current_scene_id: Option<String>,
    /// HTTP client used by the HTTP tool.
    client: Client,
}
88
89impl LlmHandler {
90    pub fn new(
91        config: LlmConfig,
92        interruption: super::InterruptionConfig,
93        global_follow_up_config: Option<super::FollowUpConfig>,
94        scenes: HashMap<String, super::Scene>,
95        dtmf: Option<HashMap<String, super::DtmfAction>>,
96        initial_scene_id: Option<String>,
97    ) -> Self {
98        Self::with_provider(
99            config,
100            Arc::new(DefaultLlmProvider::new()),
101            Arc::new(NoopRagRetriever),
102            interruption,
103            global_follow_up_config,
104            scenes,
105            dtmf,
106            initial_scene_id,
107        )
108    }
109
110    pub fn with_provider(
111        config: LlmConfig,
112        provider: Arc<dyn LlmProvider>,
113        rag_retriever: Arc<dyn RagRetriever>,
114        interruption: super::InterruptionConfig,
115        global_follow_up_config: Option<super::FollowUpConfig>,
116        scenes: HashMap<String, super::Scene>,
117        dtmf: Option<HashMap<String, super::DtmfAction>>,
118        initial_scene_id: Option<String>,
119    ) -> Self {
120        let mut history = Vec::new();
121        let system_prompt = Self::build_system_prompt(&config, None);
122
123        history.push(ChatMessage {
124            role: "system".to_string(),
125            content: system_prompt,
126        });
127
128        Self {
129            config,
130            interruption_config: interruption,
131            global_follow_up_config,
132            dtmf_config: dtmf,
133            history,
134            provider,
135            rag_retriever,
136            is_speaking: false,
137            is_hanging_up: false,
138            consecutive_follow_ups: 0,
139            last_interaction_at: std::time::Instant::now(),
140            event_sender: None,
141            last_asr_final_at: None,
142            last_tts_start_at: None,
143            last_robot_msg_at: None,
144            call: None,
145            scenes,
146            current_scene_id: initial_scene_id,
147            client: Client::new(),
148        }
149    }
150
151    fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
152        let base_prompt =
153            scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
154        let mut features_prompt = String::new();
155
156        if let Some(features) = &config.features {
157            let lang = config.language.as_deref().unwrap_or("zh");
158            for feature in features {
159                match Self::load_feature_snippet(feature, lang) {
160                    Ok(snippet) => {
161                        features_prompt.push_str(&format!("\n- {}", snippet));
162                    }
163                    Err(e) => {
164                        warn!("Failed to load feature snippet {}: {}", feature, e);
165                    }
166                }
167            }
168        }
169
170        let features_section = if features_prompt.is_empty() {
171            String::new()
172        } else {
173            format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
174        };
175
176        // Load tool instructions - either custom or language-specific default
177        let tool_instructions = if let Some(custom) = &config.tool_instructions {
178            custom.clone()
179        } else {
180            let lang = config.language.as_deref().unwrap_or("zh");
181            Self::load_feature_snippet("tool_instructions", lang)
182                .unwrap_or_else(|_| {
183                    // Fallback to English if loading fails
184                    Self::load_feature_snippet("tool_instructions", "en")
185                        .unwrap_or_else(|_| {
186                            // Ultimate fallback to hardcoded English
187                            "Tool usage instructions:\n\
188                            - To hang up the call, output: <hangup/>\n\
189                            - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
190                            - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
191                            - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
192                            - To call an external HTTP API, output JSON:\n\
193                              ```json\n\
194                              {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
195                              ```\n\
196                            Please use XML tags for simple actions and JSON blocks for tool calls. \
197                            Output your response in short sentences. Each sentence will be played as soon as it is finished."
198                                .to_string()
199                        })
200                })
201        };
202
203        format!(
204            "{}{}\n\n{}",
205            base_prompt, features_section, tool_instructions
206        )
207    }
208
209    fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
210        let path = format!("features/{}.{}.md", feature, lang);
211        let content = std::fs::read_to_string(path)?;
212        Ok(content.trim().to_string())
213    }
214
215    fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
216        if let Some(scene_id) = &self.current_scene_id {
217            if let Some(scene) = self.scenes.get(scene_id) {
218                if let Some(dtmf) = &scene.dtmf {
219                    if let Some(action) = dtmf.get(digit) {
220                        return Some(action.clone());
221                    }
222                }
223            }
224        }
225
226        if let Some(dtmf) = &self.dtmf_config {
227            if let Some(action) = dtmf.get(digit) {
228                return Some(action.clone());
229            }
230        }
231
232        None
233    }
234
235    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
236        match action {
237            super::DtmfAction::Goto { scene } => {
238                info!("DTMF action: switch to scene {}", scene);
239                self.switch_to_scene(&scene, true).await
240            }
241            super::DtmfAction::Transfer { target } => {
242                info!("DTMF action: transfer to {}", target);
243                Ok(vec![Command::Refer {
244                    caller: String::new(),
245                    callee: target,
246                    options: None,
247                }])
248            }
249            super::DtmfAction::Hangup => {
250                info!("DTMF action: hangup");
251                Ok(vec![Command::Hangup {
252                    reason: Some("DTMF Hangup".to_string()),
253                    initiator: Some("ai".to_string()),
254                }])
255            }
256        }
257    }
258
259    async fn switch_to_scene(
260        &mut self,
261        scene_id: &str,
262        trigger_response: bool,
263    ) -> Result<Vec<Command>> {
264        if let Some(scene) = self.scenes.get(scene_id).cloned() {
265            info!("Switching to scene: {}", scene_id);
266            self.current_scene_id = Some(scene_id.to_string());
267            let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
268            if let Some(first_msg) = self.history.get_mut(0) {
269                if first_msg.role == "system" {
270                    first_msg.content = system_prompt;
271                }
272            }
273
274            let mut commands = Vec::new();
275            if let Some(url) = &scene.play {
276                commands.push(Command::Play {
277                    url: url.clone(),
278                    play_id: None,
279                    auto_hangup: None,
280                    wait_input_timeout: None,
281                });
282            }
283
284            if trigger_response {
285                let response_cmds = self.generate_response().await?;
286                commands.extend(response_cmds);
287            }
288            Ok(commands)
289        } else {
290            warn!("Scene not found: {}", scene_id);
291            Ok(vec![])
292        }
293    }
294
295    pub fn get_history_ref(&self) -> &[ChatMessage] {
296        &self.history
297    }
298
299    pub fn get_current_scene_id(&self) -> Option<String> {
300        self.current_scene_id.clone()
301    }
302
303    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
304        self.call = Some(call);
305    }
306
307    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
308        self.event_sender = Some(sender.clone());
309        if let Some(greeting) = &self.config.greeting {
310            let _ = sender.send(crate::event::SessionEvent::AddHistory {
311                sender: Some("system".to_string()),
312                timestamp: crate::media::get_timestamp(),
313                speaker: "assistant".to_string(),
314                text: greeting.clone(),
315            });
316        }
317    }
318
319    fn send_debug_event(&self, key: &str, data: serde_json::Value) {
320        if let Some(sender) = &self.event_sender {
321            let timestamp = crate::media::get_timestamp();
322            if key == "llm_response" {
323                if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
324                    let _ = sender.send(crate::event::SessionEvent::AddHistory {
325                        sender: Some("llm".to_string()),
326                        timestamp,
327                        speaker: "assistant".to_string(),
328                        text: text.to_string(),
329                    });
330                }
331            }
332
333            let event = crate::event::SessionEvent::Metrics {
334                timestamp,
335                key: key.to_string(),
336                duration: 0,
337                data,
338            };
339            let _ = sender.send(event);
340        }
341    }
342
343    async fn call_llm(&self) -> Result<String> {
344        self.provider.call(&self.config, &self.history).await
345    }
346
347    fn create_tts_command(
348        &self,
349        text: String,
350        wait_input_timeout: Option<u32>,
351        auto_hangup: Option<bool>,
352    ) -> Command {
353        let timeout = wait_input_timeout.unwrap_or(10000);
354        let play_id = uuid::Uuid::new_v4().to_string();
355
356        if let Some(sender) = &self.event_sender {
357            let _ = sender.send(crate::event::SessionEvent::Metrics {
358                timestamp: crate::media::get_timestamp(),
359                key: "tts_play_id_map".to_string(),
360                duration: 0,
361                data: serde_json::json!({
362                    "playId": play_id,
363                    "text": text,
364                }),
365            });
366        }
367
368        Command::Tts {
369            text,
370            speaker: None,
371            play_id: Some(play_id),
372            auto_hangup,
373            streaming: None,
374            end_of_stream: Some(true),
375            option: None,
376            wait_input_timeout: Some(timeout),
377            base64: None,
378            cache_key: None,
379        }
380    }
381
382    async fn generate_response(&mut self) -> Result<Vec<Command>> {
383        let start_time = crate::media::get_timestamp();
384        let play_id = uuid::Uuid::new_v4().to_string();
385
386        // Send debug event - LLM call started
387        self.send_debug_event(
388            "llm_call_start",
389            json!({
390                "history_length": self.history.len(),
391                "playId": play_id,
392            }),
393        );
394
395        let mut stream = self
396            .provider
397            .call_stream(&self.config, &self.history)
398            .await?;
399
400        let mut full_content = String::new();
401        let mut full_reasoning = String::new();
402        let mut buffer = String::new();
403        let mut commands = Vec::new();
404        let mut is_json_mode = false;
405        let mut checked_json_mode = false;
406        let mut first_token_time = None;
407
408        while let Some(chunk_result) = stream.next().await {
409            let event = match chunk_result {
410                Ok(c) => c,
411                Err(e) => {
412                    warn!("LLM stream error: {}", e);
413                    break;
414                }
415            };
416
417            match event {
418                LlmStreamEvent::Reasoning(text) => {
419                    full_reasoning.push_str(&text);
420                }
421                LlmStreamEvent::Content(chunk) => {
422                    if first_token_time.is_none() && !chunk.trim().is_empty() {
423                        first_token_time = Some(crate::media::get_timestamp());
424                    }
425
426                    full_content.push_str(&chunk);
427                    buffer.push_str(&chunk);
428
429                    if !checked_json_mode {
430                        let trimmed = full_content.trim();
431                        if !trimmed.is_empty() {
432                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
433                                is_json_mode = true;
434                            }
435                            checked_json_mode = true;
436                        }
437                    }
438
439                    if checked_json_mode && !is_json_mode {
440                        let extracted =
441                            self.extract_streaming_commands(&mut buffer, &play_id, false);
442                        for cmd in extracted {
443                            if let Some(call) = &self.call {
444                                let _ = call.enqueue_command(cmd).await;
445                            } else {
446                                commands.push(cmd);
447                            }
448                        }
449                    }
450                }
451            }
452        }
453
454        // Send debug event - LLM response received
455        let end_time = crate::media::get_timestamp();
456        self.send_debug_event(
457            "llm_response",
458            json!({
459                "response": full_content,
460                "reasoning": full_reasoning,
461                "is_json_mode": is_json_mode,
462                "duration": end_time - start_time,
463                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
464                "playId": play_id,
465            }),
466        );
467
468        if is_json_mode {
469            self.interpret_response(full_content).await
470        } else {
471            let extracted = self.extract_streaming_commands(&mut buffer, &play_id, true);
472            for cmd in extracted {
473                if let Some(call) = &self.call {
474                    let _ = call.enqueue_command(cmd).await;
475                } else {
476                    commands.push(cmd);
477                }
478            }
479            if !full_content.trim().is_empty() {
480                self.history.push(ChatMessage {
481                    role: "assistant".to_string(),
482                    content: full_content,
483                });
484                self.last_robot_msg_at = Some(std::time::Instant::now());
485                self.is_speaking = true;
486                self.last_tts_start_at = Some(std::time::Instant::now());
487            }
488            Ok(commands)
489        }
490    }
491
492    fn extract_streaming_commands(
493        &mut self,
494        buffer: &mut String,
495        play_id: &str,
496        is_final: bool,
497    ) -> Vec<Command> {
498        let mut commands = Vec::new();
499
500        loop {
501            let hangup_pos = RE_HANGUP.find(buffer);
502            let refer_pos = RE_REFER.captures(buffer);
503            let play_pos = RE_PLAY.captures(buffer);
504            let goto_pos = RE_GOTO.captures(buffer);
505            let sentence_pos = RE_SENTENCE.find(buffer);
506
507            // Find the first occurrence
508            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
509            if let Some(m) = hangup_pos {
510                positions.push((m.start(), CommandKind::Hangup));
511            }
512            if let Some(caps) = &refer_pos {
513                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
514            }
515            if let Some(caps) = &play_pos {
516                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
517            }
518            if let Some(caps) = &goto_pos {
519                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
520            }
521            if let Some(m) = sentence_pos {
522                positions.push((m.start(), CommandKind::Sentence));
523            }
524
525            positions.sort_by_key(|p| p.0);
526
527            if let Some((pos, kind)) = positions.first() {
528                let pos = *pos;
529                match kind {
530                    CommandKind::Hangup => {
531                        let prefix = buffer[..pos].to_string();
532                        if !prefix.trim().is_empty() {
533                            let mut cmd = self.create_tts_command_with_id(
534                                prefix,
535                                play_id.to_string(),
536                                Some(true),
537                            );
538                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
539                                *end_of_stream = Some(true);
540                            }
541                            self.is_hanging_up = true;
542                            commands.push(cmd);
543                        } else {
544                            let mut cmd = self.create_tts_command_with_id(
545                                "".to_string(),
546                                play_id.to_string(),
547                                Some(true),
548                            );
549                            if let Command::Tts { end_of_stream, .. } = &mut cmd {
550                                *end_of_stream = Some(true);
551                            }
552                            self.is_hanging_up = true;
553                            commands.push(cmd);
554                        }
555                        buffer.drain(..RE_HANGUP.find(buffer).unwrap().end());
556                        return commands;
557                    }
558                    CommandKind::Refer => {
559                        let caps = RE_REFER.captures(buffer).unwrap();
560                        let mat = caps.get(0).unwrap();
561                        let callee = caps.get(1).unwrap().as_str().to_string();
562
563                        let prefix = buffer[..pos].to_string();
564                        if !prefix.trim().is_empty() {
565                            commands.push(self.create_tts_command_with_id(
566                                prefix,
567                                play_id.to_string(),
568                                None,
569                            ));
570                        }
571                        commands.push(Command::Refer {
572                            caller: String::new(),
573                            callee,
574                            options: None,
575                        });
576                        buffer.drain(..mat.end());
577                    }
578                    CommandKind::Play => {
579                        // Play audio
580                        let caps = RE_PLAY.captures(buffer).unwrap();
581                        let mat = caps.get(0).unwrap();
582                        let url = caps.get(1).unwrap().as_str().to_string();
583
584                        let prefix = buffer[..pos].to_string();
585                        if !prefix.trim().is_empty() {
586                            commands.push(self.create_tts_command_with_id(
587                                prefix,
588                                play_id.to_string(),
589                                None,
590                            ));
591                        }
592                        commands.push(Command::Play {
593                            url,
594                            play_id: None,
595                            auto_hangup: None,
596                            wait_input_timeout: None,
597                        });
598                        buffer.drain(..mat.end());
599                    }
600                    CommandKind::Goto => {
601                        // Goto Scene
602                        let caps = RE_GOTO.captures(buffer).unwrap();
603                        let mat = caps.get(0).unwrap();
604                        let scene_id = caps.get(1).unwrap().as_str().to_string();
605
606                        let prefix = buffer[..pos].to_string();
607                        if !prefix.trim().is_empty() {
608                            commands.push(self.create_tts_command_with_id(
609                                prefix,
610                                play_id.to_string(),
611                                None,
612                            ));
613                        }
614
615                        info!("Switching to scene (from stream): {}", scene_id);
616                        if let Some(scene) = self.scenes.get(&scene_id) {
617                            self.current_scene_id = Some(scene_id);
618                            // Update system prompt in history
619                            let system_prompt =
620                                Self::build_system_prompt(&self.config, Some(&scene.prompt));
621                            if let Some(first_msg) = self.history.get_mut(0) {
622                                if first_msg.role == "system" {
623                                    first_msg.content = system_prompt;
624                                }
625                            }
626                        } else {
627                            warn!("Scene not found: {}", scene_id);
628                        }
629
630                        buffer.drain(..mat.end());
631                    }
632                    CommandKind::Sentence => {
633                        // Sentence
634                        let mat = sentence_pos.unwrap();
635                        let sentence = buffer[..mat.end()].to_string();
636                        if !sentence.trim().is_empty() {
637                            commands.push(self.create_tts_command_with_id(
638                                sentence,
639                                play_id.to_string(),
640                                None,
641                            ));
642                        }
643                        buffer.drain(..mat.end());
644                    }
645                }
646            } else {
647                break;
648            }
649        }
650
651        if is_final {
652            let remaining = buffer.trim().to_string();
653            if !remaining.is_empty() {
654                commands.push(self.create_tts_command_with_id(
655                    remaining,
656                    play_id.to_string(),
657                    None,
658                ));
659            }
660            buffer.clear();
661
662            if let Some(last) = commands.last_mut() {
663                if let Command::Tts { end_of_stream, .. } = last {
664                    *end_of_stream = Some(true);
665                }
666            } else if !self.is_hanging_up {
667                commands.push(Command::Tts {
668                    text: "".to_string(),
669                    speaker: None,
670                    play_id: Some(play_id.to_string()),
671                    auto_hangup: None,
672                    streaming: Some(true),
673                    end_of_stream: Some(true),
674                    option: None,
675                    wait_input_timeout: None,
676                    base64: None,
677                    cache_key: None,
678                });
679            }
680        }
681
682        commands
683    }
684
685    fn create_tts_command_with_id(
686        &self,
687        text: String,
688        play_id: String,
689        auto_hangup: Option<bool>,
690    ) -> Command {
691        Command::Tts {
692            text,
693            speaker: None,
694            play_id: Some(play_id),
695            auto_hangup,
696            streaming: Some(true),
697            end_of_stream: None,
698            option: None,
699            wait_input_timeout: Some(10000),
700            base64: None,
701            cache_key: None,
702        }
703    }
704
705    async fn handle_tool_invocation(
706        &mut self,
707        tool: ToolInvocation,
708        tool_commands: &mut Vec<Command>,
709    ) -> Result<bool> {
710        match tool {
711            ToolInvocation::Hangup {
712                ref reason,
713                ref initiator,
714            } => {
715                self.send_debug_event(
716                    "tool_invocation",
717                    json!({
718                        "tool": "Hangup",
719                        "params": {
720                            "reason": reason,
721                            "initiator": initiator,
722                        }
723                    }),
724                );
725                tool_commands.push(Command::Hangup {
726                    reason: reason.clone(),
727                    initiator: initiator.clone(),
728                });
729                Ok(false)
730            }
731            ToolInvocation::Refer {
732                ref caller,
733                ref callee,
734                ref options,
735            } => {
736                self.send_debug_event(
737                    "tool_invocation",
738                    json!({
739                        "tool": "Refer",
740                        "params": {
741                            "caller": caller,
742                            "callee": callee,
743                        }
744                    }),
745                );
746                tool_commands.push(Command::Refer {
747                    caller: caller.clone(),
748                    callee: callee.clone(),
749                    options: options.clone(),
750                });
751                Ok(false)
752            }
753            ToolInvocation::Rag {
754                ref query,
755                ref source,
756            } => {
757                self.handle_rag_tool(query, source).await?;
758                Ok(true)
759            }
760            ToolInvocation::Accept { ref options } => {
761                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
762                tool_commands.push(Command::Accept {
763                    option: options.clone().unwrap_or_default(),
764                });
765                Ok(false)
766            }
767            ToolInvocation::Reject { ref reason, code } => {
768                self.send_debug_event(
769                    "tool_invocation",
770                    json!({
771                        "tool": "Reject",
772                        "params": {
773                            "reason": reason,
774                            "code": code,
775                        }
776                    }),
777                );
778                tool_commands.push(Command::Reject {
779                    reason: reason
780                        .clone()
781                        .unwrap_or_else(|| "Rejected by agent".to_string()),
782                    code,
783                });
784                Ok(false)
785            }
786            ToolInvocation::Http {
787                ref url,
788                ref method,
789                ref body,
790                ref headers,
791            } => {
792                self.handle_http_tool(url, method, body, headers).await?;
793                Ok(true)
794            }
795        }
796    }
797
798    async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
799        self.send_debug_event(
800            "tool_invocation",
801            json!({
802                "tool": "Rag",
803                "params": {
804                    "query": query,
805                    "source": source,
806                }
807            }),
808        );
809
810        let rag_result = self.rag_retriever.retrieve(query).await?;
811
812        self.send_debug_event(
813            "rag_result",
814            json!({
815                "query": query,
816                "result": rag_result,
817            }),
818        );
819
820        let summary = if let Some(source) = source {
821            format!("[{}] {}", source, rag_result)
822        } else {
823            rag_result
824        };
825
826        self.history.push(ChatMessage {
827            role: "system".to_string(),
828            content: format!("RAG result for {}: {}", query, summary),
829        });
830
831        Ok(())
832    }
833
834    async fn handle_http_tool(
835        &mut self,
836        url: &str,
837        method: &Option<String>,
838        body: &Option<serde_json::Value>,
839        headers: &Option<HashMap<String, String>>,
840    ) -> Result<()> {
841        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
842        let method =
843            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
844
845        self.send_debug_event(
846            "tool_invocation",
847            json!({
848                "tool": "Http",
849                "params": {
850                    "url": url,
851                    "method": method_str,
852                }
853            }),
854        );
855
856        let mut req = self.client.request(method, url);
857        if let Some(body) = body {
858            req = req.json(body);
859        }
860        if let Some(headers) = headers {
861            for (k, v) in headers {
862                req = req.header(k, v);
863            }
864        }
865
866        match req.send().await {
867            Ok(res) => {
868                let status = res.status();
869                let text = res.text().await.unwrap_or_default();
870                self.history.push(ChatMessage {
871                    role: "system".to_string(),
872                    content: format!("HTTP tool response ({}): {}", status, text),
873                });
874            }
875            Err(e) => {
876                warn!("HTTP tool failed: {}", e);
877                self.history.push(ChatMessage {
878                    role: "system".to_string(),
879                    content: format!("HTTP tool failed: {}", e),
880                });
881            }
882        }
883
884        Ok(())
885    }
886
887    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
888        if text.trim().is_empty() {
889            return Ok(vec![]);
890        }
891
892        self.apply_context_repair(text);
893        self.apply_rolling_summary().await;
894
895        self.last_asr_final_at = Some(std::time::Instant::now());
896        self.last_interaction_at = std::time::Instant::now();
897        self.is_speaking = false;
898        self.consecutive_follow_ups = 0;
899
900        self.generate_response().await
901    }
902
903    fn apply_context_repair(&mut self, text: &str) {
904        let enable_repair = self
905            .config
906            .features
907            .as_ref()
908            .map(|f| f.contains(&"context_repair".to_string()))
909            .unwrap_or(false);
910
911        if !enable_repair {
912            self.history.push(ChatMessage {
913                role: "user".to_string(),
914                content: text.to_string(),
915            });
916            return;
917        }
918
919        let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
920        let mut merged = false;
921
922        if let Some(last_robot_at) = self.last_robot_msg_at {
923            if last_robot_at.elapsed().as_millis() < repair_window_ms {
924                if let Some(last_msg) = self.history.last() {
925                    if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
926                        info!(
927                            "Context Repair: Detected potential fragmentation. Triggering merge."
928                        );
929                        self.history.pop();
930                        if let Some(prev_user) = self.history.last_mut() {
931                            if prev_user.role == "user" {
932                                prev_user.content.push_str(",");
933                                prev_user.content.push_str(text);
934                                merged = true;
935                            }
936                        }
937                    }
938                }
939            }
940        }
941
942        if !merged {
943            self.history.push(ChatMessage {
944                role: "user".to_string(),
945                content: text.to_string(),
946            });
947        }
948    }
949
950    async fn apply_rolling_summary(&mut self) {
951        let enable_summary = self
952            .config
953            .features
954            .as_ref()
955            .map(|f| f.contains(&"rolling_summary".to_string()))
956            .unwrap_or(false);
957
958        if !enable_summary {
959            return;
960        }
961
962        let summary_limit = self.config.summary_limit.unwrap_or(20);
963        if self.history.len() <= summary_limit {
964            return;
965        }
966
967        info!("Rolling Summary: History limit reached. Triggering background summary.");
968        let keep_recent = 6;
969        if self.history.len() <= summary_limit + keep_recent
970            || self.history.len() <= keep_recent + 1
971        {
972            return;
973        }
974
975        let split_idx = self.history.len() - keep_recent;
976        let to_summarize = self.history[1..split_idx].to_vec();
977        let recent = self.history[split_idx..].to_vec();
978
979        let summary_prompt =
980            "Summarize the above conversation so far, focusing on key details and user intent.";
981        let mut summary_req_history = to_summarize;
982        summary_req_history.push(ChatMessage {
983            role: "user".to_string(),
984            content: summary_prompt.to_string(),
985        });
986
987        match self.provider.call(&self.config, &summary_req_history).await {
988            Ok(summary) => {
989                let mut new_history = Vec::new();
990                if let Some(sys) = self.history.first() {
991                    let mut new_sys = sys.clone();
992                    new_sys.content.push_str("\n\n[Previous Context Summary]: ");
993                    new_sys.content.push_str(&summary);
994                    new_history.push(new_sys);
995                }
996                new_history.extend(recent);
997                self.history = new_history;
998                info!(
999                    "Rolling Summary: Applied summary. New history len: {}",
1000                    self.history.len()
1001                );
1002            }
1003            Err(e) => {
1004                warn!("Rolling Summary failed: {}", e);
1005            }
1006        }
1007    }
1008
1009    fn check_interruption(
1010        &mut self,
1011        event: &SessionEvent,
1012        is_filler: &Option<bool>,
1013    ) -> Option<Command> {
1014        let strategy = self.interruption_config.strategy;
1015        let should_check = match (strategy, event) {
1016            (InterruptionStrategy::None, _) => false,
1017            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
1018            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1019            (InterruptionStrategy::Both, _) => true,
1020            _ => false,
1021        };
1022
1023        if !self.is_speaking || self.is_hanging_up || !should_check {
1024            return None;
1025        }
1026
1027        // Protection period check
1028        if let Some(last_start) = self.last_tts_start_at {
1029            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1030            if last_start.elapsed().as_millis() < ignore_ms as u128 {
1031                return None;
1032            }
1033        }
1034
1035        // Filler word filter
1036        if self.interruption_config.filler_word_filter.unwrap_or(false) {
1037            if let Some(true) = is_filler {
1038                return None;
1039            }
1040            if let SessionEvent::AsrDelta { text, .. } = event {
1041                if is_likely_filler(text) {
1042                    return None;
1043                }
1044            }
1045        }
1046
1047        // Stale event check
1048        if let Some(last_final) = self.last_asr_final_at {
1049            if last_final.elapsed().as_millis() < 500 {
1050                return None;
1051            }
1052        }
1053
1054        info!("Smart interruption detected, stopping playback");
1055        self.is_speaking = false;
1056        Some(Command::Interrupt {
1057            graceful: Some(true),
1058            fade_out_ms: self.interruption_config.volume_fade_ms,
1059        })
1060    }
1061
1062    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1063        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1064            self.scenes
1065                .get(scene_id)
1066                .and_then(|s| s.follow_up)
1067                .or(self.global_follow_up_config)
1068        } else {
1069            self.global_follow_up_config
1070        };
1071
1072        let Some(config) = follow_up_config else {
1073            return Ok(vec![]);
1074        };
1075
1076        if self.is_speaking
1077            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1078        {
1079            return Ok(vec![]);
1080        }
1081
1082        if self.consecutive_follow_ups >= config.max_count {
1083            info!("Max follow-up count reached, hanging up");
1084            return Ok(vec![Command::Hangup {
1085                reason: Some("Max follow-up reached".to_string()),
1086                initiator: Some("system".to_string()),
1087            }]);
1088        }
1089
1090        info!(
1091            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1092            self.last_interaction_at.elapsed().as_millis(),
1093            self.consecutive_follow_ups + 1,
1094            config.max_count
1095        );
1096        self.consecutive_follow_ups += 1;
1097        self.last_interaction_at = std::time::Instant::now();
1098        self.generate_response().await
1099    }
1100
1101    async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1102        info!(
1103            "Function call from Realtime: {} with args {}",
1104            name, arguments
1105        );
1106        let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1107
1108        match name {
1109            "hangup_call" => Ok(vec![Command::Hangup {
1110                reason: args["reason"].as_str().map(|s| s.to_string()),
1111                initiator: Some("ai".to_string()),
1112            }]),
1113            "transfer_call" | "refer_call" => {
1114                if let Some(callee) = args["callee"]
1115                    .as_str()
1116                    .or_else(|| args["callee_uri"].as_str())
1117                {
1118                    Ok(vec![Command::Refer {
1119                        caller: String::new(),
1120                        callee: callee.to_string(),
1121                        options: None,
1122                    }])
1123                } else {
1124                    warn!("No callee provided for transfer_call");
1125                    Ok(vec![])
1126                }
1127            }
1128            "goto_scene" => {
1129                if let Some(scene) = args["scene"].as_str() {
1130                    self.switch_to_scene(scene, false).await
1131                } else {
1132                    Ok(vec![])
1133                }
1134            }
1135            _ => {
1136                warn!("Unhandled function call: {}", name);
1137                Ok(vec![])
1138            }
1139        }
1140    }
1141
1142    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1143        let mut tool_commands = Vec::new();
1144        let mut wait_input_timeout = None;
1145        let mut attempts = 0;
1146        let mut raw = initial;
1147
1148        let final_text = loop {
1149            attempts += 1;
1150
1151            let Some(structured) = parse_structured_response(&raw) else {
1152                break Some(raw);
1153            };
1154
1155            if wait_input_timeout.is_none() {
1156                wait_input_timeout = structured.wait_input_timeout;
1157            }
1158
1159            let mut rerun_for_rag = false;
1160            if let Some(tools) = structured.tools {
1161                for tool in tools {
1162                    let needs_rerun = self
1163                        .handle_tool_invocation(tool, &mut tool_commands)
1164                        .await?;
1165                    rerun_for_rag = rerun_for_rag || needs_rerun;
1166                }
1167            }
1168
1169            if !rerun_for_rag {
1170                break structured.text;
1171            }
1172
1173            if attempts >= MAX_RAG_ATTEMPTS {
1174                warn!("Reached RAG iteration limit, using last response");
1175                break structured.text.or(Some(raw));
1176            }
1177
1178            raw = self.call_llm().await?;
1179        };
1180
1181        let has_hangup = tool_commands
1182            .iter()
1183            .any(|c| matches!(c, Command::Hangup { .. }));
1184        let mut commands = Vec::new();
1185
1186        if let Some(text) = final_text {
1187            if !text.trim().is_empty() {
1188                self.history.push(ChatMessage {
1189                    role: "assistant".to_string(),
1190                    content: text.clone(),
1191                });
1192                self.last_tts_start_at = Some(std::time::Instant::now());
1193                self.is_speaking = true;
1194
1195                let auto_hangup = has_hangup.then_some(true);
1196                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1197
1198                if has_hangup {
1199                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1200                    self.is_hanging_up = true;
1201                }
1202            }
1203        }
1204
1205        commands.extend(tool_commands);
1206        Ok(commands)
1207    }
1208}
1209
1210fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1211    let payload = extract_json_block(raw)?;
1212    serde_json::from_str(payload).ok()
1213}
1214
1215fn is_likely_filler(text: &str) -> bool {
1216    let trimmed = text.trim().to_lowercase();
1217    FILLERS.contains(&trimmed)
1218}
1219
/// Extracts the JSON payload from an LLM reply.
///
/// Two shapes are accepted:
/// - a bare JSON object or array: after trimming, the reply starts with `{` or `[`;
/// - a triple-backtick fenced block. An optional `json` language tag
///   (case-insensitive) is removed, whether the payload starts on the tag's
///   line or on the next one.
///
/// Returns `None` for plain text, a reply that starts with fewer than three
/// backticks, and an unterminated or empty fence.
fn extract_json_block(raw: &str) -> Option<&str> {
    let trimmed = raw.trim();

    // Slice after a verified "```" prefix; indexing at byte 3 otherwise can
    // land inside a multi-byte character and panic.
    if let Some(after_open) = trimmed.strip_prefix("```") {
        let end = after_open.rfind("```")?;
        if end == 0 {
            return None;
        }
        let mut inner = after_open[..end].trim();

        let has_json_tag = matches!(inner.get(..4), Some(tag) if tag.eq_ignore_ascii_case("json"));
        if has_json_tag {
            let after_tag = inner[4..].trim_start();
            inner = if after_tag.starts_with('{') || after_tag.starts_with('[') {
                // Payload begins right after the tag (same line or next line).
                after_tag
            } else if let Some(newline) = inner.find('\n') {
                // A longer tag such as `json5`: skip the whole tag line.
                inner[newline + 1..].trim()
            } else {
                after_tag
            };
        }
        return Some(inner);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }
    None
}
1245
1246#[async_trait]
1247impl DialogueHandler for LlmHandler {
1248    async fn on_start(&mut self) -> Result<Vec<Command>> {
1249        self.last_tts_start_at = Some(std::time::Instant::now());
1250
1251        let mut commands = Vec::new();
1252
1253        // Check if current scene has an audio file to play
1254        if let Some(scene_id) = &self.current_scene_id {
1255            if let Some(scene) = self.scenes.get(scene_id) {
1256                if let Some(audio_file) = &scene.play {
1257                    commands.push(Command::Play {
1258                        url: audio_file.clone(),
1259                        play_id: None,
1260                        auto_hangup: None,
1261                        wait_input_timeout: None,
1262                    });
1263                }
1264            }
1265        }
1266
1267        if let Some(greeting) = &self.config.greeting {
1268            self.is_speaking = true;
1269            commands.push(self.create_tts_command(greeting.clone(), None, None));
1270            return Ok(commands);
1271        }
1272
1273        let response_commands = self.generate_response().await?;
1274        commands.extend(response_commands);
1275        Ok(commands)
1276    }
1277
1278    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1279        match event {
1280            SessionEvent::Dtmf { digit, .. } => {
1281                info!("DTMF received: {}", digit);
1282                if let Some(action) = self.get_dtmf_action(digit) {
1283                    self.handle_dtmf_action(action).await
1284                } else {
1285                    Ok(vec![])
1286                }
1287            }
1288            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1289            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1290                Ok(self
1291                    .check_interruption(event, is_filler)
1292                    .into_iter()
1293                    .collect())
1294            }
1295            SessionEvent::Eou { completed, .. } => {
1296                if *completed && !self.is_speaking {
1297                    info!("EOU detected, triggering early response");
1298                    self.generate_response().await
1299                } else {
1300                    Ok(vec![])
1301                }
1302            }
1303            SessionEvent::Silence { .. } => self.handle_silence().await,
1304            SessionEvent::TrackStart { .. } => {
1305                self.is_speaking = true;
1306                Ok(vec![])
1307            }
1308            SessionEvent::TrackEnd { .. } => {
1309                self.is_speaking = false;
1310                self.is_hanging_up = false;
1311                self.last_interaction_at = std::time::Instant::now();
1312                Ok(vec![])
1313            }
1314            SessionEvent::FunctionCall {
1315                name, arguments, ..
1316            } => self.handle_function_call(name, arguments).await,
1317            _ => Ok(vec![]),
1318        }
1319    }
1320
1321    async fn get_history(&self) -> Vec<ChatMessage> {
1322        self.history.clone()
1323    }
1324
1325    async fn summarize(&mut self, prompt: &str) -> Result<String> {
1326        info!("Generating summary with prompt: {}", prompt);
1327        let mut summary_history = self.history.clone();
1328        summary_history.push(ChatMessage {
1329            role: "user".to_string(),
1330            content: prompt.to_string(),
1331        });
1332
1333        self.provider.call(&self.config, &summary_history).await
1334    }
1335}