Skip to main content

cognee_cognify/
qualification.rs

1//! Python-parity `check_pipeline_run_qualification`.
2//!
3//! Reads the latest `pipeline_runs` row for `(dataset_id, pipeline_name)` and
4//! returns a verdict the caller acts on:
5//!
6//! - `Proceed` — no previous row, or the latest is `INITIATED` or `ERRORED`.
7//! - `AlreadyRunning(PipelineRun)` — latest is `STARTED`; caller should reject.
8//! - `AlreadyCompleted(PipelineRun)` — latest is `COMPLETED`; caller should
9//!   short-circuit without re-running.
10//!
11//! Source of truth: [`check_pipeline_run_qualification.py`][py]. Locked
12//! decision 3 ships this gate at the `cognify` and `memify` entry points;
13//! ingestion is intentionally excluded.
14//!
15//! [py]: https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
16
17use cognee_database::{DatabaseError, PipelineRun, PipelineRunRepository, PipelineRunStatus};
18use uuid::Uuid;
19
20/// Verdict from a qualification check.
21#[derive(Debug, Clone)]
22pub enum Qualification {
23    /// No previous run, or the latest is `INITIATED` or `ERRORED` — proceed.
24    Proceed,
25    /// Latest row is `STARTED` — reject; caller should return an error.
26    AlreadyRunning(PipelineRun),
27    /// Latest row is `COMPLETED` — short-circuit; caller should not re-run.
28    AlreadyCompleted(PipelineRun),
29}
30
31/// Rust mirror of Python's `check_pipeline_run_qualification`.
32///
33/// Reads the most recent `pipeline_runs` row for `(dataset_id,
34/// pipeline_name)` via [`PipelineRunRepository::get_pipeline_run_by_dataset`]
35/// (added in task 08-06) and maps it to a [`Qualification`].
36///
37/// `INITIATED` and `ERRORED` map to `Proceed` to match Python's behaviour
38/// (see [Python source][py]); only `STARTED` rejects and only `COMPLETED`
39/// short-circuits.
40///
41/// [py]: https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
42pub async fn check_pipeline_run_qualification(
43    repo: &dyn PipelineRunRepository,
44    dataset_id: Uuid,
45    pipeline_name: &str,
46) -> Result<Qualification, DatabaseError> {
47    let latest = repo
48        .get_pipeline_run_by_dataset(dataset_id, pipeline_name)
49        .await?;
50    Ok(match latest {
51        None => Qualification::Proceed,
52        Some(run) => match run.status {
53            PipelineRunStatus::Initiated | PipelineRunStatus::Errored => Qualification::Proceed,
54            PipelineRunStatus::Started => Qualification::AlreadyRunning(run),
55            PipelineRunStatus::Completed => Qualification::AlreadyCompleted(run),
56        },
57    })
58}