floxide_core/workflow.rs
1//! # Workflow Execution Modes in Floxide
2//!
3//! Floxide workflows can be executed in several modes, each suited to different use cases:
4//!
5//! ## 1. Monolithic (Single-Process) Execution
//! - **Method:** [`Workflow::run`]
7//! - **Description:** Runs the entire workflow from start to finish in a single process, with no checkpointing or distributed coordination.
8//! - **Use Case:** Simple, local workflows where failure recovery and distributed scaling are not needed.
9//!
10//! ## 2. Checkpointed (Resumable) Execution
//! - **Method:** [`Workflow::run_with_checkpoint`]
12//! - **Description:** Runs the workflow, checkpointing state after each step. If interrupted, can be resumed from the last checkpoint.
13//! - **Use Case:** Long-running workflows, or those that need to recover from process crashes or restarts.
14//!
15//! ## 3. Resuming from Checkpoint
//! - **Method:** [`Workflow::resume`]
17//! - **Description:** Resumes a workflow run from its last checkpoint, restoring context and queue from the store.
18//! - **Use Case:** Recovery after failure, or continuing a workflow after a pause.
19//!
20//! ## 4. Distributed Execution Primitives
//! - **a. Orchestration/Seeding**
//!   - **Method:** [`Workflow::start_distributed`]
//!   - **Description:** Seeds the distributed workflow by storing the initial context in the context store and enqueuing the first work item. Does not execute any steps.
//!   - **Use Case:** Used by an orchestrator to start a new distributed workflow run, preparing it for workers to process.
//! - **b. Worker Step**
//!   - **Method:** [`Workflow::step_distributed`]
//!   - **Description:** A distributed worker dequeues a work item, loads the latest context, processes the node, merges the updated context back into the context store, and enqueues successors. Returns output if a terminal node is reached.
//!   - **Use Case:** Used by distributed workers to process workflow steps in parallel, with coordination via the distributed work queue and context store.
29//!
30//! These distributed methods are the core primitives for building scalable, fault-tolerant workflow systems in Floxide.
31//!
32use std::collections::VecDeque;
33use std::fmt::{Debug, Display};
34use std::sync::Arc;
35
36// crates/floxide-core/src/workflow.rs
37use crate::context::Context;
38use crate::distributed::context_store::ContextStore;
39use crate::distributed::{ItemProcessedOutcome, StepCallbacks, StepError, WorkQueue};
40use crate::error::FloxideError;
41use crate::{Checkpoint, CheckpointStore};
42use async_trait::async_trait;
43use serde::de::DeserializeOwned;
44use serde::Serialize;
45use serde_json;
46use tracing::{debug, info, span, Level};
47
48/// Trait for a workflow work item.
49///
50/// Implementations provide a way to serialize and deserialize work items, and
51/// track unique instances within a workflow run.
52pub trait WorkItem:
53 Debug + Display + Send + Sync + Serialize + DeserializeOwned + Clone + PartialEq
54{
55 /// Returns a unique identifier for this work item instance.
56 fn instance_id(&self) -> String;
57 /// Returns true if this work item is a terminal node (no successors)
58 fn is_terminal(&self) -> bool;
59}
60
61/// Trait for a workflow.
62///
63#[async_trait]
64pub trait Workflow<C: Context>: Debug + Clone + Send + Sync {
65 /// Input type for the workflow
66 type Input: Send + Sync + Serialize + DeserializeOwned;
67 /// Output type returned by the workflow's terminal branch
68 type Output: Send + Sync + Serialize + DeserializeOwned;
69 /// Workflow-specific work item type (macro-generated enum)
70 type WorkItem: WorkItem;
71
72 /// Name of the workflow, used for logging and tracing.
73 fn name(&self) -> &'static str;
74
75 /// Create the initial work item for the workflow start node.
76 fn start_work_item(&self, input: Self::Input) -> Self::WorkItem;
77
78 /// Execute the workflow, returning the output of the terminal branch.
79 ///
80 /// Default implementation dispatches work items via `process_work_item`.
81 async fn run<'a>(
82 &'a self,
83 ctx: &'a crate::WorkflowCtx<C>,
84 input: Self::Input,
85 ) -> Result<Self::Output, FloxideError> {
86 let span = span!(Level::INFO, "workflow_run", workflow = self.name());
87 let _enter = span.enter();
88 let mut queue: VecDeque<Self::WorkItem> = VecDeque::new();
89 queue.push_back(self.start_work_item(input));
90 while let Some(item) = queue.pop_front() {
91 debug!(?item, queue_len = queue.len(), "Processing work item");
92 if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
93 return Ok(output);
94 }
95 debug!(queue_len = queue.len(), "Queue state after processing");
96 }
97 unreachable!("Workflow did not reach terminal branch");
98 }
99
100 /// Process a single work item, returning the next work item to be processed if any.
101 ///
102 /// This is the core primitive for distributed execution: given a work item (node + input),
103 /// processes it and enqueues any successor work items. If the work item is a terminal node,
104 /// returns its output.
105 ///
106 /// # Arguments
107 /// * `ctx` - The workflow context.
108 /// * `item` - The work item to process (node + input).
109 /// * `__q` - The work queue for successor items (used internally).
110 ///
111 /// # Returns
112 /// * `Ok(Some(Self::Output))` - If this work item was a terminal node and produced output.
113 /// * `Ok(None)` - If more work remains (successors enqueued).
114 /// * `Err(FloxideError)` - If the node processing failed or aborted.
115 async fn process_work_item<'a>(
116 &'a self,
117 ctx: &'a crate::WorkflowCtx<C>,
118 item: Self::WorkItem,
119 queue: &mut std::collections::VecDeque<Self::WorkItem>,
120 ) -> Result<Option<Self::Output>, FloxideError>;
121
122 /// Execute the workflow with checkpointing, saving state after each step.
123 ///
124 /// This allows the workflow to be resumed after interruption or failure.
125 ///
126 /// # Arguments
127 /// * `ctx` - The workflow context.
128 /// * `input` - The input to the workflow's start node.
129 /// * `store` - The checkpoint store to persist state.
130 /// * `id` - The unique run ID for this workflow execution.
131 ///
132 /// # Returns
133 /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
134 /// * `Err(FloxideError)` - If any node returns an error or aborts.
135 async fn run_with_checkpoint<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
136 &self,
137 ctx: &crate::WorkflowCtx<C>,
138 input: Self::Input,
139 store: &CS,
140 id: &str,
141 ) -> Result<Self::Output, FloxideError> {
142 let span = span!(
143 Level::INFO,
144 "workflow_run_with_checkpoint",
145 workflow = self.name(),
146 run_id = id
147 );
148 let _enter = span.enter();
149 // load existing checkpoint or start new
150 let mut cp: Checkpoint<C, Self::WorkItem> = match store
151 .load(id)
152 .await
153 .map_err(|e| FloxideError::Generic(e.to_string()))?
154 {
155 Some(saved) => {
156 debug!("Loaded existing checkpoint");
157 saved
158 }
159 None => {
160 debug!("No checkpoint found, starting new");
161 let mut init_q = VecDeque::new();
162 init_q.push_back(self.start_work_item(input));
163 Checkpoint::new(ctx.store.clone(), init_q)
164 }
165 };
166 let mut queue = cp.queue.clone();
167 if queue.is_empty() {
168 info!("Workflow already completed (empty queue)");
169 return Err(FloxideError::AlreadyCompleted);
170 }
171 while let Some(item) = queue.pop_front() {
172 debug!(?item, queue_len = queue.len(), "Processing work item");
173 if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
174 return Ok(output);
175 }
176 debug!(queue_len = queue.len(), "Queue state after processing");
177 cp.context = ctx.store.clone();
178 cp.queue = queue.clone();
179 store
180 .save(id, &cp)
181 .await
182 .map_err(|e| FloxideError::Generic(e.to_string()))?;
183 debug!("Checkpoint saved");
184 }
185 unreachable!("Workflow did not reach terminal branch");
186 }
187
188 /// Resume a workflow run from its last checkpoint, restoring context and queue from the store.
189 ///
190 /// # Arguments
191 /// * `store` - The checkpoint store containing saved state.
192 /// * `id` - The unique run ID for this workflow execution.
193 ///
194 /// # Returns
195 /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
196 /// * `Err(FloxideError)` - If any node returns an error or aborts, or if no checkpoint is found.
197 async fn resume<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
198 &self,
199 store: &CS,
200 id: &str,
201 ) -> Result<Self::Output, FloxideError> {
202 let span = span!(
203 Level::INFO,
204 "workflow_resume",
205 workflow = self.name(),
206 checkpoint_id = id
207 );
208 let _enter = span.enter();
209 // load persisted checkpoint or error
210 let mut cp = store
211 .load(id)
212 .await
213 .map_err(|e| FloxideError::Generic(e.to_string()))?
214 .ok_or(FloxideError::NotStarted)?;
215 debug!("Loaded checkpoint for resume");
216 let wf_ctx = crate::WorkflowCtx::new(cp.context.clone());
217 let ctx = &wf_ctx;
218 let mut queue: VecDeque<Self::WorkItem> = cp.queue.clone();
219 if queue.is_empty() {
220 info!("Workflow already completed (empty queue)");
221 return Err(FloxideError::AlreadyCompleted);
222 }
223 // If the queue contains exactly one item and it is terminal, treat as already completed
224 if queue.len() == 1
225 && queue
226 .front()
227 .map(|item| item.is_terminal())
228 .unwrap_or(false)
229 {
230 info!("Workflow already completed (terminal node in queue)");
231 return Err(FloxideError::AlreadyCompleted);
232 }
233 while let Some(item) = queue.pop_front() {
234 debug!(?item, queue_len = queue.len(), "Processing work item");
235 if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
236 return Ok(output);
237 }
238 cp.context = ctx.store.clone();
239 cp.queue = queue.clone();
240 store
241 .save(id, &cp)
242 .await
243 .map_err(|e| FloxideError::Generic(e.to_string()))?;
244 debug!("Checkpoint saved");
245 debug!(queue_len = queue.len(), "Queue state after processing");
246 }
247 unreachable!("Workflow did not reach terminal branch");
248 }
249
250 /// Orchestrator primitive: seed the distributed workflow (context + queue) but do not execute steps.
251 ///
252 /// This method is used to initialize a distributed workflow run, creating the initial context and enqueuing the first work item(s).
253 /// No workflow steps are executed by this method; workers will process the steps via `step_distributed`.
254 ///
255 /// # Arguments
256 /// * `ctx` - The workflow context.
257 /// * `input` - The input to the workflow's start node.
258 /// * `context_store` - The distributed context store.
259 /// * `queue` - The distributed work queue.
260 /// * `id` - The unique run ID for this workflow execution.
261 ///
262 /// # Returns
263 /// * `Ok(())` - If the workflow was successfully seeded.
264 /// * `Err(FloxideError)` - If context or queueing failed.
265 async fn start_distributed<CS, Q>(
266 &self,
267 ctx: &crate::WorkflowCtx<C>,
268 input: Self::Input,
269 context_store: &CS,
270 queue: &Q,
271 id: &str,
272 ) -> Result<(), FloxideError>
273 where
274 CS: ContextStore<C> + Send + Sync,
275 Q: WorkQueue<C, Self::WorkItem> + Send + Sync,
276 C: crate::merge::Merge + Default,
277 {
278 let seed_span =
279 span!(Level::DEBUG, "start_distributed", workflow = self.name(), run_id = %id);
280 let _enter = seed_span.enter();
281 debug!(run_id = %id, "start_distributed seeding");
282 // Only seed if not present
283 if context_store
284 .get(id)
285 .await
286 .map_err(|e| FloxideError::Generic(e.to_string()))?
287 .is_none()
288 {
289 let item = self.start_work_item(input);
290 context_store
291 .set(id, ctx.store.clone())
292 .await
293 .map_err(|e| FloxideError::Generic(e.to_string()))?;
294 queue
295 .enqueue(id, item)
296 .await
297 .map_err(|e| FloxideError::Generic(e.to_string()))?;
298 }
299 Ok(())
300 }
301
302 /// Worker primitive: perform one distributed step (dequeue, process, enqueue successors, persist context).
303 ///
304 /// This method is called by distributed workers to process a single work item for any workflow run.
305 /// It loads the latest context, processes the node, enqueues successors, and persists/merges context.
306 /// If a terminal node is reached, returns the output.
307 ///
308 /// # Arguments
309 /// * `context_store` - The distributed context store.
310 /// * `queue` - The distributed work queue.
311 /// * `worker_id` - The unique ID of the worker processing this step.
312 ///
313 /// # Returns
314 /// * `Ok(Some((run_id, output)))` - If a terminal node was processed and output produced.
315 /// * `Ok(None)` - If more work remains for this run.
316 /// * `Err(StepError)` - If processing failed or context/queueing failed.
317 async fn step_distributed<CS, Q>(
318 &self,
319 context_store: &CS,
320 queue: &Q,
321 worker_id: usize,
322 callbacks: Arc<dyn StepCallbacks<C, Self>>,
323 ) -> Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>
324 where
325 C: 'static + crate::merge::Merge + Default,
326 CS: ContextStore<C> + Send + Sync,
327 Q: crate::distributed::WorkQueue<C, Self::WorkItem> + Send + Sync,
328 {
329 // dequeue one item
330 let work = queue.dequeue().await.map_err(|e| StepError {
331 error: FloxideError::Generic(e.to_string()),
332 run_id: None,
333 work_item: None,
334 })?;
335 let (run_id, item) = match work {
336 None => return Ok(None),
337 Some((rid, it)) => (rid, it),
338 };
339 let step_span = span!(Level::DEBUG, "step_distributed",
340 workflow = self.name(), run_id = %run_id, worker = worker_id);
341 let _enter = step_span.enter();
342 debug!(worker = worker_id, run_id = %run_id, ?item, "Worker dequeued item");
343 // Call on_started and abort if it returns an error
344 let on_started_result = callbacks.on_started(run_id.clone(), item.clone()).await;
345 if let Err(e) = on_started_result {
346 return Err(StepError {
347 error: FloxideError::Generic(format!("on_started_state_updates failed: {:?}", e)),
348 run_id: Some(run_id.clone()),
349 work_item: Some(item.clone()),
350 });
351 }
352 // load context
353 let ctx_val = context_store.get(&run_id).await.map_err(|e| StepError {
354 error: FloxideError::Generic(e.to_string()),
355 run_id: Some(run_id.clone()),
356 work_item: Some(item.clone()),
357 })?;
358
359 let ctx_val = ctx_val.ok_or_else(|| StepError {
360 error: FloxideError::NotStarted,
361 run_id: Some(run_id.clone()),
362 work_item: Some(item.clone()),
363 })?;
364
365 let wf_ctx = crate::WorkflowCtx::new(ctx_val.clone());
366 let ctx_ref = &wf_ctx;
367 let mut local_q = VecDeque::new();
368 let process_result = self
369 .process_work_item(ctx_ref, item.clone(), &mut local_q)
370 .await;
371
372 match process_result {
373 Ok(Some(out)) => {
374 context_store
375 .merge(&run_id, wf_ctx.store.clone())
376 .await
377 .map_err(|e| StepError {
378 error: FloxideError::Generic(e.to_string()),
379 run_id: Some(run_id.clone()),
380 work_item: Some(item.clone()),
381 })?;
382 debug!(worker = worker_id, run_id = %run_id, "Context merged (terminal)");
383 let output_json = serde_json::to_value(&out).map_err(|e| StepError {
384 error: FloxideError::Generic(format!("Failed to serialize output: {}", e)),
385 run_id: Some(run_id.clone()),
386 work_item: Some(item.clone()),
387 })?;
388 let on_item_processed_result = callbacks
389 .on_item_processed(
390 run_id.clone(),
391 item.clone(),
392 ItemProcessedOutcome::SuccessTerminal(output_json),
393 )
394 .await;
395 if let Err(e) = on_item_processed_result {
396 return Err(StepError {
397 error: e,
398 run_id: Some(run_id.clone()),
399 work_item: Some(item.clone()),
400 });
401 }
402 return Ok(Some((run_id.clone(), out)));
403 }
404 Ok(None) => {
405 for succ in local_q.iter() {
406 queue
407 .enqueue(&run_id, succ.clone())
408 .await
409 .map_err(|e| StepError {
410 error: FloxideError::Generic(e.to_string()),
411 run_id: Some(run_id.clone()),
412 work_item: Some(item.clone()),
413 })?;
414 }
415 context_store
416 .merge(&run_id, wf_ctx.store.clone())
417 .await
418 .map_err(|e| StepError {
419 error: FloxideError::Generic(e.to_string()),
420 run_id: Some(run_id.clone()),
421 work_item: Some(item.clone()),
422 })?;
423 debug!(worker = worker_id, run_id = %run_id, "Context merged");
424 let on_item_processed_result = callbacks
425 .on_item_processed(
426 run_id.clone(),
427 item.clone(),
428 ItemProcessedOutcome::SuccessNonTerminal,
429 )
430 .await;
431 if let Err(e) = on_item_processed_result {
432 return Err(StepError {
433 error: e,
434 run_id: Some(run_id.clone()),
435 work_item: Some(item.clone()),
436 });
437 }
438 return Ok(None);
439 }
440 Err(e) => {
441 let on_item_processed_result = callbacks
442 .on_item_processed(
443 run_id.clone(),
444 item.clone(),
445 ItemProcessedOutcome::Error(e.clone()),
446 )
447 .await;
448 if let Err(e) = on_item_processed_result {
449 return Err(StepError {
450 error: e,
451 run_id: Some(run_id.clone()),
452 work_item: Some(item.clone()),
453 });
454 }
455 Err(StepError {
456 error: e,
457 run_id: Some(run_id),
458 work_item: Some(item),
459 })
460 }
461 }
462 }
463
464 /// Export the workflow definition as a Graphviz DOT string.
465 ///
466 /// This method returns a static DOT-format string representing the workflow graph, for visualization or debugging.
467 fn to_dot(&self) -> &'static str;
468}