Skip to main content

ironflow_worker/
worker.rs

1//! Worker -- polls the API for pending runs and executes them.
2
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use tokio::spawn;
8use tokio::sync::{Semaphore, mpsc};
9use tokio::time::{sleep, timeout};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13#[cfg(feature = "prometheus")]
14use ironflow_core::metric_names::{WORKER_ACTIVE, WORKER_POLLS_TOTAL};
15use ironflow_core::provider::AgentProvider;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::handler::WorkflowHandler;
18use ironflow_engine::log_sender::LogReceiver;
19use ironflow_store::entities::RunStatus;
20use ironflow_store::store::Store;
21#[cfg(feature = "prometheus")]
22use metrics::{counter, gauge};
23#[cfg(feature = "heartbeat")]
24use reqwest::Client;
25
26use crate::api_store::ApiRunStore;
27use crate::error::WorkerError;
28use crate::log_pusher::LogPusher;
29
/// Number of runs executed concurrently unless overridden on the builder.
const DEFAULT_CONCURRENCY: usize = 2;
/// Base delay between two polls for pending runs.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Maximum wall-clock time a single run may take (30 minutes).
const DEFAULT_RUN_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Consecutive failed executions after which a workflow is treated as a poison pill.
const DEFAULT_MAX_CONSECUTIVE_PANICS: u32 = 3;
/// How long a poison-pill workflow stays blocked (5 minutes).
const DEFAULT_PANIC_COOLDOWN: Duration = Duration::from_secs(5 * 60);
/// Delay between two heartbeat pings.
#[cfg(feature = "heartbeat")]
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
37
38/// Builder for configuring and creating a [`Worker`].
39///
40/// # Examples
41///
42/// ```no_run
43/// use std::sync::Arc;
44/// use std::time::Duration;
45/// use ironflow_worker::WorkerBuilder;
46/// use ironflow_core::providers::claude::ClaudeCodeProvider;
47///
48/// # async fn example() -> Result<(), ironflow_worker::WorkerError> {
49/// let worker = WorkerBuilder::new("http://localhost:3000", "my-token")
50///     .provider(Arc::new(ClaudeCodeProvider::new()))
51///     .concurrency(4)
52///     .poll_interval(Duration::from_secs(2))
53///     .run_timeout(Duration::from_secs(600))
54///     .max_consecutive_panics(5)
55///     .build()?;
56///
57/// worker.run().await?;
58/// # Ok(())
59/// # }
60/// ```
61pub struct WorkerBuilder {
62    api_url: String,
63    worker_token: String,
64    provider: Option<Arc<dyn AgentProvider>>,
65    handlers: Vec<Box<dyn WorkflowHandler>>,
66    concurrency: usize,
67    poll_interval: Duration,
68    run_timeout: Duration,
69    max_consecutive_panics: u32,
70    panic_cooldown: Duration,
71    #[cfg(feature = "heartbeat")]
72    heartbeat_url: Option<String>,
73    #[cfg(feature = "heartbeat")]
74    heartbeat_interval: Duration,
75}
76
77impl WorkerBuilder {
78    /// Create a new builder targeting the given API server.
79    pub fn new(api_url: &str, worker_token: &str) -> Self {
80        Self {
81            api_url: api_url.to_string(),
82            worker_token: worker_token.to_string(),
83            provider: None,
84            handlers: Vec::new(),
85            concurrency: DEFAULT_CONCURRENCY,
86            poll_interval: DEFAULT_POLL_INTERVAL,
87            run_timeout: DEFAULT_RUN_TIMEOUT,
88            max_consecutive_panics: DEFAULT_MAX_CONSECUTIVE_PANICS,
89            panic_cooldown: DEFAULT_PANIC_COOLDOWN,
90            #[cfg(feature = "heartbeat")]
91            heartbeat_url: None,
92            #[cfg(feature = "heartbeat")]
93            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
94        }
95    }
96
97    /// Set the agent provider for AI operations.
98    pub fn provider(mut self, provider: Arc<dyn AgentProvider>) -> Self {
99        self.provider = Some(provider);
100        self
101    }
102
103    /// Register a workflow handler.
104    pub fn register(mut self, handler: impl WorkflowHandler + 'static) -> Self {
105        self.handlers.push(Box::new(handler));
106        self
107    }
108
109    /// Set the maximum number of concurrent workflow executions.
110    pub fn concurrency(mut self, n: usize) -> Self {
111        self.concurrency = n;
112        self
113    }
114
115    /// Set the interval between polls for new runs.
116    pub fn poll_interval(mut self, interval: Duration) -> Self {
117        self.poll_interval = interval;
118        self
119    }
120
121    /// Set the maximum execution time per run.
122    ///
123    /// If a run exceeds this duration, it is cancelled and marked as `Failed`
124    /// with a timeout error. Defaults to 30 minutes.
125    ///
126    /// # Examples
127    ///
128    /// ```no_run
129    /// use std::time::Duration;
130    /// use ironflow_worker::WorkerBuilder;
131    ///
132    /// # fn example() {
133    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
134    ///     .run_timeout(Duration::from_secs(600));
135    /// # }
136    /// ```
137    pub fn run_timeout(mut self, timeout: Duration) -> Self {
138        self.run_timeout = timeout;
139        self
140    }
141
142    /// Set the maximum number of consecutive panics per workflow before
143    /// the worker stops picking runs for that workflow (poison pill guard).
144    ///
145    /// When a workflow panics `max_consecutive_panics` times in a row without
146    /// a single success, the worker skips it for a cooldown period (see
147    /// [`panic_cooldown`](Self::panic_cooldown)). Defaults to 3.
148    ///
149    /// # Examples
150    ///
151    /// ```no_run
152    /// use ironflow_worker::WorkerBuilder;
153    ///
154    /// # fn example() {
155    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
156    ///     .max_consecutive_panics(5);
157    /// # }
158    /// ```
159    pub fn max_consecutive_panics(mut self, n: u32) -> Self {
160        self.max_consecutive_panics = n;
161        self
162    }
163
164    /// Set the cooldown duration after a workflow is flagged as a poison pill.
165    ///
166    /// After `max_consecutive_panics` is reached, runs for that workflow are
167    /// skipped until this duration elapses. Defaults to 5 minutes.
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// use std::time::Duration;
173    /// use ironflow_worker::WorkerBuilder;
174    ///
175    /// # fn example() {
176    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
177    ///     .panic_cooldown(Duration::from_secs(600));
178    /// # }
179    /// ```
180    pub fn panic_cooldown(mut self, cooldown: Duration) -> Self {
181        self.panic_cooldown = cooldown;
182        self
183    }
184
185    /// Set the heartbeat URL (dead man's switch).
186    ///
187    /// The worker pings this URL at every heartbeat interval with an HTTP
188    /// HEAD request. Compatible with BetterStack Heartbeats, Cronitor,
189    /// Healthchecks.io, or any dead man's switch service.
190    ///
191    /// If not set, no heartbeat is emitted even when the feature is enabled.
192    ///
193    /// # Examples
194    ///
195    /// ```no_run
196    /// use ironflow_worker::WorkerBuilder;
197    ///
198    /// # fn example() {
199    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
200    ///     .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc123");
201    /// # }
202    /// ```
203    #[cfg(feature = "heartbeat")]
204    pub fn heartbeat_url(mut self, url: &str) -> Self {
205        self.heartbeat_url = Some(url.to_string());
206        self
207    }
208
209    /// Set the heartbeat interval.
210    ///
211    /// Controls how often the worker pings the [`heartbeat_url`](Self::heartbeat_url).
212    /// Defaults to 30 seconds.
213    ///
214    /// # Examples
215    ///
216    /// ```no_run
217    /// use std::time::Duration;
218    /// use ironflow_worker::WorkerBuilder;
219    ///
220    /// # fn example() {
221    /// let builder = WorkerBuilder::new("http://localhost:3000", "token")
222    ///     .heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc123")
223    ///     .heartbeat_interval(Duration::from_secs(60));
224    /// # }
225    /// ```
226    #[cfg(feature = "heartbeat")]
227    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
228        self.heartbeat_interval = interval;
229        self
230    }
231
232    /// Build the worker.
233    ///
234    /// # Errors
235    ///
236    /// Returns [`WorkerError::Internal`] if no provider has been set.
237    /// Returns [`WorkerError::Engine`] if a handler registration fails.
238    pub fn build(self) -> Result<Worker, WorkerError> {
239        let provider = self
240            .provider
241            .ok_or_else(|| WorkerError::Internal("WorkerBuilder: provider is required".into()))?;
242
243        let store: Arc<dyn Store> = Arc::new(ApiRunStore::new(&self.api_url, &self.worker_token));
244
245        let mut engine = Engine::new(store, provider);
246        for handler in self.handlers {
247            engine
248                .register_boxed(handler)
249                .map_err(WorkerError::Engine)?;
250        }
251
252        let (log_sender, log_receiver) = ironflow_engine::log_sender::channel();
253        engine.set_log_sender(log_sender);
254
255        #[cfg(feature = "heartbeat")]
256        let heartbeat_client = Client::builder()
257            .timeout(Duration::from_secs(5))
258            .build()
259            .expect("failed to build heartbeat HTTP client");
260
261        Ok(Worker {
262            engine: Arc::new(engine),
263            api_url: self.api_url,
264            worker_token: self.worker_token,
265            log_receiver: Mutex::new(Some(log_receiver)),
266            concurrency: self.concurrency,
267            poll_interval: self.poll_interval,
268            run_timeout: self.run_timeout,
269            max_consecutive_panics: self.max_consecutive_panics,
270            panic_cooldown: self.panic_cooldown,
271            #[cfg(feature = "heartbeat")]
272            heartbeat_url: self.heartbeat_url,
273            #[cfg(feature = "heartbeat")]
274            heartbeat_interval: self.heartbeat_interval,
275            #[cfg(feature = "heartbeat")]
276            heartbeat_client,
277        })
278    }
279}
280
281/// Background worker that polls the API and executes workflows.
282pub struct Worker {
283    engine: Arc<Engine>,
284    api_url: String,
285    worker_token: String,
286    log_receiver: Mutex<Option<LogReceiver>>,
287    concurrency: usize,
288    poll_interval: Duration,
289    run_timeout: Duration,
290    max_consecutive_panics: u32,
291    panic_cooldown: Duration,
292    #[cfg(feature = "heartbeat")]
293    heartbeat_url: Option<String>,
294    #[cfg(feature = "heartbeat")]
295    heartbeat_interval: Duration,
296    #[cfg(feature = "heartbeat")]
297    heartbeat_client: Client,
298}
299
/// Tracks consecutive failures per workflow for poison pill detection.
///
/// A workflow is blocked once its consecutive failure count reaches
/// `max_consecutive` and stays blocked until `cooldown` has elapsed since its
/// most recent recorded failure. A success removes the workflow's entry and
/// thereby resets the count.
struct PoisonPillTracker {
    /// Failure count at which a workflow becomes blocked.
    max_consecutive: u32,
    /// How long a workflow stays blocked after its latest failure.
    cooldown: Duration,
    /// Maps workflow name to (consecutive panic count, last panic time).
    state: HashMap<String, (u32, Instant)>,
}

impl PoisonPillTracker {
    /// Create an empty tracker with the given threshold and cooldown.
    fn new(max_consecutive: u32, cooldown: Duration) -> Self {
        Self {
            max_consecutive,
            cooldown,
            state: HashMap::new(),
        }
    }

    /// Record a panic for a workflow. Returns `true` if the workflow is now
    /// considered a poison pill.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing, so a
    /// workflow that keeps failing can never wrap back to "healthy".
    fn record_panic(&mut self, workflow: &str) -> bool {
        let now = Instant::now();
        let count = match self.state.get_mut(workflow) {
            Some((count, last_panic)) => {
                *count = count.saturating_add(1);
                *last_panic = now;
                *count
            }
            None => {
                // Only allocate the key the first time a workflow fails.
                self.state.insert(workflow.to_string(), (1, now));
                1
            }
        };
        count >= self.max_consecutive
    }

    /// Record a successful execution, resetting the panic counter.
    fn record_success(&mut self, workflow: &str) {
        self.state.remove(workflow);
    }

    /// Returns `true` if the workflow is currently blocked as a poison pill.
    ///
    /// The entry is not cleared when the cooldown expires: the count is kept,
    /// so the next recorded failure blocks the workflow again immediately.
    fn is_blocked(&self, workflow: &str) -> bool {
        self.state.get(workflow).is_some_and(|(count, last_panic)| {
            *count >= self.max_consecutive && last_panic.elapsed() < self.cooldown
        })
    }
}
341
342impl Worker {
343    /// Run the worker loop until a shutdown signal (SIGTERM/SIGINT) is received.
344    ///
345    /// On shutdown, the worker stops picking new runs and waits for all
346    /// in-flight executions to complete before returning.
347    ///
348    /// # Errors
349    ///
350    /// Returns [`WorkerError`] if the polling loop encounters an unrecoverable error.
351    pub async fn run(&self) -> Result<(), WorkerError> {
352        let semaphore = Arc::new(Semaphore::new(self.concurrency));
353        let shutdown = CancellationToken::new();
354        let mut idle_streak = 0u32;
355        let poison_tracker = Arc::new(Mutex::new(PoisonPillTracker::new(
356            self.max_consecutive_panics,
357            self.panic_cooldown,
358        )));
359        let (outcome_tx, mut outcome_rx) = mpsc::unbounded_channel::<RunOutcome>();
360
361        info!(
362            concurrency = self.concurrency,
363            poll_interval_ms = self.poll_interval.as_millis() as u64,
364            run_timeout_secs = self.run_timeout.as_secs(),
365            "worker started"
366        );
367
368        if let Some(receiver) = self.log_receiver.lock().expect("log_receiver lock").take() {
369            let pusher = LogPusher::new(&self.api_url, &self.worker_token);
370            spawn(pusher.run(receiver));
371            info!("log pusher started");
372        }
373
374        // Spawn shutdown signal handler
375        let shutdown_clone = shutdown.clone();
376        spawn(async move {
377            shutdown_signal().await;
378            info!("shutdown signal received, draining in-flight runs...");
379            shutdown_clone.cancel();
380        });
381
382        #[cfg(feature = "heartbeat")]
383        if let Some(ref url) = self.heartbeat_url {
384            let interval = self.heartbeat_interval;
385            let url = url.clone();
386            let client = self.heartbeat_client.clone();
387
388            spawn(async move {
389                let mut ticker = tokio::time::interval(interval);
390                // skip the first immediate tick
391                ticker.tick().await;
392                loop {
393                    ticker.tick().await;
394                    match client.head(&url).send().await {
395                        Ok(resp) if resp.status().is_success() => {
396                            info!(url = %url, "heartbeat sent");
397                        }
398                        Ok(resp) => {
399                            warn!(
400                                url = %url,
401                                status = %resp.status(),
402                                "heartbeat ping returned non-success status"
403                            );
404                        }
405                        Err(err) => {
406                            warn!(
407                                url = %url,
408                                error = %err,
409                                "heartbeat ping failed"
410                            );
411                        }
412                    }
413                }
414            });
415        }
416
417        while !shutdown.is_cancelled() {
418            // Drain outcome channel to update poison pill tracker
419            while let Ok(outcome) = outcome_rx.try_recv() {
420                let mut tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
421                match outcome {
422                    RunOutcome::Success(ref wf) => tracker.record_success(wf),
423                    RunOutcome::Failed(ref wf) | RunOutcome::Timeout(ref wf) => {
424                        if tracker.record_panic(wf) {
425                            warn!(workflow = %wf, "workflow flagged as poison pill after consecutive failures");
426                        }
427                    }
428                    RunOutcome::Panicked(ref wf) => {
429                        if tracker.record_panic(wf) {
430                            error!(workflow = %wf, "workflow flagged as poison pill after consecutive panics");
431                        }
432                    }
433                }
434            }
435
436            let run = self.engine.store().pick_next_pending().await;
437
438            match run {
439                Ok(Some(run)) => {
440                    #[cfg(feature = "prometheus")]
441                    counter!(WORKER_POLLS_TOTAL, "result" => "hit").increment(1);
442
443                    // Poison pill check: skip workflows that keep failing
444                    let is_blocked = {
445                        let tracker = poison_tracker.lock().expect("poison tracker lock poisoned");
446                        tracker.is_blocked(&run.workflow_name)
447                    };
448                    if is_blocked {
449                        warn!(
450                            workflow = %run.workflow_name,
451                            run_id = %run.id,
452                            "skipping run: workflow flagged as poison pill, marking as failed"
453                        );
454                        if let Err(e) = self
455                            .engine
456                            .store()
457                            .update_run_status(run.id, RunStatus::Failed)
458                            .await
459                        {
460                            error!(run_id = %run.id, error = %e, "failed to mark poisoned run as failed");
461                        }
462                        continue;
463                    }
464
465                    let permit = semaphore
466                        .clone()
467                        .acquire_owned()
468                        .await
469                        .map_err(|_| WorkerError::Internal("semaphore closed".to_string()))?;
470
471                    idle_streak = 0;
472                    let engine = self.engine.clone();
473                    let run_id = run.id;
474                    let workflow = run.workflow_name.clone();
475                    let workflow_for_watcher = workflow.clone();
476                    let run_timeout = self.run_timeout;
477
478                    info!(run_id = %run_id, workflow = %workflow, "executing run");
479
480                    #[cfg(feature = "prometheus")]
481                    gauge!(WORKER_ACTIVE).increment(1.0);
482
483                    let handle = spawn(async move {
484                        let _permit = permit;
485                        let result = timeout(run_timeout, engine.execute_handler_run(run_id)).await;
486
487                        match result {
488                            Ok(Ok(_)) => {
489                                info!(run_id = %run_id, workflow = %workflow, "run completed");
490                                RunOutcome::Success(workflow)
491                            }
492                            Ok(Err(e)) => {
493                                error!(run_id = %run_id, workflow = %workflow, error = %e, "run failed");
494                                if let Err(store_err) = engine
495                                    .store()
496                                    .update_run_status(run_id, RunStatus::Failed)
497                                    .await
498                                {
499                                    error!(run_id = %run_id, error = %store_err, "failed to mark run as failed");
500                                }
501                                RunOutcome::Failed(workflow)
502                            }
503                            Err(_) => {
504                                error!(
505                                    run_id = %run_id,
506                                    workflow = %workflow,
507                                    timeout_secs = run_timeout.as_secs(),
508                                    "run timed out"
509                                );
510                                if let Err(e) = engine
511                                    .store()
512                                    .update_run_status(run_id, RunStatus::Failed)
513                                    .await
514                                {
515                                    error!(run_id = %run_id, error = %e, "failed to mark timed-out run as failed");
516                                }
517                                RunOutcome::Timeout(workflow)
518                            }
519                        }
520                    });
521
522                    // Spawn a watcher to catch panics and report outcomes
523                    let store = self.engine.store().clone();
524                    let tx = outcome_tx.clone();
525                    spawn(async move {
526                        match handle.await {
527                            Ok(outcome) => {
528                                let _ = tx.send(outcome);
529                            }
530                            Err(e) => {
531                                error!(run_id = %run_id, "spawned task panicked: {e}");
532                                if let Err(store_err) =
533                                    store.update_run_status(run_id, RunStatus::Failed).await
534                                {
535                                    error!(run_id = %run_id, error = %store_err, "failed to mark panicked run as failed");
536                                }
537                                let _ = tx.send(RunOutcome::Panicked(workflow_for_watcher));
538                            }
539                        }
540                        #[cfg(feature = "prometheus")]
541                        gauge!(WORKER_ACTIVE).decrement(1.0);
542                    });
543                }
544                Ok(None) => {
545                    #[cfg(feature = "prometheus")]
546                    counter!(WORKER_POLLS_TOTAL, "result" => "miss").increment(1);
547
548                    idle_streak += 1;
549                    let backoff = if idle_streak > 10 {
550                        self.poll_interval * 3
551                    } else if idle_streak > 5 {
552                        self.poll_interval * 2
553                    } else {
554                        self.poll_interval
555                    };
556                    sleep(backoff).await;
557                }
558                Err(e) => {
559                    warn!(error = %e, "poll error");
560                    sleep(self.poll_interval).await;
561                }
562            }
563        }
564
565        // Graceful drain: wait for all in-flight tasks to release their permits
566        info!(
567            in_flight = self.concurrency - semaphore.available_permits(),
568            "waiting for in-flight runs to complete..."
569        );
570        let _ = semaphore
571            .acquire_many(self.concurrency as u32)
572            .await
573            .map_err(|_| WorkerError::Shutdown("semaphore closed during drain".to_string()))?;
574
575        info!("all in-flight runs completed, worker shut down");
576        Ok(())
577    }
578}
579
/// Result of executing one run, reported back to the polling loop so it can
/// update poison pill tracking. Every variant carries the workflow name.
enum RunOutcome {
    /// The engine finished the run without error.
    Success(String),
    /// The engine returned an error for the run.
    Failed(String),
    /// The run did not finish within the configured run timeout.
    Timeout(String),
    /// The task executing the run panicked (surfaced as a join error).
    Panicked(String),
}
591
592/// Wait for SIGTERM or SIGINT (Ctrl+C).
593async fn shutdown_signal() {
594    use tokio::signal;
595
596    let ctrl_c = async {
597        signal::ctrl_c()
598            .await
599            .expect("failed to install Ctrl+C handler");
600    };
601
602    #[cfg(unix)]
603    let terminate = async {
604        use tokio::signal::unix::{SignalKind, signal};
605
606        signal(SignalKind::terminate())
607            .expect("failed to install SIGTERM handler")
608            .recv()
609            .await;
610    };
611
612    #[cfg(not(unix))]
613    let terminate = {
614        use std::future::pending;
615        pending::<()>()
616    };
617
618    tokio::select! {
619        () = ctrl_c => {},
620        () = terminate => {},
621    }
622}
623
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;

    /// API URL shared by most builder tests.
    const API_URL: &str = "http://localhost:3000";

    /// Builder with default settings and no provider.
    fn builder() -> WorkerBuilder {
        WorkerBuilder::new(API_URL, "token")
    }

    /// Builder that already has a provider, i.e. one that can be built.
    fn builder_with_provider() -> WorkerBuilder {
        builder().provider(Arc::new(ClaudeCodeProvider::new()))
    }

    // --- WorkerBuilder tests ---

    #[test]
    fn builder_new_creates_default_config() {
        let builder = WorkerBuilder::new(API_URL, "my-token");
        assert_eq!(builder.api_url, API_URL);
        assert_eq!(builder.worker_token, "my-token");
        assert_eq!(builder.concurrency, DEFAULT_CONCURRENCY);
        assert_eq!(builder.poll_interval, DEFAULT_POLL_INTERVAL);
        assert_eq!(builder.run_timeout, DEFAULT_RUN_TIMEOUT);
        assert_eq!(
            builder.max_consecutive_panics,
            DEFAULT_MAX_CONSECUTIVE_PANICS
        );
        assert_eq!(builder.panic_cooldown, DEFAULT_PANIC_COOLDOWN);
        assert!(builder.provider.is_none());
    }

    /// The builder stores the URL verbatim: a trailing slash is kept, not
    /// normalized away.
    #[test]
    fn builder_preserves_trailing_slash() {
        let builder = WorkerBuilder::new("http://localhost:3000/", "token");
        assert_eq!(builder.api_url, "http://localhost:3000/");
    }

    #[test]
    fn builder_provider_sets_provider() {
        assert!(builder_with_provider().provider.is_some());
    }

    #[test]
    fn builder_concurrency_sets_concurrency() {
        assert_eq!(builder().concurrency(8).concurrency, 8);
    }

    #[test]
    fn builder_concurrency_zero_accepted() {
        let builder = builder_with_provider().concurrency(0);
        assert_eq!(builder.concurrency, 0);
    }

    #[test]
    fn builder_poll_interval_sets_interval() {
        let interval = Duration::from_secs(5);
        assert_eq!(builder().poll_interval(interval).poll_interval, interval);
    }

    #[test]
    fn builder_run_timeout_sets_timeout() {
        let dur = Duration::from_secs(120);
        assert_eq!(builder().run_timeout(dur).run_timeout, dur);
    }

    #[test]
    fn builder_max_consecutive_panics_sets_value() {
        let builder = builder().max_consecutive_panics(10);
        assert_eq!(builder.max_consecutive_panics, 10);
    }

    #[test]
    fn builder_panic_cooldown_sets_value() {
        let dur = Duration::from_secs(600);
        assert_eq!(builder().panic_cooldown(dur).panic_cooldown, dur);
    }

    #[test]
    fn builder_build_without_provider_fails() {
        match builder().build() {
            Err(WorkerError::Internal(msg)) => {
                assert!(msg.contains("provider is required"));
            }
            _ => panic!("expected Internal error about missing provider"),
        }
    }

    #[test]
    fn builder_build_with_provider_succeeds() {
        assert!(builder_with_provider().build().is_ok());
    }

    #[test]
    fn builder_build_creates_worker_with_correct_concurrency() {
        let worker = builder_with_provider().concurrency(16).build().unwrap();
        assert_eq!(worker.concurrency, 16);
    }

    #[test]
    fn builder_build_creates_worker_with_correct_interval() {
        let interval = Duration::from_secs(10);
        let worker = builder_with_provider()
            .poll_interval(interval)
            .build()
            .unwrap();
        assert_eq!(worker.poll_interval, interval);
    }

    #[test]
    fn builder_build_preserves_timeout() {
        let dur = Duration::from_secs(300);
        let worker = builder_with_provider().run_timeout(dur).build().unwrap();
        assert_eq!(worker.run_timeout, dur);
    }

    #[test]
    fn builder_build_preserves_poison_pill_config() {
        let cooldown = Duration::from_secs(120);
        let worker = builder_with_provider()
            .max_consecutive_panics(7)
            .panic_cooldown(cooldown)
            .build()
            .unwrap();
        assert_eq!(worker.max_consecutive_panics, 7);
        assert_eq!(worker.panic_cooldown, cooldown);
    }

    #[test]
    fn builder_chaining_works() {
        let result = builder_with_provider()
            .concurrency(4)
            .poll_interval(Duration::from_secs(3))
            .run_timeout(Duration::from_secs(600))
            .max_consecutive_panics(5)
            .panic_cooldown(Duration::from_secs(120))
            .build();
        assert!(result.is_ok());
        let worker = result.unwrap();
        assert_eq!(worker.concurrency, 4);
        assert_eq!(worker.poll_interval, Duration::from_secs(3));
        assert_eq!(worker.run_timeout, Duration::from_secs(600));
        assert_eq!(worker.max_consecutive_panics, 5);
        assert_eq!(worker.panic_cooldown, Duration::from_secs(120));
    }

    #[test]
    fn builder_empty_api_url_accepted() {
        let builder =
            WorkerBuilder::new("", "token").provider(Arc::new(ClaudeCodeProvider::new()));
        assert!(builder.build().is_ok());
    }

    #[test]
    fn builder_empty_token_accepted() {
        let builder =
            WorkerBuilder::new(API_URL, "").provider(Arc::new(ClaudeCodeProvider::new()));
        assert!(builder.build().is_ok());
    }

    // --- Heartbeat configuration tests ---

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_defaults() {
        let builder = builder();
        assert!(builder.heartbeat_url.is_none());
        assert_eq!(builder.heartbeat_interval, DEFAULT_HEARTBEAT_INTERVAL);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_url_sets_url() {
        let builder =
            builder().heartbeat_url("https://uptime.betterstack.com/api/v1/heartbeat/abc");
        assert_eq!(
            builder.heartbeat_url.as_deref(),
            Some("https://uptime.betterstack.com/api/v1/heartbeat/abc")
        );
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_heartbeat_custom_interval() {
        let interval = Duration::from_secs(10);
        let builder = builder().heartbeat_interval(interval);
        assert_eq!(builder.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_preserves_heartbeat_config() {
        let interval = Duration::from_secs(15);
        let worker = builder_with_provider()
            .heartbeat_url("https://example.com/heartbeat")
            .heartbeat_interval(interval)
            .build()
            .unwrap();
        assert_eq!(
            worker.heartbeat_url.as_deref(),
            Some("https://example.com/heartbeat")
        );
        assert_eq!(worker.heartbeat_interval, interval);
    }

    #[cfg(feature = "heartbeat")]
    #[test]
    fn builder_build_without_heartbeat_url_has_none() {
        let worker = builder_with_provider().build().unwrap();
        assert!(worker.heartbeat_url.is_none());
    }

    // --- PoisonPillTracker tests ---

    #[test]
    fn poison_tracker_not_blocked_initially() {
        let tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.is_blocked("my-workflow"));
    }

    #[test]
    fn poison_tracker_blocked_after_max_panics() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_success_resets_count() {
        let mut tracker = PoisonPillTracker::new(3, Duration::from_secs(300));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        tracker.record_success("wf");
        assert!(!tracker.is_blocked("wf"));
        // After the reset, a full three panics are needed to block again.
        assert!(!tracker.record_panic("wf"));
        assert!(!tracker.record_panic("wf"));
        assert!(tracker.record_panic("wf"));
        assert!(tracker.is_blocked("wf"));
    }

    #[test]
    fn poison_tracker_independent_per_workflow() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_secs(300));
        tracker.record_panic("wf-a");
        tracker.record_panic("wf-a");
        assert!(tracker.is_blocked("wf-a"));
        assert!(!tracker.is_blocked("wf-b"));
    }

    #[test]
    fn poison_tracker_unblocks_after_cooldown() {
        let mut tracker = PoisonPillTracker::new(2, Duration::from_millis(0));
        tracker.record_panic("wf");
        tracker.record_panic("wf");
        // Cooldown is 0ms, should immediately unblock
        assert!(!tracker.is_blocked("wf"));
    }
}