1use std::collections::HashSet;
4use std::sync::Arc;
5
6use nexus_core::config::AgentConfig;
7use nexus_core::traits::EmbeddingService;
8use nexus_core::{
9 cognitive_level_from_metadata, infer_perspective, perspective_from_metadata, CognitiveLevel,
10 CognitiveMetadata, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey, PerspectiveSource,
11};
12use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
13use nexus_storage::models::EnqueueJobParams;
14use nexus_storage::repository::{
15 MemoryRepository, StoreMemoryParams, StoreMemoryWithLineageParams,
16};
17use tracing::{debug, info, warn};
18
19use crate::error::AgentError;
20use crate::prompts::{derive_user_prompt, DERIVE_SYSTEM_PROMPT};
21use crate::util::maybe_embed;
22
/// Token budget passed as `max_tokens` on every derivation request.
const DERIVE_MAX_TOKENS: u32 = 4096;
/// Job type of the follow-up job that is always enqueued after a derivation.
const REFLECT_PERSPECTIVE_JOB: &'static str = "reflect_perspective";
/// Job type of the follow-up job enqueued only when the perspective has a session key.
const DIGEST_SESSION_JOB: &'static str = "digest_session";
/// Value recorded as the generator of the cognitive metadata on derived memories.
const DERIVE_GENERATED_BY: &'static str = "derive_service";
/// Evidence role attached to the lineage link between a source and a derived memory.
const DERIVED_FROM_ROLE: &'static str = "derived_from";
/// Marker label that makes a memory derivable and is stripped from derived output.
const RAW_ACTIVITY_LABEL: &'static str = "raw-activity";
/// Marker label that makes a memory derivable and is stripped from derived output.
const LOW_SIGNAL_LABEL: &'static str = "low-signal";
30
31pub struct DeriveService {
32 config: AgentConfig,
33 llm: Arc<dyn LlmClient>,
34 embeddings: Option<Arc<dyn EmbeddingService>>,
35}
36
37#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
38pub struct DerivedObservation {
39 pub content: String,
40 pub category: String,
41 pub memory_lane_type: Option<String>,
42 pub labels: Vec<String>,
43 pub confidence: f32,
44}
45
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47struct DerivedObservationEnvelope {
48 observations: Vec<DerivedObservation>,
49}
50
51impl DeriveService {
52 pub fn new(
53 config: AgentConfig,
54 llm: Arc<dyn LlmClient>,
55 embeddings: Option<Arc<dyn EmbeddingService>>,
56 ) -> Self {
57 Self {
58 config,
59 llm,
60 embeddings,
61 }
62 }
63
64 pub async fn derive_memory(
65 &self,
66 memory: &Memory,
67 repo: &MemoryRepository,
68 ) -> Result<Vec<i64>, AgentError> {
69 self.derive_memory_with_perspective(memory, None, repo)
70 .await
71 }
72
73 pub async fn derive_memory_with_perspective(
74 &self,
75 memory: &Memory,
76 queued_perspective: Option<&PerspectiveKey>,
77 repo: &MemoryRepository,
78 ) -> Result<Vec<i64>, AgentError> {
79 if !is_derivable_source(memory) {
80 return Ok(Vec::new());
81 }
82
83 let existing_ids = existing_derived_ids(repo, memory.id).await?;
84 if !existing_ids.is_empty() {
85 debug!(
86 memory_id = memory.id,
87 derived_count = existing_ids.len(),
88 "Reusing existing derived observations"
89 );
90 return Ok(existing_ids);
91 }
92
93 let perspective = queued_perspective
94 .cloned()
95 .or_else(|| perspective_from_metadata(&memory.metadata))
96 .unwrap_or_else(|| {
97 infer_perspective(
98 PerspectiveSource::HookIngest,
99 self.config.namespace.clone(),
100 None,
101 None,
102 )
103 });
104
105 let observations = match self.derive_with_llm(memory).await {
106 Ok(observations) => observations,
107 Err(error) => {
108 warn!(memory_id = memory.id, %error, "LLM derivation failed, using fallback");
109 fallback_observations(memory)
110 }
111 };
112
113 let observations = normalize_observations(observations);
114 if observations.is_empty() {
115 debug!(memory_id = memory.id, "No explicit observations derived");
116 return Ok(Vec::new());
117 }
118
119 let mut derived_ids = Vec::with_capacity(observations.len());
120 for observation in observations {
121 let category = MemoryCategory::parse(&observation.category).unwrap_or(memory.category);
122 let memory_lane_type = observation
123 .memory_lane_type
124 .as_deref()
125 .and_then(MemoryLaneType::parse);
126 let metadata = derive_metadata(memory, &perspective, observation.confidence);
127 let (embedding, embedding_model) =
128 maybe_embed(self.embeddings.as_deref(), &observation.content).await;
129
130 let derived = repo
131 .store_with_lineage(StoreMemoryWithLineageParams {
132 store: StoreMemoryParams {
133 namespace_id: memory.namespace_id,
134 content: &observation.content,
135 category: &category,
136 memory_lane_type: memory_lane_type.as_ref(),
137 labels: &observation.labels,
138 metadata: &metadata,
139 embedding: embedding.as_deref(),
140 embedding_model: embedding_model.as_deref(),
141 },
142 source_memory_ids: &[memory.id],
143 evidence_role: DERIVED_FROM_ROLE,
144 })
145 .await
146 .map_err(|error| AgentError::Storage(error.to_string()))?;
147 derived_ids.push(derived.id);
148 }
149
150 enqueue_follow_up_jobs(repo, memory, &perspective, &derived_ids, &self.config).await?;
151
152 info!(
153 memory_id = memory.id,
154 derived_count = derived_ids.len(),
155 "Derived explicit observations from raw memory"
156 );
157 Ok(derived_ids)
158 }
159
160 async fn derive_with_llm(
161 &self,
162 memory: &Memory,
163 ) -> Result<Vec<DerivedObservation>, AgentError> {
164 let params = GenerateParams {
165 messages: vec![
166 ChatMessage::system(DERIVE_SYSTEM_PROMPT),
167 ChatMessage::user(derive_user_prompt(memory)),
168 ],
169 max_tokens: DERIVE_MAX_TOKENS,
170 temperature: 0.1,
171 json_mode: true,
172 };
173
174 let envelope: DerivedObservationEnvelope = self
175 .llm
176 .generate_json(params)
177 .await
178 .map_err(|error| AgentError::Llm(error.to_string()))?;
179
180 Ok(envelope.observations)
181 }
182}
183
184fn derive_metadata(
185 source: &Memory,
186 perspective: &PerspectiveKey,
187 confidence: f32,
188) -> serde_json::Value {
189 let mut cognitive = CognitiveMetadata::new(
190 CognitiveLevel::Explicit,
191 perspective.observer.clone(),
192 perspective.subject.clone(),
193 perspective.session_key.clone(),
194 DERIVE_GENERATED_BY,
195 );
196 cognitive.source_memory_ids = vec![source.id];
197 cognitive.confidence = Some(confidence.max(0.70));
198 cognitive.merge_into(&sanitized_source_metadata(source))
199}
200
201fn fallback_observations(memory: &Memory) -> Vec<DerivedObservation> {
202 let summary = memory
203 .metadata
204 .get("agent")
205 .and_then(|agent| agent.get("summary"))
206 .and_then(serde_json::Value::as_str)
207 .map(str::trim)
208 .filter(|summary| summary.len() >= 16 && !looks_like_noise(summary))
209 .map(ToString::to_string);
210
211 let content = memory
212 .content
213 .split_whitespace()
214 .collect::<Vec<_>>()
215 .join(" ");
216
217 let candidate = summary.unwrap_or(content);
218 if candidate.is_empty() || looks_like_noise(&candidate) {
219 return Vec::new();
220 }
221
222 vec![DerivedObservation {
223 content: candidate,
224 category: memory.category.to_string(),
225 memory_lane_type: memory.memory_lane_type.as_ref().map(ToString::to_string),
226 labels: explicit_labels_from_source(memory),
227 confidence: 0.70,
228 }]
229}
230
231fn sanitized_source_metadata(source: &Memory) -> serde_json::Value {
232 let mut sanitized = serde_json::Map::new();
233
234 if let Some(agent) = source
235 .metadata
236 .get("agent")
237 .and_then(serde_json::Value::as_object)
238 {
239 let mut agent_sanitized = serde_json::Map::new();
240 for key in [
241 "summary",
242 "entities",
243 "topics",
244 "importance_score",
245 "source",
246 "generated_by",
247 ] {
248 if let Some(value) = agent.get(key) {
249 agent_sanitized.insert(key.to_string(), value.clone());
250 }
251 }
252 if !agent_sanitized.is_empty() {
253 sanitized.insert(
254 "agent".to_string(),
255 serde_json::Value::Object(agent_sanitized),
256 );
257 }
258 }
259
260 serde_json::Value::Object(sanitized)
261}
262
263fn explicit_labels_from_source(source: &Memory) -> Vec<String> {
264 let mut labels: Vec<String> = source
265 .labels
266 .iter()
267 .filter(|label| {
268 !label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
269 && !label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
270 })
271 .cloned()
272 .collect();
273 dedupe_labels(&mut labels);
274 labels
275}
276
277fn normalize_observations(observations: Vec<DerivedObservation>) -> Vec<DerivedObservation> {
278 let mut seen = HashSet::new();
279 let mut normalized = Vec::new();
280
281 for mut observation in observations {
282 observation.content = observation.content.trim().to_string();
283 if observation.content.is_empty() || observation.confidence < 0.70 {
284 continue;
285 }
286
287 let fingerprint = observation.content.to_lowercase();
288 if !seen.insert(fingerprint) {
289 continue;
290 }
291
292 observation.labels.retain(|label| {
293 !label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
294 && !label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
295 });
296 dedupe_labels(&mut observation.labels);
297 normalized.push(observation);
298 }
299
300 normalized
301}
302
/// Removes labels that repeat an earlier label when compared in lowercase.
///
/// The first spelling of each label is kept and the relative order of the
/// surviving labels is unchanged.
fn dedupe_labels(labels: &mut Vec<String>) {
    let mut seen_lowercase: HashSet<String> = HashSet::new();
    let mut unique: Vec<String> = Vec::with_capacity(labels.len());

    for label in labels.drain(..) {
        if seen_lowercase.insert(label.to_lowercase()) {
            unique.push(label);
        }
    }

    *labels = unique;
}
307
/// Heuristic for text that is raw structured payload rather than prose.
///
/// Returns `true` when the trimmed text starts with `{` or `[`, or when it
/// contains the quoted keys `"event_name"` or `"tool_name"`.
fn looks_like_noise(content: &str) -> bool {
    const PAYLOAD_MARKERS: [&str; 2] = ["\"event_name\"", "\"tool_name\""];

    let text = content.trim();
    if matches!(text.chars().next(), Some('{' | '[')) {
        return true;
    }

    PAYLOAD_MARKERS.iter().any(|marker| text.contains(*marker))
}
315
316fn is_derivable_source(memory: &Memory) -> bool {
317 if cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw {
318 return true;
319 }
320
321 memory.labels.iter().any(|label| {
322 label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
323 || label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
324 }) || memory
325 .metadata
326 .get("raw_activity")
327 .and_then(serde_json::Value::as_bool)
328 .unwrap_or(false)
329 || memory
330 .metadata
331 .get("activity")
332 .and_then(|activity| activity.get("low_signal"))
333 .and_then(serde_json::Value::as_bool)
334 .unwrap_or(false)
335}
336
337async fn existing_derived_ids(
338 repo: &MemoryRepository,
339 source_memory_id: i64,
340) -> Result<Vec<i64>, AgentError> {
341 let mut ids: Vec<i64> = repo
342 .load_lineage(source_memory_id)
343 .await
344 .map_err(|error| AgentError::Storage(error.to_string()))?
345 .into_iter()
346 .filter(|entry| entry.source_memory_id == source_memory_id)
347 .map(|entry| entry.derived_memory_id)
348 .collect();
349 ids.sort_unstable();
350 ids.dedup();
351 Ok(ids)
352}
353
354async fn enqueue_follow_up_jobs(
355 repo: &MemoryRepository,
356 source: &Memory,
357 perspective: &PerspectiveKey,
358 derived_ids: &[i64],
359 config: &AgentConfig,
360) -> Result<(), AgentError> {
361 if derived_ids.is_empty() {
362 return Ok(());
363 }
364
365 let perspective_json = serde_json::to_value(perspective).ok();
366 let reflect_payload = serde_json::json!({
367 "source_memory_id": source.id,
368 "derived_memory_ids": derived_ids,
369 "agent_namespace": config.namespace,
370 });
371
372 repo.enqueue_job(EnqueueJobParams {
373 namespace_id: source.namespace_id,
374 job_type: REFLECT_PERSPECTIVE_JOB,
375 priority: 100,
376 perspective: perspective_json.as_ref(),
377 payload: &reflect_payload,
378 })
379 .await
380 .map_err(|error| AgentError::Storage(error.to_string()))?;
381
382 if let Some(session_key) = &perspective.session_key {
383 let digest_payload = serde_json::json!({
384 "source_memory_id": source.id,
385 "derived_memory_ids": derived_ids,
386 "session_key": session_key,
387 "agent_namespace": config.namespace,
388 });
389 repo.enqueue_job(EnqueueJobParams {
390 namespace_id: source.namespace_id,
391 job_type: DIGEST_SESSION_JOB,
392 priority: 90,
393 perspective: perspective_json.as_ref(),
394 payload: &digest_payload,
395 })
396 .await
397 .map_err(|error| AgentError::Storage(error.to_string()))?;
398 }
399
400 Ok(())
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use nexus_core::MemoryLanePriorityType;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::{NamespaceRepository, StoreMemoryParams};
    use sqlx::sqlite::SqlitePoolOptions;

    /// LLM client that replays a fixed queue of responses, one per call.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        /// Queues `responses` to be returned in order.
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            let queue: VecDeque<_> = VecDeque::from(responses);
            Self {
                responses: Mutex::new(queue),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        /// Pops the next queued response; panics when the queue is empty.
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            let mut queue = self.responses.lock().expect("mock responses poisoned");
            queue.pop_front().expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            String::from("mock")
        }

        fn model_name(&self) -> String {
            String::from("mock-model")
        }
    }

    /// Wraps `content` in a mock-model response without usage data.
    fn llm_response(content: &str) -> GenerateResponse {
        GenerateResponse {
            content: content.to_string(),
            model: "mock-model".to_string(),
            usage: None,
        }
    }

    /// Builds a service with default config whose LLM replays `responses`.
    fn service_with(
        responses: Vec<nexus_llm::Result<GenerateResponse>>,
        embeddings: Option<Arc<dyn EmbeddingService>>,
    ) -> DeriveService {
        DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(responses)),
            embeddings,
        )
    }

    /// Counts the rows of `memory_jobs` that have the given job type.
    async fn count_jobs_of_type(pool: &sqlx::SqlitePool, job_type: &str) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
            .bind(job_type)
            .fetch_one(pool)
            .await
            .unwrap()
    }

    /// Creates a migrated in-memory database with a `derive-test` namespace.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespace = NamespaceRepository::new(pool.clone())
            .get_or_create("derive-test", "derive-test")
            .await
            .unwrap();
        let namespace_id = namespace.id;

        let repo = MemoryRepository::new(pool.clone());
        (pool, repo, namespace_id)
    }

    /// Metadata of a raw memory for `session_key` with an agent summary.
    fn raw_memory_metadata(session_key: &str) -> serde_json::Value {
        let agent_block = serde_json::json!({
            "agent": {
                "summary": "Fixed the query path and tightened pagination behavior."
            }
        });

        let mut cognitive = CognitiveMetadata::new(
            CognitiveLevel::Raw,
            "claude-code",
            "claude-code",
            Some(session_key.to_string()),
            "ingest_service",
        );
        cognitive.confidence = Some(0.9);
        cognitive.merge_into(&agent_block)
    }

    /// Stores a raw session memory with duplicated labels in `session-1`.
    async fn store_raw_memory(repo: &MemoryRepository, namespace_id: i64, content: &str) -> Memory {
        let metadata = raw_memory_metadata("session-1");
        let lane = MemoryLaneType::Priority(MemoryLanePriorityType::Decision);

        repo.store(StoreMemoryParams {
            namespace_id,
            content,
            category: &MemoryCategory::Session,
            memory_lane_type: Some(&lane),
            labels: &["memory".to_string(), "memory".to_string()],
            metadata: &metadata,
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn test_derive_memory_skips_non_raw_memories() {
        let (_pool, repo, namespace_id) = setup_repo().await;

        let mut cognitive = CognitiveMetadata::new(
            CognitiveLevel::Explicit,
            "claude-code",
            "claude-code",
            Some("session-1".to_string()),
            "derive_service",
        );
        cognitive.confidence = Some(0.9);
        let explicit_metadata = cognitive.merge_into(&serde_json::json!({}));

        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Already explicit",
                category: &MemoryCategory::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &explicit_metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // No responses queued: the LLM must not be called for this memory.
        let service = service_with(Vec::new(), None);
        let derived_ids = service.derive_memory(&explicit, &repo).await.unwrap();

        assert!(derived_ids.is_empty());
    }

    #[tokio::test]
    async fn test_derive_memory_persists_explicit_observations_and_jobs() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Fixed query pagination and recall ranking.",
        )
        .await;
        let reply = llm_response(
            r#"{"observations":[{"content":"Fixed query pagination behavior.","category":"session","memory_lane_type":"decision","labels":["query","pagination"],"confidence":0.9}]}"#,
        );
        let service = service_with(vec![Ok(reply)], None);

        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();

        assert_eq!(derived_ids.len(), 1);
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&derived.metadata),
            CognitiveLevel::Explicit
        );
        assert_eq!(
            derived.labels,
            vec!["query".to_string(), "pagination".to_string()]
        );

        let lineage = repo.load_lineage(derived.id).await.unwrap();
        assert_eq!(lineage.len(), 1);
        assert_eq!(lineage[0].source_memory_id, raw.id);

        let reflect_jobs = count_jobs_of_type(&pool, REFLECT_PERSPECTIVE_JOB).await;
        let digest_jobs = count_jobs_of_type(&pool, DIGEST_SESSION_JOB).await;
        assert_eq!(reflect_jobs, 1);
        assert_eq!(digest_jobs, 1);
    }

    #[tokio::test]
    async fn test_derive_memory_is_idempotent() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Introduced working-set retrieval primitives.",
        )
        .await;
        let reply = llm_response(
            r#"{"observations":[{"content":"Added working-set retrieval primitives.","category":"facts","memory_lane_type":null,"labels":["retrieval"],"confidence":0.85}]}"#,
        );
        // Only one response is queued; a second LLM call would panic.
        let service = service_with(vec![Ok(reply)], None);

        let first = service.derive_memory(&raw, &repo).await.unwrap();
        let second = service.derive_memory(&raw, &repo).await.unwrap();

        assert_eq!(first, second);

        let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(job_count, 2);
    }

    #[tokio::test]
    async fn test_derive_memory_falls_back_when_llm_response_is_invalid() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Noisy raw content that still has a useful summary.",
        )
        .await;
        let service = service_with(vec![Ok(llm_response("not json"))], None);

        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        assert_eq!(derived_ids.len(), 1);

        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert!(derived.content.contains("tightened pagination behavior"));
    }

    #[tokio::test]
    async fn test_derive_memory_strips_raw_noise_markers_from_explicit_output() {
        let (_pool, repo, namespace_id) = setup_repo().await;

        let source_labels = [
            RAW_ACTIVITY_LABEL.to_string(),
            "query".to_string(),
            "query".to_string(),
        ];
        let source_metadata = serde_json::json!({
            "raw_activity": true,
            "agent": { "summary": "Useful summary survives." },
            "cognitive": {
                "level": "raw",
                "observer": "claude-code",
                "subject": "claude-code",
                "session_key": "session-1",
                "generated_by": "ingest_service"
            }
        });
        let raw = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Raw hook activity with durable summary.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &source_labels,
                metadata: &source_metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let reply = llm_response(
            r#"{"observations":[{"content":"Useful explicit observation.","category":"facts","memory_lane_type":null,"labels":["raw-activity","query"],"confidence":0.9}]}"#,
        );
        let service = service_with(vec![Ok(reply)], None);

        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();

        let has_raw_marker = derived
            .labels
            .iter()
            .any(|label| label == RAW_ACTIVITY_LABEL);
        assert!(!has_raw_marker);
        assert!(derived.metadata.get("raw_activity").is_none());
        assert_eq!(
            derived.metadata["agent"]["summary"],
            serde_json::Value::String("Useful summary survives.".to_string())
        );
    }

    #[tokio::test]
    async fn test_derive_memory_produces_embeddings_when_service_provided() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Implemented the query path with tight pagination.",
        )
        .await;

        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
        let reply = llm_response(
            r#"{"observations":[{"content":"Implemented query path with pagination.","category":"facts","memory_lane_type":null,"labels":["query","pagination"],"confidence":0.85}]}"#,
        );
        let service = service_with(vec![Ok(reply)], Some(Arc::new(mock_embed)));

        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();

        assert!(
            derived.content_embedding.is_some(),
            "derived explicit observation should have an embedding when service is provided"
        );
        let embedding = derived.content_embedding.as_ref().unwrap();
        assert_eq!(embedding.len(), 384, "embedding dimension should be 384");
    }

    #[tokio::test]
    async fn test_derive_memory_stores_without_embedding_when_service_absent() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Implemented the query path with tight pagination.",
        )
        .await;

        let reply = llm_response(
            r#"{"observations":[{"content":"Implemented query path with pagination.","category":"facts","memory_lane_type":null,"labels":["query","pagination"],"confidence":0.85}]}"#,
        );
        let service = service_with(vec![Ok(reply)], None);

        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();

        assert!(
            derived.content_embedding.is_none(),
            "derived observation should NOT have an embedding when no service provided"
        );
    }

    #[tokio::test]
    async fn test_derive_memory_accepts_low_signal_activity_sources() {
        let (_pool, repo, namespace_id) = setup_repo().await;

        // Cognitive level is "explicit": only the low-signal markers make
        // this memory derivable.
        let low_signal_metadata = serde_json::json!({
            "activity": { "low_signal": true },
            "agent": { "summary": "Captured meaningful work despite low signal." },
            "cognitive": {
                "level": "explicit",
                "observer": "claude-code",
                "subject": "claude-code",
                "session_key": "session-1",
                "generated_by": "activity_distiller"
            }
        });
        let low_signal = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Low signal activity with useful summary.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[LOW_SIGNAL_LABEL.to_string()],
                metadata: &low_signal_metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let llm_failure = Err(nexus_llm::LlmError::InvalidJsonResponse(
            "bad".to_string(),
        ));
        let service = service_with(vec![llm_failure], None);

        let derived_ids = service.derive_memory(&low_signal, &repo).await.unwrap();
        assert_eq!(derived_ids.len(), 1);
    }
}