# TitanRt
Model-first, **typed reactive runtime** for real-time systems. The runtime focuses on **models that own their connectors/streams** and publish/translate cheap, lock-free state snapshots (`StateCell<T>`). The control thread handles lifecycle (start/restart/hot-reload/shutdown/kill), back-pressure and cooperative cancellation; your model pulls typed events from its streams and pushes typed actions back.
* Predictable back-pressure via unified TX/RX traits.
* Typed boundaries between model, I/O adapters and state.
* Graceful, hierarchical cancellation and optional core pinning.
* Hot-reload of configuration, but logic is still in the model and user depended on.
* Health flags and lock-free state snapshots.
* When using the built-in connector, a model-side hook selects which events are emitted.
* This is related also to the state payload. It is **user-defined**: you choose `T` for `StateCell<T>`.
## Install
Add with Cargo (no version pinning; you always pull the latest):
```bash
cargo add titanrt
# or, if you plan to use stream/connector APIs:
cargo add titanrt --features connector
```
## Quick start: a minimal model
`BaseModel` declares your config, output transport, event, and context types. The runtime drives
`initialize → execute → stop`, and you may handle external events via `on_event` and apply config updates via
`hot_reload`.
```rust
use anyhow::Result;
use titanrt::prelude::*; // config, model, runtime, io::base re-exports
use titanrt::io::base::NullTx; // no-op output
// Events: use the provided unit marker
// use titanrt::model::NullEvent; // already in prelude via model::*
#[derive(Clone)]
struct AppCtx;
impl ModelContext for AppCtx {}
struct MyModel {
_cancel: CancelToken,
}
impl BaseModel for MyModel {
type Config = String; // any serde-deserializable type
type OutputTx = NullTx; // output channel (no-op here)
type Event = NullEvent; // typed external events
type Ctx = AppCtx;
fn initialize(
_ctx: AppCtx,
_cfg: String,
_reserved_core: Option<usize>,
_out: NullTx,
cancel: CancelToken,
) -> Result<Self> {
Ok(Self { _cancel: cancel })
}
fn execute(&mut self) -> ExecutionResult {
// single hot-loop tick; do useful work or drain streams here
ExecutionResult::Relax // yield (spin/yield/sleep backoff)
}
fn on_event(&mut self, _e: Self::Event) { /* handle external events from outside of the runtime */ }
fn stop(&mut self, _kind: StopKind) -> StopState {
// cancel streams, join workers, flush state, etc.
StopState::Done
}
fn hot_reload(&mut self, config: &Self::Config) -> Result<()> {
// reload configuration, apply changes, etc.
Ok(())
}
}
fn main() -> Result<()> {
let cfg = RuntimeConfig {
init_model_on_start: true,
core_id: None, // optionally pin runtime thread
max_inputs_pending: Some(1024), // control-plane ring capacity
max_inputs_drain: Some(64), // max inputs per drain pass
stop_model_timeout: Some(5), // seconds
};
let ctx = AppCtx;
let model_cfg = "hello".to_string();
let output_tx = NullOutputTx;
Runtime::<MyModel>::spawn(cfg, ctx, model_cfg, output_tx)?
.run_blocking()
}
```
## Driving the runtime (control-plane)
Use the control sender to post **typed events** or **lifecycle/config commands**:
```Rust
use titanrt::control::inputs::{Input, CommandInput};
use serde_json::json;
let mut rt = Runtime::<MyModel>::spawn(cfg, ctx, model_cfg, outputs)?;
// Start (if not auto-started)
rt.control_tx().try_send(Input::Command(CommandInput::Start)).ok();
// Send a typed event to your model
rt.control_tx().try_send(Input::Event(NullEvent)).ok();
// Hot-reload configuration (your `hot_reload` receives the deserialized value)
rt.control_tx().try_send(Input::Command(
CommandInput::HotReload(json!({"foo": 42})),
)).ok();
// Graceful stop or full shutdown
rt.control_tx().try_send(Input::Command(CommandInput::Stop)).ok();
rt.control_tx().try_send(Input::Command(CommandInput::Shutdown)).ok();
// Block until the runtime thread finishes (or drop a guard for auto-shutdown)
rt.run_blocking() ?;
```
The control thread also listens to OS termination signals (via `signal-hook`) and will request a cooperative shutdown.
## Streams & connectors (optional feature)
Enable the `connector` feature to build typed **streams** (worker threads) managed by your model. The crate exposes:
* `connector::StreamDescriptor` — describes a stream (venue/kind, channel bounds, core policy, initial health).
* `connector::StreamRunner`/`StreamSpawner` — spawn a worker with typed action/event channels, `StateCell<S>`,
`HealthFlag` and a child `CancelToken`.
* `connector::Stream` — handle owned by the model: `id`, `health`, `state`, `action_tx`, `event_rx`, etc.
You write your own connector facade (implementing `BaseConnector`) that holds shared resources and spawns streams via
`StreamSpawner`. Your model keeps the `Stream` handle(s), drains events each `execute()` tick, and publishes actions as
needed.
> Tip: use `CoreStats` and `CorePickPolicy` to pick CPU cores (round-robin, minimum threads, specific cores) for each
> stream worker.
## Channels & back-pressure
A small, unified transport layer:
* **Traits:** `BaseTx` / `BaseRx` (+ `TxPairExt` / `RxPairExt` helpers) give `try_send/try_recv`, blocking `send/recv`
with `CancelToken` + optional timeouts, and draining (`drain`, `drain_max`).
* **Implementations:**
* `io::ringbuffer::RingBuffer` — lock-free ring buffer (bounded).
* `io::mpmc::MpmcChannel` — Crossbeam MPMC (bounded).
* `io::base::NullTx`/`NullRx` — no-op ends for unit/empty flows.
Choose the channel per stream; the model only sees the `BaseTx/BaseRx` abstractions.
## State & health
* `utils::StateCell<S>` — lock-free snapshot cell with versioning (`publish`, `load`, `seq`), where `S: StateMarker` (
usually a small “view”).
* `utils::HealthFlag` — cheap `AtomicBool` wrapped to avoid false sharing (`up`, `down`, `get`).
* `utils::CancelToken` — hierarchical cooperative cancellation (`child()`, `cancel()`, `is_cancelled()`).
## Configuration & hot-reload
* `BaseModel::Config` must be `Clone + Send + serde::Deserialize`.
* The control plane accepts `CommandInput::HotReload(serde_json::Value)`; your model implements `hot_reload(&Config)` (
default is a no-op).
## Pinning & per-core stats
`utils::CoreStats` tracks per-core thread counts; `CorePickPolicy` lets you request **minimum-threads**, **round-robin
**, or **specific core(s)**. The runtime can also pin its own control thread (`RuntimeConfig::core_id`).
## Examples
A complete runnable example is included under `example/` in the repository: a toy model + connector/stream demonstrating
actions, events, state snapshots, core policies and cancellation.
## Feature flags
* `connector` — enables connector/stream APIs (descriptors, spawners, runners, stream handle).
## Documentation
API reference on **docs.rs**:
* [https://docs.rs/titanrt](https://docs.rs/titanrt)
## License
Dual-licensed under **MIT** or **Apache-2.0**.
---