1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//! Python-parity reset helpers for the `pipeline_runs` table.
//!
//! Exposes [`reset_pipeline_run_status`] (single pipeline) and
//! [`reset_dataset_pipeline_run_status`] (every pipeline registered against
//! a dataset). Both write a fresh `INITIATED` row through the
//! [`PipelineRunRepository`] (decision 11: single point of truth). The
//! dataset-level helper short-circuits when the latest row for a pipeline is
//! already `INITIATED`, matching Python's
//! [`reset_dataset_pipeline_run_status`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py).
//!
//! See [docs/telemetry/08/05-reset-helpers.md](../../../docs/telemetry/08/05-reset-helpers.md)
//! for the full design.
use Arc;
use Uuid;
use ;
use ;
use ApiError;
/// Insert a fresh `INITIATED` row for the `(user_id, dataset_id, pipeline_name)`
/// triple so a future re-cognify is not short-circuited by
/// `check_pipeline_run_qualification` (task 08-08).
///
/// `pipeline_id` and `pipeline_run_id` are derived deterministically using the
/// Python-parity helpers in [`cognee_core::pipeline_run_registry::ids`]:
///
/// - `pipeline_id = uuid5(OID, "{user_id}{pipeline_name}{dataset_id}")`
/// - `pipeline_run_id = uuid5(OID, "{pipeline_id}_{dataset_id}")`
///
/// `run_info` is the empty object `{}` (decision 5,
/// `crates/core/src/pipeline_run_registry/data_info.rs::run_info_for_initiated`).
///
/// Matches Python's
/// [`reset_pipeline_run_status`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/methods/reset_pipeline_run_status.py).
///
/// # Errors
///
/// Returns [`ApiError::InvalidArgument`] if the DB write fails — wraps the
/// underlying `DatabaseError` message verbatim so callers can surface it.
pub async
/// Walk every distinct `pipeline_name` that has at least one
/// `pipeline_runs` row for `dataset_id` and call
/// [`reset_pipeline_run_status`] for each, skipping ones whose latest
/// status is already `INITIATED`.
///
/// Matches Python's
/// [`reset_dataset_pipeline_run_status`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py).
///
/// Uses [`PipelineRunRepository::get_pipeline_runs_by_dataset`] (one latest
/// row per `pipeline_name`) to enumerate the work — see action item 08-06.
///
/// # Errors
///
/// Returns the first error from the underlying repository — the iteration
/// stops at the first failure.
pub async