1use crate::context::{Context, WorkflowCtx};
2use crate::distributed::context_store::ContextStore;
3use crate::distributed::{
4 ErrorStore, LivenessStatus, LivenessStore, LivenessStoreError, MetricsStore, RunInfo,
5 RunInfoError, RunInfoStore, RunMetrics, RunStatus, WorkItemStateStore, WorkQueue,
6 WorkflowError,
7};
8use crate::error::FloxideError;
9use crate::workflow::Workflow;
10use chrono::Utc;
11use std::marker::PhantomData;
12use std::time::Duration;
13use uuid;
14
15use super::{WorkItemState, WorkItemStateStoreError, WorkItemStatus, WorkerHealth};
16
17pub struct DistributedOrchestrator<W, C, Q, RIS, MS, ES, LS, WIS, CS>
18where
19 W: Workflow<C>,
20 C: Context + crate::merge::Merge + Default,
21 Q: WorkQueue<C, W::WorkItem> + Send + Sync,
22 RIS: RunInfoStore + Send + Sync,
23 MS: MetricsStore + Send + Sync,
24 ES: ErrorStore + Send + Sync,
25 LS: LivenessStore + Send + Sync,
26 WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
27 CS: ContextStore<C> + Send + Sync,
28{
29 workflow: W,
30 queue: Q,
31 run_info_store: RIS,
32 metrics_store: MS,
33 error_store: ES,
34 liveness_store: LS,
35 work_item_state_store: WIS,
36 context_store: CS,
37 phantom: PhantomData<C>,
38}
39
40impl<W, C, Q, RIS, MS, ES, LS, WIS, CS> DistributedOrchestrator<W, C, Q, RIS, MS, ES, LS, WIS, CS>
41where
42 W: Workflow<C>,
43 C: Context + crate::merge::Merge + Default,
44 Q: WorkQueue<C, W::WorkItem> + Send + Sync,
45 RIS: RunInfoStore + Send + Sync,
46 MS: MetricsStore + Send + Sync,
47 ES: ErrorStore + Send + Sync,
48 LS: LivenessStore + Send + Sync,
49 WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
50 CS: ContextStore<C> + Send + Sync,
51{
52 #[allow(clippy::too_many_arguments)]
54 pub fn new(
55 workflow: W,
56 queue: Q,
57 run_info_store: RIS,
58 metrics_store: MS,
59 error_store: ES,
60 liveness_store: LS,
61 work_item_state_store: WIS,
62 context_store: CS,
63 ) -> Self {
64 Self {
65 workflow,
66 queue,
67 run_info_store,
68 metrics_store,
69 error_store,
70 liveness_store,
71 work_item_state_store,
72 context_store,
73 phantom: PhantomData,
74 }
75 }
76
77 pub async fn start_run(
81 &self,
82 ctx: &WorkflowCtx<C>,
83 input: W::Input,
84 ) -> Result<String, FloxideError> {
85 let run_id = uuid::Uuid::new_v4().to_string();
87 self.workflow
89 .start_distributed(ctx, input, &self.context_store, &self.queue, &run_id)
90 .await?;
91 let run_info = RunInfo {
93 run_id: run_id.clone(),
94 status: RunStatus::Running,
95 started_at: Utc::now(),
96 finished_at: None,
97 output: None,
98 };
99 self.run_info_store
100 .insert_run(run_info)
101 .await
102 .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
103 Ok(run_id)
104 }
105
106 pub async fn status(&self, run_id: &str) -> Result<RunStatus, FloxideError>
108 where
109 C: std::fmt::Debug + Clone + Send + Sync,
110 {
111 match self.run_info_store.get_run(run_id).await {
112 Ok(Some(info)) => Ok(info.status),
113 Ok(None) => Err(FloxideError::NotStarted),
114 Err(e) => Err(FloxideError::Generic(format!("run_info_store error: {e}"))),
115 }
116 }
117
118 pub async fn list_runs(&self, filter: Option<RunStatus>) -> Result<Vec<RunInfo>, FloxideError>
120 where
121 C: std::fmt::Debug + Clone + Send + Sync,
122 {
123 self.run_info_store
124 .list_runs(filter)
125 .await
126 .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))
127 }
128
129 pub async fn cancel(&self, run_id: &str) -> Result<(), FloxideError>
131 where
132 C: std::fmt::Debug + Clone + Send + Sync,
133 {
134 self.run_info_store
135 .update_status(run_id, RunStatus::Cancelled)
136 .await
137 .map_err(|e| match e {
138 RunInfoError::NotFound => FloxideError::NotStarted,
139 e => FloxideError::Generic(format!("run_info_store error: {e}")),
140 })?;
141 let now = chrono::Utc::now();
142 self.run_info_store
143 .update_finished_at(run_id, now)
144 .await
145 .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
146 self.queue
147 .purge_run(run_id)
148 .await
149 .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))?;
150 Ok(())
151 }
152
153 pub async fn pause(&self, run_id: &str) -> Result<(), FloxideError>
155 where
156 C: std::fmt::Debug + Clone + Send + Sync,
157 {
158 self.run_info_store
159 .update_status(run_id, RunStatus::Paused)
160 .await
161 .map_err(|e| match e {
162 RunInfoError::NotFound => FloxideError::NotStarted,
163 e => FloxideError::Generic(format!("run_info_store error: {e}")),
164 })
165 }
166
167 pub async fn resume(&self, run_id: &str) -> Result<(), FloxideError>
169 where
170 C: std::fmt::Debug + Clone + Send + Sync,
171 {
172 let run_info = self
173 .run_info_store
174 .get_run(run_id)
175 .await
176 .map_err(|e| match e {
177 RunInfoError::NotFound => FloxideError::NotStarted,
178 e => FloxideError::Generic(format!("run_info_store error: {e}")),
179 })?;
180
181 if run_info.is_none() {
182 return Err(FloxideError::NotStarted);
183 }
184
185 match run_info.unwrap().status {
186 RunStatus::Running => Ok(()),
188 RunStatus::Failed => {
189 let pending_work = self
191 .pending_work(run_id)
192 .await
193 .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))?;
194
195 for item in self.list_work_items(run_id).await.map_err(|e| {
197 FloxideError::Generic(format!("work_item_state_store error: {e}"))
198 })? {
199 if item.status != WorkItemStatus::Completed
200 && !pending_work.contains(&item.work_item)
201 {
202 self.work_item_state_store
203 .set_status(run_id, &item.work_item, WorkItemStatus::Pending)
204 .await
205 .map_err(|e| {
206 FloxideError::Generic(format!("work_item_state_store error: {e}"))
207 })?;
208 self.work_item_state_store
209 .reset_attempts(run_id, &item.work_item)
210 .await
211 .map_err(|e| {
212 FloxideError::Generic(format!("work_item_state_store error: {e}"))
213 })?;
214 self.queue
215 .enqueue(run_id, item.work_item.clone())
216 .await
217 .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))?;
218 }
219 }
220
221 self.run_info_store
223 .update_status(run_id, RunStatus::Running)
224 .await
225 .map_err(|e| match e {
226 RunInfoError::NotFound => FloxideError::NotStarted,
227 e => FloxideError::Generic(format!("run_info_store error: {e}")),
228 })
229 }
230 RunStatus::Completed => Err(FloxideError::Generic("run already completed".to_string())),
231 RunStatus::Cancelled => Err(FloxideError::AlreadyCompleted),
232 RunStatus::Paused => {
233 for item in self.list_work_items(run_id).await.map_err(|e| {
235 FloxideError::Generic(format!("work_item_state_store error: {e}"))
236 })? {
237 self.work_item_state_store
238 .set_status(run_id, &item.work_item, WorkItemStatus::Pending)
239 .await
240 .map_err(|e| {
241 FloxideError::Generic(format!("work_item_state_store error: {e}"))
242 })?;
243 }
244 self.run_info_store
246 .update_status(run_id, RunStatus::Running)
247 .await
248 .map_err(|e| match e {
249 RunInfoError::NotFound => FloxideError::NotStarted,
250 e => FloxideError::Generic(format!("run_info_store error: {e}")),
251 })
252 }
253 }
254 }
255
256 pub async fn errors(&self, run_id: &str) -> Result<Vec<WorkflowError>, FloxideError>
258 where
259 C: std::fmt::Debug + Clone + Send + Sync,
260 {
261 self.error_store
262 .get_errors(run_id)
263 .await
264 .map_err(|e| FloxideError::Generic(format!("error_store error: {e}")))
265 }
266
267 pub async fn liveness(&self) -> Result<Vec<WorkerHealth>, FloxideError>
269 where
270 C: std::fmt::Debug + Clone + Send + Sync,
271 {
272 self.liveness_store
273 .list_health()
274 .await
275 .map_err(|e| FloxideError::Generic(format!("liveness_store error: {e}")))
276 }
277
278 pub async fn context(&self, run_id: &str) -> Result<C, FloxideError>
280 where
281 C: std::fmt::Debug + Clone + Send + Sync,
282 {
283 match self.context_store.get(run_id).await {
284 Ok(Some(context)) => Ok(context),
285 Ok(None) => Err(FloxideError::NotStarted),
286 Err(e) => Err(FloxideError::Generic(format!("context_store error: {e}"))),
287 }
288 }
289
290 pub async fn metrics(&self, run_id: &str) -> Result<RunMetrics, FloxideError>
292 where
293 C: std::fmt::Debug + Clone + Send + Sync,
294 {
295 match self.metrics_store.get_metrics(run_id).await {
296 Ok(Some(metrics)) => Ok(metrics),
297 Ok(None) => Err(FloxideError::NotStarted),
298 Err(e) => Err(FloxideError::Generic(format!("metrics_store error: {e}"))),
299 }
300 }
301
302 pub async fn pending_work(&self, run_id: &str) -> Result<Vec<W::WorkItem>, FloxideError>
304 where
305 C: std::fmt::Debug + Clone + Send + Sync,
306 {
307 self.queue
308 .pending_work(run_id)
309 .await
310 .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))
311 }
312
313 pub async fn check_worker_liveness(
315 &self,
316 worker_ids: &[usize],
317 threshold: Duration,
318 ) -> Vec<(usize, LivenessStatus)> {
319 let now = Utc::now();
320 let mut statuses = Vec::new();
321 for &worker_id in worker_ids {
322 let status = match self.liveness_store.get_heartbeat(worker_id).await {
323 Ok(Some(ts)) => {
324 let elapsed = now
325 .signed_duration_since(ts)
326 .to_std()
327 .unwrap_or(Duration::MAX);
328 if elapsed < threshold {
329 LivenessStatus::Alive
330 } else if elapsed < threshold * 3 {
331 LivenessStatus::Stale
332 } else {
333 LivenessStatus::Dead
334 }
335 }
336 Ok(None) => LivenessStatus::Dead,
337 Err(_) => LivenessStatus::Dead,
338 };
339 statuses.push((worker_id, status));
340 }
341 statuses
342 }
343
344 pub async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
346 self.liveness_store.list_workers().await
347 }
348
349 pub async fn list_worker_health(
351 &self,
352 ) -> Result<Vec<crate::distributed::WorkerHealth>, LivenessStoreError> {
353 self.liveness_store.list_health().await
354 }
355
356 pub async fn list_work_items(
358 &self,
359 run_id: &str,
360 ) -> Result<Vec<WorkItemState<W::WorkItem>>, WorkItemStateStoreError> {
361 self.work_item_state_store.get_all(run_id).await
362 }
363
364 pub async fn complete_run(&self, run_id: &str) -> Result<(), FloxideError> {
366 let now = chrono::Utc::now();
367 self.run_info_store
368 .update_status(run_id, RunStatus::Completed)
369 .await
370 .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
371 self.run_info_store
372 .update_finished_at(run_id, now)
373 .await
374 .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
375 Ok(())
376 }
377
378 pub async fn wait_for_completion(
380 &self,
381 run_id: &str,
382 poll_interval: std::time::Duration,
383 ) -> Result<RunInfo, FloxideError> {
384 loop {
385 let status = self
386 .run_info_store
387 .get_run(run_id)
388 .await
389 .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
390 if let Some(info) = status {
391 match info.status {
392 RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled => {
393 return Ok(info)
394 }
395 _ => tokio::time::sleep(poll_interval).await,
396 }
397 } else {
398 return Err(FloxideError::NotStarted);
399 }
400 }
401 }
402}
403
404pub struct OrchestratorBuilder<W, C, Q, RIS, MS, ES, LS, WIS, CS>
405where
406 W: Workflow<C>,
407 C: Context + crate::merge::Merge + Default,
408 Q: WorkQueue<C, W::WorkItem> + Send + Sync,
409 RIS: RunInfoStore + Send + Sync,
410 MS: MetricsStore + Send + Sync,
411 ES: ErrorStore + Send + Sync,
412 LS: LivenessStore + Send + Sync,
413 WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
414 CS: ContextStore<C> + Send + Sync,
415{
416 workflow: Option<W>,
417 queue: Option<Q>,
418 run_info_store: Option<RIS>,
419 metrics_store: Option<MS>,
420 error_store: Option<ES>,
421 liveness_store: Option<LS>,
422 work_item_state_store: Option<WIS>,
423 context_store: Option<CS>,
424 _phantom: std::marker::PhantomData<C>,
425}
426
427impl<W, C, Q, RIS, MS, ES, LS, WIS, CS> OrchestratorBuilder<W, C, Q, RIS, MS, ES, LS, WIS, CS>
428where
429 W: Workflow<C>,
430 C: Context + crate::merge::Merge + Default,
431 Q: WorkQueue<C, W::WorkItem> + Send + Sync,
432 RIS: RunInfoStore + Send + Sync,
433 MS: MetricsStore + Send + Sync,
434 ES: ErrorStore + Send + Sync,
435 LS: LivenessStore + Send + Sync,
436 WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
437 CS: ContextStore<C> + Send + Sync,
438{
439 pub fn new() -> Self {
440 Self {
441 workflow: None,
442 queue: None,
443 run_info_store: None,
444 metrics_store: None,
445 error_store: None,
446 liveness_store: None,
447 work_item_state_store: None,
448 context_store: None,
449 _phantom: std::marker::PhantomData,
450 }
451 }
452 pub fn workflow(mut self, workflow: W) -> Self {
453 self.workflow = Some(workflow);
454 self
455 }
456 pub fn queue(mut self, queue: Q) -> Self {
457 self.queue = Some(queue);
458 self
459 }
460 pub fn run_info_store(mut self, ris: RIS) -> Self {
461 self.run_info_store = Some(ris);
462 self
463 }
464 pub fn metrics_store(mut self, ms: MS) -> Self {
465 self.metrics_store = Some(ms);
466 self
467 }
468 pub fn error_store(mut self, es: ES) -> Self {
469 self.error_store = Some(es);
470 self
471 }
472 pub fn liveness_store(mut self, ls: LS) -> Self {
473 self.liveness_store = Some(ls);
474 self
475 }
476 pub fn work_item_state_store(mut self, wiss: WIS) -> Self {
477 self.work_item_state_store = Some(wiss);
478 self
479 }
480 pub fn context_store(mut self, context_store: CS) -> Self {
481 self.context_store = Some(context_store);
482 self
483 }
484 #[allow(clippy::type_complexity)]
485 pub fn build(self) -> Result<DistributedOrchestrator<W, C, Q, RIS, MS, ES, LS, WIS, CS>, String>
486 where
487 W: Workflow<C>,
488 C: Context + crate::merge::Merge + Default,
489 Q: WorkQueue<C, W::WorkItem> + Send + Sync,
490 RIS: RunInfoStore + Send + Sync,
491 MS: MetricsStore + Send + Sync,
492 ES: ErrorStore + Send + Sync,
493 LS: LivenessStore + Send + Sync,
494 WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
495 CS: ContextStore<C> + Send + Sync,
496 {
497 Ok(DistributedOrchestrator {
498 workflow: self.workflow.ok_or("workflow is required")?,
499 queue: self.queue.ok_or("queue is required")?,
500 run_info_store: self.run_info_store.ok_or("run_info_store is required")?,
501 metrics_store: self.metrics_store.ok_or("metrics_store is required")?,
502 error_store: self.error_store.ok_or("error_store is required")?,
503 liveness_store: self.liveness_store.ok_or("liveness_store is required")?,
504 work_item_state_store: self
505 .work_item_state_store
506 .ok_or("work_item_state_store is required")?,
507 context_store: self.context_store.ok_or("context_store is required")?,
508 phantom: std::marker::PhantomData,
509 })
510 }
511}
512
513impl<W, C, Q, RIS, MS, ES, LS, WIS, CS> Default
514 for OrchestratorBuilder<W, C, Q, RIS, MS, ES, LS, WIS, CS>
515where
516 W: Workflow<C>,
517 C: Context + crate::merge::Merge + Default,
518 Q: WorkQueue<C, W::WorkItem> + Send + Sync,
519 RIS: RunInfoStore + Send + Sync,
520 MS: MetricsStore + Send + Sync,
521 ES: ErrorStore + Send + Sync,
522 LS: LivenessStore + Send + Sync,
523 WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
524 CS: ContextStore<C> + Send + Sync,
525{
526 fn default() -> Self {
527 Self::new()
528 }
529}