Skip to main content

active_call/playbook/handler/
mod.rs

1use crate::call::Command;
2use crate::event::SessionEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::StreamExt;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use reqwest::Client;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{info, warn};
13
14#[cfg(test)]
15mod tests;
16
// Inline directives the LLM may embed in its reply. Each pattern is compiled
// once on first use; `extract_streaming_commands` searches the streamed text
// with them.

/// Matches a self-closing `<hangup/>` tag (optional whitespace before `/>`).
static RE_HANGUP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<hangup\s*/>").unwrap());
/// Matches `<refer to="..."/>`; group 1 is the double-quoted `to` value.
static RE_REFER: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<refer\s+to="([^"]+)"\s*/>"#).unwrap());
/// Matches `<play file="..."/>`; group 1 is the double-quoted `file` value.
static RE_PLAY: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<play\s+file="([^"]+)"\s*/>"#).unwrap());
/// Matches `<goto scene="..."/>`; group 1 is the double-quoted `scene` value.
static RE_GOTO: Lazy<Regex> = Lazy::new(|| Regex::new(r#"<goto\s+scene="([^"]+)"\s*/>"#).unwrap());
/// Matches `<set_var key="..." value="..."/>`; group 1 is the key and group 2
/// the value. The value may be wrapped in single or double quotes and is
/// matched lazily; the key must use double quotes.
static RE_SET_VAR: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"<set_var\s+key="([^"]+)"\s+value=["'](.+?)["']\s*/>"#).unwrap());
/// Matches `<http url="..." [method="..."] [body="..."]/>`; group 1 is the URL,
/// groups 2 and 3 are the optional method and body. All values are
/// double-quoted and cannot themselves contain a double quote.
static RE_HTTP: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r#"<http\s+url="([^"]+)"(?:\s+method="([^"]+)")?(?:\s+body="([^"]+)")?\s*/>"#)
        .unwrap()
});
27static RE_SENTENCE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)[.!?。!?\n]\s*").unwrap());
28static FILLERS: Lazy<std::collections::HashSet<String>> = Lazy::new(|| {
29    let mut s = std::collections::HashSet::new();
30    let default_fillers = ["嗯", "啊", "哦", "那个", "那个...", "uh", "um", "ah"];
31
32    if let Ok(content) = std::fs::read_to_string("config/fillers.txt") {
33        for line in content.lines() {
34            let trimmed = line.trim().to_lowercase();
35            if !trimmed.is_empty() {
36                s.insert(trimmed);
37            }
38        }
39    }
40
41    if s.is_empty() {
42        for f in default_fillers {
43            s.insert(f.to_string());
44        }
45    }
46    s
47});
48
49use super::ChatMessage;
50use super::InterruptionStrategy;
51use super::LlmConfig;
52use super::dialogue::DialogueHandler;
53
54pub mod provider;
55pub mod rag;
56pub mod types;
57
/// Kind of item `extract_streaming_commands` can locate in the streamed LLM
/// text. The streaming parser tags each regex match with one of these and
/// handles whichever match starts first in the buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandKind {
    /// A `<hangup/>` directive.
    Hangup,
    /// A `<refer to="..."/>` directive.
    Refer,
    /// A sentence boundary in plain text (see `RE_SENTENCE`).
    Sentence,
    /// A `<play file="..."/>` directive.
    Play,
    /// A `<goto scene="..."/>` directive.
    Goto,
    /// A `<set_var key="..." value="..."/>` directive.
    SetVar,
    /// An `<http url="..." .../>` directive.
    Http,
}
68
69pub use provider::*;
70pub use rag::*;
71pub use types::*;
72
/// Upper bound on RAG attempts.
// NOTE(review): the code that consumes this constant is outside the visible
// part of the file — presumably it caps retrieval rounds per response; confirm.
const MAX_RAG_ATTEMPTS: usize = 3;
74
/// Dialogue handler that drives a call from LLM output: it keeps the chat
/// history, streams completions from the provider and converts the text and
/// inline directives into call `Command`s.
pub struct LlmHandler {
    /// LLM settings; the prompt, language, features, tool instructions and
    /// greeting are read from here.
    config: LlmConfig,
    // NOTE(review): not used in the visible part of the file — presumably
    // governs barge-in handling; confirm.
    interruption_config: super::InterruptionConfig,
    // NOTE(review): not used in the visible part of the file.
    global_follow_up_config: Option<super::FollowUpConfig>,
    /// Handler-wide DTMF bindings, consulted after the current scene's own.
    dtmf_config: Option<HashMap<String, super::DtmfAction>>,
    /// Chat history; the constructor seeds it with the system prompt as the
    /// first entry, which scene switches rewrite in place.
    history: Vec<ChatMessage>,
    /// Backend used for blocking and streaming completions.
    provider: Arc<dyn LlmProvider>,
    // NOTE(review): not used in the visible part of the file.
    rag_retriever: Arc<dyn RagRetriever>,
    /// Set once a non-empty assistant reply has been recorded.
    is_speaking: bool,
    /// Set once a `<hangup/>` directive has been turned into commands.
    is_hanging_up: bool,
    // NOTE(review): not used in the visible part of the file.
    consecutive_follow_ups: u32,
    /// Initialised to the construction time.
    last_interaction_at: std::time::Instant,
    /// Sink for history and metrics events, when attached.
    event_sender: Option<crate::event::EventSender>,
    // NOTE(review): not written in the visible part of the file.
    last_asr_final_at: Option<std::time::Instant>,
    /// Updated when a non-empty assistant reply is recorded.
    last_tts_start_at: Option<std::time::Instant>,
    /// Updated when a non-empty assistant reply is recorded.
    last_robot_msg_at: Option<std::time::Instant>,
    /// Active call; when present, streamed commands are enqueued on it
    /// directly instead of being returned to the caller.
    call: Option<crate::call::ActiveCallRef>,
    /// Scenes by id; each supplies a prompt and may supply DTMF bindings and
    /// an audio file to play on entry.
    scenes: HashMap<String, super::Scene>,
    /// Id of the scene currently in effect, if any.
    current_scene_id: Option<String>,
    /// HTTP client used to execute `<http .../>` directives.
    client: Client,
    // NOTE(review): not read in the visible part of the file — presumably feeds
    // the SIP headers attached to hang-ups; confirm.
    sip_config: Option<crate::SipOption>,
}
97
98impl LlmHandler {
99    pub fn new(
100        config: LlmConfig,
101        interruption: super::InterruptionConfig,
102        global_follow_up_config: Option<super::FollowUpConfig>,
103        scenes: HashMap<String, super::Scene>,
104        dtmf: Option<HashMap<String, super::DtmfAction>>,
105        initial_scene_id: Option<String>,
106        sip_config: Option<crate::SipOption>,
107    ) -> Self {
108        Self::with_provider(
109            config,
110            Arc::new(DefaultLlmProvider::new()),
111            Arc::new(NoopRagRetriever),
112            interruption,
113            global_follow_up_config,
114            scenes,
115            dtmf,
116            initial_scene_id,
117            sip_config,
118        )
119    }
120
121    pub fn with_provider(
122        config: LlmConfig,
123        provider: Arc<dyn LlmProvider>,
124        rag_retriever: Arc<dyn RagRetriever>,
125        interruption: super::InterruptionConfig,
126        global_follow_up_config: Option<super::FollowUpConfig>,
127        scenes: HashMap<String, super::Scene>,
128        dtmf: Option<HashMap<String, super::DtmfAction>>,
129        initial_scene_id: Option<String>,
130        sip_config: Option<crate::SipOption>,
131    ) -> Self {
132        let mut history = Vec::new();
133        let system_prompt = Self::build_system_prompt(&config, None);
134
135        history.push(ChatMessage {
136            role: "system".to_string(),
137            content: system_prompt,
138        });
139
140        Self {
141            config,
142            interruption_config: interruption,
143            global_follow_up_config,
144            dtmf_config: dtmf,
145            history,
146            provider,
147            rag_retriever,
148            is_speaking: false,
149            is_hanging_up: false,
150            consecutive_follow_ups: 0,
151            last_interaction_at: std::time::Instant::now(),
152            event_sender: None,
153            last_asr_final_at: None,
154            last_tts_start_at: None,
155            last_robot_msg_at: None,
156            call: None,
157            scenes,
158            current_scene_id: initial_scene_id,
159            client: Client::new(),
160            sip_config,
161        }
162    }
163
164    fn build_system_prompt(config: &LlmConfig, scene_prompt: Option<&str>) -> String {
165        let base_prompt =
166            scene_prompt.unwrap_or_else(|| config.prompt.as_deref().unwrap_or_default());
167        let mut features_prompt = String::new();
168
169        if let Some(features) = &config.features {
170            let lang = config.language.as_deref().unwrap_or("zh");
171            for feature in features {
172                match Self::load_feature_snippet(feature, lang) {
173                    Ok(snippet) => {
174                        features_prompt.push_str(&format!("\n- {}", snippet));
175                    }
176                    Err(e) => {
177                        warn!("Failed to load feature snippet {}: {}", feature, e);
178                    }
179                }
180            }
181        }
182
183        let features_section = if features_prompt.is_empty() {
184            String::new()
185        } else {
186            format!("\n\n### Enhanced Capabilities:{}\n", features_prompt)
187        };
188
189        // Load tool instructions - either custom or language-specific default
190        let tool_instructions = if let Some(custom) = &config.tool_instructions {
191            custom.clone()
192        } else {
193            let lang = config.language.as_deref().unwrap_or("zh");
194            Self::load_feature_snippet("tool_instructions", lang)
195                .unwrap_or_else(|_| {
196                    // Fallback to English if loading fails
197                    Self::load_feature_snippet("tool_instructions", "en")
198                        .unwrap_or_else(|_| {
199                            // Ultimate fallback to hardcoded English
200                            "Tool usage instructions:\n\
201                            - To hang up the call, output: <hangup/>\n\
202                            - To transfer the call, output: <refer to=\"sip:xxxx\"/>\n\
203                            - To play an audio file, output: <play file=\"path/to/file.wav\"/>\n\
204                            - To switch to another scene, output: <goto scene=\"scene_id\"/>\n\
205                            - To call an external HTTP API, output JSON:\n\
206                              ```json\n\
207                              {{ \"tools\": [{{ \"name\": \"http\", \"url\": \"...\", \"method\": \"POST\", \"body\": {{ ... }} }}] }}\n\
208                              ```\n\
209                            Please use XML tags for simple actions and JSON blocks for tool calls. \
210                            Output your response in short sentences. Each sentence will be played as soon as it is finished."
211                                .to_string()
212                        })
213                })
214        };
215
216        format!(
217            "{}{}\n\n{}",
218            base_prompt, features_section, tool_instructions
219        )
220    }
221
222    fn load_feature_snippet(feature: &str, lang: &str) -> Result<String> {
223        let path = format!("features/{}.{}.md", feature, lang);
224        let content = std::fs::read_to_string(path)?;
225        Ok(content.trim().to_string())
226    }
227
228    fn get_dtmf_action(&self, digit: &str) -> Option<super::DtmfAction> {
229        if let Some(scene_id) = &self.current_scene_id {
230            if let Some(scene) = self.scenes.get(scene_id) {
231                if let Some(dtmf) = &scene.dtmf {
232                    if let Some(action) = dtmf.get(digit) {
233                        return Some(action.clone());
234                    }
235                }
236            }
237        }
238
239        if let Some(dtmf) = &self.dtmf_config {
240            if let Some(action) = dtmf.get(digit) {
241                return Some(action.clone());
242            }
243        }
244
245        None
246    }
247
248    async fn handle_dtmf_action(&mut self, action: super::DtmfAction) -> Result<Vec<Command>> {
249        match action {
250            super::DtmfAction::Goto { scene } => {
251                info!("DTMF action: switch to scene {}", scene);
252                self.switch_to_scene(&scene, true).await
253            }
254            super::DtmfAction::Transfer { target } => {
255                info!("DTMF action: transfer to {}", target);
256                Ok(vec![Command::Refer {
257                    caller: String::new(),
258                    callee: target,
259                    options: None,
260                }])
261            }
262            super::DtmfAction::Hangup => {
263                info!("DTMF action: hangup");
264                let headers = self.render_sip_headers().await;
265                Ok(vec![Command::Hangup {
266                    reason: Some("DTMF Hangup".to_string()),
267                    initiator: Some("ai".to_string()),
268                    headers,
269                }])
270            }
271        }
272    }
273
274    async fn switch_to_scene(
275        &mut self,
276        scene_id: &str,
277        trigger_response: bool,
278    ) -> Result<Vec<Command>> {
279        if let Some(scene) = self.scenes.get(scene_id).cloned() {
280            info!("Switching to scene: {}", scene_id);
281            self.current_scene_id = Some(scene_id.to_string());
282            let system_prompt = Self::build_system_prompt(&self.config, Some(&scene.prompt));
283            if let Some(first_msg) = self.history.get_mut(0) {
284                if first_msg.role == "system" {
285                    first_msg.content = system_prompt;
286                }
287            }
288
289            let mut commands = Vec::new();
290            if let Some(url) = &scene.play {
291                commands.push(Command::Play {
292                    url: url.clone(),
293                    play_id: None,
294                    auto_hangup: None,
295                    wait_input_timeout: None,
296                });
297            }
298
299            if trigger_response {
300                let response_cmds = self.generate_response().await?;
301                commands.extend(response_cmds);
302            }
303            Ok(commands)
304        } else {
305            warn!("Scene not found: {}", scene_id);
306            Ok(vec![])
307        }
308    }
309
310    pub fn get_history_ref(&self) -> &[ChatMessage] {
311        &self.history
312    }
313
314    pub fn get_current_scene_id(&self) -> Option<String> {
315        self.current_scene_id.clone()
316    }
317
318    pub fn set_call(&mut self, call: crate::call::ActiveCallRef) {
319        self.call = Some(call);
320    }
321
322    pub fn set_event_sender(&mut self, sender: crate::event::EventSender) {
323        self.event_sender = Some(sender.clone());
324        if let Some(greeting) = &self.config.greeting {
325            let _ = sender.send(crate::event::SessionEvent::AddHistory {
326                sender: Some("system".to_string()),
327                timestamp: crate::media::get_timestamp(),
328                speaker: "assistant".to_string(),
329                text: greeting.clone(),
330            });
331        }
332    }
333
334    fn send_debug_event(&self, key: &str, data: serde_json::Value) {
335        if let Some(sender) = &self.event_sender {
336            let timestamp = crate::media::get_timestamp();
337            if key == "llm_response" {
338                if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
339                    let _ = sender.send(crate::event::SessionEvent::AddHistory {
340                        sender: Some("llm".to_string()),
341                        timestamp,
342                        speaker: "assistant".to_string(),
343                        text: text.to_string(),
344                    });
345                }
346            }
347
348            let event = crate::event::SessionEvent::Metrics {
349                timestamp,
350                key: key.to_string(),
351                duration: 0,
352                data,
353            };
354            let _ = sender.send(event);
355        }
356    }
357
358    async fn call_llm(&self) -> Result<String> {
359        self.provider.call(&self.config, &self.history).await
360    }
361
362    fn create_tts_command(
363        &self,
364        text: String,
365        wait_input_timeout: Option<u32>,
366        auto_hangup: Option<bool>,
367    ) -> Command {
368        let timeout = wait_input_timeout.unwrap_or(10000);
369        let play_id = uuid::Uuid::new_v4().to_string();
370
371        if let Some(sender) = &self.event_sender {
372            let _ = sender.send(crate::event::SessionEvent::Metrics {
373                timestamp: crate::media::get_timestamp(),
374                key: "tts_play_id_map".to_string(),
375                duration: 0,
376                data: serde_json::json!({
377                    "playId": play_id,
378                    "text": text,
379                }),
380            });
381        }
382
383        Command::Tts {
384            text,
385            speaker: None,
386            play_id: Some(play_id),
387            auto_hangup,
388            streaming: None,
389            end_of_stream: Some(true),
390            option: None,
391            wait_input_timeout: Some(timeout),
392            base64: None,
393            cache_key: None,
394        }
395    }
396
397    async fn generate_response(&mut self) -> Result<Vec<Command>> {
398        let start_time = crate::media::get_timestamp();
399        let play_id = uuid::Uuid::new_v4().to_string();
400
401        // Send debug event - LLM call started
402        self.send_debug_event(
403            "llm_call_start",
404            json!({
405                "history_length": self.history.len(),
406                "playId": play_id,
407            }),
408        );
409
410        let mut stream = self
411            .provider
412            .call_stream(&self.config, &self.history)
413            .await?;
414
415        let mut full_content = String::new();
416        let mut full_reasoning = String::new();
417        let mut buffer = String::new();
418        let mut commands = Vec::new();
419        let mut is_json_mode = false;
420        let mut checked_json_mode = false;
421        let mut first_token_time = None;
422
423        while let Some(chunk_result) = stream.next().await {
424            let event = match chunk_result {
425                Ok(c) => c,
426                Err(e) => {
427                    warn!("LLM stream error: {}", e);
428                    break;
429                }
430            };
431
432            match event {
433                LlmStreamEvent::Reasoning(text) => {
434                    full_reasoning.push_str(&text);
435                }
436                LlmStreamEvent::Content(chunk) => {
437                    if first_token_time.is_none() && !chunk.trim().is_empty() {
438                        first_token_time = Some(crate::media::get_timestamp());
439                    }
440
441                    full_content.push_str(&chunk);
442                    buffer.push_str(&chunk);
443
444                    if !checked_json_mode {
445                        let trimmed = full_content.trim();
446                        if !trimmed.is_empty() {
447                            if trimmed.starts_with('{') || trimmed.starts_with('`') {
448                                is_json_mode = true;
449                            }
450                            checked_json_mode = true;
451                        }
452                    }
453
454                    if checked_json_mode && !is_json_mode {
455                        let extracted = self
456                            .extract_streaming_commands(&mut buffer, &play_id, false)
457                            .await;
458                        for cmd in extracted {
459                            if let Some(call) = &self.call {
460                                let _ = call.enqueue_command(cmd).await;
461                            } else {
462                                commands.push(cmd);
463                            }
464                        }
465                    }
466                }
467            }
468        }
469
470        // Send debug event - LLM response received
471        let end_time = crate::media::get_timestamp();
472        self.send_debug_event(
473            "llm_response",
474            json!({
475                "response": full_content,
476                "reasoning": full_reasoning,
477                "is_json_mode": is_json_mode,
478                "duration": end_time - start_time,
479                "ttfb": first_token_time.map(|t| t - start_time).unwrap_or(0),
480                "playId": play_id,
481            }),
482        );
483
484        if is_json_mode {
485            self.interpret_response(full_content).await
486        } else {
487            let extracted = self
488                .extract_streaming_commands(&mut buffer, &play_id, true)
489                .await;
490            for cmd in extracted {
491                if let Some(call) = &self.call {
492                    let _ = call.enqueue_command(cmd).await;
493                } else {
494                    commands.push(cmd);
495                }
496            }
497            if !full_content.trim().is_empty() {
498                self.history.push(ChatMessage {
499                    role: "assistant".to_string(),
500                    content: full_content,
501                });
502                self.last_robot_msg_at = Some(std::time::Instant::now());
503                self.is_speaking = true;
504                self.last_tts_start_at = Some(std::time::Instant::now());
505            }
506            Ok(commands)
507        }
508    }
509
510    async fn extract_streaming_commands(
511        &mut self,
512        buffer: &mut String,
513        play_id: &str,
514        is_final: bool,
515    ) -> Vec<Command> {
516        let mut commands = Vec::new();
517        let mut pending_hangup: Option<(String, usize)> = None; // Store hangup prefix and position
518
519        loop {
520            let hangup_pos = RE_HANGUP.find(buffer);
521            let refer_pos = RE_REFER.captures(buffer);
522            let play_pos = RE_PLAY.captures(buffer);
523            let goto_pos = RE_GOTO.captures(buffer);
524            let set_var_pos = RE_SET_VAR.captures(buffer);
525            let http_pos = RE_HTTP.captures(buffer);
526            let sentence_pos = RE_SENTENCE.find(buffer);
527
528            // Find the first occurrence
529            let mut positions: Vec<(usize, CommandKind)> = Vec::new();
530            if let Some(m) = hangup_pos {
531                positions.push((m.start(), CommandKind::Hangup));
532            }
533            if let Some(caps) = &refer_pos {
534                positions.push((caps.get(0).unwrap().start(), CommandKind::Refer));
535            }
536            if let Some(caps) = &play_pos {
537                positions.push((caps.get(0).unwrap().start(), CommandKind::Play));
538            }
539            if let Some(caps) = &goto_pos {
540                positions.push((caps.get(0).unwrap().start(), CommandKind::Goto));
541            }
542            if let Some(caps) = &set_var_pos {
543                positions.push((caps.get(0).unwrap().start(), CommandKind::SetVar));
544            }
545            if let Some(caps) = &http_pos {
546                positions.push((caps.get(0).unwrap().start(), CommandKind::Http));
547            }
548            if let Some(m) = sentence_pos {
549                positions.push((m.start(), CommandKind::Sentence));
550            }
551
552            positions.sort_by_key(|p| p.0);
553
554            if let Some((pos, kind)) = positions.first() {
555                let pos = *pos;
556                match kind {
557                    CommandKind::SetVar => {
558                        let caps = RE_SET_VAR.captures(buffer).unwrap();
559                        let mat = caps.get(0).unwrap();
560                        let key = caps.get(1).unwrap().as_str().to_string();
561                        let value = caps.get(2).unwrap().as_str().to_string();
562
563                        let prefix = buffer[..pos].to_string();
564                        if !prefix.trim().is_empty() {
565                            commands.push(self.create_tts_command_with_id(
566                                prefix,
567                                play_id.to_string(),
568                                None,
569                            ));
570                        }
571
572                        if let Some(call) = &self.call {
573                            let mut state = call.call_state.write().await;
574                            let mut extras = state.extras.take().unwrap_or_default();
575                            extras.insert(key, serde_json::Value::String(value));
576                            state.extras = Some(extras);
577                        }
578
579                        buffer.drain(..mat.end());
580                    }
581                    CommandKind::Http => {
582                        let caps = RE_HTTP.captures(buffer).unwrap();
583                        let mat = caps.get(0).unwrap();
584                        let url = caps.get(1).unwrap().as_str().to_string();
585                        let method = caps
586                            .get(2)
587                            .map(|m| m.as_str().to_string())
588                            .unwrap_or("GET".to_string());
589                        let body = caps.get(3).map(|m| m.as_str().to_string());
590
591                        // Flush TTS
592                        let prefix = buffer[..pos].to_string();
593                        if !prefix.trim().is_empty() {
594                            commands.push(self.create_tts_command_with_id(
595                                prefix,
596                                play_id.to_string(),
597                                None,
598                            ));
599                        }
600
601                        // Execute HTTP request synchronously and capture response
602                        let client = self.client.clone();
603                        let mut req = match method.to_uppercase().as_str() {
604                            "POST" => client.post(&url),
605                            "PUT" => client.put(&url),
606                            _ => client.get(&url),
607                        };
608
609                        if let Some(b) = body {
610                            req = req.body(b);
611                        }
612
613                        // Send request and wait for response
614                        match req.send().await {
615                            Ok(res) => {
616                                let status = res.status();
617                                let text = res.text().await.unwrap_or_default();
618                                info!(url, method, status=?status, "HTTP command executed from stream");
619
620                                // Add response to history for LLM context
621                                self.history.push(ChatMessage {
622                                    role: "system".to_string(),
623                                    content: format!(
624                                        "HTTP {} {} returned ({}): {}",
625                                        method, url, status, text
626                                    ),
627                                });
628                            }
629                            Err(e) => {
630                                warn!(
631                                    url,
632                                    method, "Failed to execute HTTP command from stream: {}", e
633                                );
634
635                                // Add error to history
636                                self.history.push(ChatMessage {
637                                    role: "system".to_string(),
638                                    content: format!("HTTP {} {} failed: {}", method, url, e),
639                                });
640                            }
641                        }
642
643                        buffer.drain(..mat.end());
644                    }
645                    CommandKind::Hangup => {
646                        // Don't execute hangup immediately, store it for later
647                        // This allows set_var commands after hangup to still be processed
648                        let prefix = buffer[..pos].to_string();
649                        let hangup_match = RE_HANGUP.find(buffer).unwrap();
650                        pending_hangup = Some((prefix, hangup_match.end()));
651                        buffer.drain(..hangup_match.end());
652
653                        // Continue processing remaining buffer for set_var commands
654                        // Don't return yet!
655                    }
656                    CommandKind::Refer => {
657                        let caps = RE_REFER.captures(buffer).unwrap();
658                        let mat = caps.get(0).unwrap();
659                        let callee = caps.get(1).unwrap().as_str().to_string();
660
661                        let prefix = buffer[..pos].to_string();
662                        if !prefix.trim().is_empty() {
663                            commands.push(self.create_tts_command_with_id(
664                                prefix,
665                                play_id.to_string(),
666                                None,
667                            ));
668                        }
669                        commands.push(Command::Refer {
670                            caller: String::new(),
671                            callee,
672                            options: None,
673                        });
674                        buffer.drain(..mat.end());
675                    }
676                    CommandKind::Play => {
677                        // Play audio
678                        let caps = RE_PLAY.captures(buffer).unwrap();
679                        let mat = caps.get(0).unwrap();
680                        let url = caps.get(1).unwrap().as_str().to_string();
681
682                        let prefix = buffer[..pos].to_string();
683                        if !prefix.trim().is_empty() {
684                            commands.push(self.create_tts_command_with_id(
685                                prefix,
686                                play_id.to_string(),
687                                None,
688                            ));
689                        }
690                        commands.push(Command::Play {
691                            url,
692                            play_id: None,
693                            auto_hangup: None,
694                            wait_input_timeout: None,
695                        });
696                        buffer.drain(..mat.end());
697                    }
698                    CommandKind::Goto => {
699                        // Goto Scene
700                        let caps = RE_GOTO.captures(buffer).unwrap();
701                        let mat = caps.get(0).unwrap();
702                        let scene_id = caps.get(1).unwrap().as_str().to_string();
703
704                        let prefix = buffer[..pos].to_string();
705                        if !prefix.trim().is_empty() {
706                            commands.push(self.create_tts_command_with_id(
707                                prefix,
708                                play_id.to_string(),
709                                None,
710                            ));
711                        }
712
713                        info!("Switching to scene (from stream): {}", scene_id);
714                        if let Some(scene) = self.scenes.get(&scene_id) {
715                            self.current_scene_id = Some(scene_id);
716                            // Update system prompt in history
717                            let system_prompt =
718                                Self::build_system_prompt(&self.config, Some(&scene.prompt));
719                            if let Some(first_msg) = self.history.get_mut(0) {
720                                if first_msg.role == "system" {
721                                    first_msg.content = system_prompt;
722                                }
723                            }
724                        } else {
725                            warn!("Scene not found: {}", scene_id);
726                        }
727
728                        buffer.drain(..mat.end());
729                    }
730                    CommandKind::Sentence => {
731                        // Sentence
732                        let mat = sentence_pos.unwrap();
733                        let sentence = buffer[..mat.end()].to_string();
734                        if !sentence.trim().is_empty() {
735                            commands.push(self.create_tts_command_with_id(
736                                sentence,
737                                play_id.to_string(),
738                                None,
739                            ));
740                        }
741                        buffer.drain(..mat.end());
742                    }
743                }
744            } else {
745                break;
746            }
747        }
748
749        // Process pending hangup after all other commands (especially set_var)
750        if let Some((prefix, _)) = pending_hangup {
751            if !prefix.trim().is_empty() {
752                let mut cmd =
753                    self.create_tts_command_with_id(prefix, play_id.to_string(), Some(true));
754                if let Command::Tts { end_of_stream, .. } = &mut cmd {
755                    *end_of_stream = Some(true);
756                }
757                self.is_hanging_up = true;
758                commands.push(cmd);
759            } else {
760                let mut cmd = self.create_tts_command_with_id(
761                    "".to_string(),
762                    play_id.to_string(),
763                    Some(true),
764                );
765                if let Command::Tts { end_of_stream, .. } = &mut cmd {
766                    *end_of_stream = Some(true);
767                }
768                self.is_hanging_up = true;
769                commands.push(cmd);
770            }
771
772            let headers = self.render_sip_headers().await;
773            commands.push(Command::Hangup {
774                reason: Some("AI Hangup".to_string()),
775                initiator: Some("ai".to_string()),
776                headers,
777            });
778
779            return commands;
780        }
781
782        if is_final {
783            let remaining = buffer.trim().to_string();
784            if !remaining.is_empty() {
785                commands.push(self.create_tts_command_with_id(
786                    remaining,
787                    play_id.to_string(),
788                    None,
789                ));
790            }
791            buffer.clear();
792
793            if let Some(last) = commands.last_mut() {
794                if let Command::Tts { end_of_stream, .. } = last {
795                    *end_of_stream = Some(true);
796                }
797            } else if !self.is_hanging_up {
798                commands.push(Command::Tts {
799                    text: "".to_string(),
800                    speaker: None,
801                    play_id: Some(play_id.to_string()),
802                    auto_hangup: None,
803                    streaming: Some(true),
804                    end_of_stream: Some(true),
805                    option: None,
806                    wait_input_timeout: None,
807                    base64: None,
808                    cache_key: None,
809                });
810            }
811        }
812
813        commands
814    }
815
816    fn create_tts_command_with_id(
817        &self,
818        text: String,
819        play_id: String,
820        auto_hangup: Option<bool>,
821    ) -> Command {
822        Command::Tts {
823            text,
824            speaker: None,
825            play_id: Some(play_id),
826            auto_hangup,
827            streaming: Some(true),
828            end_of_stream: None,
829            option: None,
830            wait_input_timeout: Some(10000),
831            base64: None,
832            cache_key: None,
833        }
834    }
835
836    async fn handle_tool_invocation(
837        &mut self,
838        tool: ToolInvocation,
839        tool_commands: &mut Vec<Command>,
840    ) -> Result<bool> {
841        match tool {
842            ToolInvocation::Hangup {
843                ref reason,
844                ref initiator,
845            } => {
846                self.send_debug_event(
847                    "tool_invocation",
848                    json!({
849                        "tool": "Hangup",
850                        "params": {
851                            "reason": reason,
852                            "initiator": initiator,
853                        }
854                    }),
855                );
856
857                let headers = self.render_sip_headers().await;
858
859                tool_commands.push(Command::Hangup {
860                    reason: reason.clone(),
861                    initiator: initiator.clone(),
862                    headers,
863                });
864                Ok(false)
865            }
866            ToolInvocation::Refer {
867                ref caller,
868                ref callee,
869                ref options,
870            } => {
871                self.send_debug_event(
872                    "tool_invocation",
873                    json!({
874                        "tool": "Refer",
875                        "params": {
876                            "caller": caller,
877                            "callee": callee,
878                        }
879                    }),
880                );
881                tool_commands.push(Command::Refer {
882                    caller: caller.clone(),
883                    callee: callee.clone(),
884                    options: options.clone(),
885                });
886                Ok(false)
887            }
888            ToolInvocation::Rag {
889                ref query,
890                ref source,
891            } => {
892                self.handle_rag_tool(query, source).await?;
893                Ok(true)
894            }
895            ToolInvocation::Accept { ref options } => {
896                self.send_debug_event("tool_invocation", json!({ "tool": "Accept" }));
897                tool_commands.push(Command::Accept {
898                    option: options.clone().unwrap_or_default(),
899                });
900                Ok(false)
901            }
902            ToolInvocation::Reject { ref reason, code } => {
903                self.send_debug_event(
904                    "tool_invocation",
905                    json!({
906                        "tool": "Reject",
907                        "params": {
908                            "reason": reason,
909                            "code": code,
910                        }
911                    }),
912                );
913                tool_commands.push(Command::Reject {
914                    reason: reason
915                        .clone()
916                        .unwrap_or_else(|| "Rejected by agent".to_string()),
917                    code,
918                });
919                Ok(false)
920            }
921            ToolInvocation::Http {
922                ref url,
923                ref method,
924                ref body,
925                ref headers,
926            } => {
927                self.handle_http_tool(url, method, body, headers).await?;
928                Ok(true)
929            }
930        }
931    }
932
933    async fn render_sip_headers(&self) -> Option<HashMap<String, String>> {
934        let hangup_template = self.sip_config.as_ref()?.hangup_headers.as_ref()?;
935        let call = self.call.as_ref()?;
936        let state = call.call_state.read().await;
937
938        let mut context = HashMap::new();
939        let mut sip_headers = HashMap::new();
940
941        // Get the list of SIP header keys stored during extraction
942        // If not present, sip dict will be empty (no headers were configured for extraction)
943        let sip_header_keys: Vec<String> = state
944            .extras
945            .as_ref()
946            .and_then(|e| e.get("_sip_header_keys"))
947            .and_then(|v| serde_json::from_value(v.clone()).ok())
948            .unwrap_or_default();
949
950        if let Some(extras) = &state.extras {
951            for (k, v) in extras {
952                // Skip internal keys
953                if k.starts_with('_') {
954                    continue;
955                }
956                context.insert(k.clone(), v.clone());
957                // Only include keys that were extracted as SIP headers
958                if sip_header_keys.contains(k) {
959                    sip_headers.insert(k.clone(), v.clone());
960                }
961            }
962        }
963
964        // Add sip dictionary for template access
965        context.insert(
966            "sip".to_string(),
967            serde_json::to_value(&sip_headers).unwrap_or(serde_json::Value::Null),
968        );
969
970        let env = minijinja::Environment::new();
971        let mut rendered_headers = HashMap::new();
972        for (k, v) in hangup_template {
973            if let Ok(rendered) = env.render_str(v, &context) {
974                rendered_headers.insert(k.clone(), rendered);
975            } else {
976                rendered_headers.insert(k.clone(), v.clone());
977            }
978        }
979        Some(rendered_headers)
980    }
981
982    async fn handle_rag_tool(&mut self, query: &str, source: &Option<String>) -> Result<()> {
983        self.send_debug_event(
984            "tool_invocation",
985            json!({
986                "tool": "Rag",
987                "params": {
988                    "query": query,
989                    "source": source,
990                }
991            }),
992        );
993
994        let rag_result = self.rag_retriever.retrieve(query).await?;
995
996        self.send_debug_event(
997            "rag_result",
998            json!({
999                "query": query,
1000                "result": rag_result,
1001            }),
1002        );
1003
1004        let summary = if let Some(source) = source {
1005            format!("[{}] {}", source, rag_result)
1006        } else {
1007            rag_result
1008        };
1009
1010        self.history.push(ChatMessage {
1011            role: "system".to_string(),
1012            content: format!("RAG result for {}: {}", query, summary),
1013        });
1014
1015        Ok(())
1016    }
1017
1018    async fn handle_http_tool(
1019        &mut self,
1020        url: &str,
1021        method: &Option<String>,
1022        body: &Option<serde_json::Value>,
1023        headers: &Option<HashMap<String, String>>,
1024    ) -> Result<()> {
1025        let method_str = method.as_deref().unwrap_or("GET").to_uppercase();
1026        let method =
1027            reqwest::Method::from_bytes(method_str.as_bytes()).unwrap_or(reqwest::Method::GET);
1028
1029        self.send_debug_event(
1030            "tool_invocation",
1031            json!({
1032                "tool": "Http",
1033                "params": {
1034                    "url": url,
1035                    "method": method_str,
1036                }
1037            }),
1038        );
1039
1040        let mut req = self.client.request(method, url);
1041        if let Some(body) = body {
1042            req = req.json(body);
1043        }
1044        if let Some(headers) = headers {
1045            for (k, v) in headers {
1046                req = req.header(k, v);
1047            }
1048        }
1049
1050        match req.send().await {
1051            Ok(res) => {
1052                let status = res.status();
1053                let text = res.text().await.unwrap_or_default();
1054                self.history.push(ChatMessage {
1055                    role: "system".to_string(),
1056                    content: format!("HTTP tool response ({}): {}", status, text),
1057                });
1058            }
1059            Err(e) => {
1060                warn!("HTTP tool failed: {}", e);
1061                self.history.push(ChatMessage {
1062                    role: "system".to_string(),
1063                    content: format!("HTTP tool failed: {}", e),
1064                });
1065            }
1066        }
1067
1068        Ok(())
1069    }
1070
1071    async fn handle_asr_final(&mut self, text: &str) -> Result<Vec<Command>> {
1072        if text.trim().is_empty() {
1073            return Ok(vec![]);
1074        }
1075
1076        self.apply_context_repair(text);
1077        self.apply_rolling_summary().await;
1078
1079        self.last_asr_final_at = Some(std::time::Instant::now());
1080        self.last_interaction_at = std::time::Instant::now();
1081        self.is_speaking = false;
1082        self.consecutive_follow_ups = 0;
1083
1084        self.generate_response().await
1085    }
1086
1087    fn apply_context_repair(&mut self, text: &str) {
1088        let enable_repair = self
1089            .config
1090            .features
1091            .as_ref()
1092            .map(|f| f.contains(&"context_repair".to_string()))
1093            .unwrap_or(false);
1094
1095        if !enable_repair {
1096            self.history.push(ChatMessage {
1097                role: "user".to_string(),
1098                content: text.to_string(),
1099            });
1100            return;
1101        }
1102
1103        let repair_window_ms = self.config.repair_window_ms.unwrap_or(3000) as u128;
1104        let mut merged = false;
1105
1106        if let Some(last_robot_at) = self.last_robot_msg_at {
1107            if last_robot_at.elapsed().as_millis() < repair_window_ms {
1108                if let Some(last_msg) = self.history.last() {
1109                    if last_msg.role == "assistant" && last_msg.content.chars().count() < 15 {
1110                        info!(
1111                            "Context Repair: Detected potential fragmentation. Triggering merge."
1112                        );
1113                        self.history.pop();
1114                        if let Some(prev_user) = self.history.last_mut() {
1115                            if prev_user.role == "user" {
1116                                prev_user.content.push_str(",");
1117                                prev_user.content.push_str(text);
1118                                merged = true;
1119                            }
1120                        }
1121                    }
1122                }
1123            }
1124        }
1125
1126        if !merged {
1127            self.history.push(ChatMessage {
1128                role: "user".to_string(),
1129                content: text.to_string(),
1130            });
1131        }
1132    }
1133
1134    async fn apply_rolling_summary(&mut self) {
1135        let enable_summary = self
1136            .config
1137            .features
1138            .as_ref()
1139            .map(|f| f.contains(&"rolling_summary".to_string()))
1140            .unwrap_or(false);
1141
1142        if !enable_summary {
1143            return;
1144        }
1145
1146        let summary_limit = self.config.summary_limit.unwrap_or(20);
1147        if self.history.len() <= summary_limit {
1148            return;
1149        }
1150
1151        info!("Rolling Summary: History limit reached. Triggering background summary.");
1152        let keep_recent = 6;
1153        if self.history.len() <= summary_limit + keep_recent
1154            || self.history.len() <= keep_recent + 1
1155        {
1156            return;
1157        }
1158
1159        let split_idx = self.history.len() - keep_recent;
1160        let to_summarize = self.history[1..split_idx].to_vec();
1161        let recent = self.history[split_idx..].to_vec();
1162
1163        let summary_prompt =
1164            "Summarize the above conversation so far, focusing on key details and user intent.";
1165        let mut summary_req_history = to_summarize;
1166        summary_req_history.push(ChatMessage {
1167            role: "user".to_string(),
1168            content: summary_prompt.to_string(),
1169        });
1170
1171        match self.provider.call(&self.config, &summary_req_history).await {
1172            Ok(summary) => {
1173                let mut new_history = Vec::new();
1174                if let Some(sys) = self.history.first() {
1175                    let mut new_sys = sys.clone();
1176                    new_sys.content.push_str("\n\n[Previous Context Summary]: ");
1177                    new_sys.content.push_str(&summary);
1178                    new_history.push(new_sys);
1179                }
1180                new_history.extend(recent);
1181                self.history = new_history;
1182                info!(
1183                    "Rolling Summary: Applied summary. New history len: {}",
1184                    self.history.len()
1185                );
1186            }
1187            Err(e) => {
1188                warn!("Rolling Summary failed: {}", e);
1189            }
1190        }
1191    }
1192
1193    fn check_interruption(
1194        &mut self,
1195        event: &SessionEvent,
1196        is_filler: &Option<bool>,
1197    ) -> Option<Command> {
1198        let strategy = self.interruption_config.strategy;
1199        let should_check = match (strategy, event) {
1200            (InterruptionStrategy::None, _) => false,
1201            (InterruptionStrategy::Vad, SessionEvent::Speaking { .. }) => true,
1202            (InterruptionStrategy::Asr, SessionEvent::AsrDelta { .. }) => true,
1203            (InterruptionStrategy::Both, _) => true,
1204            _ => false,
1205        };
1206
1207        if !self.is_speaking || self.is_hanging_up || !should_check {
1208            return None;
1209        }
1210
1211        // Protection period check
1212        if let Some(last_start) = self.last_tts_start_at {
1213            let ignore_ms = self.interruption_config.ignore_first_ms.unwrap_or(800);
1214            if last_start.elapsed().as_millis() < ignore_ms as u128 {
1215                return None;
1216            }
1217        }
1218
1219        // Filler word filter
1220        if self.interruption_config.filler_word_filter.unwrap_or(false) {
1221            if let Some(true) = is_filler {
1222                return None;
1223            }
1224            if let SessionEvent::AsrDelta { text, .. } = event {
1225                if is_likely_filler(text) {
1226                    return None;
1227                }
1228            }
1229        }
1230
1231        // Stale event check
1232        if let Some(last_final) = self.last_asr_final_at {
1233            if last_final.elapsed().as_millis() < 500 {
1234                return None;
1235            }
1236        }
1237
1238        info!("Smart interruption detected, stopping playback");
1239        self.is_speaking = false;
1240        Some(Command::Interrupt {
1241            graceful: Some(true),
1242            fade_out_ms: self.interruption_config.volume_fade_ms,
1243        })
1244    }
1245
1246    async fn handle_silence(&mut self) -> Result<Vec<Command>> {
1247        let follow_up_config = if let Some(scene_id) = &self.current_scene_id {
1248            self.scenes
1249                .get(scene_id)
1250                .and_then(|s| s.follow_up)
1251                .or(self.global_follow_up_config)
1252        } else {
1253            self.global_follow_up_config
1254        };
1255
1256        let Some(config) = follow_up_config else {
1257            return Ok(vec![]);
1258        };
1259
1260        if self.is_speaking
1261            || self.last_interaction_at.elapsed().as_millis() < config.timeout as u128
1262        {
1263            return Ok(vec![]);
1264        }
1265
1266        if self.consecutive_follow_ups >= config.max_count {
1267            info!("Max follow-up count reached, hanging up");
1268            let headers = self.render_sip_headers().await;
1269            return Ok(vec![Command::Hangup {
1270                reason: Some("Max follow-up reached".to_string()),
1271                initiator: Some("system".to_string()),
1272                headers,
1273            }]);
1274        }
1275
1276        info!(
1277            "Silence timeout detected ({}ms), triggering follow-up ({}/{})",
1278            self.last_interaction_at.elapsed().as_millis(),
1279            self.consecutive_follow_ups + 1,
1280            config.max_count
1281        );
1282        self.consecutive_follow_ups += 1;
1283        self.last_interaction_at = std::time::Instant::now();
1284        self.generate_response().await
1285    }
1286
1287    async fn handle_function_call(&mut self, name: &str, arguments: &str) -> Result<Vec<Command>> {
1288        info!(
1289            "Function call from Realtime: {} with args {}",
1290            name, arguments
1291        );
1292        let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
1293
1294        match name {
1295            "hangup_call" => {
1296                let headers = self.render_sip_headers().await;
1297                Ok(vec![Command::Hangup {
1298                    reason: args["reason"].as_str().map(|s| s.to_string()),
1299                    initiator: Some("ai".to_string()),
1300                    headers,
1301                }])
1302            }
1303            "transfer_call" | "refer_call" => {
1304                if let Some(callee) = args["callee"]
1305                    .as_str()
1306                    .or_else(|| args["callee_uri"].as_str())
1307                {
1308                    Ok(vec![Command::Refer {
1309                        caller: String::new(),
1310                        callee: callee.to_string(),
1311                        options: None,
1312                    }])
1313                } else {
1314                    warn!("No callee provided for transfer_call");
1315                    Ok(vec![])
1316                }
1317            }
1318            "goto_scene" => {
1319                if let Some(scene) = args["scene"].as_str() {
1320                    self.switch_to_scene(scene, false).await
1321                } else {
1322                    Ok(vec![])
1323                }
1324            }
1325            _ => {
1326                warn!("Unhandled function call: {}", name);
1327                Ok(vec![])
1328            }
1329        }
1330    }
1331
1332    async fn interpret_response(&mut self, initial: String) -> Result<Vec<Command>> {
1333        let mut tool_commands = Vec::new();
1334        let mut wait_input_timeout = None;
1335        let mut attempts = 0;
1336        let mut raw = initial;
1337
1338        let final_text = loop {
1339            attempts += 1;
1340
1341            let Some(structured) = parse_structured_response(&raw) else {
1342                break Some(raw);
1343            };
1344
1345            if wait_input_timeout.is_none() {
1346                wait_input_timeout = structured.wait_input_timeout;
1347            }
1348
1349            let mut rerun_for_rag = false;
1350            if let Some(tools) = structured.tools {
1351                for tool in tools {
1352                    let needs_rerun = self
1353                        .handle_tool_invocation(tool, &mut tool_commands)
1354                        .await?;
1355                    rerun_for_rag = rerun_for_rag || needs_rerun;
1356                }
1357            }
1358
1359            if !rerun_for_rag {
1360                break structured.text;
1361            }
1362
1363            if attempts >= MAX_RAG_ATTEMPTS {
1364                warn!("Reached RAG iteration limit, using last response");
1365                break structured.text.or(Some(raw));
1366            }
1367
1368            raw = self.call_llm().await?;
1369        };
1370
1371        let has_hangup = tool_commands
1372            .iter()
1373            .any(|c| matches!(c, Command::Hangup { .. }));
1374        let mut commands = Vec::new();
1375
1376        if let Some(text) = final_text {
1377            if !text.trim().is_empty() {
1378                self.history.push(ChatMessage {
1379                    role: "assistant".to_string(),
1380                    content: text.clone(),
1381                });
1382                self.last_tts_start_at = Some(std::time::Instant::now());
1383                self.is_speaking = true;
1384
1385                let auto_hangup = has_hangup.then_some(true);
1386                commands.push(self.create_tts_command(text, wait_input_timeout, auto_hangup));
1387
1388                if has_hangup {
1389                    tool_commands.retain(|c| !matches!(c, Command::Hangup { .. }));
1390                    self.is_hanging_up = true;
1391                }
1392            }
1393        }
1394
1395        commands.extend(tool_commands);
1396        Ok(commands)
1397    }
1398}
1399
1400fn parse_structured_response(raw: &str) -> Option<StructuredResponse> {
1401    let payload = extract_json_block(raw)?;
1402    serde_json::from_str(payload).ok()
1403}
1404
1405fn is_likely_filler(text: &str) -> bool {
1406    let trimmed = text.trim().to_lowercase();
1407    FILLERS.contains(&trimmed)
1408}
1409
/// Extracts the JSON payload from a raw model reply.
///
/// After trimming surrounding whitespace, two shapes are recognised:
///
/// - Bare text starting with `{` or `[`, returned as-is.
/// - A fenced block that opens with three backticks and is closed by the
///   last triple backtick in the text. The content between the fences is
///   trimmed. If it begins with a `json` tag (any ASCII case), the tag is
///   removed: the rest of the tag line is dropped as well, unless the
///   payload itself starts on that line (`json {"a": ...`), in which case
///   only the tag is removed.
///
/// Returns `None` for anything else, including a fence that is never closed
/// or that has nothing between its markers. The returned slice is not
/// validated as JSON.
fn extract_json_block(raw: &str) -> Option<&str> {
    const FENCE: &str = "```";
    const TAG: &str = "json";

    let starts_json = |s: &str| s.starts_with('{') || s.starts_with('[');

    let trimmed = raw.trim();
    if starts_json(trimmed) {
        return Some(trimmed);
    }

    // Require the full opening fence. Checking for a single backtick and then
    // slicing at byte 3 can cut a multi-byte character in half and panic.
    let after_open = trimmed.strip_prefix(FENCE)?;
    // A closing fence at offset 0 means there is no content at all.
    let close = after_open.rfind(FENCE).filter(|&idx| idx > 0)?;
    let inner = after_open[..close].trim();

    let has_tag = matches!(
        inner.get(..TAG.len()),
        Some(prefix) if prefix.eq_ignore_ascii_case(TAG)
    );
    if !has_tag {
        return Some(inner);
    }

    let after_tag = &inner[TAG.len()..];
    let payload = match after_tag.split_once('\n') {
        // Tag line carries no payload (e.g. "json" or "jsonc"): drop the line.
        Some((tag_line, body)) if !starts_json(tag_line.trim_start()) => body,
        // Payload starts right after the tag, possibly spanning several lines.
        _ => after_tag,
    };
    Some(payload.trim())
}
1435
1436#[async_trait]
1437impl DialogueHandler for LlmHandler {
1438    async fn on_start(&mut self) -> Result<Vec<Command>> {
1439        self.last_tts_start_at = Some(std::time::Instant::now());
1440
1441        let mut commands = Vec::new();
1442
1443        // Check if current scene has an audio file to play
1444        if let Some(scene_id) = &self.current_scene_id {
1445            if let Some(scene) = self.scenes.get(scene_id) {
1446                if let Some(audio_file) = &scene.play {
1447                    commands.push(Command::Play {
1448                        url: audio_file.clone(),
1449                        play_id: None,
1450                        auto_hangup: None,
1451                        wait_input_timeout: None,
1452                    });
1453                }
1454            }
1455        }
1456
1457        if let Some(greeting) = &self.config.greeting {
1458            self.is_speaking = true;
1459            commands.push(self.create_tts_command(greeting.clone(), None, None));
1460            return Ok(commands);
1461        }
1462
1463        let response_commands = self.generate_response().await?;
1464        commands.extend(response_commands);
1465        Ok(commands)
1466    }
1467
1468    async fn on_event(&mut self, event: &SessionEvent) -> Result<Vec<Command>> {
1469        match event {
1470            SessionEvent::Dtmf { digit, .. } => {
1471                info!("DTMF received: {}", digit);
1472                if let Some(action) = self.get_dtmf_action(digit) {
1473                    self.handle_dtmf_action(action).await
1474                } else {
1475                    Ok(vec![])
1476                }
1477            }
1478            SessionEvent::AsrFinal { text, .. } => self.handle_asr_final(text).await,
1479            SessionEvent::AsrDelta { is_filler, .. } | SessionEvent::Speaking { is_filler, .. } => {
1480                Ok(self
1481                    .check_interruption(event, is_filler)
1482                    .into_iter()
1483                    .collect())
1484            }
1485            SessionEvent::Eou { completed, .. } => {
1486                if *completed && !self.is_speaking {
1487                    info!("EOU detected, triggering early response");
1488                    self.generate_response().await
1489                } else {
1490                    Ok(vec![])
1491                }
1492            }
1493            SessionEvent::Silence { .. } => self.handle_silence().await,
1494            SessionEvent::TrackStart { .. } => {
1495                self.is_speaking = true;
1496                Ok(vec![])
1497            }
1498            SessionEvent::TrackEnd { .. } => {
1499                self.is_speaking = false;
1500                self.is_hanging_up = false;
1501                self.last_interaction_at = std::time::Instant::now();
1502                Ok(vec![])
1503            }
1504            SessionEvent::FunctionCall {
1505                name, arguments, ..
1506            } => self.handle_function_call(name, arguments).await,
1507            _ => Ok(vec![]),
1508        }
1509    }
1510
1511    async fn get_history(&self) -> Vec<ChatMessage> {
1512        self.history.clone()
1513    }
1514
1515    async fn summarize(&mut self, prompt: &str) -> Result<String> {
1516        info!("Generating summary with prompt: {}", prompt);
1517        let mut summary_history = self.history.clone();
1518        summary_history.push(ChatMessage {
1519            role: "user".to_string(),
1520            content: prompt.to_string(),
1521        });
1522
1523        self.provider.call(&self.config, &summary_history).await
1524    }
1525}