# Tito
A transactional outbox with partitioned pub/sub and scheduled event processing, built on TiKV.
## What It Does
When you save data, Tito writes the corresponding events in the same TiKV transaction. Workers pick them up later, so there is no dual-write problem and no event is lost between the data write and the publish step.
```
┌─────────────────────────────────────────────────────────────────┐
│                            Your App                             │
│                                                                 │
│  model.create(user) ──► TiKV Transaction:                       │
│                           1. Write user data                    │
│                           2. Write event (same tx)              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                              TiKV                               │
│                                                                 │
│  data:user:123 ───────────────────► { id, name, email }         │
│  event:user:0001:1732541234:abc ──► { entity, action, ... }     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                             Workers                             │
│                                                                 │
│  Worker 1 (partition 0) ──► Pulls events, processes, acks       │
│  Worker 2 (partition 1) ──► Pulls events, processes, acks       │
│  Worker N (partition N) ──► ...                                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
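The flow in the diagram can be sketched without TiKV. The following is a minimal, self-contained illustration of the outbox idea, not Tito's implementation: `Store`, `Tx`, and `create_user` are hypothetical names, an in-memory `BTreeMap` stands in for TiKV, and the partition and timestamp in the event key are fixed for brevity. Writes are buffered and applied only if the whole closure succeeds, so the data row and its event either both land or neither does.

```rust
use std::collections::BTreeMap;

/// Toy in-memory key-value store standing in for TiKV (illustration only).
#[derive(Default)]
struct Store {
    kv: BTreeMap<String, String>,
}

/// Buffered writes that become visible only on commit.
#[derive(Default)]
struct Tx {
    writes: Vec<(String, String)>,
}

impl Tx {
    fn put(&mut self, key: &str, value: &str) {
        self.writes.push((key.to_string(), value.to_string()));
    }
}

impl Store {
    /// Runs `body` against a fresh transaction.
    /// All buffered writes are applied on `Ok`, none on `Err`.
    fn transaction<F>(&mut self, body: F) -> Result<(), String>
    where
        F: FnOnce(&mut Tx) -> Result<(), String>,
    {
        let mut tx = Tx::default();
        body(&mut tx)?; // an error here discards every buffered write
        for (key, value) in tx.writes {
            self.kv.insert(key, value);
        }
        Ok(())
    }
}

/// Writes the user row and its outbox event in one transaction.
fn create_user(store: &mut Store, id: &str, name: &str) -> Result<(), String> {
    store.transaction(|tx| {
        tx.put(&format!("data:user:{id}"), name);
        if name.is_empty() {
            return Err("name must not be empty".to_string());
        }
        // Hypothetical key: partition 0000 and a fixed timestamp.
        tx.put(&format!("event:user:0000:1732541234:{id}"), "INSERT");
        Ok(())
    })
}
```

Calling `create_user(&mut store, "123", "Ada")` leaves both `data:user:123` and the matching `event:` key in the store. A call with an empty name returns an error and leaves neither key behind.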
## Features
- **Transactional Outbox**: Events written atomically with data
- **Horizontal Partitioning**: Scale by adding partitions and workers
- **Scheduled Processing**: Set `timestamp` for when events should fire
- **Pub/Sub Semantics**: Workers subscribe to event types
- **Checkpointed**: Resume from where you left off after crashes
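To make the checkpointing feature concrete, here is a rough sketch of resuming from a saved position. It assumes events are stored under lexicographically ordered keys and that the checkpoint is simply the last key handled. `process_batch` is a hypothetical helper, and how Tito actually persists checkpoints is not described in this README.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Handles up to `limit` events that sort after `checkpoint`, advancing the
/// checkpoint after each one. Returns the payloads handled in this call.
fn process_batch(
    events: &BTreeMap<String, String>,
    checkpoint: &mut Option<String>,
    limit: usize,
) -> Vec<String> {
    // Start strictly after the last handled key, or at the beginning.
    let start: Bound<String> = match checkpoint.clone() {
        Some(key) => Bound::Excluded(key),
        None => Bound::Unbounded,
    };
    let range: (Bound<String>, Bound<String>) = (start, Bound::Unbounded);

    let mut handled = Vec::new();
    for (key, payload) in events.range(range).take(limit) {
        handled.push(payload.clone());
        // Record progress only after the event has been handled.
        *checkpoint = Some(key.clone());
    }
    handled
}
```

Because this sketch moves the checkpoint only after an event is handled, a crash between handling and saving would replay that event on restart. Handlers in a design like this should therefore be idempotent.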
## Quick Start
### Define a Model
```rust
use serde::{Deserialize, Serialize};

#[derive(Default, Clone, Serialize, Deserialize)]
struct User {
    id: String,
    name: String,
    email: String,
}

impl TitoModelTrait for User {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn table(&self) -> String {
        "user".to_string()
    }

    // Secondary index on `email`.
    fn indexes(&self) -> Vec<TitoIndexConfig> {
        vec![TitoIndexConfig {
            condition: true,
            name: "by_email".to_string(),
            fields: vec![TitoIndexField {
                name: "email".to_string(),
                r#type: TitoIndexBlockType::String,
            }],
        }]
    }

    // Events written in the same transaction as the data.
    fn events(&self) -> Vec<TitoEventConfig> {
        let now = chrono::Utc::now().timestamp();
        vec![
            TitoEventConfig {
                name: "user".to_string(),
                timestamp: now,
            },
            TitoEventConfig {
                name: "analytics".to_string(),
                timestamp: now,
            },
        ]
    }
}
```
### Create Data (Events Generated Automatically)
```rust
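let db = TiKVBackend::connect(vec!["127.0.0.1:2379"]).await?;
let user_model = db.clone().model::<User>();

// `user` is a `User` value built by the caller.
// The two opening lines of this call are missing from the source; the names
// `db.transaction` and `build_with_options` are assumptions, so check the crate docs.
db.transaction(|tx| async move {
    user_model.build_with_options(
        user,
        TitoOptions::with_events(TitoOperation::Insert),
        &tx,
    ).await
}).await?;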
```
### Run Workers
```rust
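use std::sync::Arc;
use futures::FutureExt; // provides `.boxed()`

let queue = Arc::new(TitoQueue { engine: db });
// 4 partitions in total; this worker owns partition 0.
let partition_config = PartitionConfig::new(4, 0);

// `is_leader` and `shutdown_rx` are supplied by the caller and not shown here.
run_worker(
    queue,
    "user".to_string(),
    |event| async move {
        println!("Processing: {} {}", event.action, event.entity);
        Ok(())
    }.boxed(),
    partition_config,
    is_leader,
    shutdown_rx,
).await;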
```
## Event Key Format
```
event:{type}:{partition}:{timestamp}:{uuid}
```
- `type`: Event name from `events()` (e.g., "user", "order")
- `partition`: Zero-padded 4-digit partition number (0000-9999)
- `timestamp`: Unix timestamp in seconds at which the event becomes due
- `uuid`: Unique event ID
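As an illustration of the layout above, the following sketch builds and parses keys in that format. `EventKey`, `encode`, and `decode` are hypothetical names, not part of Tito's API. The sketch assumes the event type contains no `:` and that the timestamp is written as a plain decimal number, as in the example key `event:user:0001:1732541234:abc`.

```rust
/// Parsed form of `event:{type}:{partition}:{timestamp}:{uuid}`.
#[derive(Debug, PartialEq)]
struct EventKey {
    event_type: String,
    partition: u32,
    timestamp: i64,
    uuid: String,
}

impl EventKey {
    /// Formats the key with a zero-padded 4-digit partition.
    fn encode(&self) -> String {
        format!(
            "event:{}:{:04}:{}:{}",
            self.event_type, self.partition, self.timestamp, self.uuid
        )
    }

    /// Parses a key, returning `None` if any field is missing or malformed.
    fn decode(key: &str) -> Option<EventKey> {
        // Split into at most 5 parts so the last part keeps any remaining text.
        let mut parts = key.splitn(5, ':');
        if parts.next()? != "event" {
            return None;
        }
        let event_type = parts.next()?.to_string();
        let partition = parts.next()?.parse().ok()?;
        let timestamp = parts.next()?.parse().ok()?;
        let uuid = parts.next()?.to_string();
        Some(EventKey { event_type, partition, timestamp, uuid })
    }
}
```

Because the partition comes before the timestamp, a range scan over the prefix `event:{type}:{partition}:` visits one partition's events in timestamp order, as long as the timestamps have the same number of digits.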
## Scaling
Add partitions and workers. Each worker owns one partition:
```rust
// Worker 1
PartitionConfig::new(4, 0) // 4 total, owns partition 0
// Worker 2
PartitionConfig::new(4, 1) // 4 total, owns partition 1
// Worker 3
PartitionConfig::new(4, 2) // 4 total, owns partition 2
// Worker 4
PartitionConfig::new(4, 3) // 4 total, owns partition 3
```
Need more throughput? Increase `total_partitions` and add workers.
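This README does not say how an event is assigned to a partition. One common approach, shown here purely as an assumption, is to hash a stable identifier and take it modulo `total_partitions`. `fnv1a`, `partition_for`, and `owns` are hypothetical helpers; FNV-1a is used only because it is small and gives the same result on every machine.

```rust
/// 64-bit FNV-1a hash: deterministic across processes and platforms.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in bytes {
        hash ^= *byte as u64;
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Maps an entity ID to a partition in `0..total_partitions`.
fn partition_for(entity_id: &str, total_partitions: u32) -> u32 {
    assert!(total_partitions > 0, "total_partitions must be positive");
    (fnv1a(entity_id.as_bytes()) % total_partitions as u64) as u32
}

/// True if the worker with `partition_index` is responsible for this entity.
fn owns(entity_id: &str, total_partitions: u32, partition_index: u32) -> bool {
    partition_for(entity_id, total_partitions) == partition_index
}
```

Under a modulo scheme like this, changing `total_partitions` moves most IDs to a different partition, so events already written under the old count still need to be drained by workers configured for it.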
## Scheduled Events
Set `timestamp` in the future for delayed processing:
```rust
fn events(&self) -> Vec<TitoEventConfig> {
    let in_one_hour = chrono::Utc::now().timestamp() + 3600;
    vec![TitoEventConfig {
        name: "reminder".to_string(),
        timestamp: in_one_hour,
    }]
}
```
Workers only process events where `timestamp <= now`.
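The `timestamp <= now` rule can be sketched with an ordered map keyed by `(timestamp, id)`. This is a simplified stand-in for the worker's scan, not Tito's code: `Schedule`, `push`, and `take_due` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Pending events ordered by (timestamp, id).
#[derive(Default)]
struct Schedule {
    events: BTreeMap<(i64, String), String>,
}

impl Schedule {
    fn push(&mut self, timestamp: i64, id: &str, payload: &str) {
        self.events.insert((timestamp, id.to_string()), payload.to_string());
    }

    /// Removes and returns every payload whose timestamp is <= `now`,
    /// in timestamp order. Later events stay in the schedule.
    fn take_due(&mut self, now: i64) -> Vec<String> {
        // (now + 1, "") is the smallest possible key with a later timestamp,
        // so everything at or after it is not yet due.
        let not_due = self.events.split_off(&(now.saturating_add(1), String::new()));
        let due = std::mem::replace(&mut self.events, not_due);
        due.into_values().collect()
    }
}
```

An event pushed with a timestamp one hour ahead stays in the schedule until `take_due` is called with a `now` at or past that timestamp.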
## Requirements
- Rust (2021 edition or later)
- TiKV cluster
## License
Apache-2.0