mecha10_core/
node.rs

1// Mecha10 Node Trait and Lifecycle Management
2//
3// This module defines the core Node trait and lifecycle management for Mecha10 nodes.
4// Every Mecha10 node implements the NodeImpl trait to define its behavior.
5
6use crate::{Context, HealthStatus, NodeState, Result};
7use async_trait::async_trait;
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use std::fmt::Debug;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tracing::{debug, error, info, warn};
15
16// ============================================================================
17// Health Priority
18// ============================================================================
19
20/// Node health reporting priority
21///
22/// Determines heartbeat frequency and timeout thresholds based on
23/// node criticality. Safety-critical nodes report more frequently.
24///
25/// # Priority Levels
26///
27/// | Priority | Interval | TTL | Timeout | Use Case |
28/// |----------|----------|-----|---------|----------|
29/// | Critical | 1s | 5s | 3s | motor-controller, estop |
30/// | Normal | 5s | 60s | 15s | sensors, AI nodes |
31/// | Background | 30s | 120s | 90s | logging, diagnostics |
32///
33/// # Example
34///
35/// ```rust
36/// use mecha10::prelude::*;
37///
38/// // Create config with critical priority
39/// let config = HealthReportingConfig::critical();
40///
41/// // Or specify priority explicitly
42/// let config = HealthReportingConfig {
43///     priority: HealthPriority::Critical,
44///     ..Default::default()
45/// };
46/// ```
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
48#[serde(rename_all = "lowercase")]
49pub enum HealthPriority {
50    /// Safety-critical nodes that require immediate failure detection.
51    ///
52    /// Examples: motor-controller, estop, safety-monitor
53    ///
54    /// - Interval: 1 second
55    /// - TTL: 5 seconds
56    /// - Timeout: 3 seconds
57    Critical,
58
59    /// Standard operational nodes (default).
60    ///
61    /// Examples: camera, sensors, simulation-bridge
62    ///
63    /// - Interval: 5 seconds
64    /// - TTL: 60 seconds
65    /// - Timeout: 15 seconds
66    #[default]
67    Normal,
68
69    /// Background/auxiliary nodes where delayed detection is acceptable.
70    ///
71    /// Examples: logging, telemetry, diagnostics
72    ///
73    /// - Interval: 30 seconds
74    /// - TTL: 120 seconds
75    /// - Timeout: 90 seconds
76    Background,
77}
78
79impl HealthPriority {
80    /// Get the heartbeat interval for this priority level
81    pub const fn interval(&self) -> Duration {
82        match self {
83            Self::Critical => Duration::from_secs(1),
84            Self::Normal => Duration::from_secs(5),
85            Self::Background => Duration::from_secs(30),
86        }
87    }
88
89    /// Get the Redis TTL (time-to-live) in seconds for this priority level
90    ///
91    /// The TTL should be longer than the interval to allow for some jitter,
92    /// but short enough that stale data expires quickly.
93    pub const fn ttl_seconds(&self) -> u64 {
94        match self {
95            Self::Critical => 5,
96            Self::Normal => 60,
97            Self::Background => 120,
98        }
99    }
100
101    /// Get the timeout threshold for monitoring
102    ///
103    /// A node is considered unresponsive if no health report is received
104    /// within this duration.
105    pub const fn timeout(&self) -> Duration {
106        match self {
107            Self::Critical => Duration::from_secs(3),
108            Self::Normal => Duration::from_secs(15),
109            Self::Background => Duration::from_secs(90),
110        }
111    }
112
113    /// Get the timeout in microseconds (for comparison with timestamps)
114    pub const fn timeout_micros(&self) -> u64 {
115        match self {
116            Self::Critical => 3_000_000,
117            Self::Normal => 15_000_000,
118            Self::Background => 90_000_000,
119        }
120    }
121}
122
123impl std::fmt::Display for HealthPriority {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        match self {
126            Self::Critical => write!(f, "critical"),
127            Self::Normal => write!(f, "normal"),
128            Self::Background => write!(f, "background"),
129        }
130    }
131}
132
133// ============================================================================
134// Configuration
135// ============================================================================
136
137/// Configuration for node health reporting
138///
139/// # Example
140///
141/// ```rust
142/// use mecha10::prelude::*;
143///
144/// // Default config (Normal priority, 5s interval)
145/// let config = HealthReportingConfig::default();
146///
147/// // Critical priority (1s interval)
148/// let config = HealthReportingConfig::critical();
149///
150/// // Background priority (30s interval)
151/// let config = HealthReportingConfig::background();
152///
153/// // Custom config
154/// let config = HealthReportingConfig {
155///     enabled: true,
156///     priority: HealthPriority::Normal,
157///     store_history: false, // Disable history for high-frequency nodes
158/// };
159/// ```
160#[derive(Debug, Clone)]
161pub struct HealthReportingConfig {
162    /// Whether health reporting is enabled (default: true)
163    pub enabled: bool,
164
165    /// Priority level determines interval and timeout (default: Normal)
166    pub priority: HealthPriority,
167
168    /// Whether to store health history in Redis (default: true)
169    ///
170    /// Disable for high-frequency critical nodes to reduce Redis load.
171    /// History is stored as a sorted set with 24-hour retention.
172    pub store_history: bool,
173}
174
175impl Default for HealthReportingConfig {
176    fn default() -> Self {
177        Self {
178            enabled: true,
179            priority: HealthPriority::Normal,
180            store_history: true,
181        }
182    }
183}
184
185impl HealthReportingConfig {
186    /// Create config for critical priority nodes (1s interval)
187    pub fn critical() -> Self {
188        Self {
189            enabled: true,
190            priority: HealthPriority::Critical,
191            store_history: false, // Disable history for critical nodes to reduce load
192        }
193    }
194
195    /// Create config for normal priority nodes (5s interval)
196    pub fn normal() -> Self {
197        Self::default()
198    }
199
200    /// Create config for background priority nodes (30s interval)
201    pub fn background() -> Self {
202        Self {
203            enabled: true,
204            priority: HealthPriority::Background,
205            store_history: true,
206        }
207    }
208
209    /// Get the effective interval based on priority
210    pub fn interval(&self) -> Duration {
211        self.priority.interval()
212    }
213
214    /// Get the effective TTL based on priority
215    pub fn ttl_seconds(&self) -> u64 {
216        self.priority.ttl_seconds()
217    }
218}
219
220/// Trait for node configurations that specify an update rate
221///
222/// This trait provides a standard interface for nodes that publish at a fixed rate.
223/// Implement this trait on your node's config struct to enable convenient rate-based
224/// interval creation.
225///
226/// # Example
227///
228/// ```rust
229/// use mecha10::prelude::*;
230///
231/// #[derive(Debug, Clone, Serialize, Deserialize)]
232/// pub struct CameraConfig {
233///     pub fps: f32,
234///     pub width: u32,
235///     pub height: u32,
236/// }
237///
238/// impl RateConfig for CameraConfig {
239///     fn rate_hz(&self) -> f32 {
240///         self.fps
241///     }
242/// }
243///
244/// // Now you can use the config to create intervals easily:
245/// # async fn example(config: CameraConfig) {
246/// let mut interval = config.create_interval();
247/// loop {
248///     interval.tick().await;
249///     // Publish at configured rate
250/// }
251/// # }
252/// ```
253pub trait RateConfig {
254    /// Get the update rate in Hertz (cycles per second)
255    fn rate_hz(&self) -> f32;
256
257    /// Create a tokio interval from the configured rate
258    ///
259    /// This is a convenience method that converts the rate to a Duration
260    /// and creates a tokio::time::Interval.
261    fn create_interval(&self) -> tokio::time::Interval {
262        let duration = Duration::from_secs_f32(1.0 / self.rate_hz());
263        tokio::time::interval(duration)
264    }
265
266    /// Get the period (time between updates) as a Duration
267    fn period(&self) -> Duration {
268        Duration::from_secs_f32(1.0 / self.rate_hz())
269    }
270}
271
272// ============================================================================
273// Health Status Helper
274// ============================================================================
275
276/// Helper methods for creating HealthStatus instances
277pub trait HealthStatusExt {
278    /// Create a healthy status
279    fn healthy(message: impl Into<String>) -> Self;
280    /// Create an unhealthy status
281    fn unhealthy(message: impl Into<String>) -> Self;
282}
283
284impl HealthStatusExt for HealthStatus {
285    fn healthy(message: impl Into<String>) -> Self {
286        Self {
287            healthy: true,
288            last_check: crate::prelude::now_micros(),
289            failure_count: 0,
290            message: Some(message.into()),
291        }
292    }
293
294    fn unhealthy(message: impl Into<String>) -> Self {
295        Self {
296            healthy: false,
297            last_check: crate::prelude::now_micros(),
298            failure_count: 1,
299            message: Some(message.into()),
300        }
301    }
302}
303
304// ============================================================================
305// NodeImpl Trait
306// ============================================================================
307
308/// Core trait that all Mecha10 nodes must implement.
309///
310/// This trait defines the lifecycle of a node:
311/// 1. `init()` - Initialize the node with configuration
312/// 2. `run()` - Main execution loop
313/// 3. `shutdown()` - Graceful shutdown
314/// 4. `health_check()` - Report health status
315///
316/// # Example
317///
318/// ```rust
319/// use mecha10::prelude::*;
320///
321/// #[derive(Debug)]
322/// struct MyNode {
323///     config: MyConfig,
324/// }
325///
326/// #[derive(Debug, Deserialize)]
327/// struct MyConfig {
328///     rate: f32,
329/// }
330///
331/// #[async_trait]
332/// impl NodeImpl for MyNode {
333///     type Config = MyConfig;
334///
335///     async fn init(config: Self::Config) -> Result<Self> {
336///         info!("Initializing node with rate: {}", config.rate);
337///         Ok(Self { config })
338///     }
339///
340///     async fn run(&mut self, ctx: &Context) -> Result<()> {
341///         let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
342///
343///         while let Some(image) = images.recv().await {
344///             // Process image
345///             info!("Received image: {}x{}", image.width, image.height);
346///         }
347///
348///         Ok(())
349///     }
350/// }
351/// ```
352#[async_trait]
353pub trait NodeImpl: Send + Sync + Debug + 'static {
354    /// Configuration type for this node
355    type Config: DeserializeOwned + Send + Sync + Debug;
356
357    /// Initialize the node with the given configuration.
358    ///
359    /// This is called once when the node starts. Use this to:
360    /// - Load configuration
361    /// - Initialize resources (models, connections, etc.)
362    /// - Validate prerequisites
363    ///
364    /// # Errors
365    ///
366    /// Return an error if initialization fails. The node will not start.
367    async fn init(config: Self::Config) -> Result<Self>
368    where
369        Self: Sized;
370
371    /// Main execution loop for the node.
372    ///
373    /// This is where your node's business logic lives. Typical patterns:
374    /// - Subscribe to topics and process messages
375    /// - Run periodic tasks
376    /// - Publish results
377    ///
378    /// This method should run indefinitely until:
379    /// - The context is cancelled (shutdown requested)
380    /// - An unrecoverable error occurs
381    ///
382    /// # Errors
383    ///
384    /// Return an error if the node encounters an unrecoverable error.
385    /// The node will transition to Error state and shutdown.
386    async fn run(&mut self, ctx: &Context) -> Result<()>;
387
388    /// Graceful shutdown hook.
389    ///
390    /// Called when the node is shutting down. Use this to:
391    /// - Close connections
392    /// - Flush buffers
393    /// - Save state
394    /// - Release resources
395    ///
396    /// The default implementation does nothing.
397    ///
398    /// # Errors
399    ///
400    /// Errors during shutdown are logged but don't prevent shutdown.
401    async fn shutdown(&mut self) -> Result<()> {
402        Ok(())
403    }
404
405    /// Health check for monitoring.
406    ///
407    /// Called periodically to check node health. Use this to:
408    /// - Report current state
409    /// - Check resource usage
410    /// - Validate connections
411    ///
412    /// The default implementation returns healthy status.
413    async fn health_check(&self) -> HealthStatus {
414        HealthStatus {
415            healthy: true,
416            last_check: crate::prelude::now_micros(),
417            failure_count: 0,
418            message: Some("Node is running".to_string()),
419        }
420    }
421}
422
423// ============================================================================
424// Node Runner
425// ============================================================================
426
427/// Wrapper that manages the lifecycle of a NodeImpl.
428///
429/// This provides:
430/// - State management
431/// - Lifecycle orchestration
432/// - Error handling
433/// - Health monitoring
434#[derive(Clone)]
435pub struct Node<T: NodeImpl> {
436    node: Arc<Mutex<T>>,
437    state: Arc<Mutex<NodeState>>,
438}
439
440impl<T: NodeImpl> Node<T> {
441    /// Create a new node from a configuration.
442    ///
443    /// This calls `NodeImpl::init()` and wraps the result in a Node.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if initialization fails.
448    pub async fn from_config(config: T::Config) -> Result<Self> {
449        info!("Initializing node");
450
451        let node = T::init(config).await?;
452
453        Ok(Self {
454            node: Arc::new(Mutex::new(node)),
455            state: Arc::new(Mutex::new(NodeState::Starting)),
456        })
457    }
458
459    /// Start the node with the given context.
460    ///
461    /// This orchestrates the full lifecycle:
462    /// 1. Transition to Running state
463    /// 2. Call `run()`
464    /// 3. Call `shutdown()` on completion or error
465    /// 4. Transition to final state
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the node fails during execution.
470    pub async fn start(&self, ctx: Context) -> Result<()> {
471        // Transition to running
472        {
473            let mut state = self.state.lock().await;
474            *state = NodeState::Running;
475            info!("Node transitioned to Running state");
476        }
477
478        // Run the node
479        let result = {
480            let mut node = self.node.lock().await;
481            debug!("Starting node execution");
482            node.run(&ctx).await
483        };
484
485        // Handle result
486        match result {
487            Ok(()) => {
488                info!("Node completed successfully");
489                self.shutdown().await?;
490            }
491            Err(e) => {
492                error!("Node failed with error: {}", e);
493                {
494                    let mut state = self.state.lock().await;
495                    *state = NodeState::Crashed;
496                }
497                self.shutdown().await?;
498                return Err(e);
499            }
500        }
501
502        Ok(())
503    }
504
505    /// Gracefully shutdown the node.
506    ///
507    /// This calls `shutdown()` and transitions to Stopped state.
508    pub async fn shutdown(&self) -> Result<()> {
509        info!("Shutting down node");
510
511        // Transition to shutting down
512        {
513            let mut state = self.state.lock().await;
514            *state = NodeState::Stopping;
515        }
516
517        // Call shutdown hook
518        let result = {
519            let mut node = self.node.lock().await;
520            node.shutdown().await
521        };
522
523        // Transition to stopped
524        {
525            let mut state = self.state.lock().await;
526            *state = NodeState::Stopped;
527            info!("Node stopped");
528        }
529
530        if let Err(e) = result {
531            warn!("Error during shutdown: {}", e);
532            return Err(e);
533        }
534
535        Ok(())
536    }
537
538    /// Get the current state of the node.
539    pub async fn state(&self) -> NodeState {
540        self.state.lock().await.clone()
541    }
542
543    /// Perform a health check on the node.
544    pub async fn health_check(&self) -> HealthStatus {
545        let node = self.node.lock().await;
546        node.health_check().await
547    }
548
549    /// Check if the node is healthy.
550    pub async fn is_healthy(&self) -> bool {
551        matches!(self.state().await, NodeState::Running | NodeState::Starting)
552    }
553}
554
555// ============================================================================
556// Helper Functions
557// ============================================================================
558
559/// Run a node with the given configuration and context.
560///
561/// This is a convenience function that:
562/// 1. Initializes the node from config
563/// 2. Starts the node with the context
564/// 3. Handles errors
565///
566/// # Arguments
567///
568/// * `config` - Node-specific configuration
569/// * `ctx` - Execution context
570/// * `health_config` - Health reporting configuration
571///
572/// # Example
573///
574/// ```rust
575/// use mecha10::prelude::*;
576///
577/// #[tokio::main]
578/// async fn main() -> Result<()> {
579///     let config = MyConfig { rate: 10.0 };
580///     let ctx = Context::new("my_node");
581///
582///     // Use default health reporting (Normal priority, 5s interval)
583///     run_node::<MyNode>(config, ctx, HealthReportingConfig::default()).await?;
584///
585///     // Critical priority for safety-critical nodes (1s interval)
586///     run_node::<MyNode>(config, ctx, HealthReportingConfig::critical()).await?;
587///
588///     // Background priority for auxiliary nodes (30s interval)
589///     run_node::<MyNode>(config, ctx, HealthReportingConfig::background()).await?;
590///
591///     Ok(())
592/// }
593/// ```
594pub async fn run_node<T: NodeImpl>(
595    config: T::Config,
596    ctx: Context,
597    health_config: HealthReportingConfig,
598) -> Result<()> {
599    // Initialize logging (safe to call multiple times)
600    crate::prelude::init_logging();
601
602    let node = Node::<T>::from_config(config).await?;
603
604    // Start automatic health reporting
605    if health_config.enabled {
606        // Create clones for the health reporting task
607        let node_for_health = node.node.clone();
608        let ctx_for_health = ctx.clone();
609        let priority = health_config.priority;
610        let report_interval = health_config.interval();
611        let ttl_seconds = health_config.ttl_seconds();
612        let store_history = health_config.store_history;
613
614        tokio::spawn(async move {
615            use crate::health::HealthReporter;
616
617            // Try to create health reporter with priority config
618            let reporter =
619                match HealthReporter::with_config(&ctx_for_health, priority, ttl_seconds, store_history).await {
620                    Ok(r) => r,
621                    Err(e) => {
622                        warn!("Health reporting disabled: {}", e);
623                        return;
624                    }
625                };
626
627            info!(
628                "Health reporting enabled: priority={}, interval={:?}, ttl={}s, history={}",
629                priority, report_interval, ttl_seconds, store_history
630            );
631
632            let mut interval = tokio::time::interval(report_interval);
633
634            loop {
635                interval.tick().await;
636
637                // Get health status from node
638                let status = {
639                    let node = node_for_health.lock().await;
640                    node.health_check().await
641                };
642
643                // Convert types::HealthStatus to messages::system::HealthStatus
644                let msg_status = crate::messages::system::HealthStatus {
645                    timestamp: status.last_check,
646                    level: if status.healthy {
647                        crate::messages::system::HealthLevel::Ok
648                    } else {
649                        crate::messages::system::HealthLevel::Error
650                    },
651                    message: status.message.unwrap_or_else(|| "No message".to_string()),
652                    diagnostics: None,
653                };
654
655                // Report to centralized monitoring
656                if let Err(e) = reporter.report(msg_status).await {
657                    debug!("Failed to report health: {}", e);
658                }
659            }
660        });
661    } else {
662        info!("Health reporting disabled");
663    }
664
665    node.start(ctx).await
666}