cognee_core/provenance.rs
1//! Provenance stamping for DataPoints emitted by pipeline tasks.
2//!
3//! This module is **not** the same thing as
4//! [`crate::exec_status::ExecStatusManager::stamp_provenance`]. That
5//! trait method is an audit-log hook keyed on `data_id` (the input
6//! item's content hash) and never mutates DataPoint fields. The
7//! function in *this* module mutates DataPoint fields (the five
8//! `source_*` columns) and is called from the pipeline executor after
9//! every successful task. Both run during a normal pipeline run; they
10//! address different concerns (one writes a per-data-id row in the
11//! relational DB, the other writes onto the DataPoints flowing through
12//! the executor).
13//!
14//! See `docs/telemetry/05-datapoint-provenance.md` for the locked
15//! design decisions backing this module — in particular:
16//!
17//! - Decision 1: walk via the [`HasDataPoint`] trait, not serde-JSON
18//! reflection.
19//! - Decision 2: visited-set keyed on `DataPoint.id: Uuid`, not
20//! pointer identity.
21//! - Decision 3: the name collision with
22//! `ExecStatusManager::stamp_provenance` is intentional and locked
23//! — neither symbol is renamed.
24
25use std::collections::HashSet;
26
27use cognee_models::DataPoint;
28use uuid::Uuid;
29
30use crate::task::Value;
31
32// `HasDataPoint` is declared in `cognee-models` (next to its primary
33// impls) and re-exported here so the historical public path
34// `cognee_core::provenance::HasDataPoint` (and `cognee_core::HasDataPoint`
35// via `crates/core/src/lib.rs`) keeps resolving. Placement decision is
36// recorded in `docs/telemetry/05/04-has-datapoint-impls.md` §4.1.
37pub use cognee_models::HasDataPoint;
38
/// What the call site of [`stamp_tree`] knows about the current task.
///
/// Every field is a borrow, so the executor can build a fresh context
/// per task without cloning strings on the hot path. The type is
/// `Copy`, so it can be passed and duplicated freely, and `Debug`, so
/// it can be logged or inspected in assertions.
#[derive(Clone, Copy, Debug)]
pub struct ProvenanceContext<'a> {
    /// Pipeline name, e.g. `"cognify_pipeline"`.
    pub pipeline_name: &'a str,
    /// Task name, e.g. `"extract_graph_from_data"`.
    pub task_name: &'a str,
    /// Resolved user label (`email` or `id` fallback). `None` if the
    /// pipeline has no user attached.
    pub user_label: Option<&'a str>,
    /// Default `node_set` inherited by stamped DPs whose own
    /// `source_node_set` is `None`.
    pub node_set: Option<&'a str>,
    /// Default `content_hash` inherited by stamped DPs whose own
    /// `source_content_hash` is `None`.
    pub content_hash: Option<&'a str>,
}
59
60/// Stamp a tree of [`HasDataPoint`] values in place.
61///
62/// Mirrors Python's `_stamp_provenance` in
63/// `cognee/modules/pipelines/operations/run_tasks_base.py`:
64///
65/// - **Idempotent**: every assignment is guarded by
66/// `if dp.source_X.is_none()`, so a downstream task never overwrites
67/// an upstream stamp.
68/// - **Visited-set**: keyed on `DataPoint.id: Uuid` (locked decision
69/// 2). Re-entering the same DataPoint (by UUID) is a no-op.
70/// - **Inheritance**: `node_set` and `content_hash` inherit from the
71/// parent context if absent on the DP, but a value already present
72/// on the DP overrides for further recursion.
73pub fn stamp_tree(
74 root: &mut dyn HasDataPoint,
75 ctx: &ProvenanceContext<'_>,
76 visited: &mut HashSet<Uuid>,
77) {
78 let dp_id = root.data_point().id;
79 if !visited.insert(dp_id) {
80 return;
81 }
82
83 {
84 let dp = root.data_point_mut();
85 if dp.source_pipeline.is_none() {
86 dp.source_pipeline = Some(ctx.pipeline_name.to_string());
87 }
88 if dp.source_task.is_none() {
89 dp.source_task = Some(ctx.task_name.to_string());
90 }
91 if dp.source_user.is_none()
92 && let Some(u) = ctx.user_label
93 {
94 dp.source_user = Some(u.to_string());
95 }
96 }
97
98 // Compute the inherited values once before recursing. A DP that
99 // already carries node_set / content_hash exposes its own value to
100 // children; otherwise the parent context's value flows down. The
101 // temporary String allocations are only consumed when there are
102 // children to recurse into; for leaf DPs `for_each_child_mut`'s
103 // default impl is a no-op and the strings are dropped immediately.
104 let current_node_set = match root.data_point().source_node_set.as_deref() {
105 Some(v) => Some(v.to_string()),
106 None => ctx.node_set.map(|s| s.to_string()),
107 };
108 if root.data_point().source_node_set.is_none()
109 && let Some(v) = ctx.node_set
110 {
111 root.data_point_mut().source_node_set = Some(v.to_string());
112 }
113
114 let current_hash = match root.data_point().source_content_hash.as_deref() {
115 Some(v) => Some(v.to_string()),
116 None => ctx.content_hash.map(|s| s.to_string()),
117 };
118 if root.data_point().source_content_hash.is_none()
119 && let Some(v) = ctx.content_hash
120 {
121 root.data_point_mut().source_content_hash = Some(v.to_string());
122 }
123
124 let child_ctx = ProvenanceContext {
125 pipeline_name: ctx.pipeline_name,
126 task_name: ctx.task_name,
127 user_label: ctx.user_label,
128 node_set: current_node_set.as_deref(),
129 content_hash: current_hash.as_deref(),
130 };
131
132 root.for_each_child_mut(&mut |child| {
133 stamp_tree(child, &child_ctx, visited);
134 });
135}
136
137/// Walk a type-erased [`Value`] looking for the first non-empty
138/// `source_node_set` on an embedded `DataPoint`. Mirrors Python's
139/// `_extract_node_set`.
140///
141/// The downcast list below is the canonical set of `HasDataPoint`
142/// container types that the executor recognises. Types not in the
143/// list are passed through (return `None`); add them here in lockstep
144/// with new `HasDataPoint` impls in `cognee-models` (gap 05-04).
145pub fn extract_node_set_from_value(value: &dyn Value) -> Option<String> {
146 downcast_to_datapoint(value).and_then(|dp| dp.source_node_set.clone())
147}
148
149/// Walk a type-erased [`Value`] looking for the first non-empty
150/// `Data.content_hash` (raw ingestion artefact) **or**
151/// `DataPoint.source_content_hash`. Mirrors Python's
152/// `_extract_content_hash`.
153///
154/// The raw `Data` artefact takes priority over a stamped DataPoint —
155/// it is the lineage anchor closest to the ingestion source.
156pub fn extract_content_hash_from_value(value: &dyn Value) -> Option<String> {
157 use cognee_models::Data;
158
159 if let Some(d) = value.as_any().downcast_ref::<Data>()
160 && !d.content_hash.is_empty()
161 {
162 return Some(d.content_hash.clone());
163 }
164
165 downcast_to_datapoint(value).and_then(|dp| dp.source_content_hash.clone())
166}
167
168/// Internal helper: `value` → optional borrow of its embedded
169/// `DataPoint`. Keeps a single registered list of
170/// `HasDataPoint`-bearing container types, used by both
171/// [`extract_node_set_from_value`] and
172/// [`extract_content_hash_from_value`].
173fn downcast_to_datapoint(value: &dyn Value) -> Option<&DataPoint> {
174 use cognee_models::{Document, DocumentChunk, EdgeType, Entity, EntityType};
175
176 if let Some(d) = value.as_any().downcast_ref::<Document>() {
177 return Some(&d.base);
178 }
179 if let Some(d) = value.as_any().downcast_ref::<DocumentChunk>() {
180 return Some(&d.base);
181 }
182 if let Some(d) = value.as_any().downcast_ref::<Entity>() {
183 return Some(&d.base);
184 }
185 if let Some(d) = value.as_any().downcast_ref::<EntityType>() {
186 return Some(&d.base);
187 }
188 if let Some(d) = value.as_any().downcast_ref::<EdgeType>() {
189 return Some(&d.base);
190 }
191 // `cognee_models::Triplet` is intentionally absent: it is a flat
192 // struct without an embedded `DataPoint`. `cognify::TextSummary`
193 // and any other future container types should be added here in
194 // lockstep with their `HasDataPoint` impls (gap 05-04).
195 None
196}
197
198/// Type-erased dispatch for [`stamp_tree`].
199///
200/// Recognises every `HasDataPoint` container type from `cognee-models`
201/// (the executor's canonical set; gap 05-04) plus their bare `Vec<T>`
202/// containers as emitted by tasks. Returns `true` if the concrete type
203/// was recognised and stamped, `false` for "passed through unchanged"
204/// (matches Python's no-op branch for non-DataPoint values).
205///
206/// `cognify::TextSummary` is intentionally absent — gap 05-07 §4.7
207/// chose option (b): keep `TextSummary` out of the dyn cascade in
208/// `cognee-core` to avoid a `cognee-cognify` dependency. Cognify
209/// pre-stamps `TextSummary` via its local helper (locked decision 6).
210pub fn stamp_tree_dyn(
211 value: &mut dyn Value,
212 ctx: &ProvenanceContext<'_>,
213 visited: &mut HashSet<Uuid>,
214) -> bool {
215 use cognee_models::{Document, DocumentChunk, EdgeType, Entity, EntityType};
216
217 let any = value.as_any_mut();
218
219 if let Some(d) = any.downcast_mut::<Document>() {
220 stamp_tree(d, ctx, visited);
221 return true;
222 }
223 if let Some(d) = any.downcast_mut::<DocumentChunk>() {
224 stamp_tree(d, ctx, visited);
225 return true;
226 }
227 if let Some(d) = any.downcast_mut::<Entity>() {
228 stamp_tree(d, ctx, visited);
229 return true;
230 }
231 if let Some(d) = any.downcast_mut::<EntityType>() {
232 stamp_tree(d, ctx, visited);
233 return true;
234 }
235 if let Some(d) = any.downcast_mut::<EdgeType>() {
236 stamp_tree(d, ctx, visited);
237 return true;
238 }
239
240 // Vec<T> containers — task outputs that wrap multiple DataPoints
241 // (e.g. `Vec<DocumentChunk>` from a chunking task). Walk and recurse.
242 if let Some(items) = any.downcast_mut::<Vec<Document>>() {
243 for item in items.iter_mut() {
244 stamp_tree(item, ctx, visited);
245 }
246 return true;
247 }
248 if let Some(items) = any.downcast_mut::<Vec<DocumentChunk>>() {
249 for item in items.iter_mut() {
250 stamp_tree(item, ctx, visited);
251 }
252 return true;
253 }
254 if let Some(items) = any.downcast_mut::<Vec<Entity>>() {
255 for item in items.iter_mut() {
256 stamp_tree(item, ctx, visited);
257 }
258 return true;
259 }
260 if let Some(items) = any.downcast_mut::<Vec<EntityType>>() {
261 for item in items.iter_mut() {
262 stamp_tree(item, ctx, visited);
263 }
264 return true;
265 }
266 if let Some(items) = any.downcast_mut::<Vec<EdgeType>>() {
267 for item in items.iter_mut() {
268 stamp_tree(item, ctx, visited);
269 }
270 return true;
271 }
272
273 false
274}