1pub mod types;
62
63pub mod messages;
65
66pub mod sensor;
68
69pub mod actuator;
71
72pub mod teleop;
74
75pub mod topics;
77
78pub mod topic_utils;
80
81pub mod error;
83
84pub mod context;
86
87pub mod stream;
89
90pub mod publish;
92
93pub mod node;
95
96pub mod config;
98
99pub mod config_watcher;
101
102pub mod model;
104
105pub mod macros;
107
108pub mod rpc;
110
111pub mod rpc_breaker;
113
114pub mod rpc_streaming;
116
117pub mod discovery;
119
120pub mod schema;
122
123pub mod auth;
125
126pub mod secrets;
128
129pub mod health;
131
132pub mod metrics;
134
135pub mod rate_limit;
137
138pub mod circuit_breaker;
140
141pub mod dead_letter;
143
144pub mod schema_validation;
146
147pub mod dependency;
149
150pub mod lifecycle;
152
153pub mod topic_registry;
155
156pub mod state;
158
159pub mod recovery;
161
162pub mod tracing_otel;
164
165pub mod profiling;
167
168pub mod priority_queue;
170
171pub mod service;
173pub mod service_manager;
174
175pub mod aggregated;
177
178pub mod recorder;
180
181pub mod redis_bridge;
183
184pub mod behavior_interrupt;
186
187pub mod prelude;
189
190pub use context::{Context, Receiver};
192pub use error::{Mecha10Error, Result};
193pub use messages::Message;
194pub use types::*;
195
196use std::collections::HashMap;
197use std::process::Stdio;
198use std::sync::Arc;
199use thiserror::Error;
200use tokio::process::Child;
201use tokio::sync::RwLock;
202use tokio::time::{interval, Duration};
203
204#[derive(Debug, Error)]
206pub enum RuntimeError {
207 #[error("IO error: {0}")]
208 Io(#[from] std::io::Error),
209
210 #[error("Node error: {0}")]
211 Node(String),
212
213 #[error("Configuration error: {0}")]
214 Config(String),
215
216 #[error("Process error: {0}")]
217 Process(String),
218
219 #[error("Health check failed: {0}")]
220 HealthCheck(String),
221}
222
223pub type RuntimeResult<T> = std::result::Result<T, RuntimeError>;
225
226struct ManagedNode {
228 config: NodeConfig,
229 state: NodeState,
230 process: Option<Child>,
231 restart_count: u32,
232 last_health_check: std::time::Instant,
233}
234
235impl ManagedNode {
236 fn new(config: NodeConfig) -> Self {
237 Self {
238 config,
239 state: NodeState::Stopped,
240 process: None,
241 restart_count: 0,
242 last_health_check: std::time::Instant::now(),
243 }
244 }
245
246 async fn start(&mut self) -> RuntimeResult<()> {
247 if self.state == NodeState::Running {
248 return Ok(());
249 }
250
251 self.state = NodeState::Starting;
252
253 let mut cmd = tokio::process::Command::new(&self.config.executable);
254 cmd.args(&self.config.args);
255 cmd.envs(&self.config.env);
256 cmd.stdout(Stdio::piped());
257 cmd.stderr(Stdio::piped());
258
259 let child = cmd
260 .spawn()
261 .map_err(|e| RuntimeError::Process(format!("Failed to spawn {}: {}", self.config.id, e)))?;
262
263 self.process = Some(child);
264 self.state = NodeState::Running;
265
266 tracing::info!("Started node: {}", self.config.id);
267
268 Ok(())
269 }
270
271 async fn stop(&mut self) -> RuntimeResult<()> {
272 if let Some(mut child) = self.process.take() {
273 self.state = NodeState::Stopping;
274
275 #[cfg(unix)]
277 {
278 use nix::sys::signal::{self, Signal};
279 use nix::unistd::Pid;
280
281 if let Some(pid) = child.id() {
282 let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
283 }
284 }
285
286 match tokio::time::timeout(Duration::from_secs(5), child.wait()).await {
288 Ok(Ok(status)) => {
289 tracing::info!("Node {} stopped: {:?}", self.config.id, status);
290 }
291 Ok(Err(e)) => {
292 tracing::error!("Error waiting for node {}: {}", self.config.id, e);
293 }
294 Err(_) => {
295 tracing::warn!("Node {} did not stop gracefully, killing", self.config.id);
297 let _ = child.kill().await;
298 }
299 }
300
301 self.state = NodeState::Stopped;
302 }
303
304 Ok(())
305 }
306
307 async fn check_health(&mut self) -> RuntimeResult<bool> {
308 if let Some(child) = &mut self.process {
309 match child.try_wait() {
311 Ok(Some(status)) => {
312 tracing::warn!("Node {} exited: {:?}", self.config.id, status);
314 self.state = NodeState::Crashed;
315 self.process = None;
316 return Ok(false);
317 }
318 Ok(None) => {
319 self.last_health_check = std::time::Instant::now();
321 return Ok(true);
322 }
323 Err(e) => {
324 tracing::error!("Error checking node {}: {}", self.config.id, e);
325 self.state = NodeState::Unhealthy;
326 return Ok(false);
327 }
328 }
329 }
330
331 Ok(false)
332 }
333
334 fn should_restart(&self) -> bool {
335 self.config.auto_restart
336 && (self.state == NodeState::Crashed || self.state == NodeState::Unhealthy)
337 && self.restart_count < 3
338 }
339}
340
341pub struct Runtime {
343 #[allow(dead_code)]
345 robot_id: String,
346
347 nodes: Arc<RwLock<HashMap<String, ManagedNode>>>,
349
350 running: Arc<RwLock<bool>>,
352}
353
354impl Runtime {
355 pub async fn new(robot_id: &str) -> RuntimeResult<Self> {
357 Ok(Self {
358 robot_id: robot_id.to_string(),
359 nodes: Arc::new(RwLock::new(HashMap::new())),
360 running: Arc::new(RwLock::new(false)),
361 })
362 }
363
364 pub async fn start_node(&mut self, config: NodeConfig) -> RuntimeResult<()> {
366 let node_id = config.id.clone();
367
368 let mut nodes = self.nodes.write().await;
369
370 if nodes.contains_key(&node_id) {
371 return Err(RuntimeError::Node(format!("Node {} already exists", node_id)));
372 }
373
374 let mut node = ManagedNode::new(config);
375 node.start().await?;
376
377 nodes.insert(node_id, node);
378
379 Ok(())
380 }
381
382 pub async fn stop_node(&mut self, node_id: &str) -> RuntimeResult<()> {
384 let mut nodes = self.nodes.write().await;
385
386 if let Some(node) = nodes.get_mut(node_id) {
387 node.stop().await?;
388 nodes.remove(node_id);
389 Ok(())
390 } else {
391 Err(RuntimeError::Node(format!("Node {} not found", node_id)))
392 }
393 }
394
395 pub async fn restart_node(&mut self, node_id: &str) -> RuntimeResult<()> {
397 let mut nodes = self.nodes.write().await;
398
399 if let Some(node) = nodes.get_mut(node_id) {
400 node.stop().await?;
401 node.start().await?;
402 node.restart_count += 1;
403 Ok(())
404 } else {
405 Err(RuntimeError::Node(format!("Node {} not found", node_id)))
406 }
407 }
408
409 pub async fn node_state(&self, node_id: &str) -> Option<NodeState> {
411 let nodes = self.nodes.read().await;
412 nodes.get(node_id).map(|n| n.state.clone())
413 }
414
415 pub async fn list_nodes(&self) -> Vec<String> {
417 let nodes = self.nodes.read().await;
418 nodes.keys().cloned().collect()
419 }
420
421 pub async fn run(&mut self) -> RuntimeResult<()> {
423 *self.running.write().await = true;
424
425 let nodes = Arc::clone(&self.nodes);
426 let running = Arc::clone(&self.running);
427
428 let health_check_task = tokio::spawn(async move {
430 let mut interval = interval(Duration::from_secs(5));
431
432 while *running.read().await {
433 interval.tick().await;
434
435 let mut nodes = nodes.write().await;
436 let node_ids: Vec<String> = nodes.keys().cloned().collect();
437
438 for node_id in node_ids {
439 if let Some(node) = nodes.get_mut(&node_id) {
440 if node.state == NodeState::Running {
441 match node.check_health().await {
442 Ok(true) => {
443 }
445 Ok(false) => {
446 if node.should_restart() {
448 tracing::info!("Auto-restarting node: {}", node_id);
449 let _ = node.start().await;
450 node.restart_count += 1;
451 }
452 }
453 Err(e) => {
454 tracing::error!("Health check error for {}: {}", node_id, e);
455 }
456 }
457 }
458 }
459 }
460 }
461 });
462
463 tokio::signal::ctrl_c()
465 .await
466 .map_err(|e| RuntimeError::Process(format!("Error waiting for shutdown: {}", e)))?;
467
468 tracing::info!("Shutting down runtime...");
469 *self.running.write().await = false;
470
471 health_check_task.abort();
472
473 let mut nodes = self.nodes.write().await;
475 for (node_id, node) in nodes.iter_mut() {
476 tracing::info!("Stopping node: {}", node_id);
477 let _ = node.stop().await;
478 }
479
480 Ok(())
481 }
482
483 pub async fn shutdown(&mut self) -> RuntimeResult<()> {
485 *self.running.write().await = false;
486
487 let mut nodes = self.nodes.write().await;
488 for (node_id, node) in nodes.iter_mut() {
489 tracing::info!("Stopping node: {}", node_id);
490 let _ = node.stop().await;
491 }
492
493 Ok(())
494 }
495}