mecha10-nodes-behavior-executor 0.1.48

Behavior tree executor node for autonomous robot behaviors
Documentation
//! Behavior Executor Node
//!
//! Loads and executes behavior trees for autonomous robot control.
//!
//! This node:
//! 1. Loads a behavior tree from a JSON file
//! 2. Registers built-in action nodes (wander, move, timer, sensor_check)
//! 3. Executes the behavior tree at a configurable tick rate
//! 4. Publishes status and statistics
//! 5. Listens to control commands (enable/disable) from dashboard

mod config;

pub use config::BehaviorExecutorConfig;
use mecha10_behavior_runtime::NodeStatus as BehaviorStatus;
use mecha10_behavior_runtime::{BehaviorLoader, BoxedBehavior, NodeRegistry};
use mecha10_core::behavior_interrupt::BehaviorControl;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;

/// Status message published by behavior executor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BehaviorStatusMessage {
    pub behavior_name: String,
    pub is_running: bool,
    pub tick_count: usize,
    pub timestamp: u64,
}

impl Message for BehaviorStatusMessage {}

/// Behavior executor node that runs behavior trees
#[derive(Debug, Node)]
#[node(name = "behavior-executor")]
pub struct BehaviorExecutorNode {
    behavior: BoxedBehavior,
    behavior_name: String,
    tick_rate_hz: f32,
    max_ticks: Option<usize>,
    log_stats: bool,
    tick_count: usize,
    last_status: BehaviorStatus,
    enabled: bool, // Whether behavior execution is enabled
    control_topic: &'static str,
    status_topic: &'static str,
    auto_resume_task: Option<tokio::task::JoinHandle<()>>, // Auto-resume timer
}

#[async_trait]
impl NodeImpl for BehaviorExecutorNode {
    type Config = BehaviorExecutorConfig;

    async fn init(config: Self::Config) -> Result<Self> {
        // Check for behavior override from mecha10.json (behaviors.active field)
        let behavior_name = match Self::get_behavior_from_project_config() {
            Some(active_behavior) => {
                if active_behavior != config.behavior_name {
                    info!(
                        "Using behavior '{}' from mecha10.json (overriding node config '{}')",
                        active_behavior, config.behavior_name
                    );
                }
                active_behavior
            }
            None => config.behavior_name.clone(),
        };

        info!("Initializing behavior executor: {}", behavior_name);

        // Get topic paths from config
        let control_topic_path = config.control_topic();
        let status_topic_path = config.status_topic();

        info!("📡 Control topic: {}", control_topic_path);
        info!("📤 Status topic: {}", status_topic_path);

        // Create node registry and register built-in actions
        let mut registry = NodeRegistry::new();
        mecha10_behavior_runtime::register_builtin_actions(&mut registry);

        info!("Registered built-in action nodes: wander, move, sensor_check, timer");

        // Load behavior tree
        let behavior_path = PathBuf::from(&config.behaviors_dir).join(format!("{}.json", behavior_name));

        info!("Loading behavior tree from: {}", behavior_path.display());

        let loader = BehaviorLoader::new(registry);
        let behavior = loader
            .load_from_file(&behavior_path)
            .map_err(|e| anyhow::anyhow!("Failed to load behavior tree: {}", e))?;

        info!("✅ Loaded behavior tree '{}' successfully", behavior_name);
        info!("Tick rate: {} Hz", config.tick_rate_hz);
        if let Some(max_ticks) = config.max_ticks {
            info!("Max ticks: {}", max_ticks);
        } else {
            info!("Max ticks: unlimited");
        }
        info!("⏸️  Behavior execution disabled on startup - waiting for enable signal from dashboard");

        // Leak topic paths for 'static lifetime
        let control_topic_static: &'static str = Box::leak(control_topic_path.into_boxed_str());
        let status_topic_static: &'static str = Box::leak(status_topic_path.into_boxed_str());

        Ok(Self {
            behavior,
            behavior_name,
            tick_rate_hz: config.tick_rate_hz,
            max_ticks: config.max_ticks,
            log_stats: config.log_stats,
            tick_count: 0,
            last_status: BehaviorStatus::Success,
            enabled: false, // Start disabled, wait for dashboard to enable
            control_topic: control_topic_static,
            status_topic: status_topic_static,
            auto_resume_task: None,
        })
    }

    async fn run(&mut self, ctx: &Context) -> Result<()> {
        info!("Behavior executor running - waiting for enable signal");

        // Subscribe to control commands
        let mut control_rx = ctx.subscribe(Topic::<BehaviorControl>::new(self.control_topic)).await?;

        // Calculate tick interval
        let tick_interval = Duration::from_secs_f32(1.0 / self.tick_rate_hz);
        let mut tick_timer = tokio::time::interval(tick_interval);

        // Status publishing interval (every second)
        let mut status_timer = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                // Handle control commands
                Some(msg) = control_rx.recv() => {
                    match msg.action.as_str() {
                        "enable" => {
                            if !self.enabled {
                                info!("✅ Behavior execution enabled");
                                self.enabled = true;
                                // Reset behavior when enabling
                                self.behavior.reset().await?;
                                self.tick_count = 0;
                                // Cancel any auto-resume task
                                if let Some(task) = self.auto_resume_task.take() {
                                    task.abort();
                                }
                            }
                        }
                        "disable" => {
                            if self.enabled {
                                info!("⏸️  Behavior execution disabled");
                                self.enabled = false;
                                // Cancel any auto-resume task
                                if let Some(task) = self.auto_resume_task.take() {
                                    task.abort();
                                }
                            }
                        }
                        "interrupt" => {
                            if self.enabled {
                                let source = msg.source.as_deref().unwrap_or("unknown");
                                info!("⏸️  Behavior execution interrupted by {}", source);
                                self.enabled = false;

                                // Schedule auto-resume if duration is specified
                                if let Some(duration_secs) = msg.duration_secs {
                                    info!("⏰ Will auto-resume in {}s", duration_secs);
                                    let ctx_clone = ctx.clone();
                                    let control_topic = self.control_topic;

                                    let task = tokio::spawn(async move {
                                        tokio::time::sleep(Duration::from_secs(duration_secs)).await;

                                        // Send resume command
                                        let resume = BehaviorControl::resume("auto-resume");
                                        let topic = Topic::<BehaviorControl>::new(control_topic);

                                        match ctx_clone.publish_to(topic, &resume).await {
                                            Ok(_) => {
                                                info!("⏰ Auto-resumed behavior after {}s timeout", duration_secs);
                                            }
                                            Err(e) => {
                                                warn!("Failed to auto-resume behavior: {}", e);
                                            }
                                        }
                                    });

                                    self.auto_resume_task = Some(task);
                                }
                            }
                        }
                        "resume" => {
                            if !self.enabled {
                                let source = msg.source.as_deref().unwrap_or("unknown");
                                info!("▶️  Behavior execution resumed by {}", source);
                                self.enabled = true;
                                // Reset behavior when resuming
                                self.behavior.reset().await?;
                                // Cancel any pending auto-resume task
                                if let Some(task) = self.auto_resume_task.take() {
                                    task.abort();
                                }
                            }
                        }
                        _ => {
                            warn!("Unknown control action: {}", msg.action);
                        }
                    }
                }

                // Behavior tick (only if enabled)
                _ = tick_timer.tick() => {
                    if !self.enabled {
                        continue;
                    }

                    // Check if we've reached max ticks
                    if let Some(max_ticks) = self.max_ticks {
                        if self.tick_count >= max_ticks {
                            info!("Reached max ticks ({}), disabling", max_ticks);
                            self.enabled = false;
                            continue;
                        }
                    }

                    // Tick the behavior tree
                    match self.behavior.tick(ctx).await {
                        Ok(status) => {
                            self.last_status = status;
                            self.tick_count += 1;

                            // Log on status change or periodically
                            if status != self.last_status || self.tick_count % 100 == 0 {
                                debug!("Behavior tick #{}: {:?}", self.tick_count, status);
                            }

                            // Log statistics periodically
                            if self.log_stats && self.tick_count % 1000 == 0 {
                                info!(
                                    "Behavior statistics: {} ticks, current status: {:?}",
                                    self.tick_count, status
                                );
                            }

                            // Handle completion
                            if status == BehaviorStatus::Success {
                                info!("Behavior completed successfully after {} ticks", self.tick_count);
                                // Reset and continue (for repeating behaviors)
                                self.behavior.reset().await?;
                                debug!("Behavior reset, starting again");
                            } else if status == BehaviorStatus::Failure {
                                warn!("Behavior failed after {} ticks, resetting", self.tick_count);
                                self.behavior.reset().await?;
                            }
                        }
                        Err(e) => {
                            error!("Behavior tick error: {}", e);
                            // Continue running despite errors
                        }
                    }
                }

                // Publish status updates
                _ = status_timer.tick() => {
                    let status_msg = BehaviorStatusMessage {
                        behavior_name: self.behavior_name.clone(),
                        is_running: self.enabled,
                        tick_count: self.tick_count,
                        timestamp: now_micros(),
                    };

                    ctx.publish_to(Topic::new(self.status_topic), &status_msg).await?;
                }
            }
        }
    }

    async fn health_check(&self) -> HealthStatus {
        HealthStatus {
            healthy: self.last_status != BehaviorStatus::Failure,
            last_check: now_micros(),
            failure_count: if self.last_status == BehaviorStatus::Failure {
                1
            } else {
                0
            },
            message: Some(format!(
                "Behavior executor: {} ticks, status: {:?}",
                self.tick_count, self.last_status
            )),
        }
    }
}

impl BehaviorExecutorNode {
    /// Read the active behavior from mecha10.json if available
    ///
    /// This allows users to set `behaviors.active` in mecha10.json to override
    /// the behavior specified in the node config file.
    fn get_behavior_from_project_config() -> Option<String> {
        // Try to find and read mecha10.json
        let paths_to_try = [
            PathBuf::from("mecha10.json"),
            PathBuf::from("../mecha10.json"),
            PathBuf::from("../../mecha10.json"),
        ];

        for path in &paths_to_try {
            if !path.exists() {
                continue;
            }

            // Read and parse mecha10.json
            let content = match std::fs::read_to_string(path) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let json: serde_json::Value = match serde_json::from_str(&content) {
                Ok(j) => j,
                Err(_) => continue,
            };

            // Extract behaviors.active
            if let Some(active) = json
                .get("behaviors")
                .and_then(|b| b.get("active"))
                .and_then(|a| a.as_str())
            {
                return Some(active.to_string());
            }
        }

        None
    }
}