use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::prelude::SessionContext;
use exoware_sdk::StoreClient;
use exoware_sql::{CellValue, KvSchema, TableColumnConfig};
/// Hex text of the example document id: 32 hex digits, i.e. 16 bytes once
/// decoded, which matches the `FixedSizeBinary(16)` type of the `doc_id` column.
const DOC_ID_HEX: &str = "d0c1aabbccddeeff0011223344556677";
/// Entry point: registers the example schema with a DataFusion session, then
/// runs the batch-writer demo followed by the SQL demo.
///
/// The store URL is taken from the `EXOWARE_URL` environment variable and falls
/// back to `http://localhost:10000` when that variable cannot be read.
///
/// # Errors
///
/// Returns an error if the schema cannot be built or registered, or if the SQL
/// demo returns an error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store_url = match std::env::var("EXOWARE_URL") {
        Ok(url) => url,
        Err(_) => String::from("http://localhost:10000"),
    };
    let store = StoreClient::new(&store_url);
    let session = SessionContext::new();

    // Build the table definitions first, then expose them to the SQL session.
    let documents = build_schema(store)?;
    documents.register_all(&session)?;

    demo_batch_writer_insert().await;
    demo_sql_queries(&session).await?;
    Ok(())
}
/// Builds the schema used by this example: a single versioned table named
/// `documents`, backed by `client`.
///
/// The table has five columns: `doc_id` (16-byte fixed-size binary), `version`
/// (`UInt64`), and the UTF-8 columns `title`, `body` and `author`.
///
/// # Errors
///
/// Forwards whatever error `KvSchema::table_versioned` reports.
fn build_schema(client: StoreClient) -> Result<KvSchema, String> {
    // NOTE(review): the trailing boolean is presumably the "nullable" flag
    // (only `body` and `author` pass `true`) — confirm against `exoware_sql`.
    let columns = vec![
        TableColumnConfig::new("doc_id", DataType::FixedSizeBinary(16), false),
        TableColumnConfig::new("version", DataType::UInt64, false),
        TableColumnConfig::new("title", DataType::Utf8, false),
        TableColumnConfig::new("body", DataType::Utf8, true),
        TableColumnConfig::new("author", DataType::Utf8, true),
    ];

    // NOTE(review): "doc_id" and "version" presumably name the entity-key and
    // version columns, and the empty vector an unused list of extra options
    // (e.g. indexes) — confirm against `KvSchema::table_versioned`.
    KvSchema::new(client).table_versioned("documents", columns, "doc_id", "version", vec![])
}
/// Queues several versions of one document in a batch writer and prints how
/// many rows are pending.
///
/// The store URL is resolved exactly as in `main`: from the `EXOWARE_URL`
/// environment variable, falling back to `http://localhost:10000`. This keeps
/// both halves of the demo pointed at the same store instead of hard-coding the
/// default here. The batch is never flushed by this function.
///
/// # Panics
///
/// Panics if the schema cannot be built or if the writer rejects a row.
async fn demo_batch_writer_insert() {
    println!("\n== BatchWriter: multiple document versions ==");
    let base_url =
        std::env::var("EXOWARE_URL").unwrap_or_else(|_| "http://localhost:10000".to_string());
    let schema = build_schema(StoreClient::new(&base_url))
        .expect("the example documents schema should build");
    let mut batch = schema.batch_writer();
    let doc_id = doc_id_bytes();

    // (version, title, body, author), listed in the order the rows are queued.
    // Version 3 is deliberately kept last, as in the original sequence
    // (presumably to show that queue order need not follow version order).
    let revisions: [(u64, &str, &str, &str); 5] = [
        (1, "Draft", "Initial content...", "Alice"),
        (2, "Review", "Revised content after review...", "Alice"),
        (4, "Published", "Final published content.", "Bob"),
        (5, "Translated", "Localized content.", "Bob"),
        (3, "Approved", "Approved content.", "Bob"),
    ];

    for &(version, title, body, author) in &revisions {
        // Cell order follows the column order declared in `build_schema`.
        batch
            .insert(
                "documents",
                vec![
                    CellValue::FixedBinary(doc_id.clone()),
                    CellValue::UInt64(version),
                    CellValue::Utf8(title.to_string()),
                    CellValue::Utf8(body.to_string()),
                    CellValue::Utf8(author.to_string()),
                ],
            )
            .expect("queuing a row for the documents table should succeed");
    }

    println!(" Queued {} document-version rows", batch.pending_count());
    println!(" (skipping flush -- no server in this demo)");
}
/// Prints example SQL for querying versioned documents, then plans and runs
/// one of those queries against `ctx`.
///
/// The printed statements use the same `X'..'` hex literal as the query that is
/// actually executed, so they can be copied verbatim.
///
/// # Errors
///
/// Returns an error if executing the planned query fails. A failure while
/// planning the query is only reported on stdout and does not abort the demo.
async fn demo_sql_queries(ctx: &SessionContext) -> Result<(), Box<dyn std::error::Error>> {
    println!("\n== SQL query patterns for versioned documents ==");

    // `doc_id` is already a complete hex literal (X'..'); it must be spliced
    // into SQL text as-is. Wrapping it in another pair of quotes would yield
    // `'X'..''`, which is not valid SQL.
    let doc_id = format!("X'{}'", DOC_ID_HEX.to_uppercase());

    println!("\n -- All versions of a document:");
    println!(" SELECT * FROM documents WHERE doc_id = {doc_id}");
    println!("\n -- Latest version <= 2 (gets version 2, not 3):");
    println!(
        " SELECT * FROM documents WHERE doc_id = {doc_id} AND version <= 2 \
         ORDER BY version DESC LIMIT 1"
    );
    println!("\n -- Exact version lookup:");
    println!(" SELECT * FROM documents WHERE doc_id = {doc_id} AND version = 2");

    let plan = ctx
        .sql(&format!(
            "SELECT title, version FROM documents \
             WHERE doc_id = {doc_id} \
             AND version <= 2 \
             ORDER BY version DESC LIMIT 1"
        ))
        .await;

    match plan {
        Ok(df) => {
            println!("\n Query planned successfully");
            let batches = df.collect().await?;
            // Rendering is best-effort: a formatting failure is ignored on purpose.
            let _ = print_batches(&batches);
        }
        Err(e) => println!("\n (no data to query: {e})"),
    }
    Ok(())
}
/// Decodes [`DOC_ID_HEX`] into the raw bytes of the example document id.
///
/// # Panics
///
/// Panics if `DOC_ID_HEX` is not valid hex.
fn doc_id_bytes() -> Vec<u8> {
    let decoded = hex::decode(DOC_ID_HEX);
    decoded.expect("fixed example doc_id hex should decode")
}