mecha10_core/
publish.rs

1//! Conditional publishing for declarative message publishing
2//!
3//! This module provides a fluent API for conditional and throttled publishing.
4//!
5//! # Examples
6//!
7//! ## Conditional Publishing
8//!
9//! ```rust
10//! use mecha10::prelude::*;
11//! use mecha10::topics::perception;
12//!
13//! # async fn example(ctx: &Context, detections: Vec<Detection>) -> Result<()> {
14//! // Only publish when detections are non-empty
15//! ctx.publish_to(perception::DETECTIONS, &detections)
16//!     .when(|d| !d.is_empty())
17//!     .await?;
18//! # Ok(())
19//! # }
20//! ```
21//!
22//! ## Throttled Publishing
23//!
24//! ```rust
25//! use mecha10::prelude::*;
26//! use mecha10::topics::system;
27//!
28//! # async fn example(ctx: &Context, status: String) -> Result<()> {
29//! // Publish at most once per second
30//! ctx.publish_to(system::NODE_STATUS, &status)
31//!     .throttle(Duration::from_secs(1))
32//!     .await?;
33//! # Ok(())
34//! # }
35//! ```
36
37use crate::context::Context;
38use crate::error::Result;
39use crate::messages::Message;
40use crate::topics::Topic;
41use serde::Serialize;
42use std::collections::HashMap;
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45use tokio::sync::Mutex;
46
47/// Global throttle state for tracking last publish times
48static THROTTLE_STATE: once_cell::sync::Lazy<Arc<Mutex<HashMap<String, Instant>>>> =
49    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashMap::new())));
50
51/// Builder for conditional/throttled publishing
52///
53/// This builder allows chaining operations to create declarative
54/// publishing behavior.
55pub struct PublishBuilder<'a, 'b, T: Message + Serialize> {
56    ctx: &'a Context,
57    topic: Topic<T>,
58    message: &'b T,
59    #[allow(clippy::type_complexity)]
60    condition: Option<Box<dyn Fn(&T) -> bool + Send + Sync>>,
61    throttle_duration: Option<Duration>,
62}
63
64impl<'a, 'b, T: Message + Serialize> PublishBuilder<'a, 'b, T> {
65    /// Create a new publish builder
66    pub fn new(ctx: &'a Context, topic: Topic<T>, message: &'b T) -> Self {
67        Self {
68            ctx,
69            topic,
70            message,
71            condition: None,
72            throttle_duration: None,
73        }
74    }
75
76    /// Only publish if condition is true
77    ///
78    /// If the predicate returns `false`, the message will not be published.
79    ///
80    /// # Example
81    ///
82    /// ```rust
83    /// # use mecha10::prelude::*;
84    /// # async fn example(ctx: &Context, detections: Vec<Detection>) -> Result<()> {
85    /// ctx.publish_to(perception::DETECTIONS, &detections)
86    ///     .when(|d| !d.is_empty())
87    ///     .await?;
88    /// # Ok(())
89    /// # }
90    /// ```
91    pub fn when<F>(mut self, predicate: F) -> Self
92    where
93        F: Fn(&T) -> bool + Send + Sync + 'static,
94    {
95        self.condition = Some(Box::new(predicate));
96        self
97    }
98
99    /// Throttle publishing to a maximum rate
100    ///
101    /// Messages will be rate-limited to at most one per `duration`.
102    /// If you try to publish more frequently, the message will be dropped.
103    ///
104    /// # Example
105    ///
106    /// ```rust
107    /// # use mecha10::prelude::*;
108    /// # async fn example(ctx: &Context, status: String) -> Result<()> {
109    /// // Max once per second
110    /// ctx.publish_to(system::NODE_STATUS, &status)
111    ///     .throttle(Duration::from_secs(1))
112    ///     .await?;
113    /// # Ok(())
114    /// # }
115    /// ```
116    pub fn throttle(mut self, duration: Duration) -> Self {
117        self.throttle_duration = Some(duration);
118        self
119    }
120
121    /// Execute the publish with all configured conditions
122    ///
123    /// This method applies all the configured filters and throttling,
124    /// then publishes the message if all conditions are met.
125    pub async fn execute(self) -> Result<()> {
126        // Apply condition check
127        if let Some(ref condition) = self.condition {
128            if !condition(self.message) {
129                // Condition not met, skip publishing
130                return Ok(());
131            }
132        }
133
134        // Apply throttle check
135        if let Some(throttle_dur) = self.throttle_duration {
136            let topic_key = format!("{}-{}", self.ctx.node_id(), self.topic.path());
137
138            let mut throttle_state = THROTTLE_STATE.lock().await;
139
140            if let Some(last_time) = throttle_state.get(&topic_key) {
141                let elapsed = last_time.elapsed();
142                if elapsed < throttle_dur {
143                    // Throttle limit not reached, skip publishing
144                    return Ok(());
145                }
146            }
147
148            // Update last publish time
149            throttle_state.insert(topic_key, Instant::now());
150        }
151
152        // All conditions met, publish the message
153        self.ctx.publish_to(self.topic, self.message).await
154    }
155}
156
157/// Extension trait to add conditional publishing to Context
158pub trait ContextPublishExt {
159    /// Start building a conditional publish operation
160    fn publish_with<'a, 'b, T>(&'a self, topic: Topic<T>, message: &'b T) -> PublishBuilder<'a, 'b, T>
161    where
162        T: Message + Serialize;
163}
164
165impl ContextPublishExt for Context {
166    fn publish_with<'a, 'b, T>(&'a self, topic: Topic<T>, message: &'b T) -> PublishBuilder<'a, 'b, T>
167    where
168        T: Message + Serialize,
169    {
170        PublishBuilder::new(self, topic, message)
171    }
172}
173
174// Allow PublishBuilder to be awaited directly
175impl<'a, 'b, T: Message + Serialize> std::future::IntoFuture for PublishBuilder<'a, 'b, T>
176where
177    'b: 'a,
178{
179    type Output = Result<()>;
180    type IntoFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
181
182    fn into_future(self) -> Self::IntoFuture {
183        Box::pin(self.execute())
184    }
185}