1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use anyhow::{Context, Result, anyhow, bail};
7use arc_swap::ArcSwap;
8use parking_lot::Mutex;
9use reqwest::Client;
10use serde_json::Value;
11use tokio::runtime::{Handle, Runtime};
12use tokio::task::JoinHandle;
13
14use crate::config::HostConfig;
15use crate::engine::host::{SessionHost, StateHost};
16use crate::engine::runtime::StateMachineRuntime;
17use crate::oauth::{OAuthBrokerConfig, request_resource_token};
18use crate::operator_metrics::OperatorMetrics;
19use crate::operator_registry::OperatorRegistry;
20use crate::pack::{ComponentResolution, PackRuntime};
21use crate::runner::adapt_events_email::{
22 EmailExecutionPlan, EmailSendRequest, build_email_execution_plan, execute_email_request,
23};
24use crate::runner::contract_cache::{ContractCache, ContractCacheStats};
25use crate::runner::engine::FlowEngine;
26use crate::runner::mocks::MockLayer;
27use crate::secrets::{DynSecretsManager, canonicalize_secret_key, read_secret_blocking};
28use crate::storage::session::DynSessionStore;
29use crate::storage::state::DynStateStore;
30use crate::telemetry::RolloutIds;
31use crate::trace::PackTraceInfo;
32use crate::wasi::RunnerWasiPolicy;
33use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
34use greentic_types::SecretRequirement;
35use runner_core::packs::PackDigest;
36
37const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
38
39#[derive(Clone, Debug, PartialEq, Eq, Hash)]
45pub struct RuntimeKey {
46 pub tenant: String,
47 pub deployment_id: Option<DeploymentId>,
48 pub bundle_id: Option<BundleId>,
49 pub revision_id: Option<RevisionId>,
50}
51
52impl RuntimeKey {
53 pub fn legacy(tenant: impl Into<String>) -> Self {
55 Self {
56 tenant: tenant.into(),
57 deployment_id: None,
58 bundle_id: None,
59 revision_id: None,
60 }
61 }
62
63 pub fn revision(
65 tenant: impl Into<String>,
66 deployment_id: DeploymentId,
67 bundle_id: BundleId,
68 revision_id: RevisionId,
69 ) -> Self {
70 Self {
71 tenant: tenant.into(),
72 deployment_id: Some(deployment_id),
73 bundle_id: Some(bundle_id),
74 revision_id: Some(revision_id),
75 }
76 }
77
78 pub fn is_legacy(&self) -> bool {
80 self.deployment_id.is_none() && self.bundle_id.is_none() && self.revision_id.is_none()
81 }
82}
83
84fn merge_legacy_reload<V: Clone>(
89 prev: &HashMap<RuntimeKey, V>,
90 mut legacy: HashMap<RuntimeKey, V>,
91) -> HashMap<RuntimeKey, V> {
92 for (key, value) in prev {
93 if !key.is_legacy() {
94 legacy.insert(key.clone(), value.clone());
95 }
96 }
97 legacy
98}
99
100fn remove_keyed_entry<V: Clone>(
105 prev: &HashMap<RuntimeKey, V>,
106 key: &RuntimeKey,
107) -> Option<(HashMap<RuntimeKey, V>, V)> {
108 let removed = prev.get(key)?.clone();
109 let mut next = prev.clone();
110 next.remove(key);
111 Some((next, removed))
112}
113
114fn insert_keyed_entry<V: Clone>(
118 prev: &HashMap<RuntimeKey, V>,
119 key: RuntimeKey,
120 value: V,
121) -> HashMap<RuntimeKey, V> {
122 let mut next = prev.clone();
123 next.insert(key, value);
124 next
125}
126
127pub struct ActivePacks {
133 inner: ArcSwap<HashMap<RuntimeKey, Arc<TenantRuntime>>>,
134 write_lock: Mutex<()>,
135}
136
137impl ActivePacks {
138 pub fn new() -> Self {
139 Self {
140 inner: ArcSwap::from_pointee(HashMap::new()),
141 write_lock: Mutex::new(()),
142 }
143 }
144
145 pub fn load_pack(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
148 self.inner.load().get(&RuntimeKey::legacy(tenant)).cloned()
149 }
150
151 pub fn load_revision(
153 &self,
154 tenant: &str,
155 deployment_id: DeploymentId,
156 bundle_id: BundleId,
157 revision_id: RevisionId,
158 ) -> Option<Arc<TenantRuntime>> {
159 self.inner
160 .load()
161 .get(&RuntimeKey::revision(
162 tenant,
163 deployment_id,
164 bundle_id,
165 revision_id,
166 ))
167 .cloned()
168 }
169
170 pub fn snapshot(&self) -> Arc<HashMap<RuntimeKey, Arc<TenantRuntime>>> {
171 self.inner.load_full()
172 }
173
174 pub fn insert_pack(&self, tenant: &str, runtime: Arc<TenantRuntime>) {
177 let _guard = self.write_lock.lock();
178 let mut next = (*self.inner.load_full()).clone();
179 next.insert(RuntimeKey::legacy(tenant), runtime);
180 self.inner.store(Arc::new(next));
181 }
182
183 pub fn insert_revision(
197 &self,
198 tenant: &str,
199 deployment_id: DeploymentId,
200 bundle_id: BundleId,
201 revision_id: RevisionId,
202 runtime: Arc<TenantRuntime>,
203 ) -> Result<()> {
204 if runtime.tenant() != tenant {
205 bail!(
206 "revision runtime tenant `{}` does not match key tenant `{tenant}`",
207 runtime.tenant()
208 );
209 }
210 let ids = runtime.engine().rollout_ids();
211 let key_deployment = deployment_id.to_string();
212 let key_bundle = bundle_id.as_str();
213 let key_revision = revision_id.to_string();
214 if ids.deployment_id.as_deref() != Some(key_deployment.as_str())
215 || ids.bundle_id.as_deref() != Some(key_bundle)
216 || ids.revision_id.as_deref() != Some(key_revision.as_str())
217 {
218 bail!(
219 "revision runtime rollout identity (deployment={:?}, bundle={:?}, revision={:?}) \
220 does not match key (deployment=`{key_deployment}`, bundle=`{key_bundle}`, \
221 revision=`{key_revision}`)",
222 ids.deployment_id,
223 ids.bundle_id,
224 ids.revision_id
225 );
226 }
227 let _guard = self.write_lock.lock();
228 let key = RuntimeKey::revision(tenant, deployment_id, bundle_id, revision_id);
229 let next = insert_keyed_entry(&self.inner.load_full(), key, runtime);
230 self.inner.store(Arc::new(next));
231 Ok(())
232 }
233
234 pub fn replace_legacy(&self, legacy: HashMap<RuntimeKey, Arc<TenantRuntime>>) {
238 let _guard = self.write_lock.lock();
239 let prev = self.inner.load_full();
240 let next = merge_legacy_reload(&prev, legacy);
241 self.inner.store(Arc::new(next));
242 }
243
244 pub fn remove_revision(
250 &self,
251 tenant: &str,
252 deployment_id: DeploymentId,
253 bundle_id: BundleId,
254 revision_id: RevisionId,
255 ) -> Option<Arc<TenantRuntime>> {
256 let _guard = self.write_lock.lock();
257 let prev = self.inner.load_full();
258 let key = RuntimeKey::revision(tenant, deployment_id, bundle_id, revision_id);
259 let (next, removed) = remove_keyed_entry(&prev, &key)?;
260 self.inner.store(Arc::new(next));
261 Some(removed)
262 }
263
264 pub fn replace(&self, next: HashMap<RuntimeKey, Arc<TenantRuntime>>) {
267 let _guard = self.write_lock.lock();
268 self.inner.store(Arc::new(next));
269 }
270
271 pub fn len(&self) -> usize {
272 self.inner.load().len()
273 }
274
275 pub fn is_empty(&self) -> bool {
276 self.len() == 0
277 }
278}
279
280impl Default for ActivePacks {
281 fn default() -> Self {
282 Self::new()
283 }
284}
285
286pub struct TenantRuntime {
288 tenant: String,
289 config: Arc<HostConfig>,
290 packs: Vec<Arc<PackRuntime>>,
291 digests: Vec<Option<String>>,
292 engine: Arc<FlowEngine>,
293 state_machine: Arc<StateMachineRuntime>,
294 session_store: DynSessionStore,
301 http_client: Client,
302 mocks: Option<Arc<MockLayer>>,
303 timer_handles: Mutex<Vec<JoinHandle<()>>>,
304 secrets: DynSecretsManager,
305 operator_registry: OperatorRegistry,
306 operator_metrics: Arc<OperatorMetrics>,
307 contract_cache: ContractCache,
308}
309
310#[derive(Clone)]
311pub struct ResolvedComponent {
312 pub digest: String,
313 pub component_ref: String,
314 pub pack: Arc<PackRuntime>,
315}
316
317#[derive(Clone, Debug)]
323pub struct RevisionPackRef {
324 pub path: PathBuf,
325 pub digest: String,
326}
327
328pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
330 if let Ok(handle) = Handle::try_current() {
331 handle.block_on(future)
332 } else {
333 Runtime::new()
334 .expect("failed to create tokio runtime")
335 .block_on(future)
336 }
337}
338
339impl TenantRuntime {
340 #[allow(clippy::too_many_arguments)]
341 pub async fn load(
342 pack_path: &Path,
343 config: Arc<HostConfig>,
344 mocks: Option<Arc<MockLayer>>,
345 archive_source: Option<&Path>,
346 digest: Option<String>,
347 wasi_policy: Arc<RunnerWasiPolicy>,
348 session_host: Arc<dyn SessionHost>,
349 session_store: DynSessionStore,
350 state_store: DynStateStore,
351 state_host: Arc<dyn StateHost>,
352 secrets_manager: DynSecretsManager,
353 ) -> Result<Arc<Self>> {
354 let pack = Self::load_pack_runtime(
355 pack_path,
356 &config,
357 mocks.clone(),
358 archive_source,
359 &wasi_policy,
360 &session_store,
361 &state_store,
362 &secrets_manager,
363 &BTreeMap::new(),
364 &BTreeMap::new(),
365 None,
366 )
367 .await?;
368 Self::from_packs(
369 config,
370 vec![(pack, digest)],
371 mocks,
372 session_host,
373 session_store,
374 state_store,
375 state_host,
376 secrets_manager,
377 )
378 .await
379 }
380
381 #[allow(clippy::too_many_arguments)]
394 pub async fn load_revision(
395 pack_refs: &[RevisionPackRef],
396 config: Arc<HostConfig>,
397 mocks: Option<Arc<MockLayer>>,
398 wasi_policy: Arc<RunnerWasiPolicy>,
399 session_host: Arc<dyn SessionHost>,
400 session_store: DynSessionStore,
401 state_store: DynStateStore,
402 state_host: Arc<dyn StateHost>,
403 secrets_manager: DynSecretsManager,
404 deployment_id: DeploymentId,
405 bundle_id: BundleId,
406 revision_id: RevisionId,
407 customer_id: Option<String>,
408 runtime_configs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, Value>>>,
409 runtime_refs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, String>>>,
410 runtime_ref_resolver: Option<Arc<dyn crate::runtime_refs::RuntimeRefResolver>>,
411 ) -> Result<Arc<Self>> {
412 if pack_refs.is_empty() {
413 bail!(
414 "revision runtime for tenant {} requires at least one pack",
415 config.tenant
416 );
417 }
418 let mut packs = Vec::with_capacity(pack_refs.len());
419 let mut seen_pack_ids = HashSet::with_capacity(pack_refs.len());
420 for pack_ref in pack_refs {
421 let expected = PackDigest::parse(&pack_ref.digest).with_context(|| {
422 format!(
423 "revision pack `{}` has an invalid digest `{}`",
424 pack_ref.path.display(),
425 pack_ref.digest
426 )
427 })?;
428 if expected.algorithm() != "sha256" {
432 bail!(
433 "revision pack `{}` pins unsupported digest algorithm `{}`; only sha256 is supported",
434 pack_ref.path.display(),
435 expected.algorithm()
436 );
437 }
438 if !expected.matches_file(&pack_ref.path).with_context(|| {
439 format!(
440 "hashing revision pack `{}` for digest verification",
441 pack_ref.path.display()
442 )
443 })? {
444 bail!(
445 "revision pack `{}` does not match pinned digest `{}`",
446 pack_ref.path.display(),
447 pack_ref.digest
448 );
449 }
450 let pack = Self::load_pack_runtime(
451 &pack_ref.path,
452 &config,
453 mocks.clone(),
454 None,
455 &wasi_policy,
456 &session_store,
457 &state_store,
458 &secrets_manager,
459 runtime_configs_by_pack_id,
460 runtime_refs_by_pack_id,
461 runtime_ref_resolver.as_ref(),
462 )
463 .await?;
464 let pack_id = pack.metadata().pack_id.clone();
468 if !seen_pack_ids.insert(pack_id.clone()) {
469 bail!(
470 "revision for tenant {} contains duplicate pack_id `{}` (path `{}`)",
471 config.tenant,
472 pack_id,
473 pack_ref.path.display(),
474 );
475 }
476 packs.push((pack, Some(expected.raw_string())));
477 }
478 let rollout = RolloutIds {
479 customer_id,
480 deployment_id: Some(deployment_id.to_string()),
481 bundle_id: Some(bundle_id.as_str().to_string()),
482 revision_id: Some(revision_id.to_string()),
483 };
484 Self::from_packs_with_rollout(
485 config,
486 packs,
487 mocks,
488 session_host,
489 session_store,
490 state_store,
491 state_host,
492 secrets_manager,
493 rollout,
494 )
495 .await
496 }
497
498 #[allow(clippy::too_many_arguments)]
513 async fn load_pack_runtime(
514 pack_path: &Path,
515 config: &Arc<HostConfig>,
516 mocks: Option<Arc<MockLayer>>,
517 archive_source: Option<&Path>,
518 wasi_policy: &Arc<RunnerWasiPolicy>,
519 session_store: &DynSessionStore,
520 state_store: &DynStateStore,
521 secrets_manager: &DynSecretsManager,
522 runtime_configs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, Value>>>,
523 runtime_refs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, String>>>,
524 runtime_ref_resolver: Option<&Arc<dyn crate::runtime_refs::RuntimeRefResolver>>,
525 ) -> Result<Arc<PackRuntime>> {
526 let oauth_config = config.oauth_broker_config();
527 let mut pack = PackRuntime::load(
528 pack_path,
529 Arc::clone(config),
530 mocks,
531 archive_source,
532 Some(Arc::clone(session_store)),
533 Some(Arc::clone(state_store)),
534 Arc::clone(wasi_policy),
535 Arc::clone(secrets_manager),
536 oauth_config,
537 true,
538 ComponentResolution::default(),
539 )
540 .await
541 .with_context(|| {
542 format!(
543 "failed to load pack {} for tenant {}",
544 pack_path.display(),
545 config.tenant
546 )
547 })?;
548 let pack_id = pack.metadata().pack_id.clone();
549 if let Some(non_secret) = runtime_configs_by_pack_id.get(pack_id.as_str()) {
550 pack.set_runtime_config_non_secret(Some(Arc::clone(non_secret)));
551 }
552 if let Some(refs) = runtime_refs_by_pack_id.get(pack_id.as_str()) {
553 let resolver = runtime_ref_resolver.ok_or_else(|| {
554 anyhow!(
555 "pack `{}` has runtime_refs bound but no RuntimeRefResolver was provided",
556 pack_id,
557 )
558 })?;
559 pack.set_runtime_refs(Some(crate::runtime_refs::RuntimeRefsInjection {
560 refs: Arc::clone(refs),
561 resolver: Arc::clone(resolver),
562 }));
563 }
564 Ok(Arc::new(pack))
565 }
566
567 #[allow(clippy::too_many_arguments)]
568 pub async fn from_packs(
569 config: Arc<HostConfig>,
570 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
571 mocks: Option<Arc<MockLayer>>,
572 session_host: Arc<dyn SessionHost>,
573 session_store: DynSessionStore,
574 state_store: DynStateStore,
575 state_host: Arc<dyn StateHost>,
576 secrets_manager: DynSecretsManager,
577 ) -> Result<Arc<Self>> {
578 Self::from_packs_with_rollout(
579 config,
580 packs,
581 mocks,
582 session_host,
583 session_store,
584 state_store,
585 state_host,
586 secrets_manager,
587 RolloutIds::default(),
588 )
589 .await
590 }
591
592 #[allow(clippy::too_many_arguments)]
597 pub(crate) async fn from_packs_with_rollout(
598 config: Arc<HostConfig>,
599 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
600 mocks: Option<Arc<MockLayer>>,
601 session_host: Arc<dyn SessionHost>,
602 session_store: DynSessionStore,
603 _state_store: DynStateStore,
604 state_host: Arc<dyn StateHost>,
605 secrets_manager: DynSecretsManager,
606 rollout: RolloutIds,
607 ) -> Result<Arc<Self>> {
608 let operator_registry = OperatorRegistry::build(&packs)?;
609 let operator_metrics = Arc::new(OperatorMetrics::default());
610 let pack_runtimes = packs
611 .iter()
612 .map(|(pack, _)| Arc::clone(pack))
613 .collect::<Vec<_>>();
614 let digests = packs
615 .iter()
616 .map(|(_, digest)| digest.clone())
617 .collect::<Vec<_>>();
618 let mut pack_trace = HashMap::new();
619 for (pack, digest) in &packs {
620 let pack_id = pack.metadata().pack_id.clone();
621 let pack_ref = config
622 .pack_bindings
623 .iter()
624 .find(|binding| binding.pack_id == pack_id)
625 .map(|binding| binding.pack_ref.clone())
626 .unwrap_or_else(|| pack_id.clone());
627 pack_trace.insert(
628 pack_id,
629 PackTraceInfo {
630 pack_ref,
631 resolved_digest: digest.clone(),
632 },
633 );
634 }
635 let engine = Arc::new(
636 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
637 .await
638 .context("failed to prime flow engine")?
639 .with_rollout_ids(rollout),
640 );
641 let state_machine = Arc::new(
642 StateMachineRuntime::from_flow_engine(
643 Arc::clone(&config),
644 Arc::clone(&engine),
645 pack_trace,
646 session_host,
647 Arc::clone(&session_store),
648 state_host,
649 Arc::clone(&secrets_manager),
650 mocks.clone(),
651 )
652 .context("failed to initialise state machine runtime")?,
653 );
654 let http_client = Client::builder().build()?;
655 Ok(Arc::new(Self {
656 tenant: config.tenant.clone(),
657 config,
658 packs: pack_runtimes,
659 digests,
660 engine,
661 state_machine,
662 session_store,
663 http_client,
664 mocks,
665 timer_handles: Mutex::new(Vec::new()),
666 secrets: secrets_manager,
667 operator_registry,
668 operator_metrics,
669 contract_cache: ContractCache::from_env(),
670 }))
671 }
672
673 pub fn tenant(&self) -> &str {
674 &self.tenant
675 }
676
677 pub fn config(&self) -> &Arc<HostConfig> {
678 &self.config
679 }
680
681 pub fn operator_registry(&self) -> &OperatorRegistry {
682 &self.operator_registry
683 }
684
685 pub fn operator_metrics(&self) -> &OperatorMetrics {
686 &self.operator_metrics
687 }
688
689 pub fn contract_cache(&self) -> &ContractCache {
690 &self.contract_cache
691 }
692
693 pub fn contract_cache_stats(&self) -> ContractCacheStats {
694 self.contract_cache.stats()
695 }
696
697 pub fn main_pack(&self) -> &Arc<PackRuntime> {
698 self.packs
699 .first()
700 .expect("tenant runtime must contain at least one pack")
701 }
702
703 pub fn pack(&self) -> Arc<PackRuntime> {
704 Arc::clone(self.main_pack())
705 }
706
707 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
708 self.packs.iter().skip(1).cloned().collect()
709 }
710
711 pub fn all_packs(&self) -> &[Arc<PackRuntime>] {
719 &self.packs
720 }
721
722 pub fn pack_digests(&self) -> &[Option<String>] {
726 &self.digests
727 }
728
729 pub fn engine(&self) -> &Arc<FlowEngine> {
730 &self.engine
731 }
732
733 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
734 &self.state_machine
735 }
736
737 pub fn session_store(&self) -> &DynSessionStore {
742 &self.session_store
743 }
744
745 pub fn http_client(&self) -> &Client {
746 &self.http_client
747 }
748
749 pub fn oauth_config(&self) -> Option<OAuthBrokerConfig> {
750 self.config.oauth_broker_config()
751 }
752
753 pub fn digest(&self) -> Option<&str> {
754 self.digests.first().and_then(|d| d.as_deref())
755 }
756
757 pub fn overlay_digests(&self) -> Vec<Option<String>> {
758 self.digests.iter().skip(1).cloned().collect()
759 }
760
761 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
762 self.packs
763 .iter()
764 .flat_map(|pack| pack.required_secrets().iter().cloned())
765 .collect()
766 }
767
768 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
769 self.packs
770 .iter()
771 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
772 .collect()
773 }
774
775 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
776 self.mocks.as_ref()
777 }
778
779 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
780 self.timer_handles.lock().extend(handles);
781 }
782
783 pub fn get_secret(&self, key: &str) -> Result<String> {
784 if crate::provider_core_only::is_enabled() {
785 bail!(crate::provider_core_only::blocked_message("secrets"))
786 }
787 if !self.config.secrets_policy.is_allowed(key) {
788 bail!("secret {key} is not permitted by bindings policy");
789 }
790 let ctx = self.config.tenant_ctx();
791 let canonical_key = canonicalize_secret_key(key);
792 let bytes =
793 read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, &canonical_key)
794 .context("failed to read secret from manager")?;
795 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
796 Ok(value)
797 }
798
799 pub fn build_events_email_execution_plan(
800 &self,
801 tenant: &greentic_types::TenantCtx,
802 request: &EmailSendRequest,
803 ) -> Result<EmailExecutionPlan> {
804 let oauth = self
805 .oauth_config()
806 .ok_or_else(|| anyhow!("oauth broker config is not configured for tenant runtime"))?;
807 build_email_execution_plan(&oauth, tenant, request)
808 }
809
810 pub async fn execute_events_email_request(
811 &self,
812 access_token: &str,
813 request: &EmailSendRequest,
814 ) -> Result<()> {
815 execute_email_request(self.http_client(), access_token, request).await
816 }
817
818 pub async fn execute_events_email_with_oauth(
819 &self,
820 tenant: &greentic_types::TenantCtx,
821 request: &EmailSendRequest,
822 ) -> Result<()> {
823 let plan = self.build_events_email_execution_plan(tenant, request)?;
824 let token = request_resource_token(self.http_client(), &plan.token_request).await?;
825 self.execute_events_email_request(&token.access_token, request)
826 .await
827 }
828
829 pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
830 self.packs
831 .iter()
832 .find(|pack| pack.contains_component(component_ref))
833 .cloned()
834 }
835
836 pub fn pack_for_component_with_digest(
837 &self,
838 component_ref: &str,
839 ) -> Option<(Arc<PackRuntime>, Option<String>)> {
840 self.packs
841 .iter()
842 .zip(self.digests.iter())
843 .find(|(pack, _)| pack.contains_component(component_ref))
844 .map(|(pack, digest)| (Arc::clone(pack), digest.clone()))
845 }
846
847 pub fn resolve_component(&self, component_ref: &str) -> Option<ResolvedComponent> {
848 self.pack_for_component_with_digest(component_ref)
849 .map(|(pack, digest)| ResolvedComponent {
850 digest: digest
851 .or_else(|| self.digest().map(ToString::to_string))
852 .unwrap_or_else(|| "unknown".to_string()),
853 component_ref: component_ref.to_string(),
854 pack,
855 })
856 }
857}
858
859impl Drop for TenantRuntime {
860 fn drop(&mut self) {
861 for handle in self.timer_handles.lock().drain(..) {
862 handle.abort();
863 }
864 }
865}
866
867#[cfg(test)]
868mod runtime_key_tests {
869 use super::*;
870
871 #[test]
872 fn legacy_keys_match_only_on_tenant() {
873 assert_eq!(RuntimeKey::legacy("acme"), RuntimeKey::legacy("acme"));
874 assert_ne!(RuntimeKey::legacy("acme"), RuntimeKey::legacy("other"));
875 }
876
877 #[test]
878 fn legacy_and_revision_keys_never_collide() {
879 let key = RuntimeKey::revision(
880 "acme",
881 DeploymentId::new(),
882 BundleId::from("bundle-a"),
883 RevisionId::new(),
884 );
885 assert_ne!(RuntimeKey::legacy("acme"), key);
886 }
887
888 #[test]
889 fn revision_keys_distinguish_revision_id() {
890 let deployment = DeploymentId::new();
891 let bundle = BundleId::from("bundle-a");
892 let rev_a = RevisionId::new();
893 let rev_b = RevisionId::new();
894 let key_a = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_a);
895 let key_b = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_b);
896 let same = RuntimeKey::revision("acme", deployment, bundle, rev_a);
897 assert_ne!(key_a, key_b);
898 assert_eq!(key_a, same);
899 }
900
901 #[test]
902 fn legacy_reload_preserves_revision_entries() {
903 let revision_key = RuntimeKey::revision(
904 "acme",
905 DeploymentId::new(),
906 BundleId::from("bundle-a"),
907 RevisionId::new(),
908 );
909 let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
910 prev.insert(RuntimeKey::legacy("acme"), 1); prev.insert(RuntimeKey::legacy("retired"), 2); prev.insert(revision_key.clone(), 99); let mut legacy: HashMap<RuntimeKey, u32> = HashMap::new();
915 legacy.insert(RuntimeKey::legacy("acme"), 10);
916 legacy.insert(RuntimeKey::legacy("newcomer"), 20);
917
918 let next = merge_legacy_reload(&prev, legacy);
919
920 assert_eq!(next.get(&RuntimeKey::legacy("acme")), Some(&10));
921 assert_eq!(next.get(&RuntimeKey::legacy("newcomer")), Some(&20));
922 assert_eq!(next.get(&RuntimeKey::legacy("retired")), None);
923 assert_eq!(next.get(&revision_key), Some(&99));
924 }
925
926 #[test]
927 fn remove_keyed_entry_pops_only_the_targeted_key() {
928 let deployment = DeploymentId::new();
929 let bundle = BundleId::from("bundle-a");
930 let rev_a = RevisionId::new();
931 let rev_b = RevisionId::new();
932 let key_a = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_a);
933 let key_b = RuntimeKey::revision("acme", deployment, bundle, rev_b);
934
935 let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
936 prev.insert(RuntimeKey::legacy("acme"), 1);
937 prev.insert(key_a.clone(), 10);
938 prev.insert(key_b.clone(), 20);
939
940 let (next, removed) = remove_keyed_entry(&prev, &key_a).expect("present");
941 assert_eq!(removed, 10);
942 assert_eq!(next.get(&key_a), None);
943 assert_eq!(next.get(&key_b), Some(&20));
944 assert_eq!(next.get(&RuntimeKey::legacy("acme")), Some(&1));
945 }
946
947 #[test]
948 fn remove_keyed_entry_returns_none_for_missing_key() {
949 let prev: HashMap<RuntimeKey, u32> = HashMap::new();
950 let ghost = RuntimeKey::revision(
951 "acme",
952 DeploymentId::new(),
953 BundleId::from("bundle-a"),
954 RevisionId::new(),
955 );
956 assert!(remove_keyed_entry(&prev, &ghost).is_none());
957 }
958
959 #[test]
960 fn remove_keyed_entry_leaves_other_deployments_alone() {
961 let bundle = BundleId::from("bundle-a");
962 let dep_a = DeploymentId::new();
963 let dep_b = DeploymentId::new();
964 let rev = RevisionId::new();
965 let key_a = RuntimeKey::revision("acme", dep_a, bundle.clone(), rev);
966 let key_b = RuntimeKey::revision("acme", dep_b, bundle, rev);
967
968 let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
969 prev.insert(key_a.clone(), 100);
970 prev.insert(key_b.clone(), 200);
971
972 let (next, removed) = remove_keyed_entry(&prev, &key_a).expect("present");
973 assert_eq!(removed, 100);
974 assert_eq!(next.get(&key_b), Some(&200));
975 assert_eq!(next.len(), 1);
976 }
977
978 #[test]
979 fn map_lookup_separates_legacy_from_revision() {
980 let deployment = DeploymentId::new();
981 let bundle = BundleId::from("bundle-a");
982 let revision = RevisionId::new();
983
984 let mut map: HashMap<RuntimeKey, u32> = HashMap::new();
985 map.insert(RuntimeKey::legacy("acme"), 1);
986 map.insert(
987 RuntimeKey::revision("acme", deployment, bundle.clone(), revision),
988 2,
989 );
990
991 assert_eq!(map.get(&RuntimeKey::legacy("acme")), Some(&1));
992 assert_eq!(
993 map.get(&RuntimeKey::revision("acme", deployment, bundle, revision)),
994 Some(&2)
995 );
996 assert_eq!(map.get(&RuntimeKey::legacy("ghost")), None);
997 }
998
999 #[test]
1000 fn insert_keyed_entry_adds_revision_preserving_legacy_and_siblings() {
1001 let deployment = DeploymentId::new();
1002 let bundle = BundleId::from("bundle-a");
1003 let rev_a = RevisionId::new();
1004 let rev_b = RevisionId::new();
1005 let key_a = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_a);
1006 let key_b = RuntimeKey::revision("acme", deployment, bundle, rev_b);
1007
1008 let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
1009 prev.insert(RuntimeKey::legacy("acme"), 1);
1010 prev.insert(key_a.clone(), 10);
1011
1012 let next = insert_keyed_entry(&prev, key_b.clone(), 20);
1013
1014 assert_eq!(next.get(&key_b), Some(&20));
1016 assert_eq!(next.get(&key_a), Some(&10));
1017 assert_eq!(next.get(&RuntimeKey::legacy("acme")), Some(&1));
1018 assert_eq!(next.len(), 3);
1019 }
1020
1021 #[test]
1022 fn insert_keyed_entry_replaces_existing_revision() {
1023 let key = RuntimeKey::revision(
1024 "acme",
1025 DeploymentId::new(),
1026 BundleId::from("bundle-a"),
1027 RevisionId::new(),
1028 );
1029 let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
1030 prev.insert(key.clone(), 10);
1031
1032 let next = insert_keyed_entry(&prev, key.clone(), 99);
1033
1034 assert_eq!(next.get(&key), Some(&99));
1035 assert_eq!(next.len(), 1);
1036 }
1037}