plexus_registry/
activation.rs1use crate::storage::RegistryStorage;
2use crate::types::{BackendSource, RegistryEvent, RegistryStorageConfig};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use std::sync::Arc;
7
8#[derive(Clone)]
10pub struct Registry {
11 storage: Arc<RegistryStorage>,
12}
13
14impl Registry {
15 pub async fn new(config: RegistryStorageConfig) -> Result<Self, String> {
17 let storage = RegistryStorage::new(config).await?;
18
19 storage.load_config().await?;
21
22 Ok(Self {
23 storage: Arc::new(storage),
24 })
25 }
26
27 pub async fn with_defaults() -> Result<Self, String> {
29 Self::new(RegistryStorageConfig::default()).await
30 }
31
32 pub fn set_local_backend(&self, _name: String, _description: Option<String>) {
34 }
38}
39
40#[activation(
41 namespace = "registry",
42 version = "1.0.0",
43 description = "Backend discovery and registration service for Plexus hubs",
44 crate_path = "plexus_core"
45)]
46impl Registry {
47 #[plexus_macros::method(description = "Register a new Plexus backend for discovery")]
49 async fn register(
50 &self,
51 name: String,
52 host: String,
53 port: u16,
54 protocol: Option<String>,
55 description: Option<String>,
56 namespace: Option<String>,
57 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
58 let storage = self.storage.clone();
59
60 stream! {
61 let protocol = protocol.unwrap_or_else(|| "ws".to_string());
62
63 match storage
64 .register(
65 name,
66 host,
67 port,
68 protocol,
69 description,
70 namespace,
71 BackendSource::Manual,
72 )
73 .await
74 {
75 Ok(backend) => {
76 yield RegistryEvent::BackendRegistered { backend };
77 }
78 Err(e) => {
79 tracing::error!("Failed to register backend: {}", e);
80 yield RegistryEvent::Error { message: e };
81 }
82 }
83 }
84 }
85
86 #[plexus_macros::method(description = "List all registered backends")]
88 async fn list(
89 &self,
90 active_only: Option<bool>,
91 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
92 let storage = self.storage.clone();
93
94 stream! {
95 let active_only = active_only.unwrap_or(true);
96
97 match storage.list(active_only).await {
98 Ok(backends) => {
99 yield RegistryEvent::Backends { backends };
100 }
101 Err(e) => {
102 tracing::error!("Failed to list backends: {}", e);
103 yield RegistryEvent::Error { message: e };
104 }
105 }
106 }
107 }
108
109 #[plexus_macros::method(description = "Get information about a specific backend by name")]
111 async fn get(
112 &self,
113 name: String,
114 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
115 let storage = self.storage.clone();
116
117 stream! {
118 match storage.get(&name).await {
119 Ok(backend) => {
120 yield RegistryEvent::Backend { backend };
121 }
122 Err(e) => {
123 tracing::error!("Failed to get backend {}: {}", name, e);
124 yield RegistryEvent::Error { message: e };
125 }
126 }
127 }
128 }
129
130 #[plexus_macros::method(description = "Update an existing backend's connection information")]
132 async fn update(
133 &self,
134 name: String,
135 host: Option<String>,
136 port: Option<u16>,
137 protocol: Option<String>,
138 description: Option<String>,
139 namespace: Option<String>,
140 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
141 let storage = self.storage.clone();
142
143 stream! {
144 match storage.update(&name, host, port, protocol, description, namespace).await {
145 Ok(Some(backend)) => {
146 yield RegistryEvent::BackendUpdated { backend };
147 }
148 Ok(None) => {
149 yield RegistryEvent::Error {
150 message: format!("Backend not found: {}", name),
151 };
152 }
153 Err(e) => {
154 tracing::error!("Failed to update backend {}: {}", name, e);
155 yield RegistryEvent::Error { message: e };
156 }
157 }
158 }
159 }
160
161 #[plexus_macros::method(description = "Remove a backend from the registry")]
163 async fn delete(
164 &self,
165 name: String,
166 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
167 let storage = self.storage.clone();
168
169 stream! {
170 match storage.delete(&name).await {
171 Ok(true) => {
172 yield RegistryEvent::BackendDeleted { name };
173 }
174 Ok(false) => {
175 yield RegistryEvent::Error {
176 message: format!("Backend not found: {}", name),
177 };
178 }
179 Err(e) => {
180 tracing::error!("Failed to delete backend {}: {}", name, e);
181 yield RegistryEvent::Error { message: e };
182 }
183 }
184 }
185 }
186
187 #[plexus_macros::method(description = "Update the health check timestamp for a backend")]
189 async fn ping(
190 &self,
191 name: String,
192 ) -> impl Stream<Item = RegistryEvent> + Send + 'static {
193 let storage = self.storage.clone();
194
195 stream! {
196 match storage.ping(&name).await {
197 Ok(true) => {
198 yield RegistryEvent::Ping {
199 name,
200 success: true,
201 message: "Backend health check updated".to_string(),
202 };
203 }
204 Ok(false) => {
205 yield RegistryEvent::Ping {
206 name: name.clone(),
207 success: false,
208 message: format!("Backend not found: {}", name),
209 };
210 }
211 Err(e) => {
212 tracing::error!("Failed to ping backend {}: {}", name, e);
213 yield RegistryEvent::Error { message: e };
214 }
215 }
216 }
217 }
218
219 #[plexus_macros::method(description = "Reload backends from the configuration file")]
221 async fn reload(&self) -> impl Stream<Item = RegistryEvent> + Send + 'static {
222 let storage = self.storage.clone();
223
224 stream! {
225 match storage.reload_config().await {
226 Ok(count) => {
227 yield RegistryEvent::Reloaded { count };
228 }
229 Err(e) => {
230 tracing::error!("Failed to reload config: {}", e);
231 yield RegistryEvent::Error { message: e };
232 }
233 }
234 }
235 }
236}