Skip to main content

Engine

Struct Engine 

Source
pub struct Engine { /* private fields */ }
Expand description

The main Varpulis engine

Implementations§

Source§

impl Engine

Source

pub async fn process(&mut self, event: Event) -> Result<(), EngineError>

Process an incoming event (async-runtime only).

Source

pub async fn process_shared( &mut self, event: SharedEvent, ) -> Result<(), EngineError>

Process a pre-wrapped SharedEvent (async-runtime only).

Source

pub async fn process_batch( &mut self, events: Vec<Event>, ) -> Result<(), EngineError>

Process a batch of events for improved throughput (async-runtime only).

Source

pub fn process_batch_sync( &mut self, events: Vec<Event>, ) -> Result<(), EngineError>

Synchronous batch processing for maximum throughput. Use this when no .to() sink operations are in the pipeline (e.g., benchmark mode). Avoids async runtime overhead completely.

Source

pub async fn process_batch_shared( &mut self, events: Vec<SharedEvent>, ) -> Result<(), EngineError>

Process a batch of pre-wrapped SharedEvents (async-runtime only).

Source

pub async fn flush_expired_sessions(&mut self) -> Result<(), EngineError>

Flush all expired session windows (async-runtime only).

Source§

impl Engine

Source

pub fn builder() -> EngineBuilder

Create an EngineBuilder for fluent engine construction (async-runtime only).

Source

pub fn new(output_tx: Sender<Event>) -> Self

Create engine with an owned output channel (async-runtime only).

Source

pub fn new_benchmark() -> Self

Create engine without output channel (for benchmarking - skips Event cloning overhead). Available in both async and WASM builds.

Source

pub fn new_with_optional_output(output_tx: Option<Sender<Event>>) -> Self

Create engine with optional output channel (async-runtime only, legacy API).

Source

pub fn new_shared(output_tx: Sender<SharedEvent>) -> Self

Create engine with zero-copy SharedEvent output channel (async-runtime only).

Source

pub fn set_credentials_store(&mut self, store: Arc<CredentialsStore>)

Set the connector credentials store for profile resolution (async-runtime only).

Source

pub fn new_sync() -> Self

Constructor for sync-only / WASM builds (no tokio, no output channel). Outputs are collected internally and returned via process_batch_sync_collect().

Source

pub fn process_batch_sync_collect( &mut self, events: Vec<Event>, ) -> Result<Vec<Event>, EngineError>

Process events synchronously and return collected outputs directly. Used by WASM builds and tests where mpsc channels are not needed.

Source

pub fn set_context_name(&mut self, name: &str)

Set the context name for this engine instance.

Source

pub fn set_topic_prefix(&mut self, prefix: &str)

Set a topic prefix for multi-tenant isolation (call before load()).

When set, all Kafka and MQTT topic names are prefixed with `{prefix}.` (the prefix followed by a dot) to enforce per-tenant topic isolation.

Source

pub fn set_trace_enabled(&mut self, enabled: bool)

Enable or disable pipeline trace collection.

When enabled, the engine records which streams each event is routed to, which operators pass or block the event, SASE pattern state, and emitted output events. Use drain_trace to retrieve entries.

Source

pub fn is_trace_enabled(&self) -> bool

Whether pipeline trace collection is currently enabled.

Source

pub fn drain_trace(&mut self) -> Vec<TraceEntry>

Drain all collected trace entries since the last drain.

Source

pub fn set_dlq_path(&mut self, path: PathBuf)

Set a custom DLQ file path (call before load()).

Source

pub const fn set_dlq_config(&mut self, config: DlqConfig)

Set custom DLQ configuration (call before load()).

Source

pub const fn dlq(&self) -> Option<&Arc<DeadLetterQueue>>

Access the shared DLQ instance (created during load()).

Source

pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern>

Get a named pattern by name

Source

pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern>

Get all registered patterns

Source

pub fn get_config(&self, name: &str) -> Option<&EngineConfig>

Get a configuration block by name

Source

pub fn get_connector(&self, name: &str) -> Option<&ConnectorConfig>

Get a declared connector by name (async-runtime only).

Source

pub const fn connector_configs(&self) -> &FxHashMap<String, ConnectorConfig>

Get all declared connector configs (async-runtime only).

Source

pub fn source_bindings(&self) -> &[SourceBinding]

Get source connector bindings from .from() declarations

Source

pub fn get_variable(&self, name: &str) -> Option<&Value>

Get a variable value by name

Source

pub fn set_variable( &mut self, name: &str, value: Value, ) -> Result<(), EngineError>

Set a variable's value (the variable must either be mutable or not exist yet).

Source

pub const fn variables(&self) -> &FxHashMap<String, Value>

Get all variables (for debugging/testing)

Source

pub const fn context_map(&self) -> &ContextMap

Get the context map (for orchestrator setup, async-runtime only).

Source

pub fn has_contexts(&self) -> bool

Check if the loaded program declares any contexts (async-runtime only).

Source

pub fn with_metrics(self, metrics: Metrics) -> Self

Enable Prometheus metrics (async-runtime only).

Source

pub fn add_filter<F>( &mut self, stream_name: &str, filter: F, ) -> Result<(), EngineError>
where F: Fn(&Event) -> bool + Send + Sync + 'static,

Add a programmatic filter to a stream using a closure

Source

pub fn load_with_source( &mut self, source: &str, program: &Program, ) -> Result<(), EngineError>

Load a program into the engine with semantic validation.

Source

pub fn load(&mut self, program: &Program) -> Result<(), EngineError>

Load a program into the engine (no semantic validation).

Source

pub async fn connect_sinks(&self) -> Result<(), EngineError>

Connect all sinks that require explicit connection (async-runtime only).

Source

pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn Sink>)

Inject a pre-built sink into the engine’s registry (async-runtime only).

Source

pub fn has_sink(&self, key: &str) -> bool

Check whether a given key has a registered sink (async-runtime only).

Source

pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String>

Return all sink keys that belong to a given connector name (async-runtime only).

Source

pub fn has_sink_operations(&self) -> bool

Check if any registered stream uses .to() or .enrich() operations.

Source

pub const fn event_counters(&self) -> (u64, u64)

Returns (events_in, events_out) counters for this engine.

Source

pub fn is_stateless(&self) -> bool

Check if all streams are stateless (safe for round-robin distribution).

Source

pub fn partition_key(&self) -> Option<String>

Return the partition key used by partitioned operations, if any.

Source

pub fn has_session_windows(&self) -> bool

Check if any registered stream has session windows.

Source

pub fn min_session_gap(&self) -> Option<Duration>

Return the smallest session gap across all streams (used as sweep interval).

Source

pub fn metrics(&self) -> EngineMetrics

Get the engine's current metrics as an EngineMetrics value.

Source

pub fn stream_names(&self) -> Vec<&str>

Get the names of all loaded streams.

Source

pub fn physical_plan_summary(&self) -> Option<String>

Get the physical plan summary (available after load()).

Source

pub fn explain(&self, program: &Program) -> Result<String, EngineError>

Source

pub fn topology(&self) -> Topology

Build a topology snapshot of the currently loaded streams and routes.

Source

pub fn get_function(&self, name: &str) -> Option<&UserFunction>

Get a user-defined function by name

Source

pub fn function_names(&self) -> Vec<&str>

Get all registered function names

Source

pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)>

Get all timer configurations for spawning timer tasks

Source

pub fn register_scalar_udf(&mut self, udf: Arc<dyn ScalarUDF>)

Register a native scalar UDF.

Source

pub fn register_aggregate_udf(&mut self, udf: Arc<dyn AggregateUDF>)

Register a native aggregate UDF.

Source

pub const fn udf_registry(&self) -> &UdfRegistry

Get a reference to the UDF registry.

Source

pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, EngineError>

Reload program without losing state where possible (async-runtime only).

Source

pub fn create_checkpoint(&self) -> EngineCheckpoint

Create a checkpoint of the engine state (windows, SASE engines, joins, variables).

Source

pub fn restore_checkpoint( &mut self, cp: &EngineCheckpoint, ) -> Result<(), StoreError>

Restore engine state from a checkpoint.

Source

pub fn enable_checkpointing( &mut self, store: Arc<dyn StateStore>, config: CheckpointConfig, ) -> Result<(), StoreError>

Enable auto-checkpointing with the given store and config.

Source

pub fn checkpoint_tick(&mut self) -> Result<(), StoreError>

Check if a checkpoint is due and create one if needed.

Source

pub fn force_checkpoint(&mut self) -> Result<(), StoreError>

Force an immediate checkpoint regardless of the interval.

Source

pub const fn has_checkpointing(&self) -> bool

Returns true if auto-checkpointing is enabled.

Source

pub fn enable_watermark_tracking(&mut self)

Enable per-source watermark tracking for this engine.

Source

pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration)

Register a source for watermark tracking with its max out-of-orderness.

Source

pub async fn advance_external_watermark( &mut self, source_context: &str, watermark_ms: i64, ) -> Result<(), EngineError>

Advance the watermark from an external source (async-runtime only).

Trait Implementations§

Source§

impl Debug for Engine

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,