Skip to main content

pulsehive_runtime/
perception.rs

//! Lens-based perception pipeline for the agentic loop.
//!
//! Implements the Perceive phase: query substrate → re-rank through lens → format as
//! intrinsic knowledge. Each agent sees the same substrate differently based on its lens.
5
6use pulsedb::{Activity, CollectiveId, Experience, SubstrateProvider, Timestamp};
7use pulsehive_core::error::Result;
8use pulsehive_core::lens::{ExperienceTypeTag, Lens, RecencyCurve};
9use pulsehive_core::llm::Message;
10use tracing::Instrument;
11
// ── Query Phase (#24) ────────────────────────────────────────────────
13
14/// Query the substrate for perception candidates.
15///
16/// If `lens.purpose_embedding` is set, uses semantic search via `search_similar`.
17/// Otherwise falls back to `get_recent` with domain post-filtering.
18pub async fn query_substrate(
19    substrate: &dyn SubstrateProvider,
20    lens: &Lens,
21    collective_id: CollectiveId,
22) -> Result<(Vec<Experience>, Vec<Activity>)> {
23    let fetch_limit = lens.attention_budget * 2; // Over-fetch for re-ranking headroom
24
25    let experiences = if !lens.purpose_embedding.is_empty() {
26        // Semantic search using the lens purpose embedding
27        let results = substrate
28            .search_similar(collective_id, &lens.purpose_embedding, fetch_limit)
29            .await?;
30        results.into_iter().map(|(exp, _sim)| exp).collect()
31    } else {
32        // Fallback: get recent experiences
33        substrate.get_recent(collective_id, fetch_limit).await?
34    };
35
36    // Post-filter by domain if lens has domain focus
37    let experiences = if lens.domain_focus.is_empty() {
38        experiences
39    } else {
40        experiences
41            .into_iter()
42            .filter(|exp| {
43                // Keep if any experience domain matches any lens domain
44                exp.domain
45                    .iter()
46                    .any(|d| lens.domain_focus.iter().any(|ld| ld == d))
47                    || exp.domain.is_empty() // Keep domain-less experiences
48            })
49            .collect()
50    };
51
52    // Fetch active agents for awareness
53    let activities = substrate
54        .get_activities(collective_id)
55        .await
56        .unwrap_or_default();
57
58    Ok((experiences, activities))
59}
60
// ── Re-Rank Phase (#25) ──────────────────────────────────────────────
62
63/// Re-rank experiences through the lens using domain, type, and temporal weighting.
64///
65/// When `attractor_config` is provided, high-importance experiences additionally
66/// boost nearby experiences via attractor dynamics (additive to the multiplicative base score).
67///
68/// Returns experiences sorted by composite score (descending), truncated to
69/// `lens.attention_budget`.
70pub fn rerank(
71    experiences: Vec<Experience>,
72    lens: &Lens,
73    attractor_config: Option<&crate::field::AttractorConfig>,
74) -> Vec<(Experience, f32)> {
75    let now = Timestamp::now();
76
77    // Pre-compute attractor dynamics and cache embeddings if config provided
78    let attractors: Vec<(crate::field::AttractorDynamics, Vec<f32>)> = match attractor_config {
79        Some(config) => experiences
80            .iter()
81            .map(|exp| {
82                (
83                    crate::field::AttractorDynamics::from_experience(exp, config),
84                    exp.embedding.clone(),
85                )
86            })
87            .collect(),
88        None => vec![],
89    };
90
91    let mut scored: Vec<(Experience, f32)> = experiences
92        .into_iter()
93        .enumerate()
94        .map(|(idx, exp)| {
95            let base_score = compute_score(&exp, lens, now);
96
97            // Add attractor influence: sum of how much nearby high-strength
98            // experiences attract this one. Additive boost (never reduces score).
99            let attractor_boost = if !attractors.is_empty() && !exp.embedding.is_empty() {
100                attractors
101                    .iter()
102                    .enumerate()
103                    .filter(|(j, _)| *j != idx)
104                    .map(|(_, (attr, attr_emb))| attr.influence_at(&exp.embedding, attr_emb))
105                    .sum::<f32>()
106            } else {
107                0.0
108            };
109
110            (exp, base_score + attractor_boost)
111        })
112        .collect();
113
114    // Sort by score descending
115    scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
116
117    // Truncate to attention budget
118    scored.truncate(lens.attention_budget);
119
120    scored
121}
122
123/// Compute composite perception score for a single experience.
124fn compute_score(exp: &Experience, lens: &Lens, now: Timestamp) -> f32 {
125    let domain_weight = compute_domain_weight(exp, lens);
126    let type_weight = compute_type_weight(exp, lens);
127    let temporal_score = compute_temporal_score(exp, lens, now);
128
129    domain_weight * type_weight * temporal_score
130}
131
132/// Domain relevance: 1.5x boost if experience domain overlaps lens focus.
133fn compute_domain_weight(exp: &Experience, lens: &Lens) -> f32 {
134    if lens.domain_focus.is_empty() {
135        return 1.0;
136    }
137    let matches = exp
138        .domain
139        .iter()
140        .any(|d| lens.domain_focus.iter().any(|ld| ld == d));
141    if matches {
142        1.5
143    } else {
144        1.0
145    }
146}
147
148/// Type weight from lens configuration, default 1.0.
149fn compute_type_weight(exp: &Experience, lens: &Lens) -> f32 {
150    let tag = ExperienceTypeTag::from_experience_type(&exp.experience_type);
151    *lens.type_weights.get(&tag).unwrap_or(&1.0)
152}
153
154/// Temporal decay + reinforcement based on recency curve.
155fn compute_temporal_score(exp: &Experience, lens: &Lens, now: Timestamp) -> f32 {
156    let age_hours = (now.0 - exp.timestamp.0) as f32 / (1000.0 * 3600.0);
157    let age_hours = age_hours.max(0.0); // Guard against negative (clock skew)
158    let reinforcement = 1.0 + (exp.applications as f32 * 0.1);
159
160    match &lens.recency_curve {
161        RecencyCurve::Exponential { half_life_hours } => {
162            let decay = 0.5_f32.powf(age_hours / half_life_hours);
163            exp.importance * decay * reinforcement
164        }
165        RecencyCurve::Uniform => exp.importance * reinforcement,
166    }
167}
168
// ── Format Phase (#26) ───────────────────────────────────────────────
170
171/// Format perceived experiences as intrinsic knowledge messages.
172///
173/// Produces "You understand that..." format (not "Retrieved documents say...").
174/// Knowledge is woven into the agent's identity as its own understanding.
175pub fn format_as_intrinsic_knowledge(
176    experiences: &[Experience],
177    activities: &[Activity],
178) -> Vec<Message> {
179    if experiences.is_empty() && activities.is_empty() {
180        return vec![];
181    }
182
183    let mut parts = Vec::new();
184
185    if !experiences.is_empty() {
186        parts.push("Based on your previous experience and knowledge:\n".to_string());
187        for exp in experiences {
188            // Truncate long content for context window efficiency
189            let content = if exp.content.len() > 500 {
190                format!("{}...", &exp.content[..500])
191            } else {
192                exp.content.clone()
193            };
194            parts.push(format!("• You understand that {content}"));
195        }
196    }
197
198    if !activities.is_empty() {
199        if !parts.is_empty() {
200            parts.push(String::new()); // blank line separator
201        }
202        for activity in activities {
203            let task_info = activity
204                .current_task
205                .as_deref()
206                .unwrap_or("an unspecified task");
207            parts.push(format!(
208                "• You're aware that agent {} is working on {}",
209                activity.agent_id, task_info
210            ));
211        }
212    }
213
214    vec![Message::system(parts.join("\n"))]
215}
216
// ── Budget Packing (#32) ─────────────────────────────────────────────
218
219/// Pack ranked experiences within the token budget.
220///
221/// Greedily selects experiences in score order until the token or count
222/// budget is exhausted. Token estimation uses chars/4 + overhead.
223pub fn pack_within_budget(
224    ranked: Vec<(Experience, f32)>,
225    budget: &pulsehive_core::context::ContextBudget,
226) -> Vec<Experience> {
227    use pulsehive_core::context::estimate_tokens;
228
229    let mut packed = Vec::new();
230    let mut tokens_used: u32 = 0;
231
232    for (exp, _score) in ranked {
233        if packed.len() >= budget.max_experiences {
234            break;
235        }
236        let est = estimate_tokens(&exp.content);
237        if tokens_used + est > budget.max_tokens {
238            break;
239        }
240        tokens_used += est;
241        packed.push(exp);
242    }
243
244    packed
245}
246
// ── Full Assembly (#33) ──────────────────────────────────────────────
248
249/// Assemble budget-aware context from the substrate through the lens.
250///
251/// Complete pipeline: query → re-rank → budget pack → format as intrinsic knowledge.
252pub async fn assemble_context(
253    substrate: &dyn SubstrateProvider,
254    lens: &Lens,
255    collective_id: CollectiveId,
256    budget: &pulsehive_core::context::ContextBudget,
257) -> Result<Vec<Message>> {
258    let (candidates, activities) = query_substrate(substrate, lens, collective_id)
259        .instrument(tracing::debug_span!(
260            "query_substrate",
261            mode = if !lens.purpose_embedding.is_empty() {
262                "semantic"
263            } else {
264                "recent"
265            },
266        ))
267        .await?;
268    tracing::debug!(
269        candidate_count = candidates.len(),
270        activity_count = activities.len(),
271        "Substrate queried"
272    );
273    let ranked = rerank(candidates, lens, None);
274    let packed = pack_within_budget(ranked, budget);
275    tracing::debug!(packed_count = packed.len(), "Context packed");
276    Ok(format_as_intrinsic_knowledge(&packed, &activities))
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use pulsedb::{AgentId, ExperienceId, ExperienceType};
    use pulsehive_core::context::ContextBudget;
    use pulsehive_core::lens::ExperienceTypeTag;

    /// Build a test experience whose timestamp lies `age_hours` before now.
    ///
    /// The embedding is empty, confidence is 0.8 and `applications` is 0, so
    /// scores depend only on domain, type, importance and age.
    fn make_experience(
        content: &str,
        domain: Vec<&str>,
        importance: f32,
        exp_type: ExperienceType,
        age_hours: f32,
    ) -> Experience {
        let age_ms = (age_hours * 3600.0 * 1000.0) as i64;
        let created_at = Timestamp(Timestamp::now().0 - age_ms);
        Experience {
            id: ExperienceId::new(),
            collective_id: CollectiveId::new(),
            content: content.into(),
            experience_type: exp_type,
            embedding: vec![],
            importance,
            confidence: 0.8,
            domain: domain.into_iter().map(String::from).collect(),
            related_files: vec![],
            source_agent: AgentId("test".into()),
            source_task: None,
            timestamp: created_at,
            archived: false,
            applications: 0,
        }
    }

    /// Shorthand for an uncategorised `Generic` experience.
    fn generic(content: &str, domain: Vec<&str>, importance: f32, age_hours: f32) -> Experience {
        make_experience(
            content,
            domain,
            importance,
            ExperienceType::Generic { category: None },
            age_hours,
        )
    }

    // ── Re-rank tests ────────────────────────────────────────────────

    #[test]
    fn test_rerank_domain_boost() {
        let lens = Lens::new(["safety"]);
        let candidates = vec![
            generic("safety issue", vec!["safety"], 0.5, 1.0),
            generic("code pattern", vec!["code"], 0.5, 1.0),
        ];

        let ranked = rerank(candidates, &lens, None);

        // Safety-domain experience should rank higher (1.5x boost)
        assert_eq!(ranked[0].0.content, "safety issue");
    }

    #[test]
    fn test_rerank_type_weight() {
        let error_pattern = make_experience(
            "an error",
            vec![],
            0.5,
            ExperienceType::ErrorPattern {
                signature: "err".into(),
                fix: "".into(),
                prevention: "".into(),
            },
            1.0,
        );
        let fact = make_experience(
            "a fact",
            vec![],
            0.5,
            ExperienceType::Fact {
                statement: "x".into(),
                source: "y".into(),
            },
            1.0,
        );

        // Only ErrorPattern is weighted; Fact keeps the default weight 1.0
        let mut lens = Lens::default();
        lens.type_weights
            .insert(ExperienceTypeTag::ErrorPattern, 3.0);

        let ranked = rerank(vec![error_pattern, fact], &lens, None);
        assert_eq!(ranked[0].0.content, "an error"); // 3x type weight
    }

    #[test]
    fn test_rerank_temporal_decay() {
        let candidates = vec![
            generic("old", vec![], 0.8, 200.0),  // 200 hours old
            generic("recent", vec![], 0.8, 1.0), // 1 hour old
        ];

        let lens = Lens::default(); // Exponential 72h half-life
        let ranked = rerank(candidates, &lens, None);
        assert_eq!(ranked[0].0.content, "recent"); // Recent decays less
    }

    #[test]
    fn test_rerank_truncates_to_budget() {
        let candidates: Vec<Experience> = (0..20)
            .map(|i| generic(&format!("exp {i}"), vec![], 0.5, i as f32))
            .collect();

        let lens = Lens {
            attention_budget: 5,
            ..Lens::default()
        };

        assert_eq!(rerank(candidates, &lens, None).len(), 5);
    }

    #[test]
    fn test_rerank_uniform_curve() {
        let candidates = vec![
            generic("old high importance", vec![], 0.9, 500.0),
            generic("recent low importance", vec![], 0.3, 1.0),
        ];

        let lens = Lens {
            recency_curve: RecencyCurve::Uniform,
            ..Lens::default()
        };

        let ranked = rerank(candidates, &lens, None);
        // Uniform: no time decay, importance wins
        assert_eq!(ranked[0].0.content, "old high importance");
    }

    // ── Format tests ─────────────────────────────────────────────────

    #[test]
    fn test_format_empty_returns_empty() {
        assert!(format_as_intrinsic_knowledge(&[], &[]).is_empty());
    }

    #[test]
    fn test_format_experiences_as_intrinsic_knowledge() {
        let experiences = vec![generic(
            "Rust's ownership model prevents data races",
            vec!["rust"],
            0.8,
            1.0,
        )];

        let messages = format_as_intrinsic_knowledge(&experiences, &[]);
        assert_eq!(messages.len(), 1);
        let content = match &messages[0] {
            Message::System { content } => content,
            _ => panic!("Expected System message"),
        };
        assert!(content.contains("You understand that"));
        assert!(content.contains("Rust's ownership model"));
    }

    #[test]
    fn test_format_with_activities() {
        let activities = vec![Activity {
            agent_id: "researcher".into(),
            collective_id: CollectiveId::new(),
            current_task: Some("analyzing codebase".into()),
            context_summary: None,
            started_at: Timestamp::now(),
            last_heartbeat: Timestamp::now(),
        }];

        let messages = format_as_intrinsic_knowledge(&[], &activities);
        assert_eq!(messages.len(), 1);
        let content = match &messages[0] {
            Message::System { content } => content,
            _ => panic!("Expected System message"),
        };
        assert!(content.contains("You're aware that"));
        assert!(content.contains("researcher"));
        assert!(content.contains("analyzing codebase"));
    }

    // ── Budget packing tests ─────────────────────────────────────────

    #[test]
    fn test_pack_within_token_budget() {
        // Each experience has ~100 chars content → ~25+20=45 tokens estimated
        let ranked: Vec<(Experience, f32)> = (0..10)
            .map(|i| {
                let exp = generic(&"x".repeat(100), vec![], 0.5, i as f32);
                (exp, 1.0 - (i as f32 * 0.1))
            })
            .collect();

        let budget = ContextBudget {
            max_tokens: 200, // ~4-5 experiences worth
            max_experiences: 50,
            max_insights: 10,
        };

        let packed = pack_within_budget(ranked, &budget);
        assert!(
            packed.len() < 10,
            "Should have been limited by token budget"
        );
        assert!(!packed.is_empty());
    }

    #[test]
    fn test_pack_within_experience_budget() {
        let ranked: Vec<(Experience, f32)> = (0..10)
            .map(|i| (generic("short", vec![], 0.5, i as f32), 1.0))
            .collect();

        let budget = ContextBudget {
            max_tokens: 100_000, // Unlimited tokens
            max_experiences: 3,  // But only 3 experiences
            max_insights: 10,
        };

        assert_eq!(pack_within_budget(ranked, &budget).len(), 3);
    }

    #[test]
    fn test_pack_empty_input() {
        let budget = ContextBudget::default();
        assert!(pack_within_budget(vec![], &budget).is_empty());
    }
}