cognee_cognify/qualification.rs
1//! Python-parity `check_pipeline_run_qualification`.
2//!
3//! Reads the latest `pipeline_runs` row for `(dataset_id, pipeline_name)` and
4//! returns a verdict the caller acts on:
5//!
6//! - `Proceed` — no previous row, or the latest is `INITIATED` or `ERRORED`.
7//! - `AlreadyRunning(PipelineRun)` — latest is `STARTED`; caller should reject.
8//! - `AlreadyCompleted(PipelineRun)` — latest is `COMPLETED`; caller should
9//! short-circuit without re-running.
10//!
11//! Source of truth: [`check_pipeline_run_qualification.py`][py]. Locked
12//! decision 3 ships this gate at the `cognify` and `memify` entry points;
13//! ingestion is intentionally excluded.
14//!
15//! [py]: https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
16
17use cognee_database::{DatabaseError, PipelineRun, PipelineRunRepository, PipelineRunStatus};
18use uuid::Uuid;
19
20/// Verdict from a qualification check.
21#[derive(Debug, Clone)]
22pub enum Qualification {
23 /// No previous run, or the latest is `INITIATED` or `ERRORED` — proceed.
24 Proceed,
25 /// Latest row is `STARTED` — reject; caller should return an error.
26 AlreadyRunning(PipelineRun),
27 /// Latest row is `COMPLETED` — short-circuit; caller should not re-run.
28 AlreadyCompleted(PipelineRun),
29}
30
31/// Rust mirror of Python's `check_pipeline_run_qualification`.
32///
33/// Reads the most recent `pipeline_runs` row for `(dataset_id,
34/// pipeline_name)` via [`PipelineRunRepository::get_pipeline_run_by_dataset`]
35/// (added in task 08-06) and maps it to a [`Qualification`].
36///
37/// `INITIATED` and `ERRORED` map to `Proceed` to match Python's behaviour
38/// (see [Python source][py]); only `STARTED` rejects and only `COMPLETED`
39/// short-circuits.
40///
41/// [py]: https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
42pub async fn check_pipeline_run_qualification(
43 repo: &dyn PipelineRunRepository,
44 dataset_id: Uuid,
45 pipeline_name: &str,
46) -> Result<Qualification, DatabaseError> {
47 let latest = repo
48 .get_pipeline_run_by_dataset(dataset_id, pipeline_name)
49 .await?;
50 Ok(match latest {
51 None => Qualification::Proceed,
52 Some(run) => match run.status {
53 PipelineRunStatus::Initiated | PipelineRunStatus::Errored => Qualification::Proceed,
54 PipelineRunStatus::Started => Qualification::AlreadyRunning(run),
55 PipelineRunStatus::Completed => Qualification::AlreadyCompleted(run),
56 },
57 })
58}