Skip to main content

coil_runtime/plan/
mod.rs

1use super::*;
2use crate::builder::CustomerHookSet;
3use coil_assets::ActiveAssetManifest;
4use coil_storage::execution::ObjectStoreClientConfig;
5use std::fmt;
6use std::path::PathBuf;
7use std::sync::{Arc, OnceLock};
8use url::Url;
9
10mod execution;
11mod shared_state;
12#[cfg(test)]
13mod testing;
14
15pub(crate) use shared_state::shared_state_root;
16#[cfg(test)]
17pub(crate) use testing::{shared_cache_runtime_for_test, shared_jobs_runtime_for_test};
18
/// Cloneable handle to a once-initialized shared jobs coordination runtime.
///
/// The `OnceLock` sits behind an `Arc`, so every clone of the handle observes
/// the same stored initialization outcome.
#[derive(Clone)]
pub(crate) struct SharedJobsRuntimeHandle {
    /// Namespace passed to the runtime constructor when the cell is first filled.
    namespace: String,
    /// Stored initialization outcome; the `Err` side carries the failure as a `String`.
    runtime: Arc<OnceLock<Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, String>>>,
}
24
25impl SharedJobsRuntimeHandle {
26    pub(crate) fn new(namespace: impl Into<String>) -> Self {
27        Self {
28            namespace: namespace.into(),
29            runtime: Arc::new(OnceLock::new()),
30        }
31    }
32
33    pub(crate) fn get_or_init(
34        &self,
35        runtime: &JobsRuntimeServices,
36    ) -> Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, RuntimeJobsError> {
37        let namespace = self.namespace.clone();
38        self.runtime
39            .get_or_init(|| shared_jobs_runtime(runtime, namespace.clone()))
40            .clone()
41            .map_err(|error| {
42                RuntimeJobsError::Jobs(JobsModelError::LiveSharedBackendRequiresExplicitRuntime {
43                    backend: runtime.backend,
44                    namespace: error,
45                })
46            })
47    }
48}
49
50impl fmt::Debug for SharedJobsRuntimeHandle {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_struct("SharedJobsRuntimeHandle")
53            .field("namespace", &self.namespace)
54            .field("initialized", &self.runtime.get().is_some())
55            .finish()
56    }
57}
58
/// Resolved runtime plan: the platform configuration together with the
/// per-subsystem services, registries and module contributions that the
/// `*_host` constructors on this type read when building hosts.
#[derive(Debug, Clone)]
pub struct RuntimePlan {
    pub config: PlatformConfig,
    pub auth_package_name: String,
    pub auth_package: AuthModelPackageSelection,
    /// Outbound HTTP endpoints keyed by name; also exposed read-only through
    /// `approved_outbound_http_endpoints()`.
    pub approved_outbound_http_endpoints: BTreeMap<String, Url>,
    /// Scope segment combined with the app name by `shared_backend_namespace()`;
    /// also handed to the browser and TLS hosts.
    pub shared_backend_scope: String,
    pub shared_state_root: PathBuf,
    pub cache_topology: CacheTopology,
    /// Planner consulted by `cache_host()` to decide whether a shared cache
    /// runtime is attached.
    pub cache_planner: CachePlanner,
    pub i18n: I18nRuntimeServices,
    pub seo: SeoRuntimeServices,
    pub browser: BrowserSecurityServices,
    pub cli: CliRuntimeServices,
    pub data: DataRuntimeServices,
    pub jobs: JobsRuntimeServices,
    pub observability: ObservabilityRuntimeServices,
    pub http: HttpRuntimePlan,
    pub handlers: BTreeMap<String, HandlerDefinition>,
    pub storage_planner: StoragePlanner,
    pub storefront_catalog: StorefrontCatalog,
    pub theme_asset_manifest: Option<ActiveAssetManifest>,
    pub template: TemplateRuntimeServices,
    pub tls: TlsRuntimeServices,
    pub wasm: WasmRuntimeServices,
    pub services: Vec<ServiceDescriptor>,
    pub modules: Vec<ModuleManifest>,
    pub install_migrations: MigrationPlan,
    pub extension_registry: ExtensionRegistry,
    pub registered_extension_slots: Vec<RegisteredExtensionSlot>,
    pub installed_extensions: Vec<InstalledExtensionSummary>,
    pub linked_customer_plugins: Vec<LinkedCustomerPluginSummary>,
    pub(crate) customer_hooks: CustomerHookSet,
    /// Lazily initialized shared jobs runtime consumed by `jobs_host()`.
    pub(crate) shared_jobs_runtime: SharedJobsRuntimeHandle,
    pub module_jobs: Vec<RegisteredModuleJob>,
    pub module_event_subscriptions: Vec<RegisteredEventSubscription>,
    pub module_data_repositories: Vec<RegisteredDataRepository>,
    pub module_search_contributions: Vec<RegisteredSearchContribution>,
    pub module_report_definitions: Vec<RegisteredReportDefinition>,
    pub module_bulk_operations: Vec<RegisteredBulkOperation>,
    pub registered_runtime_jobs: Vec<RuntimeJobDefinition>,
    pub registered_runtime_event_subscriptions: Vec<RuntimeEventSubscriptionDefinition>,
    pub jobs_domain: JobsDomain,
    pub ops_catalog: OpsCatalog,
}
104
/// Backend choice for metadata auditing, produced by
/// `RuntimePlan::metadata_audit_backend_selection` from the configured
/// storage deployment.
#[derive(Debug, Clone)]
pub(crate) enum MetadataAuditBackendSelection {
    /// Selected for `StorageDeployment::SingleNode`.
    LocalSqlite {
        /// Path built from `config.storage.local_root`.
        root: std::path::PathBuf,
        /// Shared backend namespace of the plan.
        namespace: String,
    },
    /// Selected for `StorageDeployment::Distributed`.
    SharedPostgres {
        /// Clone of the plan's data runtime.
        runtime: coil_data::DataRuntime,
    },
}
115
116impl RuntimePlan {
117    pub fn auth_package(&self) -> &dyn AuthModelPackage {
118        self.auth_package.package()
119    }
120
121    pub(crate) fn metadata_audit_backend_selection(&self) -> MetadataAuditBackendSelection {
122        match self.config.storage.deployment {
123            coil_config::StorageDeployment::Distributed => {
124                MetadataAuditBackendSelection::SharedPostgres {
125                    runtime: self.data.clone(),
126                }
127            }
128            coil_config::StorageDeployment::SingleNode => {
129                MetadataAuditBackendSelection::LocalSqlite {
130                    root: std::path::PathBuf::from(&self.config.storage.local_root),
131                    namespace: self.shared_backend_namespace(),
132                }
133            }
134        }
135    }
136
137    pub fn approved_outbound_http_endpoints(&self) -> &BTreeMap<String, Url> {
138        &self.approved_outbound_http_endpoints
139    }
140
141    pub fn tenant_id(&self) -> i64 {
142        self.config.auth.tenant_id
143    }
144
145    pub fn jobs_host(
146        &self,
147        scheduler_node_id: impl Into<String>,
148    ) -> Result<JobsHost, RuntimeJobsError> {
149        let scheduler_node_id =
150            validate_runtime_identifier("scheduler_node_id", scheduler_node_id.into())?;
151        let namespace = self.shared_backend_namespace();
152        let shared_runtime = self.shared_jobs_runtime.get_or_init(&self.jobs)?;
153        Ok(JobsHost::new(
154            self.config.app.name.clone(),
155            scheduler_node_id,
156            self.jobs.clone(),
157            self.observability.telemetry.clone(),
158            self.jobs.describe().clone(),
159            self.registered_runtime_jobs.clone(),
160            self.registered_runtime_event_subscriptions.clone(),
161            self.jobs_domain.clone(),
162            shared_runtime,
163            namespace,
164        ))
165    }
166
167    pub fn ops_host(
168        &self,
169        scheduler_node_id: impl Into<String>,
170    ) -> Result<OpsHost, RuntimeOpsError> {
171        Ok(OpsHost::new(
172            OpsPlanner::new(self.jobs.clone(), self.ops_catalog.clone())?,
173            self.jobs_host(scheduler_node_id)?,
174        ))
175    }
176
177    pub fn search_host(
178        &self,
179        scheduler_node_id: impl Into<String>,
180    ) -> Result<SearchHost, RuntimeSearchError> {
181        Ok(SearchHost::new(
182            self.ops_catalog.search.clone(),
183            self.ops_host(scheduler_node_id)?,
184        ))
185    }
186
187    pub fn cache_host(&self) -> Result<CacheHost, RuntimeCacheError> {
188        let namespace = self.cache_namespace()?;
189        let shared_namespace = self.shared_backend_namespace();
190        if self.cache_planner.topology().supports_shared_invalidation() {
191            #[cfg(test)]
192            {
193                let backend = match self
194                    .cache_planner
195                    .topology()
196                    .l2()
197                    .expect("shared cache runtime requires distributed l2")
198                {
199                    coil_cache::DistributedCacheBackend::Redis => {
200                        coil_cache::CacheBackendKind::Redis
201                    }
202                    coil_cache::DistributedCacheBackend::Valkey => {
203                        coil_cache::CacheBackendKind::Valkey
204                    }
205                };
206                let runtime = shared_cache_runtime_for_test(backend, shared_namespace.clone());
207                return Ok(CacheHost::new(
208                    self.config.app.name.clone(),
209                    namespace,
210                    self.cache_planner,
211                    Some(runtime),
212                    shared_namespace,
213                ));
214            }
215
216            #[cfg(not(test))]
217            {
218                let backend = match self
219                    .cache_planner
220                    .topology()
221                    .l2()
222                    .expect("shared cache runtime requires distributed l2")
223                {
224                    coil_cache::DistributedCacheBackend::Redis => {
225                        coil_cache::CacheBackendKind::Redis
226                    }
227                    coil_cache::DistributedCacheBackend::Valkey => {
228                        coil_cache::CacheBackendKind::Valkey
229                    }
230                };
231                let runtime = shared_cache_runtime(backend, shared_namespace.clone());
232                return Ok(CacheHost::new(
233                    self.config.app.name.clone(),
234                    namespace,
235                    self.cache_planner,
236                    Some(runtime),
237                    shared_namespace,
238                ));
239            }
240        }
241
242        Ok(CacheHost::new(
243            self.config.app.name.clone(),
244            namespace,
245            self.cache_planner,
246            None,
247            shared_namespace,
248        ))
249    }
250
251    #[cfg(test)]
252    pub fn browser_host(&self) -> Result<BrowserHost, BrowserHostBuildError> {
253        BrowserHost::new_with_scope(
254            self.config.app.name.clone(),
255            self.browser.clone(),
256            self.shared_backend_scope.clone(),
257        )
258    }
259
260    pub fn tls_host(&self) -> Result<TlsHost, RuntimeTlsError> {
261        self.tls_host_with_secret_resolver(&crate::server::EnvironmentSecretResolver)
262    }
263
264    pub fn tls_host_with_secret_resolver<R: crate::server::SecretResolver>(
265        &self,
266        resolver: &R,
267    ) -> Result<TlsHost, RuntimeTlsError> {
268        let account_secret = self
269            .config
270            .tls
271            .account_secret
272            .as_ref()
273            .map(|secret| resolver.resolve(secret))
274            .transpose()?;
275        TlsHost::new(
276            self.config.app.name.clone(),
277            self.tls.clone(),
278            self.data.clone(),
279            self.shared_backend_scope.clone(),
280            account_secret,
281        )
282    }
283
284    pub fn tls_validation_host_with_secret_resolver<R: crate::server::SecretResolver>(
285        &self,
286        resolver: &R,
287    ) -> Result<TlsHost, RuntimeTlsError> {
288        let account_secret = self
289            .config
290            .tls
291            .account_secret
292            .as_ref()
293            .map(|secret| resolver.resolve(secret))
294            .transpose()?;
295        TlsHost::new_for_validation(
296            self.config.app.name.clone(),
297            self.tls.clone(),
298            self.shared_backend_scope.clone(),
299            account_secret,
300        )
301    }
302
303    pub fn storage_host(&self) -> StorageHost {
304        self.storage_host_with_object_store(None)
305    }
306
307    pub fn storage_host_with_object_store(
308        &self,
309        object_store: Option<ObjectStoreClientConfig>,
310    ) -> StorageHost {
311        StorageHost::new(
312            self.config.app.name.clone(),
313            self.storage_planner.clone(),
314            self.config.assets.cdn_base_url.clone(),
315            object_store,
316        )
317    }
318
319    pub fn wasm_host(&self) -> WasmHost {
320        WasmHost::new(
321            self.clone(),
322            self.config.app.name.clone(),
323            self.wasm.clone(),
324            self.extension_registry.clone(),
325            self.config.i18n.default_locale.clone(),
326            self.registered_runtime_jobs.clone(),
327        )
328    }
329
330    pub fn wasm_host_with_secret_resolver<R: SecretResolver>(
331        &self,
332        resolver: &R,
333    ) -> Result<WasmHost, RuntimeServerError> {
334        let wasm_secrets = self.wasm_secret_values(resolver)?;
335        let storage_host =
336            self.storage_host_with_object_store(self.object_store_client_config(resolver)?);
337        Ok(WasmHost::with_host_services(
338            self.clone(),
339            self.config.app.name.clone(),
340            self.wasm.clone(),
341            self.extension_registry.clone(),
342            self.config.i18n.default_locale.clone(),
343            self.registered_runtime_jobs.clone(),
344            RuntimeWasmHostServices::with_runtime_secrets(self.clone(), storage_host, wasm_secrets),
345        ))
346    }
347
348    pub fn wasm_secret_values<R: SecretResolver>(
349        &self,
350        resolver: &R,
351    ) -> Result<BTreeMap<String, String>, RuntimeServerError> {
352        self.config
353            .wasm
354            .secret_bindings
355            .iter()
356            .map(|(name, secret)| {
357                resolver
358                    .resolve(secret)
359                    .map(|value| (name.clone(), value))
360                    .map_err(|error| RuntimeServerError::Secret(error))
361            })
362            .collect()
363    }
364
365    pub fn shared_backend_clients<R: SecretResolver>(
366        &self,
367        resolver: &R,
368    ) -> Result<SharedBackendClients, RuntimeServerError> {
369        Ok(SharedBackendClients::from_config(&self.config, resolver)?)
370    }
371
372    pub fn object_store_client_config<R: SecretResolver>(
373        &self,
374        resolver: &R,
375    ) -> Result<Option<ObjectStoreClientConfig>, RuntimeServerError> {
376        Ok(SharedBackendClients::object_store_client_config(
377            &self.config,
378            resolver,
379        )?)
380    }
381
382    pub fn server_host<R: SecretResolver>(
383        &self,
384        resolver: &R,
385        cookie_secret: &[u8],
386        csrf_secret: &[u8],
387    ) -> Result<HttpServerHost, RuntimeServerError> {
388        if self.browser.sessions.store == coil_core::SessionStoreTopology::Memory {
389            return Err(BrowserHostBuildError::MemoryStoreRequiresTestOnlyBrowserHost.into());
390        }
391
392        let wasm_secrets = self.wasm_secret_values(resolver)?;
393        let payment_webhook_secret =
394            crate::server::resolve_commerce_payment_webhook_secret(&self.config, resolver)?;
395        HttpServerHost::new(
396            self.clone(),
397            self.shared_backend_clients(resolver)?,
398            wasm_secrets,
399            payment_webhook_secret,
400            cookie_secret.to_vec(),
401            csrf_secret.to_vec(),
402        )
403    }
404
405    #[cfg(test)]
406    pub(crate) fn server_host_with_checkout_client<R: SecretResolver>(
407        &self,
408        resolver: &R,
409        cookie_secret: &[u8],
410        csrf_secret: &[u8],
411        hosted_checkout_client: std::sync::Arc<dyn crate::server::HostedCheckoutClient>,
412    ) -> Result<HttpServerHost, RuntimeServerError> {
413        if self.browser.sessions.store == coil_core::SessionStoreTopology::Memory {
414            return Err(BrowserHostBuildError::MemoryStoreRequiresTestOnlyBrowserHost.into());
415        }
416
417        let wasm_secrets = self.wasm_secret_values(resolver)?;
418        let payment_webhook_secret =
419            crate::server::resolve_commerce_payment_webhook_secret(&self.config, resolver)?;
420        HttpServerHost::new_with_checkout_client(
421            self.clone(),
422            self.shared_backend_clients(resolver)?,
423            wasm_secrets,
424            payment_webhook_secret,
425            cookie_secret.to_vec(),
426            csrf_secret.to_vec(),
427            hosted_checkout_client,
428        )
429    }
430
    /// Builds a multi-threaded Tokio runtime, constructs the HTTP server host
    /// and serves on a TCP listener until the server future completes.
    ///
    /// The cookie and CSRF secrets are read from the `COIL_COOKIE_SECRET` and
    /// `COIL_CSRF_SECRET` environment variables. The listener binds to
    /// `bind_override` when it is `Some`, otherwise to `config.server.bind`.
    ///
    /// # Errors
    ///
    /// - the error from `required_env_bytes` when either secret variable is
    ///   unavailable or empty;
    /// - `RuntimeBootstrapError::Serve` when the Tokio runtime cannot be built
    ///   or serving fails;
    /// - `RuntimeBootstrapError::Bind` when the listener cannot bind;
    /// - any `server_host` failure, converted by `?`.
    pub fn serve_from_env(
        self,
        bind_override: Option<String>,
    ) -> Result<(), RuntimeBootstrapError> {
        let cookie_secret = required_env_bytes("COIL_COOKIE_SECRET")?;
        let csrf_secret = required_env_bytes("COIL_CSRF_SECRET")?;
        let bind = bind_override.unwrap_or_else(|| self.config.server.bind.clone());
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .map_err(|error| RuntimeBootstrapError::Serve {
                reason: error.to_string(),
            })?;
        // The host is constructed while the runtime context is entered; the
        // guard is dropped at the end of this block, before `block_on` runs.
        // NOTE(review): presumably host construction needs an ambient Tokio
        // runtime — confirm.
        let server = {
            let _runtime_guard = runtime.enter();
            self.server_host(
                &crate::server::EnvironmentSecretResolver,
                &cookie_secret,
                &csrf_secret,
            )?
        };

        // `bind` and `server` are moved into the future driven by `block_on`.
        runtime.block_on(async move {
            let listener = tokio::net::TcpListener::bind(&bind)
                .await
                .map_err(|error| RuntimeBootstrapError::Bind {
                    bind: bind.clone(),
                    reason: error.to_string(),
                })?;
            server
                .serve(listener)
                .await
                .map_err(|error| RuntimeBootstrapError::Serve {
                    reason: error.to_string(),
                })
        })
    }
468
469    pub(crate) fn cache_namespace(&self) -> Result<CacheNamespace, CacheModelError> {
470        CacheNamespace::new(format!("customer-app:{}", self.config.app.name))
471    }
472
473    pub(crate) fn shared_backend_namespace(&self) -> String {
474        format!(
475            "customer-app:{}:{}",
476            self.config.app.name, self.shared_backend_scope
477        )
478    }
479
480    pub(crate) fn shared_state_root(&self) -> &PathBuf {
481        &self.shared_state_root
482    }
483}
484
/// Test-build constructor for the shared jobs runtime; delegates to the
/// testing helper and always succeeds.
#[cfg(test)]
fn shared_jobs_runtime(
    runtime: &JobsRuntimeServices,
    namespace: String,
) -> Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, String> {
    let test_runtime = crate::plan::shared_jobs_runtime_for_test(runtime, namespace);
    Ok(test_runtime)
}
494
495#[cfg(not(test))]
496fn shared_jobs_runtime(
497    runtime: &JobsRuntimeServices,
498    namespace: String,
499) -> Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, String> {
500    coil_jobs::JobsBackendAdapter::live_shared_runtime(runtime, namespace, PathBuf::new())
501        .map_err(|error| error.to_string())
502}
503
/// Test-build constructor for the shared cache runtime; delegates to the
/// testing helper.
#[cfg(test)]
fn shared_cache_runtime(
    backend: coil_cache::CacheBackendKind,
    namespace: String,
) -> Arc<dyn coil_cache::DistributedCacheRuntime> {
    let test_runtime: Arc<dyn coil_cache::DistributedCacheRuntime> =
        crate::plan::shared_cache_runtime_for_test(backend, namespace);
    test_runtime
}
511
512#[cfg(not(test))]
513fn shared_cache_runtime(
514    backend: coil_cache::CacheBackendKind,
515    namespace: String,
516) -> Arc<dyn coil_cache::DistributedCacheRuntime> {
517    coil_cache::DistributedCacheClient::live_shared_runtime(backend, namespace, PathBuf::new())
518}
519
520fn required_env_bytes(name: &'static str) -> Result<Vec<u8>, RuntimeBootstrapError> {
521    match std::env::var(name) {
522        Ok(value) if !value.is_empty() => Ok(value.into_bytes()),
523        _ => Err(RuntimeBootstrapError::MissingEnvironmentVariable { name }),
524    }
525}