1use crate::CallOption;
2use crate::call::{ActiveCallRef, Command};
3use crate::event::EventReceiver;
4use anyhow::{Result, anyhow};
5use serde_json::json;
6use std::time::Duration;
7use tracing::{error, info, warn};
8
9use super::{Playbook, PlaybookConfig, dialogue::DialogueHandler, handler::LlmHandler};
10
11pub struct PlaybookRunner {
12 handler: Box<dyn DialogueHandler>,
13 call: ActiveCallRef,
14 config: PlaybookConfig,
15 event_receiver: EventReceiver,
16}
17
18impl PlaybookRunner {
19 pub fn with_handler(
20 handler: Box<dyn DialogueHandler>,
21 call: ActiveCallRef,
22 config: PlaybookConfig,
23 ) -> Self {
24 let event_receiver = call.event_sender.subscribe();
25 Self {
26 handler,
27 call,
28 config,
29 event_receiver,
30 }
31 }
32
33 pub fn new(playbook: Playbook, call: ActiveCallRef) -> Result<Self> {
34 let event_receiver = call.event_sender.subscribe();
35 if let Ok(mut state) = call.call_state.try_write() {
36 if state.option.is_none() {
38 state.option = Some(CallOption::default());
39 }
40 if let Some(option) = state.option.as_mut() {
41 apply_playbook_config(option, &playbook.config);
42 }
43 }
44
45 let handler: Box<dyn DialogueHandler> = if let Some(llm_config) = &playbook.config.llm {
46 let mut llm_config = llm_config.clone();
47 if let Some(greeting) = playbook.config.greeting.clone() {
48 llm_config.greeting = Some(greeting);
49 }
50 let interruption_config = playbook.config.interruption.clone().unwrap_or_default();
51 let dtmf_config = playbook.config.dtmf.clone();
52 let dtmf_collectors = playbook.config.dtmf_collectors.clone();
53
54 let mut llm_handler = LlmHandler::new(
55 llm_config,
56 interruption_config,
57 playbook.config.follow_up,
58 playbook.scenes.clone(),
59 dtmf_config,
60 dtmf_collectors,
61 playbook.initial_scene_id.clone(),
62 playbook.config.sip.clone(),
63 );
64 llm_handler.set_event_sender(call.event_sender.clone());
66 llm_handler.set_call(call.clone());
67 Box::new(llm_handler)
68 } else {
69 return Err(anyhow!(
70 "No valid dialogue handler configuration found (e.g. missing 'llm')"
71 ));
72 };
73
74 Ok(Self {
75 handler,
76 call,
77 config: playbook.config,
78 event_receiver,
79 })
80 }
81
82 pub async fn run(mut self) {
83 info!(
84 "PlaybookRunner started for session {}",
85 self.call.session_id
86 );
87
88 let mut answered = {
89 let state = self.call.call_state.read().await;
90 state.answer_time.is_some()
91 };
92
93 if let Ok(commands) = self.handler.on_start().await {
94 for cmd in commands {
95 let is_media = matches!(cmd, Command::Tts { .. } | Command::Play { .. });
96
97 if is_media && !answered {
98 info!("Waiting for call establishment before executing media command...");
99 while let Ok(event) = self.event_receiver.recv().await {
100 match &event {
101 crate::event::SessionEvent::Answer { .. } => {
102 info!("Call established, proceeding to execute media command");
103 answered = true;
104 break;
105 }
106 crate::event::SessionEvent::Hangup { .. } => {
107 info!("Call hung up before established, stopping");
108 return;
109 }
110 _ => {}
111 }
112 }
113 }
114
115 if let Err(e) = self.call.enqueue_command(cmd).await {
116 error!("Failed to enqueue start command: {}", e);
117 }
118 }
119 }
120
121 if !answered {
122 info!("Waiting for call establishment...");
123 while let Ok(event) = self.event_receiver.recv().await {
124 match &event {
125 crate::event::SessionEvent::Answer { .. } => {
126 info!("Call established, proceeding to playbook handles");
127 break;
128 }
129 crate::event::SessionEvent::Hangup { .. } => {
130 info!("Call hung up before established, stopping");
131 return;
132 }
133 _ => {}
134 }
135 }
136 }
137
138 while let Ok(event) = self.event_receiver.recv().await {
139 if let Ok(commands) = self.handler.on_event(&event).await {
140 for cmd in commands {
141 if let Err(e) = self.call.enqueue_command(cmd).await {
142 error!("Failed to enqueue command: {}", e);
143 }
144 }
145 }
146 match &event {
147 crate::event::SessionEvent::Hangup { .. } => {
148 info!("Call hung up, stopping playbook");
149 break;
150 }
151 _ => {}
152 }
153 }
154
155 if let Some(posthook) = self.config.posthook.clone() {
157 let mut handler = self.handler;
158 let session_id = self.call.session_id.clone();
159 drop(self.call);
161 crate::spawn(async move {
162 info!("Executing posthook for session {}", session_id);
163
164 let posthook_timeout = Duration::from_secs(
165 posthook.timeout.unwrap_or(30) as u64
166 );
167
168 let posthook_task = async {
169 let summary = if let Some(summary_type) = &posthook.summary {
170 match handler.summarize(summary_type.prompt()).await {
171 Ok(s) => Some(s),
172 Err(e) => {
173 error!("Failed to generate summary: {}", e);
174 None
175 }
176 }
177 } else {
178 None
179 };
180
181 let history = if posthook.include_history.unwrap_or(true) {
182 Some(handler.get_history().await)
183 } else {
184 None
185 };
186
187 let payload = json!({
188 "sessionId": session_id,
189 "summary": summary,
190 "history": history,
191 "timestamp": chrono::Utc::now().to_rfc3339(),
192 });
193
194 let client = reqwest::Client::new();
195 let method = posthook
196 .method
197 .as_deref()
198 .unwrap_or("POST")
199 .parse::<reqwest::Method>()
200 .unwrap_or(reqwest::Method::POST);
201
202 let mut request = client.request(method, &posthook.url).json(&payload);
203
204 if let Some(headers) = posthook.headers {
205 for (k, v) in headers {
206 request = request.header(k, v);
207 }
208 }
209
210 match request.send().await {
211 Ok(resp) => {
212 if resp.status().is_success() {
213 info!("Posthook sent successfully");
214 } else {
215 warn!("Posthook failed with status: {}", resp.status());
216 }
217 }
218 Err(e) => {
219 error!("Failed to send posthook: {}", e);
220 }
221 }
222 };
223
224 if tokio::time::timeout(posthook_timeout, posthook_task).await.is_err() {
225 error!("Posthook timed out for session {}", session_id);
226 }
227 });
228 }
229 }
230}
231
232pub fn apply_playbook_config(option: &mut CallOption, config: &PlaybookConfig) {
233 let api_key = config.llm.as_ref().and_then(|llm| llm.api_key.clone());
234
235 if let Some(mut asr) = config.asr.clone() {
236 if asr.secret_key.is_none() {
237 asr.secret_key = api_key.clone();
238 }
239 option.asr = Some(asr);
240 }
241 if let Some(mut tts) = config.tts.clone() {
242 if tts.secret_key.is_none() {
243 tts.secret_key = api_key.clone();
244 }
245 option.tts = Some(tts);
246 }
247 if let Some(vad) = config.vad.clone() {
248 option.vad = Some(vad);
249 }
250 if let Some(denoise) = config.denoise {
251 option.denoise = Some(denoise);
252 }
253 if let Some(ambiance) = config.ambiance.clone() {
254 option.ambiance = Some(ambiance);
255 }
256 if let Some(recorder) = config.recorder.clone() {
257 option.recorder = Some(recorder);
258 }
259 if let Some(extra) = config.extra.clone() {
260 option.extra = Some(extra);
261 }
262 if let Some(mut realtime) = config.realtime.clone() {
263 if realtime.secret_key.is_none() {
264 realtime.secret_key = api_key.clone();
265 }
266 option.realtime = Some(realtime);
267 }
268 if let Some(mut eou) = config.eou.clone() {
269 if eou.secret_key.is_none() {
270 eou.secret_key = api_key;
271 }
272 option.eou = Some(eou);
273 }
274 if let Some(sip) = config.sip.clone() {
275 option.sip = Some(sip);
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        EouOption, media::recorder::RecorderOption, media::vad::VADOption,
        synthesis::SynthesisOption, transcription::TranscriptionOption,
    };
    use std::collections::HashMap;

    /// Every section set on the playbook config must show up on the call option.
    #[test]
    fn apply_playbook_config_sets_fields() {
        let extra = HashMap::from([("k".to_string(), "v".to_string())]);
        let eou = EouOption {
            r#type: Some("test".to_string()),
            endpoint: None,
            secret_key: Some("key".to_string()),
            secret_id: Some("id".to_string()),
            timeout: Some(123),
            extra: None,
        };
        let config = PlaybookConfig {
            asr: Some(TranscriptionOption::default()),
            tts: Some(SynthesisOption::default()),
            vad: Some(VADOption::default()),
            denoise: Some(true),
            recorder: Some(RecorderOption::default()),
            extra: Some(extra.clone()),
            eou: Some(eou),
            ..Default::default()
        };

        let mut option = CallOption::default();
        apply_playbook_config(&mut option, &config);

        assert!(option.asr.is_some());
        assert!(option.tts.is_some());
        assert!(option.vad.is_some());
        assert_eq!(option.denoise, Some(true));
        assert!(option.recorder.is_some());
        assert_eq!(option.extra, Some(extra));
        assert!(option.eou.is_some());
    }

    /// Sections without their own secret key inherit the LLM api key.
    #[test]
    fn apply_playbook_config_propagates_api_key() {
        let config = PlaybookConfig {
            llm: Some(super::super::LlmConfig {
                api_key: Some("test-key".to_string()),
                ..Default::default()
            }),
            asr: Some(TranscriptionOption::default()),
            tts: Some(SynthesisOption::default()),
            eou: Some(EouOption::default()),
            ..Default::default()
        };

        let mut option = CallOption::default();
        apply_playbook_config(&mut option, &config);

        let asr = option.asr.as_ref().unwrap();
        let tts = option.tts.as_ref().unwrap();
        let eou = option.eou.as_ref().unwrap();
        assert_eq!(asr.secret_key.as_deref(), Some("test-key"));
        assert_eq!(tts.secret_key.as_deref(), Some("test-key"));
        assert_eq!(eou.secret_key.as_deref(), Some("test-key"));
    }

    /// An unset timeout resolves to 30 at the use site; an explicit one wins.
    #[test]
    fn posthook_config_timeout_default() {
        use crate::playbook::PostHookConfig;

        let unset = PostHookConfig {
            url: "http://example.com".to_string(),
            ..Default::default()
        };
        assert_eq!(unset.timeout, None);
        assert_eq!(unset.timeout.unwrap_or(30), 30);

        let explicit = PostHookConfig {
            url: "http://example.com".to_string(),
            timeout: Some(60),
            ..Default::default()
        };
        assert_eq!(explicit.timeout, Some(60));
        assert_eq!(explicit.timeout.unwrap_or(30), 60);
    }

    /// `timeout` is optional when deserializing a posthook config from JSON.
    #[test]
    fn posthook_config_serde_with_timeout() {
        use crate::playbook::PostHookConfig;

        let with_timeout: PostHookConfig =
            serde_json::from_str(r#"{"url": "http://example.com", "timeout": 45}"#).unwrap();
        assert_eq!(with_timeout.timeout, Some(45));
        assert_eq!(with_timeout.url, "http://example.com");

        let without_timeout: PostHookConfig =
            serde_json::from_str(r#"{"url": "http://example.com"}"#).unwrap();
        assert_eq!(without_timeout.timeout, None);
    }
}