Skip to main content

plexus_registry/
activation.rs

1use crate::storage::RegistryStorage;
2use crate::types::{BackendSource, RegistryEvent, RegistryStorageConfig};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use std::sync::Arc;
7
8/// Registry activation - backend discovery and registration service
9#[derive(Clone)]
10pub struct Registry {
11    storage: Arc<RegistryStorage>,
12}
13
14impl Registry {
15    /// Create a new Registry instance with the given configuration
16    pub async fn new(config: RegistryStorageConfig) -> Result<Self, String> {
17        let storage = RegistryStorage::new(config).await?;
18
19        // Load backends from config file
20        storage.load_config().await?;
21
22        Ok(Self {
23            storage: Arc::new(storage),
24        })
25    }
26
27    /// Create with default configuration
28    pub async fn with_defaults() -> Result<Self, String> {
29        Self::new(RegistryStorageConfig::default()).await
30    }
31
32    /// Set the local backend name (called by the host)
33    pub fn set_local_backend(&self, _name: String, _description: Option<String>) {
34        // Store in memory - this is the backend hosting this registry instance
35        // We can't get this from the DB since the registry doesn't know who's hosting it
36        // TODO: implement local backend tracking
37    }
38}
39
40#[activation(
41    namespace = "registry",
42    version = "1.0.0",
43    description = "Backend discovery and registration service for Plexus hubs",
44    crate_path = "plexus_core"
45)]
46impl Registry {
47    /// Register a new backend
48    #[plexus_macros::method(description = "Register a new Plexus backend for discovery")]
49    async fn register(
50        &self,
51        name: String,
52        host: String,
53        port: u16,
54        protocol: Option<String>,
55        description: Option<String>,
56        namespace: Option<String>,
57    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
58        let storage = self.storage.clone();
59
60        stream! {
61            let protocol = protocol.unwrap_or_else(|| "ws".to_string());
62
63            match storage
64                .register(
65                    name,
66                    host,
67                    port,
68                    protocol,
69                    description,
70                    namespace,
71                    BackendSource::Manual,
72                )
73                .await
74            {
75                Ok(backend) => {
76                    yield RegistryEvent::BackendRegistered { backend };
77                }
78                Err(e) => {
79                    tracing::error!("Failed to register backend: {}", e);
80                    yield RegistryEvent::Error { message: e };
81                }
82            }
83        }
84    }
85
86    /// List all registered backends
87    #[plexus_macros::method(description = "List all registered backends")]
88    async fn list(
89        &self,
90        active_only: Option<bool>,
91    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
92        let storage = self.storage.clone();
93
94        stream! {
95            let active_only = active_only.unwrap_or(true);
96
97            match storage.list(active_only).await {
98                Ok(backends) => {
99                    yield RegistryEvent::Backends { backends };
100                }
101                Err(e) => {
102                    tracing::error!("Failed to list backends: {}", e);
103                    yield RegistryEvent::Error { message: e };
104                }
105            }
106        }
107    }
108
109    /// Get a specific backend by name
110    #[plexus_macros::method(description = "Get information about a specific backend by name")]
111    async fn get(
112        &self,
113        name: String,
114    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
115        let storage = self.storage.clone();
116
117        stream! {
118            match storage.get(&name).await {
119                Ok(backend) => {
120                    yield RegistryEvent::Backend { backend };
121                }
122                Err(e) => {
123                    tracing::error!("Failed to get backend {}: {}", name, e);
124                    yield RegistryEvent::Error { message: e };
125                }
126            }
127        }
128    }
129
130    /// Update an existing backend
131    #[plexus_macros::method(description = "Update an existing backend's connection information")]
132    async fn update(
133        &self,
134        name: String,
135        host: Option<String>,
136        port: Option<u16>,
137        protocol: Option<String>,
138        description: Option<String>,
139        namespace: Option<String>,
140    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
141        let storage = self.storage.clone();
142
143        stream! {
144            match storage.update(&name, host, port, protocol, description, namespace).await {
145                Ok(Some(backend)) => {
146                    yield RegistryEvent::BackendUpdated { backend };
147                }
148                Ok(None) => {
149                    yield RegistryEvent::Error {
150                        message: format!("Backend not found: {}", name),
151                    };
152                }
153                Err(e) => {
154                    tracing::error!("Failed to update backend {}: {}", name, e);
155                    yield RegistryEvent::Error { message: e };
156                }
157            }
158        }
159    }
160
161    /// Delete a backend
162    #[plexus_macros::method(description = "Remove a backend from the registry")]
163    async fn delete(
164        &self,
165        name: String,
166    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
167        let storage = self.storage.clone();
168
169        stream! {
170            match storage.delete(&name).await {
171                Ok(true) => {
172                    yield RegistryEvent::BackendDeleted { name };
173                }
174                Ok(false) => {
175                    yield RegistryEvent::Error {
176                        message: format!("Backend not found: {}", name),
177                    };
178                }
179                Err(e) => {
180                    tracing::error!("Failed to delete backend {}: {}", name, e);
181                    yield RegistryEvent::Error { message: e };
182                }
183            }
184        }
185    }
186
187    /// Ping a backend to update its last_seen timestamp
188    #[plexus_macros::method(description = "Update the health check timestamp for a backend")]
189    async fn ping(
190        &self,
191        name: String,
192    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
193        let storage = self.storage.clone();
194
195        stream! {
196            match storage.ping(&name).await {
197                Ok(true) => {
198                    yield RegistryEvent::Ping {
199                        name,
200                        success: true,
201                        message: "Backend health check updated".to_string(),
202                    };
203                }
204                Ok(false) => {
205                    yield RegistryEvent::Ping {
206                        name: name.clone(),
207                        success: false,
208                        message: format!("Backend not found: {}", name),
209                    };
210                }
211                Err(e) => {
212                    tracing::error!("Failed to ping backend {}: {}", name, e);
213                    yield RegistryEvent::Error { message: e };
214                }
215            }
216        }
217    }
218
219    /// Reload the configuration file
220    #[plexus_macros::method(description = "Reload backends from the configuration file")]
221    async fn reload(&self) -> impl Stream<Item = RegistryEvent> + Send + 'static {
222        let storage = self.storage.clone();
223
224        stream! {
225            match storage.reload_config().await {
226                Ok(count) => {
227                    yield RegistryEvent::Reloaded { count };
228                }
229                Err(e) => {
230                    tracing::error!("Failed to reload config: {}", e);
231                    yield RegistryEvent::Error { message: e };
232                }
233            }
234        }
235    }
236}