Skip to main content

pe_core/
lobe.rs

1//! Lobe — the cognitive processing unit.
2//!
3//! A lobe is like a brain region: it has a specialized function, runs in
4//! parallel with other lobes, and feeds its output into synthesis. The
5//! library provides the trait — users implement domain-specific lobes.
6//!
7//! Lobes compile down to `NodeFn<CognitiveState>` via the [`LobeNode`](crate::lobe_node::LobeNode)
8//! bridge (in `lobe_node` module), so the cognitive graph uses the same
9//! Pregel engine as the outer execution graph.
10
11use crate::cognitive_memory::WorkingNote;
12use crate::cognitive_signal::CognitiveSignal;
13use crate::error::PeError;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::future::Future;
17use std::path::PathBuf;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22/// A cognitive processing unit — one lens on the agent's reasoning.
23///
24/// Lobes are the building blocks of the cognitive architecture.
25/// Each lobe receives lean, restricted context (not the full conversation),
26/// processes it through its specialized function, and returns structured output.
27///
28/// # Implementing a Lobe
29///
30/// ```ignore
31/// struct CriticLobe { model: Arc<dyn LlmProvider> }
32///
33/// impl Lobe for CriticLobe {
34///     fn name(&self) -> &str { "critic" }
35///     fn should_activate(&self, _context: &LobeContext) -> bool { true }
36///     fn priority(&self) -> u32 { 20 }
37///     fn budget(&self) -> LobeBudget { LobeBudget::default() }
38///     fn output_format(&self) -> LobeOutputFormat { LobeOutputFormat::FreeText }
39///     fn process(&self, input: &LobeInput) -> LobeFuture {
40///         let model = self.model.clone();
41///         let prompt = input.input.clone();
42///         Box::pin(async move {
43///             // Call LLM with critic-focused prompt
44///             Ok(LobeOutput::new("Risks identified: ...".into(), 0.85))
45///         })
46///     }
47/// }
48/// ```
49pub trait Lobe: Send + Sync {
50    /// Unique name for this lobe (used as key in stream_outputs).
51    fn name(&self) -> &str;
52
53    /// Whether this lobe should run for the current context.
54    /// Returning `false` skips the lobe entirely (zero cost).
55    /// Receives a lean [`LobeContext`], not the full `CognitiveState`.
56    fn should_activate(&self, context: &LobeContext) -> bool;
57
58    /// Execution priority — lower values run first in synthesis ordering.
59    /// Does NOT affect parallel execution (all lobes run concurrently).
60    fn priority(&self) -> u32;
61
62    /// Resource limits for this lobe's execution.
63    fn budget(&self) -> LobeBudget;
64
65    /// What kind of output this lobe produces.
66    fn output_format(&self) -> LobeOutputFormat;
67
68    /// Process the input and produce output. This is the lobe's main work.
69    fn process(&self, input: &LobeInput) -> LobeFuture;
70}
71
72/// Future type for lobe processing — async, Send, returns Result.
73pub type LobeFuture = Pin<Box<dyn Future<Output = Result<LobeOutput, PeError>> + Send>>;
74
75/// Lean, restricted input for a lobe.
76///
77/// Each lobe sees only what it needs — not the full conversation.
78/// This keeps cognitive processing cheap and focused.
79#[derive(Clone)]
80pub struct LobeInput {
81    /// The task/prompt being processed.
82    pub input: String,
83
84    /// Context slice relevant to this lobe.
85    pub context: LobeContext,
86
87    /// Recent working notes (for lobes that build on prior observations).
88    pub notes: Vec<WorkingNote>,
89
90    /// Optional runtime-owned services available to this lobe.
91    ///
92    /// When present, the lobe can request accountable runtime services
93    /// such as bounded inspection through the same runtime substrate as
94    /// the outer agent.
95    pub runtime_services: Option<Arc<dyn LobeRuntimeServices>>,
96}
97
98impl std::fmt::Debug for LobeInput {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("LobeInput")
101            .field("input", &self.input)
102            .field("context", &self.context)
103            .field("notes", &self.notes)
104            .field("has_runtime_services", &self.runtime_services.is_some())
105            .finish()
106    }
107}
108
109/// Restricted context for a lobe — not the full conversation.
110///
111/// Use [`from_cognitive_state`](LobeContext::from_cognitive_state) to build
112/// from a full `CognitiveState`, or construct directly for testing.
113#[derive(Debug, Clone, Default)]
114pub struct LobeContext {
115    /// Agent's self-description (from SelfModel.self_context, summarized).
116    pub self_summary: Option<String>,
117
118    /// Recent error history (for lobes that learn from failures).
119    pub recent_errors: Vec<String>,
120
121    /// Current confidence level (from matrix C value).
122    pub confidence: f64,
123
124    /// Current plan (if one exists).
125    pub current_plan: Option<String>,
126
127    /// Arbitrary metadata from the outer graph.
128    pub metadata: HashMap<String, serde_json::Value>,
129}
130
131impl LobeContext {
132    /// Build a lean context from a full [`CognitiveState`](crate::cognitive::CognitiveState).
133    ///
134    /// This is the canonical conversion — used by both [`LobeNode`](crate::lobe_node::LobeNode)
135    /// and [`LobeRegistry`](crate::lobe_registry::LobeRegistry) to avoid duplication.
136    pub fn from_cognitive_state(state: &crate::cognitive::CognitiveState) -> Self {
137        let mut metadata = HashMap::new();
138        metadata.insert(
139            "working_notes_count".into(),
140            serde_json::Value::from(state.working_notes.len()),
141        );
142        metadata.insert(
143            "failure_records_count".into(),
144            serde_json::Value::from(state.failure_records.len()),
145        );
146        Self {
147            self_summary: None, // Populated by the cognitive graph runner from SelfModel
148            recent_errors: state.error_history.clone(),
149            confidence: state.confidence,
150            current_plan: state.current_plan.clone(),
151            metadata,
152        }
153    }
154}
155
156/// Output from a lobe's processing.
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
158pub struct LobeOutput {
159    /// Name of the lobe that produced this output (for synthesizer labeling).
160    #[serde(default)]
161    pub lobe_name: String,
162
163    /// The lobe's output content.
164    pub content: String,
165
166    /// How confident the lobe is in its output (0.0 to 1.0).
167    pub confidence: f64,
168
169    /// Signals to emit to the outer graph.
170    #[serde(default)]
171    pub signals: Vec<CognitiveSignal>,
172
173    /// Lobe-specific metadata.
174    #[serde(default)]
175    pub metadata: HashMap<String, serde_json::Value>,
176}
177
178impl LobeOutput {
179    /// Create a new lobe output with content and confidence.
180    pub fn new(content: impl Into<String>, confidence: f64) -> Self {
181        Self {
182            lobe_name: String::new(),
183            content: content.into(),
184            confidence,
185            signals: Vec::new(),
186            metadata: HashMap::new(),
187        }
188    }
189
190    /// Set the lobe name on this output.
191    #[must_use]
192    pub fn with_lobe_name(mut self, name: impl Into<String>) -> Self {
193        self.lobe_name = name.into();
194        self
195    }
196
197    /// Add a signal to emit.
198    #[must_use]
199    pub fn with_signal(mut self, signal: CognitiveSignal) -> Self {
200        self.signals.push(signal);
201        self
202    }
203}
204
205/// Resource limits for a single lobe.
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct LobeBudget {
208    /// Max tokens this lobe can consume.
209    pub max_tokens: u32,
210
211    /// Max wall-clock time for this lobe.
212    #[serde(default, skip_serializing_if = "Option::is_none")]
213    pub max_duration: Option<Duration>,
214
215    /// Whether this lobe streams LLM tokens incrementally.
216    ///
217    /// Default is `false` because lobes run in parallel — streaming multiple
218    /// lobes concurrently causes SSE I/O thrash with no user-visible benefit
219    /// (the synthesizer waits for all lobes anyway). Enable selectively for
220    /// lobes that need token-level progress reporting.
221    #[serde(default)]
222    pub streaming: bool,
223}
224
225impl Default for LobeBudget {
226    fn default() -> Self {
227        Self {
228            max_tokens: 500,
229            max_duration: Some(Duration::from_secs(5)),
230            streaming: false,
231        }
232    }
233}
234
235/// What kind of output a lobe produces.
236///
237/// Helps the synthesizer interpret outputs correctly.
238#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
239#[non_exhaustive]
240pub enum LobeOutputFormat {
241    /// Unstructured text (most common).
242    FreeText,
243    /// JSON-structured output.
244    Structured,
245    /// Numeric score (0.0 to 1.0).
246    Score,
247    /// Yes/no decision.
248    Boolean,
249    /// User-defined format.
250    Custom(String),
251}
252
253/// When a lobe activates — used by the cognitive graph scheduler.
254///
255/// Currently informational. The `Lobe::should_activate()` method is the
256/// runtime activation gate. This enum is available for future scheduling
257/// optimizations (e.g., skip `should_activate()` call for `AlwaysOn` lobes).
258#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
259#[non_exhaustive]
260pub enum LobeActivation {
261    /// Runs every cognitive cycle.
262    AlwaysOn,
263    /// Runs only when `should_activate()` returns true.
264    OnDemand,
265    /// Runs based on external trigger (e.g., low confidence).
266    Conditional,
267}
268
269/// Runtime-owned services available to a lobe.
270///
271/// Defined in `pe-core` so lobes can depend on the interface without
272/// depending on `pe-runtime`.
273pub trait LobeRuntimeServices: Send + Sync {
274    /// Run a bounded runtime-owned inspection.
275    fn inspect(&self, request: LobeInspectionRequest) -> Result<LobeInspectionResult, PeError>;
276}
277
278/// Factory for creating source-attributed runtime service handles per lobe.
279pub trait LobeRuntimeServiceFactory: Send + Sync {
280    /// Build runtime services for the named lobe.
281    fn for_lobe(&self, lobe_name: &str) -> Arc<dyn LobeRuntimeServices>;
282}
283
284/// Lobe-facing request for bounded runtime-owned inspection.
285#[derive(Debug, Clone, Serialize, Deserialize)]
286pub struct LobeInspectionRequest {
287    pub root: PathBuf,
288    pub allowed_roots: Vec<PathBuf>,
289    pub max_files: Option<u64>,
290    pub max_bytes: Option<u64>,
291    pub max_depth: Option<usize>,
292    pub include_contents: bool,
293    pub include_extensions: Vec<String>,
294    pub exclude_names: Vec<String>,
295    pub exclude_path_prefixes: Vec<PathBuf>,
296    pub max_preview_bytes_per_file: Option<u64>,
297    pub skip_hidden: bool,
298}
299
300impl LobeInspectionRequest {
301    /// Create a new request rooted at `root`.
302    pub fn new(root: impl Into<PathBuf>) -> Self {
303        Self {
304            root: root.into(),
305            allowed_roots: Vec::new(),
306            max_files: None,
307            max_bytes: None,
308            max_depth: None,
309            include_contents: false,
310            include_extensions: Vec::new(),
311            exclude_names: Vec::new(),
312            exclude_path_prefixes: Vec::new(),
313            max_preview_bytes_per_file: None,
314            skip_hidden: false,
315        }
316    }
317
318    #[must_use = "builder methods return the modified builder"]
319    pub fn with_allowed_roots(mut self, roots: impl IntoIterator<Item = PathBuf>) -> Self {
320        self.allowed_roots = roots.into_iter().collect();
321        self
322    }
323
324    #[must_use = "builder methods return the modified builder"]
325    pub fn with_contents(mut self, include_contents: bool) -> Self {
326        self.include_contents = include_contents;
327        self
328    }
329
330    #[must_use = "builder methods return the modified builder"]
331    pub fn with_extensions<I, S>(mut self, extensions: I) -> Self
332    where
333        I: IntoIterator<Item = S>,
334        S: AsRef<str>,
335    {
336        self.include_extensions = extensions
337            .into_iter()
338            .map(|ext| ext.as_ref().trim_start_matches('.').to_ascii_lowercase())
339            .filter(|ext| !ext.is_empty())
340            .collect();
341        self
342    }
343
344    #[must_use = "builder methods return the modified builder"]
345    pub fn with_excluded_names<I, S>(mut self, names: I) -> Self
346    where
347        I: IntoIterator<Item = S>,
348        S: AsRef<str>,
349    {
350        self.exclude_names = names
351            .into_iter()
352            .map(|name| name.as_ref().trim().to_string())
353            .filter(|name| !name.is_empty())
354            .collect();
355        self
356    }
357
358    #[must_use = "builder methods return the modified builder"]
359    pub fn with_excluded_path_prefixes<I, P>(mut self, prefixes: I) -> Self
360    where
361        I: IntoIterator<Item = P>,
362        P: Into<PathBuf>,
363    {
364        self.exclude_path_prefixes = prefixes.into_iter().map(Into::into).collect();
365        self
366    }
367
368    #[must_use = "builder methods return the modified builder"]
369    pub fn with_max_preview_bytes_per_file(mut self, max_bytes: u64) -> Self {
370        self.max_preview_bytes_per_file = Some(max_bytes);
371        self
372    }
373
374    #[must_use = "builder methods return the modified builder"]
375    pub fn with_skip_hidden(mut self, skip_hidden: bool) -> Self {
376        self.skip_hidden = skip_hidden;
377        self
378    }
379
380    #[must_use = "builder methods return the modified builder"]
381    pub fn with_max_files(mut self, max_files: u64) -> Self {
382        self.max_files = Some(max_files);
383        self
384    }
385
386    #[must_use = "builder methods return the modified builder"]
387    pub fn with_max_bytes(mut self, max_bytes: u64) -> Self {
388        self.max_bytes = Some(max_bytes);
389        self
390    }
391
392    #[must_use = "builder methods return the modified builder"]
393    pub fn with_max_depth(mut self, max_depth: usize) -> Self {
394        self.max_depth = Some(max_depth);
395        self
396    }
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize)]
400pub struct LobeInspectionEntry {
401    pub path: PathBuf,
402    pub kind: LobeInspectionEntryKind,
403    pub depth: usize,
404    pub size_bytes: Option<u64>,
405    pub content_preview: Option<String>,
406    pub content_truncated: bool,
407}
408
409#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
410pub enum LobeInspectionEntryKind {
411    Directory,
412    File,
413}
414
415#[derive(Debug, Clone, Serialize, Deserialize)]
416pub struct LobeInspectionResult {
417    pub root: PathBuf,
418    pub max_files: Option<u64>,
419    pub max_bytes: Option<u64>,
420    pub max_depth: Option<usize>,
421    pub entries: Vec<LobeInspectionEntry>,
422    pub files_seen: u64,
423    pub bytes_read: u64,
424    pub truncated: bool,
425    pub truncation_reason: Option<String>,
426}
427
#[cfg(test)]
mod tests {
    //! Unit tests for the lobe data types: output construction and builders,
    //! budget defaults and serialization, output-format round-trips, context
    //! derivation, and inspection-request builder normalization.

    use super::*;
    use crate::cognitive_signal::CognitiveSignal;

    /// A freshly created output carries only the given content and confidence.
    #[test]
    fn test_lobe_output_creation() {
        let output = LobeOutput::new("analysis result", 0.92);
        assert_eq!(output.content, "analysis result");
        assert!((output.confidence - 0.92).abs() < f64::EPSILON);
        assert!(output.signals.is_empty());
        assert!(output.lobe_name.is_empty());
        assert!(output.metadata.is_empty());
    }

    /// `with_lobe_name` sets the name and leaves the rest untouched.
    #[test]
    fn test_lobe_output_with_lobe_name() {
        let output = LobeOutput::new("ok", 0.5).with_lobe_name("critic");
        assert_eq!(output.lobe_name, "critic");
        assert_eq!(output.content, "ok");
        assert!(output.signals.is_empty());
    }

    /// `with_signal` appends the signal to the output.
    #[test]
    fn test_lobe_output_with_signal() {
        let output =
            LobeOutput::new("risky", 0.3).with_signal(CognitiveSignal::ProceedWithCaution {
                concern: "low confidence".into(),
            });
        assert_eq!(output.signals.len(), 1);
        assert!(output.signals[0].is_cautionary());
    }

    /// The default budget is 500 tokens with a 5-second limit.
    #[test]
    fn test_lobe_budget_defaults() {
        let budget = LobeBudget::default();
        assert_eq!(budget.max_tokens, 500);
        assert_eq!(budget.max_duration, Some(Duration::from_secs(5)));
    }

    /// Every output-format variant survives a JSON round-trip.
    #[test]
    fn test_lobe_output_format_variants() {
        let formats = vec![
            LobeOutputFormat::FreeText,
            LobeOutputFormat::Structured,
            LobeOutputFormat::Score,
            LobeOutputFormat::Boolean,
            LobeOutputFormat::Custom("risk_matrix".into()),
        ];
        for fmt in &formats {
            let json = serde_json::to_string(fmt).unwrap();
            let back: LobeOutputFormat = serde_json::from_str(&json).unwrap();
            assert_eq!(&back, fmt);
        }
    }

    /// A `LobeInput` can be built directly, using `Default` for the context.
    #[test]
    fn test_lobe_input_construction() {
        let input = LobeInput {
            input: "analyze this code".into(),
            context: LobeContext {
                confidence: 0.7,
                current_plan: Some("review then test".into()),
                ..Default::default()
            },
            notes: vec![],
            runtime_services: None,
        };
        assert_eq!(input.input, "analyze this code");
        assert!((input.context.confidence - 0.7).abs() < f64::EPSILON);
    }

    /// Streaming is off by default.
    #[test]
    fn test_lobe_budget_streaming_default_false() {
        let budget = LobeBudget::default();
        assert!(
            !budget.streaming,
            "default streaming must be false — parallel lobes + SSE = I/O thrash"
        );
    }

    /// The budget round-trips through JSON, and absent optional fields fall
    /// back to their serde defaults.
    #[test]
    fn test_lobe_budget_streaming_serialization() {
        // Round-trip with streaming = true
        let budget_on = LobeBudget {
            streaming: true,
            ..Default::default()
        };
        let json = serde_json::to_string(&budget_on).unwrap();
        let back: LobeBudget = serde_json::from_str(&json).unwrap();
        assert!(back.streaming);

        // Round-trip with streaming = false
        let budget_off = LobeBudget::default();
        let json = serde_json::to_string(&budget_off).unwrap();
        let back: LobeBudget = serde_json::from_str(&json).unwrap();
        assert!(!back.streaming);
        assert_eq!(back.max_duration, Some(Duration::from_secs(5)));

        // Deserialization from JSON missing the optional fields uses the
        // serde defaults: `false` for streaming, `None` for max_duration.
        let json_no_field = r#"{"max_tokens":500}"#;
        let back: LobeBudget = serde_json::from_str(json_no_field).unwrap();
        assert!(!back.streaming);
        assert_eq!(back.max_duration, None);
    }

    /// `from_cognitive_state` records the note and failure counts in metadata.
    #[test]
    fn test_lobe_context_metadata_counts() {
        use crate::cognitive::CognitiveState;
        use crate::cognitive_memory::{NoteCategory, WorkingNote};
        use crate::self_model::FailureRecord;

        let state = CognitiveState {
            working_notes: vec![
                WorkingNote::new("note 1", NoteCategory::Discovery),
                WorkingNote::new("note 2", NoteCategory::Concern),
                WorkingNote::new("note 3", NoteCategory::Reflection),
            ],
            failure_records: vec![
                FailureRecord::new("db", "ALTER TABLE"),
                FailureRecord::new("api", "POST /users"),
            ],
            ..Default::default()
        };

        let ctx = LobeContext::from_cognitive_state(&state);
        assert_eq!(
            ctx.metadata.get("working_notes_count"),
            Some(&serde_json::json!(3)),
            "metadata must contain working_notes_count from CognitiveState"
        );
        assert_eq!(
            ctx.metadata.get("failure_records_count"),
            Some(&serde_json::json!(2)),
            "metadata must contain failure_records_count from CognitiveState"
        );
    }

    /// The request builder normalizes extension and name filters and stores
    /// limits as `Some`.
    #[test]
    fn test_inspection_request_builder_normalizes_filters() {
        let request = LobeInspectionRequest::new("src")
            .with_extensions(vec![".RS", "Toml", ""])
            .with_excluded_names(vec![" target ", "", ".git"])
            .with_max_depth(3);

        assert_eq!(request.root, PathBuf::from("src"));
        assert_eq!(request.include_extensions, vec!["rs", "toml"]);
        assert_eq!(request.exclude_names, vec!["target", ".git"]);
        assert_eq!(request.max_depth, Some(3));
        assert_eq!(request.max_files, None);
        assert!(!request.include_contents);
    }
}