TitanRt
Model-first, typed reactive runtime for real-time systems. The runtime focuses on models that own their connectors/streams and publish/translate cheap, lock-free state snapshots (StateCell<T>). The control thread handles lifecycle (start/restart/hot-reload/shutdown/kill), back-pressure and cooperative cancellation; your model pulls typed events from its streams and pushes typed actions back.
- Predictable back-pressure via unified TX/RX traits.
- Typed boundaries between model, I/O adapters and state.
- Graceful, hierarchical cancellation and optional core pinning.
- Hot-reload of configuration, but logic is still in the model and user depended on.
- Health flags and lock-free state snapshots.
- When using the built-in connector, a model-side hook selects which events are emitted.
- This is related also to the state payload. It is user-defined: you choose
TforStateCell<T>.
Install
Add with Cargo (no version pinning; you always pull the latest):
# or, if you plan to use stream/connector APIs:
Quick start: a minimal model
BaseModel declares your config, output transport, event, and context types. The runtime drives
initialize → execute → stop, and you may handle external events via on_event and apply config updates via
hot_reload.
use Result;
use *; // config, model, runtime, io::base re-exports
use NullTx; // no-op output
// Events: use the provided unit marker
// use titanrt::model::NullEvent; // already in prelude via model::*
;
Driving the runtime (control-plane)
Use the control sender to post typed events or lifecycle/config commands:
use ;
use json;
let mut rt = spawn?;
// Start (if not auto-started)
rt.control_tx.try_send.ok;
// Send a typed event to your model
rt.control_tx.try_send.ok;
// Hot-reload configuration (your `hot_reload` receives the deserialized value)
rt.control_tx.try_send.ok;
// Graceful stop or full shutdown
rt.control_tx.try_send.ok;
rt.control_tx.try_send.ok;
// Block until the runtime thread finishes (or drop a guard for auto-shutdown)
rt.run_blocking ?;
The control thread also listens to OS termination signals (via signal-hook) and will request a cooperative shutdown.
Streams & connectors (optional feature)
Enable the connector feature to build typed streams (worker threads) managed by your model. The crate exposes:
connector::StreamDescriptor— describes a stream (venue/kind, channel bounds, core policy, initial health).connector::StreamRunner/StreamSpawner— spawn a worker with typed action/event channels,StateCell<S>,HealthFlagand a childCancelToken.connector::Stream— handle owned by the model:id,health,state,action_tx,event_rx, etc.
You write your own connector facade (implementing BaseConnector) that holds shared resources and spawns streams via
StreamSpawner. Your model keeps the Stream handle(s), drains events each execute() tick, and publishes actions as
needed.
Tip: use
CoreStatsandCorePickPolicyto pick CPU cores (round-robin, minimum threads, specific cores) for each stream worker.
Channels & back-pressure
A small, unified transport layer:
-
Traits:
BaseTx/BaseRx(+TxPairExt/RxPairExthelpers) givetry_send/try_recv, blockingsend/recvwithCancelToken+ optional timeouts, and draining (drain,drain_max). -
Implementations:
io::ringbuffer::RingBuffer— lock-free ring buffer (bounded).io::mpmc::MpmcChannel— Crossbeam MPMC (bounded).io::base::NullTx/NullRx— no-op ends for unit/empty flows.
Choose the channel per stream; the model only sees the BaseTx/BaseRx abstractions.
State & health
utils::StateCell<S>— lock-free snapshot cell with versioning (publish,load,seq), whereS: StateMarker( usually a small “view”).utils::HealthFlag— cheapAtomicBoolwrapped to avoid false sharing (up,down,get).utils::CancelToken— hierarchical cooperative cancellation (child(),cancel(),is_cancelled()).
Configuration & hot-reload
BaseModel::Configmust beClone + Send + serde::Deserialize.- The control plane accepts
CommandInput::HotReload(serde_json::Value); your model implementshot_reload(&Config)( default is a no-op).
Pinning & per-core stats
utils::CoreStats tracks per-core thread counts; CorePickPolicy lets you request minimum-threads, **round-robin
**, or specific core(s). The runtime can also pin its own control thread (RuntimeConfig::core_id).
Examples
A complete runnable example is included under example/ in the repository: a toy model + connector/stream demonstrating
actions, events, state snapshots, core policies and cancellation.
Feature flags
connector— enables connector/stream APIs (descriptors, spawners, runners, stream handle).
Documentation
API reference on docs.rs:
License
Dual-licensed under MIT or Apache-2.0.