// cognee_database/pipelines/repository.rs
1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use uuid::Uuid;
6
7use crate::types::{DatabaseError, PipelineRun, PipelineRunStatus};
8
9/// Domain alias used in the trait signature.
10pub type PipelineRunRow = PipelineRun;
11/// Type alias for the database error used in this module.
12type DbError = DatabaseError;
13
14/// Row returned by [`PipelineRunRepository::list_recent_with_attribution`].
15///
16/// Joins `pipeline_runs ⨝ datasets ⨝ users` so the activity router can show
17/// "who/what/which dataset" attribution in one query (no N+1).
18#[derive(Debug, Clone)]
19pub struct PipelineRunWithAttributionRow {
20 pub id: Uuid,
21 pub created_at: DateTime<Utc>,
22 pub status: PipelineRunStatus,
23 pub pipeline_run_id: Uuid,
24 pub pipeline_name: String,
25 pub pipeline_id: Uuid,
26 pub dataset_id: Option<Uuid>,
27 pub dataset_name: Option<String>,
28 pub owner_id: Option<Uuid>,
29 pub owner_email: Option<String>,
30}
31
32/// Persistence abstraction for pipeline run status rows.
33///
34/// Each status transition writes a **new row** rather than updating in place,
35/// giving a full audit trail and matching Python's writing pattern.
36///
37/// Implementations must be `Send + Sync` so they can be stored behind an
38/// `Arc<dyn PipelineRunRepository>` and shared across async tasks.
39#[async_trait]
40pub trait PipelineRunRepository: Send + Sync {
41 /// Insert one row representing a status transition. Returns the new row's
42 /// primary key (`pipeline_runs.id`), which is a freshly generated UUIDv4.
43 async fn log_pipeline_run(
44 &self,
45 pipeline_run_id: Uuid,
46 pipeline_id: Uuid,
47 pipeline_name: &str,
48 dataset_id: Option<Uuid>,
49 status: PipelineRunStatus,
50 run_info: Option<serde_json::Value>,
51 ) -> Result<Uuid, DbError>;
52
53 /// Latest status per dataset for a given pipeline name.
54 ///
55 /// Returns a map from `dataset_id` to the most recent
56 /// `PipelineRunStatus` row for that dataset and pipeline name.
57 async fn latest_status(
58 &self,
59 dataset_ids: &[Uuid],
60 pipeline_name: &str,
61 ) -> Result<HashMap<Uuid, PipelineRunStatus>, DbError>;
62
63 /// Recent runs for the activity router, with optional dataset filter.
64 async fn list_recent(
65 &self,
66 dataset_id: Option<Uuid>,
67 limit: u32,
68 ) -> Result<Vec<PipelineRunRow>, DbError>;
69
70 /// Recent runs *with attribution* (dataset + owner). Powers
71 /// `GET /api/v1/activity/pipeline-runs`. Single SELECT joining
72 /// `pipeline_runs ⨝ datasets ⨝ users` (LEFT JOIN both ways so orphaned
73 /// runs whose dataset has been deleted still surface).
74 ///
75 /// Optional `dataset_id` narrows to a single dataset; `None` returns
76 /// rows across every dataset on the server.
77 ///
78 /// Default impl falls back to [`Self::list_recent`] without the join — used
79 /// only by mock implementations.
80 async fn list_recent_with_attribution(
81 &self,
82 dataset_id: Option<Uuid>,
83 limit: u32,
84 ) -> Result<Vec<PipelineRunWithAttributionRow>, DbError> {
85 let rows = self.list_recent(dataset_id, limit).await?;
86 Ok(rows
87 .into_iter()
88 .map(|r| PipelineRunWithAttributionRow {
89 id: r.id,
90 created_at: r.created_at,
91 status: r.status,
92 pipeline_run_id: r.pipeline_run_id,
93 pipeline_name: r.pipeline_name,
94 pipeline_id: r.pipeline_id,
95 dataset_id: r.dataset_id,
96 dataset_name: None,
97 owner_id: None,
98 owner_email: None,
99 })
100 .collect())
101 }
102
103 /// Restart-orphan reset: rewrite any row stuck in `INITIATED` / `STARTED`
104 /// without a more recent successor to `ERRORED` with the given `reason`.
105 ///
106 /// Returns the number of rows rewritten.
107 async fn reset_orphans(&self, reason: &str) -> Result<u64, DbError>;
108
109 /// Upsert a single payload field for a run. Concurrent calls with the
110 /// same `(run_id, key)` are last-write-wins per row; calls with different
111 /// keys do not contend.
112 async fn set_payload_field(
113 &self,
114 run_id: Uuid,
115 key: &str,
116 value: serde_json::Value,
117 ) -> Result<(), DbError>;
118
119 /// Read all payload fields for a run as a `serde_json::Map`. Returns an
120 /// empty map (not `None`) when the run has no payload events; returns
121 /// `Err` only on actual DB failures.
122 async fn get_payload(
123 &self,
124 run_id: Uuid,
125 ) -> Result<serde_json::Map<String, serde_json::Value>, DbError>;
126
127 /// Return the latest row for `pipeline_run_id` (ordered by `created_at DESC`).
128 ///
129 /// Multiple rows share the same `pipeline_run_id` per locked decision 12 —
130 /// Python intentionally reuses it across status transitions. This method
131 /// picks the most recent.
132 ///
133 /// Python parity: matches
134 /// [`get_pipeline_run.py`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/get_pipeline_run.py).
135 /// Python uses `session.scalar()` without an `ORDER BY` — the Rust port
136 /// adds an explicit `ORDER BY created_at DESC` which is a *stronger*
137 /// guarantee consistent with decision 12 ("latest by `created_at` defines
138 /// current state"). Intentional, not drift.
139 async fn get_pipeline_run(&self, pipeline_run_id: Uuid)
140 -> Result<Option<PipelineRun>, DbError>;
141
142 /// Return the latest run for `(dataset_id, pipeline_name)` by `created_at`.
143 ///
144 /// Python parity: matches
145 /// [`get_pipeline_run_by_dataset.py`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/get_pipeline_run_by_dataset.py).
146 async fn get_pipeline_run_by_dataset(
147 &self,
148 dataset_id: Uuid,
149 pipeline_name: &str,
150 ) -> Result<Option<PipelineRun>, DbError>;
151
152 /// Return one latest row per distinct `pipeline_name` that has runs for
153 /// `dataset_id`. Result order is unspecified.
154 ///
155 /// Supersedes the temporary `list_pipeline_names_for_dataset` helper that
156 /// task 08-05 introduced. Used by
157 /// `cognee_lib::api::pipeline_runs::reset_dataset_pipeline_run_status`
158 /// and the delete crate's prune flow to enumerate pipelines per dataset.
159 ///
160 /// Python parity: matches
161 /// [`get_pipeline_runs_by_dataset.py`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/get_pipeline_runs_by_dataset.py).
162 async fn get_pipeline_runs_by_dataset(
163 &self,
164 dataset_id: Uuid,
165 ) -> Result<Vec<PipelineRun>, DbError>;
166}