use std::future::{Future, IntoFuture};
use std::pin::Pin;
use tracing::{Level, event};
use crate::jobs::{JobKind, MemoryJobsStore};
use crate::memory::{ExtractionStat, StatsFilter};
use crate::store::MemoryStore;
use super::{Client, ClientError};
#[must_use = "retry_failed_jobs() returns a builder that must be awaited"]
pub struct RetryBuilder<'a> {
client: &'a Client,
kind: Option<JobKind>,
dry_run: bool,
}
impl<'a> RetryBuilder<'a> {
pub(super) fn new(client: &'a Client) -> Self {
Self {
client,
kind: None,
dry_run: false,
}
}
pub fn of_kind(mut self, kind: JobKind) -> Self {
self.kind = Some(kind);
self
}
pub fn dry_run(mut self) -> Self {
self.dry_run = true;
self
}
}
impl<'a> IntoFuture for RetryBuilder<'a> {
type Output = Result<u64, ClientError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(execute(self))
}
}
async fn execute(builder: RetryBuilder<'_>) -> Result<u64, ClientError> {
let RetryBuilder { client, kind, dry_run } = builder;
let affected = client
.inner
.jobs
.bulk_retry(kind, dry_run)
.await?;
event!(
name: "memoir.admin.bulk_retry",
Level::INFO,
affected = affected,
dry_run = dry_run,
kind = kind.as_ref().map(|k| k.as_ref()).unwrap_or("any"),
"bulk retry affected={{affected}} dry_run={{dry_run}} kind={{kind}}",
);
Ok(affected)
}
#[must_use = "extraction_stats() returns a builder that must be awaited"]
pub struct ExtractionStatsBuilder<'a> {
client: &'a Client,
filter: StatsFilter,
}
impl<'a> ExtractionStatsBuilder<'a> {
pub(super) fn new(client: &'a Client) -> Self {
Self {
client,
filter: StatsFilter::default(),
}
}
pub fn agent(mut self, agent_id: impl Into<String>) -> Self {
self.filter.agent_id = Some(agent_id.into());
self
}
pub fn org(mut self, org_id: impl Into<String>) -> Self {
self.filter.org_id = Some(org_id.into());
self
}
pub fn user(mut self, user_id: impl Into<String>) -> Self {
self.filter.user_id = Some(user_id.into());
self
}
}
impl<'a> IntoFuture for ExtractionStatsBuilder<'a> {
type Output = Result<Vec<ExtractionStat>, ClientError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(execute_stats(self))
}
}
async fn execute_stats(builder: ExtractionStatsBuilder<'_>) -> Result<Vec<ExtractionStat>, ClientError> {
let ExtractionStatsBuilder { client, filter } = builder;
Ok(client.inner.store.extraction_stats(filter).await?)
}