1use dataflow_rs::{
2 Engine, Workflow,
3 engine::{
4 FunctionConfig, FunctionHandler,
5 error::{DataflowError, Result},
6 message::{Change, Message},
7 },
8};
9use datalogic_rs::DataLogic;
10use serde_json::{Value, json};
11use std::collections::HashMap;
12
/// Custom workflow function that computes summary statistics (count, sum,
/// mean, median, min, max, variance and standard deviation) over a numeric
/// field of a message.
///
/// The function holds no state, so it is a unit struct and is cheap to copy.
#[derive(Debug, Clone, Copy)]
pub struct StatisticsFunction;
15
16impl FunctionHandler for StatisticsFunction {
17 fn execute(
18 &self,
19 message: &mut Message,
20 config: &FunctionConfig,
21 _datalogic: &DataLogic,
22 ) -> Result<(usize, Vec<Change>)> {
23 let input = match config {
25 FunctionConfig::Custom { input, .. } => input,
26 _ => {
27 return Err(DataflowError::Validation(
28 "Invalid configuration type for statistics function".to_string(),
29 ));
30 }
31 };
32
33 let data_path = input
35 .get("data_path")
36 .and_then(Value::as_str)
37 .unwrap_or("data.numbers");
38
39 let output_path = input
41 .get("output_path")
42 .and_then(Value::as_str)
43 .unwrap_or("data.statistics");
44
45 let numbers = self.extract_numbers_from_path(message, data_path)?;
47
48 if numbers.is_empty() {
49 return Err(DataflowError::Validation(
50 "No numeric data found to analyze".to_string(),
51 ));
52 }
53
54 let stats = self.calculate_statistics(&numbers);
56
57 self.set_value_at_path(message, output_path, stats.clone())?;
59
60 Ok((
62 200,
63 vec![Change {
64 path: output_path.to_string(),
65 old_value: Value::Null,
66 new_value: stats,
67 }],
68 ))
69 }
70}
71
72impl Default for StatisticsFunction {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl StatisticsFunction {
79 pub fn new() -> Self {
80 Self
81 }
82
83 fn extract_numbers_from_path(&self, message: &Message, path: &str) -> Result<Vec<f64>> {
84 let parts: Vec<&str> = path.split('.').collect();
85 let mut current = if parts[0] == "data" {
86 &message.data
87 } else if parts[0] == "temp_data" {
88 &message.temp_data
89 } else if parts[0] == "metadata" {
90 &message.metadata
91 } else {
92 &message.data
93 };
94
95 for part in &parts[1..] {
97 current = current.get(part).unwrap_or(&Value::Null);
98 }
99
100 match current {
102 Value::Array(arr) => {
103 let mut numbers = Vec::new();
104 for item in arr {
105 if let Some(num) = item.as_f64() {
106 numbers.push(num);
107 } else if let Some(num) = item.as_i64() {
108 numbers.push(num as f64);
109 }
110 }
111 Ok(numbers)
112 }
113 Value::Number(num) => {
114 if let Some(f) = num.as_f64() {
115 Ok(vec![f])
116 } else {
117 Ok(vec![])
118 }
119 }
120 _ => Ok(vec![]),
121 }
122 }
123
124 fn calculate_statistics(&self, numbers: &[f64]) -> Value {
125 let count = numbers.len();
126 let sum: f64 = numbers.iter().sum();
127 let mean = sum / count as f64;
128
129 let mut sorted = numbers.to_vec();
130 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
131
132 let median = if count % 2 == 0 {
133 (sorted[count / 2 - 1] + sorted[count / 2]) / 2.0
134 } else {
135 sorted[count / 2]
136 };
137
138 let variance: f64 = numbers.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / count as f64;
139 let std_dev = variance.sqrt();
140
141 json!({
142 "count": count,
143 "sum": sum,
144 "mean": mean,
145 "median": median,
146 "min": sorted[0],
147 "max": sorted[count - 1],
148 "variance": variance,
149 "std_dev": std_dev
150 })
151 }
152
153 fn set_value_at_path(&self, message: &mut Message, path: &str, value: Value) -> Result<()> {
154 let parts: Vec<&str> = path.split('.').collect();
155 let target = if parts[0] == "data" {
156 &mut message.data
157 } else if parts[0] == "temp_data" {
158 &mut message.temp_data
159 } else if parts[0] == "metadata" {
160 &mut message.metadata
161 } else {
162 &mut message.data
163 };
164
165 let mut current = target;
167 for (i, part) in parts[1..].iter().enumerate() {
168 if i == parts.len() - 2 {
169 if current.is_null() {
171 *current = json!({});
172 }
173 if let Value::Object(map) = current {
174 map.insert(part.to_string(), value.clone());
175 }
176 break;
177 } else {
178 if current.is_null() {
180 *current = json!({});
181 }
182 if let Value::Object(map) = current {
183 if !map.contains_key(*part) {
184 map.insert(part.to_string(), json!({}));
185 }
186 current = map.get_mut(*part).unwrap();
187 }
188 }
189 }
190
191 Ok(())
192 }
193}
194
195pub struct DataEnrichmentFunction {
197 enrichment_data: HashMap<String, Value>,
198}
199
200impl FunctionHandler for DataEnrichmentFunction {
201 fn execute(
202 &self,
203 message: &mut Message,
204 config: &FunctionConfig,
205 _datalogic: &DataLogic,
206 ) -> Result<(usize, Vec<Change>)> {
207 let input = match config {
209 FunctionConfig::Custom { input, .. } => input,
210 _ => {
211 return Err(DataflowError::Validation(
212 "Invalid configuration type for enrichment function".to_string(),
213 ));
214 }
215 };
216
217 let lookup_field = input
219 .get("lookup_field")
220 .and_then(Value::as_str)
221 .ok_or_else(|| DataflowError::Validation("Missing lookup_field".to_string()))?;
222
223 let lookup_value = input
224 .get("lookup_value")
225 .and_then(Value::as_str)
226 .ok_or_else(|| DataflowError::Validation("Missing lookup_value".to_string()))?;
227
228 let output_path = input
229 .get("output_path")
230 .and_then(Value::as_str)
231 .unwrap_or("data.enrichment");
232
233 std::thread::sleep(std::time::Duration::from_millis(10));
235
236 let enrichment = if let Some(data) = self.enrichment_data.get(lookup_value) {
238 data.clone()
239 } else {
240 json!({
241 "status": "not_found",
242 "message": format!("No enrichment data found for {}: {}", lookup_field, lookup_value)
243 })
244 };
245
246 self.set_value_at_path(message, output_path, enrichment.clone())?;
248
249 Ok((
250 200,
251 vec![Change {
252 path: output_path.to_string(),
253 old_value: Value::Null,
254 new_value: enrichment,
255 }],
256 ))
257 }
258}
259
260impl Default for DataEnrichmentFunction {
261 fn default() -> Self {
262 Self::new()
263 }
264}
265
266impl DataEnrichmentFunction {
267 pub fn new() -> Self {
268 let mut enrichment_data = HashMap::new();
269
270 enrichment_data.insert(
272 "user_123".to_string(),
273 json!({
274 "department": "Engineering",
275 "location": "San Francisco",
276 "manager": "Alice Johnson",
277 "start_date": "2022-01-15",
278 "security_clearance": "Level 2"
279 }),
280 );
281
282 enrichment_data.insert(
283 "user_456".to_string(),
284 json!({
285 "department": "Marketing",
286 "location": "New York",
287 "manager": "Bob Smith",
288 "start_date": "2021-06-01",
289 "security_clearance": "Level 1"
290 }),
291 );
292
293 Self { enrichment_data }
294 }
295
296 fn set_value_at_path(&self, message: &mut Message, path: &str, value: Value) -> Result<()> {
297 let parts: Vec<&str> = path.split('.').collect();
298 let target = if parts[0] == "data" {
299 &mut message.data
300 } else if parts[0] == "temp_data" {
301 &mut message.temp_data
302 } else if parts[0] == "metadata" {
303 &mut message.metadata
304 } else {
305 &mut message.data
306 };
307
308 let mut current = target;
309 for (i, part) in parts[1..].iter().enumerate() {
310 if i == parts.len() - 2 {
311 if current.is_null() {
312 *current = json!({});
313 }
314 if let Value::Object(map) = current {
315 map.insert(part.to_string(), value.clone());
316 }
317 break;
318 } else {
319 if current.is_null() {
320 *current = json!({});
321 }
322 if let Value::Object(map) = current {
323 if !map.contains_key(*part) {
324 map.insert(part.to_string(), json!({}));
325 }
326 current = map.get_mut(*part).unwrap();
327 }
328 }
329 }
330 Ok(())
331 }
332}
333
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335 println!("=== Custom Function Example ===\n");
336
337 let workflow_json = r#"
339 {
340 "id": "custom_function_demo",
341 "name": "Custom Function Demo",
342 "description": "Demonstrates custom functions in workflow",
343 "tasks": [
344 {
345 "id": "prepare_data",
346 "name": "Prepare Data",
347 "description": "Extract and prepare data for analysis",
348 "function": {
349 "name": "map",
350 "input": {
351 "mappings": [
352 {
353 "path": "data.numbers",
354 "logic": { "var": "temp_data.measurements" }
355 },
356 {
357 "path": "data.user_id",
358 "logic": { "var": "temp_data.user_id" }
359 }
360 ]
361 }
362 }
363 },
364 {
365 "id": "calculate_stats",
366 "name": "Calculate Statistics",
367 "description": "Calculate statistical measures from numeric data",
368 "function": {
369 "name": "statistics",
370 "input": {
371 "data_path": "data.numbers",
372 "output_path": "data.stats"
373 }
374 }
375 },
376 {
377 "id": "enrich_user_data",
378 "name": "Enrich User Data",
379 "description": "Add additional user information",
380 "function": {
381 "name": "enrich_data",
382 "input": {
383 "lookup_field": "user_id",
384 "lookup_value": "user_123",
385 "output_path": "data.user_info"
386 }
387 }
388 }
389 ]
390 }
391 "#;
392
393 let workflow = Workflow::from_json(workflow_json)?;
395
396 let separator = "=".repeat(50);
398 println!("\n{separator}");
399 println!("=== Second Example with Different User ===\n");
400
401 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402 message2.temp_data = json!({
403 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404 "user_id": "user_456",
405 "timestamp": "2024-01-15T11:00:00Z"
406 });
407 message2.data = json!({});
408
409 let workflow2_json = r#"
411 {
412 "id": "custom_function_demo_2",
413 "name": "Custom Function Demo 2",
414 "description": "Second demo with different user",
415 "tasks": [
416 {
417 "id": "prepare_data",
418 "name": "Prepare Data",
419 "function": {
420 "name": "map",
421 "input": {
422 "mappings": [
423 {
424 "path": "data.numbers",
425 "logic": { "var": "temp_data.measurements" }
426 },
427 {
428 "path": "data.user_id",
429 "logic": { "var": "temp_data.user_id" }
430 }
431 ]
432 }
433 }
434 },
435 {
436 "id": "calculate_stats",
437 "name": "Calculate Statistics",
438 "function": {
439 "name": "statistics",
440 "input": {
441 "data_path": "data.numbers",
442 "output_path": "data.analysis"
443 }
444 }
445 },
446 {
447 "id": "enrich_user_data",
448 "name": "Enrich User Data",
449 "function": {
450 "name": "enrich_data",
451 "input": {
452 "lookup_field": "user_id",
453 "lookup_value": "user_456",
454 "output_path": "data.employee_details"
455 }
456 }
457 }
458 ]
459 }
460 "#;
461
462 let workflow2 = Workflow::from_json(workflow2_json)?;
463
464 let mut custom_functions = HashMap::new();
466 custom_functions.insert(
467 "statistics".to_string(),
468 Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469 );
470 custom_functions.insert(
471 "enrich_data".to_string(),
472 Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473 );
474 let engine = Engine::new(
478 vec![workflow, workflow2],
479 Some(custom_functions),
480 None, );
482
483 let sample_data = json!({
485 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
486 "user_id": "user_123",
487 "timestamp": "2024-01-15T10:30:00Z"
488 });
489
490 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
492 message.temp_data = sample_data;
493 message.data = json!({});
494
495 println!("Processing message with custom functions...\n");
496
497 match engine.process_message(&mut message) {
499 Ok(_) => {
500 println!("ā
Message processed successfully!\n");
501
502 println!("š Final Results:");
503 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
504
505 println!("š Audit Trail:");
506 for (i, audit) in message.audit_trail.iter().enumerate() {
507 println!(
508 "{}. Task: {} (Status: {})",
509 i + 1,
510 audit.task_id,
511 audit.status_code
512 );
513 println!(" Timestamp: {}", audit.timestamp);
514 println!(" Changes: {} field(s) modified", audit.changes.len());
515 }
516
517 if message.has_errors() {
518 println!("\nā ļø Errors encountered:");
519 for error in &message.errors {
520 println!(
521 " - {}: {:?}",
522 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
523 error.error_message
524 );
525 }
526 }
527 }
528 Err(e) => {
529 println!("ā Error processing message: {e:?}");
530 }
531 }
532
533 match engine.process_message(&mut message2) {
535 Ok(_) => {
536 println!("ā
Second message processed successfully!\n");
537 println!("š Results for user_456:");
538 println!("{}", serde_json::to_string_pretty(&message2.data)?);
539 }
540 Err(e) => {
541 println!("ā Error processing second message: {e:?}");
542 }
543 }
544
545 println!("\nš Custom function examples completed!");
546
547 Ok(())
548}