Module runtime

Module runtime 

Source
Expand description

High-performance graph-based stream processing runtime.

The runtime module provides the core execution engine for wavelet’s cooperative stream processing model. Built around a computation graph where nodes represent stream processors and edges define data dependencies, the runtime delivers deterministic, low-latency execution without the overhead of async runtimes or actor systems.

§Architecture Overview

§Computation Model

  • Nodes: Stateful stream processors that transform data
  • Relationships: Define when nodes should execute (Trigger vs Observe)
  • Cooperative scheduling: Nodes voluntarily yield control after processing
  • Dependency ordering: Execution follows graph topology (depth-first, scheduled first)
  • Incremental computation: Only recompute when dependencies actually change

§Core Components

§Executor

The central execution engine that orchestrates:

  • Graph topology management and node lifecycle
  • Event-driven scheduling (I/O, timers, yields)
  • Dependency-ordered execution cycles
  • Garbage collection and resource cleanup

§Node<T> and NodeBuilder<T>

Type-safe containers for node state with controlled mutation:

  • Data-oriented design: Separate data (T) from behavior (cycle functions)
  • Controlled mutation: Data changes only within cycle functions
  • Builder pattern: Fluent API for configuring relationships and lifecycle

§Runtime<C>

Complete runtime orchestration combining:

  • Clock abstraction: Consistent time across execution cycles
  • Execution modes: Different CPU/latency trade-offs (Spin, Park)
  • Runtime loops: Automated execution patterns for different use cases

§Event System

Unified event handling for external stimulus:

  • I/O Events: Network sockets, file handles, external notifications
  • Timer Events: Time-based scheduling with precise expiration
  • Yield Events: Immediate re-scheduling for continued processing

§Design Principles

§Single-Threaded Cooperative Model

  • Predictable performance: No hidden thread spawning or context switching
  • Deterministic execution: Same inputs always produce the same execution order
  • Zero-cost abstractions: Direct function calls without async overhead
  • Resource control: Explicit management of CPU, memory, and I/O

§Data-Oriented Design

  • Type safety: Compile-time guarantees about node data types
  • Memory efficiency: Minimal indirection and cache-friendly layouts
  • Controlled mutation: Runtime coordinates when and how data changes
  • Clear ownership: Data lifecycle tied to node lifecycle

§Event-Driven Execution

  • External integration: Clean interfaces to operating system events
  • Backpressure handling: Natural flow control through graph topology
  • Resource efficiency: Sleep when no work is available
  • Low latency: Direct event dispatch without queueing overhead

§Usage Patterns

§Basic Stream Processing

use wavelet::runtime::*;

let mut executor = Executor::new();

// Create data source
let data_source = NodeBuilder::new(DataSource::new())
    .on_init(|executor, _, idx| {
        executor.yield_driver().yield_now(idx); // Start processing
    })
    .build(&mut executor, |source, ctx| {
        if let Some(data) = source.poll_data() {
            source.latest = data;
            Control::Broadcast // Notify downstream
        } else {
            Control::Unchanged
        }
    });

// Create processor that reacts to data
let processor = NodeBuilder::new(Processor::new())
    .triggered_by(&data_source)
    .build(&mut executor, |proc, ctx| {
        proc.process_data();
        Control::Unchanged
    });

// Run the graph
let runtime = RealtimeRuntime::new(ExecutionMode::Park);
runtime.run_forever();

§I/O Integration

let (network_node, notifier) = NodeBuilder::new(NetworkHandler::new())
    .build_with_notifier(&mut executor, |handler, ctx| {
        match handler.socket.try_read(&mut handler.buffer) {
            Ok(0) => Control::Sweep, // Connection closed
            Ok(n) => {
                handler.process_bytes(n);
                Control::Broadcast
            }
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                // Re-register for readiness
                handler.reregister_interest(ctx);
                Control::Unchanged
            }
            Err(_) => Control::Sweep, // Connection error
        }
    })?;

// External thread can wake the network node
notifier.notify()?;

§Dynamic Graph Construction

let spawner = NodeBuilder::new(DynamicSpawner::new())
    .build(&mut executor, |spawner, ctx| {
        if spawner.should_create_worker() {
            ctx.spawn_subgraph(|executor| {
                let worker = NodeBuilder::new(Worker::new())
                    .triggered_by(&spawner.work_queue)
                    .build(executor, process_work);
            });
        }
        Control::Unchanged
    });

§Performance Characteristics

  • Latency: Sub-microsecond node execution overhead
  • Throughput: Millions of events per second on modern hardware
  • Memory: Predictable allocation patterns, minimal runtime overhead
  • CPU: Efficient utilization with configurable sleep/spin strategies
  • Determinism: Consistent performance across runs with same inputs

§Target Applications

The runtime excels in domains requiring:

  • Financial systems: Low-latency trading, risk management, market data
  • Real-time analytics: Live dashboards, alerting, stream aggregation
  • IoT processing: Sensor data, device management, edge computing
  • Protocol handling: Stateful network protocols, message parsing
  • Media processing: Audio/video pipelines, real-time effects

For request/response workloads or applications requiring automatic parallelism, consider using async runtimes like Tokio alongside wavelet for the appropriate components of your system.

Re-exports§

pub use clock::*;
pub use event_driver::*;
pub use executor::*;
pub use graph::*;
pub use node::*;

Modules§

clock
event_driver
executor
graph
node

Structs§

Runtime
A complete runtime instance that combines executor, clock, and execution mode.
Scheduler
SchedulerError

Enums§

ExecutionMode
Execution mode for the real-time runtime.

Type Aliases§

HistoricalRuntime
RealtimeRuntime
TestRuntime