use convergence::server::{self, BindOptions};
use convergence_arrow::datafusion::DataFusionEngine;
use convergence_arrow::metadata::Catalog;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog::memory::MemorySchemaProvider;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::ColumnarValue;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use std::sync::Arc;

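/// Builds a `DataFusionEngine` backed by an in-memory catalog, a sample CSV
/// table, and stub UDFs for common Postgres introspection calls.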
async fn new_engine() -> DataFusionEngine {
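    // Enable information_schema so SQL clients can introspect tables, and skip
    // the default catalog/schema since a custom catalog is registered below.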
    let ctx = SessionContext::new_with_config(
        SessionConfig::new()
            .with_information_schema(true)
            .with_create_default_catalog_and_schema(false),
    );

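    // In-memory catalog exposing a single "public" schema.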
    let mem_catalog = Arc::new(MemoryCatalogProvider::new());
    mem_catalog
        .register_schema("public", Arc::new(MemorySchemaProvider::new()))
        .expect("failed to register schema");

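    // Register the memory catalog, wrapped in convergence-arrow's metadata Catalog,
    // under the default "datafusion" catalog name.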
    ctx.register_catalog("datafusion", Arc::new(Catalog::new(mem_catalog)));

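    // Expose a sample CSV file as a queryable table.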
    ctx.register_csv(
        "test_100_4buckets",
        "convergence-arrow/data/100_4buckets.csv",
        CsvReadOptions::new(),
    )
    .await
    .expect("failed to register csv");

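    // Minimal pg_backend_pid() stub that always reports backend PID 0.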
    ctx.register_udf(create_udf(
        "pg_backend_pid",
        vec![],
        DataType::Int32,
        Volatility::Stable,
        Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))),
    ));

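    // current_schema() stub that always reports the "public" schema.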
    ctx.register_udf(create_udf(
        "current_schema",
        vec![],
        DataType::Utf8,
        Volatility::Stable,
        Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))),
    ));

    DataFusionEngine::new(ctx)
}

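// Start the Postgres-protocol server with default bind options, using
// `new_engine` as the engine factory for incoming connections.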
#[tokio::main]
async fn main() {
    server::run(BindOptions::new(), Arc::new(|| Box::pin(new_engine())))
        .await
        .unwrap();
}