1#![allow(let_underscore_drop, clippy::needless_pass_by_value)]
10
11use std::collections::{BTreeMap, VecDeque};
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
16use std::thread;
17use std::time::{Duration, Instant};
18
19use lean_rs_worker_parent::{
20 LeanWorkerCapabilityBuilder, LeanWorkerChild, LeanWorkerDeclarationInspectionRequest,
21 LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationSearch, LeanWorkerDeclarationSearchResult,
22 LeanWorkerDeclarationVerificationBatchRequest, LeanWorkerDeclarationVerificationBatchResult,
23 LeanWorkerDeclarationVerificationRequest, LeanWorkerDeclarationVerificationResult, LeanWorkerElabOptions,
24 LeanWorkerError, LeanWorkerHostHandle, LeanWorkerHostHandleBuilder, LeanWorkerLifecycleSnapshot,
25 LeanWorkerModuleCacheLimits, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchOutcome,
26 LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector, LeanWorkerOutputBudgets,
27 LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRestartPolicy, LeanWorkerRestartReason,
28};
29use lean_semantic_search_runtime::SemanticSearchRuntimeBuild;
30use parking_lot::Mutex;
31use tokio::sync::{mpsc, oneshot};
32
33use crate::admission::SemanticPermit;
34use crate::cache::ModuleQueryCache;
35use crate::config_file::RuntimeFileConfig;
36use crate::envelope::{Freshness, RuntimeFacts, RuntimeRestartEvent};
37use crate::error::{Result, ServerError, WorkerUnavailable, map_worker_err};
38use crate::lake_meta::LakeProjectMeta;
39use crate::semantic_search::{SemanticProofSearchRequest, SemanticProofSearchResult};
40use crate::toolchain::{Readiness, ToolchainId, WorkerBinary};
41
/// Capacity handed to the per-project `ModuleQueryCache`.
const MODULE_QUERY_CACHE_CAPACITY: usize = 256;
/// Not referenced by `ProjectRuntimeConfig`; used further down this file
/// (presumably the worker's max-requests recycle threshold — confirm).
const WORKER_REQUEST_RESTARTS: u64 = 64;
/// Default for `ProjectRuntimeConfig::mailbox_capacity`.
const PROJECT_MAILBOX_CAPACITY: usize = 8;
/// Default for `worker_rss_post_job_restart_kib`: 5 GiB expressed in KiB.
const WORKER_RSS_POST_JOB_RESTART_KIB: u64 = 5 * 1_048_576;
/// Default for `worker_rss_hard_kill_kib`: 16 GiB expressed in KiB.
const WORKER_RSS_HARD_KILL_KIB: u64 = 16 * 1_048_576;
/// Default for `worker_rss_sample_millis`.
const WORKER_RSS_SAMPLE_MILLIS: u64 = 250;
/// Default for `import_switch_rss_soft_kib`: 2 GiB expressed in KiB.
const IMPORT_SWITCH_RSS_SOFT_KIB: u64 = 2 * 1_048_576;
/// Default for `module_cache_rss_guard_kib`: 2 GiB expressed in KiB.
const MODULE_CACHE_RSS_GUARD_KIB: u64 = 2 * 1_048_576;
/// Default for `module_cache_max_bytes`: 32 MiB expressed in bytes.
const MODULE_CACHE_MAX_BYTES: u64 = 32 * 1_048_576;
/// Default for `request_timeout_millis`: two minutes.
const REQUEST_TIMEOUT_MILLIS: u64 = 120_000;
/// Retry budget returned by `RetryPolicy::RetryOnceReadOnly`.
const MAX_JOB_RETRIES: u32 = 1;
/// Default for `max_restarts_per_window`.
const MAX_RESTARTS_PER_WINDOW: usize = 3;
/// Default for `restart_window`: one minute.
const RESTART_WINDOW: Duration = Duration::from_secs(60);

/// Tunable limits for one project's worker runtime.
///
/// Obtain one through [`Default`] or from the environment / config file via
/// `from_env` / `from_env_with_file`; values are read back through the getters.
/// The defaults satisfy the ordering enforced by `validate_rss_ordering`
/// (`import_switch <= post_job <= hard_kill`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ProjectRuntimeConfig {
    /// RSS threshold (KiB) checked after a job (`LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB`).
    worker_rss_post_job_restart_kib: u64,
    /// Hard RSS threshold (KiB) (`LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB`).
    worker_rss_hard_kill_kib: u64,
    /// RSS sampling period in milliseconds.
    worker_rss_sample_millis: u64,
    /// Soft RSS threshold (KiB) related to import-profile switches.
    import_switch_rss_soft_kib: u64,
    /// RSS guard (KiB) associated with the module cache.
    module_cache_rss_guard_kib: u64,
    /// Module cache size limit in bytes.
    module_cache_max_bytes: u64,
    /// Per-request timeout in milliseconds.
    request_timeout_millis: u64,
    /// Capacity of the project actor's mailbox.
    mailbox_capacity: usize,
    /// Number of counted restarts allowed within `restart_window`.
    max_restarts_per_window: usize,
    /// Sliding window used with `max_restarts_per_window`.
    restart_window: Duration,
}

impl Default for ProjectRuntimeConfig {
    /// Builds the configuration purely from the compile-time default constants.
    fn default() -> Self {
        ProjectRuntimeConfig {
            // Memory thresholds.
            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
            // Timing.
            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
            // Queueing and restart circuit breaker.
            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
            restart_window: RESTART_WINDOW,
        }
    }
}
98
99impl ProjectRuntimeConfig {
100 pub fn from_env() -> Result<Self> {
107 Self::from_env_with_file(&RuntimeFileConfig::default())
108 }
109
110 pub fn from_env_with_file(file: &RuntimeFileConfig) -> Result<Self> {
119 parse_runtime_config(
120 RuntimeEnv {
121 worker_rss_post_job_restart_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB")?,
122 worker_rss_hard_kill_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB")?,
123 worker_rss_sample_millis: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS")?,
124 import_switch_rss_soft_kib: runtime_env_var("LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB")?,
125 module_cache_rss_guard_kib: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB")?,
126 module_cache_max_bytes: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES")?,
127 request_timeout_millis: runtime_env_var("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS")?,
128 project_mailbox_capacity: runtime_env_var("LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY")?,
129 worker_restart_limit: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_LIMIT")?,
130 worker_restart_window_secs: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS")?,
131 },
132 file,
133 )
134 }
135
136 #[must_use]
137 pub const fn worker_rss_post_job_restart_kib(&self) -> u64 {
138 self.worker_rss_post_job_restart_kib
139 }
140
141 #[must_use]
142 pub const fn worker_rss_hard_kill_kib(&self) -> u64 {
143 self.worker_rss_hard_kill_kib
144 }
145
146 #[must_use]
147 pub const fn worker_rss_sample_millis(&self) -> u64 {
148 self.worker_rss_sample_millis
149 }
150
151 #[must_use]
152 pub const fn import_switch_rss_soft_kib(&self) -> u64 {
153 self.import_switch_rss_soft_kib
154 }
155
156 #[must_use]
157 pub const fn module_cache_rss_guard_kib(&self) -> u64 {
158 self.module_cache_rss_guard_kib
159 }
160
161 #[must_use]
162 pub const fn module_cache_max_bytes(&self) -> u64 {
163 self.module_cache_max_bytes
164 }
165
166 #[must_use]
167 pub const fn request_timeout_millis(&self) -> u64 {
168 self.request_timeout_millis
169 }
170
171 #[must_use]
172 pub const fn mailbox_capacity(&self) -> usize {
173 self.mailbox_capacity
174 }
175
176 #[must_use]
177 pub const fn max_restarts_per_window(&self) -> usize {
178 self.max_restarts_per_window
179 }
180
181 #[must_use]
182 pub const fn restart_window(&self) -> Duration {
183 self.restart_window
184 }
185}
186
/// Raw (unparsed) values of the `LEAN_HOST_MCP_*` runtime variables.
///
/// `None` means the variable was not set; parsing and validation happen in
/// `parse_runtime_config`.
#[derive(Debug, Default)]
struct RuntimeEnv {
    /// `LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB`
    worker_rss_post_job_restart_kib: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB`
    worker_rss_hard_kill_kib: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS`
    worker_rss_sample_millis: Option<String>,
    /// `LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB`
    import_switch_rss_soft_kib: Option<String>,
    /// `LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB`
    module_cache_rss_guard_kib: Option<String>,
    /// `LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES`
    module_cache_max_bytes: Option<String>,
    /// `LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS`
    request_timeout_millis: Option<String>,
    /// `LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY`
    project_mailbox_capacity: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RESTART_LIMIT`
    worker_restart_limit: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS`
    worker_restart_window_secs: Option<String>,
}
200
201fn parse_runtime_config(env: RuntimeEnv, file: &RuntimeFileConfig) -> Result<ProjectRuntimeConfig> {
202 let defaults = ProjectRuntimeConfig::default();
203 let config = ProjectRuntimeConfig {
204 worker_rss_post_job_restart_kib: parse_nonzero_u64(
205 "LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB",
206 env.worker_rss_post_job_restart_kib.as_deref(),
207 file.worker_rss_post_job_restart_kib,
208 defaults.worker_rss_post_job_restart_kib,
209 )?,
210 worker_rss_hard_kill_kib: parse_nonzero_u64(
211 "LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB",
212 env.worker_rss_hard_kill_kib.as_deref(),
213 file.worker_rss_hard_kill_kib,
214 defaults.worker_rss_hard_kill_kib,
215 )?,
216 worker_rss_sample_millis: parse_nonzero_u64(
217 "LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS",
218 env.worker_rss_sample_millis.as_deref(),
219 file.worker_rss_sample_millis,
220 defaults.worker_rss_sample_millis,
221 )?,
222 import_switch_rss_soft_kib: parse_nonzero_u64(
223 "LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB",
224 env.import_switch_rss_soft_kib.as_deref(),
225 file.import_switch_rss_soft_kib,
226 defaults.import_switch_rss_soft_kib,
227 )?,
228 module_cache_rss_guard_kib: parse_nonzero_u64(
229 "LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB",
230 env.module_cache_rss_guard_kib.as_deref(),
231 file.module_cache_rss_guard_kib,
232 defaults.module_cache_rss_guard_kib,
233 )?,
234 module_cache_max_bytes: parse_nonzero_u64(
235 "LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES",
236 env.module_cache_max_bytes.as_deref(),
237 file.module_cache_max_bytes,
238 defaults.module_cache_max_bytes,
239 )?,
240 request_timeout_millis: parse_nonzero_u64(
241 "LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS",
242 env.request_timeout_millis.as_deref(),
243 file.request_timeout_millis,
244 defaults.request_timeout_millis,
245 )?,
246 mailbox_capacity: parse_nonzero_usize(
247 "LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY",
248 env.project_mailbox_capacity.as_deref(),
249 file.project_mailbox_capacity,
250 defaults.mailbox_capacity,
251 )?,
252 max_restarts_per_window: parse_nonzero_usize(
253 "LEAN_HOST_MCP_WORKER_RESTART_LIMIT",
254 env.worker_restart_limit.as_deref(),
255 file.worker_restart_limit,
256 defaults.max_restarts_per_window,
257 )?,
258 restart_window: Duration::from_secs(parse_nonzero_u64(
259 "LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS",
260 env.worker_restart_window_secs.as_deref(),
261 file.worker_restart_window_secs,
262 defaults.restart_window.as_secs(),
263 )?),
264 };
265 validate_rss_ordering(&config)?;
266 Ok(config)
267}
268
269fn validate_rss_ordering(config: &ProjectRuntimeConfig) -> Result<()> {
277 if config.import_switch_rss_soft_kib > config.worker_rss_post_job_restart_kib {
278 return Err(ServerError::Internal(format!(
279 "invalid RSS config: import_switch={} KiB exceeds post_job={} KiB \
280 (need import_switch <= post_job <= hard_kill)",
281 config.import_switch_rss_soft_kib, config.worker_rss_post_job_restart_kib,
282 )));
283 }
284 if config.worker_rss_post_job_restart_kib > config.worker_rss_hard_kill_kib {
285 return Err(ServerError::Internal(format!(
286 "invalid RSS config: post_job={} KiB exceeds hard_kill={} KiB \
287 (need import_switch <= post_job <= hard_kill)",
288 config.worker_rss_post_job_restart_kib, config.worker_rss_hard_kill_kib,
289 )));
290 }
291 Ok(())
292}
293
294fn runtime_env_var(name: &str) -> Result<Option<String>> {
295 match std::env::var(name) {
296 Ok(value) => Ok(Some(value)),
297 Err(std::env::VarError::NotPresent) => Ok(None),
298 Err(err @ std::env::VarError::NotUnicode(_)) => {
299 Err(ServerError::Internal(format!("{name} is not valid unicode: {err}")))
300 }
301 }
302}
303
304#[derive(Debug, Clone)]
306pub(crate) struct ProjectCall<T> {
307 value: T,
308 runtime: RuntimeFacts,
309}
310
311impl<T> ProjectCall<T> {
312 pub(crate) fn new(value: T, runtime: RuntimeFacts) -> Self {
313 Self { value, runtime }
314 }
315
316 pub(crate) fn into_parts(self) -> (T, RuntimeFacts) {
317 (self.value, self.runtime)
318 }
319}
320
321#[derive(Clone, Copy, Debug, Eq, PartialEq)]
322enum RetryPolicy {
323 RetryOnceReadOnly,
324}
325
326impl RetryPolicy {
327 fn retries(self) -> u32 {
328 match self {
329 Self::RetryOnceReadOnly => MAX_JOB_RETRIES,
330 }
331 }
332}
333
/// Drop guard for one unit of a shared active-job counter.
///
/// The matching increment happens where the guard is created
/// (`LeanProject::job_meta`); dropping the guard undoes it.
struct ActiveJobGuard {
    active_jobs: Arc<AtomicUsize>,
}

impl Drop for ActiveJobGuard {
    fn drop(&mut self) {
        let counter: &AtomicUsize = &self.active_jobs;
        counter.fetch_sub(1, Ordering::AcqRel);
    }
}
343
344struct JobMeta {
345 imports: Vec<String>,
346 import_fingerprint: String,
347 _created_at: Instant,
348 queued_at: Instant,
349 admission_wait_millis: u64,
350 _correlation_id: uuid::Uuid,
351 retry_policy: RetryPolicy,
352 _active_job: ActiveJobGuard,
353 _semantic_permit: SemanticPermit,
354}
355
356enum ProjectMessage {
357 ModuleQuery {
358 meta: JobMeta,
359 source: String,
360 query: LeanWorkerModuleQuery,
361 options: LeanWorkerElabOptions,
362 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryOutcome>>>,
363 },
364 ModuleQueryBatch {
365 meta: JobMeta,
366 source: String,
367 selectors: Vec<LeanWorkerModuleQuerySelector>,
368 budgets: LeanWorkerOutputBudgets,
369 options: LeanWorkerElabOptions,
370 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>>>,
371 },
372 DeclarationInspection {
373 meta: JobMeta,
374 request: LeanWorkerDeclarationInspectionRequest,
375 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationInspectionResult>>>,
376 },
377 DeclarationSearch {
378 meta: JobMeta,
379 request: LeanWorkerDeclarationSearch,
380 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationSearchResult>>>,
381 },
382 ProofAttempt {
383 meta: JobMeta,
384 request: LeanWorkerProofAttemptRequest,
385 options: LeanWorkerElabOptions,
386 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerProofAttemptResult>>>,
387 },
388 DeclarationVerification {
389 meta: JobMeta,
390 request: LeanWorkerDeclarationVerificationRequest,
391 options: LeanWorkerElabOptions,
392 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationVerificationResult>>>,
393 },
394 DeclarationVerificationBatch {
395 meta: JobMeta,
396 request: LeanWorkerDeclarationVerificationBatchRequest,
397 options: LeanWorkerElabOptions,
398 reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationVerificationBatchResult>>>,
399 },
400 SemanticProofSearch {
401 meta: JobMeta,
402 request: SemanticProofSearchRequest,
403 reply: oneshot::Sender<Result<ProjectCall<SemanticProofSearchResult>>>,
404 },
405}
406
407impl ProjectMessage {
408 fn imports(&self) -> &[String] {
409 match self {
410 Self::ModuleQuery { meta, .. }
411 | Self::ModuleQueryBatch { meta, .. }
412 | Self::DeclarationInspection { meta, .. }
413 | Self::DeclarationSearch { meta, .. }
414 | Self::ProofAttempt { meta, .. }
415 | Self::DeclarationVerification { meta, .. }
416 | Self::DeclarationVerificationBatch { meta, .. }
417 | Self::SemanticProofSearch { meta, .. } => &meta.imports,
418 }
419 }
420
421 fn reject(self, state: &ProjectActorState, reason: &'static str) {
422 match self {
423 Self::ModuleQuery { meta, reply, .. } => {
424 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
425 }
426 Self::ModuleQueryBatch { meta, reply, .. } => {
427 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
428 }
429 Self::DeclarationInspection { meta, reply, .. } => {
430 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
431 }
432 Self::DeclarationSearch { meta, reply, .. } => {
433 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
434 }
435 Self::ProofAttempt { meta, reply, .. } => {
436 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
437 }
438 Self::DeclarationVerification { meta, reply, .. } => {
439 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
440 }
441 Self::DeclarationVerificationBatch { meta, reply, .. } => {
442 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
443 }
444 Self::SemanticProofSearch { meta, reply, .. } => {
445 let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
446 }
447 }
448 }
449}
450
/// Why the host recycled (or lost) a worker.
///
/// `as_str` gives the stable wire label; `counts_toward_restart_limit` marks the
/// causes that count as unplanned restarts.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RestartCause {
    #[expect(dead_code, reason = "stable wire cause reserved for future non-RSS profile cycling")]
    ImportProfileSwitch,
    RssImportSwitch,
    RssPostJob,
    RssHardLimit,
    MaxRequests,
    MaxImports,
    Idle,
    Timeout,
    Cancelled,
    ChildExit,
    ChildAbort,
    SessionMissing,
    Explicit,
    WorkerInternal,
}

impl RestartCause {
    /// Stable snake_case label reported in restart events and logs.
    const fn as_str(self) -> &'static str {
        match self {
            Self::ImportProfileSwitch => "import_profile_switch",
            Self::RssImportSwitch => "rss_import_switch",
            Self::RssPostJob => "rss_post_job",
            Self::RssHardLimit => "rss_hard_limit_exceeded",
            Self::MaxRequests => "max_requests",
            Self::MaxImports => "max_imports",
            Self::Idle => "idle",
            Self::Timeout => "timeout",
            Self::Cancelled => "cancelled",
            Self::ChildExit => "child_exit",
            Self::ChildAbort => "child_abort",
            Self::SessionMissing => "session_missing",
            Self::Explicit => "explicit",
            Self::WorkerInternal => "worker_internal",
        }
    }

    /// `true` for failure-style causes that count toward the restart limit.
    ///
    /// Every variant is listed, so adding a cause forces a decision here.
    const fn counts_toward_restart_limit(self) -> bool {
        match self {
            Self::Timeout
            | Self::Cancelled
            | Self::ChildExit
            | Self::ChildAbort
            | Self::SessionMissing
            | Self::RssHardLimit => true,
            Self::ImportProfileSwitch
            | Self::RssImportSwitch
            | Self::RssPostJob
            | Self::MaxRequests
            | Self::MaxImports
            | Self::Idle
            | Self::Explicit
            | Self::WorkerInternal => false,
        }
    }

    /// A restart is "planned" exactly when it does not count toward the limit.
    const fn is_planned(self) -> bool {
        !self.counts_toward_restart_limit()
    }
}
506
507fn restart_event(
508 cause: RestartCause,
509 reason: impl Into<String>,
510 worker_generation: u64,
511 rss_kib: Option<u64>,
512 limit_kib: Option<u64>,
513) -> RuntimeRestartEvent {
514 RuntimeRestartEvent {
515 cause: cause.as_str().to_owned(),
516 reason: reason.into(),
517 worker_generation,
518 planned: cause.is_planned(),
519 rss_kib,
520 limit_kib,
521 }
522}
523
/// Running restart totals, overall and per stable cause label.
#[derive(Debug, Clone, Default)]
struct RestartStats {
    total: u64,
    by_cause: BTreeMap<String, u64>,
}

impl RestartStats {
    /// Records one restart attributed to `cause`.
    ///
    /// Both counters saturate at `u64::MAX` instead of overflowing.
    fn observe(&mut self, cause: &str) {
        self.total = self.total.saturating_add(1);
        self.by_cause
            .entry(cause.to_owned())
            .and_modify(|seen| *seen = seen.saturating_add(1))
            .or_insert(1);
    }
}
543
544fn log_restart(event: &RuntimeRestartEvent, restarts_total: u64) {
548 macro_rules! emit {
549 ($level:ident, $msg:literal) => {
550 tracing::$level!(
551 cause = %event.cause,
552 reason = %event.reason,
553 worker_generation = event.worker_generation,
554 rss_kib = ?event.rss_kib,
555 limit_kib = ?event.limit_kib,
556 planned = event.planned,
557 restarts_total,
558 $msg
559 )
560 };
561 }
562 match event.cause.as_str() {
563 "rss_hard_limit_exceeded"
564 | "child_abort"
565 | "child_exit"
566 | "session_missing"
567 | "worker_internal"
568 | "timeout"
569 | "cancelled" => emit!(warn, "worker recycled (abnormal)"),
570 "rss_post_job" | "rss_import_switch" => emit!(info, "worker recycled (memory pressure)"),
571 _ => emit!(debug, "worker recycled (hygiene)"),
572 }
573}
574
575fn restart_cause_from_worker(reason: &LeanWorkerRestartReason) -> RestartCause {
576 match reason.stable_cause() {
577 "explicit" => RestartCause::Explicit,
578 "max_requests" => RestartCause::MaxRequests,
579 "max_imports" => RestartCause::MaxImports,
580 "rss_ceiling" => RestartCause::RssPostJob,
581 "rss_hard_limit" => RestartCause::RssHardLimit,
582 "idle" => RestartCause::Idle,
583 "cancelled" => RestartCause::Cancelled,
584 "timeout" => RestartCause::Timeout,
585 _ => RestartCause::WorkerInternal,
586 }
587}
588
589#[derive(Debug, Clone)]
590struct RuntimeSnapshot {
591 worker_generation: u64,
592 last_restart: Option<RuntimeRestartEvent>,
593 rss_kib: Option<u64>,
594 import_profile: Option<String>,
595 profile_switch_count: u64,
596 restarts_total: u64,
597 restarts_by_cause: BTreeMap<String, u64>,
598}
599
600impl RuntimeSnapshot {
601 fn facts(&self) -> RuntimeFacts {
602 RuntimeFacts {
603 worker_generation: self.worker_generation,
604 worker_restarted: false,
605 retry_count: 0,
606 admission_wait_millis: 0,
607 queue_wait_millis: 0,
608 call_restart: None,
609 last_restart: self.last_restart.clone(),
610 rss_kib: self.rss_kib,
611 worker_lanes: 1,
612 import_profile: self.import_profile.clone(),
613 profile_switch_count: self.profile_switch_count,
614 restarts_total: self.restarts_total,
615 restarts_by_cause: self.restarts_by_cause.clone(),
616 }
617 }
618}
619
620pub(crate) struct LeanProject {
623 canonical_root: PathBuf,
624 toolchain: String,
625 package: Option<String>,
626 library: Option<String>,
627 manifest_hash: String,
628 session_id: String,
629 open_warnings: Vec<String>,
633 actor_tx: Mutex<Option<mpsc::Sender<ProjectMessage>>>,
634 active_jobs: Arc<AtomicUsize>,
635 healthy: Arc<AtomicBool>,
636 runtime: Arc<Mutex<RuntimeSnapshot>>,
637 module_queries: ModuleQueryCache,
638}
639
640impl std::fmt::Debug for LeanProject {
641 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642 f.debug_struct("LeanProject")
643 .field("canonical_root", &self.canonical_root)
644 .field("toolchain", &self.toolchain)
645 .field("package", &self.package)
646 .field("library", &self.library)
647 .field("manifest_hash", &self.manifest_hash)
648 .finish_non_exhaustive()
649 }
650}
651
652impl LeanProject {
653 pub(crate) fn open(meta: LakeProjectMeta, runtime_config: ProjectRuntimeConfig) -> Result<Arc<Self>> {
654 let session_id = uuid::Uuid::new_v4().to_string();
655 let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
656 worker_generation: 1,
657 last_restart: None,
658 rss_kib: None,
659 import_profile: None,
660 profile_switch_count: 0,
661 restarts_total: 0,
662 restarts_by_cause: BTreeMap::new(),
663 }));
664 let active_jobs = Arc::new(AtomicUsize::new(0));
665 let healthy = Arc::new(AtomicBool::new(true));
666 let (config, open_warnings) = ActorConfig::from_meta(
667 &meta,
668 session_id.clone(),
669 Arc::clone(&runtime),
670 Arc::clone(&healthy),
671 runtime_config,
672 )?;
673 type InitMsg = std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>;
674 let (init_tx, init_rx) = std::sync::mpsc::channel::<InitMsg>();
675 let thread_name = actor_thread_name(&meta.canonical_root);
676
677 thread::Builder::new()
678 .name(thread_name)
679 .spawn(move || {
680 actor_main(config, init_tx);
681 })
682 .map_err(|e| ServerError::Internal(format!("spawn project actor thread: {e}")))?;
683
684 let (runtime_toolchain, actor_tx) = init_rx
685 .recv()
686 .map_err(|_| ServerError::Internal("project actor thread died during init".into()))??;
687
688 let cache_cap = NonZeroUsize::new(MODULE_QUERY_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
689 Ok(Arc::new(Self {
690 canonical_root: meta.canonical_root,
691 toolchain: runtime_toolchain,
692 package: meta.package,
693 library: meta.library,
694 manifest_hash: meta.manifest_hash,
695 session_id,
696 open_warnings,
697 actor_tx: Mutex::new(Some(actor_tx)),
698 active_jobs,
699 healthy,
700 runtime,
701 module_queries: ModuleQueryCache::with_capacity(cache_cap),
702 }))
703 }
704
705 pub(crate) async fn process_module_query(
712 &self,
713 imports: Vec<String>,
714 semantic_permit: SemanticPermit,
715 admission_wait_millis: u64,
716 source: String,
717 query: LeanWorkerModuleQuery,
718 options: LeanWorkerElabOptions,
719 ) -> Result<ProjectCall<LeanWorkerModuleQueryOutcome>> {
720 let (reply, rx) = oneshot::channel();
721 let message = ProjectMessage::ModuleQuery {
722 meta: self.job_meta(
723 imports,
724 RetryPolicy::RetryOnceReadOnly,
725 semantic_permit,
726 admission_wait_millis,
727 ),
728 source,
729 query,
730 options,
731 reply,
732 };
733 self.enqueue(message, rx).await
734 }
735
736 pub(crate) async fn process_module_query_batch(
743 &self,
744 imports: Vec<String>,
745 semantic_permit: SemanticPermit,
746 admission_wait_millis: u64,
747 source: String,
748 selectors: Vec<LeanWorkerModuleQuerySelector>,
749 budgets: LeanWorkerOutputBudgets,
750 options: LeanWorkerElabOptions,
751 ) -> Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>> {
752 let (reply, rx) = oneshot::channel();
753 let message = ProjectMessage::ModuleQueryBatch {
754 meta: self.job_meta(
755 imports,
756 RetryPolicy::RetryOnceReadOnly,
757 semantic_permit,
758 admission_wait_millis,
759 ),
760 source,
761 selectors,
762 budgets,
763 options,
764 reply,
765 };
766 self.enqueue(message, rx).await
767 }
768
769 pub(crate) async fn inspect_declaration(
776 &self,
777 imports: Vec<String>,
778 semantic_permit: SemanticPermit,
779 admission_wait_millis: u64,
780 request: LeanWorkerDeclarationInspectionRequest,
781 ) -> Result<ProjectCall<LeanWorkerDeclarationInspectionResult>> {
782 let (reply, rx) = oneshot::channel();
783 let message = ProjectMessage::DeclarationInspection {
784 meta: self.job_meta(
785 imports,
786 RetryPolicy::RetryOnceReadOnly,
787 semantic_permit,
788 admission_wait_millis,
789 ),
790 request,
791 reply,
792 };
793 self.enqueue(message, rx).await
794 }
795
796 pub(crate) async fn search_declarations(
803 &self,
804 imports: Vec<String>,
805 semantic_permit: SemanticPermit,
806 admission_wait_millis: u64,
807 request: LeanWorkerDeclarationSearch,
808 ) -> Result<ProjectCall<LeanWorkerDeclarationSearchResult>> {
809 let (reply, rx) = oneshot::channel();
810 let message = ProjectMessage::DeclarationSearch {
811 meta: self.job_meta(
812 imports,
813 RetryPolicy::RetryOnceReadOnly,
814 semantic_permit,
815 admission_wait_millis,
816 ),
817 request,
818 reply,
819 };
820 self.enqueue(message, rx).await
821 }
822
823 pub(crate) async fn semantic_proof_search(
830 &self,
831 imports: Vec<String>,
832 semantic_permit: SemanticPermit,
833 admission_wait_millis: u64,
834 request: SemanticProofSearchRequest,
835 ) -> Result<ProjectCall<SemanticProofSearchResult>> {
836 let (reply, rx) = oneshot::channel();
837 let message = ProjectMessage::SemanticProofSearch {
838 meta: self.job_meta(
839 imports,
840 RetryPolicy::RetryOnceReadOnly,
841 semantic_permit,
842 admission_wait_millis,
843 ),
844 request,
845 reply,
846 };
847 self.enqueue(message, rx).await
848 }
849
850 pub(crate) async fn attempt_proof(
857 &self,
858 imports: Vec<String>,
859 semantic_permit: SemanticPermit,
860 admission_wait_millis: u64,
861 request: LeanWorkerProofAttemptRequest,
862 options: LeanWorkerElabOptions,
863 ) -> Result<ProjectCall<LeanWorkerProofAttemptResult>> {
864 let (reply, rx) = oneshot::channel();
865 let message = ProjectMessage::ProofAttempt {
866 meta: self.job_meta(
867 imports,
868 RetryPolicy::RetryOnceReadOnly,
869 semantic_permit,
870 admission_wait_millis,
871 ),
872 request,
873 options,
874 reply,
875 };
876 self.enqueue(message, rx).await
877 }
878
879 pub(crate) async fn verify_declaration(
886 &self,
887 imports: Vec<String>,
888 semantic_permit: SemanticPermit,
889 admission_wait_millis: u64,
890 request: LeanWorkerDeclarationVerificationRequest,
891 options: LeanWorkerElabOptions,
892 ) -> Result<ProjectCall<LeanWorkerDeclarationVerificationResult>> {
893 let (reply, rx) = oneshot::channel();
894 let message = ProjectMessage::DeclarationVerification {
895 meta: self.job_meta(
896 imports,
897 RetryPolicy::RetryOnceReadOnly,
898 semantic_permit,
899 admission_wait_millis,
900 ),
901 request,
902 options,
903 reply,
904 };
905 self.enqueue(message, rx).await
906 }
907
908 pub(crate) async fn verify_declaration_batch(
916 &self,
917 imports: Vec<String>,
918 semantic_permit: SemanticPermit,
919 admission_wait_millis: u64,
920 request: LeanWorkerDeclarationVerificationBatchRequest,
921 options: LeanWorkerElabOptions,
922 ) -> Result<ProjectCall<LeanWorkerDeclarationVerificationBatchResult>> {
923 let (reply, rx) = oneshot::channel();
924 let message = ProjectMessage::DeclarationVerificationBatch {
925 meta: self.job_meta(
926 imports,
927 RetryPolicy::RetryOnceReadOnly,
928 semantic_permit,
929 admission_wait_millis,
930 ),
931 request,
932 options,
933 reply,
934 };
935 self.enqueue(message, rx).await
936 }
937
938 fn job_meta(
939 &self,
940 imports: Vec<String>,
941 retry_policy: RetryPolicy,
942 semantic_permit: SemanticPermit,
943 admission_wait_millis: u64,
944 ) -> JobMeta {
945 let created_at = Instant::now();
946 self.active_jobs.fetch_add(1, Ordering::AcqRel);
947 JobMeta {
948 import_fingerprint: import_fingerprint(&imports),
949 imports,
950 _created_at: created_at,
951 queued_at: Instant::now(),
952 admission_wait_millis,
953 _correlation_id: uuid::Uuid::new_v4(),
954 retry_policy,
955 _active_job: ActiveJobGuard {
956 active_jobs: Arc::clone(&self.active_jobs),
957 },
958 _semantic_permit: semantic_permit,
959 }
960 }
961
962 async fn enqueue<T>(
963 &self,
964 message: ProjectMessage,
965 reply_rx: oneshot::Receiver<Result<ProjectCall<T>>>,
966 ) -> Result<ProjectCall<T>>
967 where
968 T: Send + 'static,
969 {
970 let project_info = self.worker_error_context(message.imports());
971 let tx = self
972 .actor_tx
973 .lock()
974 .as_ref()
975 .cloned()
976 .ok_or_else(|| self.unavailable("project actor is stopped", false, false))?;
977 match tx.try_send(message) {
978 Ok(()) => {}
979 Err(mpsc::error::TrySendError::Full(_)) => {
980 return Err(ServerError::worker_unavailable(WorkerUnavailable {
981 retryable: true,
982 worker_restarted: false,
983 reason: "mailbox_full".to_owned(),
984 ..project_info
985 }));
986 }
987 Err(mpsc::error::TrySendError::Closed(_)) => {
988 self.shutdown();
989 return Err(ServerError::worker_unavailable(WorkerUnavailable {
990 retryable: true,
991 worker_restarted: false,
992 reason: "mailbox_closed".to_owned(),
993 ..project_info
994 }));
995 }
996 }
997
998 match reply_rx.await {
999 Ok(result) => result,
1000 Err(_) => {
1001 self.shutdown();
1002 Err(self.unavailable("mailbox_closed_before_reply", true, false))
1003 }
1004 }
1005 }
1006
1007 pub(crate) fn manifest_hash(&self) -> &str {
1008 &self.manifest_hash
1009 }
1010
1011 pub(crate) fn toolchain(&self) -> &str {
1012 &self.toolchain
1013 }
1014
1015 pub(crate) fn canonical_root(&self) -> &Path {
1016 &self.canonical_root
1017 }
1018
1019 pub(crate) fn module_query_cache(&self) -> &ModuleQueryCache {
1020 &self.module_queries
1021 }
1022
1023 #[must_use]
1024 pub(crate) fn freshness(&self, request_imports: &[String]) -> Freshness {
1025 Freshness {
1026 project_root: self.canonical_root.to_string_lossy().into_owned(),
1027 project_hash: self.manifest_hash.clone(),
1028 imports: request_imports.to_vec(),
1029 session_id: self.session_id.clone(),
1030 lean_toolchain: self.toolchain.clone(),
1031 toolchain_advisories: self.open_warnings.clone(),
1032 }
1033 }
1034
1035 #[must_use]
1036 pub(crate) fn runtime_facts(&self) -> RuntimeFacts {
1037 self.runtime.lock().facts()
1038 }
1039
1040 pub(crate) fn shutdown(&self) {
1041 self.healthy.store(false, Ordering::Release);
1042 let _ = self.actor_tx.lock().take();
1043 }
1044
1045 pub(crate) fn is_healthy(&self) -> bool {
1046 self.healthy.load(Ordering::Acquire) && self.actor_tx.lock().as_ref().is_some_and(|tx| !tx.is_closed())
1047 }
1048
1049 pub(crate) fn is_idle(&self) -> bool {
1050 self.active_jobs.load(Ordering::Acquire) == 0
1051 }
1052
1053 fn unavailable(&self, reason: impl Into<String>, retryable: bool, worker_restarted: bool) -> ServerError {
1054 ServerError::worker_unavailable(WorkerUnavailable {
1055 retryable,
1056 worker_restarted,
1057 reason: reason.into(),
1058 ..self.worker_error_context(&[])
1059 })
1060 }
1061
1062 fn worker_error_context(&self, imports: &[String]) -> WorkerUnavailable {
1063 let snapshot = self.runtime.lock().clone();
1064 let runtime = snapshot.facts();
1065 WorkerUnavailable {
1066 retryable: true,
1067 worker_restarted: false,
1068 project_root: self.canonical_root.to_string_lossy().into_owned(),
1069 project_hash: self.manifest_hash.clone(),
1070 imports: imports.to_vec(),
1071 session_id: self.session_id.clone(),
1072 lean_toolchain: self.toolchain.clone(),
1073 worker_generation: snapshot.worker_generation,
1074 reason: String::new(),
1075 restart_cause: snapshot.last_restart.as_ref().map(|event| event.cause.clone()),
1076 rss_kib: snapshot.rss_kib,
1077 limit_kib: None,
1078 retry_after_millis: None,
1079 restarts_in_window: None,
1080 window_millis: None,
1081 runtime,
1082 toolchain_advisories: self.open_warnings.clone(),
1083 }
1084 }
1085}
1086
1087impl Drop for LeanProject {
1088 fn drop(&mut self) {
1089 self.shutdown();
1090 }
1091}
1092
1093#[derive(Clone)]
1094struct ActorConfig {
1095 lake_root: PathBuf,
1096 manifest_hash: String,
1097 toolchain_label: String,
1098 worker_path: PathBuf,
1099 lean_sysroot: PathBuf,
1100 session_id: String,
1101 runtime: Arc<Mutex<RuntimeSnapshot>>,
1102 healthy: Arc<AtomicBool>,
1103 worker_rss_post_job_restart_kib: u64,
1104 worker_rss_hard_kill_kib: u64,
1105 worker_rss_sample_millis: u64,
1106 import_switch_rss_soft_kib: u64,
1107 module_cache_rss_guard_kib: u64,
1108 module_cache_max_bytes: u64,
1109 request_timeout_millis: u64,
1110 mailbox_capacity: usize,
1111 max_restarts_per_window: usize,
1112 restart_window: Duration,
1113 toolchain_advisories: Vec<String>,
1119}
1120
1121impl ActorConfig {
1122 fn from_meta(
1129 meta: &LakeProjectMeta,
1130 session_id: String,
1131 runtime: Arc<Mutex<RuntimeSnapshot>>,
1132 healthy: Arc<AtomicBool>,
1133 runtime_config: ProjectRuntimeConfig,
1134 ) -> Result<(Self, Vec<String>)> {
1135 let toolchain_id = ToolchainId::parse(&meta.toolchain).map_err(|e| ServerError::BadProject(e.to_string()))?;
1136 let (worker_path, lean_sysroot, open_warnings) = match WorkerBinary::resolve_ready_for(&toolchain_id) {
1137 Readiness::Ready {
1138 worker,
1139 lean_sysroot,
1140 note,
1141 } => (worker.path, lean_sysroot, note.into_iter().collect()),
1142 Readiness::UnknownPin {
1143 pin,
1144 worker,
1145 lean_sysroot,
1146 } => (
1147 worker.path,
1148 lean_sysroot,
1149 vec![format!(
1150 "lean-toolchain pins {pin}, which is not a recognized lean-rs supported version \
1151 (e.g. a nightly); proceeding, but the host cannot vouch for ABI compatibility"
1152 )],
1153 ),
1154 Readiness::Unsupported { window, nearest } => {
1155 return Err(ServerError::BadProject(format!(
1156 "lean-toolchain pins {toolchain_id}, outside the lean-rs supported window {window}; \
1157 nearest supported: {nearest}. Pin a supported toolchain (or bump lean-rs) and reopen."
1158 )));
1159 }
1160 Readiness::Stale { toolchain, install_cmd } => {
1161 return Err(ServerError::BadProject(format!(
1162 "worker for {toolchain} was built against a different lean.h than the toolchain now \
1163 provides (header drift); rebuild it: {install_cmd}"
1164 )));
1165 }
1166 Readiness::Unusable {
1167 toolchain,
1168 detail,
1169 install_cmd,
1170 } => {
1171 return Err(ServerError::BadProject(format!(
1172 "worker for {toolchain} failed its runtime smoke test ({detail}); the toolchain's \
1173 libleanshared is ABI-incompatible with this lean-rs build and cannot be served. \
1174 Pin a supported toolchain the host can run, or rebuild lean-rs and reinstall: {install_cmd}"
1175 )));
1176 }
1177 Readiness::NotInstalled { toolchain, install_cmd } => {
1178 return Err(ServerError::BadProject(format!(
1179 "no worker binary for toolchain {toolchain}; run: {install_cmd}"
1180 )));
1181 }
1182 Readiness::ToolchainNotInstalled { toolchain, elan_dir } => {
1183 return Err(ServerError::BadProject(format!(
1184 "elan toolchain {toolchain} is not installed (expected {})",
1185 elan_dir.display()
1186 )));
1187 }
1188 };
1189 tracing::debug!(
1190 toolchain = %toolchain_id,
1191 worker = %worker_path.display(),
1192 sysroot = %lean_sysroot.display(),
1193 "resolved ready worker binary"
1194 );
1195 let config = Self {
1196 lake_root: meta.canonical_root.clone(),
1197 manifest_hash: meta.manifest_hash.clone(),
1198 toolchain_label: meta.toolchain.clone(),
1199 worker_path,
1200 lean_sysroot,
1201 session_id,
1202 runtime,
1203 healthy,
1204 worker_rss_post_job_restart_kib: runtime_config.worker_rss_post_job_restart_kib(),
1205 worker_rss_hard_kill_kib: runtime_config.worker_rss_hard_kill_kib(),
1206 worker_rss_sample_millis: runtime_config.worker_rss_sample_millis(),
1207 import_switch_rss_soft_kib: runtime_config.import_switch_rss_soft_kib(),
1208 module_cache_rss_guard_kib: runtime_config.module_cache_rss_guard_kib(),
1209 module_cache_max_bytes: runtime_config.module_cache_max_bytes(),
1210 request_timeout_millis: runtime_config.request_timeout_millis(),
1211 mailbox_capacity: runtime_config.mailbox_capacity(),
1212 max_restarts_per_window: runtime_config.max_restarts_per_window(),
1213 restart_window: runtime_config.restart_window(),
1214 toolchain_advisories: open_warnings.clone(),
1215 };
1216 Ok((config, open_warnings))
1217 }
1218}
1219
1220struct ProjectActorState {
1221 config: ActorConfig,
1222 handle: LeanWorkerHostHandle,
1223 worker_generation_base: u64,
1224 last_restart: Option<RuntimeRestartEvent>,
1225 last_import_fingerprint: Option<String>,
1226 profile_switch_count: u64,
1227 last_rss_kib: Option<u64>,
1228 runtime: Arc<Mutex<RuntimeSnapshot>>,
1229 abnormal_restart_times: VecDeque<Instant>,
1230 restart_stats: RestartStats,
1231}
1232
1233impl ProjectActorState {
1234 fn handle_message(&mut self, message: ProjectMessage) {
1235 match message {
1236 ProjectMessage::ModuleQuery {
1237 meta,
1238 source,
1239 query,
1240 options,
1241 reply,
1242 } => {
1243 let result = self.run_job(meta, |handle, imports| {
1244 handle.process_module_query_with_imports(imports, &source, &query, &options, None, None)
1245 });
1246 let _ = reply.send(result);
1247 }
1248 ProjectMessage::ModuleQueryBatch {
1249 meta,
1250 source,
1251 selectors,
1252 budgets,
1253 options,
1254 reply,
1255 } => {
1256 let result = self.run_job(meta, |handle, imports| {
1257 handle.process_module_query_batch_with_imports(
1258 imports, &source, &selectors, &budgets, &options, None, None,
1259 )
1260 });
1261 let _ = reply.send(result);
1262 }
1263 ProjectMessage::DeclarationInspection { meta, request, reply } => {
1264 let result = self.run_job(meta, |handle, imports| {
1265 handle.inspect_declaration_with_imports(imports, &request, None, None)
1266 });
1267 let _ = reply.send(result);
1268 }
1269 ProjectMessage::DeclarationSearch { meta, request, reply } => {
1270 let result = self.run_job(meta, |handle, imports| {
1271 handle.search_declarations_with_imports(imports, &request, None, None)
1272 });
1273 let _ = reply.send(result);
1274 }
1275 ProjectMessage::ProofAttempt {
1276 meta,
1277 request,
1278 options,
1279 reply,
1280 } => {
1281 let result = self.run_job(meta, |handle, imports| {
1282 handle.attempt_proof_with_imports(imports, &request, &options, None, None)
1283 });
1284 let _ = reply.send(result);
1285 }
1286 ProjectMessage::DeclarationVerification {
1287 meta,
1288 request,
1289 options,
1290 reply,
1291 } => {
1292 let result = self.run_job(meta, |handle, imports| {
1293 handle.verify_declaration_with_imports(imports, &request, &options, None, None)
1294 });
1295 let _ = reply.send(result);
1296 }
1297 ProjectMessage::DeclarationVerificationBatch {
1298 meta,
1299 request,
1300 options,
1301 reply,
1302 } => {
1303 let result = self.run_job(meta, |handle, imports| {
1304 handle.verify_declaration_batch_with_imports(imports, &request, &options, None, None)
1305 });
1306 let _ = reply.send(result);
1307 }
1308 ProjectMessage::SemanticProofSearch { meta, request, reply } => {
1309 let result = self.run_semantic_job(meta, &request);
1310 let _ = reply.send(result);
1311 }
1312 }
1313 }
1314
1315 fn run_job<R>(
1316 &mut self,
1317 meta: JobMeta,
1318 job: impl Fn(&mut LeanWorkerHostHandle, Vec<String>) -> std::result::Result<R, LeanWorkerError>,
1319 ) -> Result<ProjectCall<R>> {
1320 let _span = tracing::debug_span!(
1323 "job",
1324 session_id = %self.config.session_id,
1325 imports = meta.imports.len(),
1326 queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1327 )
1328 .entered();
1329 let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1330 let generation_before = self.observed_generation();
1331 let mut call_restart = self.cycle_before_import_switch_if_needed(&meta)?;
1332 let mut lifecycle_baseline = self.handle.lifecycle_snapshot();
1333
1334 let max_retries = meta.retry_policy.retries();
1335 let mut retry_count = 0_u32;
1336 loop {
1337 match job(&mut self.handle, meta.imports.clone()) {
1338 Ok(value) => {
1339 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1340 call_restart = Some(event);
1341 }
1342 self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
1343 self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
1344 if let Some(event) = self.cycle_after_post_job_rss_if_needed(&meta)? {
1345 call_restart = Some(event);
1346 }
1347 let runtime =
1348 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1349 tracing::debug!(
1350 retry_count,
1351 rss_kib = ?runtime.rss_kib,
1352 worker_generation = runtime.worker_generation,
1353 "job complete"
1354 );
1355 self.publish_runtime(&runtime);
1356 return Ok(ProjectCall::new(value, runtime));
1357 }
1358 Err(err) if worker_error_is_recoverable_death(&err) && retry_count < max_retries => {
1359 self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1360 let first_reason = err.to_string();
1361 call_restart =
1362 Some(self.rebuild_after_worker_death(first_reason, worker_death_cause(&err), &meta)?);
1363 lifecycle_baseline = self.handle.lifecycle_snapshot();
1364 retry_count = retry_count.saturating_add(1);
1365 }
1366 Err(err) if worker_error_is_recoverable_death(&err) => {
1367 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1368 call_restart = Some(event);
1369 }
1370 let reason = format!("worker_died_after_retry: {err}");
1371 let generation = self.observed_generation();
1372 let runtime =
1373 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1374 self.publish_runtime(&runtime);
1375 return Err(self.worker_unavailable_for(
1376 &meta,
1377 reason,
1378 true,
1379 generation > generation_before,
1380 Some(worker_death_cause(&err)),
1381 None,
1382 None,
1383 ));
1384 }
1385 Err(err) if worker_error_is_session_missing(&err) && retry_count < max_retries => {
1386 self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1387 call_restart = Some(self.rebuild_after_worker_death(
1388 format!("session_missing: {err}"),
1389 RestartCause::SessionMissing,
1390 &meta,
1391 )?);
1392 lifecycle_baseline = self.handle.lifecycle_snapshot();
1393 retry_count = retry_count.saturating_add(1);
1394 }
1395 Err(err) if worker_error_is_session_missing(&err) => {
1396 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1397 call_restart = Some(event);
1398 }
1399 let generation = self.observed_generation();
1400 let runtime =
1401 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1402 self.publish_runtime(&runtime);
1403 return Err(self.worker_unavailable_for(
1404 &meta,
1405 format!("session_missing: {err}"),
1406 true,
1407 generation > generation_before,
1408 Some(RestartCause::SessionMissing),
1409 None,
1410 None,
1411 ));
1412 }
1413 Err(LeanWorkerError::RssHardLimitExceeded {
1414 operation,
1415 current_kib,
1416 limit_kib,
1417 ..
1418 }) => {
1419 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1420 call_restart = Some(event);
1421 }
1422 let runtime =
1423 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1424 self.publish_runtime(&runtime);
1425 return Err(self.worker_unavailable_for(
1426 &meta,
1427 format!(
1428 "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1429 ),
1430 false,
1431 true,
1432 Some(RestartCause::RssHardLimit),
1433 Some(limit_kib),
1434 None,
1435 ));
1436 }
1437 Err(err) if matches!(err, LeanWorkerError::Timeout { .. }) => {
1438 if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1439 call_restart = Some(event);
1440 }
1441 let generation = self.observed_generation();
1442 let runtime =
1443 self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1444 self.publish_runtime(&runtime);
1445 return Err(self.worker_unavailable_for(
1446 &meta,
1447 format!("timeout: {err}"),
1448 true,
1449 generation > generation_before,
1450 Some(RestartCause::Timeout),
1451 None,
1452 None,
1453 ));
1454 }
1455 Err(err) => {
1456 self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1457 self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
1458 return Err(map_worker_err(err));
1459 }
1460 }
1461 }
1462 }
1463
1464 fn run_semantic_job(
1465 &self,
1466 meta: JobMeta,
1467 request: &SemanticProofSearchRequest,
1468 ) -> Result<ProjectCall<SemanticProofSearchResult>> {
1469 let _span = tracing::debug_span!(
1470 "semantic_job",
1471 session_id = %self.config.session_id,
1472 imports = meta.imports.len(),
1473 queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1474 )
1475 .entered();
1476 let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1477 let generation_before = self.observed_generation();
1478 let mut capability = self.open_semantic_capability(&meta)?;
1479 let result = {
1480 let mut session = capability
1481 .open_session_with_imports(meta.imports.clone(), None, None)
1482 .map_err(map_worker_err)?;
1483 crate::semantic_search::run_semantic_proof_search(&mut session, request)
1484 };
1485 let runtime = self.runtime_facts(&meta, generation_before, 0, queue_wait_millis, None);
1486 self.publish_runtime(&runtime);
1487 result.map(|value| ProjectCall::new(value, runtime))
1488 }
1489
1490 fn open_semantic_capability(&self, meta: &JobMeta) -> Result<lean_rs_worker_parent::LeanWorkerCapability> {
1491 let runtime = lean_semantic_search_runtime::build_cached(SemanticSearchRuntimeBuild {
1492 cache_root: semantic_runtime_cache_root()?,
1493 toolchain_label: self.config.toolchain_label.clone(),
1494 lean_sysroot: self.config.lean_sysroot.clone(),
1495 })
1496 .map_err(|err| {
1497 self.worker_unavailable_for(
1498 meta,
1499 format!("semantic runtime build failed for this toolchain: {err}"),
1500 true,
1501 false,
1502 None,
1503 None,
1504 None,
1505 )
1506 })?;
1507 semantic_capability_builder(&self.config, &runtime.built)?
1508 .open()
1509 .map_err(|err| {
1510 self.worker_unavailable_for(
1511 meta,
1512 format!(
1513 "semantic capability open failed for this toolchain: {}",
1514 map_worker_err(err)
1515 ),
1516 true,
1517 false,
1518 None,
1519 None,
1520 None,
1521 )
1522 })
1523 }
1524
1525 fn cycle_before_import_switch_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1526 let Some(previous_import_fingerprint) = self.last_import_fingerprint.as_deref() else {
1527 return Ok(None);
1528 };
1529 if previous_import_fingerprint == meta.import_fingerprint {
1530 return Ok(None);
1531 }
1532 self.profile_switch_count = self.profile_switch_count.saturating_add(1);
1533 let Some(current_kib) = self.handle.rss_kib() else {
1534 return Ok(None);
1535 };
1536 self.last_rss_kib = Some(current_kib);
1537 if current_kib < self.config.import_switch_rss_soft_kib {
1538 return Ok(None);
1539 }
1540 let limit_kib = self.config.import_switch_rss_soft_kib;
1541 let reason = format!("rss_import_switch current_kib={current_kib} limit_kib={limit_kib}");
1542 self.record_restart_or_stop(RestartCause::RssImportSwitch, &reason)
1543 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1544 self.handle.cycle().map_err(map_worker_err)?;
1545 self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1546 let event = restart_event(
1547 RestartCause::RssImportSwitch,
1548 reason,
1549 self.observed_generation(),
1550 Some(current_kib),
1551 Some(limit_kib),
1552 );
1553 self.record_restart(event.clone());
1554 Ok(Some(event))
1555 }
1556
1557 fn cycle_after_post_job_rss_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1558 let Some(current_kib) = self.handle.rss_kib() else {
1559 return Ok(None);
1560 };
1561 self.last_rss_kib = Some(current_kib);
1562 let limit_kib = self.config.worker_rss_post_job_restart_kib;
1563 tracing::debug!(rss_kib = current_kib, limit_kib, "post-job rss check");
1564 if current_kib < limit_kib {
1565 return Ok(None);
1566 }
1567 let reason = format!("rss_post_job current_kib={current_kib} limit_kib={limit_kib}");
1568 self.record_restart_or_stop(RestartCause::RssPostJob, &reason)
1569 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1570 self.handle.cycle().map_err(map_worker_err)?;
1571 self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1572 let event = restart_event(
1573 RestartCause::RssPostJob,
1574 reason,
1575 self.observed_generation(),
1576 Some(current_kib),
1577 Some(limit_kib),
1578 );
1579 self.record_restart(event.clone());
1580 Ok(Some(event))
1581 }
1582
1583 fn rebuild_after_worker_death(
1584 &mut self,
1585 reason: String,
1586 cause: RestartCause,
1587 meta: &JobMeta,
1588 ) -> Result<RuntimeRestartEvent> {
1589 self.record_restart_or_stop(cause, &reason)
1590 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1591 let next_generation = self.observed_generation().saturating_add(1);
1592 let (handle, _) = open_worker(&self.config, false)?;
1593 self.handle = handle;
1594 self.worker_generation_base = next_generation;
1595 self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
1596 let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1597 self.record_restart(event.clone());
1598 Ok(event)
1599 }
1600
1601 fn account_lifecycle_restarts_since(
1602 &mut self,
1603 before: &LeanWorkerLifecycleSnapshot,
1604 meta: &JobMeta,
1605 ) -> Result<Option<RuntimeRestartEvent>> {
1606 let after = self.handle.lifecycle_snapshot();
1607 let restarted = after.restarts.saturating_sub(before.restarts);
1608 if restarted == 0 {
1609 self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1610 return Ok(None);
1611 }
1612 let (cause, reason) = after.last_restart_reason.as_ref().map_or_else(
1613 || (RestartCause::WorkerInternal, "worker_internal_restart".to_owned()),
1614 |reason| (restart_cause_from_worker(reason), restart_reason_text(reason)),
1615 );
1616 for _ in 0..restarted {
1617 self.record_restart_or_stop(cause, &reason)
1618 .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1619 }
1620 self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1621 let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1622 self.record_restart(event.clone());
1623 Ok(Some(event))
1624 }
1625
1626 fn record_restart(&mut self, event: RuntimeRestartEvent) {
1632 self.restart_stats.observe(&event.cause);
1633 log_restart(&event, self.restart_stats.total);
1634 self.last_restart = Some(event);
1635 }
1636
1637 fn record_restart_or_stop(
1638 &mut self,
1639 cause: RestartCause,
1640 reason: &str,
1641 ) -> std::result::Result<(), RestartLimitExceeded> {
1642 if !cause.counts_toward_restart_limit() {
1643 return Ok(());
1644 }
1645 let now = Instant::now();
1646 while self
1647 .abnormal_restart_times
1648 .front()
1649 .is_some_and(|seen| now.saturating_duration_since(*seen) > self.config.restart_window)
1650 {
1651 self.abnormal_restart_times.pop_front();
1652 }
1653 if self.abnormal_restart_times.len() >= self.config.max_restarts_per_window {
1654 self.config.healthy.store(false, Ordering::Release);
1655 tracing::warn!(
1656 cause = cause.as_str(),
1657 restarts_in_window = self.abnormal_restart_times.len(),
1658 window_millis = millis_u64(self.config.restart_window),
1659 "restart limit exceeded; marking project unhealthy"
1660 );
1661 let message = format!(
1662 "restart_limit_exceeded after {} restarts in {:?}; latest: {reason}",
1663 self.config.max_restarts_per_window, self.config.restart_window
1664 );
1665 let event = restart_event(
1666 cause,
1667 message.clone(),
1668 self.observed_generation(),
1669 self.last_rss_kib,
1670 None,
1671 );
1672 self.record_restart(event.clone());
1673 self.publish_runtime(&RuntimeFacts {
1674 worker_generation: self.observed_generation(),
1675 worker_restarted: false,
1676 retry_count: MAX_JOB_RETRIES,
1677 admission_wait_millis: 0,
1678 queue_wait_millis: 0,
1679 call_restart: None,
1680 last_restart: Some(event),
1681 rss_kib: self.last_rss_kib,
1682 worker_lanes: 1,
1683 import_profile: self.last_import_fingerprint.clone(),
1684 profile_switch_count: self.profile_switch_count,
1685 restarts_total: self.restart_stats.total,
1686 restarts_by_cause: self.restart_stats.by_cause.clone(),
1687 });
1688 return Err(RestartLimitExceeded {
1689 message,
1690 cause,
1691 restarts_in_window: self.abnormal_restart_times.len() as u64,
1692 window_millis: millis_u64(self.config.restart_window),
1693 });
1694 }
1695 self.abnormal_restart_times.push_back(now);
1696 Ok(())
1697 }
1698
1699 fn observed_generation(&self) -> u64 {
1700 self.worker_generation_base
1701 .saturating_add(self.handle.lifecycle_snapshot().worker_generation)
1702 }
1703
1704 fn runtime_facts(
1705 &self,
1706 meta: &JobMeta,
1707 generation_before: u64,
1708 retry_count: u32,
1709 queue_wait_millis: u64,
1710 call_restart: Option<RuntimeRestartEvent>,
1711 ) -> RuntimeFacts {
1712 let generation = self.observed_generation();
1713 let snapshot = self.handle.lifecycle_snapshot();
1714 RuntimeFacts {
1715 worker_generation: generation,
1716 worker_restarted: call_restart.is_some() || generation > generation_before,
1717 retry_count,
1718 admission_wait_millis: meta.admission_wait_millis,
1719 queue_wait_millis,
1720 call_restart,
1721 last_restart: self.last_restart.clone(),
1722 rss_kib: snapshot.last_rss_kib.or(self.last_rss_kib),
1723 worker_lanes: 1,
1724 import_profile: Some(meta.import_fingerprint.clone()),
1725 profile_switch_count: self.profile_switch_count,
1726 restarts_total: self.restart_stats.total,
1727 restarts_by_cause: self.restart_stats.by_cause.clone(),
1728 }
1729 }
1730
1731 fn publish_runtime(&self, runtime: &RuntimeFacts) {
1732 *self.runtime.lock() = RuntimeSnapshot {
1733 worker_generation: runtime.worker_generation,
1734 last_restart: runtime.last_restart.clone().or_else(|| runtime.call_restart.clone()),
1735 rss_kib: runtime.rss_kib,
1736 import_profile: runtime.import_profile.clone(),
1737 profile_switch_count: runtime.profile_switch_count,
1738 restarts_total: runtime.restarts_total,
1739 restarts_by_cause: runtime.restarts_by_cause.clone(),
1740 };
1741 }
1742
1743 fn worker_unavailable_for(
1744 &self,
1745 meta: &JobMeta,
1746 reason: String,
1747 retryable: bool,
1748 worker_restarted: bool,
1749 cause: Option<RestartCause>,
1750 limit_kib: Option<u64>,
1751 retry_after_millis: Option<u64>,
1752 ) -> ServerError {
1753 let snapshot = self.runtime.lock().facts();
1754 ServerError::worker_unavailable(WorkerUnavailable {
1755 retryable,
1756 worker_restarted,
1757 project_root: self.config.lake_root.to_string_lossy().into_owned(),
1758 project_hash: self.config.manifest_hash.clone(),
1759 imports: meta.imports.clone(),
1760 session_id: self.config.session_id.clone(),
1761 lean_toolchain: self.config.toolchain_label.clone(),
1762 worker_generation: self.observed_generation(),
1763 restart_cause: cause.map(|cause| cause.as_str().to_owned()),
1764 rss_kib: self.last_rss_kib,
1765 limit_kib,
1766 retry_after_millis,
1767 restarts_in_window: Some(self.abnormal_restart_times.len() as u64),
1768 window_millis: Some(millis_u64(self.config.restart_window)),
1769 runtime: snapshot,
1770 reason,
1771 toolchain_advisories: self.config.toolchain_advisories.clone(),
1772 })
1773 }
1774
1775 fn shutdown_unavailable(&self, meta: &JobMeta, reason: &'static str) -> ServerError {
1776 self.worker_unavailable_for(meta, reason.to_owned(), true, false, None, None, None)
1777 }
1778
1779 fn restart_limit_error(&self, imports: &[String], limit: RestartLimitExceeded) -> ServerError {
1780 let snapshot = self.runtime.lock().facts();
1781 ServerError::worker_unavailable(WorkerUnavailable {
1782 retryable: false,
1783 worker_restarted: false,
1784 project_root: self.config.lake_root.to_string_lossy().into_owned(),
1785 project_hash: self.config.manifest_hash.clone(),
1786 imports: imports.to_vec(),
1787 session_id: self.config.session_id.clone(),
1788 lean_toolchain: self.config.toolchain_label.clone(),
1789 worker_generation: self.observed_generation(),
1790 reason: limit.message,
1791 restart_cause: Some(limit.cause.as_str().to_owned()),
1792 rss_kib: self.last_rss_kib,
1793 limit_kib: None,
1794 retry_after_millis: Some(limit.window_millis),
1795 restarts_in_window: Some(limit.restarts_in_window),
1796 window_millis: Some(limit.window_millis),
1797 runtime: snapshot,
1798 toolchain_advisories: self.config.toolchain_advisories.clone(),
1799 })
1800 }
1801}
1802
1803struct RestartLimitExceeded {
1804 message: String,
1805 cause: RestartCause,
1806 restarts_in_window: u64,
1807 window_millis: u64,
1808}
1809
1810fn actor_main(
1811 config: ActorConfig,
1812 init_reply: std::sync::mpsc::Sender<std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>>,
1813) {
1814 let (handle, runtime_toolchain) = match open_worker(&config, true) {
1815 Ok(value) => value,
1816 Err(err) => {
1817 let _ = init_reply.send(Err(err));
1818 return;
1819 }
1820 };
1821 let mut state = ProjectActorState {
1822 config: config.clone(),
1823 handle,
1824 worker_generation_base: 1,
1825 last_restart: None,
1826 last_import_fingerprint: None,
1827 profile_switch_count: 0,
1828 last_rss_kib: None,
1829 runtime: Arc::clone(&config.runtime),
1830 abnormal_restart_times: VecDeque::new(),
1831 restart_stats: RestartStats::default(),
1832 };
1833
1834 let (tx, mut rx) = mpsc::channel::<ProjectMessage>(config.mailbox_capacity);
1835 if init_reply.send(Ok((runtime_toolchain, tx))).is_err() {
1836 return;
1837 }
1838
1839 while let Some(message) = rx.blocking_recv() {
1840 if !config.healthy.load(Ordering::Acquire) {
1841 message.reject(&state, "project_shutting_down");
1842 continue;
1843 }
1844 state.handle_message(message);
1845 }
1846
1847 match state.handle.shutdown() {
1848 Ok(report) => {
1849 tracing::debug!(
1850 outcome = ?report.outcome,
1851 elapsed_millis = millis_u64(report.elapsed),
1852 wait_millis = millis_u64(report.wait_elapsed),
1853 "project worker shutdown complete"
1854 );
1855 }
1856 Err(err) => {
1857 tracing::warn!(error = %err, "project worker shutdown failed");
1858 }
1859 }
1860}
1861
1862fn open_worker(config: &ActorConfig, preflight: bool) -> Result<(LeanWorkerHostHandle, String)> {
1863 let builder = worker_builder(config);
1864 if preflight {
1865 let report = builder.check();
1866 if let Some(first) = report.first_error() {
1867 if bootstrap_failure_is_hard_rss(&first.code().to_string(), first.message()) {
1868 return Err(bootstrap_hard_rss_unavailable(
1869 config,
1870 first.message().to_owned(),
1871 parse_keyed_u64(first.message(), "current_kib"),
1872 parse_keyed_u64(first.message(), "limit_kib").or(Some(config.worker_rss_hard_kill_kib)),
1873 ));
1874 }
1875 return Err(ServerError::BadProject(format!(
1876 "{}: {}",
1877 first.code(),
1878 first.message()
1879 )));
1880 }
1881 }
1882 let handle = match builder.open() {
1883 Ok(handle) => handle,
1884 Err(LeanWorkerError::RssHardLimitExceeded {
1885 operation,
1886 current_kib,
1887 limit_kib,
1888 ..
1889 }) => {
1890 return Err(bootstrap_hard_rss_unavailable(
1891 config,
1892 format!(
1893 "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1894 ),
1895 Some(current_kib),
1896 Some(limit_kib),
1897 ));
1898 }
1899 Err(err) => return Err(map_worker_err(err)),
1900 };
1901 let runtime_toolchain = handle
1902 .runtime_metadata()
1903 .lean_version
1904 .unwrap_or_else(|| config.toolchain_label.clone());
1905 Ok((handle, runtime_toolchain))
1906}
1907
/// Heuristically decide whether a bootstrap/preflight failure was the worker's hard RSS limit.
///
/// Matches when the error code mentions `rss`, or the message mentions a hard RSS limit in one
/// of its spellings. Both checks are ASCII case-insensitive. `rss_hard_limit` also covers the
/// `rss_hard_limit_exceeded` form.
fn bootstrap_failure_is_hard_rss(code: &str, message: &str) -> bool {
    const MESSAGE_MARKERS: [&str; 3] = ["hard rss limit", "rss hard limit", "rss_hard_limit"];
    if code.to_ascii_lowercase().contains("rss") {
        return true;
    }
    let lower = message.to_lowercase();
    MESSAGE_MARKERS.iter().any(|marker| lower.contains(marker))
}
1916
1917fn bootstrap_hard_rss_unavailable(
1918 config: &ActorConfig,
1919 reason: String,
1920 current_kib: Option<u64>,
1921 limit_kib: Option<u64>,
1922) -> ServerError {
1923 config.healthy.store(false, Ordering::Release);
1924 let generation = config.runtime.lock().worker_generation;
1925 let event = restart_event(
1926 RestartCause::RssHardLimit,
1927 reason.clone(),
1928 generation,
1929 current_kib,
1930 limit_kib,
1931 );
1932 let restarts_total = 1;
1936 log_restart(&event, restarts_total);
1937 let by_cause = BTreeMap::from([(RestartCause::RssHardLimit.as_str().to_owned(), restarts_total)]);
1938 let runtime = RuntimeFacts {
1939 worker_generation: generation,
1940 worker_restarted: true,
1941 retry_count: 0,
1942 admission_wait_millis: 0,
1943 queue_wait_millis: 0,
1944 call_restart: Some(event.clone()),
1945 last_restart: Some(event.clone()),
1946 rss_kib: current_kib,
1947 worker_lanes: 1,
1948 import_profile: None,
1949 profile_switch_count: 0,
1950 restarts_total,
1951 restarts_by_cause: by_cause.clone(),
1952 };
1953 *config.runtime.lock() = RuntimeSnapshot {
1954 worker_generation: generation,
1955 last_restart: Some(event),
1956 rss_kib: current_kib,
1957 import_profile: None,
1958 profile_switch_count: 0,
1959 restarts_total,
1960 restarts_by_cause: by_cause,
1961 };
1962 ServerError::worker_unavailable(WorkerUnavailable {
1963 retryable: false,
1964 worker_restarted: true,
1965 project_root: config.lake_root.to_string_lossy().into_owned(),
1966 project_hash: config.manifest_hash.clone(),
1967 imports: Vec::new(),
1968 session_id: config.session_id.clone(),
1969 lean_toolchain: config.toolchain_label.clone(),
1970 worker_generation: generation,
1971 reason,
1972 restart_cause: Some(RestartCause::RssHardLimit.as_str().to_owned()),
1973 rss_kib: current_kib,
1974 limit_kib,
1975 retry_after_millis: None,
1976 restarts_in_window: None,
1977 window_millis: None,
1978 runtime,
1979 toolchain_advisories: config.toolchain_advisories.clone(),
1980 })
1981}
1982
1983fn worker_builder(config: &ActorConfig) -> LeanWorkerHostHandleBuilder {
1984 let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
1985 let module_cache_limits = module_cache_limits(config);
1986 LeanWorkerHostHandleBuilder::shims_only(&config.lake_root, std::iter::empty::<String>())
1987 .worker_child(LeanWorkerChild::for_toolchain(
1988 config.worker_path.clone(),
1989 config.lean_sysroot.clone(),
1990 ))
1991 .startup_timeout(Duration::from_secs(30))
1992 .request_timeout(Duration::from_millis(config.request_timeout_millis))
1993 .restart_policy(restart_policy)
1994 .rss_hard_limit(
1995 config.worker_rss_hard_kill_kib,
1996 Duration::from_millis(config.worker_rss_sample_millis),
1997 )
1998 .module_cache_limits(module_cache_limits)
1999}
2000
2001fn semantic_capability_builder(
2002 config: &ActorConfig,
2003 built: &lean_toolchain::LeanBuiltCapability,
2004) -> Result<LeanWorkerCapabilityBuilder> {
2005 let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
2006 let builder = LeanWorkerCapabilityBuilder::from_built_capability(built, std::iter::empty::<String>())
2007 .map_err(map_worker_err)?
2008 .import_workspace_root(config.lake_root.clone())
2009 .worker_child(LeanWorkerChild::for_toolchain(
2010 config.worker_path.clone(),
2011 config.lean_sysroot.clone(),
2012 ))
2013 .startup_timeout(Duration::from_secs(30))
2014 .request_timeout(Duration::from_millis(config.request_timeout_millis))
2015 .restart_policy(restart_policy)
2016 .rss_hard_limit(
2017 config.worker_rss_hard_kill_kib,
2018 Duration::from_millis(config.worker_rss_sample_millis),
2019 )
2020 .module_cache_limits(module_cache_limits(config))
2021 .json_command_export(lean_semantic_search_capability::DECLARATION_FEATURES_EXPORT)
2022 .json_command_export(lean_semantic_search_capability::PROOF_GOAL_FEATURES_EXPORT);
2023 Ok(builder)
2024}
2025
2026fn semantic_runtime_cache_root() -> Result<PathBuf> {
2027 let cache_dir =
2028 dirs::cache_dir().ok_or_else(|| ServerError::Internal("could not resolve user cache directory".to_owned()))?;
2029 Ok(cache_dir.join("lean-host-mcp").join("semantic-runtimes"))
2030}
2031
2032fn module_cache_limits(config: &ActorConfig) -> LeanWorkerModuleCacheLimits {
2033 LeanWorkerModuleCacheLimits::default()
2034 .rss_guard_kib(config.module_cache_rss_guard_kib)
2035 .max_bytes(config.module_cache_max_bytes)
2036}
2037
/// Thread name for a project's actor: `lean-host-mcp/project/<basename>`.
///
/// Falls back to `project` when the root has no final component or it is not valid UTF-8.
fn actor_thread_name(canonical_root: &Path) -> String {
    const PREFIX: &str = "lean-host-mcp/project/";
    let label = match canonical_root.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "project",
    };
    let mut thread_name = String::from(PREFIX);
    thread_name.push_str(label);
    thread_name
}
2042
2043fn worker_error_is_recoverable_death(err: &LeanWorkerError) -> bool {
2044 matches!(
2045 err,
2046 LeanWorkerError::ChildExited { .. } | LeanWorkerError::ChildPanicOrAbort { .. }
2047 )
2048}
2049
2050fn worker_error_is_session_missing(err: &LeanWorkerError) -> bool {
2051 matches!(err, LeanWorkerError::Worker { code, .. } if code == "lean_rs.worker.session_missing")
2052}
2053
2054#[allow(
2055 clippy::wildcard_enum_match_arm,
2056 reason = "only worker process death variants are restart causes; all other errors are classified elsewhere"
2057)]
2058fn worker_death_cause(err: &LeanWorkerError) -> RestartCause {
2059 match err {
2060 LeanWorkerError::ChildPanicOrAbort { .. } => RestartCause::ChildAbort,
2061 LeanWorkerError::ChildExited { .. } => RestartCause::ChildExit,
2062 _ => RestartCause::WorkerInternal,
2063 }
2064}
2065
2066fn restart_reason_text(reason: &LeanWorkerRestartReason) -> String {
2067 match reason {
2068 LeanWorkerRestartReason::Explicit => RestartCause::Explicit.as_str().to_owned(),
2069 LeanWorkerRestartReason::MaxRequests { limit } => format!("max_requests limit={limit}"),
2070 LeanWorkerRestartReason::MaxImports { limit } => format!("max_imports limit={limit}"),
2071 LeanWorkerRestartReason::RssCeiling {
2072 current_kib, limit_kib, ..
2073 } => {
2074 format!("rss_ceiling current_kib={current_kib} limit_kib={limit_kib}")
2075 }
2076 LeanWorkerRestartReason::RssHardLimit {
2077 operation,
2078 current_kib,
2079 limit_kib,
2080 ..
2081 } => {
2082 format!("rss_hard_limit operation={operation} current_kib={current_kib} limit_kib={limit_kib}")
2083 }
2084 LeanWorkerRestartReason::Idle { idle_for, limit } => {
2085 format!(
2086 "idle idle_for_millis={} limit_millis={}",
2087 millis_u64(*idle_for),
2088 millis_u64(*limit)
2089 )
2090 }
2091 LeanWorkerRestartReason::Cancelled { operation } => format!("cancelled operation={operation}"),
2092 LeanWorkerRestartReason::ChildAbort { operation } => format!("child_abort operation={operation}"),
2093 LeanWorkerRestartReason::RequestTimeout { operation, duration } => {
2094 format!(
2095 "timeout operation={operation} duration_millis={}",
2096 millis_u64(*duration)
2097 )
2098 }
2099 }
2100}
2101
/// Deterministic fingerprint of an import list: the module names in order,
/// separated by `\n` (no trailing separator; empty list gives `""`).
fn import_fingerprint(imports: &[String]) -> String {
    let mut fingerprint = String::new();
    for (position, module) in imports.iter().enumerate() {
        if position > 0 {
            fingerprint.push('\n');
        }
        fingerprint.push_str(module);
    }
    fingerprint
}
2105
/// Whole milliseconds in `duration`, saturating at `u64::MAX` when the
/// `u128` millisecond count does not fit.
fn millis_u64(duration: Duration) -> u64 {
    duration.as_millis().try_into().unwrap_or(u64::MAX)
}
2109
/// Extracts the unsigned integer that follows `key=` in a `key=value`
/// diagnostic string such as `"operation=startup current_kib=128 limit_kib=64"`.
///
/// Only an occurrence of `key=` that begins a token is accepted. The match must
/// be at the start of `text` or preceded by a character that cannot be part of
/// a key (anything other than an ASCII alphanumeric or `_`). A plain substring
/// search would let `"limit_kib"` match inside `"hard_limit_kib=..."` and
/// `"kib"` inside `"current_kib=..."`, silently returning another field's
/// value.
///
/// The value is the run of ASCII digits immediately after `=`; the first
/// non-digit ends it. Returns `None` in any of these cases:
/// - no token-aligned `key=` is present;
/// - the first token-aligned occurrence is not followed by a digit;
/// - the digits overflow `u64`.
fn parse_keyed_u64(text: &str, key: &str) -> Option<u64> {
    let prefix = format!("{key}=");
    let key_start = text
        .match_indices(prefix.as_str())
        .map(|(index, _)| index)
        .find(|&index| {
            // Reject matches glued onto the tail of a longer key.
            let preceding = text.get(..index).and_then(|before| before.chars().next_back());
            !matches!(preceding, Some(ch) if ch.is_ascii_alphanumeric() || ch == '_')
        })?;
    let value_start = key_start.checked_add(prefix.len())?;
    let rest = text.get(value_start..)?;
    let digits_end = rest.find(|ch: char| !ch.is_ascii_digit()).unwrap_or(rest.len());
    rest.get(..digits_end)?.parse().ok()
}
2119
2120fn parse_nonzero_u64(name: &str, env: Option<&str>, file: Option<u64>, default: u64) -> Result<u64> {
2124 let value = match env {
2125 Some(raw) => raw
2126 .parse::<u64>()
2127 .map_err(|e| ServerError::Internal(format!("{name}={raw:?} not a u64: {e}")))?,
2128 None => file.unwrap_or(default),
2129 };
2130 if value == 0 {
2131 return Err(ServerError::Internal(format!(
2132 "{name} resolved to 0, which is not allowed"
2133 )));
2134 }
2135 Ok(value)
2136}
2137
2138fn parse_nonzero_usize(name: &str, env: Option<&str>, file: Option<usize>, default: usize) -> Result<usize> {
2139 let parsed = parse_nonzero_u64(name, env, file.map(|v| v as u64), default as u64)?;
2140 usize::try_from(parsed).map_err(|_| ServerError::Internal(format!("{name}={parsed} does not fit in usize")))
2141}
2142
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::panic,
    clippy::unwrap_used,
    reason = "unit tests use expect/unwrap_err to state the branch under test directly"
)]
mod tests {
    use super::*;

    /// Runtime snapshot for a first-generation worker with no restart history.
    fn fresh_runtime() -> Arc<Mutex<RuntimeSnapshot>> {
        Arc::new(Mutex::new(RuntimeSnapshot {
            worker_generation: 1,
            last_restart: None,
            rss_kib: None,
            import_profile: None,
            profile_switch_count: 0,
            restarts_total: 0,
            restarts_by_cause: BTreeMap::new(),
        }))
    }

    /// `ActorConfig` whose limits are all the module's built-in defaults.
    ///
    /// Only the filesystem locations and the runtime snapshot vary between the
    /// tests that need a config. A test that exercises a non-default limit
    /// overrides that single field on the returned value.
    fn default_actor_config(
        lake_root: PathBuf,
        worker_path: PathBuf,
        lean_sysroot: PathBuf,
        runtime: Arc<Mutex<RuntimeSnapshot>>,
    ) -> ActorConfig {
        ActorConfig {
            lake_root,
            manifest_hash: "sha256-test".to_owned(),
            toolchain_label: "leanprover/lean4:test".to_owned(),
            worker_path,
            lean_sysroot,
            session_id: "session-test".to_owned(),
            runtime,
            healthy: Arc::new(AtomicBool::new(true)),
            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
            restart_window: RESTART_WINDOW,
            toolchain_advisories: Vec::new(),
        }
    }

    #[test]
    fn runtime_config_parses_runtime_policy_without_env_reads() {
        // Every knob is supplied through an explicit `RuntimeEnv` value rather
        // than process environment variables.
        let config = parse_runtime_config(
            RuntimeEnv {
                worker_rss_post_job_restart_kib: Some("5".to_owned()),
                worker_rss_hard_kill_kib: Some("7".to_owned()),
                worker_rss_sample_millis: Some("11".to_owned()),
                import_switch_rss_soft_kib: Some("3".to_owned()),
                module_cache_rss_guard_kib: Some("17".to_owned()),
                module_cache_max_bytes: Some("19".to_owned()),
                request_timeout_millis: Some("37".to_owned()),
                project_mailbox_capacity: Some("23".to_owned()),
                worker_restart_limit: Some("29".to_owned()),
                worker_restart_window_secs: Some("31".to_owned()),
            },
            &RuntimeFileConfig::default(),
        )
        .unwrap();

        assert_eq!(config.worker_rss_post_job_restart_kib(), 5);
        assert_eq!(config.worker_rss_hard_kill_kib(), 7);
        assert_eq!(config.worker_rss_sample_millis(), 11);
        assert_eq!(config.import_switch_rss_soft_kib(), 3);
        assert_eq!(config.module_cache_rss_guard_kib(), 17);
        assert_eq!(config.module_cache_max_bytes(), 19);
        assert_eq!(config.request_timeout_millis(), 37);
        assert_eq!(config.mailbox_capacity(), 23);
        assert_eq!(config.max_restarts_per_window(), 29);
        assert_eq!(config.restart_window(), Duration::from_secs(31));
    }

    #[test]
    fn request_timeout_precedence_env_over_file_over_default() {
        let file = RuntimeFileConfig {
            request_timeout_millis: Some(45_000),
            ..RuntimeFileConfig::default()
        };
        // File value applies when the environment is silent.
        let config = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
        assert_eq!(config.request_timeout_millis(), 45_000);
        // Environment value beats the file value.
        let env = RuntimeEnv {
            request_timeout_millis: Some("90000".to_owned()),
            ..RuntimeEnv::default()
        };
        let config = parse_runtime_config(env, &file).unwrap();
        assert_eq!(config.request_timeout_millis(), 90_000);
        // Neither source set: the built-in default applies.
        let config = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
        assert_eq!(config.request_timeout_millis(), REQUEST_TIMEOUT_MILLIS);
    }

    #[test]
    fn request_timeout_zero_is_rejected() {
        let err = parse_runtime_config(
            RuntimeEnv {
                request_timeout_millis: Some("0".to_owned()),
                ..RuntimeEnv::default()
            },
            &RuntimeFileConfig::default(),
        )
        .unwrap_err();
        let ServerError::Internal(message) = err else {
            panic!("expected Internal config error");
        };
        assert!(
            message.contains("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS"),
            "message: {message}"
        );
    }

    #[test]
    fn rss_config_rejects_post_job_above_hard_kill() {
        // 20 GiB post-job threshold exceeds the 16 GiB default hard-kill limit.
        let err = parse_runtime_config(
            RuntimeEnv {
                worker_rss_post_job_restart_kib: Some("20971520".to_owned()),
                ..RuntimeEnv::default()
            },
            &RuntimeFileConfig::default(),
        )
        .unwrap_err();
        let ServerError::Internal(message) = err else {
            panic!("expected Internal config error");
        };
        assert!(message.contains("invalid RSS config"), "message: {message}");
        assert!(message.contains("hard_kill"), "message: {message}");
    }

    #[test]
    fn rss_config_rejects_import_switch_above_post_job() {
        // 6 GiB import-switch threshold exceeds the 5 GiB default post-job limit.
        let err = parse_runtime_config(
            RuntimeEnv {
                import_switch_rss_soft_kib: Some("6291456".to_owned()),
                ..RuntimeEnv::default()
            },
            &RuntimeFileConfig::default(),
        )
        .unwrap_err();
        let ServerError::Internal(message) = err else {
            panic!("expected Internal config error");
        };
        assert!(message.contains("invalid RSS config"), "message: {message}");
        assert!(message.contains("import_switch"), "message: {message}");
    }

    #[test]
    fn rss_config_accepts_raising_post_job_to_8gib() {
        // 8 GiB stays below the 16 GiB default hard-kill limit.
        let config = parse_runtime_config(
            RuntimeEnv {
                worker_rss_post_job_restart_kib: Some("8388608".to_owned()),
                ..RuntimeEnv::default()
            },
            &RuntimeFileConfig::default(),
        )
        .unwrap();
        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
    }

    #[test]
    fn runtime_config_precedence_env_over_file_over_default() {
        let file = RuntimeFileConfig {
            worker_rss_post_job_restart_kib: Some(8_388_608),
            ..RuntimeFileConfig::default()
        };
        // File value applies when the environment is silent.
        let config = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
        // Environment value beats the file value.
        let env = RuntimeEnv {
            worker_rss_post_job_restart_kib: Some("6291456".to_owned()),
            ..RuntimeEnv::default()
        };
        let config = parse_runtime_config(env, &file).unwrap();
        assert_eq!(config.worker_rss_post_job_restart_kib(), 6_291_456);
        // Neither source set: the built-in default applies.
        let config = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
        assert_eq!(
            config.worker_rss_post_job_restart_kib(),
            WORKER_RSS_POST_JOB_RESTART_KIB
        );
    }

    #[test]
    fn runtime_config_rejects_zero_and_bad_ordering_from_file() {
        let zero = RuntimeFileConfig {
            worker_rss_sample_millis: Some(0),
            ..RuntimeFileConfig::default()
        };
        assert!(parse_runtime_config(RuntimeEnv::default(), &zero).is_err());

        // 20 GiB post-job threshold from the file exceeds the 16 GiB default hard kill.
        let inverted = RuntimeFileConfig {
            worker_rss_post_job_restart_kib: Some(20_971_520),
            ..RuntimeFileConfig::default()
        };
        let err = parse_runtime_config(RuntimeEnv::default(), &inverted).unwrap_err();
        let ServerError::Internal(message) = err else {
            panic!("expected Internal config error");
        };
        assert!(message.contains("invalid RSS config"), "message: {message}");
    }

    #[test]
    fn restart_stats_tally_total_and_per_cause() {
        let mut stats = RestartStats::default();
        stats.observe("rss_post_job");
        stats.observe("rss_post_job");
        stats.observe("child_abort");

        assert_eq!(stats.total, 3);
        assert_eq!(stats.by_cause.get("rss_post_job"), Some(&2));
        assert_eq!(stats.by_cause.get("child_abort"), Some(&1));
        assert_eq!(stats.by_cause.get("idle"), None);
    }

    #[test]
    fn semantic_capability_builder_omits_capability_import_module() {
        let tmp = tempfile::tempdir().unwrap();
        let capability_root = tmp.path().join("capability");
        let lib_dir = capability_root.join(".lake").join("build").join("lib");
        std::fs::create_dir_all(&lib_dir).unwrap();
        let dylib_name = if cfg!(target_os = "macos") {
            "libLeanSemanticSearch.dylib"
        } else {
            "libLeanSemanticSearch.so"
        };
        let dylib = lib_dir.join(dylib_name);
        std::fs::write(&dylib, "").unwrap();
        let built = lean_toolchain::LeanBuiltCapability::path(&dylib)
            .package("lean_semantic_search")
            .module("LeanSemanticSearch");
        let config = default_actor_config(
            tmp.path().join("consumer"),
            tmp.path().join("worker"),
            tmp.path().join("lean"),
            fresh_runtime(),
        );

        let builder = semantic_capability_builder(&config, &built).unwrap();
        let debug = format!("{builder:?}");

        assert!(debug.contains("imports: []"), "builder debug: {debug}");
        assert!(
            !debug.contains("LeanSemanticSearch.Capability"),
            "builder must not import the capability module: {debug}"
        );
        assert!(
            debug.contains("import_workspace_root: Some"),
            "builder should import sessions against the consumer workspace: {debug}"
        );
    }

    #[test]
    fn bootstrap_hard_rss_failure_is_structured_runtime_unavailable() {
        let runtime = fresh_runtime();
        let mut config = default_actor_config(
            PathBuf::from("/tmp/lean-host-mcp-bootstrap-rss-test"),
            PathBuf::from("/tmp/worker"),
            PathBuf::from("/tmp/lean"),
            Arc::clone(&runtime),
        );
        // Tiny hard-kill ceiling so the reported 128 KiB reading is over the limit.
        config.worker_rss_hard_kill_kib = 64;

        let err = bootstrap_hard_rss_unavailable(
            &config,
            "rss_hard_limit_exceeded operation=startup current_kib=128 limit_kib=64".to_owned(),
            Some(128),
            Some(64),
        );

        let ServerError::WorkerUnavailable(info) = err else {
            panic!("expected WorkerUnavailable");
        };
        assert!(!info.retryable);
        assert_eq!(info.restart_cause.as_deref(), Some("rss_hard_limit_exceeded"));
        assert_eq!(info.rss_kib, Some(128));
        assert_eq!(info.limit_kib, Some(64));
        assert_eq!(
            info.runtime.last_restart.as_ref().map(|event| event.cause.as_str()),
            Some("rss_hard_limit_exceeded")
        );
    }

    #[test]
    fn planned_restart_causes_do_not_consume_abnormal_restart_budget() {
        assert!(!RestartCause::RssImportSwitch.counts_toward_restart_limit());
        assert!(!RestartCause::RssPostJob.counts_toward_restart_limit());
        assert!(!RestartCause::MaxRequests.counts_toward_restart_limit());
        assert!(!RestartCause::MaxImports.counts_toward_restart_limit());
        assert!(!RestartCause::Idle.counts_toward_restart_limit());

        assert!(RestartCause::ChildExit.counts_toward_restart_limit());
        assert!(RestartCause::ChildAbort.counts_toward_restart_limit());
        assert!(RestartCause::Timeout.counts_toward_restart_limit());
        assert!(RestartCause::Cancelled.counts_toward_restart_limit());
        assert!(RestartCause::SessionMissing.counts_toward_restart_limit());
        assert!(RestartCause::RssHardLimit.counts_toward_restart_limit());
    }

    #[test]
    fn worker_restart_reason_maps_to_stable_cause() {
        assert_eq!(
            restart_cause_from_worker(&LeanWorkerRestartReason::MaxRequests { limit: 1 }).as_str(),
            "max_requests"
        );
        assert_eq!(
            restart_cause_from_worker(&LeanWorkerRestartReason::RssCeiling {
                current_kib: 2,
                limit_kib: 1,
                last_import_stats: None,
            })
            .as_str(),
            "rss_post_job"
        );
        assert_eq!(
            restart_cause_from_worker(&LeanWorkerRestartReason::RssHardLimit {
                operation: "test",
                current_kib: 2,
                limit_kib: 1,
                last_import_stats: None,
            })
            .as_str(),
            "rss_hard_limit_exceeded"
        );
        assert_eq!(
            restart_cause_from_worker(&LeanWorkerRestartReason::RequestTimeout {
                operation: "test",
                duration: Duration::from_millis(1),
            })
            .as_str(),
            "timeout"
        );
    }
}