wasmcloud_control_interface/
client.rs

1//! Control interface client
2
3use core::fmt::{self, Debug};
4use core::time::Duration;
5
6use std::collections::{BTreeMap, HashMap};
7
8use async_nats::Subscriber;
9use cloudevents::event::Event;
10use futures::{StreamExt, TryFutureExt};
11use serde::de::DeserializeOwned;
12use tokio::sync::mpsc::Receiver;
13use tracing::{debug, error, instrument, trace};
14
15use crate::types::ctl::{
16    CtlResponse, ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
17    UpdateComponentCommand,
18};
19use crate::types::host::{Host, HostInventory, HostLabel};
20use crate::types::link::Link;
21use crate::types::registry::RegistryCredential;
22use crate::types::rpc::{
23    ComponentAuctionAck, ComponentAuctionRequest, DeleteInterfaceLinkDefinitionRequest,
24    ProviderAuctionAck, ProviderAuctionRequest,
25};
26use crate::{
27    broker, json_deserialize, json_serialize, otel, HostLabelIdentifier, IdentifierKind, Result,
28};
29
/// A client builder that can be used to fluently provide configuration settings used to construct
/// the control interface client
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ClientBuilder {
    /// NATS client connection over which all control interface traffic is sent
    nc: async_nats::Client,
    /// Optional prefix prepended to control interface subjects (not the lattice ID/prefix)
    topic_prefix: Option<String>,
    /// Lattice ID/prefix; defaults to `default`
    lattice: String,
    /// Timeout for individual control interface requests; defaults to 2 seconds
    timeout: Duration,
    /// Timeout for auction (scatter/gather) operations; defaults to 5 seconds
    auction_timeout: Duration,
}
41
42impl ClientBuilder {
43    /// Creates a new client builder using the given client with all configuration values set to
44    /// their defaults
45    #[must_use]
46    pub fn new(nc: async_nats::Client) -> ClientBuilder {
47        ClientBuilder {
48            nc,
49            topic_prefix: None,
50            lattice: "default".to_string(),
51            timeout: Duration::from_secs(2),
52            auction_timeout: Duration::from_secs(5),
53        }
54    }
55
56    /// Sets the topic prefix for the NATS topic used for all control requests. Not to be confused
57    /// with lattice ID/prefix
58    #[must_use]
59    pub fn topic_prefix(self, prefix: impl Into<String>) -> ClientBuilder {
60        ClientBuilder {
61            topic_prefix: Some(prefix.into()),
62            ..self
63        }
64    }
65
66    /// The lattice ID/prefix used for this client. If this function is not invoked, the prefix will
67    /// be set to `default`
68    #[must_use]
69    pub fn lattice(self, prefix: impl Into<String>) -> ClientBuilder {
70        ClientBuilder {
71            lattice: prefix.into(),
72            ..self
73        }
74    }
75
76    /// Sets the timeout for control interface requests issued by the client. If not set, the
77    /// default will be 2 seconds
78    #[must_use]
79    pub fn timeout(self, timeout: Duration) -> ClientBuilder {
80        ClientBuilder { timeout, ..self }
81    }
82
83    /// Sets the timeout for auction (scatter/gather) operations. If not set, the default will be 5
84    /// seconds
85    #[must_use]
86    pub fn auction_timeout(self, timeout: Duration) -> ClientBuilder {
87        ClientBuilder {
88            auction_timeout: timeout,
89            ..self
90        }
91    }
92
93    /// Constructs the client with the given configuration from the builder
94    #[must_use]
95    pub fn build(self) -> Client {
96        Client {
97            nc: self.nc,
98            topic_prefix: self.topic_prefix,
99            lattice: self.lattice,
100            timeout: self.timeout,
101            auction_timeout: self.auction_timeout,
102        }
103    }
104}
105
/// Lattice control interface client
#[derive(Clone)]
#[non_exhaustive]
pub struct Client {
    /// Internal `async-nats` client over which all control interface traffic flows
    nc: async_nats::Client,
    /// Topic prefix that should be used with this lattice control client
    topic_prefix: Option<String>,
    /// Lattice prefix
    lattice: String,
    /// Timeout applied to individual control interface requests
    timeout: Duration,
    /// Timeout to use when limiting auctions
    auction_timeout: Duration,
}
121
122impl Debug for Client {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.debug_struct("Client")
125            .field("topic_prefix", &self.topic_prefix)
126            .field("lattice", &self.lattice)
127            .field("timeout", &self.timeout)
128            .field("auction_timeout", &self.auction_timeout)
129            .finish_non_exhaustive()
130    }
131}
132
133impl Client {
134    /// Convenience method for creating a new client with all default settings. This is the same as
135    /// calling `ClientBuilder::new(nc).build()`
136    #[must_use]
137    pub fn new(nc: async_nats::Client) -> Client {
138        ClientBuilder::new(nc).build()
139    }
140
141    /// Get a copy of the NATS client in use by this control client
142    #[allow(unused)]
143    #[must_use]
144    pub fn nats_client(&self) -> async_nats::Client {
145        self.nc.clone()
146    }
147
148    /// Retrieve the lattice in use by the [`Client`]
149    pub fn lattice(&self) -> &str {
150        self.lattice.as_ref()
151    }
152
153    /// Perform a request with a timeout
154    #[instrument(level = "debug", skip_all)]
155    pub(crate) async fn request_timeout(
156        &self,
157        subject: String,
158        payload: Vec<u8>,
159        timeout: Duration,
160    ) -> Result<async_nats::Message> {
161        match tokio::time::timeout(
162            timeout,
163            self.nc.request_with_headers(
164                subject,
165                otel::HeaderInjector::default_with_span().into(),
166                payload.into(),
167            ),
168        )
169        .await
170        {
171            Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timed out").into()),
172            Ok(Ok(message)) => Ok(message),
173            Ok(Err(e)) => Err(e.into()),
174        }
175    }
176
177    /// Queries the lattice for all responsive hosts, waiting for the full period specified by
178    /// _timeout_.
179    #[instrument(level = "debug", skip_all)]
180    pub async fn get_hosts(&self) -> Result<Vec<CtlResponse<Host>>> {
181        let subject = broker::v1::queries::hosts(&self.topic_prefix, &self.lattice);
182        debug!("get_hosts:publish {}", &subject);
183        self.publish_and_wait(subject, Vec::new()).await
184    }
185
186    /// Retrieves the contents of a running host
187    #[instrument(level = "debug", skip_all)]
188    pub async fn get_host_inventory(&self, host_id: &str) -> Result<CtlResponse<HostInventory>> {
189        let subject = broker::v1::queries::host_inventory(
190            &self.topic_prefix,
191            &self.lattice,
192            IdentifierKind::is_host_id(host_id)?.as_str(),
193        );
194        debug!("get_host_inventory:request {}", &subject);
195        match self.request_timeout(subject, vec![], self.timeout).await {
196            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
197            Err(e) => Err(format!("Did not receive host inventory from target host: {e}").into()),
198        }
199    }
200
201    /// Retrieves the full set of all cached claims in the lattice.
202    #[instrument(level = "debug", skip_all)]
203    pub async fn get_claims(&self) -> Result<CtlResponse<Vec<HashMap<String, String>>>> {
204        let subject = broker::v1::queries::claims(&self.topic_prefix, &self.lattice);
205        debug!("get_claims:request {}", &subject);
206        match self.request_timeout(subject, vec![], self.timeout).await {
207            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
208            Err(e) => Err(format!("Did not receive claims from lattice: {e}").into()),
209        }
210    }
211
212    /// Performs an component auction within the lattice, publishing a set of constraints and the
213    /// metadata for the component in question. This will always wait for the full period specified by
214    /// _duration_, and then return the set of gathered results. It is then up to the client to
215    /// choose from among the "auction winners" to issue the appropriate command to start an component.
216    /// Clients cannot assume that auctions will always return at least one result.
217    #[instrument(level = "debug", skip_all)]
218    pub async fn perform_component_auction(
219        &self,
220        component_ref: &str,
221        component_id: &str,
222        constraints: impl Into<BTreeMap<String, String>>,
223    ) -> Result<Vec<CtlResponse<ComponentAuctionAck>>> {
224        let subject = broker::v1::component_auction_subject(&self.topic_prefix, &self.lattice);
225        let bytes = json_serialize(
226            ComponentAuctionRequest::builder()
227                .component_ref(IdentifierKind::is_component_ref(component_ref)?)
228                .component_id(IdentifierKind::is_component_id(component_id)?)
229                .constraints(constraints.into())
230                .build()?,
231        )?;
232        debug!("component_auction:publish {}", &subject);
233        self.publish_and_wait(subject, bytes).await
234    }
235
236    /// Performs a provider auction within the lattice, publishing a set of constraints and the
237    /// metadata for the provider in question.
238    ///
239    /// This will always wait for the full period specified by _duration_, and then return the set of gathered results.
240    /// It is then up to the client to choose from among the "auction winners" and issue the appropriate command to start a
241    /// provider.
242    ///
243    /// Clients should not assume that auctions will always return at least one result.
244    ///
245    /// # Arguments
246    ///
247    /// * `provider_ref` - The ID of the provider to auction
248    /// * `provider_id` - The ID of the provider auction
249    /// * `constraints` - Constraints that govern where the provider can be placed
250    ///
251    #[instrument(level = "debug", skip_all)]
252    pub async fn perform_provider_auction(
253        &self,
254        provider_ref: &str,
255        provider_id: &str,
256        constraints: impl Into<BTreeMap<String, String>>,
257    ) -> Result<Vec<CtlResponse<ProviderAuctionAck>>> {
258        let subject = broker::v1::provider_auction_subject(&self.topic_prefix, &self.lattice);
259        let bytes = json_serialize(
260            ProviderAuctionRequest::builder()
261                .provider_ref(IdentifierKind::is_provider_ref(provider_ref)?)
262                .provider_id(IdentifierKind::is_provider_id(provider_id)?)
263                .constraints(constraints.into())
264                .build()?,
265        )?;
266        debug!("provider_auction:publish {}", &subject);
267        self.publish_and_wait(subject, bytes).await
268    }
269
270    /// Sends a request to the given host to scale a given component.
271    ///
272    /// This returns an acknowledgement of _receipt_ of the command, not a confirmation that the component scaled.
273    /// An acknowledgement will either indicate some form of validation failure, or, if no failure occurs, the receipt of
274    /// the command.
275    ///
276    /// To avoid blocking consumers, wasmCloud hosts will acknowledge the scale component
277    /// command prior to fetching the component's OCI bytes.
278    ///
279    /// Client that need deterministic results as to whether the component completed its startup process
280    /// must monitor the appropriate event in the control event stream.
281    ///
282    /// # Arguments
283    ///
284    /// * `host_id` - The ID of the host to scale the component on
285    /// * `component_ref` - The OCI reference of the component to scale
286    /// * `max_instances` - The maximum number of instances this component can run concurrently. Specifying `0` will stop the component.
287    /// * `annotations` - Optional annotations to apply to the component
288    /// * `config` - List of named configuration to use for the component
289    /// * `allow_update` - Whether to perform allow updates to the component (triggering a separate update)
290    ///
291    #[instrument(level = "debug", skip_all)]
292    #[allow(clippy::too_many_arguments)]
293    pub async fn scale_component(
294        &self,
295        host_id: &str,
296        component_ref: &str,
297        component_id: &str,
298        max_instances: u32,
299        annotations: Option<BTreeMap<String, String>>,
300        config: Vec<String>,
301        allow_update: bool,
302    ) -> Result<CtlResponse<()>> {
303        let host_id = IdentifierKind::is_host_id(host_id)?;
304        let subject = broker::v1::commands::scale_component(
305            &self.topic_prefix,
306            &self.lattice,
307            host_id.as_str(),
308        );
309        debug!("scale_component:request {}", &subject);
310        let bytes = json_serialize(ScaleComponentCommand {
311            max_instances,
312            component_ref: IdentifierKind::is_component_ref(component_ref)?,
313            component_id: IdentifierKind::is_component_id(component_id)?,
314            host_id,
315            annotations,
316            config,
317            allow_update,
318            ..Default::default()
319        })?;
320        match self.request_timeout(subject, bytes, self.timeout).await {
321            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
322            Err(e) => Err(format!("Did not receive scale component acknowledgement: {e}").into()),
323        }
324    }
325
326    /// Publishes a registry credential map to the control interface of the lattice.
327    ///
328    /// All hosts will be listening and overwrite their registry credential maps with the new information.
329    ///
330    /// It is highly recommended you use TLS connections with NATS and isolate the control interface
331    /// credentials when using this function in production as the data contains secrets
332    ///
333    /// # Arguments
334    ///
335    /// * `registries` - A map of registry names to their credentials to be used for fetching from specific registries
336    ///
337    #[instrument(level = "debug", skip_all)]
338    pub async fn put_registries(
339        &self,
340        registries: HashMap<String, RegistryCredential>,
341    ) -> Result<CtlResponse<()>> {
342        let subject = broker::v1::publish_registries(&self.topic_prefix, &self.lattice);
343        debug!("put_registries:publish {}", &subject);
344        let bytes = json_serialize(&registries)?;
345        let resp = self
346            .nc
347            .publish_with_headers(
348                subject,
349                otel::HeaderInjector::default_with_span().into(),
350                bytes.into(),
351            )
352            .await;
353        if let Err(e) = resp {
354            Err(format!("Failed to push registry credential map: {e}").into())
355        } else {
356            Ok(CtlResponse::<()>::success(
357                "successfully added registries".into(),
358            ))
359        }
360    }
361
362    /// Puts a link into the lattice.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if it was unable to put the link
367    #[instrument(level = "debug", skip_all)]
368    pub async fn put_link(&self, link: Link) -> Result<CtlResponse<()>> {
369        // Validate link parameters
370        IdentifierKind::is_component_id(&link.source_id)?;
371        IdentifierKind::is_component_id(&link.target)?;
372        IdentifierKind::is_link_name(&link.name)?;
373
374        let subject = broker::v1::put_link(&self.topic_prefix, &self.lattice);
375        debug!("put_link:request {}", &subject);
376
377        let bytes = crate::json_serialize(link)?;
378        match self.request_timeout(subject, bytes, self.timeout).await {
379            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
380            Err(e) => Err(format!("Did not receive put link acknowledgement: {e}").into()),
381        }
382    }
383
384    /// Deletes a link from the lattice metadata keyvalue bucket.
385    ///
386    /// This is an idempotent operation.
387    ///
388    /// # Errors
389    ///
390    /// Returns an error if it was unable to delete.
391    #[instrument(level = "debug", skip_all)]
392    pub async fn delete_link(
393        &self,
394        source_id: &str,
395        link_name: &str,
396        wit_namespace: &str,
397        wit_package: &str,
398    ) -> Result<CtlResponse<()>> {
399        let subject = broker::v1::delete_link(&self.topic_prefix, &self.lattice);
400        let ld = DeleteInterfaceLinkDefinitionRequest::from_source_and_link_metadata(
401            &IdentifierKind::is_component_id(source_id)?,
402            &IdentifierKind::is_link_name(link_name)?,
403            wit_namespace,
404            wit_package,
405        );
406        let bytes = crate::json_serialize(&ld)?;
407        match self.request_timeout(subject, bytes, self.timeout).await {
408            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
409            Err(e) => Err(format!("Did not receive delete link acknowledgement: {e}").into()),
410        }
411    }
412
413    /// Retrieves the list of link definitions stored in the lattice metadata key-value bucket.
414    ///
415    /// If the client was created with caching, this will return the cached list of links. Otherwise,
416    /// it will query the bucket for the list of links.
417    ///
418    #[instrument(level = "debug", skip_all)]
419    pub async fn get_links(&self) -> Result<CtlResponse<Vec<Link>>> {
420        let subject = broker::v1::queries::link_definitions(&self.topic_prefix, &self.lattice);
421        debug!("get_links:request {}", &subject);
422        match self.request_timeout(subject, vec![], self.timeout).await {
423            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
424            Err(e) => Err(format!("Did not receive a response to get links: {e}").into()),
425        }
426    }
427
428    /// Puts a named config, replacing any data that is already present.
429    ///
430    /// Config names must be valid NATS subject strings and not contain any `.` or `>` characters.
431    ///
432    /// # Arguments
433    ///
434    /// * `config_name` - Name of the configuration that should be saved
435    /// * `config` - contents of the configuration to be saved
436    ///
437    #[instrument(level = "debug", skip_all)]
438    pub async fn put_config(
439        &self,
440        config_name: &str,
441        config: impl Into<HashMap<String, String>>,
442    ) -> Result<CtlResponse<()>> {
443        let subject = broker::v1::put_config(&self.topic_prefix, &self.lattice, config_name);
444        debug!(%subject, %config_name, "Putting config");
445        let data = serde_json::to_vec(&config.into())?;
446        match self.request_timeout(subject, data, self.timeout).await {
447            Ok(msg) => json_deserialize(&msg.payload),
448            Err(e) => Err(format!("Did not receive a response to put config request: {e}").into()),
449        }
450    }
451
452    /// Delete the named config item.
453    ///
454    /// Config names must be valid NATS subject strings and not contain any `.` or `>` characters.
455    ///
456    /// # Arguments
457    ///
458    /// * `config_name` - Name of the configuration that should be deleted
459    ///
460    #[instrument(level = "debug", skip_all)]
461    pub async fn delete_config(&self, config_name: &str) -> Result<CtlResponse<()>> {
462        let subject = broker::v1::delete_config(&self.topic_prefix, &self.lattice, config_name);
463        debug!(%subject, %config_name, "Delete config");
464        match self
465            .request_timeout(subject, Vec::default(), self.timeout)
466            .await
467        {
468            Ok(msg) => json_deserialize(&msg.payload),
469            Err(e) => {
470                Err(format!("Did not receive a response to delete config request: {e}").into())
471            }
472        }
473    }
474
475    /// Get the named config item.
476    ///
477    /// # Arguments
478    ///
479    /// * `config_name` -  The name of the config to fetch. Config names must be valid NATS subject strings and not contain any `.` or `>` characters.
480    ///
481    /// # Returns
482    ///
483    /// A map of key-value pairs representing the contents of the config item. This response is wrapped in the [CtlResponse] type. If
484    /// the config item does not exist, the host will return a [CtlResponse] with a `success` field set to `true` and a `response` field
485    /// set to [Option::None]. If the config item exists, the host will return a [CtlResponse] with a `success` field set to `true` and a
486    /// `response` field set to [Option::Some] containing the key-value pairs of the config item.
487    ///
488    /// # Example
489    ///
490    /// ```no_run
491    /// # use std::collections::HashMap;
492    /// # #[tokio::main(flavor = "current_thread")]
493    /// # async fn main() {
494    /// let nc_client = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
495    /// let ctl_client = wasmcloud_control_interface::Client::new(nc_client);
496    /// ctl_client.put_config(
497    ///     "foo",
498    ///     HashMap::from_iter(vec![("key".to_string(), "value".to_string())]),
499    /// )
500    /// .await
501    /// .expect("should be able to put config");
502    ///
503    /// let config_resp = ctl_client.get_config("foo").await.expect("should be able to get config");
504    /// assert!(config_resp.succeeded());
505    /// assert_eq!(config_resp.data(), Some(&HashMap::from_iter(vec![("key".to_string(), "value".to_string())])));
506    ///
507    /// // Note that the host will return a success response even if the config item does not exist.
508    /// // Errors are reserved for communication problems with the host or with the config store.
509    /// let absent_config_resp = ctl_client.get_config("bar").await.expect("should be able to get config");
510    /// assert!(absent_config_resp.succeeded());
511    /// assert_eq!(absent_config_resp.data(), None);
512    ///
513    /// # }
514    /// ```
515    ///
516    #[instrument(level = "debug", skip_all)]
517    pub async fn get_config(
518        &self,
519        config_name: &str,
520    ) -> Result<CtlResponse<HashMap<String, String>>> {
521        let subject = broker::v1::queries::config(&self.topic_prefix, &self.lattice, config_name);
522        debug!(%subject, %config_name, "Getting config");
523        match self
524            .request_timeout(subject, Vec::default(), self.timeout)
525            .await
526        {
527            Ok(msg) => json_deserialize(&msg.payload),
528            Err(e) => Err(format!("Did not receive a response to get config request: {e}").into()),
529        }
530    }
531
532    /// Put a new (or update an existing) label on the given host.
533    ///
534    /// # Arguments
535    ///
536    /// * `host_id` - ID of the host on which the label should be placed
537    /// * `key` - The key of the label
538    /// * `value` - The value of the label
539    ///
540    /// # Errors
541    ///
542    /// Will return an error if there is a communication problem with the host
543    ///
544    pub async fn put_label(
545        &self,
546        host_id: &str,
547        key: &str,
548        value: &str,
549    ) -> Result<CtlResponse<()>> {
550        let subject = broker::v1::put_label(&self.topic_prefix, &self.lattice, host_id);
551        debug!(%subject, "putting label");
552        let bytes = json_serialize(HostLabel {
553            key: key.to_string(),
554            value: value.to_string(),
555        })?;
556        match self.request_timeout(subject, bytes, self.timeout).await {
557            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
558            Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
559        }
560    }
561
562    /// Removes a label from the given host.
563    ///
564    /// # Arguments
565    ///
566    /// * `host_id` - ID of the host on which the label should be deleted
567    /// * `key` - The key of the label that should be deleted
568    ///
569    /// # Errors
570    ///
571    /// Will return an error if there is a communication problem with the host
572    ///
573    pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlResponse<()>> {
574        let subject = broker::v1::delete_label(&self.topic_prefix, &self.lattice, host_id);
575        debug!(%subject, "removing label");
576        let bytes = json_serialize(HostLabelIdentifier {
577            key: key.to_string(),
578        })?;
579        match self.request_timeout(subject, bytes, self.timeout).await {
580            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
581            Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
582        }
583    }
584
585    /// Command a host to replace an existing component with a new component indicated by an OCI image reference.
586    ///
587    /// The host will acknowledge this request as soon as it verifies that the target component is running.
588    ///
589    /// Note that acknowledgement occurs **before** the new bytes are downloaded. Live-updating an component can take a long time
590    /// and control clients cannot block waiting for a reply that could come several seconds later.
591    ///
592    /// To properly verify that a component has been updated, create  listener for the appropriate [`PublishedEvent`] on the
593    /// control events channel
594    ///
595    /// # Arguments
596    ///
597    /// * `host_id` - ID of the host on which the component should be updated
598    /// * `existing_component_id` - ID of the existing component
599    /// * `new_component_ref` - New component reference that should be used
600    /// * `annotations` - Annotations to place on the newly updated component
601    ///
602    #[instrument(level = "debug", skip_all)]
603    pub async fn update_component(
604        &self,
605        host_id: &str,
606        existing_component_id: &str,
607        new_component_ref: &str,
608        annotations: Option<BTreeMap<String, String>>,
609    ) -> Result<CtlResponse<()>> {
610        let host_id = IdentifierKind::is_host_id(host_id)?;
611        let subject = broker::v1::commands::update_component(
612            &self.topic_prefix,
613            &self.lattice,
614            host_id.as_str(),
615        );
616        debug!("update_component:request {}", &subject);
617        let bytes = json_serialize(UpdateComponentCommand {
618            host_id,
619            component_id: IdentifierKind::is_component_id(existing_component_id)?,
620            new_component_ref: IdentifierKind::is_component_ref(new_component_ref)?,
621            annotations,
622        })?;
623        match self.request_timeout(subject, bytes, self.timeout).await {
624            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
625            Err(e) => Err(format!("Did not receive update component acknowledgement: {e}").into()),
626        }
627    }
628
629    /// Command a host to start a provider with a given OCI reference.
630    ///
631    /// The specified link name will be used (or "default" if none is specified).
632    ///
633    /// The target wasmCloud host will acknowledge the receipt of this command _before_ downloading the provider's bytes from the
634    /// OCI registry, indicating either a validation failure or success.
635    ///
636    /// Clients that need deterministic guarantees that the provider has completed its startup process, should
637    /// monitor the control event stream for the appropriate event.
638    ///
639    /// The `provider_configuration` parameter is a list of named configs to use for this provider, and configurations are not required.
640    ///
641    /// # Arguments
642    ///
643    /// * `host_id` - ID of the host on which to start the provider
644    /// * `provider_ref` - Image reference of the provider to start
645    /// * `provider_id` - ID of the provider to start
646    /// * `annotations` - Annotations to place on the started provider
647    /// * `provider_configuration` - Configuration relevant to the provider (if any)
648    ///
649    #[instrument(level = "debug", skip_all)]
650    pub async fn start_provider(
651        &self,
652        host_id: &str,
653        provider_ref: &str,
654        provider_id: &str,
655        annotations: Option<BTreeMap<String, String>>,
656        provider_configuration: Vec<String>,
657    ) -> Result<CtlResponse<()>> {
658        let host_id = IdentifierKind::is_host_id(host_id)?;
659        let subject = broker::v1::commands::start_provider(
660            &self.topic_prefix,
661            &self.lattice,
662            host_id.as_str(),
663        );
664        debug!("start_provider:request {}", &subject);
665        let mut cmd = StartProviderCommand::builder()
666            .host_id(&host_id)
667            .provider_ref(&IdentifierKind::is_provider_ref(provider_ref)?)
668            .provider_id(&IdentifierKind::is_component_id(provider_id)?);
669        if let Some(annotations) = annotations {
670            cmd = cmd.annotations(annotations);
671        }
672        let cmd = cmd.config(provider_configuration).build()?;
673        let bytes = json_serialize(cmd)?;
674
675        match self.request_timeout(subject, bytes, self.timeout).await {
676            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
677            Err(e) => Err(format!("Did not receive start provider acknowledgement: {e}").into()),
678        }
679    }
680
681    /// Issues a command to a host to stop a provider for the given OCI reference, link name, and
682    /// contract ID.
683    ///
684    /// The target wasmCloud host will acknowledge the receipt of this command, and
685    /// _will not_ supply a discrete confirmation that a provider has terminated. For that kind of
686    /// information, the client must also monitor the control event stream
687    ///
688    /// # Arguments
689    ///
690    /// * `host_id` - ID of the host on which to stop the provider
691    /// * `provider_id` - ID of the provider to stop
692    ///
693    #[instrument(level = "debug", skip_all)]
694    pub async fn stop_provider(&self, host_id: &str, provider_id: &str) -> Result<CtlResponse<()>> {
695        let host_id = IdentifierKind::is_host_id(host_id)?;
696
697        let subject = broker::v1::commands::stop_provider(
698            &self.topic_prefix,
699            &self.lattice,
700            host_id.as_str(),
701        );
702        debug!("stop_provider:request {}", &subject);
703        let bytes = json_serialize(StopProviderCommand {
704            host_id,
705            provider_id: IdentifierKind::is_component_id(provider_id)?,
706        })?;
707
708        match self.request_timeout(subject, bytes, self.timeout).await {
709            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
710            Err(e) => Err(format!("Did not receive stop provider acknowledgement: {e}").into()),
711        }
712    }
713
714    /// Issues a command to a specific host to perform a graceful termination.
715    ///
716    /// The target host will acknowledge receipt of the command before it attempts a shutdown.
717    ///
718    /// To deterministically verify that the host is down, a client should monitor for the "host stopped" event or
719    /// passively detect the host down by way of a lack of heartbeat receipts
720    ///
721    /// # Arguments
722    ///
723    /// * `host_id` - ID of the host to stop
724    /// * `timeout_ms` - (optional) amount of time to allow the host to complete stopping
725    ///
726    #[instrument(level = "debug", skip_all)]
727    pub async fn stop_host(
728        &self,
729        host_id: &str,
730        timeout_ms: Option<u64>,
731    ) -> Result<CtlResponse<()>> {
732        let host_id = IdentifierKind::is_host_id(host_id)?;
733        let subject =
734            broker::v1::commands::stop_host(&self.topic_prefix, &self.lattice, host_id.as_str());
735        debug!("stop_host:request {}", &subject);
736        let bytes = json_serialize(StopHostCommand {
737            host_id,
738            timeout: timeout_ms,
739        })?;
740
741        match self.request_timeout(subject, bytes, self.timeout).await {
742            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
743            Err(e) => Err(format!("Did not receive stop host acknowledgement: {e}").into()),
744        }
745    }
746
747    /// Publish a message and wait for a response
748    async fn publish_and_wait<D: DeserializeOwned>(
749        &self,
750        subject: String,
751        payload: Vec<u8>,
752    ) -> Result<Vec<D>> {
753        let reply = self.nc.new_inbox();
754        let sub = self.nc.subscribe(reply.clone()).await?;
755        self.nc
756            .publish_with_reply_and_headers(
757                subject.clone(),
758                reply,
759                otel::HeaderInjector::default_with_span().into(),
760                payload.into(),
761            )
762            .await?;
763        let nc = self.nc.clone();
764        tokio::spawn(async move {
765            if let Err(error) = nc.flush().await {
766                error!(%error, "flush after publish");
767            }
768        });
769        Ok(collect_sub_timeout::<D>(sub, self.auction_timeout, subject.as_str()).await)
770    }
771
772    /// Returns the receiver end of a channel that subscribes to the lattice event stream.
773    ///
774    /// Any [`Event`]s that are published after this channel is created
775    /// will be added to the receiver channel's buffer, which can be observed or handled if needed.
776    ///
777    /// See the example for how you could use this receiver to handle events.
778    ///
779    /// # Example
780    ///
781    /// ```rust
782    /// use wasmcloud_control_interface::{Client, ClientBuilder};
783    /// async {
784    ///   let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
785    ///   let client = ClientBuilder::new(nc)
786    ///                 .timeout(std::time::Duration::from_millis(1000))
787    ///                 .auction_timeout(std::time::Duration::from_millis(1000))
788    ///                 .build();
789    ///   let mut receiver = client.events_receiver(vec!["component_scaled".to_string()]).await.unwrap();
790    ///   while let Some(evt) = receiver.recv().await {
791    ///       println!("Event received: {:?}", evt);
792    ///   }
793    /// };
794    /// ```
795    ///
796    /// # Arguments
797    ///
798    /// * `event_types` - List of types of events to listen for
799    ///
800    #[allow(clippy::missing_errors_doc)] // TODO: Document errors
801    pub async fn events_receiver(&self, event_types: Vec<String>) -> Result<Receiver<Event>> {
802        let (sender, receiver) = tokio::sync::mpsc::channel(5000);
803        let futs = event_types.into_iter().map(|event_type| {
804            self.nc
805                .subscribe(format!("wasmbus.evt.{}.{}", self.lattice, event_type))
806                .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
807        });
808        let subs: Vec<Subscriber> = futures::future::join_all(futs)
809            .await
810            .into_iter()
811            .collect::<Result<_>>()?;
812        let mut stream = futures::stream::select_all(subs);
813        tokio::spawn(async move {
814            while let Some(msg) = stream.next().await {
815                let Ok(evt) = json_deserialize::<Event>(&msg.payload) else {
816                    error!("Object received on event stream was not a CloudEvent");
817                    continue;
818                };
819                trace!("received event: {:?}", evt);
820                let Ok(()) = sender.send(evt).await else {
821                    break;
822                };
823            }
824        });
825        Ok(receiver)
826    }
827}
828
829/// Collect `T` values until timeout has elapsed
830pub(crate) async fn collect_sub_timeout<T: DeserializeOwned>(
831    mut sub: async_nats::Subscriber,
832    timeout: Duration,
833    reason: &str,
834) -> Vec<T> {
835    let mut items = Vec::new();
836    let sleep = tokio::time::sleep(timeout);
837    tokio::pin!(sleep);
838    loop {
839        tokio::select! {
840            msg = sub.next() => {
841                let Some(msg) = msg else {
842                    break;
843                };
844                if msg.payload.is_empty() {
845                    break;
846                }
847                match json_deserialize::<T>(&msg.payload) {
848                    Ok(item) => items.push(item),
849                    Err(error) => {
850                        error!(%reason, %error,
851                            "deserialization error in auction - results may be incomplete",
852                        );
853                        break;
854                    }
855                }
856            },
857            () = &mut sleep => { /* timeout */ break; }
858        }
859    }
860    items
861}
862
/// Unit tests plus manually-run (`#[ignore]`) integration tests that require a
/// local NATS server and a running wasmCloud host.
#[cfg(test)]
mod tests {
    use super::*;

    /// Note: This test is a means of manually watching the event stream as CloudEvents are received
    /// It does not assert functionality, and so we've marked it as ignore to ensure it's not run by default
    /// It currently listens for 120 seconds then exits
    #[tokio::test]
    #[ignore]
    async fn test_events_receiver() {
        let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
        let client = ClientBuilder::new(nc)
            .timeout(Duration::from_millis(1000))
            .auction_timeout(Duration::from_millis(1000))
            .build();
        let mut receiver = client
            .events_receiver(vec!["foobar".to_string()])
            .await
            .unwrap();
        tokio::spawn(async move {
            while let Some(evt) = receiver.recv().await {
                println!("Event received: {evt:?}");
            }
        });
        println!("Listening to Cloud Events for 120 seconds. Then we will quit.");
        tokio::time::sleep(Duration::from_secs(120)).await;
    }

    /// Verifies identifier validation: empty/whitespace-only IDs are rejected with
    /// descriptive messages, and valid IDs are trimmed of surrounding whitespace.
    #[test]
    fn test_check_identifier() -> Result<()> {
        assert!(IdentifierKind::is_host_id("").is_err());
        assert!(IdentifierKind::is_host_id(" ").is_err());
        let host_id = IdentifierKind::is_host_id("             ");
        assert!(host_id.is_err(), "parsing host id should have failed");
        assert!(host_id
            .unwrap_err()
            .to_string()
            .contains("Host ID cannot be empty"));
        let provider_ref = IdentifierKind::is_provider_ref("");
        assert!(
            provider_ref.is_err(),
            "parsing provider ref should have failed"
        );
        assert!(provider_ref
            .unwrap_err()
            .to_string()
            .contains("Provider OCI reference cannot be empty"));
        assert!(IdentifierKind::is_host_id("host_id").is_ok());
        // Valid identifiers are accepted with surrounding whitespace stripped
        let component_id = IdentifierKind::is_component_id("            iambatman  ")?;
        assert_eq!(component_id, "iambatman");

        Ok(())
    }

    #[tokio::test]
    #[ignore]
    /// Test after large 1.0 refactor to ensure all return types are formatted as [CtlResponse] types, and that
    /// the host can handle all of the requests.
    ///
    /// You must run NATS and one host locally to run this test successfully.
    async fn ctl_response_comprehensive() {
        let client = Client::new(
            async_nats::connect("127.0.0.1:4222")
                .await
                .expect("should be able to connect to local NATS"),
        );
        // Fetch the one host we ran
        let hosts = client
            .get_hosts()
            .await
            .expect("should be able to fetch at least a host");
        assert_eq!(hosts.len(), 1);
        let host = hosts.first().expect("one host to exist");
        assert!(host.success);
        assert!(host.message.is_empty());
        assert!(host.response.is_some());
        let host = host.response.as_ref().unwrap();
        ////
        // Component operations
        ////
        // Component Auction
        let auction_response = client
            .perform_component_auction(
                "ghcr.io/brooksmtownsend/http-hello-world-rust:0.1.0",
                "echo",
                BTreeMap::new(),
            )
            .await
            .expect("should be able to auction an component");
        assert_eq!(auction_response.len(), 1);
        let first_ack = auction_response.first().expect("a single component ack");
        let auction_ack = first_ack.response.as_ref().unwrap();
        let (component_ref, component_id) = (&auction_ack.component_ref, &auction_ack.component_id);
        // Component Scale
        let scale_response = client
            .scale_component(
                &host.id,
                component_ref,
                component_id,
                1,
                None,
                Vec::with_capacity(0),
                false,
            )
            .await
            .expect("should be able to scale component");
        assert!(scale_response.success);
        assert!(scale_response.message.is_empty());
        assert!(scale_response.response.is_none());
        // Component Update (TODO(brooksmtownsend): we should test this with a real update, but I'm using a failure case)
        let update_component_resp = client
            .update_component(
                &host.id,
                "nonexistantcomponentID",
                "ghcr.io/wasmcloud/components/http-keyvalue-counter-rust:0.1.0",
                None,
            )
            .await
            .expect("should be able to issue update component request");
        assert!(!update_component_resp.success);
        assert_eq!(
            update_component_resp.message,
            "component not found".to_string()
        );
        assert_eq!(update_component_resp.response, None);

        ////
        // Provider operations
        ////
        // Provider Auction
        let provider_acks = client
            .perform_provider_auction(
                "ghcr.io/wasmcloud/http-server:0.26.0",
                "httpserver",
                BTreeMap::new(),
            )
            .await
            .expect("should be able to hold provider auction");
        assert_eq!(provider_acks.len(), 1);
        let provider_ack = provider_acks.first().expect("a single provider ack");
        assert!(provider_ack.success);
        assert!(provider_ack.message.is_empty());
        assert!(provider_ack.response.is_some());
        let auction_ack = provider_ack.response.as_ref().unwrap();
        let (provider_ref, provider_id) = (&auction_ack.provider_ref, &auction_ack.provider_id);
        // Provider Start
        let start_response = client
            .start_provider(&host.id, provider_ref, provider_id, None, vec![])
            .await
            .expect("should be able to start provider");
        assert!(start_response.success);
        assert!(start_response.message.is_empty());
        assert!(start_response.response.is_none());
        // Provider Stop (TODO(brooksmtownsend): not enough time to let the provider really stop, so I'm using a failure case)
        let stop_response = client
            .stop_provider(&host.id, "notarealproviderID")
            .await
            .expect("should be able to issue stop provider request");
        assert!(!stop_response.success);
        assert_eq!(
            stop_response.message,
            "provider with that ID is not running".to_string()
        );
        assert!(stop_response.response.is_none());
        ////
        // Link operations
        ////
        // Give the freshly started provider a moment before manipulating links
        tokio::time::sleep(Duration::from_secs(5)).await;
        // Link Put
        let link_put = client
            .put_link(Link {
                source_id: "echo".to_string(),
                target: "httpserver".to_string(),
                name: "default".to_string(),
                wit_namespace: "wasi".to_string(),
                wit_package: "http".to_string(),
                interfaces: vec!["incoming-handler".to_string()],
                ..Default::default()
            })
            .await
            .expect("should be able to put link");
        assert!(link_put.success);
        assert!(link_put.message.is_empty());
        assert!(link_put.response.is_none());
        let links_get = client
            .get_links()
            .await
            .expect("should be able to get links");
        assert!(links_get.success);
        assert!(links_get.message.is_empty());
        assert!(links_get.response.is_some());
        // Link Get
        let link_get = links_get.response.as_ref().unwrap().first().unwrap();
        assert_eq!(link_get.source_id, "echo");
        assert_eq!(link_get.target, "httpserver");
        assert_eq!(link_get.name, "default");
        assert_eq!(link_get.wit_namespace, "wasi");
        assert_eq!(link_get.wit_package, "http");
        // Link Del
        let link_del = client
            .delete_link("echo", "default", "wasi", "http")
            .await
            .expect("should be able to delete link");
        assert!(link_del.success);
        assert!(link_del.message.is_empty());
        assert!(link_del.response.is_none());

        ////
        // Label operations
        ////
        // Label Put
        let label_one = client
            .put_label(&host.id, "idk", "lol")
            .await
            .expect("should be able to put label");
        assert!(label_one.success);
        assert!(label_one.message.is_empty());
        assert!(label_one.response.is_none());
        let label_two = client
            .put_label(&host.id, "foo", "bar")
            .await
            .expect("should be able to put another label");
        assert!(label_two.success);
        assert!(label_two.message.is_empty());
        assert!(label_two.response.is_none());
        // Label Del
        let del_label_one = client
            .delete_label(&host.id, "idk")
            .await
            .expect("should be able to delete label");
        assert!(del_label_one.success);
        assert!(del_label_one.message.is_empty());
        assert!(del_label_one.response.is_none());
        ////
        // Registry operations
        ////
        // Registry Put
        let registry_put = client
            .put_registries(HashMap::from_iter([(
                "mycloud.io".to_string(),
                RegistryCredential {
                    username: Some("user".to_string()),
                    password: Some("pass".to_string()),
                    registry_type: "oci".to_string(),
                    token: None,
                },
            )]))
            .await
            .expect("should be able to put registries");
        assert!(registry_put.success);
        assert!(registry_put.message.is_empty());
        assert!(registry_put.response.is_none());

        ////
        // Config operations
        ////
        // Config Put
        let config_put = client
            .put_config(
                "test_config",
                HashMap::from_iter([("sup".to_string(), "hey".to_string())]),
            )
            .await
            .expect("should be able to put config");
        assert!(config_put.success);
        assert!(config_put.message.is_empty());
        assert!(config_put.response.is_none());
        // Config Get
        let config_get = client
            .get_config("test_config")
            .await
            .expect("should be able to get config");
        assert!(config_get.success);
        assert!(config_get.message.is_empty());
        assert!(config_get
            .response
            .is_some_and(|r| r.get("sup").is_some_and(|s| s == "hey")));
        // Config Del
        let config_del = client
            .delete_config("test_config")
            .await
            .expect("should be able to delete config");
        assert!(config_del.success);
        assert!(config_del.message.is_empty());
        assert!(config_del.response.is_none());

        ////
        // Host operations
        ////
        // Host Get
        let inventory = client
            .get_host_inventory(&host.id)
            .await
            .expect("should be able to fetch at least a host");
        assert!(inventory.success);
        assert!(inventory.message.is_empty());
        assert!(inventory.response.is_some());
        let host_inventory = inventory.response.unwrap();
        assert!(host_inventory.components.iter().all(|a| a.id == "echo"));
        // The "idk" label was deleted above; only the "foo" label should remain
        assert!(!host_inventory.labels.contains_key("idk"));
        assert!(host_inventory
            .labels
            .get("foo")
            .is_some_and(|f| f == &"bar".to_string()));
        // Host Stop
        let stop_host = client
            .stop_host(&host.id, Some(1234))
            .await
            .expect("should be able to stop host");
        assert!(stop_host.success);
        assert!(stop_host.message.is_empty());
        assert!(stop_host.response.is_none());
    }
}