mod extraction;
mod loading;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::{interval, Interval};
use tracing::{debug, error, info, warn};
use crate::registry::error::RegistryError;
use crate::registry::loader::package_loader::PackageLoader;
use crate::registry::loader::task_registrar::TaskRegistrar;
use crate::registry::traits::WorkflowRegistry;
use crate::registry::types::{WorkflowMetadata, WorkflowPackageId};
use crate::task::TaskNamespace;
#[derive(Debug, Clone)]
pub struct ReconcilerConfig {
pub reconcile_interval: Duration,
pub enable_startup_reconciliation: bool,
pub package_operation_timeout: Duration,
pub continue_on_package_error: bool,
pub default_tenant_id: String,
}
impl Default for ReconcilerConfig {
fn default() -> Self {
Self {
reconcile_interval: Duration::from_secs(30),
enable_startup_reconciliation: true,
package_operation_timeout: Duration::from_secs(30),
continue_on_package_error: true,
default_tenant_id: "public".to_string(),
}
}
}
#[derive(Debug, Clone)]
pub struct ReconcileResult {
pub packages_loaded: Vec<WorkflowPackageId>,
pub packages_unloaded: Vec<WorkflowPackageId>,
pub packages_failed: Vec<(WorkflowPackageId, String)>,
pub total_packages_tracked: usize,
pub reconciliation_duration: Duration,
}
impl ReconcileResult {
pub fn has_changes(&self) -> bool {
!self.packages_loaded.is_empty() || !self.packages_unloaded.is_empty()
}
pub fn has_failures(&self) -> bool {
!self.packages_failed.is_empty()
}
}
#[derive(Debug, Clone)]
pub(super) struct PackageState {
pub(super) metadata: WorkflowMetadata,
pub(super) task_namespaces: Vec<TaskNamespace>,
pub(super) workflow_name: Option<String>,
pub(super) trigger_names: Vec<String>,
}
#[derive(Debug, Clone)]
pub struct ReconcilerStatus {
pub packages_loaded: usize,
pub package_details: Vec<PackageStatusDetail>,
}
#[derive(Debug, Clone)]
pub struct PackageStatusDetail {
pub package_name: String,
pub version: String,
pub task_count: usize,
pub has_workflow: bool,
}
pub struct RegistryReconciler {
pub(super) registry: Arc<dyn WorkflowRegistry>,
pub(super) config: ReconcilerConfig,
pub(super) loaded_packages: Arc<tokio::sync::RwLock<HashMap<WorkflowPackageId, PackageState>>>,
pub(super) package_loader: PackageLoader,
pub(super) task_registrar: TaskRegistrar,
shutdown_rx: watch::Receiver<bool>,
interval: Interval,
}
impl RegistryReconciler {
pub fn new(
registry: Arc<dyn WorkflowRegistry>,
config: ReconcilerConfig,
shutdown_rx: watch::Receiver<bool>,
) -> Result<Self, RegistryError> {
let interval = interval(config.reconcile_interval);
let package_loader = PackageLoader::new().map_err(RegistryError::Loader)?;
let task_registrar = TaskRegistrar::new().map_err(RegistryError::Loader)?;
Ok(Self {
registry,
config,
loaded_packages: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
package_loader,
task_registrar,
shutdown_rx,
interval,
})
}
pub async fn start_reconciliation_loop(mut self) -> Result<(), RegistryError> {
info!(
"Starting Registry Reconciler with interval {:?}",
self.config.reconcile_interval
);
if self.config.enable_startup_reconciliation {
info!("Performing startup reconciliation");
match self.reconcile().await {
Ok(result) => {
info!(
"Startup reconciliation completed: {} loaded, {} unloaded, {} failed",
result.packages_loaded.len(),
result.packages_unloaded.len(),
result.packages_failed.len()
);
}
Err(e) => {
error!("Startup reconciliation failed: {}", e);
if !self.config.continue_on_package_error {
return Err(e);
}
}
}
}
loop {
tokio::select! {
_ = self.interval.tick() => {
debug!("Running periodic reconciliation");
match self.reconcile().await {
Ok(result) => {
if result.has_changes() {
info!(
"Reconciliation completed: {} loaded, {} unloaded",
result.packages_loaded.len(),
result.packages_unloaded.len()
);
} else {
debug!("Reconciliation completed with no changes");
}
if result.has_failures() {
warn!("Reconciliation had {} failures", result.packages_failed.len());
for (package_id, error) in &result.packages_failed {
warn!("Package {} failed: {}", package_id, error);
}
}
}
Err(e) => {
error!("Reconciliation failed: {}", e);
if !self.config.continue_on_package_error {
return Err(e);
}
}
}
}
_ = self.shutdown_rx.changed() => {
if *self.shutdown_rx.borrow() {
info!("Registry Reconciler shutdown requested");
break;
}
}
}
}
info!("Registry Reconciler shutting down");
self.shutdown_cleanup().await?;
Ok(())
}
pub async fn reconcile(&self) -> Result<ReconcileResult, RegistryError> {
let start_time = std::time::Instant::now();
let db_packages = self.registry.list_workflows().await?;
let db_package_ids: HashSet<WorkflowPackageId> = db_packages.iter().map(|p| p.id).collect();
let loaded_packages = self.loaded_packages.read().await;
let loaded_package_ids: HashSet<WorkflowPackageId> =
loaded_packages.keys().cloned().collect();
drop(loaded_packages);
let packages_to_load: Vec<_> = db_package_ids
.difference(&loaded_package_ids)
.cloned()
.collect();
let packages_to_unload: Vec<_> = loaded_package_ids
.difference(&db_package_ids)
.cloned()
.collect();
debug!(
"Reconciliation: {} packages to load, {} to unload",
packages_to_load.len(),
packages_to_unload.len()
);
let mut result = ReconcileResult {
packages_loaded: Vec::new(),
packages_unloaded: Vec::new(),
packages_failed: Vec::new(),
total_packages_tracked: 0,
reconciliation_duration: Duration::ZERO,
};
for package_id in packages_to_unload {
match self.unload_package(package_id).await {
Ok(()) => {
result.packages_unloaded.push(package_id);
info!("Unloaded package: {}", package_id);
}
Err(e) => {
let error_msg = format!("Failed to unload package {}: {}", package_id, e);
error!("{}", error_msg);
result.packages_failed.push((package_id, error_msg));
if !self.config.continue_on_package_error {
return Err(e);
}
}
}
}
for package_id in packages_to_load {
if let Some(package_metadata) = db_packages.iter().find(|p| p.id == package_id) {
match self.load_package(package_metadata.clone()).await {
Ok(()) => {
result.packages_loaded.push(package_id);
info!(
"Loaded package: {} v{}",
package_metadata.package_name, package_metadata.version
);
}
Err(e) => {
let error_msg = format!(
"Failed to load package {} ({}:{}): {}",
package_id, package_metadata.package_name, package_metadata.version, e
);
error!("{}", error_msg);
result.packages_failed.push((package_id, error_msg));
if !self.config.continue_on_package_error {
return Err(e);
}
}
}
} else {
let error_msg = format!("Package {} not found in database during load", package_id);
error!("{}", error_msg);
result.packages_failed.push((package_id, error_msg));
}
}
let loaded_packages = self.loaded_packages.read().await;
result.total_packages_tracked = loaded_packages.len();
drop(loaded_packages);
result.reconciliation_duration = start_time.elapsed();
Ok(result)
}
async fn shutdown_cleanup(&self) -> Result<(), RegistryError> {
info!("Performing Registry Reconciler shutdown cleanup");
let loaded_packages = self.loaded_packages.read().await;
if !loaded_packages.is_empty() {
info!(
"Shutdown with {} packages still loaded",
loaded_packages.len()
);
for (package_id, state) in loaded_packages.iter() {
debug!(
"Loaded package on shutdown: {} - {} v{}",
package_id, state.metadata.package_name, state.metadata.version
);
}
}
Ok(())
}
pub async fn get_status(&self) -> ReconcilerStatus {
let loaded_packages = self.loaded_packages.read().await;
ReconcilerStatus {
packages_loaded: loaded_packages.len(),
package_details: loaded_packages
.values()
.map(|state| PackageStatusDetail {
package_name: state.metadata.package_name.clone(),
version: state.metadata.version.clone(),
task_count: state.task_namespaces.len(),
has_workflow: state.workflow_name.is_some(),
})
.collect(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use uuid::Uuid;
#[test]
fn test_reconciler_config_default() {
let config = ReconcilerConfig::default();
assert_eq!(config.reconcile_interval, Duration::from_secs(30));
assert!(config.enable_startup_reconciliation);
assert_eq!(config.package_operation_timeout, Duration::from_secs(30));
assert!(config.continue_on_package_error);
assert_eq!(config.default_tenant_id, "public");
}
#[test]
fn test_reconcile_result_methods() {
let result = ReconcileResult {
packages_loaded: vec![Uuid::new_v4()],
packages_unloaded: vec![],
packages_failed: vec![],
total_packages_tracked: 1,
reconciliation_duration: Duration::from_millis(100),
};
assert!(result.has_changes());
assert!(!result.has_failures());
let result_no_changes = ReconcileResult {
packages_loaded: vec![],
packages_unloaded: vec![],
packages_failed: vec![(Uuid::new_v4(), "error".to_string())],
total_packages_tracked: 0,
reconciliation_duration: Duration::from_millis(50),
};
assert!(!result_no_changes.has_changes());
assert!(result_no_changes.has_failures());
}
#[test]
fn test_reconciler_status() {
let status = ReconcilerStatus {
packages_loaded: 2,
package_details: vec![
PackageStatusDetail {
package_name: "pkg1".to_string(),
version: "1.0.0".to_string(),
task_count: 3,
has_workflow: true,
},
PackageStatusDetail {
package_name: "pkg2".to_string(),
version: "2.0.0".to_string(),
task_count: 1,
has_workflow: false,
},
],
};
assert_eq!(status.packages_loaded, 2);
assert_eq!(status.package_details.len(), 2);
assert_eq!(status.package_details[0].package_name, "pkg1");
assert!(status.package_details[0].has_workflow);
assert!(!status.package_details[1].has_workflow);
}
#[test]
fn test_reconciler_config_custom_values() {
let config = ReconcilerConfig {
reconcile_interval: Duration::from_secs(60),
enable_startup_reconciliation: false,
package_operation_timeout: Duration::from_secs(120),
continue_on_package_error: false,
default_tenant_id: "tenant-42".to_string(),
};
assert_eq!(config.reconcile_interval, Duration::from_secs(60));
assert!(!config.enable_startup_reconciliation);
assert_eq!(config.package_operation_timeout, Duration::from_secs(120));
assert!(!config.continue_on_package_error);
assert_eq!(config.default_tenant_id, "tenant-42");
}
#[test]
fn test_reconcile_result_no_changes_no_failures() {
let result = ReconcileResult {
packages_loaded: vec![],
packages_unloaded: vec![],
packages_failed: vec![],
total_packages_tracked: 5,
reconciliation_duration: Duration::from_millis(10),
};
assert!(!result.has_changes());
assert!(!result.has_failures());
assert_eq!(result.total_packages_tracked, 5);
}
#[test]
fn test_reconcile_result_unloaded_counts_as_change() {
let result = ReconcileResult {
packages_loaded: vec![],
packages_unloaded: vec![Uuid::new_v4()],
packages_failed: vec![],
total_packages_tracked: 0,
reconciliation_duration: Duration::from_millis(20),
};
assert!(result.has_changes());
assert!(!result.has_failures());
}
#[test]
fn test_reconcile_result_both_loaded_and_unloaded() {
let result = ReconcileResult {
packages_loaded: vec![Uuid::new_v4(), Uuid::new_v4()],
packages_unloaded: vec![Uuid::new_v4()],
packages_failed: vec![(Uuid::new_v4(), "timeout".to_string())],
total_packages_tracked: 3,
reconciliation_duration: Duration::from_secs(2),
};
assert!(result.has_changes());
assert!(result.has_failures());
assert_eq!(result.packages_loaded.len(), 2);
assert_eq!(result.packages_unloaded.len(), 1);
assert_eq!(result.packages_failed.len(), 1);
}
#[test]
fn test_package_status_detail_fields() {
let detail = PackageStatusDetail {
package_name: "my-workflow".to_string(),
version: "2.3.1".to_string(),
task_count: 7,
has_workflow: true,
};
assert_eq!(detail.package_name, "my-workflow");
assert_eq!(detail.version, "2.3.1");
assert_eq!(detail.task_count, 7);
assert!(detail.has_workflow);
}
#[test]
fn test_reconciler_status_empty() {
let status = ReconcilerStatus {
packages_loaded: 0,
package_details: vec![],
};
assert_eq!(status.packages_loaded, 0);
assert!(status.package_details.is_empty());
}
#[test]
fn test_reconciler_config_clone() {
let config = ReconcilerConfig::default();
let cloned = config.clone();
assert_eq!(config.reconcile_interval, cloned.reconcile_interval);
assert_eq!(
config.enable_startup_reconciliation,
cloned.enable_startup_reconciliation
);
assert_eq!(config.default_tenant_id, cloned.default_tenant_id);
}
#[test]
fn test_reconcile_result_clone() {
let id = Uuid::new_v4();
let result = ReconcileResult {
packages_loaded: vec![id],
packages_unloaded: vec![],
packages_failed: vec![],
total_packages_tracked: 1,
reconciliation_duration: Duration::from_millis(50),
};
let cloned = result.clone();
assert_eq!(cloned.packages_loaded, vec![id]);
assert_eq!(cloned.reconciliation_duration, Duration::from_millis(50));
}
}