memoir_core/client/mod.rs
1//! High-level facade composing the embedder, store, and vector index.
2
3mod admin;
4mod categorize;
5mod edit;
6mod embed;
7mod error;
8mod extract;
9mod feedback;
10mod query;
11mod recall_as_of;
12mod reconcile;
13#[cfg(feature = "knowledge-graph")]
14mod relational;
15mod remember;
16mod reprocess;
17mod search;
18#[cfg(feature = "knowledge-graph")]
19mod synthesize;
20mod timeline;
21mod worker;
22
23#[cfg(feature = "knowledge-graph")]
24pub use admin::GraphInspectionBuilder;
25pub use admin::{ExtractionStatsBuilder, RetryBuilder};
26pub use edit::EditBuilder;
27pub use error::ClientError;
28pub use feedback::FeedbackBuilder;
29pub use query::{
30 BlendWeights, DEFAULT_HYBRID_ALPHA, DEFAULT_HYBRID_HALF_LIFE_DAYS, DEFAULT_QUERY_LIMIT, DecayFn, MemoryContext,
31 QueryBuilder, RankingStrategy,
32};
33pub use recall_as_of::RecallAsOfBuilder;
34pub use reconcile::{ReconcileBuilder, ReconcileSummary};
35pub use remember::{DEFAULT_SYSTEM_PROMPT, RememberBuilder};
36pub use search::{DEFAULT_LIMIT, SearchBuilder};
37pub use timeline::TimelineBuilder;
38pub use worker::{
39 DEFAULT_DRAIN_TIMEOUT, DEFAULT_LEASE_DURATION, DEFAULT_MAX_ATTEMPTS, DEFAULT_POLL_INTERVAL, WorkerBuilder,
40 WorkerHandle,
41};
42
43use std::sync::Arc;
44
45use bon::bon;
46use sea_orm::{ConnectOptions, Database};
47
48use crate::embedding::{EmbeddingModel, OnnxEmbedding};
49#[cfg(feature = "knowledge-graph")]
50use crate::graph::GraphStore;
51use crate::jobs::{MemoryJobsStore, PostgresJobsStore};
52use crate::llm::{LlmConfig, LlmRegistry, LlmRole};
53use crate::memory::{ForgetTarget, Memory, SupersessionEvent};
54use crate::store::{MemoryStore, PostgresStore};
55use crate::vector::{QdrantIndex, VectorIndex};
56
57/// Shared internal state held by [`Client`] behind an `Arc`.
58pub(crate) struct ClientInner {
59 pub(crate) embedder: Arc<OnnxEmbedding>,
60 pub(crate) store: PostgresStore,
61 pub(crate) index: QdrantIndex,
62 pub(crate) jobs: PostgresJobsStore,
63 pub(crate) llms: LlmRegistry,
64 /// Optional NLI classifier for the categorize stage (epic 0011).
65 ///
66 /// `None` when no classifier is configured — categorization is then a
67 /// no-op and the extract stage skips enqueuing categorize jobs. Behind
68 /// `Arc` because the classifier is `Send + Sync` and shared into the
69 /// `spawn_blocking` inference task.
70 pub(crate) nli: Option<Arc<crate::nli::NliClassifier>>,
71 pub(crate) schema: String,
72 pub(crate) system_prompt: Option<String>,
73 /// Optional FalkorDB knowledge-graph store
74 ///
75 /// `None` when no FalkorDB connection was supplied — graph features are then
76 /// absent and recall returns only vector hits. Only present with the
77 /// `knowledge-graph` feature; vector-only builds omit the field entirely.
78 /// Behind `Arc` so the per-job relational catalogs can share one connection
79 /// without reconnecting.
80 #[cfg(feature = "knowledge-graph")]
81 pub(crate) graph: Option<std::sync::Arc<crate::graph::FalkorGraphStore>>,
82
83 /// Staging store for relational triples awaiting the synthesis fan-in.
84 ///
85 /// Worker-internal handoff between `relational_extract` (which stages) and
86 /// `synthesize` (which reads, commits, and clears). Present whenever the
87 /// `knowledge-graph` feature is built; unused when no graph is configured.
88 #[cfg(feature = "knowledge-graph")]
89 pub(crate) triple_staging: crate::graph::TripleStaging,
90}
91
92/// High-level facade composing the embedder, store, and vector index.
93///
94/// Constructed via [`Client::builder`]. Cheap to clone — internally backed by
95/// `Arc` so multiple call sites can share one configured Memoir instance.
96#[derive(Clone)]
97pub struct Client {
98 pub(crate) inner: Arc<ClientInner>,
99}
100
101impl std::fmt::Debug for Client {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("Client")
104 .field("schema", &self.inner.schema)
105 .field("collection", &self.inner.index.collection_name())
106 .finish_non_exhaustive()
107 }
108}
109
110#[bon]
111impl Client {
112 /// Builds a [`Client`] from Postgres and Qdrant connection strings.
113 ///
114 /// memoir-core owns its own connection pool. The pool's `search_path` is
115 /// pinned to the configured schema so memoir-core's tables and
116 /// migration ledger never collide with the consumer's other Postgres
117 /// state. The consumer never sees a [`sea_orm::DatabaseConnection`] — this is a
118 /// deliberate boundary so the library can manage its own connection
119 /// lifecycle (search_path, pool sizing, future read-replica routing)
120 /// without each consumer reinventing the same plumbing.
121 ///
122 /// LLM providers are configured per [`LlmRole`] via the `extraction_llm`
123 /// and `contradiction_llm` setters. A role left unconfigured produces a
124 /// registry with no entry for that role; downstream call sites
125 /// (e.g. the extraction worker stage) skip gracefully when their
126 /// preferred role is absent.
127 ///
128 /// # Examples
129 ///
130 /// ```no_run
131 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
132 /// use memoir_core::client::Client;
133 /// use memoir_core::llm::LlmConfig;
134 ///
135 /// let client = Client::builder()
136 /// .database_url("postgres://postgres:postgres@localhost:54321/my_app")
137 /// .qdrant("http://localhost:6334")
138 /// .schema("memoir")
139 /// .extraction_llm(LlmConfig::ollama("http://localhost:11434", "llama3.2"))
140 /// .build()
141 /// .await?;
142 /// # Ok(())
143 /// # }
144 /// ```
145 ///
146 /// # Errors
147 ///
148 /// Returns [`ClientError::Database`] when the pool cannot connect,
149 /// [`ClientError::Embedding`] if the embedder fails to initialize,
150 /// [`ClientError::Vector`] if `ensure_collection` fails on Qdrant, and
151 /// [`ClientError::Llm`] if a configured provider can't be constructed
152 /// (e.g. malformed URL or api-key rejected by rig).
153 #[builder(start_fn = builder, finish_fn = build)]
154 pub async fn new(
155 #[builder(into)] database_url: String,
156 #[builder(into)] qdrant: String,
157 #[builder(into)] schema: Option<String>,
158 #[builder(into)] system_prompt: Option<String>,
159 #[builder(into)] collection: Option<String>,
160 extraction_llm: Option<LlmConfig>,
161 contradiction_llm: Option<LlmConfig>,
162 categorize_model: Option<crate::nli::NliConfig>,
163 #[cfg(feature = "knowledge-graph")]
164 #[builder(into)]
165 falkor: Option<String>,
166 #[cfg(feature = "knowledge-graph")]
167 #[builder(into)]
168 graph_name: Option<String>,
169 #[cfg(feature = "knowledge-graph")] relational_llm: Option<LlmConfig>,
170 ) -> Result<Client, ClientError> {
171 let schema = schema.unwrap_or_else(|| crate::migration::DEFAULT_SCHEMA.to_string());
172
173 // Pin the pool to memoir-core's schema. Every connection the pool
174 // hands out — including ones sea-orm-migration grabs for
175 // `Migrator::up` — resolves unqualified table names against this
176 // path. Listing `public` second lets shared extensions (pgcrypto,
177 // etc.) resolve.
178 let search_path = format!("{schema},public");
179 let options = ConnectOptions::new(database_url)
180 .set_schema_search_path(search_path)
181 .to_owned();
182 let db = Database::connect(options).await.map_err(ClientError::Database)?;
183
184 let embedder = OnnxEmbedding::new()?;
185 let store = PostgresStore::new(db.clone());
186 #[cfg(feature = "knowledge-graph")]
187 let triple_staging = crate::graph::TripleStaging::new(db.clone());
188 let jobs = PostgresJobsStore::new(db);
189 let index = match collection {
190 Some(name) => QdrantIndex::connect(qdrant)?.with_collection(name),
191 None => QdrantIndex::connect(qdrant)?,
192 };
193
194 index.ensure_collection(embedder.dimensions()).await?;
195
196 let mut llms = LlmRegistry::new();
197 if let Some(config) = extraction_llm {
198 llms.install(LlmRole::Extraction, config)?;
199 }
200 if let Some(config) = contradiction_llm {
201 llms.install(LlmRole::Contradiction, config)?;
202 }
203 #[cfg(feature = "knowledge-graph")]
204 if let Some(config) = relational_llm {
205 llms.install(LlmRole::Relational, config)?;
206 }
207
208 // Build the NLI classifier only when a model is configured — it
209 // downloads an ~87MB model on first construction, so consumers who
210 // don't want categorization shouldn't pay for it. `new()` is
211 // sync-blocking (HF download), so it runs on the blocking pool to
212 // avoid stalling the async runtime. Pass `NliConfig::default()` for
213 // the model memoir ships with.
214 let nli = if let Some(config) = categorize_model {
215 let classifier = tokio::task::spawn_blocking(move || crate::nli::NliClassifier::new(config))
216 .await
217 .map_err(|join_err| ClientError::Nli(format!("classifier init task panicked: {join_err}")))?
218 .map_err(|nli_err| ClientError::Nli(nli_err.to_string()))?;
219 Some(Arc::new(classifier))
220 } else {
221 None
222 };
223
224 // A `graph_name` with no `falkor` connection is a misconfiguration, not
225 // a silent fallback to vector-only.
226 #[cfg(feature = "knowledge-graph")]
227 let graph = match (falkor, graph_name) {
228 (Some(url), name) => {
229 let name = name.unwrap_or_else(|| crate::graph::DEFAULT_GRAPH_NAME.to_string());
230 let store = crate::graph::FalkorGraphStore::connect(url, name).await?;
231 crate::graph::GraphStore::ensure_graph(&store).await?;
232 Some(std::sync::Arc::new(store))
233 }
234 (None, Some(_)) => return Err(ClientError::GraphNotConfigured),
235 (None, None) => None,
236 };
237
238 Ok(Client {
239 inner: Arc::new(ClientInner {
240 embedder: Arc::new(embedder),
241 store,
242 index,
243 jobs,
244 llms,
245 nli,
246 schema,
247 system_prompt,
248 #[cfg(feature = "knowledge-graph")]
249 graph,
250 #[cfg(feature = "knowledge-graph")]
251 triple_staging,
252 }),
253 })
254 }
255}
256
257impl Client {
258 /// Applies memoir-core's migrations in the configured Postgres schema.
259 ///
260 /// Idempotent — safe to call on every startup. Creates the schema if
261 /// missing and applies all pending migrations.
262 ///
263 /// # Errors
264 ///
265 /// Returns [`ClientError::Migration`] if the schema name is invalid or
266 /// if any migration fails to apply.
267 pub async fn migrate(&self) -> Result<(), ClientError> {
268 crate::migration::bootstrap_and_migrate(self.inner.store.db(), &self.inner.schema).await?;
269 Ok(())
270 }
271
272 /// Returns the Postgres schema this client writes its tables into.
273 pub fn schema(&self) -> &str {
274 &self.inner.schema
275 }
276
277 /// Returns the configured system-prompt section, if any.
278 pub fn system_prompt(&self) -> Option<&str> {
279 self.inner.system_prompt.as_deref()
280 }
281
282 /// Returns the Qdrant collection name configured for vector storage.
283 pub fn collection_name(&self) -> &str {
284 self.inner.index.collection_name()
285 }
286
287 /// Returns the embedding model used by this client.
288 pub fn embedder(&self) -> &OnnxEmbedding {
289 &self.inner.embedder
290 }
291
292 /// Returns the source-of-truth store used by this client.
293 pub fn store(&self) -> &PostgresStore {
294 &self.inner.store
295 }
296
297 /// Returns the vector index used by this client.
298 pub fn index(&self) -> &QdrantIndex {
299 &self.inner.index
300 }
301
302 /// Returns the registry of LLM providers configured on this client.
303 pub fn llms(&self) -> &LlmRegistry {
304 &self.inner.llms
305 }
306
307 /// Writes `prompt` as an episodic memory under `scope`.
308 ///
309 /// Returns a per-call builder; await it to persist the row and enqueue
310 /// its embed (and, if extraction is configured, extract) job. The
311 /// returned [`Memory`] reflects the source-of-truth row; its vector
312 /// index entry is `pending` until the worker drains the embed job.
313 /// Use [`Client::search`] to retrieve memories — `remember` is
314 /// write-only.
315 ///
316 /// Attach optional JSON metadata via [`RememberBuilder::metadata`];
317 /// without it the column defaults to `{}`. Attach a parsed event-time
318 /// via [`RememberBuilder::event_at`] when the content references a
319 /// specific moment (e.g. "the deployment happened Friday"); without it,
320 /// the memory has no event-time and is excluded from event-time range
321 /// filters at search time.
322 ///
323 /// # Examples
324 ///
325 /// ```no_run
326 /// # use memoir_core::client::Client;
327 /// # use memoir_core::memory::Scope;
328 /// # async fn example(client: &Client, scope: Scope) -> Result<(), Box<dyn std::error::Error>> {
329 /// let written = client
330 /// .remember("hello", scope)
331 /// .metadata(serde_json::json!({ "source": "chat" }))
332 /// .await?;
333 /// println!("wrote pid={}", written.pid);
334 /// # Ok(())
335 /// # }
336 /// ```
337 ///
338 /// # Errors
339 ///
340 /// Returns [`ClientError::Store`] wrapping a database failure when the
341 /// row cannot be inserted, and [`ClientError::Jobs`] when the embed or
342 /// extract job cannot be enqueued.
343 pub fn remember(&self, prompt: impl Into<String>, scope: crate::memory::Scope) -> RememberBuilder<'_> {
344 RememberBuilder::new(self, prompt.into(), scope)
345 }
346
347 /// Mutates an existing memory in place — a correction, not a supersession.
348 ///
349 /// Returns a per-call builder; await it to apply the patch. Use this when
350 /// the original row was *wrong* and the caller is overwriting it (typo,
351 /// misheard utterance, wrong parsed date). When the original was *true at
352 /// the time* but new information now obsoletes it, use the contradiction
353 /// path that calls `MemoryStore::supersede` instead — `edit` discards the
354 /// old text, supersede preserves it.
355 ///
356 /// `created_at` is preserved; `updated_at` bumps automatically via the
357 /// database trigger. Content changes flip the row's vector-index state
358 /// back to `pending` and enqueue a re-embed job, so the row drops out of
359 /// search hits until the worker drains the queue — same lifecycle as a
360 /// fresh `remember()`. See [`EditBuilder`] for the builder methods.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`ClientError::Store`] wrapping
365 /// [`crate::store::StoreError::NotFound`] when no memory matches `pid`,
366 /// [`crate::store::StoreError::UnsupportedEdit`] when the target row's
367 /// kind does not support in-place edits (today: every non-Episodic
368 /// kind), [`ClientError::ReservedMetadataKey`] when metadata contains a
369 /// reserved payload key, and [`ClientError::Jobs`] when the re-embed
370 /// job cannot be enqueued.
371 pub fn edit(&self, pid: impl Into<String>) -> EditBuilder<'_> {
372 EditBuilder::new(self, pid.into())
373 }
374
375 /// Searches indexed memories in `scope` by vector similarity to `query`.
376 ///
377 /// Returns a per-call builder; await it to embed the query, run the
378 /// vector search, and assemble the matching [`Memory`] rows. The kind
379 /// toggles on the builder filter retrieval. See [`SearchBuilder`] for
380 /// builder methods.
381 ///
382 /// Only memories whose vector index entry has reached `indexed` are
383 /// eligible. Rows still in `pending` (recently written via
384 /// [`Client::remember`], not yet drained by the worker) are filtered
385 /// out — they can still be inspected by pid via [`Client::recall`].
386 ///
387 /// # Examples
388 ///
389 /// ```no_run
390 /// # use memoir_core::client::Client;
391 /// # use memoir_core::memory::Scope;
392 /// # async fn example(client: &Client, scope: Scope) -> Result<(), Box<dyn std::error::Error>> {
393 /// let memories = client.search("hello", scope).limit(5).await?;
394 /// for m in memories.list() {
395 /// println!("{}", m.content);
396 /// }
397 /// # Ok(())
398 /// # }
399 /// ```
400 ///
401 /// # Errors
402 ///
403 /// Returns [`ClientError::Embedding`] if the query cannot be embedded,
404 /// [`ClientError::Vector`] if the vector index search fails, and
405 /// [`ClientError::Store`] wrapping a database failure when the matched
406 /// pids cannot be hydrated to full rows.
407 pub fn search(&self, query: impl Into<String>, scope: crate::memory::Scope) -> SearchBuilder<'_> {
408 SearchBuilder::new(self, query.into(), scope)
409 }
410
411 /// Returns memories in `scope` ordered chronologically — the event log.
412 ///
413 /// Postgres-only read; no embedding, no Qdrant. Includes superseded rows
414 /// by default (this is the audit view). Default order is newest-first,
415 /// default limit is [`crate::store::DEFAULT_TIMELINE_LIMIT`]. See
416 /// [`TimelineBuilder`] for the builder methods.
417 ///
418 /// # Errors
419 ///
420 /// Returns [`ClientError::Store`] wrapping
421 /// [`crate::store::StoreError::InvalidScope`] when a `Scope` target has
422 /// empty fields, or wrapping
423 /// [`crate::store::StoreError::Database`] for database failures.
424 pub fn timeline(&self, scope: crate::memory::Scope) -> TimelineBuilder<'_> {
425 TimelineBuilder::new(self, scope)
426 }
427
428 /// Retrieves memories in `scope` ranked by hybrid cosine-and-recency, as
429 /// a prompt-shaped [`MemoryContext`].
430 ///
431 /// Mirrors [`Client::search`]'s candidate-retrieval primitives but
432 /// re-ranks the top-K candidates by combining vector similarity with
433 /// recency before returning. Default strategy is
434 /// [`RankingStrategy::default_hybrid`]; override via
435 /// [`QueryBuilder::ranking`]. **The default strategy's parameter values
436 /// are explicitly allowed to drift pre-1.0** — callers depending on a
437 /// specific ranking must pass an explicit `RankingStrategy::Hybrid {
438 /// .. }`.
439 ///
440 /// Returns a [`MemoryContext`] suitable for dropping into a system
441 /// prompt via [`Display`]. See [`MemoryContext`] for the rendering
442 /// shape and the staleness caveat for cached output.
443 ///
444 /// # Errors
445 ///
446 /// Returns [`ClientError::Embedding`] if the query cannot be embedded,
447 /// [`ClientError::Vector`] if the vector index search fails, and
448 /// [`ClientError::Store`] wrapping a database failure when the matched
449 /// pids cannot be hydrated to full rows.
450 ///
451 /// [`Display`]: std::fmt::Display
452 pub fn query(&self, query: impl Into<String>, scope: crate::memory::Scope) -> QueryBuilder<'_> {
453 QueryBuilder::new(self, query.into(), scope)
454 }
455
456 /// Returns memoir's state of knowledge in `scope` as of `as_of`.
457 ///
458 /// A memory is included when it was written on or before `as_of` and was
459 /// not yet superseded then. Pure Postgres read; no Qdrant, no embedder.
460 /// Newest-first by `created_at`, default limit
461 /// [`crate::store::DEFAULT_TIMELINE_LIMIT`]. See [`RecallAsOfBuilder`]
462 /// for the builder methods.
463 ///
464 /// # Errors
465 ///
466 /// Returns [`ClientError::Store`] wrapping
467 /// [`crate::store::StoreError::InvalidScope`] when a `Scope` target has
468 /// empty fields, or wrapping
469 /// [`crate::store::StoreError::Database`] for database failures.
470 pub fn recall_as_of(
471 &self,
472 scope: crate::memory::Scope,
473 as_of: impl Into<chrono::DateTime<chrono::FixedOffset>>,
474 ) -> RecallAsOfBuilder<'_> {
475 RecallAsOfBuilder::new(self, scope, as_of.into())
476 }
477
478 /// Looks up a single memory by its public id, at any lifecycle state.
479 ///
480 /// Returns the memory regardless of whether its vector index entry is
481 /// `pending`, `indexed`, or `failed` — callers using this for direct
482 /// lookups see the source-of-truth row from Postgres.
483 ///
484 /// No scope check is performed: any caller holding a `pid` can retrieve
485 /// the corresponding memory. The library treats its caller as the trust
486 /// boundary; service-mode callers (epic 0007) gate access via their own
487 /// auth layer.
488 ///
489 /// # Examples
490 ///
491 /// ```no_run
492 /// # use memoir_core::client::Client;
493 /// # async fn example(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
494 /// let memory = client.recall("AbCdEfGhIjKlMnOpQrStU").await?;
495 /// println!("{}", memory.content);
496 /// # Ok(())
497 /// # }
498 /// ```
499 ///
500 /// # Errors
501 ///
502 /// Returns [`ClientError::Store`] wrapping [`crate::store::StoreError::NotFound`]
503 /// when no memory matches `pid`, and [`ClientError::Store`] wrapping
504 /// [`crate::store::StoreError::Database`] for database failures.
505 pub async fn recall(&self, pid: &str) -> Result<Memory, ClientError> {
506 Ok(self.inner.store.recall(pid).await?)
507 }
508
509 /// Deletes one memory by pid, or every memory matching a scope tuple.
510 ///
511 /// The Postgres delete is authoritative — returned pids reflect what was
512 /// actually removed from the source of truth. The Qdrant delete is
513 /// best-effort: on failure the source-of-truth row is already gone and
514 /// the orphaned vector becomes the reconciliation sweep's problem
515 /// (ticket 0012). Failure does not propagate; it emits
516 /// `memoir.forget.index_delete_failed` at WARN.
517 ///
518 /// Returns the pids that were deleted. An empty vector means the target
519 /// matched no rows — not an error.
520 ///
521 /// # Examples
522 ///
523 /// ```no_run
524 /// # use memoir_core::client::Client;
525 /// # use memoir_core::memory::{ForgetTarget, Scope};
526 /// # async fn example(client: &Client, scope: Scope) -> Result<(), Box<dyn std::error::Error>> {
527 /// let deleted = client.forget(ForgetTarget::Pid("abc123".to_string())).await?;
528 /// let cleared = client.forget(ForgetTarget::Scope(scope)).await?;
529 /// # let _ = (deleted, cleared);
530 /// # Ok(())
531 /// # }
532 /// ```
533 ///
534 /// # Errors
535 ///
536 /// Returns [`ClientError::Store`] wrapping
537 /// [`crate::store::StoreError::InvalidScope`] when a `Scope` target has
538 /// any empty field, and [`ClientError::Store`] wrapping
539 /// [`crate::store::StoreError::Database`] for database failures.
540 pub async fn forget(&self, target: ForgetTarget) -> Result<Vec<String>, ClientError> {
541 // A scope target wipes the whole scoped subgraph below, independent of
542 // the returned pid list; keep the scope before `target` is moved.
543 #[cfg(feature = "knowledge-graph")]
544 let forget_scope = match &target {
545 ForgetTarget::Scope(scope) => Some(scope.clone()),
546 ForgetTarget::Pid(_) => None,
547 };
548
549 let deleted = self.inner.store.forget(target).await?;
550
551 if deleted.is_empty() {
552 return Ok(deleted);
553 }
554
555 let pid_refs: Vec<&str> = deleted.iter().map(String::as_str).collect();
556 if let Err(err) = self.inner.index.delete_by_pids(&pid_refs).await {
557 tracing::event!(
558 name: "memoir.forget.index_delete_failed",
559 tracing::Level::WARN,
560 pid_count = deleted.len(),
561 error.message = %err,
562 "vector delete failed for {{pid_count}} pid(s): {{error.message}} — reconciliation will clean up orphans",
563 );
564 } else {
565 tracing::event!(
566 name: "memoir.forget.success",
567 tracing::Level::INFO,
568 pid_count = deleted.len(),
569 "{{pid_count}} memories forgotten",
570 );
571 }
572
573 // Remove the forgotten memories from the graph, best-effort: a failure
574 // is logged, not surfaced (the source-of-truth rows are already gone and
575 // reconciliation is the backstop). A scope target wipes the subgraph; a
576 // pid target reference-counts each forgotten pid out of the graph.
577 #[cfg(feature = "knowledge-graph")]
578 if let Some(graph) = self.inner.graph.as_deref() {
579 let graph_result = match &forget_scope {
580 Some(scope) => graph.forget_scope(scope).await,
581 None => graph.forget_pids(&pid_refs).await,
582 };
583 if let Err(err) = graph_result {
584 tracing::event!(
585 name: "memoir.forget.graph_delete_failed",
586 tracing::Level::WARN,
587 pid_count = deleted.len(),
588 error.message = %err,
589 "graph delete failed for {{pid_count}} pid(s): {{error.message}} — reconciliation will clean up orphans",
590 );
591 }
592 }
593
594 Ok(deleted)
595 }
596
597 /// Rejects a memory: a wrong extraction the user corrected (epic 0011).
598 ///
599 /// Marks the row `retirement_reason = 'rejected'` and evicts its vector,
600 /// so it disappears from every read and can no longer pollute search or
601 /// reprocessing. The row is kept (not deleted) — it is the reprocess
602 /// "don't re-derive this" guard and counts toward the extraction-accuracy
603 /// metric. Rejection is the extraction-error case; for a source that
604 /// merely changed, use [`Self::mark_stale`].
605 ///
606 /// # Errors
607 ///
608 /// Returns [`ClientError::Store`] wrapping
609 /// [`crate::store::StoreError::NotFound`] when no memory matches `pid`,
610 /// or [`crate::store::StoreError::Database`] for database failures. A
611 /// vector-eviction failure is logged at WARN (reconciliation cleans the
612 /// orphan) and does not fail the call once the row is marked.
613 pub async fn reject(&self, pid: &str) -> Result<(), ClientError> {
614 self.retire(pid, crate::memory::RetirementReason::Rejected).await
615 }
616
617 /// Marks a memory stale: its episodic source changed (epic 0011).
618 ///
619 /// Marks the row `retirement_reason = 'stale'` and evicts its vector. Like
620 /// [`Self::reject`] the row is hidden everywhere and kept, but stale is
621 /// NOT an extraction error (the model was right; the source moved), so it
622 /// does not count against the accuracy metric.
623 ///
624 /// # Errors
625 ///
626 /// See [`Self::reject`].
627 pub async fn mark_stale(&self, pid: &str) -> Result<(), ClientError> {
628 self.retire(pid, crate::memory::RetirementReason::Stale).await
629 }
630
631 /// Corrects a wrong extraction by teaching, not editing (epic 0011).
632 ///
633 /// `pid` is the wrong *semantic* memory the user saw in recall. Awaiting
634 /// the returned builder enqueues a reprocess of that fact's episodic
635 /// source: the derived rows are retired as `rejected` and re-derived with
636 /// the correction in context, so a corrected fact replaces the wrong one.
637 /// The user never hand-writes a semantic row — semantic memory stays
638 /// always-derived. Fire-and-forget: returns once the job is enqueued.
639 ///
640 /// To correct the episodic record itself, use [`Self::edit`]; that is a
641 /// different correction (the source changed, not a wrong extraction). See
642 /// [`FeedbackBuilder`] for the builder methods.
643 ///
644 /// # Errors
645 ///
646 /// Returns [`ClientError::Store`] wrapping
647 /// [`crate::store::StoreError::NotFound`] when no memory matches `pid`,
648 /// [`ClientError::NotCorrectable`] when the target is not a semantic row
649 /// or has no episodic source, and [`ClientError::Jobs`] when the reprocess
650 /// job cannot be enqueued.
651 pub fn feedback(&self, pid: impl Into<String>) -> FeedbackBuilder<'_> {
652 FeedbackBuilder::new(self, pid.into())
653 }
654
655 /// Retires `pid` with `reason`: marks the column, then evicts the vector.
656 ///
657 /// Shared by [`Self::reject`] and [`Self::mark_stale`]. Mirrors
658 /// [`Self::forget`]'s store-then-index ordering and its
659 /// WARN-on-evict-failure resilience: the Postgres mark is the source of
660 /// truth, and a transient Qdrant failure leaves a searchable orphan that
661 /// reconciliation removes — it does not roll back the retirement.
662 async fn retire(&self, pid: &str, reason: crate::memory::RetirementReason) -> Result<(), ClientError> {
663 self.inner.retire_and_evict(pid, reason).await
664 }
665
666 /// Runs reconciliation: retries `failed` rows and cleans Qdrant orphans.
667 ///
668 /// Returns a per-call builder. Awaiting it runs the configured passes
669 /// and returns a [`ReconcileSummary`]. Both passes run by default;
670 /// narrow with [`ReconcileBuilder::only_retry_failed`] /
671 /// [`ReconcileBuilder::only_clean_orphans`]. Idempotent: running against
672 /// a clean store does nothing and exits cleanly.
673 ///
674 /// # Examples
675 ///
676 /// ```no_run
677 /// # use memoir_core::client::Client;
678 /// # async fn example(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
679 /// let summary = client.reconcile().await?;
680 /// let _ = summary;
681 /// # Ok(())
682 /// # }
683 /// ```
684 pub fn reconcile(&self) -> ReconcileBuilder<'_> {
685 ReconcileBuilder::new(self)
686 }
687
688 /// Configures the background queue worker; call `.start().await` to launch.
689 ///
690 /// Returns a per-call builder. The worker polls the `memory_jobs` queue,
691 /// dispatches each row to its stage handler (embed in ticket 0007,
692 /// extract in ticket 0006), and runs lease recovery when the queue is
693 /// idle. Cooperative shutdown via [`WorkerHandle::shutdown`].
694 ///
695 /// # Examples
696 ///
697 /// ```no_run
698 /// # use memoir_core::client::Client;
699 /// # async fn example(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
700 /// let worker = client.spawn_worker().start().await?;
701 /// // ... server runs ...
702 /// worker.shutdown().await;
703 /// # Ok(())
704 /// # }
705 /// ```
706 pub fn spawn_worker(&self) -> WorkerBuilder<'_> {
707 WorkerBuilder::new(self)
708 }
709
710 /// Lists failed jobs newest-first, capped at `limit`.
711 ///
712 /// Returns metadata only (id, kind, source pid, attempts, failure
713 /// reason, last update); content from the referenced memory is NOT
714 /// included. Operators who need to inspect the memory's content can
715 /// follow up with [`Self::recall`] against the source pid.
716 ///
717 /// # Errors
718 ///
719 /// Returns [`ClientError::Jobs`] wrapping any database failure.
720 pub async fn failed_jobs(&self, limit: usize) -> Result<Vec<crate::jobs::FailedJob>, ClientError> {
721 Ok(self.inner.jobs.list_failed(limit).await?)
722 }
723
724 /// Retries one failed job, clearing the attempt counter.
725 ///
726 /// The attempt counter is reset to zero on operator-initiated retry: a
727 /// human has decided prior failures shouldn't count against the new
728 /// attempt budget. Reconciliation-driven retries leave the counter
729 /// alone (see [`Self::reconcile`]).
730 ///
731 /// # Errors
732 ///
733 /// Returns [`ClientError::Jobs`] wrapping [`crate::jobs::JobsError::NotFound`]
734 /// when no failed job matches `id`, or wrapping a database failure.
735 pub async fn retry_job(&self, id: i64) -> Result<(), ClientError> {
736 self.inner.jobs.retry_job(id).await?;
737 tracing::event!(
738 name: "memoir.admin.retry_succeeded",
739 tracing::Level::INFO,
740 job_id = id,
741 "retried failed job {{job_id}}",
742 );
743 Ok(())
744 }
745
746 /// Configures a bulk retry. Awaiting the returned builder runs it.
747 ///
748 /// See [`RetryBuilder`] for filter and dry-run options. Returns the
749 /// number of affected (or for `dry_run`, would-affect) rows.
750 pub fn retry_failed_jobs(&self) -> RetryBuilder<'_> {
751 RetryBuilder::new(self)
752 }
753
754 /// Permanently deletes one failed job. The referenced memory is untouched.
755 ///
756 /// # Errors
757 ///
758 /// Returns [`ClientError::Jobs`] wrapping [`crate::jobs::JobsError::NotFound`]
759 /// when no failed job matches `id`, or wrapping a database failure.
760 pub async fn delete_failed_job(&self, id: i64) -> Result<(), ClientError> {
761 self.inner.jobs.delete_failed(id).await?;
762 tracing::event!(
763 name: "memoir.admin.delete_failed",
764 tracing::Level::INFO,
765 job_id = id,
766 "deleted failed job {{job_id}}",
767 );
768 Ok(())
769 }
770
771 /// Returns the number of jobs currently in `pending` state.
772 ///
773 /// Cheap observation for operators monitoring queue depth.
774 ///
775 /// # Errors
776 ///
777 /// Returns [`ClientError::Jobs`] wrapping any database failure.
778 pub async fn pending_jobs_count(&self) -> Result<u64, ClientError> {
779 Ok(self.inner.jobs.pending_count().await?)
780 }
781
782 /// Clears the supersession marker on `pid`, restoring it to active state.
783 ///
784 /// Admin-only counterpart to the internal supersede path. Use when an
785 /// operator decides a future contradiction-detection pass wrongly
786 /// marked a row as outdated. Idempotent at the SQL level for rows that
787 /// were already active, but still errors if no row matches `pid`.
788 ///
789 /// There is no symmetric public `Client::supersede`: supersession is a
790 /// decision made by the (forthcoming) detection engine against verified
791 /// contradicting facts, not by operator hand. Hand-rolled supersession
792 /// would defeat the audit trail the column is meant to preserve.
793 ///
794 /// # Errors
795 ///
796 /// Returns [`ClientError::Store`] wrapping
797 /// [`crate::store::StoreError::NotFound`] when no memory matches `pid`,
798 /// or wrapping a database failure.
799 pub async fn unsupersede(&self, pid: &str) -> Result<(), ClientError> {
800 self.inner.store.unsupersede(pid).await?;
801 tracing::event!(
802 name: "memoir.admin.unsuperseded",
803 tracing::Level::INFO,
804 pid = pid,
805 "unsuperseded memory {{pid}}",
806 );
807 Ok(())
808 }
809
810 /// Returns the full supersede/unsupersede event trail for `pid`.
811 ///
812 /// Each [`SupersessionEvent`] is one decision against the memory,
813 /// chronological (oldest first). An event with `winner_pid = None` is
814 /// an unsupersede. A pid with no events — never superseded, or simply
815 /// not present in the store — returns an empty vec, not an error.
816 ///
817 /// Surfaces the audit trail behind a row's current `Memory.supersession`
818 /// marker, for the supersession-audit UI and rig-service introspection
819 /// of contradiction-detection decisions over time.
820 ///
821 /// # Errors
822 ///
823 /// Returns [`ClientError::Store`] wrapping a database failure.
824 pub async fn supersession_history(&self, pid: &str) -> Result<Vec<SupersessionEvent>, ClientError> {
825 Ok(self.inner.store.supersession_history(pid).await?)
826 }
827
828 /// Lists the distinct agent ids with memories under `org_id` + `user_id`.
829 ///
830 /// Caller-scoped agent discovery: returns only the agents within the given
831 /// org and user, sorted ascending, so a tenant never sees another tenant's
832 /// agents. A scope with no memories yet returns an empty vec, not an error.
833 ///
834 /// # Errors
835 ///
836 /// Returns [`ClientError::Store`] wrapping a database failure.
837 pub async fn list_agents(&self, org_id: &str, user_id: &str) -> Result<Vec<String>, ClientError> {
838 Ok(self.inner.store.list_agent_ids(org_id, user_id).await?)
839 }
840
841 /// Computes extraction accuracy per `(provider, model)` over a scope slice.
842 ///
843 /// Returns an [`ExtractionStatsBuilder`]; its scope setters narrow the slice
844 /// before awaiting. A read-only aggregate proving extraction quality to a
845 /// consumer — `accuracy = 1 − rejected/total`, where `rejected` counts only
846 /// wrong extractions the user corrected (not `Stale` or superseded rows).
847 /// No LLM call. See [`ExtractionStatsBuilder`] for the builder methods.
848 pub fn extraction_stats(&self) -> ExtractionStatsBuilder<'_> {
849 ExtractionStatsBuilder::new(self)
850 }
851
852 /// Reads a whole-scope snapshot of the knowledge graph for admin inspection.
853 ///
854 /// Returns a [`GraphInspectionBuilder`]; its scope setters narrow the view
855 /// before awaiting, and `.limit(..)` caps the result. The admin "Knowledge
856 /// graph view" — every entity and relationship in scope, current and
857 /// superseded, with per-element provenance (`memory_pids`, timestamps). A
858 /// read-only graph traversal, no LLM call.
859 ///
860 /// Unlike the other scoped reads, an unset dimension widens *across* scopes:
861 /// the default (no setters) inspects every tenant. This is a privileged,
862 /// cross-scope operation — service-mode consumers gate it behind admin auth.
863 /// An unconfigured graph yields an empty snapshot, not an error.
864 #[cfg(feature = "knowledge-graph")]
865 pub fn inspect_graph(&self) -> GraphInspectionBuilder<'_> {
866 GraphInspectionBuilder::new(self)
867 }
868}
869
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm::LlmKind;

    /// Local Ollama endpoint shared by every registry fixture below.
    const OLLAMA_URL: &str = "http://localhost:11434";

    #[test]
    fn should_install_extraction_llm_into_empty_registry() {
        let mut registry = LlmRegistry::new();
        let config = LlmConfig::ollama(OLLAMA_URL, "llama3.2");
        registry.install(LlmRole::Extraction, config).unwrap();

        let provider = registry.get(LlmRole::Extraction).expect("extraction provider present");
        assert_eq!(provider.kind(), LlmKind::Ollama);
        assert_eq!(provider.model(), "llama3.2");
    }

    #[test]
    fn should_install_both_extraction_and_contradiction_llms_independently() {
        let mut registry = LlmRegistry::new();
        // Install in the same order as before: extraction first, then contradiction.
        for (role, model) in [
            (LlmRole::Extraction, "extraction-model"),
            (LlmRole::Contradiction, "contradiction-model"),
        ] {
            registry.install(role, LlmConfig::ollama(OLLAMA_URL, model)).unwrap();
        }

        // Each role must still resolve to the model it was installed with.
        let extraction = registry.get(LlmRole::Extraction).unwrap();
        assert_eq!(extraction.model(), "extraction-model");
        let contradiction = registry.get(LlmRole::Contradiction).unwrap();
        assert_eq!(contradiction.model(), "contradiction-model");
    }

    #[test]
    fn should_leave_registry_empty_when_no_llms_installed() {
        let registry = LlmRegistry::new();
        assert!(registry.is_empty());
        for role in [LlmRole::Extraction, LlmRole::Contradiction] {
            assert!(registry.get(role).is_none());
        }
    }
}