rust_logic_graph/multi_db/
correlation.rs

1use serde_json::Value;
2use std::collections::HashMap;
3use tracing::{debug, info};
4
5use crate::error::{ErrorContext, RustLogicGraphError};
6
/// How rows from two independently queried datasets are combined.
///
/// The variants mirror the SQL join kinds of the same names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JoinStrategy {
    /// Keep only rows whose key appears on both sides.
    Inner,
    /// Keep every left row; right-side data is attached where a key match exists.
    Left,
    /// Keep every right row; left-side data is attached where a key match exists.
    Right,
    /// Keep every row from both sides, matched or not.
    Full,
}
19
/// Query result correlator for joining data from multiple databases
///
/// Enables joining and correlating query results from different databases
/// using common keys, similar to SQL JOINs but across distributed databases.
///
/// # Example
/// ```no_run
/// use rust_logic_graph::multi_db::{QueryCorrelator, JoinStrategy};
/// use serde_json::json;
///
/// let correlator = QueryCorrelator::new();
///
/// // Data from OMS database
/// let users = json!([
///     {"user_id": 1, "name": "Alice"},
///     {"user_id": 2, "name": "Bob"},
/// ]);
///
/// // Data from Orders database
/// let orders = json!([
///     {"order_id": 101, "user_id": 1, "amount": 50.0},
///     {"order_id": 102, "user_id": 1, "amount": 75.0},
///     {"order_id": 103, "user_id": 3, "amount": 100.0},
/// ]);
///
/// let result = correlator.join(
///     &users,
///     &orders,
///     "user_id",
///     "user_id",
///     JoinStrategy::Inner
/// ).unwrap();
///
/// println!("Joined {} rows", result.as_array().unwrap().len());
/// ```
#[derive(Debug, Clone)]
pub struct QueryCorrelator {
    /// Optional prefix prepended to every column name taken from the left dataset,
    /// used to disambiguate columns that both datasets share
    pub left_prefix: Option<String>,
    /// Optional prefix prepended to every column name taken from the right dataset,
    /// used to disambiguate columns that both datasets share
    pub right_prefix: Option<String>,
}
61
62impl QueryCorrelator {
63    /// Create a new query correlator
64    pub fn new() -> Self {
65        Self {
66            left_prefix: None,
67            right_prefix: None,
68        }
69    }
70
71    /// Set prefix for left dataset columns (e.g., "user_")
72    pub fn with_left_prefix(mut self, prefix: impl Into<String>) -> Self {
73        self.left_prefix = Some(prefix.into());
74        self
75    }
76
77    /// Set prefix for right dataset columns (e.g., "order_")
78    pub fn with_right_prefix(mut self, prefix: impl Into<String>) -> Self {
79        self.right_prefix = Some(prefix.into());
80        self
81    }
82
83    /// Join two datasets on specified keys
84    ///
85    /// # Arguments
86    /// * `left` - Left dataset (JSON array of objects)
87    /// * `right` - Right dataset (JSON array of objects)
88    /// * `left_key` - Key field name in left dataset
89    /// * `right_key` - Key field name in right dataset
90    /// * `strategy` - Join strategy (Inner, Left, Right, Full)
91    ///
92    /// # Returns
93    /// JSON array containing joined rows
94    pub fn join(
95        &self,
96        left: &Value,
97        right: &Value,
98        left_key: &str,
99        right_key: &str,
100        strategy: JoinStrategy,
101    ) -> Result<Value, RustLogicGraphError> {
102        info!(
103            "🔗 Query Correlator: Joining datasets on {}.{} = {}.{} ({:?})",
104            self.left_prefix.as_deref().unwrap_or("left"),
105            left_key,
106            self.right_prefix.as_deref().unwrap_or("right"),
107            right_key,
108            strategy
109        );
110
111        // Validate inputs are arrays
112        let left_arr = left.as_array().ok_or_else(|| {
113            RustLogicGraphError::graph_validation_error("Left dataset must be an array")
114                .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", left)))
115        })?;
116
117        let right_arr = right.as_array().ok_or_else(|| {
118            RustLogicGraphError::graph_validation_error("Right dataset must be an array")
119                .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", right)))
120        })?;
121
122        debug!(
123            "Left dataset: {} rows, Right dataset: {} rows",
124            left_arr.len(),
125            right_arr.len()
126        );
127
128        // Build index for right dataset for efficient lookup
129        let mut right_index: HashMap<String, Vec<&Value>> = HashMap::new();
130        for right_row in right_arr {
131            if let Some(key_value) = right_row.get(right_key) {
132                let key_str = self.value_to_string(key_value);
133                right_index
134                    .entry(key_str)
135                    .or_insert_with(Vec::new)
136                    .push(right_row);
137            }
138        }
139
140        let mut result = Vec::new();
141        let mut matched_right_indices = std::collections::HashSet::new();
142
143        // Process left dataset
144        for left_row in left_arr {
145            let left_obj = left_row.as_object().ok_or_else(|| {
146                RustLogicGraphError::graph_validation_error("Left row must be an object")
147            })?;
148
149            if let Some(key_value) = left_row.get(left_key) {
150                let key_str = self.value_to_string(key_value);
151
152                if let Some(matching_rights) = right_index.get(&key_str) {
153                    // Found matches - create joined rows
154                    for right_row in matching_rights {
155                        let joined = self.merge_rows(left_obj, right_row.as_object().unwrap())?;
156                        result.push(joined);
157                        matched_right_indices.insert(key_str.clone());
158                    }
159                } else if strategy == JoinStrategy::Left || strategy == JoinStrategy::Full {
160                    // No match - include left row with nulls for right columns (LEFT or FULL join)
161                    let mut joined = self.prefix_object(left_obj, &self.left_prefix);
162                    if let Some(prefix) = &self.right_prefix {
163                        // Add null columns from right dataset
164                        if let Some(sample_right) = right_arr.first().and_then(|v| v.as_object()) {
165                            for key in sample_right.keys() {
166                                joined.insert(format!("{}{}", prefix, key), Value::Null);
167                            }
168                        }
169                    }
170                    result.push(Value::Object(joined));
171                }
172            }
173        }
174
175        // Handle RIGHT or FULL join - include unmatched right rows
176        if strategy == JoinStrategy::Right || strategy == JoinStrategy::Full {
177            for right_row in right_arr {
178                if let Some(key_value) = right_row.get(right_key) {
179                    let key_str = self.value_to_string(key_value);
180                    if !matched_right_indices.contains(&key_str) {
181                        // Unmatched right row
182                        let right_obj = right_row.as_object().unwrap();
183                        let mut joined = self.prefix_object(right_obj, &self.right_prefix);
184
185                        // Add null columns from left dataset
186                        if let Some(prefix) = &self.left_prefix {
187                            if let Some(sample_left) = left_arr.first().and_then(|v| v.as_object())
188                            {
189                                for key in sample_left.keys() {
190                                    joined.insert(format!("{}{}", prefix, key), Value::Null);
191                                }
192                            }
193                        }
194                        result.push(Value::Object(joined));
195                    }
196                }
197            }
198        }
199
200        info!("✅ Query Correlator: Joined {} rows", result.len());
201        Ok(Value::Array(result))
202    }
203
204    /// Merge two JSON objects with optional prefixes
205    fn merge_rows(
206        &self,
207        left: &serde_json::Map<String, Value>,
208        right: &serde_json::Map<String, Value>,
209    ) -> Result<Value, RustLogicGraphError> {
210        let mut merged = self.prefix_object(left, &self.left_prefix);
211        let right_prefixed = self.prefix_object(right, &self.right_prefix);
212        merged.extend(right_prefixed);
213        Ok(Value::Object(merged))
214    }
215
216    /// Add prefix to all keys in an object
217    fn prefix_object(
218        &self,
219        obj: &serde_json::Map<String, Value>,
220        prefix: &Option<String>,
221    ) -> serde_json::Map<String, Value> {
222        if let Some(p) = prefix {
223            obj.iter()
224                .map(|(k, v)| (format!("{}{}", p, k), v.clone()))
225                .collect()
226        } else {
227            obj.clone()
228        }
229    }
230
231    /// Convert a JSON value to string for indexing
232    fn value_to_string(&self, value: &Value) -> String {
233        match value {
234            Value::String(s) => s.clone(),
235            Value::Number(n) => n.to_string(),
236            Value::Bool(b) => b.to_string(),
237            _ => value.to_string(),
238        }
239    }
240}
241
242impl Default for QueryCorrelator {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_inner_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
            {"order_id": 102, "user_id": 3, "amount": 100.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        // Only user_id=1 matches
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
    }

    #[test]
    fn test_left_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Left)
            .unwrap();
        let arr = result.as_array().unwrap();

        // Both users included (user_id=2 has null order fields)
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
        assert_eq!(arr[1]["user_id"], 2);
        assert!(arr[1]["order_id"].is_null());
    }

    #[test]
    fn test_right_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
            {"order_id": 102, "user_id": 3, "amount": 100.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Right)
            .unwrap();
        let arr = result.as_array().unwrap();

        // Matched Alice/101 first, then the unmatched order for unknown user 3; Bob is dropped.
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["order_id"], 102);
        assert_eq!(arr[1]["user_id"], 3);
        assert!(arr[1]["name"].is_null());
    }

    #[test]
    fn test_full_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
            {"order_id": 102, "user_id": 3, "amount": 100.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Full)
            .unwrap();
        let arr = result.as_array().unwrap();

        // Alice/101 matched, Bob unmatched on the left, order 102 unmatched on the right.
        assert_eq!(arr.len(), 3);
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
        assert!(arr[1]["order_id"].is_null());
        assert_eq!(arr[2]["order_id"], 102);
    }

    #[test]
    fn test_with_prefixes() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([{"id": 1, "name": "Alice"}]);
        let orders = json!([{"id": 101, "user_id": 1}]);

        let result = correlator
            .join(&users, &orders, "id", "user_id", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        // Check prefixed column names
        assert!(arr[0].get("user_id").is_some());
        assert!(arr[0].get("user_name").is_some());
        assert!(arr[0].get("order_id").is_some());
    }

    #[test]
    fn test_left_join_with_prefixes_pads_right_columns() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
        ]);
        let orders = json!([{"id": 101, "user_id": 1}]);

        let result = correlator
            .join(&users, &orders, "id", "user_id", JoinStrategy::Left)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 2);
        assert_eq!(arr[1]["user_id"], 2);
        assert_eq!(arr[1].get("order_id"), Some(&Value::Null));
        assert_eq!(arr[1].get("order_user_id"), Some(&Value::Null));
    }

    #[test]
    fn test_rejects_malformed_input() {
        let correlator = QueryCorrelator::new();

        // Dataset that is not an array.
        assert!(correlator
            .join(&json!({"user_id": 1}), &json!([]), "user_id", "user_id", JoinStrategy::Inner)
            .is_err());

        // Left row that is not an object.
        assert!(correlator
            .join(&json!([1]), &json!([]), "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
    }
}