Skip to main content

entelix_graph/
contributing_node.rs

//! `ContributingNodeAdapter` — wraps a `Runnable<S, S::Contribution>`
//! whose output names exactly the slots the node touched, and folds
//! it into the inbound state through
//! [`StateMerge::merge_contribution`](crate::StateMerge::merge_contribution).
//!
//! Companion to [`MergeNodeAdapter`](crate::MergeNodeAdapter): the
//! latter takes a per-graph closure for arbitrary merge logic;
//! this one is type-driven via the state's
//! [`StateMerge`](crate::StateMerge) impl, with the
//! `<Name>Contribution` companion struct from
//! `entelix-graph-derive` providing the per-slot `Option`-wrapped
//! shape.
13
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use entelix_core::context::ExecutionContext;
18use entelix_core::error::Result;
19use entelix_runnable::Runnable;
20
21use crate::reducer::StateMerge;
22
23/// `Runnable<S, S>` adapter that snapshots the inbound state, runs
24/// an inner `Runnable<S, S::Contribution>` to produce only the
25/// slots the node touched, and folds that contribution into the
26/// snapshot via `S::merge_contribution`.
27pub struct ContributingNodeAdapter<S>
28where
29    S: StateMerge + Clone + Send + Sync + 'static,
30{
31    inner: Arc<dyn Runnable<S, S::Contribution>>,
32}
33
34impl<S> ContributingNodeAdapter<S>
35where
36    S: StateMerge + Clone + Send + Sync + 'static,
37{
38    /// Wrap `inner`. The inner runnable's output is
39    /// `S::Contribution` — typically built via
40    /// `S::Contribution::default().with_<field>(…)` chained per
41    /// slot the node intends to write.
42    pub fn new<R>(inner: R) -> Self
43    where
44        R: Runnable<S, S::Contribution> + 'static,
45    {
46        Self {
47            inner: Arc::new(inner),
48        }
49    }
50}
51
52impl<S> std::fmt::Debug for ContributingNodeAdapter<S>
53where
54    S: StateMerge + Clone + Send + Sync + 'static,
55{
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("ContributingNodeAdapter")
58            .field("inner", &"<runnable>")
59            .finish()
60    }
61}
62
63#[async_trait]
64impl<S> Runnable<S, S> for ContributingNodeAdapter<S>
65where
66    S: StateMerge + Clone + Send + Sync + 'static,
67{
68    async fn invoke(&self, input: S, ctx: &ExecutionContext) -> Result<S> {
69        let snapshot = input.clone();
70        let contribution = self.inner.invoke(input, ctx).await?;
71        Ok(snapshot.merge_contribution(contribution))
72    }
73}
74
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use entelix_runnable::RunnableLambda;

    use super::*;
    use crate::reducer::{Annotated, Append, Max};

    /// Fixture state: an appending `log`, a max-reduced `score`, and
    /// an unannotated `tag`.
    #[derive(Clone, Debug, Default)]
    struct AgentState {
        log: Annotated<Vec<String>, Append<String>>,
        score: Annotated<i32, Max<i32>>,
        tag: String,
    }

    /// Hand-written contribution companion for `AgentState`: one
    /// `Option` per slot, where `None` means "slot not written".
    #[derive(Default)]
    struct AgentStateContribution {
        log: Option<Annotated<Vec<String>, Append<String>>>,
        score: Option<Annotated<i32, Max<i32>>>,
        tag: Option<String>,
    }

    impl StateMerge for AgentState {
        type Contribution = AgentStateContribution;

        /// Whole-state merge: annotated slots go through their
        /// reducer, `tag` is taken from `update`.
        fn merge(self, update: Self) -> Self {
            let Self {
                log: incoming_log,
                score: incoming_score,
                tag,
            } = update;
            Self {
                log: self.log.merge(incoming_log),
                score: self.score.merge(incoming_score),
                tag,
            }
        }

        /// Per-slot merge: a `Some` slot is reduced into (or, for
        /// `tag`, replaces) the current value; a `None` slot leaves
        /// the current value as it is.
        fn merge_contribution(self, c: Self::Contribution) -> Self {
            let Self { log, score, tag } = self;
            let AgentStateContribution {
                log: log_update,
                score: score_update,
                tag: tag_update,
            } = c;

            let log = if let Some(update) = log_update {
                log.merge(update)
            } else {
                log
            };
            let score = if let Some(update) = score_update {
                score.merge(update)
            } else {
                score
            };
            let tag = tag_update.unwrap_or(tag);

            Self { log, score, tag }
        }
    }

    /// Build an `AgentState` from plain values.
    fn seed(log: Vec<&str>, score: i32, tag: &str) -> AgentState {
        let entries: Vec<String> = log.into_iter().map(str::to_owned).collect();
        AgentState {
            log: Annotated::new(entries, Append::new()),
            score: Annotated::new(score, Max::new()),
            tag: tag.to_owned(),
        }
    }

    /// Wrap `node` in a `ContributingNodeAdapter` and invoke it once
    /// on `initial` with a fresh `ExecutionContext`.
    async fn run_adapter<R>(node: R, initial: AgentState) -> Result<AgentState>
    where
        R: Runnable<AgentState, AgentStateContribution> + 'static,
    {
        let adapter = ContributingNodeAdapter::<AgentState>::new(node);
        let ctx = ExecutionContext::new();
        adapter.invoke(initial, &ctx).await
    }

    #[tokio::test]
    async fn contributing_adapter_writes_only_named_slots() {
        // The node writes `log` and `tag` and leaves `score` as
        // `None`, so the current score (80) must survive. This is
        // the per-slot intent (LangGraph TypedDict semantic) that an
        // "always-merge" approach cannot express.
        let node = RunnableLambda::new(|_input: AgentState, _ctx| async move {
            let appended = Annotated::new(vec![String::from("new entry")], Append::new());
            Ok::<_, _>(AgentStateContribution {
                log: Some(appended),
                score: None,
                tag: Some(String::from("after")),
            })
        });

        let merged = run_adapter(node, seed(vec!["seed"], 80, "before"))
            .await
            .unwrap();

        let expected_log: Vec<String> = vec!["seed".to_owned(), "new entry".to_owned()];
        assert_eq!(merged.log.value, expected_log);
        // Untouched slot: the score stays at 80 rather than being
        // reduced against a default-zero contribution.
        assert_eq!(merged.score.value, 80);
        assert_eq!(merged.tag, "after");
    }

    #[tokio::test]
    async fn contributing_adapter_unwritten_field_keeps_negative_current_value() {
        // Regression for the default-overrides edge case: an
        // all-`None` contribution must leave `score` at -100 and
        // must NOT collapse it via Max(current, default = 0).
        let node = RunnableLambda::new(|_input: AgentState, _ctx| async move {
            Ok::<_, _>(AgentStateContribution::default())
        });

        let merged = run_adapter(node, seed(vec![], -100, "x")).await.unwrap();

        assert_eq!(merged.score.value, -100, "no contribution must leave value");
    }

    #[tokio::test]
    async fn contributing_adapter_propagates_inner_error() {
        // An error from the inner runnable must surface from the
        // adapter with its message intact.
        let failing = RunnableLambda::new(|_input: AgentState, _ctx| async move {
            Err::<AgentStateContribution, _>(entelix_core::error::Error::invalid_request("nope"))
        });

        let err = run_adapter(failing, seed(vec![], 0, ""))
            .await
            .unwrap_err();

        assert!(err.to_string().contains("nope"));
    }
}