mecha10_nodes_behavior_executor/
lib.rs1mod config;
13
14pub use config::BehaviorExecutorConfig;
15use mecha10_behavior_runtime::NodeStatus as BehaviorStatus;
16use mecha10_behavior_runtime::{BehaviorLoader, BoxedBehavior, NodeRegistry};
17use mecha10_core::behavior_interrupt::BehaviorControl;
18use mecha10_core::prelude::*;
19use mecha10_core::topics::Topic;
20use serde::{Deserialize, Serialize};
21use std::path::PathBuf;
22use std::time::Duration;
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct BehaviorStatusMessage {
27 pub behavior_name: String,
28 pub is_running: bool,
29 pub tick_count: usize,
30 pub timestamp: u64,
31}
32
33impl Message for BehaviorStatusMessage {}
34
35#[derive(Debug, Node)]
37#[node(name = "behavior-executor")]
38pub struct BehaviorExecutorNode {
39 behavior: BoxedBehavior,
40 behavior_name: String,
41 tick_rate_hz: f32,
42 max_ticks: Option<usize>,
43 log_stats: bool,
44 tick_count: usize,
45 last_status: BehaviorStatus,
46 enabled: bool, control_topic: &'static str,
48 status_topic: &'static str,
49 auto_resume_task: Option<tokio::task::JoinHandle<()>>, }
51
52#[async_trait]
53impl NodeImpl for BehaviorExecutorNode {
54 type Config = BehaviorExecutorConfig;
55
56 async fn init(config: Self::Config) -> Result<Self> {
57 info!("Initializing behavior executor: {}", config.behavior_name);
58
59 let control_topic_path = config.control_topic();
61 let status_topic_path = config.status_topic();
62
63 info!("📡 Control topic: {}", control_topic_path);
64 info!("📤 Status topic: {}", status_topic_path);
65
66 let mut registry = NodeRegistry::new();
68 mecha10_behavior_runtime::register_builtin_actions(&mut registry);
69
70 info!("Registered built-in action nodes: wander, move, sensor_check, timer");
71
72 let behavior_path = PathBuf::from(&config.behaviors_dir).join(format!("{}.json", config.behavior_name));
74
75 info!("Loading behavior tree from: {}", behavior_path.display());
76
77 let loader = BehaviorLoader::new(registry);
78 let behavior = loader
79 .load_from_file(&behavior_path)
80 .map_err(|e| anyhow::anyhow!("Failed to load behavior tree: {}", e))?;
81
82 info!("✅ Loaded behavior tree '{}' successfully", config.behavior_name);
83 info!("Tick rate: {} Hz", config.tick_rate_hz);
84 if let Some(max_ticks) = config.max_ticks {
85 info!("Max ticks: {}", max_ticks);
86 } else {
87 info!("Max ticks: unlimited");
88 }
89 info!("⏸️ Behavior execution disabled on startup - waiting for enable signal from dashboard");
90
91 let control_topic_static: &'static str = Box::leak(control_topic_path.into_boxed_str());
93 let status_topic_static: &'static str = Box::leak(status_topic_path.into_boxed_str());
94
95 Ok(Self {
96 behavior,
97 behavior_name: config.behavior_name.clone(),
98 tick_rate_hz: config.tick_rate_hz,
99 max_ticks: config.max_ticks,
100 log_stats: config.log_stats,
101 tick_count: 0,
102 last_status: BehaviorStatus::Success,
103 enabled: false, control_topic: control_topic_static,
105 status_topic: status_topic_static,
106 auto_resume_task: None,
107 })
108 }
109
110 async fn run(&mut self, ctx: &Context) -> Result<()> {
111 info!("Behavior executor running - waiting for enable signal");
112
113 let mut control_rx = ctx.subscribe(Topic::<BehaviorControl>::new(self.control_topic)).await?;
115
116 let tick_interval = Duration::from_secs_f32(1.0 / self.tick_rate_hz);
118 let mut tick_timer = tokio::time::interval(tick_interval);
119
120 let mut status_timer = tokio::time::interval(Duration::from_secs(1));
122
123 loop {
124 tokio::select! {
125 Some(msg) = control_rx.recv() => {
127 match msg.action.as_str() {
128 "enable" => {
129 if !self.enabled {
130 info!("✅ Behavior execution enabled");
131 self.enabled = true;
132 self.behavior.reset().await?;
134 self.tick_count = 0;
135 if let Some(task) = self.auto_resume_task.take() {
137 task.abort();
138 }
139 }
140 }
141 "disable" => {
142 if self.enabled {
143 info!("⏸️ Behavior execution disabled");
144 self.enabled = false;
145 if let Some(task) = self.auto_resume_task.take() {
147 task.abort();
148 }
149 }
150 }
151 "interrupt" => {
152 if self.enabled {
153 let source = msg.source.as_deref().unwrap_or("unknown");
154 info!("⏸️ Behavior execution interrupted by {}", source);
155 self.enabled = false;
156
157 if let Some(duration_secs) = msg.duration_secs {
159 info!("⏰ Will auto-resume in {}s", duration_secs);
160 let ctx_clone = ctx.clone();
161 let control_topic = self.control_topic;
162
163 let task = tokio::spawn(async move {
164 tokio::time::sleep(Duration::from_secs(duration_secs)).await;
165
166 let resume = BehaviorControl::resume("auto-resume");
168 let topic = Topic::<BehaviorControl>::new(control_topic);
169
170 match ctx_clone.publish_to(topic, &resume).await {
171 Ok(_) => {
172 info!("⏰ Auto-resumed behavior after {}s timeout", duration_secs);
173 }
174 Err(e) => {
175 warn!("Failed to auto-resume behavior: {}", e);
176 }
177 }
178 });
179
180 self.auto_resume_task = Some(task);
181 }
182 }
183 }
184 "resume" => {
185 if !self.enabled {
186 let source = msg.source.as_deref().unwrap_or("unknown");
187 info!("▶️ Behavior execution resumed by {}", source);
188 self.enabled = true;
189 self.behavior.reset().await?;
191 if let Some(task) = self.auto_resume_task.take() {
193 task.abort();
194 }
195 }
196 }
197 _ => {
198 warn!("Unknown control action: {}", msg.action);
199 }
200 }
201 }
202
203 _ = tick_timer.tick() => {
205 if !self.enabled {
206 continue;
207 }
208
209 if let Some(max_ticks) = self.max_ticks {
211 if self.tick_count >= max_ticks {
212 info!("Reached max ticks ({}), disabling", max_ticks);
213 self.enabled = false;
214 continue;
215 }
216 }
217
218 match self.behavior.tick(ctx).await {
220 Ok(status) => {
221 self.last_status = status;
222 self.tick_count += 1;
223
224 if status != self.last_status || self.tick_count % 100 == 0 {
226 debug!("Behavior tick #{}: {:?}", self.tick_count, status);
227 }
228
229 if self.log_stats && self.tick_count % 1000 == 0 {
231 info!(
232 "Behavior statistics: {} ticks, current status: {:?}",
233 self.tick_count, status
234 );
235 }
236
237 if status == BehaviorStatus::Success {
239 info!("Behavior completed successfully after {} ticks", self.tick_count);
240 self.behavior.reset().await?;
242 debug!("Behavior reset, starting again");
243 } else if status == BehaviorStatus::Failure {
244 warn!("Behavior failed after {} ticks, resetting", self.tick_count);
245 self.behavior.reset().await?;
246 }
247 }
248 Err(e) => {
249 error!("Behavior tick error: {}", e);
250 }
252 }
253 }
254
255 _ = status_timer.tick() => {
257 let status_msg = BehaviorStatusMessage {
258 behavior_name: self.behavior_name.clone(),
259 is_running: self.enabled,
260 tick_count: self.tick_count,
261 timestamp: now_micros(),
262 };
263
264 ctx.publish_to(Topic::new(self.status_topic), &status_msg).await?;
265 }
266 }
267 }
268 }
269
270 async fn health_check(&self) -> HealthStatus {
271 HealthStatus {
272 healthy: self.last_status != BehaviorStatus::Failure,
273 last_check: now_micros(),
274 failure_count: if self.last_status == BehaviorStatus::Failure {
275 1
276 } else {
277 0
278 },
279 message: Some(format!(
280 "Behavior executor: {} ticks, status: {:?}",
281 self.tick_count, self.last_status
282 )),
283 }
284 }
285}