Skip to main content

ironflow_worker/
worker.rs

1//! Worker -- polls the API for pending runs and executes them.
2
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_engine::log_sender::LogReceiver;
19use ironflow_store::entities::RunStatus;
20use ironflow_store::store::Store;
21#[cfg(feature = "prometheus")]
22use metrics::{counter, gauge};
23#[cfg(feature = "heartbeat")]
24use reqwest::Client;
25
26use crate::api_store::ApiRunStore;
27use crate::error::WorkerError;
28use crate::log_pusher::LogPusher;
29
/// Default number of workflow runs a worker executes concurrently.
const DEFAULT_CONCURRENCY: usize = 2;
/// Default delay between two polls for pending runs.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Default maximum execution time of a single run (30 minutes).
const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Default number of consecutive panics after which a workflow is treated
/// as a poison pill.
const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
/// Default time a poison-pill workflow stays blocked (5 minutes).
const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(5 * 60);
/// Default interval between two heartbeat pings.
#[cfg(feature = "heartbeat")]
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
37
/// Builder for configuring and creating a [`Worker`].
///
/// Every setting except the provider has a default; calling
/// [`build`](Self::build) without a provider fails.
///
/// # Examples
///
/// ```no_run
/// use std::sync::Arc;
/// use std::time::Duration;
/// use ironflow_worker::WorkerBuilder;
/// use ironflow_core::providers::claude::ClaudeCodeProvider;
///
/// # async fn example() -> Result<(), ironflow_worker::WorkerError> {
/// let worker = WorkerBuilder::new("http://localhost:3000", "my-token")
///     .provider(Arc::new(ClaudeCodeProvider::new()))
///     .concurrency(4)
///     .poll_interval(Duration::from_secs(2))
///     .run_timeout(Duration::from_secs(600))
///     .max_consecutive_panics(5)
///     .build()?;
///
/// worker.run().await?;
/// # Ok(())
/// # }
/// ```
pub struct WorkerBuilder {
    /// URL of the API server; stored verbatim and handed to the API run store.
    api_url: String,
    /// Worker token handed to the API run store together with `api_url`.
    worker_token: String,
    /// Agent provider for AI operations; must be set before `build`.
    provider: Option<Arc<dyn AgentProvider>>,
    /// Workflow handlers registered on the engine during `build`.
    handlers: Vec<Box<dyn WorkflowHandler>>,
    /// Maximum number of concurrent workflow executions.
    concurrency: usize,
    /// Interval between polls for new runs.
    poll_interval: Duration,
    /// Maximum execution time per run.
    run_timeout: Duration,
    /// Consecutive panics after which a workflow is flagged as a poison pill.
    max_consecutive_panics: u32,
    /// How long a flagged workflow is skipped.
    panic_cooldown: Duration,
    /// Optional dead man's switch URL; no heartbeat is sent when `None`.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Interval between heartbeat pings.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
}
76
77impl WorkerBuilder {
78    /// Create a new builder targeting the given API server.
79    pub fn new(api_url: &str, worker_token: &str) -> Self {
80        Self {
81            api_url: api_url.to_string(),
82            worker_token: worker_token.to_string(),
83            provider: None,
84            handlers: Vec::new(),
85            concurrency: DEFAULT_CONCURRENCY,
86            poll_interval: DEFAULT_POLL_INTERVAL,
87            run_timeout: DEFAULT_RUN_TIMEOUT,
88            max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
89            panic_cooldown: DEFAULT_PANIC_COOLDOWN,
90            #[cfg(feature = "heartbeat")]
91            heartbeat_url: None,
92            #[cfg(feature = "heartbeat")]
93            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
94        }
95    }
96
97    /// Set the agent provider for AI operations.
98    pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
99        self.provider = Some(provider);
100        self
101    }
102
103    /// Register a workflow handler.
104    pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
105        self.handlers.push(Box::new(handler));
106        self
107    }
108
109    /// Set the maximum number of concurrent workflow executions.
110    pub fn concurrency(mut self, n: usize) -> Self {
111        self.concurrency = n;
112        self
113    }
114
115    /// Set the interval between polls for new runs.
116    pub fn poll_interval(mut self, interval: Duration) -> Self {
117        self.poll_interval = interval;
118        self
119    }
120
121    /// Set the maximum execution time per run.
122    ///
123    /// If a run exceeds this duration, it is cancelled and marked as `Failed`
124    /// with a timeout error. Defaults to 30 minutes.
125    ///
126    /// # Examples
127    ///
128    /// ```no_run
129    /// use std::time::Duration;
130    /// use ironflow_worker::WorkerBuilder;
131    ///
132    /// # fn example() {
133    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
134    ///     .run_timeout(Duration::from_secs(600));
135    /// # }
136    /// ```
137    pub fn run_timeout(mut self, timeout: Duration) -> Self {
138        self.run_timeout = timeout;
139        self
140    }
141
142    /// Set the maximum number of consecutive panics per workflow before
143    /// the worker stops picking runs for that workflow (poison pill guard).
144    ///
145    /// When a workflow panics `max_consecutive_panics` times in a row without
146    /// a single success, the worker skips it for a cooldown period (see
147    /// [`panic_cooldown`](Self::panic_cooldown)). Defaults to 3.
148    ///
149    /// # Examples
150    ///
151    /// ```no_run
152    /// use ironflow_worker::WorkerBuilder;
153    ///
154    /// # fn example() {
155    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
156    ///     .max_consecutive_panics(5);
157    /// # }
158    /// ```
159    pub fn max_consecutive_panics(mut self, n: u32) -> Self {
160        self.max_consecutive_panics = n;
161        self
162    }
163
164    /// Set the cooldown duration after a workflow is flagged as a poison pill.
165    ///
166    /// After `max_consecutive_panics` is reached, runs for that workflow are
167    /// skipped until this duration elapses. Defaults to 5 minutes.
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// use std::time::Duration;
173    /// use ironflow_worker::WorkerBuilder;
174    ///
175    /// # fn example() {
176    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
177    ///     .panic_cooldown(Duration::from_secs(600));
178    /// # }
179    /// ```
180    pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
181        self.panic_cooldown = cooldown;
182        self
183    }
184
185    /// Set the heartbeat URL (dead man's switch).
186    ///
187    /// The worker pings this URL at every heartbeat interval with an HTTP
188    /// HEAD request. Compatible with BetterStack Heartbeats, Cronitor,
189    /// Healthchecks.io, or any dead man's switch service.
190    ///
191    /// If not set, no heartbeat is emitted even when the feature is enabled.
192    ///
193    /// # Examples
194    ///
195    /// ```no_run
196    /// use ironflow_worker::WorkerBuilder;
197    ///
198    /// # fn example() {
199    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
200    ///     .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc123");
201    /// # }
202    /// ```
203    #[cfg(feature = "heartbeat")]
204    pub fn heartbeat_url(mut self, url: &str) -> Self {
205        self.heartbeat_url = Some(url.to_string());
206        self
207    }
208
209    /// Set the heartbeat interval.
210    ///
211    /// Controls how often the worker pings the [`heartbeat_url`](Self::heartbeat_url).
212    /// Defaults to 30 seconds.
213    ///
214    /// # Examples
215    ///
216    /// ```no_run
217    /// use std::time::Duration;
218    /// use ironflow_worker::WorkerBuilder;
219    ///
220    /// # fn example() {
221    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
222    ///     .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc123")
223    ///     .heartbeat_interval(Duration::from_secs(60));
224    /// # }
225    /// ```
226    #[cfg(feature = "heartbeat")]
227    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
228        self.heartbeat_interval = interval;
229        self
230    }
231
232    /// Build the worker.
233    ///
234    /// # Errors
235    ///
236    /// Returns [`WorkerError::Internal`] if no provider has been set.
237    /// Returns [`WorkerError::Engine`] if a handler registration fails.
238    pub fn build(self) -> Result<Worker, WorkerError> {
239        let provider = self
240            .provider
241            .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
242
243        let store: Arc<dyn Store> = Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
244
245        let mut engine = Engine::new(store, provider);
246        for handler in self.handlers {
247            engine
248                .register_boxed(handler)
249                .map_err(WorkerError::Engine)?;
250        }
251
252        let (log_sender, log_receiver) = ironflow_engine::log_sender::channel();
253        engine.set_log_sender(log_sender);
254
255        #[cfg(feature = "heartbeat")]
256        let heartbeat_client = Client::builder()
257            .timeout(Duration::from_secs(5))
258            .build()
259            .expect("failed to build heartbeat HTTP client");
260
261        Ok(Worker {
262            engine: Arc::new(engine),
263            api_url: self.api_url,
264            worker_token: self.worker_token,
265            log_receiver: Mutex::new(Some(log_receiver)),
266            concurrency: self.concurrency,
267            poll_interval: self.poll_interval,
268            run_timeout: self.run_timeout,
269            max_consecutive_panics: self.max_consecutive_panics,
270            panic_cooldown: self.panic_cooldown,
271            #[cfg(feature = "heartbeat")]
272            heartbeat_url: self.heartbeat_url,
273            #[cfg(feature = "heartbeat")]
274            heartbeat_interval: self.heartbeat_interval,
275            #[cfg(feature = "heartbeat")]
276            heartbeat_client,
277        })
278    }
279}
280
/// Background worker that polls the API and executes workflows.
///
/// Created through [`WorkerBuilder::build`]; driven by [`Worker::run`].
pub struct Worker {
    /// Engine that executes runs; shared with every spawned run task.
    engine: Arc<Engine>,
    /// API server URL, used to create the log pusher.
    api_url: String,
    /// Worker token, used to create the log pusher.
    worker_token: String,
    /// Receiving end of the engine's log channel; taken by the first call to
    /// `run`, which hands it to the log pusher.
    log_receiver: Mutex<Option<LogReceiver>>,
    /// Number of semaphore permits, i.e. maximum concurrent runs.
    concurrency: usize,
    /// Base delay between polls when no run is pending.
    poll_interval: Duration,
    /// Per-run execution limit enforced with a timeout.
    run_timeout: Duration,
    /// Threshold handed to the poison pill tracker.
    max_consecutive_panics: u32,
    /// Cooldown handed to the poison pill tracker.
    panic_cooldown: Duration,
    /// Dead man's switch URL; no heartbeat task is spawned when `None`.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Interval between heartbeat pings.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
    /// HTTP client used for heartbeat HEAD requests.
    #[cfg(feature = "heartbeat")]
    heartbeat_client: Client,
}
299
/// Per-workflow bookkeeping of consecutive failures, used to detect
/// "poison pill" workflows that keep failing.
struct PoisonPillTracker {
    /// Number of consecutive failures at which a workflow becomes blocked.
    max_consecutive: u32,
    /// How long a workflow stays blocked after its most recent failure.
    cooldown: Duration,
    /// Workflow name -> (consecutive failure count, time of latest failure).
    state: HashMap<String, (u32, Instant)>,
}

impl PoisonPillTracker {
    /// Create an empty tracker with the given threshold and cooldown.
    fn new(max_consecutive: u32, cooldown: Duration) -> Self {
        Self {
            state: HashMap::new(),
            max_consecutive,
            cooldown,
        }
    }

    /// Count one more failure for `workflow` and stamp it with the current
    /// time. Returns `true` once the count has reached the threshold.
    fn record_panic(&mut self, workflow: &str) -> bool {
        let now = Instant::now();
        let (count, last_panic) = self.state.entry(workflow.to_string()).or_insert((0, now));
        *count += 1;
        *last_panic = now;
        *count >= self.max_consecutive
    }

    /// Forget all recorded failures for `workflow` after a successful run.
    fn record_success(&mut self, workflow: &str) {
        self.state.remove(workflow);
    }

    /// Whether `workflow` is currently blocked: its failure count has reached
    /// the threshold and the cooldown since the latest failure has not yet
    /// elapsed. Unknown workflows are never blocked.
    fn is_blocked(&self, workflow: &str) -> bool {
        match self.state.get(workflow) {
            Some(&(count, last_panic)) => {
                count >= self.max_consecutive && last_panic.elapsed() < self.cooldown
            }
            None => false,
        }
    }
}
341
342impl Worker {
343    /// Run the worker loop until a shutdown signal (SIGTERM/SIGINT) is received.
344    ///
345    /// On shutdown, the worker stops picking new runs and waits for all
346    /// in-flight executions to complete before returning.
347    ///
348    /// # Errors
349    ///
350    /// Returns [`WorkerError`] if the polling loop encounters an unrecoverable error.
351    pub async fn run(&self) -> Result<(), WorkerError> {
352        let semaphore = Arc::new(Semaphore::new(self.concurrency));
353        let shutdown = CancellationToken::new();
354        let mut idle_streak = 0u32;
355        let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
356            self.max_consecutive_panics,
357            self.panic_cooldown,
358        )));
359        let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
360
361        info!(
362            concurrency = self.concurrency,
363            poll_interval_ms = self.poll_interval.as_millis() as u64,
364            run_timeout_secs = self.run_timeout.as_secs(),
365            "worker started"
366        );
367
368        if let Some(receiver) = self.log_receiver.lock().expect("log_receiver lock").take() {
369            let pusher = LogPusher::new(&self.api_url, &self.worker_token);
370            spawn(pusher.run(receiver));
371            info!("log pusher started");
372        }
373
374        // Spawn shutdown signal handler
375        let shutdown_clone = shutdown.clone();
376        spawn(async move {
377            shutdown_signal().await;
378            info!("shutdown signal received, draining in-flight runs...");
379            shutdown_clone.cancel();
380        });
381
382        #[cfg(feature = "heartbeat")]
383        if let Some(ref url) = self.heartbeat_url {
384            let interval = self.heartbeat_interval;
385            let url = url.clone();
386            let client = self.heartbeat_client.clone();
387
388            spawn(async move {
389                let mut ticker = tokio::time::interval(interval);
390                // skip the first immediate tick
391                ticker.tick().await;
392                loop {
393                    ticker.tick().await;
394                    match client.head(&url).send().await {
395                        Ok(resp) if resp.status().is_success() => {
396                            info!(url = %url, "heartbeat sent");
397                        }
398                        Ok(resp) => {
399                            warn!(
400                                url = %url,
401                                status = %resp.status(),
402                                "heartbeat ping returned non-success status"
403                            );
404                        }
405                        Err(err) => {
406                            warn!(
407                                url = %url,
408                                error = %err,
409                                "heartbeat ping failed"
410                            );
411                        }
412                    }
413                }
414            });
415        }
416
417        while !shutdown.is_cancelled() {
418            // Drain outcome channel to update poison pill tracker
419            while let Ok(outcome) = outcome_rx.try_recv() {
420                let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
421                match outcome {
422                    RunOutcome::Success(ref wf) => tracker.record_success(wf),
423                    RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
424                        if tracker.record_panic(wf) {
425                            warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
426                        }
427                    }
428                    RunOutcome::Panicked(ref wf) => {
429                        if tracker.record_panic(wf) {
430                            error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
431                        }
432                    }
433                }
434            }
435
436            let run = self.engine.store().pick_next_pending().await;
437
438            match run {
439                Ok(Some(run)) => {
440                    #[cfg(feature = "prometheus")]
441                    counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
442
443                    // Poison pill check: skip workflows that keep failing
444                    let is_blocked = {
445                        let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
446                        tracker.is_blocked(&run.workflow_name)
447                    };
448                    if is_blocked {
449                        warn!(
450                            workflow = %run.workflow_name,
451                            run_id = %run.id,
452                            "skipping run: workflow flagged as poison pill, marking as failed"
453                        );
454                        if let Err(e) = self
455                            .engine
456                            .store()
457                            .update_run_status(run.id, RunStatus::Failed)
458                            .await
459                        {
460                            error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
461                        }
462                        continue;
463                    }
464
465                    let permit = semaphore
466                        .clone()
467                        .acquire_owned()
468                        .await
469                        .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
470
471                    idle_streak = 0;
472                    let engine = self.engine.clone();
473                    let run_id = run.id;
474                    let workflow = run.workflow_name.clone();
475                    let workflow_for_watcher = workflow.clone();
476                    let run_timeout = self.run_timeout;
477
478                    info!(run_id = %run_id, workflow = %workflow, "executing run");
479
480                    #[cfg(feature = "prometheus")]
481                    gauge!(WORKER_ACTIVE).increment(1.0);
482
483                    let handle = spawn(async move {
484                        let _permit = permit;
485                        let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
486
487                        match result {
488                            Ok(Ok(_)) => {
489                                info!(run_id = %run_id, workflow = %workflow, "run completed");
490                                RunOutcome::Success(workflow)
491                            }
492                            Ok(Err(e)) => {
493                                error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
494                                if let Err(store_err) = engine
495                                    .store()
496                                    .update_run_status(run_id, RunStatus::Failed)
497                                    .await
498                                {
499                                    error!(run_id = %run_id, error = %store_err, "failed to mark run as failed");
500                                }
501                                if let Err(cleanup_err) = engine
502                                    .fail_orphaned_steps(run_id, "parent run failed")
503                                    .await
504                                {
505                                    error!(run_id = %run_id, error = %cleanup_err, "failed to cleanup orphaned steps");
506                                }
507                                RunOutcome::Failed(workflow)
508                            }
509                            Err(_) => {
510                                error!(
511                                    run_id = %run_id,
512                                    workflow = %workflow,
513                                    timeout_secs = run_timeout.as_secs(),
514                                    "run timed out"
515                                );
516                                if let Err(e) = engine
517                                    .store()
518                                    .update_run_status(run_id, RunStatus::Failed)
519                                    .await
520                                {
521                                    error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
522                                }
523                                if let Err(e) = engine
524                                    .fail_orphaned_steps(run_id, "parent run timed out")
525                                    .await
526                                {
527                                    error!(run_id = %run_id, error = %e, "failed to cleanup orphaned steps after timeout");
528                                }
529                                RunOutcome::Timeout(workflow)
530                            }
531                        }
532                    });
533
534                    // Spawn a watcher to catch panics and report outcomes
535                    let watcher_engine = self.engine.clone();
536                    let tx = outcome_tx.clone();
537                    spawn(async move {
538                        match handle.await {
539                            Ok(outcome) => {
540                                let _ = tx.send(outcome);
541                            }
542                            Err(e) => {
543                                error!(run_id = %run_id, "spawned task panicked: {e}");
544                                if let Err(store_err) = watcher_engine
545                                    .store()
546                                    .update_run_status(run_id, RunStatus::Failed)
547                                    .await
548                                {
549                                    error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
550                                }
551                                if let Err(cleanup_err) = watcher_engine
552                                    .fail_orphaned_steps(run_id, "parent run panicked")
553                                    .await
554                                {
555                                    error!(run_id = %run_id, error = %cleanup_err, "failed to cleanup orphaned steps after panic");
556                                }
557                                let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
558                            }
559                        }
560                        #[cfg(feature = "prometheus")]
561                        gauge!(WORKER_ACTIVE).decrement(1.0);
562                    });
563                }
564                Ok(None) => {
565                    #[cfg(feature = "prometheus")]
566                    counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
567
568                    idle_streak += 1;
569                    let backoff = if idle_streak > 10 {
570                        self.poll_interval * 3
571                    } else if idle_streak > 5 {
572                        self.poll_interval * 2
573                    } else {
574                        self.poll_interval
575                    };
576                    sleep(backoff).await;
577                }
578                Err(e) => {
579                    warn!(error = %e, "poll error");
580                    sleep(self.poll_interval).await;
581                }
582            }
583        }
584
585        // Graceful drain: wait for all in-flight tasks to release their permits
586        info!(
587            in_flight = self.concurrency - semaphore.available_permits(),
588            "waiting for in-flight runs to complete..."
589        );
590        let _ = semaphore
591            .acquire_many(self.concurrency as u32)
592            .await
593            .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
594
595        info!("all in-flight runs completed, worker shut down");
596        Ok(())
597    }
598}
599
/// Outcome of a single run execution, used for poison pill tracking.
///
/// Every variant carries the name of the workflow the run belonged to. The
/// worker loop resets the workflow's failure counter on `Success` and
/// increments it for each of the other three variants.
enum RunOutcome {
    /// Run completed successfully.
    Success(String),
    /// Run failed with an error (engine returned Err).
    Failed(String),
    /// Run exceeded its timeout.
    Timeout(String),
    /// Run panicked (task JoinError).
    Panicked(String),
}
611
612/// Wait for SIGTERM or SIGINT (Ctrl+C).
613async fn shutdown_signal() {
614    use tokio::signal;
615
616    let ctrl_c = async {
617        signal::ctrl_c()
618            .await
619            .expect("failed to install Ctrl+C handler");
620    };
621
622    #[cfg(unix)]
623    let terminate = async {
624        use tokio::signal::unix::{SignalKind, signal};
625
626        signal(SignalKind::terminate())
627            .expect("failed to install SIGTERM handler")
628            .recv()
629            .await;
630    };
631
632    #[cfg(not(unix))]
633    let terminate = {
634        use std::future::pending;
635        pending::<()>()
636    };
637
638    tokio::select! {
639        () = ctrl_c => {},
640        () = terminate => {},
641    }
642}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647    use ironflow_core::providers::claude::ClaudeCodeProvider;
648
649    #[test]
650    fn builder_new_creates_default_config() {
651        let builder = WorkerBuilder::new("http://localhost:3000", "my-token");
652        assert_eq!(builder.api_url, "http://localhost:3000");
653        assert_eq!(builder.worker_token, "my-token");
654        assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
655        assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
656        assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
657        assert_eq!(
658            builder.max_consecutive_panics,
659            DEFAULT_MAX_CONSECUTIVE_PANICS
660        );
661        assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
662        assert!(builder.provider.is_none());
663    }
664
665    #[test]
666    fn builder_with_trailing_slash_normalized() {
667        let builder = WorkerBuilder::new("http://localhost:3000/", "token");
668        assert_eq!(builder.api_url, "http://localhost:3000/");
669    }
670
671    #[test]
672    fn builder_provider_sets_provider() {
673        let provider = Arc::new(ClaudeCodeProvider::new());
674        let builder =
675            WorkerBuilder::new("http://localhost:3000", "token").provider(provider.clone());
676        assert!(builder.provider.is_some());
677    }
678
679    #[test]
680    fn builder_concurrency_sets_concurrency() {
681        let builder = WorkerBuilder::new("http://localhost:3000", "token").concurrency(8);
682        assert_eq!(builder.concurrency, 8);
683    }
684
685    #[test]
686    fn builder_concurrency_zero_accepted() {
687        let provider = Arc::new(ClaudeCodeProvider::new());
688        let builder = WorkerBuilder::new("http://localhost:3000", "token")
689            .provider(provider)
690            .concurrency(0);
691        assert_eq!(builder.concurrency, 0);
692    }
693
694    #[test]
695    fn builder_poll_interval_sets_interval() {
696        let interval = Duration::from_secs(5);
697        let builder = WorkerBuilder::new("http://localhost:3000", "token").poll_interval(interval);
698        assert_eq!(builder.poll_interval, interval);
699    }
700
701    #[test]
702    fn builder_run_timeout_sets_timeout() {
703        let dur = Duration::from_secs(120);
704        let builder = WorkerBuilder::new("http://localhost:3000", "token").run_timeout(dur);
705        assert_eq!(builder.run_timeout, dur);
706    }
707
708    #[test]
709    fn builder_max_consecutive_panics_sets_value() {
710        let builder =
711            WorkerBuilder::new("http://localhost:3000", "token").max_consecutive_panics(10);
712        assert_eq!(builder.max_consecutive_panics, 10);
713    }
714
715    #[test]
716    fn builder_panic_cooldown_sets_value() {
717        let dur = Duration::from_secs(600);
718        let builder = WorkerBuilder::new("http://localhost:3000", "token").panic_cooldown(dur);
719        assert_eq!(builder.panic_cooldown, dur);
720    }
721
722    #[test]
723    fn builder_build_without_provider_fails() {
724        let builder = WorkerBuilder::new("http://localhost:3000", "token");
725        let result = builder.build();
726        assert!(result.is_err());
727        match result {
728            Err(WorkerError::Internal(msg)) => {
729                assert!(msg.contains("provider is required"));
730            }
731            _ => panic!("expected Internal error about missing provider"),
732        }
733    }
734
735    #[test]
736    fn builder_build_with_provider_succeeds() {
737        let provider = Arc::new(ClaudeCodeProvider::new());
738        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
739        let result = builder.build();
740        assert!(result.is_ok());
741    }
742
743    #[test]
744    fn builder_build_creates_worker_with_correct_concurrency() {
745        let provider = Arc::new(ClaudeCodeProvider::new());
746        let builder = WorkerBuilder::new("http://localhost:3000", "token")
747            .provider(provider)
748            .concurrency(16);
749        let worker = builder.build().unwrap();
750        assert_eq!(worker.concurrency, 16);
751    }
752
753    #[test]
754    fn builder_build_creates_worker_with_correct_interval() {
755        let provider = Arc::new(ClaudeCodeProvider::new());
756        let interval = Duration::from_secs(10);
757        let builder = WorkerBuilder::new("http://localhost:3000", "token")
758            .provider(provider)
759            .poll_interval(interval);
760        let worker = builder.build().unwrap();
761        assert_eq!(worker.poll_interval, interval);
762    }
763
764    #[test]
765    fn builder_build_preserves_timeout() {
766        let provider = Arc::new(ClaudeCodeProvider::new());
767        let dur = Duration::from_secs(300);
768        let worker = WorkerBuilder::new("http://localhost:3000", "token")
769            .provider(provider)
770            .run_timeout(dur)
771            .build()
772            .unwrap();
773        assert_eq!(worker.run_timeout, dur);
774    }
775
776    #[test]
777    fn builder_build_preserves_poison_pill_config() {
778        let provider = Arc::new(ClaudeCodeProvider::new());
779        let cooldown = Duration::from_secs(120);
780        let worker = WorkerBuilder::new("http://localhost:3000", "token")
781            .provider(provider)
782            .max_consecutive_panics(7)
783            .panic_cooldown(cooldown)
784            .build()
785            .unwrap();
786        assert_eq!(worker.max_consecutive_panics, 7);
787        assert_eq!(worker.panic_cooldown, cooldown);
788    }
789
790    #[test]
791    fn builder_chaining_works() {
792        let provider = Arc::new(ClaudeCodeProvider::new());
793        let result = WorkerBuilder::new("http://localhost:3000", "token")
794            .provider(provider)
795            .concurrency(4)
796            .poll_interval(Duration::from_secs(3))
797            .run_timeout(Duration::from_secs(600))
798            .max_consecutive_panics(5)
799            .panic_cooldown(Duration::from_secs(120))
800            .build();
801        assert!(result.is_ok());
802        let worker = result.unwrap();
803        assert_eq!(worker.concurrency, 4);
804        assert_eq!(worker.poll_interval, Duration::from_secs(3));
805        assert_eq!(worker.run_timeout, Duration::from_secs(600));
806        assert_eq!(worker.max_consecutive_panics, 5);
807        assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
808    }
809
810    #[test]
811    fn builder_empty_api_url_accepted() {
812        let provider = Arc::new(ClaudeCodeProvider::new());
813        let builder = WorkerBuilder::new("", "token").provider(provider);
814        let result = builder.build();
815        assert!(result.is_ok());
816    }
817
818    #[test]
819    fn builder_empty_token_accepted() {
820        let provider = Arc::new(ClaudeCodeProvider::new());
821        let builder = WorkerBuilder::new("http://localhost:3000", "").provider(provider);
822        let result = builder.build();
823        assert!(result.is_ok());
824    }
825
826    #[cfg(feature = "heartbeat")]
827    #[test]
828    fn builder_heartbeat_defaults() {
829        let builder = WorkerBuilder::new("http://localhost:3000", "token");
830        assert!(builder.heartbeat_url.is_none());
831        assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
832    }
833
834    #[cfg(feature = "heartbeat")]
835    #[test]
836    fn builder_heartbeat_url_sets_url() {
837        let builder = WorkerBuilder::new("http://localhost:3000", "token")
838            .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
839        assert_eq!(
840            builder.heartbeat_url.as_deref(),
841            Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
842        );
843    }
844
845    #[cfg(feature = "heartbeat")]
846    #[test]
847    fn builder_heartbeat_custom_interval() {
848        let interval = Duration::from_secs(10);
849        let builder =
850            WorkerBuilder::new("http://localhost:3000", "token").heartbeat_interval(interval);
851        assert_eq!(builder.heartbeat_interval, interval);
852    }
853
854    #[cfg(feature = "heartbeat")]
855    #[test]
856    fn builder_build_preserves_heartbeat_config() {
857        let provider = Arc::new(ClaudeCodeProvider::new());
858        let interval = Duration::from_secs(15);
859        let worker = WorkerBuilder::new("http://localhost:3000", "token")
860            .provider(provider)
861            .heartbeat_url("https://example.com/heartbeat")
862            .heartbeat_interval(interval)
863            .build()
864            .unwrap();
865        assert_eq!(
866            worker.heartbeat_url.as_deref(),
867            Some("https://example.com/heartbeat")
868        );
869        assert_eq!(worker.heartbeat_interval, interval);
870    }
871
872    #[cfg(feature = "heartbeat")]
873    #[test]
874    fn builder_build_without_heartbeat_url_has_none() {
875        let provider = Arc::new(ClaudeCodeProvider::new());
876        let worker = WorkerBuilder::new("http://localhost:3000", "token")
877            .provider(provider)
878            .build()
879            .unwrap();
880        assert!(worker.heartbeat_url.is_none());
881    }
882
883    // --- PoisonPillTracker tests ---
884
885    #[test]
886    fn poison_tracker_not_blocked_initially() {
887        let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
888        assert!(!tracker.is_blocked("my-workflow"));
889    }
890
891    #[test]
892    fn poison_tracker_blocked_after_max_panics() {
893        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
894        assert!(!tracker.record_panic("wf"));
895        assert!(!tracker.record_panic("wf"));
896        assert!(tracker.record_panic("wf"));
897        assert!(tracker.is_blocked("wf"));
898    }
899
900    #[test]
901    fn poison_tracker_success_resets_count() {
902        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
903        tracker.record_panic("wf");
904        tracker.record_panic("wf");
905        tracker.record_success("wf");
906        assert!(!tracker.is_blocked("wf"));
907        // After reset, need 3 more panics to block
908        assert!(!tracker.record_panic("wf"));
909    }
910
911    #[test]
912    fn poison_tracker_independent_per_workflow() {
913        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
914        tracker.record_panic("wf-a");
915        tracker.record_panic("wf-a");
916        assert!(tracker.is_blocked("wf-a"));
917        assert!(!tracker.is_blocked("wf-b"));
918    }
919
920    #[test]
921    fn poison_tracker_unblocks_after_cooldown() {
922        let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
923        tracker.record_panic("wf");
924        tracker.record_panic("wf");
925        // Cooldown is 0ms, should immediately unblock
926        assert!(!tracker.is_blocked("wf"));
927    }
928}