floxide_core/distributed/
worker.rs

1use crate::context::Context;
2use crate::distributed::{
3    ContextStore, ErrorStore, LivenessStore, MetricsStore, RunInfoStore, RunStatus,
4    WorkItemStateStore, WorkItemStatus, WorkQueue, WorkerHealth, WorkerStatus, WorkflowError,
5};
6use crate::error::FloxideError;
7use crate::retry::{BackoffStrategy, RetryError, RetryPolicy};
8use crate::workflow::Workflow;
9use async_trait::async_trait;
10use rand::Rng;
11use serde_json;
12use std::marker::PhantomData;
13use std::sync::Arc;
14use tokio::task::JoinHandle;
15use tokio::time::{sleep, Duration};
16use tokio_util::sync::CancellationToken;
17use tracing::error;
18
19use super::{ItemProcessedOutcome, StepCallbacks};
20
21/// A distributed workflow worker that polls a work queue, processes workflow steps, and updates state in distributed stores.
22///
23/// Use [`run_once`] to process a single work item, or [`run_forever`] to continuously poll for work.
24#[derive(Clone)]
25pub struct DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>
26where
27    W: Workflow<C, WorkItem: 'static>,
28    C: Context + crate::merge::Merge + Default,
29    Q: WorkQueue<C, W::WorkItem> + Send + Sync + 'static,
30    RIS: RunInfoStore + Send + Sync,
31    MS: MetricsStore + Send + Sync,
32    ES: ErrorStore + Send + Sync,
33    LS: LivenessStore + Send + Sync,
34    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
35    CS: ContextStore<C> + Send + Sync + Clone + 'static,
36{
37    workflow: W,
38    queue: Q,
39    context_store: CS,
40    run_info_store: RIS,
41    metrics_store: MS,
42    error_store: ES,
43    liveness_store: LS,
44    work_item_state_store: WISS,
45    retry_policy: Option<RetryPolicy>,
46    idle_sleep_duration: Duration,
47    idle_sleep_jitter: Duration,
48    phantom: PhantomData<C>,
49}
50
51impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>
52where
53    W: Workflow<C, WorkItem: 'static> + 'static,
54    C: Context + crate::merge::Merge + Default + 'static,
55    Q: WorkQueue<C, W::WorkItem> + Send + Sync + Clone,
56    RIS: RunInfoStore + Send + Sync + Clone + 'static,
57    MS: MetricsStore + Send + Sync + Clone + 'static,
58    ES: ErrorStore + Send + Sync + Clone + 'static,
59    LS: LivenessStore + Send + Sync + Clone + 'static,
60    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync + Clone + 'static,
61    CS: ContextStore<C> + Send + Sync + Clone + 'static,
62    Self: Clone,
63{
64    /// Create a new distributed worker with all required stores and workflow.
65    ///
66    /// See [`WorkerBuilder`] for ergonomic construction with defaults.
67    #[allow(clippy::too_many_arguments)]
68    pub fn new(
69        workflow: W,
70        queue: Q,
71        context_store: CS,
72        run_info_store: RIS,
73        metrics_store: MS,
74        error_store: ES,
75        liveness_store: LS,
76        work_item_state_store: WISS,
77    ) -> Self {
78        Self {
79            workflow,
80            queue,
81            context_store,
82            run_info_store,
83            metrics_store,
84            error_store,
85            liveness_store,
86            work_item_state_store,
87            retry_policy: None,
88            idle_sleep_duration: Duration::from_millis(100),
89            idle_sleep_jitter: Duration::from_millis(50),
90            phantom: PhantomData,
91        }
92    }
93
94    /// Set a retry policy for all work items.
95    pub fn set_retry_policy(&mut self, policy: RetryPolicy) {
96        self.retry_policy = Some(policy);
97    }
98
99    #[allow(clippy::type_complexity)]
100    fn build_callbacks(
101        &self,
102        worker_id: usize,
103    ) -> Arc<StepCallbacksImpl<C, W, Q, RIS, MS, ES, LS, WISS, CS>> {
104        let cloned_worker = self.clone();
105        Arc::new(StepCallbacksImpl {
106            worker: Arc::new(cloned_worker),
107            worker_id,
108        })
109    }
110
111    // --- Callback-style state update methods ---
112    /// Called when a work item is about to be processed. Only allows Pending items to proceed.
113    /// Returns Err if the item is not in a processable state.
114    async fn on_started_state_updates(
115        &self,
116        worker_id: usize,
117        run_id: &str,
118        work_item: &W::WorkItem,
119    ) -> Result<(), FloxideError> {
120        let mut health = self
121            .liveness_store
122            .get_health(worker_id)
123            .await
124            .ok()
125            .flatten()
126            .unwrap_or_default();
127        health.status = WorkerStatus::InProgress;
128        health.current_work_item = Some(format!("{:?}", work_item));
129        health.current_work_item_run_id = Some(run_id.to_string());
130        self.liveness_store
131            .update_health(worker_id, health)
132            .await
133            .ok();
134        let item_state = self
135            .work_item_state_store
136            .get_status(run_id, work_item)
137            .await;
138        match item_state {
139            Ok(WorkItemStatus::Pending) => {
140                // Normal path: transition to InProgress and increment attempts
141                self.work_item_state_store
142                    .set_status(run_id, work_item, WorkItemStatus::InProgress)
143                    .await
144                    .ok();
145                self.work_item_state_store
146                    .increment_attempts(run_id, work_item)
147                    .await
148                    .ok();
149                Ok(())
150            }
151            Ok(WorkItemStatus::InProgress) => {
152                // [Distributed edge case]: Multiple workers may dequeue the same work item nearly simultaneously.
153                // Only one will succeed in updating the state; the others will see it as already in progress or completed.
154                // This is expected in distributed systems without strict distributed locking or atomic compare-and-set.
155                tracing::warn!(
156                    worker_id,
157                    run_id,
158                    "Work item {:?} is already in progress",
159                    work_item
160                );
161                Err(FloxideError::Generic(format!(
162                    "Work item {:?} is already in progress",
163                    work_item
164                )))
165            }
166            Ok(WorkItemStatus::Completed) => {
167                // [Distributed edge case]: Multiple workers may attempt to process the same work item, but only one can complete it.
168                // The others will see it as already completed. This is not a fatal error, but is expected in at-least-once delivery systems.
169                tracing::warn!(
170                    worker_id,
171                    run_id,
172                    "Work item {:?} is already completed",
173                    work_item
174                );
175                Err(FloxideError::Generic(format!(
176                    "Work item {:?} is already completed",
177                    work_item
178                )))
179            }
180            Ok(WorkItemStatus::Failed) => {
181                tracing::error!(
182                    worker_id,
183                    run_id,
184                    "Work item {:?} previously failed and should not be processed again",
185                    work_item
186                );
187                Err(FloxideError::Generic(format!(
188                    "Work item {:?} previously failed and should not be processed again",
189                    work_item
190                )))
191            }
192            Ok(WorkItemStatus::WaitingRetry) => {
193                tracing::warn!(
194                    worker_id,
195                    run_id,
196                    "Work item {:?} is waiting for retry backoff",
197                    work_item
198                );
199                Err(FloxideError::Generic(format!(
200                    "Work item {:?} is waiting for retry backoff",
201                    work_item
202                )))
203            }
204            Ok(WorkItemStatus::PermanentlyFailed) => {
205                tracing::error!(
206                    worker_id,
207                    run_id,
208                    "Work item {:?} is permanently failed and should not be processed again",
209                    work_item
210                );
211                Err(FloxideError::Generic(format!(
212                    "Work item {:?} is permanently failed and should not be processed again",
213                    work_item
214                )))
215            }
216            Err(e) => {
217                tracing::error!(worker_id, run_id, "Error getting work item status: {:?}", e);
218                Err(FloxideError::Generic(format!(
219                    "Error getting work item status: {:?}",
220                    e
221                )))
222            }
223        }
224    }
225
226    /// Called when a terminal work item is processed successfully.
227    async fn on_item_processed_success_terminal_state_updates(
228        &self,
229        worker_id: usize,
230        run_id: &str,
231        work_item: &W::WorkItem,
232        output: &serde_json::Value,
233    ) -> Result<(), FloxideError> {
234        let status_result = self
235            .work_item_state_store
236            .get_status(run_id, work_item)
237            .await;
238        let status = status_result.ok(); // Get Option<WorkItemStatus>
239        tracing::debug!(worker_id, run_id=%run_id, ?work_item, current_status=?status, "Processing successful terminal item");
240        match status {
241            Some(WorkItemStatus::Completed) => {
242                tracing::warn!(
243                    worker_id,
244                    run_id,
245                    "Work item {:?} is already completed (terminal)",
246                    work_item
247                );
248                return Ok(());
249            }
250            Some(WorkItemStatus::PermanentlyFailed) => {
251                tracing::warn!(
252                    worker_id,
253                    run_id,
254                    "Work item {:?} is permanently failed (terminal)",
255                    work_item
256                );
257                return Ok(());
258            }
259            _ => {
260                self.work_item_state_store
261                    .set_status(run_id, work_item, WorkItemStatus::Completed)
262                    .await
263                    .ok();
264            }
265        }
266        let mut metrics = self
267            .metrics_store
268            .get_metrics(run_id)
269            .await
270            .ok()
271            .flatten()
272            .unwrap_or_default();
273        metrics.completed += 1;
274        metrics.total_work_items += 1;
275        self.metrics_store
276            .update_metrics(run_id, metrics)
277            .await
278            .ok();
279        let now = chrono::Utc::now();
280        tracing::debug!(worker_id, run_id=%run_id, "Attempting to set run status to Completed");
281        self.run_info_store
282            .update_status(run_id, RunStatus::Completed)
283            .await
284            .map_err(|e| {
285                FloxideError::Generic(format!("Failed to set run status to Completed: {}", e))
286            })?;
287        // Set the output field
288        self.run_info_store
289            .update_output(run_id, output.clone())
290            .await
291            .map_err(|e| FloxideError::Generic(format!("Failed to set run output: {}", e)))?;
292        self.run_info_store
293            .update_finished_at(run_id, now)
294            .await
295            .map_err(|e| FloxideError::Generic(format!("Failed to set run finished at: {}", e)))?;
296        Ok(())
297    }
298
299    /// Called when a non-terminal work item is processed successfully.
300    async fn on_item_processed_success_non_terminal_state_updates(
301        &self,
302        worker_id: usize,
303        run_id: &str,
304        work_item: &W::WorkItem,
305    ) -> Result<(), FloxideError> {
306        let status_result = self
307            .work_item_state_store
308            .get_status(run_id, work_item)
309            .await;
310        let status = status_result.ok(); // Get Option<WorkItemStatus>
311        tracing::debug!(worker_id, run_id=%run_id, ?work_item, current_status=?status, "Processing successful non-terminal item");
312        match status {
313            Some(WorkItemStatus::Completed) => {
314                tracing::warn!(
315                    worker_id,
316                    run_id,
317                    "Work item {:?} is already completed (non-terminal)",
318                    work_item
319                );
320                return Ok(());
321            }
322            Some(WorkItemStatus::PermanentlyFailed) => {
323                tracing::warn!(
324                    worker_id,
325                    run_id,
326                    "Work item {:?} is permanently failed (non-terminal)",
327                    work_item
328                );
329                return Ok(());
330            }
331            _ => {
332                self.work_item_state_store
333                    .set_status(run_id, work_item, WorkItemStatus::Completed)
334                    .await
335                    .ok();
336            }
337        }
338        let mut metrics = self
339            .metrics_store
340            .get_metrics(run_id)
341            .await
342            .ok()
343            .flatten()
344            .unwrap_or_default();
345        metrics.total_work_items += 1;
346        self.metrics_store
347            .update_metrics(run_id, metrics)
348            .await
349            .ok();
350        Ok(())
351    }
352
353    /// Called when a work item processing returns an error.
354    async fn on_item_processed_error_state_updates(
355        &self,
356        worker_id: usize,
357        run_id: &str,
358        work_item: &W::WorkItem,
359        e: &FloxideError,
360    ) -> Result<(), FloxideError> {
361        let status = self
362            .work_item_state_store
363            .get_status(run_id, work_item)
364            .await
365            .ok();
366        match status {
367            Some(WorkItemStatus::Completed) => {
368                tracing::warn!(
369                    worker_id,
370                    run_id,
371                    "Work item {:?} is already completed (error)",
372                    work_item
373                );
374                return Ok(());
375            }
376            Some(WorkItemStatus::PermanentlyFailed) => {
377                tracing::warn!(
378                    worker_id,
379                    run_id,
380                    "Work item {:?} is permanently failed (error)",
381                    work_item
382                );
383                return Ok(());
384            }
385            _ => {}
386        }
387        let mut health = self
388            .liveness_store
389            .get_health(worker_id)
390            .await
391            .ok()
392            .flatten()
393            .unwrap_or_default();
394        health.error_count += 1;
395        let policy = self.retry_policy.as_ref();
396        let attempt = self
397            .work_item_state_store
398            .get_attempts(run_id, work_item)
399            .await
400            .unwrap_or(0) as usize;
401        let should_retry = policy.map(|p| p.should_retry(e, attempt)).unwrap_or(false);
402        let max_attempts = policy.map(|p| p.max_attempts).unwrap_or(5);
403        let mut is_permanent = false;
404        if should_retry {
405            health.status = WorkerStatus::Retrying(attempt, max_attempts);
406            self.work_item_state_store
407                .set_status(run_id, work_item, WorkItemStatus::Failed)
408                .await
409                .ok();
410            let attempts = self
411                .work_item_state_store
412                .get_attempts(run_id, work_item)
413                .await
414                .unwrap_or(0);
415            if attempts >= max_attempts as u32 {
416                self.work_item_state_store
417                    .set_status(run_id, work_item, WorkItemStatus::PermanentlyFailed)
418                    .await
419                    .ok();
420                is_permanent = true;
421            } else {
422                // Set to WaitingRetry for backoff
423                tracing::debug!(worker_id, run_id=%run_id, ?work_item, attempt, "Setting item status to WaitingRetry");
424                self.work_item_state_store
425                    .set_status(run_id, work_item, WorkItemStatus::WaitingRetry)
426                    .await
427                    .ok();
428            }
429        } else {
430            self.work_item_state_store
431                .set_status(run_id, work_item, WorkItemStatus::PermanentlyFailed)
432                .await
433                .ok();
434            is_permanent = true;
435        }
436        self.liveness_store
437            .update_health(worker_id, health)
438            .await
439            .ok();
440        // Record error
441        let work_item_str = format!("{:?}", work_item);
442        let workflow_error = WorkflowError {
443            work_item: work_item_str,
444            error: format!("{:?}", e),
445            attempt,
446            timestamp: chrono::Utc::now(),
447        };
448        self.error_store
449            .record_error(run_id, workflow_error)
450            .await
451            .ok();
452        // Update metrics
453        let mut metrics = self
454            .metrics_store
455            .get_metrics(run_id)
456            .await
457            .ok()
458            .flatten()
459            .unwrap_or_default();
460        if should_retry && !is_permanent {
461            metrics.retries += 1;
462        } else {
463            metrics.failed += 1;
464        }
465        self.metrics_store
466            .update_metrics(run_id, metrics)
467            .await
468            .ok();
469        // On permanent failure, mark run as failed and purge
470        if is_permanent {
471            let now = chrono::Utc::now();
472            self.run_info_store
473                .update_status(run_id, RunStatus::Failed)
474                .await
475                .ok();
476            self.run_info_store
477                .update_finished_at(run_id, now)
478                .await
479                .ok();
480        }
481        // Retry or break: on retry, re-enqueue the failed work item
482        if should_retry && !is_permanent {
483            if let Some(policy) = policy {
484                let queue = self.queue.clone();
485                let run_id = run_id.to_string();
486                let work_item = work_item.clone();
487                let work_item_state_store = self.work_item_state_store.clone();
488                let backoff = policy.backoff_duration(attempt);
489                tracing::debug!(
490                    worker_id,
491                    run_id = %run_id,
492                    ?work_item,
493                    ?backoff,
494                    "Spawning task to re-enqueue work item after backoff"
495                );
496                tokio::spawn(async move {
497                    let task_run_id = run_id.clone();
498                    let task_work_item = work_item.clone();
499                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, ?backoff, "Retry task SPAWNED, will sleep");
500                    tokio::time::sleep(backoff).await;
501                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task AWAKE after sleep");
502
503                    // Set to Pending before re-enqueue
504                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task attempting to set item status to Pending");
505                    match work_item_state_store
506                        .set_status(&task_run_id, &task_work_item, WorkItemStatus::Pending)
507                        .await
508                    {
509                        Ok(_) => {
510                            tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task successfully set item status to Pending");
511                        }
512                        Err(e) => {
513                            tracing::error!(
514                                run_id = %task_run_id,
515                                work_item = ?task_work_item,
516                                error = %e,
517                                "Retry task FAILED to set status to Pending"
518                            );
519                            // Optionally, decide if we should still attempt enqueue or just return
520                            return;
521                        }
522                    }
523
524                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task attempting enqueue");
525                    match queue.enqueue(&task_run_id, task_work_item.clone()).await {
526                        Ok(_) => {
527                            tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task successfully enqueued work item");
528                        }
529                        Err(e) => {
530                            tracing::error!(
531                                run_id = %task_run_id,
532                                work_item = ?task_work_item,
533                                error = %e,
534                                "Retry task FAILED to enqueue work item!"
535                            );
536                            // CRITICAL: The item failed to re-enqueue. It's now potentially lost.
537                            // Consider adding logic here to signal this failure more robustly,
538                            // maybe update the WorkItemStatus to a specific 'EnqueueFailed' state,
539                            // or alert an external monitoring system.
540                        }
541                    }
542                });
543            }
544        }
545        Ok(())
546    }
547
548    /// Update the worker's health to idle.
549    async fn on_idle_state_updates(&self, worker_id: usize) -> Result<(), FloxideError> {
550        let mut health = self
551            .liveness_store
552            .get_health(worker_id)
553            .await
554            .ok()
555            .flatten()
556            .unwrap_or_default();
557        health.status = WorkerStatus::Idle;
558        health.current_work_item = None;
559        health.current_work_item_run_id = None;
560        self.liveness_store
561            .update_health(worker_id, health)
562            .await
563            .ok();
564        Ok(())
565    }
566
567    /// Check if the worker is permanently failed and should stop.
568    async fn can_worker_continue(&self, worker_id: usize) -> bool {
569        let health = self
570            .liveness_store
571            .get_health(worker_id)
572            .await
573            .ok()
574            .flatten()
575            .unwrap_or_default();
576        matches!(health.status, WorkerStatus::Idle)
577    }
578
579    /// Process a single work item from the queue, updating all distributed state.
580    ///
581    /// Returns `Ok(Some((run_id, output)))` if a work item was processed, `Ok(None)` if no work was available, or `Err` on permanent failure.
582    ///
583    /// # Instrumentation
584    /// This method is instrumented with `tracing` for async span tracking.
585    #[tracing::instrument(skip(self))]
586    pub async fn run_once(
587        &self,
588        worker_id: usize,
589    ) -> Result<Option<(String, W::Output)>, FloxideError>
590    where
591        C: std::fmt::Debug + Clone + Send + Sync,
592    {
593        if !self.can_worker_continue(worker_id).await {
594            tracing::debug!(worker_id, "Worker is permanently failed, skipping work");
595            return Ok(None);
596        }
597        self.heartbeat(worker_id).await;
598        match self
599            .workflow
600            .step_distributed(
601                &self.context_store,
602                &self.queue,
603                worker_id,
604                self.build_callbacks(worker_id),
605            )
606            .await
607        {
608            Ok(Some((run_id, output))) => {
609                self.on_idle_state_updates(worker_id).await?;
610                Ok(Some((run_id, output)))
611            }
612            Ok(None) => {
613                self.on_idle_state_updates(worker_id).await?;
614                Ok(None)
615            }
616            Err(e) => {
617                self.on_idle_state_updates(worker_id).await?;
618                Err(e.error)
619            }
620        }
621    }
622
623    /// Continuously poll for work and process items, sleeping briefly when idle or on error.
624    ///
625    /// This method never returns and is suitable for running in a background task.
626    ///
627    /// # Instrumentation
628    /// This method is instrumented with `tracing` for async span tracking.
629    ///
630    /// Note: Returns [`std::convert::Infallible`] for compatibility with stable Rust (instead of the experimental `!` type).
631    #[tracing::instrument(skip(self))]
632    pub async fn run_forever(&self, worker_id: usize) -> std::convert::Infallible
633    where
634        C: std::fmt::Debug + Clone + Send + Sync,
635    {
636        let base_sleep_ms = self.idle_sleep_duration.as_millis() as u64;
637        // Jitter range is +/- half the jitter duration
638        let jitter_range_ms = (self.idle_sleep_jitter.as_millis() / 2) as i64;
639
640        loop {
641            match self.run_once(worker_id).await {
642                Ok(Some((_run_id, _output))) => {
643                    // Work was done, continue immediately
644                }
645                Ok(None) => {
646                    // No work available, sleep before polling again
647                    let jitter_ms =
648                        rand::thread_rng().gen_range(-jitter_range_ms..=jitter_range_ms);
649                    let sleep_ms = ((base_sleep_ms as i64) + jitter_ms).max(0) as u64;
650                    let sleep_duration = Duration::from_millis(sleep_ms);
651                    sleep(sleep_duration).await;
652                }
653                Err(e) => {
654                    error!(worker_id, error = ?e, "Worker encountered error in run_once");
655                    let jitter_ms =
656                        rand::thread_rng().gen_range(-jitter_range_ms..=jitter_range_ms);
657                    let sleep_ms = ((base_sleep_ms as i64) + jitter_ms).max(0) as u64;
658                    let sleep_duration = Duration::from_millis(sleep_ms);
659                    sleep(sleep_duration).await;
660                }
661            }
662        }
663    }
664
665    /// Heartbeat: update liveness store with current timestamp and update health.
666    ///
667    /// # Instrumentation
668    /// This method is instrumented with `tracing` for async span tracking.
669    #[tracing::instrument(skip(self))]
670    pub async fn heartbeat(&self, worker_id: usize)
671    where
672        C: std::fmt::Debug + Clone + Send + Sync,
673    {
674        let now = chrono::Utc::now();
675        let _ = self.liveness_store.update_heartbeat(worker_id, now).await;
676        // Fetch and update health
677        let mut health = self
678            .liveness_store
679            .get_health(worker_id)
680            .await
681            .ok()
682            .flatten()
683            .unwrap_or_default();
684        health.last_heartbeat = now;
685        let _ = self.liveness_store.update_health(worker_id, health).await;
686    }
687}
688
689pub struct WorkerBuilder<W, C, Q, RIS, MS, ES, LS, WISS, CS>
690where
691    W: Workflow<C, WorkItem: 'static>,
692    C: Context + crate::merge::Merge + Default,
693    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
694    RIS: RunInfoStore + Send + Sync,
695    MS: MetricsStore + Send + Sync,
696    ES: ErrorStore + Send + Sync,
697    LS: LivenessStore + Send + Sync,
698    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
699    CS: ContextStore<C> + Send + Sync + Clone + 'static,
700{
701    workflow: Option<W>,
702    queue: Option<Q>,
703    context_store: Option<CS>,
704    run_info_store: Option<RIS>,
705    metrics_store: Option<MS>,
706    error_store: Option<ES>,
707    liveness_store: Option<LS>,
708    work_item_state_store: Option<WISS>,
709    retry_policy: Option<RetryPolicy>,
710    idle_sleep_duration: Option<Duration>,
711    idle_sleep_jitter: Option<Duration>,
712    _phantom: std::marker::PhantomData<C>,
713}
714
715impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> WorkerBuilder<W, C, Q, RIS, MS, ES, LS, WISS, CS>
716where
717    W: Workflow<C, WorkItem: 'static>,
718    C: Context + crate::merge::Merge + Default,
719    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
720    RIS: RunInfoStore + Send + Sync,
721    MS: MetricsStore + Send + Sync,
722    ES: ErrorStore + Send + Sync,
723    LS: LivenessStore + Send + Sync,
724    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
725    CS: ContextStore<C> + Send + Sync + Clone + 'static,
726{
727    pub fn new() -> Self {
728        Self {
729            workflow: None,
730            queue: None,
731            context_store: None,
732            run_info_store: None,
733            metrics_store: None,
734            error_store: None,
735            liveness_store: None,
736            work_item_state_store: None,
737            retry_policy: None,
738            idle_sleep_duration: None,
739            idle_sleep_jitter: None,
740            _phantom: std::marker::PhantomData,
741        }
742    }
743    pub fn workflow(mut self, workflow: W) -> Self {
744        self.workflow = Some(workflow);
745        self
746    }
747    pub fn queue(mut self, queue: Q) -> Self {
748        self.queue = Some(queue);
749        self
750    }
751    pub fn context_store(mut self, context_store: CS) -> Self {
752        self.context_store = Some(context_store);
753        self
754    }
755    pub fn run_info_store(mut self, ris: RIS) -> Self {
756        self.run_info_store = Some(ris);
757        self
758    }
759    pub fn metrics_store(mut self, ms: MS) -> Self {
760        self.metrics_store = Some(ms);
761        self
762    }
763    pub fn error_store(mut self, es: ES) -> Self {
764        self.error_store = Some(es);
765        self
766    }
767    pub fn liveness_store(mut self, ls: LS) -> Self {
768        self.liveness_store = Some(ls);
769        self
770    }
771    pub fn work_item_state_store(mut self, wiss: WISS) -> Self {
772        self.work_item_state_store = Some(wiss);
773        self
774    }
775    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
776        self.retry_policy = Some(policy);
777        self
778    }
779    pub fn idle_sleep_duration(mut self, duration: Duration) -> Self {
780        self.idle_sleep_duration = Some(duration);
781        self
782    }
783    pub fn idle_sleep_jitter(mut self, jitter: Duration) -> Self {
784        self.idle_sleep_jitter = Some(jitter);
785        self
786    }
787    #[allow(clippy::type_complexity)]
788    pub fn build(self) -> Result<DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>, String>
789    where
790        W: Workflow<C, WorkItem: 'static>,
791        C: std::fmt::Debug + Clone + Send + Sync,
792        Q: WorkQueue<C, W::WorkItem> + Send + Sync,
793        RIS: RunInfoStore + Send + Sync,
794        MS: MetricsStore + Send + Sync,
795        ES: ErrorStore + Send + Sync,
796        LS: LivenessStore + Send + Sync,
797        WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
798        CS: ContextStore<C> + Send + Sync + Clone + 'static,
799    {
800        Ok(DistributedWorker {
801            workflow: self.workflow.ok_or("workflow is required")?,
802            queue: self.queue.ok_or("queue is required")?,
803            context_store: self.context_store.ok_or("context_store is required")?,
804            run_info_store: self.run_info_store.ok_or("run_info_store is required")?,
805            metrics_store: self.metrics_store.ok_or("metrics_store is required")?,
806            error_store: self.error_store.ok_or("error_store is required")?,
807            liveness_store: self.liveness_store.ok_or("liveness_store is required")?,
808            work_item_state_store: self
809                .work_item_state_store
810                .ok_or("work_item_state_store is required")?,
811            retry_policy: Some(self.retry_policy.unwrap_or_else(|| {
812                RetryPolicy::new(
813                    5,
814                    std::time::Duration::from_millis(1000),
815                    std::time::Duration::from_secs(10),
816                    BackoffStrategy::Exponential,
817                    RetryError::All,
818                )
819            })),
820            idle_sleep_duration: self
821                .idle_sleep_duration
822                .unwrap_or(Duration::from_millis(100)),
823            idle_sleep_jitter: self.idle_sleep_jitter.unwrap_or(Duration::from_millis(50)),
824            phantom: std::marker::PhantomData,
825        })
826    }
827}
828
829impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> Default
830    for WorkerBuilder<W, C, Q, RIS, MS, ES, LS, WISS, CS>
831where
832    W: Workflow<C, WorkItem: 'static>,
833    C: Context + crate::merge::Merge + Default,
834    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
835    RIS: RunInfoStore + Send + Sync,
836    MS: MetricsStore + Send + Sync,
837    ES: ErrorStore + Send + Sync,
838    LS: LivenessStore + Send + Sync,
839    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
840    CS: ContextStore<C> + Send + Sync + Clone + 'static,
841{
842    fn default() -> Self {
843        Self::new()
844    }
845}
846
/// A pool of distributed workflow workers, each running in its own async task.
///
/// The pool manages worker lifecycles, graceful shutdown, and health reporting.
// The nine generic parameters mirror `DistributedWorker`; a type alias would not reduce them.
#[allow(clippy::type_complexity)]
pub struct WorkerPool<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    /// Template worker; every spawned task runs its own clone of it.
    worker: DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>,
    /// Number of tasks spawned per call to `start`.
    num_workers: usize,
    /// Join handles of spawned tasks; drained when the pool stops or joins.
    handles: Vec<JoinHandle<()>>,
    /// One cancellation token per spawned task; cancelled by `stop`.
    cancel_tokens: Vec<CancellationToken>,
}
868
869impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> WorkerPool<W, C, Q, RIS, MS, ES, LS, WISS, CS>
870where
871    W: Workflow<C, WorkItem: 'static> + 'static,
872    C: Context + crate::merge::Merge + Default + 'static,
873    Q: WorkQueue<C, W::WorkItem> + Send + Sync + Clone + 'static,
874    RIS: RunInfoStore + Send + Sync + Clone + 'static,
875    MS: MetricsStore + Send + Sync + Clone + 'static,
876    ES: ErrorStore + Send + Sync + Clone + 'static,
877    LS: LivenessStore + Send + Sync + Clone + 'static,
878    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync + Clone + 'static,
879    CS: ContextStore<C> + Send + Sync + Clone + 'static,
880{
881    /// Create a new worker pool with the given worker and number of workers.
882    pub fn new(
883        worker: DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>,
884        num_workers: usize,
885    ) -> Self {
886        Self {
887            worker,
888            num_workers,
889            handles: Vec::new(),
890            cancel_tokens: Vec::new(),
891        }
892    }
893
894    /// Start all workers in the pool. Each worker runs in its own async task.
895    pub fn start(&mut self) {
896        for worker_id in 0..self.num_workers {
897            let cancel_token = CancellationToken::new();
898            let cancel_token_child = cancel_token.child_token();
899            let worker = self.worker.clone();
900            let handle = tokio::spawn(async move {
901                let token = cancel_token_child;
902                tokio::select! {
903                    _ = worker.run_forever(worker_id) => {},
904                    _ = token.cancelled() => {},
905                }
906            });
907            self.handles.push(handle);
908            self.cancel_tokens.push(cancel_token);
909        }
910    }
911
912    /// Gracefully stop all workers by signalling cancellation and waiting for them to finish.
913    pub async fn stop(&mut self) {
914        for token in &self.cancel_tokens {
915            token.cancel();
916        }
917        for handle in self.handles.drain(..) {
918            let _ = handle.await;
919        }
920    }
921
922    /// Wait for all workers to finish.
923    pub async fn join(&mut self) {
924        for handle in self.handles.drain(..) {
925            let _ = handle.await;
926        }
927    }
928
929    /// Get health/status of all workers from the liveness store.
930    pub async fn health(&self) -> Vec<WorkerHealth> {
931        self.worker
932            .liveness_store
933            .list_health()
934            .await
935            .unwrap_or_default()
936    }
937}
938
939#[allow(clippy::type_complexity)]
940struct StepCallbacksImpl<
941    C: Context + crate::merge::Merge + Default,
942    W: Workflow<C>,
943    Q,
944    RIS,
945    MS,
946    ES,
947    LS,
948    WISS,
949    CS,
950> where
951    W: Workflow<C, WorkItem: 'static>,
952    C: Context + crate::merge::Merge + Default,
953    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
954    RIS: RunInfoStore + Send + Sync,
955    MS: MetricsStore + Send + Sync,
956    ES: ErrorStore + Send + Sync,
957    LS: LivenessStore + Send + Sync,
958    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
959    CS: ContextStore<C> + Send + Sync + Clone + 'static,
960{
961    worker: Arc<DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>>,
962    worker_id: usize,
963}
964
965#[async_trait]
966impl<C, W, Q, RIS, MS, ES, LS, WISS, CS> StepCallbacks<C, W>
967    for StepCallbacksImpl<C, W, Q, RIS, MS, ES, LS, WISS, CS>
968where
969    W: Workflow<C, WorkItem: 'static> + 'static,
970    C: Context + crate::merge::Merge + Default + 'static,
971    Q: WorkQueue<C, W::WorkItem> + Send + Sync + Clone,
972    RIS: RunInfoStore + Send + Sync + Clone + 'static,
973    MS: MetricsStore + Send + Sync + Clone + 'static,
974    ES: ErrorStore + Send + Sync + Clone + 'static,
975    LS: LivenessStore + Send + Sync + Clone + 'static,
976    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync + Clone + 'static,
977    CS: ContextStore<C> + Send + Sync + Clone + 'static,
978{
979    async fn on_started(&self, run_id: String, item: W::WorkItem) -> Result<(), FloxideError> {
980        if let Err(e) = self
981            .worker
982            .on_started_state_updates(self.worker_id, &run_id, &item)
983            .await
984        {
985            tracing::error!(worker_id = self.worker_id, run_id = %run_id, "on_started_state_updates failed: {:?}", e);
986        }
987        Ok(())
988    }
989    async fn on_item_processed(
990        &self,
991        run_id: String,
992        item: W::WorkItem,
993        outcome: ItemProcessedOutcome,
994    ) -> Result<(), FloxideError> {
995        let result = match outcome {
996            ItemProcessedOutcome::SuccessTerminal(output) => {
997                self.worker
998                    .on_item_processed_success_terminal_state_updates(
999                        self.worker_id,
1000                        &run_id,
1001                        &item,
1002                        &output,
1003                    )
1004                    .await
1005            }
1006            ItemProcessedOutcome::SuccessNonTerminal => {
1007                self.worker
1008                    .on_item_processed_success_non_terminal_state_updates(
1009                        self.worker_id,
1010                        &run_id,
1011                        &item,
1012                    )
1013                    .await
1014            }
1015            ItemProcessedOutcome::Error(e) => {
1016                self.worker
1017                    .on_item_processed_error_state_updates(self.worker_id, &run_id, &item, &e)
1018                    .await
1019            }
1020        };
1021        if let Err(e) = result {
1022            tracing::error!(worker_id = self.worker_id, run_id = %run_id, "on_item_processed_state_updates failed: {:?}", e);
1023        }
1024        Ok(())
1025    }
1026}