1use std::path::PathBuf;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::capability::{LeanWorkerCapability, LeanWorkerCapabilityBuilder};
16use crate::session::{
17 LeanWorkerCancellationToken, LeanWorkerDiagnosticSink, LeanWorkerJsonCommand, LeanWorkerProgressSink,
18 LeanWorkerRuntimeMetadata, LeanWorkerStreamingCommand, LeanWorkerTypedDataSink, LeanWorkerTypedStreamSummary,
19};
20use crate::supervisor::{LeanWorkerError, LeanWorkerRestartReason, LeanWorkerStatus};
21
22#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
29#[non_exhaustive]
30pub enum LeanWorkerRestartPolicyClass {
31 Default,
32 Custom,
33}
34
35#[derive(Clone, Debug, Eq, PartialEq)]
42pub struct LeanWorkerSessionKey {
43 project_root: PathBuf,
44 package: String,
45 lib_name: String,
46 imports: Vec<String>,
47 metadata_expectation: Option<LeanWorkerMetadataExpectationKey>,
48 toolchain_fingerprint: lean_toolchain::ToolchainFingerprint,
49 restart_policy_class: LeanWorkerRestartPolicyClass,
50}
51
52impl LeanWorkerSessionKey {
53 #[must_use]
55 pub fn new(
56 project_root: impl Into<PathBuf>,
57 package: impl Into<String>,
58 lib_name: impl Into<String>,
59 imports: impl IntoIterator<Item = impl Into<String>>,
60 ) -> Self {
61 Self {
62 project_root: project_root.into(),
63 package: package.into(),
64 lib_name: lib_name.into(),
65 imports: imports.into_iter().map(Into::into).collect(),
66 metadata_expectation: None,
67 toolchain_fingerprint: lean_toolchain::ToolchainFingerprint::current(),
68 restart_policy_class: LeanWorkerRestartPolicyClass::Default,
69 }
70 }
71
72 #[must_use]
78 pub fn metadata_expectation(
79 mut self,
80 export: impl Into<String>,
81 request: Value,
82 expected: Option<crate::session::LeanWorkerCapabilityMetadata>,
83 ) -> Self {
84 self.metadata_expectation = Some(LeanWorkerMetadataExpectationKey {
85 export: export.into(),
86 request,
87 expected,
88 });
89 self
90 }
91
92 #[must_use]
94 pub fn restart_policy_class(mut self, class: LeanWorkerRestartPolicyClass) -> Self {
95 self.restart_policy_class = class;
96 self
97 }
98
99 #[must_use]
101 pub fn project_root(&self) -> &std::path::Path {
102 &self.project_root
103 }
104
105 #[must_use]
107 pub fn package(&self) -> &str {
108 &self.package
109 }
110
111 #[must_use]
113 pub fn lib_name(&self) -> &str {
114 &self.lib_name
115 }
116
117 #[must_use]
119 pub fn imports(&self) -> &[String] {
120 &self.imports
121 }
122
123 #[must_use]
125 pub fn toolchain_fingerprint(&self) -> &lean_toolchain::ToolchainFingerprint {
126 &self.toolchain_fingerprint
127 }
128
129 #[must_use]
131 pub fn policy_class(&self) -> LeanWorkerRestartPolicyClass {
132 self.restart_policy_class
133 }
134}
135
136#[derive(Clone, Debug, Eq, PartialEq)]
137struct LeanWorkerMetadataExpectationKey {
138 export: String,
139 request: Value,
140 expected: Option<crate::session::LeanWorkerCapabilityMetadata>,
141}
142
143#[derive(Clone, Debug, Eq, PartialEq)]
145pub struct LeanWorkerPoolConfig {
146 max_workers: usize,
147 max_total_child_rss_kib: Option<u64>,
148 per_worker_rss_ceiling_kib: Option<u64>,
149 idle_cycle_after: Option<Duration>,
150 queue_wait_timeout: Duration,
151}
152
153impl LeanWorkerPoolConfig {
154 #[must_use]
156 pub fn new(max_workers: usize) -> Self {
157 Self {
158 max_workers: max_workers.max(1),
159 max_total_child_rss_kib: None,
160 per_worker_rss_ceiling_kib: None,
161 idle_cycle_after: None,
162 queue_wait_timeout: Duration::ZERO,
163 }
164 }
165
166 #[must_use]
168 pub fn max_workers(&self) -> usize {
169 self.max_workers
170 }
171
172 #[must_use]
178 pub fn max_total_child_rss_kib(mut self, limit: u64) -> Self {
179 self.max_total_child_rss_kib = Some(limit.max(1));
180 self
181 }
182
183 #[must_use]
185 pub fn per_worker_rss_ceiling_kib(mut self, limit: u64) -> Self {
186 self.per_worker_rss_ceiling_kib = Some(limit.max(1));
187 self
188 }
189
190 #[must_use]
192 pub fn idle_cycle_after(mut self, limit: Duration) -> Self {
193 self.idle_cycle_after = Some(limit);
194 self
195 }
196
197 #[must_use]
202 pub fn queue_wait_timeout(mut self, timeout: Duration) -> Self {
203 self.queue_wait_timeout = timeout;
204 self
205 }
206
207 #[must_use]
209 pub fn max_total_child_rss_kib_limit(&self) -> Option<u64> {
210 self.max_total_child_rss_kib
211 }
212
213 #[must_use]
215 pub fn per_worker_rss_ceiling_kib_limit(&self) -> Option<u64> {
216 self.per_worker_rss_ceiling_kib
217 }
218
219 #[must_use]
221 pub fn idle_cycle_after_limit(&self) -> Option<Duration> {
222 self.idle_cycle_after
223 }
224
225 #[must_use]
227 pub fn queue_wait_timeout_limit(&self) -> Duration {
228 self.queue_wait_timeout
229 }
230}
231
232impl Default for LeanWorkerPoolConfig {
233 fn default() -> Self {
234 Self::new(1)
235 }
236}
237
238#[derive(Clone, Debug, Eq, PartialEq)]
243pub struct LeanWorkerPoolSnapshot {
244 pub max_workers: usize,
245 pub workers: usize,
246 pub active_workers: usize,
247 pub warm_leases: usize,
248 pub queue_depth: usize,
249 pub total_child_rss_kib: Option<u64>,
250 pub rss_samples_unavailable: u64,
251 pub requests: u64,
252 pub imports: u64,
253 pub worker_restarts: u64,
254 pub max_request_restarts: u64,
255 pub max_import_restarts: u64,
256 pub rss_restarts: u64,
257 pub idle_restarts: u64,
258 pub cancelled_restarts: u64,
259 pub timeout_restarts: u64,
260 pub policy_restarts: u64,
261 pub queue_timeouts: u64,
262 pub memory_budget_rejections: u64,
263 pub last_restart_reason: Option<LeanWorkerRestartReason>,
264 pub stream_requests: u64,
265 pub stream_successes: u64,
266 pub stream_failures: u64,
267 pub data_rows_delivered: u64,
268 pub data_row_payload_bytes: u64,
269 pub stream_elapsed: Duration,
270 pub backpressure_waits: u64,
271 pub backpressure_failures: u64,
272}
273
274#[derive(Debug)]
276pub struct LeanWorkerPool {
277 config: LeanWorkerPoolConfig,
278 entries: Vec<PoolEntry>,
279 queue_timeouts: u64,
280 memory_budget_rejections: u64,
281}
282
283impl LeanWorkerPool {
284 #[must_use]
286 pub fn new(config: LeanWorkerPoolConfig) -> Self {
287 Self {
288 config,
289 entries: Vec::new(),
290 queue_timeouts: 0,
291 memory_budget_rejections: 0,
292 }
293 }
294
295 pub fn acquire_lease(
308 &mut self,
309 builder: LeanWorkerCapabilityBuilder,
310 ) -> Result<LeanWorkerSessionLease<'_>, LeanWorkerError> {
311 let key = builder.session_key();
312 if let Some(index) = self.entries.iter().position(|entry| entry.key == key) {
313 self.ensure_entry_running(index)?;
314 self.enforce_entry_policy_before_assignment(index)?;
315 let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
316 message: "worker pool entry disappeared during lease acquisition".to_owned(),
317 })?;
318 entry.active_leases = entry.active_leases.saturating_add(1);
319 return Ok(LeanWorkerSessionLease {
320 entry,
321 config: self.config.clone(),
322 valid: true,
323 invalidation_reason: None,
324 request_timeout_override: None,
325 });
326 }
327
328 if self.entries.len() >= self.config.max_workers {
329 return self.pool_full_error();
330 }
331 self.ensure_spawn_within_total_rss_budget()?;
332
333 let capability = builder.clone().open()?;
334 let base_request_timeout = builder.pool_request_timeout();
335 self.entries.push(PoolEntry {
336 key,
337 builder,
338 capability,
339 base_request_timeout,
340 last_rss_kib: None,
341 rss_samples_unavailable: 0,
342 last_activity: Instant::now(),
343 last_restart_reason: None,
344 policy_restarts: 0,
345 active_leases: 0,
346 });
347 let entry = self.entries.last_mut().ok_or_else(|| LeanWorkerError::Protocol {
348 message: "worker pool failed to retain newly opened entry".to_owned(),
349 })?;
350 let _ = entry.sample_rss();
351 entry.active_leases = entry.active_leases.saturating_add(1);
352 Ok(LeanWorkerSessionLease {
353 entry,
354 config: self.config.clone(),
355 valid: true,
356 invalidation_reason: None,
357 request_timeout_override: None,
358 })
359 }
360
361 #[must_use]
363 pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
364 snapshot_from_entries(
365 &self.config,
366 &self.entries,
367 self.queue_timeouts,
368 self.memory_budget_rejections,
369 )
370 }
371
372 fn snapshot_from_lease_config(config: &LeanWorkerPoolConfig, entry: &PoolEntry) -> LeanWorkerPoolSnapshot {
373 snapshot_from_entries(config, std::slice::from_ref(entry), 0, 0)
374 }
375}
376
377fn snapshot_from_entries(
378 config: &LeanWorkerPoolConfig,
379 entries: &[PoolEntry],
380 queue_timeouts: u64,
381 memory_budget_rejections: u64,
382) -> LeanWorkerPoolSnapshot {
383 LeanWorkerPoolSnapshot {
384 max_workers: config.max_workers,
385 workers: entries.len(),
386 active_workers: entries.iter().filter(|entry| entry.active_leases > 0).count(),
387 warm_leases: entries.iter().filter(|entry| entry.active_leases == 0).count(),
388 queue_depth: 0,
389 total_child_rss_kib: total_known_child_rss_kib(entries),
390 rss_samples_unavailable: entries.iter().map(|entry| entry.rss_samples_unavailable).sum(),
391 requests: entries
392 .iter()
393 .map(|entry| entry.capability.worker().stats().requests)
394 .sum(),
395 imports: entries
396 .iter()
397 .map(|entry| entry.capability.worker().stats().imports)
398 .sum(),
399 worker_restarts: entries
400 .iter()
401 .map(|entry| entry.capability.worker().stats().restarts)
402 .sum(),
403 max_request_restarts: entries
404 .iter()
405 .map(|entry| entry.capability.worker().stats().max_request_restarts)
406 .sum(),
407 max_import_restarts: entries
408 .iter()
409 .map(|entry| entry.capability.worker().stats().max_import_restarts)
410 .sum(),
411 rss_restarts: entries
412 .iter()
413 .map(|entry| entry.capability.worker().stats().rss_restarts)
414 .sum(),
415 idle_restarts: entries
416 .iter()
417 .map(|entry| entry.capability.worker().stats().idle_restarts)
418 .sum(),
419 cancelled_restarts: entries
420 .iter()
421 .map(|entry| entry.capability.worker().stats().cancelled_restarts)
422 .sum(),
423 timeout_restarts: entries
424 .iter()
425 .map(|entry| entry.capability.worker().stats().timeout_restarts)
426 .sum(),
427 policy_restarts: entries.iter().map(|entry| entry.policy_restarts).sum(),
428 queue_timeouts,
429 memory_budget_rejections,
430 last_restart_reason: entries.iter().rev().find_map(|entry| entry.last_restart_reason.clone()),
431 stream_requests: entries
432 .iter()
433 .map(|entry| entry.capability.worker().stats().stream_requests)
434 .sum(),
435 stream_successes: entries
436 .iter()
437 .map(|entry| entry.capability.worker().stats().stream_successes)
438 .sum(),
439 stream_failures: entries
440 .iter()
441 .map(|entry| entry.capability.worker().stats().stream_failures)
442 .sum(),
443 data_rows_delivered: entries
444 .iter()
445 .map(|entry| entry.capability.worker().stats().data_rows_delivered)
446 .sum(),
447 data_row_payload_bytes: entries
448 .iter()
449 .map(|entry| entry.capability.worker().stats().data_row_payload_bytes)
450 .sum(),
451 stream_elapsed: entries.iter().fold(Duration::ZERO, |acc, entry| {
452 acc.saturating_add(entry.capability.worker().stats().stream_elapsed)
453 }),
454 backpressure_waits: entries
455 .iter()
456 .map(|entry| entry.capability.worker().stats().backpressure_waits)
457 .sum(),
458 backpressure_failures: entries
459 .iter()
460 .map(|entry| entry.capability.worker().stats().backpressure_failures)
461 .sum(),
462 }
463}
464
465fn total_known_child_rss_kib(entries: &[PoolEntry]) -> Option<u64> {
466 entries
467 .iter()
468 .map(|entry| entry.last_rss_kib)
469 .try_fold(0_u64, |acc, value| value.map(|rss| acc.saturating_add(rss)))
470}
471
472impl LeanWorkerPool {
473 fn ensure_entry_running(&mut self, index: usize) -> Result<(), LeanWorkerError> {
474 let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
475 message: "worker pool entry disappeared during liveness check".to_owned(),
476 })?;
477 match entry.capability.worker_mut().status()? {
478 LeanWorkerStatus::Running => Ok(()),
479 LeanWorkerStatus::Exited(_exit) => {
480 entry.capability = entry.builder.clone().open()?;
481 entry.last_activity = Instant::now();
482 Ok(())
483 }
484 }
485 }
486
487 fn enforce_entry_policy_before_assignment(&mut self, index: usize) -> Result<(), LeanWorkerError> {
488 let entry = self.entries.get_mut(index).ok_or_else(|| LeanWorkerError::Protocol {
489 message: "worker pool entry disappeared during policy check".to_owned(),
490 })?;
491 entry.enforce_policy(&self.config).map(|_| ())
492 }
493
494 fn ensure_spawn_within_total_rss_budget(&mut self) -> Result<(), LeanWorkerError> {
495 let Some(limit_kib) = self.config.max_total_child_rss_kib else {
496 return Ok(());
497 };
498 let rss = self.refresh_total_child_rss();
499 if rss.unavailable > 0 {
500 return Ok(());
501 }
502 if rss.total_kib >= limit_kib {
503 self.memory_budget_rejections = self.memory_budget_rejections.saturating_add(1);
504 return Err(LeanWorkerError::WorkerPoolMemoryBudgetExceeded {
505 current_kib: rss.total_kib,
506 limit_kib,
507 });
508 }
509 Ok(())
510 }
511
512 fn refresh_total_child_rss(&mut self) -> PoolRssTotal {
513 let mut total_kib = 0_u64;
514 let mut unavailable = 0_u64;
515 for entry in &mut self.entries {
516 match entry.sample_rss() {
517 Some(value) => {
518 total_kib = total_kib.saturating_add(value);
519 }
520 None => {
521 unavailable = unavailable.saturating_add(1);
522 }
523 }
524 }
525 PoolRssTotal { total_kib, unavailable }
526 }
527
528 fn pool_full_error<T>(&mut self) -> Result<T, LeanWorkerError> {
529 if self.config.queue_wait_timeout.is_zero() {
530 return Err(LeanWorkerError::WorkerPoolExhausted {
531 max_workers: self.config.max_workers,
532 });
533 }
534 let started = Instant::now();
535 while started.elapsed() < self.config.queue_wait_timeout {
536 let remaining = self.config.queue_wait_timeout.saturating_sub(started.elapsed());
537 thread::sleep(remaining.min(Duration::from_millis(10)));
538 }
539 self.queue_timeouts = self.queue_timeouts.saturating_add(1);
540 Err(LeanWorkerError::WorkerPoolQueueTimeout {
541 waited: self.config.queue_wait_timeout,
542 })
543 }
544}
545
546impl Default for LeanWorkerPool {
547 fn default() -> Self {
548 Self::new(LeanWorkerPoolConfig::default())
549 }
550}
551
552#[derive(Debug)]
553struct PoolEntry {
554 key: LeanWorkerSessionKey,
555 builder: LeanWorkerCapabilityBuilder,
556 capability: LeanWorkerCapability,
557 base_request_timeout: Duration,
558 last_rss_kib: Option<u64>,
559 rss_samples_unavailable: u64,
560 last_activity: Instant,
561 last_restart_reason: Option<LeanWorkerRestartReason>,
562 policy_restarts: u64,
563 active_leases: u64,
564}
565
566impl PoolEntry {
567 fn sample_rss(&mut self) -> Option<u64> {
568 match self.capability.worker_mut().rss_kib() {
569 Some(value) => {
570 self.last_rss_kib = Some(value);
571 Some(value)
572 }
573 None => {
574 self.rss_samples_unavailable = self.rss_samples_unavailable.saturating_add(1);
575 None
576 }
577 }
578 }
579
580 fn enforce_policy(&mut self, config: &LeanWorkerPoolConfig) -> Result<Option<String>, LeanWorkerError> {
581 if let Some(limit_kib) = config.per_worker_rss_ceiling_kib {
582 match self.sample_rss() {
583 Some(current_kib) if current_kib >= limit_kib => {
584 let reason = LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib };
585 self.cycle_for_policy(reason)?;
586 return Ok(Some(format!(
587 "memory policy cycled worker at {current_kib} KiB RSS with limit {limit_kib} KiB"
588 )));
589 }
590 Some(_) | None => {}
591 }
592 }
593
594 if let Some(limit) = config.idle_cycle_after {
595 let idle_for = self.last_activity.elapsed();
596 if idle_for >= limit {
597 let reason = LeanWorkerRestartReason::Idle { idle_for, limit };
598 self.cycle_for_policy(reason)?;
599 return Ok(Some(format!(
600 "idle policy cycled worker after {idle_for:?} idle with limit {limit:?}"
601 )));
602 }
603 }
604
605 Ok(None)
606 }
607
608 fn cycle_for_policy(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
609 self.capability.worker_mut().cycle_with_restart_reason(reason.clone())?;
610 self.last_restart_reason = Some(reason);
611 self.last_activity = Instant::now();
612 self.last_rss_kib = None;
613 self.policy_restarts = self.policy_restarts.saturating_add(1);
614 Ok(())
615 }
616}
617
618#[derive(Clone, Copy, Debug, Eq, PartialEq)]
619struct PoolRssTotal {
620 total_kib: u64,
621 unavailable: u64,
622}
623
624#[derive(Debug)]
630pub struct LeanWorkerSessionLease<'pool> {
631 entry: &'pool mut PoolEntry,
632 config: LeanWorkerPoolConfig,
633 valid: bool,
634 invalidation_reason: Option<String>,
635 request_timeout_override: Option<Duration>,
636}
637
638impl LeanWorkerSessionLease<'_> {
639 #[must_use]
641 pub fn session_key(&self) -> &LeanWorkerSessionKey {
642 &self.entry.key
643 }
644
645 #[must_use]
647 pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
648 self.entry.capability.runtime_metadata()
649 }
650
651 #[must_use]
653 pub fn is_valid(&self) -> bool {
654 self.valid
655 }
656
657 #[must_use]
664 pub fn snapshot(&self) -> LeanWorkerPoolSnapshot {
665 LeanWorkerPool::snapshot_from_lease_config(&self.config, self.entry)
666 }
667
668 pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
678 self.ensure_valid()?;
679 self.entry.capability.worker_mut().cycle()?;
680 self.invalidate("explicit worker cycle");
681 Ok(())
682 }
683
684 pub fn set_request_timeout(&mut self, timeout: Duration) -> Result<(), LeanWorkerError> {
694 self.ensure_valid()?;
695 self.request_timeout_override = Some(timeout);
696 Ok(())
697 }
698
699 pub fn run_json_command<Req, Resp>(
707 &mut self,
708 command: &LeanWorkerJsonCommand<Req, Resp>,
709 request: &Req,
710 cancellation: Option<&LeanWorkerCancellationToken>,
711 progress: Option<&dyn LeanWorkerProgressSink>,
712 ) -> Result<Resp, LeanWorkerError>
713 where
714 Req: Serialize,
715 Resp: DeserializeOwned,
716 {
717 self.ensure_valid()?;
718 self.enforce_policy_before_request()?;
719 let request_timeout = self.request_timeout_override;
720 let result = self
721 .entry
722 .capability
723 .open_session(cancellation, progress)
724 .and_then(|mut session| {
725 if let Some(timeout) = request_timeout {
726 session.set_request_timeout(timeout);
727 }
728 session.run_json_command(command, request, cancellation, progress)
729 });
730 self.map_lifecycle_result(result)
731 }
732
733 pub fn run_streaming_command<Req, Row, Summary>(
741 &mut self,
742 command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
743 request: &Req,
744 rows: &dyn LeanWorkerTypedDataSink<Row>,
745 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
746 cancellation: Option<&LeanWorkerCancellationToken>,
747 progress: Option<&dyn LeanWorkerProgressSink>,
748 ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
749 where
750 Req: Serialize,
751 Row: DeserializeOwned,
752 Summary: DeserializeOwned,
753 {
754 self.ensure_valid()?;
755 self.enforce_policy_before_request()?;
756 let request_timeout = self.request_timeout_override;
757 let result = self
758 .entry
759 .capability
760 .open_session(cancellation, progress)
761 .and_then(|mut session| {
762 if let Some(timeout) = request_timeout {
763 session.set_request_timeout(timeout);
764 }
765 session.run_streaming_command(command, request, rows, diagnostics, cancellation, progress)
766 });
767 self.map_lifecycle_result(result)
768 }
769
770 fn ensure_valid(&self) -> Result<(), LeanWorkerError> {
771 if self.valid {
772 Ok(())
773 } else {
774 Err(LeanWorkerError::LeaseInvalidated {
775 reason: self
776 .invalidation_reason
777 .clone()
778 .unwrap_or_else(|| "lease was invalidated by a worker lifecycle transition".to_owned()),
779 })
780 }
781 }
782
783 fn enforce_policy_before_request(&mut self) -> Result<(), LeanWorkerError> {
784 if let Some(reason) = self.entry.enforce_policy(&self.config)? {
785 self.invalidate(reason.clone());
786 return Err(LeanWorkerError::LeaseInvalidated { reason });
787 }
788 Ok(())
789 }
790
791 fn map_lifecycle_result<T>(&mut self, result: Result<T, LeanWorkerError>) -> Result<T, LeanWorkerError> {
792 if self.request_timeout_override.is_some() {
793 self.entry
794 .capability
795 .worker_mut()
796 .set_request_timeout(self.entry.base_request_timeout);
797 }
798 match result {
799 Ok(value) => {
800 self.entry.last_activity = Instant::now();
801 Ok(value)
802 }
803 Err(err) => {
804 self.entry.last_activity = Instant::now();
805 if invalidates_lease(&err) {
806 self.invalidate(invalidation_reason(&err));
807 }
808 Err(err)
809 }
810 }
811 }
812
813 fn invalidate(&mut self, reason: impl Into<String>) {
814 self.valid = false;
815 self.invalidation_reason = Some(reason.into());
816 }
817}
818
819impl Drop for LeanWorkerSessionLease<'_> {
820 fn drop(&mut self) {
821 self.entry.active_leases = self.entry.active_leases.saturating_sub(1);
822 }
823}
824
825fn invalidates_lease(err: &LeanWorkerError) -> bool {
826 matches!(
827 err,
828 LeanWorkerError::Cancelled { .. }
829 | LeanWorkerError::Timeout { .. }
830 | LeanWorkerError::ChildExited { .. }
831 | LeanWorkerError::ChildPanicOrAbort { .. }
832 | LeanWorkerError::CapabilityMetadataMismatch { .. }
833 )
834}
835
836fn invalidation_reason(err: &LeanWorkerError) -> String {
837 if let LeanWorkerError::Cancelled { operation } = err {
838 format!("cancelled during {operation}")
839 } else if let LeanWorkerError::Timeout { operation, .. } = err {
840 format!("timed out during {operation}")
841 } else if matches!(err, LeanWorkerError::ChildExited { .. }) {
842 "worker child exited".to_owned()
843 } else if matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }) {
844 "worker child exited fatally".to_owned()
845 } else if let LeanWorkerError::CapabilityMetadataMismatch { export, .. } = err {
846 format!("capability metadata mismatch from {export}")
847 } else {
848 "worker lifecycle transition".to_owned()
849 }
850}