tito 0.2.1

A flexible database layer with powerful indexing strategies and relationship modeling, supporting multiple backends
Documentation

Tito

A transactional outbox with partitioned pub/sub and scheduled event processing, built on TiKV.

What It Does

When you save data, events are written in the same transaction. Workers pick them up later. No dual-write problem, no lost messages.

┌─────────────────────────────────────────────────────────────────┐
│                         Your App                                │
│                                                                 │
│   model.create(user)  ──►  TiKV Transaction:                   │
│                              1. Write user data                 │
│                              2. Write event (same tx)           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────┐
│                          TiKV                                   │
│                                                                 │
│   data:user:123 ─────────────► { id, name, email }             │
│   event:user:0001:1732541234:abc ──► { entity, action, ... }   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Workers                                  │
│                                                                 │
│   Worker 1 (partition 0) ──► Pulls events, processes, acks     │
│   Worker 2 (partition 1) ──► Pulls events, processes, acks     │
│   Worker N (partition N) ──► ...                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Features

  • Transactional Outbox: Events written atomically with data
  • Horizontal Partitioning: Scale by adding partitions and workers
  • Scheduled Processing: Set timestamp for when events should fire
  • Pub/Sub Semantics: Workers subscribe to event types
  • Checkpointed: Resume from where you left off after crashes

Quick Start

Define a Model

#[derive(Default, Clone, Serialize, Deserialize)]
struct User {
    id: String,
    name: String,
    email: String,
}

impl TitoModelTrait for User {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn table(&self) -> String {
        "user".to_string()
    }

    fn indexes(&self) -> Vec<TitoIndexConfig> {
        vec![TitoIndexConfig {
            condition: true,
            name: "by_email".to_string(),
            fields: vec![TitoIndexField {
                name: "email".to_string(),
                r#type: TitoIndexBlockType::String,
            }],
        }]
    }

    fn events(&self) -> Vec<TitoEventConfig> {
        let now = chrono::Utc::now().timestamp();
        vec![
            TitoEventConfig {
                name: "user".to_string(),
                timestamp: now,
            },
            TitoEventConfig {
                name: "analytics".to_string(),
                timestamp: now,
            },
        ]
    }
}

Create Data (Events Generated Automatically)

let db = TiKVBackend::connect(vec!["127.0.0.1:2379"]).await?;
let user_model = db.clone().model::<User>();

db.transaction(|tx| async move {
    user_model.build_with_options(
        user,
        TitoOptions::with_events(TitoOperation::Insert),
        &tx,
    ).await
}).await?;

Run Workers

let queue = Arc::new(TitoQueue { engine: db });
let partition_config = PartitionConfig::new(4, 0);

run_worker(
    queue,
    "user".to_string(),
    |event| async move {
        println!("Processing: {} {}", event.action, event.entity);
        Ok(())
    }.boxed(),
    partition_config,
    is_leader,
    shutdown_rx,
).await;

Event Key Format

event:{type}:{partition}:{timestamp}:{uuid}
  • type: Event name from events() (e.g., "user", "order")
  • partition: 4-digit partition number (0000-9999)
  • timestamp: Unix timestamp for scheduled processing
  • uuid: Unique event ID

Scaling

Add partitions and workers. Each worker owns one partition:

// Worker 1
PartitionConfig::new(4, 0)  // 4 total, owns partition 0

// Worker 2
PartitionConfig::new(4, 1)  // 4 total, owns partition 1

// Worker 3
PartitionConfig::new(4, 2)  // 4 total, owns partition 2

// Worker 4
PartitionConfig::new(4, 3)  // 4 total, owns partition 3

Need more throughput? Increase total_partitions and add workers.

Scheduled Events

Set timestamp in the future for delayed processing:

fn events(&self) -> Vec<TitoEventConfig> {
    let in_one_hour = chrono::Utc::now().timestamp() + 3600;
    vec![TitoEventConfig {
        name: "reminder".to_string(),
        timestamp: in_one_hour,
    }]
}

Workers only process events where timestamp <= now.

Requirements

  • Rust 2021+
  • TiKV cluster

License

Apache-2.0