1use std::path::PathBuf;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::capability::{LeanWorkerCapability, LeanWorkerCapabilityBuilder};
16use crate::session::{
17 LeanWorkerCancellationToken, LeanWorkerDiagnosticSink, LeanWorkerJsonCommand, LeanWorkerProgressSink,
18 LeanWorkerRuntimeMetadata, LeanWorkerStreamingCommand, LeanWorkerTypedDataSink, LeanWorkerTypedStreamSummary,
19};
20use crate::supervisor::{LeanWorkerError, LeanWorkerRestartReason, LeanWorkerStatus};
21
/// Coarse classification of the restart policy associated with a session key.
///
/// The class is stored inside [`LeanWorkerSessionKey`] and takes part in its
/// derived equality, so two keys that differ only in policy class compare as
/// different keys. The derived ordering follows declaration order
/// (`Default < Custom`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LeanWorkerRestartPolicyClass {
    /// The class assigned by [`LeanWorkerSessionKey::new`].
    Default,
    /// A non-default class; presumably selected when the caller configured its
    /// own restart policy (NOTE(review): confirm against the capability builder).
    Custom,
}
33
34#[derive(Clone, Debug, Eq, PartialEq)]
41pub struct LeanWorkerSessionKey {
42 project_root: PathBuf,
43 package: String,
44 lib_name: String,
45 imports: Vec<String>,
46 metadata_expectation: Option<LeanWorkerMetadataExpectationKey>,
47 toolchain_fingerprint: lean_toolchain::ToolchainFingerprint,
48 restart_policy_class: LeanWorkerRestartPolicyClass,
49}
50
51impl LeanWorkerSessionKey {
52 #[must_use]
54 pub fn new(
55 project_root: impl Into<PathBuf>,
56 package: impl Into<String>,
57 lib_name: impl Into<String>,
58 imports: impl IntoIterator<Item = impl Into<String>>,
59 ) -> Self {
60 Self {
61 project_root: project_root.into(),
62 package: package.into(),
63 lib_name: lib_name.into(),
64 imports: imports.into_iter().map(Into::into).collect(),
65 metadata_expectation: None,
66 toolchain_fingerprint: lean_toolchain::ToolchainFingerprint::current(),
67 restart_policy_class: LeanWorkerRestartPolicyClass::Default,
68 }
69 }
70
71 #[must_use]
77 pub fn metadata_expectation(
78 mut self,
79 export: impl Into<String>,
80 request: Value,
81 expected: Option<crate::types::LeanWorkerCapabilityMetadata>,
82 ) -> Self {
83 self.metadata_expectation = Some(LeanWorkerMetadataExpectationKey {
84 export: export.into(),
85 request,
86 expected,
87 });
88 self
89 }
90
91 #[must_use]
93 pub fn restart_policy_class(mut self, class: LeanWorkerRestartPolicyClass) -> Self {
94 self.restart_policy_class = class;
95 self
96 }
97
98 #[must_use]
100 pub fn project_root(&self) -> &std::path::Path {
101 &self.project_root
102 }
103
104 #[must_use]
106 pub fn package(&self) -> &str {
107 &self.package
108 }
109
110 #[must_use]
112 pub fn lib_name(&self) -> &str {
113 &self.lib_name
114 }
115
116 #[must_use]
118 pub fn imports(&self) -> &[String] {
119 &self.imports
120 }
121
122 #[must_use]
124 pub fn toolchain_fingerprint(&self) -> &lean_toolchain::ToolchainFingerprint {
125 &self.toolchain_fingerprint
126 }
127
128 #[must_use]
130 pub fn policy_class(&self) -> LeanWorkerRestartPolicyClass {
131 self.restart_policy_class
132 }
133}
134
135#[derive(Clone, Debug, Eq, PartialEq)]
136struct LeanWorkerMetadataExpectationKey {
137 export: String,
138 request: Value,
139 expected: Option<crate::types::LeanWorkerCapabilityMetadata>,
140}
141
/// Limits and timeouts that govern a [`LeanWorkerPool`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeanWorkerPoolConfig {
    /// Maximum number of worker entries the pool may hold; never below 1.
    max_workers: usize,
    /// Optional budget, in KiB, for the summed RSS of all workers.
    max_total_child_rss_kib: Option<u64>,
    /// Optional RSS ceiling, in KiB, applied to each worker individually.
    per_worker_rss_ceiling_kib: Option<u64>,
    /// Optional idle duration after which a worker is cycled.
    idle_cycle_after: Option<Duration>,
    /// How long a lease request waits when the pool is full; zero means
    /// fail immediately.
    queue_wait_timeout: Duration,
}

impl LeanWorkerPoolConfig {
    /// Creates a configuration with the given worker cap and no other limits.
    ///
    /// A cap of `0` is raised to `1`. The queue wait timeout starts at zero.
    #[must_use]
    pub fn new(max_workers: usize) -> Self {
        let max_workers = if max_workers == 0 { 1 } else { max_workers };
        Self {
            max_workers,
            max_total_child_rss_kib: None,
            per_worker_rss_ceiling_kib: None,
            idle_cycle_after: None,
            queue_wait_timeout: Duration::ZERO,
        }
    }

    /// Returns the maximum number of workers.
    #[must_use]
    pub fn max_workers(&self) -> usize {
        self.max_workers
    }

    /// Sets the total RSS budget in KiB; a limit of `0` is raised to `1`.
    #[must_use]
    pub fn max_total_child_rss_kib(self, limit: u64) -> Self {
        Self {
            max_total_child_rss_kib: Some(std::cmp::max(limit, 1)),
            ..self
        }
    }

    /// Sets the per-worker RSS ceiling in KiB; a limit of `0` is raised to `1`.
    #[must_use]
    pub fn per_worker_rss_ceiling_kib(self, limit: u64) -> Self {
        Self {
            per_worker_rss_ceiling_kib: Some(std::cmp::max(limit, 1)),
            ..self
        }
    }

    /// Sets the idle duration after which a worker is cycled.
    ///
    /// The value is stored as given; it is not clamped.
    #[must_use]
    pub fn idle_cycle_after(self, limit: Duration) -> Self {
        Self {
            idle_cycle_after: Some(limit),
            ..self
        }
    }

    /// Sets how long a lease request waits when the pool is full.
    #[must_use]
    pub fn queue_wait_timeout(self, timeout: Duration) -> Self {
        Self {
            queue_wait_timeout: timeout,
            ..self
        }
    }

    /// Returns the configured total RSS budget in KiB, if any.
    #[must_use]
    pub fn max_total_child_rss_kib_limit(&self) -> Option<u64> {
        self.max_total_child_rss_kib
    }

    /// Returns the configured per-worker RSS ceiling in KiB, if any.
    #[must_use]
    pub fn per_worker_rss_ceiling_kib_limit(&self) -> Option<u64> {
        self.per_worker_rss_ceiling_kib
    }

    /// Returns the configured idle cycling threshold, if any.
    #[must_use]
    pub fn idle_cycle_after_limit(&self) -> Option<Duration> {
        self.idle_cycle_after
    }

    /// Returns the configured queue wait timeout.
    #[must_use]
    pub fn queue_wait_timeout_limit(&self) -> Duration {
        self.queue_wait_timeout
    }
}

impl Default for LeanWorkerPoolConfig {
    /// Equivalent to `LeanWorkerPoolConfig::new(1)`.
    fn default() -> Self {
        LeanWorkerPoolConfig::new(1)
    }
}
236
237#[derive(Clone, Debug, Eq, PartialEq)]
242pub struct LeanWorkerPoolSnapshot {
243 pub max_workers: usize,
244 pub workers: usize,
245 pub active_workers: usize,
246 pub warm_leases: usize,
247 pub queue_depth: usize,
248 pub total_child_rss_kib: Option<u64>,
249 pub rss_samples_unavailable: u64,
250 pub requests: u64,
251 pub imports: u64,
252 pub worker_restarts: u64,
253 pub max_request_restarts: u64,
254 pub max_import_restarts: u64,
255 pub rss_restarts: u64,
256 pub idle_restarts: u64,
257 pub cancelled_restarts: u64,
258 pub timeout_restarts: u64,
259 pub policy_restarts: u64,
260 pub queue_timeouts: u64,
261 pub memory_budget_rejections: u64,
262 pub last_restart_reason: Option<LeanWorkerRestartReason>,
263 pub stream_requests: u64,
264 pub stream_successes: u64,
265 pub stream_failures: u64,
266 pub data_rows_delivered: u64,
267 pub data_row_payload_bytes: u64,
268 pub stream_elapsed: Duration,
269 pub backpressure_waits: u64,
270 pub backpressure_failures: u64,
271}
272
273#[derive(Debug)]
275pub struct LeanWorkerPool {
276 config: LeanWorkerPoolConfig,
277 entries: Vec<PoolEntry>,
278 queue_timeouts: u64,
279 memory_budget_rejections: u64,
280}
281
282impl LeanWorkerPool {
283 #[must_use]
285 pub fn new(config: LeanWorkerPoolConfig) -> Self {
286 Self {
287 config,
288 entries: Vec::new(),
289 queue_timeouts: 0,
290 memory_budget_rejections: 0,
291 }
292 }
293
294 pub fn acquire_lease(
307 &mut self,
308 builder: LeanWorkerCapabilityBuilder,
309 ) -> Result<LeanWorkerSessionLease<'_>, LeanWorkerError> {
310 let key = builder.session_key();
311 if let Some(index) = self.entries.iter().position(|entry| entry.key == key) {
312 self.ensure_entry_running(index)?;
313 self.enforce_entry_policy_before_assignment(index)?;
314 let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
315 message: "worker pool entry disappeared during lease acquisition".to_owned(),
316 })?;
317 entry.active_leases = entry.active_leases.saturating_add(1);
318 return Ok(LeanWorkerSessionLease {
319 entry,
320 config: self.config.clone(),
321 valid: true,
322 invalidation_reason: None,
323 request_timeout_override: None,
324 });
325 }
326
327 if self.entries.len() >= self.config.max_workers {
328 return self.pool_full_error();
329 }
330 self.ensure_spawn_within_total_rss_budget()?;
331
332 let capability = builder.clone().open()?;
333 let base_request_timeout = builder.pool_request_timeout();
334 self.entries.push(PoolEntry {
335 key,
336 builder,
337 capability,
338 base_request_timeout,
339 last_rss_kib: None,
340 rss_samples_unavailable: 0,
341 last_activity: Instant::now(),
342 last_restart_reason: None,
343 policy_restarts: 0,
344 active_leases: 0,
345 });
346 let entry = self.entries.last_mut().ok_or_else(|| LeanWorkerError::Protocol {
347 message: "worker pool failed to retain newly opened entry".to_owned(),
348 })?;
349 let _ = entry.sample_rss();
350 entry.active_leases = entry.active_leases.saturating_add(1);
351 Ok(LeanWorkerSessionLease {
352 entry,
353 config: self.config.clone(),
354 valid: true,
355 invalidation_reason: None,
356 request_timeout_override: None,
357 })
358 }
359
360 #[must_use]
362 pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
363 snapshot_from_entries(
364 &self.config,
365 &self.entries,
366 self.queue_timeouts,
367 self.memory_budget_rejections,
368 )
369 }
370
371 fn snapshot_from_lease_config(config: &LeanWorkerPoolConfig, entry: &PoolEntry) -> LeanWorkerPoolSnapshot {
372 snapshot_from_entries(config, std::slice::from_ref(entry), 0, 0)
373 }
374}
375
376fn snapshot_from_entries(
377 config: &LeanWorkerPoolConfig,
378 entries: &[PoolEntry],
379 queue_timeouts: u64,
380 memory_budget_rejections: u64,
381) -> LeanWorkerPoolSnapshot {
382 LeanWorkerPoolSnapshot {
383 max_workers: config.max_workers,
384 workers: entries.len(),
385 active_workers: entries.iter().filter(|entry| entry.active_leases > 0).count(),
386 warm_leases: entries.iter().filter(|entry| entry.active_leases == 0).count(),
387 queue_depth: 0,
388 total_child_rss_kib: total_known_child_rss_kib(entries),
389 rss_samples_unavailable: entries.iter().map(|entry| entry.rss_samples_unavailable).sum(),
390 requests: entries
391 .iter()
392 .map(|entry| entry.capability.worker().stats().requests)
393 .sum(),
394 imports: entries
395 .iter()
396 .map(|entry| entry.capability.worker().stats().imports)
397 .sum(),
398 worker_restarts: entries
399 .iter()
400 .map(|entry| entry.capability.worker().stats().restarts)
401 .sum(),
402 max_request_restarts: entries
403 .iter()
404 .map(|entry| entry.capability.worker().stats().max_request_restarts)
405 .sum(),
406 max_import_restarts: entries
407 .iter()
408 .map(|entry| entry.capability.worker().stats().max_import_restarts)
409 .sum(),
410 rss_restarts: entries
411 .iter()
412 .map(|entry| entry.capability.worker().stats().rss_restarts)
413 .sum(),
414 idle_restarts: entries
415 .iter()
416 .map(|entry| entry.capability.worker().stats().idle_restarts)
417 .sum(),
418 cancelled_restarts: entries
419 .iter()
420 .map(|entry| entry.capability.worker().stats().cancelled_restarts)
421 .sum(),
422 timeout_restarts: entries
423 .iter()
424 .map(|entry| entry.capability.worker().stats().timeout_restarts)
425 .sum(),
426 policy_restarts: entries.iter().map(|entry| entry.policy_restarts).sum(),
427 queue_timeouts,
428 memory_budget_rejections,
429 last_restart_reason: entries.iter().rev().find_map(|entry| entry.last_restart_reason.clone()),
430 stream_requests: entries
431 .iter()
432 .map(|entry| entry.capability.worker().stats().stream_requests)
433 .sum(),
434 stream_successes: entries
435 .iter()
436 .map(|entry| entry.capability.worker().stats().stream_successes)
437 .sum(),
438 stream_failures: entries
439 .iter()
440 .map(|entry| entry.capability.worker().stats().stream_failures)
441 .sum(),
442 data_rows_delivered: entries
443 .iter()
444 .map(|entry| entry.capability.worker().stats().data_rows_delivered)
445 .sum(),
446 data_row_payload_bytes: entries
447 .iter()
448 .map(|entry| entry.capability.worker().stats().data_row_payload_bytes)
449 .sum(),
450 stream_elapsed: entries.iter().fold(Duration::ZERO, |acc, entry| {
451 acc.saturating_add(entry.capability.worker().stats().stream_elapsed)
452 }),
453 backpressure_waits: entries
454 .iter()
455 .map(|entry| entry.capability.worker().stats().backpressure_waits)
456 .sum(),
457 backpressure_failures: entries
458 .iter()
459 .map(|entry| entry.capability.worker().stats().backpressure_failures)
460 .sum(),
461 }
462}
463
464fn total_known_child_rss_kib(entries: &[PoolEntry]) -> Option<u64> {
465 entries
466 .iter()
467 .map(|entry| entry.last_rss_kib)
468 .try_fold(0_u64, |acc, value| value.map(|rss| acc.saturating_add(rss)))
469}
470
471impl LeanWorkerPool {
472 fn ensure_entry_running(&mut self, index: usize) -> Result<(), LeanWorkerError> {
473 let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
474 message: "worker pool entry disappeared during liveness check".to_owned(),
475 })?;
476 match entry.capability.worker_mut().status()? {
477 LeanWorkerStatus::Running => Ok(()),
478 LeanWorkerStatus::Exited(_exit) => {
479 entry.capability = entry.builder.clone().open()?;
480 entry.last_activity = Instant::now();
481 Ok(())
482 }
483 }
484 }
485
486 fn enforce_entry_policy_before_assignment(&mut self, index: usize) -> Result<(), LeanWorkerError> {
487 let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
488 message: "worker pool entry disappeared during policy check".to_owned(),
489 })?;
490 entry.enforce_policy(&self.config).map(|_| ())
491 }
492
493 fn ensure_spawn_within_total_rss_budget(&mut self) -> Result<(), LeanWorkerError> {
494 let Some(limit_kib) = self.config.max_total_child_rss_kib else {
495 return Ok(());
496 };
497 let rss = self.refresh_total_child_rss();
498 if rss.unavailable > 0 {
499 return Ok(());
500 }
501 if rss.total_kib >= limit_kib {
502 self.memory_budget_rejections = self.memory_budget_rejections.saturating_add(1);
503 return Err(LeanWorkerError::WorkerPoolMemoryBudgetExceeded {
504 current_kib: rss.total_kib,
505 limit_kib,
506 });
507 }
508 Ok(())
509 }
510
511 fn refresh_total_child_rss(&mut self) -> PoolRssTotal {
512 let mut total_kib = 0_u64;
513 let mut unavailable = 0_u64;
514 for entry in &mut self.entries {
515 match entry.sample_rss() {
516 Some(value) => {
517 total_kib = total_kib.saturating_add(value);
518 }
519 None => {
520 unavailable = unavailable.saturating_add(1);
521 }
522 }
523 }
524 PoolRssTotal { total_kib, unavailable }
525 }
526
527 fn pool_full_error<T>(&mut self) -> Result<T, LeanWorkerError> {
528 if self.config.queue_wait_timeout.is_zero() {
529 return Err(LeanWorkerError::WorkerPoolExhausted {
530 max_workers: self.config.max_workers,
531 });
532 }
533 let started = Instant::now();
534 while started.elapsed() < self.config.queue_wait_timeout {
535 let remaining = self.config.queue_wait_timeout.saturating_sub(started.elapsed());
536 thread::sleep(remaining.min(Duration::from_millis(10)));
537 }
538 self.queue_timeouts = self.queue_timeouts.saturating_add(1);
539 Err(LeanWorkerError::WorkerPoolQueueTimeout {
540 waited: self.config.queue_wait_timeout,
541 })
542 }
543}
544
545impl Default for LeanWorkerPool {
546 fn default() -> Self {
547 Self::new(LeanWorkerPoolConfig::default())
548 }
549}
550
551#[derive(Debug)]
552struct PoolEntry {
553 key: LeanWorkerSessionKey,
554 builder: LeanWorkerCapabilityBuilder,
555 capability: LeanWorkerCapability,
556 base_request_timeout: Duration,
557 last_rss_kib: Option<u64>,
558 rss_samples_unavailable: u64,
559 last_activity: Instant,
560 last_restart_reason: Option<LeanWorkerRestartReason>,
561 policy_restarts: u64,
562 active_leases: u64,
563}
564
565impl PoolEntry {
566 fn sample_rss(&mut self) -> Option<u64> {
567 match self.capability.worker_mut().rss_kib() {
568 Some(value) => {
569 self.last_rss_kib = Some(value);
570 Some(value)
571 }
572 None => {
573 self.rss_samples_unavailable = self.rss_samples_unavailable.saturating_add(1);
574 None
575 }
576 }
577 }
578
579 fn enforce_policy(&mut self, config: &LeanWorkerPoolConfig) -> Result<Option<String>, LeanWorkerError> {
580 if let Some(limit_kib) = config.per_worker_rss_ceiling_kib {
581 match self.sample_rss() {
582 Some(current_kib) if current_kib >= limit_kib => {
583 let reason = LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib };
584 self.cycle_for_policy(reason)?;
585 return Ok(Some(format!(
586 "memory policy cycled worker at {current_kib} KiB RSS with limit {limit_kib} KiB"
587 )));
588 }
589 Some(_) | None => {}
590 }
591 }
592
593 if let Some(limit) = config.idle_cycle_after {
594 let idle_for = self.last_activity.elapsed();
595 if idle_for >= limit {
596 let reason = LeanWorkerRestartReason::Idle { idle_for, limit };
597 self.cycle_for_policy(reason)?;
598 return Ok(Some(format!(
599 "idle policy cycled worker after {idle_for:?} idle with limit {limit:?}"
600 )));
601 }
602 }
603
604 Ok(None)
605 }
606
607 fn cycle_for_policy(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
608 self.capability.worker_mut().cycle_with_restart_reason(reason.clone())?;
609 self.last_restart_reason = Some(reason);
610 self.last_activity = Instant::now();
611 self.last_rss_kib = None;
612 self.policy_restarts = self.policy_restarts.saturating_add(1);
613 Ok(())
614 }
615}
616
/// Result of re-sampling the RSS of every worker in the pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PoolRssTotal {
    /// Saturating sum, in KiB, of the samples that were available.
    total_kib: u64,
    /// Number of workers whose RSS sample was unavailable.
    unavailable: u64,
}
622
623#[derive(Debug)]
629pub struct LeanWorkerSessionLease<'pool> {
630 entry: &'pool mut PoolEntry,
631 config: LeanWorkerPoolConfig,
632 valid: bool,
633 invalidation_reason: Option<String>,
634 request_timeout_override: Option<Duration>,
635}
636
637impl LeanWorkerSessionLease<'_> {
638 #[must_use]
640 pub fn session_key(&self) -> &LeanWorkerSessionKey {
641 &self.entry.key
642 }
643
644 #[must_use]
646 pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
647 self.entry.capability.runtime_metadata()
648 }
649
650 #[must_use]
652 pub fn is_valid(&self) -> bool {
653 self.valid
654 }
655
656 #[must_use]
663 pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
664 LeanWorkerPool::snapshot_from_lease_config(&self.config, self.entry)
665 }
666
667 pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
677 self.ensure_valid()?;
678 self.entry.capability.worker_mut().cycle()?;
679 self.invalidate("explicit worker cycle");
680 Ok(())
681 }
682
683 pub fn set_request_timeout(&mut self, timeout: Duration) -> Result<(), LeanWorkerError> {
693 self.ensure_valid()?;
694 self.request_timeout_override = Some(timeout);
695 Ok(())
696 }
697
698 pub fn run_json_command<Req, Resp>(
706 &mut self,
707 command: &LeanWorkerJsonCommand<Req, Resp>,
708 request: &Req,
709 cancellation: Option<&LeanWorkerCancellationToken>,
710 progress: Option<&dyn LeanWorkerProgressSink>,
711 ) -> Result<Resp, LeanWorkerError>
712 where
713 Req: Serialize,
714 Resp: DeserializeOwned,
715 {
716 self.ensure_valid()?;
717 self.enforce_policy_before_request()?;
718 let request_timeout = self.request_timeout_override;
719 let result = self
720 .entry
721 .capability
722 .open_session(cancellation, progress)
723 .and_then(|mut session| {
724 if let Some(timeout) = request_timeout {
725 session.set_request_timeout(timeout);
726 }
727 session.run_json_command(command, request, cancellation, progress)
728 });
729 self.map_lifecycle_result(result)
730 }
731
732 pub fn run_streaming_command<Req, Row, Summary>(
740 &mut self,
741 command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
742 request: &Req,
743 rows: &dyn LeanWorkerTypedDataSink<Row>,
744 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
745 cancellation: Option<&LeanWorkerCancellationToken>,
746 progress: Option<&dyn LeanWorkerProgressSink>,
747 ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
748 where
749 Req: Serialize,
750 Row: DeserializeOwned,
751 Summary: DeserializeOwned,
752 {
753 self.ensure_valid()?;
754 self.enforce_policy_before_request()?;
755 let request_timeout = self.request_timeout_override;
756 let result = self
757 .entry
758 .capability
759 .open_session(cancellation, progress)
760 .and_then(|mut session| {
761 if let Some(timeout) = request_timeout {
762 session.set_request_timeout(timeout);
763 }
764 session.run_streaming_command(command, request, rows, diagnostics, cancellation, progress)
765 });
766 self.map_lifecycle_result(result)
767 }
768
769 fn ensure_valid(&self) -> Result<(), LeanWorkerError> {
770 if self.valid {
771 Ok(())
772 } else {
773 Err(LeanWorkerError::LeaseInvalidated {
774 reason: self
775 .invalidation_reason
776 .clone()
777 .unwrap_or_else(|| "lease was invalidated by a worker lifecycle transition".to_owned()),
778 })
779 }
780 }
781
782 fn enforce_policy_before_request(&mut self) -> Result<(), LeanWorkerError> {
783 if let Some(reason) = self.entry.enforce_policy(&self.config)? {
784 self.invalidate(reason.clone());
785 return Err(LeanWorkerError::LeaseInvalidated { reason });
786 }
787 Ok(())
788 }
789
790 fn map_lifecycle_result<T>(&mut self, result: Result<T, LeanWorkerError>) -> Result<T, LeanWorkerError> {
791 if self.request_timeout_override.is_some() {
792 self.entry
793 .capability
794 .worker_mut()
795 .set_request_timeout(self.entry.base_request_timeout);
796 }
797 match result {
798 Ok(value) => {
799 self.entry.last_activity = Instant::now();
800 Ok(value)
801 }
802 Err(err) => {
803 self.entry.last_activity = Instant::now();
804 if invalidates_lease(&err) {
805 self.invalidate(invalidation_reason(&err));
806 }
807 Err(err)
808 }
809 }
810 }
811
812 fn invalidate(&mut self, reason: impl Into<String>) {
813 self.valid = false;
814 self.invalidation_reason = Some(reason.into());
815 }
816}
817
818impl Drop for LeanWorkerSessionLease<'_> {
819 fn drop(&mut self) {
820 self.entry.active_leases = self.entry.active_leases.saturating_sub(1);
821 }
822}
823
824fn invalidates_lease(err: &LeanWorkerError) -> bool {
825 matches!(
826 err,
827 LeanWorkerError::Cancelled { .. }
828 | LeanWorkerError::Timeout { .. }
829 | LeanWorkerError::ChildExited { .. }
830 | LeanWorkerError::ChildPanicOrAbort { .. }
831 | LeanWorkerError::CapabilityMetadataMismatch { .. }
832 )
833}
834
835fn invalidation_reason(err: &LeanWorkerError) -> String {
836 if let LeanWorkerError::Cancelled { operation } = err {
837 format!("cancelled during {operation}")
838 } else if let LeanWorkerError::Timeout { operation, .. } = err {
839 format!("timed out during {operation}")
840 } else if matches!(err, LeanWorkerError::ChildExited { .. }) {
841 "worker child exited".to_owned()
842 } else if matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }) {
843 "worker child exited fatally".to_owned()
844 } else if let LeanWorkerError::CapabilityMetadataMismatch { export, .. } = err {
845 format!("capability metadata mismatch from {export}")
846 } else {
847 "worker lifecycle transition".to_owned()
848 }
849}