custom_function/
custom_function.rs

1use dataflow_rs::{
2    Engine, Workflow,
3    engine::{
4        FunctionConfig, FunctionHandler,
5        error::{DataflowError, Result},
6        message::{Change, Message},
7    },
8};
9use datalogic_rs::DataLogic;
10use serde_json::{Value, json};
11use std::collections::HashMap;
12
/// Custom function that calculates statistics from numeric data.
///
/// The type carries no state. When executed through its `FunctionHandler`
/// implementation it reads numbers from a configurable path in the message and
/// stores an object with `count`, `sum`, `mean`, `median`, `min`, `max`,
/// `variance` and `std_dev` at a configurable output path.
pub struct StatisticsFunction;
15
16impl FunctionHandler for StatisticsFunction {
17    fn execute(
18        &self,
19        message: &mut Message,
20        config: &FunctionConfig,
21        _datalogic: &DataLogic,
22    ) -> Result<(usize, Vec<Change>)> {
23        // Extract the raw input from config
24        let input = match config {
25            FunctionConfig::Custom { input, .. } => input,
26            _ => {
27                return Err(DataflowError::Validation(
28                    "Invalid configuration type for statistics function".to_string(),
29                ));
30            }
31        };
32
33        // Extract the data path to analyze
34        let data_path = input
35            .get("data_path")
36            .and_then(Value::as_str)
37            .unwrap_or("data.numbers");
38
39        // Extract the output path where to store results
40        let output_path = input
41            .get("output_path")
42            .and_then(Value::as_str)
43            .unwrap_or("data.statistics");
44
45        // Get the numbers from the specified path
46        let numbers = self.extract_numbers_from_path(message, data_path)?;
47
48        if numbers.is_empty() {
49            return Err(DataflowError::Validation(
50                "No numeric data found to analyze".to_string(),
51            ));
52        }
53
54        // Calculate statistics
55        let stats = self.calculate_statistics(&numbers);
56
57        // Store the results in the message
58        self.set_value_at_path(message, output_path, stats.clone())?;
59
60        // Return success with changes
61        Ok((
62            200,
63            vec![Change {
64                path: output_path.to_string(),
65                old_value: Value::Null,
66                new_value: stats,
67            }],
68        ))
69    }
70}
71
72impl Default for StatisticsFunction {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl StatisticsFunction {
79    pub fn new() -> Self {
80        Self
81    }
82
83    fn extract_numbers_from_path(&self, message: &Message, path: &str) -> Result<Vec<f64>> {
84        let parts: Vec<&str> = path.split('.').collect();
85        let mut current = if parts[0] == "data" {
86            &message.data
87        } else if parts[0] == "temp_data" {
88            &message.temp_data
89        } else if parts[0] == "metadata" {
90            &message.metadata
91        } else {
92            &message.data
93        };
94
95        // Navigate to the target location
96        for part in &parts[1..] {
97            current = current.get(part).unwrap_or(&Value::Null);
98        }
99
100        // Extract numbers from the value
101        match current {
102            Value::Array(arr) => {
103                let mut numbers = Vec::new();
104                for item in arr {
105                    if let Some(num) = item.as_f64() {
106                        numbers.push(num);
107                    } else if let Some(num) = item.as_i64() {
108                        numbers.push(num as f64);
109                    }
110                }
111                Ok(numbers)
112            }
113            Value::Number(num) => {
114                if let Some(f) = num.as_f64() {
115                    Ok(vec![f])
116                } else {
117                    Ok(vec![])
118                }
119            }
120            _ => Ok(vec![]),
121        }
122    }
123
124    fn calculate_statistics(&self, numbers: &[f64]) -> Value {
125        let count = numbers.len();
126        let sum: f64 = numbers.iter().sum();
127        let mean = sum / count as f64;
128
129        let mut sorted = numbers.to_vec();
130        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
131
132        let median = if count % 2 == 0 {
133            (sorted[count / 2 - 1] + sorted[count / 2]) / 2.0
134        } else {
135            sorted[count / 2]
136        };
137
138        let variance: f64 = numbers.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / count as f64;
139        let std_dev = variance.sqrt();
140
141        json!({
142            "count": count,
143            "sum": sum,
144            "mean": mean,
145            "median": median,
146            "min": sorted[0],
147            "max": sorted[count - 1],
148            "variance": variance,
149            "std_dev": std_dev
150        })
151    }
152
153    fn set_value_at_path(&self, message: &mut Message, path: &str, value: Value) -> Result<()> {
154        let parts: Vec<&str> = path.split('.').collect();
155        let target = if parts[0] == "data" {
156            &mut message.data
157        } else if parts[0] == "temp_data" {
158            &mut message.temp_data
159        } else if parts[0] == "metadata" {
160            &mut message.metadata
161        } else {
162            &mut message.data
163        };
164
165        // Navigate and set the value
166        let mut current = target;
167        for (i, part) in parts[1..].iter().enumerate() {
168            if i == parts.len() - 2 {
169                // Last part, set the value
170                if current.is_null() {
171                    *current = json!({});
172                }
173                if let Value::Object(map) = current {
174                    map.insert(part.to_string(), value.clone());
175                }
176                break;
177            } else {
178                // Navigate deeper
179                if current.is_null() {
180                    *current = json!({});
181                }
182                if let Value::Object(map) = current {
183                    if !map.contains_key(*part) {
184                        map.insert(part.to_string(), json!({}));
185                    }
186                    current = map.get_mut(*part).unwrap();
187                }
188            }
189        }
190
191        Ok(())
192    }
193}
194
/// Custom function that enriches data with external information.
///
/// The "external" source is an in-memory table built by
/// `DataEnrichmentFunction::new`; execution looks up one record by key and
/// stores it (or a `not_found` marker object) in the message.
pub struct DataEnrichmentFunction {
    // Lookup table: key is the `lookup_value` string from the task input,
    // value is the JSON record stored in the message on a hit.
    enrichment_data: HashMap<String, Value>,
}
199
200impl FunctionHandler for DataEnrichmentFunction {
201    fn execute(
202        &self,
203        message: &mut Message,
204        config: &FunctionConfig,
205        _datalogic: &DataLogic,
206    ) -> Result<(usize, Vec<Change>)> {
207        // Extract the raw input from config
208        let input = match config {
209            FunctionConfig::Custom { input, .. } => input,
210            _ => {
211                return Err(DataflowError::Validation(
212                    "Invalid configuration type for enrichment function".to_string(),
213                ));
214            }
215        };
216
217        // Extract lookup key and field
218        let lookup_field = input
219            .get("lookup_field")
220            .and_then(Value::as_str)
221            .ok_or_else(|| DataflowError::Validation("Missing lookup_field".to_string()))?;
222
223        let lookup_value = input
224            .get("lookup_value")
225            .and_then(Value::as_str)
226            .ok_or_else(|| DataflowError::Validation("Missing lookup_value".to_string()))?;
227
228        let output_path = input
229            .get("output_path")
230            .and_then(Value::as_str)
231            .unwrap_or("data.enrichment");
232
233        // Simulate operation (e.g., database lookup, API call)
234        std::thread::sleep(std::time::Duration::from_millis(10));
235
236        // Look up enrichment data
237        let enrichment = if let Some(data) = self.enrichment_data.get(lookup_value) {
238            data.clone()
239        } else {
240            json!({
241                "status": "not_found",
242                "message": format!("No enrichment data found for {}: {}", lookup_field, lookup_value)
243            })
244        };
245
246        // Store enrichment data
247        self.set_value_at_path(message, output_path, enrichment.clone())?;
248
249        Ok((
250            200,
251            vec![Change {
252                path: output_path.to_string(),
253                old_value: Value::Null,
254                new_value: enrichment,
255            }],
256        ))
257    }
258}
259
260impl Default for DataEnrichmentFunction {
261    fn default() -> Self {
262        Self::new()
263    }
264}
265
266impl DataEnrichmentFunction {
267    pub fn new() -> Self {
268        let mut enrichment_data = HashMap::new();
269
270        // Sample enrichment data
271        enrichment_data.insert(
272            "user_123".to_string(),
273            json!({
274                "department": "Engineering",
275                "location": "San Francisco",
276                "manager": "Alice Johnson",
277                "start_date": "2022-01-15",
278                "security_clearance": "Level 2"
279            }),
280        );
281
282        enrichment_data.insert(
283            "user_456".to_string(),
284            json!({
285                "department": "Marketing",
286                "location": "New York",
287                "manager": "Bob Smith",
288                "start_date": "2021-06-01",
289                "security_clearance": "Level 1"
290            }),
291        );
292
293        Self { enrichment_data }
294    }
295
296    fn set_value_at_path(&self, message: &mut Message, path: &str, value: Value) -> Result<()> {
297        let parts: Vec<&str> = path.split('.').collect();
298        let target = if parts[0] == "data" {
299            &mut message.data
300        } else if parts[0] == "temp_data" {
301            &mut message.temp_data
302        } else if parts[0] == "metadata" {
303            &mut message.metadata
304        } else {
305            &mut message.data
306        };
307
308        let mut current = target;
309        for (i, part) in parts[1..].iter().enumerate() {
310            if i == parts.len() - 2 {
311                if current.is_null() {
312                    *current = json!({});
313                }
314                if let Value::Object(map) = current {
315                    map.insert(part.to_string(), value.clone());
316                }
317                break;
318            } else {
319                if current.is_null() {
320                    *current = json!({});
321                }
322                if let Value::Object(map) = current {
323                    if !map.contains_key(*part) {
324                        map.insert(part.to_string(), json!({}));
325                    }
326                    current = map.get_mut(*part).unwrap();
327                }
328            }
329        }
330        Ok(())
331    }
332}
333
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335    println!("=== Custom Function Example ===\n");
336
337    // Define a workflow that uses our custom functions
338    let workflow_json = r#"
339    {
340        "id": "custom_function_demo",
341        "name": "Custom Function Demo",
342        "description": "Demonstrates custom functions in workflow",
343        "tasks": [
344            {
345                "id": "prepare_data",
346                "name": "Prepare Data",
347                "description": "Extract and prepare data for analysis",
348                "function": {
349                    "name": "map",
350                    "input": {
351                        "mappings": [
352                            {
353                                "path": "data.numbers",
354                                "logic": { "var": "temp_data.measurements" }
355                            },
356                            {
357                                "path": "data.user_id",
358                                "logic": { "var": "temp_data.user_id" }
359                            }
360                        ]
361                    }
362                }
363            },
364            {
365                "id": "calculate_stats",
366                "name": "Calculate Statistics",
367                "description": "Calculate statistical measures from numeric data",
368                "function": {
369                    "name": "statistics",
370                    "input": {
371                        "data_path": "data.numbers",
372                        "output_path": "data.stats"
373                    }
374                }
375            },
376            {
377                "id": "enrich_user_data",
378                "name": "Enrich User Data",
379                "description": "Add additional user information",
380                "function": {
381                    "name": "enrich_data",
382                    "input": {
383                        "lookup_field": "user_id",
384                        "lookup_value": "user_123",
385                        "output_path": "data.user_info"
386                    }
387                }
388            }
389        ]
390    }
391    "#;
392
393    // Parse the first workflow
394    let workflow = Workflow::from_json(workflow_json)?;
395
396    // Demonstrate another example with different data
397    let separator = "=".repeat(50);
398    println!("\n{separator}");
399    println!("=== Second Example with Different User ===\n");
400
401    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402    message2.temp_data = json!({
403        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404        "user_id": "user_456",
405        "timestamp": "2024-01-15T11:00:00Z"
406    });
407    message2.data = json!({});
408
409    // Create a workflow for the second user
410    let workflow2_json = r#"
411    {
412        "id": "custom_function_demo_2",
413        "name": "Custom Function Demo 2",
414        "description": "Second demo with different user",
415        "tasks": [
416            {
417                "id": "prepare_data",
418                "name": "Prepare Data",
419                "function": {
420                    "name": "map",
421                    "input": {
422                        "mappings": [
423                            {
424                                "path": "data.numbers",
425                                "logic": { "var": "temp_data.measurements" }
426                            },
427                            {
428                                "path": "data.user_id",
429                                "logic": { "var": "temp_data.user_id" }
430                            }
431                        ]
432                    }
433                }
434            },
435            {
436                "id": "calculate_stats",
437                "name": "Calculate Statistics",
438                "function": {
439                    "name": "statistics",
440                    "input": {
441                        "data_path": "data.numbers",
442                        "output_path": "data.analysis"
443                    }
444                }
445            },
446            {
447                "id": "enrich_user_data",
448                "name": "Enrich User Data",
449                "function": {
450                    "name": "enrich_data",
451                    "input": {
452                        "lookup_field": "user_id",
453                        "lookup_value": "user_456",
454                        "output_path": "data.employee_details"
455                    }
456                }
457            }
458        ]
459    }
460    "#;
461
462    let workflow2 = Workflow::from_json(workflow2_json)?;
463
464    // Prepare custom functions
465    let mut custom_functions = HashMap::new();
466    custom_functions.insert(
467        "statistics".to_string(),
468        Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469    );
470    custom_functions.insert(
471        "enrich_data".to_string(),
472        Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473    );
474    // Note: map and validate are now built-in to the Engine and will be used automatically
475
476    // Create engine with custom functions and built-ins (map/validate are always included internally)
477    let engine = Engine::new(
478        vec![workflow, workflow2],
479        Some(custom_functions),
480        None, // Use default (includes built-ins)
481    );
482
483    // Create sample data for first message
484    let sample_data = json!({
485        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
486        "user_id": "user_123",
487        "timestamp": "2024-01-15T10:30:00Z"
488    });
489
490    // Create and process first message
491    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
492    message.temp_data = sample_data;
493    message.data = json!({});
494
495    println!("Processing message with custom functions...\n");
496
497    // Process the message through our custom workflow
498    match engine.process_message(&mut message) {
499        Ok(_) => {
500            println!("āœ… Message processed successfully!\n");
501
502            println!("šŸ“Š Final Results:");
503            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
504
505            println!("šŸ“‹ Audit Trail:");
506            for (i, audit) in message.audit_trail.iter().enumerate() {
507                println!(
508                    "{}. Task: {} (Status: {})",
509                    i + 1,
510                    audit.task_id,
511                    audit.status_code
512                );
513                println!("   Timestamp: {}", audit.timestamp);
514                println!("   Changes: {} field(s) modified", audit.changes.len());
515            }
516
517            if message.has_errors() {
518                println!("\nāš ļø  Errors encountered:");
519                for error in &message.errors {
520                    println!(
521                        "   - {}: {:?}",
522                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
523                        error.error_message
524                    );
525                }
526            }
527        }
528        Err(e) => {
529            println!("āŒ Error processing message: {e:?}");
530        }
531    }
532
533    // Process second message
534    match engine.process_message(&mut message2) {
535        Ok(_) => {
536            println!("āœ… Second message processed successfully!\n");
537            println!("šŸ“Š Results for user_456:");
538            println!("{}", serde_json::to_string_pretty(&message2.data)?);
539        }
540        Err(e) => {
541            println!("āŒ Error processing second message: {e:?}");
542        }
543    }
544
545    println!("\nšŸŽ‰ Custom function examples completed!");
546
547    Ok(())
548}