use crate::context::Context;
use crate::distributed::{
    ContextStore, ErrorStore, LivenessStore, MetricsStore, RunInfoStore, RunStatus,
    WorkItemStateStore, WorkItemStatus, WorkQueue, WorkerHealth, WorkerStatus, WorkflowError,
};
use crate::error::FloxideError;
use crate::retry::{BackoffStrategy, RetryError, RetryPolicy};
use crate::workflow::Workflow;
use async_trait::async_trait;
use rand::Rng;
use serde_json;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;
use tracing::error;

use super::{ItemProcessedOutcome, StepCallbacks};

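/// A distributed worker that polls the shared work queue, executes one
/// workflow step per dequeued item, and records progress in the configured
/// run-info, metrics, error, liveness, context, and work-item-state stores.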
#[derive(Clone)]
pub struct DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync + 'static,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    workflow: W,
    queue: Q,
    context_store: CS,
    run_info_store: RIS,
    metrics_store: MS,
    error_store: ES,
    liveness_store: LS,
    work_item_state_store: WISS,
    retry_policy: Option<RetryPolicy>,
    idle_sleep_duration: Duration,
    idle_sleep_jitter: Duration,
    phantom: PhantomData<C>,
}

impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static> + 'static,
    C: Context + crate::merge::Merge + Default + 'static,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync + Clone,
    RIS: RunInfoStore + Send + Sync + Clone + 'static,
    MS: MetricsStore + Send + Sync + Clone + 'static,
    ES: ErrorStore + Send + Sync + Clone + 'static,
    LS: LivenessStore + Send + Sync + Clone + 'static,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync + Clone + 'static,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
    Self: Clone,
{
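    /// Creates a worker from a workflow, a work queue, and the full set of
    /// backing stores. The worker starts with no retry policy and the default
    /// idle sleep settings (100 ms base duration, 50 ms jitter).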
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        workflow: W,
        queue: Q,
        context_store: CS,
        run_info_store: RIS,
        metrics_store: MS,
        error_store: ES,
        liveness_store: LS,
        work_item_state_store: WISS,
    ) -> Self {
        Self {
            workflow,
            queue,
            context_store,
            run_info_store,
            metrics_store,
            error_store,
            liveness_store,
            work_item_state_store,
            retry_policy: None,
            idle_sleep_duration: Duration::from_millis(100),
            idle_sleep_jitter: Duration::from_millis(50),
            phantom: PhantomData,
        }
    }

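    /// Sets the retry policy applied when processing a work item fails.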
    pub fn set_retry_policy(&mut self, policy: RetryPolicy) {
        self.retry_policy = Some(policy);
    }

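    /// Builds the `StepCallbacks` adapter that routes workflow lifecycle
    /// events for `worker_id` back to this worker's state-update handlers.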
    #[allow(clippy::type_complexity)]
    fn build_callbacks(
        &self,
        worker_id: usize,
    ) -> Arc<StepCallbacksImpl<C, W, Q, RIS, MS, ES, LS, WISS, CS>> {
        let cloned_worker = self.clone();
        Arc::new(StepCallbacksImpl {
            worker: Arc::new(cloned_worker),
            worker_id,
        })
    }

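    /// Records that processing of a work item has started: marks the worker
    /// as `InProgress` in the liveness store and transitions the item from
    /// `Pending` to `InProgress`, returning an error for items that are in
    /// any other state.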
    async fn on_started_state_updates(
        &self,
        worker_id: usize,
        run_id: &str,
        work_item: &W::WorkItem,
    ) -> Result<(), FloxideError> {
        let mut health = self
            .liveness_store
            .get_health(worker_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        health.status = WorkerStatus::InProgress;
        health.current_work_item = Some(format!("{:?}", work_item));
        health.current_work_item_run_id = Some(run_id.to_string());
        self.liveness_store
            .update_health(worker_id, health)
            .await
            .ok();
        let item_state = self
            .work_item_state_store
            .get_status(run_id, work_item)
            .await;
        match item_state {
            Ok(WorkItemStatus::Pending) => {
                self.work_item_state_store
                    .set_status(run_id, work_item, WorkItemStatus::InProgress)
                    .await
                    .ok();
                self.work_item_state_store
                    .increment_attempts(run_id, work_item)
                    .await
                    .ok();
                Ok(())
            }
            Ok(WorkItemStatus::InProgress) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is already in progress",
                    work_item
                );
                Err(FloxideError::Generic(format!(
                    "Work item {:?} is already in progress",
                    work_item
                )))
            }
            Ok(WorkItemStatus::Completed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is already completed",
                    work_item
                );
                Err(FloxideError::Generic(format!(
                    "Work item {:?} is already completed",
                    work_item
                )))
            }
            Ok(WorkItemStatus::Failed) => {
                tracing::error!(
                    worker_id,
                    run_id,
                    "Work item {:?} previously failed and should not be processed again",
                    work_item
                );
                Err(FloxideError::Generic(format!(
                    "Work item {:?} previously failed and should not be processed again",
                    work_item
                )))
            }
            Ok(WorkItemStatus::WaitingRetry) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is waiting for retry backoff",
                    work_item
                );
                Err(FloxideError::Generic(format!(
                    "Work item {:?} is waiting for retry backoff",
                    work_item
                )))
            }
            Ok(WorkItemStatus::PermanentlyFailed) => {
                tracing::error!(
                    worker_id,
                    run_id,
                    "Work item {:?} is permanently failed and should not be processed again",
                    work_item
                );
                Err(FloxideError::Generic(format!(
                    "Work item {:?} is permanently failed and should not be processed again",
                    work_item
                )))
            }
            Err(e) => {
                tracing::error!(worker_id, run_id, "Error getting work item status: {:?}", e);
                Err(FloxideError::Generic(format!(
                    "Error getting work item status: {:?}",
                    e
                )))
            }
        }
    }

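    /// Handles a successfully processed item that completed the workflow:
    /// marks the item `Completed`, updates the run metrics, and records the
    /// run as `Completed` in the run-info store together with its output and
    /// finish time. Items already in a terminal state are left untouched.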
    async fn on_item_processed_success_terminal_state_updates(
        &self,
        worker_id: usize,
        run_id: &str,
        work_item: &W::WorkItem,
        output: &serde_json::Value,
    ) -> Result<(), FloxideError> {
        let status_result = self
            .work_item_state_store
            .get_status(run_id, work_item)
            .await;
        let status = status_result.ok();
        tracing::debug!(worker_id, run_id=%run_id, ?work_item, current_status=?status, "Processing successful terminal item");
        match status {
            Some(WorkItemStatus::Completed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is already completed (terminal)",
                    work_item
                );
                return Ok(());
            }
            Some(WorkItemStatus::PermanentlyFailed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is permanently failed (terminal)",
                    work_item
                );
                return Ok(());
            }
            _ => {
                self.work_item_state_store
                    .set_status(run_id, work_item, WorkItemStatus::Completed)
                    .await
                    .ok();
            }
        }
        let mut metrics = self
            .metrics_store
            .get_metrics(run_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        metrics.completed += 1;
        metrics.total_work_items += 1;
        self.metrics_store
            .update_metrics(run_id, metrics)
            .await
            .ok();
        let now = chrono::Utc::now();
        tracing::debug!(worker_id, run_id=%run_id, "Attempting to set run status to Completed");
        self.run_info_store
            .update_status(run_id, RunStatus::Completed)
            .await
            .map_err(|e| {
                FloxideError::Generic(format!("Failed to set run status to Completed: {}", e))
            })?;
        self.run_info_store
            .update_output(run_id, output.clone())
            .await
            .map_err(|e| FloxideError::Generic(format!("Failed to set run output: {}", e)))?;
        self.run_info_store
            .update_finished_at(run_id, now)
            .await
            .map_err(|e| FloxideError::Generic(format!("Failed to set run finished at: {}", e)))?;
        Ok(())
    }

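    /// Handles a successfully processed item that did not complete the
    /// workflow: marks the item `Completed` and updates the run's work-item
    /// count, leaving the overall run status unchanged.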
    async fn on_item_processed_success_non_terminal_state_updates(
        &self,
        worker_id: usize,
        run_id: &str,
        work_item: &W::WorkItem,
    ) -> Result<(), FloxideError> {
        let status_result = self
            .work_item_state_store
            .get_status(run_id, work_item)
            .await;
        let status = status_result.ok();
        tracing::debug!(worker_id, run_id=%run_id, ?work_item, current_status=?status, "Processing successful non-terminal item");
        match status {
            Some(WorkItemStatus::Completed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is already completed (non-terminal)",
                    work_item
                );
                return Ok(());
            }
            Some(WorkItemStatus::PermanentlyFailed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is permanently failed (non-terminal)",
                    work_item
                );
                return Ok(());
            }
            _ => {
                self.work_item_state_store
                    .set_status(run_id, work_item, WorkItemStatus::Completed)
                    .await
                    .ok();
            }
        }
        let mut metrics = self
            .metrics_store
            .get_metrics(run_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        metrics.total_work_items += 1;
        self.metrics_store
            .update_metrics(run_id, metrics)
            .await
            .ok();
        Ok(())
    }

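    /// Handles an item whose processing failed: updates worker health and
    /// error counts, decides between a retry and a permanent failure based on
    /// the retry policy and attempt count, records the error, and, when a
    /// retry is due, spawns a task that re-enqueues the item after the
    /// backoff delay.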
    async fn on_item_processed_error_state_updates(
        &self,
        worker_id: usize,
        run_id: &str,
        work_item: &W::WorkItem,
        e: &FloxideError,
    ) -> Result<(), FloxideError> {
        let status = self
            .work_item_state_store
            .get_status(run_id, work_item)
            .await
            .ok();
        match status {
            Some(WorkItemStatus::Completed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is already completed (error)",
                    work_item
                );
                return Ok(());
            }
            Some(WorkItemStatus::PermanentlyFailed) => {
                tracing::warn!(
                    worker_id,
                    run_id,
                    "Work item {:?} is permanently failed (error)",
                    work_item
                );
                return Ok(());
            }
            _ => {}
        }
        let mut health = self
            .liveness_store
            .get_health(worker_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        health.error_count += 1;
        let policy = self.retry_policy.as_ref();
        let attempt = self
            .work_item_state_store
            .get_attempts(run_id, work_item)
            .await
            .unwrap_or(0) as usize;
        let should_retry = policy.map(|p| p.should_retry(e, attempt)).unwrap_or(false);
        let max_attempts = policy.map(|p| p.max_attempts).unwrap_or(5);
        let mut is_permanent = false;
        if should_retry {
            health.status = WorkerStatus::Retrying(attempt, max_attempts);
            self.work_item_state_store
                .set_status(run_id, work_item, WorkItemStatus::Failed)
                .await
                .ok();
            let attempts = self
                .work_item_state_store
                .get_attempts(run_id, work_item)
                .await
                .unwrap_or(0);
            if attempts >= max_attempts as u32 {
                self.work_item_state_store
                    .set_status(run_id, work_item, WorkItemStatus::PermanentlyFailed)
                    .await
                    .ok();
                is_permanent = true;
            } else {
                tracing::debug!(worker_id, run_id=%run_id, ?work_item, attempt, "Setting item status to WaitingRetry");
                self.work_item_state_store
                    .set_status(run_id, work_item, WorkItemStatus::WaitingRetry)
                    .await
                    .ok();
            }
        } else {
            self.work_item_state_store
                .set_status(run_id, work_item, WorkItemStatus::PermanentlyFailed)
                .await
                .ok();
            is_permanent = true;
        }
        self.liveness_store
            .update_health(worker_id, health)
            .await
            .ok();
        let work_item_str = format!("{:?}", work_item);
        let workflow_error = WorkflowError {
            work_item: work_item_str,
            error: format!("{:?}", e),
            attempt,
            timestamp: chrono::Utc::now(),
        };
        self.error_store
            .record_error(run_id, workflow_error)
            .await
            .ok();
        let mut metrics = self
            .metrics_store
            .get_metrics(run_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        if should_retry && !is_permanent {
            metrics.retries += 1;
        } else {
            metrics.failed += 1;
        }
        self.metrics_store
            .update_metrics(run_id, metrics)
            .await
            .ok();
        if is_permanent {
            let now = chrono::Utc::now();
            self.run_info_store
                .update_status(run_id, RunStatus::Failed)
                .await
                .ok();
            self.run_info_store
                .update_finished_at(run_id, now)
                .await
                .ok();
        }
        if should_retry && !is_permanent {
            if let Some(policy) = policy {
                let queue = self.queue.clone();
                let run_id = run_id.to_string();
                let work_item = work_item.clone();
                let work_item_state_store = self.work_item_state_store.clone();
                let backoff = policy.backoff_duration(attempt);
                tracing::debug!(
                    worker_id,
                    run_id = %run_id,
                    ?work_item,
                    ?backoff,
                    "Spawning task to re-enqueue work item after backoff"
                );
                tokio::spawn(async move {
                    let task_run_id = run_id.clone();
                    let task_work_item = work_item.clone();
                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, ?backoff, "Retry task SPAWNED, will sleep");
                    tokio::time::sleep(backoff).await;
                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task AWAKE after sleep");

                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task attempting to set item status to Pending");
                    match work_item_state_store
                        .set_status(&task_run_id, &task_work_item, WorkItemStatus::Pending)
                        .await
                    {
                        Ok(_) => {
                            tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task successfully set item status to Pending");
                        }
                        Err(e) => {
                            tracing::error!(
                                run_id = %task_run_id,
                                work_item = ?task_work_item,
                                error = %e,
                                "Retry task FAILED to set status to Pending"
                            );
                            return;
                        }
                    }

                    tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task attempting enqueue");
                    match queue.enqueue(&task_run_id, task_work_item.clone()).await {
                        Ok(_) => {
                            tracing::debug!(run_id = %task_run_id, work_item = ?task_work_item, "Retry task successfully enqueued work item");
                        }
                        Err(e) => {
                            tracing::error!(
                                run_id = %task_run_id,
                                work_item = ?task_work_item,
                                error = %e,
                                "Retry task FAILED to enqueue work item!"
                            );
                        }
                    }
                });
            }
        }
        Ok(())
    }

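    /// Marks the worker as `Idle` in the liveness store and clears the record
    /// of the work item it was processing.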
    async fn on_idle_state_updates(&self, worker_id: usize) -> Result<(), FloxideError> {
        let mut health = self
            .liveness_store
            .get_health(worker_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        health.status = WorkerStatus::Idle;
        health.current_work_item = None;
        health.current_work_item_run_id = None;
        self.liveness_store
            .update_health(worker_id, health)
            .await
            .ok();
        Ok(())
    }

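    /// Returns `true` only when the worker's recorded health status is
    /// `Idle`, i.e. when it is safe to pick up new work.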
    async fn can_worker_continue(&self, worker_id: usize) -> bool {
        let health = self
            .liveness_store
            .get_health(worker_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        matches!(health.status, WorkerStatus::Idle)
    }

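    /// Performs a single polling cycle: emits a heartbeat, asks the workflow
    /// to process one item via `step_distributed`, and restores the worker to
    /// idle afterwards. Returns the run ID and output when the processed item
    /// completed its run, and `Ok(None)` when there was nothing to do or the
    /// worker is not currently idle.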
    #[tracing::instrument(skip(self))]
    pub async fn run_once(
        &self,
        worker_id: usize,
    ) -> Result<Option<(String, W::Output)>, FloxideError>
    where
        C: std::fmt::Debug + Clone + Send + Sync,
    {
        if !self.can_worker_continue(worker_id).await {
            tracing::debug!(worker_id, "Worker is permanently failed, skipping work");
            return Ok(None);
        }
        self.heartbeat(worker_id).await;
        match self
            .workflow
            .step_distributed(
                &self.context_store,
                &self.queue,
                worker_id,
                self.build_callbacks(worker_id),
            )
            .await
        {
            Ok(Some((run_id, output))) => {
                self.on_idle_state_updates(worker_id).await?;
                Ok(Some((run_id, output)))
            }
            Ok(None) => {
                self.on_idle_state_updates(worker_id).await?;
                Ok(None)
            }
            Err(e) => {
                self.on_idle_state_updates(worker_id).await?;
                Err(e.error)
            }
        }
    }

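    /// Runs the worker loop indefinitely: repeatedly calls `run_once` and
    /// sleeps for the configured idle duration (plus random jitter) whenever
    /// no work was available or an error occurred. This function never
    /// returns.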
    #[tracing::instrument(skip(self))]
    pub async fn run_forever(&self, worker_id: usize) -> std::convert::Infallible
    where
        C: std::fmt::Debug + Clone + Send + Sync,
    {
        let base_sleep_ms = self.idle_sleep_duration.as_millis() as u64;
        let jitter_range_ms = (self.idle_sleep_jitter.as_millis() / 2) as i64;

        loop {
            match self.run_once(worker_id).await {
                Ok(Some((_run_id, _output))) => {
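                    // A work item was processed; poll again immediately
                    // without sleeping.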
                }
                Ok(None) => {
                    let jitter_ms =
                        rand::thread_rng().gen_range(-jitter_range_ms..=jitter_range_ms);
                    let sleep_ms = ((base_sleep_ms as i64) + jitter_ms).max(0) as u64;
                    let sleep_duration = Duration::from_millis(sleep_ms);
                    sleep(sleep_duration).await;
                }
                Err(e) => {
                    error!(worker_id, error = ?e, "Worker encountered error in run_once");
                    let jitter_ms =
                        rand::thread_rng().gen_range(-jitter_range_ms..=jitter_range_ms);
                    let sleep_ms = ((base_sleep_ms as i64) + jitter_ms).max(0) as u64;
                    let sleep_duration = Duration::from_millis(sleep_ms);
                    sleep(sleep_duration).await;
                }
            }
        }
    }

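    /// Records a liveness heartbeat for this worker, updating both the
    /// heartbeat timestamp and the `last_heartbeat` field of its health entry.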
    #[tracing::instrument(skip(self))]
    pub async fn heartbeat(&self, worker_id: usize)
    where
        C: std::fmt::Debug + Clone + Send + Sync,
    {
        let now = chrono::Utc::now();
        let _ = self.liveness_store.update_heartbeat(worker_id, now).await;
        let mut health = self
            .liveness_store
            .get_health(worker_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_default();
        health.last_heartbeat = now;
        let _ = self.liveness_store.update_health(worker_id, health).await;
    }
}

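/// Fluent builder for [`DistributedWorker`]. The workflow, queue, and every
/// store are required; the retry policy and idle sleep settings are optional
/// and fall back to defaults in [`WorkerBuilder::build`].
///
/// A minimal sketch of intended usage; the values passed to the setters
/// (`my_workflow`, `queue`, the store handles, `retry_policy`) are
/// placeholders for whatever implementations the application provides:
///
/// ```ignore
/// let worker = WorkerBuilder::new()
///     .workflow(my_workflow)
///     .queue(queue)
///     .context_store(context_store)
///     .run_info_store(run_info_store)
///     .metrics_store(metrics_store)
///     .error_store(error_store)
///     .liveness_store(liveness_store)
///     .work_item_state_store(work_item_state_store)
///     .retry_policy(retry_policy)
///     .build()
///     .expect("all required components were provided");
/// ```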
pub struct WorkerBuilder<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    workflow: Option<W>,
    queue: Option<Q>,
    context_store: Option<CS>,
    run_info_store: Option<RIS>,
    metrics_store: Option<MS>,
    error_store: Option<ES>,
    liveness_store: Option<LS>,
    work_item_state_store: Option<WISS>,
    retry_policy: Option<RetryPolicy>,
    idle_sleep_duration: Option<Duration>,
    idle_sleep_jitter: Option<Duration>,
    _phantom: std::marker::PhantomData<C>,
}

impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> WorkerBuilder<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    pub fn new() -> Self {
        Self {
            workflow: None,
            queue: None,
            context_store: None,
            run_info_store: None,
            metrics_store: None,
            error_store: None,
            liveness_store: None,
            work_item_state_store: None,
            retry_policy: None,
            idle_sleep_duration: None,
            idle_sleep_jitter: None,
            _phantom: std::marker::PhantomData,
        }
    }
    pub fn workflow(mut self, workflow: W) -> Self {
        self.workflow = Some(workflow);
        self
    }
    pub fn queue(mut self, queue: Q) -> Self {
        self.queue = Some(queue);
        self
    }
    pub fn context_store(mut self, context_store: CS) -> Self {
        self.context_store = Some(context_store);
        self
    }
    pub fn run_info_store(mut self, ris: RIS) -> Self {
        self.run_info_store = Some(ris);
        self
    }
    pub fn metrics_store(mut self, ms: MS) -> Self {
        self.metrics_store = Some(ms);
        self
    }
    pub fn error_store(mut self, es: ES) -> Self {
        self.error_store = Some(es);
        self
    }
    pub fn liveness_store(mut self, ls: LS) -> Self {
        self.liveness_store = Some(ls);
        self
    }
    pub fn work_item_state_store(mut self, wiss: WISS) -> Self {
        self.work_item_state_store = Some(wiss);
        self
    }
    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
        self.retry_policy = Some(policy);
        self
    }
    pub fn idle_sleep_duration(mut self, duration: Duration) -> Self {
        self.idle_sleep_duration = Some(duration);
        self
    }
    pub fn idle_sleep_jitter(mut self, jitter: Duration) -> Self {
        self.idle_sleep_jitter = Some(jitter);
        self
    }
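    /// Consumes the builder and constructs the worker, returning an error
    /// naming the first missing required component. When no retry policy was
    /// provided, a default policy of 5 attempts with exponential backoff and
    /// `RetryError::All` is used; the idle sleep duration and jitter default
    /// to 100 ms and 50 ms.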
    #[allow(clippy::type_complexity)]
    pub fn build(self) -> Result<DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>, String>
    where
        W: Workflow<C, WorkItem: 'static>,
        C: std::fmt::Debug + Clone + Send + Sync,
        Q: WorkQueue<C, W::WorkItem> + Send + Sync,
        RIS: RunInfoStore + Send + Sync,
        MS: MetricsStore + Send + Sync,
        ES: ErrorStore + Send + Sync,
        LS: LivenessStore + Send + Sync,
        WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
        CS: ContextStore<C> + Send + Sync + Clone + 'static,
    {
        Ok(DistributedWorker {
            workflow: self.workflow.ok_or("workflow is required")?,
            queue: self.queue.ok_or("queue is required")?,
            context_store: self.context_store.ok_or("context_store is required")?,
            run_info_store: self.run_info_store.ok_or("run_info_store is required")?,
            metrics_store: self.metrics_store.ok_or("metrics_store is required")?,
            error_store: self.error_store.ok_or("error_store is required")?,
            liveness_store: self.liveness_store.ok_or("liveness_store is required")?,
            work_item_state_store: self
                .work_item_state_store
                .ok_or("work_item_state_store is required")?,
            retry_policy: Some(self.retry_policy.unwrap_or_else(|| {
                RetryPolicy::new(
                    5,
                    std::time::Duration::from_millis(1000),
                    std::time::Duration::from_secs(10),
                    BackoffStrategy::Exponential,
                    RetryError::All,
                )
            })),
            idle_sleep_duration: self
                .idle_sleep_duration
                .unwrap_or(Duration::from_millis(100)),
            idle_sleep_jitter: self.idle_sleep_jitter.unwrap_or(Duration::from_millis(50)),
            phantom: std::marker::PhantomData,
        })
    }
}

impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> Default
    for WorkerBuilder<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}

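/// Runs several clones of a [`DistributedWorker`] concurrently, each on its
/// own Tokio task with its own cancellation token.
///
/// A minimal sketch of intended usage, assuming `worker` was already
/// constructed (for example via [`WorkerBuilder`]):
///
/// ```ignore
/// let mut pool = WorkerPool::new(worker, 4);
/// pool.start();
/// // ... let the workers drain the queue ...
/// pool.stop().await;
/// ```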
#[allow(clippy::type_complexity)]
pub struct WorkerPool<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    worker: DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>,
    num_workers: usize,
    handles: Vec<JoinHandle<()>>,
    cancel_tokens: Vec<CancellationToken>,
}

impl<W, C, Q, RIS, MS, ES, LS, WISS, CS> WorkerPool<W, C, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static> + 'static,
    C: Context + crate::merge::Merge + Default + 'static,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync + Clone + 'static,
    RIS: RunInfoStore + Send + Sync + Clone + 'static,
    MS: MetricsStore + Send + Sync + Clone + 'static,
    ES: ErrorStore + Send + Sync + Clone + 'static,
    LS: LivenessStore + Send + Sync + Clone + 'static,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync + Clone + 'static,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
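    /// Creates a pool that will run `num_workers` clones of the given worker.
    /// No tasks are spawned until [`WorkerPool::start`] is called.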
    pub fn new(
        worker: DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>,
        num_workers: usize,
    ) -> Self {
        Self {
            worker,
            num_workers,
            handles: Vec::new(),
            cancel_tokens: Vec::new(),
        }
    }

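    /// Spawns one Tokio task per worker, each running `run_forever` until its
    /// cancellation token is triggered.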
    pub fn start(&mut self) {
        for worker_id in 0..self.num_workers {
            let cancel_token = CancellationToken::new();
            let cancel_token_child = cancel_token.child_token();
            let worker = self.worker.clone();
            let handle = tokio::spawn(async move {
                let token = cancel_token_child;
                tokio::select! {
                    _ = worker.run_forever(worker_id) => {},
                    _ = token.cancelled() => {},
                }
            });
            self.handles.push(handle);
            self.cancel_tokens.push(cancel_token);
        }
    }

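    /// Cancels all worker tasks and waits for them to finish.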
    pub async fn stop(&mut self) {
        for token in &self.cancel_tokens {
            token.cancel();
        }
        for handle in self.handles.drain(..) {
            let _ = handle.await;
        }
    }

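    /// Waits for all worker tasks to finish without cancelling them.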
    pub async fn join(&mut self) {
        for handle in self.handles.drain(..) {
            let _ = handle.await;
        }
    }

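    /// Returns the health records of all workers known to the liveness store.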
    pub async fn health(&self) -> Vec<WorkerHealth> {
        self.worker
            .liveness_store
            .list_health()
            .await
            .unwrap_or_default()
    }
}

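/// Adapter that implements [`StepCallbacks`] by forwarding each workflow
/// lifecycle event to the owning [`DistributedWorker`]'s state-update
/// methods.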
#[allow(clippy::type_complexity)]
struct StepCallbacksImpl<
    C: Context + crate::merge::Merge + Default,
    W: Workflow<C>,
    Q,
    RIS,
    MS,
    ES,
    LS,
    WISS,
    CS,
> where
    W: Workflow<C, WorkItem: 'static>,
    C: Context + crate::merge::Merge + Default,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
    RIS: RunInfoStore + Send + Sync,
    MS: MetricsStore + Send + Sync,
    ES: ErrorStore + Send + Sync,
    LS: LivenessStore + Send + Sync,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    worker: Arc<DistributedWorker<W, C, Q, RIS, MS, ES, LS, WISS, CS>>,
    worker_id: usize,
}

#[async_trait]
impl<C, W, Q, RIS, MS, ES, LS, WISS, CS> StepCallbacks<C, W>
    for StepCallbacksImpl<C, W, Q, RIS, MS, ES, LS, WISS, CS>
where
    W: Workflow<C, WorkItem: 'static> + 'static,
    C: Context + crate::merge::Merge + Default + 'static,
    Q: WorkQueue<C, W::WorkItem> + Send + Sync + Clone,
    RIS: RunInfoStore + Send + Sync + Clone + 'static,
    MS: MetricsStore + Send + Sync + Clone + 'static,
    ES: ErrorStore + Send + Sync + Clone + 'static,
    LS: LivenessStore + Send + Sync + Clone + 'static,
    WISS: WorkItemStateStore<W::WorkItem> + Send + Sync + Clone + 'static,
    CS: ContextStore<C> + Send + Sync + Clone + 'static,
{
    async fn on_started(&self, run_id: String, item: W::WorkItem) -> Result<(), FloxideError> {
        if let Err(e) = self
            .worker
            .on_started_state_updates(self.worker_id, &run_id, &item)
            .await
        {
            tracing::error!(worker_id = self.worker_id, run_id = %run_id, "on_started_state_updates failed: {:?}", e);
        }
        Ok(())
    }
    async fn on_item_processed(
        &self,
        run_id: String,
        item: W::WorkItem,
        outcome: ItemProcessedOutcome,
    ) -> Result<(), FloxideError> {
        let result = match outcome {
            ItemProcessedOutcome::SuccessTerminal(output) => {
                self.worker
                    .on_item_processed_success_terminal_state_updates(
                        self.worker_id,
                        &run_id,
                        &item,
                        &output,
                    )
                    .await
            }
            ItemProcessedOutcome::SuccessNonTerminal => {
                self.worker
                    .on_item_processed_success_non_terminal_state_updates(
                        self.worker_id,
                        &run_id,
                        &item,
                    )
                    .await
            }
            ItemProcessedOutcome::Error(e) => {
                self.worker
                    .on_item_processed_error_state_updates(self.worker_id, &run_id, &item, &e)
                    .await
            }
        };
        if let Err(e) = result {
            tracing::error!(worker_id = self.worker_id, run_id = %run_id, "on_item_processed_state_updates failed: {:?}", e);
        }
        Ok(())
    }
}