Skip to main content

mofa_kernel/workflow/
reducer.rs

1//! Reducer Trait and Types
2//!
3//! Defines the Reducer pattern for state update strategies.
4//! Reducers determine how state updates are merged with existing values.
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10use crate::agent::error::AgentResult;
11
12/// Reducer trait for state update strategies
13///
14/// A Reducer defines how to merge a state update with an existing value.
15/// Different keys in the state can have different reducers.
16///
17/// # Example
18///
19/// ```rust,ignore
20/// // Messages should be appended
21/// graph.add_reducer("messages", Box::new(AppendReducer));
22///
23/// // Result should overwrite
24/// graph.add_reducer("result", Box::new(OverwriteReducer));
25/// ```
26#[async_trait]
27pub trait Reducer: Send + Sync {
28    /// Reduce the current value with the update value
29    ///
30    /// # Arguments
31    /// * `current` - The current value (None if key doesn't exist)
32    /// * `update` - The new value to merge
33    ///
34    /// # Returns
35    /// The merged result
36    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value>;
37
38    /// Returns the name of this reducer
39    fn name(&self) -> &str;
40
41    /// Returns the type of this reducer
42    fn reducer_type(&self) -> ReducerType;
43}
44
45/// Built-in reducer types
46#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
47pub enum ReducerType {
48    /// Overwrite the current value with the update (default)
49    Overwrite,
50
51    /// Append the update to a list (creates list if doesn't exist)
52    Append,
53
54    /// Extend the current list with items from update list
55    Extend,
56
57    /// Merge the update into the current object
58    Merge {
59        /// Whether to deep merge nested objects
60        deep: bool,
61    },
62
63    /// Keep only the last N items in a list
64    LastN {
65        /// Maximum number of items to keep
66        n: usize,
67    },
68
69    /// Take the first non-null value
70    First,
71
72    /// Take the last non-null value
73    Last,
74
75    /// Custom reducer with a name identifier
76    Custom(String),
77}
78
79impl Default for ReducerType {
80    fn default() -> Self {
81        Self::Overwrite
82    }
83}
84
85impl std::fmt::Display for ReducerType {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        match self {
88            ReducerType::Overwrite => write!(f, "overwrite"),
89            ReducerType::Append => write!(f, "append"),
90            ReducerType::Extend => write!(f, "extend"),
91            ReducerType::Merge { deep } => write!(f, "merge(deep={})", deep),
92            ReducerType::LastN { n } => write!(f, "last_n({})", n),
93            ReducerType::First => write!(f, "first"),
94            ReducerType::Last => write!(f, "last"),
95            ReducerType::Custom(name) => write!(f, "custom({})", name),
96        }
97    }
98}
99
100/// State update operation
101///
102/// Represents a single key-value update to be applied to the state.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct StateUpdate {
105    /// The key to update
106    pub key: String,
107    /// The new value
108    pub value: Value,
109}
110
111impl StateUpdate {
112    /// Create a new state update
113    pub fn new(key: impl Into<String>, value: Value) -> Self {
114        Self {
115            key: key.into(),
116            value,
117        }
118    }
119
120    /// Create a state update from a serializable value
121    pub fn from_serializable<T: Serialize>(key: impl Into<String>, value: &T) -> AgentResult<Self> {
122        Ok(Self::new(key, serde_json::to_value(value)?))
123    }
124}
125
126impl From<(String, Value)> for StateUpdate {
127    fn from((key, value): (String, Value)) -> Self {
128        Self::new(key, value)
129    }
130}
131
132impl From<(&str, Value)> for StateUpdate {
133    fn from((key, value): (&str, Value)) -> Self {
134        Self::new(key, value)
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141    use serde_json::json;
142
143    #[test]
144    fn test_state_update_creation() {
145        let update = StateUpdate::new("key", json!("value"));
146        assert_eq!(update.key, "key");
147        assert_eq!(update.value, json!("value"));
148    }
149
150    #[test]
151    fn test_state_update_from_tuple() {
152        let update: StateUpdate = ("message", json!("hello")).into();
153        assert_eq!(update.key, "message");
154        assert_eq!(update.value, json!("hello"));
155    }
156
157    #[test]
158    fn test_reducer_type_display() {
159        assert_eq!(ReducerType::Overwrite.to_string(), "overwrite");
160        assert_eq!(
161            ReducerType::Merge { deep: true }.to_string(),
162            "merge(deep=true)"
163        );
164        assert_eq!(ReducerType::LastN { n: 5 }.to_string(), "last_n(5)");
165    }
166}