entelix_graph/
contributing_node.rs1use std::sync::Arc;
15
16use async_trait::async_trait;
17use entelix_core::context::ExecutionContext;
18use entelix_core::error::Result;
19use entelix_runnable::Runnable;
20
21use crate::reducer::StateMerge;
22
23pub struct ContributingNodeAdapter<S>
28where
29 S: StateMerge + Clone + Send + Sync + 'static,
30{
31 inner: Arc<dyn Runnable<S, S::Contribution>>,
32}
33
34impl<S> ContributingNodeAdapter<S>
35where
36 S: StateMerge + Clone + Send + Sync + 'static,
37{
38 pub fn new<R>(inner: R) -> Self
43 where
44 R: Runnable<S, S::Contribution> + 'static,
45 {
46 Self {
47 inner: Arc::new(inner),
48 }
49 }
50}
51
52impl<S> std::fmt::Debug for ContributingNodeAdapter<S>
53where
54 S: StateMerge + Clone + Send + Sync + 'static,
55{
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("ContributingNodeAdapter")
58 .field("inner", &"<runnable>")
59 .finish()
60 }
61}
62
63#[async_trait]
64impl<S> Runnable<S, S> for ContributingNodeAdapter<S>
65where
66 S: StateMerge + Clone + Send + Sync + 'static,
67{
68 async fn invoke(&self, input: S, ctx: &ExecutionContext) -> Result<S> {
69 let snapshot = input.clone();
70 let contribution = self.inner.invoke(input, ctx).await?;
71 Ok(snapshot.merge_contribution(contribution))
72 }
73}
74
75#[cfg(test)]
76#[allow(clippy::unwrap_used)]
77mod tests {
78 use entelix_runnable::RunnableLambda;
79
80 use super::*;
81 use crate::reducer::{Annotated, Append, Max};
82
83 #[derive(Clone, Debug, Default)]
84 struct AgentState {
85 log: Annotated<Vec<String>, Append<String>>,
86 score: Annotated<i32, Max<i32>>,
87 tag: String,
88 }
89
90 #[derive(Default)]
91 struct AgentStateContribution {
92 log: Option<Annotated<Vec<String>, Append<String>>>,
93 score: Option<Annotated<i32, Max<i32>>>,
94 tag: Option<String>,
95 }
96
97 impl StateMerge for AgentState {
98 type Contribution = AgentStateContribution;
99
100 fn merge(self, update: Self) -> Self {
101 Self {
102 log: self.log.merge(update.log),
103 score: self.score.merge(update.score),
104 tag: update.tag,
105 }
106 }
107
108 fn merge_contribution(self, c: Self::Contribution) -> Self {
109 Self {
110 log: match c.log {
111 Some(v) => self.log.merge(v),
112 None => self.log,
113 },
114 score: match c.score {
115 Some(v) => self.score.merge(v),
116 None => self.score,
117 },
118 tag: c.tag.unwrap_or(self.tag),
119 }
120 }
121 }
122
123 fn seed(log: Vec<&str>, score: i32, tag: &str) -> AgentState {
124 AgentState {
125 log: Annotated::new(log.into_iter().map(String::from).collect(), Append::new()),
126 score: Annotated::new(score, Max::new()),
127 tag: tag.into(),
128 }
129 }
130
131 #[tokio::test]
132 async fn contributing_adapter_writes_only_named_slots() {
133 let node = RunnableLambda::new(|_input: AgentState, _ctx| async move {
139 Ok::<_, _>(AgentStateContribution {
140 log: Some(Annotated::new(vec!["new entry".into()], Append::new())),
141 score: None,
142 tag: Some("after".into()),
143 })
144 });
145 let adapter = ContributingNodeAdapter::new(node);
146
147 let initial = seed(vec!["seed"], 80, "before");
148 let merged = adapter
149 .invoke(initial, &ExecutionContext::new())
150 .await
151 .unwrap();
152
153 assert_eq!(
154 merged.log.value,
155 vec!["seed".to_owned(), "new entry".into()]
156 );
157 assert_eq!(merged.score.value, 80);
160 assert_eq!(merged.tag, "after");
161 }
162
163 #[tokio::test]
164 async fn contributing_adapter_unwritten_field_keeps_negative_current_value() {
165 let node = RunnableLambda::new(|_input: AgentState, _ctx| async move {
170 Ok::<_, _>(AgentStateContribution::default())
171 });
172 let adapter = ContributingNodeAdapter::new(node);
173
174 let initial = seed(vec![], -100, "x");
175 let merged = adapter
176 .invoke(initial, &ExecutionContext::new())
177 .await
178 .unwrap();
179 assert_eq!(merged.score.value, -100, "no contribution must leave value");
180 }
181
182 #[tokio::test]
183 async fn contributing_adapter_propagates_inner_error() {
184 let node = RunnableLambda::new(|_input: AgentState, _ctx| async move {
185 Err::<AgentStateContribution, _>(entelix_core::error::Error::invalid_request("nope"))
186 });
187 let adapter = ContributingNodeAdapter::new(node);
188 let err = adapter
189 .invoke(seed(vec![], 0, ""), &ExecutionContext::new())
190 .await
191 .unwrap_err();
192 assert!(format!("{err}").contains("nope"));
193 }
194}