use clap::{ArgAction, Parser, Subcommand};
use rillflow::projection_runtime::{ProjectionDaemon, ProjectionWorkerConfig};
use rillflow::subscriptions::{SubscriptionFilter, SubscriptionOptions, Subscriptions};
use rillflow::{SchemaConfig, Store, TenancyMode, TenantSchema};
use sqlx::Row;
use uuid::Uuid;
// Top-level command-line arguments for the Rillflow CLI.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the CLI's output.
#[derive(Parser, Debug)]
#[command(name = "rillflow", version, about = "Rillflow CLI")]
struct Cli {
// Database connection URL. Optional here because `main` falls back to the
// `DATABASE_URL` environment variable when the flag is absent.
#[arg(long)]
database_url: Option<String>,
// Base schema name; becomes `SchemaConfig::base_schema` in `main`.
#[arg(long, default_value = "public")]
schema: String,
// Repeatable `--tenant-schema` flag. `main` selects
// `TenancyMode::SingleTenant` when this is empty and
// `TenancyMode::SchemaPerTenant` otherwise (each value is trimmed first).
#[arg(long = "tenant-schema", action = ArgAction::Append)]
tenant_schemas: Vec<String>,
// The subcommand to execute.
#[command(subcommand)]
command: Commands,
}
// Top-level subcommands dispatched by `main`.
// Plain `//` comments are used so clap's generated help text is unchanged.
#[derive(Subcommand, Debug)]
enum Commands {
// Computes a schema plan via the store's schema manager and prints it.
SchemaPlan,
// Runs the schema manager's `sync` and prints the returned plan, or
// "No changes needed." when that plan is empty.
SchemaSync,
// Nested subcommands that operate on a `ProjectionDaemon`.
#[command(subcommand)]
Projections(ProjectionsCmd),
// Nested subcommands that operate on subscriptions.
#[command(subcommand)]
Subscriptions(SubscriptionsCmd),
}
// Subcommands under `projections`. In `main` each variant forwards to the
// `ProjectionDaemon` method named in its comment.
// Plain `//` comments are used so clap's generated help text is unchanged.
#[derive(Subcommand, Debug)]
enum ProjectionsCmd {
// `daemon.list()`: prints one status line per returned entry.
List,
// `daemon.status(name)`: prints the status line for one projection.
Status { name: String },
// `daemon.pause(name)`.
Pause { name: String },
// `daemon.resume(name)`.
Resume { name: String },
// `daemon.reset_checkpoint(name, seq)`.
ResetCheckpoint { name: String, seq: i64 },
// `daemon.rebuild(name)`.
Rebuild { name: String },
// With a name: a single `daemon.tick_once(name)`, printing its result.
// Without a name: `daemon.tick_all_once()`.
RunOnce { name: Option<String> },
// With a name: repeats `daemon.tick_once(name)` until the result is no
// longer `TickResult::Processed` with `count > 0`.
// Without a name: `daemon.run_until_idle()`.
RunUntilIdle { name: Option<String> },
// `daemon.dlq_list(name, limit)`: prints each returned item.
DlqList {
name: String,
// Passed straight through to `dlq_list`; defaults to 50.
#[arg(long, default_value_t = 50)]
limit: i64,
},
// `daemon.dlq_requeue(name, id)`.
DlqRequeue { name: String, id: i64 },
// `daemon.dlq_delete(name, id)`.
DlqDelete { name: String, id: i64 },
}
// Subcommands under `subscriptions`. `main` implements these through the
// `Subscriptions` API and direct SQL against the `subscriptions` table.
// Plain `//` comments are used so clap's generated help text is unchanged.
#[derive(Subcommand, Debug)]
enum SubscriptionsCmd {
// Upserts a subscription via `Subscriptions::create_or_update`.
Create {
name: String,
// Repeatable; an empty list becomes `event_types: None` in the filter.
#[arg(long, action = ArgAction::Append)]
event_type: Vec<String>,
// Repeatable; `main` parses each value as a UUID to build the filter's
// `stream_ids`.
#[arg(long, action = ArgAction::Append)]
stream_id: Vec<String>,
// Passed to `create_or_update` as the starting position; defaults to 0.
#[arg(long, default_value_t = 0)]
start_from: i64,
},
// Prints every row of the `subscriptions` table ordered by name.
List,
// Prints the row for one subscription, or a "not found" message.
Status { name: String },
// Upserts the row with `paused = true`.
Pause { name: String },
// Sets `paused = false` and clears `backoff_until`.
Resume { name: String },
// Sets `last_seq` to `seq`.
Reset { name: String, seq: i64 },
// Subscribes using the stored filter and `last_seq`, printing received
// events.
Tail {
name: String,
// Tailing stops once this many events have been printed; defaults to 10.
#[arg(long, default_value_t = 10)]
limit: usize,
// Forwarded as `SubscriptionOptions::group`.
#[arg(long)]
group: Option<String>,
},
}
/// Entry point for the Rillflow CLI.
///
/// Parses the command line, resolves the database URL from `--database-url`
/// or the `DATABASE_URL` environment variable, connects the store, and then
/// dispatches the selected subcommand.
///
/// Usage errors (no database URL, or a `--stream-id` that is not a valid
/// UUID) are reported on stderr and terminate the process with status 2.
///
/// # Errors
///
/// Propagates any error returned by the store, the projection daemon, the
/// subscriptions API, or one of the raw SQL queries.
#[tokio::main]
async fn main() -> rillflow::Result<()> {
    let cli = Cli::parse();

    // The explicit flag wins; the environment variable is only a fallback.
    let url = match cli
        .database_url
        .or_else(|| std::env::var("DATABASE_URL").ok())
    {
        Some(u) => u,
        None => {
            eprintln!("error: --database-url or env DATABASE_URL is required");
            std::process::exit(2);
        }
    };

    // Any --tenant-schema flag switches the configuration to schema-per-tenant.
    let tenancy_mode = if cli.tenant_schemas.is_empty() {
        TenancyMode::SingleTenant
    } else {
        TenancyMode::SchemaPerTenant {
            tenants: cli
                .tenant_schemas
                .iter()
                .map(|s| TenantSchema::new(s.trim()))
                .collect(),
        }
    };
    let config = SchemaConfig {
        base_schema: cli.schema,
        tenancy_mode,
    };

    let store = Store::connect(&url).await?;
    let mgr = store.schema();

    match cli.command {
        Commands::SchemaPlan => {
            let plan = mgr.plan(&config).await?;
            print_plan(&plan);
        }
        Commands::SchemaSync => {
            let plan = mgr.sync(&config).await?;
            if plan.is_empty() {
                println!("No changes needed.");
            } else {
                println!("Applied changes:");
                print_plan(&plan);
            }
        }
        Commands::Projections(cmd) => {
            let daemon =
                ProjectionDaemon::new(store.pool().clone(), ProjectionWorkerConfig::default());
            match cmd {
                ProjectionsCmd::List => {
                    let list = daemon.list().await?;
                    for s in list {
                        println!(
                            "{} last_seq={} paused={} leased_by={:?} lease_until={:?} backoff_until={:?} dlq_count={}",
                            s.name,
                            s.last_seq,
                            s.paused,
                            s.leased_by,
                            s.lease_until,
                            s.backoff_until,
                            s.dlq_count
                        );
                    }
                }
                ProjectionsCmd::Status { name } => {
                    let s = daemon.status(&name).await?;
                    println!(
                        "{} last_seq={} paused={} leased_by={:?} lease_until={:?} backoff_until={:?} dlq_count={}",
                        s.name,
                        s.last_seq,
                        s.paused,
                        s.leased_by,
                        s.lease_until,
                        s.backoff_until,
                        s.dlq_count
                    );
                }
                ProjectionsCmd::Pause { name } => {
                    daemon.pause(&name).await?;
                    println!("paused {}", name);
                }
                ProjectionsCmd::Resume { name } => {
                    daemon.resume(&name).await?;
                    println!("resumed {}", name);
                }
                ProjectionsCmd::ResetCheckpoint { name, seq } => {
                    daemon.reset_checkpoint(&name, seq).await?;
                    println!("reset {} to {}", name, seq);
                }
                ProjectionsCmd::Rebuild { name } => {
                    daemon.rebuild(&name).await?;
                    println!("rebuild scheduled for {}", name);
                }
                ProjectionsCmd::RunOnce { name } => {
                    if let Some(n) = name {
                        let res = daemon.tick_once(&n).await?;
                        println!("{}: {:?}", n, res);
                    } else {
                        daemon.tick_all_once().await?;
                        println!("tick-all executed");
                    }
                }
                ProjectionsCmd::RunUntilIdle { name } => {
                    if let Some(n) = name {
                        // Keep ticking while the last tick processed at least one event.
                        loop {
                            let res = daemon.tick_once(&n).await?;
                            match res {
                                rillflow::projection_runtime::TickResult::Processed { count }
                                    if count > 0 => {}
                                _ => break,
                            }
                        }
                        println!("{}: idle", n);
                    } else {
                        daemon.run_until_idle().await?;
                        println!("all projections idle");
                    }
                }
                ProjectionsCmd::DlqList { name, limit } => {
                    let items = daemon.dlq_list(&name, limit).await?;
                    for i in items {
                        println!(
                            "id={} seq={} type={} failed_at={} error={}",
                            i.id, i.seq, i.event_type, i.failed_at, i.error
                        );
                    }
                }
                ProjectionsCmd::DlqRequeue { name, id } => {
                    daemon.dlq_requeue(&name, id).await?;
                    println!("requeued {}:{}", name, id);
                }
                ProjectionsCmd::DlqDelete { name, id } => {
                    daemon.dlq_delete(&name, id).await?;
                    println!("deleted {}:{}", name, id);
                }
            }
        }
        Commands::Subscriptions(cmd) => {
            let subs = Subscriptions::new(store.pool().clone());
            match cmd {
                SubscriptionsCmd::Create {
                    name,
                    event_type,
                    stream_id,
                    start_from,
                } => {
                    // Reject malformed stream ids instead of dropping them: a
                    // silently discarded id would widen the filter (possibly to
                    // "all streams") without the operator noticing.
                    let mut ids: Vec<Uuid> = Vec::with_capacity(stream_id.len());
                    for raw in &stream_id {
                        match Uuid::parse_str(raw) {
                            Ok(id) => ids.push(id),
                            Err(err) => {
                                eprintln!("error: invalid --stream-id '{}': {}", raw, err);
                                std::process::exit(2);
                            }
                        }
                    }
                    let filter = SubscriptionFilter {
                        event_types: if event_type.is_empty() {
                            None
                        } else {
                            Some(event_type)
                        },
                        stream_ids: if ids.is_empty() { None } else { Some(ids) },
                        stream_prefix: None,
                    };
                    subs.create_or_update(&name, &filter, start_from).await?;
                    println!("subscription '{}' upserted (from={})", name, start_from);
                }
                SubscriptionsCmd::List => {
                    let rows = sqlx::query(
                        "select name, last_seq, paused, backoff_until, filter from subscriptions order by name",
                    )
                    .fetch_all(store.pool())
                    .await?;
                    for r in rows {
                        let name: String = r.get("name");
                        // NULL columns fall back to neutral defaults for display.
                        let last_seq: i64 = r.get::<Option<i64>, _>("last_seq").unwrap_or(0);
                        let paused: bool = r.get::<Option<bool>, _>("paused").unwrap_or(false);
                        let backoff_until =
                            r.get::<Option<chrono::DateTime<chrono::Utc>>, _>("backoff_until");
                        let filter = r
                            .get::<Option<serde_json::Value>, _>("filter")
                            .unwrap_or_else(|| serde_json::json!({}));
                        println!(
                            "{} last_seq={} paused={} backoff_until={:?} filter={}",
                            name, last_seq, paused, backoff_until, filter,
                        );
                    }
                }
                SubscriptionsCmd::Status { name } => {
                    let r = sqlx::query(
                        "select name, last_seq, paused, backoff_until, filter from subscriptions where name = $1",
                    )
                    .bind(&name)
                    .fetch_optional(store.pool())
                    .await?;
                    if let Some(r) = r {
                        let last_seq: i64 = r.get::<Option<i64>, _>("last_seq").unwrap_or(0);
                        let paused: bool = r.get::<Option<bool>, _>("paused").unwrap_or(false);
                        let backoff_until =
                            r.get::<Option<chrono::DateTime<chrono::Utc>>, _>("backoff_until");
                        let filter = r
                            .get::<Option<serde_json::Value>, _>("filter")
                            .unwrap_or_else(|| serde_json::json!({}));
                        println!(
                            "{} last_seq={} paused={} backoff_until={:?} filter={}",
                            name, last_seq, paused, backoff_until, filter,
                        );
                    } else {
                        println!("subscription '{}' not found", name);
                    }
                }
                SubscriptionsCmd::Pause { name } => {
                    // Upsert: pausing a name that has no row yet creates it paused.
                    sqlx::query(
                        "insert into subscriptions(name, paused) values($1,true) on conflict (name) do update set paused=true, updated_at=now()",
                    )
                    .bind(&name)
                    .execute(store.pool())
                    .await?;
                    println!("paused {}", name);
                }
                SubscriptionsCmd::Resume { name } => {
                    let outcome = sqlx::query(
                        "update subscriptions set paused=false, backoff_until=null, updated_at=now() where name=$1",
                    )
                    .bind(&name)
                    .execute(store.pool())
                    .await?;
                    // A plain UPDATE matches nothing for an unknown name; do not
                    // claim success in that case.
                    if outcome.rows_affected() == 0 {
                        println!("subscription '{}' not found", name);
                    } else {
                        println!("resumed {}", name);
                    }
                }
                SubscriptionsCmd::Reset { name, seq } => {
                    let outcome = sqlx::query(
                        "update subscriptions set last_seq=$2, updated_at=now() where name=$1",
                    )
                    .bind(&name)
                    .bind(seq)
                    .execute(store.pool())
                    .await?;
                    // Same reasoning as Resume: report when no row was updated.
                    if outcome.rows_affected() == 0 {
                        println!("subscription '{}' not found", name);
                    } else {
                        println!("reset {} to {}", name, seq);
                    }
                }
                SubscriptionsCmd::Tail { name, limit, group } => {
                    let rec =
                        sqlx::query("select filter, last_seq from subscriptions where name=$1")
                            .bind(&name)
                            .fetch_one(store.pool())
                            .await?;
                    // A missing or undecodable stored filter falls back to the
                    // default filter.
                    let filter: SubscriptionFilter = rec
                        .get::<Option<serde_json::Value>, _>("filter")
                        .and_then(|v| serde_json::from_value(v).ok())
                        .unwrap_or_default();
                    let last_seq: i64 = rec.get::<Option<i64>, _>("last_seq").unwrap_or(0);
                    let opts = SubscriptionOptions {
                        start_from: last_seq,
                        group,
                        ..Default::default()
                    };
                    // `_h` stays bound so the returned handle lives for the
                    // duration of the loop below.
                    let (_h, mut rx) = subs.subscribe(&name, filter, opts).await?;
                    // The limit is checked before receiving so that `--limit 0`
                    // prints nothing rather than waiting for one event.
                    let mut printed = 0usize;
                    while printed < limit {
                        match rx.recv().await {
                            Some(env) => {
                                println!("{} {} {}", env.stream_id, env.stream_seq, env.typ);
                                printed += 1;
                            }
                            None => break,
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
/// Prints a schema plan in human-readable form.
///
/// Warnings, if any, go to stderr under a counted header. The DDL actions go
/// to stdout as a 1-based numbered list, each description followed by its SQL
/// and a blank line. When the plan has no actions a single notice is printed
/// instead.
fn print_plan(plan: &rillflow::SchemaPlan) {
    let warnings = plan.warnings();
    if !warnings.is_empty() {
        eprintln!("Warnings ({}):", warnings.len());
        for warning in warnings {
            eprintln!(" - {}", warning);
        }
    }

    let actions = plan.actions();
    if actions.is_empty() {
        println!("No pending DDL actions.");
    } else {
        println!("DDL actions ({}):", actions.len());
        // Number the actions starting at 1 for display.
        for (ordinal, action) in (1usize..).zip(actions.iter()) {
            println!("{}. {}", ordinal, action.description());
            println!("{}", action.sql());
            println!();
        }
    }
}