use async_trait::async_trait;
use dataflow_rs::Result;
use dataflow_rs::{
Engine, Workflow,
engine::{
AsyncFunctionHandler, FunctionConfig,
error::DataflowError,
message::{Change, Message},
},
};
use datalogic_rs::DataLogic;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::Arc;
/// Custom workflow function that computes descriptive statistics over a
/// numeric array stored in the message data.
///
/// Reads from `FunctionConfig::Custom { input, .. }`:
/// - `data_path` (default `"numbers"`): dot-separated path, under
///   `message.context["data"]`, to the source array.
/// - `output_path` (default `"statistics"`): dot-separated path, under
///   `message.context["data"]`, where the statistics object is written.
pub struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    /// Computes statistics for the array at `data_path`, writes them to
    /// `output_path`, and returns status `200` with a single [`Change`]
    /// describing that write.
    ///
    /// # Errors
    /// Returns [`DataflowError::Validation`] when the config is not
    /// `FunctionConfig::Custom`, when `data_path` does not resolve to an array,
    /// or when that array contains no numeric values.
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        let FunctionConfig::Custom { input, .. } = config else {
            return Err(DataflowError::Validation(
                "Invalid configuration type for statistics function".to_string(),
            ));
        };

        let data_path = input
            .get("data_path")
            .and_then(Value::as_str)
            .unwrap_or("numbers");
        let output_path = input
            .get("output_path")
            .and_then(Value::as_str)
            .unwrap_or("statistics");

        let numbers = self.extract_numbers_from_path(message, data_path)?;
        if numbers.is_empty() {
            return Err(DataflowError::Validation(
                "No numeric data found to analyze".to_string(),
            ));
        }

        let stats = self.calculate_statistics(&numbers);

        // Capture whatever currently lives at the output path so the audit
        // trail records the real previous value rather than always `null`.
        // Missing segments (or non-object intermediates) resolve to `null`.
        let old_value = output_path
            .split('.')
            .try_fold(&message.context["data"], |node, key| node.get(key))
            .cloned()
            .unwrap_or(Value::Null);

        self.set_value_at_path(message, output_path, stats.clone())?;

        Ok((
            200,
            vec![Change {
                path: Arc::from(output_path),
                old_value: Arc::new(old_value),
                new_value: Arc::new(stats),
            }],
        ))
    }
}
impl Default for StatisticsFunction {
fn default() -> Self {
Self::new()
}
}
impl StatisticsFunction {
pub fn new() -> Self {
Self
}
fn extract_numbers_from_path(&self, message: &Message, path: &str) -> Result<Vec<f64>> {
let parts: Vec<&str> = path.split('.').collect();
let mut current = &message.context["data"];
for part in parts {
current = current.get(part).unwrap_or(&Value::Null);
}
match current {
Value::Array(arr) => {
let mut numbers = Vec::new();
for val in arr {
if let Some(num) = val.as_f64() {
numbers.push(num);
}
}
Ok(numbers)
}
_ => Err(DataflowError::Validation(format!(
"Expected array at path '{}', found {:?}",
path, current
))),
}
}
fn calculate_statistics(&self, numbers: &[f64]) -> Value {
let count = numbers.len() as f64;
let sum: f64 = numbers.iter().sum();
let mean = sum / count;
let mut sorted = numbers.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
let median = if sorted.len() % 2 == 0 {
let mid = sorted.len() / 2;
(sorted[mid - 1] + sorted[mid]) / 2.0
} else {
sorted[sorted.len() / 2]
};
let variance: f64 = numbers.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / count;
let std_dev = variance.sqrt();
json!({
"count": count,
"sum": sum,
"mean": mean,
"median": median,
"min": sorted[0],
"max": sorted[sorted.len() - 1],
"std_dev": std_dev,
"variance": variance
})
}
fn set_value_at_path(&self, message: &mut Message, path: &str, value: Value) -> Result<()> {
let parts: Vec<&str> = path.split('.').collect();
let mut current = &mut message.context["data"];
for (i, part) in parts.iter().enumerate() {
if i == parts.len() - 1 {
if let Value::Object(map) = current {
map.insert(part.to_string(), value);
return Ok(());
}
} else {
if !current.is_object() {
*current = json!({});
}
if let Value::Object(map) = current {
current = map.entry(part.to_string()).or_insert_with(|| json!({}));
}
}
}
Err(DataflowError::Validation(format!(
"Failed to set value at path '{}'",
path
)))
}
}
/// Custom async workflow function that simulates fetching a user profile.
///
/// After a short `tokio` sleep standing in for an external lookup, it stores
/// a synthetic profile under `message.context["data"]["enriched"]`.
///
/// Reads from `FunctionConfig::Custom { input, .. }`:
/// - `user_id_path` (default `"user_id"`): dot-separated path, under
///   `message.context["data"]`, to the user id string.
pub struct AsyncDataEnrichmentFunction;

#[async_trait]
impl AsyncFunctionHandler for AsyncDataEnrichmentFunction {
    /// Builds the enrichment object and inserts it as `data.enriched`.
    ///
    /// Returns status `200` with one [`Change`] when the insert happened. If
    /// `data` is not a JSON object, nothing is written and the change list is
    /// empty.
    ///
    /// # Errors
    /// Returns [`DataflowError::Validation`] when the config is not
    /// `FunctionConfig::Custom`.
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        let FunctionConfig::Custom { input, .. } = config else {
            return Err(DataflowError::Validation(
                "Invalid configuration type for enrichment function".to_string(),
            ));
        };

        let user_id_path = input
            .get("user_id_path")
            .and_then(Value::as_str)
            .unwrap_or("user_id");

        // Resolve the id as a dot path, matching how the statistics function
        // interprets its `*_path` settings. A missing or non-string value
        // falls back to "unknown".
        let user_id_value = user_id_path
            .split('.')
            .try_fold(&message.context["data"], |node, key| node.get(key))
            .and_then(Value::as_str)
            .unwrap_or("unknown");

        // Simulated latency of an external lookup.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        let enriched_data = json!({
            "user_profile": {
                "id": user_id_value,
                "name": format!("User {}", user_id_value),
                "email": format!("{}@example.com", user_id_value),
                "created_at": "2024-01-15T10:30:00Z",
                "preferences": {
                    "theme": "dark",
                    "notifications": true
                }
            },
            "enrichment_timestamp": chrono::Utc::now().to_rfc3339()
        });

        // Only a JSON object can hold the new field. Report a change only when
        // the insert actually happened, recording the value it replaced.
        let previous = match &mut message.context["data"] {
            Value::Object(map) => map.insert("enriched".to_string(), enriched_data.clone()),
            _ => return Ok((200, Vec::new())),
        };

        Ok((
            200,
            vec![Change {
                path: Arc::from("enriched"),
                old_value: Arc::new(previous.unwrap_or(Value::Null)),
                new_value: Arc::new(enriched_data),
            }],
        ))
    }
}
/// Example entry point.
///
/// Builds a four-task workflow (`map` -> `statistics` -> `enrich_data` ->
/// `validation`) and registers the two custom handlers under the names the
/// workflow references. It then processes one sample message and prints the
/// resulting data, the audit trail, and any errors recorded on the message.
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    println!("🚀 Custom Function Example");
    println!("==========================\n");

    let workflow_json = r#"
{
"id": "statistics_workflow",
"name": "Data Processing Workflow",
"tasks": [
{
"id": "prepare_data",
"name": "Prepare Data",
"function": {
"name": "map",
"input": {
"mappings": [
{
"path": "numbers",
"logic": { "var": "payload.measurements" }
},
{
"path": "user_id",
"logic": { "var": "payload.user_id" }
}
]
}
}
},
{
"id": "calculate_stats",
"name": "Calculate Statistics",
"function": {
"name": "statistics",
"input": {
"data_path": "numbers",
"output_path": "statistics"
}
}
},
{
"id": "enrich_user_data",
"name": "Enrich User Data",
"function": {
"name": "enrich_data",
"input": {
"user_id_path": "user_id"
}
}
},
{
"id": "validate_results",
"name": "Validate Results",
"function": {
"name": "validation",
"input": {
"rules": [
{
"path": "statistics.count",
"logic": { ">": [{ "var": "statistics.count" }, 0] },
"message": "Statistics must have at least one data point"
},
{
"path": "enriched.user_profile",
"logic": { "!!": { "var": "enriched.user_profile" } },
"message": "User profile enrichment is required"
}
]
}
}
}
]
}
"#;
    let workflow = Workflow::from_json(workflow_json)?;

    // Handler names must match the `function.name` values used in the workflow.
    let mut custom_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> =
        HashMap::new();
    custom_functions.insert(
        "statistics".to_string(),
        Box::new(StatisticsFunction::new()),
    );
    custom_functions.insert(
        "enrich_data".to_string(),
        Box::new(AsyncDataEnrichmentFunction),
    );
    let engine = Engine::new(vec![workflow], Some(custom_functions));

    let sample_data = json!({
        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
        "user_id": "user_123",
        "timestamp": "2024-01-15T10:30:00Z"
    });
    let mut message = Message::from_value(&sample_data);

    println!("Processing message with custom functions...\n");
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("✅ Message processed successfully!\n");
            println!("📊 Final Results:");
            println!(
                "{}\n",
                serde_json::to_string_pretty(&message.context["data"])?
            );

            println!("📋 Audit Trail:");
            for (i, audit) in message.audit_trail.iter().enumerate() {
                println!(
                    "{}. Task: {} (Status: {})",
                    i + 1,
                    audit.task_id,
                    audit.status
                );
                println!(" Timestamp: {}", audit.timestamp);
                println!(" Changes: {} field(s) modified", audit.changes.len());
            }

            if message.has_errors() {
                println!("\n⚠️ Errors encountered:");
                for error in &message.errors {
                    // `as_deref` borrows the id as `&str`, avoiding a fresh
                    // String allocation just for the fallback label.
                    println!(
                        " - {}: {}",
                        error.task_id.as_deref().unwrap_or("unknown"),
                        error.message
                    );
                }
            }
        }
        Err(e) => {
            eprintln!("❌ Error processing message: {e:?}");
        }
    }

    println!("\n🎉 Custom function example completed!");
    Ok(())
}