Skip to main content

active_call/playbook/
runner.rs

1use crate::CallOption;
2use crate::call::{ActiveCallRef, Command};
3use crate::event::EventReceiver;
4use anyhow::{Result, anyhow};
5use serde_json::json;
6use std::time::Duration;
7use tracing::{error, info, warn};
8
9use super::{Playbook, PlaybookConfig, dialogue::DialogueHandler, handler::LlmHandler};
10
11pub struct PlaybookRunner {
12    handler: Box<dyn DialogueHandler>,
13    call: ActiveCallRef,
14    config: PlaybookConfig,
15    event_receiver: EventReceiver,
16}
17
18impl PlaybookRunner {
19    pub fn with_handler(
20        handler: Box<dyn DialogueHandler>,
21        call: ActiveCallRef,
22        config: PlaybookConfig,
23    ) -> Self {
24        let event_receiver = call.event_sender.subscribe();
25        Self {
26            handler,
27            call,
28            config,
29            event_receiver,
30        }
31    }
32
33    pub fn new(playbook: Playbook, call: ActiveCallRef) -> Result<Self> {
34        let event_receiver = call.event_sender.subscribe();
35        if let Ok(mut state) = call.call_state.try_write() {
36            // Ensure option exists before applying config
37            if state.option.is_none() {
38                state.option = Some(CallOption::default());
39            }
40            if let Some(option) = state.option.as_mut() {
41                apply_playbook_config(option, &playbook.config);
42            }
43        }
44
45        let handler: Box<dyn DialogueHandler> = if let Some(llm_config) = &playbook.config.llm {
46            let mut llm_config = llm_config.clone();
47            if let Some(greeting) = playbook.config.greeting.clone() {
48                llm_config.greeting = Some(greeting);
49            }
50            let interruption_config = playbook.config.interruption.clone().unwrap_or_default();
51            let dtmf_config = playbook.config.dtmf.clone();
52            let dtmf_collectors = playbook.config.dtmf_collectors.clone();
53
54            let mut llm_handler = LlmHandler::new(
55                llm_config,
56                interruption_config,
57                playbook.config.follow_up,
58                playbook.scenes.clone(),
59                dtmf_config,
60                dtmf_collectors,
61                playbook.initial_scene_id.clone(),
62                playbook.config.sip.clone(),
63            );
64            // Set event sender for debugging
65            llm_handler.set_event_sender(call.event_sender.clone());
66            llm_handler.set_call(call.clone());
67            Box::new(llm_handler)
68        } else {
69            return Err(anyhow!(
70                "No valid dialogue handler configuration found (e.g. missing 'llm')"
71            ));
72        };
73
74        Ok(Self {
75            handler,
76            call,
77            config: playbook.config,
78            event_receiver,
79        })
80    }
81
82    pub async fn run(mut self) {
83        info!(
84            "PlaybookRunner started for session {}",
85            self.call.session_id
86        );
87
88        let mut answered = {
89            let state = self.call.call_state.read().await;
90            state.answer_time.is_some()
91        };
92
93        if let Ok(commands) = self.handler.on_start().await {
94            for cmd in commands {
95                let is_media = matches!(cmd, Command::Tts { .. } | Command::Play { .. });
96
97                if is_media && !answered {
98                    info!("Waiting for call establishment before executing media command...");
99                    while let Ok(event) = self.event_receiver.recv().await {
100                        match &event {
101                            crate::event::SessionEvent::Answer { .. } => {
102                                info!("Call established, proceeding to execute media command");
103                                answered = true;
104                                break;
105                            }
106                            crate::event::SessionEvent::Hangup { .. } => {
107                                info!("Call hung up before established, stopping");
108                                return;
109                            }
110                            _ => {}
111                        }
112                    }
113                }
114
115                if let Err(e) = self.call.enqueue_command(cmd).await {
116                    error!("Failed to enqueue start command: {}", e);
117                }
118            }
119        }
120
121        if !answered {
122            info!("Waiting for call establishment...");
123            while let Ok(event) = self.event_receiver.recv().await {
124                match &event {
125                    crate::event::SessionEvent::Answer { .. } => {
126                        info!("Call established, proceeding to playbook handles");
127                        break;
128                    }
129                    crate::event::SessionEvent::Hangup { .. } => {
130                        info!("Call hung up before established, stopping");
131                        return;
132                    }
133                    _ => {}
134                }
135            }
136        }
137
138        while let Ok(event) = self.event_receiver.recv().await {
139            if let Ok(commands) = self.handler.on_event(&event).await {
140                for cmd in commands {
141                    if let Err(e) = self.call.enqueue_command(cmd).await {
142                        error!("Failed to enqueue command: {}", e);
143                    }
144                }
145            }
146            match &event {
147                crate::event::SessionEvent::Hangup { .. } => {
148                    info!("Call hung up, stopping playbook");
149                    break;
150                }
151                _ => {}
152            }
153        }
154
155        // Post-hook logic
156        if let Some(posthook) = self.config.posthook.clone() {
157            let mut handler = self.handler;
158            let session_id = self.call.session_id.clone();
159            // Drop the ActiveCallRef before spawning to avoid keeping the entire call alive
160            drop(self.call);
161            crate::spawn(async move {
162                info!("Executing posthook for session {}", session_id);
163
164                let posthook_timeout = Duration::from_secs(
165                    posthook.timeout.unwrap_or(30) as u64
166                );
167
168                let posthook_task = async {
169                    let summary = if let Some(summary_type) = &posthook.summary {
170                        match handler.summarize(summary_type.prompt()).await {
171                            Ok(s) => Some(s),
172                            Err(e) => {
173                                error!("Failed to generate summary: {}", e);
174                                None
175                            }
176                        }
177                    } else {
178                        None
179                    };
180
181                    let history = if posthook.include_history.unwrap_or(true) {
182                        Some(handler.get_history().await)
183                    } else {
184                        None
185                    };
186
187                    let payload = json!({
188                        "sessionId": session_id,
189                        "summary": summary,
190                        "history": history,
191                        "timestamp": chrono::Utc::now().to_rfc3339(),
192                    });
193
194                    let client = reqwest::Client::new();
195                    let method = posthook
196                        .method
197                        .as_deref()
198                        .unwrap_or("POST")
199                        .parse::<reqwest::Method>()
200                        .unwrap_or(reqwest::Method::POST);
201
202                    let mut request = client.request(method, &posthook.url).json(&payload);
203
204                    if let Some(headers) = posthook.headers {
205                        for (k, v) in headers {
206                            request = request.header(k, v);
207                        }
208                    }
209
210                    match request.send().await {
211                        Ok(resp) => {
212                            if resp.status().is_success() {
213                                info!("Posthook sent successfully");
214                            } else {
215                                warn!("Posthook failed with status: {}", resp.status());
216                            }
217                        }
218                        Err(e) => {
219                            error!("Failed to send posthook: {}", e);
220                        }
221                    }
222                };
223
224                if tokio::time::timeout(posthook_timeout, posthook_task).await.is_err() {
225                    error!("Posthook timed out for session {}", session_id);
226                }
227            });
228        }
229    }
230}
231
232pub fn apply_playbook_config(option: &mut CallOption, config: &PlaybookConfig) {
233    let api_key = config.llm.as_ref().and_then(|llm| llm.api_key.clone());
234
235    if let Some(mut asr) = config.asr.clone() {
236        if asr.secret_key.is_none() {
237            asr.secret_key = api_key.clone();
238        }
239        option.asr = Some(asr);
240    }
241    if let Some(mut tts) = config.tts.clone() {
242        if tts.secret_key.is_none() {
243            tts.secret_key = api_key.clone();
244        }
245        option.tts = Some(tts);
246    }
247    if let Some(vad) = config.vad.clone() {
248        option.vad = Some(vad);
249    }
250    if let Some(denoise) = config.denoise {
251        option.denoise = Some(denoise);
252    }
253    if let Some(ambiance) = config.ambiance.clone() {
254        option.ambiance = Some(ambiance);
255    }
256    if let Some(recorder) = config.recorder.clone() {
257        option.recorder = Some(recorder);
258    }
259    if let Some(extra) = config.extra.clone() {
260        option.extra = Some(extra);
261    }
262    if let Some(mut realtime) = config.realtime.clone() {
263        if realtime.secret_key.is_none() {
264            realtime.secret_key = api_key.clone();
265        }
266        option.realtime = Some(realtime);
267    }
268    if let Some(mut eou) = config.eou.clone() {
269        if eou.secret_key.is_none() {
270            eou.secret_key = api_key;
271        }
272        option.eou = Some(eou);
273    }
274    if let Some(sip) = config.sip.clone() {
275        option.sip = Some(sip);
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use crate::{
283        EouOption, media::recorder::RecorderOption, media::vad::VADOption,
284        synthesis::SynthesisOption, transcription::TranscriptionOption,
285    };
286    use std::collections::HashMap;
287
288    #[test]
289    fn apply_playbook_config_sets_fields() {
290        let mut option = CallOption::default();
291        let mut extra = HashMap::new();
292        extra.insert("k".to_string(), "v".to_string());
293
294        let config = PlaybookConfig {
295            asr: Some(TranscriptionOption::default()),
296            tts: Some(SynthesisOption::default()),
297            vad: Some(VADOption::default()),
298            denoise: Some(true),
299            recorder: Some(RecorderOption::default()),
300            extra: Some(extra.clone()),
301            eou: Some(EouOption {
302                r#type: Some("test".to_string()),
303                endpoint: None,
304                secret_key: Some("key".to_string()),
305                secret_id: Some("id".to_string()),
306                timeout: Some(123),
307                extra: None,
308            }),
309            ..Default::default()
310        };
311
312        apply_playbook_config(&mut option, &config);
313
314        assert!(option.asr.is_some());
315        assert!(option.tts.is_some());
316        assert!(option.vad.is_some());
317        assert_eq!(option.denoise, Some(true));
318        assert!(option.recorder.is_some());
319        assert_eq!(option.extra, Some(extra));
320        assert!(option.eou.is_some());
321    }
322
323    #[test]
324    fn apply_playbook_config_propagates_api_key() {
325        let mut option = CallOption::default();
326        let config = PlaybookConfig {
327            llm: Some(super::super::LlmConfig {
328                api_key: Some("test-key".to_string()),
329                ..Default::default()
330            }),
331            asr: Some(TranscriptionOption::default()),
332            tts: Some(SynthesisOption::default()),
333            eou: Some(EouOption::default()),
334            ..Default::default()
335        };
336
337        apply_playbook_config(&mut option, &config);
338
339        assert_eq!(
340            option.asr.as_ref().unwrap().secret_key,
341            Some("test-key".to_string())
342        );
343        assert_eq!(
344            option.tts.as_ref().unwrap().secret_key,
345            Some("test-key".to_string())
346        );
347        assert_eq!(
348            option.eou.as_ref().unwrap().secret_key,
349            Some("test-key".to_string())
350        );
351    }
352
353    #[test]
354    fn posthook_config_timeout_default() {
355        use crate::playbook::PostHookConfig;
356
357        // Test default timeout (None -> should use 30 in code)
358        let config = PostHookConfig {
359            url: "http://example.com".to_string(),
360            ..Default::default()
361        };
362        assert_eq!(config.timeout, None);
363        // Code uses unwrap_or(30), verify the logic
364        assert_eq!(config.timeout.unwrap_or(30), 30);
365
366        // Test custom timeout
367        let config = PostHookConfig {
368            url: "http://example.com".to_string(),
369            timeout: Some(60),
370            ..Default::default()
371        };
372        assert_eq!(config.timeout, Some(60));
373        assert_eq!(config.timeout.unwrap_or(30), 60);
374    }
375
376    #[test]
377    fn posthook_config_serde_with_timeout() {
378        use crate::playbook::PostHookConfig;
379
380        // Test that timeout field is correctly serialized/deserialized
381        let json = r#"{"url": "http://example.com", "timeout": 45}"#;
382        let config: PostHookConfig = serde_json::from_str(json).unwrap();
383        assert_eq!(config.timeout, Some(45));
384        assert_eq!(config.url, "http://example.com");
385
386        // Without timeout
387        let json = r#"{"url": "http://example.com"}"#;
388        let config: PostHookConfig = serde_json::from_str(json).unwrap();
389        assert_eq!(config.timeout, None);
390    }
391}