Skip to main content

lean_host_mcp/
project.rs

1//! `LeanProject`—the unit of Lean semantic execution.
2//!
3//! One Lake project owns one private actor. The actor serializes all semantic
4//! worker calls, owns the child-process supervisor, applies memory/restart
5//! policy, and exposes only typed request/reply calls to tool modules. Worker
6//! handles, channels, queue internals, and restart mechanics stay below this
7//! boundary.
8
9#![allow(let_underscore_drop, clippy::needless_pass_by_value)]
10
11use std::collections::{BTreeMap, VecDeque};
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
16use std::thread;
17use std::time::{Duration, Instant};
18
19use lean_rs_worker_parent::{
20    LeanWorkerCapabilityBuilder, LeanWorkerChild, LeanWorkerDeclarationInspectionRequest,
21    LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationSearch, LeanWorkerDeclarationSearchResult,
22    LeanWorkerDeclarationVerificationRequest, LeanWorkerDeclarationVerificationResult, LeanWorkerElabOptions,
23    LeanWorkerError, LeanWorkerHostHandle, LeanWorkerHostHandleBuilder, LeanWorkerLifecycleSnapshot,
24    LeanWorkerModuleCacheLimits, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchOutcome,
25    LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector, LeanWorkerOutputBudgets,
26    LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRestartPolicy, LeanWorkerRestartReason,
27};
28use lean_semantic_search_runtime::SemanticSearchRuntimeBuild;
29use parking_lot::Mutex;
30use tokio::sync::{mpsc, oneshot};
31
32use crate::admission::SemanticPermit;
33use crate::cache::ModuleQueryCache;
34use crate::config_file::RuntimeFileConfig;
35use crate::envelope::{Freshness, RuntimeFacts, RuntimeRestartEvent};
36use crate::error::{Result, ServerError, WorkerUnavailable, map_worker_err};
37use crate::lake_meta::LakeProjectMeta;
38use crate::semantic_search::{SemanticProofSearchRequest, SemanticProofSearchResult};
39use crate::toolchain::{Readiness, ToolchainId, WorkerBinary};
40
/// LRU capacity for exact bounded module query results.
const MODULE_QUERY_CACHE_CAPACITY: usize = 256;
// NOTE(review): not referenced in the visible part of this file; presumably a
// per-worker request budget before a hygiene recycle — confirm at the use site.
const WORKER_REQUEST_RESTARTS: u64 = 64;
/// Built-in default for `ProjectRuntimeConfig::mailbox_capacity`.
const PROJECT_MAILBOX_CAPACITY: usize = 8;
/// Built-in post-job RSS ceiling: 5 GiB, expressed in KiB.
const WORKER_RSS_POST_JOB_RESTART_KIB: u64 = 5 * 1024 * 1024;
/// Built-in hard-kill RSS ceiling: 16 GiB, expressed in KiB.
const WORKER_RSS_HARD_KILL_KIB: u64 = 16 * 1024 * 1024;
/// Built-in default for `ProjectRuntimeConfig::worker_rss_sample_millis`.
const WORKER_RSS_SAMPLE_MILLIS: u64 = 250;
/// Built-in import-switch RSS soft ceiling: 2 GiB, expressed in KiB.
const IMPORT_SWITCH_RSS_SOFT_KIB: u64 = 2 * 1024 * 1024;
/// Built-in module-cache RSS guard: 2 GiB, expressed in KiB.
const MODULE_CACHE_RSS_GUARD_KIB: u64 = 2 * 1024 * 1024;
/// Built-in module-cache size cap: 32 MiB, expressed in bytes.
const MODULE_CACHE_MAX_BYTES: u64 = 32 * 1024 * 1024;
/// Per-request worker deadline. Covers one tool call end to end (live rows,
/// diagnostics, terminal response); on expiry the worker is recycled and the
/// call returns a retryable runtime error. Replaces the worker-parent's 10-min
/// `long_running_requests` profile, which let whole-project scans (e.g.
/// `find_references` at project scope) appear to hang. Raise it for unusually
/// heavy modules whose `verify`/`proof_state` legitimately runs longer.
const REQUEST_TIMEOUT_MILLIS: u64 = 120 * 1000;
/// Retry budget reported by `RetryPolicy::RetryOnceReadOnly`.
const MAX_JOB_RETRIES: u32 = 1;
/// Built-in default for `ProjectRuntimeConfig::max_restarts_per_window`.
const MAX_RESTARTS_PER_WINDOW: usize = 3;
/// Built-in default for `ProjectRuntimeConfig::restart_window` (one minute).
const RESTART_WINDOW: Duration = Duration::from_mins(1);
61
/// Runtime policy for one private project actor.
///
/// The binary parses this once at server startup and passes it into the
/// broker. Tests and embedders can construct the default directly without
/// rereading process environment during project open.
///
/// All fields are private; each has a same-named `const` accessor. Values
/// produced by `from_env`/`from_env_with_file` have passed the non-zero and
/// RSS-ordering checks; `Default` uses the built-in constants.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ProjectRuntimeConfig {
    /// Post-job RSS ceiling in KiB: a worker that grew past it is recycled
    /// after the job.
    worker_rss_post_job_restart_kib: u64,
    /// Hard RSS ceiling in KiB: the only ceiling at which a worker is killed
    /// in-flight.
    worker_rss_hard_kill_kib: u64,
    /// RSS sampling period in milliseconds.
    // NOTE(review): the sampler itself is not visible here — confirm semantics.
    worker_rss_sample_millis: u64,
    /// Soft RSS ceiling in KiB applied before an import-profile switch.
    import_switch_rss_soft_kib: u64,
    /// Module-cache RSS guard in KiB.
    // NOTE(review): presumably gates worker-side module caching — confirm.
    module_cache_rss_guard_kib: u64,
    /// Module-cache size cap in bytes.
    module_cache_max_bytes: u64,
    /// Per-request worker deadline in milliseconds.
    request_timeout_millis: u64,
    /// Capacity of the project actor mailbox.
    mailbox_capacity: usize,
    /// Restart limit per `restart_window`.
    max_restarts_per_window: usize,
    /// Window over which `max_restarts_per_window` applies.
    restart_window: Duration,
}
80
81impl Default for ProjectRuntimeConfig {
82    fn default() -> Self {
83        Self {
84            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
85            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
86            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
87            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
88            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
89            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
90            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
91            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
92            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
93            restart_window: RESTART_WINDOW,
94        }
95    }
96}
97
98impl ProjectRuntimeConfig {
99    /// Parse runtime env vars once at server startup.
100    ///
101    /// # Errors
102    ///
103    /// [`ServerError::Internal`] when a runtime env var is malformed, zero
104    /// where zero is unsafe.
105    pub fn from_env() -> Result<Self> {
106        Self::from_env_with_file(&RuntimeFileConfig::default())
107    }
108
109    /// Resolve the runtime policy with a config-file section as the layer
110    /// beneath env vars: each knob is `env var > file > built-in default`.
111    ///
112    /// # Errors
113    ///
114    /// [`ServerError::Internal`] when an env var is malformed, or a resolved
115    /// value (from env or file) is zero where zero is unsafe, or the RSS
116    /// ceilings violate `import_switch <= post_job <= hard_kill`.
117    pub fn from_env_with_file(file: &RuntimeFileConfig) -> Result<Self> {
118        parse_runtime_config(
119            RuntimeEnv {
120                worker_rss_post_job_restart_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB")?,
121                worker_rss_hard_kill_kib: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB")?,
122                worker_rss_sample_millis: runtime_env_var("LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS")?,
123                import_switch_rss_soft_kib: runtime_env_var("LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB")?,
124                module_cache_rss_guard_kib: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB")?,
125                module_cache_max_bytes: runtime_env_var("LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES")?,
126                request_timeout_millis: runtime_env_var("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS")?,
127                project_mailbox_capacity: runtime_env_var("LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY")?,
128                worker_restart_limit: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_LIMIT")?,
129                worker_restart_window_secs: runtime_env_var("LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS")?,
130            },
131            file,
132        )
133    }
134
135    #[must_use]
136    pub const fn worker_rss_post_job_restart_kib(&self) -> u64 {
137        self.worker_rss_post_job_restart_kib
138    }
139
140    #[must_use]
141    pub const fn worker_rss_hard_kill_kib(&self) -> u64 {
142        self.worker_rss_hard_kill_kib
143    }
144
145    #[must_use]
146    pub const fn worker_rss_sample_millis(&self) -> u64 {
147        self.worker_rss_sample_millis
148    }
149
150    #[must_use]
151    pub const fn import_switch_rss_soft_kib(&self) -> u64 {
152        self.import_switch_rss_soft_kib
153    }
154
155    #[must_use]
156    pub const fn module_cache_rss_guard_kib(&self) -> u64 {
157        self.module_cache_rss_guard_kib
158    }
159
160    #[must_use]
161    pub const fn module_cache_max_bytes(&self) -> u64 {
162        self.module_cache_max_bytes
163    }
164
165    #[must_use]
166    pub const fn request_timeout_millis(&self) -> u64 {
167        self.request_timeout_millis
168    }
169
170    #[must_use]
171    pub const fn mailbox_capacity(&self) -> usize {
172        self.mailbox_capacity
173    }
174
175    #[must_use]
176    pub const fn max_restarts_per_window(&self) -> usize {
177        self.max_restarts_per_window
178    }
179
180    #[must_use]
181    pub const fn restart_window(&self) -> Duration {
182        self.restart_window
183    }
184}
185
/// Raw, still-unparsed values of the `LEAN_HOST_MCP_*` runtime env vars.
///
/// Each field is `None` when the variable is unset. Parsing and validation
/// happen later in `parse_runtime_config`, so this struct can also be built
/// by hand (it derives `Default`) without touching the process environment.
#[derive(Debug, Default)]
struct RuntimeEnv {
    /// `LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB`
    worker_rss_post_job_restart_kib: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB`
    worker_rss_hard_kill_kib: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS`
    worker_rss_sample_millis: Option<String>,
    /// `LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB`
    import_switch_rss_soft_kib: Option<String>,
    /// `LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB`
    module_cache_rss_guard_kib: Option<String>,
    /// `LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES`
    module_cache_max_bytes: Option<String>,
    /// `LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS`
    request_timeout_millis: Option<String>,
    /// `LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY`
    project_mailbox_capacity: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RESTART_LIMIT`
    worker_restart_limit: Option<String>,
    /// `LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS`
    worker_restart_window_secs: Option<String>,
}
199
200fn parse_runtime_config(env: RuntimeEnv, file: &RuntimeFileConfig) -> Result<ProjectRuntimeConfig> {
201    let defaults = ProjectRuntimeConfig::default();
202    let config = ProjectRuntimeConfig {
203        worker_rss_post_job_restart_kib: parse_nonzero_u64(
204            "LEAN_HOST_MCP_WORKER_RSS_POST_JOB_RESTART_KIB",
205            env.worker_rss_post_job_restart_kib.as_deref(),
206            file.worker_rss_post_job_restart_kib,
207            defaults.worker_rss_post_job_restart_kib,
208        )?,
209        worker_rss_hard_kill_kib: parse_nonzero_u64(
210            "LEAN_HOST_MCP_WORKER_RSS_HARD_KILL_KIB",
211            env.worker_rss_hard_kill_kib.as_deref(),
212            file.worker_rss_hard_kill_kib,
213            defaults.worker_rss_hard_kill_kib,
214        )?,
215        worker_rss_sample_millis: parse_nonzero_u64(
216            "LEAN_HOST_MCP_WORKER_RSS_SAMPLE_MILLIS",
217            env.worker_rss_sample_millis.as_deref(),
218            file.worker_rss_sample_millis,
219            defaults.worker_rss_sample_millis,
220        )?,
221        import_switch_rss_soft_kib: parse_nonzero_u64(
222            "LEAN_HOST_MCP_IMPORT_SWITCH_RSS_SOFT_KIB",
223            env.import_switch_rss_soft_kib.as_deref(),
224            file.import_switch_rss_soft_kib,
225            defaults.import_switch_rss_soft_kib,
226        )?,
227        module_cache_rss_guard_kib: parse_nonzero_u64(
228            "LEAN_HOST_MCP_MODULE_CACHE_RSS_GUARD_KIB",
229            env.module_cache_rss_guard_kib.as_deref(),
230            file.module_cache_rss_guard_kib,
231            defaults.module_cache_rss_guard_kib,
232        )?,
233        module_cache_max_bytes: parse_nonzero_u64(
234            "LEAN_HOST_MCP_MODULE_CACHE_MAX_BYTES",
235            env.module_cache_max_bytes.as_deref(),
236            file.module_cache_max_bytes,
237            defaults.module_cache_max_bytes,
238        )?,
239        request_timeout_millis: parse_nonzero_u64(
240            "LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS",
241            env.request_timeout_millis.as_deref(),
242            file.request_timeout_millis,
243            defaults.request_timeout_millis,
244        )?,
245        mailbox_capacity: parse_nonzero_usize(
246            "LEAN_HOST_MCP_PROJECT_MAILBOX_CAPACITY",
247            env.project_mailbox_capacity.as_deref(),
248            file.project_mailbox_capacity,
249            defaults.mailbox_capacity,
250        )?,
251        max_restarts_per_window: parse_nonzero_usize(
252            "LEAN_HOST_MCP_WORKER_RESTART_LIMIT",
253            env.worker_restart_limit.as_deref(),
254            file.worker_restart_limit,
255            defaults.max_restarts_per_window,
256        )?,
257        restart_window: Duration::from_secs(parse_nonzero_u64(
258            "LEAN_HOST_MCP_WORKER_RESTART_WINDOW_SECS",
259            env.worker_restart_window_secs.as_deref(),
260            file.worker_restart_window_secs,
261            defaults.restart_window.as_secs(),
262        )?),
263    };
264    validate_rss_ordering(&config)?;
265    Ok(config)
266}
267
268/// The three RSS ceilings escalate: a worker cycles cleanly before an
269/// import-profile switch (`import_switch`), again after a job that grew past the
270/// post-job budget (`post_job`), and is killed in-flight only at the hard limit
271/// (`hard_kill`). If a tuned value inverts that order the cheaper cycle can
272/// never fire — e.g. `post_job > hard_kill` means the planned post-job recycle
273/// is unreachable and every overrun escalates straight to a hard kill. Reject it
274/// at startup with the offending values, rather than degrade silently.
275fn validate_rss_ordering(config: &ProjectRuntimeConfig) -> Result<()> {
276    if config.import_switch_rss_soft_kib > config.worker_rss_post_job_restart_kib {
277        return Err(ServerError::Internal(format!(
278            "invalid RSS config: import_switch={} KiB exceeds post_job={} KiB \
279             (need import_switch <= post_job <= hard_kill)",
280            config.import_switch_rss_soft_kib, config.worker_rss_post_job_restart_kib,
281        )));
282    }
283    if config.worker_rss_post_job_restart_kib > config.worker_rss_hard_kill_kib {
284        return Err(ServerError::Internal(format!(
285            "invalid RSS config: post_job={} KiB exceeds hard_kill={} KiB \
286             (need import_switch <= post_job <= hard_kill)",
287            config.worker_rss_post_job_restart_kib, config.worker_rss_hard_kill_kib,
288        )));
289    }
290    Ok(())
291}
292
293fn runtime_env_var(name: &str) -> Result<Option<String>> {
294    match std::env::var(name) {
295        Ok(value) => Ok(Some(value)),
296        Err(std::env::VarError::NotPresent) => Ok(None),
297        Err(err @ std::env::VarError::NotUnicode(_)) => {
298            Err(ServerError::Internal(format!("{name} is not valid unicode: {err}")))
299        }
300    }
301}
302
303/// Result of one project actor call.
304#[derive(Debug, Clone)]
305pub(crate) struct ProjectCall<T> {
306    value: T,
307    runtime: RuntimeFacts,
308}
309
310impl<T> ProjectCall<T> {
311    pub(crate) fn new(value: T, runtime: RuntimeFacts) -> Self {
312        Self { value, runtime }
313    }
314
315    pub(crate) fn into_parts(self) -> (T, RuntimeFacts) {
316        (self.value, self.runtime)
317    }
318}
319
320#[derive(Clone, Copy, Debug, Eq, PartialEq)]
321enum RetryPolicy {
322    RetryOnceReadOnly,
323}
324
325impl RetryPolicy {
326    fn retries(self) -> u32 {
327        match self {
328            Self::RetryOnceReadOnly => MAX_JOB_RETRIES,
329        }
330    }
331}
332
/// Drop guard tied to a shared active-job counter.
///
/// Dropping the guard subtracts one from the counter. The matching increment
/// is not performed here.
// NOTE(review): presumably the creator increments before constructing — confirm.
struct ActiveJobGuard {
    active_jobs: Arc<AtomicUsize>,
}

impl Drop for ActiveJobGuard {
    fn drop(&mut self) {
        let Self { active_jobs } = self;
        active_jobs.fetch_sub(1, Ordering::AcqRel);
    }
}
342
/// Metadata that travels with every [`ProjectMessage`] into the actor.
///
/// Underscore-prefixed fields are not read through that name in the visible
/// code; the two guard fields are carried so their `Drop` runs when the job
/// (and therefore this struct) is dropped.
struct JobMeta {
    /// Import list for the job; exposed via `ProjectMessage::imports`.
    imports: Vec<String>,
    // NOTE(review): presumably a stable digest of `imports` used to detect
    // import-profile switches — confirm where it is computed.
    import_fingerprint: String,
    _created_at: Instant,
    // NOTE(review): presumably the enqueue time used for queue-wait reporting — confirm.
    queued_at: Instant,
    /// Admission wait, in milliseconds, as passed in by the caller.
    admission_wait_millis: u64,
    _correlation_id: uuid::Uuid,
    /// Retry budget for this job.
    retry_policy: RetryPolicy,
    /// Decrements the shared active-job counter when the job is dropped.
    _active_job: ActiveJobGuard,
    // NOTE(review): presumably releases the semantic admission slot on drop — confirm.
    _semantic_permit: SemanticPermit,
}
354
/// One request sent to the project actor.
///
/// Every variant carries the job's [`JobMeta`], the typed request payload,
/// and a `oneshot` sender on which the actor delivers the
/// `Result<ProjectCall<_>>` for that request.
enum ProjectMessage {
    /// Single module query over `source`.
    ModuleQuery {
        meta: JobMeta,
        source: String,
        query: LeanWorkerModuleQuery,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryOutcome>>>,
    },
    /// Batch of module-query selectors over one `source`, with output budgets.
    ModuleQueryBatch {
        meta: JobMeta,
        source: String,
        selectors: Vec<LeanWorkerModuleQuerySelector>,
        budgets: LeanWorkerOutputBudgets,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>>>,
    },
    /// Inspection of one declaration.
    DeclarationInspection {
        meta: JobMeta,
        request: LeanWorkerDeclarationInspectionRequest,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationInspectionResult>>>,
    },
    /// Declaration search.
    DeclarationSearch {
        meta: JobMeta,
        request: LeanWorkerDeclarationSearch,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationSearchResult>>>,
    },
    /// Proof attempt with elaboration options.
    ProofAttempt {
        meta: JobMeta,
        request: LeanWorkerProofAttemptRequest,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerProofAttemptResult>>>,
    },
    /// Declaration verification with elaboration options.
    DeclarationVerification {
        meta: JobMeta,
        request: LeanWorkerDeclarationVerificationRequest,
        options: LeanWorkerElabOptions,
        reply: oneshot::Sender<Result<ProjectCall<LeanWorkerDeclarationVerificationResult>>>,
    },
    /// Semantic proof search.
    SemanticProofSearch {
        meta: JobMeta,
        request: SemanticProofSearchRequest,
        reply: oneshot::Sender<Result<ProjectCall<SemanticProofSearchResult>>>,
    },
}
399
400impl ProjectMessage {
401    fn imports(&self) -> &[String] {
402        match self {
403            Self::ModuleQuery { meta, .. }
404            | Self::ModuleQueryBatch { meta, .. }
405            | Self::DeclarationInspection { meta, .. }
406            | Self::DeclarationSearch { meta, .. }
407            | Self::ProofAttempt { meta, .. }
408            | Self::DeclarationVerification { meta, .. }
409            | Self::SemanticProofSearch { meta, .. } => &meta.imports,
410        }
411    }
412}
413
/// Why a worker was recycled.
///
/// [`Self::as_str`] is the stable string form stored in restart events.
/// [`Self::counts_toward_restart_limit`] splits the causes into those that
/// consume the restart budget and those that do not; [`Self::is_planned`] is
/// exactly the complement.
// NOTE(review): `WorkerInternal` is outside the counted set and therefore
// reports as planned, while `log_restart` logs "worker_internal" as abnormal
// — confirm this is intended.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RestartCause {
    #[expect(dead_code, reason = "stable wire cause reserved for future non-RSS profile cycling")]
    ImportProfileSwitch,
    RssImportSwitch,
    RssPostJob,
    RssHardLimit,
    MaxRequests,
    MaxImports,
    Idle,
    Timeout,
    Cancelled,
    ChildExit,
    ChildAbort,
    SessionMissing,
    Explicit,
    WorkerInternal,
}

impl RestartCause {
    /// Stable string identifier for this cause.
    const fn as_str(self) -> &'static str {
        match self {
            // Causes that count toward the restart limit.
            Self::Timeout => "timeout",
            Self::Cancelled => "cancelled",
            Self::ChildExit => "child_exit",
            Self::ChildAbort => "child_abort",
            Self::SessionMissing => "session_missing",
            Self::RssHardLimit => "rss_hard_limit_exceeded",
            // Causes that do not.
            Self::ImportProfileSwitch => "import_profile_switch",
            Self::RssImportSwitch => "rss_import_switch",
            Self::RssPostJob => "rss_post_job",
            Self::MaxRequests => "max_requests",
            Self::MaxImports => "max_imports",
            Self::Idle => "idle",
            Self::Explicit => "explicit",
            Self::WorkerInternal => "worker_internal",
        }
    }

    /// Whether a recycle with this cause consumes the restart budget.
    const fn counts_toward_restart_limit(self) -> bool {
        match self {
            Self::Timeout
            | Self::Cancelled
            | Self::ChildExit
            | Self::ChildAbort
            | Self::SessionMissing
            | Self::RssHardLimit => true,
            Self::ImportProfileSwitch
            | Self::RssImportSwitch
            | Self::RssPostJob
            | Self::MaxRequests
            | Self::MaxImports
            | Self::Idle
            | Self::Explicit
            | Self::WorkerInternal => false,
        }
    }

    /// A cause is "planned" exactly when it does not count toward the limit.
    const fn is_planned(self) -> bool {
        if self.counts_toward_restart_limit() { false } else { true }
    }
}
469
470fn restart_event(
471    cause: RestartCause,
472    reason: impl Into<String>,
473    worker_generation: u64,
474    rss_kib: Option<u64>,
475    limit_kib: Option<u64>,
476) -> RuntimeRestartEvent {
477    RuntimeRestartEvent {
478        cause: cause.as_str().to_owned(),
479        reason: reason.into(),
480        worker_generation,
481        planned: cause.is_planned(),
482        rss_kib,
483        limit_kib,
484    }
485}
486
/// Running tally of worker recycles for one project, across all causes.
///
/// Updated once per event at [`ProjectActorState::record_restart`] and copied
/// into [`RuntimeSnapshot`] on publish, so the no-call and error paths report
/// the same totals a live call would. It answers "how *often*, and why?";
/// the single most-recent event is kept separately in `last_restart`.
#[derive(Debug, Clone, Default)]
struct RestartStats {
    total: u64,
    by_cause: BTreeMap<String, u64>,
}

impl RestartStats {
    /// Count one recycle attributed to `cause`.
    ///
    /// Both the overall total and the per-cause counter saturate at
    /// `u64::MAX` instead of wrapping.
    fn observe(&mut self, cause: &str) {
        self.total = self.total.saturating_add(1);
        self.by_cause
            .entry(cause.to_owned())
            .and_modify(|seen| *seen = seen.saturating_add(1))
            .or_insert(1);
    }
}
506
/// Emit one structured log line for a recycle. Level tracks the *signal*, not
/// `planned`: crash/abnormal causes `warn`, memory-pressure cycles `info` (the
/// frequency an operator tuning the RSS budget watches), pure hygiene `debug`.
///
/// `restarts_total` is logged alongside the event's own fields.
fn log_restart(event: &RuntimeRestartEvent, restarts_total: u64) {
    // The level is selected by macro name (`tracing::warn!` / `info!` /
    // `debug!`), so a local macro stamps out the identical field set for each.
    macro_rules! emit {
        ($level:ident, $msg:literal) => {
            tracing::$level!(
                cause = %event.cause,
                reason = %event.reason,
                worker_generation = event.worker_generation,
                rss_kib = ?event.rss_kib,
                limit_kib = ?event.limit_kib,
                planned = event.planned,
                restarts_total,
                $msg
            )
        };
    }
    // The event stores its cause as a string, so the arms below mirror the
    // literals returned by `RestartCause::as_str`; keep the two in sync.
    // Anything not listed (including unknown strings) logs at `debug`.
    match event.cause.as_str() {
        "rss_hard_limit_exceeded"
        | "child_abort"
        | "child_exit"
        | "session_missing"
        | "worker_internal"
        | "timeout"
        | "cancelled" => emit!(warn, "worker recycled (abnormal)"),
        "rss_post_job" | "rss_import_switch" => emit!(info, "worker recycled (memory pressure)"),
        _ => emit!(debug, "worker recycled (hygiene)"),
    }
}
537
538fn restart_cause_from_worker(reason: &LeanWorkerRestartReason) -> RestartCause {
539    match reason.stable_cause() {
540        "explicit" => RestartCause::Explicit,
541        "max_requests" => RestartCause::MaxRequests,
542        "max_imports" => RestartCause::MaxImports,
543        "rss_ceiling" => RestartCause::RssPostJob,
544        "rss_hard_limit" => RestartCause::RssHardLimit,
545        "idle" => RestartCause::Idle,
546        "cancelled" => RestartCause::Cancelled,
547        "timeout" => RestartCause::Timeout,
548        _ => RestartCause::WorkerInternal,
549    }
550}
551
552#[derive(Debug, Clone)]
553struct RuntimeSnapshot {
554    worker_generation: u64,
555    last_restart: Option<RuntimeRestartEvent>,
556    rss_kib: Option<u64>,
557    import_profile: Option<String>,
558    profile_switch_count: u64,
559    restarts_total: u64,
560    restarts_by_cause: BTreeMap<String, u64>,
561}
562
563impl RuntimeSnapshot {
564    fn facts(&self) -> RuntimeFacts {
565        RuntimeFacts {
566            worker_generation: self.worker_generation,
567            worker_restarted: false,
568            retry_count: 0,
569            admission_wait_millis: 0,
570            queue_wait_millis: 0,
571            call_restart: None,
572            last_restart: self.last_restart.clone(),
573            rss_kib: self.rss_kib,
574            worker_lanes: 1,
575            import_profile: self.import_profile.clone(),
576            profile_switch_count: self.profile_switch_count,
577            restarts_total: self.restarts_total,
578            restarts_by_cause: self.restarts_by_cause.clone(),
579        }
580    }
581}
582
/// One Lake project, one supervised worker actor, one in-memory cache. Cheap
/// to clone via `Arc`.
pub(crate) struct LeanProject {
    /// Project root, taken from `LakeProjectMeta::canonical_root` at open.
    canonical_root: PathBuf,
    /// Toolchain string reported by the actor thread during init.
    toolchain: String,
    package: Option<String>,
    library: Option<String>,
    manifest_hash: String,
    /// Random UUID (v4) minted once per `open`.
    session_id: String,
    /// Toolchain-provenance advisories captured at open (unknown pin, missing
    /// sidecar). Surfaced into every response's envelope warnings via
    /// [`Self::freshness`]; empty for a fully-vouched-for worker.
    open_warnings: Vec<String>,
    /// Sender side of the actor mailbox.
    // NOTE(review): `Option` presumably lets shutdown drop the sender — confirm.
    actor_tx: Mutex<Option<mpsc::Sender<ProjectMessage>>>,
    /// Shared active-job counter; starts at zero.
    active_jobs: Arc<AtomicUsize>,
    /// Health flag shared with the actor configuration; starts `true`.
    healthy: Arc<AtomicBool>,
    /// Runtime snapshot shared with the actor configuration.
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    /// Module-query result cache, sized by `MODULE_QUERY_CACHE_CAPACITY`.
    module_queries: ModuleQueryCache,
}
602
603impl std::fmt::Debug for LeanProject {
604    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605        f.debug_struct("LeanProject")
606            .field("canonical_root", &self.canonical_root)
607            .field("toolchain", &self.toolchain)
608            .field("package", &self.package)
609            .field("library", &self.library)
610            .field("manifest_hash", &self.manifest_hash)
611            .finish_non_exhaustive()
612    }
613}
614
615impl LeanProject {
    /// Open a project: build the actor configuration, start the dedicated
    /// actor thread, and wait for it to report the outcome of its init.
    ///
    /// The call blocks the current thread on a `std::sync::mpsc` channel
    /// until the actor thread either sends its init result or exits.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ActorConfig::from_meta` or the init error
    /// reported by the actor thread. Returns [`ServerError::Internal`] when
    /// the thread cannot be spawned or ends without reporting.
    pub(crate) fn open(meta: LakeProjectMeta, runtime_config: ProjectRuntimeConfig) -> Result<Arc<Self>> {
        let session_id = uuid::Uuid::new_v4().to_string();
        // Initial snapshot: generation 1, nothing observed yet.
        let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
            worker_generation: 1,
            last_restart: None,
            rss_kib: None,
            import_profile: None,
            profile_switch_count: 0,
            restarts_total: 0,
            restarts_by_cause: BTreeMap::new(),
        }));
        let active_jobs = Arc::new(AtomicUsize::new(0));
        let healthy = Arc::new(AtomicBool::new(true));
        // The actor config gets its own handles to the shared snapshot and
        // health flag; this struct keeps the originals.
        let (config, open_warnings) = ActorConfig::from_meta(
            &meta,
            session_id.clone(),
            Arc::clone(&runtime),
            Arc::clone(&healthy),
            runtime_config,
        )?;
        // Init handshake payload: the runtime toolchain string plus the
        // mailbox sender on success, or the actor's init error.
        type InitMsg = std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>;
        let (init_tx, init_rx) = std::sync::mpsc::channel::<InitMsg>();
        let thread_name = actor_thread_name(&meta.canonical_root);

        thread::Builder::new()
            .name(thread_name)
            .spawn(move || {
                actor_main(config, init_tx);
            })
            .map_err(|e| ServerError::Internal(format!("spawn project actor thread: {e}")))?;

        // Outer `?`: the sender was dropped without a message (thread died).
        // Inner `?`: the actor reported an init failure.
        let (runtime_toolchain, actor_tx) = init_rx
            .recv()
            .map_err(|_| ServerError::Internal("project actor thread died during init".into()))??;

        // The capacity constant is non-zero, so the fallback is never taken;
        // it only avoids an `unwrap`.
        let cache_cap = NonZeroUsize::new(MODULE_QUERY_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
        Ok(Arc::new(Self {
            canonical_root: meta.canonical_root,
            // The toolchain comes from the actor's init report, not from `meta`.
            toolchain: runtime_toolchain,
            package: meta.package,
            library: meta.library,
            manifest_hash: meta.manifest_hash,
            session_id,
            open_warnings,
            actor_tx: Mutex::new(Some(actor_tx)),
            active_jobs,
            healthy,
            runtime,
            module_queries: ModuleQueryCache::with_capacity(cache_cap),
        }))
    }
667
668    /// Process one module query through this project's serialized worker actor.
669    ///
670    /// # Errors
671    ///
672    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
673    /// worker execution fails.
674    pub(crate) async fn process_module_query(
675        &self,
676        imports: Vec<String>,
677        semantic_permit: SemanticPermit,
678        admission_wait_millis: u64,
679        source: String,
680        query: LeanWorkerModuleQuery,
681        options: LeanWorkerElabOptions,
682    ) -> Result<ProjectCall<LeanWorkerModuleQueryOutcome>> {
683        let (reply, rx) = oneshot::channel();
684        let message = ProjectMessage::ModuleQuery {
685            meta: self.job_meta(
686                imports,
687                RetryPolicy::RetryOnceReadOnly,
688                semantic_permit,
689                admission_wait_millis,
690            ),
691            source,
692            query,
693            options,
694            reply,
695        };
696        self.enqueue(message, rx).await
697    }
698
699    /// Process one module-query batch through this project's serialized worker actor.
700    ///
701    /// # Errors
702    ///
703    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
704    /// worker execution fails.
705    pub(crate) async fn process_module_query_batch(
706        &self,
707        imports: Vec<String>,
708        semantic_permit: SemanticPermit,
709        admission_wait_millis: u64,
710        source: String,
711        selectors: Vec<LeanWorkerModuleQuerySelector>,
712        budgets: LeanWorkerOutputBudgets,
713        options: LeanWorkerElabOptions,
714    ) -> Result<ProjectCall<LeanWorkerModuleQueryBatchOutcome>> {
715        let (reply, rx) = oneshot::channel();
716        let message = ProjectMessage::ModuleQueryBatch {
717            meta: self.job_meta(
718                imports,
719                RetryPolicy::RetryOnceReadOnly,
720                semantic_permit,
721                admission_wait_millis,
722            ),
723            source,
724            selectors,
725            budgets,
726            options,
727            reply,
728        };
729        self.enqueue(message, rx).await
730    }
731
732    /// Inspect one declaration through this project's serialized worker actor.
733    ///
734    /// # Errors
735    ///
736    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
737    /// worker execution fails.
738    pub(crate) async fn inspect_declaration(
739        &self,
740        imports: Vec<String>,
741        semantic_permit: SemanticPermit,
742        admission_wait_millis: u64,
743        request: LeanWorkerDeclarationInspectionRequest,
744    ) -> Result<ProjectCall<LeanWorkerDeclarationInspectionResult>> {
745        let (reply, rx) = oneshot::channel();
746        let message = ProjectMessage::DeclarationInspection {
747            meta: self.job_meta(
748                imports,
749                RetryPolicy::RetryOnceReadOnly,
750                semantic_permit,
751                admission_wait_millis,
752            ),
753            request,
754            reply,
755        };
756        self.enqueue(message, rx).await
757    }
758
759    /// Run bounded declaration search through this project's serialized worker actor.
760    ///
761    /// # Errors
762    ///
763    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
764    /// worker execution fails.
765    pub(crate) async fn search_declarations(
766        &self,
767        imports: Vec<String>,
768        semantic_permit: SemanticPermit,
769        admission_wait_millis: u64,
770        request: LeanWorkerDeclarationSearch,
771    ) -> Result<ProjectCall<LeanWorkerDeclarationSearchResult>> {
772        let (reply, rx) = oneshot::channel();
773        let message = ProjectMessage::DeclarationSearch {
774            meta: self.job_meta(
775                imports,
776                RetryPolicy::RetryOnceReadOnly,
777                semantic_permit,
778                admission_wait_millis,
779            ),
780            request,
781            reply,
782        };
783        self.enqueue(message, rx).await
784    }
785
786    /// Run source-backed semantic proof search through this project's actor.
787    ///
788    /// # Errors
789    ///
790    /// Returns `ServerError` when semantic capability setup, mailbox enqueue,
791    /// actor reply, or worker execution fails.
792    pub(crate) async fn semantic_proof_search(
793        &self,
794        imports: Vec<String>,
795        semantic_permit: SemanticPermit,
796        admission_wait_millis: u64,
797        request: SemanticProofSearchRequest,
798    ) -> Result<ProjectCall<SemanticProofSearchResult>> {
799        let (reply, rx) = oneshot::channel();
800        let message = ProjectMessage::SemanticProofSearch {
801            meta: self.job_meta(
802                imports,
803                RetryPolicy::RetryOnceReadOnly,
804                semantic_permit,
805                admission_wait_millis,
806            ),
807            request,
808            reply,
809        };
810        self.enqueue(message, rx).await
811    }
812
813    /// Try proof fragments in-memory through this project's serialized worker actor.
814    ///
815    /// # Errors
816    ///
817    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
818    /// worker execution fails.
819    pub(crate) async fn attempt_proof(
820        &self,
821        imports: Vec<String>,
822        semantic_permit: SemanticPermit,
823        admission_wait_millis: u64,
824        request: LeanWorkerProofAttemptRequest,
825        options: LeanWorkerElabOptions,
826    ) -> Result<ProjectCall<LeanWorkerProofAttemptResult>> {
827        let (reply, rx) = oneshot::channel();
828        let message = ProjectMessage::ProofAttempt {
829            meta: self.job_meta(
830                imports,
831                RetryPolicy::RetryOnceReadOnly,
832                semantic_permit,
833                admission_wait_millis,
834            ),
835            request,
836            options,
837            reply,
838        };
839        self.enqueue(message, rx).await
840    }
841
842    /// Verify one declaration in-memory through this project's serialized worker actor.
843    ///
844    /// # Errors
845    ///
846    /// Returns `ServerError` when admission, mailbox enqueue, actor reply, or
847    /// worker execution fails.
848    pub(crate) async fn verify_declaration(
849        &self,
850        imports: Vec<String>,
851        semantic_permit: SemanticPermit,
852        admission_wait_millis: u64,
853        request: LeanWorkerDeclarationVerificationRequest,
854        options: LeanWorkerElabOptions,
855    ) -> Result<ProjectCall<LeanWorkerDeclarationVerificationResult>> {
856        let (reply, rx) = oneshot::channel();
857        let message = ProjectMessage::DeclarationVerification {
858            meta: self.job_meta(
859                imports,
860                RetryPolicy::RetryOnceReadOnly,
861                semantic_permit,
862                admission_wait_millis,
863            ),
864            request,
865            options,
866            reply,
867        };
868        self.enqueue(message, rx).await
869    }
870
871    fn job_meta(
872        &self,
873        imports: Vec<String>,
874        retry_policy: RetryPolicy,
875        semantic_permit: SemanticPermit,
876        admission_wait_millis: u64,
877    ) -> JobMeta {
878        let created_at = Instant::now();
879        self.active_jobs.fetch_add(1, Ordering::AcqRel);
880        JobMeta {
881            import_fingerprint: import_fingerprint(&imports),
882            imports,
883            _created_at: created_at,
884            queued_at: Instant::now(),
885            admission_wait_millis,
886            _correlation_id: uuid::Uuid::new_v4(),
887            retry_policy,
888            _active_job: ActiveJobGuard {
889                active_jobs: Arc::clone(&self.active_jobs),
890            },
891            _semantic_permit: semantic_permit,
892        }
893    }
894
895    async fn enqueue<T>(
896        &self,
897        message: ProjectMessage,
898        reply_rx: oneshot::Receiver<Result<ProjectCall<T>>>,
899    ) -> Result<ProjectCall<T>>
900    where
901        T: Send + 'static,
902    {
903        let project_info = self.worker_error_context(message.imports());
904        let tx = self
905            .actor_tx
906            .lock()
907            .as_ref()
908            .cloned()
909            .ok_or_else(|| self.unavailable("project actor is stopped", false, false))?;
910        match tx.try_send(message) {
911            Ok(()) => {}
912            Err(mpsc::error::TrySendError::Full(_)) => {
913                return Err(ServerError::worker_unavailable(WorkerUnavailable {
914                    retryable: true,
915                    worker_restarted: false,
916                    reason: "mailbox_full".to_owned(),
917                    ..project_info
918                }));
919            }
920            Err(mpsc::error::TrySendError::Closed(_)) => {
921                self.shutdown();
922                return Err(ServerError::worker_unavailable(WorkerUnavailable {
923                    retryable: true,
924                    worker_restarted: false,
925                    reason: "mailbox_closed".to_owned(),
926                    ..project_info
927                }));
928            }
929        }
930
931        match reply_rx.await {
932            Ok(result) => result,
933            Err(_) => {
934                self.shutdown();
935                Err(self.unavailable("mailbox_closed_before_reply", true, false))
936            }
937        }
938    }
939
    /// Borrow the manifest hash stored on this project. The same value is
    /// reported as `project_hash` in freshness and worker-error payloads.
    pub(crate) fn manifest_hash(&self) -> &str {
        &self.manifest_hash
    }
943
    /// Borrow the toolchain string stored on this project. The same value is
    /// reported as `lean_toolchain` in freshness and worker-error payloads.
    pub(crate) fn toolchain(&self) -> &str {
        &self.toolchain
    }
947
    /// Borrow the project's root path. Freshness and worker-error payloads
    /// report it (lossily converted to a string) as `project_root`.
    pub(crate) fn canonical_root(&self) -> &Path {
        &self.canonical_root
    }
951
    /// Borrow the [`ModuleQueryCache`] owned by this project.
    pub(crate) fn module_query_cache(&self) -> &ModuleQueryCache {
        &self.module_queries
    }
955
956    #[must_use]
957    pub(crate) fn freshness(&self, request_imports: &[String]) -> Freshness {
958        Freshness {
959            project_root: self.canonical_root.to_string_lossy().into_owned(),
960            project_hash: self.manifest_hash.clone(),
961            imports: request_imports.to_vec(),
962            session_id: self.session_id.clone(),
963            lean_toolchain: self.toolchain.clone(),
964            toolchain_advisories: self.open_warnings.clone(),
965        }
966    }
967
968    #[must_use]
969    pub(crate) fn runtime_facts(&self) -> RuntimeFacts {
970        self.runtime.lock().facts()
971    }
972
973    pub(crate) fn shutdown(&self) {
974        self.healthy.store(false, Ordering::Release);
975        let _ = self.actor_tx.lock().take();
976    }
977
978    pub(crate) fn is_healthy(&self) -> bool {
979        self.healthy.load(Ordering::Acquire) && self.actor_tx.lock().as_ref().is_some_and(|tx| !tx.is_closed())
980    }
981
982    pub(crate) fn is_idle(&self) -> bool {
983        self.active_jobs.load(Ordering::Acquire) == 0
984    }
985
986    fn unavailable(&self, reason: impl Into<String>, retryable: bool, worker_restarted: bool) -> ServerError {
987        ServerError::worker_unavailable(WorkerUnavailable {
988            retryable,
989            worker_restarted,
990            reason: reason.into(),
991            ..self.worker_error_context(&[])
992        })
993    }
994
995    fn worker_error_context(&self, imports: &[String]) -> WorkerUnavailable {
996        let snapshot = self.runtime.lock().clone();
997        let runtime = snapshot.facts();
998        WorkerUnavailable {
999            retryable: true,
1000            worker_restarted: false,
1001            project_root: self.canonical_root.to_string_lossy().into_owned(),
1002            project_hash: self.manifest_hash.clone(),
1003            imports: imports.to_vec(),
1004            session_id: self.session_id.clone(),
1005            lean_toolchain: self.toolchain.clone(),
1006            worker_generation: snapshot.worker_generation,
1007            reason: String::new(),
1008            restart_cause: snapshot.last_restart.as_ref().map(|event| event.cause.clone()),
1009            rss_kib: snapshot.rss_kib,
1010            limit_kib: None,
1011            retry_after_millis: None,
1012            restarts_in_window: None,
1013            window_millis: None,
1014            runtime,
1015            toolchain_advisories: self.open_warnings.clone(),
1016        }
1017    }
1018}
1019
1020impl Drop for LeanProject {
1021    fn drop(&mut self) {
1022        self.shutdown();
1023    }
1024}
1025
/// Everything the project actor needs to spawn and police its worker,
/// assembled once by [`ActorConfig::from_meta`].
#[derive(Clone)]
struct ActorConfig {
    /// Canonical project root, copied from the Lake metadata.
    lake_root: PathBuf,
    /// Manifest hash, copied from the Lake metadata.
    manifest_hash: String,
    /// Toolchain string exactly as found in the Lake metadata.
    toolchain_label: String,
    /// Path of the worker binary chosen by the readiness check.
    worker_path: PathBuf,
    /// Lean sysroot reported by the readiness check.
    lean_sysroot: PathBuf,
    /// Session id supplied by the caller of `from_meta`.
    session_id: String,
    /// Runtime snapshot shared with the code that built this config.
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    /// Health flag shared with the code that built this config.
    healthy: Arc<AtomicBool>,
    /// RSS (KiB) at or above which the worker is cycled after a job.
    worker_rss_post_job_restart_kib: u64,
    // The next two thresholds are copied from the runtime config; their
    // enforcement is outside this view.
    worker_rss_hard_kill_kib: u64,
    worker_rss_sample_millis: u64,
    /// RSS (KiB) at or above which the worker is cycled before a job whose
    /// import fingerprint differs from the previous one.
    import_switch_rss_soft_kib: u64,
    // Copied from the runtime config; consumers are outside this view.
    module_cache_rss_guard_kib: u64,
    module_cache_max_bytes: u64,
    request_timeout_millis: u64,
    mailbox_capacity: usize,
    /// Number of counted restarts within `restart_window` at which the
    /// restart-limit check trips.
    max_restarts_per_window: usize,
    /// Length of the sliding window used by the restart-limit check.
    restart_window: Duration,
    /// Open-time toolchain advisories (unknown pin, missing sidecar, no smoke
    /// record). The actor carries them so a `runtime_unavailable` it produces
    /// after worker death still flags a suspect worker. Mirrors
    /// [`LeanProject::open_warnings`]; both come from the one
    /// [`WorkerBinary::resolve_ready_for`] verdict at open.
    toolchain_advisories: Vec<String>,
}
1053
1054impl ActorConfig {
1055    /// Resolve the pinned toolchain into a spawnable config plus any
1056    /// open-time provenance advisories. All version-drift situations collapse
1057    /// into the one [`WorkerBinary::resolve_ready_for`] verdict: hard failures
1058    /// become a typed [`ServerError::BadProject`] carrying the corrective
1059    /// command; soft ones (unknown pin, missing sidecar) ride along as
1060    /// warnings the project surfaces in every envelope.
1061    fn from_meta(
1062        meta: &LakeProjectMeta,
1063        session_id: String,
1064        runtime: Arc<Mutex<RuntimeSnapshot>>,
1065        healthy: Arc<AtomicBool>,
1066        runtime_config: ProjectRuntimeConfig,
1067    ) -> Result<(Self, Vec<String>)> {
1068        let toolchain_id = ToolchainId::parse(&meta.toolchain).map_err(|e| ServerError::BadProject(e.to_string()))?;
1069        let (worker_path, lean_sysroot, open_warnings) = match WorkerBinary::resolve_ready_for(&toolchain_id) {
1070            Readiness::Ready {
1071                worker,
1072                lean_sysroot,
1073                note,
1074            } => (worker.path, lean_sysroot, note.into_iter().collect()),
1075            Readiness::UnknownPin {
1076                pin,
1077                worker,
1078                lean_sysroot,
1079            } => (
1080                worker.path,
1081                lean_sysroot,
1082                vec![format!(
1083                    "lean-toolchain pins {pin}, which is not a recognized lean-rs supported version \
1084                     (e.g. a nightly); proceeding, but the host cannot vouch for ABI compatibility"
1085                )],
1086            ),
1087            Readiness::Unsupported { window, nearest } => {
1088                return Err(ServerError::BadProject(format!(
1089                    "lean-toolchain pins {toolchain_id}, outside the lean-rs supported window {window}; \
1090                     nearest supported: {nearest}. Pin a supported toolchain (or bump lean-rs) and reopen."
1091                )));
1092            }
1093            Readiness::Stale { toolchain, install_cmd } => {
1094                return Err(ServerError::BadProject(format!(
1095                    "worker for {toolchain} was built against a different lean.h than the toolchain now \
1096                     provides (header drift); rebuild it: {install_cmd}"
1097                )));
1098            }
1099            Readiness::Unusable {
1100                toolchain,
1101                detail,
1102                install_cmd,
1103            } => {
1104                return Err(ServerError::BadProject(format!(
1105                    "worker for {toolchain} failed its runtime smoke test ({detail}); the toolchain's \
1106                     libleanshared is ABI-incompatible with this lean-rs build and cannot be served. \
1107                     Pin a supported toolchain the host can run, or rebuild lean-rs and reinstall: {install_cmd}"
1108                )));
1109            }
1110            Readiness::NotInstalled { toolchain, install_cmd } => {
1111                return Err(ServerError::BadProject(format!(
1112                    "no worker binary for toolchain {toolchain}; run: {install_cmd}"
1113                )));
1114            }
1115            Readiness::ToolchainNotInstalled { toolchain, elan_dir } => {
1116                return Err(ServerError::BadProject(format!(
1117                    "elan toolchain {toolchain} is not installed (expected {})",
1118                    elan_dir.display()
1119                )));
1120            }
1121        };
1122        tracing::debug!(
1123            toolchain = %toolchain_id,
1124            worker = %worker_path.display(),
1125            sysroot = %lean_sysroot.display(),
1126            "resolved ready worker binary"
1127        );
1128        let config = Self {
1129            lake_root: meta.canonical_root.clone(),
1130            manifest_hash: meta.manifest_hash.clone(),
1131            toolchain_label: meta.toolchain.clone(),
1132            worker_path,
1133            lean_sysroot,
1134            session_id,
1135            runtime,
1136            healthy,
1137            worker_rss_post_job_restart_kib: runtime_config.worker_rss_post_job_restart_kib(),
1138            worker_rss_hard_kill_kib: runtime_config.worker_rss_hard_kill_kib(),
1139            worker_rss_sample_millis: runtime_config.worker_rss_sample_millis(),
1140            import_switch_rss_soft_kib: runtime_config.import_switch_rss_soft_kib(),
1141            module_cache_rss_guard_kib: runtime_config.module_cache_rss_guard_kib(),
1142            module_cache_max_bytes: runtime_config.module_cache_max_bytes(),
1143            request_timeout_millis: runtime_config.request_timeout_millis(),
1144            mailbox_capacity: runtime_config.mailbox_capacity(),
1145            max_restarts_per_window: runtime_config.max_restarts_per_window(),
1146            restart_window: runtime_config.restart_window(),
1147            toolchain_advisories: open_warnings.clone(),
1148        };
1149        Ok((config, open_warnings))
1150    }
1151}
1152
/// Mutable state owned by one project's actor: the live worker handle plus
/// the bookkeeping used for restart accounting and runtime reporting.
struct ProjectActorState {
    /// Settings fixed when the project was opened.
    config: ActorConfig,
    /// Handle to the current worker; replaced when the worker is rebuilt
    /// after a death.
    handle: LeanWorkerHostHandle,
    /// Set to the previous observed generation plus one each time the worker
    /// is rebuilt after a death.
    worker_generation_base: u64,
    /// Most recent restart event passed to `record_restart`, if any.
    last_restart: Option<RuntimeRestartEvent>,
    /// Import fingerprint of the last job that reached the worker; compared
    /// against the next job's fingerprint to detect an import switch.
    last_import_fingerprint: Option<String>,
    /// Number of jobs whose import fingerprint differed from the previous one.
    profile_switch_count: u64,
    /// Last RSS reading (KiB) obtained from the worker, if any.
    last_rss_kib: Option<u64>,
    /// Shared runtime snapshot (an `Arc`, so it can have other holders).
    runtime: Arc<Mutex<RuntimeSnapshot>>,
    /// Timestamps of restarts that count toward the restart limit, oldest
    /// first; entries older than the configured window are pruned.
    abnormal_restart_times: VecDeque<Instant>,
    /// Tally of observed restarts, updated by `record_restart`.
    restart_stats: RestartStats,
}
1165
1166impl ProjectActorState {
1167    fn handle_message(&mut self, message: ProjectMessage) {
1168        match message {
1169            ProjectMessage::ModuleQuery {
1170                meta,
1171                source,
1172                query,
1173                options,
1174                reply,
1175            } => {
1176                let result = self.run_job(meta, |handle, imports| {
1177                    handle.process_module_query_with_imports(imports, &source, &query, &options, None, None)
1178                });
1179                let _ = reply.send(result);
1180            }
1181            ProjectMessage::ModuleQueryBatch {
1182                meta,
1183                source,
1184                selectors,
1185                budgets,
1186                options,
1187                reply,
1188            } => {
1189                let result = self.run_job(meta, |handle, imports| {
1190                    handle.process_module_query_batch_with_imports(
1191                        imports, &source, &selectors, &budgets, &options, None, None,
1192                    )
1193                });
1194                let _ = reply.send(result);
1195            }
1196            ProjectMessage::DeclarationInspection { meta, request, reply } => {
1197                let result = self.run_job(meta, |handle, imports| {
1198                    handle.inspect_declaration_with_imports(imports, &request, None, None)
1199                });
1200                let _ = reply.send(result);
1201            }
1202            ProjectMessage::DeclarationSearch { meta, request, reply } => {
1203                let result = self.run_job(meta, |handle, imports| {
1204                    handle.search_declarations_with_imports(imports, &request, None, None)
1205                });
1206                let _ = reply.send(result);
1207            }
1208            ProjectMessage::ProofAttempt {
1209                meta,
1210                request,
1211                options,
1212                reply,
1213            } => {
1214                let result = self.run_job(meta, |handle, imports| {
1215                    handle.attempt_proof_with_imports(imports, &request, &options, None, None)
1216                });
1217                let _ = reply.send(result);
1218            }
1219            ProjectMessage::DeclarationVerification {
1220                meta,
1221                request,
1222                options,
1223                reply,
1224            } => {
1225                let result = self.run_job(meta, |handle, imports| {
1226                    handle.verify_declaration_with_imports(imports, &request, &options, None, None)
1227                });
1228                let _ = reply.send(result);
1229            }
1230            ProjectMessage::SemanticProofSearch { meta, request, reply } => {
1231                let result = self.run_semantic_job(meta, &request);
1232                let _ = reply.send(result);
1233            }
1234        }
1235    }
1236
1237    fn run_job<R>(
1238        &mut self,
1239        meta: JobMeta,
1240        job: impl Fn(&mut LeanWorkerHostHandle, Vec<String>) -> std::result::Result<R, LeanWorkerError>,
1241    ) -> Result<ProjectCall<R>> {
1242        // Runs on the project's dedicated actor thread (no async), so an entered
1243        // span is correct and ties every nested worker/recycle log to this call.
1244        let _span = tracing::debug_span!(
1245            "job",
1246            session_id = %self.config.session_id,
1247            imports = meta.imports.len(),
1248            queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1249        )
1250        .entered();
1251        let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1252        let generation_before = self.observed_generation();
1253        let mut call_restart = self.cycle_before_import_switch_if_needed(&meta)?;
1254        let mut lifecycle_baseline = self.handle.lifecycle_snapshot();
1255
1256        let max_retries = meta.retry_policy.retries();
1257        let mut retry_count = 0_u32;
1258        loop {
1259            match job(&mut self.handle, meta.imports.clone()) {
1260                Ok(value) => {
1261                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1262                        call_restart = Some(event);
1263                    }
1264                    self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
1265                    self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
1266                    if let Some(event) = self.cycle_after_post_job_rss_if_needed(&meta)? {
1267                        call_restart = Some(event);
1268                    }
1269                    let runtime =
1270                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1271                    tracing::debug!(
1272                        retry_count,
1273                        rss_kib = ?runtime.rss_kib,
1274                        worker_generation = runtime.worker_generation,
1275                        "job complete"
1276                    );
1277                    self.publish_runtime(&runtime);
1278                    return Ok(ProjectCall::new(value, runtime));
1279                }
1280                Err(err) if worker_error_is_recoverable_death(&err) && retry_count < max_retries => {
1281                    self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1282                    let first_reason = err.to_string();
1283                    call_restart =
1284                        Some(self.rebuild_after_worker_death(first_reason, worker_death_cause(&err), &meta)?);
1285                    lifecycle_baseline = self.handle.lifecycle_snapshot();
1286                    retry_count = retry_count.saturating_add(1);
1287                }
1288                Err(err) if worker_error_is_recoverable_death(&err) => {
1289                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1290                        call_restart = Some(event);
1291                    }
1292                    let reason = format!("worker_died_after_retry: {err}");
1293                    let generation = self.observed_generation();
1294                    let runtime =
1295                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1296                    self.publish_runtime(&runtime);
1297                    return Err(self.worker_unavailable_for(
1298                        &meta,
1299                        reason,
1300                        true,
1301                        generation > generation_before,
1302                        Some(worker_death_cause(&err)),
1303                        None,
1304                        None,
1305                    ));
1306                }
1307                Err(err) if worker_error_is_session_missing(&err) && retry_count < max_retries => {
1308                    self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1309                    call_restart = Some(self.rebuild_after_worker_death(
1310                        format!("session_missing: {err}"),
1311                        RestartCause::SessionMissing,
1312                        &meta,
1313                    )?);
1314                    lifecycle_baseline = self.handle.lifecycle_snapshot();
1315                    retry_count = retry_count.saturating_add(1);
1316                }
1317                Err(err) if worker_error_is_session_missing(&err) => {
1318                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1319                        call_restart = Some(event);
1320                    }
1321                    let generation = self.observed_generation();
1322                    let runtime =
1323                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1324                    self.publish_runtime(&runtime);
1325                    return Err(self.worker_unavailable_for(
1326                        &meta,
1327                        format!("session_missing: {err}"),
1328                        true,
1329                        generation > generation_before,
1330                        Some(RestartCause::SessionMissing),
1331                        None,
1332                        None,
1333                    ));
1334                }
1335                Err(LeanWorkerError::RssHardLimitExceeded {
1336                    operation,
1337                    current_kib,
1338                    limit_kib,
1339                    ..
1340                }) => {
1341                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1342                        call_restart = Some(event);
1343                    }
1344                    let runtime =
1345                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1346                    self.publish_runtime(&runtime);
1347                    return Err(self.worker_unavailable_for(
1348                        &meta,
1349                        format!(
1350                            "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1351                        ),
1352                        false,
1353                        true,
1354                        Some(RestartCause::RssHardLimit),
1355                        Some(limit_kib),
1356                        None,
1357                    ));
1358                }
1359                Err(err) if matches!(err, LeanWorkerError::Timeout { .. }) => {
1360                    if let Some(event) = self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)? {
1361                        call_restart = Some(event);
1362                    }
1363                    let generation = self.observed_generation();
1364                    let runtime =
1365                        self.runtime_facts(&meta, generation_before, retry_count, queue_wait_millis, call_restart);
1366                    self.publish_runtime(&runtime);
1367                    return Err(self.worker_unavailable_for(
1368                        &meta,
1369                        format!("timeout: {err}"),
1370                        true,
1371                        generation > generation_before,
1372                        Some(RestartCause::Timeout),
1373                        None,
1374                        None,
1375                    ));
1376                }
1377                Err(err) => {
1378                    self.account_lifecycle_restarts_since(&lifecycle_baseline, &meta)?;
1379                    self.last_import_fingerprint = Some(meta.import_fingerprint.clone());
1380                    return Err(map_worker_err(err));
1381                }
1382            }
1383        }
1384    }
1385
1386    fn run_semantic_job(
1387        &self,
1388        meta: JobMeta,
1389        request: &SemanticProofSearchRequest,
1390    ) -> Result<ProjectCall<SemanticProofSearchResult>> {
1391        let _span = tracing::debug_span!(
1392            "semantic_job",
1393            session_id = %self.config.session_id,
1394            imports = meta.imports.len(),
1395            queue_wait_millis = millis_u64(meta.queued_at.elapsed()),
1396        )
1397        .entered();
1398        let queue_wait_millis = millis_u64(meta.queued_at.elapsed());
1399        let generation_before = self.observed_generation();
1400        let mut capability = self.open_semantic_capability(&meta)?;
1401        let result = {
1402            let mut session = capability
1403                .open_session_with_imports(meta.imports.clone(), None, None)
1404                .map_err(map_worker_err)?;
1405            crate::semantic_search::run_semantic_proof_search(&mut session, request)
1406        };
1407        let runtime = self.runtime_facts(&meta, generation_before, 0, queue_wait_millis, None);
1408        self.publish_runtime(&runtime);
1409        result.map(|value| ProjectCall::new(value, runtime))
1410    }
1411
1412    fn open_semantic_capability(&self, meta: &JobMeta) -> Result<lean_rs_worker_parent::LeanWorkerCapability> {
1413        let runtime = lean_semantic_search_runtime::build_cached(SemanticSearchRuntimeBuild {
1414            cache_root: semantic_runtime_cache_root()?,
1415            toolchain_label: self.config.toolchain_label.clone(),
1416            lean_sysroot: self.config.lean_sysroot.clone(),
1417        })
1418        .map_err(|err| {
1419            self.worker_unavailable_for(
1420                meta,
1421                format!("semantic runtime build failed for this toolchain: {err}"),
1422                true,
1423                false,
1424                None,
1425                None,
1426                None,
1427            )
1428        })?;
1429        semantic_capability_builder(&self.config, &runtime.built)?
1430            .open()
1431            .map_err(|err| {
1432                self.worker_unavailable_for(
1433                    meta,
1434                    format!(
1435                        "semantic capability open failed for this toolchain: {}",
1436                        map_worker_err(err)
1437                    ),
1438                    true,
1439                    false,
1440                    None,
1441                    None,
1442                    None,
1443                )
1444            })
1445    }
1446
1447    fn cycle_before_import_switch_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1448        let Some(previous_import_fingerprint) = self.last_import_fingerprint.as_deref() else {
1449            return Ok(None);
1450        };
1451        if previous_import_fingerprint == meta.import_fingerprint {
1452            return Ok(None);
1453        }
1454        self.profile_switch_count = self.profile_switch_count.saturating_add(1);
1455        let Some(current_kib) = self.handle.rss_kib() else {
1456            return Ok(None);
1457        };
1458        self.last_rss_kib = Some(current_kib);
1459        if current_kib < self.config.import_switch_rss_soft_kib {
1460            return Ok(None);
1461        }
1462        let limit_kib = self.config.import_switch_rss_soft_kib;
1463        let reason = format!("rss_import_switch current_kib={current_kib} limit_kib={limit_kib}");
1464        self.record_restart_or_stop(RestartCause::RssImportSwitch, &reason)
1465            .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1466        self.handle.cycle().map_err(map_worker_err)?;
1467        self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1468        let event = restart_event(
1469            RestartCause::RssImportSwitch,
1470            reason,
1471            self.observed_generation(),
1472            Some(current_kib),
1473            Some(limit_kib),
1474        );
1475        self.record_restart(event.clone());
1476        Ok(Some(event))
1477    }
1478
1479    fn cycle_after_post_job_rss_if_needed(&mut self, meta: &JobMeta) -> Result<Option<RuntimeRestartEvent>> {
1480        let Some(current_kib) = self.handle.rss_kib() else {
1481            return Ok(None);
1482        };
1483        self.last_rss_kib = Some(current_kib);
1484        let limit_kib = self.config.worker_rss_post_job_restart_kib;
1485        tracing::debug!(rss_kib = current_kib, limit_kib, "post-job rss check");
1486        if current_kib < limit_kib {
1487            return Ok(None);
1488        }
1489        let reason = format!("rss_post_job current_kib={current_kib} limit_kib={limit_kib}");
1490        self.record_restart_or_stop(RestartCause::RssPostJob, &reason)
1491            .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1492        self.handle.cycle().map_err(map_worker_err)?;
1493        self.last_rss_kib = self.handle.rss_kib().or(Some(current_kib));
1494        let event = restart_event(
1495            RestartCause::RssPostJob,
1496            reason,
1497            self.observed_generation(),
1498            Some(current_kib),
1499            Some(limit_kib),
1500        );
1501        self.record_restart(event.clone());
1502        Ok(Some(event))
1503    }
1504
1505    fn rebuild_after_worker_death(
1506        &mut self,
1507        reason: String,
1508        cause: RestartCause,
1509        meta: &JobMeta,
1510    ) -> Result<RuntimeRestartEvent> {
1511        self.record_restart_or_stop(cause, &reason)
1512            .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1513        let next_generation = self.observed_generation().saturating_add(1);
1514        let (handle, _) = open_worker(&self.config, false)?;
1515        self.handle = handle;
1516        self.worker_generation_base = next_generation;
1517        self.last_rss_kib = self.handle.rss_kib().or(self.last_rss_kib);
1518        let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1519        self.record_restart(event.clone());
1520        Ok(event)
1521    }
1522
1523    fn account_lifecycle_restarts_since(
1524        &mut self,
1525        before: &LeanWorkerLifecycleSnapshot,
1526        meta: &JobMeta,
1527    ) -> Result<Option<RuntimeRestartEvent>> {
1528        let after = self.handle.lifecycle_snapshot();
1529        let restarted = after.restarts.saturating_sub(before.restarts);
1530        if restarted == 0 {
1531            self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1532            return Ok(None);
1533        }
1534        let (cause, reason) = after.last_restart_reason.as_ref().map_or_else(
1535            || (RestartCause::WorkerInternal, "worker_internal_restart".to_owned()),
1536            |reason| (restart_cause_from_worker(reason), restart_reason_text(reason)),
1537        );
1538        for _ in 0..restarted {
1539            self.record_restart_or_stop(cause, &reason)
1540                .map_err(|limit| self.restart_limit_error(&meta.imports, limit))?;
1541        }
1542        self.last_rss_kib = after.last_rss_kib.or(self.last_rss_kib);
1543        let event = restart_event(cause, reason, self.observed_generation(), self.last_rss_kib, None);
1544        self.record_restart(event.clone());
1545        Ok(Some(event))
1546    }
1547
1548    /// The single place a recycle becomes observable: tally it for frequency
1549    /// reporting, log it at a signal-appropriate level, and store it as the
1550    /// latest event. Every restart path funnels through here, so adding one is
1551    /// a single call. Kept distinct from [`Self::record_restart_or_stop`], which
1552    /// owns the orthogonal sliding-window health *policy*.
1553    fn record_restart(&mut self, event: RuntimeRestartEvent) {
1554        self.restart_stats.observe(&event.cause);
1555        log_restart(&event, self.restart_stats.total);
1556        self.last_restart = Some(event);
1557    }
1558
1559    fn record_restart_or_stop(
1560        &mut self,
1561        cause: RestartCause,
1562        reason: &str,
1563    ) -> std::result::Result<(), RestartLimitExceeded> {
1564        if !cause.counts_toward_restart_limit() {
1565            return Ok(());
1566        }
1567        let now = Instant::now();
1568        while self
1569            .abnormal_restart_times
1570            .front()
1571            .is_some_and(|seen| now.saturating_duration_since(*seen) > self.config.restart_window)
1572        {
1573            self.abnormal_restart_times.pop_front();
1574        }
1575        if self.abnormal_restart_times.len() >= self.config.max_restarts_per_window {
1576            self.config.healthy.store(false, Ordering::Release);
1577            tracing::warn!(
1578                cause = cause.as_str(),
1579                restarts_in_window = self.abnormal_restart_times.len(),
1580                window_millis = millis_u64(self.config.restart_window),
1581                "restart limit exceeded; marking project unhealthy"
1582            );
1583            let message = format!(
1584                "restart_limit_exceeded after {} restarts in {:?}; latest: {reason}",
1585                self.config.max_restarts_per_window, self.config.restart_window
1586            );
1587            let event = restart_event(
1588                cause,
1589                message.clone(),
1590                self.observed_generation(),
1591                self.last_rss_kib,
1592                None,
1593            );
1594            self.record_restart(event.clone());
1595            self.publish_runtime(&RuntimeFacts {
1596                worker_generation: self.observed_generation(),
1597                worker_restarted: false,
1598                retry_count: MAX_JOB_RETRIES,
1599                admission_wait_millis: 0,
1600                queue_wait_millis: 0,
1601                call_restart: None,
1602                last_restart: Some(event),
1603                rss_kib: self.last_rss_kib,
1604                worker_lanes: 1,
1605                import_profile: self.last_import_fingerprint.clone(),
1606                profile_switch_count: self.profile_switch_count,
1607                restarts_total: self.restart_stats.total,
1608                restarts_by_cause: self.restart_stats.by_cause.clone(),
1609            });
1610            return Err(RestartLimitExceeded {
1611                message,
1612                cause,
1613                restarts_in_window: self.abnormal_restart_times.len() as u64,
1614                window_millis: millis_u64(self.config.restart_window),
1615            });
1616        }
1617        self.abnormal_restart_times.push_back(now);
1618        Ok(())
1619    }
1620
1621    fn observed_generation(&self) -> u64 {
1622        self.worker_generation_base
1623            .saturating_add(self.handle.lifecycle_snapshot().worker_generation)
1624    }
1625
1626    fn runtime_facts(
1627        &self,
1628        meta: &JobMeta,
1629        generation_before: u64,
1630        retry_count: u32,
1631        queue_wait_millis: u64,
1632        call_restart: Option<RuntimeRestartEvent>,
1633    ) -> RuntimeFacts {
1634        let generation = self.observed_generation();
1635        let snapshot = self.handle.lifecycle_snapshot();
1636        RuntimeFacts {
1637            worker_generation: generation,
1638            worker_restarted: call_restart.is_some() || generation > generation_before,
1639            retry_count,
1640            admission_wait_millis: meta.admission_wait_millis,
1641            queue_wait_millis,
1642            call_restart,
1643            last_restart: self.last_restart.clone(),
1644            rss_kib: snapshot.last_rss_kib.or(self.last_rss_kib),
1645            worker_lanes: 1,
1646            import_profile: Some(meta.import_fingerprint.clone()),
1647            profile_switch_count: self.profile_switch_count,
1648            restarts_total: self.restart_stats.total,
1649            restarts_by_cause: self.restart_stats.by_cause.clone(),
1650        }
1651    }
1652
1653    fn publish_runtime(&self, runtime: &RuntimeFacts) {
1654        *self.runtime.lock() = RuntimeSnapshot {
1655            worker_generation: runtime.worker_generation,
1656            last_restart: runtime.last_restart.clone().or_else(|| runtime.call_restart.clone()),
1657            rss_kib: runtime.rss_kib,
1658            import_profile: runtime.import_profile.clone(),
1659            profile_switch_count: runtime.profile_switch_count,
1660            restarts_total: runtime.restarts_total,
1661            restarts_by_cause: runtime.restarts_by_cause.clone(),
1662        };
1663    }
1664
1665    fn worker_unavailable_for(
1666        &self,
1667        meta: &JobMeta,
1668        reason: String,
1669        retryable: bool,
1670        worker_restarted: bool,
1671        cause: Option<RestartCause>,
1672        limit_kib: Option<u64>,
1673        retry_after_millis: Option<u64>,
1674    ) -> ServerError {
1675        let snapshot = self.runtime.lock().facts();
1676        ServerError::worker_unavailable(WorkerUnavailable {
1677            retryable,
1678            worker_restarted,
1679            project_root: self.config.lake_root.to_string_lossy().into_owned(),
1680            project_hash: self.config.manifest_hash.clone(),
1681            imports: meta.imports.clone(),
1682            session_id: self.config.session_id.clone(),
1683            lean_toolchain: self.config.toolchain_label.clone(),
1684            worker_generation: self.observed_generation(),
1685            restart_cause: cause.map(|cause| cause.as_str().to_owned()),
1686            rss_kib: self.last_rss_kib,
1687            limit_kib,
1688            retry_after_millis,
1689            restarts_in_window: Some(self.abnormal_restart_times.len() as u64),
1690            window_millis: Some(millis_u64(self.config.restart_window)),
1691            runtime: snapshot,
1692            reason,
1693            toolchain_advisories: self.config.toolchain_advisories.clone(),
1694        })
1695    }
1696
1697    fn restart_limit_error(&self, imports: &[String], limit: RestartLimitExceeded) -> ServerError {
1698        let snapshot = self.runtime.lock().facts();
1699        ServerError::worker_unavailable(WorkerUnavailable {
1700            retryable: false,
1701            worker_restarted: false,
1702            project_root: self.config.lake_root.to_string_lossy().into_owned(),
1703            project_hash: self.config.manifest_hash.clone(),
1704            imports: imports.to_vec(),
1705            session_id: self.config.session_id.clone(),
1706            lean_toolchain: self.config.toolchain_label.clone(),
1707            worker_generation: self.observed_generation(),
1708            reason: limit.message,
1709            restart_cause: Some(limit.cause.as_str().to_owned()),
1710            rss_kib: self.last_rss_kib,
1711            limit_kib: None,
1712            retry_after_millis: Some(limit.window_millis),
1713            restarts_in_window: Some(limit.restarts_in_window),
1714            window_millis: Some(limit.window_millis),
1715            runtime: snapshot,
1716            toolchain_advisories: self.config.toolchain_advisories.clone(),
1717        })
1718    }
1719}
1720
1721struct RestartLimitExceeded {
1722    message: String,
1723    cause: RestartCause,
1724    restarts_in_window: u64,
1725    window_millis: u64,
1726}
1727
1728fn actor_main(
1729    config: ActorConfig,
1730    init_reply: std::sync::mpsc::Sender<std::result::Result<(String, mpsc::Sender<ProjectMessage>), ServerError>>,
1731) {
1732    let (handle, runtime_toolchain) = match open_worker(&config, true) {
1733        Ok(value) => value,
1734        Err(err) => {
1735            let _ = init_reply.send(Err(err));
1736            return;
1737        }
1738    };
1739    let mut state = ProjectActorState {
1740        config: config.clone(),
1741        handle,
1742        worker_generation_base: 1,
1743        last_restart: None,
1744        last_import_fingerprint: None,
1745        profile_switch_count: 0,
1746        last_rss_kib: None,
1747        runtime: Arc::clone(&config.runtime),
1748        abnormal_restart_times: VecDeque::new(),
1749        restart_stats: RestartStats::default(),
1750    };
1751
1752    let (tx, mut rx) = mpsc::channel::<ProjectMessage>(config.mailbox_capacity);
1753    if init_reply.send(Ok((runtime_toolchain, tx))).is_err() {
1754        return;
1755    }
1756
1757    while let Some(message) = rx.blocking_recv() {
1758        state.handle_message(message);
1759    }
1760}
1761
1762fn open_worker(config: &ActorConfig, preflight: bool) -> Result<(LeanWorkerHostHandle, String)> {
1763    let builder = worker_builder(config);
1764    if preflight {
1765        let report = builder.check();
1766        if let Some(first) = report.first_error() {
1767            if bootstrap_failure_is_hard_rss(&first.code().to_string(), first.message()) {
1768                return Err(bootstrap_hard_rss_unavailable(
1769                    config,
1770                    first.message().to_owned(),
1771                    parse_keyed_u64(first.message(), "current_kib"),
1772                    parse_keyed_u64(first.message(), "limit_kib").or(Some(config.worker_rss_hard_kill_kib)),
1773                ));
1774            }
1775            return Err(ServerError::BadProject(format!(
1776                "{}: {}",
1777                first.code(),
1778                first.message()
1779            )));
1780        }
1781    }
1782    let handle = match builder.open() {
1783        Ok(handle) => handle,
1784        Err(LeanWorkerError::RssHardLimitExceeded {
1785            operation,
1786            current_kib,
1787            limit_kib,
1788            ..
1789        }) => {
1790            return Err(bootstrap_hard_rss_unavailable(
1791                config,
1792                format!(
1793                    "rss_hard_limit_exceeded operation={operation} current_kib={current_kib} limit_kib={limit_kib}"
1794                ),
1795                Some(current_kib),
1796                Some(limit_kib),
1797            ));
1798        }
1799        Err(err) => return Err(map_worker_err(err)),
1800    };
1801    let runtime_toolchain = handle
1802        .runtime_metadata()
1803        .lean_version
1804        .unwrap_or_else(|| config.toolchain_label.clone());
1805    Ok((handle, runtime_toolchain))
1806}
1807
/// Decide whether a bootstrap failure was caused by the hard RSS limit.
///
/// Returns `true` when `code` contains `rss` (case-sensitive) or when
/// `message`, compared case-insensitively, contains one of the known hard-RSS
/// phrases. The message is lowercased only when the code alone does not
/// settle the question.
fn bootstrap_failure_is_hard_rss(code: &str, message: &str) -> bool {
    // "rss_hard_limit" also covers "rss_hard_limit_exceeded".
    const MESSAGE_MARKERS: [&str; 3] = ["hard rss limit", "rss_hard_limit", "rss hard limit"];
    if code.contains("rss") {
        return true;
    }
    let lower = message.to_lowercase();
    MESSAGE_MARKERS.iter().any(|marker| lower.contains(*marker))
}
1816
1817fn bootstrap_hard_rss_unavailable(
1818    config: &ActorConfig,
1819    reason: String,
1820    current_kib: Option<u64>,
1821    limit_kib: Option<u64>,
1822) -> ServerError {
1823    config.healthy.store(false, Ordering::Release);
1824    let generation = config.runtime.lock().worker_generation;
1825    let event = restart_event(
1826        RestartCause::RssHardLimit,
1827        reason.clone(),
1828        generation,
1829        current_kib,
1830        limit_kib,
1831    );
1832    // First-spawn worker tripped the hard RSS limit before serving a call: a
1833    // single terminal recycle. Emit the same log line a live recycle would, so
1834    // the cause is visible on stderr even when the project never opens.
1835    let restarts_total = 1;
1836    log_restart(&event, restarts_total);
1837    let by_cause = BTreeMap::from([(RestartCause::RssHardLimit.as_str().to_owned(), restarts_total)]);
1838    let runtime = RuntimeFacts {
1839        worker_generation: generation,
1840        worker_restarted: true,
1841        retry_count: 0,
1842        admission_wait_millis: 0,
1843        queue_wait_millis: 0,
1844        call_restart: Some(event.clone()),
1845        last_restart: Some(event.clone()),
1846        rss_kib: current_kib,
1847        worker_lanes: 1,
1848        import_profile: None,
1849        profile_switch_count: 0,
1850        restarts_total,
1851        restarts_by_cause: by_cause.clone(),
1852    };
1853    *config.runtime.lock() = RuntimeSnapshot {
1854        worker_generation: generation,
1855        last_restart: Some(event),
1856        rss_kib: current_kib,
1857        import_profile: None,
1858        profile_switch_count: 0,
1859        restarts_total,
1860        restarts_by_cause: by_cause,
1861    };
1862    ServerError::worker_unavailable(WorkerUnavailable {
1863        retryable: false,
1864        worker_restarted: true,
1865        project_root: config.lake_root.to_string_lossy().into_owned(),
1866        project_hash: config.manifest_hash.clone(),
1867        imports: Vec::new(),
1868        session_id: config.session_id.clone(),
1869        lean_toolchain: config.toolchain_label.clone(),
1870        worker_generation: generation,
1871        reason,
1872        restart_cause: Some(RestartCause::RssHardLimit.as_str().to_owned()),
1873        rss_kib: current_kib,
1874        limit_kib,
1875        retry_after_millis: None,
1876        restarts_in_window: None,
1877        window_millis: None,
1878        runtime,
1879        toolchain_advisories: config.toolchain_advisories.clone(),
1880    })
1881}
1882
1883fn worker_builder(config: &ActorConfig) -> LeanWorkerHostHandleBuilder {
1884    let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
1885    let module_cache_limits = module_cache_limits(config);
1886    LeanWorkerHostHandleBuilder::shims_only(&config.lake_root, std::iter::empty::<String>())
1887        .worker_child(LeanWorkerChild::for_toolchain(
1888            config.worker_path.clone(),
1889            config.lean_sysroot.clone(),
1890        ))
1891        .startup_timeout(Duration::from_secs(30))
1892        .request_timeout(Duration::from_millis(config.request_timeout_millis))
1893        .restart_policy(restart_policy)
1894        .rss_hard_limit(
1895            config.worker_rss_hard_kill_kib,
1896            Duration::from_millis(config.worker_rss_sample_millis),
1897        )
1898        .module_cache_limits(module_cache_limits)
1899}
1900
1901fn semantic_capability_builder(
1902    config: &ActorConfig,
1903    built: &lean_toolchain::LeanBuiltCapability,
1904) -> Result<LeanWorkerCapabilityBuilder> {
1905    let restart_policy = LeanWorkerRestartPolicy::default().max_requests(WORKER_REQUEST_RESTARTS);
1906    let builder = LeanWorkerCapabilityBuilder::from_built_capability(built, std::iter::empty::<String>())
1907        .map_err(map_worker_err)?
1908        .import_workspace_root(config.lake_root.clone())
1909        .worker_child(LeanWorkerChild::for_toolchain(
1910            config.worker_path.clone(),
1911            config.lean_sysroot.clone(),
1912        ))
1913        .startup_timeout(Duration::from_secs(30))
1914        .request_timeout(Duration::from_millis(config.request_timeout_millis))
1915        .restart_policy(restart_policy)
1916        .rss_hard_limit(
1917            config.worker_rss_hard_kill_kib,
1918            Duration::from_millis(config.worker_rss_sample_millis),
1919        )
1920        .module_cache_limits(module_cache_limits(config))
1921        .json_command_export(lean_semantic_search_capability::DECLARATION_FEATURES_EXPORT)
1922        .json_command_export(lean_semantic_search_capability::PROOF_GOAL_FEATURES_EXPORT);
1923    Ok(builder)
1924}
1925
1926fn semantic_runtime_cache_root() -> Result<PathBuf> {
1927    let cache_dir =
1928        dirs::cache_dir().ok_or_else(|| ServerError::Internal("could not resolve user cache directory".to_owned()))?;
1929    Ok(cache_dir.join("lean-host-mcp").join("semantic-runtimes"))
1930}
1931
1932fn module_cache_limits(config: &ActorConfig) -> LeanWorkerModuleCacheLimits {
1933    LeanWorkerModuleCacheLimits::default()
1934        .rss_guard_kib(config.module_cache_rss_guard_kib)
1935        .max_bytes(config.module_cache_max_bytes)
1936}
1937
/// Thread name for a project actor: `lean-host-mcp/project/<basename>`, where
/// the basename is the final path component of `canonical_root`, or `project`
/// when the path has no final component or it is not valid UTF-8.
fn actor_thread_name(canonical_root: &Path) -> String {
    let label = canonical_root
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("project");
    let mut name = String::from("lean-host-mcp/project/");
    name.push_str(label);
    name
}
1942
1943fn worker_error_is_recoverable_death(err: &LeanWorkerError) -> bool {
1944    matches!(
1945        err,
1946        LeanWorkerError::ChildExited { .. } | LeanWorkerError::ChildPanicOrAbort { .. }
1947    )
1948}
1949
1950fn worker_error_is_session_missing(err: &LeanWorkerError) -> bool {
1951    matches!(err, LeanWorkerError::Worker { code, .. } if code == "lean_rs.worker.session_missing")
1952}
1953
1954#[allow(
1955    clippy::wildcard_enum_match_arm,
1956    reason = "only worker process death variants are restart causes; all other errors are classified elsewhere"
1957)]
1958fn worker_death_cause(err: &LeanWorkerError) -> RestartCause {
1959    match err {
1960        LeanWorkerError::ChildPanicOrAbort { .. } => RestartCause::ChildAbort,
1961        LeanWorkerError::ChildExited { .. } => RestartCause::ChildExit,
1962        _ => RestartCause::WorkerInternal,
1963    }
1964}
1965
1966fn restart_reason_text(reason: &LeanWorkerRestartReason) -> String {
1967    match reason {
1968        LeanWorkerRestartReason::Explicit => RestartCause::Explicit.as_str().to_owned(),
1969        LeanWorkerRestartReason::MaxRequests { limit } => format!("max_requests limit={limit}"),
1970        LeanWorkerRestartReason::MaxImports { limit } => format!("max_imports limit={limit}"),
1971        LeanWorkerRestartReason::RssCeiling {
1972            current_kib, limit_kib, ..
1973        } => {
1974            format!("rss_ceiling current_kib={current_kib} limit_kib={limit_kib}")
1975        }
1976        LeanWorkerRestartReason::RssHardLimit {
1977            operation,
1978            current_kib,
1979            limit_kib,
1980            ..
1981        } => {
1982            format!("rss_hard_limit operation={operation} current_kib={current_kib} limit_kib={limit_kib}")
1983        }
1984        LeanWorkerRestartReason::Idle { idle_for, limit } => {
1985            format!(
1986                "idle idle_for_millis={} limit_millis={}",
1987                millis_u64(*idle_for),
1988                millis_u64(*limit)
1989            )
1990        }
1991        LeanWorkerRestartReason::Cancelled { operation } => format!("cancelled operation={operation}"),
1992        LeanWorkerRestartReason::ChildAbort { operation } => format!("child_abort operation={operation}"),
1993        LeanWorkerRestartReason::RequestTimeout { operation, duration } => {
1994            format!(
1995                "timeout operation={operation} duration_millis={}",
1996                millis_u64(*duration)
1997            )
1998        }
1999    }
2000}
2001
/// Fingerprint of an import list: the entries in their given order, joined
/// with a newline between consecutive entries.
fn import_fingerprint(imports: &[String]) -> String {
    const SEPARATOR: &str = "\n";
    imports.join(SEPARATOR)
}
2005
/// Whole milliseconds in `duration` as a `u64`, clamped to `u64::MAX` when
/// the value does not fit.
fn millis_u64(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
2009
/// Extract the unsigned integer that follows `key=` in `text`.
///
/// Occurrences are scanned left to right and the first one that parses wins.
/// An occurrence is skipped when the key is only the tail of a longer
/// identifier (the preceding character is an ASCII letter, digit or `_`), or
/// when the characters after `=` do not start with a decimal number that fits
/// in a `u64`. Returns `None` when no occurrence qualifies.
fn parse_keyed_u64(text: &str, key: &str) -> Option<u64> {
    let prefix = format!("{key}=");
    text.match_indices(prefix.as_str()).find_map(|(at, _)| {
        let (before, from_key) = text.split_at(at);
        // `soft_limit_kib=` must not satisfy a lookup for `limit_kib`.
        if before.ends_with(|prev: char| prev.is_ascii_alphanumeric() || prev == '_') {
            return None;
        }
        let value = from_key.strip_prefix(prefix.as_str())?;
        // Leading run of ASCII digits; empty when the value is not numeric.
        let digits = value.split(|ch: char| !ch.is_ascii_digit()).next()?;
        digits.parse().ok()
    })
}
2019
2020/// Resolve a knob through `env > file > default` and reject a zero result
2021/// whatever its source. `env` is the raw env-var string (parsed here); `file`
2022/// is the already-typed config-file value; `default` is the built-in constant.
2023fn parse_nonzero_u64(name: &str, env: Option<&str>, file: Option<u64>, default: u64) -> Result<u64> {
2024    let value = match env {
2025        Some(raw) => raw
2026            .parse::<u64>()
2027            .map_err(|e| ServerError::Internal(format!("{name}={raw:?} not a u64: {e}")))?,
2028        None => file.unwrap_or(default),
2029    };
2030    if value == 0 {
2031        return Err(ServerError::Internal(format!(
2032            "{name} resolved to 0, which is not allowed"
2033        )));
2034    }
2035    Ok(value)
2036}
2037
2038fn parse_nonzero_usize(name: &str, env: Option<&str>, file: Option<usize>, default: usize) -> Result<usize> {
2039    let parsed = parse_nonzero_u64(name, env, file.map(|v| v as u64), default as u64)?;
2040    usize::try_from(parsed).map_err(|_| ServerError::Internal(format!("{name}={parsed} does not fit in usize")))
2041}
2042
2043#[cfg(test)]
2044#[allow(
2045    clippy::expect_used,
2046    clippy::panic,
2047    clippy::unwrap_used,
2048    reason = "unit tests use expect/unwrap_err to state the branch under test directly"
2049)]
2050mod tests {
2051    use super::*;
2052
2053    #[test]
2054    fn runtime_config_parses_runtime_policy_without_env_reads() {
2055        // Distinct values that also satisfy the RSS ordering invariant
2056        // (import_switch <= post_job <= hard_kill); see validate_rss_ordering.
2057        let config = parse_runtime_config(
2058            RuntimeEnv {
2059                worker_rss_post_job_restart_kib: Some("5".to_owned()),
2060                worker_rss_hard_kill_kib: Some("7".to_owned()),
2061                worker_rss_sample_millis: Some("11".to_owned()),
2062                import_switch_rss_soft_kib: Some("3".to_owned()),
2063                module_cache_rss_guard_kib: Some("17".to_owned()),
2064                module_cache_max_bytes: Some("19".to_owned()),
2065                request_timeout_millis: Some("37".to_owned()),
2066                project_mailbox_capacity: Some("23".to_owned()),
2067                worker_restart_limit: Some("29".to_owned()),
2068                worker_restart_window_secs: Some("31".to_owned()),
2069            },
2070            &RuntimeFileConfig::default(),
2071        )
2072        .unwrap();
2073
2074        assert_eq!(config.worker_rss_post_job_restart_kib(), 5);
2075        assert_eq!(config.worker_rss_hard_kill_kib(), 7);
2076        assert_eq!(config.worker_rss_sample_millis(), 11);
2077        assert_eq!(config.import_switch_rss_soft_kib(), 3);
2078        assert_eq!(config.module_cache_rss_guard_kib(), 17);
2079        assert_eq!(config.module_cache_max_bytes(), 19);
2080        assert_eq!(config.request_timeout_millis(), 37);
2081        assert_eq!(config.mailbox_capacity(), 23);
2082        assert_eq!(config.max_restarts_per_window(), 29);
2083        assert_eq!(config.restart_window(), Duration::from_secs(31));
2084    }
2085
2086    #[test]
2087    fn request_timeout_precedence_env_over_file_over_default() {
2088        let file = RuntimeFileConfig {
2089            request_timeout_millis: Some(45_000),
2090            ..RuntimeFileConfig::default()
2091        };
2092        // Env unset -> file value is used.
2093        let config = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
2094        assert_eq!(config.request_timeout_millis(), 45_000);
2095        // Env set -> env wins over the file.
2096        let env = RuntimeEnv {
2097            request_timeout_millis: Some("90000".to_owned()),
2098            ..RuntimeEnv::default()
2099        };
2100        let config = parse_runtime_config(env, &file).unwrap();
2101        assert_eq!(config.request_timeout_millis(), 90_000);
2102        // Neither -> built-in default (120 s).
2103        let config = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
2104        assert_eq!(config.request_timeout_millis(), REQUEST_TIMEOUT_MILLIS);
2105    }
2106
2107    #[test]
2108    fn request_timeout_zero_is_rejected() {
2109        // A zero deadline would time every call out instantly; parse_nonzero_u64
2110        // must reject it.
2111        let err = parse_runtime_config(
2112            RuntimeEnv {
2113                request_timeout_millis: Some("0".to_owned()),
2114                ..RuntimeEnv::default()
2115            },
2116            &RuntimeFileConfig::default(),
2117        )
2118        .unwrap_err();
2119        let ServerError::Internal(message) = err else {
2120            panic!("expected Internal config error");
2121        };
2122        assert!(
2123            message.contains("LEAN_HOST_MCP_REQUEST_TIMEOUT_MILLIS"),
2124            "message: {message}"
2125        );
2126    }
2127
2128    #[test]
2129    fn rss_config_rejects_post_job_above_hard_kill() {
2130        let err = parse_runtime_config(
2131            RuntimeEnv {
2132                // 20 GiB post-job ceiling above the 16 GiB default hard kill: the
2133                // planned post-job cycle could never fire before the hard kill.
2134                worker_rss_post_job_restart_kib: Some("20971520".to_owned()),
2135                ..RuntimeEnv::default()
2136            },
2137            &RuntimeFileConfig::default(),
2138        )
2139        .unwrap_err();
2140        let ServerError::Internal(message) = err else {
2141            panic!("expected Internal config error");
2142        };
2143        assert!(message.contains("invalid RSS config"), "message: {message}");
2144        assert!(message.contains("hard_kill"), "message: {message}");
2145    }
2146
2147    #[test]
2148    fn rss_config_rejects_import_switch_above_post_job() {
2149        let err = parse_runtime_config(
2150            RuntimeEnv {
2151                // Import-switch soft limit above the 5 GiB default post-job ceiling.
2152                import_switch_rss_soft_kib: Some("6291456".to_owned()),
2153                ..RuntimeEnv::default()
2154            },
2155            &RuntimeFileConfig::default(),
2156        )
2157        .unwrap_err();
2158        let ServerError::Internal(message) = err else {
2159            panic!("expected Internal config error");
2160        };
2161        assert!(message.contains("invalid RSS config"), "message: {message}");
2162        assert!(message.contains("import_switch"), "message: {message}");
2163    }
2164
2165    #[test]
2166    fn rss_config_accepts_raising_post_job_to_8gib() {
2167        // The motivating case: 8 GiB post-job ceiling, below the 16 GiB hard
2168        // kill and above the 2 GiB import-switch soft limit.
2169        let config = parse_runtime_config(
2170            RuntimeEnv {
2171                worker_rss_post_job_restart_kib: Some("8388608".to_owned()),
2172                ..RuntimeEnv::default()
2173            },
2174            &RuntimeFileConfig::default(),
2175        )
2176        .unwrap();
2177        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
2178    }
2179
2180    #[test]
2181    fn runtime_config_precedence_env_over_file_over_default() {
2182        let file = RuntimeFileConfig {
2183            worker_rss_post_job_restart_kib: Some(8_388_608),
2184            ..RuntimeFileConfig::default()
2185        };
2186        // Env unset -> file value is used.
2187        let config = parse_runtime_config(RuntimeEnv::default(), &file).unwrap();
2188        assert_eq!(config.worker_rss_post_job_restart_kib(), 8_388_608);
2189        // Env set -> env wins over the file (6 GiB, still a valid ordering).
2190        let env = RuntimeEnv {
2191            worker_rss_post_job_restart_kib: Some("6291456".to_owned()),
2192            ..RuntimeEnv::default()
2193        };
2194        let config = parse_runtime_config(env, &file).unwrap();
2195        assert_eq!(config.worker_rss_post_job_restart_kib(), 6_291_456);
2196        // Neither -> built-in default.
2197        let config = parse_runtime_config(RuntimeEnv::default(), &RuntimeFileConfig::default()).unwrap();
2198        assert_eq!(
2199            config.worker_rss_post_job_restart_kib(),
2200            WORKER_RSS_POST_JOB_RESTART_KIB
2201        );
2202    }
2203
2204    #[test]
2205    fn runtime_config_rejects_zero_and_bad_ordering_from_file() {
2206        let zero = RuntimeFileConfig {
2207            worker_rss_sample_millis: Some(0),
2208            ..RuntimeFileConfig::default()
2209        };
2210        assert!(parse_runtime_config(RuntimeEnv::default(), &zero).is_err());
2211
2212        let inverted = RuntimeFileConfig {
2213            worker_rss_post_job_restart_kib: Some(20_971_520), // above 16 GiB default hard kill
2214            ..RuntimeFileConfig::default()
2215        };
2216        let err = parse_runtime_config(RuntimeEnv::default(), &inverted).unwrap_err();
2217        let ServerError::Internal(message) = err else {
2218            panic!("expected Internal config error");
2219        };
2220        assert!(message.contains("invalid RSS config"), "message: {message}");
2221    }
2222
2223    #[test]
2224    fn restart_stats_tally_total_and_per_cause() {
2225        let mut stats = RestartStats::default();
2226        stats.observe("rss_post_job");
2227        stats.observe("rss_post_job");
2228        stats.observe("child_abort");
2229
2230        assert_eq!(stats.total, 3);
2231        assert_eq!(stats.by_cause.get("rss_post_job"), Some(&2));
2232        assert_eq!(stats.by_cause.get("child_abort"), Some(&1));
2233        assert_eq!(stats.by_cause.get("idle"), None);
2234    }
2235
2236    #[test]
2237    fn semantic_capability_builder_omits_capability_import_module() {
2238        let tmp = tempfile::tempdir().unwrap();
2239        let capability_root = tmp.path().join("capability");
2240        let lib_dir = capability_root.join(".lake").join("build").join("lib");
2241        std::fs::create_dir_all(&lib_dir).unwrap();
2242        let dylib_name = if cfg!(target_os = "macos") {
2243            "libLeanSemanticSearch.dylib"
2244        } else {
2245            "libLeanSemanticSearch.so"
2246        };
2247        let dylib = lib_dir.join(dylib_name);
2248        std::fs::write(&dylib, "").unwrap();
2249        let built = lean_toolchain::LeanBuiltCapability::path(&dylib)
2250            .package("lean_semantic_search")
2251            .module("LeanSemanticSearch");
2252        let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
2253            worker_generation: 1,
2254            last_restart: None,
2255            rss_kib: None,
2256            import_profile: None,
2257            profile_switch_count: 0,
2258            restarts_total: 0,
2259            restarts_by_cause: BTreeMap::new(),
2260        }));
2261        let config = ActorConfig {
2262            lake_root: tmp.path().join("consumer"),
2263            manifest_hash: "sha256-test".to_owned(),
2264            toolchain_label: "leanprover/lean4:test".to_owned(),
2265            worker_path: tmp.path().join("worker"),
2266            lean_sysroot: tmp.path().join("lean"),
2267            session_id: "session-test".to_owned(),
2268            runtime,
2269            healthy: Arc::new(AtomicBool::new(true)),
2270            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
2271            worker_rss_hard_kill_kib: WORKER_RSS_HARD_KILL_KIB,
2272            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
2273            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
2274            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
2275            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
2276            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
2277            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
2278            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
2279            restart_window: RESTART_WINDOW,
2280            toolchain_advisories: Vec::new(),
2281        };
2282
2283        let builder = semantic_capability_builder(&config, &built).unwrap();
2284        let debug = format!("{builder:?}");
2285
2286        assert!(debug.contains("imports: []"), "builder debug: {debug}");
2287        assert!(
2288            !debug.contains("LeanSemanticSearch.Capability"),
2289            "builder must not import the capability module: {debug}"
2290        );
2291        assert!(
2292            debug.contains("import_workspace_root: Some"),
2293            "builder should import sessions against the consumer workspace: {debug}"
2294        );
2295    }
2296
2297    #[test]
2298    fn bootstrap_hard_rss_failure_is_structured_runtime_unavailable() {
2299        let runtime = Arc::new(Mutex::new(RuntimeSnapshot {
2300            worker_generation: 1,
2301            last_restart: None,
2302            rss_kib: None,
2303            import_profile: None,
2304            profile_switch_count: 0,
2305            restarts_total: 0,
2306            restarts_by_cause: BTreeMap::new(),
2307        }));
2308        let config = ActorConfig {
2309            lake_root: PathBuf::from("/tmp/lean-host-mcp-bootstrap-rss-test"),
2310            manifest_hash: "sha256-test".to_owned(),
2311            toolchain_label: "leanprover/lean4:test".to_owned(),
2312            worker_path: PathBuf::from("/tmp/worker"),
2313            lean_sysroot: PathBuf::from("/tmp/lean"),
2314            session_id: "session-test".to_owned(),
2315            runtime: Arc::clone(&runtime),
2316            healthy: Arc::new(AtomicBool::new(true)),
2317            worker_rss_post_job_restart_kib: WORKER_RSS_POST_JOB_RESTART_KIB,
2318            worker_rss_hard_kill_kib: 64,
2319            worker_rss_sample_millis: WORKER_RSS_SAMPLE_MILLIS,
2320            import_switch_rss_soft_kib: IMPORT_SWITCH_RSS_SOFT_KIB,
2321            module_cache_rss_guard_kib: MODULE_CACHE_RSS_GUARD_KIB,
2322            module_cache_max_bytes: MODULE_CACHE_MAX_BYTES,
2323            request_timeout_millis: REQUEST_TIMEOUT_MILLIS,
2324            mailbox_capacity: PROJECT_MAILBOX_CAPACITY,
2325            max_restarts_per_window: MAX_RESTARTS_PER_WINDOW,
2326            restart_window: RESTART_WINDOW,
2327            toolchain_advisories: Vec::new(),
2328        };
2329
2330        let err = bootstrap_hard_rss_unavailable(
2331            &config,
2332            "rss_hard_limit_exceeded operation=startup current_kib=128 limit_kib=64".to_owned(),
2333            Some(128),
2334            Some(64),
2335        );
2336
2337        let ServerError::WorkerUnavailable(info) = err else {
2338            panic!("expected WorkerUnavailable");
2339        };
2340        assert!(!info.retryable);
2341        assert_eq!(info.restart_cause.as_deref(), Some("rss_hard_limit_exceeded"));
2342        assert_eq!(info.rss_kib, Some(128));
2343        assert_eq!(info.limit_kib, Some(64));
2344        assert_eq!(
2345            info.runtime.last_restart.as_ref().map(|event| event.cause.as_str()),
2346            Some("rss_hard_limit_exceeded")
2347        );
2348    }
2349
2350    #[test]
2351    fn planned_restart_causes_do_not_consume_abnormal_restart_budget() {
2352        assert!(!RestartCause::RssImportSwitch.counts_toward_restart_limit());
2353        assert!(!RestartCause::RssPostJob.counts_toward_restart_limit());
2354        assert!(!RestartCause::MaxRequests.counts_toward_restart_limit());
2355        assert!(!RestartCause::MaxImports.counts_toward_restart_limit());
2356        assert!(!RestartCause::Idle.counts_toward_restart_limit());
2357
2358        assert!(RestartCause::ChildExit.counts_toward_restart_limit());
2359        assert!(RestartCause::ChildAbort.counts_toward_restart_limit());
2360        assert!(RestartCause::Timeout.counts_toward_restart_limit());
2361        assert!(RestartCause::Cancelled.counts_toward_restart_limit());
2362        assert!(RestartCause::SessionMissing.counts_toward_restart_limit());
2363        assert!(RestartCause::RssHardLimit.counts_toward_restart_limit());
2364    }
2365
2366    #[test]
2367    fn worker_restart_reason_maps_to_stable_cause() {
2368        assert_eq!(
2369            restart_cause_from_worker(&LeanWorkerRestartReason::MaxRequests { limit: 1 }).as_str(),
2370            "max_requests"
2371        );
2372        assert_eq!(
2373            restart_cause_from_worker(&LeanWorkerRestartReason::RssCeiling {
2374                current_kib: 2,
2375                limit_kib: 1,
2376                last_import_stats: None,
2377            })
2378            .as_str(),
2379            "rss_post_job"
2380        );
2381        assert_eq!(
2382            restart_cause_from_worker(&LeanWorkerRestartReason::RssHardLimit {
2383                operation: "test",
2384                current_kib: 2,
2385                limit_kib: 1,
2386                last_import_stats: None,
2387            })
2388            .as_str(),
2389            "rss_hard_limit_exceeded"
2390        );
2391        assert_eq!(
2392            restart_cause_from_worker(&LeanWorkerRestartReason::RequestTimeout {
2393                operation: "test",
2394                duration: Duration::from_millis(1),
2395            })
2396            .as_str(),
2397            "timeout"
2398        );
2399    }
2400}