use std::sync::Arc;
use tracing::{Instrument, Level, event, info_span};
use crate::embedding::EmbeddingModel;
use crate::jobs::{Job, JobKind, JobsError, MemoryJobsStore};
use crate::memory::{KindSelector, Memory, RetirementReason, Scope};
use crate::store::{MemoryStore, NewMemory, StoreError};
use crate::vector::VectorIndex;
use super::extract::ExtractError;
use super::ClientInner;
/// Similarity floor handed to the neighborhood search when the reprocess payload
/// supplies no usable `min_similarity`.
const DEFAULT_NEIGHBORHOOD_FLOOR: f32 = 0.50;
/// Hit limit handed to the neighborhood search when the reprocess payload supplies
/// no usable `top_k`.
const DEFAULT_NEIGHBORHOOD_TOP_K: usize = 10;
/// Failure modes of a reprocess job.
///
/// NOTE(review): `Store` and `Extract` use `#[from]`, which (per thiserror) also makes
/// the wrapped error the `source()`. Their Display strings embed `{0}` as well, so a
/// reporter that walks the error chain prints the inner message twice. Left unchanged
/// because the Display text is observable in logs.
#[derive(Debug, thiserror::Error)]
pub(super) enum ReprocessError {
/// The job payload could not be decoded (see `ReprocessRequest::from_payload`).
#[error("invalid reprocess payload: {0}")]
Payload(String),
/// A memory-store operation failed; `#[from]` lets `?` convert a `StoreError`.
#[error("store failed: {0}")]
Store(#[from] StoreError),
/// Embedding failed; carries the embedder's error rendered as text.
#[error("embedding failed: {0}")]
Embedding(String),
/// The vector-index neighborhood search failed; carries the index error as text.
#[error("neighborhood search failed: {0}")]
Search(String),
/// Enqueueing a follow-up job failed; carries the jobs-store error as text.
#[error("enqueue failed: {0}")]
Enqueue(String),
/// Re-extraction failed; `#[from]` lets `?` convert an `ExtractError`.
#[error("re-extraction failed: {0}")]
Extract(#[from] ExtractError),
}
struct ReprocessRequest {
reason: RetirementReason,
feedback: Option<String>,
min_similarity: f32,
top_k: usize,
}
impl ReprocessRequest {
fn from_payload(payload: &serde_json::Value) -> Result<Self, ReprocessError> {
let reason = payload
.get("reason")
.and_then(|v| v.as_str())
.ok_or_else(|| ReprocessError::Payload("missing 'reason'".to_string()))?
.parse::<RetirementReason>()
.map_err(|err| ReprocessError::Payload(format!("bad 'reason': {err}")))?;
let feedback = payload
.get("feedback")
.and_then(|v| v.as_str())
.map(str::to_string)
.filter(|s| !s.trim().is_empty());
let min_similarity = payload
.get("min_similarity")
.and_then(serde_json::Value::as_f64)
.map_or(DEFAULT_NEIGHBORHOOD_FLOOR, |v| v as f32);
let top_k = payload
.get("top_k")
.and_then(serde_json::Value::as_u64)
.map_or(DEFAULT_NEIGHBORHOOD_TOP_K, |v| v as usize);
Ok(Self {
reason,
feedback,
min_similarity,
top_k,
})
}
}
impl ClientInner {
    /// Runs one reprocess job inside a `memoir.reprocess` tracing span tagged with the
    /// job's `source_pid`.
    ///
    /// # Errors
    ///
    /// Returns whatever `run_reprocess_inner` returns.
    pub(super) async fn run_reprocess(self: &Arc<Self>, job: Job) -> Result<(), ReprocessError> {
        let span = info_span!("memoir.reprocess", source_pid = %job.source_pid);
        // Calling an `async fn` only builds its future; none of the body runs until it
        // is polled. Instrumenting that future directly therefore runs the whole body
        // inside `span`, and no `async move { ... .await }` wrapper is needed.
        self.run_reprocess_inner(job).instrument(span).await
    }

    /// Reprocesses the memories around a job's source memory.
    ///
    /// Steps, in order:
    /// 1. Decode the job payload into a `ReprocessRequest`.
    /// 2. Load the source memory. If the store reports `NotFound`, log
    ///    `memoir.reprocess.source_missing` and return `Ok(())` with no further work.
    /// 3. If the request carries feedback, store it as a new episodic memory and
    ///    enqueue an embed job for it.
    /// 4. Collect the neighborhood: similar episodic memories plus the source itself.
    /// 5. For each member:
    ///    - retire its active semantic memories with the request's reason (best effort);
    ///    - then re-extract the member.
    ///
    /// # Errors
    ///
    /// - `ReprocessError::Payload` for an undecodable payload.
    /// - Store, embedding, search, and enqueue failures are propagated with `?`.
    /// - Re-extraction failures are also propagated with `?`.
    /// - Retire failures in step 5 are only logged and do not abort the job.
    ///
    /// NOTE(review): the run is not atomic. Feedback is written before the neighborhood
    /// work, and members are processed one at a time, so an error partway through
    /// leaves the earlier effects in place. Whether a retry then duplicates the
    /// feedback row depends on the job runner's retry policy, which is not visible
    /// here. Confirm before relying on retries.
    async fn run_reprocess_inner(self: &Arc<Self>, job: Job) -> Result<(), ReprocessError> {
        let source_pid = job.source_pid.clone();
        let request = ReprocessRequest::from_payload(&job.payload)?;
        event!(
            name: "memoir.reprocess.started",
            Level::INFO,
            source_pid = %source_pid,
            reason = %request.reason,
            has_feedback = request.feedback.is_some(),
            "reprocess started for {{source_pid}} ({{reason}})",
        );

        let source = match self.store.recall(&source_pid).await {
            Ok(memory) => memory,
            Err(StoreError::NotFound(_)) => {
                event!(
                    name: "memoir.reprocess.source_missing",
                    Level::INFO,
                    source_pid = %source_pid,
                    "source memory absent for {{source_pid}} (cascade delete race); skipping",
                );
                return Ok(());
            }
            Err(err) => return Err(ReprocessError::Store(err)),
        };

        if let Some(feedback) = request.feedback.as_deref() {
            self.persist_feedback_row(&source.scope, feedback).await?;
        }

        let neighborhood = self
            .reprocess_neighborhood(
                &source,
                request.feedback.as_deref(),
                request.min_similarity,
                request.top_k,
            )
            .await?;
        event!(
            name: "memoir.reprocess.neighborhood",
            Level::INFO,
            source_pid = %source_pid,
            neighborhood_size = neighborhood.len(),
            "reprocessing {{neighborhood_size}} source(s)",
        );

        for member in &neighborhood {
            // Retire what was previously derived from this member before re-extracting,
            // so the old and new semantic rows are not both active afterwards.
            let derived = self.store.active_semantics_for_source(&member.pid).await?;
            for row in &derived {
                self.retire_and_evict_internal(&row.pid, request.reason).await;
            }
            self.re_extract_source(member, request.feedback.as_deref(), job.id).await?;
        }

        event!(
            name: "memoir.reprocess.done",
            Level::INFO,
            source_pid = %source_pid,
            "reprocess complete for {{source_pid}}",
        );
        Ok(())
    }

    /// Stores `feedback` as an episodic memory in `scope` and enqueues an embed job
    /// for the new row.
    ///
    /// The row is written with `{"origin": "feedback"}` metadata, maximum confidence,
    /// and no `source_pid`. The embed job carries the same metadata as its payload.
    ///
    /// # Errors
    ///
    /// - `ReprocessError::Store` if the write fails.
    /// - `ReprocessError::Enqueue` if the embed job cannot be enqueued. The row has
    ///   already been written at that point.
    async fn persist_feedback_row(self: &Arc<Self>, scope: &Scope, feedback: &str) -> Result<(), ReprocessError> {
        let written = self
            .store
            .remember(NewMemory {
                scope: scope.clone(),
                content: feedback.to_string(),
                metadata: serde_json::json!({ "origin": "feedback" }),
                kind: crate::memory::MemoryKind::Episodic,
                source_pid: None,
                event_at: None,
                confidence: crate::memory::Confidence::MAX,
            })
            .await?;
        self.jobs
            .enqueue(JobKind::Embed, written.pid, serde_json::json!({ "origin": "feedback" }))
            .await
            .map_err(|err: JobsError| ReprocessError::Enqueue(err.to_string()))?;
        Ok(())
    }

    /// Returns the episodic memories to reprocess around `source`.
    ///
    /// The search query is `feedback` when present, otherwise the source's own content.
    /// The vector search is scoped to `source.scope` and selects episodic memories only.
    /// It receives `top_k` as its hit limit and `min_similarity` as its floor.
    ///
    /// The source pid is always added when the search did not return it, so up to
    /// `top_k + 1` pids may be looked up.
    ///
    /// Rows come from `self.store.find_by_pids`. Their order, and what happens to
    /// pids with no row, are up to the store (not visible here).
    ///
    /// # Errors
    ///
    /// - `ReprocessError::Embedding` if embedding the query fails.
    /// - `ReprocessError::Search` if the index search fails.
    /// - `ReprocessError::Store` if loading the rows fails.
    async fn reprocess_neighborhood(
        self: &Arc<Self>,
        source: &Memory,
        feedback: Option<&str>,
        min_similarity: f32,
        top_k: usize,
    ) -> Result<Vec<Memory>, ReprocessError> {
        let query = feedback.unwrap_or(&source.content);
        let query_vector = self
            .embedder
            .embed(query)
            .await
            .map_err(|err| ReprocessError::Embedding(err.to_string()))?;
        let hits = self
            .index
            .search(
                source.scope.clone(),
                query_vector,
                top_k,
                KindSelector {
                    episodic: true,
                    semantic: false,
                },
                None,
                Some(min_similarity),
            )
            .await
            .map_err(|err| ReprocessError::Search(err.to_string()))?;
        let mut pids: Vec<String> = hits.into_iter().map(|(pid, _)| pid).collect();
        // The source is always reprocessed, even if it fell below the floor or
        // outside the top-k hits.
        if !pids.contains(&source.pid) {
            pids.push(source.pid.clone());
        }
        let pid_refs: Vec<&str> = pids.iter().map(String::as_str).collect();
        let rows = self.store.find_by_pids(&pid_refs).await?;
        Ok(rows)
    }

    /// Retires `pid` with `reason`, then evicts it from the vector index.
    ///
    /// Eviction failures are only logged (see `evict_after_retire`).
    ///
    /// # Errors
    ///
    /// Returns the store's retire error, converted into `ClientError` via `?`.
    /// Nothing is evicted in that case.
    pub(crate) async fn retire_and_evict(&self, pid: &str, reason: RetirementReason) -> Result<(), super::ClientError> {
        self.store.retire(pid, reason).await?;
        self.evict_after_retire(pid, reason).await;
        Ok(())
    }

    /// Best-effort variant of `retire_and_evict` used during reprocessing.
    ///
    /// A retire failure is logged as `memoir.reprocess.retire_failed` and the pid is
    /// left in the vector index; no error is returned.
    async fn retire_and_evict_internal(&self, pid: &str, reason: RetirementReason) {
        if let Err(err) = self.store.retire(pid, reason).await {
            event!(
                name: "memoir.reprocess.retire_failed",
                Level::WARN,
                pid = %pid,
                reason = %reason,
                error.message = %err,
                "retire failed for {{pid}} during reprocess: {{error.message}}; continuing",
            );
            return;
        }
        self.evict_after_retire(pid, reason).await;
    }

    /// Deletes `pid` from the vector index after it has been retired in the store.
    ///
    /// Never returns an error. On failure it logs `memoir.retire.index_delete_failed`,
    /// whose message says reconciliation will clean up the orphan. On success it logs
    /// `memoir.retire.success`.
    async fn evict_after_retire(&self, pid: &str, reason: RetirementReason) {
        if let Err(err) = self.index.delete_by_pids(&[pid]).await {
            event!(
                name: "memoir.retire.index_delete_failed",
                Level::WARN,
                pid = %pid,
                reason = %reason,
                error.message = %err,
                "vector evict failed for {{pid}} ({{reason}}): {{error.message}} — reconciliation will clean up the orphan",
            );
        } else {
            event!(
                name: "memoir.retire.success",
                Level::INFO,
                pid = %pid,
                reason = %reason,
                "retired {{pid}} as {{reason}}",
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes `payload`, panicking with the payload and error when decoding fails.
    fn parse(payload: &serde_json::Value) -> ReprocessRequest {
        match ReprocessRequest::from_payload(payload) {
            Ok(req) => req,
            Err(err) => panic!("payload {payload} should parse: {err}"),
        }
    }

    /// True when decoding `payload` fails specifically with a payload error.
    fn is_payload_error(payload: &serde_json::Value) -> bool {
        matches!(
            ReprocessRequest::from_payload(payload),
            Err(ReprocessError::Payload(_))
        )
    }

    #[test]
    fn should_parse_reason_from_payload() {
        let payload = serde_json::json!({ "reason": "rejected" });
        let req = parse(&payload);
        assert_eq!(req.reason, RetirementReason::Rejected);
        assert!(req.feedback.is_none());
    }

    #[test]
    fn should_default_floor_and_top_k_when_payload_omits_them() {
        let payload = serde_json::json!({ "reason": "stale" });
        let req = parse(&payload);
        assert_eq!(req.min_similarity, DEFAULT_NEIGHBORHOOD_FLOOR);
        assert_eq!(req.top_k, DEFAULT_NEIGHBORHOOD_TOP_K);
    }

    #[test]
    fn should_carry_feedback_and_overrides_from_payload() {
        let payload = serde_json::json!({
            "reason": "rejected",
            "feedback": "they actually love green",
            "min_similarity": 0.7,
            "top_k": 3,
        });
        let req = parse(&payload);
        assert_eq!(req.feedback.as_deref(), Some("they actually love green"));
        assert_eq!(req.min_similarity, 0.7);
        assert_eq!(req.top_k, 3);
    }

    #[test]
    fn should_treat_blank_feedback_as_none() {
        let payload = serde_json::json!({ "reason": "stale", "feedback": " " });
        assert!(parse(&payload).feedback.is_none());
    }

    #[test]
    fn should_keep_non_blank_feedback_verbatim() {
        let payload = serde_json::json!({ "reason": "stale", "feedback": " green " });
        assert_eq!(parse(&payload).feedback.as_deref(), Some(" green "));
    }

    #[test]
    fn should_treat_null_feedback_and_overrides_as_absent() {
        let payload = serde_json::json!({
            "reason": "stale",
            "feedback": null,
            "min_similarity": null,
            "top_k": null,
        });
        let req = parse(&payload);
        assert!(req.feedback.is_none());
        assert_eq!(req.min_similarity, DEFAULT_NEIGHBORHOOD_FLOOR);
        assert_eq!(req.top_k, DEFAULT_NEIGHBORHOOD_TOP_K);
    }

    #[test]
    fn should_accept_integer_min_similarity() {
        let payload = serde_json::json!({ "reason": "stale", "min_similarity": 1 });
        assert_eq!(parse(&payload).min_similarity, 1.0);
    }

    #[test]
    fn should_error_when_reason_missing() {
        let payload = serde_json::json!({ "feedback": "x" });
        assert!(is_payload_error(&payload));
    }

    #[test]
    fn should_error_when_reason_unparseable() {
        let payload = serde_json::json!({ "reason": "nonsense" });
        assert!(is_payload_error(&payload));
    }

    #[test]
    fn should_error_when_reason_not_a_string() {
        let payload = serde_json::json!({ "reason": 5 });
        assert!(is_payload_error(&payload));
    }
}