1use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use uuid::Uuid;
10
/// Serde fallback for `DataPoint::feedback_weight`.
///
/// Referenced by the `#[serde(default = "default_feedback_weight")]` attribute, so a
/// serialized record that lacks a `feedback_weight` key deserializes with this value.
fn default_feedback_weight() -> f64 {
    const FALLBACK_WEIGHT: f64 = 0.5;
    FALLBACK_WEIGHT
}
15
/// Serde fallback for `DataPoint::version`.
///
/// Referenced by the `#[serde(default = "default_version")]` attribute, so a serialized
/// record that lacks a `version` key deserializes with this value.
fn default_version() -> i32 {
    const FALLBACK_VERSION: i32 = 1;
    FALLBACK_VERSION
}
20
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
35pub struct DataPoint {
36 pub id: Uuid,
38
39 pub created_at: i64,
41
42 pub updated_at: i64,
44
45 pub ontology_valid: bool,
47
48 #[serde(default = "default_version")]
50 pub version: i32,
51
52 pub topological_rank: Option<i32>,
54
55 pub metadata: HashMap<String, serde_json::Value>,
57
58 #[serde(rename = "type")]
60 pub data_type: String,
61
62 pub belongs_to_set: Option<Vec<serde_json::Value>>,
64
65 #[serde(default, skip_serializing_if = "Option::is_none")]
67 pub source_pipeline: Option<String>,
68
69 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub source_task: Option<String>,
72
73 #[serde(default, skip_serializing_if = "Option::is_none")]
75 pub source_node_set: Option<String>,
76
77 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub source_user: Option<String>,
80
81 #[serde(default, skip_serializing_if = "Option::is_none")]
85 pub source_content_hash: Option<String>,
86
87 #[serde(default = "default_feedback_weight")]
89 pub feedback_weight: f64,
90}
91
92impl DataPoint {
93 pub fn new(data_type: impl Into<String>, dataset_id: Option<Uuid>) -> Self {
99 let now = Utc::now().timestamp_millis();
100 Self {
101 id: Uuid::new_v4(),
102 created_at: now,
103 updated_at: now,
104 ontology_valid: false,
105 version: 1,
106 topological_rank: None,
107 metadata: HashMap::new(),
108 data_type: data_type.into(),
109 belongs_to_set: dataset_id.map(|id| vec![serde_json::json!(id.to_string())]),
110 source_pipeline: None,
111 source_task: None,
112 source_node_set: None,
113 source_user: None,
114 source_content_hash: None,
115 feedback_weight: 0.5,
116 }
117 }
118
119 pub fn with_metadata(
121 data_type: impl Into<String>,
122 dataset_id: Option<Uuid>,
123 metadata: HashMap<String, serde_json::Value>,
124 ) -> Self {
125 let now = Utc::now().timestamp_millis();
126 Self {
127 id: Uuid::new_v4(),
128 created_at: now,
129 updated_at: now,
130 ontology_valid: false,
131 version: 1,
132 topological_rank: None,
133 metadata,
134 data_type: data_type.into(),
135 belongs_to_set: dataset_id.map(|id| vec![serde_json::json!(id.to_string())]),
136 source_pipeline: None,
137 source_task: None,
138 source_node_set: None,
139 source_user: None,
140 source_content_hash: None,
141 feedback_weight: 0.5,
142 }
143 }
144
145 pub fn get_embeddable_data(&self) -> String {
149 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
150 }
151
152 pub fn to_json(&self) -> serde_json::Value {
154 serde_json::to_value(self).unwrap_or(serde_json::Value::Null)
155 }
156
157 pub fn vector_metadata(&self) -> HashMap<String, serde_json::Value> {
171 match serde_json::to_value(self) {
172 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
173 _ => HashMap::new(),
174 }
175 }
176
177 pub fn touch(&mut self) {
179 self.updated_at = Utc::now().timestamp_millis();
180 }
181
182 pub fn set_ontology_valid(&mut self, valid: bool) {
184 self.ontology_valid = valid;
185 self.touch();
186 }
187
188 pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
190 self.metadata.insert(key.into(), value);
191 self.touch();
192 }
193
194 pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
196 self.metadata.get(key)
197 }
198}
199
/// Unit tests for `DataPoint`: construction, mutation helpers and the serde
/// representation (renamed `type` key, skipped `None` provenance fields, defaults).
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_data_point_creation() {
        let dp = DataPoint::new("TestType", None);
        assert_eq!(dp.data_type, "TestType");
        assert_eq!(dp.version, 1);
        assert!(!dp.ontology_valid);
        assert!(dp.metadata.is_empty());
        assert!(dp.belongs_to_set.is_none());
        assert!(dp.topological_rank.is_none());
        assert!(dp.source_pipeline.is_none());
        assert!(dp.source_task.is_none());
        assert!(dp.source_node_set.is_none());
        assert!(dp.source_user.is_none());
        assert!(dp.source_content_hash.is_none());
        assert!((dp.feedback_weight - 0.5).abs() < f64::EPSILON);
        assert!(dp.created_at > 0);
        assert!(dp.updated_at > 0);
        // Both timestamps come from a single clock read in the constructor.
        assert_eq!(dp.created_at, dp.updated_at);
    }

    #[test]
    fn test_data_point_with_dataset() {
        let dataset_id = Uuid::new_v4();
        let dp = DataPoint::new("Entity", Some(dataset_id));
        assert_eq!(dp.belongs_to_set, Some(vec![json!(dataset_id.to_string())]));
    }

    #[test]
    fn test_with_metadata_keeps_supplied_map() {
        let dataset_id = Uuid::new_v4();
        let metadata = HashMap::from([("index_fields".to_string(), json!(["name"]))]);

        let dp = DataPoint::with_metadata("Entity", Some(dataset_id), metadata.clone());

        assert_eq!(dp.data_type, "Entity");
        assert_eq!(dp.metadata, metadata);
        assert_eq!(dp.belongs_to_set, Some(vec![json!(dataset_id.to_string())]));
        assert_eq!(dp.version, 1);
        assert!(!dp.ontology_valid);
        assert_eq!(dp.created_at, dp.updated_at);
    }

    #[test]
    fn test_metadata_operations() {
        let mut dp = DataPoint::new("Entity", None);
        // Rewind the timestamp so the refresh done by the setter is observable
        // without depending on elapsed wall-clock time.
        dp.updated_at = 0;

        dp.set_metadata("index_fields", json!(["name"]));

        assert_eq!(dp.get_metadata("index_fields"), Some(&json!(["name"])));
        assert!(dp.get_metadata("missing").is_none());
        assert!(dp.updated_at > 0, "set_metadata must refresh updated_at");
    }

    #[test]
    fn test_ontology_validation() {
        let mut dp = DataPoint::new("Entity", None);
        assert!(!dp.ontology_valid);
        dp.updated_at = 0;

        dp.set_ontology_valid(true);

        assert!(dp.ontology_valid);
        assert!(dp.updated_at > 0, "set_ontology_valid must refresh updated_at");
    }

    #[test]
    fn test_get_embeddable_data() {
        let dp = DataPoint::new("Entity", None);
        let json_str = dp.get_embeddable_data();
        assert!(json_str.contains("\"type\":\"Entity\""));
    }

    #[test]
    fn test_to_json_uses_renamed_type_key() {
        let dp = DataPoint::new("Entity", None);
        let value = dp.to_json();
        assert_eq!(value.get("type"), Some(&json!("Entity")));
        assert!(value.get("data_type").is_none());
    }

    #[test]
    fn deserialization_applies_defaults_for_missing_fields() {
        // Only the keys without a serde default / Option type are supplied.
        let raw = json!({
            "id": Uuid::new_v4().to_string(),
            "created_at": 1,
            "updated_at": 2,
            "ontology_valid": true,
            "metadata": {},
            "type": "Entity"
        });

        let dp: DataPoint = serde_json::from_value(raw).unwrap();

        assert_eq!(dp.version, 1);
        assert!((dp.feedback_weight - 0.5).abs() < f64::EPSILON);
        assert!(dp.topological_rank.is_none());
        assert!(dp.belongs_to_set.is_none());
        assert!(dp.source_pipeline.is_none());
        assert!(dp.source_content_hash.is_none());
    }

    #[test]
    fn source_content_hash_round_trips_when_set_and_omitted_when_none() {
        let mut dp = DataPoint::new("Entity", None);
        assert!(
            !serde_json::to_string(&dp)
                .unwrap()
                .contains("source_content_hash"),
            "absent field must be skipped by serde"
        );

        dp.source_content_hash = Some("md5:abcdef".to_string());
        let json = serde_json::to_string(&dp).unwrap();
        assert!(json.contains(r#""source_content_hash":"md5:abcdef""#));

        let parsed: DataPoint = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.source_content_hash.as_deref(), Some("md5:abcdef"));
    }

    #[test]
    fn vector_metadata_includes_all_set_source_fields() {
        let mut dp = DataPoint::new("Entity", None);
        dp.source_pipeline = Some("cognify_pipeline".into());
        dp.source_task = Some("classify_documents".into());
        dp.source_user = Some("alice@example.com".into());
        dp.source_node_set = Some("entity_nodes".into());
        dp.source_content_hash = Some("md5:abcdef".into());

        let m = dp.vector_metadata();
        assert_eq!(
            m.get("source_pipeline").unwrap(),
            &json!("cognify_pipeline")
        );
        assert_eq!(m.get("source_task").unwrap(), &json!("classify_documents"));
        assert_eq!(m.get("source_user").unwrap(), &json!("alice@example.com"));
        assert_eq!(m.get("source_node_set").unwrap(), &json!("entity_nodes"));
        assert_eq!(m.get("source_content_hash").unwrap(), &json!("md5:abcdef"));
        assert_eq!(m.get("type").unwrap(), &json!("Entity"));
        assert_eq!(m.get("version").unwrap(), &json!(1));
        assert!(m.contains_key("created_at"));
        assert!(m.contains_key("updated_at"));
    }

    #[test]
    fn vector_metadata_omits_none_source_fields() {
        let dp = DataPoint::new("Entity", None);
        let m = dp.vector_metadata();
        assert!(!m.contains_key("source_pipeline"));
        assert!(!m.contains_key("source_task"));
        assert!(!m.contains_key("source_user"));
        assert!(!m.contains_key("source_node_set"));
        assert!(!m.contains_key("source_content_hash"));
    }

    #[test]
    fn test_touch_updates_timestamp() {
        let mut dp = DataPoint::new("Entity", None);
        // Rewind instead of sleeping: the check stays deterministic and does not
        // rely on the wall clock advancing between two reads.
        dp.updated_at = 0;

        dp.touch();

        assert!(dp.updated_at > 0);
    }
}