// lean_rs_worker/pool.rs

1//! Local worker-pool orchestration and session leasing.
2//!
3//! The pool sits above `LeanWorkerCapabilityBuilder` and typed commands. It
4//! chooses a compatible local child process for capability work, while callers
5//! only see session requirements and a lease that can run typed commands.
6
7use std::path::PathBuf;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::capability::{LeanWorkerCapability, LeanWorkerCapabilityBuilder};
16use crate::session::{
17    LeanWorkerCancellationToken, LeanWorkerDiagnosticSink, LeanWorkerJsonCommand, LeanWorkerProgressSink,
18    LeanWorkerRuntimeMetadata, LeanWorkerStreamingCommand, LeanWorkerTypedDataSink, LeanWorkerTypedStreamSummary,
19};
20use crate::supervisor::{LeanWorkerError, LeanWorkerRestartReason, LeanWorkerStatus};
21
/// Coarse restart-policy class used in pool session keys.
///
/// The pool key records whether a session was opened under the default policy
/// or a caller-selected policy class. It deliberately does not expose every
/// restart-policy knob as key material; memory-aware scheduling and richer
/// policy admission are not part of the session-key contract.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum LeanWorkerRestartPolicyClass {
    /// Class assigned by `LeanWorkerSessionKey::new` when the caller does not
    /// select another one.
    Default,
    /// Caller-selected class. The key records only that a non-default class
    /// was chosen, not the individual policy settings behind it.
    Custom,
}
33
/// Worker reuse key for a capability-backed session.
///
/// A session key answers only one pool question: can an already-open child
/// session safely host compatible work? It is not a downstream cache key, and
/// it does not encode row schemas, cache validity, ranking, reporting, or
/// source provenance.
///
/// Equality is derived field by field, so two keys match only when every
/// field is equal; `imports` is a `Vec`, so the same imports listed in a
/// different order produce a different key.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerSessionKey {
    /// Lake project root.
    project_root: PathBuf,
    /// Lake package name.
    package: String,
    /// Lake library target.
    lib_name: String,
    /// Imports required by the session, in the order the caller supplied them.
    imports: Vec<String>,
    /// Optional metadata expectation; `None` until `metadata_expectation` is called.
    metadata_expectation: Option<LeanWorkerMetadataExpectationKey>,
    /// Toolchain fingerprint captured from `ToolchainFingerprint::current()` in `new`.
    toolchain_fingerprint: lean_toolchain::ToolchainFingerprint,
    /// Coarse restart-policy class; `Default` unless overridden by the caller.
    restart_policy_class: LeanWorkerRestartPolicyClass,
}
50
51impl LeanWorkerSessionKey {
52    /// Create a session key from the caller-visible capability requirements.
53    #[must_use]
54    pub fn new(
55        project_root: impl Into<PathBuf>,
56        package: impl Into<String>,
57        lib_name: impl Into<String>,
58        imports: impl IntoIterator<Item = impl Into<String>>,
59    ) -> Self {
60        Self {
61            project_root: project_root.into(),
62            package: package.into(),
63            lib_name: lib_name.into(),
64            imports: imports.into_iter().map(Into::into).collect(),
65            metadata_expectation: None,
66            toolchain_fingerprint: lean_toolchain::ToolchainFingerprint::current(),
67            restart_policy_class: LeanWorkerRestartPolicyClass::Default,
68        }
69    }
70
71    /// Add the metadata expectation used to decide safe session reuse.
72    ///
73    /// `expected` is downstream metadata transported by the generic metadata
74    /// envelope. The pool compares it as opaque facts and does not interpret
75    /// command names or semantic versions.
76    #[must_use]
77    pub fn metadata_expectation(
78        mut self,
79        export: impl Into<String>,
80        request: Value,
81        expected: Option<crate::types::LeanWorkerCapabilityMetadata>,
82    ) -> Self {
83        self.metadata_expectation = Some(LeanWorkerMetadataExpectationKey {
84            export: export.into(),
85            request,
86            expected,
87        });
88        self
89    }
90
91    /// Set the coarse restart-policy class for this session key.
92    #[must_use]
93    pub fn restart_policy_class(mut self, class: LeanWorkerRestartPolicyClass) -> Self {
94        self.restart_policy_class = class;
95        self
96    }
97
98    /// Return the Lake project root for this session key.
99    #[must_use]
100    pub fn project_root(&self) -> &std::path::Path {
101        &self.project_root
102    }
103
104    /// Return the Lake package name for this session key.
105    #[must_use]
106    pub fn package(&self) -> &str {
107        &self.package
108    }
109
110    /// Return the Lake library target for this session key.
111    #[must_use]
112    pub fn lib_name(&self) -> &str {
113        &self.lib_name
114    }
115
116    /// Return the imports required by this session key.
117    #[must_use]
118    pub fn imports(&self) -> &[String] {
119        &self.imports
120    }
121
122    /// Return the build-baked Lean toolchain fingerprint used by this key.
123    #[must_use]
124    pub fn toolchain_fingerprint(&self) -> &lean_toolchain::ToolchainFingerprint {
125        &self.toolchain_fingerprint
126    }
127
128    /// Return the restart-policy class used by this key.
129    #[must_use]
130    pub fn policy_class(&self) -> LeanWorkerRestartPolicyClass {
131        self.restart_policy_class
132    }
133}
134
/// Metadata expectation stored inside a `LeanWorkerSessionKey`.
///
/// All three fields are stored verbatim from
/// `LeanWorkerSessionKey::metadata_expectation` and take part in the derived
/// key equality.
#[derive(Clone, Debug, Eq, PartialEq)]
struct LeanWorkerMetadataExpectationKey {
    /// Export name supplied by the caller.
    export: String,
    /// JSON request value supplied by the caller.
    request: Value,
    /// Expected downstream metadata, compared as opaque facts; may be absent.
    expected: Option<crate::types::LeanWorkerCapabilityMetadata>,
}
141
/// Configuration for a local `LeanWorkerPool`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerPoolConfig {
    /// Maximum number of pool entries; `new` clamps this to at least 1.
    max_workers: usize,
    /// Optional budget for the summed child RSS, in KiB; `None` disables the check.
    max_total_child_rss_kib: Option<u64>,
    /// Optional per-worker RSS ceiling, in KiB; `None` disables the check.
    per_worker_rss_ceiling_kib: Option<u64>,
    /// Optional idle duration after which a worker is cycled; `None` disables the check.
    idle_cycle_after: Option<Duration>,
    /// How long admission waits when the pool is full; zero means fail immediately.
    queue_wait_timeout: Duration,
}
151
152impl LeanWorkerPoolConfig {
153    /// Create pool configuration with a fixed local worker limit.
154    #[must_use]
155    pub fn new(max_workers: usize) -> Self {
156        Self {
157            max_workers: max_workers.max(1),
158            max_total_child_rss_kib: None,
159            per_worker_rss_ceiling_kib: None,
160            idle_cycle_after: None,
161            queue_wait_timeout: Duration::ZERO,
162        }
163    }
164
165    /// Return the maximum number of local child workers the pool may own.
166    #[must_use]
167    pub fn max_workers(&self) -> usize {
168        self.max_workers
169    }
170
171    /// Reject new distinct workers when known total child RSS reaches `limit`.
172    ///
173    /// RSS sampling is best effort. On platforms where the pool cannot obtain
174    /// samples, it records unavailable samples and does not make a false
175    /// admission claim.
176    #[must_use]
177    pub fn max_total_child_rss_kib(mut self, limit: u64) -> Self {
178        self.max_total_child_rss_kib = Some(limit.max(1));
179        self
180    }
181
182    /// Cycle a worker before assigning work when its sampled RSS reaches `limit`.
183    #[must_use]
184    pub fn per_worker_rss_ceiling_kib(mut self, limit: u64) -> Self {
185        self.per_worker_rss_ceiling_kib = Some(limit.max(1));
186        self
187    }
188
189    /// Cycle an idle worker before assigning more work through an old lease.
190    #[must_use]
191    pub fn idle_cycle_after(mut self, limit: Duration) -> Self {
192        self.idle_cycle_after = Some(limit);
193        self
194    }
195
196    /// Wait this long for local pool admission before returning a typed error.
197    ///
198    /// The current pool is synchronous. This timeout documents and bounds the
199    /// admission point without exposing worker ids or queue internals.
200    #[must_use]
201    pub fn queue_wait_timeout(mut self, timeout: Duration) -> Self {
202        self.queue_wait_timeout = timeout;
203        self
204    }
205
206    /// Return the configured total child RSS budget in KiB.
207    #[must_use]
208    pub fn max_total_child_rss_kib_limit(&self) -> Option<u64> {
209        self.max_total_child_rss_kib
210    }
211
212    /// Return the configured per-worker RSS ceiling in KiB.
213    #[must_use]
214    pub fn per_worker_rss_ceiling_kib_limit(&self) -> Option<u64> {
215        self.per_worker_rss_ceiling_kib
216    }
217
218    /// Return the configured idle-cycle duration.
219    #[must_use]
220    pub fn idle_cycle_after_limit(&self) -> Option<Duration> {
221        self.idle_cycle_after
222    }
223
224    /// Return the configured pool admission wait timeout.
225    #[must_use]
226    pub fn queue_wait_timeout_limit(&self) -> Duration {
227        self.queue_wait_timeout
228    }
229}
230
231impl Default for LeanWorkerPoolConfig {
232    fn default() -> Self {
233        Self::new(1)
234    }
235}
236
/// Summary of public pool state.
///
/// This snapshot exposes admission and reuse facts without revealing worker
/// ids, child pids, pipe handles, or which warm child will be selected.
///
/// Counters described as "summed" are added up over every pool entry the
/// snapshot covers (all entries for a pool snapshot, one entry for a lease
/// snapshot), reading each entry's worker statistics.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerPoolSnapshot {
    /// Configured worker limit.
    pub max_workers: usize,
    /// Number of pool entries covered by this snapshot.
    pub workers: usize,
    /// Entries with at least one active lease.
    pub active_workers: usize,
    /// Entries with no active lease.
    pub warm_leases: usize,
    /// Currently always reported as 0.
    pub queue_depth: usize,
    /// Sum of the last RSS sample of every covered entry, in KiB; `None` when
    /// any covered entry has no stored sample.
    pub total_child_rss_kib: Option<u64>,
    /// Summed count of RSS sampling attempts that produced no value.
    pub rss_samples_unavailable: u64,
    /// Summed worker `requests` statistic.
    pub requests: u64,
    /// Summed worker `imports` statistic.
    pub imports: u64,
    /// Summed worker `restarts` statistic.
    pub worker_restarts: u64,
    /// Summed worker `max_request_restarts` statistic.
    pub max_request_restarts: u64,
    /// Summed worker `max_import_restarts` statistic.
    pub max_import_restarts: u64,
    /// Summed worker `rss_restarts` statistic.
    pub rss_restarts: u64,
    /// Summed worker `idle_restarts` statistic.
    pub idle_restarts: u64,
    /// Summed worker `cancelled_restarts` statistic.
    pub cancelled_restarts: u64,
    /// Summed worker `timeout_restarts` statistic.
    pub timeout_restarts: u64,
    /// Summed count of restarts the pool itself performed for RSS or idle policy.
    pub policy_restarts: u64,
    /// Admission waits that ended in a queue timeout; 0 in lease snapshots.
    pub queue_timeouts: u64,
    /// Admissions rejected by the total RSS budget; 0 in lease snapshots.
    pub memory_budget_rejections: u64,
    /// Policy restart reason of the last covered entry (in pool order) that
    /// recorded one; this is not necessarily the most recent restart in time.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Summed worker `stream_requests` statistic.
    pub stream_requests: u64,
    /// Summed worker `stream_successes` statistic.
    pub stream_successes: u64,
    /// Summed worker `stream_failures` statistic.
    pub stream_failures: u64,
    /// Summed worker `data_rows_delivered` statistic.
    pub data_rows_delivered: u64,
    /// Summed worker `data_row_payload_bytes` statistic.
    pub data_row_payload_bytes: u64,
    /// Summed worker `stream_elapsed` statistic (saturating).
    pub stream_elapsed: Duration,
    /// Summed worker `backpressure_waits` statistic.
    pub backpressure_waits: u64,
    /// Summed worker `backpressure_failures` statistic.
    pub backpressure_failures: u64,
}
272
/// Local pool for worker-backed capability sessions.
#[derive(Debug)]
pub struct LeanWorkerPool {
    /// Limits and policies applied at admission and before each assignment.
    config: LeanWorkerPoolConfig,
    /// One entry per distinct session key admitted so far.
    entries: Vec<PoolEntry>,
    /// Number of admissions that waited out `queue_wait_timeout` and failed.
    queue_timeouts: u64,
    /// Number of admissions rejected by the total child RSS budget.
    memory_budget_rejections: u64,
}
281
282impl LeanWorkerPool {
283    /// Create an empty local worker pool.
284    #[must_use]
285    pub fn new(config: LeanWorkerPoolConfig) -> Self {
286        Self {
287            config,
288            entries: Vec::new(),
289            queue_timeouts: 0,
290            memory_budget_rejections: 0,
291        }
292    }
293
294    /// Acquire a lease for the capability described by `builder`.
295    ///
296    /// The pool reuses a warm compatible worker session when possible. If a
297    /// matching worker has died, the pool replaces it before returning the
298    /// lease. If admitting a distinct session key would exceed `max_workers`,
299    /// the pool returns `LeanWorkerError::WorkerPoolExhausted`.
300    ///
301    /// # Errors
302    ///
303    /// Returns `LeanWorkerError` when the capability cannot be built, metadata
304    /// validation fails, a dead compatible worker cannot be replaced, or the
305    /// fixed local worker limit is already full of distinct session keys.
306    pub fn acquire_lease(
307        &mut self,
308        builder: LeanWorkerCapabilityBuilder,
309    ) -> Result<LeanWorkerSessionLease<'_>, LeanWorkerError> {
310        let key = builder.session_key();
311        if let Some(index) = self.entries.iter().position(|entry| entry.key == key) {
312            self.ensure_entry_running(index)?;
313            self.enforce_entry_policy_before_assignment(index)?;
314            let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
315                message: "worker pool entry disappeared during lease acquisition".to_owned(),
316            })?;
317            entry.active_leases = entry.active_leases.saturating_add(1);
318            return Ok(LeanWorkerSessionLease {
319                entry,
320                config: self.config.clone(),
321                valid: true,
322                invalidation_reason: None,
323                request_timeout_override: None,
324            });
325        }
326
327        if self.entries.len() >= self.config.max_workers {
328            return self.pool_full_error();
329        }
330        self.ensure_spawn_within_total_rss_budget()?;
331
332        let capability = builder.clone().open()?;
333        let base_request_timeout = builder.pool_request_timeout();
334        self.entries.push(PoolEntry {
335            key,
336            builder,
337            capability,
338            base_request_timeout,
339            last_rss_kib: None,
340            rss_samples_unavailable: 0,
341            last_activity: Instant::now(),
342            last_restart_reason: None,
343            policy_restarts: 0,
344            active_leases: 0,
345        });
346        let entry = self.entries.last_mut().ok_or_else(|| LeanWorkerError::Protocol {
347            message: "worker pool failed to retain newly opened entry".to_owned(),
348        })?;
349        let _ = entry.sample_rss();
350        entry.active_leases = entry.active_leases.saturating_add(1);
351        Ok(LeanWorkerSessionLease {
352            entry,
353            config: self.config.clone(),
354            valid: true,
355            invalidation_reason: None,
356            request_timeout_override: None,
357        })
358    }
359
360    /// Return a public snapshot of pool state.
361    #[must_use]
362    pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
363        snapshot_from_entries(
364            &self.config,
365            &self.entries,
366            self.queue_timeouts,
367            self.memory_budget_rejections,
368        )
369    }
370
371    fn snapshot_from_lease_config(config: &LeanWorkerPoolConfig, entry: &PoolEntry) -> LeanWorkerPoolSnapshot {
372        snapshot_from_entries(config, std::slice::from_ref(entry), 0, 0)
373    }
374}
375
376fn snapshot_from_entries(
377    config: &LeanWorkerPoolConfig,
378    entries: &[PoolEntry],
379    queue_timeouts: u64,
380    memory_budget_rejections: u64,
381) -> LeanWorkerPoolSnapshot {
382    LeanWorkerPoolSnapshot {
383        max_workers: config.max_workers,
384        workers: entries.len(),
385        active_workers: entries.iter().filter(|entry| entry.active_leases > 0).count(),
386        warm_leases: entries.iter().filter(|entry| entry.active_leases == 0).count(),
387        queue_depth: 0,
388        total_child_rss_kib: total_known_child_rss_kib(entries),
389        rss_samples_unavailable: entries.iter().map(|entry| entry.rss_samples_unavailable).sum(),
390        requests: entries
391            .iter()
392            .map(|entry| entry.capability.worker().stats().requests)
393            .sum(),
394        imports: entries
395            .iter()
396            .map(|entry| entry.capability.worker().stats().imports)
397            .sum(),
398        worker_restarts: entries
399            .iter()
400            .map(|entry| entry.capability.worker().stats().restarts)
401            .sum(),
402        max_request_restarts: entries
403            .iter()
404            .map(|entry| entry.capability.worker().stats().max_request_restarts)
405            .sum(),
406        max_import_restarts: entries
407            .iter()
408            .map(|entry| entry.capability.worker().stats().max_import_restarts)
409            .sum(),
410        rss_restarts: entries
411            .iter()
412            .map(|entry| entry.capability.worker().stats().rss_restarts)
413            .sum(),
414        idle_restarts: entries
415            .iter()
416            .map(|entry| entry.capability.worker().stats().idle_restarts)
417            .sum(),
418        cancelled_restarts: entries
419            .iter()
420            .map(|entry| entry.capability.worker().stats().cancelled_restarts)
421            .sum(),
422        timeout_restarts: entries
423            .iter()
424            .map(|entry| entry.capability.worker().stats().timeout_restarts)
425            .sum(),
426        policy_restarts: entries.iter().map(|entry| entry.policy_restarts).sum(),
427        queue_timeouts,
428        memory_budget_rejections,
429        last_restart_reason: entries.iter().rev().find_map(|entry| entry.last_restart_reason.clone()),
430        stream_requests: entries
431            .iter()
432            .map(|entry| entry.capability.worker().stats().stream_requests)
433            .sum(),
434        stream_successes: entries
435            .iter()
436            .map(|entry| entry.capability.worker().stats().stream_successes)
437            .sum(),
438        stream_failures: entries
439            .iter()
440            .map(|entry| entry.capability.worker().stats().stream_failures)
441            .sum(),
442        data_rows_delivered: entries
443            .iter()
444            .map(|entry| entry.capability.worker().stats().data_rows_delivered)
445            .sum(),
446        data_row_payload_bytes: entries
447            .iter()
448            .map(|entry| entry.capability.worker().stats().data_row_payload_bytes)
449            .sum(),
450        stream_elapsed: entries.iter().fold(Duration::ZERO, |acc, entry| {
451            acc.saturating_add(entry.capability.worker().stats().stream_elapsed)
452        }),
453        backpressure_waits: entries
454            .iter()
455            .map(|entry| entry.capability.worker().stats().backpressure_waits)
456            .sum(),
457        backpressure_failures: entries
458            .iter()
459            .map(|entry| entry.capability.worker().stats().backpressure_failures)
460            .sum(),
461    }
462}
463
464fn total_known_child_rss_kib(entries: &[PoolEntry]) -> Option<u64> {
465    entries
466        .iter()
467        .map(|entry| entry.last_rss_kib)
468        .try_fold(0_u64, |acc, value| value.map(|rss| acc.saturating_add(rss)))
469}
470
471impl LeanWorkerPool {
472    fn ensure_entry_running(&mut self, index: usize) -> Result<(), LeanWorkerError> {
473        let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
474            message: "worker pool entry disappeared during liveness check".to_owned(),
475        })?;
476        match entry.capability.worker_mut().status()? {
477            LeanWorkerStatus::Running => Ok(()),
478            LeanWorkerStatus::Exited(_exit) => {
479                entry.capability = entry.builder.clone().open()?;
480                entry.last_activity = Instant::now();
481                Ok(())
482            }
483        }
484    }
485
486    fn enforce_entry_policy_before_assignment(&mut self, index: usize) -> Result<(), LeanWorkerError> {
487        let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
488            message: "worker pool entry disappeared during policy check".to_owned(),
489        })?;
490        entry.enforce_policy(&self.config).map(|_| ())
491    }
492
493    fn ensure_spawn_within_total_rss_budget(&mut self) -> Result<(), LeanWorkerError> {
494        let Some(limit_kib) = self.config.max_total_child_rss_kib else {
495            return Ok(());
496        };
497        let rss = self.refresh_total_child_rss();
498        if rss.unavailable > 0 {
499            return Ok(());
500        }
501        if rss.total_kib >= limit_kib {
502            self.memory_budget_rejections = self.memory_budget_rejections.saturating_add(1);
503            return Err(LeanWorkerError::WorkerPoolMemoryBudgetExceeded {
504                current_kib: rss.total_kib,
505                limit_kib,
506            });
507        }
508        Ok(())
509    }
510
511    fn refresh_total_child_rss(&mut self) -> PoolRssTotal {
512        let mut total_kib = 0_u64;
513        let mut unavailable = 0_u64;
514        for entry in &mut self.entries {
515            match entry.sample_rss() {
516                Some(value) => {
517                    total_kib = total_kib.saturating_add(value);
518                }
519                None => {
520                    unavailable = unavailable.saturating_add(1);
521                }
522            }
523        }
524        PoolRssTotal { total_kib, unavailable }
525    }
526
527    fn pool_full_error<T>(&mut self) -> Result<T, LeanWorkerError> {
528        if self.config.queue_wait_timeout.is_zero() {
529            return Err(LeanWorkerError::WorkerPoolExhausted {
530                max_workers: self.config.max_workers,
531            });
532        }
533        let started = Instant::now();
534        while started.elapsed() < self.config.queue_wait_timeout {
535            let remaining = self.config.queue_wait_timeout.saturating_sub(started.elapsed());
536            thread::sleep(remaining.min(Duration::from_millis(10)));
537        }
538        self.queue_timeouts = self.queue_timeouts.saturating_add(1);
539        Err(LeanWorkerError::WorkerPoolQueueTimeout {
540            waited: self.config.queue_wait_timeout,
541        })
542    }
543}
544
545impl Default for LeanWorkerPool {
546    fn default() -> Self {
547        Self::new(LeanWorkerPoolConfig::default())
548    }
549}
550
/// One pool slot: a session key, the capability serving it, and the
/// bookkeeping the pool needs for policy enforcement and snapshots.
#[derive(Debug)]
struct PoolEntry {
    /// Session key this entry was admitted under; leases match on equality.
    key: LeanWorkerSessionKey,
    /// Builder kept so the capability can be reopened if the child exits.
    builder: LeanWorkerCapabilityBuilder,
    /// Open capability whose worker runs leased commands.
    capability: LeanWorkerCapability,
    /// Request timeout taken from the builder at admission; restored on the
    /// worker after a lease-level timeout override was used.
    base_request_timeout: Duration,
    /// Most recent successful RSS sample in KiB; `None` before the first
    /// sample and after a policy cycle.
    last_rss_kib: Option<u64>,
    /// Number of RSS sampling attempts that returned no value.
    rss_samples_unavailable: u64,
    /// Time of the last command completion, reopen, or policy cycle; drives
    /// the idle policy.
    last_activity: Instant,
    /// Reason of the most recent policy-driven cycle of this entry, if any.
    last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Number of policy-driven cycles performed on this entry.
    policy_restarts: u64,
    /// Number of leases currently checked out on this entry.
    active_leases: u64,
}
564
565impl PoolEntry {
566    fn sample_rss(&mut self) -> Option<u64> {
567        match self.capability.worker_mut().rss_kib() {
568            Some(value) => {
569                self.last_rss_kib = Some(value);
570                Some(value)
571            }
572            None => {
573                self.rss_samples_unavailable = self.rss_samples_unavailable.saturating_add(1);
574                None
575            }
576        }
577    }
578
579    fn enforce_policy(&mut self, config: &LeanWorkerPoolConfig) -> Result<Option<String>, LeanWorkerError> {
580        if let Some(limit_kib) = config.per_worker_rss_ceiling_kib {
581            match self.sample_rss() {
582                Some(current_kib) if current_kib >= limit_kib => {
583                    let reason = LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib };
584                    self.cycle_for_policy(reason)?;
585                    return Ok(Some(format!(
586                        "memory policy cycled worker at {current_kib} KiB RSS with limit {limit_kib} KiB"
587                    )));
588                }
589                Some(_) | None => {}
590            }
591        }
592
593        if let Some(limit) = config.idle_cycle_after {
594            let idle_for = self.last_activity.elapsed();
595            if idle_for >= limit {
596                let reason = LeanWorkerRestartReason::Idle { idle_for, limit };
597                self.cycle_for_policy(reason)?;
598                return Ok(Some(format!(
599                    "idle policy cycled worker after {idle_for:?} idle with limit {limit:?}"
600                )));
601            }
602        }
603
604        Ok(None)
605    }
606
607    fn cycle_for_policy(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
608        self.capability.worker_mut().cycle_with_restart_reason(reason.clone())?;
609        self.last_restart_reason = Some(reason);
610        self.last_activity = Instant::now();
611        self.last_rss_kib = None;
612        self.policy_restarts = self.policy_restarts.saturating_add(1);
613        Ok(())
614    }
615}
616
/// Result of resampling RSS across all pool entries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct PoolRssTotal {
    /// Saturating sum of the samples that were available, in KiB.
    total_kib: u64,
    /// Number of entries whose sample was unavailable.
    unavailable: u64,
}
622
/// Borrowed lease for running typed commands on a compatible worker session.
///
/// The lease does not expose which worker was selected. If a command triggers
/// timeout, cancellation, child failure, or explicit cycle, the lease becomes
/// invalid and a fresh lease must be acquired from the pool.
#[derive(Debug)]
pub struct LeanWorkerSessionLease<'pool> {
    /// Pool entry this lease runs commands on; borrowed mutably from the pool.
    entry: &'pool mut PoolEntry,
    /// Copy of the pool configuration taken when the lease was acquired.
    config: LeanWorkerPoolConfig,
    /// `false` once the lease has been invalidated.
    valid: bool,
    /// Human-readable reason recorded when the lease was invalidated.
    invalidation_reason: Option<String>,
    /// Request timeout selected through `set_request_timeout`, if any.
    request_timeout_override: Option<Duration>,
}
636
637impl LeanWorkerSessionLease<'_> {
638    /// Return the session key that justified this lease.
639    #[must_use]
640    pub fn session_key(&self) -> &LeanWorkerSessionKey {
641        &self.entry.key
642    }
643
644    /// Return protocol/runtime facts reported by the leased worker child.
645    #[must_use]
646    pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
647        self.entry.capability.runtime_metadata()
648    }
649
650    /// Return whether this lease can still run commands.
651    #[must_use]
652    pub fn is_valid(&self) -> bool {
653        self.valid
654    }
655
656    /// Return an operational snapshot for the worker entry behind this lease.
657    ///
658    /// This is the sampling hook to use while a lease is checked out. It keeps
659    /// child identity, pipe state, and protocol details hidden; the snapshot
660    /// only reports the same aggregate counters as `LeanWorkerPool::snapshot`
661    /// for the leased entry.
662    #[must_use]
663    pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
664        LeanWorkerPool::snapshot_from_lease_config(&self.config, self.entry)
665    }
666
667    /// Explicitly cycle the leased worker and invalidate this lease.
668    ///
669    /// Acquire a fresh lease before running more work. The pool keeps the
670    /// restarted child available for compatible future leases.
671    ///
672    /// # Errors
673    ///
674    /// Returns `LeanWorkerError` if the lease was already invalid or the
675    /// underlying worker cannot be cycled.
676    pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
677        self.ensure_valid()?;
678        self.entry.capability.worker_mut().cycle()?;
679        self.invalidate("explicit worker cycle");
680        Ok(())
681    }
682
683    /// Set the request timeout for commands run through this lease.
684    ///
685    /// The pool and supervisor still own the watchdog, child kill, and restart
686    /// bookkeeping. This method only selects the deadline for subsequent
687    /// leased requests.
688    ///
689    /// # Errors
690    ///
691    /// Returns `LeanWorkerError` if the lease was already invalidated.
692    pub fn set_request_timeout(&mut self, timeout: Duration) -> Result<(), LeanWorkerError> {
693        self.ensure_valid()?;
694        self.request_timeout_override = Some(timeout);
695        Ok(())
696    }
697
698    /// Run a typed non-streaming downstream JSON command through this lease.
699    ///
700    /// # Errors
701    ///
702    /// Returns `LeanWorkerError` for invalidated leases, session startup
703    /// failures, typed command errors, cancellation, timeout, child failure,
704    /// progress panic, or protocol failure.
705    pub fn run_json_command<Req, Resp>(
706        &mut self,
707        command: &LeanWorkerJsonCommand<Req, Resp>,
708        request: &Req,
709        cancellation: Option<&LeanWorkerCancellationToken>,
710        progress: Option<&dyn LeanWorkerProgressSink>,
711    ) -> Result<Resp, LeanWorkerError>
712    where
713        Req: Serialize,
714        Resp: DeserializeOwned,
715    {
716        self.ensure_valid()?;
717        self.enforce_policy_before_request()?;
718        let request_timeout = self.request_timeout_override;
719        let result = self
720            .entry
721            .capability
722            .open_session(cancellation, progress)
723            .and_then(|mut session| {
724                if let Some(timeout) = request_timeout {
725                    session.set_request_timeout(timeout);
726                }
727                session.run_json_command(command, request, cancellation, progress)
728            });
729        self.map_lifecycle_result(result)
730    }
731
732    /// Run a typed downstream streaming command through this lease.
733    ///
734    /// # Errors
735    ///
736    /// Returns `LeanWorkerError` for invalidated leases, row or summary decode
737    /// errors, sink failures, cancellation, timeout, child failure, or protocol
738    /// failure.
739    pub fn run_streaming_command<Req, Row, Summary>(
740        &mut self,
741        command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
742        request: &Req,
743        rows: &dyn LeanWorkerTypedDataSink<Row>,
744        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
745        cancellation: Option<&LeanWorkerCancellationToken>,
746        progress: Option<&dyn LeanWorkerProgressSink>,
747    ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
748    where
749        Req: Serialize,
750        Row: DeserializeOwned,
751        Summary: DeserializeOwned,
752    {
753        self.ensure_valid()?;
754        self.enforce_policy_before_request()?;
755        let request_timeout = self.request_timeout_override;
756        let result = self
757            .entry
758            .capability
759            .open_session(cancellation, progress)
760            .and_then(|mut session| {
761                if let Some(timeout) = request_timeout {
762                    session.set_request_timeout(timeout);
763                }
764                session.run_streaming_command(command, request, rows, diagnostics, cancellation, progress)
765            });
766        self.map_lifecycle_result(result)
767    }
768
769    fn ensure_valid(&self) -> Result<(), LeanWorkerError> {
770        if self.valid {
771            Ok(())
772        } else {
773            Err(LeanWorkerError::LeaseInvalidated {
774                reason: self
775                    .invalidation_reason
776                    .clone()
777                    .unwrap_or_else(|| "lease was invalidated by a worker lifecycle transition".to_owned()),
778            })
779        }
780    }
781
782    fn enforce_policy_before_request(&mut self) -> Result<(), LeanWorkerError> {
783        if let Some(reason) = self.entry.enforce_policy(&self.config)? {
784            self.invalidate(reason.clone());
785            return Err(LeanWorkerError::LeaseInvalidated { reason });
786        }
787        Ok(())
788    }
789
790    fn map_lifecycle_result<T>(&mut self, result: Result<T, LeanWorkerError>) -> Result<T, LeanWorkerError> {
791        if self.request_timeout_override.is_some() {
792            self.entry
793                .capability
794                .worker_mut()
795                .set_request_timeout(self.entry.base_request_timeout);
796        }
797        match result {
798            Ok(value) => {
799                self.entry.last_activity = Instant::now();
800                Ok(value)
801            }
802            Err(err) => {
803                self.entry.last_activity = Instant::now();
804                if invalidates_lease(&err) {
805                    self.invalidate(invalidation_reason(&err));
806                }
807                Err(err)
808            }
809        }
810    }
811
812    fn invalidate(&mut self, reason: impl Into<String>) {
813        self.valid = false;
814        self.invalidation_reason = Some(reason.into());
815    }
816}
817
818impl Drop for LeanWorkerSessionLease<'_> {
819    fn drop(&mut self) {
820        self.entry.active_leases = self.entry.active_leases.saturating_sub(1);
821    }
822}
823
824fn invalidates_lease(err: &LeanWorkerError) -> bool {
825    matches!(
826        err,
827        LeanWorkerError::Cancelled { .. }
828            | LeanWorkerError::Timeout { .. }
829            | LeanWorkerError::ChildExited { .. }
830            | LeanWorkerError::ChildPanicOrAbort { .. }
831            | LeanWorkerError::CapabilityMetadataMismatch { .. }
832    )
833}
834
835fn invalidation_reason(err: &LeanWorkerError) -> String {
836    if let LeanWorkerError::Cancelled { operation } = err {
837        format!("cancelled during {operation}")
838    } else if let LeanWorkerError::Timeout { operation, .. } = err {
839        format!("timed out during {operation}")
840    } else if matches!(err, LeanWorkerError::ChildExited { .. }) {
841        "worker child exited".to_owned()
842    } else if matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }) {
843        "worker child exited fatally".to_owned()
844    } else if let LeanWorkerError::CapabilityMetadataMismatch { export, .. } = err {
845        format!("capability metadata mismatch from {export}")
846    } else {
847        "worker lifecycle transition".to_owned()
848    }
849}