rust_logic_graph/multi_db/
correlation.rs1use serde_json::Value;
2use std::collections::HashMap;
3use tracing::{debug, info};
4
5use crate::error::{ErrorContext, RustLogicGraphError};
6
/// How [`QueryCorrelator::join`] treats rows that have no partner on the other side.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JoinStrategy {
    /// Keep only rows whose keys match on both sides (SQL `INNER JOIN`).
    Inner,
    /// Keep matches plus left rows without a matching right row (SQL `LEFT JOIN`).
    Left,
    /// Keep matches plus right rows without a matching left row (SQL `RIGHT JOIN`).
    Right,
    /// Keep matches plus unmatched rows from both sides (SQL `FULL OUTER JOIN`).
    Full,
}
19
/// Correlates (joins) row sets coming from different data sources.
///
/// The optional prefixes are prepended to every column name taken from the left or
/// right dataset respectively, so same-named columns from the two sides do not
/// collide in the joined output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryCorrelator {
    /// Prefix for columns taken from the left dataset (`None` keeps names unchanged).
    pub left_prefix: Option<String>,
    /// Prefix for columns taken from the right dataset (`None` keeps names unchanged).
    pub right_prefix: Option<String>,
}
61
62impl QueryCorrelator {
63 pub fn new() -> Self {
65 Self {
66 left_prefix: None,
67 right_prefix: None,
68 }
69 }
70
71 pub fn with_left_prefix(mut self, prefix: impl Into<String>) -> Self {
73 self.left_prefix = Some(prefix.into());
74 self
75 }
76
77 pub fn with_right_prefix(mut self, prefix: impl Into<String>) -> Self {
79 self.right_prefix = Some(prefix.into());
80 self
81 }
82
83 pub fn join(
95 &self,
96 left: &Value,
97 right: &Value,
98 left_key: &str,
99 right_key: &str,
100 strategy: JoinStrategy,
101 ) -> Result<Value, RustLogicGraphError> {
102 info!(
103 "🔗 Query Correlator: Joining datasets on {}.{} = {}.{} ({:?})",
104 self.left_prefix.as_deref().unwrap_or("left"),
105 left_key,
106 self.right_prefix.as_deref().unwrap_or("right"),
107 right_key,
108 strategy
109 );
110
111 let left_arr = left.as_array().ok_or_else(|| {
113 RustLogicGraphError::graph_validation_error("Left dataset must be an array")
114 .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", left)))
115 })?;
116
117 let right_arr = right.as_array().ok_or_else(|| {
118 RustLogicGraphError::graph_validation_error("Right dataset must be an array")
119 .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", right)))
120 })?;
121
122 debug!(
123 "Left dataset: {} rows, Right dataset: {} rows",
124 left_arr.len(),
125 right_arr.len()
126 );
127
128 let mut right_index: HashMap<String, Vec<&Value>> = HashMap::new();
130 for right_row in right_arr {
131 if let Some(key_value) = right_row.get(right_key) {
132 let key_str = self.value_to_string(key_value);
133 right_index
134 .entry(key_str)
135 .or_insert_with(Vec::new)
136 .push(right_row);
137 }
138 }
139
140 let mut result = Vec::new();
141 let mut matched_right_indices = std::collections::HashSet::new();
142
143 for left_row in left_arr {
145 let left_obj = left_row.as_object().ok_or_else(|| {
146 RustLogicGraphError::graph_validation_error("Left row must be an object")
147 })?;
148
149 if let Some(key_value) = left_row.get(left_key) {
150 let key_str = self.value_to_string(key_value);
151
152 if let Some(matching_rights) = right_index.get(&key_str) {
153 for right_row in matching_rights {
155 let joined = self.merge_rows(left_obj, right_row.as_object().unwrap())?;
156 result.push(joined);
157 matched_right_indices.insert(key_str.clone());
158 }
159 } else if strategy == JoinStrategy::Left || strategy == JoinStrategy::Full {
160 let mut joined = self.prefix_object(left_obj, &self.left_prefix);
162 if let Some(prefix) = &self.right_prefix {
163 if let Some(sample_right) = right_arr.first().and_then(|v| v.as_object()) {
165 for key in sample_right.keys() {
166 joined.insert(format!("{}{}", prefix, key), Value::Null);
167 }
168 }
169 }
170 result.push(Value::Object(joined));
171 }
172 }
173 }
174
175 if strategy == JoinStrategy::Right || strategy == JoinStrategy::Full {
177 for right_row in right_arr {
178 if let Some(key_value) = right_row.get(right_key) {
179 let key_str = self.value_to_string(key_value);
180 if !matched_right_indices.contains(&key_str) {
181 let right_obj = right_row.as_object().unwrap();
183 let mut joined = self.prefix_object(right_obj, &self.right_prefix);
184
185 if let Some(prefix) = &self.left_prefix {
187 if let Some(sample_left) = left_arr.first().and_then(|v| v.as_object())
188 {
189 for key in sample_left.keys() {
190 joined.insert(format!("{}{}", prefix, key), Value::Null);
191 }
192 }
193 }
194 result.push(Value::Object(joined));
195 }
196 }
197 }
198 }
199
200 info!("✅ Query Correlator: Joined {} rows", result.len());
201 Ok(Value::Array(result))
202 }
203
204 fn merge_rows(
206 &self,
207 left: &serde_json::Map<String, Value>,
208 right: &serde_json::Map<String, Value>,
209 ) -> Result<Value, RustLogicGraphError> {
210 let mut merged = self.prefix_object(left, &self.left_prefix);
211 let right_prefixed = self.prefix_object(right, &self.right_prefix);
212 merged.extend(right_prefixed);
213 Ok(Value::Object(merged))
214 }
215
216 fn prefix_object(
218 &self,
219 obj: &serde_json::Map<String, Value>,
220 prefix: &Option<String>,
221 ) -> serde_json::Map<String, Value> {
222 if let Some(p) = prefix {
223 obj.iter()
224 .map(|(k, v)| (format!("{}{}", p, k), v.clone()))
225 .collect()
226 } else {
227 obj.clone()
228 }
229 }
230
231 fn value_to_string(&self, value: &Value) -> String {
233 match value {
234 Value::String(s) => s.clone(),
235 Value::Number(n) => n.to_string(),
236 Value::Bool(b) => b.to_string(),
237 _ => value.to_string(),
238 }
239 }
240}
241
242impl Default for QueryCorrelator {
243 fn default() -> Self {
244 Self::new()
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_inner_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
            {"order_id": 102, "user_id": 3, "amount": 100.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
    }

    #[test]
    fn test_left_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Left)
            .unwrap();
        let arr = result.as_array().unwrap();

        // Alice is matched; Bob has no order but is still kept.
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
    }

    #[test]
    fn test_with_prefixes() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([{"id": 1, "name": "Alice"}]);
        let orders = json!([{"id": 101, "user_id": 1}]);

        let result = correlator
            .join(&users, &orders, "id", "user_id", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert!(arr[0].get("user_id").is_some());
        assert!(arr[0].get("user_name").is_some());
        assert!(arr[0].get("order_id").is_some());
    }

    #[test]
    fn test_right_join_pads_missing_left_columns() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([{"id": 1, "name": "Alice"}]);
        let orders = json!([
            {"id": 101, "user_id": 1},
            {"id": 102, "user_id": 2},
        ]);

        let result = correlator
            .join(&users, &orders, "id", "user_id", JoinStrategy::Right)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["user_name"], "Alice");
        assert_eq!(arr[1]["order_id"], 102);
        // `get` distinguishes an explicit null from an absent key (indexing does not).
        assert_eq!(arr[1].get("user_id"), Some(&json!(null)));
        assert_eq!(arr[1].get("user_name"), Some(&json!(null)));
    }

    #[test]
    fn test_full_join_keeps_unmatched_rows_from_both_sides() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);
        let orders = json!([
            {"order_id": 101, "user_id": 1},
            {"order_id": 103, "user_id": 3},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Full)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 3);
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
        assert_eq!(arr[2]["order_id"], 103);
    }

    #[test]
    fn test_string_and_number_keys_match() {
        let correlator = QueryCorrelator::new();

        let left = json!([{"id": "7", "source": "postgres"}]);
        let right = json!([{"ref": 7, "status": "open"}]);

        let result = correlator
            .join(&left, &right, "id", "ref", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["status"], "open");
    }

    #[test]
    fn test_empty_inputs_produce_empty_result() {
        let correlator = QueryCorrelator::new();
        let empty = json!([]);

        let result = correlator
            .join(&empty, &empty, "id", "id", JoinStrategy::Full)
            .unwrap();

        assert_eq!(result, json!([]));
    }

    #[test]
    fn test_rejects_non_array_dataset() {
        let correlator = QueryCorrelator::new();
        let not_array = json!({"user_id": 1});
        let empty = json!([]);

        assert!(correlator
            .join(&not_array, &empty, "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
        assert!(correlator
            .join(&empty, &not_array, "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
    }

    #[test]
    fn test_rejects_non_object_left_row() {
        let correlator = QueryCorrelator::new();

        let result = correlator.join(&json!([1]), &json!([]), "id", "id", JoinStrategy::Left);

        assert!(result.is_err());
    }

    #[test]
    fn test_default_has_no_prefixes() {
        let correlator = QueryCorrelator::default();

        assert_eq!(correlator.left_prefix, None);
        assert_eq!(correlator.right_prefix, None);
    }
}