mecha10_core/discovery/
registry.rs

1//! Service registry implementation
2
3use super::metadata::{current_timestamp, NodeMetadata};
4use super::redis_client::{get_redis_connection, get_ttl, get_ttl_expire, RedisKeys};
5use crate::context::Context;
6use crate::error::{Mecha10Error, Result};
7use anyhow::Context as _;
8
/// Service discovery extension trait for Context
///
/// Adds registration, heartbeat, and lookup operations for a service registry.
/// Every method returns a `Send` future resolving to the crate's `Result`.
///
/// The implementation for `Context` in this module stores its data in Redis and
/// is only functional when the `messaging` feature is enabled; without that
/// feature each of its methods resolves to an error.
pub trait DiscoveryExt {
    /// Register this node in the service registry
    ///
    /// In the `Context` implementation an empty `metadata.node_id` is filled in
    /// from the context, `last_heartbeat` is overwritten with the current
    /// timestamp, and `started_at` is set only when it is still `0`. The metadata
    /// is stored with a TTL, and the node is added to the index for its type (when
    /// the type is non-empty) and to the indexes of every topic it publishes or
    /// subscribes to.
    ///
    /// # Arguments
    ///
    /// * `metadata` - Node metadata to register
    ///
    /// # Errors
    ///
    /// The `Context` implementation returns an error when the metadata cannot be
    /// serialized or when storing it or updating an index fails.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::discovery::{DiscoveryExt, NodeMetadata};
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// ctx.register(NodeMetadata::new("my-node")
    ///     .with_type("perception")
    ///     .publishes("/perception/detections")
    ///     .subscribes("/sensors/camera/rgb")
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    fn register(&self, metadata: NodeMetadata) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Unregister this node from the service registry
    ///
    /// The `Context` implementation removes the metadata entry stored under the
    /// context's own node ID.
    fn unregister(&self) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Send a heartbeat to update last seen timestamp
    ///
    /// The `Context` implementation re-registers the metadata currently stored for
    /// this node, which also renews its TTL. If nothing is stored for the node, it
    /// is registered with the defaults produced by `NodeMetadata::new`.
    fn heartbeat(&self) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Discover all registered nodes
    ///
    /// In the `Context` implementation, stored entries whose JSON cannot be parsed
    /// as `NodeMetadata` are left out of the result rather than reported as errors.
    fn discover_all(&self) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;

    /// Discover nodes by type
    ///
    /// In the `Context` implementation, IDs listed in the type index that no
    /// longer have stored metadata are omitted from the result.
    ///
    /// # Arguments
    ///
    /// * `node_type` - Type of nodes to find (e.g., "camera_driver")
    fn discover_by_type(&self, node_type: &str) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;

    /// Discover who publishes to a specific topic
    ///
    /// In the `Context` implementation, IDs listed in the publisher index that no
    /// longer have stored metadata are omitted from the result.
    ///
    /// # Arguments
    ///
    /// * `topic` - Topic path to query
    fn discover_publishers(&self, topic: &str) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;

    /// Discover who subscribes to a specific topic
    ///
    /// In the `Context` implementation, IDs listed in the subscriber index that no
    /// longer have stored metadata are omitted from the result.
    ///
    /// # Arguments
    ///
    /// * `topic` - Topic path to query
    fn discover_subscribers(&self, topic: &str) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;

    /// Get metadata for a specific node
    ///
    /// The `Context` implementation resolves to `Ok(None)` when nothing is stored
    /// for `node_id`, and to an error when the stored JSON cannot be parsed.
    ///
    /// # Arguments
    ///
    /// * `node_id` - Node identifier to query
    fn get_node(&self, node_id: &str) -> impl std::future::Future<Output = Result<Option<NodeMetadata>>> + Send;
}
71
72impl DiscoveryExt for Context {
73    async fn register(&self, mut metadata: NodeMetadata) -> Result<()> {
74        // Ensure node_id matches context
75        if metadata.node_id.is_empty() {
76            metadata.node_id = self.node_id().to_string();
77        }
78
79        // Update timestamps
80        metadata.last_heartbeat = current_timestamp();
81        if metadata.started_at == 0 {
82            metadata.started_at = current_timestamp();
83        }
84
85        #[cfg(feature = "messaging")]
86        {
87            use redis::AsyncCommands;
88
89            let mut conn = get_redis_connection("node registration").await?;
90            let ttl = get_ttl();
91            let ttl_expire = get_ttl_expire();
92
93            // Serialize metadata
94            let metadata_json = serde_json::to_string(&metadata)
95                .with_context(|| format!("Failed to serialize metadata for node '{}'", metadata.node_id))
96                .map_err(|e| Mecha10Error::Other(format!("{:#}", e)))?;
97
98            // Store in Redis with TTL
99            let key = RedisKeys::node(&metadata.node_id);
100            conn.set_ex::<_, _, ()>(&key, &metadata_json, ttl)
101                .await
102                .with_context(|| format!("Failed to register node '{}' in Redis", metadata.node_id))
103                .map_err(|e| Mecha10Error::MessagingError {
104                    message: format!("{:#}", e),
105                    suggestion: "Check Redis connection".to_string(),
106                })?;
107
108            // Add to type index
109            if !metadata.node_type.is_empty() {
110                let type_key = RedisKeys::by_type(&metadata.node_type);
111                conn.sadd::<_, _, ()>(&type_key, &metadata.node_id)
112                    .await
113                    .with_context(|| {
114                        format!(
115                            "Failed to update type index for node '{}' of type '{}'",
116                            metadata.node_id, metadata.node_type
117                        )
118                    })
119                    .map_err(|e| Mecha10Error::MessagingError {
120                        message: format!("{:#}", e),
121                        suggestion: "Check Redis connection".to_string(),
122                    })?;
123                conn.expire::<_, ()>(&type_key, ttl_expire).await.ok();
124            }
125
126            // Add to publisher index
127            for topic in &metadata.publishes {
128                let pub_key = RedisKeys::publishers(topic);
129                conn.sadd::<_, _, ()>(&pub_key, &metadata.node_id)
130                    .await
131                    .with_context(|| {
132                        format!(
133                            "Failed to update publisher index for topic '{}' on node '{}'",
134                            topic, metadata.node_id
135                        )
136                    })
137                    .map_err(|e| Mecha10Error::MessagingError {
138                        message: format!("{:#}", e),
139                        suggestion: "Check Redis connection".to_string(),
140                    })?;
141                conn.expire::<_, ()>(&pub_key, ttl_expire).await.ok();
142            }
143
144            // Add to subscriber index
145            for topic in &metadata.subscribes {
146                let sub_key = RedisKeys::subscribers(topic);
147                conn.sadd::<_, _, ()>(&sub_key, &metadata.node_id)
148                    .await
149                    .with_context(|| {
150                        format!(
151                            "Failed to update subscriber index for topic '{}' on node '{}'",
152                            topic, metadata.node_id
153                        )
154                    })
155                    .map_err(|e| Mecha10Error::MessagingError {
156                        message: format!("{:#}", e),
157                        suggestion: "Check Redis connection".to_string(),
158                    })?;
159                conn.expire::<_, ()>(&sub_key, ttl_expire).await.ok();
160            }
161
162            Ok(())
163        }
164
165        #[cfg(not(feature = "messaging"))]
166        {
167            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
168        }
169    }
170
171    async fn unregister(&self) -> Result<()> {
172        #[cfg(feature = "messaging")]
173        {
174            use redis::AsyncCommands;
175
176            let mut conn = get_redis_connection("node unregistration").await?;
177
178            // Delete node metadata
179            let key = RedisKeys::node(self.node_id());
180            conn.del::<_, ()>(&key)
181                .await
182                .with_context(|| format!("Failed to unregister node '{}' from Redis", self.node_id()))
183                .map_err(|e| Mecha10Error::MessagingError {
184                    message: format!("{:#}", e),
185                    suggestion: "Check Redis connection".to_string(),
186                })?;
187
188            Ok(())
189        }
190
191        #[cfg(not(feature = "messaging"))]
192        {
193            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
194        }
195    }
196
197    async fn heartbeat(&self) -> Result<()> {
198        // Get current metadata and re-register to update TTL
199        if let Some(mut metadata) = self.get_node(self.node_id()).await? {
200            metadata.last_heartbeat = current_timestamp();
201            self.register(metadata).await
202        } else {
203            // Node not registered, register with defaults
204            self.register(NodeMetadata::new(self.node_id())).await
205        }
206    }
207
208    async fn discover_all(&self) -> Result<Vec<NodeMetadata>> {
209        #[cfg(feature = "messaging")]
210        {
211            use redis::AsyncCommands;
212
213            let mut conn = get_redis_connection("node discovery").await?;
214
215            // Scan for all node keys
216            let pattern = RedisKeys::all_nodes_pattern();
217            let keys: Vec<String> = conn
218                .keys(pattern)
219                .await
220                .with_context(|| format!("Failed to query all nodes with pattern '{}'", pattern))
221                .map_err(|e| Mecha10Error::MessagingError {
222                    message: format!("{:#}", e),
223                    suggestion: "Check Redis connection".to_string(),
224                })?;
225
226            let mut nodes = Vec::new();
227            for key in keys {
228                let json: String = conn
229                    .get(&key)
230                    .await
231                    .with_context(|| format!("Failed to get node metadata from key '{}'", key))
232                    .map_err(|e| Mecha10Error::MessagingError {
233                        message: format!("{:#}", e),
234                        suggestion: "Check Redis connection".to_string(),
235                    })?;
236
237                if let Ok(metadata) = serde_json::from_str::<NodeMetadata>(&json) {
238                    nodes.push(metadata);
239                }
240            }
241
242            Ok(nodes)
243        }
244
245        #[cfg(not(feature = "messaging"))]
246        {
247            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
248        }
249    }
250
251    async fn discover_by_type(&self, node_type: &str) -> Result<Vec<NodeMetadata>> {
252        #[cfg(feature = "messaging")]
253        {
254            use redis::AsyncCommands;
255
256            let mut conn = get_redis_connection("type discovery").await?;
257
258            // Get node IDs from type index
259            let type_key = RedisKeys::by_type(node_type);
260            let node_ids: Vec<String> = conn
261                .smembers(&type_key)
262                .await
263                .with_context(|| format!("Failed to query nodes of type '{}'", node_type))
264                .map_err(|e| Mecha10Error::MessagingError {
265                    message: format!("{:#}", e),
266                    suggestion: "Check Redis connection".to_string(),
267                })?;
268
269            let mut nodes = Vec::new();
270            for node_id in node_ids {
271                if let Some(metadata) = self.get_node(&node_id).await? {
272                    nodes.push(metadata);
273                }
274            }
275
276            Ok(nodes)
277        }
278
279        #[cfg(not(feature = "messaging"))]
280        {
281            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
282        }
283    }
284
285    async fn discover_publishers(&self, topic: &str) -> Result<Vec<NodeMetadata>> {
286        #[cfg(feature = "messaging")]
287        {
288            use redis::AsyncCommands;
289
290            let mut conn = get_redis_connection("publisher discovery").await?;
291
292            let pub_key = RedisKeys::publishers(topic);
293            let node_ids: Vec<String> = conn
294                .smembers(&pub_key)
295                .await
296                .with_context(|| format!("Failed to query publishers for topic '{}'", topic))
297                .map_err(|e| Mecha10Error::MessagingError {
298                    message: format!("{:#}", e),
299                    suggestion: "Check Redis connection".to_string(),
300                })?;
301
302            let mut nodes = Vec::new();
303            for node_id in node_ids {
304                if let Some(metadata) = self.get_node(&node_id).await? {
305                    nodes.push(metadata);
306                }
307            }
308
309            Ok(nodes)
310        }
311
312        #[cfg(not(feature = "messaging"))]
313        {
314            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
315        }
316    }
317
318    async fn discover_subscribers(&self, topic: &str) -> Result<Vec<NodeMetadata>> {
319        #[cfg(feature = "messaging")]
320        {
321            use redis::AsyncCommands;
322
323            let mut conn = get_redis_connection("subscriber discovery").await?;
324
325            let sub_key = RedisKeys::subscribers(topic);
326            let node_ids: Vec<String> = conn
327                .smembers(&sub_key)
328                .await
329                .with_context(|| format!("Failed to query subscribers for topic '{}'", topic))
330                .map_err(|e| Mecha10Error::MessagingError {
331                    message: format!("{:#}", e),
332                    suggestion: "Check Redis connection".to_string(),
333                })?;
334
335            let mut nodes = Vec::new();
336            for node_id in node_ids {
337                if let Some(metadata) = self.get_node(&node_id).await? {
338                    nodes.push(metadata);
339                }
340            }
341
342            Ok(nodes)
343        }
344
345        #[cfg(not(feature = "messaging"))]
346        {
347            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
348        }
349    }
350
351    async fn get_node(&self, node_id: &str) -> Result<Option<NodeMetadata>> {
352        #[cfg(feature = "messaging")]
353        {
354            use redis::AsyncCommands;
355
356            let mut conn = get_redis_connection("node lookup").await?;
357
358            let key = RedisKeys::node(node_id);
359            let json: Option<String> = conn
360                .get(&key)
361                .await
362                .with_context(|| format!("Failed to get metadata for node '{}'", node_id))
363                .map_err(|e| Mecha10Error::MessagingError {
364                    message: format!("{:#}", e),
365                    suggestion: "Check Redis connection".to_string(),
366                })?;
367
368            if let Some(json) = json {
369                let metadata = serde_json::from_str::<NodeMetadata>(&json)
370                    .with_context(|| format!("Failed to parse metadata JSON for node '{}'", node_id))
371                    .map_err(|e| Mecha10Error::Other(format!("{:#}", e)))?;
372                Ok(Some(metadata))
373            } else {
374                Ok(None)
375            }
376        }
377
378        #[cfg(not(feature = "messaging"))]
379        {
380            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
381        }
382    }
383}