// rust_logic_graph/multi_db/correlation.rs
use std::collections::{HashMap, HashSet};

use serde_json::Value;
use tracing::{debug, info};

use crate::error::{ErrorContext, RustLogicGraphError};

/// Selects which rows [`QueryCorrelator::join`] keeps besides the matched pairs.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum JoinStrategy {
    /// Only matched pairs (SQL `INNER JOIN`).
    Inner,
    /// Matched pairs plus unmatched left rows (SQL `LEFT OUTER JOIN`).
    Left,
    /// Matched pairs plus unmatched right rows (SQL `RIGHT OUTER JOIN`).
    Right,
    /// Matched pairs plus unmatched rows from both sides (SQL `FULL OUTER JOIN`).
    Full,
}

/// Joins two JSON datasets (arrays of row objects) on a shared key column.
///
/// The optional prefixes are prepended to every column name taken from the
/// corresponding side, which keeps identically named columns apart in the
/// joined rows.
#[derive(Debug, Clone)]
pub struct QueryCorrelator {
    /// Prefix for columns copied from the left dataset; `None` keeps names as-is.
    pub left_prefix: Option<String>,
    /// Prefix for columns copied from the right dataset; `None` keeps names as-is.
    pub right_prefix: Option<String>,
}

62impl QueryCorrelator {
63 pub fn new() -> Self {
65 Self {
66 left_prefix: None,
67 right_prefix: None,
68 }
69 }
70
71 pub fn with_left_prefix(mut self, prefix: impl Into<String>) -> Self {
73 self.left_prefix = Some(prefix.into());
74 self
75 }
76
77 pub fn with_right_prefix(mut self, prefix: impl Into<String>) -> Self {
79 self.right_prefix = Some(prefix.into());
80 self
81 }
82
83 pub fn join(
95 &self,
96 left: &Value,
97 right: &Value,
98 left_key: &str,
99 right_key: &str,
100 strategy: JoinStrategy,
101 ) -> Result<Value, RustLogicGraphError> {
102 info!("🔗 Query Correlator: Joining datasets on {}.{} = {}.{} ({:?})",
103 self.left_prefix.as_deref().unwrap_or("left"),
104 left_key,
105 self.right_prefix.as_deref().unwrap_or("right"),
106 right_key,
107 strategy
108 );
109
110 let left_arr = left.as_array().ok_or_else(|| {
112 RustLogicGraphError::graph_validation_error("Left dataset must be an array")
113 .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", left)))
114 })?;
115
116 let right_arr = right.as_array().ok_or_else(|| {
117 RustLogicGraphError::graph_validation_error("Right dataset must be an array")
118 .with_context(ErrorContext::new().add_metadata("type", &format!("{:?}", right)))
119 })?;
120
121 debug!("Left dataset: {} rows, Right dataset: {} rows", left_arr.len(), right_arr.len());
122
123 let mut right_index: HashMap<String, Vec<&Value>> = HashMap::new();
125 for right_row in right_arr {
126 if let Some(key_value) = right_row.get(right_key) {
127 let key_str = self.value_to_string(key_value);
128 right_index.entry(key_str).or_insert_with(Vec::new).push(right_row);
129 }
130 }
131
132 let mut result = Vec::new();
133 let mut matched_right_indices = std::collections::HashSet::new();
134
135 for left_row in left_arr {
137 let left_obj = left_row.as_object().ok_or_else(|| {
138 RustLogicGraphError::graph_validation_error("Left row must be an object")
139 })?;
140
141 if let Some(key_value) = left_row.get(left_key) {
142 let key_str = self.value_to_string(key_value);
143
144 if let Some(matching_rights) = right_index.get(&key_str) {
145 for right_row in matching_rights {
147 let joined = self.merge_rows(left_obj, right_row.as_object().unwrap())?;
148 result.push(joined);
149 matched_right_indices.insert(key_str.clone());
150 }
151 } else if strategy == JoinStrategy::Left || strategy == JoinStrategy::Full {
152 let mut joined = self.prefix_object(left_obj, &self.left_prefix);
154 if let Some(prefix) = &self.right_prefix {
155 if let Some(sample_right) = right_arr.first().and_then(|v| v.as_object()) {
157 for key in sample_right.keys() {
158 joined.insert(format!("{}{}", prefix, key), Value::Null);
159 }
160 }
161 }
162 result.push(Value::Object(joined));
163 }
164 }
165 }
166
167 if strategy == JoinStrategy::Right || strategy == JoinStrategy::Full {
169 for right_row in right_arr {
170 if let Some(key_value) = right_row.get(right_key) {
171 let key_str = self.value_to_string(key_value);
172 if !matched_right_indices.contains(&key_str) {
173 let right_obj = right_row.as_object().unwrap();
175 let mut joined = self.prefix_object(right_obj, &self.right_prefix);
176
177 if let Some(prefix) = &self.left_prefix {
179 if let Some(sample_left) = left_arr.first().and_then(|v| v.as_object()) {
180 for key in sample_left.keys() {
181 joined.insert(format!("{}{}", prefix, key), Value::Null);
182 }
183 }
184 }
185 result.push(Value::Object(joined));
186 }
187 }
188 }
189 }
190
191 info!("✅ Query Correlator: Joined {} rows", result.len());
192 Ok(Value::Array(result))
193 }
194
195 fn merge_rows(
197 &self,
198 left: &serde_json::Map<String, Value>,
199 right: &serde_json::Map<String, Value>,
200 ) -> Result<Value, RustLogicGraphError> {
201 let mut merged = self.prefix_object(left, &self.left_prefix);
202 let right_prefixed = self.prefix_object(right, &self.right_prefix);
203 merged.extend(right_prefixed);
204 Ok(Value::Object(merged))
205 }
206
207 fn prefix_object(
209 &self,
210 obj: &serde_json::Map<String, Value>,
211 prefix: &Option<String>,
212 ) -> serde_json::Map<String, Value> {
213 if let Some(p) = prefix {
214 obj.iter()
215 .map(|(k, v)| (format!("{}{}", p, k), v.clone()))
216 .collect()
217 } else {
218 obj.clone()
219 }
220 }
221
222 fn value_to_string(&self, value: &Value) -> String {
224 match value {
225 Value::String(s) => s.clone(),
226 Value::Number(n) => n.to_string(),
227 Value::Bool(b) => b.to_string(),
228 _ => value.to_string(),
229 }
230 }
231}
232
233impl Default for QueryCorrelator {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_inner_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
            {"order_id": 102, "user_id": 3, "amount": 100.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
    }

    #[test]
    fn test_left_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([
            {"user_id": 1, "name": "Alice"},
            {"user_id": 2, "name": "Bob"},
        ]);

        let orders = json!([
            {"order_id": 101, "user_id": 1, "amount": 50.0},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Left)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 2);
        // The matched pair comes first, followed by the unmatched left row.
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
        assert_eq!(arr[1]["name"], "Bob");
    }

    #[test]
    fn test_right_join() {
        let correlator = QueryCorrelator::new();

        let users = json!([{"user_id": 1, "name": "Alice"}]);
        let orders = json!([
            {"order_id": 101, "user_id": 1},
            {"order_id": 102, "user_id": 3},
        ]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Right)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["name"], "Alice");
        assert_eq!(arr[0]["order_id"], 101);
        // The unmatched right row is appended after the matched ones.
        assert_eq!(arr[1]["order_id"], 102);
    }

    #[test]
    fn test_full_join_pads_unmatched_rows_with_nulls() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("u_")
            .with_right_prefix("o_");

        let users = json!([
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
        ]);
        let orders = json!([
            {"id": 101, "user_id": 1},
            {"id": 103, "user_id": 3},
        ]);

        let result = correlator
            .join(&users, &orders, "id", "user_id", JoinStrategy::Full)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert_eq!(arr.len(), 3);
        // Matched pair.
        assert_eq!(arr[0]["u_name"], "Alice");
        assert_eq!(arr[0]["o_id"], 101);
        // Left-only row: right-hand columns exist and are null.
        assert_eq!(arr[1]["u_name"], "Bob");
        assert_eq!(arr[1].get("o_id"), Some(&Value::Null));
        assert_eq!(arr[1].get("o_user_id"), Some(&Value::Null));
        // Right-only row: left-hand columns exist and are null.
        assert_eq!(arr[2]["o_id"], 103);
        assert_eq!(arr[2].get("u_name"), Some(&Value::Null));
    }

    #[test]
    fn test_keys_match_across_number_and_string() {
        let correlator = QueryCorrelator::new();

        let users = json!([{"user_id": 1, "name": "Alice"}]);
        let orders = json!([{"order_id": 101, "user_id": "1"}]);

        let result = correlator
            .join(&users, &orders, "user_id", "user_id", JoinStrategy::Inner)
            .unwrap();

        assert_eq!(result.as_array().unwrap().len(), 1);
    }

    #[test]
    fn test_empty_datasets_yield_empty_result() {
        let correlator = QueryCorrelator::new();
        let empty = json!([]);

        let result = correlator
            .join(&empty, &empty, "id", "id", JoinStrategy::Full)
            .unwrap();

        assert!(result.as_array().unwrap().is_empty());
    }

    #[test]
    fn test_non_array_dataset_is_rejected() {
        let correlator = QueryCorrelator::new();
        let not_an_array = json!({"user_id": 1});
        let rows = json!([]);

        assert!(correlator
            .join(&not_an_array, &rows, "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
        assert!(correlator
            .join(&rows, &not_an_array, "user_id", "user_id", JoinStrategy::Inner)
            .is_err());
    }

    #[test]
    fn test_with_prefixes() {
        let correlator = QueryCorrelator::new()
            .with_left_prefix("user_")
            .with_right_prefix("order_");

        let users = json!([{"id": 1, "name": "Alice"}]);
        let orders = json!([{"id": 101, "user_id": 1}]);

        let result = correlator
            .join(&users, &orders, "id", "user_id", JoinStrategy::Inner)
            .unwrap();
        let arr = result.as_array().unwrap();

        assert!(arr[0].get("user_id").is_some());
        assert!(arr[0].get("user_name").is_some());
        assert!(arr[0].get("order_id").is_some());
    }
}