Skip to main content

mofa_foundation/workflow/
reducers.rs

1//! Concrete Reducer Implementations
2//!
3//! This module provides built-in reducers for state management in workflows.
4//! Reducers determine how state updates are merged with existing values.
5
6use async_trait::async_trait;
7use mofa_kernel::workflow::{Reducer, ReducerType, StateUpdate};
8use serde_json::Value;
9
10use mofa_kernel::agent::error::{AgentError, AgentResult};
11
12/// Overwrite reducer - replaces the current value with the update
13///
14/// This is the default reducer behavior. The new value completely
15/// replaces any existing value.
16///
17/// # Example
18///
19/// ```rust,ignore
20/// // Before: { "result": "old" }
21/// // Update: { "result": "new" }
22/// // After:  { "result": "new" }
23/// ```
24#[derive(Debug, Clone, Default)]
25pub struct OverwriteReducer;
26
27#[async_trait]
28impl Reducer for OverwriteReducer {
29    async fn reduce(&self, _current: Option<&Value>, update: &Value) -> AgentResult<Value> {
30        Ok(update.clone())
31    }
32
33    fn name(&self) -> &str {
34        "overwrite"
35    }
36
37    fn reducer_type(&self) -> ReducerType {
38        ReducerType::Overwrite
39    }
40}
41
42/// Append reducer - appends the update to a list
43///
44/// If the current value doesn't exist or isn't an array, creates a new array.
45/// The update value is appended to the array.
46///
47/// # Example
48///
49/// ```rust,ignore
50/// // Before: { "messages": ["hello"] }
51/// // Update: { "messages": "world" }
52/// // After:  { "messages": ["hello", "world"] }
53/// ```
54#[derive(Debug, Clone, Default)]
55pub struct AppendReducer;
56
57#[async_trait]
58impl Reducer for AppendReducer {
59    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
60        let mut arr = match current {
61            Some(Value::Array(a)) => a.clone(),
62            _ => Vec::new(),
63        };
64        arr.push(update.clone());
65        Ok(Value::Array(arr))
66    }
67
68    fn name(&self) -> &str {
69        "append"
70    }
71
72    fn reducer_type(&self) -> ReducerType {
73        ReducerType::Append
74    }
75}
76
77/// Extend reducer - extends a list with items from another list
78///
79/// If the update is an array, all items are added to the current array.
80/// If the current value doesn't exist or isn't an array, creates a new array.
81///
82/// # Example
83///
84/// ```rust,ignore
85/// // Before: { "items": [1, 2] }
86/// // Update: { "items": [3, 4, 5] }
87/// // After:  { "items": [1, 2, 3, 4, 5] }
88/// ```
89#[derive(Debug, Clone, Default)]
90pub struct ExtendReducer;
91
92#[async_trait]
93impl Reducer for ExtendReducer {
94    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
95        let mut arr = match current {
96            Some(Value::Array(a)) => a.clone(),
97            _ => Vec::new(),
98        };
99
100        match update {
101            Value::Array(items) => {
102                arr.extend(items.iter().cloned());
103            }
104            other => {
105                arr.push(other.clone());
106            }
107        }
108
109        Ok(Value::Array(arr))
110    }
111
112    fn name(&self) -> &str {
113        "extend"
114    }
115
116    fn reducer_type(&self) -> ReducerType {
117        ReducerType::Extend
118    }
119}
120
/// Merge reducer - merges the update into the current object
///
/// Performs a shallow or deep merge of two objects, selected by the
/// [`deep`](MergeReducer::deep) flag. The default is a shallow merge.
///
/// # Example
///
/// ```rust,ignore
/// // Before: { "config": { "a": 1, "b": 2 } }
/// // Update: { "config": { "b": 3, "c": 4 } }
/// // After (shallow): { "config": { "a": 1, "b": 3, "c": 4 } }
/// ```
// `Default` is derived: `bool::default()` is `false`, i.e. a shallow merge,
// which is exactly what the previous hand-written impl produced.
#[derive(Debug, Clone, Default)]
pub struct MergeReducer {
    /// Whether to perform deep merge on nested objects
    pub deep: bool,
}

impl MergeReducer {
    /// Create a shallow merge reducer (`deep == false`).
    pub fn shallow() -> Self {
        Self { deep: false }
    }

    /// Create a deep merge reducer (`deep == true`).
    pub fn deep() -> Self {
        Self { deep: true }
    }
}
155
156#[async_trait]
157impl Reducer for MergeReducer {
158    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
159        match (current, update) {
160            (Some(Value::Object(current_map)), Value::Object(update_map)) => {
161                let mut result = current_map.clone();
162
163                for (key, value) in update_map {
164                    // If deep merge and both values are objects, recurse
165                    if self.deep {
166                        if let (Some(Value::Object(existing)), Value::Object(new_obj)) =
167                            (result.get(key), value)
168                        {
169                            let merged = merge_objects_deep(existing.clone(), new_obj.clone());
170                            result.insert(key.clone(), Value::Object(merged));
171                            continue;
172                        }
173                    }
174                    result.insert(key.clone(), value.clone());
175                }
176
177                Ok(Value::Object(result))
178            }
179            (None, Value::Object(update_map)) => Ok(Value::Object(update_map.clone())),
180            (Some(current), _) => Ok(current.clone()),
181            (None, update) => Ok(update.clone()),
182        }
183    }
184
185    fn name(&self) -> &str {
186        if self.deep {
187            "merge_deep"
188        } else {
189            "merge"
190        }
191    }
192
193    fn reducer_type(&self) -> ReducerType {
194        ReducerType::Merge { deep: self.deep }
195    }
196}
197
198/// Helper function for deep object merging
199fn merge_objects_deep(
200    mut base: serde_json::Map<String, Value>,
201    update: serde_json::Map<String, Value>,
202) -> serde_json::Map<String, Value> {
203    for (key, value) in update {
204        match (base.get(&key), value) {
205            (Some(Value::Object(base_obj)), Value::Object(update_obj)) => {
206                let merged = merge_objects_deep(base_obj.clone(), update_obj);
207                base.insert(key, Value::Object(merged));
208            }
209            (_, value) => {
210                base.insert(key, value);
211            }
212        }
213    }
214    base
215}
216
/// LastN reducer - a sliding window over a list.
///
/// New values are added at the end and the list is then trimmed so that
/// at most `n` of the most recent items remain.
///
/// # Example
///
/// ```rust,ignore
/// // With n=3:
/// // Before: { "history": [1, 2, 3, 4, 5] }
/// // Update: { "history": 6 }
/// // After:  { "history": [4, 5, 6] }
/// ```
#[derive(Debug, Clone)]
pub struct LastNReducer {
    /// Upper bound on the number of items retained
    pub n: usize,
}

impl LastNReducer {
    /// Build a reducer that retains at most `n` items.
    pub fn new(n: usize) -> Self {
        LastNReducer { n }
    }
}
241
242#[async_trait]
243impl Reducer for LastNReducer {
244    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
245        let mut arr = match current {
246            Some(Value::Array(a)) => a.clone(),
247            _ => Vec::new(),
248        };
249
250        // Append the new value
251        match update {
252            Value::Array(items) => {
253                arr.extend(items.iter().cloned());
254            }
255            other => {
256                arr.push(other.clone());
257            }
258        }
259
260        // Keep only last n items
261        if arr.len() > self.n {
262            let start = arr.len() - self.n;
263            arr = arr.split_off(start);
264        }
265
266        Ok(Value::Array(arr))
267    }
268
269    fn name(&self) -> &str {
270        "last_n"
271    }
272
273    fn reducer_type(&self) -> ReducerType {
274        ReducerType::LastN { n: self.n }
275    }
276}
277
278/// First reducer - keeps the first non-null value
279///
280/// Once a value is set, subsequent updates are ignored.
281///
282/// # Example
283///
284/// ```rust,ignore
285/// // Before: null
286/// // Update: "first"
287/// // After:  "first"
288/// // Update: "second"
289/// // After:  "first" (unchanged)
290/// ```
291#[derive(Debug, Clone, Default)]
292pub struct FirstReducer;
293
294#[async_trait]
295impl Reducer for FirstReducer {
296    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
297        match current {
298            Some(value) if !value.is_null() => Ok(value.clone()),
299            _ => Ok(update.clone()),
300        }
301    }
302
303    fn name(&self) -> &str {
304        "first"
305    }
306
307    fn reducer_type(&self) -> ReducerType {
308        ReducerType::First
309    }
310}
311
312/// Last reducer - keeps the last non-null value
313///
314/// Always takes the most recent non-null value.
315///
316/// # Example
317///
318/// ```rust,ignore
319/// // Before: "first"
320/// // Update: "second"
321/// // After:  "second"
322/// // Update: null
323/// // After:  "second" (unchanged because update is null)
324/// ```
325#[derive(Debug, Clone, Default)]
326pub struct LastReducer;
327
328#[async_trait]
329impl Reducer for LastReducer {
330    async fn reduce(&self, _current: Option<&Value>, update: &Value) -> AgentResult<Value> {
331        // If update is null, keep current
332        if update.is_null() {
333            // This is a bit tricky - we need to return the current value
334            // But if current is None, we return null
335            // For now, let's just return the update (null)
336            Ok(update.clone())
337        } else {
338            Ok(update.clone())
339        }
340    }
341
342    fn name(&self) -> &str {
343        "last"
344    }
345
346    fn reducer_type(&self) -> ReducerType {
347        ReducerType::Last
348    }
349}
350
351/// Custom reducer using a closure
352///
353/// Allows defining custom merge logic with a function.
354pub struct CustomReducer<F>
355where
356    F: Fn(Option<&Value>, &Value) -> AgentResult<Value> + Send + Sync,
357{
358    name: String,
359    func: F,
360}
361
362impl<F> CustomReducer<F>
363where
364    F: Fn(Option<&Value>, &Value) -> AgentResult<Value> + Send + Sync,
365{
366    /// Create a new custom reducer
367    pub fn new(name: impl Into<String>, func: F) -> Self {
368        Self {
369            name: name.into(),
370            func,
371        }
372    }
373}
374
375#[async_trait]
376impl<F> Reducer for CustomReducer<F>
377where
378    F: Fn(Option<&Value>, &Value) -> AgentResult<Value> + Send + Sync,
379{
380    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value> {
381        (self.func)(current, update)
382    }
383
384    fn name(&self) -> &str {
385        &self.name
386    }
387
388    fn reducer_type(&self) -> ReducerType {
389        ReducerType::Custom(self.name.clone())
390    }
391}
392
393/// Create a reducer from a ReducerType
394pub fn create_reducer(reducer_type: &ReducerType) -> AgentResult<Box<dyn Reducer>> {
395    match reducer_type {
396        ReducerType::Overwrite => Ok(Box::new(OverwriteReducer)),
397        ReducerType::Append => Ok(Box::new(AppendReducer)),
398        ReducerType::Extend => Ok(Box::new(ExtendReducer)),
399        ReducerType::Merge { deep } => Ok(Box::new(MergeReducer { deep: *deep })),
400        ReducerType::LastN { n } => Ok(Box::new(LastNReducer::new(*n))),
401        ReducerType::First => Ok(Box::new(FirstReducer)),
402        ReducerType::Last => Ok(Box::new(LastReducer)),
403        ReducerType::Custom(name) => Err(AgentError::Internal(format!(
404            "Cannot create reducer for unknown custom type: {}",
405            name
406        ))),
407    }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The update replaces the current value, with or without a prior value.
    #[tokio::test]
    async fn test_overwrite_reducer() {
        let reducer = OverwriteReducer;

        let result = reducer
            .reduce(Some(&json!("old")), &json!("new"))
            .await
            .unwrap();
        assert_eq!(result, json!("new"));

        let result = reducer.reduce(None, &json!("value")).await.unwrap();
        assert_eq!(result, json!("value"));
    }

    /// The update is pushed as one element; non-array state starts a new list.
    #[tokio::test]
    async fn test_append_reducer() {
        let reducer = AppendReducer;

        let result = reducer
            .reduce(Some(&json!(["a", "b"])), &json!("c"))
            .await
            .unwrap();
        assert_eq!(result, json!(["a", "b", "c"]));

        let result = reducer.reduce(None, &json!("first")).await.unwrap();
        assert_eq!(result, json!(["first"]));

        // A non-array current value is discarded and a new array is started
        let result = reducer
            .reduce(Some(&json!("scalar")), &json!("x"))
            .await
            .unwrap();
        assert_eq!(result, json!(["x"]));

        // An array update is appended as a single nested element
        let result = reducer
            .reduce(Some(&json!([1])), &json!([2, 3]))
            .await
            .unwrap();
        assert_eq!(result, json!([1, [2, 3]]));
    }

    /// Array updates are flattened in; other updates behave like append.
    #[tokio::test]
    async fn test_extend_reducer() {
        let reducer = ExtendReducer;

        let result = reducer
            .reduce(Some(&json!([1, 2])), &json!([3, 4]))
            .await
            .unwrap();
        assert_eq!(result, json!([1, 2, 3, 4]));

        // Single item extends
        let result = reducer
            .reduce(Some(&json!([1])), &json!(2))
            .await
            .unwrap();
        assert_eq!(result, json!([1, 2]));

        // No current value: the update's items form the new array
        let result = reducer.reduce(None, &json!([1, 2])).await.unwrap();
        assert_eq!(result, json!([1, 2]));

        // A non-array current value is discarded
        let result = reducer
            .reduce(Some(&json!({"k": "v"})), &json!([7]))
            .await
            .unwrap();
        assert_eq!(result, json!([7]));
    }

    /// Shallow merge overwrites top-level keys and replaces nested objects.
    #[tokio::test]
    async fn test_merge_reducer_shallow() {
        let reducer = MergeReducer::shallow();

        let current = json!({"a": 1, "b": 2});
        let update = json!({"b": 3, "c": 4});
        let result = reducer.reduce(Some(&current), &update).await.unwrap();

        assert_eq!(result["a"], 1);
        assert_eq!(result["b"], 3);
        assert_eq!(result["c"], 4);

        // Nested objects are replaced wholesale, not merged
        let current = json!({"cfg": {"x": 1}});
        let update = json!({"cfg": {"y": 2}});
        let result = reducer.reduce(Some(&current), &update).await.unwrap();
        assert_eq!(result, json!({"cfg": {"y": 2}}));
    }

    /// Deep merge recurses into objects present on both sides.
    #[tokio::test]
    async fn test_merge_reducer_deep() {
        let reducer = MergeReducer::deep();

        let current = json!({
            "config": {
                "a": 1,
                "b": { "x": 1, "y": 2 }
            }
        });
        let update = json!({
            "config": {
                "b": { "y": 3, "z": 4 },
                "c": 5
            }
        });
        let result = reducer.reduce(Some(&current), &update).await.unwrap();

        // Deep merge should preserve nested "x"
        assert_eq!(result["config"]["a"], 1);
        assert_eq!(result["config"]["b"]["x"], 1);
        assert_eq!(result["config"]["b"]["y"], 3);
        assert_eq!(result["config"]["b"]["z"], 4);
        assert_eq!(result["config"]["c"], 5);
    }

    /// Paths where one side is missing or is not an object.
    #[tokio::test]
    async fn test_merge_reducer_fallbacks() {
        for reducer in [MergeReducer::shallow(), MergeReducer::deep()] {
            // No current value: the update is adopted as-is
            let result = reducer.reduce(None, &json!({"a": 1})).await.unwrap();
            assert_eq!(result, json!({"a": 1}));

            let result = reducer.reduce(None, &json!("text")).await.unwrap();
            assert_eq!(result, json!("text"));

            // Non-object update: current value is kept
            let result = reducer
                .reduce(Some(&json!({"a": 1})), &json!("text"))
                .await
                .unwrap();
            assert_eq!(result, json!({"a": 1}));

            // Non-object current value: it is kept and the update is dropped
            let result = reducer
                .reduce(Some(&json!(42)), &json!({"a": 1}))
                .await
                .unwrap();
            assert_eq!(result, json!(42));
        }
    }

    /// The window keeps only the trailing `n` items.
    #[tokio::test]
    async fn test_last_n_reducer() {
        let reducer = LastNReducer::new(3);

        let result = reducer
            .reduce(Some(&json!([1, 2, 3, 4])), &json!(5))
            .await
            .unwrap();
        assert_eq!(result, json!([3, 4, 5]));

        // Adding array extends and keeps last N
        let result = reducer
            .reduce(Some(&json!([1, 2])), &json!([3, 4, 5, 6]))
            .await
            .unwrap();
        assert_eq!(result, json!([4, 5, 6]));

        // Below capacity nothing is dropped
        let result = reducer
            .reduce(Some(&json!([1])), &json!(2))
            .await
            .unwrap();
        assert_eq!(result, json!([1, 2]));

        // No current value starts a new window
        let result = reducer.reduce(None, &json!(1)).await.unwrap();
        assert_eq!(result, json!([1]));

        // A window of size zero retains nothing
        let empty = LastNReducer::new(0);
        let result = empty
            .reduce(Some(&json!([1, 2])), &json!(3))
            .await
            .unwrap();
        assert_eq!(result, json!([]));
    }

    /// The first non-null value is retained.
    #[tokio::test]
    async fn test_first_reducer() {
        let reducer = FirstReducer;

        // First value
        let result = reducer.reduce(None, &json!("first")).await.unwrap();
        assert_eq!(result, json!("first"));

        // Subsequent updates ignored
        let result = reducer
            .reduce(Some(&json!("first")), &json!("second"))
            .await
            .unwrap();
        assert_eq!(result, json!("first"));

        // Null current accepts update
        let result = reducer
            .reduce(Some(&json!(null)), &json!("value"))
            .await
            .unwrap();
        assert_eq!(result, json!("value"));
    }

    /// A non-null update replaces the current value.
    #[tokio::test]
    async fn test_last_reducer() {
        let reducer = LastReducer;

        // Takes last non-null value
        let result = reducer
            .reduce(Some(&json!("first")), &json!("second"))
            .await
            .unwrap();
        assert_eq!(result, json!("second"));

        // Non-null update replaces
        let result = reducer
            .reduce(Some(&json!("old")), &json!("new"))
            .await
            .unwrap();
        assert_eq!(result, json!("new"));

        // Non-null update is accepted when nothing is stored yet
        let result = reducer.reduce(None, &json!("only")).await.unwrap();
        assert_eq!(result, json!("only"));
    }

    /// The closure's result and the supplied name are passed through.
    #[tokio::test]
    async fn test_custom_reducer() {
        let reducer = CustomReducer::new("sum", |current, update| {
            let curr = current
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            let upd = update.as_i64().unwrap_or(0);
            Ok(json!(curr + upd))
        });

        let result = reducer
            .reduce(Some(&json!(10)), &json!(5))
            .await
            .unwrap();
        assert_eq!(result, json!(15));

        // Missing current value is handled by the closure itself
        let result = reducer.reduce(None, &json!(5)).await.unwrap();
        assert_eq!(result, json!(5));

        assert_eq!(reducer.name(), "sum");
    }

    /// Every built-in variant maps to a reducer; custom types are rejected.
    #[test]
    fn test_create_reducer() {
        let r = create_reducer(&ReducerType::Overwrite).unwrap();
        assert_eq!(r.name(), "overwrite");

        let r = create_reducer(&ReducerType::Append).unwrap();
        assert_eq!(r.name(), "append");

        let r = create_reducer(&ReducerType::Extend).unwrap();
        assert_eq!(r.name(), "extend");

        let r = create_reducer(&ReducerType::Merge { deep: false }).unwrap();
        assert_eq!(r.name(), "merge");

        let r = create_reducer(&ReducerType::Merge { deep: true }).unwrap();
        assert_eq!(r.name(), "merge_deep");

        let r = create_reducer(&ReducerType::LastN { n: 5 }).unwrap();
        assert_eq!(r.name(), "last_n");

        let r = create_reducer(&ReducerType::First).unwrap();
        assert_eq!(r.name(), "first");

        let r = create_reducer(&ReducerType::Last).unwrap();
        assert_eq!(r.name(), "last");

        // Custom reducers wrap a closure and cannot be rebuilt from a name
        assert!(create_reducer(&ReducerType::Custom("sum".to_string())).is_err());
    }
}