Skip to main content

pe_core/
lobe_node.rs

1//! Lobe-to-Node bridge — wraps a [`Lobe`] into a [`NodeFn<CognitiveState>`].
2//!
3//! This is the key adapter that lets lobes run inside the same Pregel BSP
4//! engine as outer graph nodes. The cognitive graph registers `LobeNode`
5//! instances as regular nodes — the engine sees standard `NodeFn`, the
6//! lobe sees its restricted `LobeInput`.
7
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
11use crate::cognitive::CognitiveState;
12use crate::cognitive_signal::CognitiveSignal;
13use crate::error::PeError;
14use crate::lobe::{Lobe, LobeContext, LobeInput, LobeOutput};
15use crate::lobe_cache::LobeCache;
16use crate::node::{NodeContext, NodeFn, NodeFuture, NodeResult};
17
18use super::cognitive::CognitiveStateUpdate;
19
20/// Wraps a [`Lobe`] into a [`NodeFn<CognitiveState>`] for the cognitive graph.
21///
22/// Handles activation checking, budget enforcement, and output mapping.
23/// When the lobe is inactive, returns an empty update (zero cost).
24///
25/// # Example
26///
27/// ```ignore
28/// let lobe: Arc<dyn Lobe> = Arc::new(MyCriticLobe::new(provider));
29/// let node = LobeNode::new(lobe);
30/// // Register as a node in the cognitive StateGraph
31/// graph.add_node(node.name(), node);
32/// ```
33pub struct LobeNode {
34    lobe: Arc<dyn Lobe>,
35    cache: Option<Arc<Mutex<LobeCache>>>,
36}
37
38// Compile-time assertion: LobeNode must be Send + Sync for Pregel BSP
39const _: () = {
40    fn _assert_send_sync<T: Send + Sync>() {}
41    fn _check() {
42        _assert_send_sync::<LobeNode>();
43    }
44};
45
46impl LobeNode {
47    /// Create a new bridge node wrapping a lobe (no cache).
48    pub fn new(lobe: Arc<dyn Lobe>) -> Self {
49        Self { lobe, cache: None }
50    }
51
52    /// Create a bridge node with an output cache.
53    ///
54    /// When the same input is seen twice, the cached result is returned
55    /// without calling `Lobe::process()`. The cache is shared across
56    /// invocations via `Arc<Mutex<LobeCache>>`.
57    pub fn with_cache(lobe: Arc<dyn Lobe>, cache: Arc<Mutex<LobeCache>>) -> Self {
58        Self {
59            lobe,
60            cache: Some(cache),
61        }
62    }
63}
64
65impl NodeFn<CognitiveState> for LobeNode {
66    fn call(&self, state: &CognitiveState, ctx: &NodeContext) -> NodeFuture<CognitiveStateUpdate> {
67        let lobe = Arc::clone(&self.lobe);
68        let lobe_name = lobe.name().to_string();
69
70        // Build the lean LobeContext from full CognitiveState
71        let context = LobeContext::from_cognitive_state(state);
72
73        // Check activation BEFORE cloning anything expensive
74        if !lobe.should_activate(&context) {
75            return Box::pin(async move { NodeResult::Update(CognitiveStateUpdate::default()) });
76        }
77
78        // Build the restricted input
79        let input = LobeInput {
80            input: state.input.clone(),
81            context,
82            notes: state.working_notes.clone(),
83            runtime_services: ctx
84                .lobe_runtime_service_factory
85                .as_ref()
86                .map(|factory| factory.for_lobe(&lobe_name)),
87        };
88
89        let budget = lobe.budget();
90        let cache = self.cache.clone();
91        let input_key = state.input.clone(); // for cache lookup
92
93        Box::pin(async move {
94            // Check cache before executing
95            if let Some(ref cache) = cache {
96                let guard = cache.lock().await;
97                if let Some(cached) = guard.get(&lobe_name, &input_key) {
98                    let update = map_output_to_update(&lobe_name, cached.clone());
99                    return NodeResult::Update(update);
100                }
101            }
102
103            // Execute the lobe with timeout enforcement
104            let result = if let Some(max_dur) = budget.max_duration {
105                match tokio::time::timeout(max_dur, lobe.process(&input)).await {
106                    Ok(result) => result,
107                    Err(_) => Err(PeError::Timeout {
108                        seconds: max_dur.as_secs_f64(),
109                    }),
110                }
111            } else {
112                lobe.process(&input).await
113            };
114
115            match result {
116                Ok(output) => {
117                    // Populate cache on success
118                    if let Some(ref cache) = cache {
119                        let mut guard = cache.lock().await;
120                        guard.put(&lobe_name, &input_key, output.clone());
121                    }
122                    let update = map_output_to_update(&lobe_name, output);
123                    NodeResult::Update(update)
124                }
125                Err(e) => {
126                    // Lobe errors don't crash the cognitive graph — they produce
127                    // a low-confidence signal so synthesis can work with partial data
128                    let update = CognitiveStateUpdate {
129                        signals: Some(vec![CognitiveSignal::ProceedWithCaution {
130                            concern: format!("Lobe '{lobe_name}' failed: {e}"),
131                        }]),
132                        error_history: Some(vec![format!("lobe:{lobe_name}:{e}")]),
133                        ..Default::default()
134                    };
135                    NodeResult::Update(update)
136                }
137            }
138        })
139    }
140
141    fn name(&self) -> &str {
142        self.lobe.name()
143    }
144}
145
146/// Map a [`LobeOutput`] into a [`CognitiveStateUpdate`].
147///
148/// Stores the FULL LobeOutput (content + confidence + signals + metadata)
149/// so the synthesizer receives lossless data. Signals stay on the LobeOutput
150/// rather than being extracted separately — the synthesizer aggregates them.
151///
152/// Special handling: if metadata contains `__meditate_notes`, the consolidated
153/// notes are extracted and used as `replace_working_notes` (replace semantics,
154/// not append). This is how [`MeditateLobe`] writes back consolidated memory.
155fn map_output_to_update(lobe_name: &str, mut output: LobeOutput) -> CognitiveStateUpdate {
156    output.lobe_name = lobe_name.to_string();
157
158    // Extract meditate replace data from metadata if present.
159    let replace_notes = output
160        .metadata
161        .remove("__meditate_notes")
162        .and_then(|v| serde_json::from_value(v).ok());
163
164    CognitiveStateUpdate {
165        stream_outputs: Some([(lobe_name.to_string(), output)].into_iter().collect()),
166        replace_working_notes: replace_notes,
167        ..Default::default()
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174    use crate::lobe::{LobeBudget, LobeFuture, LobeOutputFormat};
175    use std::time::Duration;
176
177    /// A simple test lobe that returns fixed output.
178    struct FixedLobe {
179        name: &'static str,
180        output: &'static str,
181        confidence: f64,
182        active: bool,
183    }
184
185    impl Lobe for FixedLobe {
186        fn name(&self) -> &str {
187            self.name
188        }
189        fn should_activate(&self, _ctx: &LobeContext) -> bool {
190            self.active
191        }
192        fn priority(&self) -> u32 {
193            10
194        }
195        fn budget(&self) -> LobeBudget {
196            LobeBudget {
197                max_tokens: 100,
198                max_duration: Some(Duration::from_secs(5)),
199                streaming: false,
200            }
201        }
202        fn output_format(&self) -> LobeOutputFormat {
203            LobeOutputFormat::FreeText
204        }
205        fn process(&self, _input: &LobeInput) -> LobeFuture {
206            let content = self.output.to_string();
207            let confidence = self.confidence;
208            Box::pin(async move { Ok(LobeOutput::new(content, confidence)) })
209        }
210    }
211
212    /// A lobe that always errors.
213    struct ErrorLobe;
214
215    impl Lobe for ErrorLobe {
216        fn name(&self) -> &str {
217            "error_lobe"
218        }
219        fn should_activate(&self, _ctx: &LobeContext) -> bool {
220            true
221        }
222        fn priority(&self) -> u32 {
223            10
224        }
225        fn budget(&self) -> LobeBudget {
226            LobeBudget::default()
227        }
228        fn output_format(&self) -> LobeOutputFormat {
229            LobeOutputFormat::FreeText
230        }
231        fn process(&self, _input: &LobeInput) -> LobeFuture {
232            Box::pin(async {
233                Err(PeError::Internal {
234                    details: "lobe crashed".into(),
235                })
236            })
237        }
238    }
239
240    fn test_state() -> CognitiveState {
241        CognitiveState {
242            input: "analyze this code".into(),
243            confidence: 0.7,
244            ..Default::default()
245        }
246    }
247
248    fn test_ctx() -> NodeContext {
249        NodeContext {
250            step: 1,
251            recursion_limit: 25,
252            node_name: "test".into(),
253            activation: crate::node::ActivationReason::EntryPoint,
254            metadata: Default::default(),
255            phase_store: crate::phase_store::PhaseStateStore::new(),
256            stream_sender: None,
257            tool_observer: None,
258            lobe_runtime_service_factory: None,
259        }
260    }
261
262    #[tokio::test]
263    async fn test_active_lobe_produces_output() {
264        let lobe = Arc::new(FixedLobe {
265            name: "analyst",
266            output: "Facts: code is correct",
267            confidence: 0.9,
268            active: true,
269        });
270        let node = LobeNode::new(lobe);
271        let result = node.call(&test_state(), &test_ctx()).await;
272
273        match result {
274            NodeResult::Update(update) => {
275                let outputs = update.stream_outputs.unwrap();
276                let analyst_output = outputs.get("analyst").unwrap();
277                assert_eq!(analyst_output.content, "Facts: code is correct");
278                assert!((analyst_output.confidence - 0.9).abs() < f64::EPSILON);
279                assert_eq!(analyst_output.lobe_name, "analyst");
280            }
281            other => panic!("Expected Update, got {other:?}"),
282        }
283    }
284
285    #[tokio::test]
286    async fn test_inactive_lobe_skipped() {
287        let lobe = Arc::new(FixedLobe {
288            name: "critic",
289            output: "should not appear",
290            confidence: 0.5,
291            active: false,
292        });
293        let node = LobeNode::new(lobe);
294        let result = node.call(&test_state(), &test_ctx()).await;
295
296        match result {
297            NodeResult::Update(update) => {
298                assert!(update.stream_outputs.is_none());
299                assert!(update.signals.is_none());
300            }
301            other => panic!("Expected empty Update, got {other:?}"),
302        }
303    }
304
305    #[tokio::test]
306    async fn test_error_lobe_produces_caution_signal() {
307        let node = LobeNode::new(Arc::new(ErrorLobe));
308        let result = node.call(&test_state(), &test_ctx()).await;
309
310        match result {
311            NodeResult::Update(update) => {
312                // Error produces a caution signal, not a NodeResult::Error
313                let signals = update.signals.unwrap();
314                assert_eq!(signals.len(), 1);
315                assert!(signals[0].is_cautionary());
316
317                // Error recorded in history
318                let errors = update.error_history.unwrap();
319                assert_eq!(errors.len(), 1);
320                assert!(errors[0].contains("error_lobe"));
321            }
322            other => panic!("Expected Update with caution, got {other:?}"),
323        }
324    }
325
326    #[tokio::test]
327    async fn test_lobe_with_signals_propagated() {
328        struct SignalLobe;
329        impl Lobe for SignalLobe {
330            fn name(&self) -> &str {
331                "signal_lobe"
332            }
333            fn should_activate(&self, _: &LobeContext) -> bool {
334                true
335            }
336            fn priority(&self) -> u32 {
337                10
338            }
339            fn budget(&self) -> LobeBudget {
340                LobeBudget::default()
341            }
342            fn output_format(&self) -> LobeOutputFormat {
343                LobeOutputFormat::FreeText
344            }
345            fn process(&self, _: &LobeInput) -> LobeFuture {
346                Box::pin(async {
347                    Ok(LobeOutput::new("risky", 0.3).with_signal(
348                        CognitiveSignal::ProceedWithCaution {
349                            concern: "low confidence".into(),
350                        },
351                    ))
352                })
353            }
354        }
355
356        let node = LobeNode::new(Arc::new(SignalLobe));
357        let result = node.call(&test_state(), &test_ctx()).await;
358
359        match result {
360            NodeResult::Update(update) => {
361                // Signals stay on the LobeOutput, not extracted to update.signals
362                let outputs = update.stream_outputs.unwrap();
363                let lobe_output = outputs.get("signal_lobe").unwrap();
364                assert_eq!(lobe_output.signals.len(), 1);
365                assert!(lobe_output.signals[0].is_cautionary());
366            }
367            other => panic!("Expected Update with signal, got {other:?}"),
368        }
369    }
370
371    #[tokio::test]
372    async fn test_node_name_matches_lobe_name() {
373        let lobe = Arc::new(FixedLobe {
374            name: "my_lobe",
375            output: "test",
376            confidence: 0.5,
377            active: true,
378        });
379        let node = LobeNode::new(lobe);
380        assert_eq!(node.name(), "my_lobe");
381    }
382
383    #[tokio::test]
384    async fn test_lobe_context_built_from_state() {
385        let state = CognitiveState {
386            input: "test".into(),
387            confidence: 0.85,
388            current_plan: Some("step 1: analyze".into()),
389            error_history: vec!["prev error".into()],
390            ..Default::default()
391        };
392        let ctx = LobeContext::from_cognitive_state(&state);
393        assert!((ctx.confidence - 0.85).abs() < f64::EPSILON);
394        assert_eq!(ctx.current_plan.as_deref(), Some("step 1: analyze"));
395        assert_eq!(ctx.recent_errors, vec!["prev error"]);
396    }
397
398    #[tokio::test]
399    async fn test_lobe_timeout_produces_caution() {
400        struct SlowLobe;
401        impl Lobe for SlowLobe {
402            fn name(&self) -> &str {
403                "slow"
404            }
405            fn should_activate(&self, _: &LobeContext) -> bool {
406                true
407            }
408            fn priority(&self) -> u32 {
409                10
410            }
411            fn budget(&self) -> LobeBudget {
412                LobeBudget {
413                    max_tokens: 100,
414                    max_duration: Some(Duration::from_millis(10)),
415                    streaming: false,
416                }
417            }
418            fn output_format(&self) -> LobeOutputFormat {
419                LobeOutputFormat::FreeText
420            }
421            fn process(&self, _: &LobeInput) -> LobeFuture {
422                Box::pin(async {
423                    tokio::time::sleep(Duration::from_millis(200)).await;
424                    Ok(LobeOutput::new("too slow", 0.5))
425                })
426            }
427        }
428
429        let node = LobeNode::new(Arc::new(SlowLobe));
430        let result = node.call(&test_state(), &test_ctx()).await;
431
432        match result {
433            NodeResult::Update(update) => {
434                // Timeout → caution signal (same as error path)
435                let signals = update.signals.unwrap();
436                assert!(!signals.is_empty());
437                assert!(signals[0].is_cautionary());
438                let errors = update.error_history.unwrap();
439                assert!(errors[0].contains("slow"));
440            }
441            other => panic!("Expected Update with timeout caution, got {other:?}"),
442        }
443    }
444}