Skip to main content

nexus_memory_agent/
util.rs

//! Shared utility functions for agent services.
2
3use nexus_core::traits::EmbeddingService;
4use nexus_core::{CognitiveLevel, CognitiveMetadata, Memory, PerspectiveKey};
5use nexus_llm::{GenerateResponse, TokenUsage};
6use nexus_storage::{MemoryRepository, MetricSample};
7use tracing::warn;
8
9#[derive(Debug, Clone)]
10pub struct CognitionSnapshot {
11    pub level: CognitiveLevel,
12    pub confidence: Option<f32>,
13    pub perspective: Option<PerspectiveKey>,
14    pub generated_by: Option<String>,
15    pub times_reinforced: i64,
16    pub raw_activity: bool,
17}
18
19impl CognitionSnapshot {
20    /// Build a cognition snapshot from a memory's metadata.
21    ///
22    /// Parses `CognitiveMetadata` exactly once and reads all fields from the parsed
23    /// struct.  Previous versions called `cognitive_level_from_metadata` and
24    /// `perspective_from_metadata` as fallbacks, each of which re-parsed the same
25    /// JSON — up to three redundant deserializations per call.
26    pub fn from_memory(memory: &Memory) -> Self {
27        let cognitive = CognitiveMetadata::from_metadata(&memory.metadata);
28        let level = cognitive
29            .as_ref()
30            .map_or(CognitiveLevel::Raw, |value| value.level);
31        let perspective = cognitive.as_ref().map(|value| PerspectiveKey {
32            observer: value.observer.clone(),
33            subject: value.subject.clone(),
34            session_key: value.session_key.clone(),
35        });
36        let raw_activity = memory.labels.iter().any(|label| label == "raw-activity")
37            || memory
38                .metadata
39                .get("raw_activity")
40                .and_then(serde_json::Value::as_bool)
41                .unwrap_or(false);
42
43        Self {
44            level,
45            confidence: cognitive.as_ref().and_then(|value| value.confidence),
46            perspective,
47            generated_by: cognitive.as_ref().map(|value| value.generated_by.clone()),
48            times_reinforced: cognitive
49                .as_ref()
50                .map(|value| value.times_reinforced)
51                .unwrap_or(0),
52            raw_activity,
53        }
54    }
55
56    pub fn confidence_meets_threshold(&self) -> bool {
57        let confidence = self.confidence.unwrap_or(1.0);
58        match self.level {
59            CognitiveLevel::Explicit => confidence >= 0.70,
60            CognitiveLevel::Derived => confidence >= 0.75,
61            CognitiveLevel::Contradiction => confidence >= 0.80,
62            CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong | CognitiveLevel::Raw => {
63                true
64            }
65        }
66    }
67}
68
69/// Extract the `agent.summary` field from JSON metadata, falling back to a
70/// truncated content excerpt.
71pub fn extract_agent_summary(metadata: &str, content: &str, fallback_chars: usize) -> String {
72    #[derive(serde::Deserialize)]
73    struct AgentMeta {
74        summary: Option<String>,
75    }
76
77    #[derive(serde::Deserialize)]
78    struct Metadata {
79        agent: Option<AgentMeta>,
80    }
81
82    serde_json::from_str::<Metadata>(metadata)
83        .ok()
84        .and_then(|md| md.agent)
85        .and_then(|a| a.summary)
86        .unwrap_or_else(|| content.chars().take(fallback_chars).collect())
87}
88
89/// Persist a best-effort stage timing metric without impacting cognition flow.
90pub async fn record_stage_metric(
91    repo: &MemoryRepository,
92    namespace_id: i64,
93    metric_name: &str,
94    metric_value_ms: f64,
95    stage: &str,
96) {
97    let labels = serde_json::json!({
98        "namespace_id": namespace_id,
99        "stage": stage,
100        "unit": "ms",
101    });
102
103    if let Err(error) = repo
104        .record_metric(metric_name, metric_value_ms, &labels)
105        .await
106    {
107        warn!(
108            %error,
109            namespace_id,
110            metric_name,
111            stage,
112            "Failed to persist cognition stage metric"
113        );
114    }
115}
116
117pub fn stage_metric_sample(
118    namespace_id: i64,
119    metric_name: &str,
120    metric_value_ms: f64,
121    stage: &str,
122) -> MetricSample {
123    MetricSample {
124        metric_name: metric_name.to_string(),
125        metric_value: metric_value_ms,
126        labels: serde_json::json!({
127            "namespace_id": namespace_id,
128            "stage": stage,
129            "unit": "ms",
130        }),
131    }
132}
133
134/// Persist best-effort token usage metrics for a cognition stage.
135pub async fn record_token_usage_metrics(
136    repo: &MemoryRepository,
137    namespace_id: i64,
138    metric_prefix: &str,
139    stage: &str,
140    usage: Option<&TokenUsage>,
141) {
142    let Some(usage) = usage else {
143        return;
144    };
145
146    for (suffix, value) in [
147        ("prompt_tokens", usage.prompt_tokens as f64),
148        ("completion_tokens", usage.completion_tokens as f64),
149        ("total_tokens", usage.total_tokens as f64),
150    ] {
151        let metric_name = format!("{metric_prefix}.{suffix}");
152        let labels = serde_json::json!({
153            "namespace_id": namespace_id,
154            "stage": stage,
155            "unit": "tokens",
156        });
157
158        if let Err(error) = repo.record_metric(&metric_name, value, &labels).await {
159            warn!(
160                %error,
161                namespace_id,
162                metric_name,
163                stage,
164                "Failed to persist cognition token usage metric"
165            );
166        }
167    }
168}
169
170pub fn token_usage_metric_samples(
171    namespace_id: i64,
172    metric_prefix: &str,
173    stage: &str,
174    usage: Option<&TokenUsage>,
175) -> Vec<MetricSample> {
176    let Some(usage) = usage else {
177        return Vec::new();
178    };
179
180    [
181        ("prompt_tokens", usage.prompt_tokens as f64),
182        ("completion_tokens", usage.completion_tokens as f64),
183        ("total_tokens", usage.total_tokens as f64),
184    ]
185    .into_iter()
186    .map(|(suffix, value)| MetricSample {
187        metric_name: format!("{metric_prefix}.{suffix}"),
188        metric_value: value,
189        labels: serde_json::json!({
190            "namespace_id": namespace_id,
191            "stage": stage,
192            "unit": "tokens",
193        }),
194    })
195    .collect()
196}
197
198pub async fn flush_metric_samples(repo: &MemoryRepository, samples: &[MetricSample]) {
199    if samples.is_empty() {
200        return;
201    }
202
203    if let Err(error) = repo.record_metrics_batch(samples).await {
204        warn!(%error, count = samples.len(), "Failed to persist cognition metric batch");
205    }
206}
207
208/// Parse a JSON response using the same fenced-block tolerance as `LlmClientJson`.
209pub fn parse_json_response<T: serde::de::DeserializeOwned>(
210    response: &GenerateResponse,
211) -> Result<T, serde_json::Error> {
212    let content = response.content.trim();
213    let json_str = if content.starts_with("```") {
214        let start = content.find('\n').map(|i| i + 1).unwrap_or(3);
215        let end = content[start..]
216            .rfind("```")
217            .map(|i| start + i)
218            .unwrap_or(content.len());
219        if start >= end {
220            content
221        } else {
222            &content[start..end]
223        }
224    } else {
225        content
226    };
227
228    serde_json::from_str(json_str.trim())
229}
230
231/// Attempt to generate an embedding for `content` using the optional service.
232///
233/// Returns `(Some(vector), Some(model_name))` on success, `(None, None)` when
234/// the service is absent or the call fails (graceful degradation).
235pub async fn maybe_embed(
236    service: Option<&dyn EmbeddingService>,
237    content: &str,
238) -> (Option<Vec<f32>>, Option<String>) {
239    let Some(svc) = service else {
240        return (None, None);
241    };
242    match svc.embed(content).await {
243        Ok(vec) => (Some(vec), Some(svc.model_name().to_string())),
244        Err(error) => {
245            warn!(%error, "Embedding generation failed, storing without embedding");
246            (None, None)
247        }
248    }
249}