mecha10_nodes_behavior_executor/
lib.rs1mod config;
13
14pub use config::BehaviorExecutorConfig;
15use mecha10_behavior_runtime::NodeStatus as BehaviorStatus;
16use mecha10_behavior_runtime::{BehaviorLoader, BoxedBehavior, NodeRegistry};
17use mecha10_core::behavior_interrupt::BehaviorControl;
18use mecha10_core::prelude::*;
19use mecha10_core::topics::Topic;
20use serde::{Deserialize, Serialize};
21use std::path::PathBuf;
22use std::time::Duration;
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct BehaviorStatusMessage {
27 pub behavior_name: String,
28 pub is_running: bool,
29 pub tick_count: usize,
30 pub timestamp: u64,
31}
32
33impl Message for BehaviorStatusMessage {}
34
35#[derive(Debug, Node)]
37#[node(name = "behavior-executor")]
38pub struct BehaviorExecutorNode {
39 behavior: BoxedBehavior,
40 behavior_name: String,
41 tick_rate_hz: f32,
42 max_ticks: Option<usize>,
43 log_stats: bool,
44 tick_count: usize,
45 last_status: BehaviorStatus,
46 enabled: bool, control_topic: &'static str,
48 status_topic: &'static str,
49 auto_resume_task: Option<tokio::task::JoinHandle<()>>, }
51
52#[async_trait]
53impl NodeImpl for BehaviorExecutorNode {
54 type Config = BehaviorExecutorConfig;
55
56 async fn init(config: Self::Config) -> Result<Self> {
57 let behavior_name = match Self::get_behavior_from_project_config() {
59 Some(active_behavior) => {
60 if active_behavior != config.behavior_name {
61 info!(
62 "Using behavior '{}' from mecha10.json (overriding node config '{}')",
63 active_behavior, config.behavior_name
64 );
65 }
66 active_behavior
67 }
68 None => config.behavior_name.clone(),
69 };
70
71 info!("Initializing behavior executor: {}", behavior_name);
72
73 let control_topic_path = config.control_topic();
75 let status_topic_path = config.status_topic();
76
77 info!("📡 Control topic: {}", control_topic_path);
78 info!("📤 Status topic: {}", status_topic_path);
79
80 let mut registry = NodeRegistry::new();
82 mecha10_behavior_runtime::register_builtin_actions(&mut registry);
83
84 info!("Registered built-in action nodes: wander, move, sensor_check, timer");
85
86 let behavior_path = PathBuf::from(&config.behaviors_dir).join(format!("{}.json", behavior_name));
88
89 info!("Loading behavior tree from: {}", behavior_path.display());
90
91 let loader = BehaviorLoader::new(registry);
92 let behavior = loader
93 .load_from_file(&behavior_path)
94 .map_err(|e| anyhow::anyhow!("Failed to load behavior tree: {}", e))?;
95
96 info!("✅ Loaded behavior tree '{}' successfully", behavior_name);
97 info!("Tick rate: {} Hz", config.tick_rate_hz);
98 if let Some(max_ticks) = config.max_ticks {
99 info!("Max ticks: {}", max_ticks);
100 } else {
101 info!("Max ticks: unlimited");
102 }
103 info!("⏸️ Behavior execution disabled on startup - waiting for enable signal from dashboard");
104
105 let control_topic_static: &'static str = Box::leak(control_topic_path.into_boxed_str());
107 let status_topic_static: &'static str = Box::leak(status_topic_path.into_boxed_str());
108
109 Ok(Self {
110 behavior,
111 behavior_name,
112 tick_rate_hz: config.tick_rate_hz,
113 max_ticks: config.max_ticks,
114 log_stats: config.log_stats,
115 tick_count: 0,
116 last_status: BehaviorStatus::Success,
117 enabled: false, control_topic: control_topic_static,
119 status_topic: status_topic_static,
120 auto_resume_task: None,
121 })
122 }
123
124 async fn run(&mut self, ctx: &Context) -> Result<()> {
125 info!("Behavior executor running - waiting for enable signal");
126
127 let mut control_rx = ctx.subscribe(Topic::<BehaviorControl>::new(self.control_topic)).await?;
129
130 let tick_interval = Duration::from_secs_f32(1.0 / self.tick_rate_hz);
132 let mut tick_timer = tokio::time::interval(tick_interval);
133
134 let mut status_timer = tokio::time::interval(Duration::from_secs(1));
136
137 loop {
138 tokio::select! {
139 Some(msg) = control_rx.recv() => {
141 match msg.action.as_str() {
142 "enable" => {
143 if !self.enabled {
144 info!("✅ Behavior execution enabled");
145 self.enabled = true;
146 self.behavior.reset().await?;
148 self.tick_count = 0;
149 if let Some(task) = self.auto_resume_task.take() {
151 task.abort();
152 }
153 }
154 }
155 "disable" => {
156 if self.enabled {
157 info!("⏸️ Behavior execution disabled");
158 self.enabled = false;
159 if let Some(task) = self.auto_resume_task.take() {
161 task.abort();
162 }
163 }
164 }
165 "interrupt" => {
166 if self.enabled {
167 let source = msg.source.as_deref().unwrap_or("unknown");
168 info!("⏸️ Behavior execution interrupted by {}", source);
169 self.enabled = false;
170
171 if let Some(duration_secs) = msg.duration_secs {
173 info!("⏰ Will auto-resume in {}s", duration_secs);
174 let ctx_clone = ctx.clone();
175 let control_topic = self.control_topic;
176
177 let task = tokio::spawn(async move {
178 tokio::time::sleep(Duration::from_secs(duration_secs)).await;
179
180 let resume = BehaviorControl::resume("auto-resume");
182 let topic = Topic::<BehaviorControl>::new(control_topic);
183
184 match ctx_clone.publish_to(topic, &resume).await {
185 Ok(_) => {
186 info!("⏰ Auto-resumed behavior after {}s timeout", duration_secs);
187 }
188 Err(e) => {
189 warn!("Failed to auto-resume behavior: {}", e);
190 }
191 }
192 });
193
194 self.auto_resume_task = Some(task);
195 }
196 }
197 }
198 "resume" => {
199 if !self.enabled {
200 let source = msg.source.as_deref().unwrap_or("unknown");
201 info!("▶️ Behavior execution resumed by {}", source);
202 self.enabled = true;
203 self.behavior.reset().await?;
205 if let Some(task) = self.auto_resume_task.take() {
207 task.abort();
208 }
209 }
210 }
211 _ => {
212 warn!("Unknown control action: {}", msg.action);
213 }
214 }
215 }
216
217 _ = tick_timer.tick() => {
219 if !self.enabled {
220 continue;
221 }
222
223 if let Some(max_ticks) = self.max_ticks {
225 if self.tick_count >= max_ticks {
226 info!("Reached max ticks ({}), disabling", max_ticks);
227 self.enabled = false;
228 continue;
229 }
230 }
231
232 match self.behavior.tick(ctx).await {
234 Ok(status) => {
235 self.last_status = status;
236 self.tick_count += 1;
237
238 if status != self.last_status || self.tick_count % 100 == 0 {
240 debug!("Behavior tick #{}: {:?}", self.tick_count, status);
241 }
242
243 if self.log_stats && self.tick_count % 1000 == 0 {
245 info!(
246 "Behavior statistics: {} ticks, current status: {:?}",
247 self.tick_count, status
248 );
249 }
250
251 if status == BehaviorStatus::Success {
253 info!("Behavior completed successfully after {} ticks", self.tick_count);
254 self.behavior.reset().await?;
256 debug!("Behavior reset, starting again");
257 } else if status == BehaviorStatus::Failure {
258 warn!("Behavior failed after {} ticks, resetting", self.tick_count);
259 self.behavior.reset().await?;
260 }
261 }
262 Err(e) => {
263 error!("Behavior tick error: {}", e);
264 }
266 }
267 }
268
269 _ = status_timer.tick() => {
271 let status_msg = BehaviorStatusMessage {
272 behavior_name: self.behavior_name.clone(),
273 is_running: self.enabled,
274 tick_count: self.tick_count,
275 timestamp: now_micros(),
276 };
277
278 ctx.publish_to(Topic::new(self.status_topic), &status_msg).await?;
279 }
280 }
281 }
282 }
283
284 async fn health_check(&self) -> HealthStatus {
285 HealthStatus {
286 healthy: self.last_status != BehaviorStatus::Failure,
287 last_check: now_micros(),
288 failure_count: if self.last_status == BehaviorStatus::Failure {
289 1
290 } else {
291 0
292 },
293 message: Some(format!(
294 "Behavior executor: {} ticks, status: {:?}",
295 self.tick_count, self.last_status
296 )),
297 }
298 }
299}
300
301impl BehaviorExecutorNode {
302 fn get_behavior_from_project_config() -> Option<String> {
307 let paths_to_try = [
309 PathBuf::from("mecha10.json"),
310 PathBuf::from("../mecha10.json"),
311 PathBuf::from("../../mecha10.json"),
312 ];
313
314 for path in &paths_to_try {
315 if !path.exists() {
316 continue;
317 }
318
319 let content = match std::fs::read_to_string(path) {
321 Ok(c) => c,
322 Err(_) => continue,
323 };
324
325 let json: serde_json::Value = match serde_json::from_str(&content) {
326 Ok(j) => j,
327 Err(_) => continue,
328 };
329
330 if let Some(active) = json
332 .get("behaviors")
333 .and_then(|b| b.get("active"))
334 .and_then(|a| a.as_str())
335 {
336 return Some(active.to_string());
337 }
338 }
339
340 None
341 }
342}