pub struct FlowEngine { /* private fields */ }
Central entry point for managing workflow executions.
§Example
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());
    // Query available node types.
    println!("node types: {:?}", engine.node_types());
    // Start a workflow and get its execution ID.
    let definition = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            { "id": "b", "type": "noop" }
        ],
        "edges": [{ "source": "a", "target": "b" }]
    });
    let id = engine.start(&definition, HashMap::new()).await?;
    // Inspect state, pause, resume, or terminate.
    println!("state: {:?}", engine.state(id).await?);
    Ok(())
}
Implementations
impl FlowEngine
pub fn new(registry: NodeRegistry) -> Self
Create a new engine with the given node registry.
Uses NoopEventEmitter and no execution store by default. Use the
builder methods with_execution_store and
with_event_emitter to customise behaviour.
pub fn with_execution_store(self, store: Arc<dyn ExecutionStore>) -> Self
Attach an execution store.
When set, every successfully completed execution result is saved to the
store automatically. Returns self for method chaining.
pub fn with_flow_store(self, store: Arc<dyn FlowStore>) -> Self
Attach a flow definition store.
Required for start_named. Any backend (in-memory, SQLite, remote API, …)
can be plugged in by implementing FlowStore.
Returns self for method chaining.
pub fn with_event_emitter(self, emitter: Arc<dyn EventEmitter>) -> Self
Attach a custom event emitter.
The emitter is passed to every runner created by this engine and
receives all node and flow lifecycle events. Returns self for chaining.
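The builder methods above all take self by value and return it, which is what makes chaining possible. The following is a minimal std-only sketch of that pattern, not the crate's implementation: `Engine`, `Emitter`, `NoopEmitter`, `RecordingEmitter`, and the `emit` method are hypothetical stand-ins, since this page does not show the methods of the real EventEmitter trait.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an event sink; not the crate's `EventEmitter`.
trait Emitter: Send + Sync {
    fn emit(&self, event: &str);
}

/// Default emitter that discards every event.
struct NoopEmitter;

impl Emitter for NoopEmitter {
    fn emit(&self, _event: &str) {}
}

/// Emitter that records events in memory, which is handy in tests.
#[derive(Default)]
struct RecordingEmitter {
    events: Mutex<Vec<String>>,
}

impl Emitter for RecordingEmitter {
    fn emit(&self, event: &str) {
        self.events.lock().unwrap().push(event.to_string());
    }
}

/// Hypothetical engine that holds its collaborators as shared trait objects.
struct Engine {
    emitter: Arc<dyn Emitter>,
    max_concurrency: Option<usize>,
}

impl Engine {
    /// Start with a no-op emitter and no concurrency limit.
    fn new() -> Self {
        Engine { emitter: Arc::new(NoopEmitter), max_concurrency: None }
    }

    /// Replace the emitter and hand `self` back for chaining.
    fn with_emitter(mut self, emitter: Arc<dyn Emitter>) -> Self {
        self.emitter = emitter;
        self
    }

    /// Set the limit and hand `self` back for chaining.
    fn with_max_concurrency(mut self, n: usize) -> Self {
        self.max_concurrency = Some(n);
        self
    }

    /// Pretend to run a flow, emitting two lifecycle events.
    fn run(&self) {
        self.emitter.emit("flow_started");
        self.emitter.emit("flow_completed");
    }
}
```

Because the emitter is held behind an `Arc`, the caller can keep its own handle to a recording emitter and read the collected events after a run.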
pub fn with_max_concurrency(self, n: usize) -> Self
Limit the number of nodes that may execute concurrently within a single wave. The limit applies to every execution started by this engine.
Delegates to FlowRunner::with_max_concurrency. Returns self for chaining.
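As an illustration of a per-wave limit, the sketch below runs the jobs of one wave in batches of at most `max_concurrency` scoped threads. This is an assumption made for the example only: `run_wave` is a hypothetical helper, and the real FlowRunner is async and may well use a semaphore or another mechanism instead of batching.

```rust
use std::thread;

/// Run one wave of independent jobs, never running more than
/// `max_concurrency` of them at the same time. Results are returned in
/// the same order as `jobs`. A limit of 0 is treated as 1.
fn run_wave<T, F>(jobs: Vec<F>, max_concurrency: usize) -> Vec<T>
where
    T: Send,
    F: FnOnce() -> T + Send,
{
    let limit = max_concurrency.max(1);
    let mut results = Vec::with_capacity(jobs.len());
    let mut pending = jobs.into_iter();
    loop {
        // Take the next batch of at most `limit` jobs.
        let batch: Vec<F> = pending.by_ref().take(limit).collect();
        if batch.is_empty() {
            break;
        }
        // Scoped threads are all joined before `scope` returns, so the
        // next batch cannot start until this one has finished.
        let batch_results: Vec<T> = thread::scope(|scope| {
            let handles: Vec<_> = batch.into_iter().map(|job| scope.spawn(job)).collect();
            handles
                .into_iter()
                .map(|handle| handle.join().expect("job panicked"))
                .collect()
        });
        results.extend(batch_results);
    }
    results
}
```

Batching is simple but can leave slots idle while the slowest job of a batch finishes; a semaphore-style limiter would keep all slots busy.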
pub fn node_types(&self) -> Vec<String>
Return all registered node type strings, sorted alphabetically.
Includes built-in types (e.g. "noop") and any types registered via
NodeRegistry::register.
pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue>
Validate a flow definition without executing it.
Returns a list of ValidationIssues describing structural problems.
An empty list means the definition is valid and ready to run.
The following checks are performed:
- DAG structural validity: no cycles, no unknown edge references, no duplicate node IDs, at least one node.
- All node types are registered in the engine’s NodeRegistry.
- Every run_if.from field references an existing node ID.
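The structural checks in the first bullet can be sketched with the standard library alone. The function below is a hypothetical, simplified validator (`validate_dag` and its message strings are invented for illustration); it uses Kahn's algorithm for cycle detection, which is one common choice and not necessarily what the crate does. It does not cover the node-type or run_if checks.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical structural validator. `nodes` are node IDs and `edges`
/// are `(source, target)` pairs. Returns one message per problem found.
fn validate_dag(nodes: &[&str], edges: &[(&str, &str)]) -> Vec<String> {
    let mut issues = Vec::new();
    if nodes.is_empty() {
        issues.push("flow has no nodes".to_string());
    }

    // Duplicate node IDs.
    let mut seen = HashSet::new();
    for id in nodes {
        if !seen.insert(*id) {
            issues.push(format!("duplicate node id: {id}"));
        }
    }

    // Edges must reference known nodes; only valid edges enter the cycle check.
    let mut indegree: HashMap<&str, usize> = seen.iter().map(|id| (*id, 0)).collect();
    let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
    for (source, target) in edges {
        let mut known = true;
        for end in [source, target] {
            if !seen.contains(*end) {
                issues.push(format!("edge references unknown node: {end}"));
                known = false;
            }
        }
        if known {
            *indegree.get_mut(*target).unwrap() += 1;
            successors.entry(*source).or_default().push(*target);
        }
    }

    // Kahn's algorithm: repeatedly remove nodes that have no incoming edges.
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|(_, degree)| **degree == 0)
        .map(|(id, _)| *id)
        .collect();
    let mut visited = 0;
    while let Some(id) = ready.pop_front() {
        visited += 1;
        for next in successors.get(id).into_iter().flatten() {
            let degree = indegree.get_mut(*next).unwrap();
            *degree -= 1;
            if *degree == 0 {
                ready.push_back(*next);
            }
        }
    }
    // Any node never reached sits on, or downstream of, a cycle.
    if visited < seen.len() {
        issues.push("flow contains a cycle".to_string());
    }
    issues
}
```

As with validate, an empty result means no structural problem was found.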
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
let engine = FlowEngine::new(NodeRegistry::with_defaults());
let def = json!({
    "nodes": [
        { "id": "a", "type": "noop" },
        { "id": "b", "type": "unknown-type" }
    ],
    "edges": []
});
let issues = engine.validate(&def);
assert_eq!(issues.len(), 1);
assert!(issues[0].message.contains("unknown node type"));
pub async fn start(&self, definition: &Value, variables: HashMap<String, Value>) -> Result<Uuid>
Start a new workflow execution from a JSON DAG definition.
The definition is parsed and validated synchronously. If valid, the execution is launched in a background Tokio task and the execution ID is returned immediately — the flow runs concurrently with the caller.
§Errors
Returns an error if the definition is invalid (cycle, unknown node ID, bad JSON, unregistered node type).
pub async fn start_streaming(&self, definition: &Value, variables: HashMap<String, Value>) -> Result<(Uuid, Receiver<FlowEvent>)>
Start a workflow and return a live event stream alongside the execution ID.
The returned broadcast::Receiver<FlowEvent> is created before the
execution task is spawned, guaranteeing that no events are missed —
including FlowStarted. Multiple subscribers can be created by calling
broadcast::Receiver::resubscribe.
The stream closes (returns Err(RecvError::Closed)) when the execution
reaches a terminal state (Completed, Failed, or Terminated).
If the engine also has a custom EventEmitter configured via
with_event_emitter, both the emitter and
the broadcast channel receive every event.
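The ordering described above (create the receiver first, spawn the work second, close the stream when the work ends) can be sketched with the standard library. Here std's `mpsc` channel stands in for Tokio's broadcast channel. Unlike broadcast, it has a single consumer and buffers every message, so the sketch shows only the ordering and the close-on-drop behaviour. `Event` and this free function `start_streaming` are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical event type; the crate's `FlowEvent` has its own variants and fields.
#[derive(Debug, PartialEq)]
enum Event {
    FlowStarted,
    NodeCompleted(String),
    FlowCompleted,
}

/// Create the channel first and spawn the worker second, so the receiver
/// already exists when the first event is sent.
fn start_streaming(node_ids: Vec<String>) -> Receiver<Event> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the receiver was dropped, so it is ignored.
        let _ = tx.send(Event::FlowStarted);
        for id in node_ids {
            let _ = tx.send(Event::NodeCompleted(id));
        }
        let _ = tx.send(Event::FlowCompleted);
        // `tx` is dropped here, which closes the stream for the receiver.
    });
    rx
}
```

Iterating over the receiver with `rx.iter()` yields every event in order and stops once the worker has dropped its sender, which mirrors the closed-stream condition described above.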
§Example
use a3s_flow::{FlowEngine, FlowEvent, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());
    let def = json!({
        "nodes": [{ "id": "a", "type": "noop" }],
        "edges": []
    });
    let (id, mut rx) = engine.start_streaming(&def, HashMap::new()).await?;
    while let Ok(event) = rx.recv().await {
        match event {
            FlowEvent::NodeCompleted { node_id, .. } => println!("done: {node_id}"),
            FlowEvent::FlowCompleted { .. } => break,
            _ => {}
        }
    }
    Ok(())
}
pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid>
Start a workflow by loading its definition from the configured
FlowStore by name.
Equivalent to:
let def = flow_store.load(name).await?.ok_or(...)?;
engine.start(&def, variables).await
§Errors
- FlowError::Internal if no FlowStore was configured via with_flow_store.
- FlowError::FlowNotFound if no definition exists under name.
- Any error returned by start (invalid definition, etc.).
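The three failure modes listed above follow directly from the load-then-start shape. The sketch below makes that explicit with a synchronous, std-only model: `Engine`, `StartError`, and the string-based store are hypothetical, and the real method is async and returns the crate's FlowError.

```rust
use std::collections::HashMap;

/// Hypothetical error type mirroring the three failure modes listed above.
#[derive(Debug, PartialEq)]
enum StartError {
    NoStoreConfigured,
    FlowNotFound(String),
    InvalidDefinition(String),
}

/// Hypothetical engine; the optional store maps flow names to definitions.
struct Engine {
    store: Option<HashMap<String, String>>,
    next_id: u64,
}

impl Engine {
    /// Stand-in for `start`: rejects blank definitions, otherwise returns a fresh ID.
    fn start(&mut self, definition: &str) -> Result<u64, StartError> {
        if definition.trim().is_empty() {
            return Err(StartError::InvalidDefinition("empty definition".into()));
        }
        self.next_id += 1;
        Ok(self.next_id)
    }

    /// Look the definition up by name, then delegate to `start`.
    fn start_named(&mut self, name: &str) -> Result<u64, StartError> {
        // No store configured at all.
        let store = self.store.as_ref().ok_or(StartError::NoStoreConfigured)?;
        // Store configured, but nothing saved under this name.
        let definition = store
            .get(name)
            .cloned()
            .ok_or_else(|| StartError::FlowNotFound(name.to_string()))?;
        // Any error from `start` is passed through unchanged.
        self.start(&definition)
    }
}
```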
pub async fn pause(&self, id: Uuid) -> Result<()>
Pause a running execution at the next wave boundary.
Nodes in the current wave continue until they finish. No new wave
starts until resume is called.
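Pausing at a wave boundary can be pictured as a gate that the runner checks before each wave and never during one. The sketch below models that with a mutex and condition variable from the standard library. `PauseGate` and `run_waves` are hypothetical names, and the real engine is async, so its mechanism will differ.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical gate that a runner checks between waves.
#[derive(Default)]
struct PauseGate {
    paused: Mutex<bool>,
    resumed: Condvar,
}

impl PauseGate {
    /// Close the gate; the runner stops before its next wave.
    fn pause(&self) {
        *self.paused.lock().unwrap() = true;
    }

    /// Open the gate and wake any runner waiting on it.
    fn resume(&self) {
        *self.paused.lock().unwrap() = false;
        self.resumed.notify_all();
    }

    /// Block the caller while the gate is paused; return at once otherwise.
    fn wait_if_paused(&self) {
        let mut paused = self.paused.lock().unwrap();
        while *paused {
            paused = self.resumed.wait(paused).unwrap();
        }
    }
}

/// Run waves in order, checking the gate only between waves, so a wave
/// that has already started always runs to completion.
fn run_waves(waves: Vec<Vec<&str>>, gate: &PauseGate, mut on_node: impl FnMut(&str)) {
    for wave in waves {
        gate.wait_if_paused();
        for node in wave {
            on_node(node);
        }
    }
}
```

Checking the gate only at the top of the loop is what gives the "current wave finishes, no new wave starts" behaviour.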
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
- FlowError::InvalidTransition if the execution is not Running.
pub async fn resume(&self, id: Uuid) -> Result<()>
Resume a paused execution.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
- FlowError::InvalidTransition if the execution is not Paused.
pub async fn terminate(&self, id: Uuid) -> Result<()>
Terminate an execution immediately.
Sends a cancellation signal. The execution task stops at the next cancellation checkpoint (between waves, or within a wave’s result collection). If the execution is currently paused it is unblocked so it can observe the cancellation.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
- FlowError::InvalidTransition if the execution is already in a terminal state (Completed, Failed, Terminated).
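Taken together, the InvalidTransition rules for pause, resume, and terminate describe a small state machine. The sketch below encodes just those rules. `State` and this `InvalidTransition` struct are hypothetical mirrors of the states and error named in this page, not the crate's ExecutionState or FlowError types.

```rust
/// Hypothetical mirror of the execution states named in this page.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Running,
    Paused,
    Completed,
    Failed,
    Terminated,
}

/// Hypothetical error recording which action was rejected in which state.
#[derive(Debug, PartialEq)]
struct InvalidTransition {
    from: State,
    action: &'static str,
}

impl State {
    /// Completed, Failed, and Terminated are terminal.
    fn is_terminal(self) -> bool {
        matches!(self, State::Completed | State::Failed | State::Terminated)
    }

    /// Only a running execution can be paused.
    fn pause(self) -> Result<State, InvalidTransition> {
        match self {
            State::Running => Ok(State::Paused),
            from => Err(InvalidTransition { from, action: "pause" }),
        }
    }

    /// Only a paused execution can be resumed.
    fn resume(self) -> Result<State, InvalidTransition> {
        match self {
            State::Paused => Ok(State::Running),
            from => Err(InvalidTransition { from, action: "resume" }),
        }
    }

    /// Any execution that is not already terminal can be terminated.
    fn terminate(self) -> Result<State, InvalidTransition> {
        if self.is_terminal() {
            Err(InvalidTransition { from: self, action: "terminate" })
        } else {
            Ok(State::Terminated)
        }
    }
}
```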
pub async fn state(&self, id: Uuid) -> Result<ExecutionState>
Return a snapshot of the current state of an execution.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>>
Return a snapshot of the shared mutable context for a running execution.
The context is a HashMap<String, Value> that nodes may read and write
via ExecContext::context during
execution. This method lets the caller inspect (or react to) the
accumulated state from outside the runner.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
pub async fn set_context_entry(&self, id: Uuid, key: String, value: Value) -> Result<()>
Insert or overwrite a single entry in the shared context of a running execution.
The change is immediately visible to any node that reads the context after this call returns.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool>
Remove a single entry from the shared context of a running execution.
Returns true if the key existed and was removed, false if it was
not present.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
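The three context methods (get_context, set_context_entry, delete_context_entry) behave like operations on a map shared between the caller and the running nodes. The sketch below shows one way such a map could be modelled, using a mutex-guarded `HashMap` behind an `Arc`. `SharedContext` is a hypothetical type, and its values are plain strings here, whereas the crate stores serde_json Value.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical shared context. Cloning it yields another handle to the
/// same underlying map, as a node and an outside caller would each hold.
#[derive(Clone, Default)]
struct SharedContext {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl SharedContext {
    /// Copy the current entries, so the caller can inspect them without
    /// holding the lock. Later writes do not affect the returned copy.
    fn snapshot(&self) -> HashMap<String, String> {
        self.inner.lock().unwrap().clone()
    }

    /// Insert or overwrite one entry.
    fn set(&self, key: String, value: String) {
        self.inner.lock().unwrap().insert(key, value);
    }

    /// Remove one entry; returns `true` if the key was present.
    fn delete(&self, key: &str) -> bool {
        self.inner.lock().unwrap().remove(key).is_some()
    }
}
```

Returning a copy from `snapshot` matches the wording "snapshot" used above: the caller sees the state at the time of the call and must call again to observe later changes.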