use std::collections::HashSet;
use std::sync::Arc;
use nexus_core::config::AgentConfig;
use nexus_core::traits::EmbeddingService;
use nexus_core::{
cognitive_level_from_metadata, infer_perspective, perspective_from_metadata, CognitiveLevel,
CognitiveMetadata, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey, PerspectiveSource,
};
use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
use nexus_storage::models::EnqueueJobParams;
use nexus_storage::repository::{
MemoryRepository, StoreMemoryParams, StoreMemoryWithLineageParams,
};
use tracing::{debug, info, warn};
use crate::error::AgentError;
use crate::prompts::{derive_user_prompt, DERIVE_SYSTEM_PROMPT};
use crate::util::maybe_embed;
/// Maximum number of tokens requested from the LLM for one derivation call.
const DERIVE_MAX_TOKENS: u32 = 4096;

/// Job type enqueued after derivation to reflect on the new observations.
const REFLECT_PERSPECTIVE_JOB: &str = "reflect_perspective";
/// Job type enqueued after derivation when the perspective has a session key.
const DIGEST_SESSION_JOB: &str = "digest_session";

/// Generator name recorded in the cognitive metadata of derived observations.
const DERIVE_GENERATED_BY: &str = "derive_service";
/// Evidence role stored on the lineage link from a derived memory to its source.
const DERIVED_FROM_ROLE: &str = "derived_from";

/// Label that makes a memory derivable; it is stripped from derived output.
const RAW_ACTIVITY_LABEL: &str = "raw-activity";
/// Label that makes a memory derivable; it is stripped from derived output.
const LOW_SIGNAL_LABEL: &str = "low-signal";
/// Derives explicit observations from raw or low-signal memories.
///
/// Uses an LLM, with a heuristic fallback, and stores each observation with a
/// lineage link back to its source memory.
pub struct DeriveService {
    /// Agent configuration. Its `namespace` is used for perspective inference
    /// and in follow-up job payloads.
    config: AgentConfig,
    /// LLM client used to extract observations as JSON.
    llm: Arc<dyn LlmClient>,
    /// Optional embedding service. When `None`, derived observations are
    /// stored without an embedding.
    embeddings: Option<Arc<dyn EmbeddingService>>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DerivedObservation {
pub content: String,
pub category: String,
pub memory_lane_type: Option<String>,
pub labels: Vec<String>,
pub confidence: f32,
}
/// Shape the LLM's JSON response is parsed into:
/// `{"observations": [DerivedObservation, ...]}`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct DerivedObservationEnvelope {
    /// Candidate observations, in the order the LLM returned them.
    observations: Vec<DerivedObservation>,
}
impl DeriveService {
    /// Creates a derive service from its configuration, LLM client and
    /// optional embedding service.
    pub fn new(
        config: AgentConfig,
        llm: Arc<dyn LlmClient>,
        embeddings: Option<Arc<dyn EmbeddingService>>,
    ) -> Self {
        Self {
            config,
            llm,
            embeddings,
        }
    }

    /// Derives explicit observations from `memory` with no queued perspective.
    ///
    /// Equivalent to `derive_memory_with_perspective(memory, None, repo)`.
    ///
    /// # Errors
    /// Same as `derive_memory_with_perspective`.
    pub async fn derive_memory(
        &self,
        memory: &Memory,
        repo: &MemoryRepository,
    ) -> Result<Vec<i64>, AgentError> {
        self.derive_memory_with_perspective(memory, None, repo)
            .await
    }

    /// Derives explicit observations from a raw or low-signal `memory`.
    ///
    /// The observations are stored with lineage back to `memory`, follow-up
    /// jobs are enqueued, and the ids of the derived memories are returned.
    ///
    /// - Returns an empty vector when `memory` is not a derivable source, or
    ///   when no observation survives normalization.
    /// - If lineage already records memories derived from `memory`, their ids
    ///   are returned. The LLM is not called and no jobs are enqueued.
    /// - The perspective is `queued_perspective` if given, else the one stored
    ///   in the memory's metadata, else a hook-ingest perspective inferred for
    ///   the configured namespace.
    /// - If the LLM fails, a warning is logged and the heuristic fallback is
    ///   used instead.
    ///
    /// NOTE(review): observations are stored one at a time, outside any
    /// transaction. If a store or the job enqueue fails partway, the
    /// observations already stored stay persisted. The next call then takes
    /// the "existing" early return: it neither stores the rest nor enqueues
    /// the follow-up jobs. Confirm whether this is acceptable.
    ///
    /// # Errors
    /// Returns `AgentError::Storage` if loading lineage, storing an
    /// observation, or enqueueing a follow-up job fails.
    pub async fn derive_memory_with_perspective(
        &self,
        memory: &Memory,
        queued_perspective: Option<&PerspectiveKey>,
        repo: &MemoryRepository,
    ) -> Result<Vec<i64>, AgentError> {
        if !is_derivable_source(memory) {
            return Ok(Vec::new());
        }
        // Idempotency: reuse what a previous run derived instead of deriving again.
        let existing_ids = existing_derived_ids(repo, memory.id).await?;
        if !existing_ids.is_empty() {
            debug!(
                memory_id = memory.id,
                derived_count = existing_ids.len(),
                "Reusing existing derived observations"
            );
            return Ok(existing_ids);
        }
        // Perspective precedence: queued, then stored metadata, then inferred.
        let perspective = queued_perspective
            .cloned()
            .or_else(|| perspective_from_metadata(&memory.metadata))
            .unwrap_or_else(|| {
                infer_perspective(
                    PerspectiveSource::HookIngest,
                    self.config.namespace.clone(),
                    None,
                    None,
                )
            });
        // LLM errors (transport or JSON parsing) do not abort derivation.
        let observations = match self.derive_with_llm(memory).await {
            Ok(observations) => observations,
            Err(error) => {
                warn!(memory_id = memory.id, %error, "LLM derivation failed, using fallback");
                fallback_observations(memory)
            }
        };
        let observations = normalize_observations(observations);
        if observations.is_empty() {
            debug!(memory_id = memory.id, "No explicit observations derived");
            return Ok(Vec::new());
        }
        let mut derived_ids = Vec::with_capacity(observations.len());
        for observation in observations {
            // Use the source memory's category when the proposed one does not parse.
            let category = MemoryCategory::parse(&observation.category).unwrap_or(memory.category);
            // A missing or unparseable lane type is stored as `None`.
            let memory_lane_type = observation
                .memory_lane_type
                .as_deref()
                .and_then(MemoryLaneType::parse);
            let metadata = derive_metadata(memory, &perspective, observation.confidence);
            let (embedding, embedding_model) =
                maybe_embed(self.embeddings.as_deref(), &observation.content).await;
            let derived = repo
                .store_with_lineage(StoreMemoryWithLineageParams {
                    store: StoreMemoryParams {
                        namespace_id: memory.namespace_id,
                        content: &observation.content,
                        category: &category,
                        memory_lane_type: memory_lane_type.as_ref(),
                        labels: &observation.labels,
                        metadata: &metadata,
                        embedding: embedding.as_deref(),
                        embedding_model: embedding_model.as_deref(),
                    },
                    source_memory_ids: &[memory.id],
                    evidence_role: DERIVED_FROM_ROLE,
                })
                .await
                .map_err(|error| AgentError::Storage(error.to_string()))?;
            derived_ids.push(derived.id);
        }
        enqueue_follow_up_jobs(repo, memory, &perspective, &derived_ids, &self.config).await?;
        info!(
            memory_id = memory.id,
            derived_count = derived_ids.len(),
            "Derived explicit observations from raw memory"
        );
        Ok(derived_ids)
    }

    /// Asks the LLM for observations about `memory` in JSON mode and parses
    /// the reply as a `DerivedObservationEnvelope`.
    ///
    /// Uses the derive system prompt, temperature 0.1 and at most
    /// `DERIVE_MAX_TOKENS` tokens.
    ///
    /// # Errors
    /// Generation and JSON-parsing failures are returned as `AgentError::Llm`.
    async fn derive_with_llm(
        &self,
        memory: &Memory,
    ) -> Result<Vec<DerivedObservation>, AgentError> {
        let params = GenerateParams {
            messages: vec![
                ChatMessage::system(DERIVE_SYSTEM_PROMPT),
                ChatMessage::user(derive_user_prompt(memory)),
            ],
            max_tokens: DERIVE_MAX_TOKENS,
            temperature: 0.1,
            json_mode: true,
        };
        let envelope: DerivedObservationEnvelope = self
            .llm
            .generate_json(params)
            .await
            .map_err(|error| AgentError::Llm(error.to_string()))?;
        Ok(envelope.observations)
    }
}
/// Builds the stored metadata for an observation derived from `source`.
///
/// The result is explicit-level cognitive metadata for `perspective`,
/// generated by `DERIVE_GENERATED_BY` and linked to `source.id`. It is merged
/// with a sanitized subset of the source's metadata.
///
/// `confidence` is clamped into `[0.70, 1.0]`. The floor matches the
/// threshold used when normalizing observations. The ceiling assumes
/// confidences are on a 0.0–1.0 scale, which agrees with that 0.70 floor;
/// NOTE(review): confirm against `CognitiveMetadata` consumers.
fn derive_metadata(
    source: &Memory,
    perspective: &PerspectiveKey,
    confidence: f32,
) -> serde_json::Value {
    const MIN_CONFIDENCE: f32 = 0.70;
    const MAX_CONFIDENCE: f32 = 1.0;

    let mut cognitive = CognitiveMetadata::new(
        CognitiveLevel::Explicit,
        perspective.observer.clone(),
        perspective.subject.clone(),
        perspective.session_key.clone(),
        DERIVE_GENERATED_BY,
    );
    cognitive.source_memory_ids = vec![source.id];
    // Use `max` then `min` rather than `clamp`: `f32::max` returns the other
    // operand for NaN, so a NaN input becomes the floor instead of propagating.
    cognitive.confidence = Some(confidence.max(MIN_CONFIDENCE).min(MAX_CONFIDENCE));
    cognitive.merge_into(&sanitized_source_metadata(source))
}
/// Heuristic used when LLM derivation fails. Produces at most one observation
/// from `memory`.
///
/// The candidate text is the trimmed `metadata.agent.summary`, provided it has
/// at least 16 characters and does not look like structured noise. Otherwise
/// it is the memory content with runs of whitespace collapsed to single
/// spaces. If the candidate is empty or looks like noise, nothing is returned.
///
/// The observation inherits the source's category, lane type and labels
/// (minus raw-activity and low-signal markers), with confidence 0.70.
fn fallback_observations(memory: &Memory) -> Vec<DerivedObservation> {
    // Counted in characters, not UTF-8 bytes, so non-ASCII summaries face the
    // same length bar as ASCII ones.
    const MIN_SUMMARY_CHARS: usize = 16;

    let summary = memory
        .metadata
        .get("agent")
        .and_then(|agent| agent.get("summary"))
        .and_then(serde_json::Value::as_str)
        .map(str::trim)
        .filter(|summary| {
            summary.chars().count() >= MIN_SUMMARY_CHARS && !looks_like_noise(summary)
        })
        .map(ToString::to_string);
    // Build the collapsed content only when no usable summary exists.
    let candidate = summary.unwrap_or_else(|| {
        memory
            .content
            .split_whitespace()
            .collect::<Vec<_>>()
            .join(" ")
    });
    if candidate.is_empty() || looks_like_noise(&candidate) {
        return Vec::new();
    }
    vec![DerivedObservation {
        content: candidate,
        category: memory.category.to_string(),
        memory_lane_type: memory.memory_lane_type.as_ref().map(ToString::to_string),
        labels: explicit_labels_from_source(memory),
        confidence: 0.70,
    }]
}
/// Copies an allow-listed subset of the source's `metadata.agent` object into
/// a fresh JSON object under the `agent` key.
///
/// The kept keys are `summary`, `entities`, `topics`, `importance_score`,
/// `source` and `generated_by`. All other metadata is dropped. The result is
/// `{}` when the source has no `agent` object or none of the kept keys are
/// present.
fn sanitized_source_metadata(source: &Memory) -> serde_json::Value {
    const AGENT_KEYS: [&str; 6] = [
        "summary",
        "entities",
        "topics",
        "importance_score",
        "source",
        "generated_by",
    ];

    let kept_agent: Option<serde_json::Map<String, serde_json::Value>> = source
        .metadata
        .get("agent")
        .and_then(serde_json::Value::as_object)
        .map(|agent| {
            AGENT_KEYS
                .iter()
                .filter_map(|&key| agent.get(key).map(|value| (key.to_string(), value.clone())))
                .collect()
        });

    let mut out = serde_json::Map::new();
    if let Some(agent) = kept_agent.filter(|agent| !agent.is_empty()) {
        out.insert("agent".to_string(), serde_json::Value::Object(agent));
    }
    serde_json::Value::Object(out)
}
/// Returns the source memory's labels without the raw-activity and
/// low-signal markers (matched ASCII case-insensitively).
///
/// The result is deduplicated case-insensitively, keeping the first spelling
/// of each label.
fn explicit_labels_from_source(source: &Memory) -> Vec<String> {
    let is_noise_marker = |label: &str| {
        label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
            || label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
    };
    let mut kept: Vec<String> = Vec::new();
    for label in source.labels.iter() {
        if is_noise_marker(label.as_str()) {
            continue;
        }
        kept.push(label.clone());
    }
    dedupe_labels(&mut kept);
    kept
}
/// Cleans observations (from the LLM or the fallback) before they are stored.
/// Input order is preserved.
///
/// - Content is trimmed. Observations with empty content or confidence below
///   0.70 are dropped.
/// - An observation is dropped if its content duplicates an earlier one,
///   ignoring case and differences in whitespace.
/// - Labels are trimmed. Blank labels and the raw-activity / low-signal
///   markers are removed, and the rest are deduplicated case-insensitively.
fn normalize_observations(observations: Vec<DerivedObservation>) -> Vec<DerivedObservation> {
    const MIN_CONFIDENCE: f32 = 0.70;

    let mut seen = HashSet::with_capacity(observations.len());
    let mut normalized = Vec::with_capacity(observations.len());
    for mut observation in observations {
        observation.content = observation.content.trim().to_string();
        if observation.content.is_empty() || observation.confidence < MIN_CONFIDENCE {
            continue;
        }
        // Collapse internal whitespace so "fixed  X" and "Fixed X" count as duplicates.
        let fingerprint = observation
            .content
            .split_whitespace()
            .collect::<Vec<_>>()
            .join(" ")
            .to_lowercase();
        if !seen.insert(fingerprint) {
            continue;
        }
        // LLM labels can be padded or empty; neither should be stored.
        observation.labels = std::mem::take(&mut observation.labels)
            .into_iter()
            .map(|label| label.trim().to_string())
            .filter(|label| {
                !label.is_empty()
                    && !label.eq_ignore_ascii_case(RAW_ACTIVITY_LABEL)
                    && !label.eq_ignore_ascii_case(LOW_SIGNAL_LABEL)
            })
            .collect();
        dedupe_labels(&mut observation.labels);
        normalized.push(observation);
    }
    normalized
}
/// Removes case-insensitive duplicate labels in place.
///
/// Keeps the first occurrence of each label, with its original casing, and
/// preserves the relative order of the labels that remain.
fn dedupe_labels(labels: &mut Vec<String>) {
    let original = std::mem::take(labels);
    let mut lowered = HashSet::with_capacity(original.len());
    *labels = original
        .into_iter()
        .filter(|label| lowered.insert(label.to_lowercase()))
        .collect();
}
/// Heuristic for text that looks like a serialized hook or tool payload
/// rather than prose.
///
/// After trimming, the text counts as noise if it starts with `{` or `[`, or
/// contains the quoted JSON key `"event_name"` or `"tool_name"`.
fn looks_like_noise(content: &str) -> bool {
    const NOISE_KEYS: [&str; 2] = ["\"event_name\"", "\"tool_name\""];
    let text = content.trim();
    let opens_like_json = matches!(text.chars().next(), Some('{') | Some('['));
    opens_like_json || NOISE_KEYS.iter().any(|&key| text.contains(key))
}
/// Decides whether a memory can be a source for derivation.
///
/// Returns true if any of the following holds:
/// - its cognitive level is `Raw`;
/// - it carries the raw-activity or low-signal label (ASCII case-insensitive);
/// - `metadata.raw_activity` is the JSON boolean `true`;
/// - `metadata.activity.low_signal` is the JSON boolean `true`.
fn is_derivable_source(memory: &Memory) -> bool {
    /// True only when `value` is present and is the JSON boolean `true`.
    fn flag_is_set(value: Option<&serde_json::Value>) -> bool {
        value.and_then(serde_json::Value::as_bool).unwrap_or(false)
    }

    if cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw {
        return true;
    }

    let carries_marker_label = memory.labels.iter().any(|label| {
        [RAW_ACTIVITY_LABEL, LOW_SIGNAL_LABEL]
            .iter()
            .any(|&marker| label.eq_ignore_ascii_case(marker))
    });
    if carries_marker_label {
        return true;
    }

    let metadata = &memory.metadata;
    flag_is_set(metadata.get("raw_activity"))
        || flag_is_set(
            metadata
                .get("activity")
                .and_then(|activity| activity.get("low_signal")),
        )
}
async fn existing_derived_ids(
repo: &MemoryRepository,
source_memory_id: i64,
) -> Result<Vec<i64>, AgentError> {
let mut ids: Vec<i64> = repo
.load_lineage(source_memory_id)
.await
.map_err(|error| AgentError::Storage(error.to_string()))?
.into_iter()
.filter(|entry| entry.source_memory_id == source_memory_id)
.map(|entry| entry.derived_memory_id)
.collect();
ids.sort_unstable();
ids.dedup();
Ok(ids)
}
async fn enqueue_follow_up_jobs(
repo: &MemoryRepository,
source: &Memory,
perspective: &PerspectiveKey,
derived_ids: &[i64],
config: &AgentConfig,
) -> Result<(), AgentError> {
if derived_ids.is_empty() {
return Ok(());
}
let perspective_json = serde_json::to_value(perspective).ok();
let reflect_payload = serde_json::json!({
"source_memory_id": source.id,
"derived_memory_ids": derived_ids,
"agent_namespace": config.namespace,
});
repo.enqueue_job(EnqueueJobParams {
namespace_id: source.namespace_id,
job_type: REFLECT_PERSPECTIVE_JOB,
priority: 100,
perspective: perspective_json.as_ref(),
payload: &reflect_payload,
})
.await
.map_err(|error| AgentError::Storage(error.to_string()))?;
if let Some(session_key) = &perspective.session_key {
let digest_payload = serde_json::json!({
"source_memory_id": source.id,
"derived_memory_ids": derived_ids,
"session_key": session_key,
"agent_namespace": config.namespace,
});
repo.enqueue_job(EnqueueJobParams {
namespace_id: source.namespace_id,
job_type: DIGEST_SESSION_JOB,
priority: 90,
perspective: perspective_json.as_ref(),
payload: &digest_payload,
})
.await
.map_err(|error| AgentError::Storage(error.to_string()))?;
}
Ok(())
}
// Integration-style tests for `DeriveService`. They use an in-memory SQLite
// repository and a scripted mock LLM.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::sync::Mutex;
    use async_trait::async_trait;
    use nexus_core::MemoryLanePriorityType;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::{NamespaceRepository, StoreMemoryParams};
    use sqlx::sqlite::SqlitePoolOptions;

    /// LLM stub that replays queued responses in FIFO order. It panics if it
    /// is called more times than responses were provided.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            Self {
                responses: Mutex::new(VecDeque::from(responses)),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            self.responses
                .lock()
                .expect("mock responses poisoned")
                .pop_front()
                .expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            "mock".to_string()
        }

        fn model_name(&self) -> String {
            "mock-model".to_string()
        }
    }

    /// Creates an in-memory SQLite pool with migrations applied and a
    /// `derive-test` namespace. Returns `(pool, repo, namespace_id)`.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        // Presumably limited to one connection so every query sees the same
        // in-memory database.
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();
        let namespace_repo = NamespaceRepository::new(pool.clone());
        let namespace = namespace_repo
            .get_or_create("derive-test", "derive-test")
            .await
            .unwrap();
        let repo = MemoryRepository::new(pool.clone());
        (pool, repo, namespace.id)
    }

    /// Raw-level cognitive metadata for `session_key` (observer and subject
    /// `claude-code`, confidence 0.9), merged with an `agent.summary` that the
    /// fallback path can use.
    fn raw_memory_metadata(session_key: &str) -> serde_json::Value {
        let mut cognitive = CognitiveMetadata::new(
            CognitiveLevel::Raw,
            "claude-code",
            "claude-code",
            Some(session_key.to_string()),
            "ingest_service",
        );
        cognitive.confidence = Some(0.9);
        cognitive.merge_into(&serde_json::json!({
            "agent": {
                "summary": "Fixed the query path and tightened pagination behavior."
            }
        }))
    }

    /// Stores a raw session memory in `session-1` with the `Decision` lane and
    /// a duplicated `memory` label.
    async fn store_raw_memory(repo: &MemoryRepository, namespace_id: i64, content: &str) -> Memory {
        repo.store(StoreMemoryParams {
            namespace_id,
            content,
            category: &MemoryCategory::Session,
            memory_lane_type: Some(&MemoryLaneType::Priority(MemoryLanePriorityType::Decision)),
            labels: &["memory".to_string(), "memory".to_string()],
            metadata: &raw_memory_metadata("session-1"),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap()
    }

    // An explicit-level memory with no raw markers is not derivable. The mock
    // has no responses, so any LLM call would panic.
    #[tokio::test]
    async fn test_derive_memory_skips_non_raw_memories() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let mut cognitive = CognitiveMetadata::new(
            CognitiveLevel::Explicit,
            "claude-code",
            "claude-code",
            Some("session-1".to_string()),
            "derive_service",
        );
        cognitive.confidence = Some(0.9);
        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Already explicit",
                category: &MemoryCategory::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive.merge_into(&serde_json::json!({})),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );
        let derived_ids = service.derive_memory(&explicit, &repo).await.unwrap();
        assert!(derived_ids.is_empty());
    }

    // Happy path: one LLM observation is stored at explicit level with its
    // labels and lineage, and both follow-up jobs are enqueued (the source
    // has a session key).
    #[tokio::test]
    async fn test_derive_memory_persists_explicit_observations_and_jobs() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Fixed query pagination and recall ranking.",
        )
        .await;
        let response = GenerateResponse {
            content: r#"{"observations":[{"content":"Fixed query pagination behavior.","category":"session","memory_lane_type":"decision","labels":["query","pagination"],"confidence":0.9}]}"#.to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(response)])),
            None,
        );
        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        assert_eq!(derived_ids.len(), 1);
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&derived.metadata),
            CognitiveLevel::Explicit
        );
        assert_eq!(
            derived.labels,
            vec!["query".to_string(), "pagination".to_string()]
        );
        let lineage = repo.load_lineage(derived.id).await.unwrap();
        assert_eq!(lineage.len(), 1);
        assert_eq!(lineage[0].source_memory_id, raw.id);
        let reflect_jobs: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
                .bind(REFLECT_PERSPECTIVE_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();
        let digest_jobs: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
                .bind(DIGEST_SESSION_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(reflect_jobs, 1);
        assert_eq!(digest_jobs, 1);
    }

    // The mock holds a single response, so a second LLM call would panic. The
    // second derive must return the lineage-recorded ids without enqueueing
    // more jobs, leaving only the first run's reflect and digest jobs.
    #[tokio::test]
    async fn test_derive_memory_is_idempotent() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Introduced working-set retrieval primitives.",
        )
        .await;
        let response = GenerateResponse {
            content: r#"{"observations":[{"content":"Added working-set retrieval primitives.","category":"facts","memory_lane_type":null,"labels":["retrieval"],"confidence":0.85}]}"#.to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(response)])),
            None,
        );
        let first = service.derive_memory(&raw, &repo).await.unwrap();
        let second = service.derive_memory(&raw, &repo).await.unwrap();
        assert_eq!(first, second);
        let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(job_count, 2);
    }

    // A non-JSON LLM reply triggers the fallback, which uses the metadata's
    // agent summary as the observation content.
    #[tokio::test]
    async fn test_derive_memory_falls_back_when_llm_response_is_invalid() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Noisy raw content that still has a useful summary.",
        )
        .await;
        let bad_response = GenerateResponse {
            content: "not json".to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(bad_response)])),
            None,
        );
        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        assert_eq!(derived_ids.len(), 1);
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert!(derived.content.contains("tightened pagination behavior"));
    }

    // The raw-activity label proposed by the LLM and the source's
    // `raw_activity` metadata flag must not reach the derived memory.
    // `agent.summary` is carried over.
    #[tokio::test]
    async fn test_derive_memory_strips_raw_noise_markers_from_explicit_output() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Raw hook activity with durable summary.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[
                    RAW_ACTIVITY_LABEL.to_string(),
                    "query".to_string(),
                    "query".to_string(),
                ],
                metadata: &serde_json::json!({
                    "raw_activity": true,
                    "agent": { "summary": "Useful summary survives." },
                    "cognitive": {
                        "level": "raw",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-1",
                        "generated_by": "ingest_service"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        let response = GenerateResponse {
            content: r#"{"observations":[{"content":"Useful explicit observation.","category":"facts","memory_lane_type":null,"labels":["raw-activity","query"],"confidence":0.9}]}"#.to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(response)])),
            None,
        );
        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert!(!derived
            .labels
            .iter()
            .any(|label| label == RAW_ACTIVITY_LABEL));
        assert!(derived.metadata.get("raw_activity").is_none());
        assert_eq!(
            derived.metadata["agent"]["summary"],
            serde_json::Value::String("Useful summary survives.".to_string())
        );
    }

    // With an embedding service configured, the derived memory gets an
    // embedding. The test expects the mock service to produce 384-dim vectors.
    #[tokio::test]
    async fn test_derive_memory_produces_embeddings_when_service_provided() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Implemented the query path with tight pagination.",
        )
        .await;
        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
        let response = GenerateResponse {
            content: r#"{"observations":[{"content":"Implemented query path with pagination.","category":"facts","memory_lane_type":null,"labels":["query","pagination"],"confidence":0.85}]}"#.to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(response)])),
            Some(Arc::new(mock_embed)),
        );
        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert!(
            derived.content_embedding.is_some(),
            "derived explicit observation should have an embedding when service is provided"
        );
        let embedding = derived.content_embedding.as_ref().unwrap();
        assert_eq!(embedding.len(), 384, "embedding dimension should be 384");
    }

    // Without an embedding service, the derived memory is stored with no
    // embedding.
    #[tokio::test]
    async fn test_derive_memory_stores_without_embedding_when_service_absent() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "Implemented the query path with tight pagination.",
        )
        .await;
        let response = GenerateResponse {
            content: r#"{"observations":[{"content":"Implemented query path with pagination.","category":"facts","memory_lane_type":null,"labels":["query","pagination"],"confidence":0.85}]}"#.to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(response)])),
            None,
        );
        let derived_ids = service.derive_memory(&raw, &repo).await.unwrap();
        let derived = repo.get_by_id(derived_ids[0]).await.unwrap().unwrap();
        assert!(
            derived.content_embedding.is_none(),
            "derived observation should NOT have an embedding when no service provided"
        );
    }

    // An explicit-level memory still qualifies through its low-signal label
    // and metadata flag. The LLM error forces the fallback path.
    #[tokio::test]
    async fn test_derive_memory_accepts_low_signal_activity_sources() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let low_signal = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Low signal activity with useful summary.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[LOW_SIGNAL_LABEL.to_string()],
                metadata: &serde_json::json!({
                    "activity": { "low_signal": true },
                    "agent": { "summary": "Captured meaningful work despite low signal." },
                    "cognitive": {
                        "level": "explicit",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-1",
                        "generated_by": "activity_distiller"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        let service = DeriveService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Err(
                nexus_llm::LlmError::InvalidJsonResponse("bad".to_string()),
            )])),
            None,
        );
        let derived_ids = service.derive_memory(&low_signal, &repo).await.unwrap();
        assert_eq!(derived_ids.len(), 1);
    }
}