use std::collections::HashMap;
use std::sync::{Arc, Mutex, PoisonError, RwLock};
use aion_core::PackageVersion;
use aion_package::{ContentHash, ManifestDigest, ManifestVersion, Package};
use chrono::{DateTime, Utc};
use super::load::{LoadOutcome, LoadedWorkflow, StagedLoad, load_error, rollback_registered};
use super::version_info::WorkflowVersionInfo;
use crate::{error::EngineError, runtime::RuntimeHandle};
type StartPins = Arc<Mutex<HashMap<(String, ContentHash), usize>>>;
pub struct WorkflowCatalog {
snapshot: RwLock<Arc<CatalogSnapshot>>,
mutations: tokio::sync::Mutex<()>,
pinned_starts: StartPins,
}
#[derive(Clone, Default)]
struct CatalogSnapshot {
by_version: HashMap<(String, ContentHash), CatalogEntry>,
routed: HashMap<String, ContentHash>,
registered_modules: HashMap<String, ContentHash>,
}
#[derive(Clone, Debug)]
struct CatalogEntry {
workflow: LoadedWorkflow,
manifest_version: ManifestVersion,
manifest_digest: ManifestDigest,
loaded_at: DateTime<Utc>,
}
pub struct PinnedWorkflow {
workflow: LoadedWorkflow,
_pin: StartPin,
}
impl PinnedWorkflow {
#[must_use]
pub fn workflow(&self) -> &LoadedWorkflow {
&self.workflow
}
}
struct StartPin {
pins: StartPins,
key: (String, ContentHash),
}
impl Drop for StartPin {
fn drop(&mut self) {
let mut pins = self.pins.lock().unwrap_or_else(PoisonError::into_inner);
if let Some(count) = pins.get_mut(&self.key) {
*count = count.saturating_sub(1);
if *count == 0 {
pins.remove(&self.key);
}
}
}
}
#[derive(Debug)]
pub(crate) struct RemovedVersion {
workflow_type: String,
version: ContentHash,
entry: CatalogEntry,
modules: Vec<(String, ContentHash)>,
}
impl RemovedVersion {
pub(crate) fn module_names(&self) -> impl Iterator<Item = &str> {
self.modules.iter().map(|(name, _)| name.as_str())
}
}
impl Default for WorkflowCatalog {
fn default() -> Self {
Self::new()
}
}
impl WorkflowCatalog {
#[must_use]
pub fn new() -> Self {
Self {
snapshot: RwLock::new(Arc::new(CatalogSnapshot::default())),
mutations: tokio::sync::Mutex::new(()),
pinned_starts: Arc::new(Mutex::new(HashMap::new())),
}
}
fn current(&self) -> Result<Arc<CatalogSnapshot>, EngineError> {
let guard = self
.snapshot
.read()
.map_err(|_| EngineError::CatalogPoisoned)?;
Ok(Arc::clone(&guard))
}
fn install(&self, snapshot: CatalogSnapshot) -> Result<(), EngineError> {
*self
.snapshot
.write()
.map_err(|_| EngineError::CatalogPoisoned)? = Arc::new(snapshot);
Ok(())
}
pub fn routed(&self, workflow_type: &str) -> Result<Option<LoadedWorkflow>, EngineError> {
let snapshot = self.current()?;
Ok(snapshot
.routed_entry(workflow_type)
.map(|entry| entry.workflow.clone()))
}
pub fn routed_version(
&self,
workflow_type: &str,
) -> Result<Option<PackageVersion>, EngineError> {
Ok(self
.routed(workflow_type)?
.map(|workflow| super::package_version_of(workflow.version())))
}
pub fn get(
&self,
workflow_type: &str,
version: &ContentHash,
) -> Result<Option<LoadedWorkflow>, EngineError> {
let snapshot = self.current()?;
Ok(snapshot
.by_version
.get(&(workflow_type.to_owned(), version.clone()))
.map(|entry| entry.workflow.clone()))
}
pub fn workflows(&self) -> Result<Vec<LoadedWorkflow>, EngineError> {
let snapshot = self.current()?;
Ok(snapshot
.by_version
.values()
.map(|entry| entry.workflow.clone())
.collect())
}
pub fn versions(&self) -> Result<Vec<WorkflowVersionInfo>, EngineError> {
let snapshot = self.current()?;
let mut versions: Vec<WorkflowVersionInfo> = snapshot
.by_version
.values()
.map(|entry| WorkflowVersionInfo {
workflow_type: entry.workflow.workflow_type().to_owned(),
content_hash: entry.workflow.version().clone(),
deployed_entry_module: entry.workflow.deployed_entry_module().to_owned(),
entry_function: entry.workflow.entry_function().to_owned(),
manifest_version: entry.manifest_version.clone(),
loaded_at: entry.loaded_at,
route_active: snapshot.routed.get(entry.workflow.workflow_type())
== Some(entry.workflow.version()),
})
.collect();
versions.sort_by(|left, right| {
left.workflow_type
.cmp(&right.workflow_type)
.then(left.loaded_at.cmp(&right.loaded_at))
.then_with(|| {
left.content_hash
.to_string()
.cmp(&right.content_hash.to_string())
})
});
Ok(versions)
}
pub(crate) fn resolve_routed(
&self,
workflow_type: &str,
) -> Result<Option<PinnedWorkflow>, EngineError> {
let snapshot = self.current()?;
let Some(entry) = snapshot.routed_entry(workflow_type) else {
return Ok(None);
};
self.pin_validated(entry.workflow.clone())
}
pub(crate) fn resolve_exact(
&self,
workflow_type: &str,
version: &ContentHash,
) -> Result<Option<PinnedWorkflow>, EngineError> {
let snapshot = self.current()?;
let Some(entry) = snapshot
.by_version
.get(&(workflow_type.to_owned(), version.clone()))
else {
return Ok(None);
};
self.pin_validated(entry.workflow.clone())
}
fn pin_validated(
&self,
workflow: LoadedWorkflow,
) -> Result<Option<PinnedWorkflow>, EngineError> {
let pinned = self.pin(workflow)?;
let key = (
pinned.workflow.workflow_type().to_owned(),
pinned.workflow.version().clone(),
);
if self.current()?.by_version.contains_key(&key) {
Ok(Some(pinned))
} else {
drop(pinned);
Ok(None)
}
}
fn pin(&self, workflow: LoadedWorkflow) -> Result<PinnedWorkflow, EngineError> {
let key = (
workflow.workflow_type().to_owned(),
workflow.version().clone(),
);
{
let mut pins = self
.pinned_starts
.lock()
.map_err(|_| EngineError::CatalogPoisoned)?;
*pins.entry(key.clone()).or_insert(0) += 1;
}
Ok(PinnedWorkflow {
workflow,
_pin: StartPin {
pins: Arc::clone(&self.pinned_starts),
key,
},
})
}
pub(crate) fn has_pinned_starts(
&self,
workflow_type: &str,
version: &ContentHash,
) -> Result<bool, EngineError> {
let pins = self
.pinned_starts
.lock()
.map_err(|_| EngineError::CatalogPoisoned)?;
Ok(pins
.get(&(workflow_type.to_owned(), version.clone()))
.is_some_and(|count| *count > 0))
}
pub async fn load_package(
&self,
runtime: &RuntimeHandle,
package: &Package,
) -> Result<LoadOutcome, EngineError> {
let hash = package.content_hash();
let nif_modules = runtime.registered_nif_modules();
let originals: Vec<&str> = package
.beams()
.iter()
.map(aion_package::BeamModule::name)
.filter(|name| !nif_modules.contains(&(*name).to_owned()))
.collect();
let deployed: Vec<String> = originals
.iter()
.map(|name| aion_package::deployed_name(name, hash))
.collect();
let deployed_refs: Vec<&str> = deployed.iter().map(String::as_str).collect();
let rename_map = runtime.package_rename_map(&originals, &deployed_refs);
let nif_set: std::collections::HashSet<&str> =
nif_modules.iter().map(String::as_str).collect();
let is_nif = |name: &str| {
let original = name.split('$').next().unwrap_or(name);
nif_set.contains(original)
};
self.load_package_with(
package,
|name, bytes| {
if is_nif(name) {
return Ok(());
}
runtime.register_module_with_renames(name, bytes, &rename_map)
},
|name| {
if is_nif(name) {
return Ok(());
}
runtime.unregister_module(name)
},
|entry_module, entry_function| {
if runtime.module_exports_function(entry_module, entry_function) {
Ok(())
} else {
Err(load_error(format!(
"deployed entry module `{entry_module}` does not export entry function `{entry_function}`"
)))
}
},
)
.await
}
pub(crate) async fn load_package_with<F, R, V>(
&self,
package: &Package,
mut register: F,
mut rollback: R,
verify_entry: V,
) -> Result<LoadOutcome, EngineError>
where
F: FnMut(&str, &[u8]) -> Result<(), EngineError>,
R: FnMut(&str) -> Result<(), EngineError>,
V: FnOnce(&str, &str) -> Result<(), EngineError>,
{
let _mutation = self.mutations.lock().await;
let staged = StagedLoad::new(package)?;
let snapshot = self.current()?;
for module in &staged.modules {
if let Some(existing) = snapshot.registered_modules.get(&module.deployed_name) {
if existing != &staged.version {
return Err(load_error(format!(
"deployed module `{}` is already registered for content hash `{existing}`, not `{}`",
module.deployed_name, staged.version
)));
}
}
}
let key = (staged.workflow_type.clone(), staged.version.clone());
if let Some(existing) = snapshot.by_version.get(&key) {
if existing.manifest_digest != staged.manifest_digest {
return Err(EngineError::ManifestMismatch {
workflow_type: staged.workflow_type.clone(),
version: staged.version.clone(),
resident_digest: existing.manifest_digest.to_string(),
incoming_digest: staged.manifest_digest.to_string(),
});
}
let record = existing.workflow.clone();
let route_changed = snapshot.routed.get(&staged.workflow_type) != Some(&staged.version);
if route_changed {
let mut next = (*snapshot).clone();
next.routed
.insert(staged.workflow_type.clone(), staged.version.clone());
self.install(next)?;
}
return Ok(LoadOutcome {
record,
freshly_loaded: false,
route_changed,
});
}
let mut registered_now = Vec::new();
for module in &staged.modules {
if snapshot
.registered_modules
.contains_key(&module.deployed_name)
{
continue;
}
if let Err(error) = register(&module.deployed_name, module.bytes) {
let rollback_errors = rollback_registered(&mut rollback, ®istered_now);
return Err(load_error(format!(
"runtime rejected deployed module `{}` after {} staged registrations: {error}{}",
module.deployed_name,
registered_now.len(),
rollback_errors
)));
}
registered_now.push(module.deployed_name.clone());
}
if let Err(error) = verify_entry(&staged.deployed_entry_module, &staged.entry_function) {
let rollback_errors = rollback_registered(&mut rollback, ®istered_now);
return Err(load_error(format!(
"entry verification failed for `{}`: {error}{}",
staged.deployed_entry_module, rollback_errors
)));
}
let record = staged.record();
let mut next = (*snapshot).clone();
for module in &staged.modules {
next.registered_modules
.entry(module.deployed_name.clone())
.or_insert_with(|| staged.version.clone());
}
next.by_version.insert(
key,
CatalogEntry {
workflow: record.clone(),
manifest_version: staged.manifest_version.clone(),
manifest_digest: staged.manifest_digest.clone(),
loaded_at: Utc::now(),
},
);
let route_changed = snapshot.routed.get(&staged.workflow_type) != Some(&staged.version);
next.routed
.insert(staged.workflow_type.clone(), staged.version.clone());
self.install(next)?;
Ok(LoadOutcome {
record,
freshly_loaded: true,
route_changed,
})
}
pub(crate) async fn route_version(
&self,
workflow_type: &str,
version: &ContentHash,
) -> Result<(), EngineError> {
let _mutation = self.mutations.lock().await;
let snapshot = self.current()?;
let key = (workflow_type.to_owned(), version.clone());
if !snapshot.by_version.contains_key(&key) {
return Err(EngineError::UnknownVersion {
workflow_type: workflow_type.to_owned(),
version: version.clone(),
loaded: snapshot.loaded_versions_of(workflow_type),
});
}
if snapshot.routed.get(workflow_type) == Some(version) {
return Ok(());
}
let mut next = (*snapshot).clone();
next.routed
.insert(workflow_type.to_owned(), version.clone());
self.install(next)
}
pub(crate) async fn begin_mutation(&self) -> tokio::sync::MutexGuard<'_, ()> {
self.mutations.lock().await
}
pub(crate) fn swap_out_version(
&self,
workflow_type: &str,
version: &ContentHash,
) -> Result<RemovedVersion, EngineError> {
let snapshot = self.current()?;
let key = (workflow_type.to_owned(), version.clone());
let Some(entry) = snapshot.by_version.get(&key) else {
return Err(EngineError::UnknownVersion {
workflow_type: workflow_type.to_owned(),
version: version.clone(),
loaded: snapshot.loaded_versions_of(workflow_type),
});
};
if snapshot.routed.get(workflow_type) == Some(version) {
return Err(EngineError::RouteActive {
workflow_type: workflow_type.to_owned(),
version: version.clone(),
});
}
let mut next = (*snapshot).clone();
next.by_version.remove(&key);
let modules: Vec<(String, ContentHash)> = next
.registered_modules
.iter()
.filter(|(_, hash)| *hash == version)
.map(|(name, hash)| (name.clone(), hash.clone()))
.collect();
for (name, _) in &modules {
next.registered_modules.remove(name);
}
self.install(next)?;
Ok(RemovedVersion {
workflow_type: workflow_type.to_owned(),
version: version.clone(),
entry: entry.clone(),
modules,
})
}
pub(crate) fn restore_version(&self, removed: RemovedVersion) -> Result<(), EngineError> {
let snapshot = self.current()?;
let mut next = (*snapshot).clone();
next.by_version.insert(
(removed.workflow_type.clone(), removed.version.clone()),
removed.entry,
);
for (name, hash) in removed.modules {
next.registered_modules.insert(name, hash);
}
self.install(next)
}
}
#[cfg(test)]
#[path = "catalog_test_support.rs"]
mod test_support;
impl CatalogSnapshot {
fn routed_entry(&self, workflow_type: &str) -> Option<&CatalogEntry> {
let version = self.routed.get(workflow_type)?;
self.by_version
.get(&(workflow_type.to_owned(), version.clone()))
}
fn loaded_versions_of(&self, workflow_type: &str) -> String {
let mut versions: Vec<String> = self
.by_version
.keys()
.filter(|(loaded_type, _)| loaded_type == workflow_type)
.map(|(_, version)| version.to_string())
.collect();
versions.sort();
if versions.is_empty() {
"none".to_owned()
} else {
versions.join(", ")
}
}
}
#[cfg(test)]
#[path = "catalog_tests.rs"]
mod catalog_tests;