mecha10_core/node.rs
1// Mecha10 Node Trait and Lifecycle Management
2//
3// This module defines the core Node trait and lifecycle management for Mecha10 nodes.
4// Every Mecha10 node implements the NodeImpl trait to define its behavior.
5
6use crate::{Context, HealthStatus, NodeState, Result};
7use async_trait::async_trait;
8use serde::de::DeserializeOwned;
9use std::fmt::Debug;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
15// ============================================================================
16// Configuration
17// ============================================================================
18
/// Configuration for node health reporting.
///
/// Used by [`run_node`] to decide whether, and how often, the node's
/// `health_check()` result is published.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthReportingConfig {
    /// Interval between health reports (default: 5 seconds).
    ///
    /// Must be non-zero: `tokio::time::interval` panics on a zero period.
    pub interval: Duration,
    /// Whether health reporting is enabled (default: true)
    pub enabled: bool,
}

impl Default for HealthReportingConfig {
    /// Report every 5 seconds, enabled.
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(5),
            enabled: true,
        }
    }
}
36
37/// Trait for node configurations that specify an update rate
38///
39/// This trait provides a standard interface for nodes that publish at a fixed rate.
40/// Implement this trait on your node's config struct to enable convenient rate-based
41/// interval creation.
42///
43/// # Example
44///
45/// ```rust
46/// use mecha10::prelude::*;
47///
48/// #[derive(Debug, Clone, Serialize, Deserialize)]
49/// pub struct CameraConfig {
50/// pub fps: f32,
51/// pub width: u32,
52/// pub height: u32,
53/// }
54///
55/// impl RateConfig for CameraConfig {
56/// fn rate_hz(&self) -> f32 {
57/// self.fps
58/// }
59/// }
60///
61/// // Now you can use the config to create intervals easily:
62/// # async fn example(config: CameraConfig) {
63/// let mut interval = config.create_interval();
64/// loop {
65/// interval.tick().await;
66/// // Publish at configured rate
67/// }
68/// # }
69/// ```
70pub trait RateConfig {
71 /// Get the update rate in Hertz (cycles per second)
72 fn rate_hz(&self) -> f32;
73
74 /// Create a tokio interval from the configured rate
75 ///
76 /// This is a convenience method that converts the rate to a Duration
77 /// and creates a tokio::time::Interval.
78 fn create_interval(&self) -> tokio::time::Interval {
79 let duration = Duration::from_secs_f32(1.0 / self.rate_hz());
80 tokio::time::interval(duration)
81 }
82
83 /// Get the period (time between updates) as a Duration
84 fn period(&self) -> Duration {
85 Duration::from_secs_f32(1.0 / self.rate_hz())
86 }
87}
88
89// ============================================================================
90// Health Status Helper
91// ============================================================================
92
93/// Helper methods for creating HealthStatus instances
94pub trait HealthStatusExt {
95 /// Create a healthy status
96 fn healthy(message: impl Into<String>) -> Self;
97 /// Create an unhealthy status
98 fn unhealthy(message: impl Into<String>) -> Self;
99}
100
101impl HealthStatusExt for HealthStatus {
102 fn healthy(message: impl Into<String>) -> Self {
103 Self {
104 healthy: true,
105 last_check: crate::prelude::now_micros(),
106 failure_count: 0,
107 message: Some(message.into()),
108 }
109 }
110
111 fn unhealthy(message: impl Into<String>) -> Self {
112 Self {
113 healthy: false,
114 last_check: crate::prelude::now_micros(),
115 failure_count: 1,
116 message: Some(message.into()),
117 }
118 }
119}
120
121// ============================================================================
122// NodeImpl Trait
123// ============================================================================
124
125/// Core trait that all Mecha10 nodes must implement.
126///
127/// This trait defines the lifecycle of a node:
128/// 1. `init()` - Initialize the node with configuration
129/// 2. `run()` - Main execution loop
130/// 3. `shutdown()` - Graceful shutdown
131/// 4. `health_check()` - Report health status
132///
133/// # Example
134///
135/// ```rust
136/// use mecha10::prelude::*;
137///
138/// #[derive(Debug)]
139/// struct MyNode {
140/// config: MyConfig,
141/// }
142///
143/// #[derive(Debug, Deserialize)]
144/// struct MyConfig {
145/// rate: f32,
146/// }
147///
148/// #[async_trait]
149/// impl NodeImpl for MyNode {
150/// type Config = MyConfig;
151///
152/// async fn init(config: Self::Config) -> Result<Self> {
153/// info!("Initializing node with rate: {}", config.rate);
154/// Ok(Self { config })
155/// }
156///
157/// async fn run(&mut self, ctx: &Context) -> Result<()> {
158/// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
159///
160/// while let Some(image) = images.recv().await {
161/// // Process image
162/// info!("Received image: {}x{}", image.width, image.height);
163/// }
164///
165/// Ok(())
166/// }
167/// }
168/// ```
169#[async_trait]
170pub trait NodeImpl: Send + Sync + Debug + 'static {
171 /// Configuration type for this node
172 type Config: DeserializeOwned + Send + Sync + Debug;
173
174 /// Initialize the node with the given configuration.
175 ///
176 /// This is called once when the node starts. Use this to:
177 /// - Load configuration
178 /// - Initialize resources (models, connections, etc.)
179 /// - Validate prerequisites
180 ///
181 /// # Errors
182 ///
183 /// Return an error if initialization fails. The node will not start.
184 async fn init(config: Self::Config) -> Result<Self>
185 where
186 Self: Sized;
187
188 /// Main execution loop for the node.
189 ///
190 /// This is where your node's business logic lives. Typical patterns:
191 /// - Subscribe to topics and process messages
192 /// - Run periodic tasks
193 /// - Publish results
194 ///
195 /// This method should run indefinitely until:
196 /// - The context is cancelled (shutdown requested)
197 /// - An unrecoverable error occurs
198 ///
199 /// # Errors
200 ///
201 /// Return an error if the node encounters an unrecoverable error.
202 /// The node will transition to Error state and shutdown.
203 async fn run(&mut self, ctx: &Context) -> Result<()>;
204
205 /// Graceful shutdown hook.
206 ///
207 /// Called when the node is shutting down. Use this to:
208 /// - Close connections
209 /// - Flush buffers
210 /// - Save state
211 /// - Release resources
212 ///
213 /// The default implementation does nothing.
214 ///
215 /// # Errors
216 ///
217 /// Errors during shutdown are logged but don't prevent shutdown.
218 async fn shutdown(&mut self) -> Result<()> {
219 Ok(())
220 }
221
222 /// Health check for monitoring.
223 ///
224 /// Called periodically to check node health. Use this to:
225 /// - Report current state
226 /// - Check resource usage
227 /// - Validate connections
228 ///
229 /// The default implementation returns healthy status.
230 async fn health_check(&self) -> HealthStatus {
231 HealthStatus {
232 healthy: true,
233 last_check: crate::prelude::now_micros(),
234 failure_count: 0,
235 message: Some("Node is running".to_string()),
236 }
237 }
238}
239
240// ============================================================================
241// Node Runner
242// ============================================================================
243
244/// Wrapper that manages the lifecycle of a NodeImpl.
245///
246/// This provides:
247/// - State management
248/// - Lifecycle orchestration
249/// - Error handling
250/// - Health monitoring
251#[derive(Clone)]
252pub struct Node<T: NodeImpl> {
253 node: Arc<Mutex<T>>,
254 state: Arc<Mutex<NodeState>>,
255}
256
257impl<T: NodeImpl> Node<T> {
258 /// Create a new node from a configuration.
259 ///
260 /// This calls `NodeImpl::init()` and wraps the result in a Node.
261 ///
262 /// # Errors
263 ///
264 /// Returns an error if initialization fails.
265 pub async fn from_config(config: T::Config) -> Result<Self> {
266 info!("Initializing node");
267
268 let node = T::init(config).await?;
269
270 Ok(Self {
271 node: Arc::new(Mutex::new(node)),
272 state: Arc::new(Mutex::new(NodeState::Starting)),
273 })
274 }
275
276 /// Start the node with the given context.
277 ///
278 /// This orchestrates the full lifecycle:
279 /// 1. Transition to Running state
280 /// 2. Call `run()`
281 /// 3. Call `shutdown()` on completion or error
282 /// 4. Transition to final state
283 ///
284 /// # Errors
285 ///
286 /// Returns an error if the node fails during execution.
287 pub async fn start(&self, ctx: Context) -> Result<()> {
288 // Transition to running
289 {
290 let mut state = self.state.lock().await;
291 *state = NodeState::Running;
292 info!("Node transitioned to Running state");
293 }
294
295 // Run the node
296 let result = {
297 let mut node = self.node.lock().await;
298 debug!("Starting node execution");
299 node.run(&ctx).await
300 };
301
302 // Handle result
303 match result {
304 Ok(()) => {
305 info!("Node completed successfully");
306 self.shutdown().await?;
307 }
308 Err(e) => {
309 error!("Node failed with error: {}", e);
310 {
311 let mut state = self.state.lock().await;
312 *state = NodeState::Crashed;
313 }
314 self.shutdown().await?;
315 return Err(e);
316 }
317 }
318
319 Ok(())
320 }
321
322 /// Gracefully shutdown the node.
323 ///
324 /// This calls `shutdown()` and transitions to Stopped state.
325 pub async fn shutdown(&self) -> Result<()> {
326 info!("Shutting down node");
327
328 // Transition to shutting down
329 {
330 let mut state = self.state.lock().await;
331 *state = NodeState::Stopping;
332 }
333
334 // Call shutdown hook
335 let result = {
336 let mut node = self.node.lock().await;
337 node.shutdown().await
338 };
339
340 // Transition to stopped
341 {
342 let mut state = self.state.lock().await;
343 *state = NodeState::Stopped;
344 info!("Node stopped");
345 }
346
347 if let Err(e) = result {
348 warn!("Error during shutdown: {}", e);
349 return Err(e);
350 }
351
352 Ok(())
353 }
354
355 /// Get the current state of the node.
356 pub async fn state(&self) -> NodeState {
357 self.state.lock().await.clone()
358 }
359
360 /// Perform a health check on the node.
361 pub async fn health_check(&self) -> HealthStatus {
362 let node = self.node.lock().await;
363 node.health_check().await
364 }
365
366 /// Check if the node is healthy.
367 pub async fn is_healthy(&self) -> bool {
368 matches!(self.state().await, NodeState::Running | NodeState::Starting)
369 }
370}
371
372// ============================================================================
373// Helper Functions
374// ============================================================================
375
376/// Run a node with the given configuration and context.
377///
378/// This is a convenience function that:
379/// 1. Initializes the node from config
380/// 2. Starts the node with the context
381/// 3. Handles errors
382///
383/// # Arguments
384///
385/// * `config` - Node-specific configuration
386/// * `ctx` - Execution context
387/// * `health_config` - Health reporting configuration
388///
389/// # Example
390///
391/// ```rust
392/// use mecha10::prelude::*;
393///
394/// #[tokio::main]
395/// async fn main() -> Result<()> {
396/// let config = MyConfig { rate: 10.0 };
397/// let ctx = Context::new("my_node");
398///
399/// // Use default health reporting (5 second interval)
400/// run_node::<MyNode>(config, ctx, HealthReportingConfig::default()).await?;
401///
402/// // Or customize
403/// run_node::<MyNode>(
404/// config,
405/// ctx,
406/// HealthReportingConfig {
407/// interval: Duration::from_secs(10),
408/// enabled: true,
409/// }
410/// ).await?;
411///
412/// Ok(())
413/// }
414/// ```
415pub async fn run_node<T: NodeImpl>(
416 config: T::Config,
417 ctx: Context,
418 health_config: HealthReportingConfig,
419) -> Result<()> {
420 // Initialize logging (safe to call multiple times)
421 crate::prelude::init_logging();
422
423 let node = Node::<T>::from_config(config).await?;
424
425 // Start automatic health reporting
426 if health_config.enabled {
427 // Create a clone for the health reporting task
428 let node_for_health = node.node.clone();
429 let ctx_for_health = ctx.clone();
430 let report_interval = health_config.interval;
431
432 tokio::spawn(async move {
433 use crate::health::HealthReporter;
434
435 // Try to create health reporter
436 let reporter = match HealthReporter::new(&ctx_for_health).await {
437 Ok(r) => r,
438 Err(e) => {
439 warn!("Health reporting disabled: {}", e);
440 return;
441 }
442 };
443
444 info!("Health reporting enabled with interval: {:?}", report_interval);
445
446 let mut interval = tokio::time::interval(report_interval);
447
448 loop {
449 interval.tick().await;
450
451 // Get health status from node
452 let status = {
453 let node = node_for_health.lock().await;
454 node.health_check().await
455 };
456
457 // Convert types::HealthStatus to messages::system::HealthStatus
458 let msg_status = crate::messages::system::HealthStatus {
459 timestamp: status.last_check,
460 level: if status.healthy {
461 crate::messages::system::HealthLevel::Ok
462 } else {
463 crate::messages::system::HealthLevel::Error
464 },
465 message: status.message.unwrap_or_else(|| "No message".to_string()),
466 diagnostics: None,
467 };
468
469 // Report to centralized monitoring
470 if let Err(e) = reporter.report(msg_status).await {
471 debug!("Failed to report health: {}", e);
472 }
473 }
474 });
475 } else {
476 info!("Health reporting disabled");
477 }
478
479 node.start(ctx).await
480}