1use anyhow::Result;
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "operation", rename_all = "lowercase")]
24pub enum HttpSourceChange {
25 #[serde(rename = "insert")]
27 Insert {
28 element: HttpElement,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 timestamp: Option<u64>,
31 },
32 #[serde(rename = "update")]
34 Update {
35 element: HttpElement,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 timestamp: Option<u64>,
38 },
39 #[serde(rename = "delete")]
41 Delete {
42 id: String,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 labels: Option<Vec<String>>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 timestamp: Option<u64>,
47 },
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(tag = "type", rename_all = "lowercase")]
53pub enum HttpElement {
54 #[serde(rename = "node")]
55 Node {
56 id: String,
57 labels: Vec<String>,
58 #[serde(default)]
59 properties: serde_json::Map<String, serde_json::Value>,
60 },
61 #[serde(rename = "relation")]
62 Relation {
63 id: String,
64 labels: Vec<String>,
65 from: String,
66 to: String,
67 #[serde(default)]
68 properties: serde_json::Map<String, serde_json::Value>,
69 },
70}
71
72pub fn convert_http_to_source_change(
74 http_change: &HttpSourceChange,
75 source_id: &str,
76) -> Result<drasi_core::models::SourceChange> {
77 use drasi_core::models::{ElementMetadata, ElementReference, SourceChange};
78
79 let get_timestamp = |ts: Option<u64>| -> u64 {
82 ts.map(|nanos| nanos / 1_000_000) .unwrap_or_else(|| {
84 crate::time::get_system_time_millis().unwrap_or_else(|e| {
85 log::warn!("Failed to get system time for HTTP event: {e}, using fallback");
86 chrono::Utc::now().timestamp_millis() as u64
87 })
88 })
89 };
90
91 match http_change {
92 HttpSourceChange::Insert { element, timestamp } => {
93 let element = create_element_from_http(element, source_id, get_timestamp(*timestamp))?;
94 Ok(SourceChange::Insert { element })
95 }
96 HttpSourceChange::Update { element, timestamp } => {
97 let element = create_element_from_http(element, source_id, get_timestamp(*timestamp))?;
98 Ok(SourceChange::Update { element })
99 }
100 HttpSourceChange::Delete {
101 id,
102 labels,
103 timestamp,
104 } => {
105 let metadata = ElementMetadata {
106 reference: ElementReference {
107 source_id: Arc::from(source_id),
108 element_id: Arc::from(id.as_str()),
109 },
110 labels: Arc::from(
111 labels
112 .as_ref()
113 .map(|l| l.iter().map(|s| Arc::from(s.as_str())).collect::<Vec<_>>())
114 .unwrap_or_default(),
115 ),
116 effective_from: get_timestamp(*timestamp),
117 };
118 Ok(SourceChange::Delete { metadata })
119 }
120 }
121}
122
123fn create_element_from_http(
125 http_element: &HttpElement,
126 source_id: &str,
127 timestamp: u64,
128) -> Result<drasi_core::models::Element> {
129 use drasi_core::models::{Element, ElementMetadata, ElementPropertyMap, ElementReference};
130
131 match http_element {
132 HttpElement::Node {
133 id,
134 labels,
135 properties,
136 } => {
137 let metadata = ElementMetadata {
138 reference: ElementReference {
139 source_id: Arc::from(source_id),
140 element_id: Arc::from(id.as_str()),
141 },
142 labels: Arc::from(
143 labels
144 .iter()
145 .map(|s| Arc::from(s.as_str()))
146 .collect::<Vec<_>>(),
147 ),
148 effective_from: timestamp,
149 };
150
151 let mut prop_map = ElementPropertyMap::new();
152 for (key, value) in properties {
153 prop_map.insert(key, convert_json_to_element_value(value)?);
154 }
155
156 Ok(Element::Node {
157 metadata,
158 properties: prop_map,
159 })
160 }
161 HttpElement::Relation {
162 id,
163 labels,
164 from,
165 to,
166 properties,
167 } => {
168 let metadata = ElementMetadata {
169 reference: ElementReference {
170 source_id: Arc::from(source_id),
171 element_id: Arc::from(id.as_str()),
172 },
173 labels: Arc::from(
174 labels
175 .iter()
176 .map(|s| Arc::from(s.as_str()))
177 .collect::<Vec<_>>(),
178 ),
179 effective_from: timestamp,
180 };
181
182 let mut prop_map = ElementPropertyMap::new();
183 for (key, value) in properties {
184 prop_map.insert(key, convert_json_to_element_value(value)?);
185 }
186
187 Ok(Element::Relation {
188 metadata,
189 properties: prop_map,
190 in_node: ElementReference {
191 source_id: Arc::from(source_id),
192 element_id: Arc::from(to.as_str()),
193 },
194 out_node: ElementReference {
195 source_id: Arc::from(source_id),
196 element_id: Arc::from(from.as_str()),
197 },
198 })
199 }
200 }
201}
202
203fn convert_json_to_element_value(
205 value: &serde_json::Value,
206) -> Result<drasi_core::models::ElementValue> {
207 use drasi_core::models::ElementValue;
208 use ordered_float::OrderedFloat;
209
210 match value {
211 serde_json::Value::Null => Ok(ElementValue::Null),
212 serde_json::Value::Bool(b) => Ok(ElementValue::Bool(*b)),
213 serde_json::Value::Number(n) => {
214 if let Some(i) = n.as_i64() {
215 Ok(ElementValue::Integer(i))
216 } else if let Some(f) = n.as_f64() {
217 Ok(ElementValue::Float(OrderedFloat(f)))
218 } else {
219 Err(anyhow::anyhow!("Invalid number value"))
220 }
221 }
222 serde_json::Value::String(s) => Ok(ElementValue::String(Arc::from(s.as_str()))),
223 serde_json::Value::Array(arr) => {
224 let elements: Result<Vec<_>> = arr.iter().map(convert_json_to_element_value).collect();
225 Ok(ElementValue::List(elements?))
226 }
227 serde_json::Value::Object(obj) => {
228 let mut map = drasi_core::models::ElementPropertyMap::new();
229 for (key, val) in obj {
230 map.insert(key, convert_json_to_element_value(val)?);
231 }
232 Ok(ElementValue::Object(map))
233 }
234 }
235}
236
/// Deserialization tests for the HTTP wire-format models.
#[cfg(test)]
mod tests {
    use super::*;

    /// A node insert with properties and an explicit timestamp parses into
    /// `Insert { Node { .. } }` with every field populated.
    #[test]
    fn test_node_insert_deserialization() {
        let json = r#"{
            "operation": "insert",
            "element": {
                "type": "node",
                "id": "user_123",
                "labels": ["User", "Person"],
                "properties": {
                    "name": "John Doe",
                    "age": 30,
                    "active": true
                }
            },
            "timestamp": 1234567890000
        }"#;

        let parsed = serde_json::from_str::<HttpSourceChange>(json).unwrap();
        match parsed {
            HttpSourceChange::Insert {
                element:
                    HttpElement::Node {
                        id,
                        labels,
                        properties,
                    },
                timestamp,
            } => {
                assert_eq!(id, "user_123");
                assert_eq!(labels, vec!["User", "Person"]);
                assert_eq!(properties["name"], "John Doe");
                assert_eq!(properties["age"], 30);
                assert_eq!(properties["active"], true);
                assert_eq!(timestamp, Some(1234567890000));
            }
            HttpSourceChange::Insert { .. } => panic!("Expected Node element"),
            _ => panic!("Expected Insert operation"),
        }
    }

    /// A relation insert without a timestamp parses into
    /// `Insert { Relation { .. } }` and leaves `timestamp` as `None`.
    #[test]
    fn test_relation_insert_deserialization() {
        let json = r#"{
            "operation": "insert",
            "element": {
                "type": "relation",
                "id": "follows_123",
                "labels": ["FOLLOWS"],
                "from": "user_123",
                "to": "user_456",
                "properties": {
                    "since": "2024-01-01"
                }
            }
        }"#;

        let parsed = serde_json::from_str::<HttpSourceChange>(json).unwrap();
        match parsed {
            HttpSourceChange::Insert {
                element:
                    HttpElement::Relation {
                        id,
                        labels,
                        from,
                        to,
                        properties,
                    },
                timestamp,
            } => {
                assert_eq!(id, "follows_123");
                assert_eq!(labels, vec!["FOLLOWS"]);
                assert_eq!(from, "user_123");
                assert_eq!(to, "user_456");
                assert_eq!(properties["since"], "2024-01-01");
                assert_eq!(timestamp, None);
            }
            HttpSourceChange::Insert { .. } => panic!("Expected Relation element"),
            _ => panic!("Expected Insert operation"),
        }
    }

    /// A delete carrying labels and a timestamp keeps both.
    #[test]
    fn test_delete_deserialization() {
        let json = r#"{
            "operation": "delete",
            "id": "user_123",
            "labels": ["User"],
            "timestamp": 1234567890000
        }"#;

        let parsed = serde_json::from_str::<HttpSourceChange>(json).unwrap();
        if let HttpSourceChange::Delete {
            id,
            labels,
            timestamp,
        } = parsed
        {
            assert_eq!(id, "user_123");
            assert_eq!(labels, Some(vec!["User".to_string()]));
            assert_eq!(timestamp, Some(1234567890000));
        } else {
            panic!("Expected Delete operation");
        }
    }

    /// A delete with only an id leaves the optional fields as `None`.
    #[test]
    fn test_minimal_delete_deserialization() {
        let json = r#"{
            "operation": "delete",
            "id": "user_123"
        }"#;

        let parsed = serde_json::from_str::<HttpSourceChange>(json).unwrap();
        if let HttpSourceChange::Delete {
            id,
            labels,
            timestamp,
        } = parsed
        {
            assert_eq!(id, "user_123");
            assert_eq!(labels, None);
            assert_eq!(timestamp, None);
        } else {
            panic!("Expected Delete operation");
        }
    }
}