mecha10_nodes_behavior_executor/
lib.rs

//! Behavior Executor Node
//!
//! Loads and executes behavior trees for autonomous robot control.
//!
//! This node:
//! 1. Loads a behavior tree from a JSON file
//! 2. Registers built-in action nodes (wander, move, timer, sensor_check)
//! 3. Executes the behavior tree at a configurable tick rate
//! 4. Publishes status and statistics
//! 5. Listens for control commands (enable, disable, interrupt, resume) on the
//!    control topic, e.g. from the dashboard
11
12mod config;
13
14pub use config::BehaviorExecutorConfig;
15use mecha10_behavior_runtime::NodeStatus as BehaviorStatus;
16use mecha10_behavior_runtime::{BehaviorLoader, BoxedBehavior, NodeRegistry};
17use mecha10_core::behavior_interrupt::BehaviorControl;
18use mecha10_core::prelude::*;
19use mecha10_core::topics::Topic;
20use serde::{Deserialize, Serialize};
21use std::path::PathBuf;
22use std::time::Duration;
23
24/// Status message published by behavior executor
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct BehaviorStatusMessage {
27    pub behavior_name: String,
28    pub is_running: bool,
29    pub tick_count: usize,
30    pub timestamp: u64,
31}
32
33impl Message for BehaviorStatusMessage {}
34
35/// Behavior executor node that runs behavior trees
36#[derive(Debug, Node)]
37#[node(name = "behavior-executor")]
38pub struct BehaviorExecutorNode {
39    behavior: BoxedBehavior,
40    behavior_name: String,
41    tick_rate_hz: f32,
42    max_ticks: Option<usize>,
43    log_stats: bool,
44    tick_count: usize,
45    last_status: BehaviorStatus,
46    enabled: bool, // Whether behavior execution is enabled
47    control_topic: &'static str,
48    status_topic: &'static str,
49    auto_resume_task: Option<tokio::task::JoinHandle<()>>, // Auto-resume timer
50}
51
52#[async_trait]
53impl NodeImpl for BehaviorExecutorNode {
54    type Config = BehaviorExecutorConfig;
55
56    async fn init(config: Self::Config) -> Result<Self> {
57        // Check for behavior override from mecha10.json (behaviors.active field)
58        let behavior_name = match Self::get_behavior_from_project_config() {
59            Some(active_behavior) => {
60                if active_behavior != config.behavior_name {
61                    info!(
62                        "Using behavior '{}' from mecha10.json (overriding node config '{}')",
63                        active_behavior, config.behavior_name
64                    );
65                }
66                active_behavior
67            }
68            None => config.behavior_name.clone(),
69        };
70
71        info!("Initializing behavior executor: {}", behavior_name);
72
73        // Get topic paths from config
74        let control_topic_path = config.control_topic();
75        let status_topic_path = config.status_topic();
76
77        info!("📡 Control topic: {}", control_topic_path);
78        info!("📤 Status topic: {}", status_topic_path);
79
80        // Create node registry and register built-in actions
81        let mut registry = NodeRegistry::new();
82        mecha10_behavior_runtime::register_builtin_actions(&mut registry);
83
84        info!("Registered built-in action nodes: wander, move, sensor_check, timer");
85
86        // Load behavior tree
87        let behavior_path = PathBuf::from(&config.behaviors_dir).join(format!("{}.json", behavior_name));
88
89        info!("Loading behavior tree from: {}", behavior_path.display());
90
91        let loader = BehaviorLoader::new(registry);
92        let behavior = loader
93            .load_from_file(&behavior_path)
94            .map_err(|e| anyhow::anyhow!("Failed to load behavior tree: {}", e))?;
95
96        info!("✅ Loaded behavior tree '{}' successfully", behavior_name);
97        info!("Tick rate: {} Hz", config.tick_rate_hz);
98        if let Some(max_ticks) = config.max_ticks {
99            info!("Max ticks: {}", max_ticks);
100        } else {
101            info!("Max ticks: unlimited");
102        }
103        info!("⏸️  Behavior execution disabled on startup - waiting for enable signal from dashboard");
104
105        // Leak topic paths for 'static lifetime
106        let control_topic_static: &'static str = Box::leak(control_topic_path.into_boxed_str());
107        let status_topic_static: &'static str = Box::leak(status_topic_path.into_boxed_str());
108
109        Ok(Self {
110            behavior,
111            behavior_name,
112            tick_rate_hz: config.tick_rate_hz,
113            max_ticks: config.max_ticks,
114            log_stats: config.log_stats,
115            tick_count: 0,
116            last_status: BehaviorStatus::Success,
117            enabled: false, // Start disabled, wait for dashboard to enable
118            control_topic: control_topic_static,
119            status_topic: status_topic_static,
120            auto_resume_task: None,
121        })
122    }
123
124    async fn run(&mut self, ctx: &Context) -> Result<()> {
125        info!("Behavior executor running - waiting for enable signal");
126
127        // Subscribe to control commands
128        let mut control_rx = ctx.subscribe(Topic::<BehaviorControl>::new(self.control_topic)).await?;
129
130        // Calculate tick interval
131        let tick_interval = Duration::from_secs_f32(1.0 / self.tick_rate_hz);
132        let mut tick_timer = tokio::time::interval(tick_interval);
133
134        // Status publishing interval (every second)
135        let mut status_timer = tokio::time::interval(Duration::from_secs(1));
136
137        loop {
138            tokio::select! {
139                // Handle control commands
140                Some(msg) = control_rx.recv() => {
141                    match msg.action.as_str() {
142                        "enable" => {
143                            if !self.enabled {
144                                info!("✅ Behavior execution enabled");
145                                self.enabled = true;
146                                // Reset behavior when enabling
147                                self.behavior.reset().await?;
148                                self.tick_count = 0;
149                                // Cancel any auto-resume task
150                                if let Some(task) = self.auto_resume_task.take() {
151                                    task.abort();
152                                }
153                            }
154                        }
155                        "disable" => {
156                            if self.enabled {
157                                info!("⏸️  Behavior execution disabled");
158                                self.enabled = false;
159                                // Cancel any auto-resume task
160                                if let Some(task) = self.auto_resume_task.take() {
161                                    task.abort();
162                                }
163                            }
164                        }
165                        "interrupt" => {
166                            if self.enabled {
167                                let source = msg.source.as_deref().unwrap_or("unknown");
168                                info!("⏸️  Behavior execution interrupted by {}", source);
169                                self.enabled = false;
170
171                                // Schedule auto-resume if duration is specified
172                                if let Some(duration_secs) = msg.duration_secs {
173                                    info!("⏰ Will auto-resume in {}s", duration_secs);
174                                    let ctx_clone = ctx.clone();
175                                    let control_topic = self.control_topic;
176
177                                    let task = tokio::spawn(async move {
178                                        tokio::time::sleep(Duration::from_secs(duration_secs)).await;
179
180                                        // Send resume command
181                                        let resume = BehaviorControl::resume("auto-resume");
182                                        let topic = Topic::<BehaviorControl>::new(control_topic);
183
184                                        match ctx_clone.publish_to(topic, &resume).await {
185                                            Ok(_) => {
186                                                info!("⏰ Auto-resumed behavior after {}s timeout", duration_secs);
187                                            }
188                                            Err(e) => {
189                                                warn!("Failed to auto-resume behavior: {}", e);
190                                            }
191                                        }
192                                    });
193
194                                    self.auto_resume_task = Some(task);
195                                }
196                            }
197                        }
198                        "resume" => {
199                            if !self.enabled {
200                                let source = msg.source.as_deref().unwrap_or("unknown");
201                                info!("▶️  Behavior execution resumed by {}", source);
202                                self.enabled = true;
203                                // Reset behavior when resuming
204                                self.behavior.reset().await?;
205                                // Cancel any pending auto-resume task
206                                if let Some(task) = self.auto_resume_task.take() {
207                                    task.abort();
208                                }
209                            }
210                        }
211                        _ => {
212                            warn!("Unknown control action: {}", msg.action);
213                        }
214                    }
215                }
216
217                // Behavior tick (only if enabled)
218                _ = tick_timer.tick() => {
219                    if !self.enabled {
220                        continue;
221                    }
222
223                    // Check if we've reached max ticks
224                    if let Some(max_ticks) = self.max_ticks {
225                        if self.tick_count >= max_ticks {
226                            info!("Reached max ticks ({}), disabling", max_ticks);
227                            self.enabled = false;
228                            continue;
229                        }
230                    }
231
232                    // Tick the behavior tree
233                    match self.behavior.tick(ctx).await {
234                        Ok(status) => {
235                            self.last_status = status;
236                            self.tick_count += 1;
237
238                            // Log on status change or periodically
239                            if status != self.last_status || self.tick_count % 100 == 0 {
240                                debug!("Behavior tick #{}: {:?}", self.tick_count, status);
241                            }
242
243                            // Log statistics periodically
244                            if self.log_stats && self.tick_count % 1000 == 0 {
245                                info!(
246                                    "Behavior statistics: {} ticks, current status: {:?}",
247                                    self.tick_count, status
248                                );
249                            }
250
251                            // Handle completion
252                            if status == BehaviorStatus::Success {
253                                info!("Behavior completed successfully after {} ticks", self.tick_count);
254                                // Reset and continue (for repeating behaviors)
255                                self.behavior.reset().await?;
256                                debug!("Behavior reset, starting again");
257                            } else if status == BehaviorStatus::Failure {
258                                warn!("Behavior failed after {} ticks, resetting", self.tick_count);
259                                self.behavior.reset().await?;
260                            }
261                        }
262                        Err(e) => {
263                            error!("Behavior tick error: {}", e);
264                            // Continue running despite errors
265                        }
266                    }
267                }
268
269                // Publish status updates
270                _ = status_timer.tick() => {
271                    let status_msg = BehaviorStatusMessage {
272                        behavior_name: self.behavior_name.clone(),
273                        is_running: self.enabled,
274                        tick_count: self.tick_count,
275                        timestamp: now_micros(),
276                    };
277
278                    ctx.publish_to(Topic::new(self.status_topic), &status_msg).await?;
279                }
280            }
281        }
282    }
283
284    async fn health_check(&self) -> HealthStatus {
285        HealthStatus {
286            healthy: self.last_status != BehaviorStatus::Failure,
287            last_check: now_micros(),
288            failure_count: if self.last_status == BehaviorStatus::Failure {
289                1
290            } else {
291                0
292            },
293            message: Some(format!(
294                "Behavior executor: {} ticks, status: {:?}",
295                self.tick_count, self.last_status
296            )),
297        }
298    }
299}
300
301impl BehaviorExecutorNode {
302    /// Read the active behavior from mecha10.json if available
303    ///
304    /// This allows users to set `behaviors.active` in mecha10.json to override
305    /// the behavior specified in the node config file.
306    fn get_behavior_from_project_config() -> Option<String> {
307        // Try to find and read mecha10.json
308        let paths_to_try = [
309            PathBuf::from("mecha10.json"),
310            PathBuf::from("../mecha10.json"),
311            PathBuf::from("../../mecha10.json"),
312        ];
313
314        for path in &paths_to_try {
315            if !path.exists() {
316                continue;
317            }
318
319            // Read and parse mecha10.json
320            let content = match std::fs::read_to_string(path) {
321                Ok(c) => c,
322                Err(_) => continue,
323            };
324
325            let json: serde_json::Value = match serde_json::from_str(&content) {
326                Ok(j) => j,
327                Err(_) => continue,
328            };
329
330            // Extract behaviors.active
331            if let Some(active) = json
332                .get("behaviors")
333                .and_then(|b| b.get("active"))
334                .and_then(|a| a.as_str())
335            {
336                return Some(active.to_string());
337            }
338        }
339
340        None
341    }
342}