rillflow 0.1.0-alpha.4

Rillflow — a lightweight document + event store for Rust, powered by Postgres.
Documentation
use clap::{ArgAction, Parser, Subcommand};
use rillflow::projection_runtime::{ProjectionDaemon, ProjectionWorkerConfig};
use rillflow::subscriptions::{SubscriptionFilter, SubscriptionOptions, Subscriptions};
use rillflow::{SchemaConfig, Store, TenancyMode, TenantSchema};
use sqlx::Row;
use uuid::Uuid;

#[derive(Parser, Debug)]
#[command(name = "rillflow", version, about = "Rillflow CLI")]
struct Cli {
    /// Postgres connection string. Falls back to DATABASE_URL.
    #[arg(long)]
    database_url: Option<String>,

    /// Base schema to manage (default: public)
    #[arg(long, default_value = "public")]
    schema: String,

    /// Additional tenant schemas (repeatable)
    #[arg(long = "tenant-schema", action = ArgAction::Append)]
    tenant_schemas: Vec<String>,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
    /// Show planned DDL changes without applying
    SchemaPlan,

    /// Apply DDL changes (create schemas/tables/indexes as needed)
    SchemaSync,

    /// Projection admin commands
    #[command(subcommand)]
    Projections(ProjectionsCmd),

    /// Subscriptions admin commands
    #[command(subcommand)]
    Subscriptions(SubscriptionsCmd),
}

#[derive(Subcommand, Debug)]
enum ProjectionsCmd {
    /// List projections and their status
    List,
    /// Show status of a single projection
    Status { name: String },
    /// Pause a projection
    Pause { name: String },
    /// Resume a projection
    Resume { name: String },
    /// Reset checkpoint to a specific sequence
    ResetCheckpoint { name: String, seq: i64 },
    /// Rebuild (reset to 0 and clear DLQ)
    Rebuild { name: String },
    /// Run a single processing tick for one projection (by name) or all registered if omitted
    RunOnce { name: Option<String> },
    /// Run until idle (no projection has work)
    RunUntilIdle { name: Option<String> },
    /// Dead Letter Queue: list recent failures
    DlqList {
        name: String,
        #[arg(long, default_value_t = 50)]
        limit: i64,
    },
    /// Dead Letter Queue: requeue one item by id (sets checkpoint to id's seq - 1)
    DlqRequeue { name: String, id: i64 },
    /// Dead Letter Queue: delete one item by id
    DlqDelete { name: String, id: i64 },
}

#[derive(Subcommand, Debug)]
enum SubscriptionsCmd {
    /// Create or update a subscription checkpoint and filter
    Create {
        name: String,
        #[arg(long, action = ArgAction::Append)]
        event_type: Vec<String>,
        #[arg(long, action = ArgAction::Append)]
        stream_id: Vec<String>,
        #[arg(long, default_value_t = 0)]
        start_from: i64,
    },
    /// List subscriptions and checkpoints
    List,
    /// Show a subscription status
    Status { name: String },
    /// Pause a subscription
    Pause { name: String },
    /// Resume a subscription
    Resume { name: String },
    /// Reset checkpoint to a specific sequence
    Reset { name: String, seq: i64 },
    /// Tail a subscription (prints incoming events)
    Tail {
        name: String,
        #[arg(long, default_value_t = 10)]
        limit: usize,
        /// Optional consumer group name
        #[arg(long)]
        group: Option<String>,
    },
}

#[tokio::main]
async fn main() -> rillflow::Result<()> {
    let cli = Cli::parse();

    let url = match cli
        .database_url
        .or_else(|| std::env::var("DATABASE_URL").ok())
    {
        Some(u) => u,
        None => {
            eprintln!("error: --database-url or env DATABASE_URL is required");
            std::process::exit(2);
        }
    };

    let tenancy_mode = if cli.tenant_schemas.is_empty() {
        TenancyMode::SingleTenant
    } else {
        TenancyMode::SchemaPerTenant {
            tenants: cli
                .tenant_schemas
                .iter()
                .map(|s| TenantSchema::new(s.trim()))
                .collect(),
        }
    };

    let config = SchemaConfig {
        base_schema: cli.schema,
        tenancy_mode,
    };

    let store = Store::connect(&url).await?;
    let mgr = store.schema();

    match cli.command {
        Commands::SchemaPlan => {
            let plan = mgr.plan(&config).await?;
            print_plan(&plan);
        }
        Commands::SchemaSync => {
            let plan = mgr.sync(&config).await?;
            if plan.is_empty() {
                println!("No changes needed.");
            } else {
                println!("Applied changes:");
                print_plan(&plan);
            }
        }
        Commands::Projections(cmd) => {
            let daemon =
                ProjectionDaemon::new(store.pool().clone(), ProjectionWorkerConfig::default());
            match cmd {
                ProjectionsCmd::List => {
                    let list = daemon.list().await?;
                    for s in list {
                        println!(
                            "{}  last_seq={}  paused={}  leased_by={:?}  lease_until={:?}  backoff_until={:?}  dlq_count={}",
                            s.name,
                            s.last_seq,
                            s.paused,
                            s.leased_by,
                            s.lease_until,
                            s.backoff_until,
                            s.dlq_count
                        );
                    }
                }
                ProjectionsCmd::Status { name } => {
                    let s = daemon.status(&name).await?;
                    println!(
                        "{}  last_seq={}  paused={}  leased_by={:?}  lease_until={:?}  backoff_until={:?}  dlq_count={}",
                        s.name,
                        s.last_seq,
                        s.paused,
                        s.leased_by,
                        s.lease_until,
                        s.backoff_until,
                        s.dlq_count
                    );
                }
                ProjectionsCmd::Pause { name } => {
                    daemon.pause(&name).await?;
                    println!("paused {}", { name });
                }
                ProjectionsCmd::Resume { name } => {
                    daemon.resume(&name).await?;
                    println!("resumed {}", { name });
                }
                ProjectionsCmd::ResetCheckpoint { name, seq } => {
                    daemon.reset_checkpoint(&name, seq).await?;
                    println!("reset {} to {}", name, seq);
                }
                ProjectionsCmd::Rebuild { name } => {
                    daemon.rebuild(&name).await?;
                    println!("rebuild scheduled for {}", name);
                }
                ProjectionsCmd::RunOnce { name } => {
                    if let Some(n) = name {
                        let res = daemon.tick_once(&n).await?;
                        println!("{}: {:?}", n, res);
                    } else {
                        daemon.tick_all_once().await?;
                        println!("tick-all executed");
                    }
                }
                ProjectionsCmd::RunUntilIdle { name } => {
                    if let Some(n) = name {
                        // run only this projection until idle
                        loop {
                            let res = daemon.tick_once(&n).await?;
                            match res {
                                rillflow::projection_runtime::TickResult::Processed { count }
                                    if count > 0 => {}
                                _ => break,
                            }
                        }
                        println!("{}: idle", n);
                    } else {
                        daemon.run_until_idle().await?;
                        println!("all projections idle");
                    }
                }
                ProjectionsCmd::DlqList { name, limit } => {
                    let items = daemon.dlq_list(&name, limit).await?;
                    for i in items {
                        println!(
                            "id={} seq={} type={} failed_at={} error={}",
                            i.id, i.seq, i.event_type, i.failed_at, i.error
                        );
                    }
                }
                ProjectionsCmd::DlqRequeue { name, id } => {
                    daemon.dlq_requeue(&name, id).await?;
                    println!("requeued {}:{}", name, id);
                }
                ProjectionsCmd::DlqDelete { name, id } => {
                    daemon.dlq_delete(&name, id).await?;
                    println!("deleted {}:{}", name, id);
                }
            }
        }
        Commands::Subscriptions(cmd) => {
            let subs = Subscriptions::new(store.pool().clone());
            match cmd {
                SubscriptionsCmd::Create {
                    name,
                    event_type,
                    stream_id,
                    start_from,
                } => {
                    let ids: Vec<Uuid> = stream_id
                        .into_iter()
                        .filter_map(|s| Uuid::parse_str(&s).ok())
                        .collect();
                    let filter = SubscriptionFilter {
                        event_types: if event_type.is_empty() {
                            None
                        } else {
                            Some(event_type)
                        },
                        stream_ids: if ids.is_empty() { None } else { Some(ids) },
                        stream_prefix: None,
                    };
                    subs.create_or_update(&name, &filter, start_from).await?;
                    println!("subscription '{}' upserted (from={})", name, start_from);
                }
                SubscriptionsCmd::List => {
                    let rows = sqlx::query(
                        "select name, last_seq, paused, backoff_until, filter from subscriptions order by name",
                    )
                    .fetch_all(store.pool())
                    .await?;
                    for r in rows {
                        let name: String = r.get("name");
                        let last_seq: i64 = r.get::<Option<i64>, _>("last_seq").unwrap_or(0);
                        let paused: bool = r.get::<Option<bool>, _>("paused").unwrap_or(false);
                        let backoff_until =
                            r.get::<Option<chrono::DateTime<chrono::Utc>>, _>("backoff_until");
                        let filter = r
                            .get::<Option<serde_json::Value>, _>("filter")
                            .unwrap_or(serde_json::json!({}));
                        println!(
                            "{} last_seq={} paused={} backoff_until={:?} filter={}",
                            name, last_seq, paused, backoff_until, filter,
                        );
                    }
                }
                SubscriptionsCmd::Status { name } => {
                    let r = sqlx::query(
                        "select name, last_seq, paused, backoff_until, filter from subscriptions where name = $1",
                    )
                    .bind(&name)
                    .fetch_optional(store.pool())
                    .await?;
                    if let Some(r) = r {
                        let last_seq: i64 = r.get::<Option<i64>, _>("last_seq").unwrap_or(0);
                        let paused: bool = r.get::<Option<bool>, _>("paused").unwrap_or(false);
                        let backoff_until =
                            r.get::<Option<chrono::DateTime<chrono::Utc>>, _>("backoff_until");
                        let filter = r
                            .get::<Option<serde_json::Value>, _>("filter")
                            .unwrap_or(serde_json::json!({}));
                        println!(
                            "{} last_seq={} paused={} backoff_until={:?} filter={}",
                            name, last_seq, paused, backoff_until, filter,
                        );
                    } else {
                        println!("subscription '{}' not found", name);
                    }
                }
                SubscriptionsCmd::Pause { name } => {
                    sqlx::query(
                        "insert into subscriptions(name, paused) values($1,true) on conflict (name) do update set paused=true, updated_at=now()",
                    )
                    .bind(&name)
                    .execute(store.pool())
                    .await?;
                    println!("paused {}", name);
                }
                SubscriptionsCmd::Resume { name } => {
                    sqlx::query(
                        "update subscriptions set paused=false, backoff_until=null, updated_at=now() where name=$1",
                    )
                    .bind(&name)
                    .execute(store.pool())
                    .await?;
                    println!("resumed {}", name);
                }
                SubscriptionsCmd::Reset { name, seq } => {
                    sqlx::query(
                        "update subscriptions set last_seq=$2, updated_at=now() where name=$1",
                    )
                    .bind(&name)
                    .bind(seq)
                    .execute(store.pool())
                    .await?;
                    println!("reset {} to {}", name, seq);
                }
                SubscriptionsCmd::Tail { name, limit, group } => {
                    // load filter
                    let rec =
                        sqlx::query("select filter, last_seq from subscriptions where name=$1")
                            .bind(&name)
                            .fetch_one(store.pool())
                            .await?;
                    let filter: SubscriptionFilter = rec
                        .get::<Option<serde_json::Value>, _>("filter")
                        .and_then(|v| serde_json::from_value(v).ok())
                        .unwrap_or_default();
                    let last_seq: i64 = rec.get::<Option<i64>, _>("last_seq").unwrap_or(0);
                    let opts = SubscriptionOptions {
                        start_from: last_seq,
                        group,
                        ..Default::default()
                    };
                    let (_h, mut rx) = subs.subscribe(&name, filter, opts).await?;
                    let mut n = 0usize;
                    while let Some(env) = rx.recv().await {
                        println!("{} {} {}", env.stream_id, env.stream_seq, env.typ);
                        n += 1;
                        if n >= limit {
                            break;
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

fn print_plan(plan: &rillflow::SchemaPlan) {
    if !plan.warnings().is_empty() {
        eprintln!("Warnings ({}):", plan.warnings().len());
        for w in plan.warnings() {
            eprintln!("  - {}", w);
        }
    }

    if plan.actions().is_empty() {
        println!("No pending DDL actions.");
        return;
    }

    println!("DDL actions ({}):", plan.actions().len());
    for (i, action) in plan.actions().iter().enumerate() {
        println!("{}. {}", i + 1, action.description());
        println!("{}\n", action.sql());
    }
}