1use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_engine::log_sender::LogReceiver;
19use ironflow_store::entities::RunStatus;
20use ironflow_store::store::Store;
21#[cfg(feature = "prometheus")]
22use metrics::{counter, gauge};
23#[cfg(feature = "heartbeat")]
24use reqwest::Client;
25
26use crate::api_store::ApiRunStore;
27use crate::error::WorkerError;
28use crate::log_pusher::LogPusher;
29
/// Number of runs executed in parallel unless `WorkerBuilder::concurrency` overrides it.
const DEFAULT_CONCURRENCY: usize = 2;
/// Base delay between polls of the API when no run is pending.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Upper bound on the wall-clock duration of a single run (30 minutes).
const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Consecutive failures after which a workflow is treated as a poison pill.
const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
/// How long a poison-pill workflow stays blocked after its latest failure (5 minutes).
const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(300);
/// Delay between two heartbeat pings.
#[cfg(feature = "heartbeat")]
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
37
38pub struct WorkerBuilder {
62 api_url: String,
63 worker_token: String,
64 provider: Option<Arc<dyn AgentProvider>>,
65 handlers: Vec<Box<dyn WorkflowHandler>>,
66 concurrency: usize,
67 poll_interval: Duration,
68 run_timeout: Duration,
69 max_consecutive_panics: u32,
70 panic_cooldown: Duration,
71 #[cfg(feature = "heartbeat")]
72 heartbeat_url: Option<String>,
73 #[cfg(feature = "heartbeat")]
74 heartbeat_interval: Duration,
75}
76
77impl WorkerBuilder {
78 pub fn new(api_url: &str, worker_token: &str) -> Self {
80 Self {
81 api_url: api_url.to_string(),
82 worker_token: worker_token.to_string(),
83 provider: None,
84 handlers: Vec::new(),
85 concurrency: DEFAULT_CONCURRENCY,
86 poll_interval: DEFAULT_POLL_INTERVAL,
87 run_timeout: DEFAULT_RUN_TIMEOUT,
88 max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
89 panic_cooldown: DEFAULT_PANIC_COOLDOWN,
90 #[cfg(feature = "heartbeat")]
91 heartbeat_url: None,
92 #[cfg(feature = "heartbeat")]
93 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
94 }
95 }
96
97 pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
99 self.provider = Some(provider);
100 self
101 }
102
103 pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
105 self.handlers.push(Box::new(handler));
106 self
107 }
108
109 pub fn concurrency(mut self, n: usize) -> Self {
111 self.concurrency = n;
112 self
113 }
114
115 pub fn poll_interval(mut self, interval: Duration) -> Self {
117 self.poll_interval = interval;
118 self
119 }
120
121 pub fn run_timeout(mut self, timeout: Duration) -> Self {
138 self.run_timeout = timeout;
139 self
140 }
141
142 pub fn max_consecutive_panics(mut self, n: u32) -> Self {
160 self.max_consecutive_panics = n;
161 self
162 }
163
164 pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
181 self.panic_cooldown = cooldown;
182 self
183 }
184
185 #[cfg(feature = "heartbeat")]
204 pub fn heartbeat_url(mut self, url: &str) -> Self {
205 self.heartbeat_url = Some(url.to_string());
206 self
207 }
208
209 #[cfg(feature = "heartbeat")]
227 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
228 self.heartbeat_interval = interval;
229 self
230 }
231
232 pub fn build(self) -> Result<Worker, WorkerError> {
239 let provider = self
240 .provider
241 .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
242
243 let store: Arc<dyn Store> = Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
244
245 let mut engine = Engine::new(store, provider);
246 for handler in self.handlers {
247 engine
248 .register_boxed(handler)
249 .map_err(WorkerError::Engine)?;
250 }
251
252 let (log_sender, log_receiver) = ironflow_engine::log_sender::channel();
253 engine.set_log_sender(log_sender);
254
255 #[cfg(feature = "heartbeat")]
256 let heartbeat_client = Client::builder()
257 .timeout(Duration::from_secs(5))
258 .build()
259 .expect("failed to build heartbeat HTTP client");
260
261 Ok(Worker {
262 engine: Arc::new(engine),
263 api_url: self.api_url,
264 worker_token: self.worker_token,
265 log_receiver: Mutex::new(Some(log_receiver)),
266 concurrency: self.concurrency,
267 poll_interval: self.poll_interval,
268 run_timeout: self.run_timeout,
269 max_consecutive_panics: self.max_consecutive_panics,
270 panic_cooldown: self.panic_cooldown,
271 #[cfg(feature = "heartbeat")]
272 heartbeat_url: self.heartbeat_url,
273 #[cfg(feature = "heartbeat")]
274 heartbeat_interval: self.heartbeat_interval,
275 #[cfg(feature = "heartbeat")]
276 heartbeat_client,
277 })
278 }
279}
280
281pub struct Worker {
283 engine: Arc<Engine>,
284 api_url: String,
285 worker_token: String,
286 log_receiver: Mutex<Option<LogReceiver>>,
287 concurrency: usize,
288 poll_interval: Duration,
289 run_timeout: Duration,
290 max_consecutive_panics: u32,
291 panic_cooldown: Duration,
292 #[cfg(feature = "heartbeat")]
293 heartbeat_url: Option<String>,
294 #[cfg(feature = "heartbeat")]
295 heartbeat_interval: Duration,
296 #[cfg(feature = "heartbeat")]
297 heartbeat_client: Client,
298}
299
/// Counts consecutive failures per workflow name and decides when a workflow should be
/// temporarily skipped ("poison pill" detection).
struct PoisonPillTracker {
    /// Streak length at which a workflow becomes blocked.
    max_consecutive: u32,
    /// How long a blocked workflow stays blocked after its most recent failure.
    cooldown: Duration,
    /// Per-workflow streak: (consecutive failures, instant of the latest failure).
    streaks: HashMap<String, (u32, Instant)>,
}

impl PoisonPillTracker {
    /// Creates an empty tracker.
    fn new(max_consecutive: u32, cooldown: Duration) -> Self {
        Self {
            streaks: HashMap::new(),
            cooldown,
            max_consecutive,
        }
    }

    /// Records one more failure of `workflow`.
    ///
    /// Returns `true` when the streak has reached (or already exceeds) the threshold.
    fn record_panic(&mut self, workflow: &str) -> bool {
        let now = Instant::now();
        let (count, last_failure) = self
            .streaks
            .entry(workflow.to_owned())
            .or_insert((0, now));
        *count += 1;
        *last_failure = now;
        *count >= self.max_consecutive
    }

    /// Clears the failure streak of `workflow` after a successful run.
    fn record_success(&mut self, workflow: &str) {
        let _ = self.streaks.remove(workflow);
    }

    /// Whether `workflow` has hit the threshold and its latest failure is younger than the cooldown.
    fn is_blocked(&self, workflow: &str) -> bool {
        match self.streaks.get(workflow) {
            Some(&(count, last_failure)) => {
                count >= self.max_consecutive && last_failure.elapsed() < self.cooldown
            }
            None => false,
        }
    }
}
340}
341
342impl Worker {
343 pub async fn run(&self) -> Result<(), WorkerError> {
352 let semaphore = Arc::new(Semaphore::new(self.concurrency));
353 let shutdown = CancellationToken::new();
354 let mut idle_streak = 0u32;
355 let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
356 self.max_consecutive_panics,
357 self.panic_cooldown,
358 )));
359 let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
360
361 info!(
362 concurrency = self.concurrency,
363 poll_interval_ms = self.poll_interval.as_millis() as u64,
364 run_timeout_secs = self.run_timeout.as_secs(),
365 "worker started"
366 );
367
368 if let Some(receiver) = self.log_receiver.lock().expect("log_receiver lock").take() {
369 let pusher = LogPusher::new(&self.api_url, &self.worker_token);
370 spawn(pusher.run(receiver));
371 info!("log pusher started");
372 }
373
374 let shutdown_clone = shutdown.clone();
376 spawn(async move {
377 shutdown_signal().await;
378 info!("shutdown signal received, draining in-flight runs...");
379 shutdown_clone.cancel();
380 });
381
382 #[cfg(feature = "heartbeat")]
383 if let Some(ref url) = self.heartbeat_url {
384 let interval = self.heartbeat_interval;
385 let url = url.clone();
386 let client = self.heartbeat_client.clone();
387
388 spawn(async move {
389 let mut ticker = tokio::time::interval(interval);
390 ticker.tick().await;
392 loop {
393 ticker.tick().await;
394 match client.head(&url).send().await {
395 Ok(resp) if resp.status().is_success() => {
396 info!(url = %url, "heartbeat sent");
397 }
398 Ok(resp) => {
399 warn!(
400 url = %url,
401 status = %resp.status(),
402 "heartbeat ping returned non-success status"
403 );
404 }
405 Err(err) => {
406 warn!(
407 url = %url,
408 error = %err,
409 "heartbeat ping failed"
410 );
411 }
412 }
413 }
414 });
415 }
416
417 while !shutdown.is_cancelled() {
418 while let Ok(outcome) = outcome_rx.try_recv() {
420 let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
421 match outcome {
422 RunOutcome::Success(ref wf) => tracker.record_success(wf),
423 RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
424 if tracker.record_panic(wf) {
425 warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
426 }
427 }
428 RunOutcome::Panicked(ref wf) => {
429 if tracker.record_panic(wf) {
430 error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
431 }
432 }
433 }
434 }
435
436 let run = self.engine.store().pick_next_pending().await;
437
438 match run {
439 Ok(Some(run)) => {
440 #[cfg(feature = "prometheus")]
441 counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
442
443 let is_blocked = {
445 let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
446 tracker.is_blocked(&run.workflow_name)
447 };
448 if is_blocked {
449 warn!(
450 workflow = %run.workflow_name,
451 run_id = %run.id,
452 "skipping run: workflow flagged as poison pill, marking as failed"
453 );
454 if let Err(e) = self
455 .engine
456 .store()
457 .update_run_status(run.id, RunStatus::Failed)
458 .await
459 {
460 error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
461 }
462 continue;
463 }
464
465 let permit = semaphore
466 .clone()
467 .acquire_owned()
468 .await
469 .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
470
471 idle_streak = 0;
472 let engine = self.engine.clone();
473 let run_id = run.id;
474 let workflow = run.workflow_name.clone();
475 let workflow_for_watcher = workflow.clone();
476 let run_timeout = self.run_timeout;
477
478 info!(run_id = %run_id, workflow = %workflow, "executing run");
479
480 #[cfg(feature = "prometheus")]
481 gauge!(WORKER_ACTIVE).increment(1.0);
482
483 let handle = spawn(async move {
484 let _permit = permit;
485 let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
486
487 match result {
488 Ok(Ok(_)) => {
489 info!(run_id = %run_id, workflow = %workflow, "run completed");
490 RunOutcome::Success(workflow)
491 }
492 Ok(Err(e)) => {
493 error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
494 if let Err(store_err) = engine
495 .store()
496 .update_run_status(run_id, RunStatus::Failed)
497 .await
498 {
499 error!(run_id = %run_id, error = %store_err, "failed to mark run as failed");
500 }
501 if let Err(cleanup_err) = engine
502 .fail_orphaned_steps(run_id, "parent run failed")
503 .await
504 {
505 error!(run_id = %run_id, error = %cleanup_err, "failed to cleanup orphaned steps");
506 }
507 RunOutcome::Failed(workflow)
508 }
509 Err(_) => {
510 error!(
511 run_id = %run_id,
512 workflow = %workflow,
513 timeout_secs = run_timeout.as_secs(),
514 "run timed out"
515 );
516 if let Err(e) = engine
517 .store()
518 .update_run_status(run_id, RunStatus::Failed)
519 .await
520 {
521 error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
522 }
523 if let Err(e) = engine
524 .fail_orphaned_steps(run_id, "parent run timed out")
525 .await
526 {
527 error!(run_id = %run_id, error = %e, "failed to cleanup orphaned steps after timeout");
528 }
529 RunOutcome::Timeout(workflow)
530 }
531 }
532 });
533
534 let watcher_engine = self.engine.clone();
536 let tx = outcome_tx.clone();
537 spawn(async move {
538 match handle.await {
539 Ok(outcome) => {
540 let _ = tx.send(outcome);
541 }
542 Err(e) => {
543 error!(run_id = %run_id, "spawned task panicked: {e}");
544 if let Err(store_err) = watcher_engine
545 .store()
546 .update_run_status(run_id, RunStatus::Failed)
547 .await
548 {
549 error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
550 }
551 if let Err(cleanup_err) = watcher_engine
552 .fail_orphaned_steps(run_id, "parent run panicked")
553 .await
554 {
555 error!(run_id = %run_id, error = %cleanup_err, "failed to cleanup orphaned steps after panic");
556 }
557 let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
558 }
559 }
560 #[cfg(feature = "prometheus")]
561 gauge!(WORKER_ACTIVE).decrement(1.0);
562 });
563 }
564 Ok(None) => {
565 #[cfg(feature = "prometheus")]
566 counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
567
568 idle_streak += 1;
569 let backoff = if idle_streak > 10 {
570 self.poll_interval * 3
571 } else if idle_streak > 5 {
572 self.poll_interval * 2
573 } else {
574 self.poll_interval
575 };
576 sleep(backoff).await;
577 }
578 Err(e) => {
579 warn!(error = %e, "poll error");
580 sleep(self.poll_interval).await;
581 }
582 }
583 }
584
585 info!(
587 in_flight = self.concurrency - semaphore.available_permits(),
588 "waiting for in-flight runs to complete..."
589 );
590 let _ = semaphore
591 .acquire_many(self.concurrency as u32)
592 .await
593 .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
594
595 info!("all in-flight runs completed, worker shut down");
596 Ok(())
597 }
598}
599
/// Final result of one run, reported by its watcher task back to the polling loop.
///
/// Every variant carries the workflow name, which the loop uses to update the
/// poison-pill tracker.
enum RunOutcome {
    /// The handler finished without error.
    Success(String),
    /// The handler returned an error.
    Failed(String),
    /// The handler exceeded the configured run timeout.
    Timeout(String),
    /// The task executing the handler panicked.
    Panicked(String),
}
611
612async fn shutdown_signal() {
614 use tokio::signal;
615
616 let ctrl_c = async {
617 signal::ctrl_c()
618 .await
619 .expect("failed to install Ctrl+C handler");
620 };
621
622 #[cfg(unix)]
623 let terminate = async {
624 use tokio::signal::unix::{SignalKind, signal};
625
626 signal(SignalKind::terminate())
627 .expect("failed to install SIGTERM handler")
628 .recv()
629 .await;
630 };
631
632 #[cfg(not(unix))]
633 let terminate = {
634 use std::future::pending;
635 pending::<()>()
636 };
637
638 tokio::select! {
639 () = ctrl_c => {},
640 () = terminate => {},
641 }
642}
643
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;

    #[test]
    fn builder_new_creates_default_config() {
        let builder = WorkerBuilder::new("http://localhost:3000", "my-token");
        assert_eq!(builder.api_url, "http://localhost:3000");
        assert_eq!(builder.worker_token, "my-token");
        assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
        assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
        assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
        assert_eq!(
            builder.max_consecutive_panics,
            DEFAULT_MAX_CONSECUTIVE_PANICS
        );
        assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
        assert!(builder.provider.is_none());
    }

    // The builder stores the URL verbatim; a trailing slash is NOT normalized away.
    #[test]
    fn builder_with_trailing_slash_preserved_verbatim() {
        let builder = WorkerBuilder::new("http://localhost:3000/", "token");
        assert_eq!(builder.api_url, "http://localhost:3000/");
    }

    #[test]
    fn builder_provider_sets_provider() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
        assert!(builder.provider.is_some());
    }

    #[test]
    fn builder_concurrency_sets_concurrency() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token").concurrency(8);
        assert_eq!(builder.concurrency, 8);
    }

    // The setter itself stores any value; validation (if any) happens in `build`.
    #[test]
    fn builder_concurrency_zero_stored_by_setter() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(0);
        assert_eq!(builder.concurrency, 0);
    }

    #[test]
    fn builder_poll_interval_sets_interval() {
        let interval = Duration::from_secs(5);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").poll_interval(interval);
        assert_eq!(builder.poll_interval, interval);
    }

    #[test]
    fn builder_run_timeout_sets_timeout() {
        let dur = Duration::from_secs(120);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").run_timeout(dur);
        assert_eq!(builder.run_timeout, dur);
    }

    #[test]
    fn builder_max_consecutive_panics_sets_value() {
        let builder =
            WorkerBuilder::new("http://localhost:3000", "token").max_consecutive_panics(10);
        assert_eq!(builder.max_consecutive_panics, 10);
    }

    #[test]
    fn builder_panic_cooldown_sets_value() {
        let dur = Duration::from_secs(600);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").panic_cooldown(dur);
        assert_eq!(builder.panic_cooldown, dur);
    }

    #[test]
    fn builder_build_without_provider_fails() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token");
        let result = builder.build();
        assert!(result.is_err());
        match result {
            Err(WorkerError::Internal(msg)) => {
                assert!(msg.contains("provider is required"));
            }
            _ => panic!("expected Internal error about missing provider"),
        }
    }

    #[test]
    fn builder_build_with_provider_succeeds() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_build_creates_worker_with_correct_concurrency() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(16);
        let worker = builder.build().unwrap();
        assert_eq!(worker.concurrency, 16);
    }

    #[test]
    fn builder_build_creates_worker_with_correct_interval() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let interval = Duration::from_secs(10);
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .poll_interval(interval);
        let worker = builder.build().unwrap();
        assert_eq!(worker.poll_interval, interval);
    }

    #[test]
    fn builder_build_preserves_timeout() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let dur = Duration::from_secs(300);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .run_timeout(dur)
            .build()
            .unwrap();
        assert_eq!(worker.run_timeout, dur);
    }

    #[test]
    fn builder_build_preserves_poison_pill_config() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let cooldown = Duration::from_secs(120);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .max_consecutive_panics(7)
            .panic_cooldown(cooldown)
            .build()
            .unwrap();
        assert_eq!(worker.max_consecutive_panics, 7);
        assert_eq!(worker.panic_cooldown, cooldown);
    }

    #[test]
    fn builder_chaining_works() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let result = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(4)
            .poll_interval(Duration::from_secs(3))
            .run_timeout(Duration::from_secs(600))
            .max_consecutive_panics(5)
            .panic_cooldown(Duration::from_secs(120))
            .build();
        assert!(result.is_ok());
        let worker = result.unwrap();
        assert_eq!(worker.concurrency, 4);
        assert_eq!(worker.poll_interval, Duration::from_secs(3));
        assert_eq!(worker.run_timeout, Duration::from_secs(600));
        assert_eq!(worker.max_consecutive_panics, 5);
        assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
    }

    #[test]
    fn builder_empty_api_url_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("", "token").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_empty_token_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_defaults() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token");
        assert!(builder.heartbeat_url.is_none());
        assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_url_sets_url() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
        assert_eq!(
            builder.heartbeat_url.as_deref(),
            Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
        );
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_custom_interval() {
        let interval = Duration::from_secs(10);
        let builder =
            WorkerBuilder::new("http://localhost:3000", "token").heartbeat_interval(interval);
        assert_eq!(builder.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_preserves_heartbeat_config() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let interval = Duration::from_secs(15);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .heartbeat_url("https://example.com/heartbeat")
            .heartbeat_interval(interval)
            .build()
            .unwrap();
        assert_eq!(
            worker.heartbeat_url.as_deref(),
            Some("https://example.com/heartbeat")
        );
        assert_eq!(worker.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_without_heartbeat_url_has_none() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .build()
            .unwrap();
        assert!(worker.heartbeat_url.is_none());
    }

    #[test]
    fn poison_tracker_not_blocked_initially() {
        let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.is_blocked("my-workflow"));
    }

    #[test]
    fn poison_tracker_blocked_after_max_panics() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_keeps_flagging_past_threshold() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_success_resets_count() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        tracker.record_success("wf");
        assert!(!tracker.is_blocked("wf"));
        // The streak restarts from zero after a success.
        assert!(!tracker.record_panic("wf"));
    }

    #[test]
    fn poison_tracker_success_on_unknown_workflow_is_noop() {
        let mut tracker = PoisonPillTracker::new(1, Duration::from_secs(300));
        tracker.record_success("never-seen");
        assert!(!tracker.is_blocked("never-seen"));
    }

    #[test]
    fn poison_tracker_independent_per_workflow() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        tracker.record_panic("wf-a");
        tracker.record_panic("wf-a");
        assert!(tracker.is_blocked("wf-a"));
        assert!(!tracker.is_blocked("wf-b"));
    }

    #[test]
    fn poison_tracker_unblocks_after_cooldown() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        // With a zero cooldown the block expires immediately.
        assert!(!tracker.is_blocked("wf"));
    }
}