Skip to main content

active_call/playbook/handler/
mod.rs

1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
17#[cfg(test)]
18mod dtmf_collector_tests;
19
20static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
21static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
22static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
23static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
24static RE_SET_VAR: Lazy<Regex> =
25    Lazy::new(|| Regex::new(r#"<set_var\s+key="([^"]+)"\s+value=["'](.+?)["']\s*/>"#).unwrap());
26static RE_HTTP: Lazy<Regex> = Lazy::new(|| {
27    Regex::new(r#"<http\s+url="([^"]+)"(?:\s+method="([^"]+)")?(?:\s+body="([^"]+)")?\s*/>"#)
28        .unwrap()
29});
30static RE_COLLECT: Lazy<Regex> = Lazy::new(|| {
31    Regex::new(r#"<collect\s+type="([^"]+)"\s+var="([^"]+)"(?:\s+prompt="([^"]*)")?\s*/>"#).unwrap()
32});
33static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
34static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
35    let mut s = std::collections::HashSet::new();
36    let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
37
38    if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
39        for line in content.lines() {
40            let trimmed = line.trim().to_lowercase();
41            if !trimmed.is_empty() {
42                s.insert(trimmed);
43            }
44        }
45    }
46
47    if s.is_empty() {
48        for f in default_fillers {
49            s.insert(f.to_string());
50        }
51    }
52    s
53});
54
55use super::ChatMessage;
56use super::InterruptionStrategy;
57use super::LlmConfig;
58use super::dialogue::DialogueHandler;
59
60pub mod provider;
61pub mod rag;
62pub mod types;
63
/// Category of an action recognised in LLM output.
///
/// NOTE(review): the variants appear to mirror the `RE_*` patterns defined at
/// the top of this file (`Sentence` <-> `RE_SENTENCE`, `SetVar` <-> `RE_SET_VAR`,
/// `Collect` <-> `RE_COLLECT`, ...). The code that consumes this enum lies
/// outside the visible chunk, so confirm that mapping at its use site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    Hangup,
    Refer,
    Sentence,
    Play,
    Goto,
    SetVar,
    Http,
    Collect,
}
75
76pub use provider::*;
77pub use rag::*;
78pub use types::*;
79
/// Upper bound on RAG retrieval attempts.
/// NOTE(review): not referenced in the visible part of this file; confirm at the
/// use site what exactly is being bounded (per turn, per query, ...).
const MAX_RAG_ATTEMPTS: usize = 3;
81
/// Runtime state for an active DTMF digit collection session.
///
/// Created by `start_collector` (with `retry_count == 0`) and re-created by
/// `retry_or_fail` with an empty buffer, fresh timestamps and an incremented
/// retry count. It is cleared whenever collection finishes or times out.
#[derive(Debug, Clone)]
pub struct CollectorState {
    /// Name of the collector type being used (key into `dtmf_collectors`)
    pub collector_type: String,
    /// Variable name under which the collected digits are stored in the
    /// call's `extras`
    pub var_name: String,
    /// Copy of the collector template looked up by `collector_type`
    pub config: super::DtmfCollectorConfig,
    /// Digits collected so far (the finish key is never appended)
    pub buffer: String,
    /// When collection (or the current retry) started; the overall timeout
    /// is measured from here
    pub start_time: std::time::Instant,
    /// When the last digit was received; the inter-digit timeout is measured
    /// from here
    pub last_digit_time: std::time::Instant,
    /// Number of retries already attempted for this variable
    pub retry_count: u32,
}
100
/// Drives an LLM-backed conversation for an active call.
///
/// It holds the chat history, the scene and DTMF configuration, and the state
/// of any in-progress DTMF digit collection.
pub struct LlmHandler {
    /// LLM settings: prompt, language, features, tool instructions, greeting, ...
    config: LlmConfig,
    /// NOTE(review): not read in the visible chunk; presumably governs barge-in.
    interruption_config: super::InterruptionConfig,
    /// NOTE(review): not read in the visible chunk.
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Global DTMF key -> action map, consulted after the current scene's map.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// DTMF digit-collector templates keyed by collector type name.
    dtmf_collectors: Option<HashMap<String, super::DtmfCollectorConfig>>,
    /// Chat history sent to the provider; index 0 holds the system prompt.
    history: Vec<ChatMessage>,
    /// LLM backend used for plain and streaming completions.
    provider: Arc<dyn LlmProvider>,
    /// NOTE(review): not used in the visible chunk.
    rag_retriever: Arc<dyn RagRetriever>,
    // NOTE(review): the following runtime flags/timestamps are only
    // initialised in the visible chunk; their semantics live further down.
    is_speaking: bool,
    is_hanging_up: bool,
    consecutive_follow_ups: u32,
    last_interaction_at: std::time::Instant,
    /// Channel for history, metrics and debug events (set by `set_event_sender`).
    event_sender: Option<crate::event::EventSender>,
    last_asr_final_at: Option<std::time::Instant>,
    last_tts_start_at: Option<std::time::Instant>,
    last_robot_msg_at: Option<std::time::Instant>,
    /// Attached call; its `call_state.extras` holds template variables.
    call: Option<crate::call::ActiveCallRef>,
    /// Scene definitions keyed by scene id.
    scenes: HashMap<String, super::Scene>,
    /// Id of the scene currently in effect, if any.
    current_scene_id: Option<String>,
    /// HTTP client. NOTE(review): presumably used for `<http>` actions; not
    /// used in the visible chunk.
    client: Client,
    /// NOTE(review): not read in the visible chunk.
    sip_config: Option<crate::SipOption>,
    /// Active DTMF digit collector state (None when not collecting)
    collector_state: Option<CollectorState>,
}
126
127impl LlmHandler {
128    pub fn new(
129        config: LlmConfig,
130        interruption: super::InterruptionConfig,
131        global_follow_up_config: Option<super::FollowUpConfig>,
132        scenes: HashMap<String, super::Scene>,
133        dtmf: Option<HashMap<String, super::DtmfAction>>,
134        dtmf_collectors: Option<HashMap<String, super::DtmfCollectorConfig>>,
135        initial_scene_id: Option<String>,
136        sip_config: Option<crate::SipOption>,
137    ) -> Self {
138        Self::with_provider(
139            config,
140            Arc::new(DefaultLlmProvider::new()),
141            Arc::new(NoopRagRetriever),
142            interruption,
143            global_follow_up_config,
144            scenes,
145            dtmf,
146            dtmf_collectors,
147            initial_scene_id,
148            sip_config,
149        )
150    }
151
152    pub fn with_provider(
153        config: LlmConfig,
154        provider: Arc<dyn LlmProvider>,
155        rag_retriever: Arc<dyn RagRetriever>,
156        interruption: super::InterruptionConfig,
157        global_follow_up_config: Option<super::FollowUpConfig>,
158        scenes: HashMap<String, super::Scene>,
159        dtmf: Option<HashMap<String, super::DtmfAction>>,
160        dtmf_collectors: Option<HashMap<String, super::DtmfCollectorConfig>>,
161        initial_scene_id: Option<String>,
162        sip_config: Option<crate::SipOption>,
163    ) -> Self {
164        let mut history = Vec::new();
165        let system_prompt = Self::build_system_prompt(&config, None, dtmf_collectors.as_ref());
166
167        history.push(ChatMessage {
168            role: "system".to_string(),
169            content: system_prompt,
170        });
171
172        Self {
173            config,
174            interruption_config: interruption,
175            global_follow_up_config,
176            dtmf_config: dtmf,
177            dtmf_collectors,
178            history,
179            provider,
180            rag_retriever,
181            is_speaking: false,
182            is_hanging_up: false,
183            consecutive_follow_ups: 0,
184            last_interaction_at: std::time::Instant::now(),
185            event_sender: None,
186            last_asr_final_at: None,
187            last_tts_start_at: None,
188            last_robot_msg_at: None,
189            call: None,
190            scenes,
191            current_scene_id: initial_scene_id,
192            client: Client::new(),
193            sip_config,
194            collector_state: None,
195        }
196    }
197
198    fn build_system_prompt(
199        config: &LlmConfig,
200        scene_prompt: Option<&str>,
201        dtmf_collectors: Option<&HashMap<String, super::DtmfCollectorConfig>>,
202    ) -> String {
203        let base_prompt =
204            scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
205        let mut features_prompt = String::new();
206
207        if let Some(features) = &config.features {
208            let lang = config.language.as_deref().unwrap_or("zh");
209            for feature in features {
210                match Self::load_feature_snippet(feature, lang) {
211                    Ok(snippet) => {
212                        features_prompt.push_str(&format!("\n- {}", snippet));
213                    }
214                    Err(e) => {
215                        warn!("Failed to load feature snippet {}: {}", feature, e);
216                    }
217                }
218            }
219        }
220
221        let features_section = if features_prompt.is_empty() {
222            String::new()
223        } else {
224            format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
225        };
226
227        // Load tool instructions - either custom or language-specific default
228        let tool_instructions = if let Some(custom) = &config.tool_instructions {
229            custom.clone()
230        } else {
231            let lang = config.language.as_deref().unwrap_or("zh");
232            Self::load_feature_snippet("tool_instructions", lang)
233                .unwrap_or_else(|_| {
234                    // Fallback to English if loading fails
235                    Self::load_feature_snippet("tool_instructions", "en")
236                        .unwrap_or_else(|_| {
237                            // Ultimate fallback to hardcoded English
238                            "Tool usage instructions:\n\
239                            - To hang up the call, output: <hangup/>\n\
240                            - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
241                            - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
242                            - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
243                            - To call an external HTTP API, output JSON:\n\
244                              ```json\n\
245                              {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
246                              ```\n\
247                            Please use XML tags for simple actions and JSON blocks for tool calls. \
248                            Output your response in short sentences. Each sentence will be played as soon as it is finished."
249                                .to_string()
250                        })
251                })
252        };
253
254        let collector_section = Self::generate_collector_instructions(dtmf_collectors);
255
256        format!(
257            "{}{}\n\n{}\n{}",
258            base_prompt, features_section, tool_instructions, collector_section
259        )
260    }
261
262    fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
263        let path = format!("features/{}.{}.md", feature, lang);
264        let content = std::fs::read_to_string(path)?;
265        Ok(content.trim().to_string())
266    }
267
268    /// Generate LLM prompt instructions for available DTMF digit collectors
269    fn generate_collector_instructions(
270        collectors: Option<&HashMap<String, super::DtmfCollectorConfig>>,
271    ) -> String {
272        let collectors = match collectors {
273            Some(c) if !c.is_empty() => c,
274            _ => return String::new(),
275        };
276
277        let mut doc = String::from("\n### DTMF Digit Collection\n\n");
278        doc.push_str(
279            "When you need to collect numeric input from the user (such as phone numbers, \
280             verification codes, ID numbers, etc.), use the DTMF digit collection command. \
281             This is more accurate than voice recognition for numeric input.\n\n",
282        );
283        doc.push_str("**Usage:** Output the following XML tag to start collecting:\n");
284        doc.push_str(
285            "```\n<collect type=\"TYPE\" var=\"VAR_NAME\" prompt=\"PROMPT_TEXT\" />\n```\n\n",
286        );
287        doc.push_str("- `type`: The collector type (see available types below)\n");
288        doc.push_str("- `var`: Variable name to store the collected digits\n");
289        doc.push_str("- `prompt`: The voice prompt to play before collecting (tell the user what to input)\n\n");
290        doc.push_str("**Available collector types:**\n\n");
291
292        // Sort by key for deterministic output
293        let mut sorted: Vec<_> = collectors.iter().collect();
294        sorted.sort_by_key(|(k, _)| (*k).clone());
295
296        for (name, config) in &sorted {
297            let desc = config.description.as_deref().unwrap_or("No description");
298            let mut details = Vec::new();
299            if let Some(d) = config.digits {
300                details.push(format!("{} digits", d));
301            } else {
302                if let Some(min) = config.min_digits {
303                    details.push(format!("min {} digits", min));
304                }
305                if let Some(max) = config.max_digits {
306                    details.push(format!("max {} digits", max));
307                }
308            }
309            if let Some(fk) = &config.finish_key {
310                details.push(format!("press {} to finish", fk));
311            }
312            let detail_str = if details.is_empty() {
313                String::new()
314            } else {
315                format!(" ({})", details.join(", "))
316            };
317            doc.push_str(&format!("- `{}`: {}{}\n", name, desc, detail_str));
318        }
319
320        doc.push_str("\n**Flow:**\n");
321        doc.push_str("1. You output `<collect .../>` with a voice prompt\n");
322        doc.push_str("2. The system plays your prompt, then enters digit collection mode (voice input is ignored)\n");
323        doc.push_str("3. When collection completes, the system notifies you with the result\n");
324        doc.push_str("4. You can access the collected value via `{{ var_name }}` in subsequent responses\n\n");
325        doc.push_str(
326            "**Important:** During collection the user can only input digits, not speak. ",
327        );
328        doc.push_str("If validation fails, the system will automatically retry. ");
329        doc.push_str("After collection success or failure, continue the conversation naturally.\n");
330
331        doc
332    }
333
334    /// Check if the collector has timed out and handle accordingly.
335    /// Returns commands to execute (e.g., retry prompt or failure notification).
336    pub async fn check_collector_timeout(&mut self) -> Result<Vec<Command>> {
337        let state = match &self.collector_state {
338            Some(s) => s,
339            None => return Ok(vec![]),
340        };
341
342        let timeout_secs = state.config.timeout.unwrap_or(15) as u64;
343        let inter_digit_timeout_secs = state.config.inter_digit_timeout.unwrap_or(5) as u64;
344
345        // Check overall timeout
346        if state.start_time.elapsed().as_secs() >= timeout_secs {
347            info!(
348                "DTMF collector overall timeout ({}s) for var={}",
349                timeout_secs, state.var_name
350            );
351            let var_name = state.var_name.clone();
352            let buffer = state.buffer.clone();
353            let collector_type = state.collector_type.clone();
354            let config = state.config.clone();
355            let retry_count = state.retry_count;
356            self.collector_state = None;
357
358            if !buffer.is_empty() {
359                // Try to validate what we have
360                return self
361                    .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
362                    .await;
363            }
364
365            // Nothing collected - notify LLM
366            self.history.push(ChatMessage {
367                role: "system".to_string(),
368                content: format!(
369                    "[DTMF collection timed out for '{}'. No digits were entered. Please guide the user.]",
370                    var_name
371                ),
372            });
373            return self.generate_response().await;
374        }
375
376        // Check inter-digit timeout (only if we have some digits)
377        if !state.buffer.is_empty()
378            && state.last_digit_time.elapsed().as_secs() >= inter_digit_timeout_secs
379        {
380            info!(
381                "DTMF collector inter-digit timeout ({}s) for var={}, buffer={}",
382                inter_digit_timeout_secs, state.var_name, state.buffer
383            );
384            let buffer = state.buffer.clone();
385            let var_name = state.var_name.clone();
386            let collector_type = state.collector_type.clone();
387            let config = state.config.clone();
388            let retry_count = state.retry_count;
389            self.collector_state = None;
390            return self
391                .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
392                .await;
393        }
394
395        Ok(vec![])
396    }
397
398    /// Handle a DTMF digit while in collector mode
399    async fn handle_collector_digit(&mut self, digit: &str) -> Result<Vec<Command>> {
400        let state = self.collector_state.as_mut().unwrap();
401
402        // Check if it's the finish key
403        if let Some(ref finish_key) = state.config.finish_key.clone() {
404            if digit == finish_key {
405                info!("DTMF collector: finish key '{}' received", digit);
406                let buffer = state.buffer.clone();
407                let var_name = state.var_name.clone();
408                let collector_type = state.collector_type.clone();
409                let config = state.config.clone();
410                let retry_count = state.retry_count;
411                self.collector_state = None;
412                return self
413                    .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
414                    .await;
415            }
416        }
417
418        // Append digit to buffer
419        state.buffer.push_str(digit);
420        state.last_digit_time = std::time::Instant::now();
421
422        info!(
423            "DTMF collector: digit '{}', buffer now '{}'",
424            digit, state.buffer
425        );
426
427        // Check if we've reached the required digit count
428        let effective_max = state.config.digits.or(state.config.max_digits);
429
430        if let Some(max) = effective_max {
431            if state.buffer.len() >= max as usize {
432                // If no finish_key is configured, auto-complete at max digits
433                if state.config.finish_key.is_none() {
434                    info!("DTMF collector: reached max digits ({})", max);
435                    let buffer = state.buffer.clone();
436                    let var_name = state.var_name.clone();
437                    let collector_type = state.collector_type.clone();
438                    let config = state.config.clone();
439                    let retry_count = state.retry_count;
440                    self.collector_state = None;
441                    return self
442                        .do_finish_collection(buffer, var_name, collector_type, config, retry_count)
443                        .await;
444                }
445            }
446        }
447
448        Ok(vec![])
449    }
450
451    /// Internal: finish collection with full state available
452    async fn do_finish_collection(
453        &mut self,
454        buffer: String,
455        var_name: String,
456        collector_type: String,
457        config: super::DtmfCollectorConfig,
458        retry_count: u32,
459    ) -> Result<Vec<Command>> {
460        // Validate min digits
461        let min = config.digits.or(config.min_digits).unwrap_or(0);
462        if min > 0 && (buffer.len() as u32) < min {
463            return self
464                .retry_or_fail(
465                    collector_type,
466                    config,
467                    retry_count,
468                    var_name,
469                    &format!("Expected at least {} digits, got {}", min, buffer.len()),
470                )
471                .await;
472        }
473
474        // Validate pattern
475        if let Some(validation) = &config.validation {
476            if let Ok(re) = regex::Regex::new(&validation.pattern) {
477                if !re.is_match(&buffer) {
478                    let msg = validation
479                        .error_message
480                        .clone()
481                        .unwrap_or_else(|| "Input format is incorrect".to_string());
482                    return self
483                        .retry_or_fail(collector_type, config, retry_count, var_name, &msg)
484                        .await;
485                }
486            }
487        }
488
489        // Validation passed - store the variable
490        info!(
491            "DTMF collector: successfully collected '{}' for var '{}'",
492            buffer, var_name
493        );
494
495        if let Some(call) = &self.call {
496            let mut state = call.call_state.write().await;
497            let mut extras = state.extras.take().unwrap_or_default();
498            extras.insert(var_name.clone(), serde_json::Value::String(buffer.clone()));
499            state.extras = Some(extras);
500        }
501
502        // Notify LLM of the result
503        self.history.push(ChatMessage {
504            role: "system".to_string(),
505            content: format!("[DTMF collection completed for '{}': {}]", var_name, buffer),
506        });
507
508        // Let LLM continue
509        self.generate_response().await
510    }
511
512    /// Retry collection or fail after max retries
513    async fn retry_or_fail(
514        &mut self,
515        collector_type: String,
516        config: super::DtmfCollectorConfig,
517        retry_count: u32,
518        var_name: String,
519        reason: &str,
520    ) -> Result<Vec<Command>> {
521        let max_retries = config.retry_times.unwrap_or(3);
522
523        if retry_count >= max_retries {
524            info!(
525                "DTMF collector: max retries ({}) reached for var '{}'",
526                max_retries, var_name
527            );
528            self.history.push(ChatMessage {
529                role: "system".to_string(),
530                content: format!(
531                    "[DTMF collection failed for '{}' after {} retries: {}. Please guide the user to try again or use an alternative method.]",
532                    var_name, max_retries, reason
533                ),
534            });
535            return self.generate_response().await;
536        }
537
538        info!(
539            "DTMF collector: retry {}/{} for var '{}': {}",
540            retry_count + 1,
541            max_retries,
542            var_name,
543            reason
544        );
545
546        // Restart collection with incremented retry count
547        let now = std::time::Instant::now();
548        self.collector_state = Some(CollectorState {
549            collector_type,
550            var_name,
551            config: config.clone(),
552            buffer: String::new(),
553            start_time: now,
554            last_digit_time: now,
555            retry_count: retry_count + 1,
556        });
557
558        // Play error message
559        let error_msg = config
560            .validation
561            .as_ref()
562            .and_then(|v| v.error_message.clone())
563            .unwrap_or_else(|| reason.to_string());
564
565        Ok(vec![self.create_tts_command(error_msg, None, None)])
566    }
567
568    /// Start a DTMF collector from an LLM-generated <collect> command
569    fn start_collector(&mut self, collector_type: &str, var_name: &str) -> bool {
570        let config = match &self.dtmf_collectors {
571            Some(collectors) => match collectors.get(collector_type) {
572                Some(c) => c.clone(),
573                None => {
574                    warn!("Unknown DTMF collector type: {}", collector_type);
575                    return false;
576                }
577            },
578            None => {
579                warn!("No DTMF collectors configured");
580                return false;
581            }
582        };
583
584        let now = std::time::Instant::now();
585        self.collector_state = Some(CollectorState {
586            collector_type: collector_type.to_string(),
587            var_name: var_name.to_string(),
588            config,
589            buffer: String::new(),
590            start_time: now,
591            last_digit_time: now,
592            retry_count: 0,
593        });
594
595        info!(
596            "DTMF collector started: type={}, var={}",
597            collector_type, var_name
598        );
599        true
600    }
601
602    /// Returns true if currently in DTMF digit collection mode
603    pub fn is_collecting(&self) -> bool {
604        self.collector_state.is_some()
605    }
606
607    fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
608        if let Some(scene_id) = &self.current_scene_id {
609            if let Some(scene) = self.scenes.get(scene_id) {
610                if let Some(dtmf) = &scene.dtmf {
611                    if let Some(action) = dtmf.get(digit) {
612                        return Some(action.clone());
613                    }
614                }
615            }
616        }
617
618        if let Some(dtmf) = &self.dtmf_config {
619            if let Some(action) = dtmf.get(digit) {
620                return Some(action.clone());
621            }
622        }
623
624        None
625    }
626
627    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
628        match action {
629            super::DtmfAction::Goto { scene } => {
630                info!("DTMF action: switch to scene {}", scene);
631                self.switch_to_scene(&scene, true).await
632            }
633            super::DtmfAction::Transfer { target } => {
634                info!("DTMF action: transfer to {}", target);
635                Ok(vec![Command::Refer {
636                    caller: String::new(),
637                    callee: target,
638                    options: None,
639                }])
640            }
641            super::DtmfAction::Hangup => {
642                info!("DTMF action: hangup");
643                let headers = self.render_sip_headers().await;
644                Ok(vec![Command::Hangup {
645                    reason: Some("DTMF Hangup".to_string()),
646                    initiator: Some("ai".to_string()),
647                    headers,
648                }])
649            }
650        }
651    }
652
653    /// Get current extras (variables) from call_state for dynamic template rendering.
654    async fn get_current_extras(&self) -> HashMap<String, serde_json::Value> {
655        if let Some(call) = &self.call {
656            let state = call.call_state.read().await;
657            state.extras.clone().unwrap_or_default()
658        } else {
659            HashMap::new()
660        }
661    }
662
663    /// Render a scene's prompt template using the latest variables from call_state.
664    /// This enables `set_var` values to be reflected in system prompts dynamically.
665    async fn render_scene_prompt(&self, scene: &super::Scene) -> String {
666        let extras = self.get_current_extras().await;
667        super::render_scene_prompt(scene, &extras)
668    }
669
670    async fn switch_to_scene(
671        &mut self,
672        scene_id: &str,
673        trigger_response: bool,
674    ) -> Result<Vec<Command>> {
675        if let Some(scene) = self.scenes.get(scene_id).cloned() {
676            info!("Switching to scene: {}", scene_id);
677            self.current_scene_id = Some(scene_id.to_string());
678            // Dynamically render the scene prompt with the latest variables
679            let rendered_prompt = self.render_scene_prompt(&scene).await;
680            let system_prompt = Self::build_system_prompt(
681                &self.config,
682                Some(&rendered_prompt),
683                self.dtmf_collectors.as_ref(),
684            );
685            if let Some(first_msg) = self.history.get_mut(0) {
686                if first_msg.role == "system" {
687                    first_msg.content = system_prompt;
688                }
689            }
690
691            let mut commands = Vec::new();
692            if let Some(url) = &scene.play {
693                commands.push(Command::Play {
694                    url: url.clone(),
695                    play_id: None,
696                    auto_hangup: None,
697                    wait_input_timeout: None,
698                });
699            }
700
701            if trigger_response {
702                let response_cmds = self.generate_response().await?;
703                commands.extend(response_cmds);
704            }
705            Ok(commands)
706        } else {
707            warn!("Scene not found: {}", scene_id);
708            Ok(vec![])
709        }
710    }
711
712    pub fn get_history_ref(&self) -> &[ChatMessage] {
713        &self.history
714    }
715
716    pub fn get_current_scene_id(&self) -> Option<String> {
717        self.current_scene_id.clone()
718    }
719
720    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
721        self.call = Some(call);
722    }
723
724    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
725        self.event_sender = Some(sender.clone());
726        if let Some(greeting) = &self.config.greeting {
727            let _ = sender.send(crate::event::SessionEvent::AddHistory {
728                sender: Some("system".to_string()),
729                timestamp: crate::media::get_timestamp(),
730                speaker: "assistant".to_string(),
731                text: greeting.clone(),
732            });
733        }
734    }
735
736    fn send_debug_event(&self, key: &str, data: serde_json::Value) {
737        if let Some(sender) = &self.event_sender {
738            let timestamp = crate::media::get_timestamp();
739            if key == "llm_response" {
740                if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
741                    let _ = sender.send(crate::event::SessionEvent::AddHistory {
742                        sender: Some("llm".to_string()),
743                        timestamp,
744                        speaker: "assistant".to_string(),
745                        text: text.to_string(),
746                    });
747                }
748            }
749
750            let event = crate::event::SessionEvent::Metrics {
751                timestamp,
752                key: key.to_string(),
753                duration: 0,
754                data,
755            };
756            let _ = sender.send(event);
757        }
758    }
759
760    async fn call_llm(&self) -> Result<String> {
761        self.provider.call(&self.config, &self.history).await
762    }
763
764    fn create_tts_command(
765        &self,
766        text: String,
767        wait_input_timeout: Option<u32>,
768        auto_hangup: Option<bool>,
769    ) -> Command {
770        let timeout = wait_input_timeout.unwrap_or(10000);
771        let play_id = uuid::Uuid::new_v4().to_string();
772
773        if let Some(sender) = &self.event_sender {
774            let _ = sender.send(crate::event::SessionEvent::Metrics {
775                timestamp: crate::media::get_timestamp(),
776                key: "tts_play_id_map".to_string(),
777                duration: 0,
778                data: serde_json::json!({
779                    "playId": play_id,
780                    "text": text,
781                }),
782            });
783        }
784
785        Command::Tts {
786            text,
787            speaker: None,
788            play_id: Some(play_id),
789            auto_hangup,
790            streaming: None,
791            end_of_stream: Some(true),
792            option: None,
793            wait_input_timeout: Some(timeout),
794            base64: None,
795            cache_key: None,
796        }
797    }
798
    /// Streams an LLM reply for the current history and turns it into commands.
    ///
    /// The first non-whitespace content decides the mode: output starting with
    /// `{` or a backtick is treated as JSON (buffered in full and handed to
    /// `interpret_response` at the end); anything else is treated as plain
    /// text with inline tags and is converted incrementally via
    /// `extract_streaming_commands` as chunks arrive.
    ///
    /// In text mode, each extracted command is enqueued on the attached call
    /// if there is one; otherwise it is collected into the returned `Vec`.
    /// After the stream ends, non-empty text is appended to the history as an
    /// `assistant` message and the speaking/timing state is updated. In JSON
    /// mode, history handling is left to `interpret_response` (NOTE(review):
    /// presumably it records the reply — confirm).
    ///
    /// A stream error is logged and ends the stream early; whatever content
    /// arrived before it is still processed. Reasoning tokens are only
    /// accumulated for the `llm_response` debug event.
    async fn generate_response(&mut self) -> Result<Vec<Command>> {
        let start_time = crate::media::get_timestamp();
        // Shared by every TTS chunk of this reply and by the debug events.
        let play_id = uuid::Uuid::new_v4().to_string();

        // Send debug event - LLM call started
        self.send_debug_event(
            "llm_call_start",
            json!({
                "history_length": self.history.len(),
                "playId": play_id,
            }),
        );

        let mut stream = self
            .provider
            .call_stream(&self.config, &self.history)
            .await?;

        let mut full_content = String::new();
        let mut full_reasoning = String::new();
        // Not-yet-consumed text for incremental command extraction.
        let mut buffer = String::new();
        let mut commands = Vec::new();
        let mut is_json_mode = false;
        let mut checked_json_mode = false;
        // Timestamp of the first non-whitespace content chunk (for TTFB).
        let mut first_token_time = None;

        while let Some(chunk_result) = stream.next().await {
            let event = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    // Best effort: keep what was received so far.
                    warn!("LLM stream error: {}", e);
                    break;
                }
            };

            match event {
                LlmStreamEvent::Reasoning(text) => {
                    full_reasoning.push_str(&text);
                }
                LlmStreamEvent::Content(chunk) => {
                    if first_token_time.is_none() && !chunk.trim().is_empty() {
                        first_token_time = Some(crate::media::get_timestamp());
                    }

                    full_content.push_str(&chunk);
                    buffer.push_str(&chunk);

                    // Decide the mode once, on the first non-whitespace content.
                    if !checked_json_mode {
                        let trimmed = full_content.trim();
                        if !trimmed.is_empty() {
                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
                                is_json_mode = true;
                            }
                            checked_json_mode = true;
                        }
                    }

                    // Text mode: convert whatever is complete so far.
                    if checked_json_mode && !is_json_mode {
                        let extracted = self
                            .extract_streaming_commands(&mut buffer, &play_id, false)
                            .await;
                        for cmd in extracted {
                            if let Some(call) = &self.call {
                                let _ = call.enqueue_command(cmd).await;
                            } else {
                                commands.push(cmd);
                            }
                        }
                    }
                }
            }
        }

        // Send debug event - LLM response received
        let end_time = crate::media::get_timestamp();
        self.send_debug_event(
            "llm_response",
            json!({
                "response": full_content,
                "reasoning": full_reasoning,
                "is_json_mode": is_json_mode,
                "duration": end_time - start_time,
                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
                "playId": play_id,
            }),
        );

        if is_json_mode {
            self.interpret_response(full_content).await
        } else {
            // Flush the remaining text and mark the end of the TTS stream.
            let extracted = self
                .extract_streaming_commands(&mut buffer, &play_id, true)
                .await;
            for cmd in extracted {
                if let Some(call) = &self.call {
                    let _ = call.enqueue_command(cmd).await;
                } else {
                    commands.push(cmd);
                }
            }
            if !full_content.trim().is_empty() {
                self.history.push(ChatMessage {
                    role: "assistant".to_string(),
                    content: full_content,
                });
                self.last_robot_msg_at = Some(std::time::Instant::now());
                self.is_speaking = true;
                self.last_tts_start_at = Some(std::time::Instant::now());
            }
            Ok(commands)
        }
    }
911
912    async fn extract_streaming_commands(
913        &mut self,
914        buffer: &mut String,
915        play_id: &str,
916        is_final: bool,
917    ) -> Vec<Command> {
918        let mut commands = Vec::new();
919        let mut pending_hangup: Option<(String, usize)> = None; // Store hangup prefix and position
920
921        loop {
922            let hangup_pos = RE_HANGUP.find(buffer);
923            let refer_pos = RE_REFER.captures(buffer);
924            let play_pos = RE_PLAY.captures(buffer);
925            let goto_pos = RE_GOTO.captures(buffer);
926            let set_var_pos = RE_SET_VAR.captures(buffer);
927            let http_pos = RE_HTTP.captures(buffer);
928            let collect_pos = RE_COLLECT.captures(buffer);
929            let sentence_pos = RE_SENTENCE.find(buffer);
930
931            // Find the first occurrence
932            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
933            if let Some(m) = hangup_pos {
934                positions.push((m.start(), CommandKind::Hangup));
935            }
936            if let Some(caps) = &refer_pos {
937                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
938            }
939            if let Some(caps) = &play_pos {
940                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
941            }
942            if let Some(caps) = &goto_pos {
943                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
944            }
945            if let Some(caps) = &set_var_pos {
946                positions.push((caps.get(0).unwrap().start(), CommandKind::SetVar));
947            }
948            if let Some(caps) = &http_pos {
949                positions.push((caps.get(0).unwrap().start(), CommandKind::Http));
950            }
951            if let Some(caps) = &collect_pos {
952                positions.push((caps.get(0).unwrap().start(), CommandKind::Collect));
953            }
954            if let Some(m) = sentence_pos {
955                positions.push((m.start(), CommandKind::Sentence));
956            }
957
958            positions.sort_by_key(|p| p.0);
959
960            if let Some((pos, kind)) = positions.first() {
961                let pos = *pos;
962                match kind {
963                    CommandKind::SetVar => {
964                        let caps = RE_SET_VAR.captures(buffer).unwrap();
965                        let mat = caps.get(0).unwrap();
966                        let key = caps.get(1).unwrap().as_str().to_string();
967                        let value = caps.get(2).unwrap().as_str().to_string();
968
969                        let prefix = buffer[..pos].to_string();
970                        if !prefix.trim().is_empty() {
971                            commands.push(self.create_tts_command_with_id(
972                                prefix,
973                                play_id.to_string(),
974                                None,
975                            ));
976                        }
977
978                        if let Some(call) = &self.call {
979                            let mut state = call.call_state.write().await;
980                            let mut extras = state.extras.take().unwrap_or_default();
981                            extras.insert(key, serde_json::Value::String(value));
982                            state.extras = Some(extras);
983                        }
984
985                        buffer.drain(..mat.end());
986                    }
987                    CommandKind::Http => {
988                        let caps = RE_HTTP.captures(buffer).unwrap();
989                        let mat = caps.get(0).unwrap();
990                        let url = caps.get(1).unwrap().as_str().to_string();
991                        let method = caps
992                            .get(2)
993                            .map(|m| m.as_str().to_string())
994                            .unwrap_or("GET".to_string());
995                        let body = caps.get(3).map(|m| m.as_str().to_string());
996
997                        // Flush TTS
998                        let prefix = buffer[..pos].to_string();
999                        if !prefix.trim().is_empty() {
1000                            commands.push(self.create_tts_command_with_id(
1001                                prefix,
1002                                play_id.to_string(),
1003                                None,
1004                            ));
1005                        }
1006
1007                        // Execute HTTP request synchronously and capture response
1008                        let client = self.client.clone();
1009                        let mut req = match method.to_uppercase().as_str() {
1010                            "POST" => client.post(&url),
1011                            "PUT" => client.put(&url),
1012                            _ => client.get(&url),
1013                        };
1014
1015                        if let Some(b) = body {
1016                            req = req.body(b);
1017                        }
1018
1019                        // Send request and wait for response
1020                        match req.send().await {
1021                            Ok(res) => {
1022                                let status = res.status();
1023                                let text = res.text().await.unwrap_or_default();
1024                                info!(url, method, status=?status, "HTTP command executed from stream");
1025
1026                                // Add response to history for LLM context
1027                                self.history.push(ChatMessage {
1028                                    role: "system".to_string(),
1029                                    content: format!(
1030                                        "HTTP {} {} returned ({}): {}",
1031                                        method, url, status, text
1032                                    ),
1033                                });
1034                            }
1035                            Err(e) => {
1036                                warn!(
1037                                    url,
1038                                    method, "Failed to execute HTTP command from stream: {}", e
1039                                );
1040
1041                                // Add error to history
1042                                self.history.push(ChatMessage {
1043                                    role: "system".to_string(),
1044                                    content: format!("HTTP {} {} failed: {}", method, url, e),
1045                                });
1046                            }
1047                        }
1048
1049                        buffer.drain(..mat.end());
1050                    }
1051                    CommandKind::Hangup => {
1052                        // Don't execute hangup immediately, store it for later
1053                        // This allows set_var commands after hangup to still be processed
1054                        let prefix = buffer[..pos].to_string();
1055                        let hangup_match = RE_HANGUP.find(buffer).unwrap();
1056                        pending_hangup = Some((prefix, hangup_match.end()));
1057                        buffer.drain(..hangup_match.end());
1058
1059                        // Continue processing remaining buffer for set_var commands
1060                        // Don't return yet!
1061                    }
1062                    CommandKind::Refer => {
1063                        let caps = RE_REFER.captures(buffer).unwrap();
1064                        let mat = caps.get(0).unwrap();
1065                        let callee = caps.get(1).unwrap().as_str().to_string();
1066
1067                        let prefix = buffer[..pos].to_string();
1068                        if !prefix.trim().is_empty() {
1069                            commands.push(self.create_tts_command_with_id(
1070                                prefix,
1071                                play_id.to_string(),
1072                                None,
1073                            ));
1074                        }
1075                        commands.push(Command::Refer {
1076                            caller: String::new(),
1077                            callee,
1078                            options: None,
1079                        });
1080                        buffer.drain(..mat.end());
1081                    }
1082                    CommandKind::Play => {
1083                        // Play audio
1084                        let caps = RE_PLAY.captures(buffer).unwrap();
1085                        let mat = caps.get(0).unwrap();
1086                        let url = caps.get(1).unwrap().as_str().to_string();
1087
1088                        let prefix = buffer[..pos].to_string();
1089                        if !prefix.trim().is_empty() {
1090                            commands.push(self.create_tts_command_with_id(
1091                                prefix,
1092                                play_id.to_string(),
1093                                None,
1094                            ));
1095                        }
1096                        commands.push(Command::Play {
1097                            url,
1098                            play_id: None,
1099                            auto_hangup: None,
1100                            wait_input_timeout: None,
1101                        });
1102                        buffer.drain(..mat.end());
1103                    }
1104                    CommandKind::Goto => {
1105                        // Goto Scene
1106                        let caps = RE_GOTO.captures(buffer).unwrap();
1107                        let mat = caps.get(0).unwrap();
1108                        let scene_id = caps.get(1).unwrap().as_str().to_string();
1109
1110                        let prefix = buffer[..pos].to_string();
1111                        if !prefix.trim().is_empty() {
1112                            commands.push(self.create_tts_command_with_id(
1113                                prefix,
1114                                play_id.to_string(),
1115                                None,
1116                            ));
1117                        }
1118
1119                        info!("Switching to scene (from stream): {}", scene_id);
1120                        if let Some(scene) = self.scenes.get(&scene_id).cloned() {
1121                            self.current_scene_id = Some(scene_id);
1122                            // Dynamically render scene prompt with the latest variables
1123                            let rendered_prompt = self.render_scene_prompt(&scene).await;
1124                            // Update system prompt in history
1125                            let system_prompt = Self::build_system_prompt(
1126                                &self.config,
1127                                Some(&rendered_prompt),
1128                                self.dtmf_collectors.as_ref(),
1129                            );
1130                            if let Some(first_msg) = self.history.get_mut(0) {
1131                                if first_msg.role == "system" {
1132                                    first_msg.content = system_prompt;
1133                                }
1134                            }
1135                        } else {
1136                            warn!("Scene not found: {}", scene_id);
1137                        }
1138
1139                        buffer.drain(..mat.end());
1140                    }
1141                    CommandKind::Collect => {
1142                        let caps = RE_COLLECT.captures(buffer).unwrap();
1143                        let mat = caps.get(0).unwrap();
1144                        let collector_type = caps.get(1).unwrap().as_str().to_string();
1145                        let var_name = caps.get(2).unwrap().as_str().to_string();
1146                        let prompt = caps.get(3).map(|m| m.as_str().to_string());
1147
1148                        // Flush any text before the <collect> tag as TTS
1149                        let prefix = buffer[..pos].to_string();
1150                        if !prefix.trim().is_empty() {
1151                            commands.push(self.create_tts_command_with_id(
1152                                prefix,
1153                                play_id.to_string(),
1154                                None,
1155                            ));
1156                        }
1157
1158                        // Play the collector prompt if provided
1159                        if let Some(p) = prompt {
1160                            if !p.trim().is_empty() {
1161                                commands.push(self.create_tts_command(p, None, None));
1162                            }
1163                        }
1164
1165                        // Start the collector
1166                        if !self.start_collector(&collector_type, &var_name) {
1167                            // Collector type not found, notify LLM
1168                            self.history.push(ChatMessage {
1169                                role: "system".to_string(),
1170                                content: format!(
1171                                    "[Unknown DTMF collector type '{}'. Available types: {}]",
1172                                    collector_type,
1173                                    self.dtmf_collectors
1174                                        .as_ref()
1175                                        .map(|c| c.keys().cloned().collect::<Vec<_>>().join(", "))
1176                                        .unwrap_or_default()
1177                                ),
1178                            });
1179                        }
1180
1181                        buffer.drain(..mat.end());
1182                    }
1183                    CommandKind::Sentence => {
1184                        // Sentence
1185                        let mat = sentence_pos.unwrap();
1186                        let sentence = buffer[..mat.end()].to_string();
1187                        if !sentence.trim().is_empty() {
1188                            commands.push(self.create_tts_command_with_id(
1189                                sentence,
1190                                play_id.to_string(),
1191                                None,
1192                            ));
1193                        }
1194                        buffer.drain(..mat.end());
1195                    }
1196                }
1197            } else {
1198                break;
1199            }
1200        }
1201
1202        // Process pending hangup after all other commands (especially set_var)
1203        if let Some((prefix, _)) = pending_hangup {
1204            let headers = self.render_sip_headers().await;
1205
1206            if let Some(call) = &self.call {
1207                let h_val = serde_json::to_value(&headers).unwrap_or_default();
1208                let mut state = call.call_state.write().await;
1209                let mut extras = state.extras.take().unwrap_or_default();
1210                extras.insert("_hangup_headers".to_string(), h_val);
1211                state.extras = Some(extras);
1212            }
1213
1214            if !prefix.trim().is_empty() {
1215                let mut cmd =
1216                    self.create_tts_command_with_id(prefix, play_id.to_string(), Some(true));
1217                if let Command::Tts { end_of_stream, .. } = &mut cmd {
1218                    *end_of_stream = Some(true);
1219                }
1220                self.is_hanging_up = true;
1221                commands.push(cmd);
1222            } else {
1223                let mut cmd = self.create_tts_command_with_id(
1224                    "".to_string(),
1225                    play_id.to_string(),
1226                    Some(true),
1227                );
1228                if let Command::Tts { end_of_stream, .. } = &mut cmd {
1229                    *end_of_stream = Some(true);
1230                }
1231                self.is_hanging_up = true;
1232                commands.push(cmd);
1233            }
1234
1235            return commands;
1236        }
1237
1238        if is_final {
1239            let remaining = buffer.trim().to_string();
1240            if !remaining.is_empty() {
1241                commands.push(self.create_tts_command_with_id(
1242                    remaining,
1243                    play_id.to_string(),
1244                    None,
1245                ));
1246            }
1247            buffer.clear();
1248
1249            if let Some(last) = commands.last_mut() {
1250                if let Command::Tts { end_of_stream, .. } = last {
1251                    *end_of_stream = Some(true);
1252                }
1253            } else if !self.is_hanging_up {
1254                commands.push(Command::Tts {
1255                    text: "".to_string(),
1256                    speaker: None,
1257                    play_id: Some(play_id.to_string()),
1258                    auto_hangup: None,
1259                    streaming: Some(true),
1260                    end_of_stream: Some(true),
1261                    option: None,
1262                    wait_input_timeout: None,
1263                    base64: None,
1264                    cache_key: None,
1265                });
1266            }
1267        }
1268
1269        commands
1270    }
1271
1272    fn create_tts_command_with_id(
1273        &self,
1274        text: String,
1275        play_id: String,
1276        auto_hangup: Option<bool>,
1277    ) -> Command {
1278        Command::Tts {
1279            text,
1280            speaker: None,
1281            play_id: Some(play_id),
1282            auto_hangup,
1283            streaming: Some(true),
1284            end_of_stream: None,
1285            option: None,
1286            wait_input_timeout: Some(10000),
1287            base64: None,
1288            cache_key: None,
1289        }
1290    }
1291
1292    async fn handle_tool_invocation(
1293        &mut self,
1294        tool: ToolInvocation,
1295        tool_commands: &mut Vec<Command>,
1296    ) -> Result<bool> {
1297        match tool {
1298            ToolInvocation::Hangup {
1299                ref reason,
1300                ref initiator,
1301            } => {
1302                self.send_debug_event(
1303                    "tool_invocation",
1304                    json!({
1305                        "tool": "Hangup",
1306                        "params": {
1307                            "reason": reason,
1308                            "initiator": initiator,
1309                        }
1310                    }),
1311                );
1312
1313                let headers = self.render_sip_headers().await;
1314
1315                tool_commands.push(Command::Hangup {
1316                    reason: reason.clone(),
1317                    initiator: initiator.clone(),
1318                    headers,
1319                });
1320                Ok(false)
1321            }
1322            ToolInvocation::Refer {
1323                ref caller,
1324                ref callee,
1325                ref options,
1326            } => {
1327                self.send_debug_event(
1328                    "tool_invocation",
1329                    json!({
1330                        "tool": "Refer",
1331                        "params": {
1332                            "caller": caller,
1333                            "callee": callee,
1334                        }
1335                    }),
1336                );
1337                tool_commands.push(Command::Refer {
1338                    caller: caller.clone(),
1339                    callee: callee.clone(),
1340                    options: options.clone(),
1341                });
1342                Ok(false)
1343            }
1344            ToolInvocation::Rag {
1345                ref query,
1346                ref source,
1347            } => {
1348                self.handle_rag_tool(query, source).await?;
1349                Ok(true)
1350            }
1351            ToolInvocation::Accept { ref options } => {
1352                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
1353                tool_commands.push(Command::Accept {
1354                    option: options.clone().unwrap_or_default(),
1355                });
1356                Ok(false)
1357            }
1358            ToolInvocation::Reject { ref reason, code } => {
1359                self.send_debug_event(
1360                    "tool_invocation",
1361                    json!({
1362                        "tool": "Reject",
1363                        "params": {
1364                            "reason": reason,
1365                            "code": code,
1366                        }
1367                    }),
1368                );
1369                tool_commands.push(Command::Reject {
1370                    reason: reason
1371                        .clone()
1372                        .unwrap_or_else(|| "Rejected by agent".to_string()),
1373                    code,
1374                });
1375                Ok(false)
1376            }
1377            ToolInvocation::Http {
1378                ref url,
1379                ref method,
1380                ref body,
1381                ref headers,
1382            } => {
1383                self.handle_http_tool(url, method, body, headers).await?;
1384                Ok(true)
1385            }
1386        }
1387    }
1388
1389    async fn render_sip_headers(&self) -> Option<HashMap<String, String>> {
1390        let hangup_template = self.sip_config.as_ref()?.hangup_headers.as_ref()?;
1391        let call = self.call.as_ref()?;
1392        let state = call.call_state.read().await;
1393
1394        let mut context = HashMap::new();
1395        let mut sip_headers = HashMap::new();
1396
1397        // Get the list of SIP header keys stored during extraction
1398        // If not present, sip dict will be empty (no headers were configured for extraction)
1399        let sip_header_keys: Vec<String> = state
1400            .extras
1401            .as_ref()
1402            .and_then(|e| e.get("_sip_header_keys"))
1403            .and_then(|v| serde_json::from_value(v.clone()).ok())
1404            .unwrap_or_default();
1405
1406        if let Some(extras) = &state.extras {
1407            for (k, v) in extras {
1408                // Skip internal keys
1409                if k.starts_with('_') {
1410                    continue;
1411                }
1412                context.insert(k.clone(), v.clone());
1413                // Only include keys that were extracted as SIP headers
1414                if sip_header_keys.contains(k) {
1415                    sip_headers.insert(k.clone(), v.clone());
1416                }
1417            }
1418        }
1419
1420        // Add sip dictionary for template access
1421        context.insert(
1422            "sip".to_string(),
1423            serde_json::to_value(&sip_headers).unwrap_or(serde_json::Value::Null),
1424        );
1425
1426        let env = minijinja::Environment::new();
1427        let mut rendered_headers = HashMap::new();
1428        for (k, v) in hangup_template {
1429            if let Ok(rendered) = env.render_str(v, &context) {
1430                rendered_headers.insert(k.clone(), rendered);
1431            } else {
1432                rendered_headers.insert(k.clone(), v.clone());
1433            }
1434        }
1435        Some(rendered_headers)
1436    }
1437
1438    async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
1439        self.send_debug_event(
1440            "tool_invocation",
1441            json!({
1442                "tool": "Rag",
1443                "params": {
1444                    "query": query,
1445                    "source": source,
1446                }
1447            }),
1448        );
1449
1450        let rag_result = self.rag_retriever.retrieve(query).await?;
1451
1452        self.send_debug_event(
1453            "rag_result",
1454            json!({
1455                "query": query,
1456                "result": rag_result,
1457            }),
1458        );
1459
1460        let summary = if let Some(source) = source {
1461            format!("[{}] {}", source, rag_result)
1462        } else {
1463            rag_result
1464        };
1465
1466        self.history.push(ChatMessage {
1467            role: "system".to_string(),
1468            content: format!("RAG result for {}: {}", query, summary),
1469        });
1470
1471        Ok(())
1472    }
1473
1474    async fn handle_http_tool(
1475        &mut self,
1476        url: &str,
1477        method: &Option<String>,
1478        body: &Option<serde_json::Value>,
1479        headers: &Option<HashMap<String, String>>,
1480    ) -> Result<()> {
1481        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
1482        let method =
1483            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
1484
1485        self.send_debug_event(
1486            "tool_invocation",
1487            json!({
1488                "tool": "Http",
1489                "params": {
1490                    "url": url,
1491                    "method": method_str,
1492                }
1493            }),
1494        );
1495
1496        let mut req = self.client.request(method, url);
1497        if let Some(body) = body {
1498            req = req.json(body);
1499        }
1500        if let Some(headers) = headers {
1501            for (k, v) in headers {
1502                req = req.header(k, v);
1503            }
1504        }
1505
1506        match req.send().await {
1507            Ok(res) => {
1508                let status = res.status();
1509                let text = res.text().await.unwrap_or_default();
1510                self.history.push(ChatMessage {
1511                    role: "system".to_string(),
1512                    content: format!("HTTP tool response ({}): {}", status, text),
1513                });
1514            }
1515            Err(e) => {
1516                warn!("HTTP tool failed: {}", e);
1517                self.history.push(ChatMessage {
1518                    role: "system".to_string(),
1519                    content: format!("HTTP tool failed: {}", e),
1520                });
1521            }
1522        }
1523
1524        Ok(())
1525    }
1526
1527    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
1528        if text.trim().is_empty() {
1529            return Ok(vec![]);
1530        }
1531
1532        self.apply_context_repair(text);
1533        self.apply_rolling_summary().await;
1534
1535        self.last_asr_final_at = Some(std::time::Instant::now());
1536        self.last_interaction_at = std::time::Instant::now();
1537        self.is_speaking = false;
1538        self.consecutive_follow_ups = 0;
1539
1540        self.generate_response().await
1541    }
1542
1543    fn apply_context_repair(&mut self, text: &str) {
1544        let enable_repair = self
1545            .config
1546            .features
1547            .as_ref()
1548            .map(|f| f.contains(&"context_repair".to_string()))
1549            .unwrap_or(false);
1550
1551        if !enable_repair {
1552            self.history.push(ChatMessage {
1553                role: "user".to_string(),
1554                content: text.to_string(),
1555            });
1556            return;
1557        }
1558
1559        let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
1560        let mut merged = false;
1561
1562        if let Some(last_robot_at) = self.last_robot_msg_at {
1563            if last_robot_at.elapsed().as_millis() < repair_window_ms {
1564                if let Some(last_msg) = self.history.last() {
1565                    if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
1566                        info!(
1567                            "Context Repair: Detected potential fragmentation. Triggering merge."
1568                        );
1569                        self.history.pop();
1570                        if let Some(prev_user) = self.history.last_mut() {
1571                            if prev_user.role == "user" {
1572                                prev_user.content.push_str(",");
1573                                prev_user.content.push_str(text);
1574                                merged = true;
1575                            }
1576                        }
1577                    }
1578                }
1579            }
1580        }
1581
1582        if !merged {
1583            self.history.push(ChatMessage {
1584                role: "user".to_string(),
1585                content: text.to_string(),
1586            });
1587        }
1588    }
1589
1590    async fn apply_rolling_summary(&mut self) {
1591        let enable_summary = self
1592            .config
1593            .features
1594            .as_ref()
1595            .map(|f| f.contains(&"rolling_summary".to_string()))
1596            .unwrap_or(false);
1597
1598        if !enable_summary {
1599            return;
1600        }
1601
1602        let summary_limit = self.config.summary_limit.unwrap_or(20);
1603        if self.history.len() <= summary_limit {
1604            return;
1605        }
1606
1607        info!("Rolling Summary: History limit reached. Triggering background summary.");
1608        let keep_recent = 6;
1609        if self.history.len() <= summary_limit + keep_recent
1610            || self.history.len() <= keep_recent + 1
1611        {
1612            return;
1613        }
1614
1615        let split_idx = self.history.len() - keep_recent;
1616        let to_summarize = self.history[1..split_idx].to_vec();
1617        let recent = self.history[split_idx..].to_vec();
1618
1619        let summary_prompt =
1620            "Summarize the above conversation so far, focusing on key details and user intent.";
1621        let mut summary_req_history = to_summarize;
1622        summary_req_history.push(ChatMessage {
1623            role: "user".to_string(),
1624            content: summary_prompt.to_string(),
1625        });
1626
1627        match self.provider.call(&self.config, &summary_req_history).await {
1628            Ok(summary) => {
1629                let mut new_history = Vec::new();
1630                if let Some(sys) = self.history.first() {
1631                    let mut new_sys = sys.clone();
1632                    new_sys.content.push_str("\n\n[Previous Context Summary]: ");
1633                    new_sys.content.push_str(&summary);
1634                    new_history.push(new_sys);
1635                }
1636                new_history.extend(recent);
1637                self.history = new_history;
1638                info!(
1639                    "Rolling Summary: Applied summary. New history len: {}",
1640                    self.history.len()
1641                );
1642            }
1643            Err(e) => {
1644                warn!("Rolling Summary failed: {}", e);
1645            }
1646        }
1647    }
1648
1649    fn check_interruption(
1650        &mut self,
1651        event: &SessionEvent,
1652        is_filler: &Option<bool>,
1653    ) -> Option<Command> {
1654        let strategy = self.interruption_config.strategy;
1655        let should_check = match (strategy, event) {
1656            (InterruptionStrategy::None, _) => false,
1657            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
1658            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1659            (InterruptionStrategy::Both, _) => true,
1660            _ => false,
1661        };
1662
1663        if !self.is_speaking || self.is_hanging_up || !should_check {
1664            return None;
1665        }
1666
1667        // Protection period check
1668        if let Some(last_start) = self.last_tts_start_at {
1669            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1670            if last_start.elapsed().as_millis() < ignore_ms as u128 {
1671                return None;
1672            }
1673        }
1674
1675        // Filler word filter
1676        if self.interruption_config.filler_word_filter.unwrap_or(false) {
1677            if let Some(true) = is_filler {
1678                return None;
1679            }
1680            if let SessionEvent::AsrDelta { text, .. } = event {
1681                if is_likely_filler(text) {
1682                    return None;
1683                }
1684            }
1685        }
1686
1687        // Stale event check
1688        if let Some(last_final) = self.last_asr_final_at {
1689            if last_final.elapsed().as_millis() < 500 {
1690                return None;
1691            }
1692        }
1693
1694        info!("Smart interruption detected, stopping playback");
1695        self.is_speaking = false;
1696        Some(Command::Interrupt {
1697            graceful: Some(true),
1698            fade_out_ms: self.interruption_config.volume_fade_ms,
1699        })
1700    }
1701
1702    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1703        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1704            self.scenes
1705                .get(scene_id)
1706                .and_then(|s| s.follow_up)
1707                .or(self.global_follow_up_config)
1708        } else {
1709            self.global_follow_up_config
1710        };
1711
1712        let Some(config) = follow_up_config else {
1713            return Ok(vec![]);
1714        };
1715
1716        if self.is_speaking
1717            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1718        {
1719            return Ok(vec![]);
1720        }
1721
1722        if self.consecutive_follow_ups >= config.max_count {
1723            info!("Max follow-up count reached, hanging up");
1724            let headers = self.render_sip_headers().await;
1725            return Ok(vec![Command::Hangup {
1726                reason: Some("Max follow-up reached".to_string()),
1727                initiator: Some("system".to_string()),
1728                headers,
1729            }]);
1730        }
1731
1732        info!(
1733            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1734            self.last_interaction_at.elapsed().as_millis(),
1735            self.consecutive_follow_ups + 1,
1736            config.max_count
1737        );
1738        self.consecutive_follow_ups += 1;
1739        self.last_interaction_at = std::time::Instant::now();
1740        self.generate_response().await
1741    }
1742
1743    async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1744        info!(
1745            "Function call from Realtime: {} with args {}",
1746            name, arguments
1747        );
1748        let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1749
1750        match name {
1751            "hangup_call" => {
1752                let headers = self.render_sip_headers().await;
1753                Ok(vec![Command::Hangup {
1754                    reason: args["reason"].as_str().map(|s| s.to_string()),
1755                    initiator: Some("ai".to_string()),
1756                    headers,
1757                }])
1758            }
1759            "transfer_call" | "refer_call" => {
1760                if let Some(callee) = args["callee"]
1761                    .as_str()
1762                    .or_else(|| args["callee_uri"].as_str())
1763                {
1764                    Ok(vec![Command::Refer {
1765                        caller: String::new(),
1766                        callee: callee.to_string(),
1767                        options: None,
1768                    }])
1769                } else {
1770                    warn!("No callee provided for transfer_call");
1771                    Ok(vec![])
1772                }
1773            }
1774            "goto_scene" => {
1775                if let Some(scene) = args["scene"].as_str() {
1776                    self.switch_to_scene(scene, false).await
1777                } else {
1778                    Ok(vec![])
1779                }
1780            }
1781            _ => {
1782                warn!("Unhandled function call: {}", name);
1783                Ok(vec![])
1784            }
1785        }
1786    }
1787
1788    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1789        let mut tool_commands = Vec::new();
1790        let mut wait_input_timeout = None;
1791        let mut attempts = 0;
1792        let mut raw = initial;
1793
1794        let final_text = loop {
1795            attempts += 1;
1796
1797            let Some(structured) = parse_structured_response(&raw) else {
1798                break Some(raw);
1799            };
1800
1801            if wait_input_timeout.is_none() {
1802                wait_input_timeout = structured.wait_input_timeout;
1803            }
1804
1805            let mut rerun_for_rag = false;
1806            if let Some(tools) = structured.tools {
1807                for tool in tools {
1808                    let needs_rerun = self
1809                        .handle_tool_invocation(tool, &mut tool_commands)
1810                        .await?;
1811                    rerun_for_rag = rerun_for_rag || needs_rerun;
1812                }
1813            }
1814
1815            if !rerun_for_rag {
1816                break structured.text;
1817            }
1818
1819            if attempts >= MAX_RAG_ATTEMPTS {
1820                warn!("Reached RAG iteration limit, using last response");
1821                break structured.text.or(Some(raw));
1822            }
1823
1824            raw = self.call_llm().await?;
1825        };
1826
1827        let has_hangup = tool_commands
1828            .iter()
1829            .any(|c| matches!(c, Command::Hangup { .. }));
1830        let mut commands = Vec::new();
1831
1832        if let Some(text) = final_text {
1833            if !text.trim().is_empty() {
1834                self.history.push(ChatMessage {
1835                    role: "assistant".to_string(),
1836                    content: text.clone(),
1837                });
1838                self.last_tts_start_at = Some(std::time::Instant::now());
1839                self.is_speaking = true;
1840
1841                let auto_hangup = has_hangup.then_some(true);
1842                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1843
1844                if has_hangup {
1845                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1846                    self.is_hanging_up = true;
1847                }
1848            }
1849        }
1850
1851        commands.extend(tool_commands);
1852        Ok(commands)
1853    }
1854}
1855
1856fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1857    let payload = extract_json_block(raw)?;
1858    serde_json::from_str(payload).ok()
1859}
1860
1861fn is_likely_filler(text: &str) -> bool {
1862    let trimmed = text.trim().to_lowercase();
1863    FILLERS.contains(&trimmed)
1864}
1865
/// Extracts the JSON payload from an LLM reply, if the reply looks like JSON.
///
/// After trimming surrounding whitespace, two shapes are recognized:
/// - A Markdown code fence opened with three backticks and optionally tagged `json`
///   (case-insensitive). The text between the opening fence and the *last* closing fence is
///   returned, with the tag removed.
/// - A bare object or array, i.e. text starting with `{` or `[`, returned as-is.
///
/// Returns `None` for anything else, including an unterminated fence or a fence with no content.
/// The returned slice is not validated as JSON; callers are expected to parse it.
fn extract_json_block(raw: &str) -> Option<&str> {
    let trimmed = raw.trim();

    if let Some(after_open) = trimmed.strip_prefix("```") {
        // Require a real opening fence and a closing fence after it. The previous version assumed
        // any leading backtick began a three-byte fence, so replies like "`中```" sliced inside a
        // UTF-8 character and panicked.
        let close = after_open.rfind("```")?;
        let mut inner = after_open[..close].trim();

        // Strip an optional `json` info string (e.g. ```json\n{...}\n```). `get` keeps this
        // char-boundary safe.
        if matches!(inner.get(..4), Some(tag) if tag.eq_ignore_ascii_case("json")) {
            inner = match inner.find('\n') {
                Some(newline) => inner[newline + 1..].trim(),
                None => inner[4..].trim(),
            };
        }

        return (!inner.is_empty()).then_some(inner);
    }

    if trimmed.starts_with('{') || trimmed.starts_with('[') {
        return Some(trimmed);
    }

    None
}
1891
#[async_trait]
impl DialogueHandler for LlmHandler {
    /// Produces the opening commands for a new session.
    ///
    /// Steps:
    /// 1. Stamp `last_tts_start_at` with the current time. `check_interruption` uses this to
    ///    apply its protection window.
    /// 2. Queue the current scene's `play` audio file, if one is configured.
    /// 3. Either speak the configured greeting (marking the bot as speaking) or, when there is
    ///    no greeting, append the commands from `generate_response`.
    ///
    /// NOTE(review): the greeting is spoken but never pushed into `history`, unlike LLM replies
    /// recorded by `interpret_response`. Confirm the model doesn't need to see it.
    async fn on_start(&mut self) -> Result<Vec<Command>> {
        self.last_tts_start_at = Some(std::time::Instant::now());

        let mut commands = Vec::new();

        // Queue the current scene's pre-recorded audio, if it has one.
        if let Some(scene_id) = &self.current_scene_id {
            if let Some(scene) = self.scenes.get(scene_id) {
                if let Some(audio_file) = &scene.play {
                    commands.push(Command::Play {
                        url: audio_file.clone(),
                        play_id: None,
                        auto_hangup: None,
                        wait_input_timeout: None,
                    });
                }
            }
        }

        // A configured greeting replaces the LLM-generated opening turn.
        if let Some(greeting) = &self.config.greeting {
            self.is_speaking = true;
            commands.push(self.create_tts_command(greeting.clone(), None, None));
            return Ok(commands);
        }

        let response_commands = self.generate_response().await?;
        commands.extend(response_commands);
        Ok(commands)
    }

    /// Routes a session event to the matching handler and returns the resulting commands.
    ///
    /// While a DTMF collector is active, events are filtered first:
    /// - digits feed the collector;
    /// - silence checks the collector timeout;
    /// - track start/end only update `is_speaking`;
    /// - a hangup clears the collector;
    /// - speech events are dropped unless the collector is `interruptible`;
    /// - everything else is ignored.
    ///
    /// Only hangups and interruptible speech events fall through to the normal routing in the
    /// second `match`.
    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
        // When in DTMF collection mode, only handle DTMF events and track lifecycle
        if self.collector_state.is_some() {
            match event {
                SessionEvent::Dtmf { digit, .. } => {
                    info!("DTMF received (collecting): {}", digit);
                    return self.handle_collector_digit(digit).await;
                }
                SessionEvent::Silence { .. } => {
                    // Check collector timeout on silence events
                    return self.check_collector_timeout().await;
                }
                SessionEvent::TrackEnd { .. } => {
                    // Unlike normal mode, this does not reset `is_hanging_up` or the idle timer.
                    self.is_speaking = false;
                    return Ok(vec![]);
                }
                SessionEvent::TrackStart { .. } => {
                    self.is_speaking = true;
                    return Ok(vec![]);
                }
                SessionEvent::Hangup { .. } => {
                    // Clear the collector and fall through. The normal `match` below has no
                    // `Hangup` arm, so this ends up returning an empty command list.
                    self.collector_state = None;
                }
                // Ignore ASR/Speaking/Eou during collection (not interruptible by default)
                SessionEvent::AsrFinal { .. }
                | SessionEvent::AsrDelta { .. }
                | SessionEvent::Speaking { .. }
                | SessionEvent::Eou { .. } => {
                    let interruptible = self
                        .collector_state
                        .as_ref()
                        .and_then(|s| s.config.interruptible)
                        .unwrap_or(false);
                    if !interruptible {
                        return Ok(vec![]);
                    }
                    // If interruptible, fall through to normal handling
                }
                _ => return Ok(vec![]),
            }
        }

        match event {
            // DTMF outside collection mode triggers the configured per-digit action, if any.
            SessionEvent::Dtmf { digit, .. } => {
                info!("DTMF received: {}", digit);
                if let Some(action) = self.get_dtmf_action(digit) {
                    self.handle_dtmf_action(action).await
                } else {
                    Ok(vec![])
                }
            }
            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
            // Partial transcripts and VAD speech only matter as potential barge-ins.
            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
                Ok(self
                    .check_interruption(event, is_filler)
                    .into_iter()
                    .collect())
            }
            // NOTE(review): this responds without recording a user message. Presumably the
            // transcript arrives via a later `AsrFinal`. Confirm this cannot produce a duplicate
            // reply.
            SessionEvent::Eou { completed, .. } => {
                if *completed && !self.is_speaking {
                    info!("EOU detected, triggering early response");
                    self.generate_response().await
                } else {
                    Ok(vec![])
                }
            }
            SessionEvent::Silence { .. } => self.handle_silence().await,
            SessionEvent::TrackStart { .. } => {
                self.is_speaking = true;
                Ok(vec![])
            }
            // Playback finished: clear speaking/hang-up state and restart the idle timer that
            // `handle_silence` measures against.
            SessionEvent::TrackEnd { .. } => {
                self.is_speaking = false;
                self.is_hanging_up = false;
                self.last_interaction_at = std::time::Instant::now();
                Ok(vec![])
            }
            SessionEvent::FunctionCall {
                name, arguments, ..
            } => self.handle_function_call(name, arguments).await,
            _ => Ok(vec![]),
        }
    }

    /// Returns a snapshot (clone) of the full chat history.
    async fn get_history(&self) -> Vec<ChatMessage> {
        self.history.clone()
    }

    /// Asks the LLM provider to summarize the conversation.
    ///
    /// `prompt` is appended as a final `user` message to a copy of the history. `self.history`
    /// itself is not modified.
    async fn summarize(&mut self, prompt: &str) -> Result<String> {
        info!("Generating summary with prompt: {}", prompt);
        let mut summary_history = self.history.clone();
        summary_history.push(ChatMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        });

        self.provider.call(&self.config, &summary_history).await
    }
}