actr_cli/core/components/
service_discovery.rs1use crate::core::{
2 AvailabilityStatus, HealthStatus, ProtoFile, ServiceDetails, ServiceDiscovery, ServiceFilter,
3 ServiceInfo,
4};
5use actr_config::Config;
6use actr_protocol::ActrTypeExt;
7use actr_protocol::{
8 AIdCredential, ActrId, ActrToSignaling, ActrType, DiscoveryRequest, ErrorResponse,
9 GetServiceSpecRequest, PeerToSignaling, RegisterRequest, SignalingEnvelope, actr_to_signaling,
10 discovery_response, get_service_spec_response, peer_to_signaling, register_response,
11 signaling_envelope, signaling_to_actr,
12};
13use anyhow::{Context, Result, anyhow};
14use async_trait::async_trait;
15use futures_util::{SinkExt, StreamExt};
16use prost::Message;
17use std::path::PathBuf;
18use std::time::SystemTime;
19use tokio::sync::Mutex;
20use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
21
22type SignalingSocket =
23 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
24
25struct SignalingState {
26 socket: SignalingSocket,
27 actr_id: ActrId,
28 credential: AIdCredential,
29}
30
31pub struct NetworkServiceDiscovery {
32 config: Config,
33 state: Mutex<Option<SignalingState>>,
34}
35
36impl NetworkServiceDiscovery {
37 pub fn new(config: Config) -> Self {
38 Self {
39 config,
40 state: Mutex::new(None),
41 }
42 }
43
44 fn format_actr_type(actr_type: &ActrType) -> String {
45 actr_type.to_string_repr()
46 }
47
48 async fn ensure_connected(&self) -> Result<()> {
49 let mut state_guard = self.state.lock().await;
50 if state_guard.is_some() {
51 return Ok(());
52 }
53
54 let state = self.connect_and_register().await?;
55 *state_guard = Some(state);
56 Ok(())
57 }
58
59 async fn discover_entries(
61 &self,
62 _filter: Option<&ServiceFilter>,
63 ) -> Result<Vec<discovery_response::TypeEntry>> {
64 self.ensure_connected().await?;
65 let mut state_guard = self.state.lock().await;
66 let state = state_guard
67 .as_mut()
68 .context("Signaling state not initialized")?;
69
70 let request = DiscoveryRequest {
72 manufacturer: None,
73 limit: None,
74 };
75 let payload = actr_to_signaling::Payload::DiscoveryRequest(request);
76 let envelope =
77 Self::build_envelope(signaling_envelope::Flow::ActrToServer(ActrToSignaling {
78 source: state.actr_id.clone(),
79 credential: state.credential.clone(),
80 payload: Some(payload),
81 }))?;
82
83 let result = match Self::send_envelope(&mut state.socket, envelope).await {
84 Ok(()) => loop {
85 let envelope = Self::read_envelope(&mut state.socket).await?;
86 match envelope.flow {
87 Some(signaling_envelope::Flow::ServerToActr(server)) => match server.payload {
88 Some(signaling_to_actr::Payload::DiscoveryResponse(response)) => {
89 break Self::handle_discovery_response(response);
90 }
91 Some(signaling_to_actr::Payload::Error(error)) => {
92 break Err(Self::as_error("Discovery failed", &error));
93 }
94 _ => {}
95 },
96 Some(signaling_envelope::Flow::EnvelopeError(error)) => {
97 break Err(Self::as_error("Discovery failed", &error));
98 }
99 _ => {}
100 }
101 },
102 Err(err) => Err(err),
103 };
104 if result.is_err() {
105 *state_guard = None;
106 }
107 result
108 }
109
110 fn handle_discovery_response(
111 response: actr_protocol::DiscoveryResponse,
112 ) -> Result<Vec<discovery_response::TypeEntry>> {
113 match response.result {
114 Some(discovery_response::Result::Success(success)) => Ok(success.entries),
115 Some(discovery_response::Result::Error(error)) => {
116 Err(Self::as_error("Discovery failed", &error))
117 }
118 None => Err(anyhow!("Discovery response is missing result")),
119 }
120 }
121
122 async fn connect_and_register(&self) -> Result<SignalingState> {
123 let signaling_url = self.config.signaling_url.as_str();
124 let (mut socket, _) = connect_async(signaling_url)
125 .await
126 .with_context(|| format!("Failed to connect to signaling: {signaling_url}"))?;
127
128 let register_request = RegisterRequest {
129 actr_type: self.config.package.actr_type.clone(),
130 realm: self.config.realm,
131 service_spec: None,
132 acl: None,
133 };
134
135 let envelope =
136 Self::build_envelope(signaling_envelope::Flow::PeerToServer(PeerToSignaling {
137 payload: Some(peer_to_signaling::Payload::RegisterRequest(
138 register_request,
139 )),
140 }))?;
141
142 Self::send_envelope(&mut socket, envelope).await?;
143
144 let (actr_id, credential) = loop {
145 let envelope = Self::read_envelope(&mut socket).await?;
146 match envelope.flow {
147 Some(signaling_envelope::Flow::ServerToActr(server)) => match server.payload {
148 Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
149 match response.result {
150 Some(register_response::Result::Success(success)) => {
151 break (success.actr_id, success.credential);
152 }
153 Some(register_response::Result::Error(error)) => {
154 return Err(Self::as_error("Register failed", &error));
155 }
156 None => return Err(anyhow!("Register response is missing result")),
157 }
158 }
159 Some(signaling_to_actr::Payload::Error(error)) => {
160 return Err(Self::as_error("Register failed", &error));
161 }
162 _ => {}
163 },
164 Some(signaling_envelope::Flow::EnvelopeError(error)) => {
165 return Err(Self::as_error("Register failed", &error));
166 }
167 _ => {}
168 }
169 };
170
171 Ok(SignalingState {
172 socket,
173 actr_id,
174 credential,
175 })
176 }
177
178 fn as_error(context: &str, error: &ErrorResponse) -> anyhow::Error {
179 anyhow!("{context}: {} ({})", error.message, error.code)
180 }
181
182 async fn send_envelope(
183 socket: &mut SignalingSocket,
184 envelope: SignalingEnvelope,
185 ) -> Result<()> {
186 let mut buf = Vec::new();
187 envelope
188 .encode(&mut buf)
189 .context("Failed to encode signaling envelope")?;
190 socket
191 .send(WsMessage::Binary(buf.into()))
192 .await
193 .context("Failed to send signaling envelope")?;
194 Ok(())
195 }
196
197 async fn read_envelope(socket: &mut SignalingSocket) -> Result<SignalingEnvelope> {
198 while let Some(message) = socket.next().await {
199 match message.context("Failed to read signaling response")? {
200 WsMessage::Binary(bytes) => {
201 return SignalingEnvelope::decode(bytes)
202 .context("Failed to decode signaling envelope");
203 }
204 WsMessage::Close(_) => {
205 return Err(anyhow!("Signaling connection closed"));
206 }
207 WsMessage::Ping(_) | WsMessage::Pong(_) => {}
208 WsMessage::Text(text) => {
209 return Err(anyhow!("Unexpected text message from signaling: {text}"));
210 }
211 WsMessage::Frame(_) => {}
212 }
213 }
214
215 Err(anyhow!("Signaling connection closed"))
216 }
217
218 fn build_envelope(flow: signaling_envelope::Flow) -> Result<SignalingEnvelope> {
219 Ok(SignalingEnvelope {
220 envelope_version: 1,
221 envelope_id: uuid::Uuid::new_v4().to_string(),
222 reply_for: None,
223 timestamp: prost_types::Timestamp {
224 seconds: chrono::Utc::now().timestamp(),
225 nanos: 0,
226 },
227 traceparent: None,
228 tracestate: None,
229 flow: Some(flow),
230 })
231 }
232
233 fn select_version(entry: &discovery_response::TypeEntry) -> String {
234 entry
235 .tags
236 .iter()
237 .find(|tag| tag.as_str() == "latest")
238 .cloned()
239 .or_else(|| entry.tags.first().cloned())
240 .unwrap_or_else(|| "unknown".to_string())
241 }
242
243 fn matches_filter(entry: &discovery_response::TypeEntry, filter: &ServiceFilter) -> bool {
244 if let Some(pattern) = &filter.name_pattern {
245 let full_name = Self::format_actr_type(&entry.actr_type);
246 let matches = Self::matches_pattern(&entry.name, pattern)
247 || Self::matches_pattern(&full_name, pattern);
248 if !matches {
249 return false;
250 }
251 }
252
253 if let Some(version_range) = &filter.version_range
254 && Self::select_version(entry) != *version_range
255 && !entry.tags.iter().any(|tag| tag == version_range)
256 {
257 return false;
258 }
259
260 if let Some(tags) = &filter.tags {
261 let has_all = tags.iter().all(|tag| entry.tags.iter().any(|t| t == tag));
262 if !has_all {
263 return false;
264 }
265 }
266
267 true
268 }
269
/// Glob-style match where `*` stands for any (possibly empty) run of
/// characters and every other character must match literally.
///
/// A pattern without `*` must equal `value` exactly. Otherwise the text
/// before the first `*` must be a prefix of `value`, the text after the last
/// `*` must be a suffix, and the literal pieces in between must occur in
/// order inside what remains.
///
/// The prefix and suffix are not allowed to overlap: `"ab"` does not match
/// `"ab*b"`. Without that check the region between prefix and suffix would
/// have a start past its end, which either produced a false positive or
/// panicked on the slice.
fn matches_pattern(value: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let Some((prefix, rest)) = pattern.split_once('*') else {
        return value == pattern;
    };

    // Everything after the last `*` is the required suffix; what lies between
    // the first and last `*` are the middle pieces.
    let (middle, suffix) = match rest.rsplit_once('*') {
        Some((middle, suffix)) => (middle, suffix),
        None => ("", rest),
    };

    // Prefix and suffix must both fit in `value` without sharing bytes.
    if value.len() < prefix.len() + suffix.len() {
        return false;
    }
    if !value.starts_with(prefix) || !value.ends_with(suffix) {
        return false;
    }

    // Both cut points are char boundaries because `value` starts with
    // `prefix` and ends with `suffix`, and the length check guarantees
    // start <= end.
    let mut remaining = &value[prefix.len()..value.len() - suffix.len()];

    // Consecutive `*` produce empty pieces, which match trivially.
    for piece in middle.split('*').filter(|piece| !piece.is_empty()) {
        match remaining.find(piece) {
            Some(at) => remaining = &remaining[at + piece.len()..],
            None => return false,
        }
    }

    true
}
323}
324
325#[async_trait]
326impl ServiceDiscovery for NetworkServiceDiscovery {
327 async fn discover_services(&self, filter: Option<&ServiceFilter>) -> Result<Vec<ServiceInfo>> {
328 let entries = self.discover_entries(filter).await?;
329 let services = entries
330 .into_iter()
331 .filter(|entry| match filter {
332 Some(filter) => Self::matches_filter(entry, filter),
333 None => true,
334 })
335 .map(ServiceInfo::from)
336 .collect();
337 Ok(services)
338 }
339
340 async fn get_service_details(&self, name: &str) -> Result<ServiceDetails> {
341 let entries = self.discover_entries(None).await?;
342 let entry = entries
343 .into_iter()
344 .find(|entry| entry.name == name || Self::format_actr_type(&entry.actr_type) == name);
345
346 let entry = entry.ok_or_else(|| anyhow!("Service not found: {name}"))?;
347 let info = ServiceInfo::from(entry.clone());
348
349 let proto_files = match self.get_service_proto(&entry.name).await {
351 Ok(proto_files) => proto_files,
352 Err(e) => {
353 tracing::warn!("Failed to get ServiceSpec for {name}: {e}");
354 Vec::new()
355 }
356 };
357
358 Ok(ServiceDetails {
359 info,
360 proto_files,
361 dependencies: Vec::new(),
362 })
363 }
364
365 async fn check_service_availability(&self, name: &str) -> Result<AvailabilityStatus> {
367 let entries = self.discover_entries(None).await?;
368 let available = entries.iter().any(|entry| entry.name == name);
369
370 Ok(AvailabilityStatus {
371 is_available: available,
372 last_seen: available.then(SystemTime::now),
373 health: if available {
374 HealthStatus::Healthy
375 } else {
376 HealthStatus::Unknown
377 },
378 })
379 }
380
381 async fn get_service_proto(&self, name: &str) -> Result<Vec<ProtoFile>> {
382 self.ensure_connected().await?;
383 let mut state_guard = self.state.lock().await;
384 let state = state_guard
385 .as_mut()
386 .context("Signaling state not initialized")?;
387
388 let request = GetServiceSpecRequest {
389 name: name.to_string(),
390 };
391 let payload = actr_to_signaling::Payload::GetServiceSpecRequest(request);
392 let envelope =
393 Self::build_envelope(signaling_envelope::Flow::ActrToServer(ActrToSignaling {
394 source: state.actr_id.clone(),
395 credential: state.credential.clone(),
396 payload: Some(payload),
397 }))?;
398
399 let result = match Self::send_envelope(&mut state.socket, envelope).await {
400 Ok(()) => loop {
401 let envelope = Self::read_envelope(&mut state.socket).await?;
402 match envelope.flow {
403 Some(signaling_envelope::Flow::ServerToActr(server)) => match server.payload {
404 Some(signaling_to_actr::Payload::GetServiceSpecResponse(response)) => {
405 let proto_files = match response.result {
406 Some(get_service_spec_response::Result::Success(success)) => {
407 success
408 .protobufs
409 .into_iter()
410 .map(|p| ProtoFile {
411 name: format!("{}.proto", p.package),
412 path: PathBuf::new(),
413 content: p.content,
414 services: Vec::new(),
415 })
416 .collect()
417 }
418 Some(get_service_spec_response::Result::Error(error)) => {
419 break Err(Self::as_error("Get service spec failed", &error));
420 }
421 None => {
422 break Err(anyhow!(
423 "Get service spec response is missing result"
424 ));
425 }
426 };
427 break Ok(proto_files);
428 }
429 Some(signaling_to_actr::Payload::Error(error)) => {
430 break Err(Self::as_error("Get service spec failed", &error));
431 }
432 _ => {}
433 },
434 Some(signaling_envelope::Flow::EnvelopeError(error)) => {
435 break Err(Self::as_error("Get service spec failed", &error));
436 }
437 _ => {}
438 }
439 },
440 Err(err) => Err(err),
441 };
442
443 if result.is_err() {
444 *state_guard = None;
445 }
446
447 result
448 }
449}