rust_logic_graph/multi_db/
correlation.rs

1use std::collections::HashMap;
2use serde_json::Value;
3use tracing::{info, debug};
4
5use crate::error::{RustLogicGraphError, ErrorContext};
6
/// Strategy for joining query results from different databases.
///
/// Mirrors the four classic SQL join kinds. See [`QueryCorrelator::join`]
/// for how join keys are compared and how unmatched rows are filled in.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinStrategy {
    /// Inner join: Only include rows that exist in both datasets
    Inner,
    /// Left join: Include all rows from left dataset, matching rows from right
    Left,
    /// Right join: Include all rows from right dataset, matching rows from left
    Right,
    /// Full outer join: Include all rows from both datasets
    Full,
}
19
/// Query result correlator for joining data from multiple databases
///
/// Enables joining and correlating query results from different databases
/// using common keys, similar to SQL JOINs but across distributed databases.
///
/// # Example
/// ```no_run
/// use rust_logic_graph::multi_db::{QueryCorrelator, JoinStrategy};
/// use serde_json::json;
///
/// let correlator = QueryCorrelator::new();
///
/// // Data from OMS database
/// let users = json!([
///     {"user_id": 1, "name": "Alice"},
///     {"user_id": 2, "name": "Bob"},
/// ]);
///
/// // Data from Orders database
/// let orders = json!([
///     {"order_id": 101, "user_id": 1, "amount": 50.0},
///     {"order_id": 102, "user_id": 1, "amount": 75.0},
///     {"order_id": 103, "user_id": 3, "amount": 100.0},
/// ]);
///
/// let result = correlator.join(
///     &users,
///     &orders,
///     "user_id",
///     "user_id",
///     JoinStrategy::Inner
/// ).unwrap();
///
/// // Alice matches two orders; Bob and order 103 have no partner.
/// assert_eq!(result.as_array().unwrap().len(), 2);
/// ```
#[derive(Debug, Clone)]
pub struct QueryCorrelator {
    /// Optional prefix prepended to every column name taken from the left dataset
    pub left_prefix: Option<String>,
    /// Optional prefix prepended to every column name taken from the right dataset
    pub right_prefix: Option<String>,
}
61
62impl QueryCorrelator {
63    /// Create a new query correlator
64    pub fn new() -> Self {
65        Self {
66            left_prefix: None,
67            right_prefix: None,
68        }
69    }
70    
71    /// Set prefix for left dataset columns (e.g., "user_")
72    pub fn with_left_prefix(mut self, prefix: impl Into<String>) -> Self {
73        self.left_prefix = Some(prefix.into());
74        self
75    }
76    
77    /// Set prefix for right dataset columns (e.g., "order_")
78    pub fn with_right_prefix(mut self, prefix: impl Into<String>) -> Self {
79        self.right_prefix = Some(prefix.into());
80        self
81    }
82    
83    /// Join two datasets on specified keys
84    /// 
85    /// # Arguments
86    /// * `left` - Left dataset (JSON array of objects)
87    /// * `right` - Right dataset (JSON array of objects)
88    /// * `left_key` - Key field name in left dataset
89    /// * `right_key` - Key field name in right dataset
90    /// * `strategy` - Join strategy (Inner, Left, Right, Full)
91    /// 
92    /// # Returns
93    /// JSON array containing joined rows
94    pub fn join(
95        &self,
96        left: &Value,
97        right: &Value,
98        left_key: &str,
99        right_key: &str,
100        strategy: JoinStrategy,
101    ) -> Result<Value, RustLogicGraphError> {
102        info!("🔗 Query Correlator: Joining datasets on {}.{} = {}.{} ({:?})",
103            self.left_prefix.as_deref().unwrap_or("left"),
104            left_key,
105            self.right_prefix.as_deref().unwrap_or("right"),
106            right_key,
107            strategy
108        );
109        
110        // Validate inputs are arrays
111        let left_arr = left.as_array().ok_or_else(|| {
112            RustLogicGraphError::graph_validation_error("Left dataset must be an array")
113                .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", left)))
114        })?;
115        
116        let right_arr = right.as_array().ok_or_else(|| {
117            RustLogicGraphError::graph_validation_error("Right dataset must be an array")
118                .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", right)))
119        })?;
120        
121        debug!("Left dataset: {} rows, Right dataset: {} rows", left_arr.len(), right_arr.len());
122        
123        // Build index for right dataset for efficient lookup
124        let mut right_index: HashMap<String, Vec<&Value>> = HashMap::new();
125        for right_row in right_arr {
126            if let Some(key_value) = right_row.get(right_key) {
127                let key_str = self.value_to_string(key_value);
128                right_index.entry(key_str).or_insert_with(Vec::new).push(right_row);
129            }
130        }
131        
132        let mut result = Vec::new();
133        let mut matched_right_indices = std::collections::HashSet::new();
134        
135        // Process left dataset
136        for left_row in left_arr {
137            let left_obj = left_row.as_object().ok_or_else(|| {
138                RustLogicGraphError::graph_validation_error("Left row must be an object")
139            })?;
140            
141            if let Some(key_value) = left_row.get(left_key) {
142                let key_str = self.value_to_string(key_value);
143                
144                if let Some(matching_rights) = right_index.get(&key_str) {
145                    // Found matches - create joined rows
146                    for right_row in matching_rights {
147                        let joined = self.merge_rows(left_obj, right_row.as_object().unwrap())?;
148                        result.push(joined);
149                        matched_right_indices.insert(key_str.clone());
150                    }
151                } else if strategy == JoinStrategy::Left || strategy == JoinStrategy::Full {
152                    // No match - include left row with nulls for right columns (LEFT or FULL join)
153                    let mut joined = self.prefix_object(left_obj, &self.left_prefix);
154                    if let Some(prefix) = &self.right_prefix {
155                        // Add null columns from right dataset
156                        if let Some(sample_right) = right_arr.first().and_then(|v| v.as_object()) {
157                            for key in sample_right.keys() {
158                                joined.insert(format!("{}{}", prefix, key), Value::Null);
159                            }
160                        }
161                    }
162                    result.push(Value::Object(joined));
163                }
164            }
165        }
166        
167        // Handle RIGHT or FULL join - include unmatched right rows
168        if strategy == JoinStrategy::Right || strategy == JoinStrategy::Full {
169            for right_row in right_arr {
170                if let Some(key_value) = right_row.get(right_key) {
171                    let key_str = self.value_to_string(key_value);
172                    if !matched_right_indices.contains(&key_str) {
173                        // Unmatched right row
174                        let right_obj = right_row.as_object().unwrap();
175                        let mut joined = self.prefix_object(right_obj, &self.right_prefix);
176                        
177                        // Add null columns from left dataset
178                        if let Some(prefix) = &self.left_prefix {
179                            if let Some(sample_left) = left_arr.first().and_then(|v| v.as_object()) {
180                                for key in sample_left.keys() {
181                                    joined.insert(format!("{}{}", prefix, key), Value::Null);
182                                }
183                            }
184                        }
185                        result.push(Value::Object(joined));
186                    }
187                }
188            }
189        }
190        
191        info!("✅ Query Correlator: Joined {} rows", result.len());
192        Ok(Value::Array(result))
193    }
194    
195    /// Merge two JSON objects with optional prefixes
196    fn merge_rows(
197        &self,
198        left: &serde_json::Map<String, Value>,
199        right: &serde_json::Map<String, Value>,
200    ) -> Result<Value, RustLogicGraphError> {
201        let mut merged = self.prefix_object(left, &self.left_prefix);
202        let right_prefixed = self.prefix_object(right, &self.right_prefix);
203        merged.extend(right_prefixed);
204        Ok(Value::Object(merged))
205    }
206    
207    /// Add prefix to all keys in an object
208    fn prefix_object(
209        &self,
210        obj: &serde_json::Map<String, Value>,
211        prefix: &Option<String>,
212    ) -> serde_json::Map<String, Value> {
213        if let Some(p) = prefix {
214            obj.iter()
215                .map(|(k, v)| (format!("{}{}", p, k), v.clone()))
216                .collect()
217        } else {
218            obj.clone()
219        }
220    }
221    
222    /// Convert a JSON value to string for indexing
223    fn value_to_string(&self, value: &Value) -> String {
224        match value {
225            Value::String(s) => s.clone(),
226            Value::Number(n) => n.to_string(),
227            Value::Bool(b) => b.to_string(),
228            _ => value.to_string(),
229        }
230    }
231}
232
233impl Default for QueryCorrelator {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_inner_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
            {"order_id": 102, "user_id": 3, "amount": 100.0},
        ]);

        let result = correlator.join(&users, &orders, "user_id", "user_id", JoinStrategy::Inner).unwrap();
        let arr = result.as_array().unwrap();

        // Only user_id=1 matches
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
    }

    #[test]
    fn test_inner_join_multiple_matches() {
        let correlator = QueryCorrelator::new();

        let users = json!([{"user_id": 1, "name": "Alice"}]);
        let orders = json!([
            {"order_id": 101, "user_id": 1},
            {"order_id": 102, "user_id": 1},
        ]);

        let result = correlator.join(&users, &orders, "user_id", "user_id", JoinStrategy::Inner).unwrap();
        let arr = result.as_array().unwrap();

        // One output row per matching right row, in right-dataset order
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["order_id"], 102);
    }

    #[test]
    fn test_left_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
        ]);

        let result = correlator.join(&users, &orders, "user_id", "user_id", JoinStrategy::Left).unwrap();
        let arr = result.as_array().unwrap();

        // Both users are included; Bob (user_id=2) has no matching order
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
    }

    #[test]
    fn test_right_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);
        let orders = json!([
            {"order_id": 101, "user_id": 1},
            {"order_id": 103, "user_id": 3},
        ]);

        let result = correlator.join(&users, &orders, "user_id", "user_id", JoinStrategy::Right).unwrap();
        let arr = result.as_array().unwrap();

        // Alice's matched row, then the orphan order; Bob has no order and is dropped
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[1]["order_id"], 103);
        assert_eq!(arr[1]["user_id"], 3);
    }

    #[test]
    fn test_full_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);
        let orders = json!([
            {"order_id": 101, "user_id": 1},
            {"order_id": 103, "user_id": 3},
        ]);

        let result = correlator.join(&users, &orders, "user_id", "user_id", JoinStrategy::Full).unwrap();
        let arr = result.as_array().unwrap();

        // Matched row, unmatched left row, then unmatched right row
        assert_eq!(arr.len(), 3);
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
        assert_eq!(arr[2]["order_id"], 103);
    }

    #[test]
    fn test_non_array_input_is_error() {
        let correlator = QueryCorrelator::new();
        let not_array = json!({"user_id": 1});
        let empty = json!([]);

        assert!(correlator
            .join(&not_array, &empty, "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
        assert!(correlator
            .join(&empty, &not_array, "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
    }

    #[test]
    fn test_non_object_left_row_is_error() {
        let correlator = QueryCorrelator::new();
        let left = json!([1]);
        let right = json!([]);

        assert!(correlator
            .join(&left, &right, "user_id", "user_id", JoinStrategy::Left)
            .is_err());
    }

    #[test]
    fn test_with_prefixes() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([{"id": 1, "name": "Alice"}]);
        let orders = json!([{"id": 101, "user_id": 1}]);

        let result = correlator.join(&users, &orders, "id", "user_id", JoinStrategy::Inner).unwrap();
        let arr = result.as_array().unwrap();

        // Check prefixed column names
        assert_eq!(arr.len(), 1);
        assert!(arr[0].get("user_id").is_some());
        assert!(arr[0].get("user_name").is_some());
        assert!(arr[0].get("order_id").is_some());
        assert!(arr[0].get("order_user_id").is_some());
    }

    #[test]
    fn test_left_join_with_prefixes_pads_nulls() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
        ]);
        let orders = json!([{"id": 101, "user_id": 1}]);

        let result = correlator.join(&users, &orders, "id", "user_id", JoinStrategy::Left).unwrap();
        let arr = result.as_array().unwrap();

        // Bob's row carries null placeholders for every right-side column
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[1]["user_name"], "Bob");
        assert_eq!(arr[1].get("order_id"), Some(&serde_json::Value::Null));
        assert_eq!(arr[1].get("order_user_id"), Some(&serde_json::Value::Null));
    }
}