use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use lean_rs_worker_parent::{
LeanWorkerDeclarationInspectionRequest, LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationSearch,
LeanWorkerDeclarationSearchResult, LeanWorkerDeclarationVerificationRequest,
LeanWorkerDeclarationVerificationResult, LeanWorkerElabOptions, LeanWorkerModuleQuery,
LeanWorkerModuleQueryBatchOutcome, LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector,
LeanWorkerOutputBudgets, LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult,
};
use lru::LruCache;
use parking_lot::Mutex;
use tokio::sync::Mutex as AsyncMutex;
use crate::cache::{ModuleQueryBatchKey, ModuleQueryKey};
use crate::config_file::BrokerFileConfig;
use crate::envelope::{Freshness, RuntimeFacts};
use crate::error::{Result, ServerError};
use crate::lake_meta::{LakeProjectMeta, fingerprint_lake_project};
use crate::project::{LeanProject, ProjectCall, ProjectRuntimeConfig, SemanticAdmission};
pub const DEFAULT_MAX_PROJECTS: usize = 4;
pub const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 600;
pub const DEFAULT_SEMANTIC_PERMITS: usize = 1;
pub const DEFAULT_SEMANTIC_WAITERS: usize = 16;
pub const DEFAULT_SEMANTIC_ADMISSION_TIMEOUT_MILLIS: u64 = 60_000;
const REAPER_TICK: Duration = Duration::from_mins(1);
#[derive(Debug, Clone)]
pub struct BrokerConfig {
pub config_default: Option<PathBuf>,
pub env_default: Option<PathBuf>,
pub cwd: PathBuf,
pub max_projects: NonZeroUsize,
pub idle_timeout: Duration,
pub semantic_permits: NonZeroUsize,
pub semantic_waiters: NonZeroUsize,
pub semantic_admission_timeout: Duration,
}
impl BrokerConfig {
pub fn pool_from_env() -> Result<(NonZeroUsize, Duration, NonZeroUsize, NonZeroUsize, Duration)> {
Self::pool_from_env_with_file(&BrokerFileConfig::default())
}
pub fn pool_from_env_with_file(
file: &BrokerFileConfig,
) -> Result<(NonZeroUsize, Duration, NonZeroUsize, NonZeroUsize, Duration)> {
parse_pool_config(
std::env::var("LEAN_HOST_MCP_MAX_PROJECTS").ok().as_deref(),
std::env::var("LEAN_HOST_MCP_IDLE_TIMEOUT_SECS").ok().as_deref(),
std::env::var("LEAN_HOST_MCP_SEMANTIC_PERMITS").ok().as_deref(),
std::env::var("LEAN_HOST_MCP_SEMANTIC_WAITERS").ok().as_deref(),
std::env::var("LEAN_HOST_MCP_SEMANTIC_ADMISSION_TIMEOUT_MILLIS")
.ok()
.as_deref(),
file,
)
}
#[must_use]
pub fn default_max_projects() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_MAX_PROJECTS).unwrap_or(NonZeroUsize::MIN)
}
#[must_use]
pub const fn default_idle_timeout() -> Duration {
Duration::from_secs(DEFAULT_IDLE_TIMEOUT_SECS)
}
#[must_use]
pub fn default_semantic_permits() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_SEMANTIC_PERMITS).unwrap_or(NonZeroUsize::MIN)
}
#[must_use]
pub fn default_semantic_waiters() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_SEMANTIC_WAITERS).unwrap_or(NonZeroUsize::MIN)
}
#[must_use]
pub const fn default_semantic_admission_timeout() -> Duration {
Duration::from_millis(DEFAULT_SEMANTIC_ADMISSION_TIMEOUT_MILLIS)
}
}
fn parse_pool_config(
max: Option<&str>,
idle: Option<&str>,
semantic: Option<&str>,
semantic_waiters: Option<&str>,
semantic_timeout_millis: Option<&str>,
file: &BrokerFileConfig,
) -> Result<(NonZeroUsize, Duration, NonZeroUsize, NonZeroUsize, Duration)> {
let max_projects = nonzero(
resolve_usize(
"LEAN_HOST_MCP_MAX_PROJECTS",
max,
file.max_projects,
DEFAULT_MAX_PROJECTS,
)?,
"LEAN_HOST_MCP_MAX_PROJECTS=0 would deadlock the pool",
)?;
let idle_timeout = Duration::from_secs(resolve_u64(
"LEAN_HOST_MCP_IDLE_TIMEOUT_SECS",
idle,
file.idle_timeout_secs,
DEFAULT_IDLE_TIMEOUT_SECS,
)?);
let semantic_permits = nonzero(
resolve_usize(
"LEAN_HOST_MCP_SEMANTIC_PERMITS",
semantic,
file.semantic_permits,
DEFAULT_SEMANTIC_PERMITS,
)?,
"LEAN_HOST_MCP_SEMANTIC_PERMITS=0 would deadlock semantic work",
)?;
let semantic_waiters = nonzero(
resolve_usize(
"LEAN_HOST_MCP_SEMANTIC_WAITERS",
semantic_waiters,
file.semantic_waiters,
DEFAULT_SEMANTIC_WAITERS,
)?,
"LEAN_HOST_MCP_SEMANTIC_WAITERS=0 would reject all waiters",
)?;
let timeout_millis = resolve_u64(
"LEAN_HOST_MCP_SEMANTIC_ADMISSION_TIMEOUT_MILLIS",
semantic_timeout_millis,
file.semantic_admission_timeout_millis,
DEFAULT_SEMANTIC_ADMISSION_TIMEOUT_MILLIS,
)?;
if timeout_millis == 0 {
return Err(ServerError::Internal(
"LEAN_HOST_MCP_SEMANTIC_ADMISSION_TIMEOUT_MILLIS=0 is not allowed".into(),
));
}
Ok((
max_projects,
idle_timeout,
semantic_permits,
semantic_waiters,
Duration::from_millis(timeout_millis),
))
}
fn resolve_usize(name: &str, env: Option<&str>, file: Option<usize>, default: usize) -> Result<usize> {
match env {
Some(s) => s
.parse()
.map_err(|e| ServerError::Internal(format!("{name}={s:?} not a usize: {e}"))),
None => Ok(file.unwrap_or(default)),
}
}
fn resolve_u64(name: &str, env: Option<&str>, file: Option<u64>, default: u64) -> Result<u64> {
match env {
Some(s) => s
.parse()
.map_err(|e| ServerError::Internal(format!("{name}={s:?} not a u64: {e}"))),
None => Ok(file.unwrap_or(default)),
}
}
fn nonzero(value: usize, zero_message: &str) -> Result<NonZeroUsize> {
NonZeroUsize::new(value).ok_or_else(|| ServerError::Internal(zero_message.to_owned()))
}
#[derive(Debug, Clone)]
pub enum ProjectHint {
Explicit(PathBuf),
Default,
}
impl ProjectHint {
#[must_use]
pub fn from_request(value: Option<String>) -> Self {
match value {
Some(s) if !s.is_empty() => Self::Explicit(PathBuf::from(s)),
_ => Self::Default,
}
}
}
struct BrokerInner {
registry: LruCache<PathBuf, Arc<LeanProject>>,
last_used: HashMap<PathBuf, Instant>,
opening_locks: HashMap<PathBuf, Arc<AsyncMutex<()>>>,
}
#[derive(Debug, Clone)]
pub struct BrokerCall<T> {
pub value: T,
pub runtime: RuntimeFacts,
pub freshness: Freshness,
}
#[derive(Debug, Clone)]
pub(crate) struct CachedBrokerCall<T> {
pub value: T,
pub runtime: RuntimeFacts,
pub freshness: Freshness,
pub freshly_processed: bool,
}
pub struct ProjectBroker {
inner: Mutex<BrokerInner>,
config: BrokerConfig,
runtime_config: ProjectRuntimeConfig,
semantic_admission: Arc<SemanticAdmission>,
}
impl std::fmt::Debug for ProjectBroker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProjectBroker")
.field("config", &self.config)
.finish_non_exhaustive()
}
}
impl ProjectBroker {
#[must_use]
pub fn new(config: BrokerConfig) -> Arc<Self> {
Self::new_with_runtime_config(config, ProjectRuntimeConfig::default())
}
#[must_use]
pub fn new_with_runtime_config(config: BrokerConfig, runtime_config: ProjectRuntimeConfig) -> Arc<Self> {
let broker = Arc::new(Self {
inner: Mutex::new(BrokerInner {
registry: LruCache::new(config.max_projects),
last_used: HashMap::new(),
opening_locks: HashMap::new(),
}),
runtime_config,
semantic_admission: SemanticAdmission::new(
config.semantic_permits,
config.semantic_waiters,
config.semantic_admission_timeout,
),
config,
});
broker.spawn_reaper();
broker
}
fn spawn_reaper(self: &Arc<Self>) {
if self.config.idle_timeout.is_zero() {
return;
}
let Ok(handle) = tokio::runtime::Handle::try_current() else {
return;
};
let weak: Weak<Self> = Arc::downgrade(self);
handle.spawn(async move {
let mut tick = tokio::time::interval(REAPER_TICK);
tick.tick().await;
loop {
tick.tick().await;
let Some(broker) = weak.upgrade() else {
break;
};
broker.reap_idle();
}
});
}
pub fn resolve(&self, hint: &ProjectHint) -> Result<PathBuf> {
if let ProjectHint::Explicit(p) = hint {
return canonicalise(p);
}
if let Some(env) = &self.config.env_default {
return canonicalise(env);
}
if let Some(found) = walk_up_for_lakefile(&self.config.cwd) {
return canonicalise(&found);
}
if let Some(cfg) = &self.config.config_default {
return canonicalise(cfg);
}
Err(ServerError::BadProject(format!(
"no lakefile found from {} and no env/config default set",
self.config.cwd.display()
)))
}
pub fn resolve_meta(&self, hint: &ProjectHint) -> Result<LakeProjectMeta> {
let root = self.resolve(hint)?;
LakeProjectMeta::from_explicit(&root)
}
pub async fn process_module_query(
&self,
hint: ProjectHint,
session_imports: Vec<String>,
freshness_imports: Vec<String>,
source: String,
query: LeanWorkerModuleQuery,
options: LeanWorkerElabOptions,
) -> Result<BrokerCall<LeanWorkerModuleQueryOutcome>> {
let project = self.project_for(hint).await?;
let call = project
.process_module_query(session_imports, source, query, options)
.await?;
let out = broker_call(&project, &freshness_imports, call);
drop(project);
Ok(out)
}
pub(crate) async fn process_cached_module_query(
&self,
hint: ProjectHint,
path: PathBuf,
content_hash: [u8; 32],
session_imports: Vec<String>,
freshness_imports: Vec<String>,
source: String,
query: LeanWorkerModuleQuery,
options: LeanWorkerElabOptions,
) -> Result<CachedBrokerCall<LeanWorkerModuleQueryOutcome>> {
let project = self.project_for(hint).await?;
let key = ModuleQueryKey::from_query(&query);
if let Some(value) = project.module_query_cache().get(&path, content_hash, &key) {
return Ok(CachedBrokerCall {
value,
runtime: project.runtime_facts(),
freshness: project.freshness(&freshness_imports),
freshly_processed: false,
});
}
let call = project
.process_module_query(session_imports, source, query, options)
.await?;
let (value, runtime) = call.into_parts();
project
.module_query_cache()
.insert(path, content_hash, key, value.clone());
Ok(CachedBrokerCall {
value,
runtime,
freshness: project.freshness(&freshness_imports),
freshly_processed: true,
})
}
pub async fn process_module_query_batch(
&self,
hint: ProjectHint,
session_imports: Vec<String>,
freshness_imports: Vec<String>,
source: String,
selectors: Vec<LeanWorkerModuleQuerySelector>,
budgets: LeanWorkerOutputBudgets,
options: LeanWorkerElabOptions,
) -> Result<BrokerCall<LeanWorkerModuleQueryBatchOutcome>> {
let project = self.project_for(hint).await?;
let call = project
.process_module_query_batch(session_imports, source, selectors, budgets, options)
.await?;
let out = broker_call(&project, &freshness_imports, call);
drop(project);
Ok(out)
}
pub(crate) async fn process_cached_module_query_batch(
&self,
hint: ProjectHint,
path: PathBuf,
content_hash: [u8; 32],
session_imports: Vec<String>,
freshness_imports: Vec<String>,
source: String,
selectors: Vec<LeanWorkerModuleQuerySelector>,
budgets: LeanWorkerOutputBudgets,
options: LeanWorkerElabOptions,
) -> Result<CachedBrokerCall<LeanWorkerModuleQueryBatchOutcome>> {
let project = self.project_for(hint).await?;
let key = ModuleQueryBatchKey::from_batch(&selectors, &budgets);
if let Some(value) = project.module_query_cache().get_batch(&path, content_hash, &key) {
return Ok(CachedBrokerCall {
value,
runtime: project.runtime_facts(),
freshness: project.freshness(&freshness_imports),
freshly_processed: false,
});
}
let call = project
.process_module_query_batch(session_imports, source, selectors, budgets, options)
.await?;
let (value, runtime) = call.into_parts();
project
.module_query_cache()
.insert_batch(path, content_hash, key, value.clone());
Ok(CachedBrokerCall {
value,
runtime,
freshness: project.freshness(&freshness_imports),
freshly_processed: true,
})
}
pub async fn inspect_declaration(
&self,
hint: ProjectHint,
session_imports: Vec<String>,
freshness_imports: Vec<String>,
request: LeanWorkerDeclarationInspectionRequest,
) -> Result<BrokerCall<LeanWorkerDeclarationInspectionResult>> {
let project = self.project_for(hint).await?;
let call = project.inspect_declaration(session_imports, request).await?;
let out = broker_call(&project, &freshness_imports, call);
drop(project);
Ok(out)
}
pub async fn search_declarations(
&self,
hint: ProjectHint,
session_imports: Vec<String>,
freshness_imports: Vec<String>,
request: LeanWorkerDeclarationSearch,
) -> Result<BrokerCall<LeanWorkerDeclarationSearchResult>> {
let project = self.project_for(hint).await?;
let call = project.search_declarations(session_imports, request).await?;
let out = broker_call(&project, &freshness_imports, call);
drop(project);
Ok(out)
}
pub async fn attempt_proof(
&self,
hint: ProjectHint,
session_imports: Vec<String>,
freshness_imports: Vec<String>,
request: LeanWorkerProofAttemptRequest,
options: LeanWorkerElabOptions,
) -> Result<BrokerCall<LeanWorkerProofAttemptResult>> {
let project = self.project_for(hint).await?;
let call = project.attempt_proof(session_imports, request, options).await?;
let out = broker_call(&project, &freshness_imports, call);
drop(project);
Ok(out)
}
pub async fn verify_declaration(
&self,
hint: ProjectHint,
session_imports: Vec<String>,
freshness_imports: Vec<String>,
request: LeanWorkerDeclarationVerificationRequest,
options: LeanWorkerElabOptions,
) -> Result<BrokerCall<LeanWorkerDeclarationVerificationResult>> {
let project = self.project_for(hint).await?;
let call = project.verify_declaration(session_imports, request, options).await?;
let out = broker_call(&project, &freshness_imports, call);
drop(project);
Ok(out)
}
pub async fn project_runtime(&self, hint: ProjectHint, freshness_imports: Vec<String>) -> Result<BrokerCall<()>> {
let project = self.project_for(hint).await?;
let out = BrokerCall {
value: (),
runtime: project.runtime_facts(),
freshness: project.freshness(&freshness_imports),
};
drop(project);
Ok(out)
}
async fn project_for(&self, hint: ProjectHint) -> Result<Arc<LeanProject>> {
let root = self.resolve(&hint)?;
self.acquire(root).await
}
async fn acquire(&self, root: PathBuf) -> Result<Arc<LeanProject>> {
let cached = {
let mut inner = self.inner.lock();
inner.registry.get(&root).cloned()
};
if let Some(project) = cached {
let current_hash = fingerprint_lake_project(&root)?;
if project.manifest_hash() == current_hash && project.is_healthy() {
tracing::debug!(project = %root.display(), cache_hit = true, "reusing resident project");
self.inner.lock().last_used.insert(root, Instant::now());
return Ok(project);
}
tracing::debug!(
project = %root.display(),
manifest_changed = project.manifest_hash() != current_hash,
healthy = project.is_healthy(),
"evicting stale project before reopen"
);
let stale = {
let mut inner = self.inner.lock();
inner.last_used.remove(&root);
inner.registry.pop(&root)
};
if let Some(s) = stale {
s.shutdown();
}
drop(project);
}
let open_lock = {
let mut inner = self.inner.lock();
Arc::clone(
inner
.opening_locks
.entry(root.clone())
.or_insert_with(|| Arc::new(AsyncMutex::new(()))),
)
};
let _open_guard = open_lock.lock().await;
let cached_after_wait = {
let mut inner = self.inner.lock();
inner.registry.get(&root).cloned()
};
if let Some(project) = cached_after_wait {
let current_hash = fingerprint_lake_project(&root)?;
if project.manifest_hash() == current_hash && project.is_healthy() {
self.inner.lock().opening_locks.remove(&root);
self.inner.lock().last_used.insert(root, Instant::now());
return Ok(project);
}
let stale = {
let mut inner = self.inner.lock();
inner.last_used.remove(&root);
inner.registry.pop(&root)
};
if let Some(s) = stale {
s.shutdown();
}
}
let meta_root = root.clone();
let meta = match tokio::task::spawn_blocking(move || LakeProjectMeta::from_explicit(&meta_root)).await {
Ok(Ok(meta)) => meta,
Ok(Err(err)) => {
self.inner.lock().opening_locks.remove(&root);
return Err(err);
}
Err(err) => {
self.inner.lock().opening_locks.remove(&root);
return Err(ServerError::Internal(format!("project metadata task failed: {err}")));
}
};
let admission = Arc::clone(&self.semantic_admission);
let runtime_config = self.runtime_config.clone();
let opened = match tokio::task::spawn_blocking(move || {
LeanProject::open_with_admission(meta, admission, runtime_config)
})
.await
{
Ok(Ok(project)) => project,
Ok(Err(err)) => {
self.inner.lock().opening_locks.remove(&root);
tracing::warn!(project = %root.display(), error = %err, "project open failed");
return Err(err);
}
Err(err) => {
self.inner.lock().opening_locks.remove(&root);
return Err(ServerError::Internal(format!("project open task failed: {err}")));
}
};
tracing::info!(project = %root.display(), toolchain = %opened.toolchain(), "opened project; worker spawned");
let mut inner = self.inner.lock();
let (project, victim) = if let Some(existing) = inner.registry.get(&root).cloned() {
inner.last_used.insert(root.clone(), Instant::now());
(existing, Some(opened))
} else {
let victim = if inner.registry.len() >= inner.registry.cap().get() {
pop_idle_lru(&mut inner.registry)
} else {
None
};
if victim.is_none() && inner.registry.len() >= inner.registry.cap().get() {
inner.opening_locks.remove(&root);
drop(inner);
opened.shutdown();
tracing::warn!(
project = %root.display(),
"project pool full and every entry is active; rejecting (retryable)"
);
return Err(ServerError::worker_unavailable(crate::error::WorkerUnavailable {
retryable: true,
worker_restarted: false,
project_root: root.to_string_lossy().into_owned(),
project_hash: String::new(),
imports: Vec::new(),
session_id: String::new(),
lean_toolchain: String::new(),
worker_generation: 0,
reason: "project_pool_busy_all_entries_active".to_owned(),
restart_cause: None,
rss_kib: None,
limit_kib: None,
retry_after_millis: None,
restarts_in_window: None,
window_millis: None,
runtime: crate::envelope::RuntimeFacts::default(),
toolchain_advisories: Vec::new(),
}));
}
if let Some((ref evicted_path, _)) = victim {
inner.last_used.remove(evicted_path);
}
inner.registry.put(root.clone(), Arc::clone(&opened));
inner.last_used.insert(root.clone(), Instant::now());
(opened, victim.map(|(_, v)| v))
};
inner.opening_locks.remove(&root);
drop(inner);
if let Some(v) = victim {
tracing::debug!(evicted = %v.canonical_root().display(), "evicted LRU project to make room");
v.shutdown();
}
Ok(project)
}
pub fn reap_idle(&self) {
if self.config.idle_timeout.is_zero() {
return;
}
let now = Instant::now();
let evicted: Vec<Arc<LeanProject>> = {
let mut inner = self.inner.lock();
let expired: Vec<PathBuf> = inner
.last_used
.iter()
.filter(|(_, last)| now.saturating_duration_since(**last) >= self.config.idle_timeout)
.map(|(p, _)| p.clone())
.collect();
let mut out: Vec<Arc<LeanProject>> = Vec::with_capacity(expired.len());
for p in &expired {
if let Some(proj) = inner.registry.peek(p)
&& !proj.is_idle()
{
continue;
}
if let Some(proj) = inner.registry.pop(p) {
out.push(proj);
}
inner.last_used.remove(p);
}
out
};
if !evicted.is_empty() {
tracing::info!(
evicted_count = evicted.len(),
idle_timeout_secs = self.config.idle_timeout.as_secs(),
"idle reaper evicted projects"
);
}
for v in evicted {
v.shutdown();
}
}
#[must_use]
pub fn resident_paths(&self) -> Vec<PathBuf> {
let inner = self.inner.lock();
inner.registry.iter().map(|(p, _)| p.clone()).collect()
}
}
fn canonicalise(path: &Path) -> Result<PathBuf> {
path.canonicalize()
.map_err(|e| ServerError::BadProject(format!("canonicalise {}: {e}", path.display())))
}
fn walk_up_for_lakefile(start: &Path) -> Option<PathBuf> {
let mut cur: Option<&Path> = Some(start);
while let Some(dir) = cur {
if dir.join("lakefile.toml").is_file() || dir.join("lakefile.lean").is_file() {
return Some(dir.to_path_buf());
}
cur = dir.parent();
}
None
}
fn pop_idle_lru(registry: &mut LruCache<PathBuf, Arc<LeanProject>>) -> Option<(PathBuf, Arc<LeanProject>)> {
let key = registry
.iter()
.rev()
.find_map(|(path, project)| project.is_idle().then(|| path.clone()))?;
let project = registry.pop(&key)?;
Some((key, project))
}
fn broker_call<T>(project: &LeanProject, freshness_imports: &[String], call: ProjectCall<T>) -> BrokerCall<T> {
let (value, runtime) = call.into_parts();
BrokerCall {
value,
runtime,
freshness: project.freshness(freshness_imports),
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::panic,
reason = "test code uses unwrap/panic to surface failure paths concisely"
)]
mod tests {
use std::fs;
use super::*;
fn make_lake_dir(root: &Path, name: &str) -> PathBuf {
let dir = root.join(name);
fs::create_dir_all(&dir).unwrap();
fs::write(
dir.join("lakefile.lean"),
format!("package {name}\nlean_lib {}\n", name.replace('-', "_")),
)
.unwrap();
dir.canonicalize().unwrap()
}
fn cfg(cwd: PathBuf, env: Option<PathBuf>, conf: Option<PathBuf>) -> BrokerConfig {
BrokerConfig {
config_default: conf,
env_default: env,
cwd,
max_projects: BrokerConfig::default_max_projects(),
idle_timeout: Duration::ZERO,
semantic_permits: NonZeroUsize::MIN,
semantic_waiters: BrokerConfig::default_semantic_waiters(),
semantic_admission_timeout: BrokerConfig::default_semantic_admission_timeout(),
}
}
#[test]
fn walk_up_for_lakefile_finds_in_parent() {
let tmp = tempfile::tempdir().unwrap();
let proj = make_lake_dir(tmp.path(), "myproj");
let nested = proj.join("a/b");
fs::create_dir_all(&nested).unwrap();
assert_eq!(walk_up_for_lakefile(&nested).unwrap(), proj);
}
#[test]
fn resolve_returns_canonicalised_path_when_no_resolution_needed() {
let tmp = tempfile::tempdir().unwrap();
let proj = make_lake_dir(tmp.path(), "explicit");
let resolved = ProjectBroker::new(cfg(tmp.path().to_path_buf(), None, None))
.resolve(&ProjectHint::Explicit(proj.clone()))
.unwrap();
assert_eq!(resolved, proj);
}
#[test]
fn parse_pool_config_uses_defaults_when_unset() {
let empty = BrokerFileConfig::default();
let (max, idle, semantic, waiters, timeout) = parse_pool_config(None, None, None, None, None, &empty).unwrap();
assert_eq!(max.get(), DEFAULT_MAX_PROJECTS);
assert_eq!(idle, Duration::from_secs(DEFAULT_IDLE_TIMEOUT_SECS));
assert_eq!(semantic.get(), DEFAULT_SEMANTIC_PERMITS);
assert_eq!(waiters.get(), DEFAULT_SEMANTIC_WAITERS);
assert_eq!(
timeout,
Duration::from_millis(DEFAULT_SEMANTIC_ADMISSION_TIMEOUT_MILLIS)
);
}
#[test]
fn parse_pool_config_accepts_explicit_values() {
let empty = BrokerFileConfig::default();
let (max, idle, semantic, waiters, timeout) =
parse_pool_config(Some("8"), Some("30"), Some("2"), Some("12"), Some("250"), &empty).unwrap();
assert_eq!(max.get(), 8);
assert_eq!(idle, Duration::from_secs(30));
assert_eq!(semantic.get(), 2);
assert_eq!(waiters.get(), 12);
assert_eq!(timeout, Duration::from_millis(250));
}
#[test]
fn parse_pool_config_treats_zero_idle_as_disable() {
let empty = BrokerFileConfig::default();
let (_, idle, _, _, _) = parse_pool_config(None, Some("0"), None, None, None, &empty).unwrap();
assert_eq!(idle, Duration::ZERO);
}
#[test]
fn parse_pool_config_rejects_max_projects_zero() {
let empty = BrokerFileConfig::default();
let err = parse_pool_config(Some("0"), None, None, None, None, &empty).unwrap_err();
assert!(matches!(err, ServerError::Internal(_)), "{err:?}");
}
#[test]
fn parse_pool_config_rejects_garbage() {
let e = BrokerFileConfig::default();
assert!(parse_pool_config(Some("seven"), None, None, None, None, &e).is_err());
assert!(parse_pool_config(None, Some("forever"), None, None, None, &e).is_err());
assert!(parse_pool_config(None, None, Some("many"), None, None, &e).is_err());
assert!(parse_pool_config(None, None, Some("0"), None, None, &e).is_err());
assert!(parse_pool_config(None, None, None, Some("0"), None, &e).is_err());
assert!(parse_pool_config(None, None, None, None, Some("0"), &e).is_err());
}
#[test]
fn parse_pool_config_file_value_used_when_env_unset_and_env_wins() {
let file = BrokerFileConfig {
max_projects: Some(7),
semantic_admission_timeout_millis: Some(0), ..BrokerFileConfig::default()
};
let (max, ..) = parse_pool_config(None, None, None, None, Some("250"), &file).unwrap();
assert_eq!(max.get(), 7);
let (max, _, _, _, timeout) = parse_pool_config(Some("3"), None, None, None, Some("250"), &file).unwrap();
assert_eq!(max.get(), 3);
assert_eq!(timeout, Duration::from_millis(250));
}
#[test]
fn parse_pool_config_rejects_zero_max_projects_from_file() {
let file = BrokerFileConfig {
max_projects: Some(0),
..BrokerFileConfig::default()
};
assert!(parse_pool_config(None, None, None, None, None, &file).is_err());
}
}