exoware-sql 2026.4.1

SQL engine backed by the Exoware API.
Documentation
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

use axum::{routing::get, Router};
use clap::{Parser, Subcommand};
use exoware_sdk::StoreClient;
use exoware_sql::{default_orders_index_specs, sql_connect_stack, KvSchema, SqlServer};
use tower_http::cors::CorsLayer;
use tracing::info;

const TABLE_NAME: &str = "orders_kv";

#[derive(Parser, Debug)]
#[command(name = "sql", version, about = "SQL server over the Exoware store.")]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand, Debug)]
enum Command {
    Run {
        #[arg(long)]
        store_url: String,
        #[arg(long, default_value = "0.0.0.0")]
        host: IpAddr,
        #[arg(long, default_value_t = 8082)]
        port: u16,
    },
    Seed {
        #[arg(long)]
        store_url: String,
        #[arg(long, default_value_t = 2)]
        interval_secs: u64,
    },
}

async fn health() -> &'static str {
    "ok"
}

fn build_server(
    store_url: &str,
) -> Result<Arc<SqlServer>, Box<dyn std::error::Error + Send + Sync>> {
    let client = StoreClient::new(store_url);
    let schema = KvSchema::new(client)
        .orders_table(TABLE_NAME, default_orders_index_specs())
        .map_err(|e| format!("configure schema: {e}"))?;
    let server = SqlServer::new(schema)?;
    Ok(Arc::new(server))
}

async fn run(
    store_url: &str,
    host: IpAddr,
    port: u16,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let server = build_server(store_url)?;
    let app = Router::new()
        .route("/health", get(health))
        .fallback_service(sql_connect_stack(server))
        .layer(CorsLayer::very_permissive());

    let addr = SocketAddr::from((host, port));
    info!(%addr, store_url, "sql server listening");
    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app).await?;
    Ok(())
}

async fn seed(
    store_url: &str,
    interval_secs: u64,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let server = build_server(store_url)?;
    info!(store_url, interval_secs, "starting sql seed");
    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    let run_nonce = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs() as i64;
    let mut offset: i64 = run_nonce.saturating_mul(10);

    loop {
        tokio::select! {
            biased;
            _ = tokio::signal::ctrl_c() => {
                info!("ctrl-c received, shutting down");
                break;
            }
            _ = ticker.tick() => {}
        }

        let sql = format!(
            "INSERT INTO {TABLE_NAME} (region, customer_id, order_id, amount_cents, status) VALUES \
             ('us-east', 1001, {}, 3499, 'paid'), \
             ('us-west', 1002, {}, 1799, 'paid'), \
             ('us-east', 1003, {}, 2299, 'pending'), \
             ('eu-central', 1004, {}, 1299, 'paid'), \
             ('us-west', 1005, {}, 4599, 'refunded')",
            offset + 1,
            offset + 2,
            offset + 3,
            offset + 4,
            offset + 5,
        );
        offset = offset.saturating_add(5);

        let df = server.session().sql(&sql).await?;
        let _ = df.collect().await?;
        println!("seeded 5 rows at order_id base {}", offset - 5);
    }

    Ok(())
}

fn init_tracing() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
        )
        .try_init();
}

#[tokio::main]
async fn main() -> std::process::ExitCode {
    init_tracing();
    let cli = Cli::parse();

    let result = match cli.command {
        Command::Run {
            store_url,
            host,
            port,
        } => run(&store_url, host, port).await,
        Command::Seed {
            store_url,
            interval_secs,
        } => seed(&store_url, interval_secs).await,
    };

    match result {
        Ok(()) => std::process::ExitCode::SUCCESS,
        Err(err) => {
            eprintln!("sql failed: {err}");
            std::process::ExitCode::FAILURE
        }
    }
}