Skip to main content

lean_rs_worker/
pool.rs

1//! Local worker-pool orchestration and session leasing.
2//!
3//! The pool sits above `LeanWorkerCapabilityBuilder` and typed commands. It
4//! chooses a compatible local child process for capability work, while callers
5//! only see session requirements and a lease that can run typed commands.
6
7use std::path::PathBuf;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::capability::{LeanWorkerCapability, LeanWorkerCapabilityBuilder};
16use crate::session::{
17    LeanWorkerCancellationToken, LeanWorkerDiagnosticSink, LeanWorkerJsonCommand, LeanWorkerProgressSink,
18    LeanWorkerRuntimeMetadata, LeanWorkerStreamingCommand, LeanWorkerTypedDataSink, LeanWorkerTypedStreamSummary,
19};
20use crate::supervisor::{LeanWorkerError, LeanWorkerRestartReason, LeanWorkerStatus};
21
/// Coarse restart-policy class used in pool session keys.
///
/// The pool key records whether a session was opened under the default policy
/// or a caller-selected policy class. It deliberately does not expose every
/// restart-policy knob as key material; memory-aware scheduling and richer
/// policy admission are not part of the session-key contract.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
#[non_exhaustive]
pub enum LeanWorkerRestartPolicyClass {
    /// The session was opened under the default restart policy.
    Default,
    /// The session was opened under a caller-selected restart policy.
    Custom,
}
34
/// Worker reuse key for a capability-backed session.
///
/// A session key answers only one pool question: can an already-open child
/// session safely host compatible work? It is not a downstream cache key, and
/// it does not encode row schemas, cache validity, ranking, reporting, or
/// source provenance.
///
/// Two keys are interchangeable for reuse only when every field compares
/// equal (derived `PartialEq`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerSessionKey {
    // Lake project root the session is opened against.
    project_root: PathBuf,
    // Lake package name.
    package: String,
    // Lake library target.
    lib_name: String,
    // Imports the session requires, in caller-supplied order.
    imports: Vec<String>,
    // Optional metadata expectation used to decide safe session reuse.
    metadata_expectation: Option<LeanWorkerMetadataExpectationKey>,
    // Build-baked Lean toolchain fingerprint.
    toolchain_fingerprint: lean_toolchain::ToolchainFingerprint,
    // Coarse restart-policy class the session was opened under.
    restart_policy_class: LeanWorkerRestartPolicyClass,
}
51
52impl LeanWorkerSessionKey {
53    /// Create a session key from the caller-visible capability requirements.
54    #[must_use]
55    pub fn new(
56        project_root: impl Into<PathBuf>,
57        package: impl Into<String>,
58        lib_name: impl Into<String>,
59        imports: impl IntoIterator<Item = impl Into<String>>,
60    ) -> Self {
61        Self {
62            project_root: project_root.into(),
63            package: package.into(),
64            lib_name: lib_name.into(),
65            imports: imports.into_iter().map(Into::into).collect(),
66            metadata_expectation: None,
67            toolchain_fingerprint: lean_toolchain::ToolchainFingerprint::current(),
68            restart_policy_class: LeanWorkerRestartPolicyClass::Default,
69        }
70    }
71
72    /// Add the metadata expectation used to decide safe session reuse.
73    ///
74    /// `expected` is downstream metadata transported by the generic metadata
75    /// envelope. The pool compares it as opaque facts and does not interpret
76    /// command names or semantic versions.
77    #[must_use]
78    pub fn metadata_expectation(
79        mut self,
80        export: impl Into<String>,
81        request: Value,
82        expected: Option<crate::session::LeanWorkerCapabilityMetadata>,
83    ) -> Self {
84        self.metadata_expectation = Some(LeanWorkerMetadataExpectationKey {
85            export: export.into(),
86            request,
87            expected,
88        });
89        self
90    }
91
92    /// Set the coarse restart-policy class for this session key.
93    #[must_use]
94    pub fn restart_policy_class(mut self, class: LeanWorkerRestartPolicyClass) -> Self {
95        self.restart_policy_class = class;
96        self
97    }
98
99    /// Return the Lake project root for this session key.
100    #[must_use]
101    pub fn project_root(&self) -> &std::path::Path {
102        &self.project_root
103    }
104
105    /// Return the Lake package name for this session key.
106    #[must_use]
107    pub fn package(&self) -> &str {
108        &self.package
109    }
110
111    /// Return the Lake library target for this session key.
112    #[must_use]
113    pub fn lib_name(&self) -> &str {
114        &self.lib_name
115    }
116
117    /// Return the imports required by this session key.
118    #[must_use]
119    pub fn imports(&self) -> &[String] {
120        &self.imports
121    }
122
123    /// Return the build-baked Lean toolchain fingerprint used by this key.
124    #[must_use]
125    pub fn toolchain_fingerprint(&self) -> &lean_toolchain::ToolchainFingerprint {
126        &self.toolchain_fingerprint
127    }
128
129    /// Return the restart-policy class used by this key.
130    #[must_use]
131    pub fn policy_class(&self) -> LeanWorkerRestartPolicyClass {
132        self.restart_policy_class
133    }
134}
135
// Metadata expectation stored inside a `LeanWorkerSessionKey`.
//
// The three fields are exactly the arguments of
// `LeanWorkerSessionKey::metadata_expectation` and take part in key equality.
#[derive(Clone, Debug, Eq, PartialEq)]
struct LeanWorkerMetadataExpectationKey {
    // Export name supplied by the caller.
    export: String,
    // JSON request supplied by the caller.
    request: Value,
    // Expected downstream metadata, compared as opaque facts; `None` when the
    // caller supplied no expectation.
    expected: Option<crate::session::LeanWorkerCapabilityMetadata>,
}
142
/// Tunable limits for a local `LeanWorkerPool`.
///
/// Values are set through the consuming builder methods and read back through
/// the `*_limit` accessors.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerPoolConfig {
    max_workers: usize,
    max_total_child_rss_kib: Option<u64>,
    per_worker_rss_ceiling_kib: Option<u64>,
    idle_cycle_after: Option<Duration>,
    queue_wait_timeout: Duration,
}

impl LeanWorkerPoolConfig {
    /// Build a configuration with a fixed local worker limit.
    ///
    /// A limit of zero is raised to one. No RSS budget, RSS ceiling, or idle
    /// cycling is configured, and the admission wait timeout is zero.
    #[must_use]
    pub fn new(max_workers: usize) -> Self {
        let max_workers = if max_workers == 0 { 1 } else { max_workers };
        Self {
            max_workers,
            max_total_child_rss_kib: None,
            per_worker_rss_ceiling_kib: None,
            idle_cycle_after: None,
            queue_wait_timeout: Duration::ZERO,
        }
    }

    /// Maximum number of local child workers the pool may own.
    #[must_use]
    pub fn max_workers(&self) -> usize {
        self.max_workers
    }

    /// Refuse to spawn another distinct worker once known total child RSS
    /// reaches `limit` KiB (a zero limit is raised to one).
    ///
    /// RSS sampling is best effort. Where the pool cannot obtain samples it
    /// records them as unavailable instead of making a false admission claim.
    #[must_use]
    pub fn max_total_child_rss_kib(self, limit: u64) -> Self {
        let limit = if limit == 0 { 1 } else { limit };
        Self {
            max_total_child_rss_kib: Some(limit),
            ..self
        }
    }

    /// Cycle a worker before it is given work once its sampled RSS reaches
    /// `limit` KiB (a zero limit is raised to one).
    #[must_use]
    pub fn per_worker_rss_ceiling_kib(self, limit: u64) -> Self {
        let limit = if limit == 0 { 1 } else { limit };
        Self {
            per_worker_rss_ceiling_kib: Some(limit),
            ..self
        }
    }

    /// Cycle a worker that has been idle for at least `limit` before more
    /// work is assigned through an old lease.
    #[must_use]
    pub fn idle_cycle_after(self, limit: Duration) -> Self {
        Self {
            idle_cycle_after: Some(limit),
            ..self
        }
    }

    /// Bound how long pool admission may wait before a typed error is
    /// returned.
    ///
    /// The current pool is synchronous. This timeout documents and bounds the
    /// admission point without exposing worker ids or queue internals.
    #[must_use]
    pub fn queue_wait_timeout(self, timeout: Duration) -> Self {
        Self {
            queue_wait_timeout: timeout,
            ..self
        }
    }

    /// Configured total child RSS budget in KiB, if any.
    #[must_use]
    pub fn max_total_child_rss_kib_limit(&self) -> Option<u64> {
        self.max_total_child_rss_kib
    }

    /// Configured per-worker RSS ceiling in KiB, if any.
    #[must_use]
    pub fn per_worker_rss_ceiling_kib_limit(&self) -> Option<u64> {
        self.per_worker_rss_ceiling_kib
    }

    /// Configured idle-cycle duration, if any.
    #[must_use]
    pub fn idle_cycle_after_limit(&self) -> Option<Duration> {
        self.idle_cycle_after
    }

    /// Configured pool admission wait timeout.
    #[must_use]
    pub fn queue_wait_timeout_limit(&self) -> Duration {
        self.queue_wait_timeout
    }
}

impl Default for LeanWorkerPoolConfig {
    /// Single-worker configuration, identical to `LeanWorkerPoolConfig::new(1)`.
    fn default() -> Self {
        LeanWorkerPoolConfig::new(1)
    }
}
237
/// Summary of public pool state.
///
/// This snapshot exposes admission and reuse facts without revealing worker
/// ids, child pids, pipe handles, or which warm child will be selected.
///
/// Counters described as "summed from worker statistics" are totals of the
/// same-named worker statistic over every entry covered by the snapshot.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerPoolSnapshot {
    /// Configured worker limit.
    pub max_workers: usize,
    /// Number of worker entries covered by this snapshot.
    pub workers: usize,
    /// Entries with at least one active lease.
    pub active_workers: usize,
    /// Entries with no active lease.
    pub warm_leases: usize,
    /// Admission queue depth; the pool currently always reports zero.
    pub queue_depth: usize,
    /// Sum of the last known RSS sample of every entry, or `None` when any
    /// entry has no known sample.
    pub total_child_rss_kib: Option<u64>,
    /// Number of RSS sampling attempts that produced no value.
    pub rss_samples_unavailable: u64,
    /// Requests, summed from worker statistics.
    pub requests: u64,
    /// Imports, summed from worker statistics.
    pub imports: u64,
    /// Restarts, summed from worker statistics.
    pub worker_restarts: u64,
    /// Max-request restarts, summed from worker statistics.
    pub max_request_restarts: u64,
    /// Max-import restarts, summed from worker statistics.
    pub max_import_restarts: u64,
    /// RSS restarts, summed from worker statistics.
    pub rss_restarts: u64,
    /// Idle restarts, summed from worker statistics.
    pub idle_restarts: u64,
    /// Cancellation restarts, summed from worker statistics.
    pub cancelled_restarts: u64,
    /// Timeout restarts, summed from worker statistics.
    pub timeout_restarts: u64,
    /// Worker cycles triggered by pool policy (RSS ceiling or idle limit).
    pub policy_restarts: u64,
    /// Admission waits that ended with a queue-timeout error.
    pub queue_timeouts: u64,
    /// Spawns rejected because the total child RSS budget was reached.
    pub memory_budget_rejections: u64,
    /// Restart reason recorded by the pool for the last entry, in pool order,
    /// that has one.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Stream requests, summed from worker statistics.
    pub stream_requests: u64,
    /// Stream successes, summed from worker statistics.
    pub stream_successes: u64,
    /// Stream failures, summed from worker statistics.
    pub stream_failures: u64,
    /// Delivered data rows, summed from worker statistics.
    pub data_rows_delivered: u64,
    /// Data-row payload bytes, summed from worker statistics.
    pub data_row_payload_bytes: u64,
    /// Stream elapsed time, summed (saturating) from worker statistics.
    pub stream_elapsed: Duration,
    /// Backpressure waits, summed from worker statistics.
    pub backpressure_waits: u64,
    /// Backpressure failures, summed from worker statistics.
    pub backpressure_failures: u64,
}
273
/// Local pool for worker-backed capability sessions.
///
/// The pool owns one entry per distinct session key and hands out borrowed
/// leases through `acquire_lease`.
#[derive(Debug)]
pub struct LeanWorkerPool {
    // Limits applied at admission and before work is assigned.
    config: LeanWorkerPoolConfig,
    // One entry per distinct session key, in creation order.
    entries: Vec<PoolEntry>,
    // Number of admission waits that ended in a queue-timeout error.
    queue_timeouts: u64,
    // Number of spawns rejected by the total child RSS budget.
    memory_budget_rejections: u64,
}
282
283impl LeanWorkerPool {
284    /// Create an empty local worker pool.
285    #[must_use]
286    pub fn new(config: LeanWorkerPoolConfig) -> Self {
287        Self {
288            config,
289            entries: Vec::new(),
290            queue_timeouts: 0,
291            memory_budget_rejections: 0,
292        }
293    }
294
295    /// Acquire a lease for the capability described by `builder`.
296    ///
297    /// The pool reuses a warm compatible worker session when possible. If a
298    /// matching worker has died, the pool replaces it before returning the
299    /// lease. If admitting a distinct session key would exceed `max_workers`,
300    /// the pool returns `LeanWorkerError::WorkerPoolExhausted`.
301    ///
302    /// # Errors
303    ///
304    /// Returns `LeanWorkerError` when the capability cannot be built, metadata
305    /// validation fails, a dead compatible worker cannot be replaced, or the
306    /// fixed local worker limit is already full of distinct session keys.
307    pub fn acquire_lease(
308        &mut self,
309        builder: LeanWorkerCapabilityBuilder,
310    ) -> Result<LeanWorkerSessionLease<'_>, LeanWorkerError> {
311        let key = builder.session_key();
312        if let Some(index) = self.entries.iter().position(|entry| entry.key == key) {
313            self.ensure_entry_running(index)?;
314            self.enforce_entry_policy_before_assignment(index)?;
315            let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
316                message: "worker pool entry disappeared during lease acquisition".to_owned(),
317            })?;
318            entry.active_leases = entry.active_leases.saturating_add(1);
319            return Ok(LeanWorkerSessionLease {
320                entry,
321                config: self.config.clone(),
322                valid: true,
323                invalidation_reason: None,
324                request_timeout_override: None,
325            });
326        }
327
328        if self.entries.len() >= self.config.max_workers {
329            return self.pool_full_error();
330        }
331        self.ensure_spawn_within_total_rss_budget()?;
332
333        let capability = builder.clone().open()?;
334        let base_request_timeout = builder.pool_request_timeout();
335        self.entries.push(PoolEntry {
336            key,
337            builder,
338            capability,
339            base_request_timeout,
340            last_rss_kib: None,
341            rss_samples_unavailable: 0,
342            last_activity: Instant::now(),
343            last_restart_reason: None,
344            policy_restarts: 0,
345            active_leases: 0,
346        });
347        let entry = self.entries.last_mut().ok_or_else(|| LeanWorkerError::Protocol {
348            message: "worker pool failed to retain newly opened entry".to_owned(),
349        })?;
350        let _ = entry.sample_rss();
351        entry.active_leases = entry.active_leases.saturating_add(1);
352        Ok(LeanWorkerSessionLease {
353            entry,
354            config: self.config.clone(),
355            valid: true,
356            invalidation_reason: None,
357            request_timeout_override: None,
358        })
359    }
360
361    /// Return a public snapshot of pool state.
362    #[must_use]
363    pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
364        snapshot_from_entries(
365            &self.config,
366            &self.entries,
367            self.queue_timeouts,
368            self.memory_budget_rejections,
369        )
370    }
371
372    fn snapshot_from_lease_config(config: &LeanWorkerPoolConfig, entry: &PoolEntry) -> LeanWorkerPoolSnapshot {
373        snapshot_from_entries(config, std::slice::from_ref(entry), 0, 0)
374    }
375}
376
377fn snapshot_from_entries(
378    config: &LeanWorkerPoolConfig,
379    entries: &[PoolEntry],
380    queue_timeouts: u64,
381    memory_budget_rejections: u64,
382) -> LeanWorkerPoolSnapshot {
383    LeanWorkerPoolSnapshot {
384        max_workers: config.max_workers,
385        workers: entries.len(),
386        active_workers: entries.iter().filter(|entry| entry.active_leases > 0).count(),
387        warm_leases: entries.iter().filter(|entry| entry.active_leases == 0).count(),
388        queue_depth: 0,
389        total_child_rss_kib: total_known_child_rss_kib(entries),
390        rss_samples_unavailable: entries.iter().map(|entry| entry.rss_samples_unavailable).sum(),
391        requests: entries
392            .iter()
393            .map(|entry| entry.capability.worker().stats().requests)
394            .sum(),
395        imports: entries
396            .iter()
397            .map(|entry| entry.capability.worker().stats().imports)
398            .sum(),
399        worker_restarts: entries
400            .iter()
401            .map(|entry| entry.capability.worker().stats().restarts)
402            .sum(),
403        max_request_restarts: entries
404            .iter()
405            .map(|entry| entry.capability.worker().stats().max_request_restarts)
406            .sum(),
407        max_import_restarts: entries
408            .iter()
409            .map(|entry| entry.capability.worker().stats().max_import_restarts)
410            .sum(),
411        rss_restarts: entries
412            .iter()
413            .map(|entry| entry.capability.worker().stats().rss_restarts)
414            .sum(),
415        idle_restarts: entries
416            .iter()
417            .map(|entry| entry.capability.worker().stats().idle_restarts)
418            .sum(),
419        cancelled_restarts: entries
420            .iter()
421            .map(|entry| entry.capability.worker().stats().cancelled_restarts)
422            .sum(),
423        timeout_restarts: entries
424            .iter()
425            .map(|entry| entry.capability.worker().stats().timeout_restarts)
426            .sum(),
427        policy_restarts: entries.iter().map(|entry| entry.policy_restarts).sum(),
428        queue_timeouts,
429        memory_budget_rejections,
430        last_restart_reason: entries.iter().rev().find_map(|entry| entry.last_restart_reason.clone()),
431        stream_requests: entries
432            .iter()
433            .map(|entry| entry.capability.worker().stats().stream_requests)
434            .sum(),
435        stream_successes: entries
436            .iter()
437            .map(|entry| entry.capability.worker().stats().stream_successes)
438            .sum(),
439        stream_failures: entries
440            .iter()
441            .map(|entry| entry.capability.worker().stats().stream_failures)
442            .sum(),
443        data_rows_delivered: entries
444            .iter()
445            .map(|entry| entry.capability.worker().stats().data_rows_delivered)
446            .sum(),
447        data_row_payload_bytes: entries
448            .iter()
449            .map(|entry| entry.capability.worker().stats().data_row_payload_bytes)
450            .sum(),
451        stream_elapsed: entries.iter().fold(Duration::ZERO, |acc, entry| {
452            acc.saturating_add(entry.capability.worker().stats().stream_elapsed)
453        }),
454        backpressure_waits: entries
455            .iter()
456            .map(|entry| entry.capability.worker().stats().backpressure_waits)
457            .sum(),
458        backpressure_failures: entries
459            .iter()
460            .map(|entry| entry.capability.worker().stats().backpressure_failures)
461            .sum(),
462    }
463}
464
465fn total_known_child_rss_kib(entries: &[PoolEntry]) -> Option<u64> {
466    entries
467        .iter()
468        .map(|entry| entry.last_rss_kib)
469        .try_fold(0_u64, |acc, value| value.map(|rss| acc.saturating_add(rss)))
470}
471
472impl LeanWorkerPool {
473    fn ensure_entry_running(&mut self, index: usize) -> Result<(), LeanWorkerError> {
474        let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
475            message: "worker pool entry disappeared during liveness check".to_owned(),
476        })?;
477        match entry.capability.worker_mut().status()? {
478            LeanWorkerStatus::Running => Ok(()),
479            LeanWorkerStatus::Exited(_exit) => {
480                entry.capability = entry.builder.clone().open()?;
481                entry.last_activity = Instant::now();
482                Ok(())
483            }
484        }
485    }
486
487    fn enforce_entry_policy_before_assignment(&mut self, index: usize) -> Result<(), LeanWorkerError> {
488        let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
489            message: "worker pool entry disappeared during policy check".to_owned(),
490        })?;
491        entry.enforce_policy(&self.config).map(|_| ())
492    }
493
494    fn ensure_spawn_within_total_rss_budget(&mut self) -> Result<(), LeanWorkerError> {
495        let Some(limit_kib) = self.config.max_total_child_rss_kib else {
496            return Ok(());
497        };
498        let rss = self.refresh_total_child_rss();
499        if rss.unavailable > 0 {
500            return Ok(());
501        }
502        if rss.total_kib >= limit_kib {
503            self.memory_budget_rejections = self.memory_budget_rejections.saturating_add(1);
504            return Err(LeanWorkerError::WorkerPoolMemoryBudgetExceeded {
505                current_kib: rss.total_kib,
506                limit_kib,
507            });
508        }
509        Ok(())
510    }
511
512    fn refresh_total_child_rss(&mut self) -> PoolRssTotal {
513        let mut total_kib = 0_u64;
514        let mut unavailable = 0_u64;
515        for entry in &mut self.entries {
516            match entry.sample_rss() {
517                Some(value) => {
518                    total_kib = total_kib.saturating_add(value);
519                }
520                None => {
521                    unavailable = unavailable.saturating_add(1);
522                }
523            }
524        }
525        PoolRssTotal { total_kib, unavailable }
526    }
527
528    fn pool_full_error<T>(&mut self) -> Result<T, LeanWorkerError> {
529        if self.config.queue_wait_timeout.is_zero() {
530            return Err(LeanWorkerError::WorkerPoolExhausted {
531                max_workers: self.config.max_workers,
532            });
533        }
534        let started = Instant::now();
535        while started.elapsed() < self.config.queue_wait_timeout {
536            let remaining = self.config.queue_wait_timeout.saturating_sub(started.elapsed());
537            thread::sleep(remaining.min(Duration::from_millis(10)));
538        }
539        self.queue_timeouts = self.queue_timeouts.saturating_add(1);
540        Err(LeanWorkerError::WorkerPoolQueueTimeout {
541            waited: self.config.queue_wait_timeout,
542        })
543    }
544}
545
546impl Default for LeanWorkerPool {
547    fn default() -> Self {
548        Self::new(LeanWorkerPoolConfig::default())
549    }
550}
551
// One pooled worker together with the bookkeeping the pool keeps for it.
#[derive(Debug)]
struct PoolEntry {
    // Session key used to match compatible lease requests.
    key: LeanWorkerSessionKey,
    // Builder kept so the capability can be reopened after the child exits.
    builder: LeanWorkerCapabilityBuilder,
    // Currently open capability for this entry.
    capability: LeanWorkerCapability,
    // Request timeout restored on the worker after a lease-level override.
    base_request_timeout: Duration,
    // Most recent successful RSS sample in KiB; cleared by a policy cycle.
    last_rss_kib: Option<u64>,
    // Number of RSS sampling attempts that produced no value.
    rss_samples_unavailable: u64,
    // Start of the idle clock used by the idle-cycle policy.
    last_activity: Instant,
    // Reason of the most recent policy-driven cycle, if any.
    last_restart_reason: Option<LeanWorkerRestartReason>,
    // Number of policy-driven cycles performed on this entry.
    policy_restarts: u64,
    // Number of leases currently checked out against this entry.
    active_leases: u64,
}
565
566impl PoolEntry {
567    fn sample_rss(&mut self) -> Option<u64> {
568        match self.capability.worker_mut().rss_kib() {
569            Some(value) => {
570                self.last_rss_kib = Some(value);
571                Some(value)
572            }
573            None => {
574                self.rss_samples_unavailable = self.rss_samples_unavailable.saturating_add(1);
575                None
576            }
577        }
578    }
579
580    fn enforce_policy(&mut self, config: &LeanWorkerPoolConfig) -> Result<Option<String>, LeanWorkerError> {
581        if let Some(limit_kib) = config.per_worker_rss_ceiling_kib {
582            match self.sample_rss() {
583                Some(current_kib) if current_kib >= limit_kib => {
584                    let reason = LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib };
585                    self.cycle_for_policy(reason)?;
586                    return Ok(Some(format!(
587                        "memory policy cycled worker at {current_kib} KiB RSS with limit {limit_kib} KiB"
588                    )));
589                }
590                Some(_) | None => {}
591            }
592        }
593
594        if let Some(limit) = config.idle_cycle_after {
595            let idle_for = self.last_activity.elapsed();
596            if idle_for >= limit {
597                let reason = LeanWorkerRestartReason::Idle { idle_for, limit };
598                self.cycle_for_policy(reason)?;
599                return Ok(Some(format!(
600                    "idle policy cycled worker after {idle_for:?} idle with limit {limit:?}"
601                )));
602            }
603        }
604
605        Ok(None)
606    }
607
608    fn cycle_for_policy(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
609        self.capability.worker_mut().cycle_with_restart_reason(reason.clone())?;
610        self.last_restart_reason = Some(reason);
611        self.last_activity = Instant::now();
612        self.last_rss_kib = None;
613        self.policy_restarts = self.policy_restarts.saturating_add(1);
614        Ok(())
615    }
616}
617
// Result of sampling RSS across every pool entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct PoolRssTotal {
    // Saturating sum, in KiB, of the samples that were available.
    total_kib: u64,
    // Number of entries whose sample was unavailable.
    unavailable: u64,
}
623
/// Borrowed lease for running typed commands on a compatible worker session.
///
/// The lease does not expose which worker was selected. If a command triggers
/// timeout, cancellation, child failure, or explicit cycle, the lease becomes
/// invalid and a fresh lease must be acquired from the pool.
///
/// Dropping the lease releases its active-lease count on the pool entry.
#[derive(Debug)]
pub struct LeanWorkerSessionLease<'pool> {
    // Pool entry this lease runs commands on.
    entry: &'pool mut PoolEntry,
    // Copy of the pool configuration taken when the lease was acquired.
    config: LeanWorkerPoolConfig,
    // Whether the lease may still run commands.
    valid: bool,
    // Why the lease was invalidated, once it has been.
    invalidation_reason: Option<String>,
    // Request timeout selected through `set_request_timeout`, if any.
    request_timeout_override: Option<Duration>,
}
637
638impl LeanWorkerSessionLease<'_> {
639    /// Return the session key that justified this lease.
640    #[must_use]
641    pub fn session_key(&self) -> &LeanWorkerSessionKey {
642        &self.entry.key
643    }
644
645    /// Return protocol/runtime facts reported by the leased worker child.
646    #[must_use]
647    pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
648        self.entry.capability.runtime_metadata()
649    }
650
651    /// Return whether this lease can still run commands.
652    #[must_use]
653    pub fn is_valid(&self) -> bool {
654        self.valid
655    }
656
657    /// Return an operational snapshot for the worker entry behind this lease.
658    ///
659    /// This is the sampling hook to use while a lease is checked out. It keeps
660    /// child identity, pipe state, and protocol details hidden; the snapshot
661    /// only reports the same aggregate counters as `LeanWorkerPool::snapshot`
662    /// for the leased entry.
663    #[must_use]
664    pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
665        LeanWorkerPool::snapshot_from_lease_config(&self.config, self.entry)
666    }
667
668    /// Explicitly cycle the leased worker and invalidate this lease.
669    ///
670    /// Acquire a fresh lease before running more work. The pool keeps the
671    /// restarted child available for compatible future leases.
672    ///
673    /// # Errors
674    ///
675    /// Returns `LeanWorkerError` if the lease was already invalid or the
676    /// underlying worker cannot be cycled.
677    pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
678        self.ensure_valid()?;
679        self.entry.capability.worker_mut().cycle()?;
680        self.invalidate("explicit worker cycle");
681        Ok(())
682    }
683
684    /// Set the request timeout for commands run through this lease.
685    ///
686    /// The pool and supervisor still own the watchdog, child kill, and restart
687    /// bookkeeping. This method only selects the deadline for subsequent
688    /// leased requests.
689    ///
690    /// # Errors
691    ///
692    /// Returns `LeanWorkerError` if the lease was already invalidated.
693    pub fn set_request_timeout(&mut self, timeout: Duration) -> Result<(), LeanWorkerError> {
694        self.ensure_valid()?;
695        self.request_timeout_override = Some(timeout);
696        Ok(())
697    }
698
699    /// Run a typed non-streaming downstream JSON command through this lease.
700    ///
701    /// # Errors
702    ///
703    /// Returns `LeanWorkerError` for invalidated leases, session startup
704    /// failures, typed command errors, cancellation, timeout, child failure,
705    /// progress panic, or protocol failure.
706    pub fn run_json_command<Req, Resp>(
707        &mut self,
708        command: &LeanWorkerJsonCommand<Req, Resp>,
709        request: &Req,
710        cancellation: Option<&LeanWorkerCancellationToken>,
711        progress: Option<&dyn LeanWorkerProgressSink>,
712    ) -> Result<Resp, LeanWorkerError>
713    where
714        Req: Serialize,
715        Resp: DeserializeOwned,
716    {
717        self.ensure_valid()?;
718        self.enforce_policy_before_request()?;
719        let request_timeout = self.request_timeout_override;
720        let result = self
721            .entry
722            .capability
723            .open_session(cancellation, progress)
724            .and_then(|mut session| {
725                if let Some(timeout) = request_timeout {
726                    session.set_request_timeout(timeout);
727                }
728                session.run_json_command(command, request, cancellation, progress)
729            });
730        self.map_lifecycle_result(result)
731    }
732
733    /// Run a typed downstream streaming command through this lease.
734    ///
735    /// # Errors
736    ///
737    /// Returns `LeanWorkerError` for invalidated leases, row or summary decode
738    /// errors, sink failures, cancellation, timeout, child failure, or protocol
739    /// failure.
740    pub fn run_streaming_command<Req, Row, Summary>(
741        &mut self,
742        command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
743        request: &Req,
744        rows: &dyn LeanWorkerTypedDataSink<Row>,
745        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
746        cancellation: Option<&LeanWorkerCancellationToken>,
747        progress: Option<&dyn LeanWorkerProgressSink>,
748    ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
749    where
750        Req: Serialize,
751        Row: DeserializeOwned,
752        Summary: DeserializeOwned,
753    {
754        self.ensure_valid()?;
755        self.enforce_policy_before_request()?;
756        let request_timeout = self.request_timeout_override;
757        let result = self
758            .entry
759            .capability
760            .open_session(cancellation, progress)
761            .and_then(|mut session| {
762                if let Some(timeout) = request_timeout {
763                    session.set_request_timeout(timeout);
764                }
765                session.run_streaming_command(command, request, rows, diagnostics, cancellation, progress)
766            });
767        self.map_lifecycle_result(result)
768    }
769
770    fn ensure_valid(&self) -> Result<(), LeanWorkerError> {
771        if self.valid {
772            Ok(())
773        } else {
774            Err(LeanWorkerError::LeaseInvalidated {
775                reason: self
776                    .invalidation_reason
777                    .clone()
778                    .unwrap_or_else(|| "lease was invalidated by a worker lifecycle transition".to_owned()),
779            })
780        }
781    }
782
783    fn enforce_policy_before_request(&mut self) -> Result<(), LeanWorkerError> {
784        if let Some(reason) = self.entry.enforce_policy(&self.config)? {
785            self.invalidate(reason.clone());
786            return Err(LeanWorkerError::LeaseInvalidated { reason });
787        }
788        Ok(())
789    }
790
791    fn map_lifecycle_result<T>(&mut self, result: Result<T, LeanWorkerError>) -> Result<T, LeanWorkerError> {
792        if self.request_timeout_override.is_some() {
793            self.entry
794                .capability
795                .worker_mut()
796                .set_request_timeout(self.entry.base_request_timeout);
797        }
798        match result {
799            Ok(value) => {
800                self.entry.last_activity = Instant::now();
801                Ok(value)
802            }
803            Err(err) => {
804                self.entry.last_activity = Instant::now();
805                if invalidates_lease(&err) {
806                    self.invalidate(invalidation_reason(&err));
807                }
808                Err(err)
809            }
810        }
811    }
812
813    fn invalidate(&mut self, reason: impl Into<String>) {
814        self.valid = false;
815        self.invalidation_reason = Some(reason.into());
816    }
817}
818
819impl Drop for LeanWorkerSessionLease<'_> {
820    fn drop(&mut self) {
821        self.entry.active_leases = self.entry.active_leases.saturating_sub(1);
822    }
823}
824
825fn invalidates_lease(err: &LeanWorkerError) -> bool {
826    matches!(
827        err,
828        LeanWorkerError::Cancelled { .. }
829            | LeanWorkerError::Timeout { .. }
830            | LeanWorkerError::ChildExited { .. }
831            | LeanWorkerError::ChildPanicOrAbort { .. }
832            | LeanWorkerError::CapabilityMetadataMismatch { .. }
833    )
834}
835
836fn invalidation_reason(err: &LeanWorkerError) -> String {
837    if let LeanWorkerError::Cancelled { operation } = err {
838        format!("cancelled during {operation}")
839    } else if let LeanWorkerError::Timeout { operation, .. } = err {
840        format!("timed out during {operation}")
841    } else if matches!(err, LeanWorkerError::ChildExited { .. }) {
842        "worker child exited".to_owned()
843    } else if matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }) {
844        "worker child exited fatally".to_owned()
845    } else if let LeanWorkerError::CapabilityMetadataMismatch { export, .. } = err {
846        format!("capability metadata mismatch from {export}")
847    } else {
848        "worker lifecycle transition".to_owned()
849    }
850}