floxide_core/workflow.rs
//! # Workflow Execution Modes in Floxide
//!
//! Floxide workflows can be executed in several modes, each suited to different use cases:
//!
//! ## 1. Monolithic (Single-Process) Execution
//! - **Method:** [`Workflow::run`]
//! - **Description:** Runs the entire workflow from start to finish in a single process, with no checkpointing or distributed coordination.
//! - **Use Case:** Simple, local workflows where failure recovery and distributed scaling are not needed.
//!
//! ## 2. Checkpointed (Resumable) Execution
//! - **Method:** [`Workflow::run_with_checkpoint`]
//! - **Description:** Runs the workflow, checkpointing state after each step. If interrupted, the run can be resumed from the last checkpoint.
//! - **Use Case:** Long-running workflows, or those that need to recover from process crashes or restarts.
//!
//! ## 3. Resuming from Checkpoint
//! - **Method:** [`Workflow::resume`]
//! - **Description:** Resumes a workflow run from its last checkpoint, restoring context and queue from the store.
//! - **Use Case:** Recovery after failure, or continuing a workflow after a pause.
//!
//! ## 4. Distributed Execution Primitives
//! - **a. Orchestration/Seeding**
//!   - **Method:** [`Workflow::start_distributed`]
//!   - **Description:** Seeds the distributed workflow by storing the initial context in the context store and enqueuing the first work item. Does not execute any steps.
//!   - **Use Case:** Used by an orchestrator to start a new distributed workflow run, preparing it for workers to process.
//! - **b. Worker Step**
//!   - **Method:** [`Workflow::step_distributed`]
//!   - **Description:** A distributed worker dequeues a work item, loads the run's latest context from the context store, processes the node, then enqueues any successors and merges the updated context back into the store. Returns the output if a terminal node is reached.
//!   - **Use Case:** Used by distributed workers to process workflow steps in parallel, coordinating through the distributed work queue and context store.
//!
//! These distributed methods are the core primitives for building scalable, fault-tolerant workflow systems in Floxide.
//!
use std::collections::VecDeque;
use std::fmt::{Debug, Display};
use std::sync::Arc;

// crates/floxide-core/src/workflow.rs
use crate::context::Context;
use crate::distributed::context_store::ContextStore;
use crate::distributed::{ItemProcessedOutcome, StepCallbacks, StepError, WorkQueue};
use crate::error::FloxideError;
use crate::{Checkpoint, CheckpointStore};
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json;
use tracing::{debug, info, span, Instrument, Level};
47
48/// Trait for a workflow work item.
49///
50/// Implementations provide a way to serialize and deserialize work items, and
51/// track unique instances within a workflow run.
52pub trait WorkItem:
53 Debug + Display + Send + Sync + Serialize + DeserializeOwned + Clone + PartialEq
54{
55 /// Returns a unique identifier for this work item instance.
56 fn instance_id(&self) -> String;
57 /// Returns true if this work item is a terminal node (no successors)
58 fn is_terminal(&self) -> bool;
59}
60
61/// Trait for a workflow.
62///
63#[async_trait]
64pub trait Workflow<C: Context>: Debug + Clone + Send + Sync {
65 /// Input type for the workflow
66 type Input: Send + Sync + Serialize + DeserializeOwned;
67 /// Output type returned by the workflow's terminal branch
68 type Output: Send + Sync + Serialize + DeserializeOwned;
69 /// Workflow-specific work item type (macro-generated enum)
70 type WorkItem: WorkItem;
71
72 /// Name of the workflow, used for logging and tracing.
73 fn name(&self) -> &'static str;
74
75 /// Create the initial work item for the workflow start node.
76 fn start_work_item(&self, input: Self::Input) -> Self::WorkItem;
77
78 /// Execute the workflow, returning the output of the terminal branch.
79 ///
80 /// Default implementation dispatches work items via `process_work_item`.
81 async fn run<'a>(
82 &'a self,
83 ctx: &'a crate::WorkflowCtx<C>,
84 input: Self::Input,
85 ) -> Result<Self::Output, FloxideError> {
86 let span = span!(Level::INFO, "workflow_run", workflow = self.name());
87 let _enter = span.enter();
88 let mut queue: VecDeque<Self::WorkItem> = VecDeque::new();
89 queue.push_back(self.start_work_item(input));
90 while let Some(item) = queue.pop_front() {
91 debug!(?item, queue_len = queue.len(), "Processing work item");
92 if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
93 return Ok(output);
94 }
95 debug!(queue_len = queue.len(), "Queue state after processing");
96 }
97 unreachable!("Workflow did not reach terminal branch");
98 }
99
100 /// Process a single work item, returning the next work item to be processed if any.
101 ///
102 /// This is the core primitive for distributed execution: given a work item (node + input),
103 /// processes it and enqueues any successor work items. If the work item is a terminal node,
104 /// returns its output.
105 ///
106 /// # Arguments
107 /// * `ctx` - The workflow context.
108 /// * `item` - The work item to process (node + input).
109 /// * `__q` - The work queue for successor items (used internally).
110 ///
111 /// # Returns
112 /// * `Ok(Some(Self::Output))` - If this work item was a terminal node and produced output.
113 /// * `Ok(None)` - If more work remains (successors enqueued).
114 /// * `Err(FloxideError)` - If the node processing failed or aborted.
115 async fn process_work_item<'a>(
116 &'a self,
117 ctx: &'a crate::WorkflowCtx<C>,
118 item: Self::WorkItem,
119 queue: &mut std::collections::VecDeque<Self::WorkItem>,
120 ) -> Result<Option<Self::Output>, FloxideError>;
121
122 /// Execute the workflow with checkpointing, saving state after each step.
123 ///
124 /// This allows the workflow to be resumed after interruption or failure.
125 ///
126 /// # Arguments
127 /// * `ctx` - The workflow context.
128 /// * `input` - The input to the workflow's start node.
129 /// * `store` - The checkpoint store to persist state.
130 /// * `id` - The unique run ID for this workflow execution.
131 ///
132 /// # Returns
133 /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
134 /// * `Err(FloxideError)` - If any node returns an error or aborts.
135 async fn run_with_checkpoint<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
136 &self,
137 ctx: &crate::WorkflowCtx<C>,
138 input: Self::Input,
139 store: &CS,
140 id: &str,
141 ) -> Result<Self::Output, FloxideError> {
142 let span = span!(
143 Level::INFO,
144 "workflow_run_with_checkpoint",
145 workflow = self.name(),
146 run_id = id
147 );
148 let _enter = span.enter();
149 // load existing checkpoint or start new
150 let mut cp: Checkpoint<C, Self::WorkItem> = match store
151 .load(id)
152 .await
153 .map_err(|e| FloxideError::Generic(e.to_string()))?
154 {
155 Some(saved) => {
156 debug!("Loaded existing checkpoint");
157 saved
158 }
159 None => {
160 debug!("No checkpoint found, starting new");
161 let mut init_q = VecDeque::new();
162 init_q.push_back(self.start_work_item(input));
163 Checkpoint::new(ctx.store.clone(), init_q)
164 }
165 };
166 let mut queue = cp.queue.clone();
167 if queue.is_empty() {
168 info!("Workflow already completed (empty queue)");
169 return Err(FloxideError::AlreadyCompleted);
170 }
171 // Use parent's WorkflowCtx, but replace the internal store with the checkpoint's context
172 let mut wf_ctx = ctx.clone();
173 wf_ctx.store = cp.context.clone();
174 let ctx_ref = &mut wf_ctx;
175 while let Some(item) = queue.pop_front() {
176 debug!(?item, queue_len = queue.len(), "Processing work item");
177 if let Some(output) = self.process_work_item(ctx_ref, item, &mut queue).await? {
178 return Ok(output);
179 }
180 debug!(queue_len = queue.len(), "Queue state after processing");
181 cp.context = ctx_ref.store.clone();
182 cp.queue = queue.clone();
183 store
184 .save(id, &cp)
185 .await
186 .map_err(|e| FloxideError::Generic(e.to_string()))?;
187 debug!("Checkpoint saved");
188 }
189 unreachable!("Workflow did not reach terminal branch");
190 }
191
192 /// Resume a workflow run from its last checkpoint, restoring context and queue from the store.
193 ///
194 /// # Arguments
195 /// * `store` - The checkpoint store containing saved state.
196 /// * `id` - The unique run ID for this workflow execution.
197 ///
198 /// # Returns
199 /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
200 /// * `Err(FloxideError)` - If any node returns an error or aborts, or if no checkpoint is found.
201 async fn resume<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
202 &self,
203 store: &CS,
204 id: &str,
205 ) -> Result<Self::Output, FloxideError> {
206 let span = span!(
207 Level::INFO,
208 "workflow_resume",
209 workflow = self.name(),
210 checkpoint_id = id
211 );
212 let _enter = span.enter();
213 // load persisted checkpoint or error
214 let mut cp = store
215 .load(id)
216 .await
217 .map_err(|e| FloxideError::Generic(e.to_string()))?
218 .ok_or(FloxideError::NotStarted)?;
219 debug!("Loaded checkpoint for resume");
220 let wf_ctx = crate::WorkflowCtx::new(cp.context.clone());
221 let ctx = &wf_ctx;
222 let mut queue: VecDeque<Self::WorkItem> = cp.queue.clone();
223 if queue.is_empty() {
224 info!("Workflow already completed (empty queue)");
225 return Err(FloxideError::AlreadyCompleted);
226 }
227 // If the queue contains exactly one item and it is terminal, treat as already completed
228 if queue.len() == 1
229 && queue
230 .front()
231 .map(|item| item.is_terminal())
232 .unwrap_or(false)
233 {
234 info!("Workflow already completed (terminal node in queue)");
235 return Err(FloxideError::AlreadyCompleted);
236 }
237 while let Some(item) = queue.pop_front() {
238 debug!(?item, queue_len = queue.len(), "Processing work item");
239 if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
240 return Ok(output);
241 }
242 cp.context = ctx.store.clone();
243 cp.queue = queue.clone();
244 store
245 .save(id, &cp)
246 .await
247 .map_err(|e| FloxideError::Generic(e.to_string()))?;
248 debug!("Checkpoint saved");
249 debug!(queue_len = queue.len(), "Queue state after processing");
250 }
251 unreachable!("Workflow did not reach terminal branch");
252 }
253
254 /// Orchestrator primitive: seed the distributed workflow (context + queue) but do not execute steps.
255 ///
256 /// This method is used to initialize a distributed workflow run, creating the initial context and enqueuing the first work item(s).
257 /// No workflow steps are executed by this method; workers will process the steps via `step_distributed`.
258 ///
259 /// # Arguments
260 /// * `ctx` - The workflow context.
261 /// * `input` - The input to the workflow's start node.
262 /// * `context_store` - The distributed context store.
263 /// * `queue` - The distributed work queue.
264 /// * `id` - The unique run ID for this workflow execution.
265 ///
266 /// # Returns
267 /// * `Ok(())` - If the workflow was successfully seeded.
268 /// * `Err(FloxideError)` - If context or queueing failed.
269 async fn start_distributed<CS, Q>(
270 &self,
271 ctx: &crate::WorkflowCtx<C>,
272 input: Self::Input,
273 context_store: &CS,
274 queue: &Q,
275 id: &str,
276 ) -> Result<(), FloxideError>
277 where
278 CS: ContextStore<C> + Send + Sync,
279 Q: WorkQueue<C, Self::WorkItem> + Send + Sync,
280 C: crate::merge::Merge + Default,
281 {
282 let seed_span =
283 span!(Level::DEBUG, "start_distributed", workflow = self.name(), run_id = %id);
284 let _enter = seed_span.enter();
285 debug!(run_id = %id, "start_distributed seeding");
286 // Only seed if not present
287 if context_store
288 .get(id)
289 .await
290 .map_err(|e| FloxideError::Generic(e.to_string()))?
291 .is_none()
292 {
293 let item = self.start_work_item(input);
294 context_store
295 .set(id, ctx.store.clone())
296 .await
297 .map_err(|e| FloxideError::Generic(e.to_string()))?;
298 queue
299 .enqueue(id, item)
300 .await
301 .map_err(|e| FloxideError::Generic(e.to_string()))?;
302 }
303 Ok(())
304 }
305
306 /// Worker primitive: perform one distributed step (dequeue, process, enqueue successors, persist context).
307 ///
308 /// This method is called by distributed workers to process a single work item for any workflow run.
309 /// It loads the latest context, processes the node, enqueues successors, and persists/merges context.
310 /// If a terminal node is reached, returns the output.
311 ///
312 /// # Arguments
313 /// * `context_store` - The distributed context store.
314 /// * `queue` - The distributed work queue.
315 /// * `worker_id` - The unique ID of the worker processing this step.
316 ///
317 /// # Returns
318 /// * `Ok(Some((run_id, output)))` - If a terminal node was processed and output produced.
319 /// * `Ok(None)` - If more work remains for this run.
320 /// * `Err(StepError)` - If processing failed or context/queueing failed.
321 async fn step_distributed<CS, Q>(
322 &self,
323 context_store: &CS,
324 queue: &Q,
325 worker_id: usize,
326 callbacks: Arc<dyn StepCallbacks<C, Self>>,
327 ) -> Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>
328 where
329 C: 'static + crate::merge::Merge + Default,
330 CS: ContextStore<C> + Send + Sync,
331 Q: crate::distributed::WorkQueue<C, Self::WorkItem> + Send + Sync,
332 {
333 // dequeue one item
334 let work = queue.dequeue().await.map_err(|e| StepError {
335 error: FloxideError::Generic(e.to_string()),
336 run_id: None,
337 work_item: None,
338 })?;
339 let (run_id, item) = match work {
340 None => return Ok(None),
341 Some((rid, it)) => (rid, it),
342 };
343 let step_span = span!(Level::DEBUG, "step_distributed",
344 workflow = self.name(), run_id = %run_id, worker = worker_id);
345 let _enter = step_span.enter();
346 debug!(worker = worker_id, run_id = %run_id, ?item, "Worker dequeued item");
347 // Call on_started and abort if it returns an error
348 let on_started_result = callbacks.on_started(run_id.clone(), item.clone()).await;
349 if let Err(e) = on_started_result {
350 return Err(StepError {
351 error: FloxideError::Generic(format!("on_started_state_updates failed: {:?}", e)),
352 run_id: Some(run_id.clone()),
353 work_item: Some(item.clone()),
354 });
355 }
356 // load context
357 let ctx_val = context_store.get(&run_id).await.map_err(|e| StepError {
358 error: FloxideError::Generic(e.to_string()),
359 run_id: Some(run_id.clone()),
360 work_item: Some(item.clone()),
361 })?;
362
363 let ctx_val = ctx_val.ok_or_else(|| StepError {
364 error: FloxideError::NotStarted,
365 run_id: Some(run_id.clone()),
366 work_item: Some(item.clone()),
367 })?;
368
369 let wf_ctx = crate::WorkflowCtx::new(ctx_val.clone());
370 let ctx_ref = &wf_ctx;
371 let mut local_q = VecDeque::new();
372 let process_result = self
373 .process_work_item(ctx_ref, item.clone(), &mut local_q)
374 .await;
375
376 match process_result {
377 Ok(Some(out)) => {
378 context_store
379 .merge(&run_id, wf_ctx.store.clone())
380 .await
381 .map_err(|e| StepError {
382 error: FloxideError::Generic(e.to_string()),
383 run_id: Some(run_id.clone()),
384 work_item: Some(item.clone()),
385 })?;
386 debug!(worker = worker_id, run_id = %run_id, "Context merged (terminal)");
387 let output_json = serde_json::to_value(&out).map_err(|e| StepError {
388 error: FloxideError::Generic(format!("Failed to serialize output: {}", e)),
389 run_id: Some(run_id.clone()),
390 work_item: Some(item.clone()),
391 })?;
392 let on_item_processed_result = callbacks
393 .on_item_processed(
394 run_id.clone(),
395 item.clone(),
396 ItemProcessedOutcome::SuccessTerminal(output_json),
397 )
398 .await;
399 if let Err(e) = on_item_processed_result {
400 return Err(StepError {
401 error: e,
402 run_id: Some(run_id.clone()),
403 work_item: Some(item.clone()),
404 });
405 }
406 return Ok(Some((run_id.clone(), out)));
407 }
408 Ok(None) => {
409 for succ in local_q.iter() {
410 queue
411 .enqueue(&run_id, succ.clone())
412 .await
413 .map_err(|e| StepError {
414 error: FloxideError::Generic(e.to_string()),
415 run_id: Some(run_id.clone()),
416 work_item: Some(item.clone()),
417 })?;
418 }
419 context_store
420 .merge(&run_id, wf_ctx.store.clone())
421 .await
422 .map_err(|e| StepError {
423 error: FloxideError::Generic(e.to_string()),
424 run_id: Some(run_id.clone()),
425 work_item: Some(item.clone()),
426 })?;
427 debug!(worker = worker_id, run_id = %run_id, "Context merged");
428 let on_item_processed_result = callbacks
429 .on_item_processed(
430 run_id.clone(),
431 item.clone(),
432 ItemProcessedOutcome::SuccessNonTerminal,
433 )
434 .await;
435 if let Err(e) = on_item_processed_result {
436 return Err(StepError {
437 error: e,
438 run_id: Some(run_id.clone()),
439 work_item: Some(item.clone()),
440 });
441 }
442 return Ok(None);
443 }
444 Err(e) => {
445 let on_item_processed_result = callbacks
446 .on_item_processed(
447 run_id.clone(),
448 item.clone(),
449 ItemProcessedOutcome::Error(e.clone()),
450 )
451 .await;
452 if let Err(e) = on_item_processed_result {
453 return Err(StepError {
454 error: e,
455 run_id: Some(run_id.clone()),
456 work_item: Some(item.clone()),
457 });
458 }
459 Err(StepError {
460 error: e,
461 run_id: Some(run_id),
462 work_item: Some(item),
463 })
464 }
465 }
466 }
467
468 /// Export the workflow definition as a Graphviz DOT string.
469 ///
470 /// This method returns a static DOT-format string representing the workflow graph, for visualization or debugging.
471 fn to_dot(&self) -> &'static str;
472}