/// Persistence abstraction for pipeline run status rows.
///
/// Each status transition writes a new row rather than updating in place,
/// which gives a full audit trail. Implementations must be `Send + Sync` so
/// they can be stored behind an `Arc<dyn PipelineRunRepository>` and shared
/// across async tasks.
///
/// NOTE(review): the `'life0` / `'async_trait` lifetimes and the
/// `Pin<Box<dyn Future<..> + Send>>` return types look like `#[async_trait]`
/// expansion output; presumably each method is written as an `async fn` in
/// the original source — confirm before editing signatures by hand.
pub trait PipelineRunRepository: Send + Sync {
// Required methods
/// Insert one row representing a status transition.
///
/// Resolves to the new row's primary key (`pipeline_runs.id`), documented
/// as a freshly generated UUIDv4 — distinct from the caller-supplied
/// `pipeline_run_id`, which is shared by every transition of the same run.
fn log_pipeline_run<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline_run_id: Uuid,
pipeline_id: Uuid,
pipeline_name: &'life1 str,
dataset_id: Option<Uuid>,
status: PipelineRunStatus,
run_info: Option<Value>,
) -> Pin<Box<dyn Future<Output = Result<Uuid, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
/// Latest status per dataset for a given pipeline name.
///
/// Resolves to a map keyed by dataset id whose value is the status of the
/// most recent row for that dataset and `pipeline_name`.
fn latest_status<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
dataset_ids: &'life1 [Uuid],
pipeline_name: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<HashMap<Uuid, PipelineRunStatus>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Self: 'async_trait;
/// Recent runs for the activity router.
///
/// `dataset_id` is an optional filter; `limit` presumably caps the number
/// of rows returned — confirm against the implementation.
fn list_recent<'life0, 'async_trait>(
&'life0 self,
dataset_id: Option<Uuid>,
limit: u32,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRun>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
/// Restart-orphan reset.
///
/// Rewrites any row stuck in INITIATED / STARTED without a more recent
/// successor to ERRORED, recording `reason`. Resolves to the number of
/// rows rewritten.
fn reset_orphans<'life0, 'life1, 'async_trait>(
&'life0 self,
reason: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<u64, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
/// Upsert a single payload field for a run.
///
/// Concurrent calls with the same `(run_id, key)` are last-write-wins per
/// row; calls with different keys do not contend.
fn set_payload_field<'life0, 'life1, 'async_trait>(
&'life0 self,
run_id: Uuid,
key: &'life1 str,
value: Value,
) -> Pin<Box<dyn Future<Output = Result<(), DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
/// Read all payload fields for a run as a `serde_json::Map`.
///
/// Resolves to an empty map when the run has no payload events; `Err` is
/// reserved for actual database failures.
fn get_payload<'life0, 'async_trait>(
&'life0 self,
run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Map<String, Value>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
/// Return the latest row for `pipeline_run_id`, ordered by `created_at`
/// descending.
///
/// Multiple rows share the same `pipeline_run_id` because it is reused
/// across status transitions; this method picks the most recent one. The
/// explicit ordering is documented as an intentional strengthening over
/// the Python counterpart (`get_pipeline_run.py`), not drift.
fn get_pipeline_run<'life0, 'async_trait>(
&'life0 self,
pipeline_run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Option<PipelineRun>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
/// Return the latest run for `(dataset_id, pipeline_name)` by `created_at`.
///
/// Python parity: matches `get_pipeline_run_by_dataset.py`.
fn get_pipeline_run_by_dataset<'life0, 'life1, 'async_trait>(
&'life0 self,
dataset_id: Uuid,
pipeline_name: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<PipelineRun>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
/// Return one latest row per distinct pipeline name that has runs for
/// `dataset_id`. Result order is unspecified.
///
/// Python parity: matches `get_pipeline_runs_by_dataset.py`.
fn get_pipeline_runs_by_dataset<'life0, 'async_trait>(
&'life0 self,
dataset_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRun>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
// Provided method
/// Recent runs with attribution (dataset + owner); powers
/// `GET /api/v1/activity/pipeline-runs`.
///
/// `dataset_id` narrows the result to a single dataset; `None` returns
/// rows across every dataset on the server. The default body is elided
/// here; it is documented as falling back to `Self::list_recent` without
/// the dataset/user join and as being used only by mock implementations.
fn list_recent_with_attribution<'life0, 'async_trait>(
&'life0 self,
dataset_id: Option<Uuid>,
limit: u32,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRunWithAttributionRow>, DatabaseError>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait { ... }
}Expand description
Persistence abstraction for pipeline run status rows.
Each status transition writes a new row rather than updating in place, giving a full audit trail and matching the write pattern of the Python implementation.
Implementations must be Send + Sync so they can be stored behind an
Arc<dyn PipelineRunRepository> and shared across async tasks.
Required Methods§
Sourcefn log_pipeline_run<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline_run_id: Uuid,
pipeline_id: Uuid,
pipeline_name: &'life1 str,
dataset_id: Option<Uuid>,
status: PipelineRunStatus,
run_info: Option<Value>,
) -> Pin<Box<dyn Future<Output = Result<Uuid, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn log_pipeline_run<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline_run_id: Uuid,
pipeline_id: Uuid,
pipeline_name: &'life1 str,
dataset_id: Option<Uuid>,
status: PipelineRunStatus,
run_info: Option<Value>,
) -> Pin<Box<dyn Future<Output = Result<Uuid, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Insert one row representing a status transition. Returns the new row’s
primary key (pipeline_runs.id), which is a freshly generated UUIDv4.
Sourcefn latest_status<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
dataset_ids: &'life1 [Uuid],
pipeline_name: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<HashMap<Uuid, PipelineRunStatus>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Self: 'async_trait,
fn latest_status<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
dataset_ids: &'life1 [Uuid],
pipeline_name: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<HashMap<Uuid, PipelineRunStatus>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Self: 'async_trait,
Latest status per dataset for a given pipeline name.
Returns a map from dataset_id to the PipelineRunStatus of the most recent
row for that dataset and pipeline name.
Sourcefn list_recent<'life0, 'async_trait>(
&'life0 self,
dataset_id: Option<Uuid>,
limit: u32,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn list_recent<'life0, 'async_trait>(
&'life0 self,
dataset_id: Option<Uuid>,
limit: u32,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Recent runs for the activity router, with optional dataset filter.
Sourcefn reset_orphans<'life0, 'life1, 'async_trait>(
&'life0 self,
reason: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<u64, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn reset_orphans<'life0, 'life1, 'async_trait>(
&'life0 self,
reason: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<u64, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Restart-orphan reset: rewrite any row stuck in INITIATED / STARTED
without a more recent successor to ERRORED with the given reason.
Returns the number of rows rewritten.
Sourcefn set_payload_field<'life0, 'life1, 'async_trait>(
&'life0 self,
run_id: Uuid,
key: &'life1 str,
value: Value,
) -> Pin<Box<dyn Future<Output = Result<(), DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn set_payload_field<'life0, 'life1, 'async_trait>(
&'life0 self,
run_id: Uuid,
key: &'life1 str,
value: Value,
) -> Pin<Box<dyn Future<Output = Result<(), DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Upsert a single payload field for a run. Concurrent calls with the
same (run_id, key) are last-write-wins per row; calls with different
keys do not contend.
Sourcefn get_payload<'life0, 'async_trait>(
&'life0 self,
run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Map<String, Value>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn get_payload<'life0, 'async_trait>(
&'life0 self,
run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Map<String, Value>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Read all payload fields for a run as a serde_json::Map. Returns an
empty map (not None) when the run has no payload events; returns
Err only on actual DB failures.
Sourcefn get_pipeline_run<'life0, 'async_trait>(
&'life0 self,
pipeline_run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Option<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn get_pipeline_run<'life0, 'async_trait>(
&'life0 self,
pipeline_run_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Option<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Return the latest row for pipeline_run_id (ordered by created_at DESC).
Multiple rows share the same pipeline_run_id per locked decision 12 —
Python intentionally reuses it across status transitions. This method
picks the most recent.
Python parity: matches
get_pipeline_run.py.
Python uses session.scalar() without an ORDER BY — the Rust port
adds an explicit ORDER BY created_at DESC which is a stronger
guarantee consistent with decision 12 (“latest by created_at defines
current state”). Intentional, not drift.
Sourcefn get_pipeline_run_by_dataset<'life0, 'life1, 'async_trait>(
&'life0 self,
dataset_id: Uuid,
pipeline_name: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn get_pipeline_run_by_dataset<'life0, 'life1, 'async_trait>(
&'life0 self,
dataset_id: Uuid,
pipeline_name: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Return the latest run for (dataset_id, pipeline_name) by created_at.
Python parity: matches
get_pipeline_run_by_dataset.py.
Sourcefn get_pipeline_runs_by_dataset<'life0, 'async_trait>(
&'life0 self,
dataset_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn get_pipeline_runs_by_dataset<'life0, 'async_trait>(
&'life0 self,
dataset_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRun>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Return one latest row per distinct pipeline_name that has runs for
dataset_id. Result order is unspecified.
Supersedes the temporary list_pipeline_names_for_dataset helper that
task 08-05 introduced. Used by
cognee_lib::api::pipeline_runs::reset_dataset_pipeline_run_status
and the delete crate’s prune flow to enumerate pipelines per dataset.
Python parity: matches
get_pipeline_runs_by_dataset.py.
Provided Methods§
Sourcefn list_recent_with_attribution<'life0, 'async_trait>(
&'life0 self,
dataset_id: Option<Uuid>,
limit: u32,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRunWithAttributionRow>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn list_recent_with_attribution<'life0, 'async_trait>(
&'life0 self,
dataset_id: Option<Uuid>,
limit: u32,
) -> Pin<Box<dyn Future<Output = Result<Vec<PipelineRunWithAttributionRow>, DatabaseError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Recent runs with attribution (dataset + owner). Powers
GET /api/v1/activity/pipeline-runs. Single SELECT joining
pipeline_runs ⨝ datasets ⨝ users (both joins are LEFT JOINs, so orphaned
runs whose dataset has been deleted still surface).
Optional dataset_id narrows to a single dataset; None returns
rows across every dataset on the server.
Default impl falls back to Self::list_recent without the join — used
only by mock implementations.
Dyn Compatibility§
This trait is dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".