mecha10_core/
lib.rs

1//! Mecha10 Core Runtime
2//!
3//! This crate provides the foundational runtime for the Mecha10 framework,
4//! including node lifecycle management, health monitoring, and orchestration.
5//!
6//! # Features
7//!
8//! - Node lifecycle management (start, stop, restart)
9//! - Health monitoring and automatic recovery
10//! - Process management for node execution
11//! - Integration with config and messaging systems
12//! - Graceful shutdown handling
13//! - Type-safe topics system
14//! - Comprehensive error handling
15//! - Shared message types
16//!
17//! # Quick Start
18//!
19//! ```rust
20//! use mecha10::prelude::*;
21//!
22//! #[derive(Node)]
23//! #[subscribe(topics::sensor::CAMERA_RGB)]
24//! #[publish(topics::perception::DETECTIONS)]
25//! struct MyNode {
26//!     config: MyConfig,
27//! }
28//!
29//! #[derive(Config)]
30//! struct MyConfig {
31//!     #[validate(range = "0.0..=1.0")]
32//!     threshold: f32,
33//! }
34//!
35//! #[async_trait]
36//! impl NodeImpl for MyNode {
37//!     type Config = MyConfig;
38//!
39//!     async fn init(config: Self::Config) -> Result<Self> {
40//!         Ok(Self { config })
41//!     }
42//!
43//!     async fn run(&mut self, ctx: &Context) -> Result<()> {
44//!         let mut images = ctx.subscribe(topics::sensor::CAMERA_RGB).await?;
45//!
46//!         while let Some(image) = images.recv().await {
47//!             // Process image
48//!             ctx.publish(&result).await?;
49//!         }
50//!
51//!         Ok(())
52//!     }
53//! }
54//! ```
55
56// ============================================================================
57// Module Exports
58// ============================================================================
59
60// Core type system (all shared types)
61pub mod types;
62
63// Shared message types (sensor, actuator, perception, navigation, system)
64pub mod messages;
65
66// Shared sensor message types (camera, lidar, imu, odometry)
67pub mod sensor;
68
69// Shared actuator message types (motor commands, status)
70pub mod actuator;
71
72// Teleoperation message types (input commands, status)
73pub mod teleop;
74
75// Type-safe topics system
76pub mod topics;
77
78// Topic utilities (categorization, formatting, metadata)
79pub mod topic_utils;
80
81// Structured error handling
82pub mod error;
83
84// Node execution context
85pub mod context;
86
87// Stream combinators for declarative message processing
88pub mod stream;
89
90// Conditional publishing
91pub mod publish;
92
93// Node trait and lifecycle management
94pub mod node;
95
96// Configuration loading and validation
97pub mod config;
98
99// Configuration hot-reload support
100pub mod config_watcher;
101
102// Model configuration utilities for AI nodes
103pub mod model;
104
105// Convenience macros for reducing boilerplate
106pub mod macros;
107
108// Request/Response (RPC) pattern over pub/sub
109pub mod rpc;
110
111// RPC with circuit breaker integration
112pub mod rpc_breaker;
113
114// Streaming RPC for long-running operations
115pub mod rpc_streaming;
116
117// Service discovery for node registration and discovery
118pub mod discovery;
119
120// Schema registry for versioned message schemas
121pub mod schema;
122
123// Authorization and access control
124pub mod auth;
125
126// Secrets management
127pub mod secrets;
128
129// Health monitoring and aggregation
130pub mod health;
131
132// Metrics collection
133pub mod metrics;
134
135// Rate limiting
136pub mod rate_limit;
137
138// Circuit breaker for fault tolerance
139pub mod circuit_breaker;
140
141// Dead letter queue for failed message handling
142pub mod dead_letter;
143
144// JSON Schema validation for configuration files
145pub mod schema_validation;
146
147// Dependency resolution for node ordering
148pub mod dependency;
149
150// Lifecycle event schemas for mode-based management
151pub mod lifecycle;
152
153// Topic registry for runtime introspection
154pub mod topic_registry;
155
156// Persistent state storage abstraction
157pub mod state;
158
159// Graceful degradation and recovery policies
160pub mod recovery;
161
162// Distributed tracing
163pub mod tracing_otel;
164
165// Performance profiling helpers
166pub mod profiling;
167
168// Priority queue for quality-of-service
169pub mod priority_queue;
170
171// Service pattern for infrastructure components
172pub mod service;
173pub mod service_manager;
174
175// Aggregated topics for multi-topic subscriptions
176pub mod aggregated;
177
// Bag file recording and replay (similar to ROS bag files)
179pub mod recorder;
180
181// Redis bridge for local ↔ control plane message forwarding
182pub mod redis_bridge;
183
184// Behavior interrupt trigger for pausing autonomous behaviors
185pub mod behavior_interrupt;
186
187// Comprehensive prelude (single import for everything)
188pub mod prelude;
189
190// Re-export commonly used items at crate root
191pub use context::{Context, Receiver};
192pub use error::{Mecha10Error, Result};
193pub use messages::Message;
194pub use types::*;
195
196use std::collections::HashMap;
197use std::process::Stdio;
198use std::sync::Arc;
199use thiserror::Error;
200use tokio::process::Child;
201use tokio::sync::RwLock;
202use tokio::time::{interval, Duration};
203
204/// Core runtime errors
205#[derive(Debug, Error)]
206pub enum RuntimeError {
207    #[error("IO error: {0}")]
208    Io(#[from] std::io::Error),
209
210    #[error("Node error: {0}")]
211    Node(String),
212
213    #[error("Configuration error: {0}")]
214    Config(String),
215
216    #[error("Process error: {0}")]
217    Process(String),
218
219    #[error("Health check failed: {0}")]
220    HealthCheck(String),
221}
222
223/// Result type for runtime operations
224pub type RuntimeResult<T> = std::result::Result<T, RuntimeError>;
225
226/// Managed node instance
227struct ManagedNode {
228    config: NodeConfig,
229    state: NodeState,
230    process: Option<Child>,
231    restart_count: u32,
232    last_health_check: std::time::Instant,
233}
234
235impl ManagedNode {
236    fn new(config: NodeConfig) -> Self {
237        Self {
238            config,
239            state: NodeState::Stopped,
240            process: None,
241            restart_count: 0,
242            last_health_check: std::time::Instant::now(),
243        }
244    }
245
246    async fn start(&mut self) -> RuntimeResult<()> {
247        if self.state == NodeState::Running {
248            return Ok(());
249        }
250
251        self.state = NodeState::Starting;
252
253        let mut cmd = tokio::process::Command::new(&self.config.executable);
254        cmd.args(&self.config.args);
255        cmd.envs(&self.config.env);
256        cmd.stdout(Stdio::piped());
257        cmd.stderr(Stdio::piped());
258
259        let child = cmd
260            .spawn()
261            .map_err(|e| RuntimeError::Process(format!("Failed to spawn {}: {}", self.config.id, e)))?;
262
263        self.process = Some(child);
264        self.state = NodeState::Running;
265
266        tracing::info!("Started node: {}", self.config.id);
267
268        Ok(())
269    }
270
271    async fn stop(&mut self) -> RuntimeResult<()> {
272        if let Some(mut child) = self.process.take() {
273            self.state = NodeState::Stopping;
274
275            // Try graceful shutdown first
276            #[cfg(unix)]
277            {
278                use nix::sys::signal::{self, Signal};
279                use nix::unistd::Pid;
280
281                if let Some(pid) = child.id() {
282                    let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
283                }
284            }
285
286            // Wait for process to exit (with timeout)
287            match tokio::time::timeout(Duration::from_secs(5), child.wait()).await {
288                Ok(Ok(status)) => {
289                    tracing::info!("Node {} stopped: {:?}", self.config.id, status);
290                }
291                Ok(Err(e)) => {
292                    tracing::error!("Error waiting for node {}: {}", self.config.id, e);
293                }
294                Err(_) => {
295                    // Timeout - force kill
296                    tracing::warn!("Node {} did not stop gracefully, killing", self.config.id);
297                    let _ = child.kill().await;
298                }
299            }
300
301            self.state = NodeState::Stopped;
302        }
303
304        Ok(())
305    }
306
307    async fn check_health(&mut self) -> RuntimeResult<bool> {
308        if let Some(child) = &mut self.process {
309            // Check if process is still alive
310            match child.try_wait() {
311                Ok(Some(status)) => {
312                    // Process exited
313                    tracing::warn!("Node {} exited: {:?}", self.config.id, status);
314                    self.state = NodeState::Crashed;
315                    self.process = None;
316                    return Ok(false);
317                }
318                Ok(None) => {
319                    // Process still running
320                    self.last_health_check = std::time::Instant::now();
321                    return Ok(true);
322                }
323                Err(e) => {
324                    tracing::error!("Error checking node {}: {}", self.config.id, e);
325                    self.state = NodeState::Unhealthy;
326                    return Ok(false);
327                }
328            }
329        }
330
331        Ok(false)
332    }
333
334    fn should_restart(&self) -> bool {
335        self.config.auto_restart
336            && (self.state == NodeState::Crashed || self.state == NodeState::Unhealthy)
337            && self.restart_count < 3
338    }
339}
340
341/// Runtime orchestrator for managing nodes
342pub struct Runtime {
343    /// Robot/instance identifier
344    #[allow(dead_code)]
345    robot_id: String,
346
347    /// Managed nodes
348    nodes: Arc<RwLock<HashMap<String, ManagedNode>>>,
349
350    /// Running state
351    running: Arc<RwLock<bool>>,
352}
353
354impl Runtime {
355    /// Create a new runtime instance
356    pub async fn new(robot_id: &str) -> RuntimeResult<Self> {
357        Ok(Self {
358            robot_id: robot_id.to_string(),
359            nodes: Arc::new(RwLock::new(HashMap::new())),
360            running: Arc::new(RwLock::new(false)),
361        })
362    }
363
364    /// Start a node
365    pub async fn start_node(&mut self, config: NodeConfig) -> RuntimeResult<()> {
366        let node_id = config.id.clone();
367
368        let mut nodes = self.nodes.write().await;
369
370        if nodes.contains_key(&node_id) {
371            return Err(RuntimeError::Node(format!("Node {} already exists", node_id)));
372        }
373
374        let mut node = ManagedNode::new(config);
375        node.start().await?;
376
377        nodes.insert(node_id, node);
378
379        Ok(())
380    }
381
382    /// Stop a node
383    pub async fn stop_node(&mut self, node_id: &str) -> RuntimeResult<()> {
384        let mut nodes = self.nodes.write().await;
385
386        if let Some(node) = nodes.get_mut(node_id) {
387            node.stop().await?;
388            nodes.remove(node_id);
389            Ok(())
390        } else {
391            Err(RuntimeError::Node(format!("Node {} not found", node_id)))
392        }
393    }
394
395    /// Restart a node
396    pub async fn restart_node(&mut self, node_id: &str) -> RuntimeResult<()> {
397        let mut nodes = self.nodes.write().await;
398
399        if let Some(node) = nodes.get_mut(node_id) {
400            node.stop().await?;
401            node.start().await?;
402            node.restart_count += 1;
403            Ok(())
404        } else {
405            Err(RuntimeError::Node(format!("Node {} not found", node_id)))
406        }
407    }
408
409    /// Get node state
410    pub async fn node_state(&self, node_id: &str) -> Option<NodeState> {
411        let nodes = self.nodes.read().await;
412        nodes.get(node_id).map(|n| n.state.clone())
413    }
414
415    /// List all nodes
416    pub async fn list_nodes(&self) -> Vec<String> {
417        let nodes = self.nodes.read().await;
418        nodes.keys().cloned().collect()
419    }
420
421    /// Run the runtime (blocks until shutdown)
422    pub async fn run(&mut self) -> RuntimeResult<()> {
423        *self.running.write().await = true;
424
425        let nodes = Arc::clone(&self.nodes);
426        let running = Arc::clone(&self.running);
427
428        // Health check loop
429        let health_check_task = tokio::spawn(async move {
430            let mut interval = interval(Duration::from_secs(5));
431
432            while *running.read().await {
433                interval.tick().await;
434
435                let mut nodes = nodes.write().await;
436                let node_ids: Vec<String> = nodes.keys().cloned().collect();
437
438                for node_id in node_ids {
439                    if let Some(node) = nodes.get_mut(&node_id) {
440                        if node.state == NodeState::Running {
441                            match node.check_health().await {
442                                Ok(true) => {
443                                    // Node healthy
444                                }
445                                Ok(false) => {
446                                    // Node unhealthy or crashed
447                                    if node.should_restart() {
448                                        tracing::info!("Auto-restarting node: {}", node_id);
449                                        let _ = node.start().await;
450                                        node.restart_count += 1;
451                                    }
452                                }
453                                Err(e) => {
454                                    tracing::error!("Health check error for {}: {}", node_id, e);
455                                }
456                            }
457                        }
458                    }
459                }
460            }
461        });
462
463        // Wait for shutdown signal
464        tokio::signal::ctrl_c()
465            .await
466            .map_err(|e| RuntimeError::Process(format!("Error waiting for shutdown: {}", e)))?;
467
468        tracing::info!("Shutting down runtime...");
469        *self.running.write().await = false;
470
471        health_check_task.abort();
472
473        // Stop all nodes
474        let mut nodes = self.nodes.write().await;
475        for (node_id, node) in nodes.iter_mut() {
476            tracing::info!("Stopping node: {}", node_id);
477            let _ = node.stop().await;
478        }
479
480        Ok(())
481    }
482
483    /// Shutdown the runtime gracefully
484    pub async fn shutdown(&mut self) -> RuntimeResult<()> {
485        *self.running.write().await = false;
486
487        let mut nodes = self.nodes.write().await;
488        for (node_id, node) in nodes.iter_mut() {
489            tracing::info!("Stopping node: {}", node_id);
490            let _ = node.stop().await;
491        }
492
493        Ok(())
494    }
495}