mofa_foundation/workflow/
reducers.rs1use async_trait::async_trait;
7use mofa_kernel::workflow::{Reducer, ReducerType, StateUpdate};
8use serde_json::Value;
9
10use mofa_kernel::agent::error::{AgentError, AgentResult};
11
12#[derive(Debug, Clone, Default)]
25pub struct OverwriteReducer;
26
27#[async_trait]
28impl Reducer for OverwriteReducer {
29 async fn reduce(&self, _current: Option<&Value>, update: &Value) -> AgentResult<Value> {
30 Ok(update.clone())
31 }
32
33 fn name(&self) -> &str {
34 "overwrite"
35 }
36
37 fn reducer_type(&self) -> ReducerType {
38 ReducerType::Overwrite
39 }
40}
41
42#[derive(Debug, Clone, Default)]
55pub struct AppendReducer;
56
57#[async_trait]
58impl Reducer for AppendReducer {
59 async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
60 let mut arr = match current {
61 Some(Value::Array(a)) => a.clone(),
62 _ => Vec::new(),
63 };
64 arr.push(update.clone());
65 Ok(Value::Array(arr))
66 }
67
68 fn name(&self) -> &str {
69 "append"
70 }
71
72 fn reducer_type(&self) -> ReducerType {
73 ReducerType::Append
74 }
75}
76
77#[derive(Debug, Clone, Default)]
90pub struct ExtendReducer;
91
92#[async_trait]
93impl Reducer for ExtendReducer {
94 async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
95 let mut arr = match current {
96 Some(Value::Array(a)) => a.clone(),
97 _ => Vec::new(),
98 };
99
100 match update {
101 Value::Array(items) => {
102 arr.extend(items.iter().cloned());
103 }
104 other => {
105 arr.push(other.clone());
106 }
107 }
108
109 Ok(Value::Array(arr))
110 }
111
112 fn name(&self) -> &str {
113 "extend"
114 }
115
116 fn reducer_type(&self) -> ReducerType {
117 ReducerType::Extend
118 }
119}
120
/// Reducer that merges a JSON object update into the current JSON object.
///
/// The derived [`Default`] yields `deep: false`, i.e. a shallow merge.
#[derive(Debug, Clone, Default)]
pub struct MergeReducer {
    /// When `true`, nested objects present on both sides are merged
    /// recursively; when `false`, each top-level key of the update replaces
    /// the current value for that key.
    pub deep: bool,
}

impl MergeReducer {
    /// Creates a reducer that performs a shallow (top-level only) merge.
    pub fn shallow() -> Self {
        Self { deep: false }
    }

    /// Creates a reducer that merges nested objects recursively.
    pub fn deep() -> Self {
        Self { deep: true }
    }
}
155
156#[async_trait]
157impl Reducer for MergeReducer {
158 async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
159 match (current, update) {
160 (Some(Value::Object(current_map)), Value::Object(update_map)) => {
161 let mut result = current_map.clone();
162
163 for (key, value) in update_map {
164 if self.deep {
166 if let (Some(Value::Object(existing)), Value::Object(new_obj)) =
167 (result.get(key), value)
168 {
169 let merged = merge_objects_deep(existing.clone(), new_obj.clone());
170 result.insert(key.clone(), Value::Object(merged));
171 continue;
172 }
173 }
174 result.insert(key.clone(), value.clone());
175 }
176
177 Ok(Value::Object(result))
178 }
179 (None, Value::Object(update_map)) => Ok(Value::Object(update_map.clone())),
180 (Some(current), _) => Ok(current.clone()),
181 (None, update) => Ok(update.clone()),
182 }
183 }
184
185 fn name(&self) -> &str {
186 if self.deep {
187 "merge_deep"
188 } else {
189 "merge"
190 }
191 }
192
193 fn reducer_type(&self) -> ReducerType {
194 ReducerType::Merge { deep: self.deep }
195 }
196}
197
198fn merge_objects_deep(
200 mut base: serde_json::Map<String, Value>,
201 update: serde_json::Map<String, Value>,
202) -> serde_json::Map<String, Value> {
203 for (key, value) in update {
204 match (base.get(&key), value) {
205 (Some(Value::Object(base_obj)), Value::Object(update_obj)) => {
206 let merged = merge_objects_deep(base_obj.clone(), update_obj);
207 base.insert(key, Value::Object(merged));
208 }
209 (_, value) => {
210 base.insert(key, value);
211 }
212 }
213 }
214 base
215}
216
/// Reducer that keeps only the trailing `n` elements of an array.
#[derive(Debug, Clone)]
pub struct LastNReducer {
    /// Maximum number of elements retained after each reduction.
    pub n: usize,
}

impl LastNReducer {
    /// Creates a reducer that retains at most `n` trailing elements.
    pub fn new(n: usize) -> Self {
        LastNReducer { n }
    }
}
241
242#[async_trait]
243impl Reducer for LastNReducer {
244 async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
245 let mut arr = match current {
246 Some(Value::Array(a)) => a.clone(),
247 _ => Vec::new(),
248 };
249
250 match update {
252 Value::Array(items) => {
253 arr.extend(items.iter().cloned());
254 }
255 other => {
256 arr.push(other.clone());
257 }
258 }
259
260 if arr.len() > self.n {
262 let start = arr.len() - self.n;
263 arr = arr.split_off(start);
264 }
265
266 Ok(Value::Array(arr))
267 }
268
269 fn name(&self) -> &str {
270 "last_n"
271 }
272
273 fn reducer_type(&self) -> ReducerType {
274 ReducerType::LastN { n: self.n }
275 }
276}
277
278#[derive(Debug, Clone, Default)]
292pub struct FirstReducer;
293
294#[async_trait]
295impl Reducer for FirstReducer {
296 async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
297 match current {
298 Some(value) if !value.is_null() => Ok(value.clone()),
299 _ => Ok(update.clone()),
300 }
301 }
302
303 fn name(&self) -> &str {
304 "first"
305 }
306
307 fn reducer_type(&self) -> ReducerType {
308 ReducerType::First
309 }
310}
311
312#[derive(Debug, Clone, Default)]
326pub struct LastReducer;
327
328#[async_trait]
329impl Reducer for LastReducer {
330 async fn reduce(&self, _current: Option<&Value>, update: &Value) -> AgentResult<Value> {
331 if update.is_null() {
333 Ok(update.clone())
337 } else {
338 Ok(update.clone())
339 }
340 }
341
342 fn name(&self) -> &str {
343 "last"
344 }
345
346 fn reducer_type(&self) -> ReducerType {
347 ReducerType::Last
348 }
349}
350
351pub struct CustomReducer<F>
355where
356 F: Fn(Option<&Value>, &Value) -> AgentResult<Value> + Send + Sync,
357{
358 name: String,
359 func: F,
360}
361
362impl<F> CustomReducer<F>
363where
364 F: Fn(Option<&Value>, &Value) -> AgentResult<Value> + Send + Sync,
365{
366 pub fn new(name: impl Into<String>, func: F) -> Self {
368 Self {
369 name: name.into(),
370 func,
371 }
372 }
373}
374
375#[async_trait]
376impl<F> Reducer for CustomReducer<F>
377where
378 F: Fn(Option<&Value>, &Value) -> AgentResult<Value> + Send + Sync,
379{
380 async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
381 (self.func)(current, update)
382 }
383
384 fn name(&self) -> &str {
385 &self.name
386 }
387
388 fn reducer_type(&self) -> ReducerType {
389 ReducerType::Custom(self.name.clone())
390 }
391}
392
393pub fn create_reducer(reducer_type: &ReducerType) -> AgentResult<Box<dyn Reducer>> {
395 match reducer_type {
396 ReducerType::Overwrite => Ok(Box::new(OverwriteReducer)),
397 ReducerType::Append => Ok(Box::new(AppendReducer)),
398 ReducerType::Extend => Ok(Box::new(ExtendReducer)),
399 ReducerType::Merge { deep } => Ok(Box::new(MergeReducer { deep: *deep })),
400 ReducerType::LastN { n } => Ok(Box::new(LastNReducer::new(*n))),
401 ReducerType::First => Ok(Box::new(FirstReducer)),
402 ReducerType::Last => Ok(Box::new(LastReducer)),
403 ReducerType::Custom(name) => Err(AgentError::Internal(format!(
404 "Cannot create reducer for unknown custom type: {}",
405 name
406 ))),
407 }
408}
409
/// Unit tests covering each built-in reducer and the `create_reducer` factory.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn test_overwrite_reducer() {
        let reducer = OverwriteReducer;

        let result = reducer
            .reduce(Some(&json!("old")), &json!("new"))
            .await
            .unwrap();
        assert_eq!(result, json!("new"));

        let result = reducer.reduce(None, &json!("value")).await.unwrap();
        assert_eq!(result, json!("value"));
    }

    #[tokio::test]
    async fn test_append_reducer() {
        let reducer = AppendReducer;

        let result = reducer
            .reduce(Some(&json!(["a", "b"])), &json!("c"))
            .await
            .unwrap();
        assert_eq!(result, json!(["a", "b", "c"]));

        let result = reducer.reduce(None, &json!("first")).await.unwrap();
        assert_eq!(result, json!(["first"]));
    }

    #[tokio::test]
    async fn test_extend_reducer() {
        let reducer = ExtendReducer;

        let result = reducer
            .reduce(Some(&json!([1, 2])), &json!([3, 4]))
            .await
            .unwrap();
        assert_eq!(result, json!([1, 2, 3, 4]));

        // A non-array update is pushed as a single element.
        let result = reducer
            .reduce(Some(&json!([1])), &json!(2))
            .await
            .unwrap();
        assert_eq!(result, json!([1, 2]));
    }

    #[tokio::test]
    async fn test_merge_reducer_shallow() {
        let reducer = MergeReducer::shallow();

        let current = json!({"a": 1, "b": 2});
        let update = json!({"b": 3, "c": 4});
        let result = reducer.reduce(Some(&current), &update).await.unwrap();

        assert_eq!(result["a"], 1);
        assert_eq!(result["b"], 3);
        assert_eq!(result["c"], 4);
    }

    #[tokio::test]
    async fn test_merge_reducer_deep() {
        let reducer = MergeReducer::deep();

        let current = json!({
            "config": {
                "a": 1,
                "b": { "x": 1, "y": 2 }
            }
        });
        let update = json!({
            "config": {
                "b": { "y": 3, "z": 4 },
                "c": 5
            }
        });
        let result = reducer.reduce(Some(&current), &update).await.unwrap();

        // Keys only present in the current value survive at every depth.
        assert_eq!(result["config"]["a"], 1);
        assert_eq!(result["config"]["b"]["x"], 1);
        assert_eq!(result["config"]["b"]["y"], 3);
        assert_eq!(result["config"]["b"]["z"], 4);
        assert_eq!(result["config"]["c"], 5);
    }

    #[tokio::test]
    async fn test_last_n_reducer() {
        let reducer = LastNReducer::new(3);

        let result = reducer
            .reduce(Some(&json!([1, 2, 3, 4])), &json!(5))
            .await
            .unwrap();
        assert_eq!(result, json!([3, 4, 5]));

        // An array update is extended element by element before trimming.
        let result = reducer
            .reduce(Some(&json!([1, 2])), &json!([3, 4, 5, 6]))
            .await
            .unwrap();
        assert_eq!(result, json!([4, 5, 6]));
    }

    #[tokio::test]
    async fn test_first_reducer() {
        let reducer = FirstReducer;

        // No current value: the update is taken.
        let result = reducer.reduce(None, &json!("first")).await.unwrap();
        assert_eq!(result, json!("first"));

        // An existing non-null value is kept.
        let result = reducer
            .reduce(Some(&json!("first")), &json!("second"))
            .await
            .unwrap();
        assert_eq!(result, json!("first"));

        // A stored null is treated as unset.
        let result = reducer
            .reduce(Some(&json!(null)), &json!("value"))
            .await
            .unwrap();
        assert_eq!(result, json!("value"));
    }

    #[tokio::test]
    async fn test_last_reducer() {
        let reducer = LastReducer;

        let result = reducer
            .reduce(Some(&json!("first")), &json!("second"))
            .await
            .unwrap();
        assert_eq!(result, json!("second"));

        let result = reducer
            .reduce(Some(&json!("old")), &json!("new"))
            .await
            .unwrap();
        assert_eq!(result, json!("new"));
    }

    #[tokio::test]
    async fn test_custom_reducer() {
        let reducer = CustomReducer::new("sum", |current, update| {
            let curr = current
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            let upd = update.as_i64().unwrap_or(0);
            Ok(json!(curr + upd))
        });

        let result = reducer
            .reduce(Some(&json!(10)), &json!(5))
            .await
            .unwrap();
        assert_eq!(result, json!(15));

        assert_eq!(reducer.name(), "sum");
    }

    #[test]
    fn test_create_reducer() {
        let r = create_reducer(&ReducerType::Overwrite).unwrap();
        assert_eq!(r.name(), "overwrite");

        let r = create_reducer(&ReducerType::Append).unwrap();
        assert_eq!(r.name(), "append");

        let r = create_reducer(&ReducerType::Extend).unwrap();
        assert_eq!(r.name(), "extend");

        let r = create_reducer(&ReducerType::Merge { deep: false }).unwrap();
        assert_eq!(r.name(), "merge");

        let r = create_reducer(&ReducerType::Merge { deep: true }).unwrap();
        assert_eq!(r.name(), "merge_deep");

        let r = create_reducer(&ReducerType::LastN { n: 5 }).unwrap();
        assert_eq!(r.name(), "last_n");

        let r = create_reducer(&ReducerType::First).unwrap();
        assert_eq!(r.name(), "first");

        let r = create_reducer(&ReducerType::Last).unwrap();
        assert_eq!(r.name(), "last");

        // Custom reducers cannot be built from the type alone.
        assert!(create_reducer(&ReducerType::Custom("mine".to_string())).is_err());
    }
}