1use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_store::entities::RunStatus;
19use ironflow_store::store::RunStore;
20#[cfg(feature = "prometheus")]
21use metrics::{counter, gauge};
22#[cfg(feature = "heartbeat")]
23use reqwest::Client;
24
25use crate::api_store::ApiRunStore;
26use crate::error::WorkerError;
27
/// Default number of runs executed at the same time (semaphore permits).
const DEFAULT_CONCURRENCY: usize = 2;
/// Default base delay between polls when no pending run was found or the poll failed.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Default time limit for a single run (30 minutes).
const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Default number of consecutive failed outcomes after which a workflow is flagged
/// as a poison pill.
const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
/// Default time a flagged workflow stays blocked after its most recent failure (5 minutes).
const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(5 * 60);
/// Default period between two heartbeat pings (30 seconds).
#[cfg(feature = "heartbeat")]
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
35
/// Collects the configuration of a [`Worker`].
///
/// Start with [`WorkerBuilder::new`], adjust settings through the chained
/// setters and finish with [`WorkerBuilder::build`].
pub struct WorkerBuilder {
    /// API base URL, passed unchanged to `ApiRunStore::new` in `build`.
    api_url: String,
    /// Worker token, passed unchanged to `ApiRunStore::new` in `build`.
    worker_token: String,
    /// Agent provider given to the engine; `build` fails while this is `None`.
    provider: Option<Arc<dyn AgentProvider>>,
    /// Workflow handlers registered on the engine, in insertion order, during `build`.
    handlers: Vec<Box<dyn WorkflowHandler>>,
    /// Number of semaphore permits, i.e. the maximum number of concurrently executing runs.
    concurrency: usize,
    /// Base delay between polls when no run was found or the poll failed.
    poll_interval: Duration,
    /// Time limit applied to the execution of a single run.
    run_timeout: Duration,
    /// Consecutive failed outcomes after which a workflow is flagged as a poison pill.
    max_consecutive_panics: u32,
    /// How long a flagged workflow stays blocked after its most recent failure.
    panic_cooldown: Duration,
    /// URL that receives periodic `HEAD` requests while the worker runs; `None` disables it.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Period between two heartbeat requests.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
}
74
75impl WorkerBuilder {
76 pub fn new(api_url: &str, worker_token: &str) -> Self {
78 Self {
79 api_url: api_url.to_string(),
80 worker_token: worker_token.to_string(),
81 provider: None,
82 handlers: Vec::new(),
83 concurrency: DEFAULT_CONCURRENCY,
84 poll_interval: DEFAULT_POLL_INTERVAL,
85 run_timeout: DEFAULT_RUN_TIMEOUT,
86 max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
87 panic_cooldown: DEFAULT_PANIC_COOLDOWN,
88 #[cfg(feature = "heartbeat")]
89 heartbeat_url: None,
90 #[cfg(feature = "heartbeat")]
91 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
92 }
93 }
94
95 pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
97 self.provider = Some(provider);
98 self
99 }
100
101 pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
103 self.handlers.push(Box::new(handler));
104 self
105 }
106
107 pub fn concurrency(mut self, n: usize) -> Self {
109 self.concurrency = n;
110 self
111 }
112
113 pub fn poll_interval(mut self, interval: Duration) -> Self {
115 self.poll_interval = interval;
116 self
117 }
118
119 pub fn run_timeout(mut self, timeout: Duration) -> Self {
136 self.run_timeout = timeout;
137 self
138 }
139
140 pub fn max_consecutive_panics(mut self, n: u32) -> Self {
158 self.max_consecutive_panics = n;
159 self
160 }
161
162 pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
179 self.panic_cooldown = cooldown;
180 self
181 }
182
183 #[cfg(feature = "heartbeat")]
202 pub fn heartbeat_url(mut self, url: &str) -> Self {
203 self.heartbeat_url = Some(url.to_string());
204 self
205 }
206
207 #[cfg(feature = "heartbeat")]
225 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
226 self.heartbeat_interval = interval;
227 self
228 }
229
230 pub fn build(self) -> Result<Worker, WorkerError> {
237 let provider = self
238 .provider
239 .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
240
241 let store: Arc<dyn RunStore> =
242 Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
243
244 let mut engine = Engine::new(store, provider);
245 for handler in self.handlers {
246 engine
247 .register_boxed(handler)
248 .map_err(WorkerError::Engine)?;
249 }
250
251 #[cfg(feature = "heartbeat")]
252 let heartbeat_client = Client::builder()
253 .timeout(Duration::from_secs(5))
254 .build()
255 .expect("failed to build heartbeat HTTP client");
256
257 Ok(Worker {
258 engine: Arc::new(engine),
259 concurrency: self.concurrency,
260 poll_interval: self.poll_interval,
261 run_timeout: self.run_timeout,
262 max_consecutive_panics: self.max_consecutive_panics,
263 panic_cooldown: self.panic_cooldown,
264 #[cfg(feature = "heartbeat")]
265 heartbeat_url: self.heartbeat_url,
266 #[cfg(feature = "heartbeat")]
267 heartbeat_interval: self.heartbeat_interval,
268 #[cfg(feature = "heartbeat")]
269 heartbeat_client,
270 })
271 }
272}
273
/// A configured worker, produced by `WorkerBuilder::build`.
///
/// `Worker::run` polls the engine's store for pending runs and executes them.
pub struct Worker {
    /// Engine that owns the store and executes runs.
    engine: Arc<Engine>,
    /// Number of semaphore permits, i.e. the maximum number of concurrently executing runs.
    concurrency: usize,
    /// Base delay between polls when no run was found or the poll failed.
    poll_interval: Duration,
    /// Time limit applied to the execution of a single run.
    run_timeout: Duration,
    /// Consecutive failed outcomes after which a workflow is flagged as a poison pill.
    max_consecutive_panics: u32,
    /// How long a flagged workflow stays blocked after its most recent failure.
    panic_cooldown: Duration,
    /// URL that receives periodic `HEAD` requests while the worker runs; `None` disables it.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Period between two heartbeat requests.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
    /// HTTP client used for the heartbeat requests.
    #[cfg(feature = "heartbeat")]
    heartbeat_client: Client,
}
289
/// Per-workflow bookkeeping of consecutive failed outcomes.
///
/// A workflow whose streak reaches `max_consecutive` is reported as blocked
/// until `cooldown` has elapsed since its most recent failure.
struct PoisonPillTracker {
    /// Streak length at which a workflow counts as a poison pill.
    max_consecutive: u32,
    /// How long the block lasts, measured from the most recent failure.
    cooldown: Duration,
    /// Workflow name -> (current failure streak, time of the most recent failure).
    state: HashMap<String, (u32, Instant)>,
}

impl PoisonPillTracker {
    /// Creates a tracker with no recorded failures.
    fn new(max_consecutive: u32, cooldown: Duration) -> Self {
        let state = HashMap::new();
        Self {
            max_consecutive,
            cooldown,
            state,
        }
    }

    /// Records one more failed outcome for `workflow`.
    ///
    /// Returns `true` when the streak, including this failure, has reached
    /// `max_consecutive`.
    fn record_panic(&mut self, workflow: &str) -> bool {
        let now = Instant::now();
        let streak = match self.state.get_mut(workflow) {
            Some((count, last_failure)) => {
                *count += 1;
                *last_failure = now;
                *count
            }
            None => {
                self.state.insert(workflow.to_string(), (1, now));
                1
            }
        };
        streak >= self.max_consecutive
    }

    /// Forgets the failure streak of `workflow`.
    fn record_success(&mut self, workflow: &str) {
        let _ = self.state.remove(workflow);
    }

    /// Reports whether `workflow` is currently blocked.
    ///
    /// The streak is not cleared when the cooldown runs out, so a single
    /// further failure after the cooldown blocks the workflow again.
    fn is_blocked(&self, workflow: &str) -> bool {
        match self.state.get(workflow) {
            Some(&(streak, last_failure)) => {
                streak >= self.max_consecutive && last_failure.elapsed() < self.cooldown
            }
            None => false,
        }
    }
}
331
332impl Worker {
333 pub async fn run(&self) -> Result<(), WorkerError> {
342 let semaphore = Arc::new(Semaphore::new(self.concurrency));
343 let shutdown = CancellationToken::new();
344 let mut idle_streak = 0u32;
345 let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
346 self.max_consecutive_panics,
347 self.panic_cooldown,
348 )));
349 let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
350
351 info!(
352 concurrency = self.concurrency,
353 poll_interval_ms = self.poll_interval.as_millis() as u64,
354 run_timeout_secs = self.run_timeout.as_secs(),
355 "worker started"
356 );
357
358 let shutdown_clone = shutdown.clone();
360 spawn(async move {
361 shutdown_signal().await;
362 info!("shutdown signal received, draining in-flight runs...");
363 shutdown_clone.cancel();
364 });
365
366 #[cfg(feature = "heartbeat")]
367 if let Some(ref url) = self.heartbeat_url {
368 let interval = self.heartbeat_interval;
369 let url = url.clone();
370 let client = self.heartbeat_client.clone();
371
372 spawn(async move {
373 let mut ticker = tokio::time::interval(interval);
374 ticker.tick().await;
376 loop {
377 ticker.tick().await;
378 match client.head(&url).send().await {
379 Ok(resp) if resp.status().is_success() => {
380 info!(url = %url, "heartbeat sent");
381 }
382 Ok(resp) => {
383 warn!(
384 url = %url,
385 status = %resp.status(),
386 "heartbeat ping returned non-success status"
387 );
388 }
389 Err(err) => {
390 warn!(
391 url = %url,
392 error = %err,
393 "heartbeat ping failed"
394 );
395 }
396 }
397 }
398 });
399 }
400
401 while !shutdown.is_cancelled() {
402 while let Ok(outcome) = outcome_rx.try_recv() {
404 let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
405 match outcome {
406 RunOutcome::Success(ref wf) => tracker.record_success(wf),
407 RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
408 if tracker.record_panic(wf) {
409 warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
410 }
411 }
412 RunOutcome::Panicked(ref wf) => {
413 if tracker.record_panic(wf) {
414 error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
415 }
416 }
417 }
418 }
419
420 let run = self.engine.store().pick_next_pending().await;
421
422 match run {
423 Ok(Some(run)) => {
424 #[cfg(feature = "prometheus")]
425 counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
426
427 let is_blocked = {
429 let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
430 tracker.is_blocked(&run.workflow_name)
431 };
432 if is_blocked {
433 warn!(
434 workflow = %run.workflow_name,
435 run_id = %run.id,
436 "skipping run: workflow flagged as poison pill, marking as failed"
437 );
438 if let Err(e) = self
439 .engine
440 .store()
441 .update_run_status(run.id, RunStatus::Failed)
442 .await
443 {
444 error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
445 }
446 continue;
447 }
448
449 let permit = semaphore
450 .clone()
451 .acquire_owned()
452 .await
453 .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
454
455 idle_streak = 0;
456 let engine = self.engine.clone();
457 let run_id = run.id;
458 let workflow = run.workflow_name.clone();
459 let workflow_for_watcher = workflow.clone();
460 let run_timeout = self.run_timeout;
461
462 info!(run_id = %run_id, workflow = %workflow, "executing run");
463
464 #[cfg(feature = "prometheus")]
465 gauge!(WORKER_ACTIVE).increment(1.0);
466
467 let handle = spawn(async move {
468 let _permit = permit;
469 let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
470
471 match result {
472 Ok(Ok(_)) => {
473 info!(run_id = %run_id, workflow = %workflow, "run completed");
474 RunOutcome::Success(workflow)
475 }
476 Ok(Err(e)) => {
477 error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
478 RunOutcome::Failed(workflow)
479 }
480 Err(_) => {
481 error!(
482 run_id = %run_id,
483 workflow = %workflow,
484 timeout_secs = run_timeout.as_secs(),
485 "run timed out"
486 );
487 if let Err(e) = engine
488 .store()
489 .update_run_status(run_id, RunStatus::Failed)
490 .await
491 {
492 error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
493 }
494 RunOutcome::Timeout(workflow)
495 }
496 }
497 });
498
499 let store = self.engine.store().clone();
501 let tx = outcome_tx.clone();
502 spawn(async move {
503 match handle.await {
504 Ok(outcome) => {
505 let _ = tx.send(outcome);
506 }
507 Err(e) => {
508 error!(run_id = %run_id, "spawned task panicked: {e}");
509 if let Err(store_err) =
510 store.update_run_status(run_id, RunStatus::Failed).await
511 {
512 error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
513 }
514 let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
515 }
516 }
517 #[cfg(feature = "prometheus")]
518 gauge!(WORKER_ACTIVE).decrement(1.0);
519 });
520 }
521 Ok(None) => {
522 #[cfg(feature = "prometheus")]
523 counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
524
525 idle_streak += 1;
526 let backoff = if idle_streak > 10 {
527 self.poll_interval * 3
528 } else if idle_streak > 5 {
529 self.poll_interval * 2
530 } else {
531 self.poll_interval
532 };
533 sleep(backoff).await;
534 }
535 Err(e) => {
536 warn!(error = %e, "poll error");
537 sleep(self.poll_interval).await;
538 }
539 }
540 }
541
542 info!(
544 in_flight = self.concurrency - semaphore.available_permits(),
545 "waiting for in-flight runs to complete..."
546 );
547 let _ = semaphore
548 .acquire_many(self.concurrency as u32)
549 .await
550 .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
551
552 info!("all in-flight runs completed, worker shut down");
553 Ok(())
554 }
555}
556
/// Result of one executed run, reported back to the poll loop.
///
/// Every variant carries the name of the workflow the run belonged to.
enum RunOutcome {
    /// The handler returned `Ok` within the run timeout.
    Success(String),
    /// The handler returned an error within the run timeout.
    Failed(String),
    /// The run timeout elapsed before the handler finished.
    Timeout(String),
    /// Awaiting the run task's join handle returned an error (logged as a panic).
    Panicked(String),
}
568
569async fn shutdown_signal() {
571 use tokio::signal;
572
573 let ctrl_c = async {
574 signal::ctrl_c()
575 .await
576 .expect("failed to install Ctrl+C handler");
577 };
578
579 #[cfg(unix)]
580 let terminate = async {
581 use tokio::signal::unix::{SignalKind, signal};
582
583 signal(SignalKind::terminate())
584 .expect("failed to install SIGTERM handler")
585 .recv()
586 .await;
587 };
588
589 #[cfg(not(unix))]
590 let terminate = {
591 use std::future::pending;
592 pending::<()>()
593 };
594
595 tokio::select! {
596 () = ctrl_c => {},
597 () = terminate => {},
598 }
599}
600
/// Unit tests for the builder setters, `build`, and the poison-pill tracker.
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;

    #[test]
    fn builder_new_creates_default_config() {
        let builder = WorkerBuilder::new("http://localhost:3000", "my-token");
        assert_eq!(builder.api_url, "http://localhost:3000");
        assert_eq!(builder.worker_token, "my-token");
        assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
        assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
        assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
        assert_eq!(
            builder.max_consecutive_panics,
            DEFAULT_MAX_CONSECUTIVE_PANICS
        );
        assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
        assert!(builder.provider.is_none());
    }

    // The builder performs no URL normalization: a trailing slash is kept as given.
    #[test]
    fn builder_keeps_trailing_slash_verbatim() {
        let builder = WorkerBuilder::new("http://localhost:3000/", "token");
        assert_eq!(builder.api_url, "http://localhost:3000/");
    }

    #[test]
    fn builder_provider_sets_provider() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
        assert!(builder.provider.is_some());
    }

    #[test]
    fn builder_concurrency_sets_concurrency() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token").concurrency(8);
        assert_eq!(builder.concurrency, 8);
    }

    // Only the setter is exercised here; `build` is not called.
    #[test]
    fn builder_concurrency_zero_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(0);
        assert_eq!(builder.concurrency, 0);
    }

    #[test]
    fn builder_poll_interval_sets_interval() {
        let interval = Duration::from_secs(5);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").poll_interval(interval);
        assert_eq!(builder.poll_interval, interval);
    }

    #[test]
    fn builder_run_timeout_sets_timeout() {
        let dur = Duration::from_secs(120);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").run_timeout(dur);
        assert_eq!(builder.run_timeout, dur);
    }

    #[test]
    fn builder_max_consecutive_panics_sets_value() {
        let builder =
            WorkerBuilder::new("http://localhost:3000", "token").max_consecutive_panics(10);
        assert_eq!(builder.max_consecutive_panics, 10);
    }

    #[test]
    fn builder_panic_cooldown_sets_value() {
        let dur = Duration::from_secs(600);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").panic_cooldown(dur);
        assert_eq!(builder.panic_cooldown, dur);
    }

    #[test]
    fn builder_build_without_provider_fails() {
        let result = WorkerBuilder::new("http://localhost:3000", "token").build();
        match result {
            Err(WorkerError::Internal(msg)) => {
                assert!(msg.contains("provider is required"));
            }
            _ => panic!("expected Internal error about missing provider"),
        }
    }

    #[test]
    fn builder_build_with_provider_succeeds() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_build_creates_worker_with_correct_concurrency() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(16);
        let worker = builder.build().unwrap();
        assert_eq!(worker.concurrency, 16);
    }

    #[test]
    fn builder_build_creates_worker_with_correct_interval() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let interval = Duration::from_secs(10);
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .poll_interval(interval);
        let worker = builder.build().unwrap();
        assert_eq!(worker.poll_interval, interval);
    }

    #[test]
    fn builder_build_preserves_timeout() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let dur = Duration::from_secs(300);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .run_timeout(dur)
            .build()
            .unwrap();
        assert_eq!(worker.run_timeout, dur);
    }

    #[test]
    fn builder_build_preserves_poison_pill_config() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let cooldown = Duration::from_secs(120);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .max_consecutive_panics(7)
            .panic_cooldown(cooldown)
            .build()
            .unwrap();
        assert_eq!(worker.max_consecutive_panics, 7);
        assert_eq!(worker.panic_cooldown, cooldown);
    }

    #[test]
    fn builder_chaining_works() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let result = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(4)
            .poll_interval(Duration::from_secs(3))
            .run_timeout(Duration::from_secs(600))
            .max_consecutive_panics(5)
            .panic_cooldown(Duration::from_secs(120))
            .build();
        assert!(result.is_ok());
        let worker = result.unwrap();
        assert_eq!(worker.concurrency, 4);
        assert_eq!(worker.poll_interval, Duration::from_secs(3));
        assert_eq!(worker.run_timeout, Duration::from_secs(600));
        assert_eq!(worker.max_consecutive_panics, 5);
        assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
    }

    #[test]
    fn builder_empty_api_url_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("", "token").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_empty_token_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_defaults() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token");
        assert!(builder.heartbeat_url.is_none());
        assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_url_sets_url() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
        assert_eq!(
            builder.heartbeat_url.as_deref(),
            Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
        );
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_custom_interval() {
        let interval = Duration::from_secs(10);
        let builder =
            WorkerBuilder::new("http://localhost:3000", "token").heartbeat_interval(interval);
        assert_eq!(builder.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_preserves_heartbeat_config() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let interval = Duration::from_secs(15);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .heartbeat_url("https://example.com/heartbeat")
            .heartbeat_interval(interval)
            .build()
            .unwrap();
        assert_eq!(
            worker.heartbeat_url.as_deref(),
            Some("https://example.com/heartbeat")
        );
        assert_eq!(worker.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_without_heartbeat_url_has_none() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .build()
            .unwrap();
        assert!(worker.heartbeat_url.is_none());
    }

    #[test]
    fn poison_tracker_not_blocked_initially() {
        let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.is_blocked("my-workflow"));
    }

    #[test]
    fn poison_tracker_blocked_after_max_panics() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_success_resets_count() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        tracker.record_success("wf");
        assert!(!tracker.is_blocked("wf"));
        // The streak starts again from zero, so one failure is below the threshold.
        assert!(!tracker.record_panic("wf"));
    }

    #[test]
    fn poison_tracker_independent_per_workflow() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        tracker.record_panic("wf-a");
        tracker.record_panic("wf-a");
        assert!(tracker.is_blocked("wf-a"));
        assert!(!tracker.is_blocked("wf-b"));
    }

    #[test]
    fn poison_tracker_unblocks_after_cooldown() {
        // With a zero cooldown the elapsed time can never be below it, so the
        // workflow is unblocked immediately even though the threshold is reached.
        let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        assert!(!tracker.is_blocked("wf"));
    }
}