1use std::collections::BTreeMap;
2use std::collections::{hash_map::Entry, HashMap, HashSet};
3
4use anyhow::Result;
5use tracing::{debug, instrument, trace, warn};
6use wadm_types::api::{ScalerStatus, Status, StatusInfo};
7use wasmcloud_control_interface::{ComponentDescription, ProviderDescription};
8
9use crate::commands::Command;
10use crate::consumers::{
11 manager::{WorkError, WorkResult, Worker},
12 ScopedMessage,
13};
14use crate::events::*;
15use crate::publisher::Publisher;
16use crate::scaler::manager::{ScalerList, ScalerManager};
17use crate::storage::{Component, Host, Provider, ProviderStatus, Store, WadmComponentInfo};
18use crate::APP_SPEC_ANNOTATION;
19
20use super::event_helpers::*;
21
/// Worker that consumes lattice events, updates the stored lattice state
/// (hosts, components, providers), and runs the scalers that reconcile
/// deployed manifests against that state.
pub struct EventWorker<StateStore, C: Clone, P: Clone> {
    /// Backing store for lattice state.
    store: StateStore,
    /// Lattice source used for claims/inventory/link/config/secret lookups
    /// (see the `ClaimsSource + InventorySource + ...` bounds on the impl).
    ctl_client: C,
    /// Publisher for commands emitted by scalers.
    command_publisher: CommandPublisher<P>,
    /// Publisher for per-application status updates.
    status_publisher: StatusPublisher<P>,
    /// Manages the set of scalers per deployed manifest.
    scalers: ScalerManager<StateStore, P, C>,
}
29
30impl<StateStore, C, P> EventWorker<StateStore, C, P>
31where
32 StateStore: Store + Send + Sync + Clone + 'static,
33 C: ClaimsSource
34 + InventorySource
35 + LinkSource
36 + ConfigSource
37 + SecretSource
38 + Clone
39 + Send
40 + Sync
41 + 'static,
42 P: Publisher + Clone + Send + Sync + 'static,
43{
    /// Creates a new [`EventWorker`] from its store, control client,
    /// command/status publishers, and scaler manager.
    pub fn new(
        store: StateStore,
        ctl_client: C,
        command_publisher: CommandPublisher<P>,
        status_publisher: StatusPublisher<P>,
        manager: ScalerManager<StateStore, P, C>,
    ) -> EventWorker<StateStore, C, P> {
        EventWorker {
            store,
            ctl_client,
            command_publisher,
            status_publisher,
            scalers: manager,
        }
    }
60
    /// Handles a `ComponentScaled` event: reconciles the stored [`Component`]
    /// and [`Host`] entries for `lattice_id` with the scale the event reports.
    ///
    /// A `max_instances` of 0 means "stopped": the instance entry matching the
    /// event's annotations is removed, and the component is deleted from the
    /// store entirely once no host runs it anymore.
    #[instrument(level = "debug", skip(self, component), fields(component_id = %component.component_id, host_id = %component.host_id))]
    async fn handle_component_scaled(
        &self,
        lattice_id: &str,
        component: &ComponentScaled,
    ) -> anyhow::Result<()> {
        trace!("Scaling component in store");
        debug!("Fetching current data for component");

        // Start from the event data; existing per-host instance data (if any)
        // is merged in below so state about other hosts isn't lost.
        let mut component_data = Component::from(component);
        if let Some(mut current) = self
            .store
            .get::<Component>(lattice_id, &component.component_id)
            .await?
        {
            trace!(component = ?current, "Found existing component data");

            match current.instances.get_mut(&component.host_id) {
                // Scaled to zero on this host: drop the instance entry that
                // matches the event's annotations.
                Some(current_instances) if component.max_instances == 0 => {
                    current_instances.remove(&component.annotations);
                }
                // Scaled up/down on this host: replace the entry for these
                // annotations with the new count.
                Some(current_instances) => {
                    current_instances.replace(WadmComponentInfo {
                        count: component.max_instances,
                        annotations: component.annotations.clone(),
                    });
                }
                // Not running on this host and scaled to zero: nothing to do.
                None if component.max_instances == 0 => (),
                // First instance entry for this component on this host.
                None => {
                    current.instances.insert(
                        component.host_id.clone(),
                        HashSet::from([WadmComponentInfo {
                            count: component.max_instances,
                            annotations: component.annotations.clone(),
                        }]),
                    );
                }
            }

            // Drop the host key entirely once it has no instance entries so the
            // emptiness check below sees the component as fully stopped.
            if current
                .instances
                .get(&component.host_id)
                .is_some_and(|instances| instances.is_empty())
            {
                current.instances.remove(&component.host_id);
            }

            // Carry the merged instance map over to the data we persist.
            component_data.instances = current.instances;
        };

        // Mirror the change onto the host's component -> count map.
        if let Some(mut host) = self
            .store
            .get::<Host>(lattice_id, &component.host_id)
            .await?
        {
            trace!(host = ?host, "Found existing host data");

            if component.max_instances == 0 {
                host.components.remove(&component.component_id);
            } else {
                host.components
                    .entry(component.component_id.clone())
                    .and_modify(|count| *count = component.max_instances)
                    .or_insert(component.max_instances);
            }

            self.store
                .store(lattice_id, host.id.to_owned(), host)
                .await?
        }

        // Persist: delete the component when it no longer runs anywhere,
        // otherwise store the updated data.
        if component_data.instances.is_empty() {
            self.store
                .delete::<Component>(lattice_id, &component.component_id)
                .await
                .map_err(anyhow::Error::from)
        } else {
            self.store
                .store(lattice_id, component.component_id.clone(), component_data)
                .await
                .map_err(anyhow::Error::from)
        }
    }
160
161 #[instrument(level = "debug", skip(self, host), fields(host_id = %host.host_id))]
162 async fn handle_host_heartbeat(
163 &self,
164 lattice_id: &str,
165 host: &HostHeartbeat,
166 ) -> anyhow::Result<()> {
167 debug!("Updating store with current host heartbeat information");
168 let host_data = Host::from(host);
169 self.store
170 .store(lattice_id, host.host_id.clone(), host_data)
171 .await?;
172
173 self.heartbeat_provider_update(lattice_id, host, &host.providers)
176 .await?;
177
178 self.heartbeat_component_update(lattice_id, host, &host.components)
181 .await?;
182
183 Ok(())
184 }
185
186 #[instrument(level = "debug", skip(self, host), fields(host_id = %host.id))]
187 async fn handle_host_started(
188 &self,
189 lattice_id: &str,
190 host: &HostStarted,
191 ) -> anyhow::Result<()> {
192 debug!("Updating store with new host");
193 self.store
195 .store(lattice_id, host.id.clone(), Host::from(host))
196 .await
197 .map_err(anyhow::Error::from)
198 }
199
200 #[instrument(level = "debug", skip(self, host), fields(host_id = %host.id))]
201 async fn handle_host_stopped(
202 &self,
203 lattice_id: &str,
204 host: &HostStopped,
205 ) -> anyhow::Result<()> {
206 debug!("Handling host stopped event");
207 trace!("Fetching current host data");
211 let current: Host = match self.store.get(lattice_id, &host.id).await? {
212 Some(h) => h,
213 None => {
214 debug!("Got host stopped event for a host we didn't have in the store");
215 return Ok(());
216 }
217 };
218
219 trace!("Fetching components from store to remove stopped instances");
220 let all_components = self.store.list::<Component>(lattice_id).await?;
221
222 #[allow(clippy::type_complexity)]
223 let (components_to_update, components_to_delete): (
224 Vec<(String, Component)>,
225 Vec<(String, Component)>,
226 ) = all_components
227 .into_iter()
228 .filter_map(|(id, mut component)| {
229 if current.components.contains_key(&id) {
230 component.instances.remove(¤t.id);
231 Some((id, component))
232 } else {
233 None
234 }
235 })
236 .partition(|(_, component)| !component.instances.is_empty());
237 trace!("Storing updated components in store");
238 self.store
239 .store_many(lattice_id, components_to_update)
240 .await?;
241
242 trace!("Removing components with no more running instances");
243 self.store
244 .delete_many::<Component, _, _>(
245 lattice_id,
246 components_to_delete.into_iter().map(|(id, _)| id),
247 )
248 .await?;
249
250 trace!("Fetching providers from store to remove stopped instances");
251 let all_providers = self.store.list::<Provider>(lattice_id).await?;
252
253 #[allow(clippy::type_complexity)]
254 let (providers_to_update, providers_to_delete): (Vec<(String, Provider)>, Vec<(String, Provider)>) = current
255 .providers
256 .into_iter()
257 .filter_map(|info| {
258 let key = info.provider_id;
259 match all_providers.get(&key).cloned() {
263 Some(mut prov) => prov.hosts.remove(&host.id).map(|_| (key, prov)),
266 None => {
267 warn!(key = %key, "Didn't find provider in storage even though host said it existed");
268 None
269 }
270 }
271 })
272 .partition(|(_, provider)| !provider.hosts.is_empty());
273 trace!("Storing updated providers in store");
274 self.store
275 .store_many(lattice_id, providers_to_update)
276 .await?;
277
278 trace!("Removing providers with no more running instances");
279 self.store
280 .delete_many::<Provider, _, _>(
281 lattice_id,
282 providers_to_delete.into_iter().map(|(id, _)| id),
283 )
284 .await?;
285
286 debug!("Deleting host from store");
290 self.store
291 .delete::<Host>(lattice_id, &host.id)
292 .await
293 .map_err(anyhow::Error::from)
294 }
295
    /// Handles a provider-started event: records the host in the stored
    /// [`Provider`] entry (creating the entry if needed) and, when this host
    /// is newly associated with the provider, mirrors the provider onto the
    /// stored [`Host`] entry as well.
    #[instrument(
        level = "debug",
        skip(self, provider),
        fields(
            provider_id = %provider.provider_id,
            image_ref = %provider.image_ref,
        )
    )]
    async fn handle_provider_started(
        &self,
        lattice_id: &str,
        provider: &ProviderStarted,
    ) -> anyhow::Result<()> {
        debug!("Handling provider started event");
        let id = &provider.provider_id;
        trace!("Fetching current data from store");
        // Tracks whether the host entry also needs this provider added to it.
        let mut needs_host_update = false;
        let provider_data = if let Some(mut current) =
            self.store.get::<Provider>(lattice_id, id).await?
        {
            // Provider already known: make sure this host is recorded.
            let mut prov = match current.hosts.entry(provider.host_id.clone()) {
                Entry::Occupied(_) => {
                    trace!("Found host entry for the provider already in store. Will not update");
                    current
                }
                Entry::Vacant(entry) => {
                    entry.insert(ProviderStatus::default());
                    needs_host_update = true;
                    current
                }
            };
            // Backfill issuer/reference if the stored entry is missing them.
            if prov.issuer.is_empty() || prov.reference.is_empty() {
                let new_prov = Provider::from(provider);
                prov.issuer = new_prov.issuer;
                prov.reference = new_prov.reference;
            }
            prov
        } else {
            trace!("No current provider found in store");
            let mut prov = Provider::from(provider);
            prov.hosts = HashMap::from([(provider.host_id.clone(), ProviderStatus::default())]);
            needs_host_update = true;
            prov
        };

        // Only touch the host entry when this host wasn't already associated
        // with the provider.
        if let (Some(mut host), true) = (
            self.store
                .get::<Host>(lattice_id, &provider.host_id)
                .await?,
            needs_host_update,
        ) {
            trace!(host = ?host, "Found existing host data");

            host.providers.replace(ProviderInfo {
                provider_id: id.to_owned(),
                provider_ref: provider.image_ref.to_owned(),
                annotations: provider.annotations.to_owned(),
            });

            self.store
                .store(lattice_id, host.id.to_owned(), host)
                .await?
        }

        debug!("Storing updated provider in store");
        self.store
            .store(lattice_id, id.to_owned(), provider_data)
            .await
            .map_err(anyhow::Error::from)
    }
370
    /// Handles a provider-stopped event: removes the provider from the stored
    /// [`Host`] entry and removes the host from the stored [`Provider`] entry,
    /// deleting the provider entirely when no hosts remain.
    #[instrument(
        level = "debug",
        skip(self, provider),
        fields(
            provider_id = %provider.provider_id,
        )
    )]
    async fn handle_provider_stopped(
        &self,
        lattice_id: &str,
        provider: &ProviderStopped,
    ) -> anyhow::Result<()> {
        debug!("Handling provider stopped event");
        let id = &provider.provider_id;
        trace!("Fetching current data from store");

        if let Some(mut host) = self
            .store
            .get::<Host>(lattice_id, &provider.host_id)
            .await?
        {
            trace!(host = ?host, "Found existing host data");

            // Only provider_id is populated in the lookup key.
            // NOTE(review): this assumes ProviderInfo's Hash/Eq are keyed on
            // provider_id alone — confirm against the ProviderInfo impl.
            host.providers.remove(&ProviderInfo {
                provider_id: provider.provider_id.to_owned(),
                provider_ref: "".to_string(),
                annotations: BTreeMap::default(),
            });

            self.store
                .store(lattice_id, host.id.to_owned(), host)
                .await?
        }

        if let Some(mut current) = self.store.get::<Provider>(lattice_id, id).await? {
            if current.hosts.remove(&provider.host_id).is_none() {
                // Host wasn't recorded for this provider; nothing more to do.
                trace!(host_id = %provider.host_id, "Did not find host entry in provider");
                return Ok(());
            }
            if current.hosts.is_empty() {
                debug!("Provider is no longer running on any hosts. Removing from store");
                self.store
                    .delete::<Provider>(lattice_id, id)
                    .await
                    .map_err(anyhow::Error::from)
            } else {
                debug!("Storing updated provider");
                self.store
                    .store(lattice_id, id.to_owned(), current)
                    .await
                    .map_err(anyhow::Error::from)
            }
        } else {
            trace!("No current provider found in store");
            Ok(())
        }
    }
432
    /// Handles provider health-check events by updating the per-host status in
    /// the stored [`Provider`] entry (creating a bare entry if none exists).
    ///
    /// `failed` is `Some(true)` for a failed check, `Some(false)` for a passed
    /// check, and `None` for a plain status event.
    #[instrument(
        level = "debug",
        skip(self, provider),
        fields(
            provider_id = %provider.provider_id,
        )
    )]
    async fn handle_provider_health_check(
        &self,
        lattice_id: &str,
        provider: &ProviderHealthCheckInfo,
        failed: Option<bool>,
    ) -> anyhow::Result<()> {
        debug!("Handling provider health check event");
        trace!("Getting current provider");
        let id = &provider.provider_id;
        let host_id = &provider.host_id;
        let mut current: Provider = match self.store.get(lattice_id, id).await? {
            Some(p) => p,
            None => {
                trace!("Didn't find provider in store. Creating");
                Provider {
                    id: id.clone(),
                    ..Default::default()
                }
            }
        };

        // Determine the new status:
        // - an explicit pass/fail always wins;
        // - a plain status event only promotes Pending (or a missing entry) to
        //   Running, leaving any other recorded status untouched.
        let status = match (current.hosts.get(host_id), failed) {
            (_, Some(true)) => Some(ProviderStatus::Failed),
            (_, Some(false)) => Some(ProviderStatus::Running),
            (Some(ProviderStatus::Pending) | None, None) => Some(ProviderStatus::Running),
            _ => None,
        };

        if let Some(status) = status {
            debug!("Updating store with current status");
            current.hosts.insert(host_id.to_owned(), status);
        }

        // Stored unconditionally so a provider first discovered via a health
        // check is persisted even when the status didn't change.
        self.store
            .store(lattice_id, id.to_owned(), current)
            .await
            .map_err(anyhow::Error::from)
    }
484
485 async fn populate_component_info(
487 &self,
488 components: &HashMap<String, Component>,
489 host_id: &str,
490 instance_map: Vec<ComponentDescription>,
491 ) -> anyhow::Result<Vec<(String, Component)>> {
492 let claims = self.ctl_client.get_claims().await?;
493
494 Ok(instance_map
495 .into_iter()
496 .map(|component_description| {
497 let instance = HashSet::from_iter([WadmComponentInfo {
498 count: component_description.max_instances() as usize,
499 annotations: component_description
500 .annotations()
501 .cloned()
502 .unwrap_or_default(),
503 }]);
504 if let Some(component) = components.get(component_description.id()) {
505 let mut new_instances = component.instances.clone();
507 new_instances.insert(host_id.to_owned(), instance);
508 let component = Component {
509 instances: new_instances,
510 reference: component_description.image_ref().into(),
511 name: component_description
512 .name()
513 .unwrap_or(&component.name)
514 .into(),
515 ..component.clone()
516 };
517
518 (component_description.id().to_string(), component)
519 } else if let Some(claim) = claims.get(component_description.id()) {
520 (
521 component_description.id().to_string(),
522 Component {
523 id: component_description.id().into(),
524 name: claim.name.to_owned(),
525 issuer: claim.issuer.to_owned(),
526 instances: HashMap::from_iter([(host_id.to_owned(), instance)]),
527 reference: component_description.image_ref().into(),
528 },
529 )
530 } else {
531 debug!("Claims not found for component on host, component is unsigned");
532
533 (
534 component_description.id().to_string(),
535 Component {
536 id: component_description.id().into(),
537 name: "".to_owned(),
538 issuer: "".to_owned(),
539 instances: HashMap::from_iter([(host_id.to_owned(), instance)]),
540 reference: component_description.image_ref().into(),
541 },
542 )
543 }
544 })
545 .collect::<Vec<(String, Component)>>())
546 }
547
548 #[instrument(level = "debug", skip(self, host), fields(host_id = %host.host_id))]
549 async fn heartbeat_component_update(
550 &self,
551 lattice_id: &str,
552 host: &HostHeartbeat,
553 inventory_components: &Vec<ComponentDescription>,
554 ) -> anyhow::Result<()> {
555 debug!("Fetching current component state");
556 let components = self.store.list::<Component>(lattice_id).await?;
557
558 let components_to_update = inventory_components
561 .iter()
562 .filter_map(|component_description| {
563 if components
564 .get(component_description.id())
565 .map(|component| {
568 component_description.image_ref() == component.reference
570 && component
571 .instances
572 .get(&host.host_id)
573 .map(|store_instances| {
574 if store_instances.len() != component.instances.len() {
576 return false;
577 }
578 let annotations: BTreeMap<String, String> =
580 component_description
581 .annotations()
582 .cloned()
583 .map(|a| a.into_iter().collect())
584 .unwrap_or_default();
585 store_instances.get(&annotations).map_or(
586 false,
587 |store_instance| {
588 component_description.max_instances() as usize
589 == store_instance.count
590 },
591 )
592 })
593 .unwrap_or(false)
594 })
595 .unwrap_or(false)
596 {
597 None
598 } else {
599 Some(component_description.to_owned())
600 }
601 })
602 .collect::<Vec<ComponentDescription>>();
604
605 let components_to_store = self
606 .populate_component_info(&components, &host.host_id, components_to_update)
607 .await?;
608
609 trace!("Updating components with new status from host");
610
611 self.store
612 .store_many(lattice_id, components_to_store)
613 .await?;
614
615 Ok(())
616 }
617
618 #[instrument(level = "debug", skip(self, heartbeat), fields(host_id = %heartbeat.host_id))]
619 async fn heartbeat_provider_update(
620 &self,
621 lattice_id: &str,
622 heartbeat: &HostHeartbeat,
623 inventory_providers: &Vec<ProviderDescription>,
624 ) -> anyhow::Result<()> {
625 debug!("Fetching current provider state");
626 let providers = self.store.list::<Provider>(lattice_id).await?;
627 let providers_to_update = inventory_providers.iter().filter_map(|info| {
628 match providers.get(info.id()).cloned() {
632 Some(mut prov) => {
633 let mut has_changes = false;
634 if prov.name.is_empty() {
635 prov.name = info.name().map(String::from).unwrap_or_default();
636 has_changes = true;
637 }
638 if prov.reference.is_empty() {
639 prov.reference = info.image_ref().map(String::from).unwrap_or_default();
640 has_changes = true;
641 }
642 if let Entry::Vacant(entry) = prov.hosts.entry(heartbeat.host_id.clone()) {
643 entry.insert(ProviderStatus::default());
644 has_changes = true;
645 }
646 if has_changes {
647 Some((info.id().to_string(), prov))
648 } else {
649 None
650 }
651 }
652 None => {
653 Some((
656 info.id().to_string(),
657 Provider {
658 id: info.id().to_string(),
659 hosts: [(heartbeat.host_id.clone(), ProviderStatus::default())].into(),
660 name: info.name().map(String::from).unwrap_or_default(),
661 reference: info.image_ref().map(String::from).unwrap_or_default(),
662 ..Default::default()
663 },
664 ))
665 }
666 }
667 });
668
669 trace!("Updating providers with new status from host");
670 self.store
671 .store_many(lattice_id, providers_to_update)
672 .await?;
673
674 Ok(())
675 }
676
    /// Handles a published manifest: swaps in freshly constructed scalers for
    /// the manifest, cleans up old scalers that no longer apply, runs an
    /// initial reconciliation, and publishes the resulting status and commands.
    #[instrument(level = "debug", skip(self, data), fields(name = %data.manifest.metadata.name))]
    async fn handle_manifest_published(
        &self,
        lattice_id: &str,
        data: &ManifestPublished,
    ) -> anyhow::Result<()> {
        debug!(name = %data.manifest.metadata.name, "Handling published manifest");

        // Remove scalers for any previous version of this manifest so they can
        // be compared against the new set below.
        let old_scalers = self
            .scalers
            .remove_raw_scalers(&data.manifest.metadata.name)
            .await;
        let scalers = self.scalers.scalers_for_manifest(&data.manifest);

        self.scalers.refresh_data().await?;
        // Old scalers whose id no longer appears in the new set get a chance
        // to clean up the resources they were managing.
        let cleanup_commands = if let Some(old_scalers) = old_scalers {
            let (_updated_component, outdated_component): (ScalerList, ScalerList) = old_scalers
                .into_iter()
                .partition(|old| scalers.iter().any(|new| new.id() == old.id()));

            let futs = outdated_component
                .iter()
                .map(|s| async { s.cleanup().await });
            futures::future::join_all(futs)
                .await
                .into_iter()
                .filter_map(|res: Result<Vec<Command>>| match res {
                    Ok(commands) => Some(commands),
                    Err(e) => {
                        // Cleanup failures are logged but don't fail the publish.
                        warn!(error = ?e, "Failed to clean up old scaler")
                        None
                    }
                })
                .flatten()
                .collect::<Vec<Command>>()
        } else {
            vec![]
        };

        // Register the new scalers, then run their initial reconciliation.
        let scalers = self.scalers.add_scalers(&data.manifest, scalers).await?;

        let (commands, res) = get_commands_and_result(
            scalers.iter().map(|s| s.reconcile()),
            "Errors occurred during initial reconciliation",
        )
        .await;

        let status = detailed_scaler_status(&scalers).await;

        trace!(?status, "Setting status");
        if let Err(e) = self
            .status_publisher
            .publish_status(data.manifest.metadata.name.as_ref(), status)
            .await
        {
            warn!("Failed to set manifest status: {e:}");
        };

        trace!(?commands, "Publishing commands");
        self.command_publisher.publish_commands(commands).await?;

        // Failing to publish cleanup commands is non-fatal as well.
        if let Err(e) = self
            .command_publisher
            .publish_commands(cleanup_commands)
            .await
        {
            warn!(error = ?e, "Failed to publish cleanup commands from old application, some resources may be left behind");
        }

        res
    }
763
764 #[instrument(level = "debug", skip(self))]
765 async fn run_scalers_with_hint(&self, event: &Event, name: &str) -> anyhow::Result<()> {
766 let scalers = match self.scalers.get_scalers(name).await {
767 Some(scalers) => scalers,
768 None => {
769 debug!("No scalers currently exist for model");
770 return Ok(());
771 }
772 };
773 self.scalers.refresh_data().await?;
775 let (commands, res) = get_commands_and_result(
776 scalers.iter().map(|s| s.handle_event(event)),
777 "Errors occurred while handling event",
778 )
779 .await;
780
781 let status = detailed_scaler_status(&scalers).await;
782 trace!(?status, "Setting status");
783 if let Err(e) = self.status_publisher.publish_status(name, status).await {
784 warn!(error = ?e, "Failed to set status for scaler");
785 };
786
787 trace!(?commands, "Publishing commands");
788 self.command_publisher.publish_commands(commands).await?;
789
790 res
791 }
792
    /// Runs every registered scaler for every model against `event`, publishing
    /// per-model status and the combined set of commands.
    #[instrument(level = "debug", skip(self))]
    async fn run_all_scalers(&self, event: &Event) -> anyhow::Result<()> {
        let scalers = self.scalers.get_all_scalers().await;
        self.scalers.refresh_data().await?;
        let futs = scalers.iter().map(|(name, scalers)| async move {
            let (commands, res) = get_commands_and_result(
                scalers.iter().map(|scaler| scaler.handle_event(event)),
                "Errors occurred while handling event with all scalers",
            )
            .await;

            let status = detailed_scaler_status(scalers).await;

            trace!(?status, "Setting status");
            if let Err(e) = self.status_publisher.publish_status(name, status).await {
                warn!(error = ?e, "Failed to set status for scaler");
            };

            (commands, res)
        });

        // Collect every model's commands and combine the results: any single
        // failure makes the overall result an error, with subsequent errors
        // attached as context.
        let (commands, res) = futures::future::join_all(futs).await.into_iter().fold(
            (vec![], Ok(())),
            |(mut cmds, res), (mut new_cmds, new_res)| {
                cmds.append(&mut new_cmds);
                let res = match (res, new_res) {
                    (Ok(_), Ok(_)) => Ok(()),
                    (Ok(_), Err(e)) | (Err(e), Ok(_)) => Err(e),
                    (Err(e), Err(e2)) => Err(e.context(e2)),
                };
                (cmds, res)
            },
        );

        trace!(?commands, "Publishing commands");
        self.command_publisher.publish_commands(commands).await?;

        res
    }
834}
835
#[async_trait::async_trait]
impl<StateStore, C, P> Worker for EventWorker<StateStore, C, P>
where
    StateStore: Store + Send + Sync + Clone + 'static,
    C: ClaimsSource
        + InventorySource
        + LinkSource
        + ConfigSource
        + SecretSource
        + Clone
        + Send
        + Sync
        + 'static,
    P: Publisher + Clone + Send + Sync + 'static,
{
    type Message = Event;

    /// Processes a single lattice event: updates stored state via the matching
    /// handler, then runs scalers — only the annotated application's scalers
    /// when the event carries an app-spec annotation, all scalers otherwise —
    /// and finally acks or nacks the message based on the outcome.
    #[instrument(level = "debug", skip(self))]
    async fn do_work(&self, mut message: ScopedMessage<Self::Message>) -> WorkResult<()> {
        // Each arm yields Ok(Some(app_name)) when the event is scoped to a
        // single application, or Ok(None) when all scalers should run.
        let res = match message.as_ref() {
            Event::ComponentScaled(component) => self
                .handle_component_scaled(&message.lattice_id, component)
                .await
                .map(|_| {
                    component
                        .annotations
                        .get(APP_SPEC_ANNOTATION)
                        .map(|s| s.as_str())
                }),
            Event::HostHeartbeat(host) => self
                .handle_host_heartbeat(&message.lattice_id, host)
                .await
                .map(|_| None),
            Event::HostStarted(host) => self
                .handle_host_started(&message.lattice_id, host)
                .await
                .map(|_| None),
            Event::HostStopped(host) => self
                .handle_host_stopped(&message.lattice_id, host)
                .await
                .map(|_| None),
            Event::ProviderStarted(provider) => self
                .handle_provider_started(&message.lattice_id, provider)
                .await
                .map(|_| {
                    provider
                        .annotations
                        .get(APP_SPEC_ANNOTATION)
                        .map(|s| s.as_str())
                }),
            Event::ProviderStopped(provider) => self
                .handle_provider_stopped(&message.lattice_id, provider)
                .await
                .map(|_| None),
            Event::ProviderHealthCheckStatus(ProviderHealthCheckStatus { data }) => self
                .handle_provider_health_check(&message.lattice_id, data, None)
                .await
                .map(|_| None),
            Event::ProviderHealthCheckPassed(ProviderHealthCheckPassed { data }) => self
                .handle_provider_health_check(&message.lattice_id, data, Some(false))
                .await
                .map(|_| None),
            Event::ProviderHealthCheckFailed(ProviderHealthCheckFailed { data }) => self
                .handle_provider_health_check(&message.lattice_id, data, Some(true))
                .await
                .map(|_| None),
            Event::ManifestPublished(data) => self
                .handle_manifest_published(&message.lattice_id, data)
                .await
                .map(|_| None),
            Event::ManifestUnpublished(data) => {
                debug!("Handling unpublished manifest");

                match self.scalers.remove_scalers(&data.name).await {
                    // Scalers removed successfully: ack immediately, skipping
                    // the scaler run below (nothing is left to run for it).
                    Some(Ok(_)) => {
                        return message.ack().await.map_err(WorkError::from);
                    }
                    Some(Err(e)) => {
                        message.nack().await;
                        return Err(WorkError::Other(e.into()));
                    }
                    None => Ok(None),
                }
            }
            // Events that don't affect stored state still trigger a full
            // scaler pass below.
            Event::LinkdefSet(_)
            | Event::LinkdefDeleted(_)
            | Event::ConfigSet(_)
            | Event::ConfigDeleted(_)
            | Event::ProviderStartFailed(_)
            | Event::ComponentScaleFailed(_) => {
                trace!("Got event we don't care about. Not modifying state.");
                Ok(None)
            }
        };

        let res = match res {
            Ok(Some(name)) => self.run_scalers_with_hint(&message, name).await,
            Ok(None) => self.run_all_scalers(&message).await,
            Err(e) => Err(e),
        }
        .map_err(Box::<dyn std::error::Error + Send + 'static>::from);

        // Nack on any failure so the message is redelivered; ack otherwise.
        if let Err(e) = res {
            message.nack().await;
            return Err(WorkError::Other(e));
        }

        message.ack().await.map_err(WorkError::from)
    }
}
955
956pub(crate) async fn get_commands_and_result<Fut, I>(
959 futs: I,
960 error_message: &str,
961) -> (Vec<Command>, Result<()>)
962where
963 Fut: futures::Future<Output = Result<Vec<Command>>>,
964 I: IntoIterator<Item = Fut>,
965{
966 let (commands, errors): (Vec<_>, Vec<_>) = futures::future::join_all(futs)
967 .await
968 .into_iter()
969 .partition(Result::is_ok);
970 (
972 commands.into_iter().flat_map(Result::unwrap).collect(),
973 map_to_result(
974 errors.into_iter().map(Result::unwrap_err).collect(),
975 error_message,
976 ),
977 )
978}
979
980pub async fn detailed_scaler_status(scalers: &ScalerList) -> Status {
982 let futs = scalers.iter().map(|s| async {
983 (
984 s.id().to_string(),
985 s.kind().to_string(),
986 s.name(),
987 s.status().await,
988 )
989 });
990 let status = futures::future::join_all(futs).await;
991 Status::new(
992 StatusInfo {
993 status_type: status
994 .iter()
995 .map(|(_id, _name, _kind, s)| s.status_type)
996 .sum(),
997 message: status
998 .iter()
999 .filter_map(|(_id, _name, _kind, s)| {
1000 let message = s.message.trim();
1001 if message.is_empty() {
1002 None
1003 } else {
1004 Some(message.to_owned())
1005 }
1006 })
1007 .collect::<Vec<_>>()
1008 .join(", "),
1009 },
1010 status
1011 .into_iter()
1012 .map(|(id, kind, name, info)| ScalerStatus {
1013 id,
1014 name,
1015 kind,
1016 info,
1017 })
1018 .collect(),
1019 )
1020}
1021
1022fn map_to_result(errors: Vec<anyhow::Error>, error_message: &str) -> Result<()> {
1023 if errors.is_empty() {
1024 Ok(())
1025 } else {
1026 let mut error = anyhow::anyhow!("{error_message}");
1027 for e in errors {
1028 error = error.context(e);
1029 }
1030 Err(error)
1031 }
1032}
1033
1034#[cfg(test)]
1035mod test {
1036 use std::sync::Arc;
1037
1038 use tokio::sync::RwLock;
1039 use wasmcloud_control_interface::{ComponentDescription, HostInventory, ProviderDescription};
1040
1041 use super::*;
1042
1043 use crate::{
1044 storage::ReadStore,
1045 test_util::{NoopPublisher, TestLatticeSource, TestStore},
1046 };
1047
1048 #[tokio::test]
1052 async fn test_all_state() {
1053 let store = Arc::new(TestStore::default());
1054 let inventory = Arc::new(RwLock::new(HashMap::default()));
1055 let lattice_source = TestLatticeSource {
1056 inventory: inventory.clone(),
1057 ..Default::default()
1058 };
1059
1060 let lattice_id = "all_state";
1061
1062 let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
1063 let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
1064 let worker = EventWorker::new(
1065 store.clone(),
1066 lattice_source.clone(),
1067 command_publisher.clone(),
1068 status_publisher.clone(),
1069 ScalerManager::test_new(
1070 NoopPublisher,
1071 lattice_id,
1072 store.clone(),
1073 command_publisher,
1074 status_publisher.clone(),
1075 lattice_source,
1076 )
1077 .await,
1078 );
1079
1080 let host1_id = "DS1";
1081 let host2_id = "starkiller";
1082
1083 let labels = HashMap::from([("superweapon".to_string(), "true".to_string())]);
1087 worker
1088 .handle_host_started(
1089 lattice_id,
1090 &HostStarted {
1091 friendly_name: "death-star-42".to_string(),
1092 id: host1_id.into(),
1093 labels: labels.clone(),
1094 },
1095 )
1096 .await
1097 .expect("Should be able to handle event");
1098
1099 let current_state = store.list::<Host>(lattice_id).await.unwrap();
1100 assert_eq!(current_state.len(), 1, "Only one host should be in store");
1101 let host = current_state
1102 .get("DS1")
1103 .expect("Host should exist in state");
1104 assert_eq!(
1105 host.friendly_name, "death-star-42",
1106 "Host should have the proper name in state"
1107 );
1108 assert_eq!(host.labels, labels, "Host should have the correct labels");
1109
1110 let labels2 = HashMap::from([
1111 ("superweapon".to_string(), "true".to_string()),
1112 ("lazy_writing".to_string(), "true".to_string()),
1113 ]);
1114 worker
1115 .handle_host_started(
1116 lattice_id,
1117 &HostStarted {
1118 friendly_name: "starkiller-base-2015".to_string(),
1119 id: host2_id.into(),
1120 labels: labels2.clone(),
1121 },
1122 )
1123 .await
1124 .expect("Should be able to handle event");
1125
1126 let current_state = store.list::<Host>(lattice_id).await.unwrap();
1127 assert_eq!(current_state.len(), 2, "Both hosts should be in the store");
1128 let host = current_state
1129 .get("starkiller")
1130 .expect("Host should exist in state");
1131 assert_eq!(
1132 host.friendly_name, "starkiller-base-2015",
1133 "Host should have the proper name in state"
1134 );
1135 assert_eq!(host.labels, labels2, "Host should have the correct labels");
1136
1137 let host = current_state
1139 .get("DS1")
1140 .expect("Host should exist in state");
1141 assert_eq!(
1142 host.friendly_name, "death-star-42",
1143 "Host should have the proper name in state"
1144 );
1145 assert_eq!(host.labels, labels, "Host should have the correct labels");
1146
1147 let component1_scaled = ComponentScaled {
1152 claims: Some(ComponentClaims {
1153 call_alias: Some("Grand Moff".into()),
1154 issuer: "Sheev Palpatine".into(),
1155 name: "Grand Moff Tarkin".into(),
1156 version: Some("0.1.0".into()),
1157 ..Default::default()
1158 }),
1159 image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1160 component_id: "TARKIN".into(),
1161 host_id: host1_id.into(),
1162 annotations: BTreeMap::default(),
1163 max_instances: 500,
1164 };
1165 worker
1166 .handle_component_scaled(lattice_id, &component1_scaled)
1167 .await
1168 .expect("Should be able to handle component event");
1169 let components = store.list::<Component>(lattice_id).await.unwrap();
1170 let component = components
1171 .get("TARKIN")
1172 .expect("Component should exist in state");
1173 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1174 let host = hosts.get(host1_id).expect("Host should exist in state");
1175 assert_eq!(
1176 host.components.get(&component1_scaled.component_id),
1177 Some(&500),
1178 "Component count in host should be updated"
1179 );
1180 assert_eq!(
1181 component.count_for_host(host1_id),
1182 500,
1183 "Component count should be modified with an increase in scale"
1184 );
1185
1186 let component1_scaled = ComponentScaled {
1187 claims: Some(ComponentClaims {
1188 call_alias: Some("Grand Moff".into()),
1189 issuer: "Sheev Palpatine".into(),
1190 name: "Grand Moff Tarkin".into(),
1191 version: Some("0.1.0".into()),
1192 ..Default::default()
1193 }),
1194 image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1195 component_id: "TARKIN".into(),
1196 host_id: host1_id.into(),
1197 annotations: BTreeMap::default(),
1198 max_instances: 200,
1199 };
1200 worker
1201 .handle_component_scaled(lattice_id, &component1_scaled)
1202 .await
1203 .expect("Should be able to handle component event");
1204 let components = store.list::<Component>(lattice_id).await.unwrap();
1205 let component = components
1206 .get("TARKIN")
1207 .expect("Component should exist in state");
1208 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1209 let host = hosts.get(host1_id).expect("Host should exist in state");
1210 assert_eq!(
1211 host.components.get(&component1_scaled.component_id),
1212 Some(&200),
1213 "Component count in host should be updated"
1214 );
1215 assert_eq!(
1216 component.count_for_host(host1_id),
1217 200,
1218 "Component count should be modified with a decrease in scale"
1219 );
1220
1221 let component1_scaled = ComponentScaled {
1222 claims: Some(ComponentClaims {
1223 call_alias: Some("Grand Moff".into()),
1224 issuer: "Sheev Palpatine".into(),
1225 name: "Grand Moff Tarkin".into(),
1226 version: Some("0.1.0".into()),
1227 ..Default::default()
1228 }),
1229 image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1230 component_id: "TARKIN".into(),
1231 host_id: host1_id.into(),
1232 annotations: BTreeMap::default(),
1233 max_instances: 0,
1234 };
1235 worker
1236 .handle_component_scaled(lattice_id, &component1_scaled)
1237 .await
1238 .expect("Should be able to handle component event");
1239 let components = store.list::<Component>(lattice_id).await.unwrap();
1240 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1241 let host = hosts.get(host1_id).expect("Host should exist in state");
1242 assert_eq!(
1243 host.components.get(&component1_scaled.component_id),
1244 None,
1245 "Component in host should be removed"
1246 );
1247 assert!(
1248 !components.contains_key("TARKIN"),
1249 "Component should be removed from state"
1250 );
1251
1252 let component1_scaled = ComponentScaled {
1253 claims: Some(ComponentClaims {
1254 call_alias: Some("Grand Moff".into()),
1255 issuer: "Sheev Palpatine".into(),
1256 name: "Grand Moff Tarkin".into(),
1257 version: Some("0.1.0".into()),
1258 ..Default::default()
1259 }),
1260 image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1261 component_id: "TARKIN".into(),
1262 host_id: host1_id.into(),
1263 annotations: BTreeMap::default(),
1264 max_instances: 1,
1265 };
1266 worker
1267 .handle_component_scaled(lattice_id, &component1_scaled)
1268 .await
1269 .expect("Should be able to handle component event");
1270 let components = store.list::<Component>(lattice_id).await.unwrap();
1271 let component = components
1272 .get("TARKIN")
1273 .expect("Component should exist in state");
1274 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1275 let host = hosts.get(host1_id).expect("Host should exist in state");
1276 assert_eq!(
1277 host.components.get(&component1_scaled.component_id),
1278 Some(&1),
1279 "Component in host should be readded from scratch"
1280 );
1281 assert_eq!(
1282 component.count_for_host(host1_id),
1283 1,
1284 "Component count should be modified with an initial start"
1285 );
1286 worker
1287 .handle_component_scaled(
1288 lattice_id,
1289 &ComponentScaled {
1290 host_id: host2_id.into(),
1291 ..component1_scaled.clone()
1292 },
1293 )
1294 .await
1295 .expect("Should be able to handle component event");
1296
1297 let component2_scaled = ComponentScaled {
1298 claims: Some(ComponentClaims {
1299 call_alias: Some("Darth".into()),
1300 issuer: "Sheev Palpatine".into(),
1301 name: "Darth Vader".into(),
1302 version: Some("0.1.0".into()),
1303 ..Default::default()
1304 }),
1305 image_ref: "coruscant.galactic.empire/vader:0.1.0".into(),
1306 host_id: host1_id.into(),
1307 component_id: "DARTHVADER".into(),
1308 annotations: BTreeMap::default(),
1309 max_instances: 2,
1310 };
1311
1312 worker
1313 .handle_component_scaled(lattice_id, &component2_scaled)
1314 .await
1315 .expect("Should be able to handle component scaled event");
1316 worker
1317 .handle_component_scaled(
1318 lattice_id,
1319 &ComponentScaled {
1320 host_id: host2_id.into(),
1321 ..component2_scaled.clone()
1322 },
1323 )
1324 .await
1325 .expect("Should be able to handle component event");
1326
1327 let provider1 = ProviderStarted {
1332 claims: Some(ProviderClaims {
1333 issuer: "Sheev Palpatine".into(),
1334 name: "Force Choke".into(),
1335 version: "0.1.0".into(),
1336 ..Default::default()
1337 }),
1338 image_ref: "coruscant.galactic.empire/force_choke:0.1.0".into(),
1339 provider_id: "CHOKE".into(),
1340 host_id: host1_id.into(),
1341 annotations: BTreeMap::default(),
1342 };
1343
1344 let provider2 = ProviderStarted {
1345 claims: Some(ProviderClaims {
1346 issuer: "Sheev Palpatine".into(),
1347 name: "Death Star Laser".into(),
1348 version: "0.1.0".into(),
1349 ..Default::default()
1350 }),
1351 image_ref: "coruscant.galactic.empire/laser:0.1.0".into(),
1352 provider_id: "BYEBYEALDERAAN".into(),
1353 host_id: host2_id.into(),
1354 annotations: BTreeMap::default(),
1355 };
1356
1357 worker
1358 .handle_provider_started(lattice_id, &provider1)
1359 .await
1360 .expect("Should be able to handle provider event");
1361 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1362 assert_eq!(providers.len(), 1, "Should only be 1 provider in state");
1363 assert_provider(&providers, &provider1, &[host1_id]);
1364
1365 worker
1367 .handle_provider_started(lattice_id, &provider2)
1368 .await
1369 .expect("Should be able to handle provider event");
1370 worker
1371 .handle_provider_started(
1372 lattice_id,
1373 &ProviderStarted {
1374 host_id: host1_id.into(),
1375 provider_id: provider2.provider_id.clone(),
1376 ..provider2.clone()
1377 },
1378 )
1379 .await
1380 .expect("Should be able to handle provider event");
1381 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1382 assert_eq!(providers.len(), 2, "Should only be 2 providers in state");
1383 assert_provider(&providers, &provider2, &[host1_id, host2_id]);
1384
1385 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1387 assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
1388 let host = hosts.get(host1_id).expect("Host should still exist");
1389 assert_eq!(
1390 host.components.len(),
1391 2,
1392 "Should have two different components running"
1393 );
1394 assert_eq!(
1395 host.providers.len(),
1396 2,
1397 "Should have two different providers running"
1398 );
1399 let host = hosts.get(host2_id).expect("Host should still exist");
1400 assert_eq!(
1401 host.components.len(),
1402 2,
1403 "Should have two different components running"
1404 );
1405 assert_eq!(
1406 host.providers.len(),
1407 1,
1408 "Should have a single provider running"
1409 );
1410
1411 let component_1_id = "TARKIN";
1412 let component_2_id = "DARTHVADER";
1413
1414 worker
1415 .handle_host_heartbeat(
1416 lattice_id,
1417 &HostHeartbeat {
1418 components: vec![
1419 ComponentDescription::builder()
1420 .id(component_1_id.into())
1421 .revision(0)
1422 .image_ref("ref1".into())
1423 .max_instances(2)
1424 .build()
1425 .expect("failed to build description"),
1426 ComponentDescription::builder()
1427 .id(component_2_id.into())
1428 .revision(0)
1429 .image_ref("ref2".into())
1430 .max_instances(2)
1431 .build()
1432 .expect("failed to build description"),
1433 ],
1434 friendly_name: "death-star-42".into(),
1435 labels: labels.clone(),
1436 issuer: "".to_string(),
1437 providers: vec![
1438 ProviderDescription::builder()
1439 .id(&provider1.provider_id)
1440 .image_ref(&provider1.image_ref)
1441 .revision(0)
1442 .build()
1443 .expect("failed to build provider description"),
1444 ProviderDescription::builder()
1445 .id(&provider2.provider_id)
1446 .image_ref(&provider2.image_ref)
1447 .revision(0)
1448 .build()
1449 .expect("failed to build provider description"),
1450 ],
1451 uptime_human: "30s".into(),
1452 uptime_seconds: 30,
1453 version: semver::Version::parse("0.61.0").unwrap(),
1454 host_id: host1_id.into(),
1455 },
1456 )
1457 .await
1458 .expect("Should be able to handle host heartbeat");
1459
1460 worker
1461 .handle_host_heartbeat(
1462 lattice_id,
1463 &HostHeartbeat {
1464 components: vec![
1465 ComponentDescription::builder()
1466 .id(component_1_id.into())
1467 .image_ref("ref1".into())
1468 .revision(0)
1469 .max_instances(2)
1470 .build()
1471 .expect("failed to build description"),
1472 ComponentDescription::builder()
1473 .id(component_2_id.into())
1474 .image_ref("ref2".into())
1475 .revision(0)
1476 .max_instances(2)
1477 .build()
1478 .expect("failed to build description"),
1479 ],
1480 issuer: "".to_string(),
1481 friendly_name: "starkiller-base-2015".to_string(),
1482 labels: labels2.clone(),
1483 providers: vec![ProviderDescription::builder()
1484 .id(&provider2.provider_id)
1485 .image_ref(&provider2.image_ref)
1486 .revision(0)
1487 .build()
1488 .expect("failed to build provider description")],
1489 uptime_human: "30s".into(),
1490 uptime_seconds: 30,
1491 version: semver::Version::parse("0.61.0").unwrap(),
1492 host_id: host2_id.into(),
1493 },
1494 )
1495 .await
1496 .expect("Should be able to handle host heartbeat");
1497
1498 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1500 assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
1501 assert_provider(&providers, &provider1, &[host1_id]);
1502 assert_provider(&providers, &provider2, &[host1_id, host2_id]);
1503
1504 let components = store.list::<Component>(lattice_id).await.unwrap();
1505 assert_eq!(
1506 components.len(),
1507 2,
1508 "Should still have 2 components in state"
1509 );
1510 assert_component(&components, component_1_id, &[(host1_id, 2), (host2_id, 2)]);
1511 assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1512
1513 let stopped = ComponentScaled {
1519 claims: None,
1520 image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1521 annotations: BTreeMap::default(),
1522 component_id: component_1_id.into(),
1523 host_id: host1_id.into(),
1524 max_instances: 0,
1525 };
1526
1527 worker
1528 .handle_component_scaled(lattice_id, &stopped)
1529 .await
1530 .expect("Should be able to handle component stop event");
1531
1532 let components = store.list::<Component>(lattice_id).await.unwrap();
1533 assert_eq!(
1534 components.len(),
1535 2,
1536 "Should still have 2 components in state"
1537 );
1538 assert_component(&components, component_1_id, &[(host2_id, 2)]);
1539 assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1540
1541 let host = store
1542 .get::<Host>(lattice_id, host2_id)
1543 .await
1544 .expect("Should be able to access store")
1545 .expect("Should have the host in the store");
1546 assert_eq!(*host.components.get(component_1_id).unwrap_or(&0), 2_usize);
1547 assert_eq!(*host.components.get(component_2_id).unwrap_or(&0), 2_usize);
1548
1549 let stopped2 = ComponentScaled {
1551 host_id: host2_id.into(),
1552 ..stopped
1553 };
1554
1555 worker
1556 .handle_component_scaled(lattice_id, &stopped2)
1557 .await
1558 .expect("Should be able to handle component scale event");
1559
1560 let components = store.list::<Component>(lattice_id).await.unwrap();
1561 assert_eq!(components.len(), 1, "Should only have 1 component in state");
1562 assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1564
1565 worker
1570 .handle_provider_stopped(
1571 lattice_id,
1572 &ProviderStopped {
1573 annotations: BTreeMap::default(),
1574 provider_id: provider2.provider_id.clone(),
1575 reason: String::new(),
1576 host_id: host1_id.into(),
1577 },
1578 )
1579 .await
1580 .expect("Should be able to handle provider stop event");
1581
1582 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1583 assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
1584 assert_provider(&providers, &provider1, &[host1_id]);
1585 assert_provider(&providers, &provider2, &[host2_id]);
1586
1587 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1589 assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
1590 let host = hosts.get(host1_id).expect("Host should still exist");
1591 assert_eq!(host.components.len(), 1, "Should have 1 component running");
1592 assert_eq!(host.providers.len(), 1, "Should have 1 provider running");
1593 let host = hosts.get(host2_id).expect("Host should still exist");
1594 assert_eq!(host.components.len(), 1, "Should have 1 component running");
1595 assert_eq!(
1596 host.providers.len(),
1597 1,
1598 "Should have a single provider running"
1599 );
1600
1601 *inventory.write().await = HashMap::from_iter([
1608 (
1609 host1_id.to_string(),
1610 HostInventory::builder()
1611 .friendly_name("my-host-3".into())
1612 .components(vec![ComponentDescription::builder()
1613 .id(component_2_id.into())
1614 .image_ref("ref2".into())
1615 .revision(0)
1616 .max_instances(2)
1617 .build()
1618 .expect("failed to build description")])
1619 .host_id(host1_id.into())
1620 .version(semver::Version::parse("0.61.0").unwrap().to_string())
1621 .uptime_human("60s".into())
1622 .uptime_seconds(60)
1623 .build()
1624 .expect("failed to build host inventory"),
1625 ),
1626 (
1627 host2_id.to_string(),
1628 HostInventory::builder()
1629 .friendly_name("my-host-4".into())
1630 .components(vec![ComponentDescription::builder()
1631 .id(component_2_id.into())
1632 .image_ref("ref2".into())
1633 .revision(0)
1634 .max_instances(2)
1635 .build()
1636 .expect("failed to build description")])
1637 .host_id(host2_id.into())
1638 .version(semver::Version::parse("1.2.3").unwrap().to_string())
1639 .uptime_human("100s".into())
1640 .uptime_seconds(100)
1641 .build()
1642 .expect("failed to build host inventory"),
1643 ),
1644 ]);
1645
1646 worker
1648 .handle_host_heartbeat(
1649 lattice_id,
1650 &HostHeartbeat {
1651 components: vec![ComponentDescription::builder()
1652 .id(component_2_id.into())
1653 .image_ref("ref2".into())
1654 .revision(0)
1655 .max_instances(2)
1656 .build()
1657 .expect("failed to build description")],
1658 friendly_name: "death-star-42".to_string(),
1659 issuer: "".to_string(),
1660 labels,
1661 providers: vec![ProviderDescription::builder()
1662 .id(&provider1.provider_id)
1663 .image_ref(&provider1.image_ref)
1664 .revision(1)
1665 .build()
1666 .expect("failed to build provider description")],
1667 uptime_human: "60s".into(),
1668 uptime_seconds: 60,
1669 version: semver::Version::parse("0.61.0").unwrap(),
1670 host_id: host1_id.into(),
1671 },
1672 )
1673 .await
1674 .expect("Should be able to handle host heartbeat");
1675
1676 worker
1677 .handle_host_heartbeat(
1678 lattice_id,
1679 &HostHeartbeat {
1680 components: vec![ComponentDescription::builder()
1681 .id(component_2_id.into())
1682 .image_ref("ref2".into())
1683 .revision(0)
1684 .max_instances(2)
1685 .build()
1686 .expect("failed to build description")],
1687 friendly_name: "starkiller-base-2015".to_string(),
1688 labels: labels2,
1689 issuer: "".to_string(),
1690 providers: vec![ProviderDescription::builder()
1691 .id(&provider2.provider_id)
1692 .image_ref(&provider2.image_ref)
1693 .revision(0)
1694 .build()
1695 .expect("failed to build provider description")],
1696 uptime_human: "60s".into(),
1697 uptime_seconds: 60,
1698 version: semver::Version::parse("0.61.0").unwrap(),
1699 host_id: host2_id.into(),
1700 },
1701 )
1702 .await
1703 .expect("Should be able to handle host heartbeat");
1704
1705 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1707 assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
1708 let host = hosts.get(host1_id).expect("Host should still exist");
1709 assert_eq!(host.components.len(), 1, "Should have 1 component running");
1710 assert_eq!(host.providers.len(), 1, "Should have 1 provider running");
1711 let host = hosts.get(host2_id).expect("Host should still exist");
1712 assert_eq!(host.components.len(), 1, "Should have 1 component running");
1713 assert_eq!(
1714 host.providers.len(),
1715 1,
1716 "Should have a single provider running"
1717 );
1718
1719 let components = store.list::<Component>(lattice_id).await.unwrap();
1721 assert_eq!(components.len(), 1, "Should only have 1 component in state");
1722 assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1723
1724 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1725 assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
1726 assert_provider(&providers, &provider1, &[host1_id]);
1727 assert_provider(&providers, &provider2, &[host2_id]);
1728
1729 worker
1734 .handle_host_stopped(
1735 lattice_id,
1736 &HostStopped {
1737 labels: HashMap::default(),
1738 id: host1_id.into(),
1739 },
1740 )
1741 .await
1742 .expect("Should be able to handle host stopped event");
1743
1744 let hosts = store.list::<Host>(lattice_id).await.unwrap();
1745 assert_eq!(hosts.len(), 1, "Should only have 1 host");
1746 let host = hosts.get(host2_id).expect("Host should still exist");
1747 assert_eq!(host.components.len(), 1, "Should have 1 component running");
1748 assert_eq!(
1749 host.providers.len(),
1750 1,
1751 "Should have a single provider running"
1752 );
1753
1754 let components = store.list::<Component>(lattice_id).await.unwrap();
1756 assert_eq!(components.len(), 1, "Should only have 1 component in state");
1757 assert_component(&components, component_2_id, &[(host2_id, 2)]);
1758
1759 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1760 assert_eq!(providers.len(), 1, "Should now have 1 provider in state");
1761 assert_provider(&providers, &provider2, &[host2_id]);
1762 }
1763
    /// A host heartbeat for a host wadm has never seen should cause the worker
    /// to discover everything running on it: both components (with name/issuer
    /// filled in from the claims source) and the provider, all stored under
    /// the lattice.
    #[tokio::test]
    async fn test_discover_running_host() {
        let component1_id = "SKYWALKER";
        let component1_ref = "fakecloud.io/skywalker:0.1.0";
        let component2_id = "ORGANA";
        let component2_ref = "fakecloud.io/organa:0.1.0";
        let lattice_id = "discover_running_host";
        // Claims keyed by component id; the worker is expected to consult the
        // claims source to enrich discovered components with name/issuer.
        let claims = HashMap::from([
            (
                component1_id.into(),
                Claims {
                    name: "tosche_station".to_string(),
                    capabilities: vec!["wasmcloud:httpserver".to_string()],
                    issuer: "GEORGELUCAS".to_string(),
                },
            ),
            (
                component2_id.into(),
                Claims {
                    name: "alderaan".to_string(),
                    capabilities: vec!["wasmcloud:keyvalue".to_string()],
                    issuer: "GEORGELUCAS".to_string(),
                },
            ),
        ]);
        // In-memory store plus a mock lattice source backed by a shared,
        // mutable inventory map so the test can control what the control
        // interface reports.
        let store = Arc::new(TestStore::default());
        let inventory = Arc::new(RwLock::new(HashMap::default()));
        let lattice_source = TestLatticeSource {
            claims: claims.clone(),
            inventory: inventory.clone(),
            ..Default::default()
        };
        let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
        let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
        let worker = EventWorker::new(
            store.clone(),
            lattice_source.clone(),
            command_publisher.clone(),
            status_publisher.clone(),
            ScalerManager::test_new(
                NoopPublisher,
                lattice_id,
                store.clone(),
                command_publisher,
                status_publisher.clone(),
                lattice_source,
            )
            .await,
        );

        let provider_id = "HYPERDRIVE";
        let host_id = "WHATAPIECEOFJUNK";
        // Seed the mock inventory so that, when the worker queries it for this
        // unknown host, it sees two components and one provider running.
        *inventory.write().await = HashMap::from_iter([(
            host_id.to_string(),
            HostInventory::builder()
                .friendly_name("my-host-5".into())
                .components(vec![
                    ComponentDescription::builder()
                        .id(component1_id.into())
                        .image_ref(component1_ref.into())
                        .revision(0)
                        .max_instances(2)
                        .build()
                        .expect("failed to build description"),
                    ComponentDescription::builder()
                        .id(component2_id.into())
                        .image_ref(component2_ref.into())
                        .revision(0)
                        .max_instances(1)
                        .build()
                        .expect("failed to build description"),
                ])
                .host_id(host_id.into())
                .providers(vec![ProviderDescription::builder()
                    .id(provider_id)
                    .revision(0)
                    .build()
                    .expect("failed to build provider description")])
                .version(semver::Version::parse("0.61.0").unwrap().to_string())
                .uptime_human("60s".into())
                .uptime_seconds(60)
                .build()
                .expect("failed to build host inventory"),
        )]);

        // First heartbeat from a brand-new host: the payload mirrors the
        // inventory above, so everything should be discovered and stored.
        worker
            .handle_host_heartbeat(
                lattice_id,
                &HostHeartbeat {
                    components: vec![
                        ComponentDescription::builder()
                            .id(component1_id.into())
                            .image_ref(component1_ref.into())
                            .revision(0)
                            .max_instances(2)
                            .build()
                            .expect("failed to build description"),
                        ComponentDescription::builder()
                            .id(component2_id.into())
                            .image_ref(component2_ref.into())
                            .revision(0)
                            .max_instances(1)
                            .build()
                            .expect("failed to build description"),
                    ],
                    friendly_name: "millenium_falcon-1977".to_string(),
                    labels: HashMap::default(),
                    issuer: "".to_string(),
                    providers: vec![ProviderDescription::builder()
                        .id(provider_id)
                        .revision(0)
                        .build()
                        .expect("failed to build provider description")],
                    uptime_human: "60s".into(),
                    uptime_seconds: 60,
                    version: semver::Version::parse("0.61.0").unwrap(),
                    host_id: host_id.into(),
                },
            )
            .await
            .expect("Should be able to handle host heartbeat");

        // Component 1: discovered with claim data and a count of 2 instances
        // under the default (empty) annotation set.
        let components = store.list::<Component>(lattice_id).await.unwrap();
        assert_eq!(components.len(), 2, "Store should now have two components");
        let component = components
            .get(component1_id)
            .expect("component should exist");
        let expected = claims.get(component1_id).unwrap();
        assert_eq!(component.name, expected.name, "Data should match");
        assert_eq!(component.issuer, expected.issuer, "Data should match");
        assert_eq!(
            component
                .instances
                .get(host_id)
                .expect("Host should exist in count")
                .get(&BTreeMap::new())
                .expect("Should find a component with the correct annotations")
                .count,
            2,
            "Should have the right number of components running"
        );

        // Component 2: discovered with claim data and a single instance group
        // on the host.
        let component = components
            .get(component2_id)
            .expect("Component should exist");
        let expected = claims.get(component2_id).unwrap();
        assert_eq!(component.name, expected.name, "Data should match");
        assert_eq!(component.issuer, expected.issuer, "Data should match");
        assert_eq!(
            component
                .instances
                .get(host_id)
                .expect("Host should exist in count")
                .len(),
            1,
            "Should have the right number of components running"
        );

        // The provider should likewise be discovered and tied to the host.
        let providers = store.list::<Provider>(lattice_id).await.unwrap();
        assert_eq!(providers.len(), 1, "Should have 1 provider in the store");
        let provider = providers.get(provider_id).expect("Provider should exist");
        assert_eq!(provider.id, provider_id, "Data should match");
        assert!(
            provider.hosts.contains_key(host_id),
            "Should have found host in provider store"
        );
    }
1936
1937 #[tokio::test]
1938 async fn test_provider_status_update() {
1939 let store = Arc::new(TestStore::default());
1940 let lattice_source = TestLatticeSource::default();
1941 let lattice_id = "provider_status";
1942 let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
1943 let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
1944 let worker = EventWorker::new(
1945 store.clone(),
1946 lattice_source.clone(),
1947 command_publisher.clone(),
1948 status_publisher.clone(),
1949 ScalerManager::test_new(
1950 NoopPublisher,
1951 lattice_id,
1952 store.clone(),
1953 command_publisher,
1954 status_publisher.clone(),
1955 lattice_source,
1956 )
1957 .await,
1958 );
1959
1960 let host_id = "CLOUDCITY";
1961
1962 let provider = ProviderStarted {
1964 claims: Some(ProviderClaims {
1965 issuer: "Lando Calrissian".into(),
1966 name: "Tibanna Gas Mining".into(),
1967 version: "0.1.0".into(),
1968 ..Default::default()
1969 }),
1970 image_ref: "bespin.lando.inc/tibanna:0.1.0".into(),
1971 provider_id: "GAS".into(),
1972 host_id: host_id.into(),
1973 annotations: BTreeMap::default(),
1974 };
1975
1976 worker
1977 .handle_provider_started(lattice_id, &provider)
1978 .await
1979 .expect("Should be able to handle provider started event");
1980 worker
1981 .handle_provider_health_check(
1982 lattice_id,
1983 &ProviderHealthCheckInfo {
1984 provider_id: provider.provider_id.clone(),
1985 host_id: host_id.into(),
1986 },
1987 Some(false),
1988 )
1989 .await
1990 .expect("Should be able to handle a provider health check event");
1991
1992 let providers = store.list::<Provider>(lattice_id).await.unwrap();
1993 assert_eq!(providers.len(), 1, "Only 1 provider should exist");
1994 let prov = providers
1995 .get(&provider.provider_id)
1996 .expect("Provider should exist");
1997 assert!(
1998 matches!(
1999 prov.hosts
2000 .get(host_id)
2001 .expect("Should find status for host"),
2002 ProviderStatus::Running
2003 ),
2004 "Provider should have a running status"
2005 );
2006
2007 worker
2009 .handle_provider_health_check(
2010 lattice_id,
2011 &ProviderHealthCheckInfo {
2012 provider_id: provider.provider_id.clone(),
2013 host_id: host_id.into(),
2014 },
2015 Some(true),
2016 )
2017 .await
2018 .expect("Should be able to handle a provider health check event");
2019
2020 let providers = store.list::<Provider>(lattice_id).await.unwrap();
2021 assert_eq!(providers.len(), 1, "Only 1 provider should exist");
2022 let prov = providers
2023 .get(&provider.provider_id)
2024 .expect("Provider should exist");
2025 assert!(
2026 matches!(
2027 prov.hosts
2028 .get(host_id)
2029 .expect("Should find status for host"),
2030 ProviderStatus::Failed
2031 ),
2032 "Provider should have a running status"
2033 );
2034 }
2035
    /// Pre-seeds the store with stale component/provider records, then sends
    /// a heartbeat carrying fresher data (names, references, counts,
    /// annotations) and verifies the heartbeat overwrites the stale state.
    #[tokio::test]
    async fn test_heartbeat_updates_stale_data() {
        let store = Arc::new(TestStore::default());
        let inventory = Arc::new(RwLock::new(HashMap::default()));
        let lattice_source = TestLatticeSource {
            inventory: inventory.clone(),
            ..Default::default()
        };
        let lattice_id = "update_data";

        let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
        let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
        let worker = EventWorker::new(
            store.clone(),
            lattice_source.clone(),
            command_publisher.clone(),
            status_publisher.clone(),
            ScalerManager::test_new(
                NoopPublisher,
                lattice_id,
                store.clone(),
                command_publisher,
                status_publisher.clone(),
                lattice_source,
            )
            .await,
        );

        let host_id = "jabbaspalace";

        // Seed a stale component record: count 1 with no name/reference, so a
        // later heartbeat has something to correct.
        store
            .store(
                lattice_id,
                "jabba".to_string(),
                Component {
                    id: "jabba".to_string(),
                    instances: HashMap::from([(
                        host_id.to_string(),
                        HashSet::from_iter([WadmComponentInfo {
                            count: 1,
                            annotations: BTreeMap::default(),
                        }]),
                    )]),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        // Seed a stale provider record stuck in `Pending` with no metadata.
        store
            .store(
                lattice_id,
                "jabbatheprovider".to_string(),
                Provider {
                    id: "jabbatheprovider".to_string(),
                    hosts: HashMap::from_iter([(host_id.to_string(), ProviderStatus::Pending)]),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Heartbeat with authoritative data: new counts (5 for "jabba"), a
        // previously unseen component "jabba2", names, image refs, and
        // annotations on both the component and the provider.
        worker
            .handle_host_heartbeat(
                lattice_id,
                &HostHeartbeat {
                    components: vec![
                        ComponentDescription::builder()
                            .id("jabba".into())
                            .image_ref("jabba.tatooinecr.io/jabba:latest".into())
                            .name("Da Hutt".into())
                            .annotations(BTreeMap::from_iter([(
                                "da".to_string(),
                                "gobah".to_string(),
                            )]))
                            .revision(0)
                            .max_instances(5)
                            .build()
                            .expect("failed to build description"),
                        ComponentDescription::builder()
                            .id("jabba2".into())
                            .image_ref("jabba.tatooinecr.io/jabba:latest".into())
                            .name("Da Hutt".into())
                            .revision(0)
                            .max_instances(1)
                            .build()
                            .expect("failed to build description"),
                    ],
                    friendly_name: "palace-1983".to_string(),
                    labels: HashMap::default(),
                    issuer: "".to_string(),
                    providers: vec![ProviderDescription::builder()
                        .annotations(BTreeMap::from_iter([(
                            "one".to_string(),
                            "two".to_string(),
                        )]))
                        .id("jabbatheprovider")
                        .image_ref("jabba.tatooinecr.io/provider:latest")
                        .name("Jabba The Provider")
                        .revision(0)
                        .build()
                        .expect("failed to build provider description")],
                    uptime_human: "60s".into(),
                    uptime_seconds: 60,
                    version: semver::Version::parse("0.61.0").unwrap(),
                    host_id: host_id.to_string(),
                },
            )
            .await
            .expect("Should be able to handle host heartbeat");

        // The component's count, name, reference, and annotations should all
        // reflect the heartbeat rather than the stale seed data.
        let components = store.list::<Component>(lattice_id).await.unwrap();
        assert_eq!(components.len(), 2, "Should have 2 components in the store");
        let component = components.get("jabba").expect("Component should exist");
        assert_eq!(
            component.count(),
            5,
            "Component should have the correct number of instances"
        );
        assert_eq!(component.name, "Da Hutt", "Should have the correct name");
        assert_eq!(
            component.reference, "jabba.tatooinecr.io/jabba:latest",
            "Should have the correct reference"
        );
        assert_eq!(
            component
                .instances
                .get(host_id)
                .expect("instances to be on our specified host")
                .iter()
                .find(|i| !i.annotations.is_empty())
                .expect("instance with annotations to exist")
                .annotations
                .get("da")
                .expect("annotation to exist"),
            "gobah"
        );

        // The provider's name and reference should likewise be filled in.
        let providers = store
            .list::<Provider>(lattice_id)
            .await
            .expect("should be able to grab providers from store");
        assert_eq!(providers.len(), 1, "Should have 1 provider in the store");
        let provider = providers
            .get("jabbatheprovider")
            .expect("Provider should exist");
        assert_eq!(
            provider.name, "Jabba The Provider",
            "Should have the correct name"
        );
        assert_eq!(
            provider.reference, "jabba.tatooinecr.io/provider:latest",
            "Should have the correct reference"
        );
        // The host record keys providers by (id, ref, annotations) — note the
        // lookup key uses empty annotations, while the stored entry carries
        // the annotations from the heartbeat.
        let hosts = store
            .list::<Host>(lattice_id)
            .await
            .expect("should be able to get hosts from store");
        assert_eq!(hosts.len(), 1);
        let host = hosts.get(host_id).expect("host with generated ID to exist");
        let host_provider = host
            .providers
            .get(&ProviderInfo {
                provider_id: "jabbatheprovider".to_string(),
                provider_ref: "jabba.tatooinecr.io/provider:latest".to_string(),
                annotations: BTreeMap::default(),
            })
            .expect("provider to exist on host");

        assert_eq!(
            host_provider
                .annotations
                .get("one")
                .expect("annotation to exist"),
            "two"
        );
    }
2215
2216 fn assert_component(
2217 components: &HashMap<String, Component>,
2218 component_id: &str,
2219 expected_counts: &[(&str, usize)],
2220 ) {
2221 let component = components
2222 .get(component_id)
2223 .expect("Component should exist in store");
2224 assert_eq!(
2225 component.id, component_id,
2226 "Component ID stored should be correct"
2227 );
2228 assert_eq!(
2229 expected_counts.len(),
2230 component.instances.len(),
2231 "Should have the proper number of hosts the component is running on"
2232 );
2233 for (expected_host, expected_count) in expected_counts.iter() {
2234 assert_eq!(
2235 component.count_for_host(expected_host),
2236 *expected_count,
2237 "Component count on host should be correct"
2238 );
2239 }
2240 }
2241
2242 fn assert_provider(
2243 providers: &HashMap<String, Provider>,
2244 event: &ProviderStarted,
2245 running_on_hosts: &[&str],
2246 ) {
2247 let provider = providers
2248 .get(&event.provider_id)
2249 .expect("Correct provider should exist in store");
2250 assert!(
2251 event
2252 .claims
2253 .clone()
2254 .is_some_and(|claims| claims.name == provider.name),
2255 "Provider should have the correct data in state"
2256 );
2257 assert!(
2258 provider.hosts.len() == running_on_hosts.len()
2259 && running_on_hosts
2260 .iter()
2261 .all(|host_id| provider.hosts.contains_key(*host_id)),
2262 "Provider should be set to the correct hosts"
2263 );
2264 }
2265}