Skip to main content

nexus_memory_agent/
derive.rs

1//! Derivation service - converts raw memories into explicit observations.
2
3use std::collections::HashSet;
4use std::sync::Arc;
5
6use nexus_core::config::AgentConfig;
7use nexus_core::traits::EmbeddingService;
8use nexus_core::{
9    cognitive_level_from_metadata, infer_perspective, perspective_from_metadata, CognitiveLevel,
10    CognitiveMetadata, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey, PerspectiveSource,
11};
12use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
13use nexus_storage::models::EnqueueJobParams;
14use nexus_storage::repository::{
15    MemoryRepository, StoreMemoryParams, StoreMemoryWithLineageParams,
16};
17use tracing::{debug, info, warn};
18
19use crate::error::AgentError;
20use crate::prompts::{derive_user_prompt, DERIVE_SYSTEM_PROMPT};
21use crate::util::maybe_embed;
22
/// Value passed as `max_tokens` on the derivation LLM request.
23const DERIVE_MAX_TOKENS: u32 = 4096;
/// Job type enqueued for every successful derivation.
24const REFLECT_PERSPECTIVE_JOB: &str = "reflect_perspective";
/// Job type enqueued only when the perspective carries a session key.
25const DIGEST_SESSION_JOB: &str = "digest_session";
/// `generated_by` value recorded in the cognitive metadata of derived memories.
26const DERIVE_GENERATED_BY: &str = "derive_service";
/// Evidence role stored on the lineage link from a derived memory to its source.
27const DERIVED_FROM_ROLE: &str = "derived_from";
/// Label that marks a memory as a derivable source; stripped from derived output.
28const RAW_ACTIVITY_LABEL: &str = "raw-activity";
/// Label that marks a memory as a derivable source; stripped from derived output.
29const LOW_SIGNAL_LABEL: &str = "low-signal";
30
/// Converts raw memories into explicit observations using an LLM, with a
/// metadata/content based fallback when the LLM call fails.
31pub struct DeriveService {
    /// Agent configuration; its `namespace` is used when inferring a perspective
    /// and in follow-up job payloads.
32    config: AgentConfig,
    /// Client used to request JSON-formatted observations.
33    llm: Arc<dyn LlmClient>,
    /// Optional embedding backend; passed to `maybe_embed` for each derived observation.
34    embeddings: Option<Arc<dyn EmbeddingService>>,
35}
36
37#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
38pub struct DerivedObservation {
39    pub content: String,
40    pub category: String,
41    pub memory_lane_type: Option<String>,
42    pub labels: Vec<String>,
43    pub confidence: f32,
44}
45
/// Top-level JSON object expected from the LLM: `{"observations": [...]}`.
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47struct DerivedObservationEnvelope {
    /// Observations extracted from a single raw memory.
48    observations: Vec<DerivedObservation>,
49}
50
51impl DeriveService {
52    pub fn new(
53        config: AgentConfig,
54        llm: Arc<dyn LlmClient>,
55        embeddings: Option<Arc<dyn EmbeddingService>>,
56    ) -> Self {
57        Self {
58            config,
59            llm,
60            embeddings,
61        }
62    }
63
64    pub async fn derive_memory(
65        &self,
66        memory: &Memory,
67        repo: &MemoryRepository,
68    ) -> Result<Vec<i64>, AgentError> {
69        self.derive_memory_with_perspective(memory, None, repo)
70            .await
71    }
72
73    pub async fn derive_memory_with_perspective(
74        &self,
75        memory: &Memory,
76        queued_perspective: Option<&PerspectiveKey>,
77        repo: &MemoryRepository,
78    ) -> Result<Vec<i64>, AgentError> {
79        if !is_derivable_source(memory) {
80            return Ok(Vec::new());
81        }
82
83        let existing_ids = existing_derived_ids(repo, memory.id).await?;
84        if !existing_ids.is_empty() {
85            debug!(
86                memory_id = memory.id,
87                derived_count = existing_ids.len(),
88                "Reusing existing derived observations"
89            );
90            return Ok(existing_ids);
91        }
92
93        let perspective = queued_perspective
94            .cloned()
95            .or_else(|| perspective_from_metadata(&memory.metadata))
96            .unwrap_or_else(|| {
97                infer_perspective(
98                    PerspectiveSource::HookIngest,
99                    self.config.namespace.clone(),
100                    None,
101                    None,
102                )
103            });
104
105        let observations = match self.derive_with_llm(memory).await {
106            Ok(observations) => observations,
107            Err(error) => {
108                warn!(memory_id = memory.id, %error, "LLM derivation failed, using fallback");
109                fallback_observations(memory)
110            }
111        };
112
113        let observations = normalize_observations(observations);
114        if observations.is_empty() {
115            debug!(memory_id = memory.id, "No explicit observations derived");
116            return Ok(Vec::new());
117        }
118
119        let mut derived_ids = Vec::with_capacity(observations.len());
120        for observation in observations {
121            let category = MemoryCategory::parse(&observation.category).unwrap_or(memory.category);
122            let memory_lane_type = observation
123                .memory_lane_type
124                .as_deref()
125                .and_then(MemoryLaneType::parse);
126            let metadata = derive_metadata(memory, &perspective, observation.confidence);
127            let (embedding, embedding_model) =
128                maybe_embed(self.embeddings.as_deref(), &observation.content).await;
129
130            let derived = repo
131                .store_with_lineage(StoreMemoryWithLineageParams {
132                    store: StoreMemoryParams {
133                        namespace_id: memory.namespace_id,
134                        content: &observation.content,
135                        category: &category,
136                        memory_lane_type: memory_lane_type.as_ref(),
137                        labels: &observation.labels,
138                        metadata: &metadata,
139                        embedding: embedding.as_deref(),
140                        embedding_model: embedding_model.as_deref(),
141                    },
142                    source_memory_ids: &[memory.id],
143                    evidence_role: DERIVED_FROM_ROLE,
144                })
145                .await
146                .map_err(|error| AgentError::Storage(error.to_string()))?;
147            derived_ids.push(derived.id);
148        }
149
150        enqueue_follow_up_jobs(repo, memory, &perspective, &derived_ids, &self.config).await?;
151
152        info!(
153            memory_id = memory.id,
154            derived_count = derived_ids.len(),
155            "Derived explicit observations from raw memory"
156        );
157        Ok(derived_ids)
158    }
159
160    async fn derive_with_llm(
161        &self,
162        memory: &Memory,
163    ) -> Result<Vec<DerivedObservation>, AgentError> {
164        let params = GenerateParams {
165            messages: vec![
166                ChatMessage::system(DERIVE_SYSTEM_PROMPT),
167                ChatMessage::user(derive_user_prompt(memory)),
168            ],
169            max_tokens: DERIVE_MAX_TOKENS,
170            temperature: 0.1,
171            json_mode: true,
172        };
173
174        let envelope: DerivedObservationEnvelope = self
175            .llm
176            .generate_json(params)
177            .await
178            .map_err(|error| AgentError::Llm(error.to_string()))?;
179
180        Ok(envelope.observations)
181    }
182}
183
184fn derive_metadata(
185    source: &Memory,
186    perspective: &PerspectiveKey,
187    confidence: f32,
188) -> serde_json::Value {
189    let mut cognitive = CognitiveMetadata::new(
190        CognitiveLevel::Explicit,
191        perspective.observer.clone(),
192        perspective.subject.clone(),
193        perspective.session_key.clone(),
194        DERIVE_GENERATED_BY,
195    );
196    cognitive.source_memory_ids = vec![source.id];
197    cognitive.confidence = Some(confidence.max(0.70));
198    cognitive.merge_into(&sanitized_source_metadata(source))
199}
200
201fn fallback_observations(memory: &Memory) -> Vec<DerivedObservation> {
202    let summary = memory
203        .metadata
204        .get("agent")
205        .and_then(|agent| agent.get("summary"))
206        .and_then(serde_json::Value::as_str)
207        .map(str::trim)
208        .filter(|summary| summary.len() >= 16 && !looks_like_noise(summary))
209        .map(ToString::to_string);
210
211    let content = memory
212        .content
213        .split_whitespace()
214        .collect::<Vec<_>>()
215        .join(" ");
216
217    let candidate = summary.unwrap_or(content);
218    if candidate.is_empty() || looks_like_noise(&candidate) {
219        return Vec::new();
220    }
221
222    vec![DerivedObservation {
223        content: candidate,
224        category: memory.category.to_string(),
225        memory_lane_type: memory.memory_lane_type.as_ref().map(ToString::to_string),
226        labels: explicit_labels_from_source(memory),
227        confidence: 0.70,
228    }]
229}
230
231fn sanitized_source_metadata(source: &Memory) -> serde_json::Value {
232    let mut sanitized = serde_json::Map::new();
233
234    if let Some(agent) = source
235        .metadata
236        .get("agent")
237        .and_then(serde_json::Value::as_object)
238    {
239        let mut agent_sanitized = serde_json::Map::new();
240        for key in [
241            "summary",
242            "entities",
243            "topics",
244            "importance_score",
245            "source",
246            "generated_by",
247        ] {
248            if let Some(value) = agent.get(key) {
249                agent_sanitized.insert(key.to_string(), value.clone());
250            }
251        }
252        if !agent_sanitized.is_empty() {
253            sanitized.insert(
254                "agent".to_string(),
255                serde_json::Value::Object(agent_sanitized),
256            );
257        }
258    }
259
260    serde_json::Value::Object(sanitized)
261}
262
263fn explicit_labels_from_source(source: &Memory) -> Vec<String> {
264    let mut labels: Vec<String> = source
265        .labels
266        .iter()
267        .filter(|label| {
268            !label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
269                && !label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
270        })
271        .cloned()
272        .collect();
273    dedupe_labels(&mut labels);
274    labels
275}
276
277fn normalize_observations(observations: Vec<DerivedObservation>) -> Vec<DerivedObservation> {
278    let mut seen = HashSet::new();
279    let mut normalized = Vec::new();
280
281    for mut observation in observations {
282        observation.content = observation.content.trim().to_string();
283        if observation.content.is_empty() || observation.confidence < 0.70 {
284            continue;
285        }
286
287        let fingerprint = observation.content.to_lowercase();
288        if !seen.insert(fingerprint) {
289            continue;
290        }
291
292        observation.labels.retain(|label| {
293            !label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
294                && !label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
295        });
296        dedupe_labels(&mut observation.labels);
297        normalized.push(observation);
298    }
299
300    normalized
301}
302
/// Removes labels that repeat an earlier label when compared in lowercase.
///
/// The first occurrence is kept with its original casing and the relative
/// order of the remaining labels is unchanged.
fn dedupe_labels(labels: &mut Vec<String>) {
    let mut lowered_seen: HashSet<String> = HashSet::new();
    let previous = std::mem::take(labels);
    for label in previous {
        let first_time = lowered_seen.insert(label.to_lowercase());
        if first_time {
            labels.push(label);
        }
    }
}
307
/// Heuristic for text that should not become an observation.
///
/// Returns `true` when the trimmed text begins with `{` or `[`, or when it
/// contains the quoted keys `"event_name"` or `"tool_name"`.
fn looks_like_noise(content: &str) -> bool {
    let text = content.trim();

    if matches!(text.chars().next(), Some('{' | '[')) {
        return true;
    }

    ["\"event_name\"", "\"tool_name\""]
        .iter()
        .any(|marker| text.contains(*marker))
}
315
316fn is_derivable_source(memory: &Memory) -> bool {
317    if cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw {
318        return true;
319    }
320
321    memory.labels.iter().any(|label| {
322        label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
323            || label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
324    }) || memory
325        .metadata
326        .get("raw_activity")
327        .and_then(serde_json::Value::as_bool)
328        .unwrap_or(false)
329        || memory
330            .metadata
331            .get("activity")
332            .and_then(|activity| activity.get("low_signal"))
333            .and_then(serde_json::Value::as_bool)
334            .unwrap_or(false)
335}
336
337async fn existing_derived_ids(
338    repo: &MemoryRepository,
339    source_memory_id: i64,
340) -> Result<Vec<i64>, AgentError> {
341    let mut ids: Vec<i64> = repo
342        .load_lineage(source_memory_id)
343        .await
344        .map_err(|error| AgentError::Storage(error.to_string()))?
345        .into_iter()
346        .filter(|entry| entry.source_memory_id == source_memory_id)
347        .map(|entry| entry.derived_memory_id)
348        .collect();
349    ids.sort_unstable();
350    ids.dedup();
351    Ok(ids)
352}
353
354async fn enqueue_follow_up_jobs(
355    repo: &MemoryRepository,
356    source: &Memory,
357    perspective: &PerspectiveKey,
358    derived_ids: &[i64],
359    config: &AgentConfig,
360) -> Result<(), AgentError> {
361    if derived_ids.is_empty() {
362        return Ok(());
363    }
364
365    let perspective_json = serde_json::to_value(perspective).ok();
366    let reflect_payload = serde_json::json!({
367        "source_memory_id": source.id,
368        "derived_memory_ids": derived_ids,
369        "agent_namespace": config.namespace,
370    });
371
372    repo.enqueue_job(EnqueueJobParams {
373        namespace_id: source.namespace_id,
374        job_type: REFLECT_PERSPECTIVE_JOB,
375        priority: 100,
376        perspective: perspective_json.as_ref(),
377        payload: &reflect_payload,
378    })
379    .await
380    .map_err(|error| AgentError::Storage(error.to_string()))?;
381
382    if let Some(session_key) = &perspective.session_key {
383        let digest_payload = serde_json::json!({
384            "source_memory_id": source.id,
385            "derived_memory_ids": derived_ids,
386            "session_key": session_key,
387            "agent_namespace": config.namespace,
388        });
389        repo.enqueue_job(EnqueueJobParams {
390            namespace_id: source.namespace_id,
391            job_type: DIGEST_SESSION_JOB,
392            priority: 90,
393            perspective: perspective_json.as_ref(),
394            payload: &digest_payload,
395        })
396        .await
397        .map_err(|error| AgentError::Storage(error.to_string()))?;
398    }
399
400    Ok(())
401}
402
// Tests for `DeriveService`, run against an in-memory SQLite database with a
// scripted LLM client.
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    use std::collections::VecDeque;
408    use std::sync::Mutex;
409
410    use async_trait::async_trait;
411    use nexus_core::MemoryLanePriorityType;
412    use nexus_llm::GenerateResponse;
413    use nexus_storage::repository::{NamespaceRepository, StoreMemoryParams};
414    use sqlx::sqlite::SqlitePoolOptions;
415
    // Test double for `LlmClient` that replays a queue of canned results,
    // one per `generate` call.
416    struct MockLlmClient {
417        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
418    }
419
420    impl MockLlmClient {
421        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
422            Self {
423                responses: Mutex::new(VecDeque::from(responses)),
424            }
425        }
426    }
427
428    #[async_trait]
429    impl LlmClient for MockLlmClient {
        // Pops the next scripted result; panics when the queue is empty, so an
        // unexpected extra LLM call fails the test.
430        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
431            self.responses
432                .lock()
433                .expect("mock responses poisoned")
434                .pop_front()
435                .expect("mock response missing")
436        }
437
438        fn provider_name(&self) -> String {
439            "mock".to_string()
440        }
441
442        fn model_name(&self) -> String {
443            "mock-model".to_string()
444        }
445    }
446
    // Opens a single-connection in-memory SQLite pool, runs migrations, and
    // gets or creates the `derive-test` namespace. Returns the pool, a memory
    // repository on that pool, and the namespace id.
447    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
448        let pool = SqlitePoolOptions::new()
449            .max_connections(1)
450            .connect("sqlite::memory:")
451            .await
452            .unwrap();
453        nexus_storage::migrations::run_migrations(&pool)
454            .await
455            .unwrap();
456        let namespace_repo = NamespaceRepository::new(pool.clone());
457        let namespace = namespace_repo
458            .get_or_create("derive-test", "derive-test")
459            .await
460            .unwrap();
461        let repo = MemoryRepository::new(pool.clone());
462        (pool, repo, namespace.id)
463    }
464
    // Metadata for a raw-level memory in the given session, merged with an
    // `agent.summary` that the fallback path can pick up.
465    fn raw_memory_metadata(session_key: &str) -> serde_json::Value {
466        let mut cognitive = CognitiveMetadata::new(
467            CognitiveLevel::Raw,
468            "claude-code",
469            "claude-code",
470            Some(session_key.to_string()),
471            "ingest_service",
472        );
473        cognitive.confidence = Some(0.9);
474        cognitive.merge_into(&serde_json::json!({
475            "agent": {
476                "summary": "Fixed the query path and tightened pagination behavior."
477            }
478        }))
479    }
480
    // Stores a raw session memory (decision lane, session "session-1") with
    // the given content; note the deliberately duplicated "memory" label.
481    async fn store_raw_memory(repo: &MemoryRepository, namespace_id: i64, content: &str) -> Memory {
482        repo.store(StoreMemoryParams {
483            namespace_id,
484            content,
485            category: &MemoryCategory::Session,
486            memory_lane_type: Some(&MemoryLaneType::Priority(MemoryLanePriorityType::Decision)),
487            labels: &["memory".to_string(), "memory".to_string()],
488            metadata: &raw_memory_metadata("session-1"),
489            embedding: None,
490            embedding_model: None,
491        })
492        .await
493        .unwrap()
494    }
495
    // An explicit-level memory with no raw markers is not derivable. The mock
    // has no scripted responses, so reaching the LLM would panic.
496    #[tokio::test]
497    async fn test_derive_memory_skips_non_raw_memories() {
498        let (_pool, repo, namespace_id) = setup_repo().await;
499        let mut cognitive = CognitiveMetadata::new(
500            CognitiveLevel::Explicit,
501            "claude-code",
502            "claude-code",
503            Some("session-1".to_string()),
504            "derive_service",
505        );
506        cognitive.confidence = Some(0.9);
507
508        let explicit = repo
509            .store(StoreMemoryParams {
510                namespace_id,
511                content: "Already explicit",
512                category: &MemoryCategory::Facts,
513                memory_lane_type: None,
514                labels: &[],
515                metadata: &cognitive.merge_into(&serde_json::json!({})),
516                embedding: None,
517                embedding_model: None,
518            })
519            .await
520            .unwrap();
521
522        let service = DeriveService::new(
523            AgentConfig::default(),
524            Arc::new(MockLlmClient::new(Vec::new())),
525            None,
526        );
527        let derived_ids = service.derive_memory(&explicit, &repo).await.unwrap();
528        assert!(derived_ids.is_empty());
529    }
530
    // Happy path: one LLM observation is stored as an explicit memory with the
    // LLM's labels, linked to the raw source, and one reflect job plus one
    // digest job are enqueued.
531    #[tokio::test]
532    async fn test_derive_memory_persists_explicit_observations_and_jobs() {
533        let (pool, repo, namespace_id) = setup_repo().await;
534        let raw = store_raw_memory(
535            &repo,
536            namespace_id,
537            "Fixed query pagination and recall ranking.",
538        )
539        .await;
540        let response = GenerateResponse {
541            content: r#"{"observations":[{"content":"Fixed query pagination behavior.","category":"session","memory_lane_type":"decision","labels":["query","pagination"],"confidence":0.9}]}"#.to_string(),
542            model: "mock-model".to_string(),
543            usage: None,
544        };
545        let service = DeriveService::new(
546            AgentConfig::default(),
547            Arc::new(MockLlmClient::new(vec![Ok(response)])),
548            None,
549        );
550
551        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
552
553        assert_eq!(derived_ids.len(), 1);
554        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
555        assert_eq!(
556            cognitive_level_from_metadata(&derived.metadata),
557            CognitiveLevel::Explicit
558        );
559        assert_eq!(
560            derived.labels,
561            vec!["query".to_string(), "pagination".to_string()]
562        );
563
564        let lineage = repo.load_lineage(derived.id).await.unwrap();
565        assert_eq!(lineage.len(), 1);
566        assert_eq!(lineage[0].source_memory_id, raw.id);
567
568        let reflect_jobs: i64 =
569            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
570                .bind(REFLECT_PERSPECTIVE_JOB)
571                .fetch_one(&pool)
572                .await
573                .unwrap();
574        let digest_jobs: i64 =
575            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
576                .bind(DIGEST_SESSION_JOB)
577                .fetch_one(&pool)
578                .await
579                .unwrap();
580        assert_eq!(reflect_jobs, 1);
581        assert_eq!(digest_jobs, 1);
582    }
583
    // Only one LLM response is scripted, so the second call must be answered
    // from existing lineage. The job table must still hold exactly the two
    // jobs from the first call.
584    #[tokio::test]
585    async fn test_derive_memory_is_idempotent() {
586        let (pool, repo, namespace_id) = setup_repo().await;
587        let raw = store_raw_memory(
588            &repo,
589            namespace_id,
590            "Introduced working-set retrieval primitives.",
591        )
592        .await;
593        let response = GenerateResponse {
594            content: r#"{"observations":[{"content":"Added working-set retrieval primitives.","category":"facts","memory_lane_type":null,"labels":["retrieval"],"confidence":0.85}]}"#.to_string(),
595            model: "mock-model".to_string(),
596            usage: None,
597        };
598        let service = DeriveService::new(
599            AgentConfig::default(),
600            Arc::new(MockLlmClient::new(vec![Ok(response)])),
601            None,
602        );
603
604        let first = service.derive_memory(&raw, &repo).await.unwrap();
605        let second = service.derive_memory(&raw, &repo).await.unwrap();
606
607        assert_eq!(first, second);
608
609        let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs")
610            .fetch_one(&pool)
611            .await
612            .unwrap();
613        assert_eq!(job_count, 2);
614    }
615
    // The LLM returns a non-JSON body; the test expects one derived memory
    // whose content comes from the `agent.summary` set by `raw_memory_metadata`.
616    #[tokio::test]
617    async fn test_derive_memory_falls_back_when_llm_response_is_invalid() {
618        let (_pool, repo, namespace_id) = setup_repo().await;
619        let raw = store_raw_memory(
620            &repo,
621            namespace_id,
622            "Noisy raw content that still has a useful summary.",
623        )
624        .await;
625        let bad_response = GenerateResponse {
626            content: "not json".to_string(),
627            model: "mock-model".to_string(),
628            usage: None,
629        };
630        let service = DeriveService::new(
631            AgentConfig::default(),
632            Arc::new(MockLlmClient::new(vec![Ok(bad_response)])),
633            None,
634        );
635
636        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
637        assert_eq!(derived_ids.len(), 1);
638
639        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
640        assert!(derived.content.contains("tightened pagination behavior"));
641    }
642
    // The `raw-activity` label (present on the source and echoed by the LLM)
    // and the top-level `raw_activity` flag must not reach the derived memory,
    // while the allow-listed `agent.summary` must.
643    #[tokio::test]
644    async fn test_derive_memory_strips_raw_noise_markers_from_explicit_output() {
645        let (_pool, repo, namespace_id) = setup_repo().await;
646        let raw = repo
647            .store(StoreMemoryParams {
648                namespace_id,
649                content: "Raw hook activity with durable summary.",
650                category: &MemoryCategory::Session,
651                memory_lane_type: None,
652                labels: &[
653                    RAW_ACTIVITY_LABEL.to_string(),
654                    "query".to_string(),
655                    "query".to_string(),
656                ],
657                metadata: &serde_json::json!({
658                    "raw_activity": true,
659                    "agent": { "summary": "Useful summary survives." },
660                    "cognitive": {
661                        "level": "raw",
662                        "observer": "claude-code",
663                        "subject": "claude-code",
664                        "session_key": "session-1",
665                        "generated_by": "ingest_service"
666                    }
667                }),
668                embedding: None,
669                embedding_model: None,
670            })
671            .await
672            .unwrap();
673
674        let response = GenerateResponse {
675            content: r#"{"observations":[{"content":"Useful explicit observation.","category":"facts","memory_lane_type":null,"labels":["raw-activity","query"],"confidence":0.9}]}"#.to_string(),
676            model: "mock-model".to_string(),
677            usage: None,
678        };
679        let service = DeriveService::new(
680            AgentConfig::default(),
681            Arc::new(MockLlmClient::new(vec![Ok(response)])),
682            None,
683        );
684
685        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
686        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
687
688        assert!(!derived
689            .labels
690            .iter()
691            .any(|label| label == RAW_ACTIVITY_LABEL));
692        assert!(derived.metadata.get("raw_activity").is_none());
693        assert_eq!(
694            derived.metadata["agent"]["summary"],
695            serde_json::Value::String("Useful summary survives.".to_string())
696        );
697    }
698
    // With an embedding service configured, the derived memory is stored with
    // an embedding; the test expects 384 dimensions from the mock service.
699    #[tokio::test]
700    async fn test_derive_memory_produces_embeddings_when_service_provided() {
701        let (_pool, repo, namespace_id) = setup_repo().await;
702        let raw = store_raw_memory(
703            &repo,
704            namespace_id,
705            "Implemented the query path with tight pagination.",
706        )
707        .await;
708
709        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
710        let response = GenerateResponse {
711            content: r#"{"observations":[{"content":"Implemented query path with pagination.","category":"facts","memory_lane_type":null,"labels":["query","pagination"],"confidence":0.85}]}"#.to_string(),
712            model: "mock-model".to_string(),
713            usage: None,
714        };
715        let service = DeriveService::new(
716            AgentConfig::default(),
717            Arc::new(MockLlmClient::new(vec![Ok(response)])),
718            Some(Arc::new(mock_embed)),
719        );
720
721        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
722        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
723
724        assert!(
725            derived.content_embedding.is_some(),
726            "derived explicit observation should have an embedding when service is provided"
727        );
728        let embedding = derived.content_embedding.as_ref().unwrap();
729        assert_eq!(embedding.len(), 384, "embedding dimension should be 384");
730    }
731
    // Without an embedding service the derived memory is stored with no embedding.
732    #[tokio::test]
733    async fn test_derive_memory_stores_without_embedding_when_service_absent() {
734        let (_pool, repo, namespace_id) = setup_repo().await;
735        let raw = store_raw_memory(
736            &repo,
737            namespace_id,
738            "Implemented the query path with tight pagination.",
739        )
740        .await;
741
742        let response = GenerateResponse {
743            content: r#"{"observations":[{"content":"Implemented query path with pagination.","category":"facts","memory_lane_type":null,"labels":["query","pagination"],"confidence":0.85}]}"#.to_string(),
744            model: "mock-model".to_string(),
745            usage: None,
746        };
747        let service = DeriveService::new(
748            AgentConfig::default(),
749            Arc::new(MockLlmClient::new(vec![Ok(response)])),
750            None,
751        );
752
753        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
754        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
755
756        assert!(
757            derived.content_embedding.is_none(),
758            "derived observation should NOT have an embedding when no service provided"
759        );
760    }
761
    // The source's cognitive level is "explicit", but its low-signal label and
    // `activity.low_signal` flag still make it derivable. The scripted LLM
    // result is an error, so the single derived memory comes from the fallback.
762    #[tokio::test]
763    async fn test_derive_memory_accepts_low_signal_activity_sources() {
764        let (_pool, repo, namespace_id) = setup_repo().await;
765        let low_signal = repo
766            .store(StoreMemoryParams {
767                namespace_id,
768                content: "Low signal activity with useful summary.",
769                category: &MemoryCategory::Session,
770                memory_lane_type: None,
771                labels: &[LOW_SIGNAL_LABEL.to_string()],
772                metadata: &serde_json::json!({
773                    "activity": { "low_signal": true },
774                    "agent": { "summary": "Captured meaningful work despite low signal." },
775                    "cognitive": {
776                        "level": "explicit",
777                        "observer": "claude-code",
778                        "subject": "claude-code",
779                        "session_key": "session-1",
780                        "generated_by": "activity_distiller"
781                    }
782                }),
783                embedding: None,
784                embedding_model: None,
785            })
786            .await
787            .unwrap();
788
789        let service = DeriveService::new(
790            AgentConfig::default(),
791            Arc::new(MockLlmClient::new(vec![Err(
792                nexus_llm::LlmError::InvalidJsonResponse("bad".to_string()),
793            )])),
794            None,
795        );
796
797        let derived_ids = service.derive_memory(&low_signal, &repo).await.unwrap();
798        assert_eq!(derived_ids.len(), 1);
799    }
800}