floxide_core/distributed/
orchestrator.rs

1use crate::context::{Context, WorkflowCtx};
2use crate::distributed::context_store::ContextStore;
3use crate::distributed::{
4    ErrorStore, LivenessStatus, LivenessStore, LivenessStoreError, MetricsStore, RunInfo,
5    RunInfoError, RunInfoStore, RunMetrics, RunStatus, WorkItemStateStore, WorkQueue,
6    WorkflowError,
7};
8use crate::error::FloxideError;
9use crate::workflow::Workflow;
10use chrono::Utc;
11use std::marker::PhantomData;
12use std::time::Duration;
13use uuid;
14
15use super::{WorkItemState, WorkItemStateStoreError, WorkItemStatus, WorkerHealth};
16
17pub struct DistributedOrchestrator<W, C, Q, RIS, MS, ES, LS, WIS, CS>
18where
19    W: Workflow<C>,
20    C: Context + crate::merge::Merge + Default,
21    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
22    RIS: RunInfoStore + Send + Sync,
23    MS: MetricsStore + Send + Sync,
24    ES: ErrorStore + Send + Sync,
25    LS: LivenessStore + Send + Sync,
26    WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
27    CS: ContextStore<C> + Send + Sync,
28{
29    workflow: W,
30    queue: Q,
31    run_info_store: RIS,
32    metrics_store: MS,
33    error_store: ES,
34    liveness_store: LS,
35    work_item_state_store: WIS,
36    context_store: CS,
37    phantom: PhantomData<C>,
38}
39
40impl<W, C, Q, RIS, MS, ES, LS, WIS, CS> DistributedOrchestrator<W, C, Q, RIS, MS, ES, LS, WIS, CS>
41where
42    W: Workflow<C>,
43    C: Context + crate::merge::Merge + Default,
44    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
45    RIS: RunInfoStore + Send + Sync,
46    MS: MetricsStore + Send + Sync,
47    ES: ErrorStore + Send + Sync,
48    LS: LivenessStore + Send + Sync,
49    WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
50    CS: ContextStore<C> + Send + Sync,
51{
52    /// Create a new orchestrator.
53    #[allow(clippy::too_many_arguments)]
54    pub fn new(
55        workflow: W,
56        queue: Q,
57        run_info_store: RIS,
58        metrics_store: MS,
59        error_store: ES,
60        liveness_store: LS,
61        work_item_state_store: WIS,
62        context_store: CS,
63    ) -> Self {
64        Self {
65            workflow,
66            queue,
67            run_info_store,
68            metrics_store,
69            error_store,
70            liveness_store,
71            work_item_state_store,
72            context_store,
73            phantom: PhantomData,
74        }
75    }
76
77    /// Start a new workflow run. Returns a run_id.
78    ///
79    /// Note: Requires `uuid` crate in Cargo.toml: uuid = { version = "1", features = ["v4"] }
80    pub async fn start_run(
81        &self,
82        ctx: &WorkflowCtx<C>,
83        input: W::Input,
84    ) -> Result<String, FloxideError> {
85        // Generate a unique run_id
86        let run_id = uuid::Uuid::new_v4().to_string();
87        // Seed the workflow (creates context and enqueues first work item)
88        self.workflow
89            .start_distributed(ctx, input, &self.context_store, &self.queue, &run_id)
90            .await?;
91        // Insert run info into run_info_store
92        let run_info = RunInfo {
93            run_id: run_id.clone(),
94            status: RunStatus::Running,
95            started_at: Utc::now(),
96            finished_at: None,
97            output: None,
98        };
99        self.run_info_store
100            .insert_run(run_info)
101            .await
102            .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
103        Ok(run_id)
104    }
105
106    /// Query the status of a run.
107    pub async fn status(&self, run_id: &str) -> Result<RunStatus, FloxideError>
108    where
109        C: std::fmt::Debug + Clone + Send + Sync,
110    {
111        match self.run_info_store.get_run(run_id).await {
112            Ok(Some(info)) => Ok(info.status),
113            Ok(None) => Err(FloxideError::NotStarted),
114            Err(e) => Err(FloxideError::Generic(format!("run_info_store error: {e}"))),
115        }
116    }
117
118    /// List all runs (optionally filter by status).
119    pub async fn list_runs(&self, filter: Option<RunStatus>) -> Result<Vec<RunInfo>, FloxideError>
120    where
121        C: std::fmt::Debug + Clone + Send + Sync,
122    {
123        self.run_info_store
124            .list_runs(filter)
125            .await
126            .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))
127    }
128
129    /// Cancel a run.
130    pub async fn cancel(&self, run_id: &str) -> Result<(), FloxideError>
131    where
132        C: std::fmt::Debug + Clone + Send + Sync,
133    {
134        self.run_info_store
135            .update_status(run_id, RunStatus::Cancelled)
136            .await
137            .map_err(|e| match e {
138                RunInfoError::NotFound => FloxideError::NotStarted,
139                e => FloxideError::Generic(format!("run_info_store error: {e}")),
140            })?;
141        let now = chrono::Utc::now();
142        self.run_info_store
143            .update_finished_at(run_id, now)
144            .await
145            .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
146        self.queue
147            .purge_run(run_id)
148            .await
149            .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))?;
150        Ok(())
151    }
152
153    /// Pause a run.
154    pub async fn pause(&self, run_id: &str) -> Result<(), FloxideError>
155    where
156        C: std::fmt::Debug + Clone + Send + Sync,
157    {
158        self.run_info_store
159            .update_status(run_id, RunStatus::Paused)
160            .await
161            .map_err(|e| match e {
162                RunInfoError::NotFound => FloxideError::NotStarted,
163                e => FloxideError::Generic(format!("run_info_store error: {e}")),
164            })
165    }
166
167    /// Resume a run.
168    pub async fn resume(&self, run_id: &str) -> Result<(), FloxideError>
169    where
170        C: std::fmt::Debug + Clone + Send + Sync,
171    {
172        let run_info = self
173            .run_info_store
174            .get_run(run_id)
175            .await
176            .map_err(|e| match e {
177                RunInfoError::NotFound => FloxideError::NotStarted,
178                e => FloxideError::Generic(format!("run_info_store error: {e}")),
179            })?;
180
181        if run_info.is_none() {
182            return Err(FloxideError::NotStarted);
183        }
184
185        match run_info.unwrap().status {
186            // If already running, do nothing
187            RunStatus::Running => Ok(()),
188            RunStatus::Failed => {
189                // Fetch pending work too
190                let pending_work = self
191                    .pending_work(run_id)
192                    .await
193                    .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))?;
194
195                // Re-establish the work queue from the work item state store
196                for item in self.list_work_items(run_id).await.map_err(|e| {
197                    FloxideError::Generic(format!("work_item_state_store error: {e}"))
198                })? {
199                    if item.status != WorkItemStatus::Completed
200                        && !pending_work.contains(&item.work_item)
201                    {
202                        self.work_item_state_store
203                            .set_status(run_id, &item.work_item, WorkItemStatus::Pending)
204                            .await
205                            .map_err(|e| {
206                                FloxideError::Generic(format!("work_item_state_store error: {e}"))
207                            })?;
208                        self.work_item_state_store
209                            .reset_attempts(run_id, &item.work_item)
210                            .await
211                            .map_err(|e| {
212                                FloxideError::Generic(format!("work_item_state_store error: {e}"))
213                            })?;
214                        self.queue
215                            .enqueue(run_id, item.work_item.clone())
216                            .await
217                            .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))?;
218                    }
219                }
220
221                // Reset run status to Running
222                self.run_info_store
223                    .update_status(run_id, RunStatus::Running)
224                    .await
225                    .map_err(|e| match e {
226                        RunInfoError::NotFound => FloxideError::NotStarted,
227                        e => FloxideError::Generic(format!("run_info_store error: {e}")),
228                    })
229            }
230            RunStatus::Completed => Err(FloxideError::Generic("run already completed".to_string())),
231            RunStatus::Cancelled => Err(FloxideError::AlreadyCompleted),
232            RunStatus::Paused => {
233                // Change the status of all work items to Pending
234                for item in self.list_work_items(run_id).await.map_err(|e| {
235                    FloxideError::Generic(format!("work_item_state_store error: {e}"))
236                })? {
237                    self.work_item_state_store
238                        .set_status(run_id, &item.work_item, WorkItemStatus::Pending)
239                        .await
240                        .map_err(|e| {
241                            FloxideError::Generic(format!("work_item_state_store error: {e}"))
242                        })?;
243                }
244                // Reset run status to Running
245                self.run_info_store
246                    .update_status(run_id, RunStatus::Running)
247                    .await
248                    .map_err(|e| match e {
249                        RunInfoError::NotFound => FloxideError::NotStarted,
250                        e => FloxideError::Generic(format!("run_info_store error: {e}")),
251                    })
252            }
253        }
254    }
255
256    /// Get all errors for a run.
257    pub async fn errors(&self, run_id: &str) -> Result<Vec<WorkflowError>, FloxideError>
258    where
259        C: std::fmt::Debug + Clone + Send + Sync,
260    {
261        self.error_store
262            .get_errors(run_id)
263            .await
264            .map_err(|e| FloxideError::Generic(format!("error_store error: {e}")))
265    }
266
267    // Get liveness status for a run.
268    pub async fn liveness(&self) -> Result<Vec<WorkerHealth>, FloxideError>
269    where
270        C: std::fmt::Debug + Clone + Send + Sync,
271    {
272        self.liveness_store
273            .list_health()
274            .await
275            .map_err(|e| FloxideError::Generic(format!("liveness_store error: {e}")))
276    }
277
278    // Get context for a run.
279    pub async fn context(&self, run_id: &str) -> Result<C, FloxideError>
280    where
281        C: std::fmt::Debug + Clone + Send + Sync,
282    {
283        match self.context_store.get(run_id).await {
284            Ok(Some(context)) => Ok(context),
285            Ok(None) => Err(FloxideError::NotStarted),
286            Err(e) => Err(FloxideError::Generic(format!("context_store error: {e}"))),
287        }
288    }
289
290    /// Get progress/metrics for a run.
291    pub async fn metrics(&self, run_id: &str) -> Result<RunMetrics, FloxideError>
292    where
293        C: std::fmt::Debug + Clone + Send + Sync,
294    {
295        match self.metrics_store.get_metrics(run_id).await {
296            Ok(Some(metrics)) => Ok(metrics),
297            Ok(None) => Err(FloxideError::NotStarted),
298            Err(e) => Err(FloxideError::Generic(format!("metrics_store error: {e}"))),
299        }
300    }
301
302    /// Get pending work for a run.
303    pub async fn pending_work(&self, run_id: &str) -> Result<Vec<W::WorkItem>, FloxideError>
304    where
305        C: std::fmt::Debug + Clone + Send + Sync,
306    {
307        self.queue
308            .pending_work(run_id)
309            .await
310            .map_err(|e| FloxideError::Generic(format!("work_queue error: {e}")))
311    }
312
313    /// Check liveness status of a list of workers.
314    pub async fn check_worker_liveness(
315        &self,
316        worker_ids: &[usize],
317        threshold: Duration,
318    ) -> Vec<(usize, LivenessStatus)> {
319        let now = Utc::now();
320        let mut statuses = Vec::new();
321        for &worker_id in worker_ids {
322            let status = match self.liveness_store.get_heartbeat(worker_id).await {
323                Ok(Some(ts)) => {
324                    let elapsed = now
325                        .signed_duration_since(ts)
326                        .to_std()
327                        .unwrap_or(Duration::MAX);
328                    if elapsed < threshold {
329                        LivenessStatus::Alive
330                    } else if elapsed < threshold * 3 {
331                        LivenessStatus::Stale
332                    } else {
333                        LivenessStatus::Dead
334                    }
335                }
336                Ok(None) => LivenessStatus::Dead,
337                Err(_) => LivenessStatus::Dead,
338            };
339            statuses.push((worker_id, status));
340        }
341        statuses
342    }
343
344    /// List all known worker IDs.
345    pub async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
346        self.liveness_store.list_workers().await
347    }
348
349    /// List all worker health info.
350    pub async fn list_worker_health(
351        &self,
352    ) -> Result<Vec<crate::distributed::WorkerHealth>, LivenessStoreError> {
353        self.liveness_store.list_health().await
354    }
355
356    /// Get all work items for a run.
357    pub async fn list_work_items(
358        &self,
359        run_id: &str,
360    ) -> Result<Vec<WorkItemState<W::WorkItem>>, WorkItemStateStoreError> {
361        self.work_item_state_store.get_all(run_id).await
362    }
363
364    /// Mark a run as completed and set finished_at timestamp.
365    pub async fn complete_run(&self, run_id: &str) -> Result<(), FloxideError> {
366        let now = chrono::Utc::now();
367        self.run_info_store
368            .update_status(run_id, RunStatus::Completed)
369            .await
370            .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
371        self.run_info_store
372            .update_finished_at(run_id, now)
373            .await
374            .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
375        Ok(())
376    }
377
378    /// Wait for a run to reach a terminal state (Completed, Failed, or Cancelled).
379    pub async fn wait_for_completion(
380        &self,
381        run_id: &str,
382        poll_interval: std::time::Duration,
383    ) -> Result<RunInfo, FloxideError> {
384        loop {
385            let status = self
386                .run_info_store
387                .get_run(run_id)
388                .await
389                .map_err(|e| FloxideError::Generic(format!("run_info_store error: {e}")))?;
390            if let Some(info) = status {
391                match info.status {
392                    RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled => {
393                        return Ok(info)
394                    }
395                    _ => tokio::time::sleep(poll_interval).await,
396                }
397            } else {
398                return Err(FloxideError::NotStarted);
399            }
400        }
401    }
402}
403
404pub struct OrchestratorBuilder<W, C, Q, RIS, MS, ES, LS, WIS, CS>
405where
406    W: Workflow<C>,
407    C: Context + crate::merge::Merge + Default,
408    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
409    RIS: RunInfoStore + Send + Sync,
410    MS: MetricsStore + Send + Sync,
411    ES: ErrorStore + Send + Sync,
412    LS: LivenessStore + Send + Sync,
413    WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
414    CS: ContextStore<C> + Send + Sync,
415{
416    workflow: Option<W>,
417    queue: Option<Q>,
418    run_info_store: Option<RIS>,
419    metrics_store: Option<MS>,
420    error_store: Option<ES>,
421    liveness_store: Option<LS>,
422    work_item_state_store: Option<WIS>,
423    context_store: Option<CS>,
424    _phantom: std::marker::PhantomData<C>,
425}
426
427impl<W, C, Q, RIS, MS, ES, LS, WIS, CS> OrchestratorBuilder<W, C, Q, RIS, MS, ES, LS, WIS, CS>
428where
429    W: Workflow<C>,
430    C: Context + crate::merge::Merge + Default,
431    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
432    RIS: RunInfoStore + Send + Sync,
433    MS: MetricsStore + Send + Sync,
434    ES: ErrorStore + Send + Sync,
435    LS: LivenessStore + Send + Sync,
436    WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
437    CS: ContextStore<C> + Send + Sync,
438{
439    pub fn new() -> Self {
440        Self {
441            workflow: None,
442            queue: None,
443            run_info_store: None,
444            metrics_store: None,
445            error_store: None,
446            liveness_store: None,
447            work_item_state_store: None,
448            context_store: None,
449            _phantom: std::marker::PhantomData,
450        }
451    }
452    pub fn workflow(mut self, workflow: W) -> Self {
453        self.workflow = Some(workflow);
454        self
455    }
456    pub fn queue(mut self, queue: Q) -> Self {
457        self.queue = Some(queue);
458        self
459    }
460    pub fn run_info_store(mut self, ris: RIS) -> Self {
461        self.run_info_store = Some(ris);
462        self
463    }
464    pub fn metrics_store(mut self, ms: MS) -> Self {
465        self.metrics_store = Some(ms);
466        self
467    }
468    pub fn error_store(mut self, es: ES) -> Self {
469        self.error_store = Some(es);
470        self
471    }
472    pub fn liveness_store(mut self, ls: LS) -> Self {
473        self.liveness_store = Some(ls);
474        self
475    }
476    pub fn work_item_state_store(mut self, wiss: WIS) -> Self {
477        self.work_item_state_store = Some(wiss);
478        self
479    }
480    pub fn context_store(mut self, context_store: CS) -> Self {
481        self.context_store = Some(context_store);
482        self
483    }
484    #[allow(clippy::type_complexity)]
485    pub fn build(self) -> Result<DistributedOrchestrator<W, C, Q, RIS, MS, ES, LS, WIS, CS>, String>
486    where
487        W: Workflow<C>,
488        C: Context + crate::merge::Merge + Default,
489        Q: WorkQueue<C, W::WorkItem> + Send + Sync,
490        RIS: RunInfoStore + Send + Sync,
491        MS: MetricsStore + Send + Sync,
492        ES: ErrorStore + Send + Sync,
493        LS: LivenessStore + Send + Sync,
494        WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
495        CS: ContextStore<C> + Send + Sync,
496    {
497        Ok(DistributedOrchestrator {
498            workflow: self.workflow.ok_or("workflow is required")?,
499            queue: self.queue.ok_or("queue is required")?,
500            run_info_store: self.run_info_store.ok_or("run_info_store is required")?,
501            metrics_store: self.metrics_store.ok_or("metrics_store is required")?,
502            error_store: self.error_store.ok_or("error_store is required")?,
503            liveness_store: self.liveness_store.ok_or("liveness_store is required")?,
504            work_item_state_store: self
505                .work_item_state_store
506                .ok_or("work_item_state_store is required")?,
507            context_store: self.context_store.ok_or("context_store is required")?,
508            phantom: std::marker::PhantomData,
509        })
510    }
511}
512
513impl<W, C, Q, RIS, MS, ES, LS, WIS, CS> Default
514    for OrchestratorBuilder<W, C, Q, RIS, MS, ES, LS, WIS, CS>
515where
516    W: Workflow<C>,
517    C: Context + crate::merge::Merge + Default,
518    Q: WorkQueue<C, W::WorkItem> + Send + Sync,
519    RIS: RunInfoStore + Send + Sync,
520    MS: MetricsStore + Send + Sync,
521    ES: ErrorStore + Send + Sync,
522    LS: LivenessStore + Send + Sync,
523    WIS: WorkItemStateStore<W::WorkItem> + Send + Sync,
524    CS: ContextStore<C> + Send + Sync,
525{
526    fn default() -> Self {
527        Self::new()
528    }
529}