Skip to main content

cognee_core/
provenance.rs

1//! Provenance stamping for DataPoints emitted by pipeline tasks.
2//!
3//! This module is **not** the same thing as
4//! [`crate::exec_status::ExecStatusManager::stamp_provenance`]. That
5//! trait method is an audit-log hook keyed on `data_id` (the input
6//! item's content hash) and never mutates DataPoint fields. The
7//! function in *this* module mutates DataPoint fields (the five
8//! `source_*` columns) and is called from the pipeline executor after
9//! every successful task. Both run during a normal pipeline run; they
10//! address different concerns (one writes a per-data-id row in the
11//! relational DB, the other writes onto the DataPoints flowing through
12//! the executor).
13//!
14//! See `docs/telemetry/05-datapoint-provenance.md` for the locked
15//! design decisions backing this module — in particular:
16//!
17//! - Decision 1: walk via the [`HasDataPoint`] trait, not serde-JSON
18//!   reflection.
19//! - Decision 2: visited-set keyed on `DataPoint.id: Uuid`, not
20//!   pointer identity.
21//! - Decision 3: the name collision with
22//!   `ExecStatusManager::stamp_provenance` is intentional and locked
23//!   — neither symbol is renamed.
24
25use std::collections::HashSet;
26
27use cognee_models::DataPoint;
28use uuid::Uuid;
29
30use crate::task::Value;
31
32// `HasDataPoint` is declared in `cognee-models` (next to its primary
33// impls) and re-exported here so the historical public path
34// `cognee_core::provenance::HasDataPoint` (and `cognee_core::HasDataPoint`
35// via `crates/core/src/lib.rs`) keeps resolving. Placement decision is
36// recorded in `docs/telemetry/05/04-has-datapoint-impls.md` §4.1.
37pub use cognee_models::HasDataPoint;
38
/// Call-site information handed to [`stamp_tree`].
///
/// Every field is a borrow, so the executor can assemble one context
/// per task without cloning strings on the hot path. The type is
/// `Copy`, which makes deriving a per-child context cheap, and `Debug`,
/// so it can be included in log lines and assertion messages.
#[derive(Debug, Clone, Copy)]
pub struct ProvenanceContext<'a> {
    /// Pipeline name, e.g. `"cognify_pipeline"`.
    pub pipeline_name: &'a str,
    /// Task name, e.g. `"extract_graph_from_data"`.
    pub task_name: &'a str,
    /// Resolved user label (`email` or `id` fallback). `None` if the
    /// pipeline has no user attached.
    pub user_label: Option<&'a str>,
    /// Default `node_set` inherited by stamped DPs whose own
    /// `source_node_set` is `None`.
    pub node_set: Option<&'a str>,
    /// Default `content_hash` inherited by stamped DPs whose own
    /// `source_content_hash` is `None`.
    pub content_hash: Option<&'a str>,
}
59
60/// Stamp a tree of [`HasDataPoint`] values in place.
61///
62/// Mirrors Python's `_stamp_provenance` in
63/// `cognee/modules/pipelines/operations/run_tasks_base.py`:
64///
65/// - **Idempotent**: every assignment is guarded by
66///   `if dp.source_X.is_none()`, so a downstream task never overwrites
67///   an upstream stamp.
68/// - **Visited-set**: keyed on `DataPoint.id: Uuid` (locked decision
69///   2). Re-entering the same DataPoint (by UUID) is a no-op.
70/// - **Inheritance**: `node_set` and `content_hash` inherit from the
71///   parent context if absent on the DP, but a value already present
72///   on the DP overrides for further recursion.
73pub fn stamp_tree(
74    root: &mut dyn HasDataPoint,
75    ctx: &ProvenanceContext<'_>,
76    visited: &mut HashSet<Uuid>,
77) {
78    let dp_id = root.data_point().id;
79    if !visited.insert(dp_id) {
80        return;
81    }
82
83    {
84        let dp = root.data_point_mut();
85        if dp.source_pipeline.is_none() {
86            dp.source_pipeline = Some(ctx.pipeline_name.to_string());
87        }
88        if dp.source_task.is_none() {
89            dp.source_task = Some(ctx.task_name.to_string());
90        }
91        if dp.source_user.is_none()
92            && let Some(u) = ctx.user_label
93        {
94            dp.source_user = Some(u.to_string());
95        }
96    }
97
98    // Compute the inherited values once before recursing. A DP that
99    // already carries node_set / content_hash exposes its own value to
100    // children; otherwise the parent context's value flows down. The
101    // temporary String allocations are only consumed when there are
102    // children to recurse into; for leaf DPs `for_each_child_mut`'s
103    // default impl is a no-op and the strings are dropped immediately.
104    let current_node_set = match root.data_point().source_node_set.as_deref() {
105        Some(v) => Some(v.to_string()),
106        None => ctx.node_set.map(|s| s.to_string()),
107    };
108    if root.data_point().source_node_set.is_none()
109        && let Some(v) = ctx.node_set
110    {
111        root.data_point_mut().source_node_set = Some(v.to_string());
112    }
113
114    let current_hash = match root.data_point().source_content_hash.as_deref() {
115        Some(v) => Some(v.to_string()),
116        None => ctx.content_hash.map(|s| s.to_string()),
117    };
118    if root.data_point().source_content_hash.is_none()
119        && let Some(v) = ctx.content_hash
120    {
121        root.data_point_mut().source_content_hash = Some(v.to_string());
122    }
123
124    let child_ctx = ProvenanceContext {
125        pipeline_name: ctx.pipeline_name,
126        task_name: ctx.task_name,
127        user_label: ctx.user_label,
128        node_set: current_node_set.as_deref(),
129        content_hash: current_hash.as_deref(),
130    };
131
132    root.for_each_child_mut(&mut |child| {
133        stamp_tree(child, &child_ctx, visited);
134    });
135}
136
137/// Walk a type-erased [`Value`] looking for the first non-empty
138/// `source_node_set` on an embedded `DataPoint`. Mirrors Python's
139/// `_extract_node_set`.
140///
141/// The downcast list below is the canonical set of `HasDataPoint`
142/// container types that the executor recognises. Types not in the
143/// list are passed through (return `None`); add them here in lockstep
144/// with new `HasDataPoint` impls in `cognee-models` (gap 05-04).
145pub fn extract_node_set_from_value(value: &dyn Value) -> Option<String> {
146    downcast_to_datapoint(value).and_then(|dp| dp.source_node_set.clone())
147}
148
149/// Walk a type-erased [`Value`] looking for the first non-empty
150/// `Data.content_hash` (raw ingestion artefact) **or**
151/// `DataPoint.source_content_hash`. Mirrors Python's
152/// `_extract_content_hash`.
153///
154/// The raw `Data` artefact takes priority over a stamped DataPoint —
155/// it is the lineage anchor closest to the ingestion source.
156pub fn extract_content_hash_from_value(value: &dyn Value) -> Option<String> {
157    use cognee_models::Data;
158
159    if let Some(d) = value.as_any().downcast_ref::<Data>()
160        && !d.content_hash.is_empty()
161    {
162        return Some(d.content_hash.clone());
163    }
164
165    downcast_to_datapoint(value).and_then(|dp| dp.source_content_hash.clone())
166}
167
168/// Internal helper: `value` → optional borrow of its embedded
169/// `DataPoint`. Keeps a single registered list of
170/// `HasDataPoint`-bearing container types, used by both
171/// [`extract_node_set_from_value`] and
172/// [`extract_content_hash_from_value`].
173fn downcast_to_datapoint(value: &dyn Value) -> Option<&DataPoint> {
174    use cognee_models::{Document, DocumentChunk, EdgeType, Entity, EntityType};
175
176    if let Some(d) = value.as_any().downcast_ref::<Document>() {
177        return Some(&d.base);
178    }
179    if let Some(d) = value.as_any().downcast_ref::<DocumentChunk>() {
180        return Some(&d.base);
181    }
182    if let Some(d) = value.as_any().downcast_ref::<Entity>() {
183        return Some(&d.base);
184    }
185    if let Some(d) = value.as_any().downcast_ref::<EntityType>() {
186        return Some(&d.base);
187    }
188    if let Some(d) = value.as_any().downcast_ref::<EdgeType>() {
189        return Some(&d.base);
190    }
191    // `cognee_models::Triplet` is intentionally absent: it is a flat
192    // struct without an embedded `DataPoint`. `cognify::TextSummary`
193    // and any other future container types should be added here in
194    // lockstep with their `HasDataPoint` impls (gap 05-04).
195    None
196}
197
198/// Type-erased dispatch for [`stamp_tree`].
199///
200/// Recognises every `HasDataPoint` container type from `cognee-models`
201/// (the executor's canonical set; gap 05-04) plus their bare `Vec<T>`
202/// containers as emitted by tasks. Returns `true` if the concrete type
203/// was recognised and stamped, `false` for "passed through unchanged"
204/// (matches Python's no-op branch for non-DataPoint values).
205///
206/// `cognify::TextSummary` is intentionally absent — gap 05-07 §4.7
207/// chose option (b): keep `TextSummary` out of the dyn cascade in
208/// `cognee-core` to avoid a `cognee-cognify` dependency. Cognify
209/// pre-stamps `TextSummary` via its local helper (locked decision 6).
210pub fn stamp_tree_dyn(
211    value: &mut dyn Value,
212    ctx: &ProvenanceContext<'_>,
213    visited: &mut HashSet<Uuid>,
214) -> bool {
215    use cognee_models::{Document, DocumentChunk, EdgeType, Entity, EntityType};
216
217    let any = value.as_any_mut();
218
219    if let Some(d) = any.downcast_mut::<Document>() {
220        stamp_tree(d, ctx, visited);
221        return true;
222    }
223    if let Some(d) = any.downcast_mut::<DocumentChunk>() {
224        stamp_tree(d, ctx, visited);
225        return true;
226    }
227    if let Some(d) = any.downcast_mut::<Entity>() {
228        stamp_tree(d, ctx, visited);
229        return true;
230    }
231    if let Some(d) = any.downcast_mut::<EntityType>() {
232        stamp_tree(d, ctx, visited);
233        return true;
234    }
235    if let Some(d) = any.downcast_mut::<EdgeType>() {
236        stamp_tree(d, ctx, visited);
237        return true;
238    }
239
240    // Vec<T> containers — task outputs that wrap multiple DataPoints
241    // (e.g. `Vec<DocumentChunk>` from a chunking task). Walk and recurse.
242    if let Some(items) = any.downcast_mut::<Vec<Document>>() {
243        for item in items.iter_mut() {
244            stamp_tree(item, ctx, visited);
245        }
246        return true;
247    }
248    if let Some(items) = any.downcast_mut::<Vec<DocumentChunk>>() {
249        for item in items.iter_mut() {
250            stamp_tree(item, ctx, visited);
251        }
252        return true;
253    }
254    if let Some(items) = any.downcast_mut::<Vec<Entity>>() {
255        for item in items.iter_mut() {
256            stamp_tree(item, ctx, visited);
257        }
258        return true;
259    }
260    if let Some(items) = any.downcast_mut::<Vec<EntityType>>() {
261        for item in items.iter_mut() {
262            stamp_tree(item, ctx, visited);
263        }
264        return true;
265    }
266    if let Some(items) = any.downcast_mut::<Vec<EdgeType>>() {
267        for item in items.iter_mut() {
268            stamp_tree(item, ctx, visited);
269        }
270        return true;
271    }
272
273    false
274}