datafusion/
datafusion.rs

1use convergence::server::{self, BindOptions};
2use convergence_arrow::datafusion::DataFusionEngine;
3use convergence_arrow::metadata::Catalog;
4use datafusion::arrow::datatypes::DataType;
5use datafusion::catalog::memory::MemorySchemaProvider;
6use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
7use datafusion::logical_expr::Volatility;
8use datafusion::physical_plan::ColumnarValue;
9use datafusion::prelude::*;
10use datafusion::scalar::ScalarValue;
11use std::sync::Arc;
12
13async fn new_engine() -> DataFusionEngine {
14	let ctx = SessionContext::new_with_config(
15		SessionConfig::new()
16			.with_information_schema(true)
17			.with_create_default_catalog_and_schema(false),
18	);
19
20	let mem_catalog = Arc::new(MemoryCatalogProvider::new());
21	mem_catalog
22		.register_schema("public", Arc::new(MemorySchemaProvider::new()))
23		.expect("failed to register schema");
24
25	ctx.register_catalog("datafusion", Arc::new(Catalog::new(mem_catalog)));
26
27	ctx.register_csv(
28		"test_100_4buckets",
29		"convergence-arrow/data/100_4buckets.csv",
30		CsvReadOptions::new(),
31	)
32	.await
33	.expect("failed to register csv");
34
35	ctx.register_udf(create_udf(
36		"pg_backend_pid",
37		vec![],
38		DataType::Int32,
39		Volatility::Stable,
40		Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))),
41	));
42
43	ctx.register_udf(create_udf(
44		"current_schema",
45		vec![],
46		DataType::Utf8,
47		Volatility::Stable,
48		Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))),
49	));
50
51	DataFusionEngine::new(ctx)
52}
53
54#[tokio::main]
55async fn main() {
56	server::run(BindOptions::new(), Arc::new(|| Box::pin(new_engine())))
57		.await
58		.unwrap();
59}