1use core::fmt::{self, Debug};
4use core::time::Duration;
5
6use std::collections::{BTreeMap, HashMap};
7
8use async_nats::Subscriber;
9use cloudevents::event::Event;
10use futures::{StreamExt, TryFutureExt};
11use serde::de::DeserializeOwned;
12use tokio::sync::mpsc::Receiver;
13use tracing::{debug, error, instrument, trace};
14
15use crate::types::ctl::{
16 CtlResponse, ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
17 UpdateComponentCommand,
18};
19use crate::types::host::{Host, HostInventory, HostLabel};
20use crate::types::link::Link;
21use crate::types::registry::RegistryCredential;
22use crate::types::rpc::{
23 ComponentAuctionAck, ComponentAuctionRequest, DeleteInterfaceLinkDefinitionRequest,
24 ProviderAuctionAck, ProviderAuctionRequest,
25};
26use crate::{
27 broker, json_deserialize, json_serialize, otel, HostLabelIdentifier, IdentifierKind, Result,
28};
29
30#[derive(Debug, Clone)]
33#[non_exhaustive]
34pub struct ClientBuilder {
35 nc: async_nats::Client,
36 topic_prefix: Option<String>,
37 lattice: String,
38 timeout: Duration,
39 auction_timeout: Duration,
40}
41
42impl ClientBuilder {
43 #[must_use]
46 pub fn new(nc: async_nats::Client) -> ClientBuilder {
47 ClientBuilder {
48 nc,
49 topic_prefix: None,
50 lattice: "default".to_string(),
51 timeout: Duration::from_secs(2),
52 auction_timeout: Duration::from_secs(5),
53 }
54 }
55
56 #[must_use]
59 pub fn topic_prefix(self, prefix: impl Into<String>) -> ClientBuilder {
60 ClientBuilder {
61 topic_prefix: Some(prefix.into()),
62 ..self
63 }
64 }
65
66 #[must_use]
69 pub fn lattice(self, prefix: impl Into<String>) -> ClientBuilder {
70 ClientBuilder {
71 lattice: prefix.into(),
72 ..self
73 }
74 }
75
76 #[must_use]
79 pub fn timeout(self, timeout: Duration) -> ClientBuilder {
80 ClientBuilder { timeout, ..self }
81 }
82
83 #[must_use]
86 pub fn auction_timeout(self, timeout: Duration) -> ClientBuilder {
87 ClientBuilder {
88 auction_timeout: timeout,
89 ..self
90 }
91 }
92
93 #[must_use]
95 pub fn build(self) -> Client {
96 Client {
97 nc: self.nc,
98 topic_prefix: self.topic_prefix,
99 lattice: self.lattice,
100 timeout: self.timeout,
101 auction_timeout: self.auction_timeout,
102 }
103 }
104}
105
106#[derive(Clone)]
108#[non_exhaustive]
109pub struct Client {
110 nc: async_nats::Client,
112 topic_prefix: Option<String>,
114 lattice: String,
116 timeout: Duration,
118 auction_timeout: Duration,
120}
121
122impl Debug for Client {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.debug_struct("Client")
125 .field("topic_prefix", &self.topic_prefix)
126 .field("lattice", &self.lattice)
127 .field("timeout", &self.timeout)
128 .field("auction_timeout", &self.auction_timeout)
129 .finish_non_exhaustive()
130 }
131}
132
133impl Client {
134 #[must_use]
137 pub fn new(nc: async_nats::Client) -> Client {
138 ClientBuilder::new(nc).build()
139 }
140
141 #[allow(unused)]
143 #[must_use]
144 pub fn nats_client(&self) -> async_nats::Client {
145 self.nc.clone()
146 }
147
148 pub fn lattice(&self) -> &str {
150 self.lattice.as_ref()
151 }
152
153 #[instrument(level = "debug", skip_all)]
155 pub(crate) async fn request_timeout(
156 &self,
157 subject: String,
158 payload: Vec<u8>,
159 timeout: Duration,
160 ) -> Result<async_nats::Message> {
161 match tokio::time::timeout(
162 timeout,
163 self.nc.request_with_headers(
164 subject,
165 otel::HeaderInjector::default_with_span().into(),
166 payload.into(),
167 ),
168 )
169 .await
170 {
171 Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timed out").into()),
172 Ok(Ok(message)) => Ok(message),
173 Ok(Err(e)) => Err(e.into()),
174 }
175 }
176
177 #[instrument(level = "debug", skip_all)]
180 pub async fn get_hosts(&self) -> Result<Vec<CtlResponse<Host>>> {
181 let subject = broker::v1::queries::hosts(&self.topic_prefix, &self.lattice);
182 debug!("get_hosts:publish {}", &subject);
183 self.publish_and_wait(subject, Vec::new()).await
184 }
185
186 #[instrument(level = "debug", skip_all)]
188 pub async fn get_host_inventory(&self, host_id: &str) -> Result<CtlResponse<HostInventory>> {
189 let subject = broker::v1::queries::host_inventory(
190 &self.topic_prefix,
191 &self.lattice,
192 IdentifierKind::is_host_id(host_id)?.as_str(),
193 );
194 debug!("get_host_inventory:request {}", &subject);
195 match self.request_timeout(subject, vec![], self.timeout).await {
196 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
197 Err(e) => Err(format!("Did not receive host inventory from target host: {e}").into()),
198 }
199 }
200
201 #[instrument(level = "debug", skip_all)]
203 pub async fn get_claims(&self) -> Result<CtlResponse<Vec<HashMap<String, String>>>> {
204 let subject = broker::v1::queries::claims(&self.topic_prefix, &self.lattice);
205 debug!("get_claims:request {}", &subject);
206 match self.request_timeout(subject, vec![], self.timeout).await {
207 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
208 Err(e) => Err(format!("Did not receive claims from lattice: {e}").into()),
209 }
210 }
211
212 #[instrument(level = "debug", skip_all)]
218 pub async fn perform_component_auction(
219 &self,
220 component_ref: &str,
221 component_id: &str,
222 constraints: impl Into<BTreeMap<String, String>>,
223 ) -> Result<Vec<CtlResponse<ComponentAuctionAck>>> {
224 let subject = broker::v1::component_auction_subject(&self.topic_prefix, &self.lattice);
225 let bytes = json_serialize(
226 ComponentAuctionRequest::builder()
227 .component_ref(IdentifierKind::is_component_ref(component_ref)?)
228 .component_id(IdentifierKind::is_component_id(component_id)?)
229 .constraints(constraints.into())
230 .build()?,
231 )?;
232 debug!("component_auction:publish {}", &subject);
233 self.publish_and_wait(subject, bytes).await
234 }
235
236 #[instrument(level = "debug", skip_all)]
252 pub async fn perform_provider_auction(
253 &self,
254 provider_ref: &str,
255 provider_id: &str,
256 constraints: impl Into<BTreeMap<String, String>>,
257 ) -> Result<Vec<CtlResponse<ProviderAuctionAck>>> {
258 let subject = broker::v1::provider_auction_subject(&self.topic_prefix, &self.lattice);
259 let bytes = json_serialize(
260 ProviderAuctionRequest::builder()
261 .provider_ref(IdentifierKind::is_provider_ref(provider_ref)?)
262 .provider_id(IdentifierKind::is_provider_id(provider_id)?)
263 .constraints(constraints.into())
264 .build()?,
265 )?;
266 debug!("provider_auction:publish {}", &subject);
267 self.publish_and_wait(subject, bytes).await
268 }
269
270 #[instrument(level = "debug", skip_all)]
292 pub async fn scale_component(
293 &self,
294 host_id: &str,
295 component_ref: &str,
296 component_id: &str,
297 max_instances: u32,
298 annotations: Option<BTreeMap<String, String>>,
299 config: Vec<String>,
300 ) -> Result<CtlResponse<()>> {
301 let host_id = IdentifierKind::is_host_id(host_id)?;
302 let subject = broker::v1::commands::scale_component(
303 &self.topic_prefix,
304 &self.lattice,
305 host_id.as_str(),
306 );
307 debug!("scale_component:request {}", &subject);
308 let bytes = json_serialize(ScaleComponentCommand {
309 max_instances,
310 component_ref: IdentifierKind::is_component_ref(component_ref)?,
311 component_id: IdentifierKind::is_component_id(component_id)?,
312 host_id,
313 annotations,
314 config,
315 ..Default::default()
316 })?;
317 match self.request_timeout(subject, bytes, self.timeout).await {
318 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
319 Err(e) => Err(format!("Did not receive scale component acknowledgement: {e}").into()),
320 }
321 }
322
323 #[instrument(level = "debug", skip_all)]
335 pub async fn put_registries(
336 &self,
337 registries: HashMap<String, RegistryCredential>,
338 ) -> Result<CtlResponse<()>> {
339 let subject = broker::v1::publish_registries(&self.topic_prefix, &self.lattice);
340 debug!("put_registries:publish {}", &subject);
341 let bytes = json_serialize(®istries)?;
342 let resp = self
343 .nc
344 .publish_with_headers(
345 subject,
346 otel::HeaderInjector::default_with_span().into(),
347 bytes.into(),
348 )
349 .await;
350 if let Err(e) = resp {
351 Err(format!("Failed to push registry credential map: {e}").into())
352 } else {
353 Ok(CtlResponse::<()>::success(
354 "successfully added registries".into(),
355 ))
356 }
357 }
358
359 #[instrument(level = "debug", skip_all)]
365 pub async fn put_link(&self, link: Link) -> Result<CtlResponse<()>> {
366 IdentifierKind::is_component_id(&link.source_id)?;
368 IdentifierKind::is_component_id(&link.target)?;
369 IdentifierKind::is_link_name(&link.name)?;
370
371 let subject = broker::v1::put_link(&self.topic_prefix, &self.lattice);
372 debug!("put_link:request {}", &subject);
373
374 let bytes = crate::json_serialize(link)?;
375 match self.request_timeout(subject, bytes, self.timeout).await {
376 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
377 Err(e) => Err(format!("Did not receive put link acknowledgement: {e}").into()),
378 }
379 }
380
381 #[instrument(level = "debug", skip_all)]
389 pub async fn delete_link(
390 &self,
391 source_id: &str,
392 link_name: &str,
393 wit_namespace: &str,
394 wit_package: &str,
395 ) -> Result<CtlResponse<()>> {
396 let subject = broker::v1::delete_link(&self.topic_prefix, &self.lattice);
397 let ld = DeleteInterfaceLinkDefinitionRequest::from_source_and_link_metadata(
398 &IdentifierKind::is_component_id(source_id)?,
399 &IdentifierKind::is_link_name(link_name)?,
400 wit_namespace,
401 wit_package,
402 );
403 let bytes = crate::json_serialize(&ld)?;
404 match self.request_timeout(subject, bytes, self.timeout).await {
405 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
406 Err(e) => Err(format!("Did not receive delete link acknowledgement: {e}").into()),
407 }
408 }
409
410 #[instrument(level = "debug", skip_all)]
416 pub async fn get_links(&self) -> Result<CtlResponse<Vec<Link>>> {
417 let subject = broker::v1::queries::link_definitions(&self.topic_prefix, &self.lattice);
418 debug!("get_links:request {}", &subject);
419 match self.request_timeout(subject, vec![], self.timeout).await {
420 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
421 Err(e) => Err(format!("Did not receive a response to get links: {e}").into()),
422 }
423 }
424
425 #[instrument(level = "debug", skip_all)]
435 pub async fn put_config(
436 &self,
437 config_name: &str,
438 config: impl Into<HashMap<String, String>>,
439 ) -> Result<CtlResponse<()>> {
440 let subject = broker::v1::put_config(&self.topic_prefix, &self.lattice, config_name);
441 debug!(%subject, %config_name, "Putting config");
442 let data = serde_json::to_vec(&config.into())?;
443 match self.request_timeout(subject, data, self.timeout).await {
444 Ok(msg) => json_deserialize(&msg.payload),
445 Err(e) => Err(format!("Did not receive a response to put config request: {e}").into()),
446 }
447 }
448
449 #[instrument(level = "debug", skip_all)]
458 pub async fn delete_config(&self, config_name: &str) -> Result<CtlResponse<()>> {
459 let subject = broker::v1::delete_config(&self.topic_prefix, &self.lattice, config_name);
460 debug!(%subject, %config_name, "Delete config");
461 match self
462 .request_timeout(subject, Vec::default(), self.timeout)
463 .await
464 {
465 Ok(msg) => json_deserialize(&msg.payload),
466 Err(e) => {
467 Err(format!("Did not receive a response to delete config request: {e}").into())
468 }
469 }
470 }
471
472 #[instrument(level = "debug", skip_all)]
514 pub async fn get_config(
515 &self,
516 config_name: &str,
517 ) -> Result<CtlResponse<HashMap<String, String>>> {
518 let subject = broker::v1::queries::config(&self.topic_prefix, &self.lattice, config_name);
519 debug!(%subject, %config_name, "Getting config");
520 match self
521 .request_timeout(subject, Vec::default(), self.timeout)
522 .await
523 {
524 Ok(msg) => json_deserialize(&msg.payload),
525 Err(e) => Err(format!("Did not receive a response to get config request: {e}").into()),
526 }
527 }
528
529 pub async fn put_label(
542 &self,
543 host_id: &str,
544 key: &str,
545 value: &str,
546 ) -> Result<CtlResponse<()>> {
547 let subject = broker::v1::put_label(&self.topic_prefix, &self.lattice, host_id);
548 debug!(%subject, "putting label");
549 let bytes = json_serialize(HostLabel {
550 key: key.to_string(),
551 value: value.to_string(),
552 })?;
553 match self.request_timeout(subject, bytes, self.timeout).await {
554 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
555 Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
556 }
557 }
558
559 pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlResponse<()>> {
571 let subject = broker::v1::delete_label(&self.topic_prefix, &self.lattice, host_id);
572 debug!(%subject, "removing label");
573 let bytes = json_serialize(HostLabelIdentifier {
574 key: key.to_string(),
575 })?;
576 match self.request_timeout(subject, bytes, self.timeout).await {
577 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
578 Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
579 }
580 }
581
582 #[instrument(level = "debug", skip_all)]
600 pub async fn update_component(
601 &self,
602 host_id: &str,
603 existing_component_id: &str,
604 new_component_ref: &str,
605 annotations: Option<BTreeMap<String, String>>,
606 ) -> Result<CtlResponse<()>> {
607 let host_id = IdentifierKind::is_host_id(host_id)?;
608 let subject = broker::v1::commands::update_component(
609 &self.topic_prefix,
610 &self.lattice,
611 host_id.as_str(),
612 );
613 debug!("update_component:request {}", &subject);
614 let bytes = json_serialize(UpdateComponentCommand {
615 host_id,
616 component_id: IdentifierKind::is_component_id(existing_component_id)?,
617 new_component_ref: IdentifierKind::is_component_ref(new_component_ref)?,
618 annotations,
619 })?;
620 match self.request_timeout(subject, bytes, self.timeout).await {
621 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
622 Err(e) => Err(format!("Did not receive update component acknowledgement: {e}").into()),
623 }
624 }
625
626 #[instrument(level = "debug", skip_all)]
647 pub async fn start_provider(
648 &self,
649 host_id: &str,
650 provider_ref: &str,
651 provider_id: &str,
652 annotations: Option<BTreeMap<String, String>>,
653 provider_configuration: Vec<String>,
654 ) -> Result<CtlResponse<()>> {
655 let host_id = IdentifierKind::is_host_id(host_id)?;
656 let subject = broker::v1::commands::start_provider(
657 &self.topic_prefix,
658 &self.lattice,
659 host_id.as_str(),
660 );
661 debug!("start_provider:request {}", &subject);
662 let mut cmd = StartProviderCommand::builder()
663 .host_id(&host_id)
664 .provider_ref(&IdentifierKind::is_provider_ref(provider_ref)?)
665 .provider_id(&IdentifierKind::is_component_id(provider_id)?);
666 if let Some(annotations) = annotations {
667 cmd = cmd.annotations(annotations);
668 }
669 let cmd = cmd.config(provider_configuration).build()?;
670 let bytes = json_serialize(cmd)?;
671
672 match self.request_timeout(subject, bytes, self.timeout).await {
673 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
674 Err(e) => Err(format!("Did not receive start provider acknowledgement: {e}").into()),
675 }
676 }
677
678 #[instrument(level = "debug", skip_all)]
691 pub async fn stop_provider(&self, host_id: &str, provider_id: &str) -> Result<CtlResponse<()>> {
692 let host_id = IdentifierKind::is_host_id(host_id)?;
693
694 let subject = broker::v1::commands::stop_provider(
695 &self.topic_prefix,
696 &self.lattice,
697 host_id.as_str(),
698 );
699 debug!("stop_provider:request {}", &subject);
700 let bytes = json_serialize(StopProviderCommand {
701 host_id,
702 provider_id: IdentifierKind::is_component_id(provider_id)?,
703 })?;
704
705 match self.request_timeout(subject, bytes, self.timeout).await {
706 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
707 Err(e) => Err(format!("Did not receive stop provider acknowledgement: {e}").into()),
708 }
709 }
710
711 #[instrument(level = "debug", skip_all)]
724 pub async fn stop_host(
725 &self,
726 host_id: &str,
727 timeout_ms: Option<u64>,
728 ) -> Result<CtlResponse<()>> {
729 let host_id = IdentifierKind::is_host_id(host_id)?;
730 let subject =
731 broker::v1::commands::stop_host(&self.topic_prefix, &self.lattice, host_id.as_str());
732 debug!("stop_host:request {}", &subject);
733 let bytes = json_serialize(StopHostCommand {
734 host_id,
735 timeout: timeout_ms,
736 })?;
737
738 match self.request_timeout(subject, bytes, self.timeout).await {
739 Ok(msg) => Ok(json_deserialize(&msg.payload)?),
740 Err(e) => Err(format!("Did not receive stop host acknowledgement: {e}").into()),
741 }
742 }
743
744 async fn publish_and_wait<D: DeserializeOwned>(
746 &self,
747 subject: String,
748 payload: Vec<u8>,
749 ) -> Result<Vec<D>> {
750 let reply = self.nc.new_inbox();
751 let sub = self.nc.subscribe(reply.clone()).await?;
752 self.nc
753 .publish_with_reply_and_headers(
754 subject.clone(),
755 reply,
756 otel::HeaderInjector::default_with_span().into(),
757 payload.into(),
758 )
759 .await?;
760 let nc = self.nc.clone();
761 tokio::spawn(async move {
762 if let Err(error) = nc.flush().await {
763 error!(%error, "flush after publish");
764 }
765 });
766 Ok(collect_sub_timeout::<D>(sub, self.auction_timeout, subject.as_str()).await)
767 }
768
769 #[allow(clippy::missing_errors_doc)] pub async fn events_receiver(&self, event_types: Vec<String>) -> Result<Receiver<Event>> {
799 let (sender, receiver) = tokio::sync::mpsc::channel(5000);
800 let futs = event_types.into_iter().map(|event_type| {
801 self.nc
802 .subscribe(format!("wasmbus.evt.{}.{}", self.lattice, event_type))
803 .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
804 });
805 let subs: Vec<Subscriber> = futures::future::join_all(futs)
806 .await
807 .into_iter()
808 .collect::<Result<_>>()?;
809 let mut stream = futures::stream::select_all(subs);
810 tokio::spawn(async move {
811 while let Some(msg) = stream.next().await {
812 let Ok(evt) = json_deserialize::<Event>(&msg.payload) else {
813 error!("Object received on event stream was not a CloudEvent");
814 continue;
815 };
816 trace!("received event: {:?}", evt);
817 let Ok(()) = sender.send(evt).await else {
818 break;
819 };
820 }
821 });
822 Ok(receiver)
823 }
824}
825
826pub(crate) async fn collect_sub_timeout<T: DeserializeOwned>(
828 mut sub: async_nats::Subscriber,
829 timeout: Duration,
830 reason: &str,
831) -> Vec<T> {
832 let mut items = Vec::new();
833 let sleep = tokio::time::sleep(timeout);
834 tokio::pin!(sleep);
835 loop {
836 tokio::select! {
837 msg = sub.next() => {
838 let Some(msg) = msg else {
839 break;
840 };
841 if msg.payload.is_empty() {
842 break;
843 }
844 match json_deserialize::<T>(&msg.payload) {
845 Ok(item) => items.push(item),
846 Err(error) => {
847 error!(%reason, %error,
848 "deserialization error in auction - results may be incomplete",
849 );
850 break;
851 }
852 }
853 },
854 () = &mut sleep => { break; }
855 }
856 }
857 items
858}
859
860#[cfg(test)]
861mod tests {
862 use super::*;
863
864 #[tokio::test]
868 #[ignore]
869 async fn test_events_receiver() {
870 let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
871 let client = ClientBuilder::new(nc)
872 .timeout(Duration::from_millis(1000))
873 .auction_timeout(Duration::from_millis(1000))
874 .build();
875 let mut receiver = client
876 .events_receiver(vec!["foobar".to_string()])
877 .await
878 .unwrap();
879 tokio::spawn(async move {
880 while let Some(evt) = receiver.recv().await {
881 println!("Event received: {evt:?}");
882 }
883 });
884 println!("Listening to Cloud Events for 120 seconds. Then we will quit.");
885 tokio::time::sleep(Duration::from_secs(120)).await;
886 }
887
/// Exercises the `IdentifierKind` validators: blank input is rejected with the
/// expected message, valid input is accepted, and surrounding whitespace is
/// trimmed from a component id.
#[test]
fn test_check_identifier() -> Result<()> {
    for blank in ["", " "] {
        assert!(IdentifierKind::is_host_id(blank).is_err());
    }

    let Err(host_err) = IdentifierKind::is_host_id(" ") else {
        panic!("parsing host id should have failed");
    };
    assert!(host_err.to_string().contains("Host ID cannot be empty"));

    let Err(provider_err) = IdentifierKind::is_provider_ref("") else {
        panic!("parsing provider ref should have failed");
    };
    assert!(provider_err
        .to_string()
        .contains("Provider OCI reference cannot be empty"));

    assert!(IdentifierKind::is_host_id("host_id").is_ok());

    let trimmed = IdentifierKind::is_component_id(" iambatman ")?;
    assert_eq!(trimmed, "iambatman");

    Ok(())
}
913
914 #[tokio::test]
915 #[ignore]
916 async fn ctl_response_comprehensive() {
921 let client = Client::new(
922 async_nats::connect("127.0.0.1:4222")
923 .await
924 .expect("should be able to connect to local NATS"),
925 );
926 let hosts = client
928 .get_hosts()
929 .await
930 .expect("should be able to fetch at least a host");
931 assert_eq!(hosts.len(), 1);
932 let host = hosts.first().expect("one host to exist");
933 assert!(host.success);
934 assert!(host.message.is_empty());
935 assert!(host.response.is_some());
936 let host = host.response.as_ref().unwrap();
937 let auction_response = client
942 .perform_component_auction(
943 "ghcr.io/brooksmtownsend/http-hello-world-rust:0.1.0",
944 "echo",
945 BTreeMap::new(),
946 )
947 .await
948 .expect("should be able to auction an component");
949 assert_eq!(auction_response.len(), 1);
950 let first_ack = auction_response.first().expect("a single component ack");
951 let auction_ack = first_ack.response.as_ref().unwrap();
952 let (component_ref, component_id) = (&auction_ack.component_ref, &auction_ack.component_id);
953 let scale_response = client
955 .scale_component(
956 &host.id,
957 component_ref,
958 component_id,
959 1,
960 None,
961 Vec::with_capacity(0),
962 )
963 .await
964 .expect("should be able to scale component");
965 assert!(scale_response.success);
966 assert!(scale_response.message.is_empty());
967 assert!(scale_response.response.is_none());
968 let update_component_resp = client
970 .update_component(
971 &host.id,
972 "nonexistantcomponentID",
973 "wasmcloud.azurecr.io/kvcounter:0.4.0",
974 None,
975 )
976 .await
977 .expect("should be able to issue update component request");
978 assert!(!update_component_resp.success);
979 assert_eq!(
980 update_component_resp.message,
981 "component not found".to_string()
982 );
983 assert_eq!(update_component_resp.response, None);
984
985 let provider_acks = client
990 .perform_provider_auction(
991 "wasmcloud.azurecr.io/httpserver:0.19.1",
992 "httpserver",
993 BTreeMap::new(),
994 )
995 .await
996 .expect("should be able to hold provider auction");
997 assert_eq!(provider_acks.len(), 1);
998 let provider_ack = provider_acks.first().expect("a single provider ack");
999 assert!(provider_ack.success);
1000 assert!(provider_ack.message.is_empty());
1001 assert!(provider_ack.response.is_some());
1002 let auction_ack = provider_ack.response.as_ref().unwrap();
1003 let (provider_ref, provider_id) = (&auction_ack.provider_ref, &auction_ack.provider_id);
1004 let start_response = client
1006 .start_provider(&host.id, provider_ref, provider_id, None, vec![])
1007 .await
1008 .expect("should be able to start provider");
1009 assert!(start_response.success);
1010 assert!(start_response.message.is_empty());
1011 assert!(start_response.response.is_none());
1012 let stop_response = client
1014 .stop_provider(&host.id, "notarealproviderID")
1015 .await
1016 .expect("should be able to issue stop provider request");
1017 assert!(!stop_response.success);
1018 assert_eq!(
1019 stop_response.message,
1020 "provider with that ID is not running".to_string()
1021 );
1022 assert!(stop_response.response.is_none());
1023 tokio::time::sleep(Duration::from_secs(5)).await;
1027 let link_put = client
1029 .put_link(Link {
1030 source_id: "echo".to_string(),
1031 target: "httpserver".to_string(),
1032 name: "default".to_string(),
1033 wit_namespace: "wasi".to_string(),
1034 wit_package: "http".to_string(),
1035 interfaces: vec!["incoming-handler".to_string()],
1036 ..Default::default()
1037 })
1038 .await
1039 .expect("should be able to put link");
1040 assert!(link_put.success);
1041 assert!(link_put.message.is_empty());
1042 assert!(link_put.response.is_none());
1043 let links_get = client
1044 .get_links()
1045 .await
1046 .expect("should be able to get links");
1047 assert!(links_get.success);
1048 assert!(links_get.message.is_empty());
1049 assert!(links_get.response.is_some());
1050 let link_get = links_get.response.as_ref().unwrap().first().unwrap();
1052 assert_eq!(link_get.source_id, "echo");
1053 assert_eq!(link_get.target, "httpserver");
1054 assert_eq!(link_get.name, "default");
1055 assert_eq!(link_get.wit_namespace, "wasi");
1056 assert_eq!(link_get.wit_package, "http");
1057 let link_del = client
1059 .delete_link("echo", "default", "wasi", "http")
1060 .await
1061 .expect("should be able to delete link");
1062 assert!(link_del.success);
1063 assert!(link_del.message.is_empty());
1064 assert!(link_del.response.is_none());
1065
1066 let label_one = client
1071 .put_label(&host.id, "idk", "lol")
1072 .await
1073 .expect("should be able to put label");
1074 assert!(label_one.success);
1075 assert!(label_one.message.is_empty());
1076 assert!(label_one.response.is_none());
1077 let label_two = client
1078 .put_label(&host.id, "foo", "bar")
1079 .await
1080 .expect("should be able to put another label");
1081 assert!(label_two.success);
1082 assert!(label_two.message.is_empty());
1083 assert!(label_two.response.is_none());
1084 let del_label_one = client
1086 .delete_label(&host.id, "idk")
1087 .await
1088 .expect("should be able to delete label");
1089 assert!(del_label_one.success);
1090 assert!(del_label_one.message.is_empty());
1091 assert!(del_label_one.response.is_none());
1092 let registry_put = client
1097 .put_registries(HashMap::from_iter([(
1098 "mycloud.io".to_string(),
1099 RegistryCredential {
1100 username: Some("user".to_string()),
1101 password: Some("pass".to_string()),
1102 registry_type: "oci".to_string(),
1103 token: None,
1104 },
1105 )]))
1106 .await
1107 .expect("should be able to put registries");
1108 assert!(registry_put.success);
1109 assert!(registry_put.message.is_empty());
1110 assert!(registry_put.response.is_none());
1111
1112 let config_put = client
1117 .put_config(
1118 "test_config",
1119 HashMap::from_iter([("sup".to_string(), "hey".to_string())]),
1120 )
1121 .await
1122 .expect("should be able to put config");
1123 assert!(config_put.success);
1124 assert!(config_put.message.is_empty());
1125 assert!(config_put.response.is_none());
1126 let config_get = client
1128 .get_config("test_config")
1129 .await
1130 .expect("should be able to get config");
1131 assert!(config_get.success);
1132 assert!(config_get.message.is_empty());
1133 assert!(config_get
1134 .response
1135 .is_some_and(|r| r.get("sup").is_some_and(|s| s == "hey")));
1136 let config_del = client
1138 .delete_config("test_config")
1139 .await
1140 .expect("should be able to delete config");
1141 assert!(config_del.success);
1142 assert!(config_del.message.is_empty());
1143 assert!(config_del.response.is_none());
1144
1145 let inventory = client
1150 .get_host_inventory(&host.id)
1151 .await
1152 .expect("should be able to fetch at least a host");
1153 assert!(inventory.success);
1154 assert!(inventory.message.is_empty());
1155 assert!(inventory.response.is_some());
1156 let host_inventory = inventory.response.unwrap();
1157 assert!(host_inventory.components.iter().all(|a| a.id == "echo"));
1158 assert!(!host_inventory.labels.contains_key("idk"));
1159 assert!(host_inventory
1160 .labels
1161 .get("foo")
1162 .is_some_and(|f| f == &"bar".to_string()));
1163 let stop_host = client
1165 .stop_host(&host.id, Some(1234))
1166 .await
1167 .expect("should be able to stop host");
1168 assert!(stop_host.success);
1169 assert!(stop_host.message.is_empty());
1170 assert!(stop_host.response.is_none());
1171 }
1172}