Skip to main content

actr_cli/core/components/
service_discovery.rs

1use crate::core::{
2    AvailabilityStatus, HealthStatus, ProtoFile, ServiceDetails, ServiceDiscovery, ServiceFilter,
3    ServiceInfo,
4};
5use actr_config::Config;
6use actr_protocol::ActrTypeExt;
7use actr_protocol::{
8    AIdCredential, ActrId, ActrToSignaling, ActrType, DiscoveryRequest, ErrorResponse,
9    GetServiceSpecRequest, PeerToSignaling, RegisterRequest, SignalingEnvelope, actr_to_signaling,
10    discovery_response, get_service_spec_response, peer_to_signaling, register_response,
11    signaling_envelope, signaling_to_actr,
12};
13use anyhow::{Context, Result, anyhow};
14use async_trait::async_trait;
15use futures_util::{SinkExt, StreamExt};
16use prost::Message;
17use std::path::PathBuf;
18use std::time::SystemTime;
19use tokio::sync::Mutex;
20use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
21
22type SignalingSocket =
23    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
24
25struct SignalingState {
26    socket: SignalingSocket,
27    actr_id: ActrId,
28    credential: AIdCredential,
29}
30
31pub struct NetworkServiceDiscovery {
32    config: Config,
33    state: Mutex<Option<SignalingState>>,
34}
35
36impl NetworkServiceDiscovery {
37    pub fn new(config: Config) -> Self {
38        Self {
39            config,
40            state: Mutex::new(None),
41        }
42    }
43
44    fn format_actr_type(actr_type: &ActrType) -> String {
45        actr_type.to_string_repr()
46    }
47
48    async fn ensure_connected(&self) -> Result<()> {
49        let mut state_guard = self.state.lock().await;
50        if state_guard.is_some() {
51            return Ok(());
52        }
53
54        let state = self.connect_and_register().await?;
55        *state_guard = Some(state);
56        Ok(())
57    }
58
59    // TODO: add filter support
60    async fn discover_entries(
61        &self,
62        _filter: Option<&ServiceFilter>,
63    ) -> Result<Vec<discovery_response::TypeEntry>> {
64        self.ensure_connected().await?;
65        let mut state_guard = self.state.lock().await;
66        let state = state_guard
67            .as_mut()
68            .context("Signaling state not initialized")?;
69
70        // TODO: add filter support
71        let request = DiscoveryRequest {
72            manufacturer: None,
73            limit: None,
74        };
75        let payload = actr_to_signaling::Payload::DiscoveryRequest(request);
76        let envelope =
77            Self::build_envelope(signaling_envelope::Flow::ActrToServer(ActrToSignaling {
78                source: state.actr_id.clone(),
79                credential: state.credential.clone(),
80                payload: Some(payload),
81            }))?;
82
83        let result = match Self::send_envelope(&mut state.socket, envelope).await {
84            Ok(()) => loop {
85                let envelope = Self::read_envelope(&mut state.socket).await?;
86                match envelope.flow {
87                    Some(signaling_envelope::Flow::ServerToActr(server)) => match server.payload {
88                        Some(signaling_to_actr::Payload::DiscoveryResponse(response)) => {
89                            break Self::handle_discovery_response(response);
90                        }
91                        Some(signaling_to_actr::Payload::Error(error)) => {
92                            break Err(Self::as_error("Discovery failed", &error));
93                        }
94                        _ => {}
95                    },
96                    Some(signaling_envelope::Flow::EnvelopeError(error)) => {
97                        break Err(Self::as_error("Discovery failed", &error));
98                    }
99                    _ => {}
100                }
101            },
102            Err(err) => Err(err),
103        };
104        if result.is_err() {
105            *state_guard = None;
106        }
107        result
108    }
109
110    fn handle_discovery_response(
111        response: actr_protocol::DiscoveryResponse,
112    ) -> Result<Vec<discovery_response::TypeEntry>> {
113        match response.result {
114            Some(discovery_response::Result::Success(success)) => Ok(success.entries),
115            Some(discovery_response::Result::Error(error)) => {
116                Err(Self::as_error("Discovery failed", &error))
117            }
118            None => Err(anyhow!("Discovery response is missing result")),
119        }
120    }
121
122    async fn connect_and_register(&self) -> Result<SignalingState> {
123        let signaling_url = self.config.signaling_url.as_str();
124        let (mut socket, _) = connect_async(signaling_url)
125            .await
126            .with_context(|| format!("Failed to connect to signaling: {signaling_url}"))?;
127
128        let register_request = RegisterRequest {
129            actr_type: self.config.package.actr_type.clone(),
130            realm: self.config.realm,
131            service_spec: None,
132            acl: None,
133        };
134
135        let envelope =
136            Self::build_envelope(signaling_envelope::Flow::PeerToServer(PeerToSignaling {
137                payload: Some(peer_to_signaling::Payload::RegisterRequest(
138                    register_request,
139                )),
140            }))?;
141
142        Self::send_envelope(&mut socket, envelope).await?;
143
144        let (actr_id, credential) = loop {
145            let envelope = Self::read_envelope(&mut socket).await?;
146            match envelope.flow {
147                Some(signaling_envelope::Flow::ServerToActr(server)) => match server.payload {
148                    Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
149                        match response.result {
150                            Some(register_response::Result::Success(success)) => {
151                                break (success.actr_id, success.credential);
152                            }
153                            Some(register_response::Result::Error(error)) => {
154                                return Err(Self::as_error("Register failed", &error));
155                            }
156                            None => return Err(anyhow!("Register response is missing result")),
157                        }
158                    }
159                    Some(signaling_to_actr::Payload::Error(error)) => {
160                        return Err(Self::as_error("Register failed", &error));
161                    }
162                    _ => {}
163                },
164                Some(signaling_envelope::Flow::EnvelopeError(error)) => {
165                    return Err(Self::as_error("Register failed", &error));
166                }
167                _ => {}
168            }
169        };
170
171        Ok(SignalingState {
172            socket,
173            actr_id,
174            credential,
175        })
176    }
177
178    fn as_error(context: &str, error: &ErrorResponse) -> anyhow::Error {
179        anyhow!("{context}: {} ({})", error.message, error.code)
180    }
181
182    async fn send_envelope(
183        socket: &mut SignalingSocket,
184        envelope: SignalingEnvelope,
185    ) -> Result<()> {
186        let mut buf = Vec::new();
187        envelope
188            .encode(&mut buf)
189            .context("Failed to encode signaling envelope")?;
190        socket
191            .send(WsMessage::Binary(buf.into()))
192            .await
193            .context("Failed to send signaling envelope")?;
194        Ok(())
195    }
196
197    async fn read_envelope(socket: &mut SignalingSocket) -> Result<SignalingEnvelope> {
198        while let Some(message) = socket.next().await {
199            match message.context("Failed to read signaling response")? {
200                WsMessage::Binary(bytes) => {
201                    return SignalingEnvelope::decode(bytes)
202                        .context("Failed to decode signaling envelope");
203                }
204                WsMessage::Close(_) => {
205                    return Err(anyhow!("Signaling connection closed"));
206                }
207                WsMessage::Ping(_) | WsMessage::Pong(_) => {}
208                WsMessage::Text(text) => {
209                    return Err(anyhow!("Unexpected text message from signaling: {text}"));
210                }
211                WsMessage::Frame(_) => {}
212            }
213        }
214
215        Err(anyhow!("Signaling connection closed"))
216    }
217
218    fn build_envelope(flow: signaling_envelope::Flow) -> Result<SignalingEnvelope> {
219        Ok(SignalingEnvelope {
220            envelope_version: 1,
221            envelope_id: uuid::Uuid::new_v4().to_string(),
222            reply_for: None,
223            timestamp: prost_types::Timestamp {
224                seconds: chrono::Utc::now().timestamp(),
225                nanos: 0,
226            },
227            traceparent: None,
228            tracestate: None,
229            flow: Some(flow),
230        })
231    }
232
233    fn select_version(entry: &discovery_response::TypeEntry) -> String {
234        entry
235            .tags
236            .iter()
237            .find(|tag| tag.as_str() == "latest")
238            .cloned()
239            .or_else(|| entry.tags.first().cloned())
240            .unwrap_or_else(|| "unknown".to_string())
241    }
242
243    fn matches_filter(entry: &discovery_response::TypeEntry, filter: &ServiceFilter) -> bool {
244        if let Some(pattern) = &filter.name_pattern {
245            let full_name = Self::format_actr_type(&entry.actr_type);
246            let matches = Self::matches_pattern(&entry.name, pattern)
247                || Self::matches_pattern(&full_name, pattern);
248            if !matches {
249                return false;
250            }
251        }
252
253        if let Some(version_range) = &filter.version_range
254            && Self::select_version(entry) != *version_range
255            && !entry.tags.iter().any(|tag| tag == version_range)
256        {
257            return false;
258        }
259
260        if let Some(tags) = &filter.tags {
261            let has_all = tags.iter().all(|tag| entry.tags.iter().any(|t| t == tag));
262            if !has_all {
263                return false;
264            }
265        }
266
267        true
268    }
269
270    fn matches_pattern(value: &str, pattern: &str) -> bool {
271        if pattern == "*" {
272            return true;
273        }
274
275        let segments: Vec<&str> = pattern.split('*').collect();
276        if segments.len() == 1 {
277            return value == pattern;
278        }
279
280        if !pattern.starts_with('*')
281            && let Some(first) = segments.first()
282            && !value.starts_with(first)
283        {
284            return false;
285        }
286
287        if !pattern.ends_with('*')
288            && let Some(last) = segments.last()
289            && !value.ends_with(last)
290        {
291            return false;
292        }
293
294        let mut search_start = 0;
295        let end_limit = if !pattern.ends_with('*') {
296            value
297                .len()
298                .saturating_sub(segments.last().unwrap_or(&"").len())
299        } else {
300            value.len()
301        };
302
303        for (index, segment) in segments.iter().enumerate() {
304            if segment.is_empty() {
305                continue;
306            }
307            if index == 0 && !pattern.starts_with('*') {
308                search_start = segment.len();
309                continue;
310            }
311            if index == segments.len() - 1 && !pattern.ends_with('*') {
312                continue;
313            }
314            if let Some(found) = value[search_start..end_limit].find(segment) {
315                search_start += found + segment.len();
316            } else {
317                return false;
318            }
319        }
320
321        true
322    }
323}
324
325#[async_trait]
326impl ServiceDiscovery for NetworkServiceDiscovery {
327    async fn discover_services(&self, filter: Option<&ServiceFilter>) -> Result<Vec<ServiceInfo>> {
328        let entries = self.discover_entries(filter).await?;
329        let services = entries
330            .into_iter()
331            .filter(|entry| match filter {
332                Some(filter) => Self::matches_filter(entry, filter),
333                None => true,
334            })
335            .map(ServiceInfo::from)
336            .collect();
337        Ok(services)
338    }
339
340    async fn get_service_details(&self, name: &str) -> Result<ServiceDetails> {
341        let entries = self.discover_entries(None).await?;
342        let entry = entries
343            .into_iter()
344            .find(|entry| entry.name == name || Self::format_actr_type(&entry.actr_type) == name);
345
346        let entry = entry.ok_or_else(|| anyhow!("Service not found: {name}"))?;
347        let info = ServiceInfo::from(entry.clone());
348
349        // Try to get ServiceSpec with proto files
350        let proto_files = match self.get_service_proto(&entry.name).await {
351            Ok(proto_files) => proto_files,
352            Err(e) => {
353                tracing::warn!("Failed to get ServiceSpec for {name}: {e}");
354                Vec::new()
355            }
356        };
357
358        Ok(ServiceDetails {
359            info,
360            proto_files,
361            dependencies: Vec::new(),
362        })
363    }
364
365    // TODO: improve the performance of this method
366    async fn check_service_availability(&self, name: &str) -> Result<AvailabilityStatus> {
367        let entries = self.discover_entries(None).await?;
368        let available = entries.iter().any(|entry| entry.name == name);
369
370        Ok(AvailabilityStatus {
371            is_available: available,
372            last_seen: available.then(SystemTime::now),
373            health: if available {
374                HealthStatus::Healthy
375            } else {
376                HealthStatus::Unknown
377            },
378        })
379    }
380
381    async fn get_service_proto(&self, name: &str) -> Result<Vec<ProtoFile>> {
382        self.ensure_connected().await?;
383        let mut state_guard = self.state.lock().await;
384        let state = state_guard
385            .as_mut()
386            .context("Signaling state not initialized")?;
387
388        let request = GetServiceSpecRequest {
389            name: name.to_string(),
390        };
391        let payload = actr_to_signaling::Payload::GetServiceSpecRequest(request);
392        let envelope =
393            Self::build_envelope(signaling_envelope::Flow::ActrToServer(ActrToSignaling {
394                source: state.actr_id.clone(),
395                credential: state.credential.clone(),
396                payload: Some(payload),
397            }))?;
398
399        let result = match Self::send_envelope(&mut state.socket, envelope).await {
400            Ok(()) => loop {
401                let envelope = Self::read_envelope(&mut state.socket).await?;
402                match envelope.flow {
403                    Some(signaling_envelope::Flow::ServerToActr(server)) => match server.payload {
404                        Some(signaling_to_actr::Payload::GetServiceSpecResponse(response)) => {
405                            let proto_files = match response.result {
406                                Some(get_service_spec_response::Result::Success(success)) => {
407                                    success
408                                        .protobufs
409                                        .into_iter()
410                                        .map(|p| ProtoFile {
411                                            name: format!("{}.proto", p.package),
412                                            path: PathBuf::new(),
413                                            content: p.content,
414                                            services: Vec::new(),
415                                        })
416                                        .collect()
417                                }
418                                Some(get_service_spec_response::Result::Error(error)) => {
419                                    break Err(Self::as_error("Get service spec failed", &error));
420                                }
421                                None => {
422                                    break Err(anyhow!(
423                                        "Get service spec response is missing result"
424                                    ));
425                                }
426                            };
427                            break Ok(proto_files);
428                        }
429                        Some(signaling_to_actr::Payload::Error(error)) => {
430                            break Err(Self::as_error("Get service spec failed", &error));
431                        }
432                        _ => {}
433                    },
434                    Some(signaling_envelope::Flow::EnvelopeError(error)) => {
435                        break Err(Self::as_error("Get service spec failed", &error));
436                    }
437                    _ => {}
438                }
439            },
440            Err(err) => Err(err),
441        };
442
443        if result.is_err() {
444            *state_guard = None;
445        }
446
447        result
448    }
449}