1#![allow(let_underscore_drop, clippy::needless_pass_by_value)]
10
11use std::collections::{BTreeMap, VecDeque};
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
16use std::thread;
17use std::time::{Duration, Instant};
18
19use lean_rs_worker_parent::{
20 LeanWorkerCapabilityBuilder, LeanWorkerChild, LeanWorkerDeclarationInspectionRequest,
21 LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationSearch, LeanWorkerDeclarationSearchResult,
22 LeanWorkerDeclarationVerificationRequest, LeanWorkerDeclarationVerificationResult, LeanWorkerElabOptions,
23 LeanWorkerError, LeanWorkerHostHandle, LeanWorkerHostHandleBuilder, LeanWorkerLifecycleSnapshot,
24 LeanWorkerModuleCacheLimits, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchOutcome,
25 LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector, LeanWorkerOutputBudgets,
26 LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRestartPolicy, LeanWorkerRestartReason,
27};
28use lean_semantic_search_runtime::SemanticSearchRuntimeBuild;
29use parking_lot::Mutex;
30use tokio::sync::{mpsc, oneshot};
31
32use crate::admission::SemanticPermit;
33use crate::cache::ModuleQueryCache;
34use crate::config_file::RuntimeFileConfig;
35use crate::envelope::{Freshness, RuntimeFacts, RuntimeRestartEvent};
36use crate::error::{Result, ServerError, WorkerUnavailable, map_worker_err};
37use crate::lake_meta::LakeProjectMeta;
38use crate::semantic_search::{SemanticProofSearchRequest, SemanticProofSearchResult};
39use crate::toolchain::{Readiness, ToolchainId, WorkerBinary};
40
/// Capacity passed to `ModuleQueryCache::with_capacity` when a project is opened.
const MODULE_QUERY_CACHE_CAPACITY: usize = 256;
// NOTE(review): not referenced in the visible part of this file; the name suggests a
// per-worker request budget before recycling — confirm at its use site.
const WORKER_REQUEST_RESTARTS: u64 = 64;
/// Built-in default for `ProjectRuntimeConfig::mailbox_capacity`.
const PROJECT_MAILBOX_CAPACITY: usize = 8;
/// Built-in default for the post-job RSS restart threshold: 5 GiB, expressed in KiB.
const WORKER_RSS_POST_JOB_RESTART_KIB: u64 = 5 * 1024 * 1024;
/// Built-in default for the hard-kill RSS threshold: 16 GiB, expressed in KiB.
const WORKER_RSS_HARD_KILL_KIB: u64 = 16 * 1024 * 1024;
/// Built-in default for the RSS sampling interval, in milliseconds.
const WORKER_RSS_SAMPLE_MILLIS: u64 = 250;
/// Built-in default for the import-switch soft RSS threshold: 2 GiB, expressed in KiB.
const IMPORT_SWITCH_RSS_SOFT_KIB: u64 = 2 * 1024 * 1024;
/// Built-in default for the module-cache RSS guard: 2 GiB, expressed in KiB.
const MODULE_CACHE_RSS_GUARD_KIB: u64 = 2 * 1024 * 1024;
/// Built-in default for the module-cache size cap: 32 MiB, in bytes.
const MODULE_CACHE_MAX_BYTES: u64 = 32 * 1024 * 1024;
/// Built-in default request timeout: 120 seconds, in milliseconds.
const REQUEST_TIMEOUT_MILLIS: u64 = 120 * 1000;
/// Retry budget reported by `RetryPolicy::RetryOnceReadOnly`.
const MAX_JOB_RETRIES: u32 = 1;
/// Built-in default for `ProjectRuntimeConfig::max_restarts_per_window`.
const MAX_RESTARTS_PER_WINDOW: usize = 3;
/// Built-in default for `ProjectRuntimeConfig::restart_window`: one minute.
const RESTART_WINDOW: Duration = Duration::from_mins(1);
61
/// Tunable limits for one project's worker runtime.
///
/// Instances come either from `Default` (the built-in constants) or from
/// `parse_runtime_config`, which additionally enforces
/// `import_switch_rss_soft_kib <= worker_rss_post_job_restart_kib <= worker_rss_hard_kill_kib`.
/// Fields are private; each has a same-named `const fn` getter.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ProjectRuntimeConfig {
    // RSS thresholds, in KiB (see the ordering invariant above).
    worker_rss_post_job_restart_kib: u64,
    worker_rss_hard_kill_kib: u64,
    // Sampling interval in milliseconds.
    worker_rss_sample_millis: u64,
    import_switch_rss_soft_kib: u64,
    module_cache_rss_guard_kib: u64,
    // Size cap in bytes.
    module_cache_max_bytes: u64,
    // Timeout in milliseconds.
    request_timeout_millis: u64,
    // NOTE(review): presumably the bound of the actor's mpsc mailbox — the channel is
    // created outside the visible code; confirm there.
    mailbox_capacity: usize,
    // Restart budget: at most `max_restarts_per_window` within `restart_window`.
    // NOTE(review): enforcement is outside the visible code.
    max_restarts_per_window: usize,
    restart_window: Duration,
}

impl Default for ProjectRuntimeConfig {
    /// Returns the built-in defaults taken from the file-level constants.
    ///
    /// This path does not call `validate_rss_ordering`; the constants themselves
    /// satisfy the ordering (2 GiB <= 5 GiB <= 16 GiB).
    fn default() -> Self {
        Self {
            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
            restart_window: RESTART_WINDOW,
        }
    }
}
97
98impl ProjectRuntimeConfig {
99 pub fn from_env() -> Result<Self> {
106 Self::from_env_with_file(&RuntimeFileConfig::default())
107 }
108
109 pub fn from_env_with_file(file: &RuntimeFileConfig) -> Result<Self> {
118 parse_runtime_config(
119 RuntimeEnv {
120 worker_rss_post_job_restart_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB")?,
121 worker_rss_hard_kill_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB")?,
122 worker_rss_sample_millis: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS")?,
123 import_switch_rss_soft_kib: runtime_env_var("LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB")?,
124 module_cache_rss_guard_kib: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB")?,
125 module_cache_max_bytes: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES")?,
126 request_timeout_millis: runtime_env_var("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS")?,
127 project_mailbox_capacity: runtime_env_var("LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY")?,
128 worker_restart_limit: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_LIMIT")?,
129 worker_restart_window_secs: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS")?,
130 },
131 file,
132 )
133 }
134
135 #[must_use]
136 pub const fn worker_rss_post_job_restart_kib(&self) -> u64 {
137 self.worker_rss_post_job_restart_kib
138 }
139
140 #[must_use]
141 pub const fn worker_rss_hard_kill_kib(&self) -> u64 {
142 self.worker_rss_hard_kill_kib
143 }
144
145 #[must_use]
146 pub const fn worker_rss_sample_millis(&self) -> u64 {
147 self.worker_rss_sample_millis
148 }
149
150 #[must_use]
151 pub const fn import_switch_rss_soft_kib(&self) -> u64 {
152 self.import_switch_rss_soft_kib
153 }
154
155 #[must_use]
156 pub const fn module_cache_rss_guard_kib(&self) -> u64 {
157 self.module_cache_rss_guard_kib
158 }
159
160 #[must_use]
161 pub const fn module_cache_max_bytes(&self) -> u64 {
162 self.module_cache_max_bytes
163 }
164
165 #[must_use]
166 pub const fn request_timeout_millis(&self) -> u64 {
167 self.request_timeout_millis
168 }
169
170 #[must_use]
171 pub const fn mailbox_capacity(&self) -> usize {
172 self.mailbox_capacity
173 }
174
175 #[must_use]
176 pub const fn max_restarts_per_window(&self) -> usize {
177 self.max_restarts_per_window
178 }
179
180 #[must_use]
181 pub const fn restart_window(&self) -> Duration {
182 self.restart_window
183 }
184}
185
/// Raw, still-unparsed values of the `LEAN_HOST_MCP_*` environment variables.
///
/// Each field is `None` when the corresponding variable is not set; parsing and
/// fallback handling happen in `parse_runtime_config`.
#[derive(Debug, Default)]
struct RuntimeEnv {
    // LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB
    worker_rss_post_job_restart_kib: Option<String>,
    // LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB
    worker_rss_hard_kill_kib: Option<String>,
    // LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS
    worker_rss_sample_millis: Option<String>,
    // LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB
    import_switch_rss_soft_kib: Option<String>,
    // LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB
    module_cache_rss_guard_kib: Option<String>,
    // LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES
    module_cache_max_bytes: Option<String>,
    // LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS
    request_timeout_millis: Option<String>,
    // LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY
    project_mailbox_capacity: Option<String>,
    // LEAN_HOST_MCP_WORKER_RESTART_LIMIT
    worker_restart_limit: Option<String>,
    // LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS
    worker_restart_window_secs: Option<String>,
}
199
200fn parse_runtime_config(env: RuntimeEnv, file: &RuntimeFileConfig) -> Result<ProjectRuntimeConfig> {
201 let defaults = ProjectRuntimeConfig::default();
202 let config = ProjectRuntimeConfig {
203 worker_rss_post_job_restart_kib: parse_nonzero_u64(
204 "LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB",
205 env.worker_rss_post_job_restart_kib.as_deref(),
206 file.worker_rss_post_job_restart_kib,
207 defaults.worker_rss_post_job_restart_kib,
208 )?,
209 worker_rss_hard_kill_kib: parse_nonzero_u64(
210 "LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB",
211 env.worker_rss_hard_kill_kib.as_deref(),
212 file.worker_rss_hard_kill_kib,
213 defaults.worker_rss_hard_kill_kib,
214 )?,
215 worker_rss_sample_millis: parse_nonzero_u64(
216 "LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS",
217 env.worker_rss_sample_millis.as_deref(),
218 file.worker_rss_sample_millis,
219 defaults.worker_rss_sample_millis,
220 )?,
221 import_switch_rss_soft_kib: parse_nonzero_u64(
222 "LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB",
223 env.import_switch_rss_soft_kib.as_deref(),
224 file.import_switch_rss_soft_kib,
225 defaults.import_switch_rss_soft_kib,
226 )?,
227 module_cache_rss_guard_kib: parse_nonzero_u64(
228 "LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB",
229 env.module_cache_rss_guard_kib.as_deref(),
230 file.module_cache_rss_guard_kib,
231 defaults.module_cache_rss_guard_kib,
232 )?,
233 module_cache_max_bytes: parse_nonzero_u64(
234 "LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES",
235 env.module_cache_max_bytes.as_deref(),
236 file.module_cache_max_bytes,
237 defaults.module_cache_max_bytes,
238 )?,
239 request_timeout_millis: parse_nonzero_u64(
240 "LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS",
241 env.request_timeout_millis.as_deref(),
242 file.request_timeout_millis,
243 defaults.request_timeout_millis,
244 )?,
245 mailbox_capacity: parse_nonzero_usize(
246 "LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY",
247 env.project_mailbox_capacity.as_deref(),
248 file.project_mailbox_capacity,
249 defaults.mailbox_capacity,
250 )?,
251 max_restarts_per_window: parse_nonzero_usize(
252 "LEAN_HOST_MCP_WORKER_RESTART_LIMIT",
253 env.worker_restart_limit.as_deref(),
254 file.worker_restart_limit,
255 defaults.max_restarts_per_window,
256 )?,
257 restart_window: Duration::from_secs(parse_nonzero_u64(
258 "LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS",
259 env.worker_restart_window_secs.as_deref(),
260 file.worker_restart_window_secs,
261 defaults.restart_window.as_secs(),
262 )?),
263 };
264 validate_rss_ordering(&config)?;
265 Ok(config)
266}
267
268fn validate_rss_ordering(config: &ProjectRuntimeConfig) -> Result<()> {
276 if config.import_switch_rss_soft_kib > config.worker_rss_post_job_restart_kib {
277 return Err(ServerError::Internal(format!(
278 "invalid RSS config: import_switch={} KiB exceeds post_job={} KiB \
279 (need import_switch <= post_job <= hard_kill)",
280 config.import_switch_rss_soft_kib, config.worker_rss_post_job_restart_kib,
281 )));
282 }
283 if config.worker_rss_post_job_restart_kib > config.worker_rss_hard_kill_kib {
284 return Err(ServerError::Internal(format!(
285 "invalid RSS config: post_job={} KiB exceeds hard_kill={} KiB \
286 (need import_switch <= post_job <= hard_kill)",
287 config.worker_rss_post_job_restart_kib, config.worker_rss_hard_kill_kib,
288 )));
289 }
290 Ok(())
291}
292
293fn runtime_env_var(name: &str) -> Result<Option<String>> {
294 match std::env::var(name) {
295 Ok(value) => Ok(Some(value)),
296 Err(std::env::VarError::NotPresent) => Ok(None),
297 Err(err @ std::env::VarError::NotUnicode(_)) => {
298 Err(ServerError::Internal(format!("{name} is not valid unicode: {err}")))
299 }
300 }
301}
302
303#[derive(Debug, Clone)]
305pub(crate) struct ProjectCall<T> {
306 value: T,
307 runtime: RuntimeFacts,
308}
309
310impl<T> ProjectCall<T> {
311 pub(crate) fn new(value: T, runtime: RuntimeFacts) -> Self {
312 Self { value, runtime }
313 }
314
315 pub(crate) fn into_parts(self) -> (T, RuntimeFacts) {
316 (self.value, self.runtime)
317 }
318}
319
320#[derive(Clone, Copy, Debug, Eq, PartialEq)]
321enum RetryPolicy {
322 RetryOnceReadOnly,
323}
324
325impl RetryPolicy {
326 fn retries(self) -> u32 {
327 match self {
328 Self::RetryOnceReadOnly => MAX_JOB_RETRIES,
329 }
330 }
331}
332
/// Decrements the shared active-job counter by one when dropped.
///
/// The guard does not increment the counter itself; whoever creates it is
/// expected to have done so.
struct ActiveJobGuard {
    active_jobs: Arc<AtomicUsize>,
}

impl Drop for ActiveJobGuard {
    fn drop(&mut self) {
        let Self { active_jobs } = self;
        active_jobs.fetch_sub(1, Ordering::AcqRel);
    }
}
342
/// Bookkeeping attached to every message sent to the project actor.
///
/// Built by `LeanProject::job_meta`. Underscore-prefixed fields are stored but
/// not read in the visible code; they are held for the lifetime of the job.
struct JobMeta {
    // Imports requested for this job.
    imports: Vec<String>,
    // Result of `import_fingerprint(&imports)`.
    import_fingerprint: String,
    // Taken at the start of `job_meta`.
    _created_at: Instant,
    // Taken while the `JobMeta` is assembled.
    queued_at: Instant,
    // Passed in by the caller of the public request methods.
    admission_wait_millis: u64,
    // Fresh UUID v4 per job.
    _correlation_id: uuid::Uuid,
    retry_policy: RetryPolicy,
    // Decrements `LeanProject::active_jobs` when this metadata is dropped.
    _active_job: ActiveJobGuard,
    // NOTE(review): presumably releases admission capacity on drop — confirm in
    // `crate::admission`.
    _semantic_permit: SemanticPermit,
}
354
/// One request for the project actor.
///
/// Every variant carries the job's `JobMeta`, the request payload, and the
/// `oneshot` sender on which the actor delivers the outcome.
enum ProjectMessage {
    /// Single module query over `source`.
    ModuleQuery {
        meta: JobMeta,
        source: String,
        query: LeanWorkerModuleQuery,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryOutcome>>>,
    },
    /// Several selectors evaluated over `source` under shared output budgets.
    ModuleQueryBatch {
        meta: JobMeta,
        source: String,
        selectors: Vec<LeanWorkerModuleQuerySelector>,
        budgets: LeanWorkerOutputBudgets,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>>>,
    },
    /// Declaration inspection request.
    DeclarationInspection {
        meta: JobMeta,
        request: LeanWorkerDeclarationInspectionRequest,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationInspectionResult>>>,
    },
    /// Declaration search request.
    DeclarationSearch {
        meta: JobMeta,
        request: LeanWorkerDeclarationSearch,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationSearchResult>>>,
    },
    /// Proof attempt request.
    ProofAttempt {
        meta: JobMeta,
        request: LeanWorkerProofAttemptRequest,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerProofAttemptResult>>>,
    },
    /// Declaration verification request.
    DeclarationVerification {
        meta: JobMeta,
        request: LeanWorkerDeclarationVerificationRequest,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationVerificationResult>>>,
    },
    /// Semantic proof search request.
    SemanticProofSearch {
        meta: JobMeta,
        request: SemanticProofSearchRequest,
        reply: oneshot::Sender<Result<ProjectCall<SemanticProofSearchResult>>>,
    },
}
399
400impl ProjectMessage {
401 fn imports(&self) -> &[String] {
402 match self {
403 Self::ModuleQuery { meta, .. }
404 | Self::ModuleQueryBatch { meta, .. }
405 | Self::DeclarationInspection { meta, .. }
406 | Self::DeclarationSearch { meta, .. }
407 | Self::ProofAttempt { meta, .. }
408 | Self::DeclarationVerification { meta, .. }
409 | Self::SemanticProofSearch { meta, .. } => &meta.imports,
410 }
411 }
412}
413
/// Why a worker was recycled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RestartCause {
    #[expect(dead_code, reason = "stable wire cause reserved for future non-RSS profile cycling")]
    ImportProfileSwitch,
    RssImportSwitch,
    RssPostJob,
    RssHardLimit,
    MaxRequests,
    MaxImports,
    Idle,
    Timeout,
    Cancelled,
    ChildExit,
    ChildAbort,
    SessionMissing,
    Explicit,
    WorkerInternal,
}

impl RestartCause {
    /// Stable string form of the cause, as stored in `RuntimeRestartEvent::cause`.
    const fn as_str(self) -> &'static str {
        match self {
            Self::ImportProfileSwitch => "import_profile_switch",
            Self::RssImportSwitch => "rss_import_switch",
            Self::RssPostJob => "rss_post_job",
            Self::RssHardLimit => "rss_hard_limit_exceeded",
            Self::MaxRequests => "max_requests",
            Self::MaxImports => "max_imports",
            Self::Idle => "idle",
            Self::Timeout => "timeout",
            Self::Cancelled => "cancelled",
            Self::ChildExit => "child_exit",
            Self::ChildAbort => "child_abort",
            Self::SessionMissing => "session_missing",
            Self::Explicit => "explicit",
            Self::WorkerInternal => "worker_internal",
        }
    }

    /// Whether a restart with this cause counts toward the restart limit.
    const fn counts_toward_restart_limit(self) -> bool {
        match self {
            Self::Timeout
            | Self::Cancelled
            | Self::ChildExit
            | Self::ChildAbort
            | Self::SessionMissing
            | Self::RssHardLimit => true,
            Self::ImportProfileSwitch
            | Self::RssImportSwitch
            | Self::RssPostJob
            | Self::MaxRequests
            | Self::MaxImports
            | Self::Idle
            | Self::Explicit
            | Self::WorkerInternal => false,
        }
    }

    /// A cause is "planned" exactly when it does not count toward the restart limit.
    const fn is_planned(self) -> bool {
        !self.counts_toward_restart_limit()
    }
}
469
470fn restart_event(
471 cause: RestartCause,
472 reason: impl Into<String>,
473 worker_generation: u64,
474 rss_kib: Option<u64>,
475 limit_kib: Option<u64>,
476) -> RuntimeRestartEvent {
477 RuntimeRestartEvent {
478 cause: cause.as_str().to_owned(),
479 reason: reason.into(),
480 worker_generation,
481 planned: cause.is_planned(),
482 rss_kib,
483 limit_kib,
484 }
485}
486
/// Running restart counters: an overall total plus one counter per cause string.
#[derive(Debug, Clone, Default)]
struct RestartStats {
    total: u64,
    by_cause: BTreeMap<String, u64>,
}

impl RestartStats {
    /// Records one restart attributed to `cause`.
    ///
    /// Both counters saturate at `u64::MAX` instead of wrapping.
    fn observe(&mut self, cause: &str) {
        self.by_cause
            .entry(cause.to_owned())
            .and_modify(|seen| *seen = seen.saturating_add(1))
            .or_insert(1);
        self.total = self.total.saturating_add(1);
    }
}
506
507fn log_restart(event: &RuntimeRestartEvent, restarts_total: u64) {
511 macro_rules! emit {
512 ($level:ident, $msg:literal) => {
513 tracing::$level!(
514 cause = %event.cause,
515 reason = %event.reason,
516 worker_generation = event.worker_generation,
517 rss_kib = ?event.rss_kib,
518 limit_kib = ?event.limit_kib,
519 planned = event.planned,
520 restarts_total,
521 $msg
522 )
523 };
524 }
525 match event.cause.as_str() {
526 "rss_hard_limit_exceeded"
527 | "child_abort"
528 | "child_exit"
529 | "session_missing"
530 | "worker_internal"
531 | "timeout"
532 | "cancelled" => emit!(warn, "worker recycled (abnormal)"),
533 "rss_post_job" | "rss_import_switch" => emit!(info, "worker recycled (memory pressure)"),
534 _ => emit!(debug, "worker recycled (hygiene)"),
535 }
536}
537
538fn restart_cause_from_worker(reason: &LeanWorkerRestartReason) -> RestartCause {
539 match reason.stable_cause() {
540 "explicit" => RestartCause::Explicit,
541 "max_requests" => RestartCause::MaxRequests,
542 "max_imports" => RestartCause::MaxImports,
543 "rss_ceiling" => RestartCause::RssPostJob,
544 "rss_hard_limit" => RestartCause::RssHardLimit,
545 "idle" => RestartCause::Idle,
546 "cancelled" => RestartCause::Cancelled,
547 "timeout" => RestartCause::Timeout,
548 _ => RestartCause::WorkerInternal,
549 }
550}
551
552#[derive(Debug, Clone)]
553struct RuntimeSnapshot {
554 worker_generation: u64,
555 last_restart: Option<RuntimeRestartEvent>,
556 rss_kib: Option<u64>,
557 import_profile: Option<String>,
558 profile_switch_count: u64,
559 restarts_total: u64,
560 restarts_by_cause: BTreeMap<String, u64>,
561}
562
563impl RuntimeSnapshot {
564 fn facts(&self) -> RuntimeFacts {
565 RuntimeFacts {
566 worker_generation: self.worker_generation,
567 worker_restarted: false,
568 retry_count: 0,
569 admission_wait_millis: 0,
570 queue_wait_millis: 0,
571 call_restart: None,
572 last_restart: self.last_restart.clone(),
573 rss_kib: self.rss_kib,
574 worker_lanes: 1,
575 import_profile: self.import_profile.clone(),
576 profile_switch_count: self.profile_switch_count,
577 restarts_total: self.restarts_total,
578 restarts_by_cause: self.restarts_by_cause.clone(),
579 }
580 }
581}
582
/// Handle to one opened Lean project and the actor thread that serves it.
///
/// Requests are packaged as `ProjectMessage`s and sent to the actor through
/// `actor_tx`; results come back over per-request `oneshot` channels.
pub(crate) struct LeanProject {
    // Project root; reported as `project_root` in freshness and error payloads.
    canonical_root: PathBuf,
    // Toolchain string reported by the actor thread during init.
    toolchain: String,
    package: Option<String>,
    library: Option<String>,
    // Reported as `project_hash` in freshness and error payloads.
    manifest_hash: String,
    // Fresh UUID v4 generated by `open`.
    session_id: String,
    // Advisories produced while resolving the worker; surfaced as `toolchain_advisories`.
    open_warnings: Vec<String>,
    // Sender side of the actor mailbox; `None` once `shutdown` has run.
    actor_tx: Mutex<Option<mpsc::Sender<ProjectMessage>>>,
    // Incremented per job in `job_meta`, decremented when its `ActiveJobGuard` drops.
    active_jobs: Arc<AtomicUsize>,
    // Cleared by `shutdown`; a clone of this flag is handed to the actor config.
    healthy: Arc<AtomicBool>,
    // Snapshot shared with the actor config; read here for facts and error context.
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    module_queries: ModuleQueryCache,
}
602
603impl std::fmt::Debug for LeanProject {
604 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605 f.debug_struct("LeanProject")
606 .field("canonical_root", &self.canonical_root)
607 .field("toolchain", &self.toolchain)
608 .field("package", &self.package)
609 .field("library", &self.library)
610 .field("manifest_hash", &self.manifest_hash)
611 .finish_non_exhaustive()
612 }
613}
614
615impl LeanProject {
616 pub(crate) fn open(meta: LakeProjectMeta, runtime_config: ProjectRuntimeConfig) -> Result<Arc<Self>> {
617 let session_id = uuid::Uuid::new_v4().to_string();
618 let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
619 worker_generation: 1,
620 last_restart: None,
621 rss_kib: None,
622 import_profile: None,
623 profile_switch_count: 0,
624 restarts_total: 0,
625 restarts_by_cause: BTreeMap::new(),
626 }));
627 let active_jobs = Arc::new(AtomicUsize::new(0));
628 let healthy = Arc::new(AtomicBool::new(true));
629 let (config, open_warnings) = ActorConfig::from_meta(
630 &meta,
631 session_id.clone(),
632 Arc::clone(&runtime),
633 Arc::clone(&healthy),
634 runtime_config,
635 )?;
636 type InitMsg = std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>;
637 let (init_tx, init_rx) = std::sync::mpsc::channel::<InitMsg>();
638 let thread_name = actor_thread_name(&meta.canonical_root);
639
640 thread::Builder::new()
641 .name(thread_name)
642 .spawn(move || {
643 actor_main(config, init_tx);
644 })
645 .map_err(|e| ServerError::Internal(format!("spawn project actor thread: {e}")))?;
646
647 let (runtime_toolchain, actor_tx) = init_rx
648 .recv()
649 .map_err(|_| ServerError::Internal("project actor thread died during init".into()))??;
650
651 let cache_cap = NonZeroUsize::new(MODULE_QUERY_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
652 Ok(Arc::new(Self {
653 canonical_root: meta.canonical_root,
654 toolchain: runtime_toolchain,
655 package: meta.package,
656 library: meta.library,
657 manifest_hash: meta.manifest_hash,
658 session_id,
659 open_warnings,
660 actor_tx: Mutex::new(Some(actor_tx)),
661 active_jobs,
662 healthy,
663 runtime,
664 module_queries: ModuleQueryCache::with_capacity(cache_cap),
665 }))
666 }
667
668 pub(crate) async fn process_module_query(
675 &self,
676 imports: Vec<String>,
677 semantic_permit: SemanticPermit,
678 admission_wait_millis: u64,
679 source: String,
680 query: LeanWorkerModuleQuery,
681 options: LeanWorkerElabOptions,
682 ) -> Result<ProjectCall<LeanWorkerModuleQueryOutcome>> {
683 let (reply, rx) = oneshot::channel();
684 let message = ProjectMessage::ModuleQuery {
685 meta: self.job_meta(
686 imports,
687 RetryPolicy::RetryOnceReadOnly,
688 semantic_permit,
689 admission_wait_millis,
690 ),
691 source,
692 query,
693 options,
694 reply,
695 };
696 self.enqueue(message, rx).await
697 }
698
699 pub(crate) async fn process_module_query_batch(
706 &self,
707 imports: Vec<String>,
708 semantic_permit: SemanticPermit,
709 admission_wait_millis: u64,
710 source: String,
711 selectors: Vec<LeanWorkerModuleQuerySelector>,
712 budgets: LeanWorkerOutputBudgets,
713 options: LeanWorkerElabOptions,
714 ) -> Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>> {
715 let (reply, rx) = oneshot::channel();
716 let message = ProjectMessage::ModuleQueryBatch {
717 meta: self.job_meta(
718 imports,
719 RetryPolicy::RetryOnceReadOnly,
720 semantic_permit,
721 admission_wait_millis,
722 ),
723 source,
724 selectors,
725 budgets,
726 options,
727 reply,
728 };
729 self.enqueue(message, rx).await
730 }
731
732 pub(crate) async fn inspect_declaration(
739 &self,
740 imports: Vec<String>,
741 semantic_permit: SemanticPermit,
742 admission_wait_millis: u64,
743 request: LeanWorkerDeclarationInspectionRequest,
744 ) -> Result<ProjectCall<LeanWorkerDeclarationInspectionResult>> {
745 let (reply, rx) = oneshot::channel();
746 let message = ProjectMessage::DeclarationInspection {
747 meta: self.job_meta(
748 imports,
749 RetryPolicy::RetryOnceReadOnly,
750 semantic_permit,
751 admission_wait_millis,
752 ),
753 request,
754 reply,
755 };
756 self.enqueue(message, rx).await
757 }
758
759 pub(crate) async fn search_declarations(
766 &self,
767 imports: Vec<String>,
768 semantic_permit: SemanticPermit,
769 admission_wait_millis: u64,
770 request: LeanWorkerDeclarationSearch,
771 ) -> Result<ProjectCall<LeanWorkerDeclarationSearchResult>> {
772 let (reply, rx) = oneshot::channel();
773 let message = ProjectMessage::DeclarationSearch {
774 meta: self.job_meta(
775 imports,
776 RetryPolicy::RetryOnceReadOnly,
777 semantic_permit,
778 admission_wait_millis,
779 ),
780 request,
781 reply,
782 };
783 self.enqueue(message, rx).await
784 }
785
786 pub(crate) async fn semantic_proof_search(
793 &self,
794 imports: Vec<String>,
795 semantic_permit: SemanticPermit,
796 admission_wait_millis: u64,
797 request: SemanticProofSearchRequest,
798 ) -> Result<ProjectCall<SemanticProofSearchResult>> {
799 let (reply, rx) = oneshot::channel();
800 let message = ProjectMessage::SemanticProofSearch {
801 meta: self.job_meta(
802 imports,
803 RetryPolicy::RetryOnceReadOnly,
804 semantic_permit,
805 admission_wait_millis,
806 ),
807 request,
808 reply,
809 };
810 self.enqueue(message, rx).await
811 }
812
813 pub(crate) async fn attempt_proof(
820 &self,
821 imports: Vec<String>,
822 semantic_permit: SemanticPermit,
823 admission_wait_millis: u64,
824 request: LeanWorkerProofAttemptRequest,
825 options: LeanWorkerElabOptions,
826 ) -> Result<ProjectCall<LeanWorkerProofAttemptResult>> {
827 let (reply, rx) = oneshot::channel();
828 let message = ProjectMessage::ProofAttempt {
829 meta: self.job_meta(
830 imports,
831 RetryPolicy::RetryOnceReadOnly,
832 semantic_permit,
833 admission_wait_millis,
834 ),
835 request,
836 options,
837 reply,
838 };
839 self.enqueue(message, rx).await
840 }
841
842 pub(crate) async fn verify_declaration(
849 &self,
850 imports: Vec<String>,
851 semantic_permit: SemanticPermit,
852 admission_wait_millis: u64,
853 request: LeanWorkerDeclarationVerificationRequest,
854 options: LeanWorkerElabOptions,
855 ) -> Result<ProjectCall<LeanWorkerDeclarationVerificationResult>> {
856 let (reply, rx) = oneshot::channel();
857 let message = ProjectMessage::DeclarationVerification {
858 meta: self.job_meta(
859 imports,
860 RetryPolicy::RetryOnceReadOnly,
861 semantic_permit,
862 admission_wait_millis,
863 ),
864 request,
865 options,
866 reply,
867 };
868 self.enqueue(message, rx).await
869 }
870
871 fn job_meta(
872 &self,
873 imports: Vec<String>,
874 retry_policy: RetryPolicy,
875 semantic_permit: SemanticPermit,
876 admission_wait_millis: u64,
877 ) -> JobMeta {
878 let created_at = Instant::now();
879 self.active_jobs.fetch_add(1, Ordering::AcqRel);
880 JobMeta {
881 import_fingerprint: import_fingerprint(&imports),
882 imports,
883 _created_at: created_at,
884 queued_at: Instant::now(),
885 admission_wait_millis,
886 _correlation_id: uuid::Uuid::new_v4(),
887 retry_policy,
888 _active_job: ActiveJobGuard {
889 active_jobs: Arc::clone(&self.active_jobs),
890 },
891 _semantic_permit: semantic_permit,
892 }
893 }
894
895 async fn enqueue<T>(
896 &self,
897 message: ProjectMessage,
898 reply_rx: oneshot::Receiver<Result<ProjectCall<T>>>,
899 ) -> Result<ProjectCall<T>>
900 where
901 T: Send + 'static,
902 {
903 let project_info = self.worker_error_context(message.imports());
904 let tx = self
905 .actor_tx
906 .lock()
907 .as_ref()
908 .cloned()
909 .ok_or_else(|| self.unavailable("project actor is stopped", false, false))?;
910 match tx.try_send(message) {
911 Ok(()) => {}
912 Err(mpsc::error::TrySendError::Full(_)) => {
913 return Err(ServerError::worker_unavailable(WorkerUnavailable {
914 retryable: true,
915 worker_restarted: false,
916 reason: "mailbox_full".to_owned(),
917 ..project_info
918 }));
919 }
920 Err(mpsc::error::TrySendError::Closed(_)) => {
921 self.shutdown();
922 return Err(ServerError::worker_unavailable(WorkerUnavailable {
923 retryable: true,
924 worker_restarted: false,
925 reason: "mailbox_closed".to_owned(),
926 ..project_info
927 }));
928 }
929 }
930
931 match reply_rx.await {
932 Ok(result) => result,
933 Err(_) => {
934 self.shutdown();
935 Err(self.unavailable("mailbox_closed_before_reply", true, false))
936 }
937 }
938 }
939
940 pub(crate) fn manifest_hash(&self) -> &str {
941 &self.manifest_hash
942 }
943
944 pub(crate) fn toolchain(&self) -> &str {
945 &self.toolchain
946 }
947
948 pub(crate) fn canonical_root(&self) -> &Path {
949 &self.canonical_root
950 }
951
952 pub(crate) fn module_query_cache(&self) -> &ModuleQueryCache {
953 &self.module_queries
954 }
955
956 #[must_use]
957 pub(crate) fn freshness(&self, request_imports: &[String]) -> Freshness {
958 Freshness {
959 project_root: self.canonical_root.to_string_lossy().into_owned(),
960 project_hash: self.manifest_hash.clone(),
961 imports: request_imports.to_vec(),
962 session_id: self.session_id.clone(),
963 lean_toolchain: self.toolchain.clone(),
964 toolchain_advisories: self.open_warnings.clone(),
965 }
966 }
967
968 #[must_use]
969 pub(crate) fn runtime_facts(&self) -> RuntimeFacts {
970 self.runtime.lock().facts()
971 }
972
973 pub(crate) fn shutdown(&self) {
974 self.healthy.store(false, Ordering::Release);
975 let _ = self.actor_tx.lock().take();
976 }
977
978 pub(crate) fn is_healthy(&self) -> bool {
979 self.healthy.load(Ordering::Acquire) && self.actor_tx.lock().as_ref().is_some_and(|tx| !tx.is_closed())
980 }
981
982 pub(crate) fn is_idle(&self) -> bool {
983 self.active_jobs.load(Ordering::Acquire) == 0
984 }
985
986 fn unavailable(&self, reason: impl Into<String>, retryable: bool, worker_restarted: bool) -> ServerError {
987 ServerError::worker_unavailable(WorkerUnavailable {
988 retryable,
989 worker_restarted,
990 reason: reason.into(),
991 ..self.worker_error_context(&[])
992 })
993 }
994
995 fn worker_error_context(&self, imports: &[String]) -> WorkerUnavailable {
996 let snapshot = self.runtime.lock().clone();
997 let runtime = snapshot.facts();
998 WorkerUnavailable {
999 retryable: true,
1000 worker_restarted: false,
1001 project_root: self.canonical_root.to_string_lossy().into_owned(),
1002 project_hash: self.manifest_hash.clone(),
1003 imports: imports.to_vec(),
1004 session_id: self.session_id.clone(),
1005 lean_toolchain: self.toolchain.clone(),
1006 worker_generation: snapshot.worker_generation,
1007 reason: String::new(),
1008 restart_cause: snapshot.last_restart.as_ref().map(|event| event.cause.clone()),
1009 rss_kib: snapshot.rss_kib,
1010 limit_kib: None,
1011 retry_after_millis: None,
1012 restarts_in_window: None,
1013 window_millis: None,
1014 runtime,
1015 toolchain_advisories: self.open_warnings.clone(),
1016 }
1017 }
1018}
1019
1020impl Drop for LeanProject {
1021 fn drop(&mut self) {
1022 self.shutdown();
1023 }
1024}
1025
/// Everything the actor thread is given at start-up.
///
/// Assembled by `ActorConfig::from_meta` from the project metadata, the resolved
/// worker binary, the shared state handles and a `ProjectRuntimeConfig`.
#[derive(Clone)]
struct ActorConfig {
    // Copied from `LakeProjectMeta::canonical_root`.
    lake_root: PathBuf,
    manifest_hash: String,
    // The toolchain string exactly as recorded in the project metadata.
    toolchain_label: String,
    // Worker binary and Lean sysroot resolved for the parsed toolchain.
    worker_path: PathBuf,
    lean_sysroot: PathBuf,
    session_id: String,
    // Handles shared with the owning `LeanProject`.
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    healthy: Arc<AtomicBool>,
    // The fields below are copied one-to-one from `ProjectRuntimeConfig`.
    worker_rss_post_job_restart_kib: u64,
    worker_rss_hard_kill_kib: u64,
    worker_rss_sample_millis: u64,
    import_switch_rss_soft_kib: u64,
    module_cache_rss_guard_kib: u64,
    module_cache_max_bytes: u64,
    request_timeout_millis: u64,
    mailbox_capacity: usize,
    max_restarts_per_window: usize,
    restart_window: Duration,
    // Same advisories that `from_meta` also returns to the caller.
    toolchain_advisories: Vec<String>,
}
1053
1054impl ActorConfig {
1055 fn from_meta(
1062 meta: &LakeProjectMeta,
1063 session_id: String,
1064 runtime: Arc<Mutex<RuntimeSnapshot>>,
1065 healthy: Arc<AtomicBool>,
1066 runtime_config: ProjectRuntimeConfig,
1067 ) -> Result<(Self, Vec<String>)> {
1068 let toolchain_id = ToolchainId::parse(&meta.toolchain).map_err(|e| ServerError::BadProject(e.to_string()))?;
1069 let (worker_path, lean_sysroot, open_warnings) = match WorkerBinary::resolve_ready_for(&toolchain_id) {
1070 Readiness::Ready {
1071 worker,
1072 lean_sysroot,
1073 note,
1074 } => (worker.path, lean_sysroot, note.into_iter().collect()),
1075 Readiness::UnknownPin {
1076 pin,
1077 worker,
1078 lean_sysroot,
1079 } => (
1080 worker.path,
1081 lean_sysroot,
1082 vec![format!(
1083 "lean-toolchain pins {pin}, which is not a recognized lean-rs supported version \
1084 (e.g. a nightly); proceeding, but the host cannot vouch for ABI compatibility"
1085 )],
1086 ),
1087 Readiness::Unsupported { window, nearest } => {
1088 return Err(ServerError::BadProject(format!(
1089 "lean-toolchain pins {toolchain_id}, outside the lean-rs supported window {window}; \
1090 nearest supported: {nearest}. Pin a supported toolchain (or bump lean-rs) and reopen."
1091 )));
1092 }
1093 Readiness::Stale { toolchain, install_cmd } => {
1094 return Err(ServerError::BadProject(format!(
1095 "worker for {toolchain} was built against a different lean.h than the toolchain now \
1096 provides (header drift); rebuild it: {install_cmd}"
1097 )));
1098 }
1099 Readiness::Unusable {
1100 toolchain,
1101 detail,
1102 install_cmd,
1103 } => {
1104 return Err(ServerError::BadProject(format!(
1105 "worker for {toolchain} failed its runtime smoke test ({detail}); the toolchain's \
1106 libleanshared is ABI-incompatible with this lean-rs build and cannot be served. \
1107 Pin a supported toolchain the host can run, or rebuild lean-rs and reinstall: {install_cmd}"
1108 )));
1109 }
1110 Readiness::NotInstalled { toolchain, install_cmd } => {
1111 return Err(ServerError::BadProject(format!(
1112 "no worker binary for toolchain {toolchain}; run: {install_cmd}"
1113 )));
1114 }
1115 Readiness::ToolchainNotInstalled { toolchain, elan_dir } => {
1116 return Err(ServerError::BadProject(format!(
1117 "elan toolchain {toolchain} is not installed (expected {})",
1118 elan_dir.display()
1119 )));
1120 }
1121 };
1122 tracing::debug!(
1123 toolchain = %toolchain_id,
1124 worker = %worker_path.display(),
1125 sysroot = %lean_sysroot.display(),
1126 "resolved ready worker binary"
1127 );
1128 let config = Self {
1129 lake_root: meta.canonical_root.clone(),
1130 manifest_hash: meta.manifest_hash.clone(),
1131 toolchain_label: meta.toolchain.clone(),
1132 worker_path,
1133 lean_sysroot,
1134 session_id,
1135 runtime,
1136 healthy,
1137 worker_rss_post_job_restart_kib: runtime_config.worker_rss_post_job_restart_kib(),
1138 worker_rss_hard_kill_kib: runtime_config.worker_rss_hard_kill_kib(),
1139 worker_rss_sample_millis: runtime_config.worker_rss_sample_millis(),
1140 import_switch_rss_soft_kib: runtime_config.import_switch_rss_soft_kib(),
1141 module_cache_rss_guard_kib: runtime_config.module_cache_rss_guard_kib(),
1142 module_cache_max_bytes: runtime_config.module_cache_max_bytes(),
1143 request_timeout_millis: runtime_config.request_timeout_millis(),
1144 mailbox_capacity: runtime_config.mailbox_capacity(),
1145 max_restarts_per_window: runtime_config.max_restarts_per_window(),
1146 restart_window: runtime_config.restart_window(),
1147 toolchain_advisories: open_warnings.clone(),
1148 };
1149 Ok((config, open_warnings))
1150 }
1151}
1152
/// Mutable state owned by one project's actor loop.
///
/// Bundles the actor configuration, the live worker handle, and the restart /
/// RSS bookkeeping that the methods on this type read and update per job.
1153struct ProjectActorState {
// Settings the actor was started with (paths, limits, shared health flag).
1154 config: ActorConfig,
// Worker handle jobs are executed against; replaced wholesale when the worker
// is rebuilt after a death.
1155 handle: LeanWorkerHostHandle,
// Offset added to the handle's own generation counter to form the observed
// generation; bumped whenever the handle is replaced.
1156 worker_generation_base: u64,
// Most recent restart event recorded through `record_restart`.
1157 last_restart: Option<RuntimeRestartEvent>,
// Import fingerprint of the previous job, used to detect an import switch.
1158 last_import_fingerprint: Option<String>,
// Number of times a job arrived with a different import fingerprint.
1159 profile_switch_count: u64,
// Last RSS sample seen for the worker, if any.
1160 last_rss_kib: Option<u64>,
// Shared snapshot that `publish_runtime` overwrites after each job.
1161 runtime: Arc<Mutex<RuntimeSnapshot>>,
// Timestamps of restarts that count toward the restart limit; entries older
// than the configured window are pruned before each check.
1162 abnormal_restart_times: VecDeque<Instant>,
// Running totals of recorded restarts, overall and per cause.
1163 restart_stats: RestartStats,
1164}
1165
1166impl ProjectActorState {
1167 fn handle_message(&mut self, message: ProjectMessage) {
1168 match message {
1169 ProjectMessage::ModuleQuery {
1170 meta,
1171 source,
1172 query,
1173 options,
1174 reply,
1175 } => {
1176 let result = self.run_job(meta, |handle, imports| {
1177 handle.process_module_query_with_imports(imports, &source, &query, &options, None, None)
1178 });
1179 let _ = reply.send(result);
1180 }
1181 ProjectMessage::ModuleQueryBatch {
1182 meta,
1183 source,
1184 selectors,
1185 budgets,
1186 options,
1187 reply,
1188 } => {
1189 let result = self.run_job(meta, |handle, imports| {
1190 handle.process_module_query_batch_with_imports(
1191 imports, &source, &selectors, &budgets, &options, None, None,
1192 )
1193 });
1194 let _ = reply.send(result);
1195 }
1196 ProjectMessage::DeclarationInspection { meta, request, reply } => {
1197 let result = self.run_job(meta, |handle, imports| {
1198 handle.inspect_declaration_with_imports(imports, &request, None, None)
1199 });
1200 let _ = reply.send(result);
1201 }
1202 ProjectMessage::DeclarationSearch { meta, request, reply } => {
1203 let result = self.run_job(meta, |handle, imports| {
1204 handle.search_declarations_with_imports(imports, &request, None, None)
1205 });
1206 let _ = reply.send(result);
1207 }
1208 ProjectMessage::ProofAttempt {
1209 meta,
1210 request,
1211 options,
1212 reply,
1213 } => {
1214 let result = self.run_job(meta, |handle, imports| {
1215 handle.attempt_proof_with_imports(imports, &request, &options, None, None)
1216 });
1217 let _ = reply.send(result);
1218 }
1219 ProjectMessage::DeclarationVerification {
1220 meta,
1221 request,
1222 options,
1223 reply,
1224 } => {
1225 let result = self.run_job(meta, |handle, imports| {
1226 handle.verify_declaration_with_imports(imports, &request, &options, None, None)
1227 });
1228 let _ = reply.send(result);
1229 }
1230 ProjectMessage::SemanticProofSearch { meta, request, reply } => {
1231 let result = self.run_semantic_job(meta, &request);
1232 let _ = reply.send(result);
1233 }
1234 }
1235 }
1236
/// Runs `job` against the worker handle and returns its value together with
/// the runtime facts gathered for this call.
///
/// `meta` supplies the imports handed to `job`, the import fingerprint, the
/// retry policy and the enqueue timestamp. `job` can be invoked several times
/// (once per attempt), each time with a fresh clone of `meta.imports`.
///
/// # Errors
///
/// * A worker-unavailable error when the worker died or its session was
///   missing and the retry budget is spent, when the RSS hard limit was
///   exceeded, or when the request timed out.
/// * Any other worker error, converted with `map_worker_err`.
/// * Errors propagated with `?` from the restart bookkeeping helpers.
1237 fn run_job<R>(
1238 &mut self,
1239 meta: JobMeta,
1240 job: impl Fn(&mut LeanWorkerHostHandle, Vec<String>) -> std::result::Result<R, LeanWorkerError>,
1241 ) -> Result<ProjectCall<R>> {
// Keep the span entered for the whole call; it is exited when `_span` drops.
1242 let _span = tracing::debug_span!(
1245 "job",
1246 session_id = %self.config.session_id,
1247 imports = meta.imports.len(),
1248 queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1249 )
1250 .entered();
1251 let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1252 let generation_before = self.observed_generation();
// May cycle the worker before the job when the import fingerprint changed.
1253 let mut call_restart = self.cycle_before_import_switch_if_needed(&meta)?;
// Baseline for detecting restarts the handle performs during the job itself.
1254 let mut lifecycle_baseline = self.handle.lifecycle_snapshot();
1255
1256 let max_retries = meta.retry_policy.retries();
1257 let mut retry_count = 0_u32;
// Each iteration is one attempt; only the two retry arms fall through to loop again.
1258 loop {
1259 match job(&mut self.handle, meta.imports.clone()) {
1260 Ok(value) => {
1261 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1262 call_restart = Some(event);
1263 }
1264 self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
1265 self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
// A post-job RSS cycle, when it happens, replaces the restart reported for this call.
1266 if let Some(event) = self.cycle_after_post_job_rss_if_needed(&meta)? {
1267 call_restart = Some(event);
1268 }
1269 let runtime =
1270 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1271 tracing::debug!(
1272 retry_count,
1273 rss_kib = ?runtime.rss_kib,
1274 worker_generation = runtime.worker_generation,
1275 "job complete"
1276 );
1277 self.publish_runtime(&runtime);
1278 return Ok(ProjectCall::new(value, runtime));
1279 }
// Worker process died and retries remain: open a fresh worker and try again.
1280 Err(err) if worker_error_is_recoverable_death(&err) && retry_count < max_retries => {
1281 self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1282 let first_reason = err.to_string();
1283 call_restart =
1284 Some(self.rebuild_after_worker_death(first_reason, worker_death_cause(&err), &meta)?);
// The handle was replaced, so restart accounting needs a fresh baseline.
1285 lifecycle_baseline = self.handle.lifecycle_snapshot();
1286 retry_count = retry_count.saturating_add(1);
1287 }
// Worker process died and the retry budget is spent: report it as unavailable.
1288 Err(err) if worker_error_is_recoverable_death(&err) => {
1289 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1290 call_restart = Some(event);
1291 }
1292 let reason = format!("worker_died_after_retry: {err}");
1293 let generation = self.observed_generation();
1294 let runtime =
1295 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1296 self.publish_runtime(&runtime);
1297 return Err(self.worker_unavailable_for(
1298 &meta,
1299 reason,
1300 true,
1301 generation > generation_before,
1302 Some(worker_death_cause(&err)),
1303 None,
1304 None,
1305 ));
1306 }
// Worker reported a missing session and retries remain: rebuild and try again.
1307 Err(err) if worker_error_is_session_missing(&err) && retry_count < max_retries => {
1308 self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1309 call_restart = Some(self.rebuild_after_worker_death(
1310 format!("session_missing: {err}"),
1311 RestartCause::SessionMissing,
1312 &meta,
1313 )?);
1314 lifecycle_baseline = self.handle.lifecycle_snapshot();
1315 retry_count = retry_count.saturating_add(1);
1316 }
// Missing session with no retries left.
1317 Err(err) if worker_error_is_session_missing(&err) => {
1318 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1319 call_restart = Some(event);
1320 }
1321 let generation = self.observed_generation();
1322 let runtime =
1323 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1324 self.publish_runtime(&runtime);
1325 return Err(self.worker_unavailable_for(
1326 &meta,
1327 format!("session_missing: {err}"),
1328 true,
1329 generation > generation_before,
1330 Some(RestartCause::SessionMissing),
1331 None,
1332 None,
1333 ));
1334 }
// RSS hard limit: never retried here; reported as non-retryable with the
// worker marked as restarted.
1335 Err(LeanWorkerError::RssHardLimitExceeded {
1336 operation,
1337 current_kib,
1338 limit_kib,
1339 ..
1340 }) => {
1341 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1342 call_restart = Some(event);
1343 }
1344 let runtime =
1345 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1346 self.publish_runtime(&runtime);
1347 return Err(self.worker_unavailable_for(
1348 &meta,
1349 format!(
1350 "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1351 ),
1352 false,
1353 true,
1354 Some(RestartCause::RssHardLimit),
1355 Some(limit_kib),
1356 None,
1357 ));
1358 }
// Timeout: not retried here, but the error is flagged retryable for the caller.
1359 Err(err) if matches!(err, LeanWorkerError::Timeout { .. }) => {
1360 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1361 call_restart = Some(event);
1362 }
1363 let generation = self.observed_generation();
1364 let runtime =
1365 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1366 self.publish_runtime(&runtime);
1367 return Err(self.worker_unavailable_for(
1368 &meta,
1369 format!("timeout: {err}"),
1370 true,
1371 generation > generation_before,
1372 Some(RestartCause::Timeout),
1373 None,
1374 None,
1375 ));
1376 }
// Any other worker error: still account restarts and remember the import
// fingerprint, then surface the mapped error without publishing runtime facts.
1377 Err(err) => {
1378 self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1379 self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
1380 return Err(map_worker_err(err));
1381 }
1382 }
1383 }
1384 }
1385
1386 fn run_semantic_job(
1387 &self,
1388 meta: JobMeta,
1389 request: &SemanticProofSearchRequest,
1390 ) -> Result<ProjectCall<SemanticProofSearchResult>> {
1391 let _span = tracing::debug_span!(
1392 "semantic_job",
1393 session_id = %self.config.session_id,
1394 imports = meta.imports.len(),
1395 queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1396 )
1397 .entered();
1398 let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1399 let generation_before = self.observed_generation();
1400 let mut capability = self.open_semantic_capability(&meta)?;
1401 let result = {
1402 let mut session = capability
1403 .open_session_with_imports(meta.imports.clone(), None, None)
1404 .map_err(map_worker_err)?;
1405 crate::semantic_search::run_semantic_proof_search(&mut session, request)
1406 };
1407 let runtime = self.runtime_facts(&meta, generation_before, 0, queue_wait_millis, None);
1408 self.publish_runtime(&runtime);
1409 result.map(|value| ProjectCall::new(value, runtime))
1410 }
1411
1412 fn open_semantic_capability(&self, meta: &JobMeta) -> Result<lean_rs_worker_parent::LeanWorkerCapability> {
1413 let runtime = lean_semantic_search_runtime::build_cached(SemanticSearchRuntimeBuild {
1414 cache_root: semantic_runtime_cache_root()?,
1415 toolchain_label: self.config.toolchain_label.clone(),
1416 lean_sysroot: self.config.lean_sysroot.clone(),
1417 })
1418 .map_err(|err| {
1419 self.worker_unavailable_for(
1420 meta,
1421 format!("semantic runtime build failed for this toolchain: {err}"),
1422 true,
1423 false,
1424 None,
1425 None,
1426 None,
1427 )
1428 })?;
1429 semantic_capability_builder(&self.config, &runtime.built)?
1430 .open()
1431 .map_err(|err| {
1432 self.worker_unavailable_for(
1433 meta,
1434 format!(
1435 "semantic capability open failed for this toolchain: {}",
1436 map_worker_err(err)
1437 ),
1438 true,
1439 false,
1440 None,
1441 None,
1442 None,
1443 )
1444 })
1445 }
1446
1447 fn cycle_before_import_switch_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1448 let Some(previous_import_fingerprint) = self.last_import_fingerprint.as_deref() else {
1449 return Ok(None);
1450 };
1451 if previous_import_fingerprint == meta.import_fingerprint {
1452 return Ok(None);
1453 }
1454 self.profile_switch_count = self.profile_switch_count.saturating_add(1);
1455 let Some(current_kib) = self.handle.rss_kib() else {
1456 return Ok(None);
1457 };
1458 self.last_rss_kib = Some(current_kib);
1459 if current_kib < self.config.import_switch_rss_soft_kib {
1460 return Ok(None);
1461 }
1462 let limit_kib = self.config.import_switch_rss_soft_kib;
1463 let reason = format!("rss_import_switch current_kib={current_kib} limit_kib={limit_kib}");
1464 self.record_restart_or_stop(RestartCause::RssImportSwitch, &reason)
1465 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1466 self.handle.cycle().map_err(map_worker_err)?;
1467 self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1468 let event = restart_event(
1469 RestartCause::RssImportSwitch,
1470 reason,
1471 self.observed_generation(),
1472 Some(current_kib),
1473 Some(limit_kib),
1474 );
1475 self.record_restart(event.clone());
1476 Ok(Some(event))
1477 }
1478
1479 fn cycle_after_post_job_rss_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1480 let Some(current_kib) = self.handle.rss_kib() else {
1481 return Ok(None);
1482 };
1483 self.last_rss_kib = Some(current_kib);
1484 let limit_kib = self.config.worker_rss_post_job_restart_kib;
1485 tracing::debug!(rss_kib = current_kib, limit_kib, "post-job rss check");
1486 if current_kib < limit_kib {
1487 return Ok(None);
1488 }
1489 let reason = format!("rss_post_job current_kib={current_kib} limit_kib={limit_kib}");
1490 self.record_restart_or_stop(RestartCause::RssPostJob, &reason)
1491 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1492 self.handle.cycle().map_err(map_worker_err)?;
1493 self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1494 let event = restart_event(
1495 RestartCause::RssPostJob,
1496 reason,
1497 self.observed_generation(),
1498 Some(current_kib),
1499 Some(limit_kib),
1500 );
1501 self.record_restart(event.clone());
1502 Ok(Some(event))
1503 }
1504
1505 fn rebuild_after_worker_death(
1506 &mut self,
1507 reason: String,
1508 cause: RestartCause,
1509 meta: &JobMeta,
1510 ) -> Result<RuntimeRestartEvent> {
1511 self.record_restart_or_stop(cause, &reason)
1512 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1513 let next_generation = self.observed_generation().saturating_add(1);
1514 let (handle, _) = open_worker(&self.config, false)?;
1515 self.handle = handle;
1516 self.worker_generation_base = next_generation;
1517 self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
1518 let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1519 self.record_restart(event.clone());
1520 Ok(event)
1521 }
1522
1523 fn account_lifecycle_restarts_since(
1524 &mut self,
1525 before: &LeanWorkerLifecycleSnapshot,
1526 meta: &JobMeta,
1527 ) -> Result<Option<RuntimeRestartEvent>> {
1528 let after = self.handle.lifecycle_snapshot();
1529 let restarted = after.restarts.saturating_sub(before.restarts);
1530 if restarted == 0 {
1531 self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1532 return Ok(None);
1533 }
1534 let (cause, reason) = after.last_restart_reason.as_ref().map_or_else(
1535 || (RestartCause::WorkerInternal, "worker_internal_restart".to_owned()),
1536 |reason| (restart_cause_from_worker(reason), restart_reason_text(reason)),
1537 );
1538 for _ in 0..restarted {
1539 self.record_restart_or_stop(cause, &reason)
1540 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1541 }
1542 self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1543 let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1544 self.record_restart(event.clone());
1545 Ok(Some(event))
1546 }
1547
1548 fn record_restart(&mut self, event: RuntimeRestartEvent) {
1554 self.restart_stats.observe(&event.cause);
1555 log_restart(&event, self.restart_stats.total);
1556 self.last_restart = Some(event);
1557 }
1558
1559 fn record_restart_or_stop(
1560 &mut self,
1561 cause: RestartCause,
1562 reason: &str,
1563 ) -> std::result::Result<(), RestartLimitExceeded> {
1564 if !cause.counts_toward_restart_limit() {
1565 return Ok(());
1566 }
1567 let now = Instant::now();
1568 while self
1569 .abnormal_restart_times
1570 .front()
1571 .is_some_and(|seen| now.saturating_duration_since(*seen) > self.config.restart_window)
1572 {
1573 self.abnormal_restart_times.pop_front();
1574 }
1575 if self.abnormal_restart_times.len() >= self.config.max_restarts_per_window {
1576 self.config.healthy.store(false, Ordering::Release);
1577 tracing::warn!(
1578 cause = cause.as_str(),
1579 restarts_in_window = self.abnormal_restart_times.len(),
1580 window_millis = millis_u64(self.config.restart_window),
1581 "restart limit exceeded; marking project unhealthy"
1582 );
1583 let message = format!(
1584 "restart_limit_exceeded after {} restarts in {:?}; latest: {reason}",
1585 self.config.max_restarts_per_window, self.config.restart_window
1586 );
1587 let event = restart_event(
1588 cause,
1589 message.clone(),
1590 self.observed_generation(),
1591 self.last_rss_kib,
1592 None,
1593 );
1594 self.record_restart(event.clone());
1595 self.publish_runtime(&RuntimeFacts {
1596 worker_generation: self.observed_generation(),
1597 worker_restarted: false,
1598 retry_count: MAX_JOB_RETRIES,
1599 admission_wait_millis: 0,
1600 queue_wait_millis: 0,
1601 call_restart: None,
1602 last_restart: Some(event),
1603 rss_kib: self.last_rss_kib,
1604 worker_lanes: 1,
1605 import_profile: self.last_import_fingerprint.clone(),
1606 profile_switch_count: self.profile_switch_count,
1607 restarts_total: self.restart_stats.total,
1608 restarts_by_cause: self.restart_stats.by_cause.clone(),
1609 });
1610 return Err(RestartLimitExceeded {
1611 message,
1612 cause,
1613 restarts_in_window: self.abnormal_restart_times.len() as u64,
1614 window_millis: millis_u64(self.config.restart_window),
1615 });
1616 }
1617 self.abnormal_restart_times.push_back(now);
1618 Ok(())
1619 }
1620
1621 fn observed_generation(&self) -> u64 {
1622 self.worker_generation_base
1623 .saturating_add(self.handle.lifecycle_snapshot().worker_generation)
1624 }
1625
1626 fn runtime_facts(
1627 &self,
1628 meta: &JobMeta,
1629 generation_before: u64,
1630 retry_count: u32,
1631 queue_wait_millis: u64,
1632 call_restart: Option<RuntimeRestartEvent>,
1633 ) -> RuntimeFacts {
1634 let generation = self.observed_generation();
1635 let snapshot = self.handle.lifecycle_snapshot();
1636 RuntimeFacts {
1637 worker_generation: generation,
1638 worker_restarted: call_restart.is_some() || generation > generation_before,
1639 retry_count,
1640 admission_wait_millis: meta.admission_wait_millis,
1641 queue_wait_millis,
1642 call_restart,
1643 last_restart: self.last_restart.clone(),
1644 rss_kib: snapshot.last_rss_kib.or(self.last_rss_kib),
1645 worker_lanes: 1,
1646 import_profile: Some(meta.import_fingerprint.clone()),
1647 profile_switch_count: self.profile_switch_count,
1648 restarts_total: self.restart_stats.total,
1649 restarts_by_cause: self.restart_stats.by_cause.clone(),
1650 }
1651 }
1652
1653 fn publish_runtime(&self, runtime: &RuntimeFacts) {
1654 *self.runtime.lock() = RuntimeSnapshot {
1655 worker_generation: runtime.worker_generation,
1656 last_restart: runtime.last_restart.clone().or_else(|| runtime.call_restart.clone()),
1657 rss_kib: runtime.rss_kib,
1658 import_profile: runtime.import_profile.clone(),
1659 profile_switch_count: runtime.profile_switch_count,
1660 restarts_total: runtime.restarts_total,
1661 restarts_by_cause: runtime.restarts_by_cause.clone(),
1662 };
1663 }
1664
1665 fn worker_unavailable_for(
1666 &self,
1667 meta: &JobMeta,
1668 reason: String,
1669 retryable: bool,
1670 worker_restarted: bool,
1671 cause: Option<RestartCause>,
1672 limit_kib: Option<u64>,
1673 retry_after_millis: Option<u64>,
1674 ) -> ServerError {
1675 let snapshot = self.runtime.lock().facts();
1676 ServerError::worker_unavailable(WorkerUnavailable {
1677 retryable,
1678 worker_restarted,
1679 project_root: self.config.lake_root.to_string_lossy().into_owned(),
1680 project_hash: self.config.manifest_hash.clone(),
1681 imports: meta.imports.clone(),
1682 session_id: self.config.session_id.clone(),
1683 lean_toolchain: self.config.toolchain_label.clone(),
1684 worker_generation: self.observed_generation(),
1685 restart_cause: cause.map(|cause| cause.as_str().to_owned()),
1686 rss_kib: self.last_rss_kib,
1687 limit_kib,
1688 retry_after_millis,
1689 restarts_in_window: Some(self.abnormal_restart_times.len() as u64),
1690 window_millis: Some(millis_u64(self.config.restart_window)),
1691 runtime: snapshot,
1692 reason,
1693 toolchain_advisories: self.config.toolchain_advisories.clone(),
1694 })
1695 }
1696
1697 fn restart_limit_error(&self, imports: &[String], limit: RestartLimitExceeded) -> ServerError {
1698 let snapshot = self.runtime.lock().facts();
1699 ServerError::worker_unavailable(WorkerUnavailable {
1700 retryable: false,
1701 worker_restarted: false,
1702 project_root: self.config.lake_root.to_string_lossy().into_owned(),
1703 project_hash: self.config.manifest_hash.clone(),
1704 imports: imports.to_vec(),
1705 session_id: self.config.session_id.clone(),
1706 lean_toolchain: self.config.toolchain_label.clone(),
1707 worker_generation: self.observed_generation(),
1708 reason: limit.message,
1709 restart_cause: Some(limit.cause.as_str().to_owned()),
1710 rss_kib: self.last_rss_kib,
1711 limit_kib: None,
1712 retry_after_millis: Some(limit.window_millis),
1713 restarts_in_window: Some(limit.restarts_in_window),
1714 window_millis: Some(limit.window_millis),
1715 runtime: snapshot,
1716 toolchain_advisories: self.config.toolchain_advisories.clone(),
1717 })
1718 }
1719}
1720
/// Outcome of a restart that was refused because the restart limit for the
/// current window had already been reached.
1721struct RestartLimitExceeded {
// Human-readable description, used as the error's `reason`.
1722 message: String,
// Cause of the restart that was refused.
1723 cause: RestartCause,
// Number of counted restarts inside the window when the limit was hit.
1724 restarts_in_window: u64,
// Length of the restart window in milliseconds.
1725 window_millis: u64,
1726}
1727
1728fn actor_main(
1729 config: ActorConfig,
1730 init_reply: std::sync::mpsc::Sender<std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>>,
1731) {
1732 let (handle, runtime_toolchain) = match open_worker(&config, true) {
1733 Ok(value) => value,
1734 Err(err) => {
1735 let _ = init_reply.send(Err(err));
1736 return;
1737 }
1738 };
1739 let mut state = ProjectActorState {
1740 config: config.clone(),
1741 handle,
1742 worker_generation_base: 1,
1743 last_restart: None,
1744 last_import_fingerprint: None,
1745 profile_switch_count: 0,
1746 last_rss_kib: None,
1747 runtime: Arc::clone(&config.runtime),
1748 abnormal_restart_times: VecDeque::new(),
1749 restart_stats: RestartStats::default(),
1750 };
1751
1752 let (tx, mut rx) = mpsc::channel::<ProjectMessage>(config.mailbox_capacity);
1753 if init_reply.send(Ok((runtime_toolchain, tx))).is_err() {
1754 return;
1755 }
1756
1757 while let Some(message) = rx.blocking_recv() {
1758 state.handle_message(message);
1759 }
1760}
1761
1762fn open_worker(config: &ActorConfig, preflight: bool) -> Result<(LeanWorkerHostHandle, String)> {
1763 let builder = worker_builder(config);
1764 if preflight {
1765 let report = builder.check();
1766 if let Some(first) = report.first_error() {
1767 if bootstrap_failure_is_hard_rss(&first.code().to_string(), first.message()) {
1768 return Err(bootstrap_hard_rss_unavailable(
1769 config,
1770 first.message().to_owned(),
1771 parse_keyed_u64(first.message(), "current_kib"),
1772 parse_keyed_u64(first.message(), "limit_kib").or(Some(config.worker_rss_hard_kill_kib)),
1773 ));
1774 }
1775 return Err(ServerError::BadProject(format!(
1776 "{}: {}",
1777 first.code(),
1778 first.message()
1779 )));
1780 }
1781 }
1782 let handle = match builder.open() {
1783 Ok(handle) => handle,
1784 Err(LeanWorkerError::RssHardLimitExceeded {
1785 operation,
1786 current_kib,
1787 limit_kib,
1788 ..
1789 }) => {
1790 return Err(bootstrap_hard_rss_unavailable(
1791 config,
1792 format!(
1793 "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1794 ),
1795 Some(current_kib),
1796 Some(limit_kib),
1797 ));
1798 }
1799 Err(err) => return Err(map_worker_err(err)),
1800 };
1801 let runtime_toolchain = handle
1802 .runtime_metadata()
1803 .lean_version
1804 .unwrap_or_else(|| config.toolchain_label.clone());
1805 Ok((handle, runtime_toolchain))
1806}
1807
/// Reports whether a bootstrap failure looks like an RSS hard-limit failure.
///
/// True when `code` contains `rss` (case-sensitive) or when the lowercased
/// `message` contains one of the known hard-limit phrases.
fn bootstrap_failure_is_hard_rss(code: &str, message: &str) -> bool {
    const MESSAGE_MARKERS: [&str; 4] = [
        "hard rss limit",
        "rss_hard_limit",
        "rss_hard_limit_exceeded",
        "rss hard limit",
    ];
    if code.contains("rss") {
        return true;
    }
    let lower = message.to_lowercase();
    MESSAGE_MARKERS.iter().any(|marker| lower.contains(*marker))
}
1816
1817fn bootstrap_hard_rss_unavailable(
1818 config: &ActorConfig,
1819 reason: String,
1820 current_kib: Option<u64>,
1821 limit_kib: Option<u64>,
1822) -> ServerError {
1823 config.healthy.store(false, Ordering::Release);
1824 let generation = config.runtime.lock().worker_generation;
1825 let event = restart_event(
1826 RestartCause::RssHardLimit,
1827 reason.clone(),
1828 generation,
1829 current_kib,
1830 limit_kib,
1831 );
1832 let restarts_total = 1;
1836 log_restart(&event, restarts_total);
1837 let by_cause = BTreeMap::from([(RestartCause::RssHardLimit.as_str().to_owned(), restarts_total)]);
1838 let runtime = RuntimeFacts {
1839 worker_generation: generation,
1840 worker_restarted: true,
1841 retry_count: 0,
1842 admission_wait_millis: 0,
1843 queue_wait_millis: 0,
1844 call_restart: Some(event.clone()),
1845 last_restart: Some(event.clone()),
1846 rss_kib: current_kib,
1847 worker_lanes: 1,
1848 import_profile: None,
1849 profile_switch_count: 0,
1850 restarts_total,
1851 restarts_by_cause: by_cause.clone(),
1852 };
1853 *config.runtime.lock() = RuntimeSnapshot {
1854 worker_generation: generation,
1855 last_restart: Some(event),
1856 rss_kib: current_kib,
1857 import_profile: None,
1858 profile_switch_count: 0,
1859 restarts_total,
1860 restarts_by_cause: by_cause,
1861 };
1862 ServerError::worker_unavailable(WorkerUnavailable {
1863 retryable: false,
1864 worker_restarted: true,
1865 project_root: config.lake_root.to_string_lossy().into_owned(),
1866 project_hash: config.manifest_hash.clone(),
1867 imports: Vec::new(),
1868 session_id: config.session_id.clone(),
1869 lean_toolchain: config.toolchain_label.clone(),
1870 worker_generation: generation,
1871 reason,
1872 restart_cause: Some(RestartCause::RssHardLimit.as_str().to_owned()),
1873 rss_kib: current_kib,
1874 limit_kib,
1875 retry_after_millis: None,
1876 restarts_in_window: None,
1877 window_millis: None,
1878 runtime,
1879 toolchain_advisories: config.toolchain_advisories.clone(),
1880 })
1881}
1882
1883fn worker_builder(config: &ActorConfig) -> LeanWorkerHostHandleBuilder {
1884 let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
1885 let module_cache_limits = module_cache_limits(config);
1886 LeanWorkerHostHandleBuilder::shims_only(&config.lake_root, std::iter::empty::<String>())
1887 .worker_child(LeanWorkerChild::for_toolchain(
1888 config.worker_path.clone(),
1889 config.lean_sysroot.clone(),
1890 ))
1891 .startup_timeout(Duration::from_secs(30))
1892 .request_timeout(Duration::from_millis(config.request_timeout_millis))
1893 .restart_policy(restart_policy)
1894 .rss_hard_limit(
1895 config.worker_rss_hard_kill_kib,
1896 Duration::from_millis(config.worker_rss_sample_millis),
1897 )
1898 .module_cache_limits(module_cache_limits)
1899}
1900
1901fn semantic_capability_builder(
1902 config: &ActorConfig,
1903 built: &lean_toolchain::LeanBuiltCapability,
1904) -> Result<LeanWorkerCapabilityBuilder> {
1905 let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
1906 let builder = LeanWorkerCapabilityBuilder::from_built_capability(built, std::iter::empty::<String>())
1907 .map_err(map_worker_err)?
1908 .import_workspace_root(config.lake_root.clone())
1909 .worker_child(LeanWorkerChild::for_toolchain(
1910 config.worker_path.clone(),
1911 config.lean_sysroot.clone(),
1912 ))
1913 .startup_timeout(Duration::from_secs(30))
1914 .request_timeout(Duration::from_millis(config.request_timeout_millis))
1915 .restart_policy(restart_policy)
1916 .rss_hard_limit(
1917 config.worker_rss_hard_kill_kib,
1918 Duration::from_millis(config.worker_rss_sample_millis),
1919 )
1920 .module_cache_limits(module_cache_limits(config))
1921 .json_command_export(lean_semantic_search_capability::DECLARATION_FEATURES_EXPORT)
1922 .json_command_export(lean_semantic_search_capability::PROOF_GOAL_FEATURES_EXPORT);
1923 Ok(builder)
1924}
1925
1926fn semantic_runtime_cache_root() -> Result<PathBuf> {
1927 let cache_dir =
1928 dirs::cache_dir().ok_or_else(|| ServerError::Internal("could not resolve user cache directory".to_owned()))?;
1929 Ok(cache_dir.join("lean-host-mcp").join("semantic-runtimes"))
1930}
1931
1932fn module_cache_limits(config: &ActorConfig) -> LeanWorkerModuleCacheLimits {
1933 LeanWorkerModuleCacheLimits::default()
1934 .rss_guard_kib(config.module_cache_rss_guard_kib)
1935 .max_bytes(config.module_cache_max_bytes)
1936}
1937
/// Thread name for a project actor: `lean-host-mcp/project/<basename>`.
///
/// `<basename>` is the final component of `canonical_root`; it falls back to
/// `project` when the path has no final component or it is not valid UTF-8.
fn actor_thread_name(canonical_root: &Path) -> String {
    let basename = match canonical_root.file_name() {
        Some(component) => component.to_str().unwrap_or("project"),
        None => "project",
    };
    format!("lean-host-mcp/project/{basename}")
}
1942
1943fn worker_error_is_recoverable_death(err: &LeanWorkerError) -> bool {
1944 matches!(
1945 err,
1946 LeanWorkerError::ChildExited { .. } | LeanWorkerError::ChildPanicOrAbort { .. }
1947 )
1948}
1949
1950fn worker_error_is_session_missing(err: &LeanWorkerError) -> bool {
1951 matches!(err, LeanWorkerError::Worker { code, .. } if code == "lean_rs.worker.session_missing")
1952}
1953
1954#[allow(
1955 clippy::wildcard_enum_match_arm,
1956 reason = "only worker process death variants are restart causes; all other errors are classified elsewhere"
1957)]
1958fn worker_death_cause(err: &LeanWorkerError) -> RestartCause {
1959 match err {
1960 LeanWorkerError::ChildPanicOrAbort { .. } => RestartCause::ChildAbort,
1961 LeanWorkerError::ChildExited { .. } => RestartCause::ChildExit,
1962 _ => RestartCause::WorkerInternal,
1963 }
1964}
1965
1966fn restart_reason_text(reason: &LeanWorkerRestartReason) -> String {
1967 match reason {
1968 LeanWorkerRestartReason::Explicit => RestartCause::Explicit.as_str().to_owned(),
1969 LeanWorkerRestartReason::MaxRequests { limit } => format!("max_requests limit={limit}"),
1970 LeanWorkerRestartReason::MaxImports { limit } => format!("max_imports limit={limit}"),
1971 LeanWorkerRestartReason::RssCeiling {
1972 current_kib, limit_kib, ..
1973 } => {
1974 format!("rss_ceiling current_kib={current_kib} limit_kib={limit_kib}")
1975 }
1976 LeanWorkerRestartReason::RssHardLimit {
1977 operation,
1978 current_kib,
1979 limit_kib,
1980 ..
1981 } => {
1982 format!("rss_hard_limit operation={operation} current_kib={current_kib} limit_kib={limit_kib}")
1983 }
1984 LeanWorkerRestartReason::Idle { idle_for, limit } => {
1985 format!(
1986 "idle idle_for_millis={} limit_millis={}",
1987 millis_u64(*idle_for),
1988 millis_u64(*limit)
1989 )
1990 }
1991 LeanWorkerRestartReason::Cancelled { operation } => format!("cancelled operation={operation}"),
1992 LeanWorkerRestartReason::ChildAbort { operation } => format!("child_abort operation={operation}"),
1993 LeanWorkerRestartReason::RequestTimeout { operation, duration } => {
1994 format!(
1995 "timeout operation={operation} duration_millis={}",
1996 millis_u64(*duration)
1997 )
1998 }
1999 }
2000}
2001
/// Fingerprint of an import list: the imports joined with `\n`, in the
/// order given (so the result is order-sensitive).
fn import_fingerprint(imports: &[String]) -> String {
    let mut fingerprint = String::new();
    for (position, import) in imports.iter().enumerate() {
        if position > 0 {
            fingerprint.push('\n');
        }
        fingerprint.push_str(import);
    }
    fingerprint
}
2005
/// Whole milliseconds in `duration` as a `u64`, clamped to `u64::MAX` when
/// the value does not fit.
fn millis_u64(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
2009
/// Extracts the unsigned integer that follows `key=` in `text`.
///
/// Only an occurrence of `key` that starts at an identifier boundary (start
/// of text, or preceded by a character that is neither alphanumeric nor `_`)
/// is considered, so looking up `limit_kib` does not pick up the value of
/// `soft_limit_kib=...`. The first such occurrence decides the result.
///
/// Returns `None` when no such occurrence exists, when it is not followed by
/// at least one ASCII digit, or when the digits do not fit in a `u64`.
fn parse_keyed_u64(text: &str, key: &str) -> Option<u64> {
    let prefix = format!("{key}=");
    let mut search_from = 0;
    while let Some(offset) = text[search_from..].find(&prefix) {
        let key_start = search_from + offset;
        let value_start = key_start + prefix.len();
        // An empty key has no identifier to delimit, so any `=` qualifies.
        let inside_longer_identifier = !key.is_empty()
            && text[..key_start]
                .chars()
                .next_back()
                .is_some_and(|before| before.is_alphanumeric() || before == '_');
        if !inside_longer_identifier {
            let rest = &text[value_start..];
            let digits_len = rest.bytes().take_while(u8::is_ascii_digit).count();
            return rest[..digits_len].parse::<u64>().ok();
        }
        // Suffix of a longer identifier: keep looking after this occurrence.
        search_from = value_start;
    }
    None
}
2019
2020fn parse_nonzero_u64(name: &str, env: Option<&str>, file: Option<u64>, default: u64) -> Result<u64> {
2024 let value = match env {
2025 Some(raw) => raw
2026 .parse::<u64>()
2027 .map_err(|e| ServerError::Internal(format!("{name}={raw:?} not a u64: {e}")))?,
2028 None => file.unwrap_or(default),
2029 };
2030 if value == 0 {
2031 return Err(ServerError::Internal(format!(
2032 "{name} resolved to 0, which is not allowed"
2033 )));
2034 }
2035 Ok(value)
2036}
2037
2038fn parse_nonzero_usize(name: &str, env: Option<&str>, file: Option<usize>, default: usize) -> Result<usize> {
2039 let parsed = parse_nonzero_u64(name, env, file.map(|v| v as u64), default as u64)?;
2040 usize::try_from(parsed).map_err(|_| ServerError::Internal(format!("{name}={parsed} does not fit in usize")))
2041}
2042
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::panic,
    clippy::unwrap_used,
    reason = "unit tests use expect/unwrap_err to state the branch under test directly"
)]
mod tests {
    use super::*;

    /// Wraps a raw setting the way `RuntimeEnv` fields carry it.
    fn env_value(raw: &str) -> Option<String> {
        Some(raw.to_owned())
    }

    /// Runs `parse_runtime_config`, expects an `Internal` error, and returns its message.
    #[track_caller]
    fn rejection_message(env: RuntimeEnv, file: &RuntimeFileConfig) -> String {
        let err = parse_runtime_config(env, file).unwrap_err();
        let ServerError::Internal(message) = err else {
            panic!("expected Internal config error");
        };
        message
    }

    /// Shared runtime snapshot at generation 1 with no restarts recorded.
    fn pristine_runtime() -> Arc<Mutex<RuntimeSnapshot>> {
        Arc::new(Mutex::new(RuntimeSnapshot {
            worker_generation: 1,
            last_restart: None,
            rss_kib: None,
            import_profile: None,
            profile_switch_count: 0,
            restarts_total: 0,
            restarts_by_cause: BTreeMap::new(),
        }))
    }

    /// Actor configuration fixture: the caller picks the paths, the runtime handle and the
    /// hard-kill RSS limit; every other knob uses the module-level default constant.
    fn actor_config(
        lake_root: PathBuf,
        worker_path: PathBuf,
        lean_sysroot: PathBuf,
        runtime: Arc<Mutex<RuntimeSnapshot>>,
        worker_rss_hard_kill_kib: u64,
    ) -> ActorConfig {
        ActorConfig {
            lake_root,
            manifest_hash: "sha256-test".to_owned(),
            toolchain_label: "leanprover/lean4:test".to_owned(),
            worker_path,
            lean_sysroot,
            session_id: "session-test".to_owned(),
            runtime,
            healthy: Arc::new(AtomicBool::new(true)),
            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
            worker_rss_hard_kill_kib,
            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
            restart_window: RESTART_WINDOW,
            toolchain_advisories: Vec::new(),
        }
    }

    /// Every `RuntimeEnv` field supplied as text is reflected by the matching config accessor.
    #[test]
    fn runtime_config_parses_runtime_policy_without_env_reads() {
        let env = RuntimeEnv {
            worker_rss_post_job_restart_kib: env_value("5"),
            worker_rss_hard_kill_kib: env_value("7"),
            worker_rss_sample_millis: env_value("11"),
            import_switch_rss_soft_kib: env_value("3"),
            module_cache_rss_guard_kib: env_value("17"),
            module_cache_max_bytes: env_value("19"),
            request_timeout_millis: env_value("37"),
            project_mailbox_capacity: env_value("23"),
            worker_restart_limit: env_value("29"),
            worker_restart_window_secs: env_value("31"),
        };

        let config = parse_runtime_config(env, &RuntimeFileConfig::default()).unwrap();

        assert_eq!(config.worker_rss_post_job_restart_kib(), 5);
        assert_eq!(config.worker_rss_hard_kill_kib(), 7);
        assert_eq!(config.worker_rss_sample_millis(), 11);
        assert_eq!(config.import_switch_rss_soft_kib(), 3);
        assert_eq!(config.module_cache_rss_guard_kib(), 17);
        assert_eq!(config.module_cache_max_bytes(), 19);
        assert_eq!(config.request_timeout_millis(), 37);
        assert_eq!(config.mailbox_capacity(), 23);
        assert_eq!(config.max_restarts_per_window(), 29);
        assert_eq!(config.restart_window(), Duration::from_secs(31));
    }

    /// The request timeout is taken from env first, then the file, then the built-in default.
    #[test]
    fn request_timeout_precedence_env_over_file_over_default() {
        let file = RuntimeFileConfig {
            request_timeout_millis: Some(45_000),
            ..RuntimeFileConfig::default()
        };

        // File value applies when the environment is silent.
        let file_only = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
        assert_eq!(file_only.request_timeout_millis(), 45_000);

        // Environment value overrides the file.
        let env = RuntimeEnv {
            request_timeout_millis: env_value("90000"),
            ..RuntimeEnv::default()
        };
        let env_and_file = parse_runtime_config(env, &file).unwrap();
        assert_eq!(env_and_file.request_timeout_millis(), 90_000);

        // Built-in default applies when neither source sets the value.
        let neither = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
        assert_eq!(neither.request_timeout_millis(), REQUEST_TIMEOUT_MILLIS);
    }

    /// A zero request timeout is rejected and the error names the setting.
    #[test]
    fn request_timeout_zero_is_rejected() {
        let env = RuntimeEnv {
            request_timeout_millis: env_value("0"),
            ..RuntimeEnv::default()
        };

        let message = rejection_message(env, &RuntimeFileConfig::default());

        assert!(
            message.contains("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS"),
            "message: {message}"
        );
    }

    /// A post-job restart threshold above the default hard-kill limit is rejected.
    #[test]
    fn rss_config_rejects_post_job_above_hard_kill() {
        let env = RuntimeEnv {
            worker_rss_post_job_restart_kib: env_value("20971520"),
            ..RuntimeEnv::default()
        };

        let message = rejection_message(env, &RuntimeFileConfig::default());

        assert!(message.contains("invalid RSS config"), "message: {message}");
        assert!(message.contains("hard_kill"), "message: {message}");
    }

    /// An import-switch soft limit above the default post-job threshold is rejected.
    #[test]
    fn rss_config_rejects_import_switch_above_post_job() {
        let env = RuntimeEnv {
            import_switch_rss_soft_kib: env_value("6291456"),
            ..RuntimeEnv::default()
        };

        let message = rejection_message(env, &RuntimeFileConfig::default());

        assert!(message.contains("invalid RSS config"), "message: {message}");
        assert!(message.contains("import_switch"), "message: {message}");
    }

    /// Raising only the post-job threshold to 8 GiB (8_388_608 KiB) is accepted.
    #[test]
    fn rss_config_accepts_raising_post_job_to_8gib() {
        let env = RuntimeEnv {
            worker_rss_post_job_restart_kib: env_value("8388608"),
            ..RuntimeEnv::default()
        };

        let config = parse_runtime_config(env, &RuntimeFileConfig::default()).unwrap();

        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
    }

    /// The post-job RSS threshold is taken from env first, then the file, then the default.
    #[test]
    fn runtime_config_precedence_env_over_file_over_default() {
        let file = RuntimeFileConfig {
            worker_rss_post_job_restart_kib: Some(8_388_608),
            ..RuntimeFileConfig::default()
        };

        // File value applies when the environment is silent.
        let file_only = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
        assert_eq!(file_only.worker_rss_post_job_restart_kib(), 8_388_608);

        // Environment value overrides the file.
        let env = RuntimeEnv {
            worker_rss_post_job_restart_kib: env_value("6291456"),
            ..RuntimeEnv::default()
        };
        let env_and_file = parse_runtime_config(env, &file).unwrap();
        assert_eq!(env_and_file.worker_rss_post_job_restart_kib(), 6_291_456);

        // Built-in default applies when neither source sets the value.
        let neither = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
        assert_eq!(
            neither.worker_rss_post_job_restart_kib(),
            WORKER_RSS_POST_JOB_RESTART_KIB
        );
    }

    /// File-provided values go through the same validation: zero and inverted RSS limits fail.
    #[test]
    fn runtime_config_rejects_zero_and_bad_ordering_from_file() {
        let zero = RuntimeFileConfig {
            worker_rss_sample_millis: Some(0),
            ..RuntimeFileConfig::default()
        };
        assert!(parse_runtime_config(RuntimeEnv::default(), &zero).is_err());

        let inverted = RuntimeFileConfig {
            worker_rss_post_job_restart_kib: Some(20_971_520),
            ..RuntimeFileConfig::default()
        };
        let message = rejection_message(RuntimeEnv::default(), &inverted);
        assert!(message.contains("invalid RSS config"), "message: {message}");
    }

    /// `RestartStats::observe` bumps both the total and the per-cause counter.
    #[test]
    fn restart_stats_tally_total_and_per_cause() {
        let mut stats = RestartStats::default();
        for cause in ["rss_post_job", "rss_post_job", "child_abort"] {
            stats.observe(cause);
        }

        assert_eq!(stats.total, 3);
        assert_eq!(stats.by_cause.get("rss_post_job"), Some(&2));
        assert_eq!(stats.by_cause.get("child_abort"), Some(&1));
        assert_eq!(stats.by_cause.get("idle"), None);
    }

    /// The semantic capability builder's debug output shows an empty import list, no
    /// capability module, and a configured workspace root.
    #[test]
    fn semantic_capability_builder_omits_capability_import_module() {
        let tmp = tempfile::tempdir().unwrap();

        // Lay out an empty placeholder dylib under `<tmp>/capability/.lake/build/lib`.
        let lib_dir = tmp
            .path()
            .join("capability")
            .join(".lake")
            .join("build")
            .join("lib");
        std::fs::create_dir_all(&lib_dir).unwrap();
        let dylib = lib_dir.join(if cfg!(target_os = "macos") {
            "libLeanSemanticSearch.dylib"
        } else {
            "libLeanSemanticSearch.so"
        });
        std::fs::write(&dylib, "").unwrap();

        let built = lean_toolchain::LeanBuiltCapability::path(&dylib)
            .package("lean_semantic_search")
            .module("LeanSemanticSearch");
        let config = actor_config(
            tmp.path().join("consumer"),
            tmp.path().join("worker"),
            tmp.path().join("lean"),
            pristine_runtime(),
            WORKER_RSS_HARD_KILL_KIB,
        );

        let builder = semantic_capability_builder(&config, &built).unwrap();
        let debug = format!("{builder:?}");

        assert!(debug.contains("imports: []"), "builder debug: {debug}");
        assert!(
            !debug.contains("LeanSemanticSearch.Capability"),
            "builder must not import the capability module: {debug}"
        );
        assert!(
            debug.contains("import_workspace_root: Some"),
            "builder should import sessions against the consumer workspace: {debug}"
        );
    }

    /// `bootstrap_hard_rss_unavailable` yields a non-retryable `WorkerUnavailable` carrying
    /// the cause, the RSS figures, and a matching `last_restart` event.
    #[test]
    fn bootstrap_hard_rss_failure_is_structured_runtime_unavailable() {
        let runtime = pristine_runtime();
        let config = actor_config(
            PathBuf::from("/tmp/lean-host-mcp-bootstrap-rss-test"),
            PathBuf::from("/tmp/worker"),
            PathBuf::from("/tmp/lean"),
            Arc::clone(&runtime),
            64,
        );
        let detail = "rss_hard_limit_exceeded operation=startup current_kib=128 limit_kib=64".to_owned();

        let err = bootstrap_hard_rss_unavailable(&config, detail, Some(128), Some(64));

        let ServerError::WorkerUnavailable(info) = err else {
            panic!("expected WorkerUnavailable");
        };
        assert!(!info.retryable);
        assert_eq!(info.restart_cause.as_deref(), Some("rss_hard_limit_exceeded"));
        assert_eq!(info.rss_kib, Some(128));
        assert_eq!(info.limit_kib, Some(64));
        assert_eq!(
            info.runtime.last_restart.as_ref().map(|event| event.cause.as_str()),
            Some("rss_hard_limit_exceeded")
        );
    }

    /// Pins which restart causes report that they count toward the restart limit.
    #[test]
    fn planned_restart_causes_do_not_consume_abnormal_restart_budget() {
        // Expected not to count.
        assert!(!RestartCause::RssImportSwitch.counts_toward_restart_limit());
        assert!(!RestartCause::RssPostJob.counts_toward_restart_limit());
        assert!(!RestartCause::MaxRequests.counts_toward_restart_limit());
        assert!(!RestartCause::MaxImports.counts_toward_restart_limit());
        assert!(!RestartCause::Idle.counts_toward_restart_limit());

        // Expected to count.
        assert!(RestartCause::ChildExit.counts_toward_restart_limit());
        assert!(RestartCause::ChildAbort.counts_toward_restart_limit());
        assert!(RestartCause::Timeout.counts_toward_restart_limit());
        assert!(RestartCause::Cancelled.counts_toward_restart_limit());
        assert!(RestartCause::SessionMissing.counts_toward_restart_limit());
        assert!(RestartCause::RssHardLimit.counts_toward_restart_limit());
    }

    /// Worker-reported restart reasons map onto the expected cause strings.
    #[test]
    fn worker_restart_reason_maps_to_stable_cause() {
        let cases = [
            (LeanWorkerRestartReason::MaxRequests { limit: 1 }, "max_requests"),
            (
                LeanWorkerRestartReason::RssCeiling {
                    current_kib: 2,
                    limit_kib: 1,
                    last_import_stats: None,
                },
                "rss_post_job",
            ),
            (
                LeanWorkerRestartReason::RssHardLimit {
                    operation: "test",
                    current_kib: 2,
                    limit_kib: 1,
                    last_import_stats: None,
                },
                "rss_hard_limit_exceeded",
            ),
            (
                LeanWorkerRestartReason::RequestTimeout {
                    operation: "test",
                    duration: Duration::from_millis(1),
                },
                "timeout",
            ),
        ];

        for (reason, expected) in cases {
            assert_eq!(restart_cause_from_worker(&reason).as_str(), expected);
        }
    }
}