mecha10_core/publish.rs
1//! Conditional publishing for declarative message publishing
2//!
3//! This module provides a fluent API for conditional and throttled publishing.
4//!
5//! # Examples
6//!
7//! ## Conditional Publishing
8//!
9//! ```rust
10//! use mecha10::prelude::*;
11//! use mecha10::topics::perception;
12//!
13//! # async fn example(ctx: &Context, detections: Vec<Detection>) -> Result<()> {
14//! // Only publish when detections are non-empty
15//! ctx.publish_to(perception::DETECTIONS, &detections)
16//! .when(|d| !d.is_empty())
17//! .await?;
18//! # Ok(())
19//! # }
20//! ```
21//!
22//! ## Throttled Publishing
23//!
24//! ```rust
25//! use mecha10::prelude::*;
26//! use mecha10::topics::system;
27//!
28//! # async fn example(ctx: &Context, status: String) -> Result<()> {
29//! // Publish at most once per second
30//! ctx.publish_to(system::NODE_STATUS, &status)
31//! .throttle(Duration::from_secs(1))
32//! .await?;
33//! # Ok(())
34//! # }
35//! ```
36
37use crate::context::Context;
38use crate::error::Result;
39use crate::messages::Message;
40use crate::topics::Topic;
41use serde::Serialize;
42use std::collections::HashMap;
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45use tokio::sync::Mutex;
46
47/// Global throttle state for tracking last publish times
48static THROTTLE_STATE: once_cell::sync::Lazy<Arc<Mutex<HashMap<String, Instant>>>> =
49 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashMap::new())));
50
51/// Builder for conditional/throttled publishing
52///
53/// This builder allows chaining operations to create declarative
54/// publishing behavior.
55pub struct PublishBuilder<'a, 'b, T: Message + Serialize> {
56 ctx: &'a Context,
57 topic: Topic<T>,
58 message: &'b T,
59 #[allow(clippy::type_complexity)]
60 condition: Option<Box<dyn Fn(&T) -> bool + Send + Sync>>,
61 throttle_duration: Option<Duration>,
62}
63
64impl<'a, 'b, T: Message + Serialize> PublishBuilder<'a, 'b, T> {
65 /// Create a new publish builder
66 pub fn new(ctx: &'a Context, topic: Topic<T>, message: &'b T) -> Self {
67 Self {
68 ctx,
69 topic,
70 message,
71 condition: None,
72 throttle_duration: None,
73 }
74 }
75
76 /// Only publish if condition is true
77 ///
78 /// If the predicate returns `false`, the message will not be published.
79 ///
80 /// # Example
81 ///
82 /// ```rust
83 /// # use mecha10::prelude::*;
84 /// # async fn example(ctx: &Context, detections: Vec<Detection>) -> Result<()> {
85 /// ctx.publish_to(perception::DETECTIONS, &detections)
86 /// .when(|d| !d.is_empty())
87 /// .await?;
88 /// # Ok(())
89 /// # }
90 /// ```
91 pub fn when<F>(mut self, predicate: F) -> Self
92 where
93 F: Fn(&T) -> bool + Send + Sync + 'static,
94 {
95 self.condition = Some(Box::new(predicate));
96 self
97 }
98
99 /// Throttle publishing to a maximum rate
100 ///
101 /// Messages will be rate-limited to at most one per `duration`.
102 /// If you try to publish more frequently, the message will be dropped.
103 ///
104 /// # Example
105 ///
106 /// ```rust
107 /// # use mecha10::prelude::*;
108 /// # async fn example(ctx: &Context, status: String) -> Result<()> {
109 /// // Max once per second
110 /// ctx.publish_to(system::NODE_STATUS, &status)
111 /// .throttle(Duration::from_secs(1))
112 /// .await?;
113 /// # Ok(())
114 /// # }
115 /// ```
116 pub fn throttle(mut self, duration: Duration) -> Self {
117 self.throttle_duration = Some(duration);
118 self
119 }
120
121 /// Execute the publish with all configured conditions
122 ///
123 /// This method applies all the configured filters and throttling,
124 /// then publishes the message if all conditions are met.
125 pub async fn execute(self) -> Result<()> {
126 // Apply condition check
127 if let Some(ref condition) = self.condition {
128 if !condition(self.message) {
129 // Condition not met, skip publishing
130 return Ok(());
131 }
132 }
133
134 // Apply throttle check
135 if let Some(throttle_dur) = self.throttle_duration {
136 let topic_key = format!("{}-{}", self.ctx.node_id(), self.topic.path());
137
138 let mut throttle_state = THROTTLE_STATE.lock().await;
139
140 if let Some(last_time) = throttle_state.get(&topic_key) {
141 let elapsed = last_time.elapsed();
142 if elapsed < throttle_dur {
143 // Throttle limit not reached, skip publishing
144 return Ok(());
145 }
146 }
147
148 // Update last publish time
149 throttle_state.insert(topic_key, Instant::now());
150 }
151
152 // All conditions met, publish the message
153 self.ctx.publish_to(self.topic, self.message).await
154 }
155}
156
157/// Extension trait to add conditional publishing to Context
158pub trait ContextPublishExt {
159 /// Start building a conditional publish operation
160 fn publish_with<'a, 'b, T>(&'a self, topic: Topic<T>, message: &'b T) -> PublishBuilder<'a, 'b, T>
161 where
162 T: Message + Serialize;
163}
164
165impl ContextPublishExt for Context {
166 fn publish_with<'a, 'b, T>(&'a self, topic: Topic<T>, message: &'b T) -> PublishBuilder<'a, 'b, T>
167 where
168 T: Message + Serialize,
169 {
170 PublishBuilder::new(self, topic, message)
171 }
172}
173
174// Allow PublishBuilder to be awaited directly
175impl<'a, 'b, T: Message + Serialize> std::future::IntoFuture for PublishBuilder<'a, 'b, T>
176where
177 'b: 'a,
178{
179 type Output = Result<()>;
180 type IntoFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
181
182 fn into_future(self) -> Self::IntoFuture {
183 Box::pin(self.execute())
184 }
185}