#![allow(clippy::disallowed_types)]
use std::collections::HashMap;
use std::path::PathBuf;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
use crate::config::LaminarConfig;
use crate::db::LaminarDB;
use crate::error::DbError;
use crate::profile::Profile;
/// Boxed, sendable, one-shot callback that is handed a reference to the
/// connector registry. The builder stores these and invokes each one once
/// in `build`, after the database has been opened.
type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
/// Fluent builder for [`LaminarDB`].
///
/// Accumulates configuration, config variables, connector-registration
/// callbacks and custom functions; `build` consumes the builder and hands
/// everything to the database.
pub struct LaminarDbBuilder {
/// Pending engine configuration; written by the setter methods and passed
/// to the database when it is opened.
config: LaminarConfig,
/// Key/value variables passed alongside `config` when the database is opened.
config_vars: HashMap<String, String>,
/// Callbacks run against the connector registry once the database exists.
connector_callbacks: Vec<ConnectorCallback>,
/// Profile used for validation and for applying defaults during `build`.
profile: Profile,
/// `true` once the caller has set `profile` explicitly; when `false`,
/// `build` derives the profile from the configuration instead.
profile_explicit: bool,
/// Object store URL; copied into `config` at the start of `build`.
object_store_url: Option<String>,
/// Object store options; copied into `config` at the start of `build`.
object_store_options: HashMap<String, String>,
/// Scalar UDFs registered on the database after it is opened.
custom_udfs: Vec<ScalarUDF>,
/// Aggregate UDFs registered on the database after it is opened.
custom_udafs: Vec<AggregateUDF>,
}
impl LaminarDbBuilder {
#[must_use]
pub fn new() -> Self {
Self {
config: LaminarConfig::default(),
config_vars: HashMap::new(),
connector_callbacks: Vec::new(),
profile: Profile::default(),
profile_explicit: false,
object_store_url: None,
object_store_options: HashMap::new(),
custom_udfs: Vec::new(),
custom_udafs: Vec::new(),
}
}
#[must_use]
pub fn config_var(mut self, key: &str, value: &str) -> Self {
self.config_vars.insert(key.to_string(), value.to_string());
self
}
#[must_use]
pub fn buffer_size(mut self, size: usize) -> Self {
self.config.default_buffer_size = size;
self
}
#[must_use]
pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
self.config.default_backpressure = strategy;
self
}
#[must_use]
pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
self.config.storage_dir = Some(path.into());
self
}
#[must_use]
pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
self.config.checkpoint = Some(config);
self
}
#[must_use]
pub fn profile(mut self, profile: Profile) -> Self {
self.profile = profile;
self.profile_explicit = true;
self
}
#[must_use]
pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
self.object_store_url = Some(url.into());
self
}
#[must_use]
pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
self.object_store_options = opts;
self
}
#[must_use]
pub fn tiering(mut self, tiering: crate::config::TieringConfig) -> Self {
self.config.tiering = Some(tiering);
self
}
#[must_use]
pub fn delivery_guarantee(
mut self,
guarantee: laminar_connectors::connector::DeliveryGuarantee,
) -> Self {
self.config.delivery_guarantee = guarantee;
self
}
#[must_use]
pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
self.custom_udfs.push(udf);
self
}
#[must_use]
pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
self.custom_udafs.push(udaf);
self
}
#[must_use]
pub fn register_connector(
mut self,
f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
) -> Self {
self.connector_callbacks.push(Box::new(f));
self
}
#[allow(clippy::unused_async)]
pub async fn build(mut self) -> Result<LaminarDB, DbError> {
self.config.object_store_url = self.object_store_url;
self.config.object_store_options = self.object_store_options;
if !self.profile_explicit {
self.profile = Profile::from_config(&self.config, false);
}
self.profile
.validate_features()
.map_err(|e| DbError::Config(e.to_string()))?;
self.profile
.validate_config(&self.config, self.config.object_store_url.as_deref())
.map_err(|e| DbError::Config(e.to_string()))?;
self.profile.apply_defaults(&mut self.config);
let db = LaminarDB::open_with_config_and_vars(self.config, self.config_vars)?;
for callback in self.connector_callbacks {
callback(db.connector_registry());
}
for udf in self.custom_udfs {
db.register_custom_udf(udf);
}
for udaf in self.custom_udafs {
db.register_custom_udaf(udaf);
}
Ok(db)
}
}
impl Default for LaminarDbBuilder {
/// Returns the same builder as [`LaminarDbBuilder::new`].
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for LaminarDbBuilder {
    /// Writes the builder as a `LaminarDbBuilder { .. }` debug struct.
    ///
    /// The configuration, profile, explicit-profile flag and object store
    /// URL are printed as values; the option map, config variables,
    /// callbacks and custom functions are printed as element counts only.
    /// The boxed callbacks carry no `Debug` bound, so this impl is written
    /// by hand rather than derived.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces
        // this impl to be revisited.
        let Self {
            config,
            config_vars,
            connector_callbacks,
            profile,
            profile_explicit,
            object_store_url,
            object_store_options,
            custom_udfs,
            custom_udafs,
        } = self;

        let mut out = f.debug_struct("LaminarDbBuilder");
        out.field("config", config);
        out.field("profile", profile);
        out.field("profile_explicit", profile_explicit);
        out.field("object_store_url", object_store_url);
        out.field("object_store_options_count", &object_store_options.len());
        out.field("config_vars_count", &config_vars.len());
        out.field("connector_callbacks", &connector_callbacks.len());
        out.field("custom_udfs", &custom_udfs.len());
        out.field("custom_udafs", &custom_udafs.len());
        out.finish()
    }
}
// Unit tests for `LaminarDbBuilder`: each async test builds a database and
// checks that it is usable; one sync test checks the `Debug` output.
#[cfg(test)]
mod tests {
use super::*;
// A builder with no customisation must build an open database.
#[tokio::test]
async fn test_default_builder() {
let db = LaminarDbBuilder::new().build().await.unwrap();
assert!(!db.is_closed());
}
// Config variables must not prevent the database from being built.
#[tokio::test]
async fn test_builder_with_config_vars() {
let db = LaminarDbBuilder::new()
.config_var("KAFKA_BROKERS", "localhost:9092")
.config_var("GROUP_ID", "test-group")
.build()
.await
.unwrap();
assert!(!db.is_closed());
}
// A non-default buffer size must still produce an open database.
#[tokio::test]
async fn test_builder_with_options() {
let db = LaminarDbBuilder::new()
.buffer_size(131_072)
.build()
.await
.unwrap();
assert!(!db.is_closed());
}
// The builder obtained through `LaminarDB::builder()` must build as well.
#[tokio::test]
async fn test_builder_from_laminardb() {
let db = LaminarDB::builder().build().await.unwrap();
assert!(!db.is_closed());
}
// The hand-written `Debug` impl prints the struct name and reports config
// variables as a count rather than by content.
#[test]
fn test_builder_debug() {
let builder = LaminarDbBuilder::new().config_var("K", "V");
let debug = format!("{builder:?}");
assert!(debug.contains("LaminarDbBuilder"));
assert!(debug.contains("config_vars_count: 1"));
}
// A UDF queued on the builder must be callable from SQL after `build`.
#[tokio::test]
async fn test_builder_register_udf() {
use std::any::Any;
use std::hash::{Hash, Hasher};
use arrow::datatypes::DataType;
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
// Minimal zero-argument scalar UDF named `forty_two` that always
// returns the Int64 value 42.
#[derive(Debug)]
struct FortyTwo {
signature: Signature,
}
impl FortyTwo {
fn new() -> Self {
Self {
// Nullary: the function takes no arguments.
signature: Signature::new(TypeSignature::Nullary, Volatility::Immutable),
}
}
}
// Equality ignores the fields: any two instances compare equal, and the
// hash below is likewise a constant.
// NOTE(review): presumably these impls are required by the
// `ScalarUDFImpl` trait bounds of the DataFusion version in use; confirm.
impl PartialEq for FortyTwo {
fn eq(&self, _: &Self) -> bool {
true
}
}
impl Eq for FortyTwo {}
impl Hash for FortyTwo {
fn hash<H: Hasher>(&self, state: &mut H) {
"forty_two".hash(state);
}
}
impl ScalarUDFImpl for FortyTwo {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &'static str {
"forty_two"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Int64)
}
fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
// The arguments are ignored; the result is a constant scalar.
Ok(ColumnarValue::Scalar(
datafusion_common::ScalarValue::Int64(Some(42)),
))
}
}
let udf = ScalarUDF::new_from_impl(FortyTwo::new());
let db = LaminarDB::builder()
.register_udf(udf)
.build()
.await
.unwrap();
// Only success is asserted; the returned value is not inspected.
let result = db.execute("SELECT forty_two()").await;
assert!(result.is_ok(), "UDF should be callable: {result:?}");
}
}