Skip to main content

cognee_models/
data_point.rs

1//! DataPoint - Base model for all storage-layer entities.
2//!
3//! Mirrors Python's `cognee/infrastructure/engine/models/DataPoint.py`
4//! Provides common fields for UUID, timestamps, versioning, and metadata.
5
6use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use uuid::Uuid;
10
/// Serde fallback for `DataPoint::feedback_weight` when the key is missing
/// from the input; the same `0.5` the constructors assign.
fn default_feedback_weight() -> f64 {
    const FALLBACK_WEIGHT: f64 = 0.5;
    FALLBACK_WEIGHT
}
15
/// Serde fallback for `DataPoint::version` when the key is missing from the
/// input; the same starting version the constructors assign.
fn default_version() -> i32 {
    const FIRST_VERSION: i32 = 1;
    FIRST_VERSION
}
20
21/// Base model for all storage-layer entities.
22///
23/// Provides:
24/// - Unique identifier (UUID)
25/// - Timestamps (created_at, updated_at) as milliseconds since epoch
26/// - Ontology validation flag
27/// - Version tracking (integer)
28/// - Topological rank for graph traversal
29/// - Flexible metadata storage
30/// - Type discriminator
31/// - Dataset membership
32/// - Pipeline provenance fields
33/// - Feedback weight
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
35pub struct DataPoint {
36    /// Unique identifier
37    pub id: Uuid,
38
39    /// Creation timestamp (milliseconds since epoch, matching Python)
40    pub created_at: i64,
41
42    /// Last update timestamp (milliseconds since epoch, matching Python)
43    pub updated_at: i64,
44
45    /// Whether this entity has been validated against an ontology
46    pub ontology_valid: bool,
47
48    /// Version number (default 1, matching Python)
49    #[serde(default = "default_version")]
50    pub version: i32,
51
52    /// Topological rank for graph traversal optimization
53    pub topological_rank: Option<i32>,
54
55    /// Flexible metadata storage (e.g., index_fields, custom attributes)
56    pub metadata: HashMap<String, serde_json::Value>,
57
58    /// Type discriminator (e.g., "Entity", "EntityType", "EdgeType")
59    #[serde(rename = "type")]
60    pub data_type: String,
61
62    /// Dataset this data point belongs to (list of JSON values, matching Python)
63    pub belongs_to_set: Option<Vec<serde_json::Value>>,
64
65    /// Pipeline that created this data point
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub source_pipeline: Option<String>,
68
69    /// Task that created this data point
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub source_task: Option<String>,
72
73    /// Node set source
74    #[serde(default, skip_serializing_if = "Option::is_none")]
75    pub source_node_set: Option<String>,
76
77    /// User that triggered creation
78    #[serde(default, skip_serializing_if = "Option::is_none")]
79    pub source_user: Option<String>,
80
81    /// Content hash of the raw `Data` artefact that produced this DataPoint.
82    /// Propagates from upstream `Data.content_hash` through every task in
83    /// the cognify pipeline, enabling content-addressed lineage queries.
84    #[serde(default, skip_serializing_if = "Option::is_none")]
85    pub source_content_hash: Option<String>,
86
87    /// Feedback weight (default 0.5, matching Python)
88    #[serde(default = "default_feedback_weight")]
89    pub feedback_weight: f64,
90}
91
92impl DataPoint {
93    /// Create a new DataPoint with default values.
94    ///
95    /// # Arguments
96    /// * `data_type` - Type discriminator (e.g., "Entity", "EntityType")
97    /// * `dataset_id` - Optional dataset UUID
98    pub fn new(data_type: impl Into<String>, dataset_id: Option<Uuid>) -> Self {
99        let now = Utc::now().timestamp_millis();
100        Self {
101            id: Uuid::new_v4(),
102            created_at: now,
103            updated_at: now,
104            ontology_valid: false,
105            version: 1,
106            topological_rank: None,
107            metadata: HashMap::new(),
108            data_type: data_type.into(),
109            belongs_to_set: dataset_id.map(|id| vec![serde_json::json!(id.to_string())]),
110            source_pipeline: None,
111            source_task: None,
112            source_node_set: None,
113            source_user: None,
114            source_content_hash: None,
115            feedback_weight: 0.5,
116        }
117    }
118
119    /// Create a DataPoint with specific metadata.
120    pub fn with_metadata(
121        data_type: impl Into<String>,
122        dataset_id: Option<Uuid>,
123        metadata: HashMap<String, serde_json::Value>,
124    ) -> Self {
125        let now = Utc::now().timestamp_millis();
126        Self {
127            id: Uuid::new_v4(),
128            created_at: now,
129            updated_at: now,
130            ontology_valid: false,
131            version: 1,
132            topological_rank: None,
133            metadata,
134            data_type: data_type.into(),
135            belongs_to_set: dataset_id.map(|id| vec![serde_json::json!(id.to_string())]),
136            source_pipeline: None,
137            source_task: None,
138            source_node_set: None,
139            source_user: None,
140            source_content_hash: None,
141            feedback_weight: 0.5,
142        }
143    }
144
145    /// Get embeddable data as JSON string for vector indexing.
146    ///
147    /// Returns a JSON representation of this DataPoint.
148    pub fn get_embeddable_data(&self) -> String {
149        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
150    }
151
152    /// Convert to JSON value.
153    pub fn to_json(&self) -> serde_json::Value {
154        serde_json::to_value(self).unwrap_or(serde_json::Value::Null)
155    }
156
157    /// Canonical vector-store payload keys for this DataPoint.
158    ///
159    /// Mirrors Python's `DataPoint.model_dump()` payload shape: every
160    /// pydantic-equivalent field flows into the metadata map. Keys with
161    /// `None` values are omitted (consistent with the
162    /// `skip_serializing_if = "Option::is_none"` annotations on the
163    /// struct).
164    ///
165    /// Used by the cognify and memify pipelines when constructing
166    /// `VectorPoint` payloads to keep the Rust shape byte-comparable to
167    /// Python's for the cross-SDK parity tests. Note: the `data_type`
168    /// field carries `#[serde(rename = "type")]`, so the resulting map
169    /// uses the JSON key `"type"` (matching Python).
170    pub fn vector_metadata(&self) -> HashMap<String, serde_json::Value> {
171        match serde_json::to_value(self) {
172            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
173            _ => HashMap::new(),
174        }
175    }
176
177    /// Update the timestamp to current time.
178    pub fn touch(&mut self) {
179        self.updated_at = Utc::now().timestamp_millis();
180    }
181
182    /// Set ontology validation status.
183    pub fn set_ontology_valid(&mut self, valid: bool) {
184        self.ontology_valid = valid;
185        self.touch();
186    }
187
188    /// Add or update metadata field.
189    pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
190        self.metadata.insert(key.into(), value);
191        self.touch();
192    }
193
194    /// Get metadata field.
195    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
196        self.metadata.get(key)
197    }
198}
199
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_data_point_creation() {
        let dp = DataPoint::new("TestType", None);
        assert_eq!(dp.data_type, "TestType");
        assert_eq!(dp.version, 1);
        assert!(!dp.ontology_valid);
        assert!(dp.metadata.is_empty());
        assert!(dp.belongs_to_set.is_none());
        assert!(dp.source_pipeline.is_none());
        assert!(dp.source_task.is_none());
        assert!(dp.source_node_set.is_none());
        assert!(dp.source_user.is_none());
        assert!(dp.source_content_hash.is_none());
        assert!((dp.feedback_weight - 0.5).abs() < f64::EPSILON);
        assert!(dp.created_at > 0);
        assert!(dp.updated_at > 0);
    }

    #[test]
    fn test_data_point_with_dataset() {
        let dataset_id = Uuid::new_v4();
        let dp = DataPoint::new("Entity", Some(dataset_id));
        assert_eq!(dp.belongs_to_set, Some(vec![json!(dataset_id.to_string())]));
    }

    #[test]
    fn test_with_metadata_keeps_map_and_defaults() {
        let mut metadata = HashMap::new();
        metadata.insert("index_fields".to_string(), json!(["name"]));

        let dp = DataPoint::with_metadata("Entity", None, metadata.clone());

        assert_eq!(dp.metadata, metadata);
        assert_eq!(dp.data_type, "Entity");
        assert_eq!(dp.version, 1);
        assert!(!dp.ontology_valid);
        assert!(dp.belongs_to_set.is_none());
        assert!((dp.feedback_weight - 0.5).abs() < f64::EPSILON);
        // Both timestamps come from the same clock read.
        assert_eq!(dp.created_at, dp.updated_at);
    }

    #[test]
    fn test_metadata_operations() {
        let mut dp = DataPoint::new("Entity", None);
        // Backdate so we can observe that set_metadata bumps updated_at.
        dp.updated_at = 0;
        dp.set_metadata("index_fields", json!(["name"]));

        assert_eq!(dp.get_metadata("index_fields"), Some(&json!(["name"])));
        assert!(dp.get_metadata("missing").is_none());
        assert!(dp.updated_at > 0);
    }

    #[test]
    fn test_ontology_validation() {
        let mut dp = DataPoint::new("Entity", None);
        assert!(!dp.ontology_valid);

        dp.updated_at = 0;
        dp.set_ontology_valid(true);
        assert!(dp.ontology_valid);
        assert!(dp.updated_at > 0);
    }

    #[test]
    fn test_get_embeddable_data() {
        let dp = DataPoint::new("Entity", None);
        let json_str = dp.get_embeddable_data();
        assert!(json_str.contains("\"type\":\"Entity\""));
    }

    #[test]
    fn serde_defaults_fill_missing_version_and_feedback_weight() {
        let mut value = DataPoint::new("Entity", None).to_json();
        let obj = value.as_object_mut().unwrap();
        obj.remove("version");
        obj.remove("feedback_weight");

        let parsed: DataPoint = serde_json::from_value(value).unwrap();
        assert_eq!(parsed.version, 1);
        assert!((parsed.feedback_weight - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn source_content_hash_round_trips_when_set_and_omitted_when_none() {
        let mut dp = DataPoint::new("Entity", None);
        assert!(
            !serde_json::to_string(&dp)
                .unwrap()
                .contains("source_content_hash"),
            "absent field must be skipped by serde"
        );

        dp.source_content_hash = Some("md5:abcdef".to_string());
        let json = serde_json::to_string(&dp).unwrap();
        assert!(json.contains(r#""source_content_hash":"md5:abcdef""#));

        let parsed: DataPoint = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.source_content_hash.as_deref(), Some("md5:abcdef"));
    }

    #[test]
    fn vector_metadata_includes_all_set_source_fields() {
        let mut dp = DataPoint::new("Entity", None);
        dp.source_pipeline = Some("cognify_pipeline".into());
        dp.source_task = Some("classify_documents".into());
        dp.source_user = Some("alice@example.com".into());
        dp.source_node_set = Some("entity_nodes".into());
        dp.source_content_hash = Some("md5:abcdef".into());

        let m = dp.vector_metadata();
        assert_eq!(m.get("source_pipeline").unwrap(), &json!("cognify_pipeline"));
        assert_eq!(m.get("source_task").unwrap(), &json!("classify_documents"));
        assert_eq!(m.get("source_user").unwrap(), &json!("alice@example.com"));
        assert_eq!(m.get("source_node_set").unwrap(), &json!("entity_nodes"));
        assert_eq!(m.get("source_content_hash").unwrap(), &json!("md5:abcdef"));
        // `data_type` round-trips as the JSON key `"type"` because of
        // `#[serde(rename = "type")]` on the struct field.
        assert_eq!(m.get("type").unwrap(), &json!("Entity"));
        assert_eq!(m.get("version").unwrap(), &json!(1));
        assert!(m.contains_key("created_at"));
        assert!(m.contains_key("updated_at"));
    }

    #[test]
    fn vector_metadata_omits_none_source_fields() {
        let dp = DataPoint::new("Entity", None);
        let m = dp.vector_metadata();
        assert!(!m.contains_key("source_pipeline"));
        assert!(!m.contains_key("source_task"));
        assert!(!m.contains_key("source_user"));
        assert!(!m.contains_key("source_node_set"));
        assert!(!m.contains_key("source_content_hash"));
    }

    #[test]
    fn vector_metadata_emits_unset_rank_and_set_as_null() {
        // Unlike the `source_*` fields, these Options have no
        // `skip_serializing_if`, so they are present as JSON null.
        let m = DataPoint::new("Entity", None).vector_metadata();
        assert_eq!(m.get("topological_rank"), Some(&serde_json::Value::Null));
        assert_eq!(m.get("belongs_to_set"), Some(&serde_json::Value::Null));
    }

    #[test]
    fn test_touch_updates_timestamp() {
        let mut dp = DataPoint::new("Entity", None);
        let created = dp.created_at;
        // Backdate instead of sleeping so the test is fast and deterministic.
        dp.updated_at = created - 60_000;

        dp.touch();

        assert!(dp.updated_at >= created);
        assert_eq!(dp.created_at, created, "touch must not alter created_at");
    }
}
330}