1use super::*;
2use crate::builder::CustomerHookSet;
3use coil_assets::ActiveAssetManifest;
4use coil_storage::execution::ObjectStoreClientConfig;
5use std::fmt;
6use std::path::PathBuf;
7use std::sync::{Arc, OnceLock};
8use url::Url;
9
10mod execution;
11mod shared_state;
12#[cfg(test)]
13mod testing;
14
15pub(crate) use shared_state::shared_state_root;
16#[cfg(test)]
17pub(crate) use testing::{shared_cache_runtime_for_test, shared_jobs_runtime_for_test};
18
19#[derive(Clone)]
20pub(crate) struct SharedJobsRuntimeHandle {
21 namespace: String,
22 runtime: Arc<OnceLock<Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, String>>>,
23}
24
25impl SharedJobsRuntimeHandle {
26 pub(crate) fn new(namespace: impl Into<String>) -> Self {
27 Self {
28 namespace: namespace.into(),
29 runtime: Arc::new(OnceLock::new()),
30 }
31 }
32
33 pub(crate) fn get_or_init(
34 &self,
35 runtime: &JobsRuntimeServices,
36 ) -> Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, RuntimeJobsError> {
37 let namespace = self.namespace.clone();
38 self.runtime
39 .get_or_init(|| shared_jobs_runtime(runtime, namespace.clone()))
40 .clone()
41 .map_err(|error| {
42 RuntimeJobsError::Jobs(JobsModelError::LiveSharedBackendRequiresExplicitRuntime {
43 backend: runtime.backend,
44 namespace: error,
45 })
46 })
47 }
48}
49
50impl fmt::Debug for SharedJobsRuntimeHandle {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_struct("SharedJobsRuntimeHandle")
53 .field("namespace", &self.namespace)
54 .field("initialized", &self.runtime.get().is_some())
55 .finish()
56 }
57}
58
59#[derive(Debug, Clone)]
60pub struct RuntimePlan {
61 pub config: PlatformConfig,
62 pub auth_package_name: String,
63 pub auth_package: AuthModelPackageSelection,
64 pub approved_outbound_http_endpoints: BTreeMap<String, Url>,
65 pub shared_backend_scope: String,
66 pub shared_state_root: PathBuf,
67 pub cache_topology: CacheTopology,
68 pub cache_planner: CachePlanner,
69 pub i18n: I18nRuntimeServices,
70 pub seo: SeoRuntimeServices,
71 pub browser: BrowserSecurityServices,
72 pub cli: CliRuntimeServices,
73 pub data: DataRuntimeServices,
74 pub jobs: JobsRuntimeServices,
75 pub observability: ObservabilityRuntimeServices,
76 pub http: HttpRuntimePlan,
77 pub handlers: BTreeMap<String, HandlerDefinition>,
78 pub storage_planner: StoragePlanner,
79 pub storefront_catalog: StorefrontCatalog,
80 pub theme_asset_manifest: Option<ActiveAssetManifest>,
81 pub template: TemplateRuntimeServices,
82 pub tls: TlsRuntimeServices,
83 pub wasm: WasmRuntimeServices,
84 pub services: Vec<ServiceDescriptor>,
85 pub modules: Vec<ModuleManifest>,
86 pub install_migrations: MigrationPlan,
87 pub extension_registry: ExtensionRegistry,
88 pub registered_extension_slots: Vec<RegisteredExtensionSlot>,
89 pub installed_extensions: Vec<InstalledExtensionSummary>,
90 pub linked_customer_plugins: Vec<LinkedCustomerPluginSummary>,
91 pub(crate) customer_hooks: CustomerHookSet,
92 pub(crate) shared_jobs_runtime: SharedJobsRuntimeHandle,
93 pub module_jobs: Vec<RegisteredModuleJob>,
94 pub module_event_subscriptions: Vec<RegisteredEventSubscription>,
95 pub module_data_repositories: Vec<RegisteredDataRepository>,
96 pub module_search_contributions: Vec<RegisteredSearchContribution>,
97 pub module_report_definitions: Vec<RegisteredReportDefinition>,
98 pub module_bulk_operations: Vec<RegisteredBulkOperation>,
99 pub registered_runtime_jobs: Vec<RuntimeJobDefinition>,
100 pub registered_runtime_event_subscriptions: Vec<RuntimeEventSubscriptionDefinition>,
101 pub jobs_domain: JobsDomain,
102 pub ops_catalog: OpsCatalog,
103}
104
105#[derive(Debug, Clone)]
106pub(crate) enum MetadataAuditBackendSelection {
107 LocalSqlite {
108 root: std::path::PathBuf,
109 namespace: String,
110 },
111 SharedPostgres {
112 runtime: coil_data::DataRuntime,
113 },
114}
115
116impl RuntimePlan {
117 pub fn auth_package(&self) -> &dyn AuthModelPackage {
118 self.auth_package.package()
119 }
120
121 pub(crate) fn metadata_audit_backend_selection(&self) -> MetadataAuditBackendSelection {
122 match self.config.storage.deployment {
123 coil_config::StorageDeployment::Distributed => {
124 MetadataAuditBackendSelection::SharedPostgres {
125 runtime: self.data.clone(),
126 }
127 }
128 coil_config::StorageDeployment::SingleNode => {
129 MetadataAuditBackendSelection::LocalSqlite {
130 root: std::path::PathBuf::from(&self.config.storage.local_root),
131 namespace: self.shared_backend_namespace(),
132 }
133 }
134 }
135 }
136
137 pub fn approved_outbound_http_endpoints(&self) -> &BTreeMap<String, Url> {
138 &self.approved_outbound_http_endpoints
139 }
140
141 pub fn tenant_id(&self) -> i64 {
142 self.config.auth.tenant_id
143 }
144
145 pub fn jobs_host(
146 &self,
147 scheduler_node_id: impl Into<String>,
148 ) -> Result<JobsHost, RuntimeJobsError> {
149 let scheduler_node_id =
150 validate_runtime_identifier("scheduler_node_id", scheduler_node_id.into())?;
151 let namespace = self.shared_backend_namespace();
152 let shared_runtime = self.shared_jobs_runtime.get_or_init(&self.jobs)?;
153 Ok(JobsHost::new(
154 self.config.app.name.clone(),
155 scheduler_node_id,
156 self.jobs.clone(),
157 self.observability.telemetry.clone(),
158 self.jobs.describe().clone(),
159 self.registered_runtime_jobs.clone(),
160 self.registered_runtime_event_subscriptions.clone(),
161 self.jobs_domain.clone(),
162 shared_runtime,
163 namespace,
164 ))
165 }
166
167 pub fn ops_host(
168 &self,
169 scheduler_node_id: impl Into<String>,
170 ) -> Result<OpsHost, RuntimeOpsError> {
171 Ok(OpsHost::new(
172 OpsPlanner::new(self.jobs.clone(), self.ops_catalog.clone())?,
173 self.jobs_host(scheduler_node_id)?,
174 ))
175 }
176
177 pub fn search_host(
178 &self,
179 scheduler_node_id: impl Into<String>,
180 ) -> Result<SearchHost, RuntimeSearchError> {
181 Ok(SearchHost::new(
182 self.ops_catalog.search.clone(),
183 self.ops_host(scheduler_node_id)?,
184 ))
185 }
186
187 pub fn cache_host(&self) -> Result<CacheHost, RuntimeCacheError> {
188 let namespace = self.cache_namespace()?;
189 let shared_namespace = self.shared_backend_namespace();
190 if self.cache_planner.topology().supports_shared_invalidation() {
191 #[cfg(test)]
192 {
193 let backend = match self
194 .cache_planner
195 .topology()
196 .l2()
197 .expect("shared cache runtime requires distributed l2")
198 {
199 coil_cache::DistributedCacheBackend::Redis => {
200 coil_cache::CacheBackendKind::Redis
201 }
202 coil_cache::DistributedCacheBackend::Valkey => {
203 coil_cache::CacheBackendKind::Valkey
204 }
205 };
206 let runtime = shared_cache_runtime_for_test(backend, shared_namespace.clone());
207 return Ok(CacheHost::new(
208 self.config.app.name.clone(),
209 namespace,
210 self.cache_planner,
211 Some(runtime),
212 shared_namespace,
213 ));
214 }
215
216 #[cfg(not(test))]
217 {
218 let backend = match self
219 .cache_planner
220 .topology()
221 .l2()
222 .expect("shared cache runtime requires distributed l2")
223 {
224 coil_cache::DistributedCacheBackend::Redis => {
225 coil_cache::CacheBackendKind::Redis
226 }
227 coil_cache::DistributedCacheBackend::Valkey => {
228 coil_cache::CacheBackendKind::Valkey
229 }
230 };
231 let runtime = shared_cache_runtime(backend, shared_namespace.clone());
232 return Ok(CacheHost::new(
233 self.config.app.name.clone(),
234 namespace,
235 self.cache_planner,
236 Some(runtime),
237 shared_namespace,
238 ));
239 }
240 }
241
242 Ok(CacheHost::new(
243 self.config.app.name.clone(),
244 namespace,
245 self.cache_planner,
246 None,
247 shared_namespace,
248 ))
249 }
250
251 #[cfg(test)]
252 pub fn browser_host(&self) -> Result<BrowserHost, BrowserHostBuildError> {
253 BrowserHost::new_with_scope(
254 self.config.app.name.clone(),
255 self.browser.clone(),
256 self.shared_backend_scope.clone(),
257 )
258 }
259
260 pub fn tls_host(&self) -> Result<TlsHost, RuntimeTlsError> {
261 self.tls_host_with_secret_resolver(&crate::server::EnvironmentSecretResolver)
262 }
263
264 pub fn tls_host_with_secret_resolver<R: crate::server::SecretResolver>(
265 &self,
266 resolver: &R,
267 ) -> Result<TlsHost, RuntimeTlsError> {
268 let account_secret = self
269 .config
270 .tls
271 .account_secret
272 .as_ref()
273 .map(|secret| resolver.resolve(secret))
274 .transpose()?;
275 TlsHost::new(
276 self.config.app.name.clone(),
277 self.tls.clone(),
278 self.data.clone(),
279 self.shared_backend_scope.clone(),
280 account_secret,
281 )
282 }
283
284 pub fn tls_validation_host_with_secret_resolver<R: crate::server::SecretResolver>(
285 &self,
286 resolver: &R,
287 ) -> Result<TlsHost, RuntimeTlsError> {
288 let account_secret = self
289 .config
290 .tls
291 .account_secret
292 .as_ref()
293 .map(|secret| resolver.resolve(secret))
294 .transpose()?;
295 TlsHost::new_for_validation(
296 self.config.app.name.clone(),
297 self.tls.clone(),
298 self.shared_backend_scope.clone(),
299 account_secret,
300 )
301 }
302
303 pub fn storage_host(&self) -> StorageHost {
304 self.storage_host_with_object_store(None)
305 }
306
307 pub fn storage_host_with_object_store(
308 &self,
309 object_store: Option<ObjectStoreClientConfig>,
310 ) -> StorageHost {
311 StorageHost::new(
312 self.config.app.name.clone(),
313 self.storage_planner.clone(),
314 self.config.assets.cdn_base_url.clone(),
315 object_store,
316 )
317 }
318
319 pub fn wasm_host(&self) -> WasmHost {
320 WasmHost::new(
321 self.clone(),
322 self.config.app.name.clone(),
323 self.wasm.clone(),
324 self.extension_registry.clone(),
325 self.config.i18n.default_locale.clone(),
326 self.registered_runtime_jobs.clone(),
327 )
328 }
329
330 pub fn wasm_host_with_secret_resolver<R: SecretResolver>(
331 &self,
332 resolver: &R,
333 ) -> Result<WasmHost, RuntimeServerError> {
334 let wasm_secrets = self.wasm_secret_values(resolver)?;
335 let storage_host =
336 self.storage_host_with_object_store(self.object_store_client_config(resolver)?);
337 Ok(WasmHost::with_host_services(
338 self.clone(),
339 self.config.app.name.clone(),
340 self.wasm.clone(),
341 self.extension_registry.clone(),
342 self.config.i18n.default_locale.clone(),
343 self.registered_runtime_jobs.clone(),
344 RuntimeWasmHostServices::with_runtime_secrets(self.clone(), storage_host, wasm_secrets),
345 ))
346 }
347
348 pub fn wasm_secret_values<R: SecretResolver>(
349 &self,
350 resolver: &R,
351 ) -> Result<BTreeMap<String, String>, RuntimeServerError> {
352 self.config
353 .wasm
354 .secret_bindings
355 .iter()
356 .map(|(name, secret)| {
357 resolver
358 .resolve(secret)
359 .map(|value| (name.clone(), value))
360 .map_err(|error| RuntimeServerError::Secret(error))
361 })
362 .collect()
363 }
364
365 pub fn shared_backend_clients<R: SecretResolver>(
366 &self,
367 resolver: &R,
368 ) -> Result<SharedBackendClients, RuntimeServerError> {
369 Ok(SharedBackendClients::from_config(&self.config, resolver)?)
370 }
371
372 pub fn object_store_client_config<R: SecretResolver>(
373 &self,
374 resolver: &R,
375 ) -> Result<Option<ObjectStoreClientConfig>, RuntimeServerError> {
376 Ok(SharedBackendClients::object_store_client_config(
377 &self.config,
378 resolver,
379 )?)
380 }
381
382 pub fn server_host<R: SecretResolver>(
383 &self,
384 resolver: &R,
385 cookie_secret: &[u8],
386 csrf_secret: &[u8],
387 ) -> Result<HttpServerHost, RuntimeServerError> {
388 if self.browser.sessions.store == coil_core::SessionStoreTopology::Memory {
389 return Err(BrowserHostBuildError::MemoryStoreRequiresTestOnlyBrowserHost.into());
390 }
391
392 let wasm_secrets = self.wasm_secret_values(resolver)?;
393 let payment_webhook_secret =
394 crate::server::resolve_commerce_payment_webhook_secret(&self.config, resolver)?;
395 HttpServerHost::new(
396 self.clone(),
397 self.shared_backend_clients(resolver)?,
398 wasm_secrets,
399 payment_webhook_secret,
400 cookie_secret.to_vec(),
401 csrf_secret.to_vec(),
402 )
403 }
404
405 #[cfg(test)]
406 pub(crate) fn server_host_with_checkout_client<R: SecretResolver>(
407 &self,
408 resolver: &R,
409 cookie_secret: &[u8],
410 csrf_secret: &[u8],
411 hosted_checkout_client: std::sync::Arc<dyn crate::server::HostedCheckoutClient>,
412 ) -> Result<HttpServerHost, RuntimeServerError> {
413 if self.browser.sessions.store == coil_core::SessionStoreTopology::Memory {
414 return Err(BrowserHostBuildError::MemoryStoreRequiresTestOnlyBrowserHost.into());
415 }
416
417 let wasm_secrets = self.wasm_secret_values(resolver)?;
418 let payment_webhook_secret =
419 crate::server::resolve_commerce_payment_webhook_secret(&self.config, resolver)?;
420 HttpServerHost::new_with_checkout_client(
421 self.clone(),
422 self.shared_backend_clients(resolver)?,
423 wasm_secrets,
424 payment_webhook_secret,
425 cookie_secret.to_vec(),
426 csrf_secret.to_vec(),
427 hosted_checkout_client,
428 )
429 }
430
431 pub fn serve_from_env(
432 self,
433 bind_override: Option<String>,
434 ) -> Result<(), RuntimeBootstrapError> {
435 let cookie_secret = required_env_bytes("COIL_COOKIE_SECRET")?;
436 let csrf_secret = required_env_bytes("COIL_CSRF_SECRET")?;
437 let bind = bind_override.unwrap_or_else(|| self.config.server.bind.clone());
438 let runtime = tokio::runtime::Builder::new_multi_thread()
439 .enable_all()
440 .build()
441 .map_err(|error| RuntimeBootstrapError::Serve {
442 reason: error.to_string(),
443 })?;
444 let server = {
445 let _runtime_guard = runtime.enter();
446 self.server_host(
447 &crate::server::EnvironmentSecretResolver,
448 &cookie_secret,
449 &csrf_secret,
450 )?
451 };
452
453 runtime.block_on(async move {
454 let listener = tokio::net::TcpListener::bind(&bind)
455 .await
456 .map_err(|error| RuntimeBootstrapError::Bind {
457 bind: bind.clone(),
458 reason: error.to_string(),
459 })?;
460 server
461 .serve(listener)
462 .await
463 .map_err(|error| RuntimeBootstrapError::Serve {
464 reason: error.to_string(),
465 })
466 })
467 }
468
469 pub(crate) fn cache_namespace(&self) -> Result<CacheNamespace, CacheModelError> {
470 CacheNamespace::new(format!("customer-app:{}", self.config.app.name))
471 }
472
473 pub(crate) fn shared_backend_namespace(&self) -> String {
474 format!(
475 "customer-app:{}:{}",
476 self.config.app.name, self.shared_backend_scope
477 )
478 }
479
480 pub(crate) fn shared_state_root(&self) -> &PathBuf {
481 &self.shared_state_root
482 }
483}
484
485#[cfg(test)]
486fn shared_jobs_runtime(
487 runtime: &JobsRuntimeServices,
488 namespace: String,
489) -> Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, String> {
490 Ok(crate::plan::shared_jobs_runtime_for_test(
491 runtime, namespace,
492 ))
493}
494
495#[cfg(not(test))]
496fn shared_jobs_runtime(
497 runtime: &JobsRuntimeServices,
498 namespace: String,
499) -> Result<Arc<dyn coil_jobs::JobsCoordinationRuntime>, String> {
500 if use_emulated_shared_backends() {
501 return Ok(emulated_shared_jobs_runtime(runtime, namespace));
502 }
503 coil_jobs::JobsBackendAdapter::live_shared_runtime(runtime, namespace, PathBuf::new())
504 .map_err(|error| error.to_string())
505}
506
507#[cfg(test)]
508fn shared_cache_runtime(
509 backend: coil_cache::CacheBackendKind,
510 namespace: String,
511) -> Arc<dyn coil_cache::DistributedCacheRuntime> {
512 crate::plan::shared_cache_runtime_for_test(backend, namespace)
513}
514
515#[cfg(not(test))]
516fn shared_cache_runtime(
517 backend: coil_cache::CacheBackendKind,
518 namespace: String,
519) -> Arc<dyn coil_cache::DistributedCacheRuntime> {
520 if use_emulated_shared_backends() {
521 return emulated_shared_cache_runtime(backend, namespace);
522 }
523 coil_cache::DistributedCacheClient::live_shared_runtime(backend, namespace, PathBuf::new())
524}
525
526fn use_emulated_shared_backends() -> bool {
527 std::env::var("COIL_EMULATED_SHARED_BACKENDS")
528 .map(|value| {
529 let normalized = value.trim().to_ascii_lowercase();
530 matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
531 })
532 .unwrap_or(false)
533}
534
535#[cfg(not(test))]
536fn emulated_shared_jobs_runtime(
537 runtime: &JobsRuntimeServices,
538 namespace: String,
539) -> Arc<dyn coil_jobs::JobsCoordinationRuntime> {
540 use std::collections::BTreeMap;
541 use std::sync::{Mutex, OnceLock};
542
543 static REGISTRY: OnceLock<
544 Mutex<BTreeMap<String, Arc<dyn coil_jobs::JobsCoordinationRuntime>>>,
545 > = OnceLock::new();
546
547 let key = format!(
548 "{:?}:{}:{}:{}:{}:{}",
549 runtime.backend,
550 runtime.topology.work_queue.as_str(),
551 runtime.topology.scheduled_queue.as_str(),
552 runtime.topology.domain_events_queue.as_str(),
553 runtime.topology.dead_letter_queue.as_str(),
554 namespace
555 );
556 let registry = REGISTRY.get_or_init(|| Mutex::new(BTreeMap::new()));
557 let mut guard = registry.lock().expect("emulated jobs registry mutex poisoned");
558 guard
559 .entry(key)
560 .or_insert_with(|| coil_jobs::JobsBackendAdapter::emulated_shared_runtime(runtime))
561 .clone()
562}
563
564#[cfg(not(test))]
565fn emulated_shared_cache_runtime(
566 backend: coil_cache::CacheBackendKind,
567 namespace: String,
568) -> Arc<dyn coil_cache::DistributedCacheRuntime> {
569 use std::collections::BTreeMap;
570 use std::sync::{Mutex, OnceLock};
571
572 static REGISTRY: OnceLock<
573 Mutex<BTreeMap<String, Arc<dyn coil_cache::DistributedCacheRuntime>>>,
574 > = OnceLock::new();
575
576 let key = format!("{backend:?}:{namespace}");
577 let registry = REGISTRY.get_or_init(|| Mutex::new(BTreeMap::new()));
578 let mut guard = registry.lock().expect("emulated cache registry mutex poisoned");
579 guard
580 .entry(key)
581 .or_insert_with(|| coil_cache::DistributedCacheClient::emulated_shared_runtime(backend))
582 .clone()
583}
584
585fn required_env_bytes(name: &'static str) -> Result<Vec<u8>, RuntimeBootstrapError> {
586 match std::env::var(name) {
587 Ok(value) if !value.is_empty() => Ok(value.into_bytes()),
588 _ => Err(RuntimeBootstrapError::MissingEnvironmentVariable { name }),
589 }
590}