pub struct Engine { /* private fields */ }
Expand description
The main Varpulis engine
Implementations§
Source§impl Engine
impl Engine
Sourcepub async fn process(&mut self, event: Event) -> Result<(), EngineError>
pub async fn process(&mut self, event: Event) -> Result<(), EngineError>
Process an incoming event (async-runtime only).
Process a pre-wrapped SharedEvent (async-runtime only).
Sourcepub async fn process_batch(
&mut self,
events: Vec<Event>,
) -> Result<(), EngineError>
pub async fn process_batch( &mut self, events: Vec<Event>, ) -> Result<(), EngineError>
Process a batch of events for improved throughput (async-runtime only).
Sourcepub fn process_batch_sync(
&mut self,
events: Vec<Event>,
) -> Result<(), EngineError>
pub fn process_batch_sync( &mut self, events: Vec<Event>, ) -> Result<(), EngineError>
Synchronous batch processing for maximum throughput. Use this when no .to() sink operations are in the pipeline (e.g., benchmark mode). Avoids async runtime overhead completely.
Process a batch of pre-wrapped SharedEvents (async-runtime only).
Sourcepub async fn flush_expired_sessions(&mut self) -> Result<(), EngineError>
pub async fn flush_expired_sessions(&mut self) -> Result<(), EngineError>
Flush all expired session windows (async-runtime only).
Source§impl Engine
impl Engine
Sourcepub fn builder() -> EngineBuilder
pub fn builder() -> EngineBuilder
Create an EngineBuilder for fluent engine construction (async-runtime only).
Sourcepub fn new(output_tx: Sender<Event>) -> Self
pub fn new(output_tx: Sender<Event>) -> Self
Create engine with an owned output channel (async-runtime only).
Sourcepub fn new_benchmark() -> Self
pub fn new_benchmark() -> Self
Create engine without output channel (for benchmarking - skips Event cloning overhead). Available in both async and WASM builds.
Sourcepub fn new_with_optional_output(output_tx: Option<Sender<Event>>) -> Self
pub fn new_with_optional_output(output_tx: Option<Sender<Event>>) -> Self
Create engine with optional output channel (async-runtime only, legacy API).
Create engine with zero-copy SharedEvent output channel (async-runtime only).
Sourcepub fn set_credentials_store(&mut self, store: Arc<CredentialsStore>)
pub fn set_credentials_store(&mut self, store: Arc<CredentialsStore>)
Set the connector credentials store for profile resolution (async-runtime only).
Sourcepub fn new_sync() -> Self
pub fn new_sync() -> Self
Constructor for sync-only / WASM builds (no tokio, no output channel).
Outputs are collected internally and returned via process_batch_sync_collect().
Sourcepub fn process_batch_sync_collect(
&mut self,
events: Vec<Event>,
) -> Result<Vec<Event>, EngineError>
pub fn process_batch_sync_collect( &mut self, events: Vec<Event>, ) -> Result<Vec<Event>, EngineError>
Process events synchronously and return collected outputs directly. Used by WASM builds and tests where mpsc channels are not needed.
Sourcepub fn set_context_name(&mut self, name: &str)
pub fn set_context_name(&mut self, name: &str)
Set the context name for this engine instance.
Sourcepub fn set_topic_prefix(&mut self, prefix: &str)
pub fn set_topic_prefix(&mut self, prefix: &str)
Set a topic prefix for multi-tenant isolation (call before load()).
When set, all Kafka and MQTT topic names will be prefixed with
`{prefix}.` to enforce per-tenant topic isolation.
Sourcepub fn set_trace_enabled(&mut self, enabled: bool)
pub fn set_trace_enabled(&mut self, enabled: bool)
Enable or disable pipeline trace collection.
When enabled, the engine records which streams each event is routed to,
which operators pass or block the event, SASE pattern state, and emitted
output events. Use drain_trace to retrieve entries.
Sourcepub fn is_trace_enabled(&self) -> bool
pub fn is_trace_enabled(&self) -> bool
Whether pipeline trace collection is currently enabled.
Sourcepub fn drain_trace(&mut self) -> Vec<TraceEntry>
pub fn drain_trace(&mut self) -> Vec<TraceEntry>
Drain all collected trace entries since the last drain.
Sourcepub fn set_dlq_path(&mut self, path: PathBuf)
pub fn set_dlq_path(&mut self, path: PathBuf)
Set a custom DLQ file path (call before load()).
Sourcepub const fn set_dlq_config(&mut self, config: DlqConfig)
pub const fn set_dlq_config(&mut self, config: DlqConfig)
Set custom DLQ configuration (call before load()).
Sourcepub const fn dlq(&self) -> Option<&Arc<DeadLetterQueue>>
pub const fn dlq(&self) -> Option<&Arc<DeadLetterQueue>>
Access the shared DLQ instance (created during load()).
Sourcepub fn get_pattern(&self, name: &str) -> Option<&NamedPattern>
pub fn get_pattern(&self, name: &str) -> Option<&NamedPattern>
Get a named pattern by name
Sourcepub const fn patterns(&self) -> &FxHashMap<String, NamedPattern>
pub const fn patterns(&self) -> &FxHashMap<String, NamedPattern>
Get all registered patterns
Sourcepub fn get_config(&self, name: &str) -> Option<&EngineConfig>
pub fn get_config(&self, name: &str) -> Option<&EngineConfig>
Get a configuration block by name
Sourcepub fn get_connector(&self, name: &str) -> Option<&ConnectorConfig>
pub fn get_connector(&self, name: &str) -> Option<&ConnectorConfig>
Get a declared connector by name (async-runtime only).
Sourcepub const fn connector_configs(&self) -> &FxHashMap<String, ConnectorConfig>
pub const fn connector_configs(&self) -> &FxHashMap<String, ConnectorConfig>
Get all declared connector configs (async-runtime only).
Sourcepub fn source_bindings(&self) -> &[SourceBinding]
pub fn source_bindings(&self) -> &[SourceBinding]
Get source connector bindings from .from() declarations
Sourcepub fn get_variable(&self, name: &str) -> Option<&Value>
pub fn get_variable(&self, name: &str) -> Option<&Value>
Get a variable value by name
Sourcepub fn set_variable(
&mut self,
name: &str,
value: Value,
) -> Result<(), EngineError>
pub fn set_variable( &mut self, name: &str, value: Value, ) -> Result<(), EngineError>
Set a variable value (must be mutable or new)
Sourcepub const fn variables(&self) -> &FxHashMap<String, Value>
pub const fn variables(&self) -> &FxHashMap<String, Value>
Get all variables (for debugging/testing)
Sourcepub const fn context_map(&self) -> &ContextMap
pub const fn context_map(&self) -> &ContextMap
Get the context map (for orchestrator setup, async-runtime only).
Sourcepub fn has_contexts(&self) -> bool
pub fn has_contexts(&self) -> bool
Check if the loaded program declares any contexts (async-runtime only).
Sourcepub fn with_metrics(self, metrics: Metrics) -> Self
pub fn with_metrics(self, metrics: Metrics) -> Self
Enable Prometheus metrics (async-runtime only).
Sourcepub fn add_filter<F>(
&mut self,
stream_name: &str,
filter: F,
) -> Result<(), EngineError>
pub fn add_filter<F>( &mut self, stream_name: &str, filter: F, ) -> Result<(), EngineError>
Add a programmatic filter to a stream using a closure
Sourcepub fn load_with_source(
&mut self,
source: &str,
program: &Program,
) -> Result<(), EngineError>
pub fn load_with_source( &mut self, source: &str, program: &Program, ) -> Result<(), EngineError>
Load a program into the engine with semantic validation.
Sourcepub fn load(&mut self, program: &Program) -> Result<(), EngineError>
pub fn load(&mut self, program: &Program) -> Result<(), EngineError>
Load a program into the engine (no semantic validation).
Sourcepub async fn connect_sinks(&self) -> Result<(), EngineError>
pub async fn connect_sinks(&self) -> Result<(), EngineError>
Connect all sinks that require explicit connection (async-runtime only).
Sourcepub fn inject_sink(&mut self, key: &str, sink: Arc<dyn Sink>)
pub fn inject_sink(&mut self, key: &str, sink: Arc<dyn Sink>)
Inject a pre-built sink into the engine’s registry (async-runtime only).
Sourcepub fn has_sink(&self, key: &str) -> bool
pub fn has_sink(&self, key: &str) -> bool
Check whether a given key has a registered sink (async-runtime only).
Sourcepub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String>
pub fn sink_keys_for_connector(&self, connector_name: &str) -> Vec<String>
Return all sink keys that belong to a given connector name (async-runtime only).
Sourcepub fn has_sink_operations(&self) -> bool
pub fn has_sink_operations(&self) -> bool
Check if any registered stream uses .to() or .enrich() operations.
Sourcepub const fn event_counters(&self) -> (u64, u64)
pub const fn event_counters(&self) -> (u64, u64)
Returns (events_in, events_out) counters for this engine.
Sourcepub fn is_stateless(&self) -> bool
pub fn is_stateless(&self) -> bool
Check if all streams are stateless (safe for round-robin distribution).
Sourcepub fn partition_key(&self) -> Option<String>
pub fn partition_key(&self) -> Option<String>
Return the partition key used by partitioned operations, if any.
Sourcepub fn has_session_windows(&self) -> bool
pub fn has_session_windows(&self) -> bool
Check if any registered stream has session windows.
Sourcepub fn min_session_gap(&self) -> Option<Duration>
pub fn min_session_gap(&self) -> Option<Duration>
Return the smallest session gap across all streams (used as sweep interval).
Sourcepub fn metrics(&self) -> EngineMetrics
pub fn metrics(&self) -> EngineMetrics
Get metrics
Sourcepub fn stream_names(&self) -> Vec<&str>
pub fn stream_names(&self) -> Vec<&str>
Get the names of all loaded streams.
Sourcepub fn physical_plan_summary(&self) -> Option<String>
pub fn physical_plan_summary(&self) -> Option<String>
Get the physical plan summary (available after load()).
pub fn explain(&self, program: &Program) -> Result<String, EngineError>
Sourcepub fn topology(&self) -> Topology
pub fn topology(&self) -> Topology
Build a topology snapshot of the currently loaded streams and routes.
Sourcepub fn get_function(&self, name: &str) -> Option<&UserFunction>
pub fn get_function(&self, name: &str) -> Option<&UserFunction>
Get a user-defined function by name
Sourcepub fn function_names(&self) -> Vec<&str>
pub fn function_names(&self) -> Vec<&str>
Get all registered function names
Sourcepub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)>
pub fn get_timers(&self) -> Vec<(u64, Option<u64>, String)>
Get all timer configurations for spawning timer tasks
Sourcepub fn register_scalar_udf(&mut self, udf: Arc<dyn ScalarUDF>)
pub fn register_scalar_udf(&mut self, udf: Arc<dyn ScalarUDF>)
Register a native scalar UDF.
Sourcepub fn register_aggregate_udf(&mut self, udf: Arc<dyn AggregateUDF>)
pub fn register_aggregate_udf(&mut self, udf: Arc<dyn AggregateUDF>)
Register a native aggregate UDF.
Sourcepub const fn udf_registry(&self) -> &UdfRegistry
pub const fn udf_registry(&self) -> &UdfRegistry
Get a reference to the UDF registry.
Sourcepub fn reload(&mut self, program: &Program) -> Result<ReloadReport, EngineError>
pub fn reload(&mut self, program: &Program) -> Result<ReloadReport, EngineError>
Reload program without losing state where possible (async-runtime only).
Sourcepub fn create_checkpoint(&self) -> EngineCheckpoint
pub fn create_checkpoint(&self) -> EngineCheckpoint
Create a checkpoint of the engine state (windows, SASE engines, joins, variables).
Sourcepub fn restore_checkpoint(
&mut self,
cp: &EngineCheckpoint,
) -> Result<(), StoreError>
pub fn restore_checkpoint( &mut self, cp: &EngineCheckpoint, ) -> Result<(), StoreError>
Restore engine state from a checkpoint.
Sourcepub fn enable_checkpointing(
&mut self,
store: Arc<dyn StateStore>,
config: CheckpointConfig,
) -> Result<(), StoreError>
pub fn enable_checkpointing( &mut self, store: Arc<dyn StateStore>, config: CheckpointConfig, ) -> Result<(), StoreError>
Enable auto-checkpointing with the given store and config.
Sourcepub fn checkpoint_tick(&mut self) -> Result<(), StoreError>
pub fn checkpoint_tick(&mut self) -> Result<(), StoreError>
Check if a checkpoint is due and create one if needed.
Sourcepub fn force_checkpoint(&mut self) -> Result<(), StoreError>
pub fn force_checkpoint(&mut self) -> Result<(), StoreError>
Force an immediate checkpoint regardless of the interval.
Sourcepub const fn has_checkpointing(&self) -> bool
pub const fn has_checkpointing(&self) -> bool
Returns true if auto-checkpointing is enabled.
Sourcepub fn enable_watermark_tracking(&mut self)
pub fn enable_watermark_tracking(&mut self)
Enable per-source watermark tracking for this engine.
Sourcepub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration)
pub fn register_watermark_source(&mut self, source: &str, max_ooo: Duration)
Register a source for watermark tracking with its max out-of-orderness.
Sourcepub async fn advance_external_watermark(
&mut self,
source_context: &str,
watermark_ms: i64,
) -> Result<(), EngineError>
pub async fn advance_external_watermark( &mut self, source_context: &str, watermark_ms: i64, ) -> Result<(), EngineError>
Advance the watermark from an external source (async-runtime only).
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Engine
impl !RefUnwindSafe for Engine
impl Send for Engine
impl Sync for Engine
impl Unpin for Engine
impl UnsafeUnpin for Engine
impl !UnwindSafe for Engine
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more