1use core::fmt::{self, Debug};
4use core::time::Duration;
5
6use std::collections::{BTreeMap, HashMap};
7
8use async_nats::Subscriber;
9use cloudevents::event::Event;
10use futures::{StreamExt, TryFutureExt};
11use serde::de::DeserializeOwned;
12use tokio::sync::mpsc::Receiver;
13use tracing::{debug, error, instrument, trace};
14
15use crate::types::ctl::{
16 CtlResponse, ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
17 UpdateComponentCommand,
18};
19use crate::types::host::{Host, HostInventory, HostLabel};
20use crate::types::link::Link;
21use crate::types::registry::RegistryCredential;
22use crate::types::rpc::{
23 ComponentAuctionAck, ComponentAuctionRequest, DeleteInterfaceLinkDefinitionRequest,
24 ProviderAuctionAck, ProviderAuctionRequest,
25};
26use crate::{
27 broker, json_deserialize, json_serialize, otel, HostLabelIdentifier, IdentifierKind, Result,
28};
29
30#[derive(Debug, Clone)]
33#[non_exhaustive]
34pub struct ClientBuilder {
35 nc: async_nats::Client,
36 topic_prefix: Option<String>,
37 lattice: String,
38 timeout: Duration,
39 auction_timeout: Duration,
40}
41
42impl ClientBuilder {
43 #[must_use]
46 pub fn new(nc: async_nats::Client) -> ClientBuilder {
47 ClientBuilder {
48 nc,
49 topic_prefix: None,
50 lattice: "default".to_string(),
51 timeout: Duration::from_secs(2),
52 auction_timeout: Duration::from_secs(5),
53 }
54 }
55
56 #[must_use]
59 pub fn topic_prefix(self, prefix: impl Into<String>) -> ClientBuilder {
60 ClientBuilder {
61 topic_prefix: Some(prefix.into()),
62 ..self
63 }
64 }
65
66 #[must_use]
69 pub fn lattice(self, prefix: impl Into<String>) -> ClientBuilder {
70 ClientBuilder {
71 lattice: prefix.into(),
72 ..self
73 }
74 }
75
76 #[must_use]
79 pub fn timeout(self, timeout: Duration) -> ClientBuilder {
80 ClientBuilder { timeout, ..self }
81 }
82
83 #[must_use]
86 pub fn auction_timeout(self, timeout: Duration) -> ClientBuilder {
87 ClientBuilder {
88 auction_timeout: timeout,
89 ..self
90 }
91 }
92
93 #[must_use]
95 pub fn build(self) -> Client {
96 Client {
97 nc: self.nc,
98 topic_prefix: self.topic_prefix,
99 lattice: self.lattice,
100 timeout: self.timeout,
101 auction_timeout: self.auction_timeout,
102 }
103 }
104}
105
106#[derive(Clone)]
108#[non_exhaustive]
109pub struct Client {
110 nc: async_nats::Client,
112 topic_prefix: Option<String>,
114 lattice: String,
116 timeout: Duration,
118 auction_timeout: Duration,
120}
121
122impl Debug for Client {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.debug_struct("Client")
125 .field("topic_prefix", &self.topic_prefix)
126 .field("lattice", &self.lattice)
127 .field("timeout", &self.timeout)
128 .field("auction_timeout", &self.auction_timeout)
129 .finish_non_exhaustive()
130 }
131}
132
133impl Client {
134 #[must_use]
137 pub fn new(nc: async_nats::Client) -> Client {
138 ClientBuilder::new(nc).build()
139 }
140
141 #[allow(unused)]
143 #[must_use]
144 pub fn nats_client(&self) -> async_nats::Client {
145 self.nc.clone()
146 }
147
148 pub fn lattice(&self) -> &str {
150 self.lattice.as_ref()
151 }
152
153 #[instrument(level = "debug", skip_all)]
155 pub(crate) async fn request_timeout(
156 &self,
157 subject: String,
158 payload: Vec<u8>,
159 timeout: Duration,
160 ) -> Result<async_nats::Message> {
161 match tokio::time::timeout(
162 timeout,
163 self.nc.request_with_headers(
164 subject,
165 otel::HeaderInjector::default_with_span().into(),
166 payload.into(),
167 ),
168 )
169 .await
170 {
171 Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timed out").into()),
172 Ok(Ok(message)) => Ok(message),
173 Ok(Err(e)) => Err(e.into()),
174 }
175 }
176
177 #[instrument(level = "debug", skip_all)]
180 pub async fn get_hosts(&self) -> Result<Vec<CtlResponse<Host>>> {
181 let subject = broker::v1::queries::hosts(&self.topic_prefix, &self.lattice);
182 debug!("get_hosts:publish {}", &subject);
183 self.publish_and_wait(subject, Vec::new()).await
184 }
185
186 #[instrument(level = "debug", skip_all)]
188 pub async fn get_host_inventory(&self, host_id: &str) -> Result<CtlResponse<HostInventory>> {
189 let subject = broker::v1::queries::host_inventory(
190 &self.topic_prefix,
191 &self.lattice,
192 IdentifierKind::is_host_id(host_id)?.as_str(),
193 );
194 debug!("get_host_inventory:request {}", &subject);
195 match self.request_timeout(subject, vec![], self.timeout).await {
196 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
197 Err(e) => Err(format!("Did not receive host inventory from target host: {e}").into()),
198 }
199 }
200
201 #[instrument(level = "debug", skip_all)]
203 pub async fn get_claims(&self) -> Result<CtlResponse<Vec<HashMap<String, String>>>> {
204 let subject = broker::v1::queries::claims(&self.topic_prefix, &self.lattice);
205 debug!("get_claims:request {}", &subject);
206 match self.request_timeout(subject, vec![], self.timeout).await {
207 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
208 Err(e) => Err(format!("Did not receive claims from lattice: {e}").into()),
209 }
210 }
211
212 #[instrument(level = "debug", skip_all)]
218 pub async fn perform_component_auction(
219 &self,
220 component_ref: &str,
221 component_id: &str,
222 constraints: impl Into<BTreeMap<String, String>>,
223 ) -> Result<Vec<CtlResponse<ComponentAuctionAck>>> {
224 let subject = broker::v1::component_auction_subject(&self.topic_prefix, &self.lattice);
225 let bytes = json_serialize(
226 ComponentAuctionRequest::builder()
227 .component_ref(IdentifierKind::is_component_ref(component_ref)?)
228 .component_id(IdentifierKind::is_component_id(component_id)?)
229 .constraints(constraints.into())
230 .build()?,
231 )?;
232 debug!("component_auction:publish {}", &subject);
233 self.publish_and_wait(subject, bytes).await
234 }
235
236 #[instrument(level = "debug", skip_all)]
252 pub async fn perform_provider_auction(
253 &self,
254 provider_ref: &str,
255 provider_id: &str,
256 constraints: impl Into<BTreeMap<String, String>>,
257 ) -> Result<Vec<CtlResponse<ProviderAuctionAck>>> {
258 let subject = broker::v1::provider_auction_subject(&self.topic_prefix, &self.lattice);
259 let bytes = json_serialize(
260 ProviderAuctionRequest::builder()
261 .provider_ref(IdentifierKind::is_provider_ref(provider_ref)?)
262 .provider_id(IdentifierKind::is_provider_id(provider_id)?)
263 .constraints(constraints.into())
264 .build()?,
265 )?;
266 debug!("provider_auction:publish {}", &subject);
267 self.publish_and_wait(subject, bytes).await
268 }
269
270 #[instrument(level = "debug", skip_all)]
292 #[allow(clippy::too_many_arguments)]
293 pub async fn scale_component(
294 &self,
295 host_id: &str,
296 component_ref: &str,
297 component_id: &str,
298 max_instances: u32,
299 annotations: Option<BTreeMap<String, String>>,
300 config: Vec<String>,
301 allow_update: bool,
302 ) -> Result<CtlResponse<()>> {
303 let host_id = IdentifierKind::is_host_id(host_id)?;
304 let subject = broker::v1::commands::scale_component(
305 &self.topic_prefix,
306 &self.lattice,
307 host_id.as_str(),
308 );
309 debug!("scale_component:request {}", &subject);
310 let bytes = json_serialize(ScaleComponentCommand {
311 max_instances,
312 component_ref: IdentifierKind::is_component_ref(component_ref)?,
313 component_id: IdentifierKind::is_component_id(component_id)?,
314 host_id,
315 annotations,
316 config,
317 allow_update,
318 ..Default::default()
319 })?;
320 match self.request_timeout(subject, bytes, self.timeout).await {
321 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
322 Err(e) => Err(format!("Did not receive scale component acknowledgement: {e}").into()),
323 }
324 }
325
326 #[instrument(level = "debug", skip_all)]
338 pub async fn put_registries(
339 &self,
340 registries: HashMap<String, RegistryCredential>,
341 ) -> Result<CtlResponse<()>> {
342 let subject = broker::v1::publish_registries(&self.topic_prefix, &self.lattice);
343 debug!("put_registries:publish {}", &subject);
344 let bytes = json_serialize(®istries)?;
345 let resp = self
346 .nc
347 .publish_with_headers(
348 subject,
349 otel::HeaderInjector::default_with_span().into(),
350 bytes.into(),
351 )
352 .await;
353 if let Err(e) = resp {
354 Err(format!("Failed to push registry credential map: {e}").into())
355 } else {
356 Ok(CtlResponse::<()>::success(
357 "successfully added registries".into(),
358 ))
359 }
360 }
361
362 #[instrument(level = "debug", skip_all)]
368 pub async fn put_link(&self, link: Link) -> Result<CtlResponse<()>> {
369 IdentifierKind::is_component_id(&link.source_id)?;
371 IdentifierKind::is_component_id(&link.target)?;
372 IdentifierKind::is_link_name(&link.name)?;
373
374 let subject = broker::v1::put_link(&self.topic_prefix, &self.lattice);
375 debug!("put_link:request {}", &subject);
376
377 let bytes = crate::json_serialize(link)?;
378 match self.request_timeout(subject, bytes, self.timeout).await {
379 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
380 Err(e) => Err(format!("Did not receive put link acknowledgement: {e}").into()),
381 }
382 }
383
384 #[instrument(level = "debug", skip_all)]
392 pub async fn delete_link(
393 &self,
394 source_id: &str,
395 link_name: &str,
396 wit_namespace: &str,
397 wit_package: &str,
398 ) -> Result<CtlResponse<()>> {
399 let subject = broker::v1::delete_link(&self.topic_prefix, &self.lattice);
400 let ld = DeleteInterfaceLinkDefinitionRequest::from_source_and_link_metadata(
401 &IdentifierKind::is_component_id(source_id)?,
402 &IdentifierKind::is_link_name(link_name)?,
403 wit_namespace,
404 wit_package,
405 );
406 let bytes = crate::json_serialize(&ld)?;
407 match self.request_timeout(subject, bytes, self.timeout).await {
408 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
409 Err(e) => Err(format!("Did not receive delete link acknowledgement: {e}").into()),
410 }
411 }
412
413 #[instrument(level = "debug", skip_all)]
419 pub async fn get_links(&self) -> Result<CtlResponse<Vec<Link>>> {
420 let subject = broker::v1::queries::link_definitions(&self.topic_prefix, &self.lattice);
421 debug!("get_links:request {}", &subject);
422 match self.request_timeout(subject, vec![], self.timeout).await {
423 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
424 Err(e) => Err(format!("Did not receive a response to get links: {e}").into()),
425 }
426 }
427
428 #[instrument(level = "debug", skip_all)]
438 pub async fn put_config(
439 &self,
440 config_name: &str,
441 config: impl Into<HashMap<String, String>>,
442 ) -> Result<CtlResponse<()>> {
443 let subject = broker::v1::put_config(&self.topic_prefix, &self.lattice, config_name);
444 debug!(%subject, %config_name, "Putting config");
445 let data = serde_json::to_vec(&config.into())?;
446 match self.request_timeout(subject, data, self.timeout).await {
447 Ok(msg) => json_deserialize(&msg.payload),
448 Err(e) => Err(format!("Did not receive a response to put config request: {e}").into()),
449 }
450 }
451
452 #[instrument(level = "debug", skip_all)]
461 pub async fn delete_config(&self, config_name: &str) -> Result<CtlResponse<()>> {
462 let subject = broker::v1::delete_config(&self.topic_prefix, &self.lattice, config_name);
463 debug!(%subject, %config_name, "Delete config");
464 match self
465 .request_timeout(subject, Vec::default(), self.timeout)
466 .await
467 {
468 Ok(msg) => json_deserialize(&msg.payload),
469 Err(e) => {
470 Err(format!("Did not receive a response to delete config request: {e}").into())
471 }
472 }
473 }
474
475 #[instrument(level = "debug", skip_all)]
517 pub async fn get_config(
518 &self,
519 config_name: &str,
520 ) -> Result<CtlResponse<HashMap<String, String>>> {
521 let subject = broker::v1::queries::config(&self.topic_prefix, &self.lattice, config_name);
522 debug!(%subject, %config_name, "Getting config");
523 match self
524 .request_timeout(subject, Vec::default(), self.timeout)
525 .await
526 {
527 Ok(msg) => json_deserialize(&msg.payload),
528 Err(e) => Err(format!("Did not receive a response to get config request: {e}").into()),
529 }
530 }
531
532 pub async fn put_label(
545 &self,
546 host_id: &str,
547 key: &str,
548 value: &str,
549 ) -> Result<CtlResponse<()>> {
550 let subject = broker::v1::put_label(&self.topic_prefix, &self.lattice, host_id);
551 debug!(%subject, "putting label");
552 let bytes = json_serialize(HostLabel {
553 key: key.to_string(),
554 value: value.to_string(),
555 })?;
556 match self.request_timeout(subject, bytes, self.timeout).await {
557 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
558 Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
559 }
560 }
561
562 pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlResponse<()>> {
574 let subject = broker::v1::delete_label(&self.topic_prefix, &self.lattice, host_id);
575 debug!(%subject, "removing label");
576 let bytes = json_serialize(HostLabelIdentifier {
577 key: key.to_string(),
578 })?;
579 match self.request_timeout(subject, bytes, self.timeout).await {
580 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
581 Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
582 }
583 }
584
585 #[instrument(level = "debug", skip_all)]
603 pub async fn update_component(
604 &self,
605 host_id: &str,
606 existing_component_id: &str,
607 new_component_ref: &str,
608 annotations: Option<BTreeMap<String, String>>,
609 ) -> Result<CtlResponse<()>> {
610 let host_id = IdentifierKind::is_host_id(host_id)?;
611 let subject = broker::v1::commands::update_component(
612 &self.topic_prefix,
613 &self.lattice,
614 host_id.as_str(),
615 );
616 debug!("update_component:request {}", &subject);
617 let bytes = json_serialize(UpdateComponentCommand {
618 host_id,
619 component_id: IdentifierKind::is_component_id(existing_component_id)?,
620 new_component_ref: IdentifierKind::is_component_ref(new_component_ref)?,
621 annotations,
622 })?;
623 match self.request_timeout(subject, bytes, self.timeout).await {
624 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
625 Err(e) => Err(format!("Did not receive update component acknowledgement: {e}").into()),
626 }
627 }
628
629 #[instrument(level = "debug", skip_all)]
650 pub async fn start_provider(
651 &self,
652 host_id: &str,
653 provider_ref: &str,
654 provider_id: &str,
655 annotations: Option<BTreeMap<String, String>>,
656 provider_configuration: Vec<String>,
657 ) -> Result<CtlResponse<()>> {
658 let host_id = IdentifierKind::is_host_id(host_id)?;
659 let subject = broker::v1::commands::start_provider(
660 &self.topic_prefix,
661 &self.lattice,
662 host_id.as_str(),
663 );
664 debug!("start_provider:request {}", &subject);
665 let mut cmd = StartProviderCommand::builder()
666 .host_id(&host_id)
667 .provider_ref(&IdentifierKind::is_provider_ref(provider_ref)?)
668 .provider_id(&IdentifierKind::is_component_id(provider_id)?);
669 if let Some(annotations) = annotations {
670 cmd = cmd.annotations(annotations);
671 }
672 let cmd = cmd.config(provider_configuration).build()?;
673 let bytes = json_serialize(cmd)?;
674
675 match self.request_timeout(subject, bytes, self.timeout).await {
676 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
677 Err(e) => Err(format!("Did not receive start provider acknowledgement: {e}").into()),
678 }
679 }
680
681 #[instrument(level = "debug", skip_all)]
694 pub async fn stop_provider(&self, host_id: &str, provider_id: &str) -> Result<CtlResponse<()>> {
695 let host_id = IdentifierKind::is_host_id(host_id)?;
696
697 let subject = broker::v1::commands::stop_provider(
698 &self.topic_prefix,
699 &self.lattice,
700 host_id.as_str(),
701 );
702 debug!("stop_provider:request {}", &subject);
703 let bytes = json_serialize(StopProviderCommand {
704 host_id,
705 provider_id: IdentifierKind::is_component_id(provider_id)?,
706 })?;
707
708 match self.request_timeout(subject, bytes, self.timeout).await {
709 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
710 Err(e) => Err(format!("Did not receive stop provider acknowledgement: {e}").into()),
711 }
712 }
713
714 #[instrument(level = "debug", skip_all)]
727 pub async fn stop_host(
728 &self,
729 host_id: &str,
730 timeout_ms: Option<u64>,
731 ) -> Result<CtlResponse<()>> {
732 let host_id = IdentifierKind::is_host_id(host_id)?;
733 let subject =
734 broker::v1::commands::stop_host(&self.topic_prefix, &self.lattice, host_id.as_str());
735 debug!("stop_host:request {}", &subject);
736 let bytes = json_serialize(StopHostCommand {
737 host_id,
738 timeout: timeout_ms,
739 })?;
740
741 match self.request_timeout(subject, bytes, self.timeout).await {
742 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
743 Err(e) => Err(format!("Did not receive stop host acknowledgement: {e}").into()),
744 }
745 }
746
747 async fn publish_and_wait<D: DeserializeOwned>(
749 &self,
750 subject: String,
751 payload: Vec<u8>,
752 ) -> Result<Vec<D>> {
753 let reply = self.nc.new_inbox();
754 let sub = self.nc.subscribe(reply.clone()).await?;
755 self.nc
756 .publish_with_reply_and_headers(
757 subject.clone(),
758 reply,
759 otel::HeaderInjector::default_with_span().into(),
760 payload.into(),
761 )
762 .await?;
763 let nc = self.nc.clone();
764 tokio::spawn(async move {
765 if let Err(error) = nc.flush().await {
766 error!(%error, "flush after publish");
767 }
768 });
769 Ok(collect_sub_timeout::<D>(sub, self.auction_timeout, subject.as_str()).await)
770 }
771
772 #[allow(clippy::missing_errors_doc)] pub async fn events_receiver(&self, event_types: Vec<String>) -> Result<Receiver<Event>> {
802 let (sender, receiver) = tokio::sync::mpsc::channel(5000);
803 let futs = event_types.into_iter().map(|event_type| {
804 self.nc
805 .subscribe(format!("wasmbus.evt.{}.{}", self.lattice, event_type))
806 .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
807 });
808 let subs: Vec<Subscriber> = futures::future::join_all(futs)
809 .await
810 .into_iter()
811 .collect::<Result<_>>()?;
812 let mut stream = futures::stream::select_all(subs);
813 tokio::spawn(async move {
814 while let Some(msg) = stream.next().await {
815 let Ok(evt) = json_deserialize::<Event>(&msg.payload) else {
816 error!("Object received on event stream was not a CloudEvent");
817 continue;
818 };
819 trace!("received event: {:?}", evt);
820 let Ok(()) = sender.send(evt).await else {
821 break;
822 };
823 }
824 });
825 Ok(receiver)
826 }
827}
828
829pub(crate) async fn collect_sub_timeout<T: DeserializeOwned>(
831 mut sub: async_nats::Subscriber,
832 timeout: Duration,
833 reason: &str,
834) -> Vec<T> {
835 let mut items = Vec::new();
836 let sleep = tokio::time::sleep(timeout);
837 tokio::pin!(sleep);
838 loop {
839 tokio::select! {
840 msg = sub.next() => {
841 let Some(msg) = msg else {
842 break;
843 };
844 if msg.payload.is_empty() {
845 break;
846 }
847 match json_deserialize::<T>(&msg.payload) {
848 Ok(item) => items.push(item),
849 Err(error) => {
850 error!(%reason, %error,
851 "deserialization error in auction - results may be incomplete",
852 );
853 break;
854 }
855 }
856 },
857 () = &mut sleep => { break; }
858 }
859 }
860 items
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866
867 #[tokio::test]
871 #[ignore]
872 async fn test_events_receiver() {
873 let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
874 let client = ClientBuilder::new(nc)
875 .timeout(Duration::from_millis(1000))
876 .auction_timeout(Duration::from_millis(1000))
877 .build();
878 let mut receiver = client
879 .events_receiver(vec!["foobar".to_string()])
880 .await
881 .unwrap();
882 tokio::spawn(async move {
883 while let Some(evt) = receiver.recv().await {
884 println!("Event received: {evt:?}");
885 }
886 });
887 println!("Listening to Cloud Events for 120 seconds. Then we will quit.");
888 tokio::time::sleep(Duration::from_secs(120)).await;
889 }
890
891 #[test]
892 fn test_check_identifier() -> Result<()> {
893 assert!(IdentifierKind::is_host_id("").is_err());
894 assert!(IdentifierKind::is_host_id(" ").is_err());
895 let host_id = IdentifierKind::is_host_id(" ");
896 assert!(host_id.is_err(), "parsing host id should have failed");
897 assert!(host_id
898 .unwrap_err()
899 .to_string()
900 .contains("Host ID cannot be empty"));
901 let provider_ref = IdentifierKind::is_provider_ref("");
902 assert!(
903 provider_ref.is_err(),
904 "parsing provider ref should have failed"
905 );
906 assert!(provider_ref
907 .unwrap_err()
908 .to_string()
909 .contains("Provider OCI reference cannot be empty"));
910 assert!(IdentifierKind::is_host_id("host_id").is_ok());
911 let component_id = IdentifierKind::is_component_id(" iambatman ")?;
912 assert_eq!(component_id, "iambatman");
913
914 Ok(())
915 }
916
917 #[tokio::test]
918 #[ignore]
919 async fn ctl_response_comprehensive() {
924 let client = Client::new(
925 async_nats::connect("127.0.0.1:4222")
926 .await
927 .expect("should be able to connect to local NATS"),
928 );
929 let hosts = client
931 .get_hosts()
932 .await
933 .expect("should be able to fetch at least a host");
934 assert_eq!(hosts.len(), 1);
935 let host = hosts.first().expect("one host to exist");
936 assert!(host.success);
937 assert!(host.message.is_empty());
938 assert!(host.response.is_some());
939 let host = host.response.as_ref().unwrap();
940 let auction_response = client
945 .perform_component_auction(
946 "ghcr.io/brooksmtownsend/http-hello-world-rust:0.1.0",
947 "echo",
948 BTreeMap::new(),
949 )
950 .await
951 .expect("should be able to auction an component");
952 assert_eq!(auction_response.len(), 1);
953 let first_ack = auction_response.first().expect("a single component ack");
954 let auction_ack = first_ack.response.as_ref().unwrap();
955 let (component_ref, component_id) = (&auction_ack.component_ref, &auction_ack.component_id);
956 let scale_response = client
958 .scale_component(
959 &host.id,
960 component_ref,
961 component_id,
962 1,
963 None,
964 Vec::with_capacity(0),
965 false,
966 )
967 .await
968 .expect("should be able to scale component");
969 assert!(scale_response.success);
970 assert!(scale_response.message.is_empty());
971 assert!(scale_response.response.is_none());
972 let update_component_resp = client
974 .update_component(
975 &host.id,
976 "nonexistantcomponentID",
977 "ghcr.io/wasmcloud/components/http-keyvalue-counter-rust:0.1.0",
978 None,
979 )
980 .await
981 .expect("should be able to issue update component request");
982 assert!(!update_component_resp.success);
983 assert_eq!(
984 update_component_resp.message,
985 "component not found".to_string()
986 );
987 assert_eq!(update_component_resp.response, None);
988
989 let provider_acks = client
994 .perform_provider_auction(
995 "ghcr.io/wasmcloud/http-server:0.26.0",
996 "httpserver",
997 BTreeMap::new(),
998 )
999 .await
1000 .expect("should be able to hold provider auction");
1001 assert_eq!(provider_acks.len(), 1);
1002 let provider_ack = provider_acks.first().expect("a single provider ack");
1003 assert!(provider_ack.success);
1004 assert!(provider_ack.message.is_empty());
1005 assert!(provider_ack.response.is_some());
1006 let auction_ack = provider_ack.response.as_ref().unwrap();
1007 let (provider_ref, provider_id) = (&auction_ack.provider_ref, &auction_ack.provider_id);
1008 let start_response = client
1010 .start_provider(&host.id, provider_ref, provider_id, None, vec![])
1011 .await
1012 .expect("should be able to start provider");
1013 assert!(start_response.success);
1014 assert!(start_response.message.is_empty());
1015 assert!(start_response.response.is_none());
1016 let stop_response = client
1018 .stop_provider(&host.id, "notarealproviderID")
1019 .await
1020 .expect("should be able to issue stop provider request");
1021 assert!(!stop_response.success);
1022 assert_eq!(
1023 stop_response.message,
1024 "provider with that ID is not running".to_string()
1025 );
1026 assert!(stop_response.response.is_none());
1027 tokio::time::sleep(Duration::from_secs(5)).await;
1031 let link_put = client
1033 .put_link(Link {
1034 source_id: "echo".to_string(),
1035 target: "httpserver".to_string(),
1036 name: "default".to_string(),
1037 wit_namespace: "wasi".to_string(),
1038 wit_package: "http".to_string(),
1039 interfaces: vec!["incoming-handler".to_string()],
1040 ..Default::default()
1041 })
1042 .await
1043 .expect("should be able to put link");
1044 assert!(link_put.success);
1045 assert!(link_put.message.is_empty());
1046 assert!(link_put.response.is_none());
1047 let links_get = client
1048 .get_links()
1049 .await
1050 .expect("should be able to get links");
1051 assert!(links_get.success);
1052 assert!(links_get.message.is_empty());
1053 assert!(links_get.response.is_some());
1054 let link_get = links_get.response.as_ref().unwrap().first().unwrap();
1056 assert_eq!(link_get.source_id, "echo");
1057 assert_eq!(link_get.target, "httpserver");
1058 assert_eq!(link_get.name, "default");
1059 assert_eq!(link_get.wit_namespace, "wasi");
1060 assert_eq!(link_get.wit_package, "http");
1061 let link_del = client
1063 .delete_link("echo", "default", "wasi", "http")
1064 .await
1065 .expect("should be able to delete link");
1066 assert!(link_del.success);
1067 assert!(link_del.message.is_empty());
1068 assert!(link_del.response.is_none());
1069
1070 let label_one = client
1075 .put_label(&host.id, "idk", "lol")
1076 .await
1077 .expect("should be able to put label");
1078 assert!(label_one.success);
1079 assert!(label_one.message.is_empty());
1080 assert!(label_one.response.is_none());
1081 let label_two = client
1082 .put_label(&host.id, "foo", "bar")
1083 .await
1084 .expect("should be able to put another label");
1085 assert!(label_two.success);
1086 assert!(label_two.message.is_empty());
1087 assert!(label_two.response.is_none());
1088 let del_label_one = client
1090 .delete_label(&host.id, "idk")
1091 .await
1092 .expect("should be able to delete label");
1093 assert!(del_label_one.success);
1094 assert!(del_label_one.message.is_empty());
1095 assert!(del_label_one.response.is_none());
1096 let registry_put = client
1101 .put_registries(HashMap::from_iter([(
1102 "mycloud.io".to_string(),
1103 RegistryCredential {
1104 username: Some("user".to_string()),
1105 password: Some("pass".to_string()),
1106 registry_type: "oci".to_string(),
1107 token: None,
1108 },
1109 )]))
1110 .await
1111 .expect("should be able to put registries");
1112 assert!(registry_put.success);
1113 assert!(registry_put.message.is_empty());
1114 assert!(registry_put.response.is_none());
1115
1116 let config_put = client
1121 .put_config(
1122 "test_config",
1123 HashMap::from_iter([("sup".to_string(), "hey".to_string())]),
1124 )
1125 .await
1126 .expect("should be able to put config");
1127 assert!(config_put.success);
1128 assert!(config_put.message.is_empty());
1129 assert!(config_put.response.is_none());
1130 let config_get = client
1132 .get_config("test_config")
1133 .await
1134 .expect("should be able to get config");
1135 assert!(config_get.success);
1136 assert!(config_get.message.is_empty());
1137 assert!(config_get
1138 .response
1139 .is_some_and(|r| r.get("sup").is_some_and(|s| s == "hey")));
1140 let config_del = client
1142 .delete_config("test_config")
1143 .await
1144 .expect("should be able to delete config");
1145 assert!(config_del.success);
1146 assert!(config_del.message.is_empty());
1147 assert!(config_del.response.is_none());
1148
1149 let inventory = client
1154 .get_host_inventory(&host.id)
1155 .await
1156 .expect("should be able to fetch at least a host");
1157 assert!(inventory.success);
1158 assert!(inventory.message.is_empty());
1159 assert!(inventory.response.is_some());
1160 let host_inventory = inventory.response.unwrap();
1161 assert!(host_inventory.components.iter().all(|a| a.id == "echo"));
1162 assert!(!host_inventory.labels.contains_key("idk"));
1163 assert!(host_inventory
1164 .labels
1165 .get("foo")
1166 .is_some_and(|f| f == &"bar".to_string()));
1167 let stop_host = client
1169 .stop_host(&host.id, Some(1234))
1170 .await
1171 .expect("should be able to stop host");
1172 assert!(stop_host.success);
1173 assert!(stop_host.message.is_empty());
1174 assert!(stop_host.response.is_none());
1175 }
1176}