mecha10_core/
node.rs

1// Mecha10 Node Trait and Lifecycle Management
2//
3// This module defines the core Node trait and lifecycle management for Mecha10 nodes.
4// Every Mecha10 node implements the NodeImpl trait to define its behavior.
5
6use crate::{Context, HealthStatus, NodeState, Result};
7use async_trait::async_trait;
8use serde::de::DeserializeOwned;
9use std::fmt::Debug;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
15// ============================================================================
16// Configuration
17// ============================================================================
18
/// Settings that control a node's periodic health reports.
///
/// The [`Default`] implementation enables reporting with a 5-second interval.
#[derive(Debug, Clone)]
pub struct HealthReportingConfig {
    /// Time between two consecutive health reports (default: 5 seconds).
    pub interval: Duration,
    /// Set to `false` to switch health reporting off (default: `true`).
    pub enabled: bool,
}

impl Default for HealthReportingConfig {
    /// Builds the default configuration: reporting enabled, one report every 5 seconds.
    fn default() -> Self {
        let interval = Duration::from_secs(5);
        let enabled = true;

        Self { interval, enabled }
    }
}
36
37/// Trait for node configurations that specify an update rate
38///
39/// This trait provides a standard interface for nodes that publish at a fixed rate.
40/// Implement this trait on your node's config struct to enable convenient rate-based
41/// interval creation.
42///
43/// # Example
44///
45/// ```rust
46/// use mecha10::prelude::*;
47///
48/// #[derive(Debug, Clone, Serialize, Deserialize)]
49/// pub struct CameraConfig {
50///     pub fps: f32,
51///     pub width: u32,
52///     pub height: u32,
53/// }
54///
55/// impl RateConfig for CameraConfig {
56///     fn rate_hz(&self) -> f32 {
57///         self.fps
58///     }
59/// }
60///
61/// // Now you can use the config to create intervals easily:
62/// # async fn example(config: CameraConfig) {
63/// let mut interval = config.create_interval();
64/// loop {
65///     interval.tick().await;
66///     // Publish at configured rate
67/// }
68/// # }
69/// ```
70pub trait RateConfig {
71    /// Get the update rate in Hertz (cycles per second)
72    fn rate_hz(&self) -> f32;
73
74    /// Create a tokio interval from the configured rate
75    ///
76    /// This is a convenience method that converts the rate to a Duration
77    /// and creates a tokio::time::Interval.
78    fn create_interval(&self) -> tokio::time::Interval {
79        let duration = Duration::from_secs_f32(1.0 / self.rate_hz());
80        tokio::time::interval(duration)
81    }
82
83    /// Get the period (time between updates) as a Duration
84    fn period(&self) -> Duration {
85        Duration::from_secs_f32(1.0 / self.rate_hz())
86    }
87}
88
89// ============================================================================
90// Health Status Helper
91// ============================================================================
92
93/// Helper methods for creating HealthStatus instances
94pub trait HealthStatusExt {
95    /// Create a healthy status
96    fn healthy(message: impl Into<String>) -> Self;
97    /// Create an unhealthy status
98    fn unhealthy(message: impl Into<String>) -> Self;
99}
100
101impl HealthStatusExt for HealthStatus {
102    fn healthy(message: impl Into<String>) -> Self {
103        Self {
104            healthy: true,
105            last_check: crate::prelude::now_micros(),
106            failure_count: 0,
107            message: Some(message.into()),
108        }
109    }
110
111    fn unhealthy(message: impl Into<String>) -> Self {
112        Self {
113            healthy: false,
114            last_check: crate::prelude::now_micros(),
115            failure_count: 1,
116            message: Some(message.into()),
117        }
118    }
119}
120
121// ============================================================================
122// NodeImpl Trait
123// ============================================================================
124
125/// Core trait that all Mecha10 nodes must implement.
126///
127/// This trait defines the lifecycle of a node:
128/// 1. `init()` - Initialize the node with configuration
129/// 2. `run()` - Main execution loop
130/// 3. `shutdown()` - Graceful shutdown
131/// 4. `health_check()` - Report health status
132///
133/// # Example
134///
135/// ```rust
136/// use mecha10::prelude::*;
137///
138/// #[derive(Debug)]
139/// struct MyNode {
140///     config: MyConfig,
141/// }
142///
143/// #[derive(Debug, Deserialize)]
144/// struct MyConfig {
145///     rate: f32,
146/// }
147///
148/// #[async_trait]
149/// impl NodeImpl for MyNode {
150///     type Config = MyConfig;
151///
152///     async fn init(config: Self::Config) -> Result<Self> {
153///         info!("Initializing node with rate: {}", config.rate);
154///         Ok(Self { config })
155///     }
156///
157///     async fn run(&mut self, ctx: &Context) -> Result<()> {
158///         let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
159///
160///         while let Some(image) = images.recv().await {
161///             // Process image
162///             info!("Received image: {}x{}", image.width, image.height);
163///         }
164///
165///         Ok(())
166///     }
167/// }
168/// ```
169#[async_trait]
170pub trait NodeImpl: Send + Sync + Debug + 'static {
171    /// Configuration type for this node
172    type Config: DeserializeOwned + Send + Sync + Debug;
173
174    /// Initialize the node with the given configuration.
175    ///
176    /// This is called once when the node starts. Use this to:
177    /// - Load configuration
178    /// - Initialize resources (models, connections, etc.)
179    /// - Validate prerequisites
180    ///
181    /// # Errors
182    ///
183    /// Return an error if initialization fails. The node will not start.
184    async fn init(config: Self::Config) -> Result<Self>
185    where
186        Self: Sized;
187
188    /// Main execution loop for the node.
189    ///
190    /// This is where your node's business logic lives. Typical patterns:
191    /// - Subscribe to topics and process messages
192    /// - Run periodic tasks
193    /// - Publish results
194    ///
195    /// This method should run indefinitely until:
196    /// - The context is cancelled (shutdown requested)
197    /// - An unrecoverable error occurs
198    ///
199    /// # Errors
200    ///
201    /// Return an error if the node encounters an unrecoverable error.
202    /// The node will transition to Error state and shutdown.
203    async fn run(&mut self, ctx: &Context) -> Result<()>;
204
205    /// Graceful shutdown hook.
206    ///
207    /// Called when the node is shutting down. Use this to:
208    /// - Close connections
209    /// - Flush buffers
210    /// - Save state
211    /// - Release resources
212    ///
213    /// The default implementation does nothing.
214    ///
215    /// # Errors
216    ///
217    /// Errors during shutdown are logged but don't prevent shutdown.
218    async fn shutdown(&mut self) -> Result<()> {
219        Ok(())
220    }
221
222    /// Health check for monitoring.
223    ///
224    /// Called periodically to check node health. Use this to:
225    /// - Report current state
226    /// - Check resource usage
227    /// - Validate connections
228    ///
229    /// The default implementation returns healthy status.
230    async fn health_check(&self) -> HealthStatus {
231        HealthStatus {
232            healthy: true,
233            last_check: crate::prelude::now_micros(),
234            failure_count: 0,
235            message: Some("Node is running".to_string()),
236        }
237    }
238}
239
240// ============================================================================
241// Node Runner
242// ============================================================================
243
244/// Wrapper that manages the lifecycle of a NodeImpl.
245///
246/// This provides:
247/// - State management
248/// - Lifecycle orchestration
249/// - Error handling
250/// - Health monitoring
251#[derive(Clone)]
252pub struct Node<T: NodeImpl> {
253    node: Arc<Mutex<T>>,
254    state: Arc<Mutex<NodeState>>,
255}
256
257impl<T: NodeImpl> Node<T> {
258    /// Create a new node from a configuration.
259    ///
260    /// This calls `NodeImpl::init()` and wraps the result in a Node.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if initialization fails.
265    pub async fn from_config(config: T::Config) -> Result<Self> {
266        info!("Initializing node");
267
268        let node = T::init(config).await?;
269
270        Ok(Self {
271            node: Arc::new(Mutex::new(node)),
272            state: Arc::new(Mutex::new(NodeState::Starting)),
273        })
274    }
275
276    /// Start the node with the given context.
277    ///
278    /// This orchestrates the full lifecycle:
279    /// 1. Transition to Running state
280    /// 2. Call `run()`
281    /// 3. Call `shutdown()` on completion or error
282    /// 4. Transition to final state
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the node fails during execution.
287    pub async fn start(&self, ctx: Context) -> Result<()> {
288        // Transition to running
289        {
290            let mut state = self.state.lock().await;
291            *state = NodeState::Running;
292            info!("Node transitioned to Running state");
293        }
294
295        // Run the node
296        let result = {
297            let mut node = self.node.lock().await;
298            debug!("Starting node execution");
299            node.run(&ctx).await
300        };
301
302        // Handle result
303        match result {
304            Ok(()) => {
305                info!("Node completed successfully");
306                self.shutdown().await?;
307            }
308            Err(e) => {
309                error!("Node failed with error: {}", e);
310                {
311                    let mut state = self.state.lock().await;
312                    *state = NodeState::Crashed;
313                }
314                self.shutdown().await?;
315                return Err(e);
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Gracefully shutdown the node.
323    ///
324    /// This calls `shutdown()` and transitions to Stopped state.
325    pub async fn shutdown(&self) -> Result<()> {
326        info!("Shutting down node");
327
328        // Transition to shutting down
329        {
330            let mut state = self.state.lock().await;
331            *state = NodeState::Stopping;
332        }
333
334        // Call shutdown hook
335        let result = {
336            let mut node = self.node.lock().await;
337            node.shutdown().await
338        };
339
340        // Transition to stopped
341        {
342            let mut state = self.state.lock().await;
343            *state = NodeState::Stopped;
344            info!("Node stopped");
345        }
346
347        if let Err(e) = result {
348            warn!("Error during shutdown: {}", e);
349            return Err(e);
350        }
351
352        Ok(())
353    }
354
355    /// Get the current state of the node.
356    pub async fn state(&self) -> NodeState {
357        self.state.lock().await.clone()
358    }
359
360    /// Perform a health check on the node.
361    pub async fn health_check(&self) -> HealthStatus {
362        let node = self.node.lock().await;
363        node.health_check().await
364    }
365
366    /// Check if the node is healthy.
367    pub async fn is_healthy(&self) -> bool {
368        matches!(self.state().await, NodeState::Running | NodeState::Starting)
369    }
370}
371
372// ============================================================================
373// Helper Functions
374// ============================================================================
375
376/// Run a node with the given configuration and context.
377///
378/// This is a convenience function that:
379/// 1. Initializes the node from config
380/// 2. Starts the node with the context
381/// 3. Handles errors
382///
383/// # Arguments
384///
385/// * `config` - Node-specific configuration
386/// * `ctx` - Execution context
387/// * `health_config` - Health reporting configuration
388///
389/// # Example
390///
391/// ```rust
392/// use mecha10::prelude::*;
393///
394/// #[tokio::main]
395/// async fn main() -> Result<()> {
396///     let config = MyConfig { rate: 10.0 };
397///     let ctx = Context::new("my_node");
398///
399///     // Use default health reporting (5 second interval)
400///     run_node::<MyNode>(config, ctx, HealthReportingConfig::default()).await?;
401///
402///     // Or customize
403///     run_node::<MyNode>(
404///         config,
405///         ctx,
406///         HealthReportingConfig {
407///             interval: Duration::from_secs(10),
408///             enabled: true,
409///         }
410///     ).await?;
411///
412///     Ok(())
413/// }
414/// ```
415pub async fn run_node<T: NodeImpl>(
416    config: T::Config,
417    ctx: Context,
418    health_config: HealthReportingConfig,
419) -> Result<()> {
420    // Initialize logging (safe to call multiple times)
421    crate::prelude::init_logging();
422
423    let node = Node::<T>::from_config(config).await?;
424
425    // Start automatic health reporting
426    if health_config.enabled {
427        // Create a clone for the health reporting task
428        let node_for_health = node.node.clone();
429        let ctx_for_health = ctx.clone();
430        let report_interval = health_config.interval;
431
432        tokio::spawn(async move {
433            use crate::health::HealthReporter;
434
435            // Try to create health reporter
436            let reporter = match HealthReporter::new(&ctx_for_health).await {
437                Ok(r) => r,
438                Err(e) => {
439                    warn!("Health reporting disabled: {}", e);
440                    return;
441                }
442            };
443
444            info!("Health reporting enabled with interval: {:?}", report_interval);
445
446            let mut interval = tokio::time::interval(report_interval);
447
448            loop {
449                interval.tick().await;
450
451                // Get health status from node
452                let status = {
453                    let node = node_for_health.lock().await;
454                    node.health_check().await
455                };
456
457                // Convert types::HealthStatus to messages::system::HealthStatus
458                let msg_status = crate::messages::system::HealthStatus {
459                    timestamp: status.last_check,
460                    level: if status.healthy {
461                        crate::messages::system::HealthLevel::Ok
462                    } else {
463                        crate::messages::system::HealthLevel::Error
464                    },
465                    message: status.message.unwrap_or_else(|| "No message".to_string()),
466                    diagnostics: None,
467                };
468
469                // Report to centralized monitoring
470                if let Err(e) = reporter.report(msg_status).await {
471                    debug!("Failed to report health: {}", e);
472                }
473            }
474        });
475    } else {
476        info!("Health reporting disabled");
477    }
478
479    node.start(ctx).await
480}