use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use datafusion_table_providers::{
common::DatabaseCatalogProvider, postgres::PostgresTableFactory,
sql::db_connection_pool::postgrespool::PostgresConnectionPool, util::secrets::to_secret_map,
};
use std::collections::HashMap;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let postgres_params = to_secret_map(HashMap::from([
("host".to_string(), "localhost".to_string()),
("user".to_string(), "postgres".to_string()),
("db".to_string(), "postgres_db".to_string()),
("pass".to_string(), "password".to_string()),
("port".to_string(), "5432".to_string()),
("sslmode".to_string(), "disable".to_string()),
]));
let postgres_pool = Arc::new(
PostgresConnectionPool::new(postgres_params)
.await
.expect("unable to create PostgreSQL connection pool"),
);
let table_factory = PostgresTableFactory::new(postgres_pool.clone());
let catalog = DatabaseCatalogProvider::try_new(postgres_pool)
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_catalog("postgres", Arc::new(catalog));
ctx.register_table(
"companies_v2",
table_factory
.table_provider(TableReference::bare("companies"))
.await
.expect("failed to register table provider"),
)
.expect("failed to register table");
let companies_view = table_factory
.table_provider(TableReference::bare("companies_view"))
.await
.expect("to create table provider for view");
let companies_materialized_view = table_factory
.table_provider(TableReference::bare("companies_materialized_view"))
.await
.expect("to create table provider for materialized view");
ctx.register_table("companies_view", companies_view)
.expect("to register view");
ctx.register_table("companies_materialized_view", companies_materialized_view)
.expect("to register materialized view");
let df = ctx
.sql("SELECT * FROM datafusion.public.companies_v2")
.await
.expect("select failed");
df.show().await.expect("show failed");
let df = ctx
.sql("SELECT * FROM postgres.public.companies")
.await
.expect("select failed");
df.show().await.expect("show failed");
let df = ctx
.sql("SELECT * FROM companies_view")
.await
.expect("select from view failed");
df.show().await.expect("show failed");
let df = ctx
.sql("SELECT * FROM companies_materialized_view")
.await
.expect("select from materialized view failed");
df.show().await.expect("show failed");
}