// lean_host_mcp/project.rs

//! `LeanProject`—the unit of Lean semantic execution.
//!
//! One Lake project owns one private serialized controller. The controller
//! submits one worker request at a time, applies host memory/retry policy, and
//! exposes only typed request/reply calls to tool modules. The lower
//! `lean-rs-worker-parent` service owns child-process shutdown, generation
//! separation, terminal outcomes, and primitive restart mechanics.
8
9#![allow(let_underscore_drop, clippy::needless_pass_by_value)]
10
11use std::collections::{BTreeMap, VecDeque};
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
16use std::thread;
17use std::time::{Duration, Instant};
18
19use lean_rs_worker_parent::{
20    LeanWorkerCapabilityBuilder, LeanWorkerChild, LeanWorkerDeclarationInspectionRequest,
21    LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationSearch, LeanWorkerDeclarationSearchResult,
22    LeanWorkerDeclarationVerificationBatchRequest, LeanWorkerDeclarationVerificationBatchResult,
23    LeanWorkerDeclarationVerificationRequest, LeanWorkerDeclarationVerificationResult, LeanWorkerElabOptions,
24    LeanWorkerError, LeanWorkerHostHandle, LeanWorkerHostHandleBuilder, LeanWorkerLifecycleSnapshot,
25    LeanWorkerModuleCacheLimits, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchOutcome,
26    LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector, LeanWorkerOutputBudgets,
27    LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRestartPolicy, LeanWorkerRestartReason,
28};
29use lean_semantic_search_runtime::SemanticSearchRuntimeBuild;
30use parking_lot::Mutex;
31use tokio::sync::{mpsc, oneshot};
32
33use crate::admission::SemanticPermit;
34use crate::cache::ModuleQueryCache;
35use crate::config_file::RuntimeFileConfig;
36use crate::envelope::{Freshness, RuntimeFacts, RuntimeRestartEvent};
37use crate::error::{Result, ServerError, WorkerUnavailable, map_worker_err};
38use crate::lake_meta::LakeProjectMeta;
39use crate::semantic_search::{SemanticProofSearchRequest, SemanticProofSearchResult};
40use crate::toolchain::{Readiness, ToolchainId, WorkerBinary};
41
42/// LRU capacity for exact bounded module query results.
43const MODULE_QUERY_CACHE_CAPACITY: usize = 256;
44const WORKER_REQUEST_RESTARTS: u64 = 64;
45const PROJECT_MAILBOX_CAPACITY: usize = 8;
46const WORKER_RSS_POST_JOB_RESTART_KIB: u64 = 5 * 1024 * 1024;
47const WORKER_RSS_HARD_KILL_KIB: u64 = 16 * 1024 * 1024;
48const WORKER_RSS_SAMPLE_MILLIS: u64 = 250;
49const IMPORT_SWITCH_RSS_SOFT_KIB: u64 = 2 * 1024 * 1024;
50const MODULE_CACHE_RSS_GUARD_KIB: u64 = 2 * 1024 * 1024;
51const MODULE_CACHE_MAX_BYTES: u64 = 32 * 1024 * 1024;
52/// Per-request worker deadline. Covers one tool call end to end (live rows,
53/// diagnostics, terminal response); on expiry the worker is recycled and the
54/// call returns a retryable runtime error. Replaces the worker-parent's 10-min
55/// `long_running_requests` profile, which let whole-project scans (e.g.
56/// `find_references` at project scope) appear to hang. Raise it for unusually
57/// heavy modules whose `verify`/`proof_state` legitimately runs longer.
58const REQUEST_TIMEOUT_MILLIS: u64 = 120 * 1000;
59const MAX_JOB_RETRIES: u32 = 1;
60const MAX_RESTARTS_PER_WINDOW: usize = 3;
61const RESTART_WINDOW: Duration = Duration::from_mins(1);
62
/// Runtime policy for one private project actor.
///
/// The binary parses this once at server startup and passes it into the
/// broker. Tests and embedders can construct the default directly without
/// rereading process environment during project open.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ProjectRuntimeConfig {
    /// RSS budget (KiB) past which the worker is recycled after a job.
    worker_rss_post_job_restart_kib: u64,
    /// RSS limit (KiB) at which the worker is killed in-flight.
    worker_rss_hard_kill_kib: u64,
    /// RSS sampling knob, in milliseconds (presumably the sampling interval —
    /// confirm against the worker-parent builder).
    worker_rss_sample_millis: u64,
    /// RSS threshold (KiB) for cycling the worker before an import-profile
    /// switch.
    import_switch_rss_soft_kib: u64,
    /// Module-cache RSS guard, in KiB.
    module_cache_rss_guard_kib: u64,
    /// Module-cache size cap, in bytes.
    module_cache_max_bytes: u64,
    /// Per-request worker deadline, in milliseconds.
    request_timeout_millis: u64,
    /// Capacity of the project actor mailbox.
    mailbox_capacity: usize,
    /// Restart limit applied per `restart_window`.
    max_restarts_per_window: usize,
    /// Window over which `max_restarts_per_window` is applied.
    restart_window: Duration,
}
81
82impl Default for ProjectRuntimeConfig {
83    fn default() -> Self {
84        Self {
85            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
86            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
87            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
88            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
89            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
90            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
91            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
92            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
93            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
94            restart_window: RESTART_WINDOW,
95        }
96    }
97}
98
99impl ProjectRuntimeConfig {
100    /// Parse runtime env vars once at server startup.
101    ///
102    /// # Errors
103    ///
104    /// [`ServerError::Internal`] when a runtime env var is malformed, zero
105    /// where zero is unsafe.
106    pub fn from_env() -> Result<Self> {
107        Self::from_env_with_file(&RuntimeFileConfig::default())
108    }
109
110    /// Resolve the runtime policy with a config-file section as the layer
111    /// beneath env vars: each knob is `env var > file > built-in default`.
112    ///
113    /// # Errors
114    ///
115    /// [`ServerError::Internal`] when an env var is malformed, or a resolved
116    /// value (from env or file) is zero where zero is unsafe, or the RSS
117    /// ceilings violate `import_switch <= post_job <= hard_kill`.
118    pub fn from_env_with_file(file: &RuntimeFileConfig) -> Result<Self> {
119        parse_runtime_config(
120            RuntimeEnv {
121                worker_rss_post_job_restart_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB")?,
122                worker_rss_hard_kill_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB")?,
123                worker_rss_sample_millis: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS")?,
124                import_switch_rss_soft_kib: runtime_env_var("LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB")?,
125                module_cache_rss_guard_kib: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB")?,
126                module_cache_max_bytes: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES")?,
127                request_timeout_millis: runtime_env_var("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS")?,
128                project_mailbox_capacity: runtime_env_var("LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY")?,
129                worker_restart_limit: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_LIMIT")?,
130                worker_restart_window_secs: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS")?,
131            },
132            file,
133        )
134    }
135
136    #[must_use]
137    pub const fn worker_rss_post_job_restart_kib(&self) -> u64 {
138        self.worker_rss_post_job_restart_kib
139    }
140
141    #[must_use]
142    pub const fn worker_rss_hard_kill_kib(&self) -> u64 {
143        self.worker_rss_hard_kill_kib
144    }
145
146    #[must_use]
147    pub const fn worker_rss_sample_millis(&self) -> u64 {
148        self.worker_rss_sample_millis
149    }
150
151    #[must_use]
152    pub const fn import_switch_rss_soft_kib(&self) -> u64 {
153        self.import_switch_rss_soft_kib
154    }
155
156    #[must_use]
157    pub const fn module_cache_rss_guard_kib(&self) -> u64 {
158        self.module_cache_rss_guard_kib
159    }
160
161    #[must_use]
162    pub const fn module_cache_max_bytes(&self) -> u64 {
163        self.module_cache_max_bytes
164    }
165
166    #[must_use]
167    pub const fn request_timeout_millis(&self) -> u64 {
168        self.request_timeout_millis
169    }
170
171    #[must_use]
172    pub const fn mailbox_capacity(&self) -> usize {
173        self.mailbox_capacity
174    }
175
176    #[must_use]
177    pub const fn max_restarts_per_window(&self) -> usize {
178        self.max_restarts_per_window
179    }
180
181    #[must_use]
182    pub const fn restart_window(&self) -> Duration {
183        self.restart_window
184    }
185}
186
/// Raw, still-unparsed values of the runtime env vars.
///
/// Each field holds the string read for one `LEAN_HOST_MCP_*` variable, or
/// `None` when that variable is not set. `Default` yields "nothing set".
#[derive(Debug, Default)]
struct RuntimeEnv {
    worker_rss_post_job_restart_kib: Option<String>,
    worker_rss_hard_kill_kib: Option<String>,
    worker_rss_sample_millis: Option<String>,
    import_switch_rss_soft_kib: Option<String>,
    module_cache_rss_guard_kib: Option<String>,
    module_cache_max_bytes: Option<String>,
    request_timeout_millis: Option<String>,
    project_mailbox_capacity: Option<String>,
    worker_restart_limit: Option<String>,
    worker_restart_window_secs: Option<String>,
}
200
201fn parse_runtime_config(env: RuntimeEnv, file: &RuntimeFileConfig) -> Result<ProjectRuntimeConfig> {
202    let defaults = ProjectRuntimeConfig::default();
203    let config = ProjectRuntimeConfig {
204        worker_rss_post_job_restart_kib: parse_nonzero_u64(
205            "LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB",
206            env.worker_rss_post_job_restart_kib.as_deref(),
207            file.worker_rss_post_job_restart_kib,
208            defaults.worker_rss_post_job_restart_kib,
209        )?,
210        worker_rss_hard_kill_kib: parse_nonzero_u64(
211            "LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB",
212            env.worker_rss_hard_kill_kib.as_deref(),
213            file.worker_rss_hard_kill_kib,
214            defaults.worker_rss_hard_kill_kib,
215        )?,
216        worker_rss_sample_millis: parse_nonzero_u64(
217            "LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS",
218            env.worker_rss_sample_millis.as_deref(),
219            file.worker_rss_sample_millis,
220            defaults.worker_rss_sample_millis,
221        )?,
222        import_switch_rss_soft_kib: parse_nonzero_u64(
223            "LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB",
224            env.import_switch_rss_soft_kib.as_deref(),
225            file.import_switch_rss_soft_kib,
226            defaults.import_switch_rss_soft_kib,
227        )?,
228        module_cache_rss_guard_kib: parse_nonzero_u64(
229            "LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB",
230            env.module_cache_rss_guard_kib.as_deref(),
231            file.module_cache_rss_guard_kib,
232            defaults.module_cache_rss_guard_kib,
233        )?,
234        module_cache_max_bytes: parse_nonzero_u64(
235            "LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES",
236            env.module_cache_max_bytes.as_deref(),
237            file.module_cache_max_bytes,
238            defaults.module_cache_max_bytes,
239        )?,
240        request_timeout_millis: parse_nonzero_u64(
241            "LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS",
242            env.request_timeout_millis.as_deref(),
243            file.request_timeout_millis,
244            defaults.request_timeout_millis,
245        )?,
246        mailbox_capacity: parse_nonzero_usize(
247            "LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY",
248            env.project_mailbox_capacity.as_deref(),
249            file.project_mailbox_capacity,
250            defaults.mailbox_capacity,
251        )?,
252        max_restarts_per_window: parse_nonzero_usize(
253            "LEAN_HOST_MCP_WORKER_RESTART_LIMIT",
254            env.worker_restart_limit.as_deref(),
255            file.worker_restart_limit,
256            defaults.max_restarts_per_window,
257        )?,
258        restart_window: Duration::from_secs(parse_nonzero_u64(
259            "LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS",
260            env.worker_restart_window_secs.as_deref(),
261            file.worker_restart_window_secs,
262            defaults.restart_window.as_secs(),
263        )?),
264    };
265    validate_rss_ordering(&config)?;
266    Ok(config)
267}
268
269/// The three RSS ceilings escalate: a worker cycles cleanly before an
270/// import-profile switch (`import_switch`), again after a job that grew past the
271/// post-job budget (`post_job`), and is killed in-flight only at the hard limit
272/// (`hard_kill`). If a tuned value inverts that order the cheaper cycle can
273/// never fire — e.g. `post_job > hard_kill` means the planned post-job recycle
274/// is unreachable and every overrun escalates straight to a hard kill. Reject it
275/// at startup with the offending values, rather than degrade silently.
276fn validate_rss_ordering(config: &ProjectRuntimeConfig) -> Result<()> {
277    if config.import_switch_rss_soft_kib > config.worker_rss_post_job_restart_kib {
278        return Err(ServerError::Internal(format!(
279            "invalid RSS config: import_switch={} KiB exceeds post_job={} KiB \
280             (need import_switch <= post_job <= hard_kill)",
281            config.import_switch_rss_soft_kib, config.worker_rss_post_job_restart_kib,
282        )));
283    }
284    if config.worker_rss_post_job_restart_kib > config.worker_rss_hard_kill_kib {
285        return Err(ServerError::Internal(format!(
286            "invalid RSS config: post_job={} KiB exceeds hard_kill={} KiB \
287             (need import_switch <= post_job <= hard_kill)",
288            config.worker_rss_post_job_restart_kib, config.worker_rss_hard_kill_kib,
289        )));
290    }
291    Ok(())
292}
293
294fn runtime_env_var(name: &str) -> Result<Option<String>> {
295    match std::env::var(name) {
296        Ok(value) => Ok(Some(value)),
297        Err(std::env::VarError::NotPresent) => Ok(None),
298        Err(err @ std::env::VarError::NotUnicode(_)) => {
299            Err(ServerError::Internal(format!("{name} is not valid unicode: {err}")))
300        }
301    }
302}
303
304/// Result of one project actor call.
305#[derive(Debug, Clone)]
306pub(crate) struct ProjectCall<T> {
307    value: T,
308    runtime: RuntimeFacts,
309}
310
311impl<T> ProjectCall<T> {
312    pub(crate) fn new(value: T, runtime: RuntimeFacts) -> Self {
313        Self { value, runtime }
314    }
315
316    pub(crate) fn into_parts(self) -> (T, RuntimeFacts) {
317        (self.value, self.runtime)
318    }
319}
320
321#[derive(Clone, Copy, Debug, Eq, PartialEq)]
322enum RetryPolicy {
323    RetryOnceReadOnly,
324}
325
326impl RetryPolicy {
327    fn retries(self) -> u32 {
328        match self {
329            Self::RetryOnceReadOnly => MAX_JOB_RETRIES,
330        }
331    }
332}
333
/// Guard that decrements the shared active-job counter when dropped.
///
/// The guard itself never increments the counter; presumably whoever creates
/// it does so first (that code is not visible here — confirm at the creation
/// site).
struct ActiveJobGuard {
    active_jobs: Arc<AtomicUsize>,
}

impl Drop for ActiveJobGuard {
    fn drop(&mut self) {
        self.active_jobs.fetch_sub(1, Ordering::AcqRel);
    }
}
343
344struct JobMeta {
345    imports: Vec<String>,
346    import_fingerprint: String,
347    _created_at: Instant,
348    queued_at: Instant,
349    admission_wait_millis: u64,
350    _correlation_id: uuid::Uuid,
351    retry_policy: RetryPolicy,
352    _active_job: ActiveJobGuard,
353    _semantic_permit: SemanticPermit,
354}
355
356enum ProjectMessage {
357    ModuleQuery {
358        meta: JobMeta,
359        source: String,
360        query: LeanWorkerModuleQuery,
361        options: LeanWorkerElabOptions,
362        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryOutcome>>>,
363    },
364    ModuleQueryBatch {
365        meta: JobMeta,
366        source: String,
367        selectors: Vec<LeanWorkerModuleQuerySelector>,
368        budgets: LeanWorkerOutputBudgets,
369        options: LeanWorkerElabOptions,
370        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>>>,
371    },
372    DeclarationInspection {
373        meta: JobMeta,
374        request: LeanWorkerDeclarationInspectionRequest,
375        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationInspectionResult>>>,
376    },
377    DeclarationSearch {
378        meta: JobMeta,
379        request: LeanWorkerDeclarationSearch,
380        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationSearchResult>>>,
381    },
382    ProofAttempt {
383        meta: JobMeta,
384        request: LeanWorkerProofAttemptRequest,
385        options: LeanWorkerElabOptions,
386        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerProofAttemptResult>>>,
387    },
388    DeclarationVerification {
389        meta: JobMeta,
390        request: LeanWorkerDeclarationVerificationRequest,
391        options: LeanWorkerElabOptions,
392        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationVerificationResult>>>,
393    },
394    DeclarationVerificationBatch {
395        meta: JobMeta,
396        request: LeanWorkerDeclarationVerificationBatchRequest,
397        options: LeanWorkerElabOptions,
398        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationVerificationBatchResult>>>,
399    },
400    SemanticProofSearch {
401        meta: JobMeta,
402        request: SemanticProofSearchRequest,
403        reply: oneshot::Sender<Result<ProjectCall<SemanticProofSearchResult>>>,
404    },
405}
406
407impl ProjectMessage {
408    fn imports(&self) -> &[String] {
409        match self {
410            Self::ModuleQuery { meta, .. }
411            | Self::ModuleQueryBatch { meta, .. }
412            | Self::DeclarationInspection { meta, .. }
413            | Self::DeclarationSearch { meta, .. }
414            | Self::ProofAttempt { meta, .. }
415            | Self::DeclarationVerification { meta, .. }
416            | Self::DeclarationVerificationBatch { meta, .. }
417            | Self::SemanticProofSearch { meta, .. } => &meta.imports,
418        }
419    }
420
421    fn reject(self, state: &ProjectActorState, reason: &'static str) {
422        match self {
423            Self::ModuleQuery { meta, reply, .. } => {
424                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
425            }
426            Self::ModuleQueryBatch { meta, reply, .. } => {
427                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
428            }
429            Self::DeclarationInspection { meta, reply, .. } => {
430                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
431            }
432            Self::DeclarationSearch { meta, reply, .. } => {
433                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
434            }
435            Self::ProofAttempt { meta, reply, .. } => {
436                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
437            }
438            Self::DeclarationVerification { meta, reply, .. } => {
439                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
440            }
441            Self::DeclarationVerificationBatch { meta, reply, .. } => {
442                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
443            }
444            Self::SemanticProofSearch { meta, reply, .. } => {
445                let _ = reply.send(Err(state.shutdown_unavailable(&meta, reason)));
446            }
447        }
448    }
449}
450
/// Why a worker was recycled. Each variant maps to one stable wire string via
/// [`RestartCause::as_str`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RestartCause {
    #[expect(dead_code, reason = "stable wire cause reserved for future non-RSS profile cycling")]
    ImportProfileSwitch,
    RssImportSwitch,
    RssPostJob,
    RssHardLimit,
    MaxRequests,
    MaxImports,
    Idle,
    Timeout,
    Cancelled,
    ChildExit,
    ChildAbort,
    SessionMissing,
    Explicit,
    WorkerInternal,
}

impl RestartCause {
    /// Stable wire name for this cause.
    const fn as_str(self) -> &'static str {
        match self {
            Self::ImportProfileSwitch => "import_profile_switch",
            Self::RssImportSwitch => "rss_import_switch",
            Self::RssPostJob => "rss_post_job",
            Self::RssHardLimit => "rss_hard_limit_exceeded",
            Self::MaxRequests => "max_requests",
            Self::MaxImports => "max_imports",
            Self::Idle => "idle",
            Self::Timeout => "timeout",
            Self::Cancelled => "cancelled",
            Self::ChildExit => "child_exit",
            Self::ChildAbort => "child_abort",
            Self::SessionMissing => "session_missing",
            Self::Explicit => "explicit",
            Self::WorkerInternal => "worker_internal",
        }
    }

    /// Whether a restart with this cause is counted against the restart
    /// limit.
    const fn counts_toward_restart_limit(self) -> bool {
        matches!(
            self,
            Self::Timeout
                | Self::Cancelled
                | Self::ChildExit
                | Self::ChildAbort
                | Self::SessionMissing
                | Self::RssHardLimit
        )
    }

    /// A cause is "planned" exactly when it does not count toward the restart
    /// limit.
    ///
    /// NOTE(review): this makes `WorkerInternal` planned, although
    /// `log_restart` logs `worker_internal` as abnormal — confirm that is
    /// intended.
    const fn is_planned(self) -> bool {
        !self.counts_toward_restart_limit()
    }
}
506
507fn restart_event(
508    cause: RestartCause,
509    reason: impl Into<String>,
510    worker_generation: u64,
511    rss_kib: Option<u64>,
512    limit_kib: Option<u64>,
513) -> RuntimeRestartEvent {
514    RuntimeRestartEvent {
515        cause: cause.as_str().to_owned(),
516        reason: reason.into(),
517        worker_generation,
518        planned: cause.is_planned(),
519        rss_kib,
520        limit_kib,
521    }
522}
523
/// Per-project recycle tally over the worker's lifetime, all causes.
///
/// Recorded once per event at [`ProjectActorState::record_restart`] and copied
/// into [`RuntimeSnapshot`] on publish, so the no-call and error paths report
/// the same totals a live call would. This answers "how *often*, and why?"; the
/// single most-recent event stays in `last_restart`.
#[derive(Debug, Clone, Default)]
struct RestartStats {
    /// Number of recycles observed, across all causes.
    total: u64,
    /// Number of recycles observed per cause string.
    by_cause: BTreeMap<String, u64>,
}

impl RestartStats {
    /// Count one recycle attributed to `cause`. Both counters saturate rather
    /// than overflow.
    fn observe(&mut self, cause: &str) {
        self.total = self.total.saturating_add(1);
        let count = self.by_cause.entry(cause.to_owned()).or_default();
        *count = count.saturating_add(1);
    }
}
543
544/// Emit one structured log line for a recycle. Level tracks the *signal*, not
545/// `planned`: crash/abnormal causes `warn`, memory-pressure cycles `info` (the
546/// frequency an operator tuning the RSS budget watches), pure hygiene `debug`.
547fn log_restart(event: &RuntimeRestartEvent, restarts_total: u64) {
548    macro_rules! emit {
549        ($level:ident, $msg:literal) => {
550            tracing::$level!(
551                cause = %event.cause,
552                reason = %event.reason,
553                worker_generation = event.worker_generation,
554                rss_kib = ?event.rss_kib,
555                limit_kib = ?event.limit_kib,
556                planned = event.planned,
557                restarts_total,
558                $msg
559            )
560        };
561    }
562    match event.cause.as_str() {
563        "rss_hard_limit_exceeded"
564        | "child_abort"
565        | "child_exit"
566        | "session_missing"
567        | "worker_internal"
568        | "timeout"
569        | "cancelled" => emit!(warn, "worker recycled (abnormal)"),
570        "rss_post_job" | "rss_import_switch" => emit!(info, "worker recycled (memory pressure)"),
571        _ => emit!(debug, "worker recycled (hygiene)"),
572    }
573}
574
575fn restart_cause_from_worker(reason: &LeanWorkerRestartReason) -> RestartCause {
576    match reason.stable_cause() {
577        "explicit" => RestartCause::Explicit,
578        "max_requests" => RestartCause::MaxRequests,
579        "max_imports" => RestartCause::MaxImports,
580        "rss_ceiling" => RestartCause::RssPostJob,
581        "rss_hard_limit" => RestartCause::RssHardLimit,
582        "idle" => RestartCause::Idle,
583        "cancelled" => RestartCause::Cancelled,
584        "timeout" => RestartCause::Timeout,
585        _ => RestartCause::WorkerInternal,
586    }
587}
588
589#[derive(Debug, Clone)]
590struct RuntimeSnapshot {
591    worker_generation: u64,
592    last_restart: Option<RuntimeRestartEvent>,
593    rss_kib: Option<u64>,
594    import_profile: Option<String>,
595    profile_switch_count: u64,
596    restarts_total: u64,
597    restarts_by_cause: BTreeMap<String, u64>,
598}
599
600impl RuntimeSnapshot {
601    fn facts(&self) -> RuntimeFacts {
602        RuntimeFacts {
603            worker_generation: self.worker_generation,
604            worker_restarted: false,
605            retry_count: 0,
606            admission_wait_millis: 0,
607            queue_wait_millis: 0,
608            call_restart: None,
609            last_restart: self.last_restart.clone(),
610            rss_kib: self.rss_kib,
611            worker_lanes: 1,
612            import_profile: self.import_profile.clone(),
613            profile_switch_count: self.profile_switch_count,
614            restarts_total: self.restarts_total,
615            restarts_by_cause: self.restarts_by_cause.clone(),
616        }
617    }
618}
619
620/// One Lake project, one serialized worker controller, one in-memory cache.
621/// Cheap to clone via `Arc`.
622pub(crate) struct LeanProject {
623    canonical_root: PathBuf,
624    toolchain: String,
625    package: Option<String>,
626    library: Option<String>,
627    manifest_hash: String,
628    session_id: String,
629    /// Toolchain-provenance advisories captured at open (unknown pin, missing
630    /// sidecar). Surfaced into every response's envelope warnings via
631    /// [`Self::freshness`]; empty for a fully-vouched-for worker.
632    open_warnings: Vec<String>,
633    actor_tx: Mutex<Option<mpsc::Sender<ProjectMessage>>>,
634    active_jobs: Arc<AtomicUsize>,
635    healthy: Arc<AtomicBool>,
636    runtime: Arc<Mutex<RuntimeSnapshot>>,
637    module_queries: ModuleQueryCache,
638}
639
640impl std::fmt::Debug for LeanProject {
641    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642        f.debug_struct("LeanProject")
643            .field("canonical_root", &self.canonical_root)
644            .field("toolchain", &self.toolchain)
645            .field("package", &self.package)
646            .field("library", &self.library)
647            .field("manifest_hash", &self.manifest_hash)
648            .finish_non_exhaustive()
649    }
650}
651
652impl LeanProject {
653    pub(crate) fn open(meta: LakeProjectMeta, runtime_config: ProjectRuntimeConfig) -> Result<Arc<Self>> {
654        let session_id = uuid::Uuid::new_v4().to_string();
655        let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
656            worker_generation: 1,
657            last_restart: None,
658            rss_kib: None,
659            import_profile: None,
660            profile_switch_count: 0,
661            restarts_total: 0,
662            restarts_by_cause: BTreeMap::new(),
663        }));
664        let active_jobs = Arc::new(AtomicUsize::new(0));
665        let healthy = Arc::new(AtomicBool::new(true));
666        let (config, open_warnings) = ActorConfig::from_meta(
667            &meta,
668            session_id.clone(),
669            Arc::clone(&runtime),
670            Arc::clone(&healthy),
671            runtime_config,
672        )?;
673        type InitMsg = std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>;
674        let (init_tx, init_rx) = std::sync::mpsc::channel::<InitMsg>();
675        let thread_name = actor_thread_name(&meta.canonical_root);
676
677        thread::Builder::new()
678            .name(thread_name)
679            .spawn(move || {
680                actor_main(config, init_tx);
681            })
682            .map_err(|e| ServerError::Internal(format!("spawn project actor thread: {e}")))?;
683
684        let (runtime_toolchain, actor_tx) = init_rx
685            .recv()
686            .map_err(|_| ServerError::Internal("project actor thread died during init".into()))??;
687
688        let cache_cap = NonZeroUsize::new(MODULE_QUERY_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
689        Ok(Arc::new(Self {
690            canonical_root: meta.canonical_root,
691            toolchain: runtime_toolchain,
692            package: meta.package,
693            library: meta.library,
694            manifest_hash: meta.manifest_hash,
695            session_id,
696            open_warnings,
697            actor_tx: Mutex::new(Some(actor_tx)),
698            active_jobs,
699            healthy,
700            runtime,
701            module_queries: ModuleQueryCache::with_capacity(cache_cap),
702        }))
703    }
704
705    /// Process one module query through this project's serialized worker actor.
706    ///
707    /// # Errors
708    ///
709    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
710    /// worker execution fails.
711    pub(crate) async fn process_module_query(
712        &self,
713        imports: Vec<String>,
714        semantic_permit: SemanticPermit,
715        admission_wait_millis: u64,
716        source: String,
717        query: LeanWorkerModuleQuery,
718        options: LeanWorkerElabOptions,
719    ) -> Result<ProjectCall<LeanWorkerModuleQueryOutcome>> {
720        let (reply, rx) = oneshot::channel();
721        let message = ProjectMessage::ModuleQuery {
722            meta: self.job_meta(
723                imports,
724                RetryPolicy::RetryOnceReadOnly,
725                semantic_permit,
726                admission_wait_millis,
727            ),
728            source,
729            query,
730            options,
731            reply,
732        };
733        self.enqueue(message, rx).await
734    }
735
736    /// Process one module-query batch through this project's serialized worker actor.
737    ///
738    /// # Errors
739    ///
740    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
741    /// worker execution fails.
742    pub(crate) async fn process_module_query_batch(
743        &self,
744        imports: Vec<String>,
745        semantic_permit: SemanticPermit,
746        admission_wait_millis: u64,
747        source: String,
748        selectors: Vec<LeanWorkerModuleQuerySelector>,
749        budgets: LeanWorkerOutputBudgets,
750        options: LeanWorkerElabOptions,
751    ) -> Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>> {
752        let (reply, rx) = oneshot::channel();
753        let message = ProjectMessage::ModuleQueryBatch {
754            meta: self.job_meta(
755                imports,
756                RetryPolicy::RetryOnceReadOnly,
757                semantic_permit,
758                admission_wait_millis,
759            ),
760            source,
761            selectors,
762            budgets,
763            options,
764            reply,
765        };
766        self.enqueue(message, rx).await
767    }
768
769    /// Inspect one declaration through this project's serialized worker actor.
770    ///
771    /// # Errors
772    ///
773    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
774    /// worker execution fails.
775    pub(crate) async fn inspect_declaration(
776        &self,
777        imports: Vec<String>,
778        semantic_permit: SemanticPermit,
779        admission_wait_millis: u64,
780        request: LeanWorkerDeclarationInspectionRequest,
781    ) -> Result<ProjectCall<LeanWorkerDeclarationInspectionResult>> {
782        let (reply, rx) = oneshot::channel();
783        let message = ProjectMessage::DeclarationInspection {
784            meta: self.job_meta(
785                imports,
786                RetryPolicy::RetryOnceReadOnly,
787                semantic_permit,
788                admission_wait_millis,
789            ),
790            request,
791            reply,
792        };
793        self.enqueue(message, rx).await
794    }
795
796    /// Run bounded declaration search through this project's serialized worker actor.
797    ///
798    /// # Errors
799    ///
800    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
801    /// worker execution fails.
802    pub(crate) async fn search_declarations(
803        &self,
804        imports: Vec<String>,
805        semantic_permit: SemanticPermit,
806        admission_wait_millis: u64,
807        request: LeanWorkerDeclarationSearch,
808    ) -> Result<ProjectCall<LeanWorkerDeclarationSearchResult>> {
809        let (reply, rx) = oneshot::channel();
810        let message = ProjectMessage::DeclarationSearch {
811            meta: self.job_meta(
812                imports,
813                RetryPolicy::RetryOnceReadOnly,
814                semantic_permit,
815                admission_wait_millis,
816            ),
817            request,
818            reply,
819        };
820        self.enqueue(message, rx).await
821    }
822
823    /// Run source-backed semantic proof search through this project's actor.
824    ///
825    /// # Errors
826    ///
827    /// Returns `ServerError` when semantic capability setup, mailbox enqueue,
828    /// actor reply, or worker execution fails.
829    pub(crate) async fn semantic_proof_search(
830        &self,
831        imports: Vec<String>,
832        semantic_permit: SemanticPermit,
833        admission_wait_millis: u64,
834        request: SemanticProofSearchRequest,
835    ) -> Result<ProjectCall<SemanticProofSearchResult>> {
836        let (reply, rx) = oneshot::channel();
837        let message = ProjectMessage::SemanticProofSearch {
838            meta: self.job_meta(
839                imports,
840                RetryPolicy::RetryOnceReadOnly,
841                semantic_permit,
842                admission_wait_millis,
843            ),
844            request,
845            reply,
846        };
847        self.enqueue(message, rx).await
848    }
849
850    /// Try proof fragments in-memory through this project's serialized worker actor.
851    ///
852    /// # Errors
853    ///
854    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
855    /// worker execution fails.
856    pub(crate) async fn attempt_proof(
857        &self,
858        imports: Vec<String>,
859        semantic_permit: SemanticPermit,
860        admission_wait_millis: u64,
861        request: LeanWorkerProofAttemptRequest,
862        options: LeanWorkerElabOptions,
863    ) -> Result<ProjectCall<LeanWorkerProofAttemptResult>> {
864        let (reply, rx) = oneshot::channel();
865        let message = ProjectMessage::ProofAttempt {
866            meta: self.job_meta(
867                imports,
868                RetryPolicy::RetryOnceReadOnly,
869                semantic_permit,
870                admission_wait_millis,
871            ),
872            request,
873            options,
874            reply,
875        };
876        self.enqueue(message, rx).await
877    }
878
879    /// Verify one declaration in-memory through this project's serialized worker actor.
880    ///
881    /// # Errors
882    ///
883    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
884    /// worker execution fails.
885    pub(crate) async fn verify_declaration(
886        &self,
887        imports: Vec<String>,
888        semantic_permit: SemanticPermit,
889        admission_wait_millis: u64,
890        request: LeanWorkerDeclarationVerificationRequest,
891        options: LeanWorkerElabOptions,
892    ) -> Result<ProjectCall<LeanWorkerDeclarationVerificationResult>> {
893        let (reply, rx) = oneshot::channel();
894        let message = ProjectMessage::DeclarationVerification {
895            meta: self.job_meta(
896                imports,
897                RetryPolicy::RetryOnceReadOnly,
898                semantic_permit,
899                admission_wait_millis,
900            ),
901            request,
902            options,
903            reply,
904        };
905        self.enqueue(message, rx).await
906    }
907
908    /// Verify several declarations in one in-memory source snapshot through
909    /// this project's serialized worker actor.
910    ///
911    /// # Errors
912    ///
913    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
914    /// worker execution fails.
915    pub(crate) async fn verify_declaration_batch(
916        &self,
917        imports: Vec<String>,
918        semantic_permit: SemanticPermit,
919        admission_wait_millis: u64,
920        request: LeanWorkerDeclarationVerificationBatchRequest,
921        options: LeanWorkerElabOptions,
922    ) -> Result<ProjectCall<LeanWorkerDeclarationVerificationBatchResult>> {
923        let (reply, rx) = oneshot::channel();
924        let message = ProjectMessage::DeclarationVerificationBatch {
925            meta: self.job_meta(
926                imports,
927                RetryPolicy::RetryOnceReadOnly,
928                semantic_permit,
929                admission_wait_millis,
930            ),
931            request,
932            options,
933            reply,
934        };
935        self.enqueue(message, rx).await
936    }
937
938    fn job_meta(
939        &self,
940        imports: Vec<String>,
941        retry_policy: RetryPolicy,
942        semantic_permit: SemanticPermit,
943        admission_wait_millis: u64,
944    ) -> JobMeta {
945        let created_at = Instant::now();
946        self.active_jobs.fetch_add(1, Ordering::AcqRel);
947        JobMeta {
948            import_fingerprint: import_fingerprint(&imports),
949            imports,
950            _created_at: created_at,
951            queued_at: Instant::now(),
952            admission_wait_millis,
953            _correlation_id: uuid::Uuid::new_v4(),
954            retry_policy,
955            _active_job: ActiveJobGuard {
956                active_jobs: Arc::clone(&self.active_jobs),
957            },
958            _semantic_permit: semantic_permit,
959        }
960    }
961
962    async fn enqueue<T>(
963        &self,
964        message: ProjectMessage,
965        reply_rx: oneshot::Receiver<Result<ProjectCall<T>>>,
966    ) -> Result<ProjectCall<T>>
967    where
968        T: Send + 'static,
969    {
970        let project_info = self.worker_error_context(message.imports());
971        let tx = self
972            .actor_tx
973            .lock()
974            .as_ref()
975            .cloned()
976            .ok_or_else(|| self.unavailable("project actor is stopped", false, false))?;
977        match tx.try_send(message) {
978            Ok(()) => {}
979            Err(mpsc::error::TrySendError::Full(_)) => {
980                return Err(ServerError::worker_unavailable(WorkerUnavailable {
981                    retryable: true,
982                    worker_restarted: false,
983                    reason: "mailbox_full".to_owned(),
984                    ..project_info
985                }));
986            }
987            Err(mpsc::error::TrySendError::Closed(_)) => {
988                self.shutdown();
989                return Err(ServerError::worker_unavailable(WorkerUnavailable {
990                    retryable: true,
991                    worker_restarted: false,
992                    reason: "mailbox_closed".to_owned(),
993                    ..project_info
994                }));
995            }
996        }
997
998        match reply_rx.await {
999            Ok(result) => result,
1000            Err(_) => {
1001                self.shutdown();
1002                Err(self.unavailable("mailbox_closed_before_reply", true, false))
1003            }
1004        }
1005    }
1006
    /// Borrow this project's manifest hash, the value reported as
    /// `project_hash` in freshness and worker-unavailable payloads.
    pub(crate) fn manifest_hash(&self) -> &str {
        &self.manifest_hash
    }
1010
    /// Borrow this project's toolchain string, the value reported as
    /// `lean_toolchain` in freshness and worker-unavailable payloads.
    pub(crate) fn toolchain(&self) -> &str {
        &self.toolchain
    }
1014
    /// Borrow this project's canonical root path, the path reported (lossily
    /// converted to a string) as `project_root` in freshness and
    /// worker-unavailable payloads.
    pub(crate) fn canonical_root(&self) -> &Path {
        &self.canonical_root
    }
1018
    /// Borrow the `ModuleQueryCache` held by this project.
    pub(crate) fn module_query_cache(&self) -> &ModuleQueryCache {
        &self.module_queries
    }
1022
1023    #[must_use]
1024    pub(crate) fn freshness(&self, request_imports: &[String]) -> Freshness {
1025        Freshness {
1026            project_root: self.canonical_root.to_string_lossy().into_owned(),
1027            project_hash: self.manifest_hash.clone(),
1028            imports: request_imports.to_vec(),
1029            session_id: self.session_id.clone(),
1030            lean_toolchain: self.toolchain.clone(),
1031            toolchain_advisories: self.open_warnings.clone(),
1032        }
1033    }
1034
1035    #[must_use]
1036    pub(crate) fn runtime_facts(&self) -> RuntimeFacts {
1037        self.runtime.lock().facts()
1038    }
1039
1040    pub(crate) fn shutdown(&self) {
1041        self.healthy.store(false, Ordering::Release);
1042        let _ = self.actor_tx.lock().take();
1043    }
1044
1045    pub(crate) fn is_healthy(&self) -> bool {
1046        self.healthy.load(Ordering::Acquire) && self.actor_tx.lock().as_ref().is_some_and(|tx| !tx.is_closed())
1047    }
1048
    /// Report whether the `active_jobs` counter currently reads zero.
    ///
    /// The counter is incremented by `job_meta` for every job it creates.
    pub(crate) fn is_idle(&self) -> bool {
        self.active_jobs.load(Ordering::Acquire) == 0
    }
1052
1053    fn unavailable(&self, reason: impl Into<String>, retryable: bool, worker_restarted: bool) -> ServerError {
1054        ServerError::worker_unavailable(WorkerUnavailable {
1055            retryable,
1056            worker_restarted,
1057            reason: reason.into(),
1058            ..self.worker_error_context(&[])
1059        })
1060    }
1061
1062    fn worker_error_context(&self, imports: &[String]) -> WorkerUnavailable {
1063        let snapshot = self.runtime.lock().clone();
1064        let runtime = snapshot.facts();
1065        WorkerUnavailable {
1066            retryable: true,
1067            worker_restarted: false,
1068            project_root: self.canonical_root.to_string_lossy().into_owned(),
1069            project_hash: self.manifest_hash.clone(),
1070            imports: imports.to_vec(),
1071            session_id: self.session_id.clone(),
1072            lean_toolchain: self.toolchain.clone(),
1073            worker_generation: snapshot.worker_generation,
1074            reason: String::new(),
1075            restart_cause: snapshot.last_restart.as_ref().map(|event| event.cause.clone()),
1076            rss_kib: snapshot.rss_kib,
1077            limit_kib: None,
1078            retry_after_millis: None,
1079            restarts_in_window: None,
1080            window_millis: None,
1081            runtime,
1082            toolchain_advisories: self.open_warnings.clone(),
1083        }
1084    }
1085}
1086
/// Dropping a project runs [`LeanProject::shutdown`], which clears the
/// `healthy` flag and drops the held actor sender.
impl Drop for LeanProject {
    fn drop(&mut self) {
        self.shutdown();
    }
}
1092
/// Configuration carried by a project's actor, assembled once by
/// [`ActorConfig::from_meta`] from the Lake project metadata, the readiness
/// verdict for the pinned toolchain, and the runtime configuration.
#[derive(Clone)]
struct ActorConfig {
    /// Copy of `LakeProjectMeta::canonical_root`.
    lake_root: PathBuf,
    /// Copy of `LakeProjectMeta::manifest_hash`.
    manifest_hash: String,
    /// The toolchain string exactly as it appears in the project metadata.
    toolchain_label: String,
    /// Path of the worker binary taken from the readiness verdict.
    worker_path: PathBuf,
    /// Lean sysroot taken from the readiness verdict.
    lean_sysroot: PathBuf,
    /// Session identifier; recorded on the actor's tracing spans.
    session_id: String,
    /// Shared runtime snapshot handle passed in by the caller of `from_meta`.
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    /// Shared health flag passed in by the caller of `from_meta`
    /// (presumably the same flag `LeanProject` reads — confirm at the call site).
    healthy: Arc<AtomicBool>,
    /// Post-job RSS threshold: at or above it the worker is cycled after a job.
    worker_rss_post_job_restart_kib: u64,
    /// From `ProjectRuntimeConfig::worker_rss_hard_kill_kib`.
    worker_rss_hard_kill_kib: u64,
    /// From `ProjectRuntimeConfig::worker_rss_sample_millis`.
    worker_rss_sample_millis: u64,
    /// Import-switch RSS threshold: at or above it the worker is cycled before
    /// running a job whose import fingerprint differs from the previous one.
    import_switch_rss_soft_kib: u64,
    /// From `ProjectRuntimeConfig::module_cache_rss_guard_kib`.
    module_cache_rss_guard_kib: u64,
    /// From `ProjectRuntimeConfig::module_cache_max_bytes`.
    module_cache_max_bytes: u64,
    /// From `ProjectRuntimeConfig::request_timeout_millis`.
    request_timeout_millis: u64,
    /// From `ProjectRuntimeConfig::mailbox_capacity`.
    mailbox_capacity: usize,
    /// From `ProjectRuntimeConfig::max_restarts_per_window`.
    max_restarts_per_window: usize,
    /// From `ProjectRuntimeConfig::restart_window`.
    restart_window: Duration,
    /// Open-time toolchain advisories (unknown pin, missing sidecar, no smoke
    /// record). The actor carries them so a `runtime_unavailable` it produces
    /// after worker death still flags a suspect worker. Mirrors
    /// [`LeanProject::open_warnings`]; both come from the one
    /// [`WorkerBinary::resolve_ready_for`] verdict at open.
    toolchain_advisories: Vec<String>,
}
1120
1121impl ActorConfig {
1122    /// Resolve the pinned toolchain into a spawnable config plus any
1123    /// open-time provenance advisories. All version-drift situations collapse
1124    /// into the one [`WorkerBinary::resolve_ready_for`] verdict: hard failures
1125    /// become a typed [`ServerError::BadProject`] carrying the corrective
1126    /// command; soft ones (unknown pin, missing sidecar) ride along as
1127    /// warnings the project surfaces in every envelope.
1128    fn from_meta(
1129        meta: &LakeProjectMeta,
1130        session_id: String,
1131        runtime: Arc<Mutex<RuntimeSnapshot>>,
1132        healthy: Arc<AtomicBool>,
1133        runtime_config: ProjectRuntimeConfig,
1134    ) -> Result<(Self, Vec<String>)> {
1135        let toolchain_id = ToolchainId::parse(&meta.toolchain).map_err(|e| ServerError::BadProject(e.to_string()))?;
1136        let (worker_path, lean_sysroot, open_warnings) = match WorkerBinary::resolve_ready_for(&toolchain_id) {
1137            Readiness::Ready {
1138                worker,
1139                lean_sysroot,
1140                note,
1141            } => (worker.path, lean_sysroot, note.into_iter().collect()),
1142            Readiness::UnknownPin {
1143                pin,
1144                worker,
1145                lean_sysroot,
1146            } => (
1147                worker.path,
1148                lean_sysroot,
1149                vec![format!(
1150                    "lean-toolchain pins {pin}, which is not a recognized lean-rs supported version \
1151                     (e.g. a nightly); proceeding, but the host cannot vouch for ABI compatibility"
1152                )],
1153            ),
1154            Readiness::Unsupported { window, nearest } => {
1155                return Err(ServerError::BadProject(format!(
1156                    "lean-toolchain pins {toolchain_id}, outside the lean-rs supported window {window}; \
1157                     nearest supported: {nearest}. Pin a supported toolchain (or bump lean-rs) and reopen."
1158                )));
1159            }
1160            Readiness::Stale { toolchain, install_cmd } => {
1161                return Err(ServerError::BadProject(format!(
1162                    "worker for {toolchain} was built against a different lean.h than the toolchain now \
1163                     provides (header drift); rebuild it: {install_cmd}"
1164                )));
1165            }
1166            Readiness::Unusable {
1167                toolchain,
1168                detail,
1169                install_cmd,
1170            } => {
1171                return Err(ServerError::BadProject(format!(
1172                    "worker for {toolchain} failed its runtime smoke test ({detail}); the toolchain's \
1173                     libleanshared is ABI-incompatible with this lean-rs build and cannot be served. \
1174                     Pin a supported toolchain the host can run, or rebuild lean-rs and reinstall: {install_cmd}"
1175                )));
1176            }
1177            Readiness::NotInstalled { toolchain, install_cmd } => {
1178                return Err(ServerError::BadProject(format!(
1179                    "no worker binary for toolchain {toolchain}; run: {install_cmd}"
1180                )));
1181            }
1182            Readiness::ToolchainNotInstalled { toolchain, elan_dir } => {
1183                return Err(ServerError::BadProject(format!(
1184                    "elan toolchain {toolchain} is not installed (expected {})",
1185                    elan_dir.display()
1186                )));
1187            }
1188        };
1189        tracing::debug!(
1190            toolchain = %toolchain_id,
1191            worker = %worker_path.display(),
1192            sysroot = %lean_sysroot.display(),
1193            "resolved ready worker binary"
1194        );
1195        let config = Self {
1196            lake_root: meta.canonical_root.clone(),
1197            manifest_hash: meta.manifest_hash.clone(),
1198            toolchain_label: meta.toolchain.clone(),
1199            worker_path,
1200            lean_sysroot,
1201            session_id,
1202            runtime,
1203            healthy,
1204            worker_rss_post_job_restart_kib: runtime_config.worker_rss_post_job_restart_kib(),
1205            worker_rss_hard_kill_kib: runtime_config.worker_rss_hard_kill_kib(),
1206            worker_rss_sample_millis: runtime_config.worker_rss_sample_millis(),
1207            import_switch_rss_soft_kib: runtime_config.import_switch_rss_soft_kib(),
1208            module_cache_rss_guard_kib: runtime_config.module_cache_rss_guard_kib(),
1209            module_cache_max_bytes: runtime_config.module_cache_max_bytes(),
1210            request_timeout_millis: runtime_config.request_timeout_millis(),
1211            mailbox_capacity: runtime_config.mailbox_capacity(),
1212            max_restarts_per_window: runtime_config.max_restarts_per_window(),
1213            restart_window: runtime_config.restart_window(),
1214            toolchain_advisories: open_warnings.clone(),
1215        };
1216        Ok((config, open_warnings))
1217    }
1218}
1219
/// Mutable state owned by a project's actor while it processes mailbox
/// messages one at a time.
struct ProjectActorState {
    /// Configuration resolved when the project was opened.
    config: ActorConfig,
    /// Worker host handle that every job closure is run against.
    handle: LeanWorkerHostHandle,
    /// NOTE(review): presumably an offset added to the handle's own generation
    /// counter when reporting `worker_generation` — confirm in `observed_generation`.
    worker_generation_base: u64,
    /// NOTE(review): presumably the most recent restart event recorded by
    /// `record_restart` — confirm there.
    last_restart: Option<RuntimeRestartEvent>,
    /// Import fingerprint of the most recent job that reached the worker;
    /// `None` until the first such job. Compared against the next job's
    /// fingerprint to detect an import switch.
    last_import_fingerprint: Option<String>,
    /// Number of jobs whose import fingerprint differed from the previous one.
    profile_switch_count: u64,
    /// Most recent RSS sample obtained from the handle, if any.
    last_rss_kib: Option<u64>,
    /// Shared runtime snapshot (presumably the target of `publish_runtime` — confirm).
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    /// NOTE(review): presumably timestamps of recent abnormal restarts used to
    /// enforce the restart window — confirm in `record_restart_or_stop`.
    abnormal_restart_times: VecDeque<Instant>,
    /// Restart counters; their exact contents are defined with `RestartStats`.
    restart_stats: RestartStats,
}
1232
1233impl ProjectActorState {
1234    fn handle_message(&mut self, message: ProjectMessage) {
1235        match message {
1236            ProjectMessage::ModuleQuery {
1237                meta,
1238                source,
1239                query,
1240                options,
1241                reply,
1242            } => {
1243                let result = self.run_job(meta, |handle, imports| {
1244                    handle.process_module_query_with_imports(imports, &source, &query, &options, None, None)
1245                });
1246                let _ = reply.send(result);
1247            }
1248            ProjectMessage::ModuleQueryBatch {
1249                meta,
1250                source,
1251                selectors,
1252                budgets,
1253                options,
1254                reply,
1255            } => {
1256                let result = self.run_job(meta, |handle, imports| {
1257                    handle.process_module_query_batch_with_imports(
1258                        imports, &source, &selectors, &budgets, &options, None, None,
1259                    )
1260                });
1261                let _ = reply.send(result);
1262            }
1263            ProjectMessage::DeclarationInspection { meta, request, reply } => {
1264                let result = self.run_job(meta, |handle, imports| {
1265                    handle.inspect_declaration_with_imports(imports, &request, None, None)
1266                });
1267                let _ = reply.send(result);
1268            }
1269            ProjectMessage::DeclarationSearch { meta, request, reply } => {
1270                let result = self.run_job(meta, |handle, imports| {
1271                    handle.search_declarations_with_imports(imports, &request, None, None)
1272                });
1273                let _ = reply.send(result);
1274            }
1275            ProjectMessage::ProofAttempt {
1276                meta,
1277                request,
1278                options,
1279                reply,
1280            } => {
1281                let result = self.run_job(meta, |handle, imports| {
1282                    handle.attempt_proof_with_imports(imports, &request, &options, None, None)
1283                });
1284                let _ = reply.send(result);
1285            }
1286            ProjectMessage::DeclarationVerification {
1287                meta,
1288                request,
1289                options,
1290                reply,
1291            } => {
1292                let result = self.run_job(meta, |handle, imports| {
1293                    handle.verify_declaration_with_imports(imports, &request, &options, None, None)
1294                });
1295                let _ = reply.send(result);
1296            }
1297            ProjectMessage::DeclarationVerificationBatch {
1298                meta,
1299                request,
1300                options,
1301                reply,
1302            } => {
1303                let result = self.run_job(meta, |handle, imports| {
1304                    handle.verify_declaration_batch_with_imports(imports, &request, &options, None, None)
1305                });
1306                let _ = reply.send(result);
1307            }
1308            ProjectMessage::SemanticProofSearch { meta, request, reply } => {
1309                let result = self.run_semantic_job(meta, &request);
1310                let _ = reply.send(result);
1311            }
1312        }
1313    }
1314
    /// Run one worker job under the host's restart and retry policy.
    ///
    /// `job` is called with the worker handle and a fresh clone of the job's
    /// imports. It may be called more than once: when it fails with a
    /// recoverable worker death or a missing session and the job's retry
    /// policy still allows a retry, the worker is rebuilt and `job` is called
    /// again.
    ///
    /// On success the returned [`ProjectCall`] carries the value together with
    /// the runtime facts for this call (queue wait, retry count, and the last
    /// restart event observed during the call, if any); the same facts are
    /// published to the shared runtime snapshot.
    ///
    /// # Errors
    ///
    /// - A worker-unavailable error when the worker died or lost its session
    ///   with no retry left, when the RSS hard limit was exceeded (reported as
    ///   not retryable), or when the request timed out.
    /// - The mapped worker error (`map_worker_err`) for any other failure.
    /// - Any error raised by the import-switch or post-job RSS cycling, by
    ///   lifecycle restart accounting, or by rebuilding the worker.
    fn run_job<R>(
        &mut self,
        meta: JobMeta,
        job: impl Fn(&mut LeanWorkerHostHandle, Vec<String>) -> std::result::Result<R, LeanWorkerError>,
    ) -> Result<ProjectCall<R>> {
        // Runs on the project's dedicated actor thread (no async), so an entered
        // span is correct and ties every nested worker/recycle log to this call.
        let _span = tracing::debug_span!(
            "job",
            session_id = %self.config.session_id,
            imports = meta.imports.len(),
            queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
        )
        .entered();
        let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
        // Generation seen before any cycling; later compared against the
        // current generation to report whether the worker restarted.
        let generation_before = self.observed_generation();
        let mut call_restart = self.cycle_before_import_switch_if_needed(&meta)?;
        // Baseline taken after the optional pre-job cycle, so that cycle is
        // not counted again by `account_lifecycle_restarts_since`.
        let mut lifecycle_baseline = self.handle.lifecycle_snapshot();

        let max_retries = meta.retry_policy.retries();
        let mut retry_count = 0_u32;
        loop {
            match job(&mut self.handle, meta.imports.clone()) {
                Ok(value) => {
                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
                        call_restart = Some(event);
                    }
                    self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
                    // Keep the previous sample when the handle cannot provide one.
                    self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
                    if let Some(event) = self.cycle_after_post_job_rss_if_needed(&meta)? {
                        call_restart = Some(event);
                    }
                    let runtime =
                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
                    tracing::debug!(
                        retry_count,
                        rss_kib = ?runtime.rss_kib,
                        worker_generation = runtime.worker_generation,
                        "job complete"
                    );
                    self.publish_runtime(&runtime);
                    return Ok(ProjectCall::new(value, runtime));
                }
                // Recoverable worker death with a retry left: rebuild and loop.
                Err(err) if worker_error_is_recoverable_death(&err) && retry_count < max_retries => {
                    self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
                    let first_reason = err.to_string();
                    call_restart =
                        Some(self.rebuild_after_worker_death(first_reason, worker_death_cause(&err), &meta)?);
                    // Restarts from here on are measured against the rebuilt worker.
                    lifecycle_baseline = self.handle.lifecycle_snapshot();
                    retry_count = retry_count.saturating_add(1);
                }
                // Recoverable worker death with no retry left: report unavailable.
                Err(err) if worker_error_is_recoverable_death(&err) => {
                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
                        call_restart = Some(event);
                    }
                    let reason = format!("worker_died_after_retry: {err}");
                    let generation = self.observed_generation();
                    let runtime =
                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
                    self.publish_runtime(&runtime);
                    return Err(self.worker_unavailable_for(
                        &meta,
                        reason,
                        true,
                        generation > generation_before,
                        Some(worker_death_cause(&err)),
                        None,
                        None,
                    ));
                }
                // Missing session with a retry left: rebuild and loop.
                Err(err) if worker_error_is_session_missing(&err) && retry_count < max_retries => {
                    self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
                    call_restart = Some(self.rebuild_after_worker_death(
                        format!("session_missing: {err}"),
                        RestartCause::SessionMissing,
                        &meta,
                    )?);
                    lifecycle_baseline = self.handle.lifecycle_snapshot();
                    retry_count = retry_count.saturating_add(1);
                }
                // Missing session with no retry left: report unavailable.
                Err(err) if worker_error_is_session_missing(&err) => {
                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
                        call_restart = Some(event);
                    }
                    let generation = self.observed_generation();
                    let runtime =
                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
                    self.publish_runtime(&runtime);
                    return Err(self.worker_unavailable_for(
                        &meta,
                        format!("session_missing: {err}"),
                        true,
                        generation > generation_before,
                        Some(RestartCause::SessionMissing),
                        None,
                        None,
                    ));
                }
                // RSS hard limit: never retried here; reported as not retryable
                // and as a restarted worker, with the limit attached.
                Err(LeanWorkerError::RssHardLimitExceeded {
                    operation,
                    current_kib,
                    limit_kib,
                    ..
                }) => {
                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
                        call_restart = Some(event);
                    }
                    let runtime =
                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
                    self.publish_runtime(&runtime);
                    return Err(self.worker_unavailable_for(
                        &meta,
                        format!(
                            "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
                        ),
                        false,
                        true,
                        Some(RestartCause::RssHardLimit),
                        Some(limit_kib),
                        None,
                    ));
                }
                // Timeout: not retried here; reported as retryable.
                Err(err) if matches!(err, LeanWorkerError::Timeout { .. }) => {
                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
                        call_restart = Some(event);
                    }
                    let generation = self.observed_generation();
                    let runtime =
                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
                    self.publish_runtime(&runtime);
                    return Err(self.worker_unavailable_for(
                        &meta,
                        format!("timeout: {err}"),
                        true,
                        generation > generation_before,
                        Some(RestartCause::Timeout),
                        None,
                        None,
                    ));
                }
                // Any other worker error: the fingerprint is still recorded,
                // and the error is mapped and returned without publishing
                // runtime facts.
                Err(err) => {
                    self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
                    self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
                    return Err(map_worker_err(err));
                }
            }
        }
    }
1463
1464    fn run_semantic_job(
1465        &self,
1466        meta: JobMeta,
1467        request: &SemanticProofSearchRequest,
1468    ) -> Result<ProjectCall<SemanticProofSearchResult>> {
1469        let _span = tracing::debug_span!(
1470            "semantic_job",
1471            session_id = %self.config.session_id,
1472            imports = meta.imports.len(),
1473            queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1474        )
1475        .entered();
1476        let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1477        let generation_before = self.observed_generation();
1478        let mut capability = self.open_semantic_capability(&meta)?;
1479        let result = {
1480            let mut session = capability
1481                .open_session_with_imports(meta.imports.clone(), None, None)
1482                .map_err(map_worker_err)?;
1483            crate::semantic_search::run_semantic_proof_search(&mut session, request)
1484        };
1485        let runtime = self.runtime_facts(&meta, generation_before, 0, queue_wait_millis, None);
1486        self.publish_runtime(&runtime);
1487        result.map(|value| ProjectCall::new(value, runtime))
1488    }
1489
1490    fn open_semantic_capability(&self, meta: &JobMeta) -> Result<lean_rs_worker_parent::LeanWorkerCapability> {
1491        let runtime = lean_semantic_search_runtime::build_cached(SemanticSearchRuntimeBuild {
1492            cache_root: semantic_runtime_cache_root()?,
1493            toolchain_label: self.config.toolchain_label.clone(),
1494            lean_sysroot: self.config.lean_sysroot.clone(),
1495        })
1496        .map_err(|err| {
1497            self.worker_unavailable_for(
1498                meta,
1499                format!("semantic runtime build failed for this toolchain: {err}"),
1500                true,
1501                false,
1502                None,
1503                None,
1504                None,
1505            )
1506        })?;
1507        semantic_capability_builder(&self.config, &runtime.built)?
1508            .open()
1509            .map_err(|err| {
1510                self.worker_unavailable_for(
1511                    meta,
1512                    format!(
1513                        "semantic capability open failed for this toolchain: {}",
1514                        map_worker_err(err)
1515                    ),
1516                    true,
1517                    false,
1518                    None,
1519                    None,
1520                    None,
1521                )
1522            })
1523    }
1524
1525    fn cycle_before_import_switch_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1526        let Some(previous_import_fingerprint) = self.last_import_fingerprint.as_deref() else {
1527            return Ok(None);
1528        };
1529        if previous_import_fingerprint == meta.import_fingerprint {
1530            return Ok(None);
1531        }
1532        self.profile_switch_count = self.profile_switch_count.saturating_add(1);
1533        let Some(current_kib) = self.handle.rss_kib() else {
1534            return Ok(None);
1535        };
1536        self.last_rss_kib = Some(current_kib);
1537        if current_kib < self.config.import_switch_rss_soft_kib {
1538            return Ok(None);
1539        }
1540        let limit_kib = self.config.import_switch_rss_soft_kib;
1541        let reason = format!("rss_import_switch current_kib={current_kib} limit_kib={limit_kib}");
1542        self.record_restart_or_stop(RestartCause::RssImportSwitch, &reason)
1543            .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1544        self.handle.cycle().map_err(map_worker_err)?;
1545        self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1546        let event = restart_event(
1547            RestartCause::RssImportSwitch,
1548            reason,
1549            self.observed_generation(),
1550            Some(current_kib),
1551            Some(limit_kib),
1552        );
1553        self.record_restart(event.clone());
1554        Ok(Some(event))
1555    }
1556
1557    fn cycle_after_post_job_rss_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1558        let Some(current_kib) = self.handle.rss_kib() else {
1559            return Ok(None);
1560        };
1561        self.last_rss_kib = Some(current_kib);
1562        let limit_kib = self.config.worker_rss_post_job_restart_kib;
1563        tracing::debug!(rss_kib = current_kib, limit_kib, "post-job rss check");
1564        if current_kib < limit_kib {
1565            return Ok(None);
1566        }
1567        let reason = format!("rss_post_job current_kib={current_kib} limit_kib={limit_kib}");
1568        self.record_restart_or_stop(RestartCause::RssPostJob, &reason)
1569            .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1570        self.handle.cycle().map_err(map_worker_err)?;
1571        self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1572        let event = restart_event(
1573            RestartCause::RssPostJob,
1574            reason,
1575            self.observed_generation(),
1576            Some(current_kib),
1577            Some(limit_kib),
1578        );
1579        self.record_restart(event.clone());
1580        Ok(Some(event))
1581    }
1582
1583    fn rebuild_after_worker_death(
1584        &mut self,
1585        reason: String,
1586        cause: RestartCause,
1587        meta: &JobMeta,
1588    ) -> Result<RuntimeRestartEvent> {
1589        self.record_restart_or_stop(cause, &reason)
1590            .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1591        let next_generation = self.observed_generation().saturating_add(1);
1592        let (handle, _) = open_worker(&self.config, false)?;
1593        self.handle = handle;
1594        self.worker_generation_base = next_generation;
1595        self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
1596        let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1597        self.record_restart(event.clone());
1598        Ok(event)
1599    }
1600
1601    fn account_lifecycle_restarts_since(
1602        &mut self,
1603        before: &LeanWorkerLifecycleSnapshot,
1604        meta: &JobMeta,
1605    ) -> Result<Option<RuntimeRestartEvent>> {
1606        let after = self.handle.lifecycle_snapshot();
1607        let restarted = after.restarts.saturating_sub(before.restarts);
1608        if restarted == 0 {
1609            self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1610            return Ok(None);
1611        }
1612        let (cause, reason) = after.last_restart_reason.as_ref().map_or_else(
1613            || (RestartCause::WorkerInternal, "worker_internal_restart".to_owned()),
1614            |reason| (restart_cause_from_worker(reason), restart_reason_text(reason)),
1615        );
1616        for _ in 0..restarted {
1617            self.record_restart_or_stop(cause, &reason)
1618                .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1619        }
1620        self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1621        let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1622        self.record_restart(event.clone());
1623        Ok(Some(event))
1624    }
1625
1626    /// The single place a recycle becomes observable: tally it for frequency
1627    /// reporting, log it at a signal-appropriate level, and store it as the
1628    /// latest event. Every restart path funnels through here, so adding one is
1629    /// a single call. Kept distinct from [`Self::record_restart_or_stop`], which
1630    /// owns the orthogonal sliding-window health *policy*.
1631    fn record_restart(&mut self, event: RuntimeRestartEvent) {
1632        self.restart_stats.observe(&event.cause);
1633        log_restart(&event, self.restart_stats.total);
1634        self.last_restart = Some(event);
1635    }
1636
1637    fn record_restart_or_stop(
1638        &mut self,
1639        cause: RestartCause,
1640        reason: &str,
1641    ) -> std::result::Result<(), RestartLimitExceeded> {
1642        if !cause.counts_toward_restart_limit() {
1643            return Ok(());
1644        }
1645        let now = Instant::now();
1646        while self
1647            .abnormal_restart_times
1648            .front()
1649            .is_some_and(|seen| now.saturating_duration_since(*seen) > self.config.restart_window)
1650        {
1651            self.abnormal_restart_times.pop_front();
1652        }
1653        if self.abnormal_restart_times.len() >= self.config.max_restarts_per_window {
1654            self.config.healthy.store(false, Ordering::Release);
1655            tracing::warn!(
1656                cause = cause.as_str(),
1657                restarts_in_window = self.abnormal_restart_times.len(),
1658                window_millis = millis_u64(self.config.restart_window),
1659                "restart limit exceeded; marking project unhealthy"
1660            );
1661            let message = format!(
1662                "restart_limit_exceeded after {} restarts in {:?}; latest: {reason}",
1663                self.config.max_restarts_per_window, self.config.restart_window
1664            );
1665            let event = restart_event(
1666                cause,
1667                message.clone(),
1668                self.observed_generation(),
1669                self.last_rss_kib,
1670                None,
1671            );
1672            self.record_restart(event.clone());
1673            self.publish_runtime(&RuntimeFacts {
1674                worker_generation: self.observed_generation(),
1675                worker_restarted: false,
1676                retry_count: MAX_JOB_RETRIES,
1677                admission_wait_millis: 0,
1678                queue_wait_millis: 0,
1679                call_restart: None,
1680                last_restart: Some(event),
1681                rss_kib: self.last_rss_kib,
1682                worker_lanes: 1,
1683                import_profile: self.last_import_fingerprint.clone(),
1684                profile_switch_count: self.profile_switch_count,
1685                restarts_total: self.restart_stats.total,
1686                restarts_by_cause: self.restart_stats.by_cause.clone(),
1687            });
1688            return Err(RestartLimitExceeded {
1689                message,
1690                cause,
1691                restarts_in_window: self.abnormal_restart_times.len() as u64,
1692                window_millis: millis_u64(self.config.restart_window),
1693            });
1694        }
1695        self.abnormal_restart_times.push_back(now);
1696        Ok(())
1697    }
1698
1699    fn observed_generation(&self) -> u64 {
1700        self.worker_generation_base
1701            .saturating_add(self.handle.lifecycle_snapshot().worker_generation)
1702    }
1703
1704    fn runtime_facts(
1705        &self,
1706        meta: &JobMeta,
1707        generation_before: u64,
1708        retry_count: u32,
1709        queue_wait_millis: u64,
1710        call_restart: Option<RuntimeRestartEvent>,
1711    ) -> RuntimeFacts {
1712        let generation = self.observed_generation();
1713        let snapshot = self.handle.lifecycle_snapshot();
1714        RuntimeFacts {
1715            worker_generation: generation,
1716            worker_restarted: call_restart.is_some() || generation > generation_before,
1717            retry_count,
1718            admission_wait_millis: meta.admission_wait_millis,
1719            queue_wait_millis,
1720            call_restart,
1721            last_restart: self.last_restart.clone(),
1722            rss_kib: snapshot.last_rss_kib.or(self.last_rss_kib),
1723            worker_lanes: 1,
1724            import_profile: Some(meta.import_fingerprint.clone()),
1725            profile_switch_count: self.profile_switch_count,
1726            restarts_total: self.restart_stats.total,
1727            restarts_by_cause: self.restart_stats.by_cause.clone(),
1728        }
1729    }
1730
1731    fn publish_runtime(&self, runtime: &RuntimeFacts) {
1732        *self.runtime.lock() = RuntimeSnapshot {
1733            worker_generation: runtime.worker_generation,
1734            last_restart: runtime.last_restart.clone().or_else(|| runtime.call_restart.clone()),
1735            rss_kib: runtime.rss_kib,
1736            import_profile: runtime.import_profile.clone(),
1737            profile_switch_count: runtime.profile_switch_count,
1738            restarts_total: runtime.restarts_total,
1739            restarts_by_cause: runtime.restarts_by_cause.clone(),
1740        };
1741    }
1742
1743    fn worker_unavailable_for(
1744        &self,
1745        meta: &JobMeta,
1746        reason: String,
1747        retryable: bool,
1748        worker_restarted: bool,
1749        cause: Option<RestartCause>,
1750        limit_kib: Option<u64>,
1751        retry_after_millis: Option<u64>,
1752    ) -> ServerError {
1753        let snapshot = self.runtime.lock().facts();
1754        ServerError::worker_unavailable(WorkerUnavailable {
1755            retryable,
1756            worker_restarted,
1757            project_root: self.config.lake_root.to_string_lossy().into_owned(),
1758            project_hash: self.config.manifest_hash.clone(),
1759            imports: meta.imports.clone(),
1760            session_id: self.config.session_id.clone(),
1761            lean_toolchain: self.config.toolchain_label.clone(),
1762            worker_generation: self.observed_generation(),
1763            restart_cause: cause.map(|cause| cause.as_str().to_owned()),
1764            rss_kib: self.last_rss_kib,
1765            limit_kib,
1766            retry_after_millis,
1767            restarts_in_window: Some(self.abnormal_restart_times.len() as u64),
1768            window_millis: Some(millis_u64(self.config.restart_window)),
1769            runtime: snapshot,
1770            reason,
1771            toolchain_advisories: self.config.toolchain_advisories.clone(),
1772        })
1773    }
1774
1775    fn shutdown_unavailable(&self, meta: &JobMeta, reason: &'static str) -> ServerError {
1776        self.worker_unavailable_for(meta, reason.to_owned(), true, false, None, None, None)
1777    }
1778
1779    fn restart_limit_error(&self, imports: &[String], limit: RestartLimitExceeded) -> ServerError {
1780        let snapshot = self.runtime.lock().facts();
1781        ServerError::worker_unavailable(WorkerUnavailable {
1782            retryable: false,
1783            worker_restarted: false,
1784            project_root: self.config.lake_root.to_string_lossy().into_owned(),
1785            project_hash: self.config.manifest_hash.clone(),
1786            imports: imports.to_vec(),
1787            session_id: self.config.session_id.clone(),
1788            lean_toolchain: self.config.toolchain_label.clone(),
1789            worker_generation: self.observed_generation(),
1790            reason: limit.message,
1791            restart_cause: Some(limit.cause.as_str().to_owned()),
1792            rss_kib: self.last_rss_kib,
1793            limit_kib: None,
1794            retry_after_millis: Some(limit.window_millis),
1795            restarts_in_window: Some(limit.restarts_in_window),
1796            window_millis: Some(limit.window_millis),
1797            runtime: snapshot,
1798            toolchain_advisories: self.config.toolchain_advisories.clone(),
1799        })
1800    }
1801}
1802
1803struct RestartLimitExceeded {
1804    message: String,
1805    cause: RestartCause,
1806    restarts_in_window: u64,
1807    window_millis: u64,
1808}
1809
1810fn actor_main(
1811    config: ActorConfig,
1812    init_reply: std::sync::mpsc::Sender<std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>>,
1813) {
1814    let (handle, runtime_toolchain) = match open_worker(&config, true) {
1815        Ok(value) => value,
1816        Err(err) => {
1817            let _ = init_reply.send(Err(err));
1818            return;
1819        }
1820    };
1821    let mut state = ProjectActorState {
1822        config: config.clone(),
1823        handle,
1824        worker_generation_base: 1,
1825        last_restart: None,
1826        last_import_fingerprint: None,
1827        profile_switch_count: 0,
1828        last_rss_kib: None,
1829        runtime: Arc::clone(&config.runtime),
1830        abnormal_restart_times: VecDeque::new(),
1831        restart_stats: RestartStats::default(),
1832    };
1833
1834    let (tx, mut rx) = mpsc::channel::<ProjectMessage>(config.mailbox_capacity);
1835    if init_reply.send(Ok((runtime_toolchain, tx))).is_err() {
1836        return;
1837    }
1838
1839    while let Some(message) = rx.blocking_recv() {
1840        if !config.healthy.load(Ordering::Acquire) {
1841            message.reject(&state, "project_shutting_down");
1842            continue;
1843        }
1844        state.handle_message(message);
1845    }
1846
1847    match state.handle.shutdown() {
1848        Ok(report) => {
1849            tracing::debug!(
1850                outcome = ?report.outcome,
1851                elapsed_millis = millis_u64(report.elapsed),
1852                wait_millis = millis_u64(report.wait_elapsed),
1853                "project worker shutdown complete"
1854            );
1855        }
1856        Err(err) => {
1857            tracing::warn!(error = %err, "project worker shutdown failed");
1858        }
1859    }
1860}
1861
1862fn open_worker(config: &ActorConfig, preflight: bool) -> Result<(LeanWorkerHostHandle, String)> {
1863    let builder = worker_builder(config);
1864    if preflight {
1865        let report = builder.check();
1866        if let Some(first) = report.first_error() {
1867            if bootstrap_failure_is_hard_rss(&first.code().to_string(), first.message()) {
1868                return Err(bootstrap_hard_rss_unavailable(
1869                    config,
1870                    first.message().to_owned(),
1871                    parse_keyed_u64(first.message(), "current_kib"),
1872                    parse_keyed_u64(first.message(), "limit_kib").or(Some(config.worker_rss_hard_kill_kib)),
1873                ));
1874            }
1875            return Err(ServerError::BadProject(format!(
1876                "{}: {}",
1877                first.code(),
1878                first.message()
1879            )));
1880        }
1881    }
1882    let handle = match builder.open() {
1883        Ok(handle) => handle,
1884        Err(LeanWorkerError::RssHardLimitExceeded {
1885            operation,
1886            current_kib,
1887            limit_kib,
1888            ..
1889        }) => {
1890            return Err(bootstrap_hard_rss_unavailable(
1891                config,
1892                format!(
1893                    "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1894                ),
1895                Some(current_kib),
1896                Some(limit_kib),
1897            ));
1898        }
1899        Err(err) => return Err(map_worker_err(err)),
1900    };
1901    let runtime_toolchain = handle
1902        .runtime_metadata()
1903        .lean_version
1904        .unwrap_or_else(|| config.toolchain_label.clone());
1905    Ok((handle, runtime_toolchain))
1906}
1907
/// Decide whether a bootstrap failure should be treated as a hard RSS limit
/// trip.
///
/// Returns `true` when `code` contains `rss` (case-sensitive), or when
/// `message`, compared case-insensitively, contains one of the known hard-RSS
/// markers. The `rss_hard_limit` marker also covers
/// `rss_hard_limit_exceeded`, so that longer form needs no separate check.
fn bootstrap_failure_is_hard_rss(code: &str, message: &str) -> bool {
    const MESSAGE_MARKERS: [&str; 3] = ["hard rss limit", "rss_hard_limit", "rss hard limit"];

    if code.contains("rss") {
        return true;
    }
    // Only pay for the lowercase copy when the code alone did not decide.
    let lower = message.to_lowercase();
    MESSAGE_MARKERS.iter().any(|marker| lower.contains(*marker))
}
1916
1917fn bootstrap_hard_rss_unavailable(
1918    config: &ActorConfig,
1919    reason: String,
1920    current_kib: Option<u64>,
1921    limit_kib: Option<u64>,
1922) -> ServerError {
1923    config.healthy.store(false, Ordering::Release);
1924    let generation = config.runtime.lock().worker_generation;
1925    let event = restart_event(
1926        RestartCause::RssHardLimit,
1927        reason.clone(),
1928        generation,
1929        current_kib,
1930        limit_kib,
1931    );
1932    // First-spawn worker tripped the hard RSS limit before serving a call: a
1933    // single terminal recycle. Emit the same log line a live recycle would, so
1934    // the cause is visible on stderr even when the project never opens.
1935    let restarts_total = 1;
1936    log_restart(&event, restarts_total);
1937    let by_cause = BTreeMap::from([(RestartCause::RssHardLimit.as_str().to_owned(), restarts_total)]);
1938    let runtime = RuntimeFacts {
1939        worker_generation: generation,
1940        worker_restarted: true,
1941        retry_count: 0,
1942        admission_wait_millis: 0,
1943        queue_wait_millis: 0,
1944        call_restart: Some(event.clone()),
1945        last_restart: Some(event.clone()),
1946        rss_kib: current_kib,
1947        worker_lanes: 1,
1948        import_profile: None,
1949        profile_switch_count: 0,
1950        restarts_total,
1951        restarts_by_cause: by_cause.clone(),
1952    };
1953    *config.runtime.lock() = RuntimeSnapshot {
1954        worker_generation: generation,
1955        last_restart: Some(event),
1956        rss_kib: current_kib,
1957        import_profile: None,
1958        profile_switch_count: 0,
1959        restarts_total,
1960        restarts_by_cause: by_cause,
1961    };
1962    ServerError::worker_unavailable(WorkerUnavailable {
1963        retryable: false,
1964        worker_restarted: true,
1965        project_root: config.lake_root.to_string_lossy().into_owned(),
1966        project_hash: config.manifest_hash.clone(),
1967        imports: Vec::new(),
1968        session_id: config.session_id.clone(),
1969        lean_toolchain: config.toolchain_label.clone(),
1970        worker_generation: generation,
1971        reason,
1972        restart_cause: Some(RestartCause::RssHardLimit.as_str().to_owned()),
1973        rss_kib: current_kib,
1974        limit_kib,
1975        retry_after_millis: None,
1976        restarts_in_window: None,
1977        window_millis: None,
1978        runtime,
1979        toolchain_advisories: config.toolchain_advisories.clone(),
1980    })
1981}
1982
1983fn worker_builder(config: &ActorConfig) -> LeanWorkerHostHandleBuilder {
1984    let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
1985    let module_cache_limits = module_cache_limits(config);
1986    LeanWorkerHostHandleBuilder::shims_only(&config.lake_root, std::iter::empty::<String>())
1987        .worker_child(LeanWorkerChild::for_toolchain(
1988            config.worker_path.clone(),
1989            config.lean_sysroot.clone(),
1990        ))
1991        .startup_timeout(Duration::from_secs(30))
1992        .request_timeout(Duration::from_millis(config.request_timeout_millis))
1993        .restart_policy(restart_policy)
1994        .rss_hard_limit(
1995            config.worker_rss_hard_kill_kib,
1996            Duration::from_millis(config.worker_rss_sample_millis),
1997        )
1998        .module_cache_limits(module_cache_limits)
1999}
2000
2001fn semantic_capability_builder(
2002    config: &ActorConfig,
2003    built: &lean_toolchain::LeanBuiltCapability,
2004) -> Result<LeanWorkerCapabilityBuilder> {
2005    let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
2006    let builder = LeanWorkerCapabilityBuilder::from_built_capability(built, std::iter::empty::<String>())
2007        .map_err(map_worker_err)?
2008        .import_workspace_root(config.lake_root.clone())
2009        .worker_child(LeanWorkerChild::for_toolchain(
2010            config.worker_path.clone(),
2011            config.lean_sysroot.clone(),
2012        ))
2013        .startup_timeout(Duration::from_secs(30))
2014        .request_timeout(Duration::from_millis(config.request_timeout_millis))
2015        .restart_policy(restart_policy)
2016        .rss_hard_limit(
2017            config.worker_rss_hard_kill_kib,
2018            Duration::from_millis(config.worker_rss_sample_millis),
2019        )
2020        .module_cache_limits(module_cache_limits(config))
2021        .json_command_export(lean_semantic_search_capability::DECLARATION_FEATURES_EXPORT)
2022        .json_command_export(lean_semantic_search_capability::PROOF_GOAL_FEATURES_EXPORT);
2023    Ok(builder)
2024}
2025
2026fn semantic_runtime_cache_root() -> Result<PathBuf> {
2027    let cache_dir =
2028        dirs::cache_dir().ok_or_else(|| ServerError::Internal("could not resolve user cache directory".to_owned()))?;
2029    Ok(cache_dir.join("lean-host-mcp").join("semantic-runtimes"))
2030}
2031
2032fn module_cache_limits(config: &ActorConfig) -> LeanWorkerModuleCacheLimits {
2033    LeanWorkerModuleCacheLimits::default()
2034        .rss_guard_kib(config.module_cache_rss_guard_kib)
2035        .max_bytes(config.module_cache_max_bytes)
2036}
2037
/// Name for a project's actor thread: `lean-host-mcp/project/<basename>`,
/// where `<basename>` is the final component of `canonical_root`, or
/// `project` when the path has no final component or it is not valid UTF-8.
fn actor_thread_name(canonical_root: &Path) -> String {
    let basename = canonical_root
        .file_name()
        .and_then(|component| component.to_str())
        .unwrap_or("project");
    let mut name = String::from("lean-host-mcp/project/");
    name.push_str(basename);
    name
}
2042
2043fn worker_error_is_recoverable_death(err: &LeanWorkerError) -> bool {
2044    matches!(
2045        err,
2046        LeanWorkerError::ChildExited { .. } | LeanWorkerError::ChildPanicOrAbort { .. }
2047    )
2048}
2049
2050fn worker_error_is_session_missing(err: &LeanWorkerError) -> bool {
2051    matches!(err, LeanWorkerError::Worker { code, .. } if code == "lean_rs.worker.session_missing")
2052}
2053
2054#[allow(
2055    clippy::wildcard_enum_match_arm,
2056    reason = "only worker process death variants are restart causes; all other errors are classified elsewhere"
2057)]
2058fn worker_death_cause(err: &LeanWorkerError) -> RestartCause {
2059    match err {
2060        LeanWorkerError::ChildPanicOrAbort { .. } => RestartCause::ChildAbort,
2061        LeanWorkerError::ChildExited { .. } => RestartCause::ChildExit,
2062        _ => RestartCause::WorkerInternal,
2063    }
2064}
2065
2066fn restart_reason_text(reason: &LeanWorkerRestartReason) -> String {
2067    match reason {
2068        LeanWorkerRestartReason::Explicit => RestartCause::Explicit.as_str().to_owned(),
2069        LeanWorkerRestartReason::MaxRequests { limit } => format!("max_requests limit={limit}"),
2070        LeanWorkerRestartReason::MaxImports { limit } => format!("max_imports limit={limit}"),
2071        LeanWorkerRestartReason::RssCeiling {
2072            current_kib, limit_kib, ..
2073        } => {
2074            format!("rss_ceiling current_kib={current_kib} limit_kib={limit_kib}")
2075        }
2076        LeanWorkerRestartReason::RssHardLimit {
2077            operation,
2078            current_kib,
2079            limit_kib,
2080            ..
2081        } => {
2082            format!("rss_hard_limit operation={operation} current_kib={current_kib} limit_kib={limit_kib}")
2083        }
2084        LeanWorkerRestartReason::Idle { idle_for, limit } => {
2085            format!(
2086                "idle idle_for_millis={} limit_millis={}",
2087                millis_u64(*idle_for),
2088                millis_u64(*limit)
2089            )
2090        }
2091        LeanWorkerRestartReason::Cancelled { operation } => format!("cancelled operation={operation}"),
2092        LeanWorkerRestartReason::ChildAbort { operation } => format!("child_abort operation={operation}"),
2093        LeanWorkerRestartReason::RequestTimeout { operation, duration } => {
2094            format!(
2095                "timeout operation={operation} duration_millis={}",
2096                millis_u64(*duration)
2097            )
2098        }
2099    }
2100}
2101
/// Fingerprint of an import list: the entries in their given order, separated
/// by single newlines. An empty list yields the empty string.
fn import_fingerprint(imports: &[String]) -> String {
    let mut fingerprint = String::new();
    for (index, module) in imports.iter().enumerate() {
        if index > 0 {
            fingerprint.push('\n');
        }
        fingerprint.push_str(module);
    }
    fingerprint
}
2105
/// Whole milliseconds in `duration` as a `u64`, saturating at `u64::MAX` when
/// the value does not fit.
fn millis_u64(duration: Duration) -> u64 {
    let from_seconds = duration.as_secs().saturating_mul(1_000);
    let from_fraction = u64::from(duration.subsec_millis());
    from_seconds.saturating_add(from_fraction)
}
2109
/// Extract the unsigned integer that follows `key=` in `text`.
///
/// The key must start at a token boundary: at the beginning of `text` or
/// right after a character that is neither an ASCII letter/digit nor `_`.
/// This keeps a lookup for `limit_kib` from matching inside
/// `soft_limit_kib=…`. The first such occurrence decides the result.
///
/// Returns `None` when no boundary-aligned `key=` is present, when it is not
/// immediately followed by at least one ASCII digit, or when the digits do
/// not fit in a `u64`.
fn parse_keyed_u64(text: &str, key: &str) -> Option<u64> {
    let prefix = format!("{key}=");
    let (at, _) = text.match_indices(prefix.as_str()).find(|(at, _)| {
        // Reject hits that are merely the tail of a longer identifier.
        !text[..*at]
            .chars()
            .next_back()
            .is_some_and(|prev| prev.is_ascii_alphanumeric() || prev == '_')
    })?;
    let start = at.checked_add(prefix.len())?;
    let rest = &text[start..];
    let digits_end = rest.find(|ch: char| !ch.is_ascii_digit()).unwrap_or(rest.len());
    rest[..digits_end].parse().ok()
}
2119
2120/// Resolve a knob through `env > file > default` and reject a zero result
2121/// whatever its source. `env` is the raw env-var string (parsed here); `file`
2122/// is the already-typed config-file value; `default` is the built-in constant.
2123fn parse_nonzero_u64(name: &str, env: Option<&str>, file: Option<u64>, default: u64) -> Result<u64> {
2124    let value = match env {
2125        Some(raw) => raw
2126            .parse::<u64>()
2127            .map_err(|e| ServerError::Internal(format!("{name}={raw:?} not a u64: {e}")))?,
2128        None => file.unwrap_or(default),
2129    };
2130    if value == 0 {
2131        return Err(ServerError::Internal(format!(
2132            "{name} resolved to 0, which is not allowed"
2133        )));
2134    }
2135    Ok(value)
2136}
2137
2138fn parse_nonzero_usize(name: &str, env: Option<&str>, file: Option<usize>, default: usize) -> Result<usize> {
2139    let parsed = parse_nonzero_u64(name, env, file.map(|v| v as u64), default as u64)?;
2140    usize::try_from(parsed).map_err(|_| ServerError::Internal(format!("{name}={parsed} does not fit in usize")))
2141}
2142
2143#[cfg(test)]
2144#[allow(
2145    clippy::expect_used,
2146    clippy::panic,
2147    clippy::unwrap_used,
2148    reason = "unit tests use expect/unwrap_err to state the branch under test directly"
2149)]
2150mod tests {
2151    use super::*;
2152
2153    #[test]
2154    fn runtime_config_parses_runtime_policy_without_env_reads() {
2155        // Distinct values that also satisfy the RSS ordering invariant
2156        // (import_switch <= post_job <= hard_kill); see validate_rss_ordering.
2157        let config = parse_runtime_config(
2158            RuntimeEnv {
2159                worker_rss_post_job_restart_kib: Some("5".to_owned()),
2160                worker_rss_hard_kill_kib: Some("7".to_owned()),
2161                worker_rss_sample_millis: Some("11".to_owned()),
2162                import_switch_rss_soft_kib: Some("3".to_owned()),
2163                module_cache_rss_guard_kib: Some("17".to_owned()),
2164                module_cache_max_bytes: Some("19".to_owned()),
2165                request_timeout_millis: Some("37".to_owned()),
2166                project_mailbox_capacity: Some("23".to_owned()),
2167                worker_restart_limit: Some("29".to_owned()),
2168                worker_restart_window_secs: Some("31".to_owned()),
2169            },
2170            &RuntimeFileConfig::default(),
2171        )
2172        .unwrap();
2173
2174        assert_eq!(config.worker_rss_post_job_restart_kib(), 5);
2175        assert_eq!(config.worker_rss_hard_kill_kib(), 7);
2176        assert_eq!(config.worker_rss_sample_millis(), 11);
2177        assert_eq!(config.import_switch_rss_soft_kib(), 3);
2178        assert_eq!(config.module_cache_rss_guard_kib(), 17);
2179        assert_eq!(config.module_cache_max_bytes(), 19);
2180        assert_eq!(config.request_timeout_millis(), 37);
2181        assert_eq!(config.mailbox_capacity(), 23);
2182        assert_eq!(config.max_restarts_per_window(), 29);
2183        assert_eq!(config.restart_window(), Duration::from_secs(31));
2184    }
2185
2186    #[test]
2187    fn request_timeout_precedence_env_over_file_over_default() {
2188        let file = RuntimeFileConfig {
2189            request_timeout_millis: Some(45_000),
2190            ..RuntimeFileConfig::default()
2191        };
2192        // Env unset -> file value is used.
2193        let config = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
2194        assert_eq!(config.request_timeout_millis(), 45_000);
2195        // Env set -> env wins over the file.
2196        let env = RuntimeEnv {
2197            request_timeout_millis: Some("90000".to_owned()),
2198            ..RuntimeEnv::default()
2199        };
2200        let config = parse_runtime_config(env, &file).unwrap();
2201        assert_eq!(config.request_timeout_millis(), 90_000);
2202        // Neither -> built-in default (120 s).
2203        let config = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
2204        assert_eq!(config.request_timeout_millis(), REQUEST_TIMEOUT_MILLIS);
2205    }
2206
2207    #[test]
2208    fn request_timeout_zero_is_rejected() {
2209        // A zero deadline would time every call out instantly; parse_nonzero_u64
2210        // must reject it.
2211        let err = parse_runtime_config(
2212            RuntimeEnv {
2213                request_timeout_millis: Some("0".to_owned()),
2214                ..RuntimeEnv::default()
2215            },
2216            &RuntimeFileConfig::default(),
2217        )
2218        .unwrap_err();
2219        let ServerError::Internal(message) = err else {
2220            panic!("expected Internal config error");
2221        };
2222        assert!(
2223            message.contains("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS"),
2224            "message: {message}"
2225        );
2226    }
2227
2228    #[test]
2229    fn rss_config_rejects_post_job_above_hard_kill() {
2230        let err = parse_runtime_config(
2231            RuntimeEnv {
2232                // 20 GiB post-job ceiling above the 16 GiB default hard kill: the
2233                // planned post-job cycle could never fire before the hard kill.
2234                worker_rss_post_job_restart_kib: Some("20971520".to_owned()),
2235                ..RuntimeEnv::default()
2236            },
2237            &RuntimeFileConfig::default(),
2238        )
2239        .unwrap_err();
2240        let ServerError::Internal(message) = err else {
2241            panic!("expected Internal config error");
2242        };
2243        assert!(message.contains("invalid RSS config"), "message: {message}");
2244        assert!(message.contains("hard_kill"), "message: {message}");
2245    }
2246
2247    #[test]
2248    fn rss_config_rejects_import_switch_above_post_job() {
2249        let err = parse_runtime_config(
2250            RuntimeEnv {
2251                // Import-switch soft limit above the 5 GiB default post-job ceiling.
2252                import_switch_rss_soft_kib: Some("6291456".to_owned()),
2253                ..RuntimeEnv::default()
2254            },
2255            &RuntimeFileConfig::default(),
2256        )
2257        .unwrap_err();
2258        let ServerError::Internal(message) = err else {
2259            panic!("expected Internal config error");
2260        };
2261        assert!(message.contains("invalid RSS config"), "message: {message}");
2262        assert!(message.contains("import_switch"), "message: {message}");
2263    }
2264
2265    #[test]
2266    fn rss_config_accepts_raising_post_job_to_8gib() {
2267        // The motivating case: 8 GiB post-job ceiling, below the 16 GiB hard
2268        // kill and above the 2 GiB import-switch soft limit.
2269        let config = parse_runtime_config(
2270            RuntimeEnv {
2271                worker_rss_post_job_restart_kib: Some("8388608".to_owned()),
2272                ..RuntimeEnv::default()
2273            },
2274            &RuntimeFileConfig::default(),
2275        )
2276        .unwrap();
2277        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
2278    }
2279
2280    #[test]
2281    fn runtime_config_precedence_env_over_file_over_default() {
2282        let file = RuntimeFileConfig {
2283            worker_rss_post_job_restart_kib: Some(8_388_608),
2284            ..RuntimeFileConfig::default()
2285        };
2286        // Env unset -> file value is used.
2287        let config = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
2288        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
2289        // Env set -> env wins over the file (6 GiB, still a valid ordering).
2290        let env = RuntimeEnv {
2291            worker_rss_post_job_restart_kib: Some("6291456".to_owned()),
2292            ..RuntimeEnv::default()
2293        };
2294        let config = parse_runtime_config(env, &file).unwrap();
2295        assert_eq!(config.worker_rss_post_job_restart_kib(), 6_291_456);
2296        // Neither -> built-in default.
2297        let config = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
2298        assert_eq!(
2299            config.worker_rss_post_job_restart_kib(),
2300            WORKER_RSS_POST_JOB_RESTART_KIB
2301        );
2302    }
2303
2304    #[test]
2305    fn runtime_config_rejects_zero_and_bad_ordering_from_file() {
2306        let zero = RuntimeFileConfig {
2307            worker_rss_sample_millis: Some(0),
2308            ..RuntimeFileConfig::default()
2309        };
2310        assert!(parse_runtime_config(RuntimeEnv::default(), &zero).is_err());
2311
2312        let inverted = RuntimeFileConfig {
2313            worker_rss_post_job_restart_kib: Some(20_971_520), // above 16 GiB default hard kill
2314            ..RuntimeFileConfig::default()
2315        };
2316        let err = parse_runtime_config(RuntimeEnv::default(), &inverted).unwrap_err();
2317        let ServerError::Internal(message) = err else {
2318            panic!("expected Internal config error");
2319        };
2320        assert!(message.contains("invalid RSS config"), "message: {message}");
2321    }
2322
2323    #[test]
2324    fn restart_stats_tally_total_and_per_cause() {
2325        let mut stats = RestartStats::default();
2326        stats.observe("rss_post_job");
2327        stats.observe("rss_post_job");
2328        stats.observe("child_abort");
2329
2330        assert_eq!(stats.total, 3);
2331        assert_eq!(stats.by_cause.get("rss_post_job"), Some(&2));
2332        assert_eq!(stats.by_cause.get("child_abort"), Some(&1));
2333        assert_eq!(stats.by_cause.get("idle"), None);
2334    }
2335
2336    #[test]
2337    fn semantic_capability_builder_omits_capability_import_module() {
2338        let tmp = tempfile::tempdir().unwrap();
2339        let capability_root = tmp.path().join("capability");
2340        let lib_dir = capability_root.join(".lake").join("build").join("lib");
2341        std::fs::create_dir_all(&lib_dir).unwrap();
2342        let dylib_name = if cfg!(target_os = "macos") {
2343            "libLeanSemanticSearch.dylib"
2344        } else {
2345            "libLeanSemanticSearch.so"
2346        };
2347        let dylib = lib_dir.join(dylib_name);
2348        std::fs::write(&dylib, "").unwrap();
2349        let built = lean_toolchain::LeanBuiltCapability::path(&dylib)
2350            .package("lean_semantic_search")
2351            .module("LeanSemanticSearch");
2352        let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
2353            worker_generation: 1,
2354            last_restart: None,
2355            rss_kib: None,
2356            import_profile: None,
2357            profile_switch_count: 0,
2358            restarts_total: 0,
2359            restarts_by_cause: BTreeMap::new(),
2360        }));
2361        let config = ActorConfig {
2362            lake_root: tmp.path().join("consumer"),
2363            manifest_hash: "sha256-test".to_owned(),
2364            toolchain_label: "leanprover/lean4:test".to_owned(),
2365            worker_path: tmp.path().join("worker"),
2366            lean_sysroot: tmp.path().join("lean"),
2367            session_id: "session-test".to_owned(),
2368            runtime,
2369            healthy: Arc::new(AtomicBool::new(true)),
2370            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
2371            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
2372            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
2373            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
2374            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
2375            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
2376            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
2377            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
2378            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
2379            restart_window: RESTART_WINDOW,
2380            toolchain_advisories: Vec::new(),
2381        };
2382
2383        let builder = semantic_capability_builder(&config, &built).unwrap();
2384        let debug = format!("{builder:?}");
2385
2386        assert!(debug.contains("imports: []"), "builder debug: {debug}");
2387        assert!(
2388            !debug.contains("LeanSemanticSearch.Capability"),
2389            "builder must not import the capability module: {debug}"
2390        );
2391        assert!(
2392            debug.contains("import_workspace_root: Some"),
2393            "builder should import sessions against the consumer workspace: {debug}"
2394        );
2395    }
2396
2397    #[test]
2398    fn bootstrap_hard_rss_failure_is_structured_runtime_unavailable() {
2399        let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
2400            worker_generation: 1,
2401            last_restart: None,
2402            rss_kib: None,
2403            import_profile: None,
2404            profile_switch_count: 0,
2405            restarts_total: 0,
2406            restarts_by_cause: BTreeMap::new(),
2407        }));
2408        let config = ActorConfig {
2409            lake_root: PathBuf::from("/tmp/lean-host-mcp-bootstrap-rss-test"),
2410            manifest_hash: "sha256-test".to_owned(),
2411            toolchain_label: "leanprover/lean4:test".to_owned(),
2412            worker_path: PathBuf::from("/tmp/worker"),
2413            lean_sysroot: PathBuf::from("/tmp/lean"),
2414            session_id: "session-test".to_owned(),
2415            runtime: Arc::clone(&runtime),
2416            healthy: Arc::new(AtomicBool::new(true)),
2417            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
2418            worker_rss_hard_kill_kib: 64,
2419            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
2420            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
2421            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
2422            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
2423            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
2424            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
2425            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
2426            restart_window: RESTART_WINDOW,
2427            toolchain_advisories: Vec::new(),
2428        };
2429
2430        let err = bootstrap_hard_rss_unavailable(
2431            &config,
2432            "rss_hard_limit_exceeded operation=startup current_kib=128 limit_kib=64".to_owned(),
2433            Some(128),
2434            Some(64),
2435        );
2436
2437        let ServerError::WorkerUnavailable(info) = err else {
2438            panic!("expected WorkerUnavailable");
2439        };
2440        assert!(!info.retryable);
2441        assert_eq!(info.restart_cause.as_deref(), Some("rss_hard_limit_exceeded"));
2442        assert_eq!(info.rss_kib, Some(128));
2443        assert_eq!(info.limit_kib, Some(64));
2444        assert_eq!(
2445            info.runtime.last_restart.as_ref().map(|event| event.cause.as_str()),
2446            Some("rss_hard_limit_exceeded")
2447        );
2448    }
2449
2450    #[test]
2451    fn planned_restart_causes_do_not_consume_abnormal_restart_budget() {
2452        assert!(!RestartCause::RssImportSwitch.counts_toward_restart_limit());
2453        assert!(!RestartCause::RssPostJob.counts_toward_restart_limit());
2454        assert!(!RestartCause::MaxRequests.counts_toward_restart_limit());
2455        assert!(!RestartCause::MaxImports.counts_toward_restart_limit());
2456        assert!(!RestartCause::Idle.counts_toward_restart_limit());
2457
2458        assert!(RestartCause::ChildExit.counts_toward_restart_limit());
2459        assert!(RestartCause::ChildAbort.counts_toward_restart_limit());
2460        assert!(RestartCause::Timeout.counts_toward_restart_limit());
2461        assert!(RestartCause::Cancelled.counts_toward_restart_limit());
2462        assert!(RestartCause::SessionMissing.counts_toward_restart_limit());
2463        assert!(RestartCause::RssHardLimit.counts_toward_restart_limit());
2464    }
2465
2466    #[test]
2467    fn worker_restart_reason_maps_to_stable_cause() {
2468        assert_eq!(
2469            restart_cause_from_worker(&LeanWorkerRestartReason::MaxRequests { limit: 1 }).as_str(),
2470            "max_requests"
2471        );
2472        assert_eq!(
2473            restart_cause_from_worker(&LeanWorkerRestartReason::RssCeiling {
2474                current_kib: 2,
2475                limit_kib: 1,
2476                last_import_stats: None,
2477            })
2478            .as_str(),
2479            "rss_post_job"
2480        );
2481        assert_eq!(
2482            restart_cause_from_worker(&LeanWorkerRestartReason::RssHardLimit {
2483                operation: "test",
2484                current_kib: 2,
2485                limit_kib: 1,
2486                last_import_stats: None,
2487            })
2488            .as_str(),
2489            "rss_hard_limit_exceeded"
2490        );
2491        assert_eq!(
2492            restart_cause_from_worker(&LeanWorkerRestartReason::RequestTimeout {
2493                operation: "test",
2494                duration: Duration::from_millis(1),
2495            })
2496            .as_str(),
2497            "timeout"
2498        );
2499    }
2500}