Skip to main content

memoir_core/client/
admin.rs

//! Admin surface for inspecting and triaging the write-behind queue.
//!
//! These methods live directly on [`super::Client`] (top-level, not behind a
//! sub-handle) because v0.1 ships only a handful of them. If the admin surface
//! grows past ~5 methods, group them under a `Client::admin()` sub-handle
//! to keep the primary API uncluttered.
//!
//! ## Trust boundary
//!
//! memoir-core treats the caller as the trust boundary. These methods
//! perform privileged operations (mass retry, deletion) with no caller
//! identity check. Service-mode consumers (epic 0007) gate access via
//! their own auth layer before reaching these methods.
14
15use std::future::{Future, IntoFuture};
16use std::pin::Pin;
17
18use tracing::{Level, event};
19
20#[cfg(feature = "knowledge-graph")]
21use crate::graph::{GraphSnapshot, GraphStore};
22use crate::jobs::{JobKind, MemoryJobsStore};
23use crate::memory::{ExtractionStat, StatsFilter};
24use crate::store::MemoryStore;
25
26use super::{Client, ClientError};
27
28/// Per-call builder returned by [`Client::retry_failed_jobs`].
29///
30/// Awaiting the builder runs the bulk-retry operation against the configured
31/// filters. Returns the number of jobs that were affected (or, with
32/// `.dry_run(true)`, would have been affected).
33///
34/// # Examples
35///
36/// ```no_run
37/// # use memoir_core::client::Client;
38/// # use memoir_core::jobs::JobKind;
39/// # async fn example(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
40/// // Retry every failed extract job.
41/// let n = client.retry_failed_jobs().of_kind(JobKind::Extract).await?;
42/// println!("retried {n} extract jobs");
43///
44/// // Dry-run: preview the count without enqueueing anything.
45/// let n = client.retry_failed_jobs().dry_run().await?;
46/// println!("would retry {n} failed jobs");
47/// # Ok(())
48/// # }
49/// ```
50#[must_use = "retry_failed_jobs() returns a builder that must be awaited"]
51pub struct RetryBuilder<'a> {
52    client: &'a Client,
53    kind: Option<JobKind>,
54    dry_run: bool,
55}
56
57impl<'a> RetryBuilder<'a> {
58    pub(super) fn new(client: &'a Client) -> Self {
59        Self {
60            client,
61            kind: None,
62            dry_run: false,
63        }
64    }
65
66    /// Restricts the bulk retry to one job kind. Default: all kinds.
67    pub fn of_kind(mut self, kind: JobKind) -> Self {
68        self.kind = Some(kind);
69        self
70    }
71
72    /// Returns the affected count without modifying any rows.
73    ///
74    /// Useful for previewing how large a bulk retry will be before firing
75    /// it — a wide retry against many failed extract jobs can DoS the LLM.
76    pub fn dry_run(mut self) -> Self {
77        self.dry_run = true;
78        self
79    }
80}
81
82impl<'a> IntoFuture for RetryBuilder<'a> {
83    type Output = Result<u64, ClientError>;
84    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
85
86    fn into_future(self) -> Self::IntoFuture {
87        Box::pin(async move {
88            let Self { client, kind, dry_run } = self;
89
90            let affected = client.inner.jobs.bulk_retry(kind, dry_run).await?;
91
92            event!(
93                name: "memoir.admin.bulk_retry",
94                Level::INFO,
95                affected = affected,
96                dry_run = dry_run,
97                kind = kind.as_ref().map(|k| k.as_ref()).unwrap_or("any"),
98                "bulk retry affected={{affected}} dry_run={{dry_run}} kind={{kind}}",
99            );
100
101            Ok(affected)
102        })
103    }
104}
105
106/// Per-call builder returned by [`Client::extraction_stats`].
107///
108/// Awaiting the builder computes extraction accuracy per `(provider, model)`
109/// over the requested scope slice — a read-only Postgres aggregate, no LLM
110/// call. Each [`ExtractionStat`] row carries the total semantic rows produced
111/// and the subset retired as `rejected` (a wrong extraction the user
112/// corrected); [`ExtractionStat::accuracy`] is the derived ratio.
113///
114/// The scope setters narrow the slice and are AND-combined; an unset dimension
115/// imposes no constraint, so the default (no setters) aggregates the whole
116/// store. Use `.org(..)` for a per-tenant number, add `.agent(..)` / `.user(..)`
117/// to narrow further.
118///
119/// # Examples
120///
121/// ```no_run
122/// # use memoir_core::client::Client;
123/// # async fn example(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
124/// // Per-model accuracy for one org.
125/// for stat in client.extraction_stats().org("acme").await? {
126///     println!("{}/{}: {:.1}% over {} rows", stat.provider, stat.model, stat.accuracy() * 100.0, stat.total);
127/// }
128/// # Ok(())
129/// # }
130/// ```
131#[must_use = "extraction_stats() returns a builder that must be awaited"]
132pub struct ExtractionStatsBuilder<'a> {
133    client: &'a Client,
134    filter: StatsFilter,
135}
136
137impl<'a> ExtractionStatsBuilder<'a> {
138    pub(super) fn new(client: &'a Client) -> Self {
139        Self {
140            client,
141            filter: StatsFilter::default(),
142        }
143    }
144
145    /// Narrows the slice to one agent id. Default: all agents.
146    pub fn agent(mut self, agent_id: impl Into<String>) -> Self {
147        self.filter.agent_id = Some(agent_id.into());
148        self
149    }
150
151    /// Narrows the slice to one org id. Default: all orgs.
152    pub fn org(mut self, org_id: impl Into<String>) -> Self {
153        self.filter.org_id = Some(org_id.into());
154        self
155    }
156
157    /// Narrows the slice to one user id. Default: all users.
158    pub fn user(mut self, user_id: impl Into<String>) -> Self {
159        self.filter.user_id = Some(user_id.into());
160        self
161    }
162}
163
164impl<'a> IntoFuture for ExtractionStatsBuilder<'a> {
165    type Output = Result<Vec<ExtractionStat>, ClientError>;
166    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
167
168    fn into_future(self) -> Self::IntoFuture {
169        Box::pin(async move {
170            let Self { client, filter } = self;
171            Ok(client.inner.store.extraction_stats(filter).await?)
172        })
173    }
174}
175
/// Builder for a knowledge-graph snapshot, handed out by
/// [`Client::inspect_graph`].
///
/// Nothing runs until the builder is awaited. Awaiting it reads a whole-scope
/// snapshot of the knowledge graph — the admin "Knowledge graph view": every
/// entity and relationship, current and superseded, for an operator to inspect
/// or render. It is a read-only FalkorDB traversal with no LLM call.
///
/// Scope setters are AND-combined and each one narrows the view. A dimension
/// that is never set imposes no constraint, so a builder with no setters
/// inspects across every agent, user, and org. This is the one cross-scope read
/// in memoir — a privileged operation gated by the consumer's auth layer
/// (memoir-service requires admin); the write, forget, and enrichment paths
/// keep full-scope-tuple isolation.
///
/// `.limit(..)` caps both the nodes and the edges returned (default
/// [`DEFAULT_INSPECTION_LIMIT`](crate::graph::DEFAULT_INSPECTION_LIMIT), clamped
/// to [`MAX_INSPECTION_LIMIT`](crate::graph::MAX_INSPECTION_LIMIT)); the
/// snapshot's `truncated` flag marks when a cap was hit.
///
/// # Examples
///
/// ```no_run
/// # use memoir_core::client::Client;
/// # async fn example(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
/// let snapshot = client.inspect_graph().org("acme").limit(200).await?;
/// for edge in &snapshot.edges {
///     println!("{} -{}-> {} (valid_to: {:?})", edge.subject, edge.relation, edge.object, edge.valid_to);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "knowledge-graph")]
#[must_use = "inspect_graph() returns a builder that must be awaited"]
pub struct GraphInspectionBuilder<'a> {
    /// Client whose graph store (if any) is inspected.
    client: &'a Client,
    /// Agent filter; `None` means no constraint on agent.
    agent_id: Option<String>,
    /// Org filter; `None` means no constraint on org.
    org_id: Option<String>,
    /// User filter; `None` means no constraint on user.
    user_id: Option<String>,
    /// Requested cap on nodes and edges, passed to the graph store as given.
    limit: usize,
}
215
#[cfg(feature = "knowledge-graph")]
impl<'a> GraphInspectionBuilder<'a> {
    /// Creates a builder with no scope filters and the default inspection
    /// limit.
    pub(super) fn new(client: &'a Client) -> Self {
        Self {
            limit: crate::graph::DEFAULT_INSPECTION_LIMIT,
            user_id: None,
            org_id: None,
            agent_id: None,
            client,
        }
    }

    /// Restricts the view to a single agent id; unset means all agents.
    pub fn agent(self, agent_id: impl Into<String>) -> Self {
        Self {
            agent_id: Some(agent_id.into()),
            ..self
        }
    }

    /// Restricts the view to a single org id; unset means all orgs.
    pub fn org(self, org_id: impl Into<String>) -> Self {
        Self {
            org_id: Some(org_id.into()),
            ..self
        }
    }

    /// Restricts the view to a single user id; unset means all users.
    pub fn user(self, user_id: impl Into<String>) -> Self {
        Self {
            user_id: Some(user_id.into()),
            ..self
        }
    }

    /// Sets the cap on the nodes and edges returned.
    ///
    /// The value is stored exactly as given. NOTE(review): clamping to the
    /// inspection bounds is presumably applied downstream by the graph store —
    /// confirm.
    pub fn limit(self, limit: usize) -> Self {
        Self { limit, ..self }
    }
}
252
/// Makes [`GraphInspectionBuilder`] directly awaitable.
///
/// The resulting future resolves to an empty default snapshot when the client
/// has no graph store. Otherwise it asks the store for a snapshot of the
/// selected scope, emits a `memoir.admin.inspect_graph` INFO event, and
/// resolves to that snapshot. A store error is propagated via `?` before any
/// event is emitted.
#[cfg(feature = "knowledge-graph")]
impl<'a> IntoFuture for GraphInspectionBuilder<'a> {
    type Output = Result<GraphSnapshot, ClientError>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            // No graph store configured: answer with an empty snapshot rather
            // than an error.
            let graph = match self.client.inner.graph.as_deref() {
                Some(store) => store,
                None => return Ok(GraphSnapshot::default()),
            };

            // Borrow each filter once; the same views feed the query and the
            // log fields below.
            let agent = self.agent_id.as_deref();
            let org = self.org_id.as_deref();
            let user = self.user_id.as_deref();

            let snapshot = graph.inspect_scope(agent, org, user, self.limit).await?;

            // An unset filter is logged as "*".
            event!(
                name: "memoir.admin.inspect_graph",
                Level::INFO,
                agent_id = agent.unwrap_or("*"),
                org_id = org.unwrap_or("*"),
                user_id = user.unwrap_or("*"),
                nodes = snapshot.nodes.len(),
                edges = snapshot.edges.len(),
                truncated = snapshot.truncated,
                "inspected graph snapshot",
            );

            Ok(snapshot)
        })
    }
}
291}