pub struct FlowEngine { /* private fields */ }
Central entry point for managing workflow executions.
§Example
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());
    // Query available node types.
    println!("node types: {:?}", engine.node_types());
    // Start a workflow and get its execution ID.
    let definition = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            { "id": "b", "type": "noop" }
        ],
        "edges": [{ "source": "a", "target": "b" }]
    });
    let id = engine.start(&definition, HashMap::new()).await?;
    // Inspect state, pause, resume, or terminate.
    println!("state: {:?}", engine.state(id).await?);
    Ok(())
}
Implementations
impl FlowEngine
pub fn new(registry: NodeRegistry) -> Self
Create a new engine with the given node registry.
Uses NoopEventEmitter and no execution store by default. Use the
builder methods with_execution_store and
with_event_emitter to customise behaviour.
pub fn with_execution_store(self, store: Arc<dyn ExecutionStore>) -> Self
Attach an execution store.
When set, the result of every successfully completed execution is saved to
the store automatically. Returns self for method chaining.
pub fn with_flow_store(self, store: Arc<dyn FlowStore>) -> Self
Attach a flow definition store.
Required for start_named. Any backend (in-memory, SQLite, remote API, …)
can be used by implementing FlowStore.
Returns self for method chaining.
pub fn with_event_emitter(self, emitter: Arc<dyn EventEmitter>) -> Self
Attach a custom event emitter.
The emitter is passed to every runner created by this engine and
receives all node and flow lifecycle events. Returns self for chaining.
pub fn with_max_concurrency(self, n: usize) -> Self
Limit the number of nodes that may execute concurrently within a single wave. The limit applies to every execution started by this engine.
Delegates to FlowRunner::with_max_concurrency. Returns self for chaining.
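The builder methods above all take self by value and return it, which is what makes chaining possible. A minimal standalone sketch of that pattern follows; EngineConfig and its fields are hypothetical stand-ins for illustration and are not a3s-flow types.

```rust
/// Hypothetical stand-in for an engine's configuration; not an a3s-flow type.
#[derive(Debug, Clone, PartialEq)]
pub struct EngineConfig {
    pub store: Option<String>,
    pub emitter: String,
    pub max_concurrency: Option<usize>,
}

impl EngineConfig {
    /// Defaults mirror the documented ones: a no-op emitter and no store.
    pub fn new() -> Self {
        Self {
            store: None,
            emitter: "noop".to_string(),
            max_concurrency: None,
        }
    }

    /// Takes `self` by value and returns it, so calls can be chained.
    pub fn with_store(mut self, name: &str) -> Self {
        self.store = Some(name.to_string());
        self
    }

    /// Replaces the default emitter name.
    pub fn with_emitter(mut self, name: &str) -> Self {
        self.emitter = name.to_string();
        self
    }

    /// Records an upper bound on concurrently running nodes.
    pub fn with_max_concurrency(mut self, n: usize) -> Self {
        self.max_concurrency = Some(n);
        self
    }
}
```

With this shape, a call such as EngineConfig::new().with_store("sqlite").with_max_concurrency(4) leaves the emitter at its default while setting the other two fields.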
pub fn node_types(&self) -> Vec<String>
Return all registered node type strings, sorted alphabetically.
Includes built-in types (e.g. "noop") and any types registered via
NodeRegistry::register.
pub fn node_descriptors(&self) -> Vec<NodeDescriptor>
Return structured descriptors for all registered node types.
This is the preferred discovery API for building UIs, skill pickers,
and progressive capability endpoints on top of a3s-flow.
pub fn capabilities(&self) -> FlowCapabilities
Return a transport-friendly capabilities document for this engine.
Higher layers can serialize this value directly as JSON for progressive discovery APIs.
pub fn register_node_type(&self, node: Arc<dyn Node>)
Register or replace a node type for future executions started by this engine.
pub fn register_node_type_with_descriptor(
    &self,
    node: Arc<dyn Node>,
    descriptor: NodeDescriptor,
)
Register or replace a node type with explicit discovery metadata.
pub fn unregister_node_type(&self, node_type: &str) -> Result<bool>
Remove a node type from this engine’s registry.
Returns Ok(true) if the node type existed and was removed, Ok(false)
if it was not registered, or an error if the type is protected.
Removal only affects future validations and executions. Already running executions keep using the registry snapshot captured when they were started.
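One way to get the snapshot behaviour described above is to keep the registry contents behind an Arc and replace it on every change. The sketch below is a standalone model under that assumption: TypeRegistry is a hypothetical type, it tracks type names only, and it does not model protected types. It is not the crate's actual implementation.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};

/// Hypothetical registry model: the current set of type names lives behind an
/// `Arc`, so a snapshot is a cheap pointer clone that later edits cannot touch.
pub struct TypeRegistry {
    current: RwLock<Arc<BTreeSet<String>>>,
}

impl TypeRegistry {
    pub fn new(types: &[&str]) -> Self {
        let set: BTreeSet<String> = types.iter().map(|t| t.to_string()).collect();
        Self {
            current: RwLock::new(Arc::new(set)),
        }
    }

    /// What an execution would capture when it starts.
    pub fn snapshot(&self) -> Arc<BTreeSet<String>> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Copy-on-write removal: build a new set and swap it in.
    /// Returns `true` if the type was present.
    pub fn unregister(&self, node_type: &str) -> bool {
        let mut guard = self.current.write().unwrap();
        let mut next = (**guard).clone();
        let removed = next.remove(node_type);
        *guard = Arc::new(next);
        removed
    }
}
```

A snapshot taken before unregister still contains the removed name, while snapshots taken afterwards do not. Using a BTreeSet also keeps the names in alphabetical order.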
pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue>
Validate a flow definition without executing it.
Returns a list of ValidationIssues describing structural problems.
An empty list means the definition is valid and ready to run.
The following checks are performed:
- DAG structural validity: no cycles, no unknown edge references, no duplicate node IDs, at least one node.
- All node types are registered in the engine’s NodeRegistry.
- Every run_if.from field references an existing node ID.
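The structural part of these checks can be sketched on plain string IDs. The function below is a simplified standalone model: check_dag and its message strings are hypothetical, and the node-type and run_if checks are left out. It uses Kahn's algorithm for cycle detection, which may differ from what the crate does internally.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical, simplified checker; returns plain messages rather than
/// the crate's `ValidationIssue` values.
pub fn check_dag(nodes: &[&str], edges: &[(&str, &str)]) -> Vec<String> {
    let mut issues = Vec::new();
    if nodes.is_empty() {
        issues.push("flow has no nodes".to_string());
    }

    // Duplicate node IDs.
    let mut seen = HashSet::new();
    for id in nodes {
        if !seen.insert(*id) {
            issues.push(format!("duplicate node id: {id}"));
        }
    }

    // Edges must reference known nodes; valid ones feed the cycle check.
    let mut indegree: HashMap<&str, usize> = seen.iter().map(|id| (*id, 0)).collect();
    let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
    for (source, target) in edges {
        if !seen.contains(source) || !seen.contains(target) {
            issues.push(format!("edge {source} -> {target} references an unknown node"));
            continue;
        }
        *indegree.get_mut(target).unwrap() += 1;
        children.entry(*source).or_default().push(*target);
    }

    // Kahn's algorithm: if some node is never released, there is a cycle.
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(id, _)| *id)
        .collect();
    let mut visited = 0;
    while let Some(id) = ready.pop_front() {
        visited += 1;
        for child in children.get(id).into_iter().flatten() {
            let d = indegree.get_mut(child).unwrap();
            *d -= 1;
            if *d == 0 {
                ready.push_back(*child);
            }
        }
    }
    if visited < seen.len() {
        issues.push("cycle detected".to_string());
    }
    issues
}
```

As with validate, an empty result means no structural problem was found.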
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
let engine = FlowEngine::new(NodeRegistry::with_defaults());
let def = json!({
    "nodes": [
        { "id": "a", "type": "noop" },
        { "id": "b", "type": "unknown-type" }
    ],
    "edges": []
});
let issues = engine.validate(&def);
assert_eq!(issues.len(), 1);
assert!(issues[0].message.contains("unknown node type"));
pub async fn start(
    &self,
    definition: &Value,
    variables: HashMap<String, Value>,
) -> Result<Uuid>
Start a new workflow execution from a JSON DAG definition.
The definition is parsed and validated synchronously. If valid, the execution is launched in a background Tokio task and the execution ID is returned immediately — the flow runs concurrently with the caller.
§Errors
Returns an error if the definition is invalid (cycle, unknown node ID, bad JSON, unregistered node type).
pub async fn start_streaming(
    &self,
    definition: &Value,
    variables: HashMap<String, Value>,
) -> Result<(Uuid, Receiver<FlowEvent>)>
Start a workflow and return a live event stream alongside the execution ID.
The returned broadcast::Receiver<FlowEvent> is created before the
execution task is spawned, guaranteeing that no events are missed —
including FlowStarted. Multiple subscribers can be created by calling
broadcast::Receiver::resubscribe.
The stream closes (returns Err(RecvError::Closed)) when the execution
reaches a terminal state (Completed, Failed, or Terminated).
If the engine also has a custom EventEmitter configured via
with_event_emitter, both the emitter and
the broadcast channel receive every event.
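The ordering guarantee above (receiver first, task second) can be modelled with the standard library alone. This is a sketch under clear assumptions: Event, start_streaming, and collect_events here are hypothetical names local to the sketch, a thread stands in for the Tokio task, and std::sync::mpsc is single-consumer, unlike the broadcast channel the engine returns.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical event type for this sketch; the real `FlowEvent` is richer.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    FlowStarted,
    NodeCompleted(String),
    FlowCompleted,
}

/// Create the receiver first, then spawn the worker, so the very first
/// event is buffered even if the caller starts reading late.
pub fn start_streaming(node_ids: Vec<String>) -> mpsc::Receiver<Event> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the receiver was dropped; ignore it here.
        let _ = tx.send(Event::FlowStarted);
        for id in node_ids {
            let _ = tx.send(Event::NodeCompleted(id));
        }
        let _ = tx.send(Event::FlowCompleted);
        // `tx` is dropped here, which closes the stream for the receiver.
    });
    rx
}

/// Drain the stream until the sender side closes it.
pub fn collect_events(rx: mpsc::Receiver<Event>) -> Vec<Event> {
    rx.iter().collect()
}
```

Because the channel exists before the worker starts, the first event collected is always FlowStarted, and the iterator ends once the worker has finished and dropped its sender.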
§Example
use a3s_flow::{FlowEngine, FlowEvent, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());
    let def = json!({
        "nodes": [{ "id": "a", "type": "noop" }],
        "edges": []
    });
    let (id, mut rx) = engine.start_streaming(&def, HashMap::new()).await?;
    while let Ok(event) = rx.recv().await {
        match event {
            FlowEvent::NodeCompleted { node_id, .. } => println!("done: {node_id}"),
            FlowEvent::FlowCompleted { .. } => break,
            _ => {}
        }
    }
    Ok(())
}
pub async fn subscribe(&self, id: Uuid) -> Result<Receiver<FlowEvent>>
Subscribe to live events for an existing execution.
The returned receiver attaches to the execution’s dedicated broadcast channel. Events emitted before subscription are not replayed.
pub async fn start_named(
    &self,
    name: &str,
    variables: HashMap<String, Value>,
) -> Result<Uuid>
Start a workflow by loading its definition from the configured
FlowStore by name.
Equivalent to:
let def = flow_store.load(name).await?.ok_or(...)?;
engine.start(&def, variables).await
§Errors
- FlowError::Internal if no FlowStore was configured via with_flow_store.
- FlowError::FlowNotFound if no definition exists under name.
- Any error returned by start (invalid definition, etc.).
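The lookup half of this behaviour is a pair of Option-to-Result conversions. The sketch below models it with an in-memory map; LookupError and resolve are hypothetical names, and the store holds raw strings rather than parsed definitions.

```rust
use std::collections::HashMap;

/// Hypothetical error type mirroring the two documented lookup failures.
#[derive(Debug, PartialEq)]
pub enum LookupError {
    /// No store configured (documented as `FlowError::Internal`).
    NoStore,
    /// Nothing stored under the name (documented as `FlowError::FlowNotFound`).
    NotFound(String),
}

/// Resolve a definition by name. `store` is `None` when no store was attached.
pub fn resolve<'a>(
    store: Option<&'a HashMap<String, String>>,
    name: &str,
) -> Result<&'a str, LookupError> {
    // First failure mode: there is no store to ask.
    let store = store.ok_or(LookupError::NoStore)?;
    // Second failure mode: the store has no entry under this name.
    store
        .get(name)
        .map(String::as_str)
        .ok_or_else(|| LookupError::NotFound(name.to_string()))
}
```

Only after both steps succeed would the definition be handed to start, whose own validation errors are then passed through unchanged.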
pub async fn pause(&self, id: Uuid) -> Result<()>
Pause a running execution at the next wave boundary.
Nodes in the current wave continue until they finish. No new wave
starts until resume is called.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
- FlowError::InvalidTransition if the execution is not Running.
pub async fn resume(&self, id: Uuid) -> Result<()>
Resume a paused execution.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
- FlowError::InvalidTransition if the execution is not Paused.
pub async fn terminate(&self, id: Uuid) -> Result<()>
Terminate an execution.
Sends a cancellation signal immediately. The execution task stops at the next cancellation checkpoint (between waves, or while collecting a wave’s results). If the execution is currently paused, it is unblocked so that it can observe the cancellation.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
- FlowError::InvalidTransition if the execution is already in a terminal state (Completed, Failed, Terminated).
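Taken together, the error rules for pause, resume, and terminate describe a small state machine. The sketch below encodes those rules as a pure function. The names State, Command, InvalidTransition, and apply are hypothetical, and the real engine applies terminate asynchronously rather than as an instant state change.

```rust
/// Hypothetical lifecycle states, named after the ones the text mentions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Running,
    Paused,
    Completed,
    Failed,
    Terminated,
}

/// The three control operations described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Command {
    Pause,
    Resume,
    Terminate,
}

/// Error for a command that is not allowed in the current state.
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidTransition {
    pub from: State,
    pub command: Command,
}

/// Apply a command to a state, following the documented rules.
pub fn apply(state: State, command: Command) -> Result<State, InvalidTransition> {
    use Command::*;
    use State::*;
    match (state, command) {
        (Running, Pause) => Ok(Paused),
        (Paused, Resume) => Ok(Running),
        // Terminate is accepted from any non-terminal state, including Paused.
        (Running | Paused, Terminate) => Ok(Terminated),
        // Everything else, including any command on a terminal state, is rejected.
        (from, command) => Err(InvalidTransition { from, command }),
    }
}
```

For example, pausing an already paused execution, or terminating a completed one, falls through to the last arm and is rejected.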
pub async fn state(&self, id: Uuid) -> Result<ExecutionState>
Return a snapshot of the current state of an execution.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>>
Return a snapshot of the shared mutable context for a running execution.
The context is a HashMap<String, Value> that nodes may read and write
via ExecContext::context during
execution. This method lets the caller inspect (or react to) the
accumulated state from outside the runner.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
pub async fn set_context_entry(
    &self,
    id: Uuid,
    key: String,
    value: Value,
) -> Result<()>
Insert or overwrite a single entry in the shared context of a running execution.
The change is immediately visible to any node that reads the context after this call returns.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool>
Remove a single entry from the shared context of a running execution.
Returns true if the key existed and was removed, false if it was
not present.
§Errors
- FlowError::ExecutionNotFound if the ID is unknown.
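The snapshot, insert, and delete semantics of the three context methods can be illustrated with a map behind a mutex. This is a standalone sketch: SharedContext is a hypothetical type, values are plain strings rather than JSON, and a blocking std Mutex stands in for whatever synchronisation the crate actually uses.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical shared context; values are plain strings here rather than JSON.
#[derive(Clone, Default)]
pub struct SharedContext {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl SharedContext {
    /// Clone the map under the lock; later writes do not affect the copy.
    pub fn snapshot(&self) -> HashMap<String, String> {
        self.inner.lock().unwrap().clone()
    }

    /// Insert or overwrite a single entry.
    pub fn set(&self, key: String, value: String) {
        self.inner.lock().unwrap().insert(key, value);
    }

    /// Remove an entry; returns `true` only if the key was present.
    pub fn delete(&self, key: &str) -> bool {
        self.inner.lock().unwrap().remove(key).is_some()
    }
}
```

A snapshot is a copy taken at one moment, so a caller that needs current values has to take a new snapshot after each change it cares about.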