use crate::error::CassandraResult;
use crate::pool::CassandraPool;
/// Language of a user-defined function body, rendered into the `LANGUAGE`
/// clause of the `CREATE FUNCTION` statement built from a `UdfDefinition`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UdfLanguage {
    /// Rendered as the keyword `java`.
    Java,
    /// Rendered as the keyword `javascript`.
    JavaScript,
}

impl UdfLanguage {
    /// Returns the lowercase keyword for this language.
    ///
    /// Every arm yields a string literal, so the return type is spelled
    /// `&'static str` instead of being elided to the lifetime of `self`.
    /// Callers may therefore keep the keyword after the enum value is gone;
    /// code that only needs a shorter-lived `&str` keeps working unchanged.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Java => "java",
            Self::JavaScript => "javascript",
        }
    }
}
/// Inputs from which `CassandraPool::create_function` builds a
/// `CREATE OR REPLACE FUNCTION` statement.
///
/// All string fields are interpolated into the statement as-is; only `body`
/// has its single quotes doubled before being embedded.
#[derive(Debug, Clone)]
pub struct UdfDefinition {
/// Keyspace qualifier, rendered before the function name as `keyspace.name`.
pub keyspace: String,
/// Function name, rendered after the keyspace qualifier.
pub name: String,
/// `(argument name, argument type)` pairs, rendered as `name type` and
/// joined with `", "` inside the parentheses of the signature.
pub arguments: Vec<(String, String)>,
/// Type name rendered after the `RETURNS` keyword.
pub return_type: String,
/// Language whose `as_str()` keyword is rendered after `LANGUAGE`.
pub language: UdfLanguage,
/// Function source, emitted between single quotes after `AS` with every
/// `'` replaced by `''`.
pub body: String,
/// `true` renders `CALLED ON NULL INPUT`; `false` renders
/// `RETURNS NULL ON NULL INPUT`.
pub called_on_null: bool,
}
/// Inputs from which `CassandraPool::create_aggregate` builds a
/// `CREATE OR REPLACE AGGREGATE` statement.
///
/// Every field is interpolated into the statement as-is, without quoting or
/// escaping.
#[derive(Debug, Clone)]
pub struct UdaDefinition {
/// Keyspace qualifier, rendered before the aggregate name as `keyspace.name`.
pub keyspace: String,
/// Aggregate name, rendered after the keyspace qualifier.
pub name: String,
/// Argument type names, joined with `", "` inside the parentheses of the
/// signature.
pub arg_types: Vec<String>,
/// Name rendered after the `SFUNC` keyword.
pub state_function: String,
/// Type name rendered after the `STYPE` keyword.
pub state_type: String,
/// When `Some`, appended as ` FINALFUNC <value>`; when `None`, the clause is
/// omitted.
pub final_function: Option<String>,
/// When `Some`, appended as ` INITCOND <value>` after any `FINALFUNC`
/// clause; when `None`, the clause is omitted.
pub initial_condition: Option<String>,
}
/// Builds the `CREATE OR REPLACE FUNCTION` statement described by `def`.
fn build_create_function_cql(def: &UdfDefinition) -> String {
    let signature = def
        .arguments
        .iter()
        .map(|(arg_name, arg_type)| format!("{arg_name} {arg_type}"))
        .collect::<Vec<_>>()
        .join(", ");
    let null_behavior = if def.called_on_null {
        "CALLED ON NULL INPUT"
    } else {
        "RETURNS NULL ON NULL INPUT"
    };
    // The body is emitted as a single-quoted string literal, so every quote
    // inside it is doubled to keep the literal from terminating early.
    let body = def.body.replace('\'', "''");
    format!(
        "CREATE OR REPLACE FUNCTION {}.{}({signature}) \
         {null_behavior} \
         RETURNS {} \
         LANGUAGE {} \
         AS '{body}'",
        def.keyspace,
        def.name,
        def.return_type,
        def.language.as_str(),
    )
}

/// Builds the `CREATE OR REPLACE AGGREGATE` statement described by `def`.
fn build_create_aggregate_cql(def: &UdaDefinition) -> String {
    let mut cql = format!(
        "CREATE OR REPLACE AGGREGATE {}.{}({}) SFUNC {} STYPE {}",
        def.keyspace,
        def.name,
        def.arg_types.join(", "),
        def.state_function,
        def.state_type,
    );
    // Optional clauses are appended in a fixed order: FINALFUNC, then INITCOND.
    if let Some(final_function) = &def.final_function {
        cql.push_str(" FINALFUNC ");
        cql.push_str(final_function);
    }
    if let Some(initial_condition) = &def.initial_condition {
        cql.push_str(" INITCOND ");
        cql.push_str(initial_condition);
    }
    cql
}

/// Builds `DROP <object_kind> IF EXISTS <keyspace>.<name>(<arg_types>)`.
fn build_drop_cql(object_kind: &str, keyspace: &str, name: &str, arg_types: &[&str]) -> String {
    format!(
        "DROP {object_kind} IF EXISTS {keyspace}.{name}({})",
        arg_types.join(", "),
    )
}

/// DDL helpers for user-defined functions and aggregates.
///
/// Each method formats one CQL statement and hands it to `self.execute`.
/// Identifiers, type names and clause values are interpolated verbatim; only
/// the function body of `create_function` is escaped. Callers must therefore
/// pass trusted, syntactically valid values.
impl CassandraPool {
    /// Creates the function described by `def`, replacing an existing
    /// definition with the same signature (`CREATE OR REPLACE FUNCTION`).
    ///
    /// # Errors
    ///
    /// Returns the error produced by `execute` unchanged.
    pub async fn create_function(&self, def: &UdfDefinition) -> CassandraResult<()> {
        let cql = build_create_function_cql(def);
        self.execute(&cql).await
    }

    /// Issues `DROP FUNCTION IF EXISTS keyspace.name(arg_types...)`.
    ///
    /// The parenthesised type list is always emitted, so an empty
    /// `arg_types` produces `name()`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `execute` unchanged.
    pub async fn drop_function(
        &self,
        keyspace: &str,
        name: &str,
        arg_types: &[&str],
    ) -> CassandraResult<()> {
        let cql = build_drop_cql("FUNCTION", keyspace, name, arg_types);
        self.execute(&cql).await
    }

    /// Creates the aggregate described by `def`, replacing an existing
    /// definition (`CREATE OR REPLACE AGGREGATE`).
    ///
    /// `FINALFUNC` and `INITCOND` clauses are emitted only when the
    /// corresponding optional field is `Some`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `execute` unchanged.
    pub async fn create_aggregate(&self, def: &UdaDefinition) -> CassandraResult<()> {
        let cql = build_create_aggregate_cql(def);
        self.execute(&cql).await
    }

    /// Issues `DROP AGGREGATE IF EXISTS keyspace.name(arg_types...)`.
    ///
    /// The parenthesised type list is always emitted, so an empty
    /// `arg_types` produces `name()`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `execute` unchanged.
    pub async fn drop_aggregate(
        &self,
        keyspace: &str,
        name: &str,
        arg_types: &[&str],
    ) -> CassandraResult<()> {
        let cql = build_drop_cql("AGGREGATE", keyspace, name, arg_types);
        self.execute(&cql).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every language variant maps to its lowercase keyword.
    #[test]
    fn test_udf_language_as_str() {
        let expected = [
            (UdfLanguage::Java, "java"),
            (UdfLanguage::JavaScript, "javascript"),
        ];
        for (language, keyword) in expected {
            assert_eq!(language.as_str(), keyword);
        }
    }

    /// A function definition keeps the argument list and null-handling flag
    /// it was built with.
    #[test]
    fn test_udf_definition_construction() {
        let UdfDefinition {
            arguments,
            called_on_null,
            ..
        } = UdfDefinition {
            keyspace: String::from("myapp"),
            name: String::from("plus_one"),
            arguments: vec![(String::from("x"), String::from("int"))],
            return_type: String::from("int"),
            language: UdfLanguage::Java,
            body: String::from("return x + 1;"),
            called_on_null: false,
        };

        assert_eq!(arguments.len(), 1);
        assert!(!called_on_null);
    }

    /// Optional aggregate clauses are stored exactly as supplied.
    #[test]
    fn test_uda_definition_optional_fields() {
        let uda = UdaDefinition {
            keyspace: String::from("myapp"),
            name: String::from("my_sum"),
            arg_types: vec![String::from("int")],
            state_function: String::from("accumulate"),
            state_type: String::from("int"),
            final_function: None,
            initial_condition: Some(String::from("0")),
        };

        assert_eq!(uda.final_function, None);
        assert_eq!(uda.initial_condition, Some(String::from("0")));
    }
}