Skip to main content

agentics_persistence/repositories/
evaluation_jobs.rs

1use sqlx::PgPool;
2
3use crate::db;
4use crate::repositories::{
5    EvaluationJobRecord, MarkEvaluationStartedInput, PersistedEvaluationResult,
6    QueueEvaluationJobInput,
7};
8use agentics_config::WorkerAccelerators;
9use agentics_domain::models::evaluation::ScoringMode;
10use agentics_domain::models::ids::EvaluationJobId;
11use agentics_error::Result;
12
13#[derive(Debug, Clone, Copy)]
14pub struct EvaluationJobsRepository<'a> {
15    pub(super) pool: &'a PgPool,
16}
17
18impl EvaluationJobsRepository<'_> {
19    pub async fn claim_next(
20        &self,
21        worker_id: &str,
22        accelerators: WorkerAccelerators,
23    ) -> Result<Option<EvaluationJobRecord>> {
24        db::evaluation_jobs::claim_next_evaluation_job(self.pool, worker_id, accelerators).await
25    }
26
27    pub async fn refresh_claim(
28        &self,
29        job_id: &EvaluationJobId,
30        worker_id: &str,
31        attempt_count: i32,
32    ) -> Result<bool> {
33        db::evaluation_jobs::refresh_evaluation_job_claim(
34            self.pool,
35            job_id,
36            worker_id,
37            attempt_count,
38        )
39        .await
40    }
41
42    pub async fn runner_claim(
43        &self,
44        job_id: &EvaluationJobId,
45        stale_minutes: i32,
46    ) -> Result<Option<crate::repositories::RunnerJobClaimRecord>> {
47        db::evaluation_jobs::get_runner_job_claim(self.pool, job_id, stale_minutes).await
48    }
49
50    pub async fn requeue_for_capacity(
51        &self,
52        job_id: &EvaluationJobId,
53        worker_id: &str,
54        attempt_count: i32,
55        last_error: &str,
56    ) -> Result<bool> {
57        db::evaluation_jobs::requeue_running_evaluation_job_for_capacity(
58            self.pool,
59            job_id,
60            worker_id,
61            attempt_count,
62            last_error,
63        )
64        .await
65    }
66
67    pub async fn mark_ready(&self, job_id: &EvaluationJobId) -> Result<()> {
68        db::evaluation_jobs::mark_evaluation_job_ready(self.pool, job_id).await
69    }
70
71    pub async fn queue(&self, input: &QueueEvaluationJobInput) -> Result<EvaluationJobRecord> {
72        db::evaluation_jobs::queue_evaluation_job(self.pool, input).await
73    }
74
75    pub async fn count_active(&self, eval_type: ScoringMode) -> Result<i64> {
76        db::evaluation_jobs::count_active_evaluation_jobs(self.pool, eval_type).await
77    }
78
79    pub async fn mark_started(&self, input: &MarkEvaluationStartedInput) -> Result<bool> {
80        db::evaluations::mark_evaluation_started(self.pool, input).await
81    }
82
83    pub async fn mark_finished(&self, input: &PersistedEvaluationResult) -> Result<bool> {
84        db::evaluations::mark_evaluation_finished(self.pool, input).await
85    }
86}