1use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_engine::log_sender::LogReceiver;
19use ironflow_store::entities::RunStatus;
20use ironflow_store::store::Store;
21#[cfg(feature = "prometheus")]
22use metrics::{counter, gauge};
23#[cfg(feature = "heartbeat")]
24use reqwest::Client;
25
26use crate::api_store::ApiRunStore;
27use crate::error::WorkerError;
28use crate::log_pusher::LogPusher;
29
30const DEFAULT_CONCURRENCY: usize = 2;
31const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);
32const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(30 * 60);
33const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
34const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(5 * 60);
35#[cfg(feature = "heartbeat")]
36const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
37
38pub struct WorkerBuilder {
62 api_url: String,
63 worker_token: String,
64 provider: Option<Arc<dyn AgentProvider>>,
65 handlers: Vec<Box<dyn WorkflowHandler>>,
66 concurrency: usize,
67 poll_interval: Duration,
68 run_timeout: Duration,
69 max_consecutive_panics: u32,
70 panic_cooldown: Duration,
71 #[cfg(feature = "heartbeat")]
72 heartbeat_url: Option<String>,
73 #[cfg(feature = "heartbeat")]
74 heartbeat_interval: Duration,
75}
76
77impl WorkerBuilder {
78 pub fn new(api_url: &str, worker_token: &str) -> Self {
80 Self {
81 api_url: api_url.to_string(),
82 worker_token: worker_token.to_string(),
83 provider: None,
84 handlers: Vec::new(),
85 concurrency: DEFAULT_CONCURRENCY,
86 poll_interval: DEFAULT_POLL_INTERVAL,
87 run_timeout: DEFAULT_RUN_TIMEOUT,
88 max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
89 panic_cooldown: DEFAULT_PANIC_COOLDOWN,
90 #[cfg(feature = "heartbeat")]
91 heartbeat_url: None,
92 #[cfg(feature = "heartbeat")]
93 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
94 }
95 }
96
97 pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
99 self.provider = Some(provider);
100 self
101 }
102
103 pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
105 self.handlers.push(Box::new(handler));
106 self
107 }
108
109 pub fn concurrency(mut self, n: usize) -> Self {
111 self.concurrency = n;
112 self
113 }
114
115 pub fn poll_interval(mut self, interval: Duration) -> Self {
117 self.poll_interval = interval;
118 self
119 }
120
121 pub fn run_timeout(mut self, timeout: Duration) -> Self {
138 self.run_timeout = timeout;
139 self
140 }
141
142 pub fn max_consecutive_panics(mut self, n: u32) -> Self {
160 self.max_consecutive_panics = n;
161 self
162 }
163
164 pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
181 self.panic_cooldown = cooldown;
182 self
183 }
184
185 #[cfg(feature = "heartbeat")]
204 pub fn heartbeat_url(mut self, url: &str) -> Self {
205 self.heartbeat_url = Some(url.to_string());
206 self
207 }
208
209 #[cfg(feature = "heartbeat")]
227 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
228 self.heartbeat_interval = interval;
229 self
230 }
231
232 pub fn build(self) -> Result<Worker, WorkerError> {
239 let provider = self
240 .provider
241 .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
242
243 let store: Arc<dyn Store> = Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
244
245 let mut engine = Engine::new(store, provider);
246 for handler in self.handlers {
247 engine
248 .register_boxed(handler)
249 .map_err(WorkerError::Engine)?;
250 }
251
252 let (log_sender, log_receiver) = ironflow_engine::log_sender::channel();
253 engine.set_log_sender(log_sender);
254
255 #[cfg(feature = "heartbeat")]
256 let heartbeat_client = Client::builder()
257 .timeout(Duration::from_secs(5))
258 .build()
259 .expect("failed to build heartbeat HTTP client");
260
261 Ok(Worker {
262 engine: Arc::new(engine),
263 api_url: self.api_url,
264 worker_token: self.worker_token,
265 log_receiver: Mutex::new(Some(log_receiver)),
266 concurrency: self.concurrency,
267 poll_interval: self.poll_interval,
268 run_timeout: self.run_timeout,
269 max_consecutive_panics: self.max_consecutive_panics,
270 panic_cooldown: self.panic_cooldown,
271 #[cfg(feature = "heartbeat")]
272 heartbeat_url: self.heartbeat_url,
273 #[cfg(feature = "heartbeat")]
274 heartbeat_interval: self.heartbeat_interval,
275 #[cfg(feature = "heartbeat")]
276 heartbeat_client,
277 })
278 }
279}
280
281pub struct Worker {
283 engine: Arc<Engine>,
284 api_url: String,
285 worker_token: String,
286 log_receiver: Mutex<Option<LogReceiver>>,
287 concurrency: usize,
288 poll_interval: Duration,
289 run_timeout: Duration,
290 max_consecutive_panics: u32,
291 panic_cooldown: Duration,
292 #[cfg(feature = "heartbeat")]
293 heartbeat_url: Option<String>,
294 #[cfg(feature = "heartbeat")]
295 heartbeat_interval: Duration,
296 #[cfg(feature = "heartbeat")]
297 heartbeat_client: Client,
298}
299
300struct PoisonPillTracker {
302 max_consecutive: u32,
303 cooldown: Duration,
304 state: HashMap<String, (u32, Instant)>,
306}
307
308impl PoisonPillTracker {
309 fn new(max_consecutive: u32, cooldown: Duration) -> Self {
310 Self {
311 max_consecutive,
312 cooldown,
313 state: HashMap::new(),
314 }
315 }
316
317 fn record_panic(&mut self, workflow: &str) -> bool {
320 let entry = self
321 .state
322 .entry(workflow.to_string())
323 .or_insert((0, Instant::now()));
324 entry.0 += 1;
325 entry.1 = Instant::now();
326 entry.0 >= self.max_consecutive
327 }
328
329 fn record_success(&mut self, workflow: &str) {
331 self.state.remove(workflow);
332 }
333
334 fn is_blocked(&self, workflow: &str) -> bool {
336 self.state.get(workflow).is_some_and(|(count, last_panic)| {
337 *count >= self.max_consecutive && last_panic.elapsed() < self.cooldown
338 })
339 }
340}
341
342impl Worker {
343 pub async fn run(&self) -> Result<(), WorkerError> {
352 let semaphore = Arc::new(Semaphore::new(self.concurrency));
353 let shutdown = CancellationToken::new();
354 let mut idle_streak = 0u32;
355 let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
356 self.max_consecutive_panics,
357 self.panic_cooldown,
358 )));
359 let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
360
361 info!(
362 concurrency = self.concurrency,
363 poll_interval_ms = self.poll_interval.as_millis() as u64,
364 run_timeout_secs = self.run_timeout.as_secs(),
365 "worker started"
366 );
367
368 if let Some(receiver) = self.log_receiver.lock().expect("log_receiver lock").take() {
369 let pusher = LogPusher::new(&self.api_url, &self.worker_token);
370 spawn(pusher.run(receiver));
371 info!("log pusher started");
372 }
373
374 let shutdown_clone = shutdown.clone();
376 spawn(async move {
377 shutdown_signal().await;
378 info!("shutdown signal received, draining in-flight runs...");
379 shutdown_clone.cancel();
380 });
381
382 #[cfg(feature = "heartbeat")]
383 if let Some(ref url) = self.heartbeat_url {
384 let interval = self.heartbeat_interval;
385 let url = url.clone();
386 let client = self.heartbeat_client.clone();
387
388 spawn(async move {
389 let mut ticker = tokio::time::interval(interval);
390 ticker.tick().await;
392 loop {
393 ticker.tick().await;
394 match client.head(&url).send().await {
395 Ok(resp) if resp.status().is_success() => {
396 info!(url = %url, "heartbeat sent");
397 }
398 Ok(resp) => {
399 warn!(
400 url = %url,
401 status = %resp.status(),
402 "heartbeat ping returned non-success status"
403 );
404 }
405 Err(err) => {
406 warn!(
407 url = %url,
408 error = %err,
409 "heartbeat ping failed"
410 );
411 }
412 }
413 }
414 });
415 }
416
417 while !shutdown.is_cancelled() {
418 while let Ok(outcome) = outcome_rx.try_recv() {
420 let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
421 match outcome {
422 RunOutcome::Success(ref wf) => tracker.record_success(wf),
423 RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
424 if tracker.record_panic(wf) {
425 warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
426 }
427 }
428 RunOutcome::Panicked(ref wf) => {
429 if tracker.record_panic(wf) {
430 error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
431 }
432 }
433 }
434 }
435
436 let run = self.engine.store().pick_next_pending().await;
437
438 match run {
439 Ok(Some(run)) => {
440 #[cfg(feature = "prometheus")]
441 counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
442
443 let is_blocked = {
445 let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
446 tracker.is_blocked(&run.workflow_name)
447 };
448 if is_blocked {
449 warn!(
450 workflow = %run.workflow_name,
451 run_id = %run.id,
452 "skipping run: workflow flagged as poison pill, marking as failed"
453 );
454 if let Err(e) = self
455 .engine
456 .store()
457 .update_run_status(run.id, RunStatus::Failed)
458 .await
459 {
460 error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
461 }
462 continue;
463 }
464
465 let permit = semaphore
466 .clone()
467 .acquire_owned()
468 .await
469 .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
470
471 idle_streak = 0;
472 let engine = self.engine.clone();
473 let run_id = run.id;
474 let workflow = run.workflow_name.clone();
475 let workflow_for_watcher = workflow.clone();
476 let run_timeout = self.run_timeout;
477
478 info!(run_id = %run_id, workflow = %workflow, "executing run");
479
480 #[cfg(feature = "prometheus")]
481 gauge!(WORKER_ACTIVE).increment(1.0);
482
483 let handle = spawn(async move {
484 let _permit = permit;
485 let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
486
487 match result {
488 Ok(Ok(_)) => {
489 info!(run_id = %run_id, workflow = %workflow, "run completed");
490 RunOutcome::Success(workflow)
491 }
492 Ok(Err(e)) => {
493 error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
494 if let Err(store_err) = engine
495 .store()
496 .update_run_status(run_id, RunStatus::Failed)
497 .await
498 {
499 error!(run_id = %run_id, error = %store_err, "failed to mark run as failed");
500 }
501 RunOutcome::Failed(workflow)
502 }
503 Err(_) => {
504 error!(
505 run_id = %run_id,
506 workflow = %workflow,
507 timeout_secs = run_timeout.as_secs(),
508 "run timed out"
509 );
510 if let Err(e) = engine
511 .store()
512 .update_run_status(run_id, RunStatus::Failed)
513 .await
514 {
515 error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
516 }
517 RunOutcome::Timeout(workflow)
518 }
519 }
520 });
521
522 let store = self.engine.store().clone();
524 let tx = outcome_tx.clone();
525 spawn(async move {
526 match handle.await {
527 Ok(outcome) => {
528 let _ = tx.send(outcome);
529 }
530 Err(e) => {
531 error!(run_id = %run_id, "spawned task panicked: {e}");
532 if let Err(store_err) =
533 store.update_run_status(run_id, RunStatus::Failed).await
534 {
535 error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
536 }
537 let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
538 }
539 }
540 #[cfg(feature = "prometheus")]
541 gauge!(WORKER_ACTIVE).decrement(1.0);
542 });
543 }
544 Ok(None) => {
545 #[cfg(feature = "prometheus")]
546 counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
547
548 idle_streak += 1;
549 let backoff = if idle_streak > 10 {
550 self.poll_interval * 3
551 } else if idle_streak > 5 {
552 self.poll_interval * 2
553 } else {
554 self.poll_interval
555 };
556 sleep(backoff).await;
557 }
558 Err(e) => {
559 warn!(error = %e, "poll error");
560 sleep(self.poll_interval).await;
561 }
562 }
563 }
564
565 info!(
567 in_flight = self.concurrency - semaphore.available_permits(),
568 "waiting for in-flight runs to complete..."
569 );
570 let _ = semaphore
571 .acquire_many(self.concurrency as u32)
572 .await
573 .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
574
575 info!("all in-flight runs completed, worker shut down");
576 Ok(())
577 }
578}
579
580enum RunOutcome {
582 Success(String),
584 Failed(String),
586 Timeout(String),
588 Panicked(String),
590}
591
592async fn shutdown_signal() {
594 use tokio::signal;
595
596 let ctrl_c = async {
597 signal::ctrl_c()
598 .await
599 .expect("failed to install Ctrl+C handler");
600 };
601
602 #[cfg(unix)]
603 let terminate = async {
604 use tokio::signal::unix::{SignalKind, signal};
605
606 signal(SignalKind::terminate())
607 .expect("failed to install SIGTERM handler")
608 .recv()
609 .await;
610 };
611
612 #[cfg(not(unix))]
613 let terminate = {
614 use std::future::pending;
615 pending::<()>()
616 };
617
618 tokio::select! {
619 () = ctrl_c => {},
620 () = terminate => {},
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use super::*;
627 use ironflow_core::providers::claude::ClaudeCodeProvider;
628
629 #[test]
630 fn builder_new_creates_default_config() {
631 let builder = WorkerBuilder::new("http://localhost:3000", "my-token");
632 assert_eq!(builder.api_url, "http://localhost:3000");
633 assert_eq!(builder.worker_token, "my-token");
634 assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
635 assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
636 assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
637 assert_eq!(
638 builder.max_consecutive_panics,
639 DEFAULT_MAX_CONSECUTIVE_PANICS
640 );
641 assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
642 assert!(builder.provider.is_none());
643 }
644
645 #[test]
646 fn builder_with_trailing_slash_normalized() {
647 let builder = WorkerBuilder::new("http://localhost:3000/", "token");
648 assert_eq!(builder.api_url, "http://localhost:3000/");
649 }
650
651 #[test]
652 fn builder_provider_sets_provider() {
653 let provider = Arc::new(ClaudeCodeProvider::new());
654 let builder =
655 WorkerBuilder::new("http://localhost:3000", "token").provider(provider.clone());
656 assert!(builder.provider.is_some());
657 }
658
659 #[test]
660 fn builder_concurrency_sets_concurrency() {
661 let builder = WorkerBuilder::new("http://localhost:3000", "token").concurrency(8);
662 assert_eq!(builder.concurrency, 8);
663 }
664
665 #[test]
666 fn builder_concurrency_zero_accepted() {
667 let provider = Arc::new(ClaudeCodeProvider::new());
668 let builder = WorkerBuilder::new("http://localhost:3000", "token")
669 .provider(provider)
670 .concurrency(0);
671 assert_eq!(builder.concurrency, 0);
672 }
673
674 #[test]
675 fn builder_poll_interval_sets_interval() {
676 let interval = Duration::from_secs(5);
677 let builder = WorkerBuilder::new("http://localhost:3000", "token").poll_interval(interval);
678 assert_eq!(builder.poll_interval, interval);
679 }
680
681 #[test]
682 fn builder_run_timeout_sets_timeout() {
683 let dur = Duration::from_secs(120);
684 let builder = WorkerBuilder::new("http://localhost:3000", "token").run_timeout(dur);
685 assert_eq!(builder.run_timeout, dur);
686 }
687
688 #[test]
689 fn builder_max_consecutive_panics_sets_value() {
690 let builder =
691 WorkerBuilder::new("http://localhost:3000", "token").max_consecutive_panics(10);
692 assert_eq!(builder.max_consecutive_panics, 10);
693 }
694
695 #[test]
696 fn builder_panic_cooldown_sets_value() {
697 let dur = Duration::from_secs(600);
698 let builder = WorkerBuilder::new("http://localhost:3000", "token").panic_cooldown(dur);
699 assert_eq!(builder.panic_cooldown, dur);
700 }
701
702 #[test]
703 fn builder_build_without_provider_fails() {
704 let builder = WorkerBuilder::new("http://localhost:3000", "token");
705 let result = builder.build();
706 assert!(result.is_err());
707 match result {
708 Err(WorkerError::Internal(msg)) => {
709 assert!(msg.contains("provider is required"));
710 }
711 _ => panic!("expected Internal error about missing provider"),
712 }
713 }
714
715 #[test]
716 fn builder_build_with_provider_succeeds() {
717 let provider = Arc::new(ClaudeCodeProvider::new());
718 let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
719 let result = builder.build();
720 assert!(result.is_ok());
721 }
722
723 #[test]
724 fn builder_build_creates_worker_with_correct_concurrency() {
725 let provider = Arc::new(ClaudeCodeProvider::new());
726 let builder = WorkerBuilder::new("http://localhost:3000", "token")
727 .provider(provider)
728 .concurrency(16);
729 let worker = builder.build().unwrap();
730 assert_eq!(worker.concurrency, 16);
731 }
732
733 #[test]
734 fn builder_build_creates_worker_with_correct_interval() {
735 let provider = Arc::new(ClaudeCodeProvider::new());
736 let interval = Duration::from_secs(10);
737 let builder = WorkerBuilder::new("http://localhost:3000", "token")
738 .provider(provider)
739 .poll_interval(interval);
740 let worker = builder.build().unwrap();
741 assert_eq!(worker.poll_interval, interval);
742 }
743
744 #[test]
745 fn builder_build_preserves_timeout() {
746 let provider = Arc::new(ClaudeCodeProvider::new());
747 let dur = Duration::from_secs(300);
748 let worker = WorkerBuilder::new("http://localhost:3000", "token")
749 .provider(provider)
750 .run_timeout(dur)
751 .build()
752 .unwrap();
753 assert_eq!(worker.run_timeout, dur);
754 }
755
756 #[test]
757 fn builder_build_preserves_poison_pill_config() {
758 let provider = Arc::new(ClaudeCodeProvider::new());
759 let cooldown = Duration::from_secs(120);
760 let worker = WorkerBuilder::new("http://localhost:3000", "token")
761 .provider(provider)
762 .max_consecutive_panics(7)
763 .panic_cooldown(cooldown)
764 .build()
765 .unwrap();
766 assert_eq!(worker.max_consecutive_panics, 7);
767 assert_eq!(worker.panic_cooldown, cooldown);
768 }
769
770 #[test]
771 fn builder_chaining_works() {
772 let provider = Arc::new(ClaudeCodeProvider::new());
773 let result = WorkerBuilder::new("http://localhost:3000", "token")
774 .provider(provider)
775 .concurrency(4)
776 .poll_interval(Duration::from_secs(3))
777 .run_timeout(Duration::from_secs(600))
778 .max_consecutive_panics(5)
779 .panic_cooldown(Duration::from_secs(120))
780 .build();
781 assert!(result.is_ok());
782 let worker = result.unwrap();
783 assert_eq!(worker.concurrency, 4);
784 assert_eq!(worker.poll_interval, Duration::from_secs(3));
785 assert_eq!(worker.run_timeout, Duration::from_secs(600));
786 assert_eq!(worker.max_consecutive_panics, 5);
787 assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
788 }
789
790 #[test]
791 fn builder_empty_api_url_accepted() {
792 let provider = Arc::new(ClaudeCodeProvider::new());
793 let builder = WorkerBuilder::new("", "token").provider(provider);
794 let result = builder.build();
795 assert!(result.is_ok());
796 }
797
798 #[test]
799 fn builder_empty_token_accepted() {
800 let provider = Arc::new(ClaudeCodeProvider::new());
801 let builder = WorkerBuilder::new("http://localhost:3000", "").provider(provider);
802 let result = builder.build();
803 assert!(result.is_ok());
804 }
805
806 #[cfg(feature = "heartbeat")]
807 #[test]
808 fn builder_heartbeat_defaults() {
809 let builder = WorkerBuilder::new("http://localhost:3000", "token");
810 assert!(builder.heartbeat_url.is_none());
811 assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
812 }
813
814 #[cfg(feature = "heartbeat")]
815 #[test]
816 fn builder_heartbeat_url_sets_url() {
817 let builder = WorkerBuilder::new("http://localhost:3000", "token")
818 .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
819 assert_eq!(
820 builder.heartbeat_url.as_deref(),
821 Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
822 );
823 }
824
825 #[cfg(feature = "heartbeat")]
826 #[test]
827 fn builder_heartbeat_custom_interval() {
828 let interval = Duration::from_secs(10);
829 let builder =
830 WorkerBuilder::new("http://localhost:3000", "token").heartbeat_interval(interval);
831 assert_eq!(builder.heartbeat_interval, interval);
832 }
833
834 #[cfg(feature = "heartbeat")]
835 #[test]
836 fn builder_build_preserves_heartbeat_config() {
837 let provider = Arc::new(ClaudeCodeProvider::new());
838 let interval = Duration::from_secs(15);
839 let worker = WorkerBuilder::new("http://localhost:3000", "token")
840 .provider(provider)
841 .heartbeat_url("https://example.com/heartbeat")
842 .heartbeat_interval(interval)
843 .build()
844 .unwrap();
845 assert_eq!(
846 worker.heartbeat_url.as_deref(),
847 Some("https://example.com/heartbeat")
848 );
849 assert_eq!(worker.heartbeat_interval, interval);
850 }
851
852 #[cfg(feature = "heartbeat")]
853 #[test]
854 fn builder_build_without_heartbeat_url_has_none() {
855 let provider = Arc::new(ClaudeCodeProvider::new());
856 let worker = WorkerBuilder::new("http://localhost:3000", "token")
857 .provider(provider)
858 .build()
859 .unwrap();
860 assert!(worker.heartbeat_url.is_none());
861 }
862
863 #[test]
866 fn poison_tracker_not_blocked_initially() {
867 let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
868 assert!(!tracker.is_blocked("my-workflow"));
869 }
870
871 #[test]
872 fn poison_tracker_blocked_after_max_panics() {
873 let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
874 assert!(!tracker.record_panic("wf"));
875 assert!(!tracker.record_panic("wf"));
876 assert!(tracker.record_panic("wf"));
877 assert!(tracker.is_blocked("wf"));
878 }
879
880 #[test]
881 fn poison_tracker_success_resets_count() {
882 let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
883 tracker.record_panic("wf");
884 tracker.record_panic("wf");
885 tracker.record_success("wf");
886 assert!(!tracker.is_blocked("wf"));
887 assert!(!tracker.record_panic("wf"));
889 }
890
891 #[test]
892 fn poison_tracker_independent_per_workflow() {
893 let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
894 tracker.record_panic("wf-a");
895 tracker.record_panic("wf-a");
896 assert!(tracker.is_blocked("wf-a"));
897 assert!(!tracker.is_blocked("wf-b"));
898 }
899
900 #[test]
901 fn poison_tracker_unblocks_after_cooldown() {
902 let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
903 tracker.record_panic("wf");
904 tracker.record_panic("wf");
905 assert!(!tracker.is_blocked("wf"));
907 }
908}