1use crate::context::Context;
46use crate::messages::Message;
47use crate::topics::Topic;
48use anyhow::Result;
49use serde::{Deserialize, Serialize};
50use std::time::{Duration, SystemTime};
51use tracing::{debug, info, warn};
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55#[serde(rename_all = "snake_case")]
56pub enum InterruptMode {
57 Disabled,
59
60 InterruptOnly,
63
64 InterruptWithAutoResume,
67}
68
69impl Default for InterruptMode {
70 fn default() -> Self {
71 Self::InterruptOnly
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct BehaviorInterruptConfig {
78 #[serde(default = "default_enabled")]
80 pub enabled: bool,
81
82 #[serde(default)]
84 pub mode: InterruptMode,
85
86 #[serde(default = "default_timeout_secs")]
88 pub timeout_secs: u64,
89
90 #[serde(default)]
92 pub await_completion: bool,
93
94 #[serde(skip_serializing_if = "Option::is_none")]
97 pub completion_topic: Option<String>,
98
99 #[serde(default = "default_control_topic")]
101 pub control_topic: String,
102}
103
104impl Default for BehaviorInterruptConfig {
105 fn default() -> Self {
106 Self {
107 enabled: default_enabled(),
108 mode: InterruptMode::default(),
109 timeout_secs: default_timeout_secs(),
110 await_completion: false,
111 completion_topic: None,
112 control_topic: default_control_topic(),
113 }
114 }
115}
116
117fn default_enabled() -> bool {
118 true
119}
120
121fn default_timeout_secs() -> u64 {
122 30
123}
124
125fn default_control_topic() -> String {
126 "/behavior/control".to_string()
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct BehaviorControl {
135 pub action: String,
137
138 #[serde(skip_serializing_if = "Option::is_none")]
140 pub source: Option<String>,
141
142 #[serde(skip_serializing_if = "Option::is_none")]
144 pub duration_secs: Option<u64>,
145
146 #[serde(default)]
148 pub timestamp: u64,
149}
150
151impl Message for BehaviorControl {}
152
153impl BehaviorControl {
154 pub fn interrupt(source: &str) -> Self {
156 Self {
157 action: "interrupt".to_string(),
158 source: Some(source.to_string()),
159 duration_secs: None,
160 timestamp: now_micros(),
161 }
162 }
163
164 pub fn interrupt_with_duration(source: &str, duration_secs: u64) -> Self {
166 Self {
167 action: "interrupt".to_string(),
168 source: Some(source.to_string()),
169 duration_secs: Some(duration_secs),
170 timestamp: now_micros(),
171 }
172 }
173
174 pub fn resume(source: &str) -> Self {
176 Self {
177 action: "resume".to_string(),
178 source: Some(source.to_string()),
179 duration_secs: None,
180 timestamp: now_micros(),
181 }
182 }
183
184 pub fn enable() -> Self {
186 Self {
187 action: "enable".to_string(),
188 source: None,
189 duration_secs: None,
190 timestamp: now_micros(),
191 }
192 }
193
194 pub fn disable() -> Self {
196 Self {
197 action: "disable".to_string(),
198 source: None,
199 duration_secs: None,
200 timestamp: now_micros(),
201 }
202 }
203}
204
205pub struct BehaviorInterruptTrigger {
210 source: String,
212
213 config: BehaviorInterruptConfig,
215
216 interrupt_time: Option<SystemTime>,
218
219 auto_resume_task: Option<tokio::task::JoinHandle<()>>,
221}
222
223impl BehaviorInterruptTrigger {
224 pub fn new(source: &str, config: BehaviorInterruptConfig) -> Self {
226 Self {
227 source: source.to_string(),
228 config,
229 interrupt_time: None,
230 auto_resume_task: None,
231 }
232 }
233
234 pub async fn interrupt(&mut self, ctx: &Context) -> Result<()> {
239 if !self.config.enabled || self.config.mode == InterruptMode::Disabled {
240 debug!("Behavior interrupt disabled, skipping");
241 return Ok(());
242 }
243
244 if let Some(task) = self.auto_resume_task.take() {
246 task.abort();
247 }
248
249 let control = match self.config.mode {
251 InterruptMode::InterruptOnly => BehaviorControl::interrupt(&self.source),
252 InterruptMode::InterruptWithAutoResume => {
253 BehaviorControl::interrupt_with_duration(&self.source, self.config.timeout_secs)
254 }
255 InterruptMode::Disabled => return Ok(()),
256 };
257
258 let control_topic =
260 Topic::<BehaviorControl>::new(Box::leak(self.config.control_topic.clone().into_boxed_str()));
261 ctx.publish_to(control_topic, &control).await?;
262
263 info!(
264 "⏸️ Interrupted behavior tree (source: {}, mode: {:?})",
265 self.source, self.config.mode
266 );
267
268 self.interrupt_time = Some(SystemTime::now());
270
271 if self.config.mode == InterruptMode::InterruptWithAutoResume {
273 self.schedule_auto_resume(ctx).await?;
274 }
275
276 Ok(())
277 }
278
279 pub async fn resume(&mut self, ctx: &Context) -> Result<()> {
284 if !self.config.enabled {
285 return Ok(());
286 }
287
288 if let Some(task) = self.auto_resume_task.take() {
290 task.abort();
291 }
292
293 let control = BehaviorControl::resume(&self.source);
295
296 let control_topic =
298 Topic::<BehaviorControl>::new(Box::leak(self.config.control_topic.clone().into_boxed_str()));
299 ctx.publish_to(control_topic, &control).await?;
300
301 info!("▶️ Resumed behavior tree (source: {})", self.source);
302
303 self.interrupt_time = None;
305
306 Ok(())
307 }
308
309 async fn schedule_auto_resume(&mut self, ctx: &Context) -> Result<()> {
311 let timeout = Duration::from_secs(self.config.timeout_secs);
312 let control_topic = self.config.control_topic.clone();
313 let source = self.source.clone();
314 let ctx_clone = ctx.clone();
315
316 let task = tokio::spawn(async move {
317 tokio::time::sleep(timeout).await;
318
319 let control = BehaviorControl::resume(&source);
321 let topic = Topic::<BehaviorControl>::new(Box::leak(control_topic.into_boxed_str()));
322
323 match ctx_clone.publish_to(topic, &control).await {
324 Ok(_) => {
325 info!(
326 "⏰ Auto-resumed behavior tree after {}s timeout (source: {})",
327 timeout.as_secs(),
328 source
329 );
330 }
331 Err(e) => {
332 warn!("Failed to auto-resume behavior tree: {}", e);
333 }
334 }
335 });
336
337 self.auto_resume_task = Some(task);
338
339 Ok(())
340 }
341
342 pub fn interrupt_duration(&self) -> Option<Duration> {
344 self.interrupt_time
345 .and_then(|t| SystemTime::now().duration_since(t).ok())
346 }
347
348 pub fn is_interrupted(&self) -> bool {
350 self.interrupt_time.is_some()
351 }
352}
353
354impl Drop for BehaviorInterruptTrigger {
355 fn drop(&mut self) {
356 if let Some(task) = self.auto_resume_task.take() {
358 task.abort();
359 }
360 }
361}
362
363fn now_micros() -> u64 {
364 SystemTime::now()
365 .duration_since(SystemTime::UNIX_EPOCH)
366 .unwrap()
367 .as_micros() as u64
368}
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 #[test]
375 fn test_interrupt_mode_default() {
376 assert_eq!(InterruptMode::default(), InterruptMode::InterruptOnly);
377 }
378
379 #[test]
380 fn test_config_default() {
381 let config = BehaviorInterruptConfig::default();
382 assert!(config.enabled);
383 assert_eq!(config.mode, InterruptMode::InterruptOnly);
384 assert_eq!(config.timeout_secs, 30);
385 assert!(!config.await_completion);
386 assert!(config.completion_topic.is_none());
387 assert_eq!(config.control_topic, "/behavior/control");
388 }
389
390 #[test]
391 fn test_control_message_creation() {
392 let interrupt = BehaviorControl::interrupt("test-node");
393 assert_eq!(interrupt.action, "interrupt");
394 assert_eq!(interrupt.source, Some("test-node".to_string()));
395 assert!(interrupt.duration_secs.is_none());
396
397 let interrupt_with_duration = BehaviorControl::interrupt_with_duration("test-node", 60);
398 assert_eq!(interrupt_with_duration.action, "interrupt");
399 assert_eq!(interrupt_with_duration.duration_secs, Some(60));
400
401 let resume = BehaviorControl::resume("test-node");
402 assert_eq!(resume.action, "resume");
403
404 let enable = BehaviorControl::enable();
405 assert_eq!(enable.action, "enable");
406 assert!(enable.source.is_none());
407
408 let disable = BehaviorControl::disable();
409 assert_eq!(disable.action, "disable");
410 }
411}