mod config;
pub use config::BehaviorExecutorConfig;
use mecha10_behavior_runtime::NodeStatus as BehaviorStatus;
use mecha10_behavior_runtime::{BehaviorLoader, BoxedBehavior, NodeRegistry};
use mecha10_core::behavior_interrupt::BehaviorControl;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BehaviorStatusMessage {
pub behavior_name: String,
pub is_running: bool,
pub tick_count: usize,
pub timestamp: u64,
}
impl Message for BehaviorStatusMessage {}
#[derive(Debug, Node)]
#[node(name = "behavior-executor")]
pub struct BehaviorExecutorNode {
behavior: BoxedBehavior,
behavior_name: String,
tick_rate_hz: f32,
max_ticks: Option<usize>,
log_stats: bool,
tick_count: usize,
last_status: BehaviorStatus,
enabled: bool, control_topic: &'static str,
status_topic: &'static str,
auto_resume_task: Option<tokio::task::JoinHandle<()>>, }
#[async_trait]
impl NodeImpl for BehaviorExecutorNode {
type Config = BehaviorExecutorConfig;
async fn init(config: Self::Config) -> Result<Self> {
info!("Initializing behavior executor: {}", config.behavior_name);
let control_topic_path = config.control_topic();
let status_topic_path = config.status_topic();
info!("📡 Control topic: {}", control_topic_path);
info!("📤 Status topic: {}", status_topic_path);
let mut registry = NodeRegistry::new();
mecha10_behavior_runtime::register_builtin_actions(&mut registry);
info!("Registered built-in action nodes: wander, move, sensor_check, timer");
let behavior_path = PathBuf::from(&config.behaviors_dir).join(format!("{}.json", config.behavior_name));
info!("Loading behavior tree from: {}", behavior_path.display());
let loader = BehaviorLoader::new(registry);
let behavior = loader
.load_from_file(&behavior_path)
.map_err(|e| anyhow::anyhow!("Failed to load behavior tree: {}", e))?;
info!("✅ Loaded behavior tree '{}' successfully", config.behavior_name);
info!("Tick rate: {} Hz", config.tick_rate_hz);
if let Some(max_ticks) = config.max_ticks {
info!("Max ticks: {}", max_ticks);
} else {
info!("Max ticks: unlimited");
}
info!("⏸️ Behavior execution disabled on startup - waiting for enable signal from dashboard");
let control_topic_static: &'static str = Box::leak(control_topic_path.into_boxed_str());
let status_topic_static: &'static str = Box::leak(status_topic_path.into_boxed_str());
Ok(Self {
behavior,
behavior_name: config.behavior_name.clone(),
tick_rate_hz: config.tick_rate_hz,
max_ticks: config.max_ticks,
log_stats: config.log_stats,
tick_count: 0,
last_status: BehaviorStatus::Success,
enabled: false, control_topic: control_topic_static,
status_topic: status_topic_static,
auto_resume_task: None,
})
}
async fn run(&mut self, ctx: &Context) -> Result<()> {
info!("Behavior executor running - waiting for enable signal");
let mut control_rx = ctx.subscribe(Topic::<BehaviorControl>::new(self.control_topic)).await?;
let tick_interval = Duration::from_secs_f32(1.0 / self.tick_rate_hz);
let mut tick_timer = tokio::time::interval(tick_interval);
let mut status_timer = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
Some(msg) = control_rx.recv() => {
match msg.action.as_str() {
"enable" => {
if !self.enabled {
info!("✅ Behavior execution enabled");
self.enabled = true;
self.behavior.reset().await?;
self.tick_count = 0;
if let Some(task) = self.auto_resume_task.take() {
task.abort();
}
}
}
"disable" => {
if self.enabled {
info!("⏸️ Behavior execution disabled");
self.enabled = false;
if let Some(task) = self.auto_resume_task.take() {
task.abort();
}
}
}
"interrupt" => {
if self.enabled {
let source = msg.source.as_deref().unwrap_or("unknown");
info!("⏸️ Behavior execution interrupted by {}", source);
self.enabled = false;
if let Some(duration_secs) = msg.duration_secs {
info!("⏰ Will auto-resume in {}s", duration_secs);
let ctx_clone = ctx.clone();
let control_topic = self.control_topic;
let task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(duration_secs)).await;
let resume = BehaviorControl::resume("auto-resume");
let topic = Topic::<BehaviorControl>::new(control_topic);
match ctx_clone.publish_to(topic, &resume).await {
Ok(_) => {
info!("⏰ Auto-resumed behavior after {}s timeout", duration_secs);
}
Err(e) => {
warn!("Failed to auto-resume behavior: {}", e);
}
}
});
self.auto_resume_task = Some(task);
}
}
}
"resume" => {
if !self.enabled {
let source = msg.source.as_deref().unwrap_or("unknown");
info!("▶️ Behavior execution resumed by {}", source);
self.enabled = true;
self.behavior.reset().await?;
if let Some(task) = self.auto_resume_task.take() {
task.abort();
}
}
}
_ => {
warn!("Unknown control action: {}", msg.action);
}
}
}
_ = tick_timer.tick() => {
if !self.enabled {
continue;
}
if let Some(max_ticks) = self.max_ticks {
if self.tick_count >= max_ticks {
info!("Reached max ticks ({}), disabling", max_ticks);
self.enabled = false;
continue;
}
}
match self.behavior.tick(ctx).await {
Ok(status) => {
self.last_status = status;
self.tick_count += 1;
if status != self.last_status || self.tick_count % 100 == 0 {
debug!("Behavior tick #{}: {:?}", self.tick_count, status);
}
if self.log_stats && self.tick_count % 1000 == 0 {
info!(
"Behavior statistics: {} ticks, current status: {:?}",
self.tick_count, status
);
}
if status == BehaviorStatus::Success {
info!("Behavior completed successfully after {} ticks", self.tick_count);
self.behavior.reset().await?;
debug!("Behavior reset, starting again");
} else if status == BehaviorStatus::Failure {
warn!("Behavior failed after {} ticks, resetting", self.tick_count);
self.behavior.reset().await?;
}
}
Err(e) => {
error!("Behavior tick error: {}", e);
}
}
}
_ = status_timer.tick() => {
let status_msg = BehaviorStatusMessage {
behavior_name: self.behavior_name.clone(),
is_running: self.enabled,
tick_count: self.tick_count,
timestamp: now_micros(),
};
ctx.publish_to(Topic::new(self.status_topic), &status_msg).await?;
}
}
}
}
async fn health_check(&self) -> HealthStatus {
HealthStatus {
healthy: self.last_status != BehaviorStatus::Failure,
last_check: now_micros(),
failure_count: if self.last_status == BehaviorStatus::Failure {
1
} else {
0
},
message: Some(format!(
"Behavior executor: {} ticks, status: {:?}",
self.tick_count, self.last_status
)),
}
}
}