Skip to main content

ironflow_worker/
worker.rs

1//! Worker -- polls the API for pending runs and executes them.
2
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_store::entities::RunStatus;
19use ironflow_store::store::RunStore;
20#[cfg(feature = "prometheus")]
21use metrics::{counter, gauge};
22#[cfg(feature = "heartbeat")]
23use reqwest::Client;
24
25use crate::api_store::ApiRunStore;
26use crate::error::WorkerError;
27
/// Default number of runs executed concurrently when the builder is not told otherwise.
const DEFAULT_CONCURRENCY: usize = 2;
/// Default delay between two polls for pending runs.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Default maximum execution time of a single run (30 minutes).
const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Default number of consecutive panics after which a workflow is treated as a poison pill.
const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
/// Default time a poison-pill workflow stays blocked (5 minutes).
const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(5 * 60);
/// Default delay between two heartbeat pings.
#[cfg(feature = "heartbeat")]
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
35
36/// Builder for configuring and creating a [`Worker`].
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use std::time::Duration;
43/// use ironflow_worker::WorkerBuilder;
44/// use ironflow_core::providers::claude::ClaudeCodeProvider;
45///
46/// # async fn example() -> Result<(), ironflow_worker::WorkerError> {
47/// let worker = WorkerBuilder::new("http://localhost:3000", "my-token")
48///     .provider(Arc::new(ClaudeCodeProvider::new()))
49///     .concurrency(4)
50///     .poll_interval(Duration::from_secs(2))
51///     .run_timeout(Duration::from_secs(600))
52///     .max_consecutive_panics(5)
53///     .build()?;
54///
55/// worker.run().await?;
56/// # Ok(())
57/// # }
58/// ```
59pub struct WorkerBuilder {
60    api_url: String,
61    worker_token: String,
62    provider: Option<Arc<dyn AgentProvider>>,
63    handlers: Vec<Box<dyn WorkflowHandler>>,
64    concurrency: usize,
65    poll_interval: Duration,
66    run_timeout: Duration,
67    max_consecutive_panics: u32,
68    panic_cooldown: Duration,
69    #[cfg(feature = "heartbeat")]
70    heartbeat_url: Option<String>,
71    #[cfg(feature = "heartbeat")]
72    heartbeat_interval: Duration,
73}
74
75impl WorkerBuilder {
76    /// Create a new builder targeting the given API server.
77    pub fn new(api_url: &str, worker_token: &str) -> Self {
78        Self {
79            api_url: api_url.to_string(),
80            worker_token: worker_token.to_string(),
81            provider: None,
82            handlers: Vec::new(),
83            concurrency: DEFAULT_CONCURRENCY,
84            poll_interval: DEFAULT_POLL_INTERVAL,
85            run_timeout: DEFAULT_RUN_TIMEOUT,
86            max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
87            panic_cooldown: DEFAULT_PANIC_COOLDOWN,
88            #[cfg(feature = "heartbeat")]
89            heartbeat_url: None,
90            #[cfg(feature = "heartbeat")]
91            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
92        }
93    }
94
95    /// Set the agent provider for AI operations.
96    pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
97        self.provider = Some(provider);
98        self
99    }
100
101    /// Register a workflow handler.
102    pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
103        self.handlers.push(Box::new(handler));
104        self
105    }
106
107    /// Set the maximum number of concurrent workflow executions.
108    pub fn concurrency(mut self, n: usize) -> Self {
109        self.concurrency = n;
110        self
111    }
112
113    /// Set the interval between polls for new runs.
114    pub fn poll_interval(mut self, interval: Duration) -> Self {
115        self.poll_interval = interval;
116        self
117    }
118
119    /// Set the maximum execution time per run.
120    ///
121    /// If a run exceeds this duration, it is cancelled and marked as `Failed`
122    /// with a timeout error. Defaults to 30 minutes.
123    ///
124    /// # Examples
125    ///
126    /// ```no_run
127    /// use std::time::Duration;
128    /// use ironflow_worker::WorkerBuilder;
129    ///
130    /// # fn example() {
131    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
132    ///     .run_timeout(Duration::from_secs(600));
133    /// # }
134    /// ```
135    pub fn run_timeout(mut self, timeout: Duration) -> Self {
136        self.run_timeout = timeout;
137        self
138    }
139
140    /// Set the maximum number of consecutive panics per workflow before
141    /// the worker stops picking runs for that workflow (poison pill guard).
142    ///
143    /// When a workflow panics `max_consecutive_panics` times in a row without
144    /// a single success, the worker skips it for a cooldown period (see
145    /// [`panic_cooldown`](Self::panic_cooldown)). Defaults to 3.
146    ///
147    /// # Examples
148    ///
149    /// ```no_run
150    /// use ironflow_worker::WorkerBuilder;
151    ///
152    /// # fn example() {
153    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
154    ///     .max_consecutive_panics(5);
155    /// # }
156    /// ```
157    pub fn max_consecutive_panics(mut self, n: u32) -> Self {
158        self.max_consecutive_panics = n;
159        self
160    }
161
162    /// Set the cooldown duration after a workflow is flagged as a poison pill.
163    ///
164    /// After `max_consecutive_panics` is reached, runs for that workflow are
165    /// skipped until this duration elapses. Defaults to 5 minutes.
166    ///
167    /// # Examples
168    ///
169    /// ```no_run
170    /// use std::time::Duration;
171    /// use ironflow_worker::WorkerBuilder;
172    ///
173    /// # fn example() {
174    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
175    ///     .panic_cooldown(Duration::from_secs(600));
176    /// # }
177    /// ```
178    pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
179        self.panic_cooldown = cooldown;
180        self
181    }
182
183    /// Set the heartbeat URL (dead man's switch).
184    ///
185    /// The worker pings this URL at every heartbeat interval with an HTTP
186    /// HEAD request. Compatible with BetterStack Heartbeats, Cronitor,
187    /// Healthchecks.io, or any dead man's switch service.
188    ///
189    /// If not set, no heartbeat is emitted even when the feature is enabled.
190    ///
191    /// # Examples
192    ///
193    /// ```no_run
194    /// use ironflow_worker::WorkerBuilder;
195    ///
196    /// # fn example() {
197    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
198    ///     .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc123");
199    /// # }
200    /// ```
201    #[cfg(feature = "heartbeat")]
202    pub fn heartbeat_url(mut self, url: &str) -> Self {
203        self.heartbeat_url = Some(url.to_string());
204        self
205    }
206
207    /// Set the heartbeat interval.
208    ///
209    /// Controls how often the worker pings the [`heartbeat_url`](Self::heartbeat_url).
210    /// Defaults to 30 seconds.
211    ///
212    /// # Examples
213    ///
214    /// ```no_run
215    /// use std::time::Duration;
216    /// use ironflow_worker::WorkerBuilder;
217    ///
218    /// # fn example() {
219    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
220    ///     .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc123")
221    ///     .heartbeat_interval(Duration::from_secs(60));
222    /// # }
223    /// ```
224    #[cfg(feature = "heartbeat")]
225    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
226        self.heartbeat_interval = interval;
227        self
228    }
229
230    /// Build the worker.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`WorkerError::Internal`] if no provider has been set.
235    /// Returns [`WorkerError::Engine`] if a handler registration fails.
236    pub fn build(self) -> Result<Worker, WorkerError> {
237        let provider = self
238            .provider
239            .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
240
241        let store: Arc<dyn RunStore> =
242            Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
243
244        let mut engine = Engine::new(store, provider);
245        for handler in self.handlers {
246            engine
247                .register_boxed(handler)
248                .map_err(WorkerError::Engine)?;
249        }
250
251        #[cfg(feature = "heartbeat")]
252        let heartbeat_client = Client::builder()
253            .timeout(Duration::from_secs(5))
254            .build()
255            .expect("failed to build heartbeat HTTP client");
256
257        Ok(Worker {
258            engine: Arc::new(engine),
259            concurrency: self.concurrency,
260            poll_interval: self.poll_interval,
261            run_timeout: self.run_timeout,
262            max_consecutive_panics: self.max_consecutive_panics,
263            panic_cooldown: self.panic_cooldown,
264            #[cfg(feature = "heartbeat")]
265            heartbeat_url: self.heartbeat_url,
266            #[cfg(feature = "heartbeat")]
267            heartbeat_interval: self.heartbeat_interval,
268            #[cfg(feature = "heartbeat")]
269            heartbeat_client,
270        })
271    }
272}
273
/// Background worker that polls the API and executes workflows.
///
/// Created through [`WorkerBuilder`]; started with [`Worker::run`].
pub struct Worker {
    /// Engine used to execute runs and to reach the run store.
    engine: Arc<Engine>,
    /// Maximum number of runs executed at the same time.
    concurrency: usize,
    /// Base delay between polls when no run is pending.
    poll_interval: Duration,
    /// Maximum execution time of a single run.
    run_timeout: Duration,
    /// Threshold handed to the poison pill tracker.
    max_consecutive_panics: u32,
    /// Cooldown handed to the poison pill tracker.
    panic_cooldown: Duration,
    /// URL pinged as a dead man's switch; no heartbeat is sent when `None`.
    #[cfg(feature = "heartbeat")]
    heartbeat_url: Option<String>,
    /// Delay between two heartbeat pings.
    #[cfg(feature = "heartbeat")]
    heartbeat_interval: Duration,
    /// HTTP client used for heartbeat pings.
    #[cfg(feature = "heartbeat")]
    heartbeat_client: Client,
}
289
/// Tracks consecutive failures per workflow for poison pill detection.
///
/// A workflow is blocked once its counter has reached `max_consecutive` and
/// the most recent failure is younger than `cooldown`. A success clears the
/// workflow's entry entirely.
struct PoisonPillTracker {
    /// Number of consecutive failures at which a workflow becomes blocked.
    max_consecutive: u32,
    /// How long a workflow stays blocked after its most recent failure.
    cooldown: Duration,
    /// Maps workflow name to (consecutive panic count, last panic time).
    state: HashMap<String, (u32, Instant)>,
}

impl PoisonPillTracker {
    /// Create an empty tracker with the given threshold and cooldown.
    fn new(max_consecutive: u32, cooldown: Duration) -> Self {
        Self {
            max_consecutive,
            cooldown,
            state: HashMap::new(),
        }
    }

    /// Record a panic for a workflow. Returns `true` if the workflow is now
    /// considered a poison pill.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing.
    fn record_panic(&mut self, workflow: &str) -> bool {
        let now = Instant::now();
        let count = match self.state.get_mut(workflow) {
            // Known workflow: update in place, no key allocation needed.
            Some((count, last_panic)) => {
                *count = count.saturating_add(1);
                *last_panic = now;
                *count
            }
            None => {
                self.state.insert(workflow.to_owned(), (1, now));
                1
            }
        };
        count >= self.max_consecutive
    }

    /// Record a successful execution, resetting the panic counter.
    fn record_success(&mut self, workflow: &str) {
        self.state.remove(workflow);
    }

    /// Returns `true` if the workflow is currently blocked as a poison pill.
    ///
    /// The counter is not cleared when the cooldown expires, so one further
    /// failure after the cooldown blocks the workflow again immediately.
    fn is_blocked(&self, workflow: &str) -> bool {
        self.state.get(workflow).is_some_and(|(count, last_panic)| {
            *count >= self.max_consecutive && last_panic.elapsed() < self.cooldown
        })
    }
}
331
332impl Worker {
333    /// Run the worker loop until a shutdown signal (SIGTERM/SIGINT) is received.
334    ///
335    /// On shutdown, the worker stops picking new runs and waits for all
336    /// in-flight executions to complete before returning.
337    ///
338    /// # Errors
339    ///
340    /// Returns [`WorkerError`] if the polling loop encounters an unrecoverable error.
341    pub async fn run(&self) -> Result<(), WorkerError> {
342        let semaphore = Arc::new(Semaphore::new(self.concurrency));
343        let shutdown = CancellationToken::new();
344        let mut idle_streak = 0u32;
345        let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
346            self.max_consecutive_panics,
347            self.panic_cooldown,
348        )));
349        let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
350
351        info!(
352            concurrency = self.concurrency,
353            poll_interval_ms = self.poll_interval.as_millis() as u64,
354            run_timeout_secs = self.run_timeout.as_secs(),
355            "worker started"
356        );
357
358        // Spawn shutdown signal handler
359        let shutdown_clone = shutdown.clone();
360        spawn(async move {
361            shutdown_signal().await;
362            info!("shutdown signal received, draining in-flight runs...");
363            shutdown_clone.cancel();
364        });
365
366        #[cfg(feature = "heartbeat")]
367        if let Some(ref url) = self.heartbeat_url {
368            let interval = self.heartbeat_interval;
369            let url = url.clone();
370            let client = self.heartbeat_client.clone();
371
372            spawn(async move {
373                let mut ticker = tokio::time::interval(interval);
374                // skip the first immediate tick
375                ticker.tick().await;
376                loop {
377                    ticker.tick().await;
378                    match client.head(&url).send().await {
379                        Ok(resp) if resp.status().is_success() => {
380                            info!(url = %url, "heartbeat sent");
381                        }
382                        Ok(resp) => {
383                            warn!(
384                                url = %url,
385                                status = %resp.status(),
386                                "heartbeat ping returned non-success status"
387                            );
388                        }
389                        Err(err) => {
390                            warn!(
391                                url = %url,
392                                error = %err,
393                                "heartbeat ping failed"
394                            );
395                        }
396                    }
397                }
398            });
399        }
400
401        while !shutdown.is_cancelled() {
402            // Drain outcome channel to update poison pill tracker
403            while let Ok(outcome) = outcome_rx.try_recv() {
404                let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
405                match outcome {
406                    RunOutcome::Success(ref wf) => tracker.record_success(wf),
407                    RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
408                        if tracker.record_panic(wf) {
409                            warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
410                        }
411                    }
412                    RunOutcome::Panicked(ref wf) => {
413                        if tracker.record_panic(wf) {
414                            error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
415                        }
416                    }
417                }
418            }
419
420            let run = self.engine.store().pick_next_pending().await;
421
422            match run {
423                Ok(Some(run)) => {
424                    #[cfg(feature = "prometheus")]
425                    counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
426
427                    // Poison pill check: skip workflows that keep failing
428                    let is_blocked = {
429                        let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
430                        tracker.is_blocked(&run.workflow_name)
431                    };
432                    if is_blocked {
433                        warn!(
434                            workflow = %run.workflow_name,
435                            run_id = %run.id,
436                            "skipping run: workflow flagged as poison pill, marking as failed"
437                        );
438                        if let Err(e) = self
439                            .engine
440                            .store()
441                            .update_run_status(run.id, RunStatus::Failed)
442                            .await
443                        {
444                            error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
445                        }
446                        continue;
447                    }
448
449                    let permit = semaphore
450                        .clone()
451                        .acquire_owned()
452                        .await
453                        .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
454
455                    idle_streak = 0;
456                    let engine = self.engine.clone();
457                    let run_id = run.id;
458                    let workflow = run.workflow_name.clone();
459                    let workflow_for_watcher = workflow.clone();
460                    let run_timeout = self.run_timeout;
461
462                    info!(run_id = %run_id, workflow = %workflow, "executing run");
463
464                    #[cfg(feature = "prometheus")]
465                    gauge!(WORKER_ACTIVE).increment(1.0);
466
467                    let handle = spawn(async move {
468                        let _permit = permit;
469                        let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
470
471                        match result {
472                            Ok(Ok(_)) => {
473                                info!(run_id = %run_id, workflow = %workflow, "run completed");
474                                RunOutcome::Success(workflow)
475                            }
476                            Ok(Err(e)) => {
477                                error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
478                                RunOutcome::Failed(workflow)
479                            }
480                            Err(_) => {
481                                error!(
482                                    run_id = %run_id,
483                                    workflow = %workflow,
484                                    timeout_secs = run_timeout.as_secs(),
485                                    "run timed out"
486                                );
487                                if let Err(e) = engine
488                                    .store()
489                                    .update_run_status(run_id, RunStatus::Failed)
490                                    .await
491                                {
492                                    error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
493                                }
494                                RunOutcome::Timeout(workflow)
495                            }
496                        }
497                    });
498
499                    // Spawn a watcher to catch panics and report outcomes
500                    let store = self.engine.store().clone();
501                    let tx = outcome_tx.clone();
502                    spawn(async move {
503                        match handle.await {
504                            Ok(outcome) => {
505                                let _ = tx.send(outcome);
506                            }
507                            Err(e) => {
508                                error!(run_id = %run_id, "spawned task panicked: {e}");
509                                if let Err(store_err) =
510                                    store.update_run_status(run_id, RunStatus::Failed).await
511                                {
512                                    error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
513                                }
514                                let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
515                            }
516                        }
517                        #[cfg(feature = "prometheus")]
518                        gauge!(WORKER_ACTIVE).decrement(1.0);
519                    });
520                }
521                Ok(None) => {
522                    #[cfg(feature = "prometheus")]
523                    counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
524
525                    idle_streak += 1;
526                    let backoff = if idle_streak > 10 {
527                        self.poll_interval * 3
528                    } else if idle_streak > 5 {
529                        self.poll_interval * 2
530                    } else {
531                        self.poll_interval
532                    };
533                    sleep(backoff).await;
534                }
535                Err(e) => {
536                    warn!(error = %e, "poll error");
537                    sleep(self.poll_interval).await;
538                }
539            }
540        }
541
542        // Graceful drain: wait for all in-flight tasks to release their permits
543        info!(
544            in_flight = self.concurrency - semaphore.available_permits(),
545            "waiting for in-flight runs to complete..."
546        );
547        let _ = semaphore
548            .acquire_many(self.concurrency as u32)
549            .await
550            .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
551
552        info!("all in-flight runs completed, worker shut down");
553        Ok(())
554    }
555}
556
/// Outcome of a single run execution, used for poison pill tracking.
///
/// Every variant carries the name of the workflow the run belonged to.
enum RunOutcome {
    /// Run completed successfully.
    Success(String),
    /// Run failed with an error (engine returned Err).
    Failed(String),
    /// Run exceeded its timeout.
    Timeout(String),
    /// Run panicked (task JoinError).
    Panicked(String),
}
568
569/// Wait for SIGTERM or SIGINT (Ctrl+C).
570async fn shutdown_signal() {
571    use tokio::signal;
572
573    let ctrl_c = async {
574        signal::ctrl_c()
575            .await
576            .expect("failed to install Ctrl+C handler");
577    };
578
579    #[cfg(unix)]
580    let terminate = async {
581        use tokio::signal::unix::{SignalKind, signal};
582
583        signal(SignalKind::terminate())
584            .expect("failed to install SIGTERM handler")
585            .recv()
586            .await;
587    };
588
589    #[cfg(not(unix))]
590    let terminate = {
591        use std::future::pending;
592        pending::<()>()
593    };
594
595    tokio::select! {
596        () = ctrl_c => {},
597        () = terminate => {},
598    }
599}
600
/// Unit tests for the builder configuration and the poison pill tracker.
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;

    // --- WorkerBuilder tests ---

    #[test]
    fn builder_new_creates_default_config() {
        let builder = WorkerBuilder::new("http://localhost:3000", "my-token");
        assert_eq!(builder.api_url, "http://localhost:3000");
        assert_eq!(builder.worker_token, "my-token");
        assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
        assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
        assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
        assert_eq!(
            builder.max_consecutive_panics,
            DEFAULT_MAX_CONSECUTIVE_PANICS
        );
        assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
        assert!(builder.provider.is_none());
    }

    /// The builder stores the URL exactly as given; it does not strip a
    /// trailing slash.
    #[test]
    fn builder_keeps_trailing_slash_verbatim() {
        let builder = WorkerBuilder::new("http://localhost:3000/", "token");
        assert_eq!(builder.api_url, "http://localhost:3000/");
    }

    #[test]
    fn builder_provider_sets_provider() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
        assert!(builder.provider.is_some());
    }

    #[test]
    fn builder_concurrency_sets_concurrency() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token").concurrency(8);
        assert_eq!(builder.concurrency, 8);
    }

    #[test]
    fn builder_concurrency_zero_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(0);
        assert_eq!(builder.concurrency, 0);
    }

    #[test]
    fn builder_poll_interval_sets_interval() {
        let interval = Duration::from_secs(5);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").poll_interval(interval);
        assert_eq!(builder.poll_interval, interval);
    }

    #[test]
    fn builder_run_timeout_sets_timeout() {
        let dur = Duration::from_secs(120);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").run_timeout(dur);
        assert_eq!(builder.run_timeout, dur);
    }

    #[test]
    fn builder_max_consecutive_panics_sets_value() {
        let builder =
            WorkerBuilder::new("http://localhost:3000", "token").max_consecutive_panics(10);
        assert_eq!(builder.max_consecutive_panics, 10);
    }

    #[test]
    fn builder_panic_cooldown_sets_value() {
        let dur = Duration::from_secs(600);
        let builder = WorkerBuilder::new("http://localhost:3000", "token").panic_cooldown(dur);
        assert_eq!(builder.panic_cooldown, dur);
    }

    #[test]
    fn builder_build_without_provider_fails() {
        let result = WorkerBuilder::new("http://localhost:3000", "token").build();
        match result {
            Err(WorkerError::Internal(msg)) => {
                assert!(msg.contains("provider is required"));
            }
            _ => panic!("expected Internal error about missing provider"),
        }
    }

    #[test]
    fn builder_build_with_provider_succeeds() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_build_creates_worker_with_correct_concurrency() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(16);
        let worker = builder.build().unwrap();
        assert_eq!(worker.concurrency, 16);
    }

    #[test]
    fn builder_build_creates_worker_with_correct_interval() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let interval = Duration::from_secs(10);
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .poll_interval(interval);
        let worker = builder.build().unwrap();
        assert_eq!(worker.poll_interval, interval);
    }

    #[test]
    fn builder_build_preserves_timeout() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let dur = Duration::from_secs(300);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .run_timeout(dur)
            .build()
            .unwrap();
        assert_eq!(worker.run_timeout, dur);
    }

    #[test]
    fn builder_build_preserves_poison_pill_config() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let cooldown = Duration::from_secs(120);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .max_consecutive_panics(7)
            .panic_cooldown(cooldown)
            .build()
            .unwrap();
        assert_eq!(worker.max_consecutive_panics, 7);
        assert_eq!(worker.panic_cooldown, cooldown);
    }

    #[test]
    fn builder_chaining_works() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let result = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .concurrency(4)
            .poll_interval(Duration::from_secs(3))
            .run_timeout(Duration::from_secs(600))
            .max_consecutive_panics(5)
            .panic_cooldown(Duration::from_secs(120))
            .build();
        assert!(result.is_ok());
        let worker = result.unwrap();
        assert_eq!(worker.concurrency, 4);
        assert_eq!(worker.poll_interval, Duration::from_secs(3));
        assert_eq!(worker.run_timeout, Duration::from_secs(600));
        assert_eq!(worker.max_consecutive_panics, 5);
        assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
    }

    #[test]
    fn builder_empty_api_url_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("", "token").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    #[test]
    fn builder_empty_token_accepted() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let builder = WorkerBuilder::new("http://localhost:3000", "").provider(provider);
        let result = builder.build();
        assert!(result.is_ok());
    }

    // --- Heartbeat configuration tests ---

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_defaults() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token");
        assert!(builder.heartbeat_url.is_none());
        assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_url_sets_url() {
        let builder = WorkerBuilder::new("http://localhost:3000", "token")
            .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
        assert_eq!(
            builder.heartbeat_url.as_deref(),
            Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
        );
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_custom_interval() {
        let interval = Duration::from_secs(10);
        let builder =
            WorkerBuilder::new("http://localhost:3000", "token").heartbeat_interval(interval);
        assert_eq!(builder.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_preserves_heartbeat_config() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let interval = Duration::from_secs(15);
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .heartbeat_url("https://example.com/heartbeat")
            .heartbeat_interval(interval)
            .build()
            .unwrap();
        assert_eq!(
            worker.heartbeat_url.as_deref(),
            Some("https://example.com/heartbeat")
        );
        assert_eq!(worker.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_without_heartbeat_url_has_none() {
        let provider = Arc::new(ClaudeCodeProvider::new());
        let worker = WorkerBuilder::new("http://localhost:3000", "token")
            .provider(provider)
            .build()
            .unwrap();
        assert!(worker.heartbeat_url.is_none());
    }

    // --- PoisonPillTracker tests ---

    #[test]
    fn poison_tracker_not_blocked_initially() {
        let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.is_blocked("my-workflow"));
    }

    #[test]
    fn poison_tracker_blocked_after_max_panics() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_success_resets_count() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        tracker.record_success("wf");
        assert!(!tracker.is_blocked("wf"));
        // After the reset the full three panics are needed again.
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.is_blocked("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_independent_per_workflow() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        tracker.record_panic("wf-a");
        tracker.record_panic("wf-a");
        assert!(tracker.is_blocked("wf-a"));
        assert!(!tracker.is_blocked("wf-b"));
    }

    #[test]
    fn poison_tracker_unblocks_after_cooldown() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        // Cooldown is 0ms, should immediately unblock
        assert!(!tracker.is_blocked("wf"));
    }
}