use std::sync::Arc;
use tokio::sync::Mutex;
use crate::cognitive::CognitiveState;
use crate::cognitive_signal::CognitiveSignal;
use crate::error::PeError;
use crate::lobe::{Lobe, LobeContext, LobeInput, LobeOutput};
use crate::lobe_cache::LobeCache;
use crate::node::{NodeContext, NodeFn, NodeFuture, NodeResult};
use super::cognitive::CognitiveStateUpdate;
/// Node wrapper around a shared [`Lobe`] with an optional shared [`LobeCache`].
///
/// Build it with `LobeNode::new` (no cache) or `LobeNode::with_cache`.
pub struct LobeNode {
/// The lobe this node delegates to.
lobe: Arc<dyn Lobe>,
/// Shared output cache behind an async mutex; `None` when no cache was supplied.
cache: Option<Arc<Mutex<LobeCache>>>,
}
// Compile-time guard: the build fails if `LobeNode` ever stops being `Send + Sync`.
// The closure is never invoked; it only has to type-check.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<LobeNode>();
};
impl LobeNode {
pub fn new(lobe: Arc<dyn Lobe>) -> Self {
Self { lobe, cache: None }
}
pub fn with_cache(lobe: Arc<dyn Lobe>, cache: Arc<Mutex<LobeCache>>) -> Self {
Self {
lobe,
cache: Some(cache),
}
}
}
/// Exposes a [`LobeNode`] as a node function operating on [`CognitiveState`].
impl NodeFn<CognitiveState> for LobeNode {
    /// Runs the wrapped lobe for the current `state` and reports the outcome as a
    /// state update. Lobe failures are converted into updates, never propagated.
    ///
    /// Sequence:
    /// 1. A [`LobeContext`] is derived from `state`. If the lobe's
    ///    `should_activate` rejects it, the future resolves to a default update.
    /// 2. With a cache configured, an entry found under (lobe name, `state.input`)
    ///    is replayed through `map_output_to_update` and the lobe is not run.
    /// 3. Otherwise `Lobe::process` is awaited, wrapped in `tokio::time::timeout`
    ///    when the lobe's budget carries a `max_duration`; an elapsed timeout is
    ///    reported as `PeError::Timeout`.
    /// 4. A successful output is stored in the cache (when present) and mapped to
    ///    an update. Failed runs are not cached.
    /// 5. Any error becomes a `ProceedWithCaution` signal plus an `error_history`
    ///    entry of the form `lobe:<name>:<error>`.
    fn call(&self, state: &CognitiveState, ctx: &NodeContext) -> NodeFuture<CognitiveStateUpdate> {
        let lobe = Arc::clone(&self.lobe);
        let context = LobeContext::from_cognitive_state(state);
        if !lobe.should_activate(&context) {
            return Box::pin(async move { NodeResult::Update(CognitiveStateUpdate::default()) });
        }

        // Allocate the owned name only once the lobe is known to run.
        let lobe_name = lobe.name().to_string();
        let input = LobeInput {
            input: state.input.clone(),
            context,
            notes: state.working_notes.clone(),
            runtime_services: ctx
                .lobe_runtime_service_factory
                .as_ref()
                .map(|factory| factory.for_lobe(&lobe_name)),
        };
        let budget = lobe.budget();
        let cache = self.cache.clone();

        Box::pin(async move {
            // `input.input` already owns a copy of the state's input, so it doubles
            // as the cache key instead of cloning the input a second time.
            if let Some(cache) = cache.as_ref() {
                let guard = cache.lock().await;
                if let Some(cached) = guard.get(&lobe_name, &input.input) {
                    return NodeResult::Update(map_output_to_update(&lobe_name, cached.clone()));
                }
                // The guard is dropped here, so the lock is not held while the lobe runs.
            }

            let result = match budget.max_duration {
                Some(max_dur) => tokio::time::timeout(max_dur, lobe.process(&input))
                    .await
                    .unwrap_or_else(|_elapsed| {
                        Err(PeError::Timeout {
                            seconds: max_dur.as_secs_f64(),
                        })
                    }),
                None => lobe.process(&input).await,
            };

            match result {
                Ok(output) => {
                    if let Some(cache) = cache.as_ref() {
                        let mut guard = cache.lock().await;
                        guard.put(&lobe_name, &input.input, output.clone());
                    }
                    NodeResult::Update(map_output_to_update(&lobe_name, output))
                }
                // Degrade gracefully: surface the failure as a caution signal and an
                // error-history entry rather than failing the node.
                Err(e) => NodeResult::Update(CognitiveStateUpdate {
                    signals: Some(vec![CognitiveSignal::ProceedWithCaution {
                        concern: format!("Lobe '{lobe_name}' failed: {e}"),
                    }]),
                    error_history: Some(vec![format!("lobe:{lobe_name}:{e}")]),
                    ..Default::default()
                }),
            }
        })
    }

    /// Returns the wrapped lobe's name, which serves as this node's name.
    fn name(&self) -> &str {
        self.lobe.name()
    }
}
/// Converts a lobe's output into a [`CognitiveStateUpdate`].
///
/// The output is stamped with `lobe_name` and placed in `stream_outputs` under
/// that same name. If the output's metadata has a `"__meditate_notes"` entry, the
/// entry is removed and, when it deserializes successfully, used as
/// `replace_working_notes`; a value that fails to deserialize is dropped.
fn map_output_to_update(lobe_name: &str, mut output: LobeOutput) -> CognitiveStateUpdate {
    let owner = lobe_name.to_string();
    output.lobe_name = owner.clone();

    // Pull the notes override out of the metadata so it does not travel with the output.
    let raw_notes = output.metadata.remove("__meditate_notes");
    let notes_override = raw_notes.and_then(|value| serde_json::from_value(value).ok());

    let outputs = std::iter::once((owner, output)).collect();
    CognitiveStateUpdate {
        stream_outputs: Some(outputs),
        replace_working_notes: notes_override,
        ..Default::default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lobe::{LobeBudget, LobeFuture, LobeOutputFormat};
    use std::time::Duration;

    /// Test double whose name, reply text, confidence and activation flag are fixed.
    struct FixedLobe {
        name: &'static str,
        output: &'static str,
        confidence: f64,
        active: bool,
    }

    impl Lobe for FixedLobe {
        fn name(&self) -> &str {
            self.name
        }

        fn should_activate(&self, _ctx: &LobeContext) -> bool {
            self.active
        }

        fn priority(&self) -> u32 {
            10
        }

        fn budget(&self) -> LobeBudget {
            LobeBudget {
                max_tokens: 100,
                max_duration: Some(Duration::from_secs(5)),
                streaming: false,
            }
        }

        fn output_format(&self) -> LobeOutputFormat {
            LobeOutputFormat::FreeText
        }

        fn process(&self, _input: &LobeInput) -> LobeFuture {
            let (text, score) = (self.output.to_string(), self.confidence);
            Box::pin(async move { Ok(LobeOutput::new(text, score)) })
        }
    }

    /// Test double whose `process` always fails with `PeError::Internal`.
    struct ErrorLobe;

    impl Lobe for ErrorLobe {
        fn name(&self) -> &str {
            "error_lobe"
        }

        fn should_activate(&self, _ctx: &LobeContext) -> bool {
            true
        }

        fn priority(&self) -> u32 {
            10
        }

        fn budget(&self) -> LobeBudget {
            LobeBudget::default()
        }

        fn output_format(&self) -> LobeOutputFormat {
            LobeOutputFormat::FreeText
        }

        fn process(&self, _input: &LobeInput) -> LobeFuture {
            Box::pin(async {
                Err(PeError::Internal {
                    details: "lobe crashed".into(),
                })
            })
        }
    }

    /// Canned cognitive state shared by the node tests.
    fn test_state() -> CognitiveState {
        CognitiveState {
            input: "analyze this code".into(),
            confidence: 0.7,
            ..Default::default()
        }
    }

    /// Canned node context with no stream sender, tool observer or runtime-service factory.
    fn test_ctx() -> NodeContext {
        NodeContext {
            step: 1,
            recursion_limit: 25,
            node_name: "test".into(),
            activation: crate::node::ActivationReason::EntryPoint,
            metadata: Default::default(),
            phase_store: crate::phase_store::PhaseStateStore::new(),
            stream_sender: None,
            tool_observer: None,
            lobe_runtime_service_factory: None,
        }
    }

    /// An active lobe's content, confidence and name end up in `stream_outputs`.
    #[tokio::test]
    async fn test_active_lobe_produces_output() {
        let node = LobeNode::new(Arc::new(FixedLobe {
            name: "analyst",
            output: "Facts: code is correct",
            confidence: 0.9,
            active: true,
        }));

        let update = match node.call(&test_state(), &test_ctx()).await {
            NodeResult::Update(update) => update,
            other => panic!("Expected Update, got {other:?}"),
        };

        let outputs = update.stream_outputs.unwrap();
        let produced = outputs.get("analyst").unwrap();
        assert_eq!(produced.content, "Facts: code is correct");
        assert!((produced.confidence - 0.9).abs() < f64::EPSILON);
        assert_eq!(produced.lobe_name, "analyst");
    }

    /// A lobe that declines activation yields an update with no outputs and no signals.
    #[tokio::test]
    async fn test_inactive_lobe_skipped() {
        let node = LobeNode::new(Arc::new(FixedLobe {
            name: "critic",
            output: "should not appear",
            confidence: 0.5,
            active: false,
        }));

        let update = match node.call(&test_state(), &test_ctx()).await {
            NodeResult::Update(update) => update,
            other => panic!("Expected empty Update, got {other:?}"),
        };

        assert!(update.stream_outputs.is_none());
        assert!(update.signals.is_none());
    }

    /// A failing lobe is reported as one cautionary signal plus one error-history entry.
    #[tokio::test]
    async fn test_error_lobe_produces_caution_signal() {
        let node = LobeNode::new(Arc::new(ErrorLobe));

        let update = match node.call(&test_state(), &test_ctx()).await {
            NodeResult::Update(update) => update,
            other => panic!("Expected Update with caution, got {other:?}"),
        };

        let signals = update.signals.unwrap();
        assert_eq!(signals.len(), 1);
        assert!(signals[0].is_cautionary());

        let errors = update.error_history.unwrap();
        assert_eq!(errors.len(), 1);
        assert!(errors[0].contains("error_lobe"));
    }

    /// Signals attached to a lobe's output are still present on the mapped output.
    #[tokio::test]
    async fn test_lobe_with_signals_propagated() {
        struct SignalLobe;

        impl Lobe for SignalLobe {
            fn name(&self) -> &str {
                "signal_lobe"
            }

            fn should_activate(&self, _: &LobeContext) -> bool {
                true
            }

            fn priority(&self) -> u32 {
                10
            }

            fn budget(&self) -> LobeBudget {
                LobeBudget::default()
            }

            fn output_format(&self) -> LobeOutputFormat {
                LobeOutputFormat::FreeText
            }

            fn process(&self, _: &LobeInput) -> LobeFuture {
                Box::pin(async {
                    Ok(LobeOutput::new("risky", 0.3).with_signal(
                        CognitiveSignal::ProceedWithCaution {
                            concern: "low confidence".into(),
                        },
                    ))
                })
            }
        }

        let node = LobeNode::new(Arc::new(SignalLobe));

        let update = match node.call(&test_state(), &test_ctx()).await {
            NodeResult::Update(update) => update,
            other => panic!("Expected Update with signal, got {other:?}"),
        };

        let outputs = update.stream_outputs.unwrap();
        let produced = outputs.get("signal_lobe").unwrap();
        assert_eq!(produced.signals.len(), 1);
        assert!(produced.signals[0].is_cautionary());
    }

    /// The node reports the wrapped lobe's name as its own.
    #[tokio::test]
    async fn test_node_name_matches_lobe_name() {
        let node = LobeNode::new(Arc::new(FixedLobe {
            name: "my_lobe",
            output: "test",
            confidence: 0.5,
            active: true,
        }));

        assert_eq!(node.name(), "my_lobe");
    }

    /// `LobeContext::from_cognitive_state` carries over confidence, plan and errors.
    #[tokio::test]
    async fn test_lobe_context_built_from_state() {
        let snapshot = CognitiveState {
            input: "test".into(),
            confidence: 0.85,
            current_plan: Some("step 1: analyze".into()),
            error_history: vec!["prev error".into()],
            ..Default::default()
        };

        let derived = LobeContext::from_cognitive_state(&snapshot);

        assert!((derived.confidence - 0.85).abs() < f64::EPSILON);
        assert_eq!(derived.current_plan.as_deref(), Some("step 1: analyze"));
        assert_eq!(derived.recent_errors, vec!["prev error"]);
    }

    /// A lobe that outlives its budgeted duration is reported as a cautionary failure.
    #[tokio::test]
    async fn test_lobe_timeout_produces_caution() {
        /// Sleeps for 200 ms while its budget only allows 10 ms.
        struct SlowLobe;

        impl Lobe for SlowLobe {
            fn name(&self) -> &str {
                "slow"
            }

            fn should_activate(&self, _: &LobeContext) -> bool {
                true
            }

            fn priority(&self) -> u32 {
                10
            }

            fn budget(&self) -> LobeBudget {
                LobeBudget {
                    max_tokens: 100,
                    max_duration: Some(Duration::from_millis(10)),
                    streaming: false,
                }
            }

            fn output_format(&self) -> LobeOutputFormat {
                LobeOutputFormat::FreeText
            }

            fn process(&self, _: &LobeInput) -> LobeFuture {
                Box::pin(async {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    Ok(LobeOutput::new("too slow", 0.5))
                })
            }
        }

        let node = LobeNode::new(Arc::new(SlowLobe));

        let update = match node.call(&test_state(), &test_ctx()).await {
            NodeResult::Update(update) => update,
            other => panic!("Expected Update with timeout caution, got {other:?}"),
        };

        let signals = update.signals.unwrap();
        assert!(!signals.is_empty());
        assert!(signals[0].is_cautionary());

        let errors = update.error_history.unwrap();
        assert!(errors[0].contains("slow"));
    }
}