wadm/workers/
event.rs

1use std::collections::BTreeMap;
2use std::collections::{hash_map::Entry, HashMap, HashSet};
3
4use anyhow::Result;
5use tracing::{debug, instrument, trace, warn};
6use wadm_types::api::{ScalerStatus, Status, StatusInfo};
7use wasmcloud_control_interface::{ComponentDescription, ProviderDescription};
8
9use crate::commands::Command;
10use crate::consumers::{
11    manager::{WorkError, WorkResult, Worker},
12    ScopedMessage,
13};
14use crate::events::*;
15use crate::publisher::Publisher;
16use crate::scaler::manager::{ScalerList, ScalerManager};
17use crate::storage::{Component, Host, Provider, ProviderStatus, Store, WadmComponentInfo};
18use crate::APP_SPEC_ANNOTATION;
19
20use super::event_helpers::*;
21
/// Worker that processes lattice events: it updates stored lattice state
/// (hosts, components, providers) and drives scalers for deployed manifests.
pub struct EventWorker<StateStore, C: Clone, P: Clone> {
    // Storage backend for lattice state
    store: StateStore,
    // Control interface client; per the impl bounds it supplies claims,
    // inventory, links, config, and secrets
    ctl_client: C,
    // Publisher for commands produced by scalers
    command_publisher: CommandPublisher<P>,
    // Publisher for application status updates
    status_publisher: StatusPublisher<P>,
    // Manages the scaler lists for deployed manifests
    scalers: ScalerManager<StateStore, P, C>,
}
29
30impl<StateStore, C, P> EventWorker<StateStore, C, P>
31where
32    StateStore: Store + Send + Sync + Clone + 'static,
33    C: ClaimsSource
34        + InventorySource
35        + LinkSource
36        + ConfigSource
37        + SecretSource
38        + Clone
39        + Send
40        + Sync
41        + 'static,
42    P: Publisher + Clone + Send + Sync + 'static,
43{
44    /// Creates a new event worker configured to use the given store and control interface client for fetching state
45    pub fn new(
46        store: StateStore,
47        ctl_client: C,
48        command_publisher: CommandPublisher<P>,
49        status_publisher: StatusPublisher<P>,
50        manager: ScalerManager<StateStore, P, C>,
51    ) -> EventWorker<StateStore, C, P> {
52        EventWorker {
53            store,
54            ctl_client,
55            command_publisher,
56            status_publisher,
57            scalers: manager,
58        }
59    }
60
61    // BEGIN HANDLERS
62    // NOTE(thomastaylor312): These use anyhow errors because in the _single_ case where we have to
63    // call the lattice controller, we no longer just have error types from the store. To handle the
64    // multiple error cases, it was just easier to catch it into an anyhow Error and then convert at
65    // the end
66
    /// Handles a `ComponentScaled` event by syncing the new instance count into
    /// both the stored `Component` record and the owning `Host` record.
    ///
    /// A `max_instances` of zero means the component was scaled away on this
    /// host; the component record itself is deleted once no host runs any
    /// instances of it.
    #[instrument(level = "debug", skip(self, component), fields(component_id = %component.component_id, host_id = %component.host_id))]
    async fn handle_component_scaled(
        &self,
        lattice_id: &str,
        component: &ComponentScaled,
    ) -> anyhow::Result<()> {
        trace!("Scaling component in store");
        debug!("Fetching current data for component");

        // Update component count in the component state, adding to the state if it didn't exist or removing
        // if the scale is down to zero.
        let mut component_data = Component::from(component);
        if let Some(mut current) = self
            .store
            .get::<Component>(lattice_id, &component.component_id)
            .await?
        {
            trace!(component = ?current, "Found existing component data");

            match current.instances.get_mut(&component.host_id) {
                // If the component is running and is now scaled down to zero, remove it
                Some(current_instances) if component.max_instances == 0 => {
                    current_instances.remove(&component.annotations);
                }
                // If a component is already running on a host, update the running count to the scaled max_instances value
                Some(current_instances) => {
                    // `replace` swaps in the entry with matching annotations
                    current_instances.replace(WadmComponentInfo {
                        count: component.max_instances,
                        annotations: component.annotations.clone(),
                    });
                }
                // Component is not running and now scaled to zero, no action required. This can happen if we
                // update the state before we receive the ComponentScaled event
                None if component.max_instances == 0 => (),
                // If a component isn't running yet, add it with the scaled max_instances value
                None => {
                    current.instances.insert(
                        component.host_id.clone(),
                        HashSet::from([WadmComponentInfo {
                            count: component.max_instances,
                            annotations: component.annotations.clone(),
                        }]),
                    );
                }
            }

            // If we stopped the last instance on a host, remove the host from the component data
            if current
                .instances
                .get(&component.host_id)
                .is_some_and(|instances| instances.is_empty())
            {
                current.instances.remove(&component.host_id);
            }

            // Take the updated counts and store them in the component data
            component_data.instances = current.instances;
        };

        // Update component count in the host state, removing the component if the scale is zero
        if let Some(mut host) = self
            .store
            .get::<Host>(lattice_id, &component.host_id)
            .await?
        {
            trace!(host = ?host, "Found existing host data");

            if component.max_instances == 0 {
                host.components.remove(&component.component_id);
            } else {
                host.components
                    .entry(component.component_id.clone())
                    .and_modify(|count| *count = component.max_instances)
                    .or_insert(component.max_instances);
            }

            self.store
                .store(lattice_id, host.id.to_owned(), host)
                .await?
        }

        // No instances remain on any host: delete the component record.
        // Otherwise persist the merged instance data.
        if component_data.instances.is_empty() {
            self.store
                .delete::<Component>(lattice_id, &component.component_id)
                .await
                .map_err(anyhow::Error::from)
        } else {
            self.store
                .store(lattice_id, component.component_id.clone(), component_data)
                .await
                .map_err(anyhow::Error::from)
        }
    }
160
161    #[instrument(level = "debug", skip(self, host), fields(host_id = %host.host_id))]
162    async fn handle_host_heartbeat(
163        &self,
164        lattice_id: &str,
165        host: &HostHeartbeat,
166    ) -> anyhow::Result<()> {
167        debug!("Updating store with current host heartbeat information");
168        let host_data = Host::from(host);
169        self.store
170            .store(lattice_id, host.host_id.clone(), host_data)
171            .await?;
172
173        // NOTE: We can return an error here and then nack because we'll just reupdate the host data
174        // with the exact same host heartbeat entry. There is no possibility of a duplicate
175        self.heartbeat_provider_update(lattice_id, host, &host.providers)
176            .await?;
177
178        // NOTE: We can return an error here and then nack because we'll just reupdate the host data
179        // with the exact same host heartbeat entry. There is no possibility of a duplicate
180        self.heartbeat_component_update(lattice_id, host, &host.components)
181            .await?;
182
183        Ok(())
184    }
185
186    #[instrument(level = "debug", skip(self, host), fields(host_id = %host.id))]
187    async fn handle_host_started(
188        &self,
189        lattice_id: &str,
190        host: &HostStarted,
191    ) -> anyhow::Result<()> {
192        debug!("Updating store with new host");
193        // New hosts have nothing running on them yet, so just drop it in the store
194        self.store
195            .store(lattice_id, host.id.clone(), Host::from(host))
196            .await
197            .map_err(anyhow::Error::from)
198    }
199
    /// Handles a host stopped event: scrubs the stopped host out of every
    /// stored component and provider, deleting records that end up with no
    /// remaining instances, and finally removes the host record itself.
    #[instrument(level = "debug", skip(self, host), fields(host_id = %host.id))]
    async fn handle_host_stopped(
        &self,
        lattice_id: &str,
        host: &HostStopped,
    ) -> anyhow::Result<()> {
        debug!("Handling host stopped event");
        // NOTE(thomastaylor312): Generally to get a host stopped event, the host should have
        // already sent a bunch of stop component/provider events, but for correctness sake, we fetch
        // the current host and make sure all the components and providers are removed
        trace!("Fetching current host data");
        let current: Host = match self.store.get(lattice_id, &host.id).await? {
            Some(h) => h,
            None => {
                debug!("Got host stopped event for a host we didn't have in the store");
                return Ok(());
            }
        };

        trace!("Fetching components from store to remove stopped instances");
        let all_components = self.store.list::<Component>(lattice_id).await?;

        // Split components that ran on this host into those that still run
        // elsewhere (update) and those with no instances left (delete).
        #[allow(clippy::type_complexity)]
        let (components_to_update, components_to_delete): (
            Vec<(String, Component)>,
            Vec<(String, Component)>,
        ) = all_components
            .into_iter()
            .filter_map(|(id, mut component)| {
                if current.components.contains_key(&id) {
                    component.instances.remove(&current.id);
                    Some((id, component))
                } else {
                    None
                }
            })
            .partition(|(_, component)| !component.instances.is_empty());
        trace!("Storing updated components in store");
        self.store
            .store_many(lattice_id, components_to_update)
            .await?;

        trace!("Removing components with no more running instances");
        self.store
            .delete_many::<Component, _, _>(
                lattice_id,
                components_to_delete.into_iter().map(|(id, _)| id),
            )
            .await?;

        trace!("Fetching providers from store to remove stopped instances");
        let all_providers = self.store.list::<Provider>(lattice_id).await?;

        // Same split for providers the host reported it was running.
        #[allow(clippy::type_complexity)]
        let (providers_to_update, providers_to_delete): (
            Vec<(String, Provider)>,
            Vec<(String, Provider)>,
        ) = current
            .providers
            .into_iter()
            .filter_map(|info| {
                let key = info.provider_id;
                // NOTE: We can do this without cloning, but it led to some confusing code involving
                // `remove` from the owned `all_providers` map. This is more readable at the expense of
                // a clone for few providers
                match all_providers.get(&key).cloned() {
                    // If we successfully remove the host, map it to the right type, otherwise we can
                    // continue onward
                    Some(mut prov) => prov.hosts.remove(&host.id).map(|_| (key, prov)),
                    None => {
                        warn!(key = %key, "Didn't find provider in storage even though host said it existed");
                        None
                    }
                }
            })
            .partition(|(_, provider)| !provider.hosts.is_empty());
        trace!("Storing updated providers in store");
        self.store
            .store_many(lattice_id, providers_to_update)
            .await?;

        trace!("Removing providers with no more running instances");
        self.store
            .delete_many::<Provider, _, _>(
                lattice_id,
                providers_to_delete.into_iter().map(|(id, _)| id),
            )
            .await?;

        // Order matters here: Now that we've cleaned stuff up, remove the host. We do this last
        // because if any of the above fails after we remove the host, we won't be able to fetch the
        // data to remove the components and providers on a retry.
        debug!("Deleting host from store");
        self.store
            .delete::<Host>(lattice_id, &host.id)
            .await
            .map_err(anyhow::Error::from)
    }
295
    /// Handles a provider started event: records the provider as running on
    /// the host, backfills issuer/reference fields that may be empty (e.g.
    /// when the provider was first discovered via health check), and adds the
    /// provider to the host's provider set when it wasn't already present.
    #[instrument(
        level = "debug",
        skip(self, provider),
        fields(
            provider_id = %provider.provider_id,
            image_ref = %provider.image_ref,
        )
    )]
    async fn handle_provider_started(
        &self,
        lattice_id: &str,
        provider: &ProviderStarted,
    ) -> anyhow::Result<()> {
        debug!("Handling provider started event");
        let id = &provider.provider_id;
        trace!("Fetching current data from store");
        // Set when the host's provider set also needs updating (i.e. this host
        // wasn't already recorded as running the provider)
        let mut needs_host_update = false;
        let provider_data = if let Some(mut current) =
            self.store.get::<Provider>(lattice_id, id).await?
        {
            // Using the entry api is a bit more efficient because we do a single key lookup
            let mut prov = match current.hosts.entry(provider.host_id.clone()) {
                Entry::Occupied(_) => {
                    trace!("Found host entry for the provider already in store. Will not update");
                    current
                }
                Entry::Vacant(entry) => {
                    entry.insert(ProviderStatus::default());
                    needs_host_update = true;
                    current
                }
            };
            // Update missing fields if they exist. Right now if we just discover a provider from
            // health check, these will be empty
            if prov.issuer.is_empty() || prov.reference.is_empty() {
                let new_prov = Provider::from(provider);
                prov.issuer = new_prov.issuer;
                prov.reference = new_prov.reference;
            }
            prov
        } else {
            trace!("No current provider found in store");
            let mut prov = Provider::from(provider);
            prov.hosts = HashMap::from([(provider.host_id.clone(), ProviderStatus::default())]);
            needs_host_update = true;
            prov
        };

        // Insert provider into host map
        if let (Some(mut host), true) = (
            self.store
                .get::<Host>(lattice_id, &provider.host_id)
                .await?,
            needs_host_update,
        ) {
            trace!(host = ?host, "Found existing host data");

            host.providers.replace(ProviderInfo {
                provider_id: id.to_owned(),
                provider_ref: provider.image_ref.to_owned(),
                annotations: provider.annotations.to_owned(),
            });

            self.store
                .store(lattice_id, host.id.to_owned(), host)
                .await?
        }

        debug!("Storing updated provider in store");
        self.store
            .store(lattice_id, id.to_owned(), provider_data)
            .await
            .map_err(anyhow::Error::from)
    }
370
    /// Handles a provider stopped event: removes the provider from the host's
    /// provider set and from the provider record's host map, deleting the
    /// provider record entirely once it no longer runs on any host.
    #[instrument(
        level = "debug",
        skip(self, provider),
        fields(
            provider_id = %provider.provider_id,
        )
    )]
    async fn handle_provider_stopped(
        &self,
        lattice_id: &str,
        provider: &ProviderStopped,
    ) -> anyhow::Result<()> {
        debug!("Handling provider stopped event");
        let id = &provider.provider_id;
        trace!("Fetching current data from store");

        // Remove provider from host map
        if let Some(mut host) = self
            .store
            .get::<Host>(lattice_id, &provider.host_id)
            .await?
        {
            trace!(host = ?host, "Found existing host data");

            host.providers.remove(&ProviderInfo {
                provider_id: provider.provider_id.to_owned(),
                // We do not hash based on provider reference, so it can be blank here
                provider_ref: "".to_string(),
                // We don't have this information, nor do we need it since we don't hash based
                // on annotations
                annotations: BTreeMap::default(),
            });

            self.store
                .store(lattice_id, host.id.to_owned(), host)
                .await?
        }

        if let Some(mut current) = self.store.get::<Provider>(lattice_id, id).await? {
            // Nothing more to do if this host wasn't recorded on the provider
            if current.hosts.remove(&provider.host_id).is_none() {
                trace!(host_id = %provider.host_id, "Did not find host entry in provider");
                return Ok(());
            }
            if current.hosts.is_empty() {
                debug!("Provider is no longer running on any hosts. Removing from store");
                self.store
                    .delete::<Provider>(lattice_id, id)
                    .await
                    .map_err(anyhow::Error::from)
            } else {
                debug!("Storing updated provider");
                self.store
                    .store(lattice_id, id.to_owned(), current)
                    .await
                    .map_err(anyhow::Error::from)
            }
        } else {
            trace!("No current provider found in store");
            Ok(())
        }
    }
432
    /// Handles a provider health check event, updating the provider's status
    /// on the reporting host.
    ///
    /// `failed` semantics (from the match below): `Some(true)` marks the
    /// provider `Failed`, `Some(false)` marks it `Running`, and `None` (a
    /// plain health check with no pass/fail signal) promotes a `Pending` or
    /// unknown provider to `Running` but otherwise leaves the status alone.
    #[instrument(
        level = "debug",
        skip(self, provider),
        fields(
            provider_id = %provider.provider_id,
        )
    )]
    async fn handle_provider_health_check(
        &self,
        lattice_id: &str,
        provider: &ProviderHealthCheckInfo,
        failed: Option<bool>,
    ) -> anyhow::Result<()> {
        debug!("Handling provider health check event");
        trace!("Getting current provider");
        let id = &provider.provider_id;
        let host_id = &provider.host_id;
        // Create a stub record if we've never seen this provider; later events
        // (e.g. provider started) fill in the missing fields
        let mut current: Provider = match self.store.get(lattice_id, id).await? {
            Some(p) => p,
            None => {
                trace!("Didn't find provider in store. Creating");
                Provider {
                    id: id.clone(),
                    ..Default::default()
                }
            }
        };

        let status = match (current.hosts.get(host_id), failed) {
            // If the provider status changed from when we last saw it, modify status
            (_, Some(true)) => Some(ProviderStatus::Failed),
            (_, Some(false)) => Some(ProviderStatus::Running),
            // If the provider is pending or we missed the initial start and we get a health check
            // status, assume it's running fine.
            (Some(ProviderStatus::Pending) | None, None) => Some(ProviderStatus::Running),
            _ => None,
        };

        if let Some(status) = status {
            debug!("Updating store with current status");
            current.hosts.insert(host_id.to_owned(), status);
        }

        // TODO(thomastaylor312): Once we are able to fetch refmaps from the ctl client, we should
        // make it update any empty references with the data from the refmap

        self.store
            .store(lattice_id, id.to_owned(), current)
            .await
            .map_err(anyhow::Error::from)
    }
484
485    // END HANDLER FUNCTIONS
486    async fn populate_component_info(
487        &self,
488        components: &HashMap<String, Component>,
489        host_id: &str,
490        instance_map: Vec<ComponentDescription>,
491    ) -> anyhow::Result<Vec<(String, Component)>> {
492        let claims = self.ctl_client.get_claims().await?;
493
494        Ok(instance_map
495            .into_iter()
496            .map(|component_description| {
497                let instance = HashSet::from_iter([WadmComponentInfo {
498                    count: component_description.max_instances() as usize,
499                    annotations: component_description
500                        .annotations()
501                        .cloned()
502                        .unwrap_or_default(),
503                }]);
504                if let Some(component) = components.get(component_description.id()) {
505                    // Construct modified Component with new instances included
506                    let mut new_instances = component.instances.clone();
507                    new_instances.insert(host_id.to_owned(), instance);
508                    let component = Component {
509                        instances: new_instances,
510                        reference: component_description.image_ref().into(),
511                        name: component_description
512                            .name()
513                            .unwrap_or(&component.name)
514                            .into(),
515                        ..component.clone()
516                    };
517
518                    (component_description.id().to_string(), component)
519                } else if let Some(claim) = claims.get(component_description.id()) {
520                    (
521                        component_description.id().to_string(),
522                        Component {
523                            id: component_description.id().into(),
524                            name: claim.name.to_owned(),
525                            issuer: claim.issuer.to_owned(),
526                            instances: HashMap::from_iter([(host_id.to_owned(), instance)]),
527                            reference: component_description.image_ref().into(),
528                        },
529                    )
530                } else {
531                    debug!("Claims not found for component on host, component is unsigned");
532
533                    (
534                        component_description.id().to_string(),
535                        Component {
536                            id: component_description.id().into(),
537                            name: "".to_owned(),
538                            issuer: "".to_owned(),
539                            instances: HashMap::from_iter([(host_id.to_owned(), instance)]),
540                            reference: component_description.image_ref().into(),
541                        },
542                    )
543                }
544            })
545            .collect::<Vec<(String, Component)>>())
546    }
547
    /// Reconciles stored component state against a host heartbeat's component
    /// inventory, storing updated records for any component whose stored data
    /// (reference, instance count, or annotations) disagrees with the
    /// heartbeat.
    #[instrument(level = "debug", skip(self, host), fields(host_id = %host.host_id))]
    async fn heartbeat_component_update(
        &self,
        lattice_id: &str,
        host: &HostHeartbeat,
        inventory_components: &Vec<ComponentDescription>,
    ) -> anyhow::Result<()> {
        debug!("Fetching current component state");
        let components = self.store.list::<Component>(lattice_id).await?;

        // Compare stored Components to the "true" list on this host, updating stored
        // Components when they differ from the authoratative heartbeat
        let components_to_update = inventory_components
            .iter()
            .filter_map(|component_description| {
                if components
                    .get(component_description.id())
                    // NOTE(brooksmtownsend): This code maps the component to a boolean indicating if it's up-to-date with the heartbeat or not.
                    // If the component matches what the heartbeat says, we return None, otherwise we return Some(component_description).
                    .map(|component| {
                        // If the stored reference isn't what we receive on the heartbeat, update
                        component_description.image_ref() == component.reference
                            && component
                                .instances
                                .get(&host.host_id)
                                .map(|store_instances| {
                                    // Update if the number of instances is different
                                    // NOTE(review): this compares the number of
                                    // instance groups on this host against the
                                    // number of hosts in `component.instances`
                                    // (a map keyed by host id) — these measure
                                    // different things, so the comparison looks
                                    // suspicious; confirm whether the intent
                                    // was to compare against the heartbeat's
                                    // instance data instead.
                                    if store_instances.len() != component.instances.len() {
                                        return false;
                                    }
                                    // Update if annotations or counts are different
                                    let annotations: BTreeMap<String, String> =
                                        component_description
                                            .annotations()
                                            .cloned()
                                            .map(|a| a.into_iter().collect())
                                            .unwrap_or_default();
                                    store_instances.get(&annotations).map_or(
                                        false,
                                        |store_instance| {
                                            component_description.max_instances() as usize
                                                == store_instance.count
                                        },
                                    )
                                })
                                .unwrap_or(false)
                    })
                    .unwrap_or(false)
                {
                    None
                } else {
                    Some(component_description.to_owned())
                }
            })
            // component ID to all instances on this host
            .collect::<Vec<ComponentDescription>>();

        let components_to_store = self
            .populate_component_info(&components, &host.host_id, components_to_update)
            .await?;

        trace!("Updating components with new status from host");

        self.store
            .store_many(lattice_id, components_to_store)
            .await?;

        Ok(())
    }
617
618    #[instrument(level = "debug", skip(self, heartbeat), fields(host_id = %heartbeat.host_id))]
619    async fn heartbeat_provider_update(
620        &self,
621        lattice_id: &str,
622        heartbeat: &HostHeartbeat,
623        inventory_providers: &Vec<ProviderDescription>,
624    ) -> anyhow::Result<()> {
625        debug!("Fetching current provider state");
626        let providers = self.store.list::<Provider>(lattice_id).await?;
627        let providers_to_update = inventory_providers.iter().filter_map(|info| {
628            // NOTE: We can do this without cloning, but it led to some confusing code involving
629            // `remove` from the owned `providers` map. This is more readable at the expense of
630            // a clone for few providers
631            match providers.get(info.id()).cloned() {
632                Some(mut prov) => {
633                    let mut has_changes = false;
634                    if prov.name.is_empty() {
635                        prov.name = info.name().map(String::from).unwrap_or_default();
636                        has_changes = true;
637                    }
638                    if prov.reference.is_empty() {
639                        prov.reference = info.image_ref().map(String::from).unwrap_or_default();
640                        has_changes = true;
641                    }
642                    if let Entry::Vacant(entry) = prov.hosts.entry(heartbeat.host_id.clone()) {
643                        entry.insert(ProviderStatus::default());
644                        has_changes = true;
645                    }
646                    if has_changes {
647                        Some((info.id().to_string(), prov))
648                    } else {
649                        None
650                    }
651                }
652                None => {
653                    // If we don't already have the provider, create a basic one so we know it
654                    // exists at least. The next provider heartbeat will fix it for us
655                    Some((
656                        info.id().to_string(),
657                        Provider {
658                            id: info.id().to_string(),
659                            hosts: [(heartbeat.host_id.clone(), ProviderStatus::default())].into(),
660                            name: info.name().map(String::from).unwrap_or_default(),
661                            reference: info.image_ref().map(String::from).unwrap_or_default(),
662                            ..Default::default()
663                        },
664                    ))
665                }
666            }
667        });
668
669        trace!("Updating providers with new status from host");
670        self.store
671            .store_many(lattice_id, providers_to_update)
672            .await?;
673
674        Ok(())
675    }
676
    /// Handles a published manifest: swaps in a fresh scaler set for the
    /// application, runs cleanup for scalers that no longer exist, performs an
    /// initial reconciliation pass, publishes the resulting status and
    /// commands, and finally publishes cleanup commands from the old scalers.
    ///
    /// Returns an error (triggering redelivery) when any scaler's initial
    /// reconciliation failed — see the in-body note on backoff behavior.
    #[instrument(level = "debug", skip(self, data), fields(name = %data.manifest.metadata.name))]
    async fn handle_manifest_published(
        &self,
        lattice_id: &str,
        data: &ManifestPublished,
    ) -> anyhow::Result<()> {
        debug!(name = %data.manifest.metadata.name, "Handling published manifest");

        // Swap out any scalers registered for a previous version of this app
        let old_scalers = self
            .scalers
            .remove_raw_scalers(&data.manifest.metadata.name)
            .await;
        let scalers = self.scalers.scalers_for_manifest(&data.manifest);

        // Refresh the snapshot data before cleaning up and/or adding scalers
        self.scalers.refresh_data().await?;
        let cleanup_commands = if let Some(old_scalers) = old_scalers {
            // This relies on the idea that an ID is a unique identifier for a scaler, and any
            // change in the ID is indicative of the fact that the scaler is outdated and should be cleaned up.
            let (_updated_component, outdated_component): (ScalerList, ScalerList) = old_scalers
                .into_iter()
                .partition(|old| scalers.iter().any(|new| new.id() == old.id()));

            // Clean up any resources from scalers that no longer exist
            let futs = outdated_component
                .iter()
                .map(|s| async { s.cleanup().await });
            futures::future::join_all(futs)
                .await
                .into_iter()
                .filter_map(|res: Result<Vec<Command>>| match res {
                    Ok(commands) => Some(commands),
                    Err(e) => {
                        // Best-effort cleanup: log and keep the commands from
                        // scalers that succeeded
                        warn!(error = ?e, "Failed to clean up old scaler");
                        None
                    }
                })
                .flatten()
                .collect::<Vec<Command>>()
        } else {
            vec![]
        };

        // Get the results of the first reconcilation pass before we store the scalers. Publish the
        // commands for the ones that succeeded (as those scalers will have entered backoff mode if
        // they are backoff wrapped), and then return an error indicating failure so the event is
        // redelivered. When it is, the ones that succeeded will be in backoff mode and the ones
        // that failed will be retried.

        let scalers = self.scalers.add_scalers(&data.manifest, scalers).await?;

        let (commands, res) = get_commands_and_result(
            scalers.iter().map(|s| s.reconcile()),
            "Errors occurred during initial reconciliation",
        )
        .await;

        let status = detailed_scaler_status(&scalers).await;

        trace!(?status, "Setting status");
        if let Err(e) = self
            .status_publisher
            .publish_status(data.manifest.metadata.name.as_ref(), status)
            .await
        {
            warn!("Failed to set manifest status: {e:}");
        };

        trace!(?commands, "Publishing commands");
        // Handle the result from initial reconciliation. This lets us handle the net new stuff
        // immediately
        self.command_publisher.publish_commands(commands).await?;

        // Now publish the cleanup commands from the old scalers. This will cause the new scalers to
        // react to the components/providers/linkdefs disappearing and create new ones with the new
        // versions
        if let Err(e) = self
            .command_publisher
            .publish_commands(cleanup_commands)
            .await
        {
            warn!(error = ?e, "Failed to publish cleanup commands from old application, some resources may be left behind");
        }

        res
    }
763
764    #[instrument(level = "debug", skip(self))]
765    async fn run_scalers_with_hint(&self, event: &Event, name: &str) -> anyhow::Result<()> {
766        let scalers = match self.scalers.get_scalers(name).await {
767            Some(scalers) => scalers,
768            None => {
769                debug!("No scalers currently exist for model");
770                return Ok(());
771            }
772        };
773        // Refresh the snapshot data before running
774        self.scalers.refresh_data().await?;
775        let (commands, res) = get_commands_and_result(
776            scalers.iter().map(|s| s.handle_event(event)),
777            "Errors occurred while handling event",
778        )
779        .await;
780
781        let status = detailed_scaler_status(&scalers).await;
782        trace!(?status, "Setting status");
783        if let Err(e) = self.status_publisher.publish_status(name, status).await {
784            warn!(error = ?e, "Failed to set status for scaler");
785        };
786
787        trace!(?commands, "Publishing commands");
788        self.command_publisher.publish_commands(commands).await?;
789
790        res
791    }
792
793    #[instrument(level = "debug", skip(self))]
794    async fn run_all_scalers(&self, event: &Event) -> anyhow::Result<()> {
795        let scalers = self.scalers.get_all_scalers().await;
796        // Refresh the snapshot data before running
797        self.scalers.refresh_data().await?;
798        let futs = scalers.iter().map(|(name, scalers)| async move {
799            let (commands, res) = get_commands_and_result(
800                scalers.iter().map(|scaler| scaler.handle_event(event)),
801                "Errors occurred while handling event with all scalers",
802            )
803            .await;
804
805            let status = detailed_scaler_status(scalers).await;
806
807            trace!(?status, "Setting status");
808            if let Err(e) = self.status_publisher.publish_status(name, status).await {
809                warn!(error = ?e, "Failed to set status for scaler");
810            };
811
812            (commands, res)
813        });
814
815        // Resolve futures, computing commands for scalers, publishing statuses, and combining any errors
816        let (commands, res) = futures::future::join_all(futs).await.into_iter().fold(
817            (vec![], Ok(())),
818            |(mut cmds, res), (mut new_cmds, new_res)| {
819                cmds.append(&mut new_cmds);
820                let res = match (res, new_res) {
821                    (Ok(_), Ok(_)) => Ok(()),
822                    (Ok(_), Err(e)) | (Err(e), Ok(_)) => Err(e),
823                    (Err(e), Err(e2)) => Err(e.context(e2)),
824                };
825                (cmds, res)
826            },
827        );
828
829        trace!(?commands, "Publishing commands");
830        self.command_publisher.publish_commands(commands).await?;
831
832        res
833    }
834}
835
#[async_trait::async_trait]
impl<StateStore, C, P> Worker for EventWorker<StateStore, C, P>
where
    StateStore: Store + Send + Sync + Clone + 'static,
    C: ClaimsSource
        + InventorySource
        + LinkSource
        + ConfigSource
        + SecretSource
        + Clone
        + Send
        + Sync
        + 'static,
    P: Publisher + Clone + Send + Sync + 'static,
{
    type Message = Event;

    /// Handles a single lattice event end-to-end: first updates stored state
    /// via the matching `handle_*` method, then runs scalers — scoped to one
    /// manifest when the event yields an app-spec annotation hint, otherwise
    /// all of them. The message is acked only when both phases succeed; any
    /// failure nacks it so the event is redelivered.
    #[instrument(level = "debug", skip(self))]
    async fn do_work(&self, mut message: ScopedMessage<Self::Message>) -> WorkResult<()> {
        // Everything in this block returns a name hint for the success case and an error otherwise
        let res = match message.as_ref() {
            Event::ComponentScaled(component) => self
                .handle_component_scaled(&message.lattice_id, component)
                .await
                .map(|_| {
                    // Scope the scaler run to the manifest named in the
                    // component's app-spec annotation, if present
                    component
                        .annotations
                        .get(APP_SPEC_ANNOTATION)
                        .map(|s| s.as_str())
                }),
            Event::HostHeartbeat(host) => self
                .handle_host_heartbeat(&message.lattice_id, host)
                .await
                .map(|_| None),
            Event::HostStarted(host) => self
                .handle_host_started(&message.lattice_id, host)
                .await
                .map(|_| None),
            Event::HostStopped(host) => self
                .handle_host_stopped(&message.lattice_id, host)
                .await
                .map(|_| None),
            Event::ProviderStarted(provider) => self
                .handle_provider_started(&message.lattice_id, provider)
                .await
                .map(|_| {
                    provider
                        .annotations
                        .get(APP_SPEC_ANNOTATION)
                        .map(|s| s.as_str())
                }),
            // NOTE(thomastaylor312): Provider stopped events need to be handled by all scalers as
            // they could need to adjust their provider count based on the number of providers
            // available throughout the whole lattice (e.g. if a provider managed by another
            // manifest is removed, but this manifest still needs one). Ideally we should have a way
            // to have a "global" list of required providers in a lattice so we never shut one down
            // just to spin it back up, but for now we'll just deal with this as is
            Event::ProviderStopped(provider) => self
                .handle_provider_stopped(&message.lattice_id, provider)
                .await
                .map(|_| None),
            // Health-check variants share one handler: status carries no
            // verdict (None), passed maps to Some(false) and failed to
            // Some(true) — presumably a "failed" flag; confirm against the
            // handler's signature
            Event::ProviderHealthCheckStatus(ProviderHealthCheckStatus { data }) => self
                .handle_provider_health_check(&message.lattice_id, data, None)
                .await
                .map(|_| None),
            Event::ProviderHealthCheckPassed(ProviderHealthCheckPassed { data }) => self
                .handle_provider_health_check(&message.lattice_id, data, Some(false))
                .await
                .map(|_| None),
            Event::ProviderHealthCheckFailed(ProviderHealthCheckFailed { data }) => self
                .handle_provider_health_check(&message.lattice_id, data, Some(true))
                .await
                .map(|_| None),
            Event::ManifestPublished(data) => self
                .handle_manifest_published(&message.lattice_id, data)
                .await
                .map(|_| None),
            Event::ManifestUnpublished(data) => {
                debug!("Handling unpublished manifest");

                match self.scalers.remove_scalers(&data.name).await {
                    // Scalers were removed successfully: nothing left to run,
                    // so ack immediately and return early
                    Some(Ok(_)) => {
                        return message.ack().await.map_err(WorkError::from);
                    }
                    // Removal failed: nack so the event is redelivered
                    Some(Err(e)) => {
                        message.nack().await;
                        return Err(WorkError::Other(e.into()));
                    }
                    // No scalers existed for this manifest; fall through and
                    // let all scalers react to the event
                    None => Ok(None),
                }
            }
            // All other events we don't care about for state. Explicitly mention them in order
            // to make sure we don't forget to handle them when new events are added.
            Event::LinkdefSet(_)
            | Event::LinkdefDeleted(_)
            | Event::ConfigSet(_)
            | Event::ConfigDeleted(_)
            | Event::ProviderStartFailed(_)
            | Event::ComponentScaleFailed(_) => {
                trace!("Got event we don't care about. Not modifying state.");
                Ok(None)
            }
        };

        // Phase two: run scalers scoped by the hint (if any), or all scalers.
        // State-handler errors short-circuit straight to the error arm.
        let res = match res {
            Ok(Some(name)) => self.run_scalers_with_hint(&message, name).await,
            Ok(None) => self.run_all_scalers(&message).await,
            Err(e) => Err(e),
        }
        .map_err(Box::<dyn std::error::Error + Send + 'static>::from);

        // Nack on any failure so the event is redelivered and retried
        if let Err(e) = res {
            message.nack().await;
            return Err(WorkError::Other(e));
        }

        message.ack().await.map_err(WorkError::from)
    }
}
955
956/// Helper that runs any iterable of futures and returns a list of commands and the proper result to
957/// use as a response (Ok if there were no errors, all of the errors combined otherwise)
958pub(crate) async fn get_commands_and_result<Fut, I>(
959    futs: I,
960    error_message: &str,
961) -> (Vec<Command>, Result<()>)
962where
963    Fut: futures::Future<Output = Result<Vec<Command>>>,
964    I: IntoIterator<Item = Fut>,
965{
966    let (commands, errors): (Vec<_>, Vec<_>) = futures::future::join_all(futs)
967        .await
968        .into_iter()
969        .partition(Result::is_ok);
970    // SAFETY: We partitioned by Ok and Err, so the unwraps are fine
971    (
972        commands.into_iter().flat_map(Result::unwrap).collect(),
973        map_to_result(
974            errors.into_iter().map(Result::unwrap_err).collect(),
975            error_message,
976        ),
977    )
978}
979
980/// Helper function to find the [`Status`] of all scalers in a particular manifest.
981pub async fn detailed_scaler_status(scalers: &ScalerList) -> Status {
982    let futs = scalers.iter().map(|s| async {
983        (
984            s.id().to_string(),
985            s.kind().to_string(),
986            s.name(),
987            s.status().await,
988        )
989    });
990    let status = futures::future::join_all(futs).await;
991    Status::new(
992        StatusInfo {
993            status_type: status
994                .iter()
995                .map(|(_id, _name, _kind, s)| s.status_type)
996                .sum(),
997            message: status
998                .iter()
999                .filter_map(|(_id, _name, _kind, s)| {
1000                    let message = s.message.trim();
1001                    if message.is_empty() {
1002                        None
1003                    } else {
1004                        Some(message.to_owned())
1005                    }
1006                })
1007                .collect::<Vec<_>>()
1008                .join(", "),
1009        },
1010        status
1011            .into_iter()
1012            .map(|(id, kind, name, info)| ScalerStatus {
1013                id,
1014                name,
1015                kind,
1016                info,
1017            })
1018            .collect(),
1019    )
1020}
1021
1022fn map_to_result(errors: Vec<anyhow::Error>, error_message: &str) -> Result<()> {
1023    if errors.is_empty() {
1024        Ok(())
1025    } else {
1026        let mut error = anyhow::anyhow!("{error_message}");
1027        for e in errors {
1028            error = error.context(e);
1029        }
1030        Err(error)
1031    }
1032}
1033
1034#[cfg(test)]
1035mod test {
1036    use std::sync::Arc;
1037
1038    use tokio::sync::RwLock;
1039    use wasmcloud_control_interface::{ComponentDescription, HostInventory, ProviderDescription};
1040
1041    use super::*;
1042
1043    use crate::{
1044        storage::ReadStore,
1045        test_util::{NoopPublisher, TestLatticeSource, TestStore},
1046    };
1047
1048    // NOTE: This test is rather long because we want to run through what an actual state generation
1049    // loop would look like. This mostly covers happy path, while the other tests cover more of the
1050    // edge cases
1051    #[tokio::test]
1052    async fn test_all_state() {
1053        let store = Arc::new(TestStore::default());
1054        let inventory = Arc::new(RwLock::new(HashMap::default()));
1055        let lattice_source = TestLatticeSource {
1056            inventory: inventory.clone(),
1057            ..Default::default()
1058        };
1059
1060        let lattice_id = "all_state";
1061
1062        let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
1063        let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
1064        let worker = EventWorker::new(
1065            store.clone(),
1066            lattice_source.clone(),
1067            command_publisher.clone(),
1068            status_publisher.clone(),
1069            ScalerManager::test_new(
1070                NoopPublisher,
1071                lattice_id,
1072                store.clone(),
1073                command_publisher,
1074                status_publisher.clone(),
1075                lattice_source,
1076            )
1077            .await,
1078        );
1079
1080        let host1_id = "DS1";
1081        let host2_id = "starkiller";
1082
1083        /***********************************************************/
1084        /******************** Host Start Tests *********************/
1085        /***********************************************************/
1086        let labels = HashMap::from([("superweapon".to_string(), "true".to_string())]);
1087        worker
1088            .handle_host_started(
1089                lattice_id,
1090                &HostStarted {
1091                    friendly_name: "death-star-42".to_string(),
1092                    id: host1_id.into(),
1093                    labels: labels.clone(),
1094                },
1095            )
1096            .await
1097            .expect("Should be able to handle event");
1098
1099        let current_state = store.list::<Host>(lattice_id).await.unwrap();
1100        assert_eq!(current_state.len(), 1, "Only one host should be in store");
1101        let host = current_state
1102            .get("DS1")
1103            .expect("Host should exist in state");
1104        assert_eq!(
1105            host.friendly_name, "death-star-42",
1106            "Host should have the proper name in state"
1107        );
1108        assert_eq!(host.labels, labels, "Host should have the correct labels");
1109
1110        let labels2 = HashMap::from([
1111            ("superweapon".to_string(), "true".to_string()),
1112            ("lazy_writing".to_string(), "true".to_string()),
1113        ]);
1114        worker
1115            .handle_host_started(
1116                lattice_id,
1117                &HostStarted {
1118                    friendly_name: "starkiller-base-2015".to_string(),
1119                    id: host2_id.into(),
1120                    labels: labels2.clone(),
1121                },
1122            )
1123            .await
1124            .expect("Should be able to handle event");
1125
1126        let current_state = store.list::<Host>(lattice_id).await.unwrap();
1127        assert_eq!(current_state.len(), 2, "Both hosts should be in the store");
1128        let host = current_state
1129            .get("starkiller")
1130            .expect("Host should exist in state");
1131        assert_eq!(
1132            host.friendly_name, "starkiller-base-2015",
1133            "Host should have the proper name in state"
1134        );
1135        assert_eq!(host.labels, labels2, "Host should have the correct labels");
1136
1137        // Now just double check that the other host didn't change in response to the new one
1138        let host = current_state
1139            .get("DS1")
1140            .expect("Host should exist in state");
1141        assert_eq!(
1142            host.friendly_name, "death-star-42",
1143            "Host should have the proper name in state"
1144        );
1145        assert_eq!(host.labels, labels, "Host should have the correct labels");
1146
1147        /***********************************************************/
1148        /****************** Component Scale Tests ******************/
1149        /***********************************************************/
1150
1151        let component1_scaled = ComponentScaled {
1152            claims: Some(ComponentClaims {
1153                call_alias: Some("Grand Moff".into()),
1154                issuer: "Sheev Palpatine".into(),
1155                name: "Grand Moff Tarkin".into(),
1156                version: Some("0.1.0".into()),
1157                ..Default::default()
1158            }),
1159            image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1160            component_id: "TARKIN".into(),
1161            host_id: host1_id.into(),
1162            annotations: BTreeMap::default(),
1163            max_instances: 500,
1164        };
1165        worker
1166            .handle_component_scaled(lattice_id, &component1_scaled)
1167            .await
1168            .expect("Should be able to handle component event");
1169        let components = store.list::<Component>(lattice_id).await.unwrap();
1170        let component = components
1171            .get("TARKIN")
1172            .expect("Component should exist in state");
1173        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1174        let host = hosts.get(host1_id).expect("Host should exist in state");
1175        assert_eq!(
1176            host.components.get(&component1_scaled.component_id),
1177            Some(&500),
1178            "Component count in host should be updated"
1179        );
1180        assert_eq!(
1181            component.count_for_host(host1_id),
1182            500,
1183            "Component count should be modified with an increase in scale"
1184        );
1185
1186        let component1_scaled = ComponentScaled {
1187            claims: Some(ComponentClaims {
1188                call_alias: Some("Grand Moff".into()),
1189                issuer: "Sheev Palpatine".into(),
1190                name: "Grand Moff Tarkin".into(),
1191                version: Some("0.1.0".into()),
1192                ..Default::default()
1193            }),
1194            image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1195            component_id: "TARKIN".into(),
1196            host_id: host1_id.into(),
1197            annotations: BTreeMap::default(),
1198            max_instances: 200,
1199        };
1200        worker
1201            .handle_component_scaled(lattice_id, &component1_scaled)
1202            .await
1203            .expect("Should be able to handle component event");
1204        let components = store.list::<Component>(lattice_id).await.unwrap();
1205        let component = components
1206            .get("TARKIN")
1207            .expect("Component should exist in state");
1208        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1209        let host = hosts.get(host1_id).expect("Host should exist in state");
1210        assert_eq!(
1211            host.components.get(&component1_scaled.component_id),
1212            Some(&200),
1213            "Component count in host should be updated"
1214        );
1215        assert_eq!(
1216            component.count_for_host(host1_id),
1217            200,
1218            "Component count should be modified with a decrease in scale"
1219        );
1220
1221        let component1_scaled = ComponentScaled {
1222            claims: Some(ComponentClaims {
1223                call_alias: Some("Grand Moff".into()),
1224                issuer: "Sheev Palpatine".into(),
1225                name: "Grand Moff Tarkin".into(),
1226                version: Some("0.1.0".into()),
1227                ..Default::default()
1228            }),
1229            image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1230            component_id: "TARKIN".into(),
1231            host_id: host1_id.into(),
1232            annotations: BTreeMap::default(),
1233            max_instances: 0,
1234        };
1235        worker
1236            .handle_component_scaled(lattice_id, &component1_scaled)
1237            .await
1238            .expect("Should be able to handle component event");
1239        let components = store.list::<Component>(lattice_id).await.unwrap();
1240        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1241        let host = hosts.get(host1_id).expect("Host should exist in state");
1242        assert_eq!(
1243            host.components.get(&component1_scaled.component_id),
1244            None,
1245            "Component in host should be removed"
1246        );
1247        assert!(
1248            !components.contains_key("TARKIN"),
1249            "Component should be removed from state"
1250        );
1251
1252        let component1_scaled = ComponentScaled {
1253            claims: Some(ComponentClaims {
1254                call_alias: Some("Grand Moff".into()),
1255                issuer: "Sheev Palpatine".into(),
1256                name: "Grand Moff Tarkin".into(),
1257                version: Some("0.1.0".into()),
1258                ..Default::default()
1259            }),
1260            image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1261            component_id: "TARKIN".into(),
1262            host_id: host1_id.into(),
1263            annotations: BTreeMap::default(),
1264            max_instances: 1,
1265        };
1266        worker
1267            .handle_component_scaled(lattice_id, &component1_scaled)
1268            .await
1269            .expect("Should be able to handle component event");
1270        let components = store.list::<Component>(lattice_id).await.unwrap();
1271        let component = components
1272            .get("TARKIN")
1273            .expect("Component should exist in state");
1274        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1275        let host = hosts.get(host1_id).expect("Host should exist in state");
1276        assert_eq!(
1277            host.components.get(&component1_scaled.component_id),
1278            Some(&1),
1279            "Component in host should be readded from scratch"
1280        );
1281        assert_eq!(
1282            component.count_for_host(host1_id),
1283            1,
1284            "Component count should be modified with an initial start"
1285        );
1286        worker
1287            .handle_component_scaled(
1288                lattice_id,
1289                &ComponentScaled {
1290                    host_id: host2_id.into(),
1291                    ..component1_scaled.clone()
1292                },
1293            )
1294            .await
1295            .expect("Should be able to handle component event");
1296
1297        let component2_scaled = ComponentScaled {
1298            claims: Some(ComponentClaims {
1299                call_alias: Some("Darth".into()),
1300                issuer: "Sheev Palpatine".into(),
1301                name: "Darth Vader".into(),
1302                version: Some("0.1.0".into()),
1303                ..Default::default()
1304            }),
1305            image_ref: "coruscant.galactic.empire/vader:0.1.0".into(),
1306            host_id: host1_id.into(),
1307            component_id: "DARTHVADER".into(),
1308            annotations: BTreeMap::default(),
1309            max_instances: 2,
1310        };
1311
1312        worker
1313            .handle_component_scaled(lattice_id, &component2_scaled)
1314            .await
1315            .expect("Should be able to handle component scaled event");
1316        worker
1317            .handle_component_scaled(
1318                lattice_id,
1319                &ComponentScaled {
1320                    host_id: host2_id.into(),
1321                    ..component2_scaled.clone()
1322                },
1323            )
1324            .await
1325            .expect("Should be able to handle component event");
1326
1327        /***********************************************************/
1328        /****************** Provider Start Tests *******************/
1329        /***********************************************************/
1330
1331        let provider1 = ProviderStarted {
1332            claims: Some(ProviderClaims {
1333                issuer: "Sheev Palpatine".into(),
1334                name: "Force Choke".into(),
1335                version: "0.1.0".into(),
1336                ..Default::default()
1337            }),
1338            image_ref: "coruscant.galactic.empire/force_choke:0.1.0".into(),
1339            provider_id: "CHOKE".into(),
1340            host_id: host1_id.into(),
1341            annotations: BTreeMap::default(),
1342        };
1343
1344        let provider2 = ProviderStarted {
1345            claims: Some(ProviderClaims {
1346                issuer: "Sheev Palpatine".into(),
1347                name: "Death Star Laser".into(),
1348                version: "0.1.0".into(),
1349                ..Default::default()
1350            }),
1351            image_ref: "coruscant.galactic.empire/laser:0.1.0".into(),
1352            provider_id: "BYEBYEALDERAAN".into(),
1353            host_id: host2_id.into(),
1354            annotations: BTreeMap::default(),
1355        };
1356
1357        worker
1358            .handle_provider_started(lattice_id, &provider1)
1359            .await
1360            .expect("Should be able to handle provider event");
1361        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1362        assert_eq!(providers.len(), 1, "Should only be 1 provider in state");
1363        assert_provider(&providers, &provider1, &[host1_id]);
1364
1365        // Now start the second provider on both hosts (so we can test some things in the next test)
1366        worker
1367            .handle_provider_started(lattice_id, &provider2)
1368            .await
1369            .expect("Should be able to handle provider event");
1370        worker
1371            .handle_provider_started(
1372                lattice_id,
1373                &ProviderStarted {
1374                    host_id: host1_id.into(),
1375                    provider_id: provider2.provider_id.clone(),
1376                    ..provider2.clone()
1377                },
1378            )
1379            .await
1380            .expect("Should be able to handle provider event");
1381        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1382        assert_eq!(providers.len(), 2, "Should only be 2 providers in state");
1383        assert_provider(&providers, &provider2, &[host1_id, host2_id]);
1384
1385        // Check that hosts got updated properly
1386        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1387        assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
1388        let host = hosts.get(host1_id).expect("Host should still exist");
1389        assert_eq!(
1390            host.components.len(),
1391            2,
1392            "Should have two different components running"
1393        );
1394        assert_eq!(
1395            host.providers.len(),
1396            2,
1397            "Should have two different providers running"
1398        );
1399        let host = hosts.get(host2_id).expect("Host should still exist");
1400        assert_eq!(
1401            host.components.len(),
1402            2,
1403            "Should have two different components running"
1404        );
1405        assert_eq!(
1406            host.providers.len(),
1407            1,
1408            "Should have a single provider running"
1409        );
1410
1411        let component_1_id = "TARKIN";
1412        let component_2_id = "DARTHVADER";
1413
1414        worker
1415            .handle_host_heartbeat(
1416                lattice_id,
1417                &HostHeartbeat {
1418                    components: vec![
1419                        ComponentDescription::builder()
1420                            .id(component_1_id.into())
1421                            .revision(0)
1422                            .image_ref("ref1".into())
1423                            .max_instances(2)
1424                            .build()
1425                            .expect("failed to build description"),
1426                        ComponentDescription::builder()
1427                            .id(component_2_id.into())
1428                            .revision(0)
1429                            .image_ref("ref2".into())
1430                            .max_instances(2)
1431                            .build()
1432                            .expect("failed to build description"),
1433                    ],
1434                    friendly_name: "death-star-42".into(),
1435                    labels: labels.clone(),
1436                    issuer: "".to_string(),
1437                    providers: vec![
1438                        ProviderDescription::builder()
1439                            .id(&provider1.provider_id)
1440                            .image_ref(&provider1.image_ref)
1441                            .revision(0)
1442                            .build()
1443                            .expect("failed to build provider description"),
1444                        ProviderDescription::builder()
1445                            .id(&provider2.provider_id)
1446                            .image_ref(&provider2.image_ref)
1447                            .revision(0)
1448                            .build()
1449                            .expect("failed to build provider description"),
1450                    ],
1451                    uptime_human: "30s".into(),
1452                    uptime_seconds: 30,
1453                    version: semver::Version::parse("0.61.0").unwrap(),
1454                    host_id: host1_id.into(),
1455                },
1456            )
1457            .await
1458            .expect("Should be able to handle host heartbeat");
1459
1460        worker
1461            .handle_host_heartbeat(
1462                lattice_id,
1463                &HostHeartbeat {
1464                    components: vec![
1465                        ComponentDescription::builder()
1466                            .id(component_1_id.into())
1467                            .image_ref("ref1".into())
1468                            .revision(0)
1469                            .max_instances(2)
1470                            .build()
1471                            .expect("failed to build description"),
1472                        ComponentDescription::builder()
1473                            .id(component_2_id.into())
1474                            .image_ref("ref2".into())
1475                            .revision(0)
1476                            .max_instances(2)
1477                            .build()
1478                            .expect("failed to build description"),
1479                    ],
1480                    issuer: "".to_string(),
1481                    friendly_name: "starkiller-base-2015".to_string(),
1482                    labels: labels2.clone(),
1483                    providers: vec![ProviderDescription::builder()
1484                        .id(&provider2.provider_id)
1485                        .image_ref(&provider2.image_ref)
1486                        .revision(0)
1487                        .build()
1488                        .expect("failed to build provider description")],
1489                    uptime_human: "30s".into(),
1490                    uptime_seconds: 30,
1491                    version: semver::Version::parse("0.61.0").unwrap(),
1492                    host_id: host2_id.into(),
1493                },
1494            )
1495            .await
1496            .expect("Should be able to handle host heartbeat");
1497
1498        // Check that our component and provider data is still correct.
1499        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1500        assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
1501        assert_provider(&providers, &provider1, &[host1_id]);
1502        assert_provider(&providers, &provider2, &[host1_id, host2_id]);
1503
1504        let components = store.list::<Component>(lattice_id).await.unwrap();
1505        assert_eq!(
1506            components.len(),
1507            2,
1508            "Should still have 2 components in state"
1509        );
1510        assert_component(&components, component_1_id, &[(host1_id, 2), (host2_id, 2)]);
1511        assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1512
1513        /***********************************************************/
1514        /************** Component Scale Down Tests *****************/
1515        /***********************************************************/
1516
1517        // Stop them on one host first
1518        let stopped = ComponentScaled {
1519            claims: None,
1520            image_ref: "coruscant.galactic.empire/tarkin:0.1.0".into(),
1521            annotations: BTreeMap::default(),
1522            component_id: component_1_id.into(),
1523            host_id: host1_id.into(),
1524            max_instances: 0,
1525        };
1526
1527        worker
1528            .handle_component_scaled(lattice_id, &stopped)
1529            .await
1530            .expect("Should be able to handle component stop event");
1531
1532        let components = store.list::<Component>(lattice_id).await.unwrap();
1533        assert_eq!(
1534            components.len(),
1535            2,
1536            "Should still have 2 components in state"
1537        );
1538        assert_component(&components, component_1_id, &[(host2_id, 2)]);
1539        assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1540
1541        let host = store
1542            .get::<Host>(lattice_id, host2_id)
1543            .await
1544            .expect("Should be able to access store")
1545            .expect("Should have the host in the store");
1546        assert_eq!(*host.components.get(component_1_id).unwrap_or(&0), 2_usize);
1547        assert_eq!(*host.components.get(component_2_id).unwrap_or(&0), 2_usize);
1548
1549        // Now stop on the other
1550        let stopped2 = ComponentScaled {
1551            host_id: host2_id.into(),
1552            ..stopped
1553        };
1554
1555        worker
1556            .handle_component_scaled(lattice_id, &stopped2)
1557            .await
1558            .expect("Should be able to handle component scale event");
1559
1560        let components = store.list::<Component>(lattice_id).await.unwrap();
1561        assert_eq!(components.len(), 1, "Should only have 1 component in state");
        // Double check that the old one is still ok
1563        assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1564
1565        /***********************************************************/
1566        /******************* Provider Stop Tests *******************/
1567        /***********************************************************/
1568
1569        worker
1570            .handle_provider_stopped(
1571                lattice_id,
1572                &ProviderStopped {
1573                    annotations: BTreeMap::default(),
1574                    provider_id: provider2.provider_id.clone(),
1575                    reason: String::new(),
1576                    host_id: host1_id.into(),
1577                },
1578            )
1579            .await
1580            .expect("Should be able to handle provider stop event");
1581
1582        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1583        assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
1584        assert_provider(&providers, &provider1, &[host1_id]);
1585        assert_provider(&providers, &provider2, &[host2_id]);
1586
1587        // Check that hosts got updated properly
1588        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1589        assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
1590        let host = hosts.get(host1_id).expect("Host should still exist");
1591        assert_eq!(host.components.len(), 1, "Should have 1 component running");
1592        assert_eq!(host.providers.len(), 1, "Should have 1 provider running");
1593        let host = hosts.get(host2_id).expect("Host should still exist");
1594        assert_eq!(host.components.len(), 1, "Should have 1 component running");
1595        assert_eq!(
1596            host.providers.len(),
1597            1,
1598            "Should have a single provider running"
1599        );
1600
1601        /***********************************************************/
1602        /***************** Heartbeat Tests Part 2 ******************/
1603        /***********************************************************/
1604
1605        // NOTE(brooksmtownsend): Painful manual manipulation of host inventory
1606        // to satisfy the way we currently query the inventory when handling heartbeats.
1607        *inventory.write().await = HashMap::from_iter([
1608            (
1609                host1_id.to_string(),
1610                HostInventory::builder()
1611                    .friendly_name("my-host-3".into())
1612                    .components(vec![ComponentDescription::builder()
1613                        .id(component_2_id.into())
1614                        .image_ref("ref2".into())
1615                        .revision(0)
1616                        .max_instances(2)
1617                        .build()
1618                        .expect("failed to build description")])
1619                    .host_id(host1_id.into())
1620                    .version(semver::Version::parse("0.61.0").unwrap().to_string())
1621                    .uptime_human("60s".into())
1622                    .uptime_seconds(60)
1623                    .build()
1624                    .expect("failed to build host inventory"),
1625            ),
1626            (
1627                host2_id.to_string(),
1628                HostInventory::builder()
1629                    .friendly_name("my-host-4".into())
1630                    .components(vec![ComponentDescription::builder()
1631                        .id(component_2_id.into())
1632                        .image_ref("ref2".into())
1633                        .revision(0)
1634                        .max_instances(2)
1635                        .build()
1636                        .expect("failed to build description")])
1637                    .host_id(host2_id.into())
1638                    .version(semver::Version::parse("1.2.3").unwrap().to_string())
1639                    .uptime_human("100s".into())
1640                    .uptime_seconds(100)
1641                    .build()
1642                    .expect("failed to build host inventory"),
1643            ),
1644        ]);
1645
1646        // Heartbeat the first host and make sure nothing has changed
1647        worker
1648            .handle_host_heartbeat(
1649                lattice_id,
1650                &HostHeartbeat {
1651                    components: vec![ComponentDescription::builder()
1652                        .id(component_2_id.into())
1653                        .image_ref("ref2".into())
1654                        .revision(0)
1655                        .max_instances(2)
1656                        .build()
1657                        .expect("failed to build description")],
1658                    friendly_name: "death-star-42".to_string(),
1659                    issuer: "".to_string(),
1660                    labels,
1661                    providers: vec![ProviderDescription::builder()
1662                        .id(&provider1.provider_id)
1663                        .image_ref(&provider1.image_ref)
1664                        .revision(1)
1665                        .build()
1666                        .expect("failed to build provider description")],
1667                    uptime_human: "60s".into(),
1668                    uptime_seconds: 60,
1669                    version: semver::Version::parse("0.61.0").unwrap(),
1670                    host_id: host1_id.into(),
1671                },
1672            )
1673            .await
1674            .expect("Should be able to handle host heartbeat");
1675
1676        worker
1677            .handle_host_heartbeat(
1678                lattice_id,
1679                &HostHeartbeat {
1680                    components: vec![ComponentDescription::builder()
1681                        .id(component_2_id.into())
1682                        .image_ref("ref2".into())
1683                        .revision(0)
1684                        .max_instances(2)
1685                        .build()
1686                        .expect("failed to build description")],
1687                    friendly_name: "starkiller-base-2015".to_string(),
1688                    labels: labels2,
1689                    issuer: "".to_string(),
1690                    providers: vec![ProviderDescription::builder()
1691                        .id(&provider2.provider_id)
1692                        .image_ref(&provider2.image_ref)
1693                        .revision(0)
1694                        .build()
1695                        .expect("failed to build provider description")],
1696                    uptime_human: "60s".into(),
1697                    uptime_seconds: 60,
1698                    version: semver::Version::parse("0.61.0").unwrap(),
1699                    host_id: host2_id.into(),
1700                },
1701            )
1702            .await
1703            .expect("Should be able to handle host heartbeat");
1704
1705        // Check that the heartbeat kept state consistent
1706        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1707        assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
1708        let host = hosts.get(host1_id).expect("Host should still exist");
1709        assert_eq!(host.components.len(), 1, "Should have 1 component running");
1710        assert_eq!(host.providers.len(), 1, "Should have 1 provider running");
1711        let host = hosts.get(host2_id).expect("Host should still exist");
1712        assert_eq!(host.components.len(), 1, "Should have 1 component running");
1713        assert_eq!(
1714            host.providers.len(),
1715            1,
1716            "Should have a single provider running"
1717        );
1718
1719        // Double check providers and components are the same
1720        let components = store.list::<Component>(lattice_id).await.unwrap();
1721        assert_eq!(components.len(), 1, "Should only have 1 component in state");
1722        assert_component(&components, component_2_id, &[(host1_id, 2), (host2_id, 2)]);
1723
1724        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1725        assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
1726        assert_provider(&providers, &provider1, &[host1_id]);
1727        assert_provider(&providers, &provider2, &[host2_id]);
1728
1729        /***********************************************************/
1730        /********************* Host Stop Tests *********************/
1731        /***********************************************************/
1732
1733        worker
1734            .handle_host_stopped(
1735                lattice_id,
1736                &HostStopped {
1737                    labels: HashMap::default(),
1738                    id: host1_id.into(),
1739                },
1740            )
1741            .await
1742            .expect("Should be able to handle host stopped event");
1743
1744        let hosts = store.list::<Host>(lattice_id).await.unwrap();
1745        assert_eq!(hosts.len(), 1, "Should only have 1 host");
1746        let host = hosts.get(host2_id).expect("Host should still exist");
1747        assert_eq!(host.components.len(), 1, "Should have 1 component running");
1748        assert_eq!(
1749            host.providers.len(),
1750            1,
1751            "Should have a single provider running"
1752        );
1753
1754        // Double check providers and components are the same
1755        let components = store.list::<Component>(lattice_id).await.unwrap();
1756        assert_eq!(components.len(), 1, "Should only have 1 component in state");
1757        assert_component(&components, component_2_id, &[(host2_id, 2)]);
1758
1759        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1760        assert_eq!(providers.len(), 1, "Should now have 1 provider in state");
1761        assert_provider(&providers, &provider2, &[host2_id]);
1762    }
1763
1764    #[tokio::test]
1765    async fn test_discover_running_host() {
1766        let component1_id = "SKYWALKER";
1767        let component1_ref = "fakecloud.io/skywalker:0.1.0";
1768        let component2_id = "ORGANA";
1769        let component2_ref = "fakecloud.io/organa:0.1.0";
1770        let lattice_id = "discover_running_host";
1771        let claims = HashMap::from([
1772            (
1773                component1_id.into(),
1774                Claims {
1775                    name: "tosche_station".to_string(),
1776                    capabilities: vec!["wasmcloud:httpserver".to_string()],
1777                    issuer: "GEORGELUCAS".to_string(),
1778                },
1779            ),
1780            (
1781                component2_id.into(),
1782                Claims {
1783                    name: "alderaan".to_string(),
1784                    capabilities: vec!["wasmcloud:keyvalue".to_string()],
1785                    issuer: "GEORGELUCAS".to_string(),
1786                },
1787            ),
1788        ]);
1789        let store = Arc::new(TestStore::default());
1790        let inventory = Arc::new(RwLock::new(HashMap::default()));
1791        let lattice_source = TestLatticeSource {
1792            claims: claims.clone(),
1793            inventory: inventory.clone(),
1794            ..Default::default()
1795        };
1796        let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
1797        let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
1798        let worker = EventWorker::new(
1799            store.clone(),
1800            lattice_source.clone(),
1801            command_publisher.clone(),
1802            status_publisher.clone(),
1803            ScalerManager::test_new(
1804                NoopPublisher,
1805                lattice_id,
1806                store.clone(),
1807                command_publisher,
1808                status_publisher.clone(),
1809                lattice_source,
1810            )
1811            .await,
1812        );
1813
1814        let provider_id = "HYPERDRIVE";
1815        let host_id = "WHATAPIECEOFJUNK";
1816        // NOTE(brooksmtownsend): Painful manual manipulation of host inventory
1817        // to satisfy the way we currently query the inventory when handling heartbeats.
1818        *inventory.write().await = HashMap::from_iter([(
1819            host_id.to_string(),
1820            HostInventory::builder()
1821                .friendly_name("my-host-5".into())
1822                .components(vec![
1823                    ComponentDescription::builder()
1824                        .id(component1_id.into())
1825                        .image_ref(component1_ref.into())
1826                        .revision(0)
1827                        .max_instances(2)
1828                        .build()
1829                        .expect("failed to build description"),
1830                    ComponentDescription::builder()
1831                        .id(component2_id.into())
1832                        .image_ref(component2_ref.into())
1833                        .revision(0)
1834                        .max_instances(1)
1835                        .build()
1836                        .expect("failed to build description"),
1837                ])
1838                .host_id(host_id.into())
1839                .providers(vec![ProviderDescription::builder()
1840                    .id(provider_id)
1841                    .revision(0)
1842                    .build()
1843                    .expect("failed to build provider description")])
1844                .version(semver::Version::parse("0.61.0").unwrap().to_string())
1845                .uptime_human("60s".into())
1846                .uptime_seconds(60)
1847                .build()
1848                .expect("failed to build host inventory"),
1849        )]);
1850
1851        // Heartbeat with components and providers that don't exist in the store yet
1852        worker
1853            .handle_host_heartbeat(
1854                lattice_id,
1855                &HostHeartbeat {
1856                    components: vec![
1857                        ComponentDescription::builder()
1858                            .id(component1_id.into())
1859                            .image_ref(component1_ref.into())
1860                            .revision(0)
1861                            .max_instances(2)
1862                            .build()
1863                            .expect("failed to build description"),
1864                        ComponentDescription::builder()
1865                            .id(component2_id.into())
1866                            .image_ref(component2_ref.into())
1867                            .revision(0)
1868                            .max_instances(1)
1869                            .build()
1870                            .expect("failed to build description"),
1871                    ],
1872                    friendly_name: "millenium_falcon-1977".to_string(),
1873                    labels: HashMap::default(),
1874                    issuer: "".to_string(),
1875                    providers: vec![ProviderDescription::builder()
1876                        .id(provider_id)
1877                        .revision(0)
1878                        .build()
1879                        .expect("failed to build provider description")],
1880                    uptime_human: "60s".into(),
1881                    uptime_seconds: 60,
1882                    version: semver::Version::parse("0.61.0").unwrap(),
1883                    host_id: host_id.into(),
1884                },
1885            )
1886            .await
1887            .expect("Should be able to handle host heartbeat");
1888
1889        // We test that the host is created in other tests, so just check that the components and
1890        // providers were created properly
1891        let components = store.list::<Component>(lattice_id).await.unwrap();
1892        assert_eq!(components.len(), 2, "Store should now have two components");
1893        let component = components
1894            .get(component1_id)
1895            .expect("component should exist");
1896        let expected = claims.get(component1_id).unwrap();
1897        assert_eq!(component.name, expected.name, "Data should match");
1898        assert_eq!(component.issuer, expected.issuer, "Data should match");
1899        assert_eq!(
1900            component
1901                .instances
1902                .get(host_id)
1903                .expect("Host should exist in count")
1904                .get(&BTreeMap::new())
1905                .expect("Should find a component with the correct annotations")
1906                .count,
1907            2,
1908            "Should have the right number of components running"
1909        );
1910
1911        let component = components
1912            .get(component2_id)
1913            .expect("Component should exist");
1914        let expected = claims.get(component2_id).unwrap();
1915        assert_eq!(component.name, expected.name, "Data should match");
1916        assert_eq!(component.issuer, expected.issuer, "Data should match");
1917        assert_eq!(
1918            component
1919                .instances
1920                .get(host_id)
1921                .expect("Host should exist in count")
1922                .len(),
1923            1,
1924            "Should have the right number of components running"
1925        );
1926
1927        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1928        assert_eq!(providers.len(), 1, "Should have 1 provider in the store");
1929        let provider = providers.get(provider_id).expect("Provider should exist");
1930        assert_eq!(provider.id, provider_id, "Data should match");
1931        assert!(
1932            provider.hosts.contains_key(host_id),
1933            "Should have found host in provider store"
1934        );
1935    }
1936
1937    #[tokio::test]
1938    async fn test_provider_status_update() {
1939        let store = Arc::new(TestStore::default());
1940        let lattice_source = TestLatticeSource::default();
1941        let lattice_id = "provider_status";
1942        let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
1943        let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
1944        let worker = EventWorker::new(
1945            store.clone(),
1946            lattice_source.clone(),
1947            command_publisher.clone(),
1948            status_publisher.clone(),
1949            ScalerManager::test_new(
1950                NoopPublisher,
1951                lattice_id,
1952                store.clone(),
1953                command_publisher,
1954                status_publisher.clone(),
1955                lattice_source,
1956            )
1957            .await,
1958        );
1959
1960        let host_id = "CLOUDCITY";
1961
1962        // Trigger a provider started and then a health check
1963        let provider = ProviderStarted {
1964            claims: Some(ProviderClaims {
1965                issuer: "Lando Calrissian".into(),
1966                name: "Tibanna Gas Mining".into(),
1967                version: "0.1.0".into(),
1968                ..Default::default()
1969            }),
1970            image_ref: "bespin.lando.inc/tibanna:0.1.0".into(),
1971            provider_id: "GAS".into(),
1972            host_id: host_id.into(),
1973            annotations: BTreeMap::default(),
1974        };
1975
1976        worker
1977            .handle_provider_started(lattice_id, &provider)
1978            .await
1979            .expect("Should be able to handle provider started event");
1980        worker
1981            .handle_provider_health_check(
1982                lattice_id,
1983                &ProviderHealthCheckInfo {
1984                    provider_id: provider.provider_id.clone(),
1985                    host_id: host_id.into(),
1986                },
1987                Some(false),
1988            )
1989            .await
1990            .expect("Should be able to handle a provider health check event");
1991
1992        let providers = store.list::<Provider>(lattice_id).await.unwrap();
1993        assert_eq!(providers.len(), 1, "Only 1 provider should exist");
1994        let prov = providers
1995            .get(&provider.provider_id)
1996            .expect("Provider should exist");
1997        assert!(
1998            matches!(
1999                prov.hosts
2000                    .get(host_id)
2001                    .expect("Should find status for host"),
2002                ProviderStatus::Running
2003            ),
2004            "Provider should have a running status"
2005        );
2006
2007        // Now try a failed status
2008        worker
2009            .handle_provider_health_check(
2010                lattice_id,
2011                &ProviderHealthCheckInfo {
2012                    provider_id: provider.provider_id.clone(),
2013                    host_id: host_id.into(),
2014                },
2015                Some(true),
2016            )
2017            .await
2018            .expect("Should be able to handle a provider health check event");
2019
2020        let providers = store.list::<Provider>(lattice_id).await.unwrap();
2021        assert_eq!(providers.len(), 1, "Only 1 provider should exist");
2022        let prov = providers
2023            .get(&provider.provider_id)
2024            .expect("Provider should exist");
2025        assert!(
2026            matches!(
2027                prov.hosts
2028                    .get(host_id)
2029                    .expect("Should find status for host"),
2030                ProviderStatus::Failed
2031            ),
2032            "Provider should have a running status"
2033        );
2034    }
2035
    #[tokio::test]
    async fn test_heartbeat_updates_stale_data() {
        // Verifies that a host heartbeat refreshes minimal/stale store entries with
        // the richer data carried by the heartbeat (names, references, counts,
        // annotations) for both components and providers.
        let store = Arc::new(TestStore::default());
        let inventory = Arc::new(RwLock::new(HashMap::default()));
        let lattice_source = TestLatticeSource {
            inventory: inventory.clone(),
            ..Default::default()
        };
        let lattice_id = "update_data";

        let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
        let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
        let worker = EventWorker::new(
            store.clone(),
            lattice_source.clone(),
            command_publisher.clone(),
            status_publisher.clone(),
            ScalerManager::test_new(
                NoopPublisher,
                lattice_id,
                store.clone(),
                command_publisher,
                status_publisher.clone(),
                lattice_source,
            )
            .await,
        );

        let host_id = "jabbaspalace";

        // Store existing components and providers as-if they had the minimum
        // amount of information
        store
            .store(
                lattice_id,
                "jabba".to_string(),
                Component {
                    id: "jabba".to_string(),
                    // Stale count of 1; the heartbeat below reports max_instances 5.
                    instances: HashMap::from([(
                        host_id.to_string(),
                        HashSet::from_iter([WadmComponentInfo {
                            count: 1,
                            annotations: BTreeMap::default(),
                        }]),
                    )]),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        store
            .store(
                lattice_id,
                "jabbatheprovider".to_string(),
                Provider {
                    id: "jabbatheprovider".to_string(),
                    // Pending status with no name/reference; the heartbeat fills these in.
                    hosts: HashMap::from_iter([(host_id.to_string(), ProviderStatus::Pending)]),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Now heartbeat and make sure stuff that isn't running is removed
        worker
            .handle_host_heartbeat(
                lattice_id,
                &HostHeartbeat {
                    components: vec![
                        // Known component "jabba", now with full metadata and 5 instances
                        ComponentDescription::builder()
                            .id("jabba".into())
                            .image_ref("jabba.tatooinecr.io/jabba:latest".into())
                            .name("Da Hutt".into())
                            .annotations(BTreeMap::from_iter([(
                                "da".to_string(),
                                "gobah".to_string(),
                            )]))
                            .revision(0)
                            .max_instances(5)
                            .build()
                            .expect("failed to build description"),
                        // Previously-unknown component "jabba2", discovered via heartbeat
                        ComponentDescription::builder()
                            .id("jabba2".into())
                            .image_ref("jabba.tatooinecr.io/jabba:latest".into())
                            .name("Da Hutt".into())
                            .revision(0)
                            .max_instances(1)
                            .build()
                            .expect("failed to build description"),
                    ],
                    friendly_name: "palace-1983".to_string(),
                    labels: HashMap::default(),
                    issuer: "".to_string(),
                    providers: vec![ProviderDescription::builder()
                        .annotations(BTreeMap::from_iter([(
                            "one".to_string(),
                            "two".to_string(),
                        )]))
                        .id("jabbatheprovider")
                        .image_ref("jabba.tatooinecr.io/provider:latest")
                        .name("Jabba The Provider")
                        .revision(0)
                        .build()
                        .expect("failed to build provider description")],
                    uptime_human: "60s".into(),
                    uptime_seconds: 60,
                    version: semver::Version::parse("0.61.0").unwrap(),
                    host_id: host_id.to_string(),
                },
            )
            .await
            .expect("Should be able to handle host heartbeat");

        // The heartbeat should have updated the stale "jabba" entry and added "jabba2"
        let components = store.list::<Component>(lattice_id).await.unwrap();
        assert_eq!(components.len(), 2, "Should have 2 components in the store");
        let component = components.get("jabba").expect("Component should exist");
        assert_eq!(
            component.count(),
            5,
            "Component should have the correct number of instances"
        );
        assert_eq!(component.name, "Da Hutt", "Should have the correct name");
        assert_eq!(
            component.reference, "jabba.tatooinecr.io/jabba:latest",
            "Should have the correct reference"
        );
        // Annotations from the heartbeat should be present on the stored instance
        assert_eq!(
            component
                .instances
                .get(host_id)
                .expect("instances to be on our specified host")
                .iter()
                .find(|i| !i.annotations.is_empty())
                .expect("instance with annotations to exist")
                .annotations
                .get("da")
                .expect("annotation to exist"),
            "gobah"
        );

        // The provider entry should now carry the name/reference from the heartbeat
        let providers = store
            .list::<Provider>(lattice_id)
            .await
            .expect("should be able to grab providers from store");
        assert_eq!(providers.len(), 1, "Should have 1 provider in the store");
        let provider = providers
            .get("jabbatheprovider")
            .expect("Provider should exist");
        assert_eq!(
            provider.name, "Jabba The Provider",
            "Should have the correct name"
        );
        assert_eq!(
            provider.reference, "jabba.tatooinecr.io/provider:latest",
            "Should have the correct reference"
        );
        // The host record should list the provider, keyed by id/ref with empty
        // annotations, while preserving the annotations reported in the heartbeat
        let hosts = store
            .list::<Host>(lattice_id)
            .await
            .expect("should be able to get hosts from store");
        assert_eq!(hosts.len(), 1);
        let host = hosts.get(host_id).expect("host with generated ID to exist");
        let host_provider = host
            .providers
            .get(&ProviderInfo {
                provider_id: "jabbatheprovider".to_string(),
                provider_ref: "jabba.tatooinecr.io/provider:latest".to_string(),
                annotations: BTreeMap::default(),
            })
            .expect("provider to exist on host");

        assert_eq!(
            host_provider
                .annotations
                .get("one")
                .expect("annotation to exist"),
            "two"
        );
    }
2215
2216    fn assert_component(
2217        components: &HashMap<String, Component>,
2218        component_id: &str,
2219        expected_counts: &[(&str, usize)],
2220    ) {
2221        let component = components
2222            .get(component_id)
2223            .expect("Component should exist in store");
2224        assert_eq!(
2225            component.id, component_id,
2226            "Component ID stored should be correct"
2227        );
2228        assert_eq!(
2229            expected_counts.len(),
2230            component.instances.len(),
2231            "Should have the proper number of hosts the component is running on"
2232        );
2233        for (expected_host, expected_count) in expected_counts.iter() {
2234            assert_eq!(
2235                component.count_for_host(expected_host),
2236                *expected_count,
2237                "Component count on host should be correct"
2238            );
2239        }
2240    }
2241
2242    fn assert_provider(
2243        providers: &HashMap<String, Provider>,
2244        event: &ProviderStarted,
2245        running_on_hosts: &[&str],
2246    ) {
2247        let provider = providers
2248            .get(&event.provider_id)
2249            .expect("Correct provider should exist in store");
2250        assert!(
2251            event
2252                .claims
2253                .clone()
2254                .is_some_and(|claims| claims.name == provider.name),
2255            "Provider should have the correct data in state"
2256        );
2257        assert!(
2258            provider.hosts.len() == running_on_hosts.len()
2259                && running_on_hosts
2260                    .iter()
2261                    .all(|host_id| provider.hosts.contains_key(*host_id)),
2262            "Provider should be set to the correct hosts"
2263        );
2264    }
2265}