1use std::sync::Arc;
9use tokio::sync::Mutex;
10
11use crate::cognitive::CognitiveState;
12use crate::cognitive_signal::CognitiveSignal;
13use crate::error::PeError;
14use crate::lobe::{Lobe, LobeContext, LobeInput, LobeOutput};
15use crate::lobe_cache::LobeCache;
16use crate::node::{NodeContext, NodeFn, NodeFuture, NodeResult};
17
18use super::cognitive::CognitiveStateUpdate;
19
20pub struct LobeNode {
34 lobe: Arc<dyn Lobe>,
35 cache: Option<Arc<Mutex<LobeCache>>>,
36}
37
38const _: () = {
40 fn _assert_send_sync<T: Send + Sync>() {}
41 fn _check() {
42 _assert_send_sync::<LobeNode>();
43 }
44};
45
46impl LobeNode {
47 pub fn new(lobe: Arc<dyn Lobe>) -> Self {
49 Self { lobe, cache: None }
50 }
51
52 pub fn with_cache(lobe: Arc<dyn Lobe>, cache: Arc<Mutex<LobeCache>>) -> Self {
58 Self {
59 lobe,
60 cache: Some(cache),
61 }
62 }
63}
64
65impl NodeFn<CognitiveState> for LobeNode {
66 fn call(&self, state: &CognitiveState, ctx: &NodeContext) -> NodeFuture<CognitiveStateUpdate> {
67 let lobe = Arc::clone(&self.lobe);
68 let lobe_name = lobe.name().to_string();
69
70 let context = LobeContext::from_cognitive_state(state);
72
73 if !lobe.should_activate(&context) {
75 return Box::pin(async move { NodeResult::Update(CognitiveStateUpdate::default()) });
76 }
77
78 let input = LobeInput {
80 input: state.input.clone(),
81 context,
82 notes: state.working_notes.clone(),
83 runtime_services: ctx
84 .lobe_runtime_service_factory
85 .as_ref()
86 .map(|factory| factory.for_lobe(&lobe_name)),
87 };
88
89 let budget = lobe.budget();
90 let cache = self.cache.clone();
91 let input_key = state.input.clone(); Box::pin(async move {
94 if let Some(ref cache) = cache {
96 let guard = cache.lock().await;
97 if let Some(cached) = guard.get(&lobe_name, &input_key) {
98 let update = map_output_to_update(&lobe_name, cached.clone());
99 return NodeResult::Update(update);
100 }
101 }
102
103 let result = if let Some(max_dur) = budget.max_duration {
105 match tokio::time::timeout(max_dur, lobe.process(&input)).await {
106 Ok(result) => result,
107 Err(_) => Err(PeError::Timeout {
108 seconds: max_dur.as_secs_f64(),
109 }),
110 }
111 } else {
112 lobe.process(&input).await
113 };
114
115 match result {
116 Ok(output) => {
117 if let Some(ref cache) = cache {
119 let mut guard = cache.lock().await;
120 guard.put(&lobe_name, &input_key, output.clone());
121 }
122 let update = map_output_to_update(&lobe_name, output);
123 NodeResult::Update(update)
124 }
125 Err(e) => {
126 let update = CognitiveStateUpdate {
129 signals: Some(vec![CognitiveSignal::ProceedWithCaution {
130 concern: format!("Lobe '{lobe_name}' failed: {e}"),
131 }]),
132 error_history: Some(vec![format!("lobe:{lobe_name}:{e}")]),
133 ..Default::default()
134 };
135 NodeResult::Update(update)
136 }
137 }
138 })
139 }
140
141 fn name(&self) -> &str {
142 self.lobe.name()
143 }
144}
145
146fn map_output_to_update(lobe_name: &str, mut output: LobeOutput) -> CognitiveStateUpdate {
156 output.lobe_name = lobe_name.to_string();
157
158 let replace_notes = output
160 .metadata
161 .remove("__meditate_notes")
162 .and_then(|v| serde_json::from_value(v).ok());
163
164 CognitiveStateUpdate {
165 stream_outputs: Some([(lobe_name.to_string(), output)].into_iter().collect()),
166 replace_working_notes: replace_notes,
167 ..Default::default()
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lobe::{LobeBudget, LobeFuture, LobeOutputFormat};
    use std::time::Duration;

    /// Lobe fixture that returns a fixed output and confidence; `active`
    /// is what `should_activate` reports.
    struct FixedLobe {
        name: &'static str,
        output: &'static str,
        confidence: f64,
        active: bool,
    }

    impl Lobe for FixedLobe {
        fn name(&self) -> &str {
            self.name
        }
        fn should_activate(&self, _ctx: &LobeContext) -> bool {
            self.active
        }
        fn priority(&self) -> u32 {
            10
        }
        fn budget(&self) -> LobeBudget {
            LobeBudget {
                max_tokens: 100,
                max_duration: Some(Duration::from_secs(5)),
                streaming: false,
            }
        }
        fn output_format(&self) -> LobeOutputFormat {
            LobeOutputFormat::FreeText
        }
        fn process(&self, _input: &LobeInput) -> LobeFuture {
            let content = self.output.to_string();
            let confidence = self.confidence;
            Box::pin(async move { Ok(LobeOutput::new(content, confidence)) })
        }
    }

    /// Lobe fixture whose `process` always fails with `PeError::Internal`.
    struct ErrorLobe;

    impl Lobe for ErrorLobe {
        fn name(&self) -> &str {
            "error_lobe"
        }
        fn should_activate(&self, _ctx: &LobeContext) -> bool {
            true
        }
        fn priority(&self) -> u32 {
            10
        }
        fn budget(&self) -> LobeBudget {
            LobeBudget::default()
        }
        fn output_format(&self) -> LobeOutputFormat {
            LobeOutputFormat::FreeText
        }
        fn process(&self, _input: &LobeInput) -> LobeFuture {
            Box::pin(async {
                Err(PeError::Internal {
                    details: "lobe crashed".into(),
                })
            })
        }
    }

    /// Minimal state shared by the node tests.
    fn test_state() -> CognitiveState {
        CognitiveState {
            input: "analyze this code".into(),
            confidence: 0.7,
            ..Default::default()
        }
    }

    /// Node context with every optional service left unset.
    fn test_ctx() -> NodeContext {
        NodeContext {
            step: 1,
            recursion_limit: 25,
            node_name: "test".into(),
            activation: crate::node::ActivationReason::EntryPoint,
            metadata: Default::default(),
            phase_store: crate::phase_store::PhaseStateStore::new(),
            stream_sender: None,
            tool_observer: None,
            lobe_runtime_service_factory: None,
        }
    }

    /// An active lobe's output lands in `stream_outputs` under its own name.
    #[tokio::test]
    async fn test_active_lobe_produces_output() {
        let lobe = Arc::new(FixedLobe {
            name: "analyst",
            output: "Facts: code is correct",
            confidence: 0.9,
            active: true,
        });
        let node = LobeNode::new(lobe);
        let result = node.call(&test_state(), &test_ctx()).await;

        match result {
            NodeResult::Update(update) => {
                let outputs = update.stream_outputs.unwrap();
                let analyst_output = outputs.get("analyst").unwrap();
                assert_eq!(analyst_output.content, "Facts: code is correct");
                assert!((analyst_output.confidence - 0.9).abs() < f64::EPSILON);
                assert_eq!(analyst_output.lobe_name, "analyst");
            }
            other => panic!("Expected Update, got {other:?}"),
        }
    }

    /// A lobe that declines to activate yields an empty update.
    #[tokio::test]
    async fn test_inactive_lobe_skipped() {
        let lobe = Arc::new(FixedLobe {
            name: "critic",
            output: "should not appear",
            confidence: 0.5,
            active: false,
        });
        let node = LobeNode::new(lobe);
        let result = node.call(&test_state(), &test_ctx()).await;

        match result {
            NodeResult::Update(update) => {
                assert!(update.stream_outputs.is_none());
                assert!(update.signals.is_none());
            }
            other => panic!("Expected empty Update, got {other:?}"),
        }
    }

    /// A failing lobe is reported as one cautionary signal plus one
    /// `error_history` entry naming the lobe.
    #[tokio::test]
    async fn test_error_lobe_produces_caution_signal() {
        let node = LobeNode::new(Arc::new(ErrorLobe));
        let result = node.call(&test_state(), &test_ctx()).await;

        match result {
            NodeResult::Update(update) => {
                let signals = update.signals.unwrap();
                assert_eq!(signals.len(), 1);
                assert!(signals[0].is_cautionary());

                let errors = update.error_history.unwrap();
                assert_eq!(errors.len(), 1);
                assert!(errors[0].contains("error_lobe"));
            }
            other => panic!("Expected Update with caution, got {other:?}"),
        }
    }

    /// Signals attached to a lobe's output survive the mapping into the update.
    #[tokio::test]
    async fn test_lobe_with_signals_propagated() {
        struct SignalLobe;
        impl Lobe for SignalLobe {
            fn name(&self) -> &str {
                "signal_lobe"
            }
            fn should_activate(&self, _: &LobeContext) -> bool {
                true
            }
            fn priority(&self) -> u32 {
                10
            }
            fn budget(&self) -> LobeBudget {
                LobeBudget::default()
            }
            fn output_format(&self) -> LobeOutputFormat {
                LobeOutputFormat::FreeText
            }
            fn process(&self, _: &LobeInput) -> LobeFuture {
                Box::pin(async {
                    Ok(LobeOutput::new("risky", 0.3).with_signal(
                        CognitiveSignal::ProceedWithCaution {
                            concern: "low confidence".into(),
                        },
                    ))
                })
            }
        }

        let node = LobeNode::new(Arc::new(SignalLobe));
        let result = node.call(&test_state(), &test_ctx()).await;

        match result {
            NodeResult::Update(update) => {
                let outputs = update.stream_outputs.unwrap();
                let lobe_output = outputs.get("signal_lobe").unwrap();
                assert_eq!(lobe_output.signals.len(), 1);
                assert!(lobe_output.signals[0].is_cautionary());
            }
            other => panic!("Expected Update with signal, got {other:?}"),
        }
    }

    /// The node reports the wrapped lobe's name. Nothing is awaited here, so
    /// this is a plain synchronous test.
    #[test]
    fn test_node_name_matches_lobe_name() {
        let lobe = Arc::new(FixedLobe {
            name: "my_lobe",
            output: "test",
            confidence: 0.5,
            active: true,
        });
        let node = LobeNode::new(lobe);
        assert_eq!(node.name(), "my_lobe");
    }

    /// `LobeContext::from_cognitive_state` carries over confidence, plan and
    /// error history. Nothing is awaited here, so this is a plain synchronous
    /// test.
    #[test]
    fn test_lobe_context_built_from_state() {
        let state = CognitiveState {
            input: "test".into(),
            confidence: 0.85,
            current_plan: Some("step 1: analyze".into()),
            error_history: vec!["prev error".into()],
            ..Default::default()
        };
        let ctx = LobeContext::from_cognitive_state(&state);
        assert!((ctx.confidence - 0.85).abs() < f64::EPSILON);
        assert_eq!(ctx.current_plan.as_deref(), Some("step 1: analyze"));
        assert_eq!(ctx.recent_errors, vec!["prev error"]);
    }

    /// A lobe that outlives its `max_duration` budget is reported like any
    /// other failure: a cautionary signal plus an `error_history` entry.
    #[tokio::test]
    async fn test_lobe_timeout_produces_caution() {
        struct SlowLobe;
        impl Lobe for SlowLobe {
            fn name(&self) -> &str {
                "slow"
            }
            fn should_activate(&self, _: &LobeContext) -> bool {
                true
            }
            fn priority(&self) -> u32 {
                10
            }
            fn budget(&self) -> LobeBudget {
                LobeBudget {
                    max_tokens: 100,
                    max_duration: Some(Duration::from_millis(10)),
                    streaming: false,
                }
            }
            fn output_format(&self) -> LobeOutputFormat {
                LobeOutputFormat::FreeText
            }
            fn process(&self, _: &LobeInput) -> LobeFuture {
                Box::pin(async {
                    // Sleeps well past the 10 ms budget declared above.
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok(LobeOutput::new("too slow", 0.5))
                })
            }
        }

        let node = LobeNode::new(Arc::new(SlowLobe));
        let result = node.call(&test_state(), &test_ctx()).await;

        match result {
            NodeResult::Update(update) => {
                let signals = update.signals.unwrap();
                assert!(!signals.is_empty());
                assert!(signals[0].is_cautionary());
                let errors = update.error_history.unwrap();
                assert!(errors[0].contains("slow"));
            }
            other => panic!("Expected Update with timeout caution, got {other:?}"),
        }
    }
}