//! Pipeline-run status persistence (`cognee_database/pipelines/repository.rs`).

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use uuid::Uuid;
6
7use crate::types::{DatabaseError, PipelineRun, PipelineRunStatus};
8
9/// Domain alias used in the trait signature.
10pub type PipelineRunRow = PipelineRun;
11/// Type alias for the database error used in this module.
12type DbError = DatabaseError;
13
14/// Row returned by [`PipelineRunRepository::list_recent_with_attribution`].
15///
16/// Joins `pipeline_runs ⨝ datasets ⨝ users` so the activity router can show
17/// "who/what/which dataset" attribution in one query (no N+1).
18#[derive(Debug, Clone)]
19pub struct PipelineRunWithAttributionRow {
20    pub id: Uuid,
21    pub created_at: DateTime<Utc>,
22    pub status: PipelineRunStatus,
23    pub pipeline_run_id: Uuid,
24    pub pipeline_name: String,
25    pub pipeline_id: Uuid,
26    pub dataset_id: Option<Uuid>,
27    pub dataset_name: Option<String>,
28    pub owner_id: Option<Uuid>,
29    pub owner_email: Option<String>,
30}
31
32/// Persistence abstraction for pipeline run status rows.
33///
34/// Each status transition writes a **new row** rather than updating in place,
35/// giving a full audit trail and matching Python's writing pattern.
36///
37/// Implementations must be `Send + Sync` so they can be stored behind an
38/// `Arc<dyn PipelineRunRepository>` and shared across async tasks.
39#[async_trait]
40pub trait PipelineRunRepository: Send + Sync {
41    /// Insert one row representing a status transition. Returns the new row's
42    /// primary key (`pipeline_runs.id`), which is a freshly generated UUIDv4.
43    async fn log_pipeline_run(
44        &self,
45        pipeline_run_id: Uuid,
46        pipeline_id: Uuid,
47        pipeline_name: &str,
48        dataset_id: Option<Uuid>,
49        status: PipelineRunStatus,
50        run_info: Option<serde_json::Value>,
51    ) -> Result<Uuid, DbError>;
52
53    /// Latest status per dataset for a given pipeline name.
54    ///
55    /// Returns a map from `dataset_id` to the most recent
56    /// `PipelineRunStatus` row for that dataset and pipeline name.
57    async fn latest_status(
58        &self,
59        dataset_ids: &[Uuid],
60        pipeline_name: &str,
61    ) -> Result<HashMap<Uuid, PipelineRunStatus>, DbError>;
62
63    /// Recent runs for the activity router, with optional dataset filter.
64    async fn list_recent(
65        &self,
66        dataset_id: Option<Uuid>,
67        limit: u32,
68    ) -> Result<Vec<PipelineRunRow>, DbError>;
69
70    /// Recent runs *with attribution* (dataset + owner). Powers
71    /// `GET /api/v1/activity/pipeline-runs`. Single SELECT joining
72    /// `pipeline_runs ⨝ datasets ⨝ users` (LEFT JOIN both ways so orphaned
73    /// runs whose dataset has been deleted still surface).
74    ///
75    /// Optional `dataset_id` narrows to a single dataset; `None` returns
76    /// rows across every dataset on the server.
77    ///
78    /// Default impl falls back to [`Self::list_recent`] without the join — used
79    /// only by mock implementations.
80    async fn list_recent_with_attribution(
81        &self,
82        dataset_id: Option<Uuid>,
83        limit: u32,
84    ) -> Result<Vec<PipelineRunWithAttributionRow>, DbError> {
85        let rows = self.list_recent(dataset_id, limit).await?;
86        Ok(rows
87            .into_iter()
88            .map(|r| PipelineRunWithAttributionRow {
89                id: r.id,
90                created_at: r.created_at,
91                status: r.status,
92                pipeline_run_id: r.pipeline_run_id,
93                pipeline_name: r.pipeline_name,
94                pipeline_id: r.pipeline_id,
95                dataset_id: r.dataset_id,
96                dataset_name: None,
97                owner_id: None,
98                owner_email: None,
99            })
100            .collect())
101    }
102
103    /// Restart-orphan reset: rewrite any row stuck in `INITIATED` / `STARTED`
104    /// without a more recent successor to `ERRORED` with the given `reason`.
105    ///
106    /// Returns the number of rows rewritten.
107    async fn reset_orphans(&self, reason: &str) -> Result<u64, DbError>;
108
109    /// Upsert a single payload field for a run. Concurrent calls with the
110    /// same `(run_id, key)` are last-write-wins per row; calls with different
111    /// keys do not contend.
112    async fn set_payload_field(
113        &self,
114        run_id: Uuid,
115        key: &str,
116        value: serde_json::Value,
117    ) -> Result<(), DbError>;
118
119    /// Read all payload fields for a run as a `serde_json::Map`. Returns an
120    /// empty map (not `None`) when the run has no payload events; returns
121    /// `Err` only on actual DB failures.
122    async fn get_payload(
123        &self,
124        run_id: Uuid,
125    ) -> Result<serde_json::Map<String, serde_json::Value>, DbError>;
126
127    /// Return the latest row for `pipeline_run_id` (ordered by `created_at DESC`).
128    ///
129    /// Multiple rows share the same `pipeline_run_id` per locked decision 12 —
130    /// Python intentionally reuses it across status transitions. This method
131    /// picks the most recent.
132    ///
133    /// Python parity: matches
134    /// [`get_pipeline_run.py`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/get_pipeline_run.py).
135    /// Python uses `session.scalar()` without an `ORDER BY` — the Rust port
136    /// adds an explicit `ORDER BY created_at DESC` which is a *stronger*
137    /// guarantee consistent with decision 12 ("latest by `created_at` defines
138    /// current state"). Intentional, not drift.
139    async fn get_pipeline_run(&self, pipeline_run_id: Uuid)
140    -> Result<Option<PipelineRun>, DbError>;
141
142    /// Return the latest run for `(dataset_id, pipeline_name)` by `created_at`.
143    ///
144    /// Python parity: matches
145    /// [`get_pipeline_run_by_dataset.py`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/get_pipeline_run_by_dataset.py).
146    async fn get_pipeline_run_by_dataset(
147        &self,
148        dataset_id: Uuid,
149        pipeline_name: &str,
150    ) -> Result<Option<PipelineRun>, DbError>;
151
152    /// Return one latest row per distinct `pipeline_name` that has runs for
153    /// `dataset_id`. Result order is unspecified.
154    ///
155    /// Supersedes the temporary `list_pipeline_names_for_dataset` helper that
156    /// task 08-05 introduced. Used by
157    /// `cognee_lib::api::pipeline_runs::reset_dataset_pipeline_run_status`
158    /// and the delete crate's prune flow to enumerate pipelines per dataset.
159    ///
160    /// Python parity: matches
161    /// [`get_pipeline_runs_by_dataset.py`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/get_pipeline_runs_by_dataset.py).
162    async fn get_pipeline_runs_by_dataset(
163        &self,
164        dataset_id: Uuid,
165    ) -> Result<Vec<PipelineRun>, DbError>;
166}