Skip to main content

drasi_source_http/
models.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18
19/// Data schema for HTTP source events
20///
21/// This schema closely mirrors drasi_core::models::SourceChange for efficient conversion
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "operation", rename_all = "lowercase")]
24pub enum HttpSourceChange {
25    /// Insert a new element
26    #[serde(rename = "insert")]
27    Insert {
28        element: HttpElement,
29        #[serde(skip_serializing_if = "Option::is_none")]
30        timestamp: Option<u64>,
31    },
32    /// Update an existing element
33    #[serde(rename = "update")]
34    Update {
35        element: HttpElement,
36        #[serde(skip_serializing_if = "Option::is_none")]
37        timestamp: Option<u64>,
38    },
39    /// Delete an element
40    #[serde(rename = "delete")]
41    Delete {
42        id: String,
43        #[serde(skip_serializing_if = "Option::is_none")]
44        labels: Option<Vec<String>>,
45        #[serde(skip_serializing_if = "Option::is_none")]
46        timestamp: Option<u64>,
47    },
48}
49
50/// Element that can be either a Node or Relation
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(tag = "type", rename_all = "lowercase")]
53pub enum HttpElement {
54    #[serde(rename = "node")]
55    Node {
56        id: String,
57        labels: Vec<String>,
58        #[serde(default)]
59        properties: serde_json::Map<String, serde_json::Value>,
60    },
61    #[serde(rename = "relation")]
62    Relation {
63        id: String,
64        labels: Vec<String>,
65        from: String,
66        to: String,
67        #[serde(default)]
68        properties: serde_json::Map<String, serde_json::Value>,
69    },
70}
71
72/// Convert HttpSourceChange to drasi_core::models::SourceChange
73pub fn convert_http_to_source_change(
74    http_change: &HttpSourceChange,
75    source_id: &str,
76) -> Result<drasi_core::models::SourceChange> {
77    use drasi_core::models::{ElementMetadata, ElementReference, SourceChange};
78
79    // Get timestamp or use current time in milliseconds
80    // Note: HTTP API accepts timestamps in nanoseconds, we convert to milliseconds
81    let get_timestamp = |ts: Option<u64>| -> u64 {
82        ts.map(|nanos| nanos / 1_000_000) // Convert nanoseconds to milliseconds
83            .unwrap_or_else(|| {
84                crate::time::get_system_time_millis().unwrap_or_else(|e| {
85                    log::warn!("Failed to get system time for HTTP event: {e}, using fallback");
86                    chrono::Utc::now().timestamp_millis() as u64
87                })
88            })
89    };
90
91    match http_change {
92        HttpSourceChange::Insert { element, timestamp } => {
93            let element = create_element_from_http(element, source_id, get_timestamp(*timestamp))?;
94            Ok(SourceChange::Insert { element })
95        }
96        HttpSourceChange::Update { element, timestamp } => {
97            let element = create_element_from_http(element, source_id, get_timestamp(*timestamp))?;
98            Ok(SourceChange::Update { element })
99        }
100        HttpSourceChange::Delete {
101            id,
102            labels,
103            timestamp,
104        } => {
105            let metadata = ElementMetadata {
106                reference: ElementReference {
107                    source_id: Arc::from(source_id),
108                    element_id: Arc::from(id.as_str()),
109                },
110                labels: Arc::from(
111                    labels
112                        .as_ref()
113                        .map(|l| l.iter().map(|s| Arc::from(s.as_str())).collect::<Vec<_>>())
114                        .unwrap_or_default(),
115                ),
116                effective_from: get_timestamp(*timestamp),
117            };
118            Ok(SourceChange::Delete { metadata })
119        }
120    }
121}
122
123/// Create Element from HttpElement
124fn create_element_from_http(
125    http_element: &HttpElement,
126    source_id: &str,
127    timestamp: u64,
128) -> Result<drasi_core::models::Element> {
129    use drasi_core::models::{Element, ElementMetadata, ElementPropertyMap, ElementReference};
130
131    match http_element {
132        HttpElement::Node {
133            id,
134            labels,
135            properties,
136        } => {
137            let metadata = ElementMetadata {
138                reference: ElementReference {
139                    source_id: Arc::from(source_id),
140                    element_id: Arc::from(id.as_str()),
141                },
142                labels: Arc::from(
143                    labels
144                        .iter()
145                        .map(|s| Arc::from(s.as_str()))
146                        .collect::<Vec<_>>(),
147                ),
148                effective_from: timestamp,
149            };
150
151            let mut prop_map = ElementPropertyMap::new();
152            for (key, value) in properties {
153                prop_map.insert(key, convert_json_to_element_value(value)?);
154            }
155
156            Ok(Element::Node {
157                metadata,
158                properties: prop_map,
159            })
160        }
161        HttpElement::Relation {
162            id,
163            labels,
164            from,
165            to,
166            properties,
167        } => {
168            let metadata = ElementMetadata {
169                reference: ElementReference {
170                    source_id: Arc::from(source_id),
171                    element_id: Arc::from(id.as_str()),
172                },
173                labels: Arc::from(
174                    labels
175                        .iter()
176                        .map(|s| Arc::from(s.as_str()))
177                        .collect::<Vec<_>>(),
178                ),
179                effective_from: timestamp,
180            };
181
182            let mut prop_map = ElementPropertyMap::new();
183            for (key, value) in properties {
184                prop_map.insert(key, convert_json_to_element_value(value)?);
185            }
186
187            Ok(Element::Relation {
188                metadata,
189                properties: prop_map,
190                in_node: ElementReference {
191                    source_id: Arc::from(source_id),
192                    element_id: Arc::from(to.as_str()),
193                },
194                out_node: ElementReference {
195                    source_id: Arc::from(source_id),
196                    element_id: Arc::from(from.as_str()),
197                },
198            })
199        }
200    }
201}
202
203/// Convert JSON value to ElementValue
204fn convert_json_to_element_value(
205    value: &serde_json::Value,
206) -> Result<drasi_core::models::ElementValue> {
207    use drasi_core::models::ElementValue;
208    use ordered_float::OrderedFloat;
209
210    match value {
211        serde_json::Value::Null => Ok(ElementValue::Null),
212        serde_json::Value::Bool(b) => Ok(ElementValue::Bool(*b)),
213        serde_json::Value::Number(n) => {
214            if let Some(i) = n.as_i64() {
215                Ok(ElementValue::Integer(i))
216            } else if let Some(f) = n.as_f64() {
217                Ok(ElementValue::Float(OrderedFloat(f)))
218            } else {
219                Err(anyhow::anyhow!("Invalid number value"))
220            }
221        }
222        serde_json::Value::String(s) => Ok(ElementValue::String(Arc::from(s.as_str()))),
223        serde_json::Value::Array(arr) => {
224            let elements: Result<Vec<_>> = arr.iter().map(convert_json_to_element_value).collect();
225            Ok(ElementValue::List(elements?))
226        }
227        serde_json::Value::Object(obj) => {
228            let mut map = drasi_core::models::ElementPropertyMap::new();
229            for (key, val) in obj {
230                map.insert(key, convert_json_to_element_value(val)?);
231            }
232            Ok(ElementValue::Object(map))
233        }
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240
241    #[test]
242    fn test_node_insert_deserialization() {
243        let json = r#"{
244            "operation": "insert",
245            "element": {
246                "type": "node",
247                "id": "user_123",
248                "labels": ["User", "Person"],
249                "properties": {
250                    "name": "John Doe",
251                    "age": 30,
252                    "active": true
253                }
254            },
255            "timestamp": 1234567890000
256        }"#;
257
258        let result: HttpSourceChange = serde_json::from_str(json).unwrap();
259        match result {
260            HttpSourceChange::Insert { element, timestamp } => {
261                match element {
262                    HttpElement::Node {
263                        id,
264                        labels,
265                        properties,
266                    } => {
267                        assert_eq!(id, "user_123");
268                        assert_eq!(labels, vec!["User", "Person"]);
269                        assert_eq!(properties["name"], "John Doe");
270                        assert_eq!(properties["age"], 30);
271                        assert_eq!(properties["active"], true);
272                    }
273                    _ => panic!("Expected Node element"),
274                }
275                assert_eq!(timestamp, Some(1234567890000));
276            }
277            _ => panic!("Expected Insert operation"),
278        }
279    }
280
281    #[test]
282    fn test_relation_insert_deserialization() {
283        let json = r#"{
284            "operation": "insert",
285            "element": {
286                "type": "relation",
287                "id": "follows_123",
288                "labels": ["FOLLOWS"],
289                "from": "user_123",
290                "to": "user_456",
291                "properties": {
292                    "since": "2024-01-01"
293                }
294            }
295        }"#;
296
297        let result: HttpSourceChange = serde_json::from_str(json).unwrap();
298        match result {
299            HttpSourceChange::Insert { element, timestamp } => {
300                match element {
301                    HttpElement::Relation {
302                        id,
303                        labels,
304                        from,
305                        to,
306                        properties,
307                    } => {
308                        assert_eq!(id, "follows_123");
309                        assert_eq!(labels, vec!["FOLLOWS"]);
310                        assert_eq!(from, "user_123");
311                        assert_eq!(to, "user_456");
312                        assert_eq!(properties["since"], "2024-01-01");
313                    }
314                    _ => panic!("Expected Relation element"),
315                }
316                assert_eq!(timestamp, None);
317            }
318            _ => panic!("Expected Insert operation"),
319        }
320    }
321
322    #[test]
323    fn test_delete_deserialization() {
324        let json = r#"{
325            "operation": "delete",
326            "id": "user_123",
327            "labels": ["User"],
328            "timestamp": 1234567890000
329        }"#;
330
331        let result: HttpSourceChange = serde_json::from_str(json).unwrap();
332        match result {
333            HttpSourceChange::Delete {
334                id,
335                labels,
336                timestamp,
337            } => {
338                assert_eq!(id, "user_123");
339                assert_eq!(labels, Some(vec!["User".to_string()]));
340                assert_eq!(timestamp, Some(1234567890000));
341            }
342            _ => panic!("Expected Delete operation"),
343        }
344    }
345
346    #[test]
347    fn test_minimal_delete_deserialization() {
348        let json = r#"{
349            "operation": "delete",
350            "id": "user_123"
351        }"#;
352
353        let result: HttpSourceChange = serde_json::from_str(json).unwrap();
354        match result {
355            HttpSourceChange::Delete {
356                id,
357                labels,
358                timestamp,
359            } => {
360                assert_eq!(id, "user_123");
361                assert_eq!(labels, None);
362                assert_eq!(timestamp, None);
363            }
364            _ => panic!("Expected Delete operation"),
365        }
366    }
367}