Skip to main content

plexus_registry/
activation.rs

1use crate::storage::RegistryStorage;
2use crate::types::{BackendSource, RegistryEvent, RegistryStorageConfig};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::hub_methods;
6use std::sync::Arc;
7
8/// Registry activation - backend discovery and registration service
9#[derive(Clone)]
10pub struct Registry {
11    storage: Arc<RegistryStorage>,
12}
13
14impl Registry {
15    /// Create a new Registry instance with the given configuration
16    pub async fn new(config: RegistryStorageConfig) -> Result<Self, String> {
17        let storage = RegistryStorage::new(config).await?;
18
19        // Load backends from config file
20        storage.load_config().await?;
21
22        Ok(Self {
23            storage: Arc::new(storage),
24        })
25    }
26
27    /// Create with default configuration
28    pub async fn with_defaults() -> Result<Self, String> {
29        Self::new(RegistryStorageConfig::default()).await
30    }
31
32    /// Set the local backend name (called by the host)
33    pub fn set_local_backend(&self, _name: String, _description: Option<String>) {
34        // Store in memory - this is the backend hosting this registry instance
35        // We can't get this from the DB since the registry doesn't know who's hosting it
36        // TODO: implement local backend tracking
37    }
38}
39
40#[hub_methods(
41    namespace = "registry",
42    version = "1.0.0",
43    description = "Backend discovery and registration service for Plexus hubs"
44)]
45impl Registry {
46    /// Register a new backend
47    #[plexus_macros::hub_method(description = "Register a new Plexus backend for discovery")]
48    async fn register(
49        &self,
50        name: String,
51        host: String,
52        port: u16,
53        protocol: Option<String>,
54        description: Option<String>,
55        namespace: Option<String>,
56    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
57        let storage = self.storage.clone();
58
59        stream! {
60            let protocol = protocol.unwrap_or_else(|| "ws".to_string());
61
62            match storage
63                .register(
64                    name,
65                    host,
66                    port,
67                    protocol,
68                    description,
69                    namespace,
70                    BackendSource::Manual,
71                )
72                .await
73            {
74                Ok(backend) => {
75                    yield RegistryEvent::BackendRegistered { backend };
76                }
77                Err(e) => {
78                    tracing::error!("Failed to register backend: {}", e);
79                    yield RegistryEvent::Error { message: e };
80                }
81            }
82        }
83    }
84
85    /// List all registered backends
86    #[plexus_macros::hub_method(description = "List all registered backends")]
87    async fn list(
88        &self,
89        active_only: Option<bool>,
90    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
91        let storage = self.storage.clone();
92
93        stream! {
94            let active_only = active_only.unwrap_or(true);
95
96            match storage.list(active_only).await {
97                Ok(backends) => {
98                    yield RegistryEvent::Backends { backends };
99                }
100                Err(e) => {
101                    tracing::error!("Failed to list backends: {}", e);
102                    yield RegistryEvent::Error { message: e };
103                }
104            }
105        }
106    }
107
108    /// Get a specific backend by name
109    #[plexus_macros::hub_method(description = "Get information about a specific backend by name")]
110    async fn get(
111        &self,
112        name: String,
113    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
114        let storage = self.storage.clone();
115
116        stream! {
117            match storage.get(&name).await {
118                Ok(backend) => {
119                    yield RegistryEvent::Backend { backend };
120                }
121                Err(e) => {
122                    tracing::error!("Failed to get backend {}: {}", name, e);
123                    yield RegistryEvent::Error { message: e };
124                }
125            }
126        }
127    }
128
129    /// Update an existing backend
130    #[plexus_macros::hub_method(description = "Update an existing backend's connection information")]
131    async fn update(
132        &self,
133        name: String,
134        host: Option<String>,
135        port: Option<u16>,
136        protocol: Option<String>,
137        description: Option<String>,
138        namespace: Option<String>,
139    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
140        let storage = self.storage.clone();
141
142        stream! {
143            match storage.update(&name, host, port, protocol, description, namespace).await {
144                Ok(Some(backend)) => {
145                    yield RegistryEvent::BackendUpdated { backend };
146                }
147                Ok(None) => {
148                    yield RegistryEvent::Error {
149                        message: format!("Backend not found: {}", name),
150                    };
151                }
152                Err(e) => {
153                    tracing::error!("Failed to update backend {}: {}", name, e);
154                    yield RegistryEvent::Error { message: e };
155                }
156            }
157        }
158    }
159
160    /// Delete a backend
161    #[plexus_macros::hub_method(description = "Remove a backend from the registry")]
162    async fn delete(
163        &self,
164        name: String,
165    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
166        let storage = self.storage.clone();
167
168        stream! {
169            match storage.delete(&name).await {
170                Ok(true) => {
171                    yield RegistryEvent::BackendDeleted { name };
172                }
173                Ok(false) => {
174                    yield RegistryEvent::Error {
175                        message: format!("Backend not found: {}", name),
176                    };
177                }
178                Err(e) => {
179                    tracing::error!("Failed to delete backend {}: {}", name, e);
180                    yield RegistryEvent::Error { message: e };
181                }
182            }
183        }
184    }
185
186    /// Ping a backend to update its last_seen timestamp
187    #[plexus_macros::hub_method(description = "Update the health check timestamp for a backend")]
188    async fn ping(
189        &self,
190        name: String,
191    ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
192        let storage = self.storage.clone();
193
194        stream! {
195            match storage.ping(&name).await {
196                Ok(true) => {
197                    yield RegistryEvent::Ping {
198                        name,
199                        success: true,
200                        message: "Backend health check updated".to_string(),
201                    };
202                }
203                Ok(false) => {
204                    yield RegistryEvent::Ping {
205                        name: name.clone(),
206                        success: false,
207                        message: format!("Backend not found: {}", name),
208                    };
209                }
210                Err(e) => {
211                    tracing::error!("Failed to ping backend {}: {}", name, e);
212                    yield RegistryEvent::Error { message: e };
213                }
214            }
215        }
216    }
217
218    /// Reload the configuration file
219    #[plexus_macros::hub_method(description = "Reload backends from the configuration file")]
220    async fn reload(&self) -> impl Stream<Item = RegistryEvent> + Send + 'static {
221        let storage = self.storage.clone();
222
223        stream! {
224            match storage.reload_config().await {
225                Ok(count) => {
226                    yield RegistryEvent::Reloaded { count };
227                }
228                Err(e) => {
229                    tracing::error!("Failed to reload config: {}", e);
230                    yield RegistryEvent::Error { message: e };
231                }
232            }
233        }
234    }
235}