use convergence::server::{self, BindOptions};
use convergence_arrow::datafusion::DataFusionEngine;
use convergence_arrow::metadata::Catalog;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog::memory::MemorySchemaProvider;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::ColumnarValue;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use std::sync::Arc;
async fn new_engine() -> DataFusionEngine {
let ctx = SessionContext::new_with_config(
SessionConfig::new()
.with_information_schema(true)
.with_create_default_catalog_and_schema(false),
);
let mem_catalog = Arc::new(MemoryCatalogProvider::new());
mem_catalog
.register_schema("public", Arc::new(MemorySchemaProvider::new()))
.expect("failed to register schema");
ctx.register_catalog("datafusion", Arc::new(Catalog::new(mem_catalog)));
ctx.register_csv(
"test_100_4buckets",
"convergence-arrow/data/100_4buckets.csv",
CsvReadOptions::new(),
)
.await
.expect("failed to register csv");
ctx.register_udf(create_udf(
"pg_backend_pid",
vec![],
DataType::Int32,
Volatility::Stable,
Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))),
));
ctx.register_udf(create_udf(
"current_schema",
vec![],
DataType::Utf8,
Volatility::Stable,
Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))),
));
DataFusionEngine::new(ctx)
}
#[tokio::main]
async fn main() {
server::run(BindOptions::new(), Arc::new(|| Box::pin(new_engine())))
.await
.unwrap();
}