plexus_registry/
activation.rs1use crate::storage::RegistryStorage;
2use crate::types::{BackendSource, RegistryEvent, RegistryStorageConfig};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::hub_methods;
6use std::sync::Arc;
7
8#[derive(Clone)]
10pub struct Registry {
11 storage: Arc<RegistryStorage>,
12}
13
14impl Registry {
15 pub async fn new(config: RegistryStorageConfig) -> Result<Self, String> {
17 let storage = RegistryStorage::new(config).await?;
18
19 storage.load_config().await?;
21
22 Ok(Self {
23 storage: Arc::new(storage),
24 })
25 }
26
27 pub async fn with_defaults() -> Result<Self, String> {
29 Self::new(RegistryStorageConfig::default()).await
30 }
31
32 pub fn set_local_backend(&self, _name: String, _description: Option<String>) {
34 }
38}
39
40#[hub_methods(
41 namespace = "registry",
42 version = "1.0.0",
43 description = "Backend discovery and registration service for Plexus hubs"
44)]
45impl Registry {
46 #[plexus_macros::hub_method(description = "Register a new Plexus backend for discovery")]
48 async fn register(
49 &self,
50 name: String,
51 host: String,
52 port: u16,
53 protocol: Option<String>,
54 description: Option<String>,
55 namespace: Option<String>,
56 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
57 let storage = self.storage.clone();
58
59 stream! {
60 let protocol = protocol.unwrap_or_else(|| "ws".to_string());
61
62 match storage
63 .register(
64 name,
65 host,
66 port,
67 protocol,
68 description,
69 namespace,
70 BackendSource::Manual,
71 )
72 .await
73 {
74 Ok(backend) => {
75 yield RegistryEvent::BackendRegistered { backend };
76 }
77 Err(e) => {
78 tracing::error!("Failed to register backend: {}", e);
79 yield RegistryEvent::Error { message: e };
80 }
81 }
82 }
83 }
84
85 #[plexus_macros::hub_method(description = "List all registered backends")]
87 async fn list(
88 &self,
89 active_only: Option<bool>,
90 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
91 let storage = self.storage.clone();
92
93 stream! {
94 let active_only = active_only.unwrap_or(true);
95
96 match storage.list(active_only).await {
97 Ok(backends) => {
98 yield RegistryEvent::Backends { backends };
99 }
100 Err(e) => {
101 tracing::error!("Failed to list backends: {}", e);
102 yield RegistryEvent::Error { message: e };
103 }
104 }
105 }
106 }
107
108 #[plexus_macros::hub_method(description = "Get information about a specific backend by name")]
110 async fn get(
111 &self,
112 name: String,
113 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
114 let storage = self.storage.clone();
115
116 stream! {
117 match storage.get(&name).await {
118 Ok(backend) => {
119 yield RegistryEvent::Backend { backend };
120 }
121 Err(e) => {
122 tracing::error!("Failed to get backend {}: {}", name, e);
123 yield RegistryEvent::Error { message: e };
124 }
125 }
126 }
127 }
128
129 #[plexus_macros::hub_method(description = "Update an existing backend's connection information")]
131 async fn update(
132 &self,
133 name: String,
134 host: Option<String>,
135 port: Option<u16>,
136 protocol: Option<String>,
137 description: Option<String>,
138 namespace: Option<String>,
139 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
140 let storage = self.storage.clone();
141
142 stream! {
143 match storage.update(&name, host, port, protocol, description, namespace).await {
144 Ok(Some(backend)) => {
145 yield RegistryEvent::BackendUpdated { backend };
146 }
147 Ok(None) => {
148 yield RegistryEvent::Error {
149 message: format!("Backend not found: {}", name),
150 };
151 }
152 Err(e) => {
153 tracing::error!("Failed to update backend {}: {}", name, e);
154 yield RegistryEvent::Error { message: e };
155 }
156 }
157 }
158 }
159
160 #[plexus_macros::hub_method(description = "Remove a backend from the registry")]
162 async fn delete(
163 &self,
164 name: String,
165 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
166 let storage = self.storage.clone();
167
168 stream! {
169 match storage.delete(&name).await {
170 Ok(true) => {
171 yield RegistryEvent::BackendDeleted { name };
172 }
173 Ok(false) => {
174 yield RegistryEvent::Error {
175 message: format!("Backend not found: {}", name),
176 };
177 }
178 Err(e) => {
179 tracing::error!("Failed to delete backend {}: {}", name, e);
180 yield RegistryEvent::Error { message: e };
181 }
182 }
183 }
184 }
185
186 #[plexus_macros::hub_method(description = "Update the health check timestamp for a backend")]
188 async fn ping(
189 &self,
190 name: String,
191 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
192 let storage = self.storage.clone();
193
194 stream! {
195 match storage.ping(&name).await {
196 Ok(true) => {
197 yield RegistryEvent::Ping {
198 name,
199 success: true,
200 message: "Backend health check updated".to_string(),
201 };
202 }
203 Ok(false) => {
204 yield RegistryEvent::Ping {
205 name: name.clone(),
206 success: false,
207 message: format!("Backend not found: {}", name),
208 };
209 }
210 Err(e) => {
211 tracing::error!("Failed to ping backend {}: {}", name, e);
212 yield RegistryEvent::Error { message: e };
213 }
214 }
215 }
216 }
217
218 #[plexus_macros::hub_method(description = "Reload backends from the configuration file")]
220 async fn reload(&self) -> impl Stream<Item = RegistryEvent> + Send + 'static {
221 let storage = self.storage.clone();
222
223 stream! {
224 match storage.reload_config().await {
225 Ok(count) => {
226 yield RegistryEvent::Reloaded { count };
227 }
228 Err(e) => {
229 tracing::error!("Failed to reload config: {}", e);
230 yield RegistryEvent::Error { message: e };
231 }
232 }
233 }
234 }
235}