1use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_engine::log_sender::LogReceiver;
19use ironflow_store::entities::{RunStatus, RunUpdate};
20use ironflow_store::store::Store;
21#[cfg(feature = "prometheus")]
22use metrics::{counter, gauge};
23#[cfg(feature = "heartbeat")]
24use reqwest::Client;
25
26use crate::api_store::ApiRunStore;
27use crate::error::WorkerError;
28use crate::log_pusher::LogPusher;
29
/// Runs executed at once when `WorkerBuilder::concurrency` is not called.
const DEFAULT_CONCURRENCY: usize = 2;
/// Base delay between polls of an empty queue (two seconds).
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(2_000);
/// Wall-clock limit for a single run (thirty minutes).
const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(60 * 30);
/// Consecutive failures after which a workflow is treated as a poison pill.
const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
/// How long a poison-pill workflow stays blocked after its latest failure (five minutes).
const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(60 * 5);
/// Delay between heartbeat pings (thirty seconds).
#[cfg(feature = "heartbeat")]
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(30_000);
37
/// Builder for [`Worker`].
///
/// Created with [`WorkerBuilder::new`], configured through chained setters, and turned into
/// a [`Worker`] with [`WorkerBuilder::build`]. A provider must be set before building.
pub struct WorkerBuilder {
    /// API URL, passed to `ApiRunStore::new` by `build`.
    api_url: String,
    /// Worker token passed alongside `api_url` (presumably for API authentication).
    worker_token: String,
    /// Agent provider handed to the engine; `build` fails while this is `None`.
    provider: Option<Arc<dyn AgentProvider>>,
    /// Workflow handlers, registered with the engine in insertion order by `build`.
    handlers: Vec<Box<dyn WorkflowHandler>>,
    /// Maximum number of runs executed at once.
    concurrency: usize,
    /// Base delay between polls when no run is pending.
    poll_interval: Duration,
    /// Wall-clock limit for a single run.
    run_timeout: Duration,
    /// Consecutive failures (errors, timeouts or panics) after which a workflow is blocked.
    max_consecutive_panics: u32,
    /// How long a blocked workflow stays blocked after its latest failure.
    panic_cooldown: Duration,
    /// URL pinged while the worker runs; `None` disables heartbeats.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Delay between heartbeat pings.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
}
76
77impl WorkerBuilder {
78 pub fn new(api_url: &str, worker_token: &str) -> Self {
80 Self {
81 api_url: api_url.to_string(),
82 worker_token: worker_token.to_string(),
83 provider: None,
84 handlers: Vec::new(),
85 concurrency: DEFAULT_CONCURRENCY,
86 poll_interval: DEFAULT_POLL_INTERVAL,
87 run_timeout: DEFAULT_RUN_TIMEOUT,
88 max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
89 panic_cooldown: DEFAULT_PANIC_COOLDOWN,
90 #[cfg(feature = "heartbeat")]
91 heartbeat_url: None,
92 #[cfg(feature = "heartbeat")]
93 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
94 }
95 }
96
97 pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
99 self.provider = Some(provider);
100 self
101 }
102
103 pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
105 self.handlers.push(Box::new(handler));
106 self
107 }
108
109 pub fn concurrency(mut self, n: usize) -> Self {
111 self.concurrency = n;
112 self
113 }
114
115 pub fn poll_interval(mut self, interval: Duration) -> Self {
117 self.poll_interval = interval;
118 self
119 }
120
121 pub fn run_timeout(mut self, timeout: Duration) -> Self {
138 self.run_timeout = timeout;
139 self
140 }
141
142 pub fn max_consecutive_panics(mut self, n: u32) -> Self {
160 self.max_consecutive_panics = n;
161 self
162 }
163
164 pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
181 self.panic_cooldown = cooldown;
182 self
183 }
184
185 #[cfg(feature = "heartbeat")]
204 pub fn heartbeat_url(mut self, url: &str) -> Self {
205 self.heartbeat_url = Some(url.to_string());
206 self
207 }
208
209 #[cfg(feature = "heartbeat")]
227 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
228 self.heartbeat_interval = interval;
229 self
230 }
231
232 pub fn build(self) -> Result<Worker, WorkerError> {
239 let provider = self
240 .provider
241 .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
242
243 let store: Arc<dyn Store> = Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
244
245 let mut engine = Engine::new(store, provider);
246 for handler in self.handlers {
247 engine
248 .register_boxed(handler)
249 .map_err(WorkerError::Engine)?;
250 }
251
252 let (log_sender, log_receiver) = ironflow_engine::log_sender::channel();
253 engine.set_log_sender(log_sender);
254
255 #[cfg(feature = "heartbeat")]
256 let heartbeat_client = Client::builder()
257 .timeout(Duration::from_secs(5))
258 .build()
259 .expect("failed to build heartbeat HTTP client");
260
261 Ok(Worker {
262 engine: Arc::new(engine),
263 api_url: self.api_url,
264 worker_token: self.worker_token,
265 log_receiver: Mutex::new(Some(log_receiver)),
266 concurrency: self.concurrency,
267 poll_interval: self.poll_interval,
268 run_timeout: self.run_timeout,
269 max_consecutive_panics: self.max_consecutive_panics,
270 panic_cooldown: self.panic_cooldown,
271 #[cfg(feature = "heartbeat")]
272 heartbeat_url: self.heartbeat_url,
273 #[cfg(feature = "heartbeat")]
274 heartbeat_interval: self.heartbeat_interval,
275 #[cfg(feature = "heartbeat")]
276 heartbeat_client,
277 })
278 }
279}
280
/// Polls the ironflow API for pending runs and executes them through an [`Engine`].
///
/// Construct with [`WorkerBuilder`] and start with [`Worker::run`].
pub struct Worker {
    /// Engine that executes runs; `WorkerBuilder::build` creates it over an API-backed run store.
    engine: Arc<Engine>,
    /// API URL, passed to the log pusher started by `run`.
    api_url: String,
    /// Worker token, passed to the log pusher started by `run`.
    worker_token: String,
    /// Receiving end of the engine's log channel. `run` takes it on its first call and
    /// hands it to a log pusher; later calls find `None`.
    log_receiver: Mutex<Option<LogReceiver>>,
    /// Maximum number of runs executed at once (size of the run semaphore).
    concurrency: usize,
    /// Base delay between polls when the queue is empty or polling fails.
    poll_interval: Duration,
    /// Wall-clock limit for a single run; runs that exceed it are marked failed.
    run_timeout: Duration,
    /// Consecutive failures after which a workflow is treated as a poison pill.
    max_consecutive_panics: u32,
    /// How long a poison-pill workflow stays blocked after its latest failure.
    panic_cooldown: Duration,
    /// URL pinged with `HEAD` requests while the worker runs; `None` disables heartbeats.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Delay between heartbeat pings.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
    /// HTTP client for heartbeat pings, built with a 5-second request timeout.
    #[cfg(feature = "heartbeat")]
    heartbeat_client: Client,
}
299
/// Counts consecutive failures per workflow and decides when a workflow is a "poison pill".
///
/// A workflow is blocked once it has failed `threshold` times in a row and its most recent
/// failure happened less than `cooldown` ago. Any success clears its history. After the
/// cooldown elapses the count is kept, so the next failure blocks the workflow again
/// straight away.
struct PoisonPillTracker {
    /// Failures in a row that make a workflow blocked.
    threshold: u32,
    /// How long a blocked workflow stays blocked after its latest failure.
    cooldown: Duration,
    /// Per workflow: (consecutive failures, time of the latest failure).
    strikes: HashMap<String, (u32, Instant)>,
}

impl PoisonPillTracker {
    /// Create an empty tracker. It blocks a workflow after `max_consecutive` failures in a
    /// row, for `cooldown` after that workflow's latest failure.
    fn new(max_consecutive: u32, cooldown: Duration) -> Self {
        Self {
            threshold: max_consecutive,
            cooldown,
            strikes: HashMap::new(),
        }
    }

    /// Record one more consecutive failure of `workflow` and stamp it with the current time.
    ///
    /// Returns `true` once the workflow's consecutive-failure count has reached the threshold.
    fn record_panic(&mut self, workflow: &str) -> bool {
        let now = Instant::now();
        let (failures, last_failure) = self
            .strikes
            .entry(workflow.to_string())
            .or_insert((0, now));
        *failures += 1;
        *last_failure = now;
        *failures >= self.threshold
    }

    /// Forget every recorded failure of `workflow`.
    fn record_success(&mut self, workflow: &str) {
        let _ = self.strikes.remove(workflow);
    }

    /// Whether `workflow` has hit the threshold and is still inside its cooldown window.
    fn is_blocked(&self, workflow: &str) -> bool {
        let Some(&(failures, last_failure)) = self.strikes.get(workflow) else {
            return false;
        };
        failures >= self.threshold && last_failure.elapsed() < self.cooldown
    }
}
341
342impl Worker {
343 pub async fn run(&self) -> Result<(), WorkerError> {
352 let semaphore = Arc::new(Semaphore::new(self.concurrency));
353 let shutdown = CancellationToken::new();
354 let mut idle_streak = 0u32;
355 let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
356 self.max_consecutive_panics,
357 self.panic_cooldown,
358 )));
359 let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
360
361 info!(
362 concurrency = self.concurrency,
363 poll_interval_ms = self.poll_interval.as_millis() as u64,
364 run_timeout_secs = self.run_timeout.as_secs(),
365 "worker started"
366 );
367
368 if let Some(receiver) = self.log_receiver.lock().expect("log_receiver lock").take() {
369 let pusher = LogPusher::new(&self.api_url, &self.worker_token);
370 spawn(pusher.run(receiver));
371 info!("log pusher started");
372 }
373
374 let shutdown_clone = shutdown.clone();
376 spawn(async move {
377 shutdown_signal().await;
378 info!("shutdown signal received, draining in-flight runs...");
379 shutdown_clone.cancel();
380 });
381
382 #[cfg(feature = "heartbeat")]
383 if let Some(ref url) = self.heartbeat_url {
384 let interval = self.heartbeat_interval;
385 let url = url.clone();
386 let client = self.heartbeat_client.clone();
387
388 spawn(async move {
389 let mut ticker = tokio::time::interval(interval);
390 ticker.tick().await;
392 loop {
393 ticker.tick().await;
394 match client.head(&url).send().await {
395 Ok(resp) if resp.status().is_success() => {
396 info!(url = %url, "heartbeat sent");
397 }
398 Ok(resp) => {
399 warn!(
400 url = %url,
401 status = %resp.status(),
402 "heartbeat ping returned non-success status"
403 );
404 }
405 Err(err) => {
406 warn!(
407 url = %url,
408 error = %err,
409 "heartbeat ping failed"
410 );
411 }
412 }
413 }
414 });
415 }
416
417 while !shutdown.is_cancelled() {
418 while let Ok(outcome) = outcome_rx.try_recv() {
420 let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
421 match outcome {
422 RunOutcome::Success(ref wf) => tracker.record_success(wf),
423 RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
424 if tracker.record_panic(wf) {
425 warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
426 }
427 }
428 RunOutcome::Panicked(ref wf) => {
429 if tracker.record_panic(wf) {
430 error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
431 }
432 }
433 }
434 }
435
436 let run = self.engine.store().pick_next_pending().await;
437
438 match run {
439 Ok(Some(run)) => {
440 #[cfg(feature = "prometheus")]
441 counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
442
443 let is_blocked = {
445 let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
446 tracker.is_blocked(&run.workflow_name)
447 };
448 if is_blocked {
449 warn!(
450 workflow = %run.workflow_name,
451 run_id = %run.id,
452 "skipping run: workflow flagged as poison pill, marking as failed"
453 );
454 if let Err(e) = self
455 .engine
456 .store()
457 .update_run_status(run.id, RunStatus::Failed)
458 .await
459 {
460 error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
461 }
462 continue;
463 }
464
465 let permit = semaphore
466 .clone()
467 .acquire_owned()
468 .await
469 .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
470
471 idle_streak = 0;
472 let engine = self.engine.clone();
473 let run_id = run.id;
474 let workflow = run.workflow_name.clone();
475 let workflow_for_watcher = workflow.clone();
476 let run_timeout = self.run_timeout;
477
478 info!(run_id = %run_id, workflow = %workflow, "executing run");
479
480 #[cfg(feature = "prometheus")]
481 gauge!(WORKER_ACTIVE).increment(1.0);
482
483 let handle = spawn(async move {
484 let _permit = permit;
485 let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
486
487 match result {
488 Ok(Ok(_)) => {
489 info!(run_id = %run_id, workflow = %workflow, "run completed");
490 RunOutcome::Success(workflow)
491 }
492 Ok(Err(e)) => {
493 error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
494 if let Err(store_err) = engine
495 .store()
496 .update_run(
497 run_id,
498 RunUpdate {
499 status: Some(RunStatus::Failed),
500 error: Some(e.to_string()),
501 ..RunUpdate::default()
502 },
503 )
504 .await
505 {
506 error!(run_id = %run_id, error = %store_err, "failed to mark run as failed");
507 }
508 if let Err(cleanup_err) = engine
509 .fail_orphaned_steps(run_id, "parent run failed")
510 .await
511 {
512 error!(run_id = %run_id, error = %cleanup_err, "failed to cleanup orphaned steps");
513 }
514 RunOutcome::Failed(workflow)
515 }
516 Err(_) => {
517 error!(
518 run_id = %run_id,
519 workflow = %workflow,
520 timeout_secs = run_timeout.as_secs(),
521 "run timed out"
522 );
523 let timeout_msg =
524 format!("run timed out after {}s", run_timeout.as_secs());
525 if let Err(e) = engine
526 .store()
527 .update_run(
528 run_id,
529 RunUpdate {
530 status: Some(RunStatus::Failed),
531 error: Some(timeout_msg),
532 ..RunUpdate::default()
533 },
534 )
535 .await
536 {
537 error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
538 }
539 if let Err(e) = engine
540 .fail_orphaned_steps(run_id, "parent run timed out")
541 .await
542 {
543 error!(run_id = %run_id, error = %e, "failed to cleanup orphaned steps after timeout");
544 }
545 RunOutcome::Timeout(workflow)
546 }
547 }
548 });
549
550 let watcher_engine = self.engine.clone();
552 let tx = outcome_tx.clone();
553 spawn(async move {
554 match handle.await {
555 Ok(outcome) => {
556 let _ = tx.send(outcome);
557 }
558 Err(e) => {
559 error!(run_id = %run_id, "spawned task panicked: {e}");
560 if let Err(store_err) = watcher_engine
561 .store()
562 .update_run_status(run_id, RunStatus::Failed)
563 .await
564 {
565 error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
566 }
567 if let Err(cleanup_err) = watcher_engine
568 .fail_orphaned_steps(run_id, "parent run panicked")
569 .await
570 {
571 error!(run_id = %run_id, error = %cleanup_err, "failed to cleanup orphaned steps after panic");
572 }
573 let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
574 }
575 }
576 #[cfg(feature = "prometheus")]
577 gauge!(WORKER_ACTIVE).decrement(1.0);
578 });
579 }
580 Ok(None) => {
581 #[cfg(feature = "prometheus")]
582 counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
583
584 idle_streak += 1;
585 let backoff = if idle_streak > 10 {
586 self.poll_interval * 3
587 } else if idle_streak > 5 {
588 self.poll_interval * 2
589 } else {
590 self.poll_interval
591 };
592 sleep(backoff).await;
593 }
594 Err(e) => {
595 warn!(error = %e, "poll error");
596 sleep(self.poll_interval).await;
597 }
598 }
599 }
600
601 info!(
603 in_flight = self.concurrency - semaphore.available_permits(),
604 "waiting for in-flight runs to complete..."
605 );
606 let _ = semaphore
607 .acquire_many(self.concurrency as u32)
608 .await
609 .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
610
611 info!("all in-flight runs completed, worker shut down");
612 Ok(())
613 }
614}
615
/// Final outcome of one run.
///
/// A watcher task sends it back to the poll loop in `Worker::run`, which feeds it into the
/// poison-pill tracker. Every variant carries the run's workflow name.
enum RunOutcome {
    /// The handler finished successfully within the run timeout.
    Success(String),
    /// The handler returned an error.
    Failed(String),
    /// The handler did not finish within the run timeout.
    Timeout(String),
    /// The task executing the handler panicked.
    Panicked(String),
}
627
628async fn shutdown_signal() {
630 use tokio::signal;
631
632 let ctrl_c = async {
633 signal::ctrl_c()
634 .await
635 .expect("failed to install Ctrl+C handler");
636 };
637
638 #[cfg(unix)]
639 let terminate = async {
640 use tokio::signal::unix::{SignalKind, signal};
641
642 signal(SignalKind::terminate())
643 .expect("failed to install SIGTERM handler")
644 .recv()
645 .await;
646 };
647
648 #[cfg(not(unix))]
649 let terminate = {
650 use std::future::pending;
651 pending::<()>()
652 };
653
654 tokio::select! {
655 () = ctrl_c => {},
656 () = terminate => {},
657 }
658}
659
#[cfg(test)]
mod tests {
    //! Unit tests for builder configuration and the poison-pill tracker. None of them start
    //! the worker loop or contact the API.
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;

    const API_URL: &str = "http://localhost:3000";

    /// Provider for tests that only need `build()` to get past the provider check.
    fn test_provider() -> Arc<dyn AgentProvider> {
        Arc::new(ClaudeCodeProvider::new())
    }

    #[test]
    fn builder_new_creates_default_config() {
        let builder = WorkerBuilder::new(API_URL, "my-token");
        assert_eq!(builder.api_url, API_URL);
        assert_eq!(builder.worker_token, "my-token");
        assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
        assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
        assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
        assert_eq!(
            builder.max_consecutive_panics,
            DEFAULT_MAX_CONSECUTIVE_PANICS
        );
        assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
        assert!(builder.provider.is_none());
    }

    // The builder stores the URL verbatim; it does not strip a trailing slash.
    #[test]
    fn builder_keeps_trailing_slash_verbatim() {
        let builder = WorkerBuilder::new("http://localhost:3000/", "token");
        assert_eq!(builder.api_url, "http://localhost:3000/");
    }

    #[test]
    fn builder_provider_sets_provider() {
        let builder = WorkerBuilder::new(API_URL, "token").provider(test_provider());
        assert!(builder.provider.is_some());
    }

    #[test]
    fn builder_concurrency_sets_concurrency() {
        let builder = WorkerBuilder::new(API_URL, "token").concurrency(8);
        assert_eq!(builder.concurrency, 8);
    }

    #[test]
    fn builder_concurrency_setter_stores_zero() {
        let builder = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .concurrency(0);
        assert_eq!(builder.concurrency, 0);
    }

    #[test]
    fn builder_poll_interval_sets_interval() {
        let interval = Duration::from_secs(5);
        let builder = WorkerBuilder::new(API_URL, "token").poll_interval(interval);
        assert_eq!(builder.poll_interval, interval);
    }

    #[test]
    fn builder_run_timeout_sets_timeout() {
        let dur = Duration::from_secs(120);
        let builder = WorkerBuilder::new(API_URL, "token").run_timeout(dur);
        assert_eq!(builder.run_timeout, dur);
    }

    #[test]
    fn builder_max_consecutive_panics_sets_value() {
        let builder = WorkerBuilder::new(API_URL, "token").max_consecutive_panics(10);
        assert_eq!(builder.max_consecutive_panics, 10);
    }

    #[test]
    fn builder_panic_cooldown_sets_value() {
        let dur = Duration::from_secs(600);
        let builder = WorkerBuilder::new(API_URL, "token").panic_cooldown(dur);
        assert_eq!(builder.panic_cooldown, dur);
    }

    #[test]
    fn builder_build_without_provider_fails() {
        match WorkerBuilder::new(API_URL, "token").build() {
            Err(WorkerError::Internal(msg)) => {
                assert!(msg.contains("provider is required"));
            }
            _ => panic!("expected Internal error about missing provider"),
        }
    }

    #[test]
    fn builder_build_with_provider_succeeds() {
        let result = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_build_creates_worker_with_correct_concurrency() {
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .concurrency(16)
            .build()
            .unwrap();
        assert_eq!(worker.concurrency, 16);
    }

    #[test]
    fn builder_build_creates_worker_with_correct_interval() {
        let interval = Duration::from_secs(10);
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .poll_interval(interval)
            .build()
            .unwrap();
        assert_eq!(worker.poll_interval, interval);
    }

    #[test]
    fn builder_build_preserves_timeout() {
        let dur = Duration::from_secs(300);
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .run_timeout(dur)
            .build()
            .unwrap();
        assert_eq!(worker.run_timeout, dur);
    }

    #[test]
    fn builder_build_preserves_poison_pill_config() {
        let cooldown = Duration::from_secs(120);
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .max_consecutive_panics(7)
            .panic_cooldown(cooldown)
            .build()
            .unwrap();
        assert_eq!(worker.max_consecutive_panics, 7);
        assert_eq!(worker.panic_cooldown, cooldown);
    }

    #[test]
    fn builder_chaining_works() {
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .concurrency(4)
            .poll_interval(Duration::from_secs(3))
            .run_timeout(Duration::from_secs(600))
            .max_consecutive_panics(5)
            .panic_cooldown(Duration::from_secs(120))
            .build()
            .unwrap();
        assert_eq!(worker.concurrency, 4);
        assert_eq!(worker.poll_interval, Duration::from_secs(3));
        assert_eq!(worker.run_timeout, Duration::from_secs(600));
        assert_eq!(worker.max_consecutive_panics, 5);
        assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
    }

    #[test]
    fn builder_empty_api_url_accepted() {
        let result = WorkerBuilder::new("", "token")
            .provider(test_provider())
            .build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_empty_token_accepted() {
        let result = WorkerBuilder::new(API_URL, "")
            .provider(test_provider())
            .build();
        assert!(result.is_ok());
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_defaults() {
        let builder = WorkerBuilder::new(API_URL, "token");
        assert!(builder.heartbeat_url.is_none());
        assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_url_sets_url() {
        let builder = WorkerBuilder::new(API_URL, "token")
            .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
        assert_eq!(
            builder.heartbeat_url.as_deref(),
            Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
        );
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_custom_interval() {
        let interval = Duration::from_secs(10);
        let builder = WorkerBuilder::new(API_URL, "token").heartbeat_interval(interval);
        assert_eq!(builder.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_preserves_heartbeat_config() {
        let interval = Duration::from_secs(15);
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .heartbeat_url("https://example.com/heartbeat")
            .heartbeat_interval(interval)
            .build()
            .unwrap();
        assert_eq!(
            worker.heartbeat_url.as_deref(),
            Some("https://example.com/heartbeat")
        );
        assert_eq!(worker.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_without_heartbeat_url_has_none() {
        let worker = WorkerBuilder::new(API_URL, "token")
            .provider(test_provider())
            .build()
            .unwrap();
        assert!(worker.heartbeat_url.is_none());
    }

    #[test]
    fn poison_tracker_not_blocked_initially() {
        let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.is_blocked("my-workflow"));
    }

    #[test]
    fn poison_tracker_blocked_after_max_panics() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_success_resets_count() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        tracker.record_success("wf");
        assert!(!tracker.is_blocked("wf"));
        assert!(!tracker.record_panic("wf"));
    }

    #[test]
    fn poison_tracker_success_unblocks_blocked_workflow() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        assert!(tracker.is_blocked("wf"));
        tracker.record_success("wf");
        assert!(!tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_independent_per_workflow() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        tracker.record_panic("wf-a");
        tracker.record_panic("wf-a");
        assert!(tracker.is_blocked("wf-a"));
        assert!(!tracker.is_blocked("wf-b"));
    }

    #[test]
    fn poison_tracker_unblocks_after_cooldown() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        assert!(!tracker.is_blocked("wf"));
    }
}