// active_call/playbook/runner.rs
use crate::CallOption;
2use crate::call::{ActiveCallRef, Command};
3use crate::event::EventReceiver;
4use anyhow::{Result, anyhow};
5use serde_json::json;
6use tracing::{error, info, warn};
7
8use super::{Playbook, PlaybookConfig, dialogue::DialogueHandler, handler::LlmHandler};
9
/// Drives a playbook for a single call: feeds session events to a dialogue
/// handler and enqueues the commands the handler returns onto the call.
pub struct PlaybookRunner {
    /// Dialogue logic; queried via `on_start` / `on_event`, and for the
    /// summary and history sent by the posthook.
    handler: Box<dyn DialogueHandler>,
    /// The call this runner controls (commands are enqueued on it and its
    /// state is read to detect whether it has been answered).
    call: ActiveCallRef,
    /// Playbook configuration; `run` reads the optional `posthook` from it.
    config: PlaybookConfig,
    /// Subscription to the call's events, created from `call.event_sender`
    /// when the runner is constructed.
    event_receiver: EventReceiver,
}
16
17impl PlaybookRunner {
18 pub fn with_handler(
19 handler: Box<dyn DialogueHandler>,
20 call: ActiveCallRef,
21 config: PlaybookConfig,
22 ) -> Self {
23 let event_receiver = call.event_sender.subscribe();
24 Self {
25 handler,
26 call,
27 config,
28 event_receiver,
29 }
30 }
31
32 pub fn new(playbook: Playbook, call: ActiveCallRef) -> Result<Self> {
33 let event_receiver = call.event_sender.subscribe();
34 if let Ok(mut state) = call.call_state.try_write() {
35 if state.option.is_none() {
37 state.option = Some(CallOption::default());
38 }
39 if let Some(option) = state.option.as_mut() {
40 apply_playbook_config(option, &playbook.config);
41 }
42 }
43
44 let handler: Box<dyn DialogueHandler> = if let Some(llm_config) = &playbook.config.llm {
45 let mut llm_config = llm_config.clone();
46 if let Some(greeting) = playbook.config.greeting.clone() {
47 llm_config.greeting = Some(greeting);
48 }
49 let interruption_config = playbook.config.interruption.clone().unwrap_or_default();
50 let dtmf_config = playbook.config.dtmf.clone();
51 let dtmf_collectors = playbook.config.dtmf_collectors.clone();
52
53 let mut llm_handler = LlmHandler::new(
54 llm_config,
55 interruption_config,
56 playbook.config.follow_up,
57 playbook.scenes.clone(),
58 dtmf_config,
59 dtmf_collectors,
60 playbook.initial_scene_id.clone(),
61 playbook.config.sip.clone(),
62 );
63 llm_handler.set_event_sender(call.event_sender.clone());
65 llm_handler.set_call(call.clone());
66 Box::new(llm_handler)
67 } else {
68 return Err(anyhow!(
69 "No valid dialogue handler configuration found (e.g. missing 'llm')"
70 ));
71 };
72
73 Ok(Self {
74 handler,
75 call,
76 config: playbook.config,
77 event_receiver,
78 })
79 }
80
81 pub async fn run(mut self) {
82 info!(
83 "PlaybookRunner started for session {}",
84 self.call.session_id
85 );
86
87 let mut answered = {
88 let state = self.call.call_state.read().await;
89 state.answer_time.is_some()
90 };
91
92 if let Ok(commands) = self.handler.on_start().await {
93 for cmd in commands {
94 let is_media = matches!(cmd, Command::Tts { .. } | Command::Play { .. });
95
96 if is_media && !answered {
97 info!("Waiting for call establishment before executing media command...");
98 while let Ok(event) = self.event_receiver.recv().await {
99 match &event {
100 crate::event::SessionEvent::Answer { .. } => {
101 info!("Call established, proceeding to execute media command");
102 answered = true;
103 break;
104 }
105 crate::event::SessionEvent::Hangup { .. } => {
106 info!("Call hung up before established, stopping");
107 return;
108 }
109 _ => {}
110 }
111 }
112 }
113
114 if let Err(e) = self.call.enqueue_command(cmd).await {
115 error!("Failed to enqueue start command: {}", e);
116 }
117 }
118 }
119
120 if !answered {
121 info!("Waiting for call establishment...");
122 while let Ok(event) = self.event_receiver.recv().await {
123 match &event {
124 crate::event::SessionEvent::Answer { .. } => {
125 info!("Call established, proceeding to playbook handles");
126 break;
127 }
128 crate::event::SessionEvent::Hangup { .. } => {
129 info!("Call hung up before established, stopping");
130 return;
131 }
132 _ => {}
133 }
134 }
135 }
136
137 while let Ok(event) = self.event_receiver.recv().await {
138 if let Ok(commands) = self.handler.on_event(&event).await {
139 for cmd in commands {
140 if let Err(e) = self.call.enqueue_command(cmd).await {
141 error!("Failed to enqueue command: {}", e);
142 }
143 }
144 }
145 match &event {
146 crate::event::SessionEvent::Hangup { .. } => {
147 info!("Call hung up, stopping playbook");
148 break;
149 }
150 _ => {}
151 }
152 }
153
154 if let Some(posthook) = self.config.posthook.clone() {
156 let mut handler = self.handler;
157 let call = self.call.clone();
158 crate::spawn(async move {
159 info!("Executing posthook for session {}", call.session_id);
160
161 let summary = if let Some(summary_type) = &posthook.summary {
162 match handler.summarize(summary_type.prompt()).await {
163 Ok(s) => Some(s),
164 Err(e) => {
165 error!("Failed to generate summary: {}", e);
166 None
167 }
168 }
169 } else {
170 None
171 };
172
173 let history = if posthook.include_history.unwrap_or(true) {
174 Some(handler.get_history().await)
175 } else {
176 None
177 };
178
179 let payload = json!({
180 "sessionId": call.session_id,
181 "summary": summary,
182 "history": history,
183 "timestamp": chrono::Utc::now().to_rfc3339(),
184 });
185
186 let client = reqwest::Client::new();
187 let method = posthook
188 .method
189 .as_deref()
190 .unwrap_or("POST")
191 .parse::<reqwest::Method>()
192 .unwrap_or(reqwest::Method::POST);
193
194 let mut request = client.request(method, &posthook.url).json(&payload);
195
196 if let Some(headers) = posthook.headers {
197 for (k, v) in headers {
198 request = request.header(k, v);
199 }
200 }
201
202 match request.send().await {
203 Ok(resp) => {
204 if resp.status().is_success() {
205 info!("Posthook sent successfully");
206 } else {
207 warn!("Posthook failed with status: {}", resp.status());
208 }
209 }
210 Err(e) => {
211 error!("Failed to send posthook: {}", e);
212 }
213 }
214 });
215 }
216 }
217}
218
219pub fn apply_playbook_config(option: &mut CallOption, config: &PlaybookConfig) {
220 let api_key = config.llm.as_ref().and_then(|llm| llm.api_key.clone());
221
222 if let Some(mut asr) = config.asr.clone() {
223 if asr.secret_key.is_none() {
224 asr.secret_key = api_key.clone();
225 }
226 option.asr = Some(asr);
227 }
228 if let Some(mut tts) = config.tts.clone() {
229 if tts.secret_key.is_none() {
230 tts.secret_key = api_key.clone();
231 }
232 option.tts = Some(tts);
233 }
234 if let Some(vad) = config.vad.clone() {
235 option.vad = Some(vad);
236 }
237 if let Some(denoise) = config.denoise {
238 option.denoise = Some(denoise);
239 }
240 if let Some(ambiance) = config.ambiance.clone() {
241 option.ambiance = Some(ambiance);
242 }
243 if let Some(recorder) = config.recorder.clone() {
244 option.recorder = Some(recorder);
245 }
246 if let Some(extra) = config.extra.clone() {
247 option.extra = Some(extra);
248 }
249 if let Some(mut realtime) = config.realtime.clone() {
250 if realtime.secret_key.is_none() {
251 realtime.secret_key = api_key.clone();
252 }
253 option.realtime = Some(realtime);
254 }
255 if let Some(mut eou) = config.eou.clone() {
256 if eou.secret_key.is_none() {
257 eou.secret_key = api_key;
258 }
259 option.eou = Some(eou);
260 }
261 if let Some(sip) = config.sip.clone() {
262 option.sip = Some(sip);
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        EouOption, media::recorder::RecorderOption, media::vad::VADOption,
        synthesis::SynthesisOption, transcription::TranscriptionOption,
    };
    use std::collections::HashMap;

    /// Every section set on the playbook config must end up on the call option.
    #[test]
    fn apply_playbook_config_sets_fields() {
        let expected_extra = HashMap::from([("k".to_string(), "v".to_string())]);
        let eou = EouOption {
            r#type: Some("test".to_string()),
            endpoint: None,
            secret_key: Some("key".to_string()),
            secret_id: Some("id".to_string()),
            timeout: Some(123),
            extra: None,
        };
        let config = PlaybookConfig {
            asr: Some(TranscriptionOption::default()),
            tts: Some(SynthesisOption::default()),
            vad: Some(VADOption::default()),
            denoise: Some(true),
            recorder: Some(RecorderOption::default()),
            extra: Some(expected_extra.clone()),
            eou: Some(eou),
            ..Default::default()
        };

        let mut option = CallOption::default();
        apply_playbook_config(&mut option, &config);

        assert!(option.asr.is_some());
        assert!(option.tts.is_some());
        assert!(option.vad.is_some());
        assert_eq!(option.denoise, Some(true));
        assert!(option.recorder.is_some());
        assert_eq!(option.extra, Some(expected_extra));
        assert!(option.eou.is_some());
    }

    /// Sections without their own `secret_key` inherit the LLM `api_key`.
    #[test]
    fn apply_playbook_config_propagates_api_key() {
        let llm = super::super::LlmConfig {
            api_key: Some("test-key".to_string()),
            ..Default::default()
        };
        let config = PlaybookConfig {
            llm: Some(llm),
            asr: Some(TranscriptionOption::default()),
            tts: Some(SynthesisOption::default()),
            eou: Some(EouOption::default()),
            ..Default::default()
        };

        let mut option = CallOption::default();
        apply_playbook_config(&mut option, &config);

        let inherited = Some("test-key".to_string());
        assert_eq!(option.asr.as_ref().unwrap().secret_key, inherited);
        assert_eq!(option.tts.as_ref().unwrap().secret_key, inherited);
        assert_eq!(option.eou.as_ref().unwrap().secret_key, inherited);
    }
}