use std::{
collections::HashSet,
fs,
net::SocketAddr,
path::{Path, PathBuf},
time::Duration,
};
use serde::Deserialize;
use crate::error::ServerError;
pub mod env;
pub mod file;
/// Default HTTP listen address: IPv4 loopback (`127.0.0.1`) on port 8080.
const DEFAULT_HTTP_ADDRESS: SocketAddr = SocketAddr::V4(std::net::SocketAddrV4::new(
    std::net::Ipv4Addr::LOCALHOST,
    8080,
));
/// Default gRPC listen address: IPv4 loopback (`127.0.0.1`) on port 50051.
const DEFAULT_GRPC_ADDRESS: SocketAddr = SocketAddr::V4(std::net::SocketAddrV4::new(
    std::net::Ipv4Addr::LOCALHOST,
    50051,
));
/// Values supplied on the command line.
///
/// `ServerConfig::load` applies these after the config file and the `env`
/// overlay, so a `Some` value here replaces whatever was loaded earlier.
#[derive(Debug, Default)]
pub struct CliOverrides {
/// Optional config file path handed to the file loader.
pub config_path: Option<PathBuf>,
/// Replaces `server.listen_address` when set.
pub listen_address: Option<SocketAddr>,
/// Replaces `store.url` when set; also switches a `Memory` backend to `LibSql`.
pub store_url: Option<String>,
/// Replaces `runtime.scheduler_threads` when set.
pub scheduler_threads: Option<usize>,
/// Replaces `drain.timeout_seconds` when set.
pub drain_timeout_seconds: Option<u64>,
/// Extra workflow packages; merged additively (never replacing) after the
/// configured and discovered packages.
pub workflow_packages: Vec<PathBuf>,
}
/// Complete server configuration as deserialized from TOML.
///
/// Every section falls back to its `Default` when absent, and unknown keys
/// are rejected. Call `into_parts` to split a validated value into the store
/// configuration and the runtime configuration.
#[derive(Clone, Debug, Default, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ServerConfig {
    /// HTTP and gRPC listener addresses (`[server]`).
    pub server: ServerSection,
    /// Persistence backend selection (`[store]`).
    pub store: StoreConfig,
    /// Scheduler sizing and query deadline (`[runtime]`).
    pub runtime: RuntimeSection,
    /// Drain timeout (`[drain]`).
    pub drain: DrainConfig,
    /// JWKS-based authentication settings (`[auth]`).
    pub auth: AuthConfig,
    /// Metrics toggle (`[metrics]`).
    pub metrics: MetricsConfig,
    /// Default namespace name (`[namespaces]`); distinct from `namespace`.
    pub namespaces: NamespacesConfig,
    /// Optional TLS material; `None` when the section is absent.
    pub tls: Option<TlsConfig>,
    /// Dashboard asset source (`[dashboard]`).
    pub dashboard: DashboardConfig,
    /// Namespace tenancy mode (`[namespace]`); distinct from `namespaces`.
    pub namespace: NamespaceConfig,
    /// Worker heartbeat settings (`[worker]`).
    pub worker: WorkerConfig,
    /// WebSocket buffering and broadcast capacity (`[websocket]`).
    pub websocket: WebSocketConfig,
    /// Workflow package paths listed at the top level of the config.
    pub workflow_packages: Vec<PathBuf>,
    /// Package deployment settings (`[deploy]`).
    pub deploy: DeployConfig,
}
/// Listener addresses from the `[server]` section.
///
/// Defaults to `127.0.0.1:8080` for HTTP and `127.0.0.1:50051` for gRPC.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ServerSection {
/// HTTP listen address; validation rejects port 0.
pub listen_address: SocketAddr,
/// gRPC listen address; validation rejects port 0.
pub grpc_address: SocketAddr,
}
/// Selectable store backend.
///
/// Deserialized from the lowercase variant name (`memory`, `libsql`).
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum StoreBackend {
/// The default backend; does not require `store.url`.
Memory,
/// Requires a non-empty `store.url` to pass validation.
LibSql,
}
/// Store settings from the `[store]` section.
///
/// Defaults to the `Memory` backend with no URL.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct StoreConfig {
/// Which backend to use.
pub backend: StoreBackend,
/// Store URL; must be non-empty whenever it is set, and is mandatory for
/// the `LibSql` backend.
pub url: Option<String>,
}
/// Runtime settings from the `[runtime]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct RuntimeSection {
/// Scheduler thread count; defaults to 1 and must be greater than zero.
pub scheduler_threads: usize,
/// Query timeout in milliseconds. Has no default: validation fails unless
/// it is set to a positive value.
pub query_timeout_ms: Option<u64>,
}
/// Drain settings from the `[drain]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DrainConfig {
/// Drain timeout in seconds; defaults to 30 and must be greater than zero.
pub timeout_seconds: u64,
}
/// Authentication settings from the `[auth]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
/// Whether authentication is enabled; defaults to `false`.
pub enabled: bool,
/// JWKS URL; must be set and non-empty when `enabled` is `true`.
pub jwks_url: Option<String>,
/// JWKS refresh interval in seconds; defaults to 300. Validation requires a
/// non-zero value even when authentication is disabled.
pub jwks_refresh_seconds: u64,
}
/// Metrics settings from the `[metrics]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
/// Whether metrics are enabled; defaults to `true`.
pub enabled: bool,
}
/// Namespace naming settings from the `[namespaces]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct NamespacesConfig {
/// Name of the default namespace; defaults to `"default"` and must not be
/// empty.
pub default: String,
}
/// Listener address pair carried by `RuntimeConfig`.
///
/// `ServerConfig::into_parts` fills it from the `[server]` section. Its own
/// defaults are the same loopback addresses used by `ServerSection`.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ListenConfig {
/// gRPC listen address.
pub grpc: SocketAddr,
/// HTTP listen address.
pub http: SocketAddr,
}
/// TLS material locations from the optional `[tls]` section.
///
/// There is no `serde(default)` here, so both paths must be present whenever
/// the section is given. `ServerConfig::validate` does not inspect them.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TlsConfig {
/// Filesystem path of the certificate chain.
pub certificate_chain_path: PathBuf,
/// Filesystem path of the private key.
pub private_key_path: PathBuf,
}
/// Dashboard settings from the `[dashboard]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DashboardConfig {
/// Where dashboard assets come from; defaults to `Embedded`.
pub source: DashboardAssetSource,
}
/// Source of dashboard assets.
#[derive(Clone, Debug, Deserialize)]
pub enum DashboardAssetSource {
/// Assets located on the filesystem.
FileSystem {
/// Asset location; validation rejects an empty path.
asset_path: PathBuf,
},
/// The default source (no path required).
Embedded,
}
/// Tenancy settings from the `[namespace]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct NamespaceConfig {
/// Tenancy mode; defaults to `SharedEngine`.
pub mode: NamespaceMode,
}
/// Namespace tenancy mode.
#[derive(Clone, Debug, Deserialize)]
pub enum NamespaceMode {
/// The default mode.
SharedEngine,
/// Mode bound to one named namespace.
SingleTenant {
/// Namespace name; validation rejects an empty string.
namespace: String,
},
}
/// Worker settings from the `[worker]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct WorkerConfig {
/// Heartbeat window, deserialized from an integer number of milliseconds.
/// Defaults to 30 seconds and must be greater than zero.
#[serde(with = "duration_millis")]
pub heartbeat_window: Duration,
}
/// WebSocket settings from the `[websocket]` section.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct WebSocketConfig {
/// Outbound buffer bound; defaults to 32 and must be greater than zero.
pub outbound_buffer_bound: usize,
/// Event broadcast capacity. Has no default: validation fails unless it is
/// set to a positive value.
pub event_broadcast_capacity: Option<usize>,
}
/// Validation error message used when `websocket.event_broadcast_capacity`
/// is missing or zero.
pub(crate) const EVENT_BROADCAST_CAPACITY_REQUIRED: &str = "websocket.event_broadcast_capacity is required and has no default: the server always mounts /events/stream, so live event streaming capacity must be configured explicitly; set websocket.event_broadcast_capacity (or AION_WEBSOCKET_EVENT_BROADCAST_CAPACITY) to a positive integer sized for global event volume across all namespaces";
/// Package deployment settings from the `[deploy]` section.
///
/// The derived default is disabled with no ceilings. Both ceilings are only
/// validated when `enabled` is `true`.
#[derive(Clone, Debug, Default, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DeployConfig {
/// Whether deployment is enabled.
pub enabled: bool,
/// Upload ceiling in bytes; required and positive when enabled.
pub max_archive_bytes: Option<u64>,
/// Decompressed-contents ceiling in bytes; required, positive, and at least
/// `max_archive_bytes` when enabled.
pub max_inflated_bytes: Option<u64>,
}
/// Validation error message used when deployment is enabled and
/// `deploy.max_archive_bytes` is missing or zero.
pub(crate) const DEPLOY_MAX_ARCHIVE_BYTES_REQUIRED: &str = "deploy.max_archive_bytes is required and has no default when deploy.enabled is true: the archive upload ceiling must be an explicit operator decision sized for the deployment's packages; set deploy.max_archive_bytes (or AION_DEPLOY_MAX_ARCHIVE_BYTES) to a positive number of bytes";
/// Validation error message used when deployment is enabled and
/// `deploy.max_inflated_bytes` is missing or zero.
pub(crate) const DEPLOY_MAX_INFLATED_BYTES_REQUIRED: &str = "deploy.max_inflated_bytes is required and has no default when deploy.enabled is true: the decompressed-contents ceiling for uploaded archives must be an explicit operator decision (a compressed upload under deploy.max_archive_bytes can inflate ~1000:1); set deploy.max_inflated_bytes (or AION_DEPLOY_MAX_INFLATED_BYTES) to a positive number of bytes no smaller than deploy.max_archive_bytes";
/// Validation error message used when `runtime.query_timeout_ms` is missing
/// or zero.
pub(crate) const QUERY_TIMEOUT_REQUIRED: &str = "runtime.query_timeout_ms is required and has no default: the server always mounts /workflows/query, so the workflow query reply deadline must be configured explicitly; set runtime.query_timeout_ms (or AION_RUNTIME_QUERY_TIMEOUT_MS) to a positive number of milliseconds";
/// Everything from `ServerConfig` except the store settings, with raw
/// integer durations converted to `Duration`.
///
/// Produced by `ServerConfig::into_parts`.
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
/// gRPC and HTTP listener addresses taken from the `[server]` section.
pub listen: ListenConfig,
/// Optional TLS material locations.
pub tls: Option<TlsConfig>,
/// Authentication settings.
pub auth: AuthConfig,
/// Dashboard asset source.
pub dashboard: DashboardConfig,
/// Namespace tenancy mode.
pub namespace: NamespaceConfig,
/// Worker heartbeat settings.
pub worker: WorkerConfig,
/// WebSocket settings.
pub websocket: WebSocketConfig,
/// Workflow package paths.
pub workflow_packages: Vec<PathBuf>,
/// Deployment settings.
pub deploy: DeployConfig,
/// Copied from `runtime.scheduler_threads`.
pub scheduler_threads: usize,
/// `runtime.query_timeout_ms` converted from milliseconds; `None` when unset.
pub query_timeout: Option<Duration>,
/// Copied from `namespaces.default`.
pub default_namespace: String,
/// `drain.timeout_seconds` converted from seconds.
pub drain_timeout: Duration,
/// Metrics settings.
pub metrics: MetricsConfig,
}
impl ServerConfig {
    /// Builds the effective configuration for a server start.
    ///
    /// Layers are applied in increasing precedence: the config file (or the
    /// built-in defaults when the loader yields nothing), the `env` overlay,
    /// then the command-line overrides. Workflow packages found in the
    /// current directory and those named on the command line are merged in
    /// before the result is validated.
    ///
    /// # Errors
    ///
    /// Propagates any error from the file loader, the `env` overlay, package
    /// discovery, or validation.
    pub fn load(cli: &CliOverrides) -> Result<Self, ServerError> {
        let from_file = file::load(cli.config_path.as_deref())?;
        let mut config = from_file.unwrap_or_default();
        env::overlay(&mut config)?;
        config.apply_cli_overrides(cli);
        config.load_discovered_workflow_packages(cli, Path::new("."))?;
        config.validate()?;
        Ok(config)
    }

    /// Scans `directory` for workflow packages and appends them, followed by
    /// the CLI-supplied packages, to the configured list (skipping duplicates).
    fn load_discovered_workflow_packages(
        &mut self,
        cli: &CliOverrides,
        directory: &Path,
    ) -> Result<(), ServerError> {
        let discovered = discover_workflow_packages(directory)?;
        merge_workflow_packages(&mut self.workflow_packages, discovered, &cli.workflow_packages);
        Ok(())
    }

    /// Parses TOML bytes into a configuration and validates it.
    ///
    /// # Errors
    ///
    /// Returns a config error when the bytes are not valid TOML for this
    /// schema, or when validation rejects the parsed values.
    pub fn from_slice(bytes: &[u8]) -> Result<Self, ServerError> {
        let config: Self = match toml::from_slice(bytes) {
            Ok(parsed) => parsed,
            Err(source) => return config_error(format!("invalid server config: {source}")),
        };
        config.validate()?;
        Ok(config)
    }

    /// Loads the configuration from `path` via the file loader's required
    /// variant.
    pub fn load_from_path(path: impl Into<PathBuf>) -> Result<Self, ServerError> {
        let path: PathBuf = path.into();
        file::load_required(&path)
    }

    /// Splits the configuration into the store settings and everything the
    /// runtime needs, converting raw integer durations to `Duration`.
    #[must_use]
    pub fn into_parts(self) -> (StoreConfig, RuntimeConfig) {
        // Exhaustive destructuring: adding a field to `ServerConfig` forces a
        // decision here about where it belongs.
        let Self {
            server,
            store,
            runtime,
            drain,
            auth,
            metrics,
            namespaces,
            tls,
            dashboard,
            namespace,
            worker,
            websocket,
            workflow_packages,
            deploy,
        } = self;
        let listen = ListenConfig {
            grpc: server.grpc_address,
            http: server.listen_address,
        };
        let runtime_config = RuntimeConfig {
            listen,
            tls,
            auth,
            dashboard,
            namespace,
            worker,
            websocket,
            workflow_packages,
            deploy,
            scheduler_threads: runtime.scheduler_threads,
            query_timeout: runtime.query_timeout_ms.map(Duration::from_millis),
            default_namespace: namespaces.default,
            drain_timeout: Duration::from_secs(drain.timeout_seconds),
            metrics,
        };
        (store, runtime_config)
    }

    /// Applies every `Some` command-line value over the loaded configuration.
    fn apply_cli_overrides(&mut self, cli: &CliOverrides) {
        if let Some(address) = cli.listen_address {
            self.server.listen_address = address;
        }
        if let Some(url) = cli.store_url.as_ref() {
            self.store.url = Some(url.clone());
            // A URL is meaningless for the in-memory backend, so supplying
            // one on the command line selects libsql.
            if matches!(self.store.backend, StoreBackend::Memory) {
                self.store.backend = StoreBackend::LibSql;
            }
        }
        if let Some(threads) = cli.scheduler_threads {
            self.runtime.scheduler_threads = threads;
        }
        if let Some(timeout) = cli.drain_timeout_seconds {
            self.drain.timeout_seconds = timeout;
        }
    }

    /// Checks cross-field and range constraints, returning the first
    /// violation found.
    ///
    /// The order of checks is significant: callers and tests rely on which
    /// message is reported when several constraints are violated at once.
    fn validate(&self) -> Result<(), ServerError> {
        let Self {
            server,
            store,
            runtime,
            drain,
            auth,
            namespaces,
            dashboard,
            namespace,
            worker,
            websocket,
            deploy,
            ..
        } = self;

        if server.listen_address.port() == 0 {
            return config_error("server.listen_address must use an explicit non-zero port");
        }
        if server.grpc_address.port() == 0 {
            return config_error("server.grpc_address must use an explicit non-zero port");
        }
        if runtime.scheduler_threads == 0 {
            return config_error("runtime.scheduler_threads must be greater than zero");
        }
        if drain.timeout_seconds == 0 {
            return config_error("drain.timeout_seconds must be greater than zero");
        }

        let jwks_url_missing = auth.jwks_url.as_deref().is_none_or(str::is_empty);
        if auth.enabled && jwks_url_missing {
            return config_error("auth.jwks_url must not be empty when auth.enabled is true");
        }
        if auth.jwks_refresh_seconds == 0 {
            return config_error("auth.jwks_refresh_seconds must be greater than zero");
        }
        if namespaces.default.is_empty() {
            return config_error("namespaces.default must not be empty");
        }

        let store_url = store.url.as_deref();
        if store.backend == StoreBackend::LibSql && store_url.is_none_or(str::is_empty) {
            return config_error("store.url must not be empty when store.backend is libsql");
        }
        // Still reachable for the memory backend with an explicitly empty URL.
        if store_url.is_some_and(str::is_empty) {
            return config_error("store.url must not be empty");
        }

        match &dashboard.source {
            DashboardAssetSource::FileSystem { asset_path }
                if asset_path.as_os_str().is_empty() =>
            {
                return config_error("dashboard.source.FileSystem.asset_path must not be empty");
            }
            DashboardAssetSource::FileSystem { .. } | DashboardAssetSource::Embedded => {}
        }
        match &namespace.mode {
            NamespaceMode::SingleTenant { namespace } if namespace.is_empty() => {
                return config_error("namespace.mode.SingleTenant.namespace must not be empty");
            }
            NamespaceMode::SingleTenant { .. } | NamespaceMode::SharedEngine => {}
        }

        if worker.heartbeat_window.is_zero() {
            return config_error("worker.heartbeat_window must be greater than zero");
        }
        if websocket.outbound_buffer_bound == 0 {
            return config_error("websocket.outbound_buffer_bound must be greater than zero");
        }
        // Missing and zero are treated alike for the keys that have no default.
        if websocket.event_broadcast_capacity.unwrap_or(0) == 0 {
            return config_error(EVENT_BROADCAST_CAPACITY_REQUIRED);
        }
        if runtime.query_timeout_ms.unwrap_or(0) == 0 {
            return config_error(QUERY_TIMEOUT_REQUIRED);
        }

        if deploy.enabled {
            let Some(max_archive_bytes) = deploy.max_archive_bytes.filter(|&bytes| bytes != 0)
            else {
                return config_error(DEPLOY_MAX_ARCHIVE_BYTES_REQUIRED);
            };
            let Some(max_inflated_bytes) = deploy.max_inflated_bytes.filter(|&bytes| bytes != 0)
            else {
                return config_error(DEPLOY_MAX_INFLATED_BYTES_REQUIRED);
            };
            ensure_fits_usize("deploy.max_archive_bytes", max_archive_bytes)?;
            ensure_fits_usize("deploy.max_inflated_bytes", max_inflated_bytes)?;
            if max_inflated_bytes < max_archive_bytes {
                return config_error(format!(
                    "deploy.max_inflated_bytes ({max_inflated_bytes}) must be at least deploy.max_archive_bytes ({max_archive_bytes}): an inflate ceiling below the upload ceiling would refuse archives the upload ceiling admits, even stored uncompressed"
                ));
            }
        }
        Ok(())
    }
}
/// Rejects a configured `u64` that cannot be represented as `usize` on this
/// platform, naming the offending `key` in the error message.
fn ensure_fits_usize(key: &str, value: u64) -> Result<(), ServerError> {
    match usize::try_from(value) {
        Ok(_) => Ok(()),
        Err(_) => config_error(format!(
            "{key} ({value}) exceeds this platform's addressable memory; set it to at most {}",
            usize::MAX
        )),
    }
}
impl Default for ServerSection {
    /// Uses the loopback HTTP and gRPC default addresses.
    fn default() -> Self {
        let listen_address = DEFAULT_HTTP_ADDRESS;
        let grpc_address = DEFAULT_GRPC_ADDRESS;
        Self {
            listen_address,
            grpc_address,
        }
    }
}
impl Default for StoreConfig {
    /// Selects the in-memory backend with no store URL.
    fn default() -> Self {
        let backend = StoreBackend::Memory;
        let url = None;
        Self { backend, url }
    }
}
impl Default for RuntimeSection {
    /// One scheduler thread; the query timeout is deliberately left unset so
    /// that validation demands an explicit value.
    fn default() -> Self {
        let scheduler_threads = 1;
        let query_timeout_ms = None;
        Self {
            scheduler_threads,
            query_timeout_ms,
        }
    }
}
impl Default for DrainConfig {
    /// Thirty-second drain timeout.
    fn default() -> Self {
        let timeout_seconds = 30;
        Self { timeout_seconds }
    }
}
impl Default for AuthConfig {
    /// Authentication disabled, no JWKS URL, 300-second refresh interval.
    fn default() -> Self {
        let enabled = false;
        let jwks_url = None;
        let jwks_refresh_seconds = 300;
        Self {
            enabled,
            jwks_url,
            jwks_refresh_seconds,
        }
    }
}
impl Default for MetricsConfig {
    /// Metrics are enabled unless configured otherwise.
    fn default() -> Self {
        let enabled = true;
        Self { enabled }
    }
}
impl Default for NamespacesConfig {
    /// The default namespace is literally named `default`.
    fn default() -> Self {
        let default = String::from("default");
        Self { default }
    }
}
impl Default for ListenConfig {
    /// Uses the loopback gRPC and HTTP default addresses.
    fn default() -> Self {
        let grpc = DEFAULT_GRPC_ADDRESS;
        let http = DEFAULT_HTTP_ADDRESS;
        Self { grpc, http }
    }
}
impl Default for DashboardConfig {
    /// Uses the embedded asset source.
    fn default() -> Self {
        let source = DashboardAssetSource::Embedded;
        Self { source }
    }
}
impl Default for NamespaceConfig {
    /// Uses the shared-engine mode.
    fn default() -> Self {
        let mode = NamespaceMode::SharedEngine;
        Self { mode }
    }
}
impl Default for WorkerConfig {
    /// Thirty-second heartbeat window.
    fn default() -> Self {
        let heartbeat_window = Duration::from_secs(30);
        Self { heartbeat_window }
    }
}
impl Default for WebSocketConfig {
    /// Outbound buffer bound of 32; the broadcast capacity is deliberately
    /// left unset so that validation demands an explicit value.
    fn default() -> Self {
        let outbound_buffer_bound = 32;
        let event_broadcast_capacity = None;
        Self {
            outbound_buffer_bound,
            event_broadcast_capacity,
        }
    }
}
/// Wraps `message` in `ServerError::Config` and returns it as the `Err` arm,
/// so call sites can write `return config_error("...")` for any `Ok` type.
pub(crate) fn config_error<T>(message: impl Into<String>) -> Result<T, ServerError> {
    let message = message.into();
    Err(ServerError::Config { message })
}
/// Lists the regular files directly inside `directory` whose extension is
/// `aion`, sorted by their raw OS-string path.
///
/// Subdirectories are not descended into.
///
/// # Errors
///
/// Returns a config error when the directory cannot be opened or when reading
/// any entry fails; the first failure aborts the scan.
fn discover_workflow_packages(directory: &Path) -> Result<Vec<PathBuf>, ServerError> {
    let entries = fs::read_dir(directory).map_err(|source| ServerError::Config {
        message: format!(
            "failed to scan workflow packages in `{}`: {source}",
            directory.display()
        ),
    })?;
    let mut found = entries
        .filter_map(|entry| match entry {
            Ok(entry) => {
                let candidate = entry.path();
                let is_package = candidate.is_file()
                    && candidate.extension().is_some_and(|ext| ext == "aion");
                is_package.then_some(Ok(candidate))
            }
            Err(source) => Some(Err(ServerError::Config {
                message: format!(
                    "failed to read workflow package entry in `{}`: {source}",
                    directory.display()
                ),
            })),
        })
        .collect::<Result<Vec<PathBuf>, ServerError>>()?;
    // Directory iteration order is platform-dependent; sort for determinism.
    found.sort_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
    Ok(found)
}
/// Appends discovered packages, then CLI packages, to `workflow_packages`,
/// skipping any whose deduplication key has already been seen.
///
/// Existing entries keep their position and are never removed, so the merge
/// is purely additive.
fn merge_workflow_packages(
    workflow_packages: &mut Vec<PathBuf>,
    discovered_packages: Vec<PathBuf>,
    cli_packages: &[PathBuf],
) {
    let mut known_keys: HashSet<PathBuf> = HashSet::with_capacity(workflow_packages.len());
    for existing in workflow_packages.iter() {
        known_keys.insert(deduplicated_package_key(existing));
    }
    let candidates = discovered_packages
        .into_iter()
        .chain(cli_packages.iter().cloned());
    // `insert` returns false for a key that is already present, which filters
    // the duplicate out while recording first-time keys in the same step.
    workflow_packages.extend(
        candidates.filter(|candidate| known_keys.insert(deduplicated_package_key(candidate))),
    );
}
/// Returns the key used to detect duplicate packages: the canonical path when
/// it can be resolved, otherwise the path exactly as given.
fn deduplicated_package_key(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(canonical) => canonical,
        // Canonicalization failed (for example, the file does not exist):
        // fall back to comparing the literal path.
        Err(_) => path.to_path_buf(),
    }
}
/// Serde helper for `#[serde(with = "duration_millis")]`: reads a `Duration`
/// from an unsigned integer number of milliseconds.
mod duration_millis {
    use serde::{Deserialize, Deserializer};
    use std::time::Duration;

    /// Deserializes a `u64` and interprets it as milliseconds.
    pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
    where
        D: Deserializer<'de>,
    {
        u64::deserialize(deserializer).map(Duration::from_millis)
    }
}
// Unit tests for config parsing, validation, CLI overrides, and workflow
// package discovery/merging. Most TOML fixtures set `runtime.query_timeout_ms`
// and `websocket.event_broadcast_capacity` because both keys have no default
// and validation fails without them.
#[cfg(test)]
mod tests {
use super::{
CliOverrides, ServerConfig, StoreBackend, discover_workflow_packages,
merge_workflow_packages,
};
// A fully populated TOML document parses and the typed fields carry the
// configured values.
#[test]
fn valid_toml_is_parsed_into_typed_config() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::from_slice(
br#"
[server]
listen_address = "127.0.0.1:18080"
grpc_address = "127.0.0.1:15051"
[store]
backend = "libsql"
url = "aion.db"
[runtime]
scheduler_threads = 2
query_timeout_ms = 10000
[drain]
timeout_seconds = 45
[auth]
enabled = true
jwks_url = "https://issuer.example.com/.well-known/jwks.json"
jwks_refresh_seconds = 60
[metrics]
enabled = true
[namespaces]
default = "production"
[websocket]
outbound_buffer_bound = 16
event_broadcast_capacity = 1024
"#,
)?;
assert_eq!(config.store.backend, StoreBackend::LibSql);
assert_eq!(config.store.url.as_deref(), Some("aion.db"));
assert_eq!(config.runtime.scheduler_threads, 2);
assert_eq!(config.runtime.query_timeout_ms, Some(10_000));
assert_eq!(config.namespaces.default, "production");
assert_eq!(config.websocket.outbound_buffer_bound, 16);
assert_eq!(config.websocket.event_broadcast_capacity, Some(1024));
Ok(())
}
// The all-defaults config fails validation, and the message names both the
// missing key and its environment override.
#[test]
fn missing_event_broadcast_capacity_fails_startup_validation_naming_the_key() {
let result = ServerConfig::default().validate();
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("websocket.event_broadcast_capacity"),
"validation message must name the missing key: {message}"
);
assert!(
message.contains("AION_WEBSOCKET_EVENT_BROADCAST_CAPACITY"),
"validation message must name the environment override: {message}"
);
}
// An explicit zero is rejected the same way as a missing value.
#[test]
fn zero_event_broadcast_capacity_fails_startup_validation() {
let result = ServerConfig::from_slice(
br"
[websocket]
event_broadcast_capacity = 0
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("websocket.event_broadcast_capacity"),
"validation message must name the zero-valued key: {message}"
);
}
// With the broadcast capacity set, the next required key (query timeout) is
// reported, again naming the key and its environment override.
#[test]
fn missing_query_timeout_fails_startup_validation_naming_the_key() {
let result = ServerConfig::from_slice(
br"
[runtime]
scheduler_threads = 1
[websocket]
event_broadcast_capacity = 64
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("runtime.query_timeout_ms"),
"validation message must name the missing key: {message}"
);
assert!(
message.contains("AION_RUNTIME_QUERY_TIMEOUT_MS"),
"validation message must name the environment override: {message}"
);
}
// A zero query timeout is rejected.
#[test]
fn zero_query_timeout_fails_startup_validation() {
let result = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 0
[websocket]
event_broadcast_capacity = 64
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("runtime.query_timeout_ms"),
"validation message must name the zero-valued key: {message}"
);
}
// Enabling deploy without an archive ceiling fails, naming key and env var.
#[test]
fn deploy_enabled_without_max_archive_bytes_fails_naming_key_and_env() {
let result = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
[deploy]
enabled = true
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("deploy.max_archive_bytes"),
"validation message must name the missing key: {message}"
);
assert!(
message.contains("AION_DEPLOY_MAX_ARCHIVE_BYTES"),
"validation message must name the environment override: {message}"
);
}
// A zero archive ceiling is rejected when deploy is enabled.
#[test]
fn deploy_zero_max_archive_bytes_fails_startup_validation() {
let result = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
[deploy]
enabled = true
max_archive_bytes = 0
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("deploy.max_archive_bytes"),
"validation message must name the zero-valued key: {message}"
);
}
// Enabling deploy with an archive ceiling but no inflate ceiling fails,
// naming key and env var.
#[test]
fn deploy_enabled_without_max_inflated_bytes_fails_naming_key_and_env() {
let result = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
[deploy]
enabled = true
max_archive_bytes = 16777216
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("deploy.max_inflated_bytes"),
"validation message must name the missing key: {message}"
);
assert!(
message.contains("AION_DEPLOY_MAX_INFLATED_BYTES"),
"validation message must name the environment override: {message}"
);
}
// A zero inflate ceiling is rejected when deploy is enabled.
#[test]
fn deploy_zero_max_inflated_bytes_fails_startup_validation() {
let result = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
[deploy]
enabled = true
max_archive_bytes = 16777216
max_inflated_bytes = 0
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("deploy.max_inflated_bytes"),
"validation message must name the zero-valued key: {message}"
);
}
// An inflate ceiling one byte below the archive ceiling is rejected, and the
// message mentions both keys.
#[test]
fn deploy_max_inflated_below_max_archive_fails_startup_validation() {
let result = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
[deploy]
enabled = true
max_archive_bytes = 16777216
max_inflated_bytes = 16777215
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(
message.contains("deploy.max_inflated_bytes")
&& message.contains("deploy.max_archive_bytes"),
"validation message must name both ceilings: {message}"
);
}
// With no `[deploy]` section, deploy is disabled and neither ceiling is
// required.
#[test]
fn deploy_disabled_requires_no_archive_ceiling() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
",
)?;
assert!(!config.deploy.enabled);
assert_eq!(config.deploy.max_archive_bytes, None);
assert_eq!(config.deploy.max_inflated_bytes, None);
Ok(())
}
// A complete, consistent `[deploy]` section parses and validates.
#[test]
fn deploy_section_parses_enabled_with_ceilings() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::from_slice(
br"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
[deploy]
enabled = true
max_archive_bytes = 16777216
max_inflated_bytes = 67108864
",
)?;
assert!(config.deploy.enabled);
assert_eq!(config.deploy.max_archive_bytes, Some(16_777_216));
assert_eq!(config.deploy.max_inflated_bytes, Some(67_108_864));
Ok(())
}
// An out-of-range value produces a message naming the offending field.
#[test]
fn invalid_values_name_problematic_field() {
let result = ServerConfig::from_slice(
br"
[runtime]
scheduler_threads = 0
",
);
let message = result
.err()
.map_or_else(String::new, |error| error.to_string());
assert!(message.contains("runtime.scheduler_threads"));
}
// CLI values replace values that were loaded from the config document.
#[test]
fn cli_overrides_win_over_loaded_values() -> Result<(), Box<dyn std::error::Error>> {
let mut config = ServerConfig::from_slice(
br#"
[store]
backend = "libsql"
url = "file.db"
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
"#,
)?;
let cli = CliOverrides {
store_url: Some("cli.db".to_owned()),
scheduler_threads: Some(3),
..CliOverrides::default()
};
config.apply_cli_overrides(&cli);
config.validate()?;
assert_eq!(config.store.url.as_deref(), Some("cli.db"));
assert_eq!(config.runtime.scheduler_threads, 3);
Ok(())
}
// Pins the built-in defaults, then confirms the default config validates once
// the two keys without defaults are supplied.
#[test]
fn default_config_defaults() -> Result<(), Box<dyn std::error::Error>> {
let mut config = ServerConfig::default();
assert_eq!(config.store.backend, StoreBackend::Memory);
assert_eq!(config.store.url, None);
assert_eq!(config.server.grpc_address.to_string(), "127.0.0.1:50051");
assert_eq!(config.server.listen_address.to_string(), "127.0.0.1:8080");
assert_eq!(config.namespaces.default, "default");
assert!(!config.auth.enabled);
assert!(config.metrics.enabled);
assert_eq!(config.websocket.event_broadcast_capacity, None);
assert_eq!(config.runtime.query_timeout_ms, None);
config.websocket.event_broadcast_capacity = Some(64);
config.runtime.query_timeout_ms = Some(10_000);
config.validate()?;
Ok(())
}
// Discovery returns only top-level `.aion` files, in sorted order; other
// extensions and files in nested directories are ignored.
#[test]
fn package_discovery_is_sorted() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
std::fs::write(temp_dir.path().join("zeta.aion"), b"package")?;
std::fs::write(temp_dir.path().join("alpha.aion"), b"package")?;
std::fs::write(temp_dir.path().join("ignored.txt"), b"package")?;
std::fs::create_dir(temp_dir.path().join("nested"))?;
std::fs::write(
temp_dir.path().join("nested").join("nested.aion"),
b"package",
)?;
let packages = discover_workflow_packages(temp_dir.path())?;
assert_eq!(
packages,
vec![
temp_dir.path().join("alpha.aion"),
temp_dir.path().join("zeta.aion"),
]
);
Ok(())
}
// Merge order is configured, then discovered, then CLI; repeated paths are
// dropped.
#[test]
fn workflow_package_merge_is_additive_and_deduplicated() {
let mut packages = vec!["config.aion".into(), "shared.aion".into()];
let discovered = vec!["auto.aion".into(), "shared.aion".into()];
let cli = vec!["cli.aion".into(), "auto.aion".into()];
merge_workflow_packages(&mut packages, discovered, &cli);
assert_eq!(
packages,
vec![
std::path::PathBuf::from("config.aion"),
std::path::PathBuf::from("shared.aion"),
std::path::PathBuf::from("auto.aion"),
std::path::PathBuf::from("cli.aion"),
]
);
}
// Two spellings of the same existing file (with and without a `.` component)
// are treated as one package.
#[test]
fn package_merge_deduplicates_canonical_files() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let package = temp_dir.path().join("hello.aion");
std::fs::write(&package, b"package")?;
let mut packages = vec![package.clone()];
let discovered = vec![temp_dir.path().join(".").join("hello.aion")];
merge_workflow_packages(&mut packages, discovered, &[]);
assert_eq!(packages, vec![package]);
Ok(())
}
// A CLI-supplied package with an otherwise default config keeps the in-memory
// store and ends up as the only workflow package (the scanned temp directory
// is empty).
#[test]
fn zero_config_cli_workflow_package_uses_in_memory_defaults()
-> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let cli = CliOverrides {
workflow_packages: vec!["hello-world.aion".into()],
..CliOverrides::default()
};
let mut config = ServerConfig::default();
config.websocket.event_broadcast_capacity = Some(64);
config.runtime.query_timeout_ms = Some(10_000);
config.load_discovered_workflow_packages(&cli, temp_dir.path())?;
config.validate()?;
assert_eq!(config.store.backend, StoreBackend::Memory);
assert_eq!(config.store.url, None);
assert_eq!(
config.workflow_packages,
vec![std::path::PathBuf::from("hello-world.aion")]
);
Ok(())
}
// CLI packages are appended after the packages listed in the config file.
#[test]
fn cli_packages_are_additive() -> Result<(), Box<dyn std::error::Error>> {
let mut config = ServerConfig::from_slice(
br#"
workflow_packages = ["config.aion"]
[runtime]
query_timeout_ms = 10000
[websocket]
event_broadcast_capacity = 64
"#,
)?;
let cli = CliOverrides {
workflow_packages: vec!["cli-one.aion".into(), "cli-two.aion".into()],
..CliOverrides::default()
};
merge_workflow_packages(
&mut config.workflow_packages,
Vec::new(),
&cli.workflow_packages,
);
assert_eq!(
config.workflow_packages,
vec![
std::path::PathBuf::from("config.aion"),
std::path::PathBuf::from("cli-one.aion"),
std::path::PathBuf::from("cli-two.aion"),
]
);
Ok(())
}
}