1use super::metadata::{current_timestamp, NodeMetadata};
4use super::redis_client::{get_redis_connection, get_ttl, get_ttl_expire, RedisKeys};
5use crate::context::Context;
6use crate::error::{Mecha10Error, Result};
7use anyhow::Context as _;
8
9pub trait DiscoveryExt {
11 fn register(&self, metadata: NodeMetadata) -> impl std::future::Future<Output = Result<()>> + Send;
33
34 fn unregister(&self) -> impl std::future::Future<Output = Result<()>> + Send;
36
37 fn heartbeat(&self) -> impl std::future::Future<Output = Result<()>> + Send;
39
40 fn discover_all(&self) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;
42
43 fn discover_by_type(&self, node_type: &str) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;
49
50 fn discover_publishers(&self, topic: &str) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;
56
57 fn discover_subscribers(&self, topic: &str) -> impl std::future::Future<Output = Result<Vec<NodeMetadata>>> + Send;
63
64 fn get_node(&self, node_id: &str) -> impl std::future::Future<Output = Result<Option<NodeMetadata>>> + Send;
70}
71
72impl DiscoveryExt for Context {
73 async fn register(&self, mut metadata: NodeMetadata) -> Result<()> {
74 if metadata.node_id.is_empty() {
76 metadata.node_id = self.node_id().to_string();
77 }
78
79 metadata.last_heartbeat = current_timestamp();
81 if metadata.started_at == 0 {
82 metadata.started_at = current_timestamp();
83 }
84
85 #[cfg(feature = "messaging")]
86 {
87 use redis::AsyncCommands;
88
89 let mut conn = get_redis_connection("node registration").await?;
90 let ttl = get_ttl();
91 let ttl_expire = get_ttl_expire();
92
93 let metadata_json = serde_json::to_string(&metadata)
95 .with_context(|| format!("Failed to serialize metadata for node '{}'", metadata.node_id))
96 .map_err(|e| Mecha10Error::Other(format!("{:#}", e)))?;
97
98 let key = RedisKeys::node(&metadata.node_id);
100 conn.set_ex::<_, _, ()>(&key, &metadata_json, ttl)
101 .await
102 .with_context(|| format!("Failed to register node '{}' in Redis", metadata.node_id))
103 .map_err(|e| Mecha10Error::MessagingError {
104 message: format!("{:#}", e),
105 suggestion: "Check Redis connection".to_string(),
106 })?;
107
108 if !metadata.node_type.is_empty() {
110 let type_key = RedisKeys::by_type(&metadata.node_type);
111 conn.sadd::<_, _, ()>(&type_key, &metadata.node_id)
112 .await
113 .with_context(|| {
114 format!(
115 "Failed to update type index for node '{}' of type '{}'",
116 metadata.node_id, metadata.node_type
117 )
118 })
119 .map_err(|e| Mecha10Error::MessagingError {
120 message: format!("{:#}", e),
121 suggestion: "Check Redis connection".to_string(),
122 })?;
123 conn.expire::<_, ()>(&type_key, ttl_expire).await.ok();
124 }
125
126 for topic in &metadata.publishes {
128 let pub_key = RedisKeys::publishers(topic);
129 conn.sadd::<_, _, ()>(&pub_key, &metadata.node_id)
130 .await
131 .with_context(|| {
132 format!(
133 "Failed to update publisher index for topic '{}' on node '{}'",
134 topic, metadata.node_id
135 )
136 })
137 .map_err(|e| Mecha10Error::MessagingError {
138 message: format!("{:#}", e),
139 suggestion: "Check Redis connection".to_string(),
140 })?;
141 conn.expire::<_, ()>(&pub_key, ttl_expire).await.ok();
142 }
143
144 for topic in &metadata.subscribes {
146 let sub_key = RedisKeys::subscribers(topic);
147 conn.sadd::<_, _, ()>(&sub_key, &metadata.node_id)
148 .await
149 .with_context(|| {
150 format!(
151 "Failed to update subscriber index for topic '{}' on node '{}'",
152 topic, metadata.node_id
153 )
154 })
155 .map_err(|e| Mecha10Error::MessagingError {
156 message: format!("{:#}", e),
157 suggestion: "Check Redis connection".to_string(),
158 })?;
159 conn.expire::<_, ()>(&sub_key, ttl_expire).await.ok();
160 }
161
162 Ok(())
163 }
164
165 #[cfg(not(feature = "messaging"))]
166 {
167 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
168 }
169 }
170
171 async fn unregister(&self) -> Result<()> {
172 #[cfg(feature = "messaging")]
173 {
174 use redis::AsyncCommands;
175
176 let mut conn = get_redis_connection("node unregistration").await?;
177
178 let key = RedisKeys::node(self.node_id());
180 conn.del::<_, ()>(&key)
181 .await
182 .with_context(|| format!("Failed to unregister node '{}' from Redis", self.node_id()))
183 .map_err(|e| Mecha10Error::MessagingError {
184 message: format!("{:#}", e),
185 suggestion: "Check Redis connection".to_string(),
186 })?;
187
188 Ok(())
189 }
190
191 #[cfg(not(feature = "messaging"))]
192 {
193 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
194 }
195 }
196
197 async fn heartbeat(&self) -> Result<()> {
198 if let Some(mut metadata) = self.get_node(self.node_id()).await? {
200 metadata.last_heartbeat = current_timestamp();
201 self.register(metadata).await
202 } else {
203 self.register(NodeMetadata::new(self.node_id())).await
205 }
206 }
207
208 async fn discover_all(&self) -> Result<Vec<NodeMetadata>> {
209 #[cfg(feature = "messaging")]
210 {
211 use redis::AsyncCommands;
212
213 let mut conn = get_redis_connection("node discovery").await?;
214
215 let pattern = RedisKeys::all_nodes_pattern();
217 let keys: Vec<String> = conn
218 .keys(pattern)
219 .await
220 .with_context(|| format!("Failed to query all nodes with pattern '{}'", pattern))
221 .map_err(|e| Mecha10Error::MessagingError {
222 message: format!("{:#}", e),
223 suggestion: "Check Redis connection".to_string(),
224 })?;
225
226 let mut nodes = Vec::new();
227 for key in keys {
228 let json: String = conn
229 .get(&key)
230 .await
231 .with_context(|| format!("Failed to get node metadata from key '{}'", key))
232 .map_err(|e| Mecha10Error::MessagingError {
233 message: format!("{:#}", e),
234 suggestion: "Check Redis connection".to_string(),
235 })?;
236
237 if let Ok(metadata) = serde_json::from_str::<NodeMetadata>(&json) {
238 nodes.push(metadata);
239 }
240 }
241
242 Ok(nodes)
243 }
244
245 #[cfg(not(feature = "messaging"))]
246 {
247 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
248 }
249 }
250
251 async fn discover_by_type(&self, node_type: &str) -> Result<Vec<NodeMetadata>> {
252 #[cfg(feature = "messaging")]
253 {
254 use redis::AsyncCommands;
255
256 let mut conn = get_redis_connection("type discovery").await?;
257
258 let type_key = RedisKeys::by_type(node_type);
260 let node_ids: Vec<String> = conn
261 .smembers(&type_key)
262 .await
263 .with_context(|| format!("Failed to query nodes of type '{}'", node_type))
264 .map_err(|e| Mecha10Error::MessagingError {
265 message: format!("{:#}", e),
266 suggestion: "Check Redis connection".to_string(),
267 })?;
268
269 let mut nodes = Vec::new();
270 for node_id in node_ids {
271 if let Some(metadata) = self.get_node(&node_id).await? {
272 nodes.push(metadata);
273 }
274 }
275
276 Ok(nodes)
277 }
278
279 #[cfg(not(feature = "messaging"))]
280 {
281 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
282 }
283 }
284
285 async fn discover_publishers(&self, topic: &str) -> Result<Vec<NodeMetadata>> {
286 #[cfg(feature = "messaging")]
287 {
288 use redis::AsyncCommands;
289
290 let mut conn = get_redis_connection("publisher discovery").await?;
291
292 let pub_key = RedisKeys::publishers(topic);
293 let node_ids: Vec<String> = conn
294 .smembers(&pub_key)
295 .await
296 .with_context(|| format!("Failed to query publishers for topic '{}'", topic))
297 .map_err(|e| Mecha10Error::MessagingError {
298 message: format!("{:#}", e),
299 suggestion: "Check Redis connection".to_string(),
300 })?;
301
302 let mut nodes = Vec::new();
303 for node_id in node_ids {
304 if let Some(metadata) = self.get_node(&node_id).await? {
305 nodes.push(metadata);
306 }
307 }
308
309 Ok(nodes)
310 }
311
312 #[cfg(not(feature = "messaging"))]
313 {
314 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
315 }
316 }
317
318 async fn discover_subscribers(&self, topic: &str) -> Result<Vec<NodeMetadata>> {
319 #[cfg(feature = "messaging")]
320 {
321 use redis::AsyncCommands;
322
323 let mut conn = get_redis_connection("subscriber discovery").await?;
324
325 let sub_key = RedisKeys::subscribers(topic);
326 let node_ids: Vec<String> = conn
327 .smembers(&sub_key)
328 .await
329 .with_context(|| format!("Failed to query subscribers for topic '{}'", topic))
330 .map_err(|e| Mecha10Error::MessagingError {
331 message: format!("{:#}", e),
332 suggestion: "Check Redis connection".to_string(),
333 })?;
334
335 let mut nodes = Vec::new();
336 for node_id in node_ids {
337 if let Some(metadata) = self.get_node(&node_id).await? {
338 nodes.push(metadata);
339 }
340 }
341
342 Ok(nodes)
343 }
344
345 #[cfg(not(feature = "messaging"))]
346 {
347 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
348 }
349 }
350
351 async fn get_node(&self, node_id: &str) -> Result<Option<NodeMetadata>> {
352 #[cfg(feature = "messaging")]
353 {
354 use redis::AsyncCommands;
355
356 let mut conn = get_redis_connection("node lookup").await?;
357
358 let key = RedisKeys::node(node_id);
359 let json: Option<String> = conn
360 .get(&key)
361 .await
362 .with_context(|| format!("Failed to get metadata for node '{}'", node_id))
363 .map_err(|e| Mecha10Error::MessagingError {
364 message: format!("{:#}", e),
365 suggestion: "Check Redis connection".to_string(),
366 })?;
367
368 if let Some(json) = json {
369 let metadata = serde_json::from_str::<NodeMetadata>(&json)
370 .with_context(|| format!("Failed to parse metadata JSON for node '{}'", node_id))
371 .map_err(|e| Mecha10Error::Other(format!("{:#}", e)))?;
372 Ok(Some(metadata))
373 } else {
374 Ok(None)
375 }
376 }
377
378 #[cfg(not(feature = "messaging"))]
379 {
380 Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
381 }
382 }
383}