mecha10_core/node.rs
1// Mecha10 Node Trait and Lifecycle Management
2//
3// This module defines the core Node trait and lifecycle management for Mecha10 nodes.
4// Every Mecha10 node implements the NodeImpl trait to define its behavior.
5
6use crate::{Context, HealthStatus, NodeState, Result};
7use async_trait::async_trait;
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use std::fmt::Debug;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tracing::{debug, error, info, warn};
15
16// ============================================================================
17// Health Priority
18// ============================================================================
19
20/// Node health reporting priority
21///
22/// Determines heartbeat frequency and timeout thresholds based on
23/// node criticality. Safety-critical nodes report more frequently.
24///
25/// # Priority Levels
26///
27/// | Priority | Interval | TTL | Timeout | Use Case |
28/// |----------|----------|-----|---------|----------|
29/// | Critical | 1s | 5s | 3s | motor-controller, estop |
30/// | Normal | 5s | 60s | 15s | sensors, AI nodes |
31/// | Background | 30s | 120s | 90s | logging, diagnostics |
32///
33/// # Example
34///
35/// ```rust
36/// use mecha10::prelude::*;
37///
38/// // Create config with critical priority
39/// let config = HealthReportingConfig::critical();
40///
41/// // Or specify priority explicitly
42/// let config = HealthReportingConfig {
43/// priority: HealthPriority::Critical,
44/// ..Default::default()
45/// };
46/// ```
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
48#[serde(rename_all = "lowercase")]
49pub enum HealthPriority {
50 /// Safety-critical nodes that require immediate failure detection.
51 ///
52 /// Examples: motor-controller, estop, safety-monitor
53 ///
54 /// - Interval: 1 second
55 /// - TTL: 5 seconds
56 /// - Timeout: 3 seconds
57 Critical,
58
59 /// Standard operational nodes (default).
60 ///
61 /// Examples: camera, sensors, simulation-bridge
62 ///
63 /// - Interval: 5 seconds
64 /// - TTL: 60 seconds
65 /// - Timeout: 15 seconds
66 #[default]
67 Normal,
68
69 /// Background/auxiliary nodes where delayed detection is acceptable.
70 ///
71 /// Examples: logging, telemetry, diagnostics
72 ///
73 /// - Interval: 30 seconds
74 /// - TTL: 120 seconds
75 /// - Timeout: 90 seconds
76 Background,
77}
78
79impl HealthPriority {
80 /// Get the heartbeat interval for this priority level
81 pub const fn interval(&self) -> Duration {
82 match self {
83 Self::Critical => Duration::from_secs(1),
84 Self::Normal => Duration::from_secs(5),
85 Self::Background => Duration::from_secs(30),
86 }
87 }
88
89 /// Get the Redis TTL (time-to-live) in seconds for this priority level
90 ///
91 /// The TTL should be longer than the interval to allow for some jitter,
92 /// but short enough that stale data expires quickly.
93 pub const fn ttl_seconds(&self) -> u64 {
94 match self {
95 Self::Critical => 5,
96 Self::Normal => 60,
97 Self::Background => 120,
98 }
99 }
100
101 /// Get the timeout threshold for monitoring
102 ///
103 /// A node is considered unresponsive if no health report is received
104 /// within this duration.
105 pub const fn timeout(&self) -> Duration {
106 match self {
107 Self::Critical => Duration::from_secs(3),
108 Self::Normal => Duration::from_secs(15),
109 Self::Background => Duration::from_secs(90),
110 }
111 }
112
113 /// Get the timeout in microseconds (for comparison with timestamps)
114 pub const fn timeout_micros(&self) -> u64 {
115 match self {
116 Self::Critical => 3_000_000,
117 Self::Normal => 15_000_000,
118 Self::Background => 90_000_000,
119 }
120 }
121}
122
123impl std::fmt::Display for HealthPriority {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 match self {
126 Self::Critical => write!(f, "critical"),
127 Self::Normal => write!(f, "normal"),
128 Self::Background => write!(f, "background"),
129 }
130 }
131}
132
133// ============================================================================
134// Configuration
135// ============================================================================
136
137/// Configuration for node health reporting
138///
139/// # Example
140///
141/// ```rust
142/// use mecha10::prelude::*;
143///
144/// // Default config (Normal priority, 5s interval)
145/// let config = HealthReportingConfig::default();
146///
147/// // Critical priority (1s interval)
148/// let config = HealthReportingConfig::critical();
149///
150/// // Background priority (30s interval)
151/// let config = HealthReportingConfig::background();
152///
153/// // Custom config
154/// let config = HealthReportingConfig {
155/// enabled: true,
156/// priority: HealthPriority::Normal,
157/// store_history: false, // Disable history for high-frequency nodes
158/// };
159/// ```
160#[derive(Debug, Clone)]
161pub struct HealthReportingConfig {
162 /// Whether health reporting is enabled (default: true)
163 pub enabled: bool,
164
165 /// Priority level determines interval and timeout (default: Normal)
166 pub priority: HealthPriority,
167
168 /// Whether to store health history in Redis (default: true)
169 ///
170 /// Disable for high-frequency critical nodes to reduce Redis load.
171 /// History is stored as a sorted set with 24-hour retention.
172 pub store_history: bool,
173}
174
175impl Default for HealthReportingConfig {
176 fn default() -> Self {
177 Self {
178 enabled: true,
179 priority: HealthPriority::Normal,
180 store_history: true,
181 }
182 }
183}
184
185impl HealthReportingConfig {
186 /// Create config for critical priority nodes (1s interval)
187 pub fn critical() -> Self {
188 Self {
189 enabled: true,
190 priority: HealthPriority::Critical,
191 store_history: false, // Disable history for critical nodes to reduce load
192 }
193 }
194
195 /// Create config for normal priority nodes (5s interval)
196 pub fn normal() -> Self {
197 Self::default()
198 }
199
200 /// Create config for background priority nodes (30s interval)
201 pub fn background() -> Self {
202 Self {
203 enabled: true,
204 priority: HealthPriority::Background,
205 store_history: true,
206 }
207 }
208
209 /// Get the effective interval based on priority
210 pub fn interval(&self) -> Duration {
211 self.priority.interval()
212 }
213
214 /// Get the effective TTL based on priority
215 pub fn ttl_seconds(&self) -> u64 {
216 self.priority.ttl_seconds()
217 }
218}
219
220/// Trait for node configurations that specify an update rate
221///
222/// This trait provides a standard interface for nodes that publish at a fixed rate.
223/// Implement this trait on your node's config struct to enable convenient rate-based
224/// interval creation.
225///
226/// # Example
227///
228/// ```rust
229/// use mecha10::prelude::*;
230///
231/// #[derive(Debug, Clone, Serialize, Deserialize)]
232/// pub struct CameraConfig {
233/// pub fps: f32,
234/// pub width: u32,
235/// pub height: u32,
236/// }
237///
238/// impl RateConfig for CameraConfig {
239/// fn rate_hz(&self) -> f32 {
240/// self.fps
241/// }
242/// }
243///
244/// // Now you can use the config to create intervals easily:
245/// # async fn example(config: CameraConfig) {
246/// let mut interval = config.create_interval();
247/// loop {
248/// interval.tick().await;
249/// // Publish at configured rate
250/// }
251/// # }
252/// ```
253pub trait RateConfig {
254 /// Get the update rate in Hertz (cycles per second)
255 fn rate_hz(&self) -> f32;
256
257 /// Create a tokio interval from the configured rate
258 ///
259 /// This is a convenience method that converts the rate to a Duration
260 /// and creates a tokio::time::Interval.
261 fn create_interval(&self) -> tokio::time::Interval {
262 let duration = Duration::from_secs_f32(1.0 / self.rate_hz());
263 tokio::time::interval(duration)
264 }
265
266 /// Get the period (time between updates) as a Duration
267 fn period(&self) -> Duration {
268 Duration::from_secs_f32(1.0 / self.rate_hz())
269 }
270}
271
272// ============================================================================
273// Health Status Helper
274// ============================================================================
275
276/// Helper methods for creating HealthStatus instances
277pub trait HealthStatusExt {
278 /// Create a healthy status
279 fn healthy(message: impl Into<String>) -> Self;
280 /// Create an unhealthy status
281 fn unhealthy(message: impl Into<String>) -> Self;
282}
283
284impl HealthStatusExt for HealthStatus {
285 fn healthy(message: impl Into<String>) -> Self {
286 Self {
287 healthy: true,
288 last_check: crate::prelude::now_micros(),
289 failure_count: 0,
290 message: Some(message.into()),
291 }
292 }
293
294 fn unhealthy(message: impl Into<String>) -> Self {
295 Self {
296 healthy: false,
297 last_check: crate::prelude::now_micros(),
298 failure_count: 1,
299 message: Some(message.into()),
300 }
301 }
302}
303
304// ============================================================================
305// NodeImpl Trait
306// ============================================================================
307
308/// Core trait that all Mecha10 nodes must implement.
309///
310/// This trait defines the lifecycle of a node:
311/// 1. `init()` - Initialize the node with configuration
312/// 2. `run()` - Main execution loop
313/// 3. `shutdown()` - Graceful shutdown
314/// 4. `health_check()` - Report health status
315///
316/// # Example
317///
318/// ```rust
319/// use mecha10::prelude::*;
320///
321/// #[derive(Debug)]
322/// struct MyNode {
323/// config: MyConfig,
324/// }
325///
326/// #[derive(Debug, Deserialize)]
327/// struct MyConfig {
328/// rate: f32,
329/// }
330///
331/// #[async_trait]
332/// impl NodeImpl for MyNode {
333/// type Config = MyConfig;
334///
335/// async fn init(config: Self::Config) -> Result<Self> {
336/// info!("Initializing node with rate: {}", config.rate);
337/// Ok(Self { config })
338/// }
339///
340/// async fn run(&mut self, ctx: &Context) -> Result<()> {
341/// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
342///
343/// while let Some(image) = images.recv().await {
344/// // Process image
345/// info!("Received image: {}x{}", image.width, image.height);
346/// }
347///
348/// Ok(())
349/// }
350/// }
351/// ```
352#[async_trait]
353pub trait NodeImpl: Send + Sync + Debug + 'static {
354 /// Configuration type for this node
355 type Config: DeserializeOwned + Send + Sync + Debug;
356
357 /// Initialize the node with the given configuration.
358 ///
359 /// This is called once when the node starts. Use this to:
360 /// - Load configuration
361 /// - Initialize resources (models, connections, etc.)
362 /// - Validate prerequisites
363 ///
364 /// # Errors
365 ///
366 /// Return an error if initialization fails. The node will not start.
367 async fn init(config: Self::Config) -> Result<Self>
368 where
369 Self: Sized;
370
371 /// Main execution loop for the node.
372 ///
373 /// This is where your node's business logic lives. Typical patterns:
374 /// - Subscribe to topics and process messages
375 /// - Run periodic tasks
376 /// - Publish results
377 ///
378 /// This method should run indefinitely until:
379 /// - The context is cancelled (shutdown requested)
380 /// - An unrecoverable error occurs
381 ///
382 /// # Errors
383 ///
384 /// Return an error if the node encounters an unrecoverable error.
385 /// The node will transition to Error state and shutdown.
386 async fn run(&mut self, ctx: &Context) -> Result<()>;
387
388 /// Graceful shutdown hook.
389 ///
390 /// Called when the node is shutting down. Use this to:
391 /// - Close connections
392 /// - Flush buffers
393 /// - Save state
394 /// - Release resources
395 ///
396 /// The default implementation does nothing.
397 ///
398 /// # Errors
399 ///
400 /// Errors during shutdown are logged but don't prevent shutdown.
401 async fn shutdown(&mut self) -> Result<()> {
402 Ok(())
403 }
404
405 /// Health check for monitoring.
406 ///
407 /// Called periodically to check node health. Use this to:
408 /// - Report current state
409 /// - Check resource usage
410 /// - Validate connections
411 ///
412 /// The default implementation returns healthy status.
413 async fn health_check(&self) -> HealthStatus {
414 HealthStatus {
415 healthy: true,
416 last_check: crate::prelude::now_micros(),
417 failure_count: 0,
418 message: Some("Node is running".to_string()),
419 }
420 }
421}
422
423// ============================================================================
424// Node Runner
425// ============================================================================
426
427/// Wrapper that manages the lifecycle of a NodeImpl.
428///
429/// This provides:
430/// - State management
431/// - Lifecycle orchestration
432/// - Error handling
433/// - Health monitoring
434#[derive(Clone)]
435pub struct Node<T: NodeImpl> {
436 node: Arc<Mutex<T>>,
437 state: Arc<Mutex<NodeState>>,
438}
439
440impl<T: NodeImpl> Node<T> {
441 /// Create a new node from a configuration.
442 ///
443 /// This calls `NodeImpl::init()` and wraps the result in a Node.
444 ///
445 /// # Errors
446 ///
447 /// Returns an error if initialization fails.
448 pub async fn from_config(config: T::Config) -> Result<Self> {
449 info!("Initializing node");
450
451 let node = T::init(config).await?;
452
453 Ok(Self {
454 node: Arc::new(Mutex::new(node)),
455 state: Arc::new(Mutex::new(NodeState::Starting)),
456 })
457 }
458
459 /// Start the node with the given context.
460 ///
461 /// This orchestrates the full lifecycle:
462 /// 1. Transition to Running state
463 /// 2. Call `run()`
464 /// 3. Call `shutdown()` on completion or error
465 /// 4. Transition to final state
466 ///
467 /// # Errors
468 ///
469 /// Returns an error if the node fails during execution.
470 pub async fn start(&self, ctx: Context) -> Result<()> {
471 // Transition to running
472 {
473 let mut state = self.state.lock().await;
474 *state = NodeState::Running;
475 info!("Node transitioned to Running state");
476 }
477
478 // Run the node
479 let result = {
480 let mut node = self.node.lock().await;
481 debug!("Starting node execution");
482 node.run(&ctx).await
483 };
484
485 // Handle result
486 match result {
487 Ok(()) => {
488 info!("Node completed successfully");
489 self.shutdown().await?;
490 }
491 Err(e) => {
492 error!("Node failed with error: {}", e);
493 {
494 let mut state = self.state.lock().await;
495 *state = NodeState::Crashed;
496 }
497 self.shutdown().await?;
498 return Err(e);
499 }
500 }
501
502 Ok(())
503 }
504
505 /// Gracefully shutdown the node.
506 ///
507 /// This calls `shutdown()` and transitions to Stopped state.
508 pub async fn shutdown(&self) -> Result<()> {
509 info!("Shutting down node");
510
511 // Transition to shutting down
512 {
513 let mut state = self.state.lock().await;
514 *state = NodeState::Stopping;
515 }
516
517 // Call shutdown hook
518 let result = {
519 let mut node = self.node.lock().await;
520 node.shutdown().await
521 };
522
523 // Transition to stopped
524 {
525 let mut state = self.state.lock().await;
526 *state = NodeState::Stopped;
527 info!("Node stopped");
528 }
529
530 if let Err(e) = result {
531 warn!("Error during shutdown: {}", e);
532 return Err(e);
533 }
534
535 Ok(())
536 }
537
538 /// Get the current state of the node.
539 pub async fn state(&self) -> NodeState {
540 self.state.lock().await.clone()
541 }
542
543 /// Perform a health check on the node.
544 pub async fn health_check(&self) -> HealthStatus {
545 let node = self.node.lock().await;
546 node.health_check().await
547 }
548
549 /// Check if the node is healthy.
550 pub async fn is_healthy(&self) -> bool {
551 matches!(self.state().await, NodeState::Running | NodeState::Starting)
552 }
553}
554
555// ============================================================================
556// Helper Functions
557// ============================================================================
558
559/// Run a node with the given configuration and context.
560///
561/// This is a convenience function that:
562/// 1. Initializes the node from config
563/// 2. Starts the node with the context
564/// 3. Handles errors
565///
566/// # Arguments
567///
568/// * `config` - Node-specific configuration
569/// * `ctx` - Execution context
570/// * `health_config` - Health reporting configuration
571///
572/// # Example
573///
574/// ```rust
575/// use mecha10::prelude::*;
576///
577/// #[tokio::main]
578/// async fn main() -> Result<()> {
579/// let config = MyConfig { rate: 10.0 };
580/// let ctx = Context::new("my_node");
581///
582/// // Use default health reporting (Normal priority, 5s interval)
583/// run_node::<MyNode>(config, ctx, HealthReportingConfig::default()).await?;
584///
585/// // Critical priority for safety-critical nodes (1s interval)
586/// run_node::<MyNode>(config, ctx, HealthReportingConfig::critical()).await?;
587///
588/// // Background priority for auxiliary nodes (30s interval)
589/// run_node::<MyNode>(config, ctx, HealthReportingConfig::background()).await?;
590///
591/// Ok(())
592/// }
593/// ```
594pub async fn run_node<T: NodeImpl>(
595 config: T::Config,
596 ctx: Context,
597 health_config: HealthReportingConfig,
598) -> Result<()> {
599 // Initialize logging (safe to call multiple times)
600 crate::prelude::init_logging();
601
602 let node = Node::<T>::from_config(config).await?;
603
604 // Start automatic health reporting
605 if health_config.enabled {
606 // Create clones for the health reporting task
607 let node_for_health = node.node.clone();
608 let ctx_for_health = ctx.clone();
609 let priority = health_config.priority;
610 let report_interval = health_config.interval();
611 let ttl_seconds = health_config.ttl_seconds();
612 let store_history = health_config.store_history;
613
614 tokio::spawn(async move {
615 use crate::health::HealthReporter;
616
617 // Try to create health reporter with priority config
618 let reporter =
619 match HealthReporter::with_config(&ctx_for_health, priority, ttl_seconds, store_history).await {
620 Ok(r) => r,
621 Err(e) => {
622 warn!("Health reporting disabled: {}", e);
623 return;
624 }
625 };
626
627 info!(
628 "Health reporting enabled: priority={}, interval={:?}, ttl={}s, history={}",
629 priority, report_interval, ttl_seconds, store_history
630 );
631
632 let mut interval = tokio::time::interval(report_interval);
633
634 loop {
635 interval.tick().await;
636
637 // Get health status from node
638 let status = {
639 let node = node_for_health.lock().await;
640 node.health_check().await
641 };
642
643 // Convert types::HealthStatus to messages::system::HealthStatus
644 let msg_status = crate::messages::system::HealthStatus {
645 timestamp: status.last_check,
646 level: if status.healthy {
647 crate::messages::system::HealthLevel::Ok
648 } else {
649 crate::messages::system::HealthLevel::Error
650 },
651 message: status.message.unwrap_or_else(|| "No message".to_string()),
652 diagnostics: None,
653 };
654
655 // Report to centralized monitoring
656 if let Err(e) = reporter.report(msg_status).await {
657 debug!("Failed to report health: {}", e);
658 }
659 }
660 });
661 } else {
662 info!("Health reporting disabled");
663 }
664
665 node.start(ctx).await
666}