Skip to main content

FlowEngine

Struct FlowEngine 

Source
pub struct FlowEngine { /* private fields */ }
Expand description

Central entry point for managing workflow executions.

§Example

use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    // Query available node types.
    println!("node types: {:?}", engine.node_types());

    // Start a workflow and get its execution ID.
    let definition = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            { "id": "b", "type": "noop" }
        ],
        "edges": [{ "source": "a", "target": "b" }]
    });
    let id = engine.start(&definition, HashMap::new()).await?;

    // Inspect state, pause, resume, or terminate.
    println!("state: {:?}", engine.state(id).await?);
    Ok(())
}

Implementations§

Source§

impl FlowEngine

Source

pub fn new(registry: NodeRegistry) -> Self

Create a new engine with the given node registry.

Uses NoopEventEmitter and no execution store by default. Use the builder methods with_execution_store and with_event_emitter to customise behaviour.

Source

pub fn with_execution_store(self, store: Arc<dyn ExecutionStore>) -> Self

Attach an execution store.

When set, every successfully completed execution result is saved to the store automatically. Returns self for method chaining.

Source

pub fn with_flow_store(self, store: Arc<dyn FlowStore>) -> Self

Attach a flow definition store.

Required for start_named. Allows any backend (in-memory, SQLite, remote API, …) by implementing FlowStore. Returns self for method chaining.

Source

pub fn with_event_emitter(self, emitter: Arc<dyn EventEmitter>) -> Self

Attach a custom event emitter.

The emitter is passed to every runner created by this engine and receives all node and flow lifecycle events. Returns self for chaining.

Source

pub fn with_max_concurrency(self, n: usize) -> Self

Limit the number of nodes that may execute concurrently within a single wave across all executions started by this engine.

Delegates to FlowRunner::with_max_concurrency. Returns self for chaining.

Source

pub fn node_types(&self) -> Vec<String>

Return all registered node type strings, sorted alphabetically.

Includes built-in types (e.g. "noop") and any types registered via NodeRegistry::register.

Source

pub fn node_descriptors(&self) -> Vec<NodeDescriptor>

Return structured descriptors for all registered node types.

This is the preferred discovery API for building UIs, skill pickers, and progressive capability endpoints on top of a3s-flow.

Source

pub fn capabilities(&self) -> FlowCapabilities

Return a transport-friendly capabilities document for this engine.

Higher layers can serialize this value directly as JSON for progressive discovery APIs.

Source

pub fn register_node_type(&self, node: Arc<dyn Node>)

Register or replace a node type for future executions started by this engine.

Source

pub fn register_node_type_with_descriptor( &self, node: Arc<dyn Node>, descriptor: NodeDescriptor, )

Register or replace a node type with explicit discovery metadata.

Source

pub fn unregister_node_type(&self, node_type: &str) -> Result<bool>

Remove a node type from this engine’s registry.

Returns Ok(true) if the node type existed and was removed, Ok(false) if it was not registered, or an error if the type is protected.

Removal only affects future validations and executions. Already running executions keep using the registry snapshot captured when they were started.

Source

pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue>

Validate a flow definition without executing it.

Returns a list of ValidationIssues describing structural problems. An empty list means the definition is valid and ready to run.

The following checks are performed:

  • DAG structural validity: no cycles, no unknown edge references, no duplicate node IDs, at least one node.
  • All node types are registered in the engine’s NodeRegistry.
  • Every run_if.from field references an existing node ID.
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;

let engine = FlowEngine::new(NodeRegistry::with_defaults());
let def = json!({
    "nodes": [
        { "id": "a", "type": "noop" },
        { "id": "b", "type": "unknown-type" }
    ],
    "edges": []
});
let issues = engine.validate(&def);
assert_eq!(issues.len(), 1);
assert!(issues[0].message.contains("unknown node type"));
Source

pub async fn start( &self, definition: &Value, variables: HashMap<String, Value>, ) -> Result<Uuid>

Start a new workflow execution from a JSON DAG definition.

The definition is parsed and validated synchronously. If valid, the execution is launched in a background Tokio task and the execution ID is returned immediately — the flow runs concurrently with the caller.

§Errors

Returns an error if the definition is invalid (cycle, unknown node ID, bad JSON, unregistered node type).

Source

pub async fn start_streaming( &self, definition: &Value, variables: HashMap<String, Value>, ) -> Result<(Uuid, Receiver<FlowEvent>)>

Start a workflow and return a live event stream alongside the execution ID.

The returned broadcast::Receiver<FlowEvent> is created before the execution task is spawned, guaranteeing that no events are missed — including FlowStarted. Multiple subscribers can be created by calling broadcast::Receiver::resubscribe.

The stream closes (returns Err(RecvError::Closed)) when the execution reaches a terminal state (Completed, Failed, or Terminated).

If the engine also has a custom EventEmitter configured via with_event_emitter, both the emitter and the broadcast channel receive every event.

§Example
use a3s_flow::{FlowEngine, FlowEvent, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());
    let def = json!({
        "nodes": [{ "id": "a", "type": "noop" }],
        "edges": []
    });

    let (id, mut rx) = engine.start_streaming(&def, HashMap::new()).await?;

    while let Ok(event) = rx.recv().await {
        match event {
            FlowEvent::NodeCompleted { node_id, .. } => println!("done: {node_id}"),
            FlowEvent::FlowCompleted { .. } => break,
            _ => {}
        }
    }
    Ok(())
}
Source

pub async fn subscribe(&self, id: Uuid) -> Result<Receiver<FlowEvent>>

Subscribe to live events for an existing execution.

The returned receiver attaches to the execution’s dedicated broadcast channel. Events emitted before subscription are not replayed.

Source

pub async fn start_named( &self, name: &str, variables: HashMap<String, Value>, ) -> Result<Uuid>

Start a workflow by loading its definition from the configured FlowStore by name.

Equivalent to:

let def = flow_store.load(name).await?.ok_or(...)?;
engine.start(&def, variables).await
§Errors
Source

pub async fn pause(&self, id: Uuid) -> Result<()>

Pause a running execution at the next wave boundary.

Nodes in the current wave continue until they finish. No new wave starts until resume is called.

§Errors
Source

pub async fn resume(&self, id: Uuid) -> Result<()>

Resume a paused execution.

§Errors
Source

pub async fn terminate(&self, id: Uuid) -> Result<()>

Terminate an execution immediately.

Sends a cancellation signal. The execution task stops at the next cancellation checkpoint (between waves, or within a wave’s result collection). If the execution is currently paused it is unblocked so it can observe the cancellation.

§Errors
Source

pub async fn state(&self, id: Uuid) -> Result<ExecutionState>

Return a snapshot of the current state of an execution.

§Errors
Source

pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>>

Return a snapshot of the shared mutable context for a running execution.

The context is a HashMap<String, Value> that nodes may read and write via ExecContext::context during execution. This method lets the caller inspect (or react to) the accumulated state from outside the runner.

§Errors
Source

pub async fn set_context_entry( &self, id: Uuid, key: String, value: Value, ) -> Result<()>

Insert or overwrite a single entry in the shared context of a running execution.

The change is immediately visible to any node that reads the context after this call returns.

§Errors
Source

pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool>

Remove a single entry from the shared context of a running execution.

Returns true if the key existed and was removed, false if it was not present.

§Errors

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,