mecha10_nodes_behavior_executor/
lib.rs

1//! Behavior Executor Node
2//!
3//! Loads and executes behavior trees for autonomous robot control.
4//!
5//! This node:
6//! 1. Loads a behavior tree from a JSON file
7//! 2. Registers built-in action nodes (wander, move, timer, sensor_check)
8//! 3. Executes the behavior tree at a configurable tick rate
9//! 4. Publishes status and statistics
10//! 5. Listens to control commands (enable/disable) from dashboard
11
12mod config;
13
14pub use config::BehaviorExecutorConfig;
15use mecha10_behavior_runtime::NodeStatus as BehaviorStatus;
16use mecha10_behavior_runtime::{BehaviorLoader, BoxedBehavior, NodeRegistry};
17use mecha10_core::behavior_interrupt::BehaviorControl;
18use mecha10_core::prelude::*;
19use mecha10_core::topics::Topic;
20use serde::{Deserialize, Serialize};
21use std::path::PathBuf;
22use std::time::Duration;
23
24/// Status message published by behavior executor
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct BehaviorStatusMessage {
27    pub behavior_name: String,
28    pub is_running: bool,
29    pub tick_count: usize,
30    pub timestamp: u64,
31}
32
33impl Message for BehaviorStatusMessage {}
34
35/// Behavior executor node that runs behavior trees
36#[derive(Debug, Node)]
37#[node(name = "behavior-executor")]
38pub struct BehaviorExecutorNode {
39    behavior: BoxedBehavior,
40    behavior_name: String,
41    tick_rate_hz: f32,
42    max_ticks: Option<usize>,
43    log_stats: bool,
44    tick_count: usize,
45    last_status: BehaviorStatus,
46    enabled: bool, // Whether behavior execution is enabled
47    control_topic: &'static str,
48    status_topic: &'static str,
49    auto_resume_task: Option<tokio::task::JoinHandle<()>>, // Auto-resume timer
50}
51
52#[async_trait]
53impl NodeImpl for BehaviorExecutorNode {
54    type Config = BehaviorExecutorConfig;
55
56    async fn init(config: Self::Config) -> Result<Self> {
57        info!("Initializing behavior executor: {}", config.behavior_name);
58
59        // Get topic paths from config
60        let control_topic_path = config.control_topic();
61        let status_topic_path = config.status_topic();
62
63        info!("📡 Control topic: {}", control_topic_path);
64        info!("📤 Status topic: {}", status_topic_path);
65
66        // Create node registry and register built-in actions
67        let mut registry = NodeRegistry::new();
68        mecha10_behavior_runtime::register_builtin_actions(&mut registry);
69
70        info!("Registered built-in action nodes: wander, move, sensor_check, timer");
71
72        // Load behavior tree
73        let behavior_path = PathBuf::from(&config.behaviors_dir).join(format!("{}.json", config.behavior_name));
74
75        info!("Loading behavior tree from: {}", behavior_path.display());
76
77        let loader = BehaviorLoader::new(registry);
78        let behavior = loader
79            .load_from_file(&behavior_path)
80            .map_err(|e| anyhow::anyhow!("Failed to load behavior tree: {}", e))?;
81
82        info!("✅ Loaded behavior tree '{}' successfully", config.behavior_name);
83        info!("Tick rate: {} Hz", config.tick_rate_hz);
84        if let Some(max_ticks) = config.max_ticks {
85            info!("Max ticks: {}", max_ticks);
86        } else {
87            info!("Max ticks: unlimited");
88        }
89        info!("⏸️  Behavior execution disabled on startup - waiting for enable signal from dashboard");
90
91        // Leak topic paths for 'static lifetime
92        let control_topic_static: &'static str = Box::leak(control_topic_path.into_boxed_str());
93        let status_topic_static: &'static str = Box::leak(status_topic_path.into_boxed_str());
94
95        Ok(Self {
96            behavior,
97            behavior_name: config.behavior_name.clone(),
98            tick_rate_hz: config.tick_rate_hz,
99            max_ticks: config.max_ticks,
100            log_stats: config.log_stats,
101            tick_count: 0,
102            last_status: BehaviorStatus::Success,
103            enabled: false, // Start disabled, wait for dashboard to enable
104            control_topic: control_topic_static,
105            status_topic: status_topic_static,
106            auto_resume_task: None,
107        })
108    }
109
110    async fn run(&mut self, ctx: &Context) -> Result<()> {
111        info!("Behavior executor running - waiting for enable signal");
112
113        // Subscribe to control commands
114        let mut control_rx = ctx.subscribe(Topic::<BehaviorControl>::new(self.control_topic)).await?;
115
116        // Calculate tick interval
117        let tick_interval = Duration::from_secs_f32(1.0 / self.tick_rate_hz);
118        let mut tick_timer = tokio::time::interval(tick_interval);
119
120        // Status publishing interval (every second)
121        let mut status_timer = tokio::time::interval(Duration::from_secs(1));
122
123        loop {
124            tokio::select! {
125                // Handle control commands
126                Some(msg) = control_rx.recv() => {
127                    match msg.action.as_str() {
128                        "enable" => {
129                            if !self.enabled {
130                                info!("✅ Behavior execution enabled");
131                                self.enabled = true;
132                                // Reset behavior when enabling
133                                self.behavior.reset().await?;
134                                self.tick_count = 0;
135                                // Cancel any auto-resume task
136                                if let Some(task) = self.auto_resume_task.take() {
137                                    task.abort();
138                                }
139                            }
140                        }
141                        "disable" => {
142                            if self.enabled {
143                                info!("⏸️  Behavior execution disabled");
144                                self.enabled = false;
145                                // Cancel any auto-resume task
146                                if let Some(task) = self.auto_resume_task.take() {
147                                    task.abort();
148                                }
149                            }
150                        }
151                        "interrupt" => {
152                            if self.enabled {
153                                let source = msg.source.as_deref().unwrap_or("unknown");
154                                info!("⏸️  Behavior execution interrupted by {}", source);
155                                self.enabled = false;
156
157                                // Schedule auto-resume if duration is specified
158                                if let Some(duration_secs) = msg.duration_secs {
159                                    info!("⏰ Will auto-resume in {}s", duration_secs);
160                                    let ctx_clone = ctx.clone();
161                                    let control_topic = self.control_topic;
162
163                                    let task = tokio::spawn(async move {
164                                        tokio::time::sleep(Duration::from_secs(duration_secs)).await;
165
166                                        // Send resume command
167                                        let resume = BehaviorControl::resume("auto-resume");
168                                        let topic = Topic::<BehaviorControl>::new(control_topic);
169
170                                        match ctx_clone.publish_to(topic, &resume).await {
171                                            Ok(_) => {
172                                                info!("⏰ Auto-resumed behavior after {}s timeout", duration_secs);
173                                            }
174                                            Err(e) => {
175                                                warn!("Failed to auto-resume behavior: {}", e);
176                                            }
177                                        }
178                                    });
179
180                                    self.auto_resume_task = Some(task);
181                                }
182                            }
183                        }
184                        "resume" => {
185                            if !self.enabled {
186                                let source = msg.source.as_deref().unwrap_or("unknown");
187                                info!("▶️  Behavior execution resumed by {}", source);
188                                self.enabled = true;
189                                // Reset behavior when resuming
190                                self.behavior.reset().await?;
191                                // Cancel any pending auto-resume task
192                                if let Some(task) = self.auto_resume_task.take() {
193                                    task.abort();
194                                }
195                            }
196                        }
197                        _ => {
198                            warn!("Unknown control action: {}", msg.action);
199                        }
200                    }
201                }
202
203                // Behavior tick (only if enabled)
204                _ = tick_timer.tick() => {
205                    if !self.enabled {
206                        continue;
207                    }
208
209                    // Check if we've reached max ticks
210                    if let Some(max_ticks) = self.max_ticks {
211                        if self.tick_count >= max_ticks {
212                            info!("Reached max ticks ({}), disabling", max_ticks);
213                            self.enabled = false;
214                            continue;
215                        }
216                    }
217
218                    // Tick the behavior tree
219                    match self.behavior.tick(ctx).await {
220                        Ok(status) => {
221                            self.last_status = status;
222                            self.tick_count += 1;
223
224                            // Log on status change or periodically
225                            if status != self.last_status || self.tick_count % 100 == 0 {
226                                debug!("Behavior tick #{}: {:?}", self.tick_count, status);
227                            }
228
229                            // Log statistics periodically
230                            if self.log_stats && self.tick_count % 1000 == 0 {
231                                info!(
232                                    "Behavior statistics: {} ticks, current status: {:?}",
233                                    self.tick_count, status
234                                );
235                            }
236
237                            // Handle completion
238                            if status == BehaviorStatus::Success {
239                                info!("Behavior completed successfully after {} ticks", self.tick_count);
240                                // Reset and continue (for repeating behaviors)
241                                self.behavior.reset().await?;
242                                debug!("Behavior reset, starting again");
243                            } else if status == BehaviorStatus::Failure {
244                                warn!("Behavior failed after {} ticks, resetting", self.tick_count);
245                                self.behavior.reset().await?;
246                            }
247                        }
248                        Err(e) => {
249                            error!("Behavior tick error: {}", e);
250                            // Continue running despite errors
251                        }
252                    }
253                }
254
255                // Publish status updates
256                _ = status_timer.tick() => {
257                    let status_msg = BehaviorStatusMessage {
258                        behavior_name: self.behavior_name.clone(),
259                        is_running: self.enabled,
260                        tick_count: self.tick_count,
261                        timestamp: now_micros(),
262                    };
263
264                    ctx.publish_to(Topic::new(self.status_topic), &status_msg).await?;
265                }
266            }
267        }
268    }
269
270    async fn health_check(&self) -> HealthStatus {
271        HealthStatus {
272            healthy: self.last_status != BehaviorStatus::Failure,
273            last_check: now_micros(),
274            failure_count: if self.last_status == BehaviorStatus::Failure {
275                1
276            } else {
277                0
278            },
279            message: Some(format!(
280                "Behavior executor: {} ticks, status: {:?}",
281                self.tick_count, self.last_status
282            )),
283        }
284    }
285}