use crate::types::FieldType;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Builds the JSON object that refers to a function parameter by name.
///
/// The result has exactly two keys: `"type"`, always the string `"Parameter"`,
/// and `"name"`, holding `name` converted to a `String`.
pub fn parameter_ref(name: impl Into<String>) -> serde_json::Value {
    let name: String = name.into();

    let mut reference = serde_json::Map::new();
    reference.insert(
        "type".to_owned(),
        serde_json::Value::String("Parameter".to_owned()),
    );
    reference.insert("name".to_owned(), serde_json::Value::String(name));

    serde_json::Value::Object(reference)
}
/// A stored function definition: identifying metadata plus the list of [`Function`] stages it is made of.
///
/// Serialization behaviour, as declared by the serde attributes below:
/// - `id`, `description`, `version`, `http_method` and `http_path` are omitted from the output when `None`.
/// - `parameters` and `tags` fall back to empty collections when absent from the input.
/// - `created_at` and `updated_at` are never written out; they are only populated by deserialization.
/// - `label`, `name` and `functions` carry no default and are therefore required on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserFunction {
/// Identifier; `None` on values built with `UserFunction::new`. Presumably assigned by the server — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
/// Label of the function (the tests in this file use `api_handler`). `Function::CallFunction` refers to a function by `function_label`; presumably this field — TODO confirm.
pub label: String,
/// Name of the function (the tests in this file use `API Handler`).
pub name: String,
/// Optional free-form description.
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Optional version string; no format is enforced here.
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
/// Parameter declarations keyed by parameter name (`with_parameter` inserts under `param.name`).
#[serde(default)]
pub parameters: HashMap<String, ParameterDefinition>,
/// The stages of this function, in declaration order.
pub functions: Vec<Function>,
/// Free-form tags.
#[serde(default)]
pub tags: Vec<String>,
/// Creation timestamp; read from input only, never serialized.
#[serde(skip_serializing)]
pub created_at: Option<DateTime<Utc>>,
/// Last-update timestamp; read from input only, never serialized.
#[serde(skip_serializing)]
pub updated_at: Option<DateTime<Utc>>,
/// HTTP method of the optional route; set together with `http_path` by `with_http_route`.
#[serde(skip_serializing_if = "Option::is_none")]
pub http_method: Option<String>,
/// HTTP path of the optional route; set together with `http_method` by `with_http_route`.
#[serde(skip_serializing_if = "Option::is_none")]
pub http_path: Option<String>,
}
impl UserFunction {
    /// Creates a definition that carries only `label` and `name`.
    ///
    /// Every optional field starts as `None` and every collection starts empty;
    /// use the `with_*` builders to fill them in.
    pub fn new(label: impl Into<String>, name: impl Into<String>) -> Self {
        let label = label.into();
        let name = name.into();
        Self {
            label,
            name,
            id: None,
            description: None,
            version: None,
            http_method: None,
            http_path: None,
            created_at: None,
            updated_at: None,
            parameters: HashMap::new(),
            functions: Vec::new(),
            tags: Vec::new(),
        }
    }

    /// Sets both halves of the HTTP route (`http_method` and `http_path`) at once.
    pub fn with_http_route(self, method: impl Into<String>, path: impl Into<String>) -> Self {
        let http_method = Some(method.into());
        let http_path = Some(path.into());
        Self {
            http_method,
            http_path,
            ..self
        }
    }

    /// Sets the description, replacing any previous one.
    pub fn with_description(self, description: impl Into<String>) -> Self {
        let description = Some(description.into());
        Self {
            description,
            ..self
        }
    }

    /// Sets the version string, replacing any previous one.
    pub fn with_version(self, version: impl Into<String>) -> Self {
        let version = Some(version.into());
        Self { version, ..self }
    }

    /// Registers `param` under its own `name`.
    ///
    /// A parameter that was already registered under the same name is replaced.
    pub fn with_parameter(self, param: ParameterDefinition) -> Self {
        let mut updated = self;
        let key = param.name.clone();
        updated.parameters.insert(key, param);
        updated
    }

    /// Appends `function` as the last stage.
    pub fn with_function(self, function: Function) -> Self {
        let mut updated = self;
        updated.functions.push(function);
        updated
    }

    /// Appends `tag` to the tag list (duplicates are kept).
    pub fn with_tag(self, tag: impl Into<String>) -> Self {
        let mut updated = self;
        updated.tags.push(tag.into());
        updated
    }
}
/// Declaration of a single parameter accepted by a `UserFunction`.
///
/// `name` is never serialized and defaults to an empty string when absent from the input; in this file it is
/// only used by `UserFunction::with_parameter`, which stores the definition under that name in the `parameters` map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParameterDefinition {
/// Parameter name. Not written to the output; empty after deserialization unless the input carries a `name` key.
#[serde(skip_serializing, default)]
pub name: String,
/// Whether the parameter is required; `false` when absent from the input.
#[serde(default)]
pub required: bool,
/// Optional default value; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub default: Option<FieldType>,
/// Optional free-form description; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
}
impl ParameterDefinition {
    /// Starts a definition for the parameter called `name`.
    ///
    /// The parameter is optional and has neither a default value nor a description
    /// until the corresponding builder is called.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            description: None,
            default: None,
            required: false,
        }
    }

    /// Marks the parameter as required.
    pub fn required(self) -> Self {
        Self {
            required: true,
            ..self
        }
    }

    /// Sets the default value, replacing any previous one.
    pub fn with_default(self, default: FieldType) -> Self {
        Self {
            default: Some(default),
            ..self
        }
    }

    /// Sets the description, replacing any previous one.
    pub fn with_description(self, description: impl Into<String>) -> Self {
        let description = Some(description.into());
        Self {
            description,
            ..self
        }
    }
}
/// Condition tree; in this file it is used as the `condition` of `Function::If`.
///
/// Serialized in serde's adjacently tagged form: the variant name is stored under `"type"` and the variant's
/// fields are stored as an object under `"value"` (unit variants such as `HasRecords` have no `"value"`).
/// `FieldEquals` also has a field called `value`; it lives inside that inner object, so it does not collide
/// with the outer `"value"` key.
///
/// How each condition is evaluated is decided by code outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum FunctionCondition {
/// Names a `field` and the JSON `value` to compare it with.
FieldEquals {
field: String,
value: serde_json::Value,
},
/// Names a `field` only.
FieldExists { field: String },
/// Carries no data.
HasRecords,
/// Carries a single `count`.
CountEquals { count: usize },
/// Carries a single `count`.
CountGreaterThan { count: usize },
/// Carries a single `count`.
CountLessThan { count: usize },
/// Holds a list of nested conditions.
And { conditions: Vec<FunctionCondition> },
/// Holds a list of nested conditions.
Or { conditions: Vec<FunctionCondition> },
/// Holds exactly one nested condition (boxed because the type is recursive).
Not { condition: Box<FunctionCondition> },
}
/// One stage of a `UserFunction`.
///
/// Serialized in serde's internally tagged form: the variant name is stored under the
/// `"type"` key next to the variant's own fields, e.g.
/// `{"type": "FindAll", "collection": "users"}`. Field names are emitted exactly as
/// written below (snake_case); `rename_all` only affects the variant names.
///
/// Conventions that hold for every variant:
/// - An `Option` field is left out of the output when it is `None`, and may be absent
///   from the input (it then deserializes to `None`).
/// - A field typed `serde_json::Value` accepts arbitrary JSON. The tests in this file
///   pass both plain literals (`1000`) and placeholder strings (`"{{delay}}"`) through
///   such fields.
/// - Nested stages are stored as `Vec<Box<Function>>`.
///
/// What a stage does at run time is decided by the executor, which is not part of this
/// file; the notes on individual variants only describe their serialized shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "PascalCase")]
// Lint rationale:
// - `enum_variant_names`: variant names mirror the wire format (e.g. `CallFunction`
//   repeats the enum name), so they cannot be shortened.
// - `vec_box`: `Vec<Box<Function>>` is part of the public shape callers construct.
// - `upper_case_acronyms`: `SWR` is the wire name of that stage.
#[allow(
    clippy::enum_variant_names,
    clippy::vec_box,
    clippy::upper_case_acronyms
)]
pub enum Function {
    // ----- Reading and shaping records -----
    FindAll {
        collection: String,
    },
    Query {
        collection: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        filter: Option<serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        sort: Option<Vec<SortFieldConfig>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        limit: Option<serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        skip: Option<serde_json::Value>,
    },
    /// Both fields are required on input (`exclude` has no default).
    Project {
        fields: Vec<String>,
        exclude: bool,
    },
    Group {
        by_fields: Vec<String>,
        functions: Vec<GroupFunctionConfig>,
    },
    Count {
        output_field: String,
    },
    FindById {
        collection: String,
        record_id: String,
    },
    FindOne {
        collection: String,
        key: String,
        value: serde_json::Value,
    },

    // ----- Writing records -----
    Insert {
        collection: String,
        record: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
        #[serde(skip_serializing_if = "Option::is_none")]
        ttl: Option<serde_json::Value>,
    },
    Update {
        collection: String,
        filter: serde_json::Value,
        updates: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
        #[serde(skip_serializing_if = "Option::is_none")]
        ttl: Option<serde_json::Value>,
    },
    UpdateById {
        collection: String,
        record_id: String,
        updates: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
        #[serde(skip_serializing_if = "Option::is_none")]
        ttl: Option<serde_json::Value>,
    },
    FindOneAndUpdate {
        collection: String,
        filter: serde_json::Value,
        updates: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
        #[serde(skip_serializing_if = "Option::is_none")]
        ttl: Option<serde_json::Value>,
    },
    UpdateWithAction {
        collection: String,
        filter: serde_json::Value,
        actions: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
    },
    Delete {
        collection: String,
        filter: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
    },
    DeleteById {
        collection: String,
        record_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
    },
    BatchInsert {
        collection: String,
        records: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        bypass_ripple: Option<bool>,
    },
    /// `bypass_ripple` is a plain `bool` here: it is always serialized and defaults to
    /// `false` when absent from the input.
    ///
    /// NOTE(review): unlike `BatchInsert`, this variant has no `collection` field and
    /// does not use `Option<bool>` for `bypass_ripple` — confirm this matches the
    /// server schema.
    BatchDelete {
        ids: serde_json::Value,
        #[serde(default)]
        bypass_ripple: bool,
    },

    // ----- Outbound HTTP -----
    /// `method` falls back to `"GET"` (see `default_method`) when absent from the input.
    HttpRequest {
        url: String,
        #[serde(default = "default_method")]
        method: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        headers: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        body: Option<serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        timeout_seconds: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        output_field: Option<String>,
    },

    // ----- Search -----
    VectorSearch {
        query_vector: Vec<f32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        options: Option<serde_json::Value>,
    },
    TextSearch {
        collection: String,
        query_text: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        fields: Option<Vec<String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        limit: Option<serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        fuzzy: Option<bool>,
    },
    HybridSearch {
        text_query: String,
        vector_query: Vec<f32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        options: Option<serde_json::Value>,
    },

    // ----- Chat and embeddings -----
    Chat {
        messages: Vec<ChatMessage>,
        #[serde(skip_serializing_if = "Option::is_none")]
        model: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        temperature: Option<f32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        max_tokens: Option<i32>,
    },
    Embed {
        input_field: String,
        output_field: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        model: Option<String>,
    },

    // ----- Control flow over nested stages -----
    If {
        condition: FunctionCondition,
        then_functions: Vec<Box<Function>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        else_functions: Option<Vec<Box<Function>>>,
    },
    ForEach {
        functions: Vec<Box<Function>>,
    },
    /// `params` is omitted from the output when `None`, like every other optional field
    /// of this enum (it used to be written as `"params": null`). Input handling is
    /// unchanged: a missing or `null` `params` both deserialize to `None`.
    CallFunction {
        function_label: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        params: Option<HashMap<String, serde_json::Value>>,
    },

    // ----- Savepoints -----
    CreateSavepoint {
        name: String,
    },
    RollbackToSavepoint {
        name: String,
    },
    ReleaseSavepoint {
        name: String,
    },

    // ----- Key-value stages -----
    KvGet {
        key: serde_json::Value,
    },
    KvSet {
        key: serde_json::Value,
        value: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        ttl: Option<serde_json::Value>,
    },
    KvDelete {
        key: serde_json::Value,
    },
    KvExists {
        key: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        output_field: Option<String>,
    },
    /// `include_expired` is always serialized and defaults to `false` when absent from
    /// the input.
    KvQuery {
        #[serde(skip_serializing_if = "Option::is_none")]
        pattern: Option<serde_json::Value>,
        #[serde(default)]
        include_expired: bool,
    },

    /// Unlike `HttpRequest`, `method` has no default here and is required on input.
    SWR {
        cache_key: String,
        ttl: serde_json::Value,
        url: String,
        method: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        headers: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        body: Option<serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        timeout_seconds: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        output_field: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        collection: Option<String>,
    },

    // ----- Hashing, tokens, signing and encryption -----
    BcryptHash {
        plain: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cost: Option<u32>,
        output_field: String,
    },
    BcryptVerify {
        plain: String,
        hash_field: String,
        output_field: String,
    },
    RandomToken {
        bytes: usize,
        #[serde(skip_serializing_if = "Option::is_none")]
        encoding: Option<String>,
        output_field: String,
    },
    JwtSign {
        claims: HashMap<String, serde_json::Value>,
        secret: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        algorithm: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        expires_in_secs: Option<i64>,
        output_field: String,
    },
    JwtVerify {
        token_field: String,
        secret: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        algorithm: Option<String>,
        output_field: String,
    },
    EmailSend {
        to: String,
        subject: String,
        body: String,
        from: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        reply_to: Option<String>,
        api_key: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        provider: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        html: Option<bool>,
        #[serde(skip_serializing_if = "Option::is_none")]
        output_field: Option<String>,
    },
    HmacSign {
        input: String,
        secret: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        algorithm: Option<String>,
        output_field: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        encoding: Option<String>,
    },
    HmacVerify {
        input: String,
        provided_mac: String,
        secret: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        algorithm: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        encoding: Option<String>,
        output_field: String,
    },
    AesEncrypt {
        plaintext: String,
        key: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        key_encoding: Option<String>,
        output_field: String,
    },
    AesDecrypt {
        ciphertext_field: String,
        key: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        key_encoding: Option<String>,
        output_field: String,
    },
    UuidGenerate {
        output_field: String,
    },
    TotpGenerate {
        secret: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        digits: Option<u32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        period: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        algorithm: Option<String>,
        output_field: String,
    },
    TotpVerify {
        code: String,
        secret: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        digits: Option<u32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        period: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        algorithm: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        skew: Option<u8>,
        output_field: String,
    },

    // ----- Encoding helpers -----
    Base64Encode {
        input: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        url_safe: Option<bool>,
        output_field: String,
    },
    Base64Decode {
        input: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        url_safe: Option<bool>,
        output_field: String,
    },
    HexEncode {
        input: String,
        output_field: String,
    },
    HexDecode {
        input: String,
        output_field: String,
    },
    Slugify {
        input: String,
        output_field: String,
    },

    // ----- Idempotency, rate limiting and locks -----
    IdempotencyClaim {
        key: String,
        ttl_secs: u64,
        output_field: String,
    },
    RateLimit {
        key: String,
        limit: u64,
        window_secs: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        on_exceed: Option<String>,
        output_field: String,
    },
    LockAcquire {
        key: String,
        ttl_secs: u64,
        output_field: String,
    },
    LockRelease {
        key: String,
        token: String,
        output_field: String,
    },

    // ----- Error handling, concurrency and flow termination -----
    TryCatch {
        try_functions: Vec<Box<Function>>,
        catch_functions: Vec<Box<Function>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        output_error_field: Option<String>,
    },
    /// Both fields are required on input (`wait_for_all` has no default).
    Parallel {
        functions: Vec<Box<Function>>,
        wait_for_all: bool,
    },
    Sleep {
        duration_ms: serde_json::Value,
    },
    Return {
        fields: HashMap<String, serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        status_code: Option<u16>,
    },
    Validate {
        schema: serde_json::Value,
        data_field: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        on_error: Option<Vec<Box<Function>>>,
    },
}
/// Serde default for `Function::HttpRequest::method`: the string `"GET"`.
fn default_method() -> String {
    String::from("GET")
}
/// A single message of a `Function::Chat` stage: a role plus its text.
///
/// Both fields are plain strings and are required on input. The constructors on this type produce the roles
/// `"system"`, `"user"` and `"assistant"`, but the field itself is not restricted to those values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
/// Role of the author of the message.
pub role: String,
/// Text of the message.
pub content: String,
}
impl ChatMessage {
    /// Builds a message whose role is `"system"`.
    pub fn system(content: impl Into<String>) -> Self {
        let role = String::from("system");
        let content = content.into();
        Self { role, content }
    }

    /// Builds a message whose role is `"user"`.
    pub fn user(content: impl Into<String>) -> Self {
        let role = String::from("user");
        let content = content.into();
        Self { role, content }
    }

    /// Builds a message whose role is `"assistant"`.
    pub fn assistant(content: impl Into<String>) -> Self {
        let role = String::from("assistant");
        let content = content.into();
        Self { role, content }
    }
}
/// One entry of the `functions` list of a `Function::Group` stage.
///
/// Pairs a `GroupFunctionOp` with the field that receives its result and, optionally, the field it reads from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupFunctionConfig {
/// Name of the field that receives the result.
pub output_field: String,
/// The operation to apply.
pub operation: GroupFunctionOp,
/// Optional name of the field the operation reads; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub input_field: Option<String>,
}
impl GroupFunctionConfig {
    /// Creates a config that applies `operation` and stores the result in `output_field`.
    ///
    /// No input field is set; add one with `with_input_field`.
    pub fn new(output_field: impl Into<String>, operation: GroupFunctionOp) -> Self {
        let output_field = output_field.into();
        Self {
            input_field: None,
            operation,
            output_field,
        }
    }

    /// Sets the input field, replacing any previous one.
    pub fn with_input_field(self, field: impl Into<String>) -> Self {
        let input_field = Some(field.into());
        Self {
            input_field,
            ..self
        }
    }
}
/// Operation selected by a `GroupFunctionConfig`.
///
/// All variants are unit variants, so each one is serialized as a bare string carrying the variant name
/// (for example `"Sum"`). The `rename_all = "PascalCase"` attribute keeps the names as written here.
/// The exact semantics of each operation are implemented outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum GroupFunctionOp {
Sum,
Average,
Count,
Min,
Max,
First,
Last,
Push,
}
/// One sort key of a `Function::Query` stage: a field name plus a direction flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SortFieldConfig {
/// Name of the field to sort by.
pub field: String,
/// Direction flag: `true` for ascending, `false` for descending. Defaults to `true` (via `default_ascending`)
/// when absent from the input; always written to the output.
#[serde(default = "default_ascending")]
pub ascending: bool,
}
impl SortFieldConfig {
    /// Creates an ascending sort key on `field`.
    pub fn new(field: impl Into<String>) -> Self {
        let field = field.into();
        Self {
            ascending: true,
            field,
        }
    }

    /// Switches the direction to descending (`ascending = false`).
    pub fn descending(self) -> Self {
        Self {
            ascending: false,
            ..self
        }
    }

    /// Switches the direction to ascending (`ascending = true`).
    pub fn ascending(self) -> Self {
        Self {
            ascending: true,
            ..self
        }
    }
}
/// Serde default for `SortFieldConfig::ascending`: sort keys are ascending unless stated otherwise.
fn default_ascending() -> bool {
    const ASCENDING_BY_DEFAULT: bool = true;
    ASCENDING_BY_DEFAULT
}
/// Result payload pairing a list of records with the accompanying `FunctionStats`.
///
/// Nothing in this file constructs this type; presumably it is deserialized from the response to a function
/// call — TODO confirm against the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionResult {
/// The records carried by the result.
pub records: Vec<crate::Record>,
/// Statistics carried alongside the records.
pub stats: FunctionStats,
}
/// Statistics attached to a `FunctionResult`: overall counters plus one `StageStats` entry per reported stage.
///
/// All fields are required on input. The values are produced outside this file; the notes below only restate
/// what the field names and types say.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionStats {
/// Input count, as reported by the producer of these stats.
pub input_count: usize,
/// Output count, as reported by the producer of these stats.
pub output_count: usize,
/// Execution time; milliseconds according to the field name.
pub execution_time_ms: u128,
/// Number of stages executed.
pub stages_executed: usize,
/// Per-stage breakdown.
pub stage_stats: Vec<StageStats>,
}
/// Statistics for a single stage, as listed in `FunctionStats::stage_stats`.
///
/// All fields are required on input. The values are produced outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageStats {
/// Identifies the stage; the exact format is not defined in this file.
pub stage: String,
/// Input count for this stage.
pub input_count: usize,
/// Output count for this stage.
pub output_count: usize,
/// Execution time of this stage; milliseconds according to the field name.
pub execution_time_ms: u128,
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Serializes a stage to its JSON text, panicking if serialization fails.
    fn encode(stage: &Function) -> String {
        serde_json::to_string(stage).unwrap()
    }

    /// Parses JSON text back into a stage, panicking if parsing fails.
    fn decode(text: &str) -> Function {
        serde_json::from_str(text).unwrap()
    }

    /// Boxed `FindAll` stage over `collection`, used as a cheap nested stage.
    fn find_all(collection: &str) -> Box<Function> {
        Box::new(Function::FindAll {
            collection: collection.to_string(),
        })
    }

    /// `TryCatch` keeps both nested stage lists and the error field across a round trip.
    #[test]
    fn try_catch_round_trip() {
        let record_failure = Function::Insert {
            collection: "errors".to_string(),
            record: json!({"msg": "failed"}),
            bypass_ripple: None,
            ttl: None,
        };
        let original = Function::TryCatch {
            try_functions: vec![find_all("users")],
            catch_functions: vec![Box::new(record_failure)],
            output_error_field: Some("api_error".to_string()),
        };

        match decode(&encode(&original)) {
            Function::TryCatch {
                try_functions,
                catch_functions,
                output_error_field,
            } => {
                assert_eq!(try_functions.len(), 1);
                assert_eq!(catch_functions.len(), 1);
                assert_eq!(output_error_field, Some("api_error".to_string()));
            }
            other => panic!("Expected TryCatch, got {:?}", other),
        }
    }

    /// `Parallel` keeps its nested stages and the `wait_for_all` flag across a round trip.
    #[test]
    fn parallel_round_trip() {
        let original = Function::Parallel {
            functions: vec![find_all("a"), find_all("b")],
            wait_for_all: false,
        };

        match decode(&encode(&original)) {
            Function::Parallel {
                functions,
                wait_for_all,
            } => {
                assert_eq!(functions.len(), 2);
                assert!(!wait_for_all);
            }
            other => panic!("Expected Parallel, got {:?}", other),
        }
    }

    /// A numeric `duration_ms` survives a round trip unchanged.
    #[test]
    fn sleep_round_trip() {
        let original = Function::Sleep {
            duration_ms: json!(1000),
        };

        match decode(&encode(&original)) {
            Function::Sleep { duration_ms } => assert_eq!(duration_ms, json!(1000)),
            other => panic!("Expected Sleep, got {:?}", other),
        }
    }

    /// A placeholder string in `duration_ms` appears verbatim in the JSON and survives a round trip.
    #[test]
    fn sleep_with_placeholder() {
        let original = Function::Sleep {
            duration_ms: json!("{{delay}}"),
        };

        let encoded = encode(&original);
        assert!(encoded.contains("{{delay}}"));

        match decode(&encoded) {
            Function::Sleep { duration_ms } => assert_eq!(duration_ms, json!("{{delay}}")),
            other => panic!("Expected Sleep, got {:?}", other),
        }
    }

    /// `Return` keeps its fields and status code across a round trip.
    #[test]
    fn return_round_trip() {
        let fields: HashMap<String, serde_json::Value> = vec![
            ("message".to_string(), json!("ok")),
            ("user_id".to_string(), json!("{{id}}")),
        ]
        .into_iter()
        .collect();
        let original = Function::Return {
            fields,
            status_code: Some(201),
        };

        match decode(&encode(&original)) {
            Function::Return {
                fields,
                status_code,
            } => {
                assert_eq!(fields.get("message"), Some(&json!("ok")));
                assert_eq!(status_code, Some(201));
            }
            other => panic!("Expected Return, got {:?}", other),
        }
    }

    /// An unset `status_code` is left out of the serialized JSON.
    #[test]
    fn return_omits_status_code_when_none() {
        let fields: HashMap<String, serde_json::Value> =
            vec![("success".to_string(), json!(true))]
                .into_iter()
                .collect();
        let original = Function::Return {
            fields,
            status_code: None,
        };

        let encoded = encode(&original);
        assert!(!encoded.contains("status_code"));
    }

    /// `Validate` keeps its schema, data field and error handlers across a round trip.
    #[test]
    fn validate_round_trip() {
        let original = Function::Validate {
            schema: json!({"type": "object", "required": ["name"]}),
            data_field: "{{input}}".to_string(),
            on_error: Some(vec![find_all("errors")]),
        };

        match decode(&encode(&original)) {
            Function::Validate {
                schema,
                data_field,
                on_error,
            } => {
                assert_eq!(schema, json!({"type": "object", "required": ["name"]}));
                assert_eq!(data_field, "{{input}}");
                assert_eq!(on_error.unwrap().len(), 1);
            }
            other => panic!("Expected Validate, got {:?}", other),
        }
    }

    /// An unset `on_error` is left out of the serialized JSON.
    #[test]
    fn validate_omits_on_error_when_none() {
        let original = Function::Validate {
            schema: json!({"type": "object"}),
            data_field: "record".to_string(),
            on_error: None,
        };

        let encoded = encode(&original);
        assert!(!encoded.contains("on_error"));
    }

    /// A full `UserFunction` document with a `Return` stage deserializes into the expected stages.
    #[test]
    fn deserialize_user_function_with_return_stage() {
        let document = r#"{
"id": "abc123",
"label": "api_handler",
"name": "API Handler",
"functions": [
{"type": "FindAll", "collection": "users"},
{"type": "Return", "fields": {"data": "{{result}}"}, "status_code": 200}
]
}"#;

        let parsed: UserFunction = serde_json::from_str(document).unwrap();
        assert_eq!(parsed.functions.len(), 2);

        let last_stage = &parsed.functions[1];
        match last_stage {
            Function::Return {
                fields,
                status_code,
            } => {
                assert!(fields.contains_key("data"));
                assert_eq!(*status_code, Some(200));
            }
            other => panic!("Expected Return, got {:?}", other),
        }
    }
}