use std::future::{Future, IntoFuture};
use std::pin::Pin;
use tracing::{Level, event};
#[cfg(feature = "knowledge-graph")]
use crate::graph::{GraphSnapshot, GraphStore};
use crate::jobs::{JobKind, MemoryJobsStore};
use crate::memory::{ExtractionStat, StatsFilter};
use crate::store::MemoryStore;
use super::{Client, ClientError};
#[must_use = "retry_failed_jobs() returns a builder that must be awaited"]
pub struct RetryBuilder<'a> {
client: &'a Client,
kind: Option<JobKind>,
dry_run: bool,
}
impl<'a> RetryBuilder<'a> {
pub(super) fn new(client: &'a Client) -> Self {
Self {
client,
kind: None,
dry_run: false,
}
}
pub fn of_kind(mut self, kind: JobKind) -> Self {
self.kind = Some(kind);
self
}
pub fn dry_run(mut self) -> Self {
self.dry_run = true;
self
}
}
impl<'a> IntoFuture for RetryBuilder<'a> {
type Output = Result<u64, ClientError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let Self { client, kind, dry_run } = self;
let affected = client.inner.jobs.bulk_retry(kind, dry_run).await?;
event!(
name: "memoir.admin.bulk_retry",
Level::INFO,
affected = affected,
dry_run = dry_run,
kind = kind.as_ref().map(|k| k.as_ref()).unwrap_or("any"),
"bulk retry affected={{affected}} dry_run={{dry_run}} kind={{kind}}",
);
Ok(affected)
})
}
}
#[must_use = "extraction_stats() returns a builder that must be awaited"]
pub struct ExtractionStatsBuilder<'a> {
client: &'a Client,
filter: StatsFilter,
}
impl<'a> ExtractionStatsBuilder<'a> {
pub(super) fn new(client: &'a Client) -> Self {
Self {
client,
filter: StatsFilter::default(),
}
}
pub fn agent(mut self, agent_id: impl Into<String>) -> Self {
self.filter.agent_id = Some(agent_id.into());
self
}
pub fn org(mut self, org_id: impl Into<String>) -> Self {
self.filter.org_id = Some(org_id.into());
self
}
pub fn user(mut self, user_id: impl Into<String>) -> Self {
self.filter.user_id = Some(user_id.into());
self
}
}
impl<'a> IntoFuture for ExtractionStatsBuilder<'a> {
type Output = Result<Vec<ExtractionStat>, ClientError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let Self { client, filter } = self;
Ok(client.inner.store.extraction_stats(filter).await?)
})
}
}
#[cfg(feature = "knowledge-graph")]
#[must_use = "inspect_graph() returns a builder that must be awaited"]
pub struct GraphInspectionBuilder<'a> {
client: &'a Client,
agent_id: Option<String>,
org_id: Option<String>,
user_id: Option<String>,
limit: usize,
}
#[cfg(feature = "knowledge-graph")]
impl<'a> GraphInspectionBuilder<'a> {
pub(super) fn new(client: &'a Client) -> Self {
Self {
client,
agent_id: None,
org_id: None,
user_id: None,
limit: crate::graph::DEFAULT_INSPECTION_LIMIT,
}
}
pub fn agent(mut self, agent_id: impl Into<String>) -> Self {
self.agent_id = Some(agent_id.into());
self
}
pub fn org(mut self, org_id: impl Into<String>) -> Self {
self.org_id = Some(org_id.into());
self
}
pub fn user(mut self, user_id: impl Into<String>) -> Self {
self.user_id = Some(user_id.into());
self
}
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
#[cfg(feature = "knowledge-graph")]
impl<'a> IntoFuture for GraphInspectionBuilder<'a> {
type Output = Result<GraphSnapshot, ClientError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let Self {
client,
agent_id,
org_id,
user_id,
limit,
} = self;
let Some(graph) = client.inner.graph.as_deref() else {
return Ok(GraphSnapshot::default());
};
let snapshot = graph
.inspect_scope(agent_id.as_deref(), org_id.as_deref(), user_id.as_deref(), limit)
.await?;
event!(
name: "memoir.admin.inspect_graph",
Level::INFO,
agent_id = agent_id.as_deref().unwrap_or("*"),
org_id = org_id.as_deref().unwrap_or("*"),
user_id = user_id.as_deref().unwrap_or("*"),
nodes = snapshot.nodes.len(),
edges = snapshot.edges.len(),
truncated = snapshot.truncated,
"inspected graph snapshot",
);
Ok(snapshot)
})
}
}