wasmcloud_control_interface/
client.rs

1//! Control interface client
2
3use core::fmt::{self, Debug};
4use core::time::Duration;
5
6use std::collections::{BTreeMap, HashMap};
7
8use async_nats::Subscriber;
9use cloudevents::event::Event;
10use futures::{StreamExt, TryFutureExt};
11use serde::de::DeserializeOwned;
12use tokio::sync::mpsc::Receiver;
13use tracing::{debug, error, instrument, trace};
14
15use crate::types::ctl::{
16    CtlResponse, ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
17    UpdateComponentCommand,
18};
19use crate::types::host::{Host, HostInventory, HostLabel};
20use crate::types::link::Link;
21use crate::types::registry::RegistryCredential;
22use crate::types::rpc::{
23    ComponentAuctionAck, ComponentAuctionRequest, DeleteInterfaceLinkDefinitionRequest,
24    ProviderAuctionAck, ProviderAuctionRequest,
25};
26use crate::{
27    broker, json_deserialize, json_serialize, otel, HostLabelIdentifier, IdentifierKind, Result,
28};
29
30/// A client builder that can be used to fluently provide configuration settings used to construct
31/// the control interface client
32#[derive(Debug, Clone)]
33#[non_exhaustive]
34pub struct ClientBuilder {
35    nc: async_nats::Client,
36    topic_prefix: Option<String>,
37    lattice: String,
38    timeout: Duration,
39    auction_timeout: Duration,
40}
41
42impl ClientBuilder {
43    /// Creates a new client builder using the given client with all configuration values set to
44    /// their defaults
45    #[must_use]
46    pub fn new(nc: async_nats::Client) -> ClientBuilder {
47        ClientBuilder {
48            nc,
49            topic_prefix: None,
50            lattice: "default".to_string(),
51            timeout: Duration::from_secs(2),
52            auction_timeout: Duration::from_secs(5),
53        }
54    }
55
56    /// Sets the topic prefix for the NATS topic used for all control requests. Not to be confused
57    /// with lattice ID/prefix
58    #[must_use]
59    pub fn topic_prefix(self, prefix: impl Into<String>) -> ClientBuilder {
60        ClientBuilder {
61            topic_prefix: Some(prefix.into()),
62            ..self
63        }
64    }
65
66    /// The lattice ID/prefix used for this client. If this function is not invoked, the prefix will
67    /// be set to `default`
68    #[must_use]
69    pub fn lattice(self, prefix: impl Into<String>) -> ClientBuilder {
70        ClientBuilder {
71            lattice: prefix.into(),
72            ..self
73        }
74    }
75
76    /// Sets the timeout for control interface requests issued by the client. If not set, the
77    /// default will be 2 seconds
78    #[must_use]
79    pub fn timeout(self, timeout: Duration) -> ClientBuilder {
80        ClientBuilder { timeout, ..self }
81    }
82
83    /// Sets the timeout for auction (scatter/gather) operations. If not set, the default will be 5
84    /// seconds
85    #[must_use]
86    pub fn auction_timeout(self, timeout: Duration) -> ClientBuilder {
87        ClientBuilder {
88            auction_timeout: timeout,
89            ..self
90        }
91    }
92
93    /// Constructs the client with the given configuration from the builder
94    #[must_use]
95    pub fn build(self) -> Client {
96        Client {
97            nc: self.nc,
98            topic_prefix: self.topic_prefix,
99            lattice: self.lattice,
100            timeout: self.timeout,
101            auction_timeout: self.auction_timeout,
102        }
103    }
104}
105
106/// Lattice control interface client
107#[derive(Clone)]
108#[non_exhaustive]
109pub struct Client {
110    /// Internal `async-nats` client
111    nc: async_nats::Client,
112    /// Topic prefix that should be used with this lattice control client
113    topic_prefix: Option<String>,
114    /// Lattice prefix
115    lattice: String,
116    /// Timeout
117    timeout: Duration,
118    /// Timeout to use when limiting auctions
119    auction_timeout: Duration,
120}
121
122impl Debug for Client {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.debug_struct("Client")
125            .field("topic_prefix", &self.topic_prefix)
126            .field("lattice", &self.lattice)
127            .field("timeout", &self.timeout)
128            .field("auction_timeout", &self.auction_timeout)
129            .finish_non_exhaustive()
130    }
131}
132
133impl Client {
134    /// Convenience method for creating a new client with all default settings. This is the same as
135    /// calling `ClientBuilder::new(nc).build()`
136    #[must_use]
137    pub fn new(nc: async_nats::Client) -> Client {
138        ClientBuilder::new(nc).build()
139    }
140
141    /// Get a copy of the NATS client in use by this control client
142    #[allow(unused)]
143    #[must_use]
144    pub fn nats_client(&self) -> async_nats::Client {
145        self.nc.clone()
146    }
147
148    /// Retrieve the lattice in use by the [`Client`]
149    pub fn lattice(&self) -> &str {
150        self.lattice.as_ref()
151    }
152
153    /// Perform a request with a timeout
154    #[instrument(level = "debug", skip_all)]
155    pub(crate) async fn request_timeout(
156        &self,
157        subject: String,
158        payload: Vec<u8>,
159        timeout: Duration,
160    ) -> Result<async_nats::Message> {
161        match tokio::time::timeout(
162            timeout,
163            self.nc.request_with_headers(
164                subject,
165                otel::HeaderInjector::default_with_span().into(),
166                payload.into(),
167            ),
168        )
169        .await
170        {
171            Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timed out").into()),
172            Ok(Ok(message)) => Ok(message),
173            Ok(Err(e)) => Err(e.into()),
174        }
175    }
176
177    /// Queries the lattice for all responsive hosts, waiting for the full period specified by
178    /// the client's auction timeout.
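    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming a NATS server at `127.0.0.1:4222` and at least one running
    /// wasmCloud host connected to the `default` lattice:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// // Every responsive host answers with its own CtlResponse<Host>
    /// let hosts = ctl_client.get_hosts().await.expect("failed to query hosts");
    /// println!("{} host(s) responded", hosts.len());
    /// # }
    /// ```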
179    #[instrument(level = "debug", skip_all)]
180    pub async fn get_hosts(&self) -> Result<Vec<CtlResponse<Host>>> {
181        let subject = broker::v1::queries::hosts(&self.topic_prefix, &self.lattice);
182        debug!("get_hosts:publish {}", &subject);
183        self.publish_and_wait(subject, Vec::new()).await
184    }
185
186    /// Retrieves the inventory of a running host
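    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a running host; the host ID below
    /// is a hypothetical placeholder:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// // Replace with the ID of a real host, e.g. one returned by `get_hosts`
    /// let inventory = ctl_client
    ///     .get_host_inventory("NAEXAMPLEHOSTID")
    ///     .await
    ///     .expect("failed to fetch host inventory");
    /// assert!(inventory.succeeded());
    /// # }
    /// ```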
187    #[instrument(level = "debug", skip_all)]
188    pub async fn get_host_inventory(&self, host_id: &str) -> Result<CtlResponse<HostInventory>> {
189        let subject = broker::v1::queries::host_inventory(
190            &self.topic_prefix,
191            &self.lattice,
192            IdentifierKind::is_host_id(host_id)?.as_str(),
193        );
194        debug!("get_host_inventory:request {}", &subject);
195        match self.request_timeout(subject, vec![], self.timeout).await {
196            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
197            Err(e) => Err(format!("Did not receive host inventory from target host: {e}").into()),
198        }
199    }
200
201    /// Retrieves the full set of all cached claims in the lattice.
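    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and at least one running host:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// let claims = ctl_client.get_claims().await.expect("failed to fetch claims");
    /// // Each claim is represented as a map of string keys to string values
    /// if let Some(claims) = claims.data() {
    ///     println!("lattice has {} cached claim(s)", claims.len());
    /// }
    /// # }
    /// ```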
202    #[instrument(level = "debug", skip_all)]
203    pub async fn get_claims(&self) -> Result<CtlResponse<Vec<HashMap<String, String>>>> {
204        let subject = broker::v1::queries::claims(&self.topic_prefix, &self.lattice);
205        debug!("get_claims:request {}", &subject);
206        match self.request_timeout(subject, vec![], self.timeout).await {
207            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
208            Err(e) => Err(format!("Did not receive claims from lattice: {e}").into()),
209        }
210    }
211
212    /// Performs a component auction within the lattice, publishing a set of constraints and the
213    /// metadata for the component in question. This will always wait for the full period specified by
214    /// the client's auction timeout, and then return the set of gathered results. It is then up to the
215    /// client to choose from among the "auction winners" and issue the appropriate command to start a
216    /// component. Clients should not assume that auctions will always return at least one result.
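    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and at least one host able to satisfy
    /// the (empty) constraints; the component reference and ID are hypothetical placeholders:
    ///
    /// ```no_run
    /// # use std::collections::BTreeMap;
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// let acks = ctl_client
    ///     .perform_component_auction(
    ///         "ghcr.io/wasmcloud/components/http-hello-world-rust:0.1.0",
    ///         "http-hello-world",
    ///         BTreeMap::new(),
    ///     )
    ///     .await
    ///     .expect("failed to run component auction");
    /// // Zero acks is a valid outcome: it means no host could satisfy the constraints
    /// println!("{} host(s) bid on the component", acks.len());
    /// # }
    /// ```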
217    #[instrument(level = "debug", skip_all)]
218    pub async fn perform_component_auction(
219        &self,
220        component_ref: &str,
221        component_id: &str,
222        constraints: impl Into<BTreeMap<String, String>>,
223    ) -> Result<Vec<CtlResponse<ComponentAuctionAck>>> {
224        let subject = broker::v1::component_auction_subject(&self.topic_prefix, &self.lattice);
225        let bytes = json_serialize(
226            ComponentAuctionRequest::builder()
227                .component_ref(IdentifierKind::is_component_ref(component_ref)?)
228                .component_id(IdentifierKind::is_component_id(component_id)?)
229                .constraints(constraints.into())
230                .build()?,
231        )?;
232        debug!("component_auction:publish {}", &subject);
233        self.publish_and_wait(subject, bytes).await
234    }
235
236    /// Performs a provider auction within the lattice, publishing a set of constraints and the
237    /// metadata for the provider in question.
238    ///
239    /// This will always wait for the full period specified by the client's auction timeout, and then return the set of gathered results.
240    /// It is then up to the client to choose from among the "auction winners" and issue the appropriate command to start a
241    /// provider.
242    ///
243    /// Clients should not assume that auctions will always return at least one result.
244    ///
245    /// # Arguments
246    ///
247    /// * `provider_ref` - The OCI reference of the provider to auction
248    /// * `provider_id` - The ID of the provider being auctioned
249    /// * `constraints` - Constraints that govern where the provider can be placed
250    ///
251    #[instrument(level = "debug", skip_all)]
252    pub async fn perform_provider_auction(
253        &self,
254        provider_ref: &str,
255        provider_id: &str,
256        constraints: impl Into<BTreeMap<String, String>>,
257    ) -> Result<Vec<CtlResponse<ProviderAuctionAck>>> {
258        let subject = broker::v1::provider_auction_subject(&self.topic_prefix, &self.lattice);
259        let bytes = json_serialize(
260            ProviderAuctionRequest::builder()
261                .provider_ref(IdentifierKind::is_provider_ref(provider_ref)?)
262                .provider_id(IdentifierKind::is_provider_id(provider_id)?)
263                .constraints(constraints.into())
264                .build()?,
265        )?;
266        debug!("provider_auction:publish {}", &subject);
267        self.publish_and_wait(subject, bytes).await
268    }
269
270    /// Sends a request to the given host to scale a given component.
271    ///
272    /// This returns an acknowledgement of _receipt_ of the command, not a confirmation that the component scaled.
273    /// An acknowledgement will either indicate some form of validation failure, or, if no failure occurs, the receipt of
274    /// the command.
275    ///
276    /// To avoid blocking consumers, wasmCloud hosts will acknowledge the scale component
277    /// command prior to fetching the component's OCI bytes.
278    ///
279    /// Clients that need deterministic results as to whether the component completed its startup process
280    /// must monitor the appropriate event in the control event stream.
281    ///
282    /// # Arguments
283    ///
284    /// * `host_id` - The ID of the host to scale the component on
285    /// * `component_ref` - The OCI reference of the component to scale
286    /// * `component_id` - The unique identifier of the component to scale
287    /// * `max_instances` - The maximum number of instances this component can run concurrently. Specifying `0` will stop the component.
288    /// * `annotations` - Optional annotations to apply to the component
289    /// * `config` - List of named configurations to use for the component
290    ///
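    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a running host; the host ID,
    /// component reference, and component ID are hypothetical placeholders:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// let ack = ctl_client
    ///     .scale_component(
    ///         "NAEXAMPLEHOSTID",
    ///         "ghcr.io/wasmcloud/components/http-hello-world-rust:0.1.0",
    ///         "http-hello-world",
    ///         1,          // max_instances; `0` would stop the component
    ///         None,       // no annotations
    ///         Vec::new(), // no named config
    ///     )
    ///     .await
    ///     .expect("failed to send scale request");
    /// // This only acknowledges receipt; watch the event stream to confirm the component scaled
    /// assert!(ack.succeeded());
    /// # }
    /// ```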
291    #[instrument(level = "debug", skip_all)]
292    pub async fn scale_component(
293        &self,
294        host_id: &str,
295        component_ref: &str,
296        component_id: &str,
297        max_instances: u32,
298        annotations: Option<BTreeMap<String, String>>,
299        config: Vec<String>,
300    ) -> Result<CtlResponse<()>> {
301        let host_id = IdentifierKind::is_host_id(host_id)?;
302        let subject = broker::v1::commands::scale_component(
303            &self.topic_prefix,
304            &self.lattice,
305            host_id.as_str(),
306        );
307        debug!("scale_component:request {}", &subject);
308        let bytes = json_serialize(ScaleComponentCommand {
309            max_instances,
310            component_ref: IdentifierKind::is_component_ref(component_ref)?,
311            component_id: IdentifierKind::is_component_id(component_id)?,
312            host_id,
313            annotations,
314            config,
315            ..Default::default()
316        })?;
317        match self.request_timeout(subject, bytes, self.timeout).await {
318            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
319            Err(e) => Err(format!("Did not receive scale component acknowledgement: {e}").into()),
320        }
321    }
322
323    /// Publishes a registry credential map to the control interface of the lattice.
324    ///
325    /// All hosts will be listening and overwrite their registry credential maps with the new information.
326    ///
327    /// It is highly recommended that you use TLS connections with NATS and isolate the control interface
328    /// credentials when using this function in production, as the data contains secrets.
329    ///
330    /// # Arguments
331    ///
332    /// * `registries` - A map of registry names to their credentials to be used for fetching from specific registries
333    ///
334    #[instrument(level = "debug", skip_all)]
335    pub async fn put_registries(
336        &self,
337        registries: HashMap<String, RegistryCredential>,
338    ) -> Result<CtlResponse<()>> {
339        let subject = broker::v1::publish_registries(&self.topic_prefix, &self.lattice);
340        debug!("put_registries:publish {}", &subject);
341        let bytes = json_serialize(&registries)?;
342        let resp = self
343            .nc
344            .publish_with_headers(
345                subject,
346                otel::HeaderInjector::default_with_span().into(),
347                bytes.into(),
348            )
349            .await;
350        if let Err(e) = resp {
351            Err(format!("Failed to push registry credential map: {e}").into())
352        } else {
353            Ok(CtlResponse::<()>::success(
354                "successfully added registries".into(),
355            ))
356        }
357    }
358
359    /// Puts a link into the lattice.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if it was unable to put the link
364    #[instrument(level = "debug", skip_all)]
365    pub async fn put_link(&self, link: Link) -> Result<CtlResponse<()>> {
366        // Validate link parameters
367        IdentifierKind::is_component_id(&link.source_id)?;
368        IdentifierKind::is_component_id(&link.target)?;
369        IdentifierKind::is_link_name(&link.name)?;
370
371        let subject = broker::v1::put_link(&self.topic_prefix, &self.lattice);
372        debug!("put_link:request {}", &subject);
373
374        let bytes = crate::json_serialize(link)?;
375        match self.request_timeout(subject, bytes, self.timeout).await {
376            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
377            Err(e) => Err(format!("Did not receive put link acknowledgement: {e}").into()),
378        }
379    }
380
381    /// Deletes a link from the lattice metadata keyvalue bucket.
382    ///
383    /// This is an idempotent operation.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if it was unable to delete.
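    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server; the source ID and interface values
    /// mirror a hypothetical link created earlier with [`Client::put_link`]:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// // Deleting a link that does not exist is not an error (the operation is idempotent)
    /// let ack = ctl_client
    ///     .delete_link("http-hello-world", "default", "wasi", "http")
    ///     .await
    ///     .expect("failed to send delete link request");
    /// assert!(ack.succeeded());
    /// # }
    /// ```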
388    #[instrument(level = "debug", skip_all)]
389    pub async fn delete_link(
390        &self,
391        source_id: &str,
392        link_name: &str,
393        wit_namespace: &str,
394        wit_package: &str,
395    ) -> Result<CtlResponse<()>> {
396        let subject = broker::v1::delete_link(&self.topic_prefix, &self.lattice);
397        let ld = DeleteInterfaceLinkDefinitionRequest::from_source_and_link_metadata(
398            &IdentifierKind::is_component_id(source_id)?,
399            &IdentifierKind::is_link_name(link_name)?,
400            wit_namespace,
401            wit_package,
402        );
403        let bytes = crate::json_serialize(&ld)?;
404        match self.request_timeout(subject, bytes, self.timeout).await {
405            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
406            Err(e) => Err(format!("Did not receive delete link acknowledgement: {e}").into()),
407        }
408    }
409
410    /// Retrieves the list of link definitions stored in the lattice metadata key-value bucket.
411    ///
412    /// This issues a request over the control interface to retrieve the current set of links
413    /// stored in the bucket.
414    ///
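    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a running host:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// let links = ctl_client.get_links().await.expect("failed to fetch links");
    /// if let Some(links) = links.data() {
    ///     println!("lattice currently has {} link(s)", links.len());
    /// }
    /// # }
    /// ```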
415    #[instrument(level = "debug", skip_all)]
416    pub async fn get_links(&self) -> Result<CtlResponse<Vec<Link>>> {
417        let subject = broker::v1::queries::link_definitions(&self.topic_prefix, &self.lattice);
418        debug!("get_links:request {}", &subject);
419        match self.request_timeout(subject, vec![], self.timeout).await {
420            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
421            Err(e) => Err(format!("Did not receive a response to get links: {e}").into()),
422        }
423    }
424
425    /// Puts a named config, replacing any data that is already present.
426    ///
427    /// Config names must be valid NATS subject strings and not contain any `.` or `>` characters.
428    ///
429    /// # Arguments
430    ///
431    /// * `config_name` - Name of the configuration that should be saved
432    /// * `config` - contents of the configuration to be saved
433    ///
434    #[instrument(level = "debug", skip_all)]
435    pub async fn put_config(
436        &self,
437        config_name: &str,
438        config: impl Into<HashMap<String, String>>,
439    ) -> Result<CtlResponse<()>> {
440        let subject = broker::v1::put_config(&self.topic_prefix, &self.lattice, config_name);
441        debug!(%subject, %config_name, "Putting config");
442        let data = serde_json::to_vec(&config.into())?;
443        match self.request_timeout(subject, data, self.timeout).await {
444            Ok(msg) => json_deserialize(&msg.payload),
445            Err(e) => Err(format!("Did not receive a response to put config request: {e}").into()),
446        }
447    }
448
449    /// Delete the named config item.
450    ///
451    /// Config names must be valid NATS subject strings and not contain any `.` or `>` characters.
452    ///
453    /// # Arguments
454    ///
455    /// * `config_name` - Name of the configuration that should be deleted
456    ///
457    #[instrument(level = "debug", skip_all)]
458    pub async fn delete_config(&self, config_name: &str) -> Result<CtlResponse<()>> {
459        let subject = broker::v1::delete_config(&self.topic_prefix, &self.lattice, config_name);
460        debug!(%subject, %config_name, "Delete config");
461        match self
462            .request_timeout(subject, Vec::default(), self.timeout)
463            .await
464        {
465            Ok(msg) => json_deserialize(&msg.payload),
466            Err(e) => {
467                Err(format!("Did not receive a response to delete config request: {e}").into())
468            }
469        }
470    }
471
472    /// Get the named config item.
473    ///
474    /// # Arguments
475    ///
476    /// * `config_name` -  The name of the config to fetch. Config names must be valid NATS subject strings and not contain any `.` or `>` characters.
477    ///
478    /// # Returns
479    ///
480    /// A map of key-value pairs representing the contents of the config item. This response is wrapped in the [CtlResponse] type. If
481    /// the config item does not exist, the host will return a [CtlResponse] with a `success` field set to `true` and a `response` field
482    /// set to [Option::None]. If the config item exists, the host will return a [CtlResponse] with a `success` field set to `true` and a
483    /// `response` field set to [Option::Some] containing the key-value pairs of the config item.
484    ///
485    /// # Example
486    ///
487    /// ```no_run
488    /// # use std::collections::HashMap;
489    /// # #[tokio::main(flavor = "current_thread")]
490    /// # async fn main() {
491    /// let nc_client = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
492    /// let ctl_client = wasmcloud_control_interface::Client::new(nc_client);
493    /// ctl_client.put_config(
494    ///     "foo",
495    ///     HashMap::from_iter(vec![("key".to_string(), "value".to_string())]),
496    /// )
497    /// .await
498    /// .expect("should be able to put config");
499    ///
500    /// let config_resp = ctl_client.get_config("foo").await.expect("should be able to get config");
501    /// assert!(config_resp.succeeded());
502    /// assert_eq!(config_resp.data(), Some(&HashMap::from_iter(vec![("key".to_string(), "value".to_string())])));
503    ///
504    /// // Note that the host will return a success response even if the config item does not exist.
505    /// // Errors are reserved for communication problems with the host or with the config store.
506    /// let absent_config_resp = ctl_client.get_config("bar").await.expect("should be able to get config");
507    /// assert!(absent_config_resp.succeeded());
508    /// assert_eq!(absent_config_resp.data(), None);
509    ///
510    /// # }
511    /// ```
512    ///
513    #[instrument(level = "debug", skip_all)]
514    pub async fn get_config(
515        &self,
516        config_name: &str,
517    ) -> Result<CtlResponse<HashMap<String, String>>> {
518        let subject = broker::v1::queries::config(&self.topic_prefix, &self.lattice, config_name);
519        debug!(%subject, %config_name, "Getting config");
520        match self
521            .request_timeout(subject, Vec::default(), self.timeout)
522            .await
523        {
524            Ok(msg) => json_deserialize(&msg.payload),
525            Err(e) => Err(format!("Did not receive a response to get config request: {e}").into()),
526        }
527    }
528
529    /// Put a new (or update an existing) label on the given host.
530    ///
531    /// # Arguments
532    ///
533    /// * `host_id` - ID of the host on which the label should be placed
534    /// * `key` - The key of the label
535    /// * `value` - The value of the label
536    ///
537    /// # Errors
538    ///
539    /// Will return an error if there is a communication problem with the host
540    ///
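    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a running host; the host ID is a
    /// hypothetical placeholder:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// // Labels placed on a host can later be used as auction constraints
    /// let ack = ctl_client
    ///     .put_label("NAEXAMPLEHOSTID", "zone", "us-east-1")
    ///     .await
    ///     .expect("failed to send put label request");
    /// assert!(ack.succeeded());
    /// # }
    /// ```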
541    pub async fn put_label(
542        &self,
543        host_id: &str,
544        key: &str,
545        value: &str,
546    ) -> Result<CtlResponse<()>> {
547        let subject = broker::v1::put_label(&self.topic_prefix, &self.lattice, host_id);
548        debug!(%subject, "putting label");
549        let bytes = json_serialize(HostLabel {
550            key: key.to_string(),
551            value: value.to_string(),
552        })?;
553        match self.request_timeout(subject, bytes, self.timeout).await {
554            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
555            Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
556        }
557    }
558
559    /// Removes a label from the given host.
560    ///
561    /// # Arguments
562    ///
563    /// * `host_id` - ID of the host on which the label should be deleted
564    /// * `key` - The key of the label that should be deleted
565    ///
566    /// # Errors
567    ///
568    /// Will return an error if there is a communication problem with the host
569    ///
570    pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlResponse<()>> {
571        let subject = broker::v1::delete_label(&self.topic_prefix, &self.lattice, host_id);
572        debug!(%subject, "removing label");
573        let bytes = json_serialize(HostLabelIdentifier {
574            key: key.to_string(),
575        })?;
576        match self.request_timeout(subject, bytes, self.timeout).await {
577            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
578            Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
579        }
580    }
581
582    /// Command a host to replace an existing component with a new component indicated by an OCI image reference.
583    ///
584    /// The host will acknowledge this request as soon as it verifies that the target component is running.
585    ///
586    /// Note that acknowledgement occurs **before** the new bytes are downloaded. Live-updating a component can take a long time
587    /// and control clients cannot block waiting for a reply that could come several seconds later.
588    ///
589    /// To properly verify that a component has been updated, create a listener for the appropriate [`PublishedEvent`] on the
590    /// control events channel.
591    ///
592    /// # Arguments
593    ///
594    /// * `host_id` - ID of the host on which the component should be updated
595    /// * `existing_component_id` - ID of the existing component
596    /// * `new_component_ref` - New component reference that should be used
597    /// * `annotations` - Annotations to place on the newly updated component
598    ///
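    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a host already running the
    /// component; the host ID, component ID, and new reference are hypothetical placeholders:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// let ack = ctl_client
    ///     .update_component(
    ///         "NAEXAMPLEHOSTID",
    ///         "http-hello-world",
    ///         "ghcr.io/wasmcloud/components/http-hello-world-rust:0.2.0",
    ///         None, // no annotations
    ///     )
    ///     .await
    ///     .expect("failed to send update request");
    /// // Acknowledgement only confirms the request was accepted, not that the update finished
    /// assert!(ack.succeeded());
    /// # }
    /// ```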
599    #[instrument(level = "debug", skip_all)]
600    pub async fn update_component(
601        &self,
602        host_id: &str,
603        existing_component_id: &str,
604        new_component_ref: &str,
605        annotations: Option<BTreeMap<String, String>>,
606    ) -> Result<CtlResponse<()>> {
607        let host_id = IdentifierKind::is_host_id(host_id)?;
608        let subject = broker::v1::commands::update_component(
609            &self.topic_prefix,
610            &self.lattice,
611            host_id.as_str(),
612        );
613        debug!("update_component:request {}", &subject);
614        let bytes = json_serialize(UpdateComponentCommand {
615            host_id,
616            component_id: IdentifierKind::is_component_id(existing_component_id)?,
617            new_component_ref: IdentifierKind::is_component_ref(new_component_ref)?,
618            annotations,
619        })?;
620        match self.request_timeout(subject, bytes, self.timeout).await {
621            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
622            Err(e) => Err(format!("Did not receive update component acknowledgement: {e}").into()),
623        }
624    }
625
626    /// Command a host to start a provider with a given OCI reference.
627    ///
630    /// The target wasmCloud host will acknowledge the receipt of this command _before_ downloading the provider's bytes from the
631    /// OCI registry, indicating either a validation failure or success.
632    ///
633    /// Clients that need deterministic guarantees that the provider has completed its startup process should
634    /// monitor the control event stream for the appropriate event.
635    ///
636    /// The `provider_configuration` parameter is a list of named configs to use for this provider; it may be empty.
637    ///
638    /// # Arguments
639    ///
640    /// * `host_id` - ID of the host on which to start the provider
641    /// * `provider_ref` - Image reference of the provider to start
642    /// * `provider_id` - ID of the provider to start
643    /// * `annotations` - Annotations to place on the started provider
644    /// * `provider_configuration` - Configuration relevant to the provider (if any)
645    ///
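    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a running host; the host ID,
    /// provider reference, and provider ID are hypothetical placeholders:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// let ack = ctl_client
    ///     .start_provider(
    ///         "NAEXAMPLEHOSTID",
    ///         "ghcr.io/wasmcloud/http-server:0.23.0",
    ///         "http-server",
    ///         None,       // no annotations
    ///         Vec::new(), // no named config
    ///     )
    ///     .await
    ///     .expect("failed to send start provider request");
    /// // The host acknowledges before fetching the provider; watch the event stream for completion
    /// assert!(ack.succeeded());
    /// # }
    /// ```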
646    #[instrument(level = "debug", skip_all)]
647    pub async fn start_provider(
648        &self,
649        host_id: &str,
650        provider_ref: &str,
651        provider_id: &str,
652        annotations: Option<BTreeMap<String, String>>,
653        provider_configuration: Vec<String>,
654    ) -> Result<CtlResponse<()>> {
655        let host_id = IdentifierKind::is_host_id(host_id)?;
656        let subject = broker::v1::commands::start_provider(
657            &self.topic_prefix,
658            &self.lattice,
659            host_id.as_str(),
660        );
661        debug!("start_provider:request {}", &subject);
662        let mut cmd = StartProviderCommand::builder()
663            .host_id(&host_id)
664            .provider_ref(&IdentifierKind::is_provider_ref(provider_ref)?)
665            .provider_id(&IdentifierKind::is_component_id(provider_id)?);
666        if let Some(annotations) = annotations {
667            cmd = cmd.annotations(annotations);
668        }
669        let cmd = cmd.config(provider_configuration).build()?;
670        let bytes = json_serialize(cmd)?;
671
672        match self.request_timeout(subject, bytes, self.timeout).await {
673            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
674            Err(e) => Err(format!("Did not receive start provider acknowledgement: {e}").into()),
675        }
676    }
677
678    /// Issues a command to a host to stop the provider with the given provider ID.
680    ///
681    /// The target wasmCloud host will acknowledge the receipt of this command, and
682    /// _will not_ supply a discrete confirmation that a provider has terminated. For that kind of
683    /// information, the client must also monitor the control event stream
684    ///
685    /// # Arguments
686    ///
687    /// * `host_id` - ID of the host on which to stop the provider
688    /// * `provider_id` - ID of the provider to stop
689    ///
690    #[instrument(level = "debug", skip_all)]
691    pub async fn stop_provider(&self, host_id: &str, provider_id: &str) -> Result<CtlResponse<()>> {
692        let host_id = IdentifierKind::is_host_id(host_id)?;
693
694        let subject = broker::v1::commands::stop_provider(
695            &self.topic_prefix,
696            &self.lattice,
697            host_id.as_str(),
698        );
699        debug!("stop_provider:request {}", &subject);
700        let bytes = json_serialize(StopProviderCommand {
701            host_id,
702            provider_id: IdentifierKind::is_component_id(provider_id)?,
703        })?;
704
705        match self.request_timeout(subject, bytes, self.timeout).await {
706            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
707            Err(e) => Err(format!("Did not receive stop provider acknowledgement: {e}").into()),
708        }
709    }
710
711    /// Issues a command to a specific host to perform a graceful termination.
712    ///
713    /// The target host will acknowledge receipt of the command before it attempts a shutdown.
714    ///
715    /// To deterministically verify that the host is down, a client should monitor for the "host stopped" event or
716    /// passively detect that the host is down through the absence of heartbeats.
717    ///
718    /// # Arguments
719    ///
720    /// * `host_id` - ID of the host to stop
721    /// * `timeout_ms` - (optional) amount of time to allow the host to complete stopping
722    ///
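    /// # Example
    ///
    /// A minimal usage sketch, assuming a local NATS server and a running host; the host ID is a
    /// hypothetical placeholder:
    ///
    /// ```no_run
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let nc = async_nats::connect("127.0.0.1:4222").await.expect("failed to build NATS client");
    /// let ctl_client = wasmcloud_control_interface::Client::new(nc);
    /// // Give the host up to five seconds to finish shutting down gracefully
    /// let ack = ctl_client
    ///     .stop_host("NAEXAMPLEHOSTID", Some(5_000))
    ///     .await
    ///     .expect("failed to send stop host request");
    /// assert!(ack.succeeded());
    /// # }
    /// ```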
723    #[instrument(level = "debug", skip_all)]
724    pub async fn stop_host(
725        &self,
726        host_id: &str,
727        timeout_ms: Option<u64>,
728    ) -> Result<CtlResponse<()>> {
729        let host_id = IdentifierKind::is_host_id(host_id)?;
730        let subject =
731            broker::v1::commands::stop_host(&self.topic_prefix, &self.lattice, host_id.as_str());
732        debug!("stop_host:request {}", &subject);
733        let bytes = json_serialize(StopHostCommand {
734            host_id,
735            timeout: timeout_ms,
736        })?;
737
738        match self.request_timeout(subject, bytes, self.timeout).await {
739            Ok(msg) => Ok(json_deserialize(&msg.payload)?),
740            Err(e) => Err(format!("Did not receive stop host acknowledgement: {e}").into()),
741        }
742    }
743
744    /// Publish a message and collect all responses received before the auction timeout elapses
745    async fn publish_and_wait<D: DeserializeOwned>(
746        &self,
747        subject: String,
748        payload: Vec<u8>,
749    ) -> Result<Vec<D>> {
750        let reply = self.nc.new_inbox();
751        let sub = self.nc.subscribe(reply.clone()).await?;
752        self.nc
753            .publish_with_reply_and_headers(
754                subject.clone(),
755                reply,
756                otel::HeaderInjector::default_with_span().into(),
757                payload.into(),
758            )
759            .await?;
760        let nc = self.nc.clone();
761        tokio::spawn(async move {
762            if let Err(error) = nc.flush().await {
763                error!(%error, "flush after publish");
764            }
765        });
766        Ok(collect_sub_timeout::<D>(sub, self.auction_timeout, subject.as_str()).await)
767    }
768
769    /// Returns the receiver end of a channel that subscribes to the lattice event stream.
770    ///
771    /// Any [`Event`]s that are published after this channel is created
772    /// will be added to the receiver channel's buffer, which can be observed or handled if needed.
773    ///
774    /// See the example for how you could use this receiver to handle events.
775    ///
776    /// # Example
777    ///
778    /// ```rust
779    /// use wasmcloud_control_interface::{Client, ClientBuilder};
780    /// async {
781    ///   let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
782    ///   let client = ClientBuilder::new(nc)
783    ///                 .timeout(std::time::Duration::from_millis(1000))
784    ///                 .auction_timeout(std::time::Duration::from_millis(1000))
785    ///                 .build();
786    ///   let mut receiver = client.events_receiver(vec!["component_scaled".to_string()]).await.unwrap();
787    ///   while let Some(evt) = receiver.recv().await {
788    ///       println!("Event received: {:?}", evt);
789    ///   }
790    /// };
791    /// ```
792    ///
793    /// # Arguments
794    ///
795    /// * `event_types` - List of types of events to listen for
796    ///
797    #[allow(clippy::missing_errors_doc)] // TODO: Document errors
798    pub async fn events_receiver(&self, event_types: Vec<String>) -> Result<Receiver<Event>> {
799        let (sender, receiver) = tokio::sync::mpsc::channel(5000);
800        let futs = event_types.into_iter().map(|event_type| {
801            self.nc
802                .subscribe(format!("wasmbus.evt.{}.{}", self.lattice, event_type))
803                .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
804        });
805        let subs: Vec<Subscriber> = futures::future::join_all(futs)
806            .await
807            .into_iter()
808            .collect::<Result<_>>()?;
809        let mut stream = futures::stream::select_all(subs);
810        tokio::spawn(async move {
811            while let Some(msg) = stream.next().await {
812                let Ok(evt) = json_deserialize::<Event>(&msg.payload) else {
813                    error!("Object received on event stream was not a CloudEvent");
814                    continue;
815                };
816                trace!("received event: {:?}", evt);
817                let Ok(()) = sender.send(evt).await else {
818                    break;
819                };
820            }
821        });
822        Ok(receiver)
823    }
824}
825
826/// Collect `T` values until timeout has elapsed
827pub(crate) async fn collect_sub_timeout<T: DeserializeOwned>(
828    mut sub: async_nats::Subscriber,
829    timeout: Duration,
830    reason: &str,
831) -> Vec<T> {
832    let mut items = Vec::new();
833    let sleep = tokio::time::sleep(timeout);
834    tokio::pin!(sleep);
835    loop {
836        tokio::select! {
837            msg = sub.next() => {
838                let Some(msg) = msg else {
839                    break;
840                };
841                if msg.payload.is_empty() {
842                    break;
843                }
844                match json_deserialize::<T>(&msg.payload) {
845                    Ok(item) => items.push(item),
846                    Err(error) => {
847                        error!(%reason, %error,
848                            "deserialization error in auction - results may be incomplete",
849                        );
850                        break;
851                    }
852                }
853            },
854            () = &mut sleep => { /* timeout */ break; }
855        }
856    }
857    items
858}
859
860#[cfg(test)]
861mod tests {
862    use super::*;
863
864    /// Note: This test is a means of manually watching the event stream as CloudEvents are received
865    /// It does not assert functionality, and so we've marked it as ignore to ensure it's not run by default
866    /// It currently listens for 120 seconds then exits
867    #[tokio::test]
868    #[ignore]
869    async fn test_events_receiver() {
870        let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
871        let client = ClientBuilder::new(nc)
872            .timeout(Duration::from_millis(1000))
873            .auction_timeout(Duration::from_millis(1000))
874            .build();
875        let mut receiver = client
876            .events_receiver(vec!["foobar".to_string()])
877            .await
878            .unwrap();
879        tokio::spawn(async move {
880            while let Some(evt) = receiver.recv().await {
881                println!("Event received: {evt:?}");
882            }
883        });
884        println!("Listening to Cloud Events for 120 seconds. Then we will quit.");
885        tokio::time::sleep(Duration::from_secs(120)).await;
886    }
887
888    #[test]
889    fn test_check_identifier() -> Result<()> {
890        assert!(IdentifierKind::is_host_id("").is_err());
891        assert!(IdentifierKind::is_host_id(" ").is_err());
892        let host_id = IdentifierKind::is_host_id("             ");
893        assert!(host_id.is_err(), "parsing host id should have failed");
894        assert!(host_id
895            .unwrap_err()
896            .to_string()
897            .contains("Host ID cannot be empty"));
898        let provider_ref = IdentifierKind::is_provider_ref("");
899        assert!(
900            provider_ref.is_err(),
901            "parsing provider ref should have failed"
902        );
903        assert!(provider_ref
904            .unwrap_err()
905            .to_string()
906            .contains("Provider OCI reference cannot be empty"));
907        assert!(IdentifierKind::is_host_id("host_id").is_ok());
908        let component_id = IdentifierKind::is_component_id("            iambatman  ")?;
909        assert_eq!(component_id, "iambatman");
910
911        Ok(())
912    }
913
914    #[tokio::test]
915    #[ignore]
916    /// Test after the large 1.0 refactor to ensure all return types are formatted as [CtlResponse] types, and that
917    /// the host can handle all of the requests.
918    ///
919    /// You must run NATS and one host locally to run this test successfully.
920    async fn ctl_response_comprehensive() {
921        let client = Client::new(
922            async_nats::connect("127.0.0.1:4222")
923                .await
924                .expect("should be able to connect to local NATS"),
925        );
926        // Fetch the one host we ran
927        let hosts = client
928            .get_hosts()
929            .await
930            .expect("should be able to fetch at least a host");
931        assert_eq!(hosts.len(), 1);
932        let host = hosts.first().expect("one host to exist");
933        assert!(host.success);
934        assert!(host.message.is_empty());
935        assert!(host.response.is_some());
936        let host = host.response.as_ref().unwrap();
937        ////
938        // Component operations
939        ////
940        // Component Auction
941        let auction_response = client
942            .perform_component_auction(
943                "ghcr.io/brooksmtownsend/http-hello-world-rust:0.1.0",
944                "echo",
945                BTreeMap::new(),
946            )
947            .await
948            .expect("should be able to auction a component");
949        assert_eq!(auction_response.len(), 1);
950        let first_ack = auction_response.first().expect("a single component ack");
951        let auction_ack = first_ack.response.as_ref().unwrap();
952        let (component_ref, component_id) = (&auction_ack.component_ref, &auction_ack.component_id);
953        // Component Scale
954        let scale_response = client
955            .scale_component(
956                &host.id,
957                component_ref,
958                component_id,
959                1,
960                None,
961                Vec::with_capacity(0),
962            )
963            .await
964            .expect("should be able to scale component");
965        assert!(scale_response.success);
966        assert!(scale_response.message.is_empty());
967        assert!(scale_response.response.is_none());
968        // Component Update (TODO(brooksmtownsend): we should test this with a real update, but I'm using a failure case)
969        let update_component_resp = client
970            .update_component(
971                &host.id,
972                "nonexistantcomponentID",
973                "wasmcloud.azurecr.io/kvcounter:0.4.0",
974                None,
975            )
976            .await
977            .expect("should be able to issue update component request");
978        assert!(!update_component_resp.success);
979        assert_eq!(
980            update_component_resp.message,
981            "component not found".to_string()
982        );
983        assert_eq!(update_component_resp.response, None);
984
985        ////
986        // Provider operations
987        ////
988        // Provider Auction
989        let provider_acks = client
990            .perform_provider_auction(
991                "wasmcloud.azurecr.io/httpserver:0.19.1",
992                "httpserver",
993                BTreeMap::new(),
994            )
995            .await
996            .expect("should be able to hold provider auction");
997        assert_eq!(provider_acks.len(), 1);
998        let provider_ack = provider_acks.first().expect("a single provider ack");
999        assert!(provider_ack.success);
1000        assert!(provider_ack.message.is_empty());
1001        assert!(provider_ack.response.is_some());
1002        let auction_ack = provider_ack.response.as_ref().unwrap();
1003        let (provider_ref, provider_id) = (&auction_ack.provider_ref, &auction_ack.provider_id);
1004        // Provider Start
1005        let start_response = client
1006            .start_provider(&host.id, provider_ref, provider_id, None, vec![])
1007            .await
1008            .expect("should be able to start provider");
1009        assert!(start_response.success);
1010        assert!(start_response.message.is_empty());
1011        assert!(start_response.response.is_none());
1012        // Provider Stop (TODO(brooksmtownsend): not enough time to let the provider really stop, so I'm using a failure case)
1013        let stop_response = client
1014            .stop_provider(&host.id, "notarealproviderID")
1015            .await
1016            .expect("should be able to issue stop provider request");
1017        assert!(!stop_response.success);
1018        assert_eq!(
1019            stop_response.message,
1020            "provider with that ID is not running".to_string()
1021        );
1022        assert!(stop_response.response.is_none());
1023        ////
1024        // Link operations
1025        ////
1026        tokio::time::sleep(Duration::from_secs(5)).await;
1027        // Link Put
1028        let link_put = client
1029            .put_link(Link {
1030                source_id: "echo".to_string(),
1031                target: "httpserver".to_string(),
1032                name: "default".to_string(),
1033                wit_namespace: "wasi".to_string(),
1034                wit_package: "http".to_string(),
1035                interfaces: vec!["incoming-handler".to_string()],
1036                ..Default::default()
1037            })
1038            .await
1039            .expect("should be able to put link");
1040        assert!(link_put.success);
1041        assert!(link_put.message.is_empty());
1042        assert!(link_put.response.is_none());
1043        let links_get = client
1044            .get_links()
1045            .await
1046            .expect("should be able to get links");
1047        assert!(links_get.success);
1048        assert!(links_get.message.is_empty());
1049        assert!(links_get.response.is_some());
1050        // Link Get
1051        let link_get = links_get.response.as_ref().unwrap().first().unwrap();
1052        assert_eq!(link_get.source_id, "echo");
1053        assert_eq!(link_get.target, "httpserver");
1054        assert_eq!(link_get.name, "default");
1055        assert_eq!(link_get.wit_namespace, "wasi");
1056        assert_eq!(link_get.wit_package, "http");
1057        // Link Del
1058        let link_del = client
1059            .delete_link("echo", "default", "wasi", "http")
1060            .await
1061            .expect("should be able to delete link");
1062        assert!(link_del.success);
1063        assert!(link_del.message.is_empty());
1064        assert!(link_del.response.is_none());
1065
1066        ////
1067        // Label operations
1068        ////
1069        // Label Put
1070        let label_one = client
1071            .put_label(&host.id, "idk", "lol")
1072            .await
1073            .expect("should be able to put label");
1074        assert!(label_one.success);
1075        assert!(label_one.message.is_empty());
1076        assert!(label_one.response.is_none());
1077        let label_two = client
1078            .put_label(&host.id, "foo", "bar")
1079            .await
1080            .expect("should be able to put another label");
1081        assert!(label_two.success);
1082        assert!(label_two.message.is_empty());
1083        assert!(label_two.response.is_none());
1084        // Label Del
1085        let del_label_one = client
1086            .delete_label(&host.id, "idk")
1087            .await
1088            .expect("should be able to delete label");
1089        assert!(del_label_one.success);
1090        assert!(del_label_one.message.is_empty());
1091        assert!(del_label_one.response.is_none());
1092        ////
1093        // Registry operations
1094        ////
1095        // Registry Put
1096        let registry_put = client
1097            .put_registries(HashMap::from_iter([(
1098                "mycloud.io".to_string(),
1099                RegistryCredential {
1100                    username: Some("user".to_string()),
1101                    password: Some("pass".to_string()),
1102                    registry_type: "oci".to_string(),
1103                    token: None,
1104                },
1105            )]))
1106            .await
1107            .expect("should be able to put registries");
1108        assert!(registry_put.success);
1109        assert!(registry_put.message.is_empty());
1110        assert!(registry_put.response.is_none());
1111
1112        ////
1113        // Config operations
1114        ////
1115        // Config Put
1116        let config_put = client
1117            .put_config(
1118                "test_config",
1119                HashMap::from_iter([("sup".to_string(), "hey".to_string())]),
1120            )
1121            .await
1122            .expect("should be able to put config");
1123        assert!(config_put.success);
1124        assert!(config_put.message.is_empty());
1125        assert!(config_put.response.is_none());
1126        // Config Get
1127        let config_get = client
1128            .get_config("test_config")
1129            .await
1130            .expect("should be able to get config");
1131        assert!(config_get.success);
1132        assert!(config_get.message.is_empty());
1133        assert!(config_get
1134            .response
1135            .is_some_and(|r| r.get("sup").is_some_and(|s| s == "hey")));
1136        // Config Del
1137        let config_del = client
1138            .delete_config("test_config")
1139            .await
1140            .expect("should be able to delete config");
1141        assert!(config_del.success);
1142        assert!(config_del.message.is_empty());
1143        assert!(config_del.response.is_none());
1144
1145        ////
1146        // Host operations
1147        ////
1148        // Host Get
1149        let inventory = client
1150            .get_host_inventory(&host.id)
1151            .await
1152            .expect("should be able to fetch at least a host");
1153        assert!(inventory.success);
1154        assert!(inventory.message.is_empty());
1155        assert!(inventory.response.is_some());
1156        let host_inventory = inventory.response.unwrap();
1157        assert!(host_inventory.components.iter().all(|a| a.id == "echo"));
1158        assert!(!host_inventory.labels.contains_key("idk"));
1159        assert!(host_inventory
1160            .labels
1161            .get("foo")
1162            .is_some_and(|f| f == &"bar".to_string()));
1163        // Host Stop
1164        let stop_host = client
1165            .stop_host(&host.id, Some(1234))
1166            .await
1167            .expect("should be able to stop host");
1168        assert!(stop_host.success);
1169        assert!(stop_host.message.is_empty());
1170        assert!(stop_host.response.is_none());
1171    }
1172}