teo_mongodb_connector/aggregation/
mod.rs

1use std::collections::HashSet;
2use bson::doc;
3use bson::{Bson, Document, Regex as BsonRegex};
4use indexmap::{indexmap, IndexMap};
5use teo_parser::r#type::Type;
6use teo_runtime::model::object::input::Input;
7use teo_runtime::model::{Relation, Model};
8use teo_runtime::{teon, Value};
9use teo_result::{Error, Result};
10use teo_runtime::model::field::column_named::ColumnNamed;
11use teo_runtime::model::field::is_optional::IsOptional;
12use teo_runtime::traits::named::Named;
13use teo_runtime::model::field::typed::Typed;
14use teo_runtime::namespace::Namespace;
15use crate::bson_ext::teon_value_to_bson;
16
17pub(crate) struct Aggregation { }
18
19impl Aggregation {
20
21    fn insert_group_set_unset_for_aggregate(model: &Model, group: &mut Document, set: &mut Document, unset: &mut Vec<String>, k: &str, g: &str, having_mode: bool) {
22        let prefix = if having_mode { "_having" } else { "" };
23        let dbk = if k == "_all" { "_all" } else {model.field(k).unwrap().column_name() };
24        if g == "count" {
25            if k == "_all" {
26                group.insert(format!("{prefix}_count__all"), doc!{"$count": {}});
27            } else {
28                group.insert(format!("{prefix}_count_{dbk}"), doc!{
29                "$sum": {
30                    "$cond": [{"$ifNull": [format!("${dbk}"), false]}, 1, 0]
31                }
32            });
33            }
34        } else {
35            group.insert(format!("{prefix}_{g}_{dbk}"), doc!{format!("${g}"): format!("${dbk}")});
36            if g == "sum" {
37                group.insert(format!("{prefix}_{g}_count_{dbk}"), doc!{format!("$sum"): {
38                "$cond": [
39                    {"$ifNull": [format!("${dbk}"), false]},
40                    1,
41                    0
42                ]
43            }});
44            }
45        }
46        if g == "sum" {
47            set.insert(format!("{prefix}_{g}.{k}"), doc!{
48            "$cond": {
49                "if": {
50                    "$eq": [format!("${prefix}_{g}_count_{dbk}"), 0]
51                },
52                "then": null,
53                "else": format!("${prefix}_{g}_{dbk}")
54            }
55        });
56            unset.push(format!("{prefix}_{g}_{dbk}"));
57            unset.push(format!("{prefix}_{g}_count_{dbk}"));
58        } else {
59            set.insert(format!("{prefix}_{g}.{k}"), format!("${prefix}_{g}_{dbk}"));
60            unset.push(format!("{prefix}_{g}_{dbk}"));
61        }
62    }
63
64    pub(crate) fn build_for_aggregate(namespace: &Namespace, model: &Model, value: &Value) -> Result<Vec<Document>> {
65        let mut retval = Self::build(namespace, model, value)?;
66        let by = value.get("by");
67        let having = value.get("having");
68        let mut aggregates = teon!({});
69        for k in ["_sum", "_count", "_avg", "_min", "_max"] {
70            if value.as_dictionary().unwrap().contains_key(k) {
71                aggregates[k] = value.as_dictionary().unwrap().get(k).unwrap().clone();
72            }
73        }
74        let mut group = if let Some(by) = by {
75            let mut id_for_group_by = doc!{};
76            for key in by.as_array().unwrap() {
77                let k = key.as_str().unwrap();
78                let dbk = model.field(k).unwrap().column_name();
79                id_for_group_by.insert(dbk, doc!{
80                "$cond": [{"$ifNull": [format!("${dbk}"), false]}, format!("${dbk}"), null]
81            });
82            }
83            doc!{"_id": id_for_group_by}
84        } else {
85            doc!{"_id": Bson::Null}
86        };
87        let mut set = doc!{};
88        let mut unset: Vec<String> = vec![];
89        if let Some(by) = by {
90            for key in by.as_array().unwrap() {
91                let k = key.as_str().unwrap();
92                let dbk = model.field(k).unwrap().column_name();
93                set.insert(k, format!("$_id.{dbk}"));
94            }
95        }
96        if let Some(having) = having {
97            for (k, o) in having.as_dictionary().unwrap() {
98                let _dbk = model.field(k).unwrap().column_name();
99                for (g, _matcher) in o.as_dictionary().unwrap() {
100                    let g = g.strip_prefix("_").unwrap();
101                    Self::insert_group_set_unset_for_aggregate(model, &mut group, &mut set, &mut unset, k, g, true);
102                }
103            }
104        }
105        for (g, o) in aggregates.as_dictionary().unwrap() {
106            let g = g.strip_prefix("_").unwrap();
107            for (k, _t) in o.as_dictionary().unwrap() {
108                Self::insert_group_set_unset_for_aggregate(model, &mut group, &mut set, &mut unset, k, g, false);
109            }
110        }
111        retval.push(doc!{"$group": group});
112        retval.push(doc!{"$set": set});
113        if !unset.is_empty() {
114            retval.push(doc!{"$unset": unset});
115        }
116        // filter if there is a having
117        if let Some(having) = having {
118            let mut having_match = doc!{};
119            let mut having_unset: Vec<String> = Vec::new();
120            for (k, o) in having.as_dictionary().unwrap() {
121                let dbk = model.field(k).unwrap().column_name();
122                for (g, matcher) in o.as_dictionary().unwrap() {
123                    let g = g.strip_prefix("_").unwrap();
124                    let matcher_bson = Self::build_where_item(model, &Type::Float, true, matcher)?;
125                    having_match.insert(format!("_having_{g}.{dbk}"), matcher_bson);
126                    let having_group = format!("_having_{g}");
127                    if !having_unset.contains(&having_group) {
128                        having_unset.push(having_group);
129                    }
130                }
131            }
132            retval.push(doc!{"$match": having_match});
133            retval.push(doc!{"$unset": having_unset});
134        }
135        let mut group_by_sort = doc!{};
136        if let Some(by) = by {
137            // we need to order these
138            for key in by.as_array().unwrap() {
139                let k = key.as_str().unwrap();
140                group_by_sort.insert(k, 1);
141            }
142        }
143        if !group_by_sort.is_empty() {
144            retval.push(doc!{"$sort": group_by_sort});
145        }
146        Ok(retval)
147    }
148
149    pub(crate) fn build_for_count(namespace: &Namespace, model: &Model, value: &Value) -> Result<Vec<Document>> {
150        let mut retval = Self::build(namespace, model, value)?;
151        retval.push(doc! {"$count": "count"});
152        Ok(retval)
153    }
154
155    pub(crate) fn build(namespace: &Namespace, model: &Model, value: &Value) -> Result<Vec<Document>> {
156        let mut retval: Vec<Document> = vec![];
157        let r#where = value.get("where");
158        let order_by = value.get("orderBy");
159        let distinct = value.get("distinct");
160        let skip = value.get("skip");
161        let take = value.get("take");
162        let page_size = value.get("pageSize");
163        let page_number = value.get("pageNumber");
164        let select = value.get("select");
165        let include = value.get("include");
166        // if cursor exists, we modify the actual where
167        let cursor_where_additions = if let Some(cursor) = value.get("cursor") {
168            let cursor = cursor.as_dictionary().unwrap();
169            let cursor_key = cursor.keys().next().unwrap();
170            let cursor_value = cursor.values().next().unwrap();
171            let order_by = value.get("orderBy").unwrap().as_array().unwrap().get(0).unwrap().as_dictionary().unwrap().values().next().unwrap().as_str().unwrap();
172            let mut order_asc = order_by == "asc";
173            if let Some(take) = take {
174                if take.as_int64().unwrap() < 0 {
175                    order_asc = !order_asc;
176                }
177            }
178            let cursor_where_key = if order_asc { "gte" } else { "lte" };
179            let cursor_additional_where = Self::build_where(namespace, model, &teon!({cursor_key: {cursor_where_key: cursor_value}}));
180            Some(cursor_additional_where?)
181        } else {
182            None
183        };
184        // build `$lookup`s for relation where
185        if let Some(r#where) = r#where {
186            let lookups_for_relation_where = Self::build_lookups_for_relation_where(namespace, model, r#where)?;
187            retval.extend(lookups_for_relation_where)
188        }
189        // $match
190        if let Some(r#where) = r#where {
191            let r#match = Self::build_where(namespace, model, r#where)?;
192            if !r#match.is_empty() {
193                if let Some(cursor_where_additions) = cursor_where_additions {
194                    retval.push(doc!{"$match": {"$and": [r#match, cursor_where_additions]}});
195                } else {
196                    retval.push(doc!{"$match": r#match});
197                }
198            } else {
199                if let Some(cursor_where_additions) = cursor_where_additions {
200                    retval.push(doc!{"$match": cursor_where_additions});
201                }
202            }
203        } else {
204            if let Some(cursor_where_additions) = cursor_where_additions {
205                retval.push(doc!{"$match": cursor_where_additions});
206            }
207        }
208        // remove lookup for matching here
209        if let Some(r#where) = r#where {
210            let unsets = Self::build_unsets_for_relation_where(model, r#where)?;
211            if !unsets.is_empty() {
212                retval.extend(unsets);
213            }
214        }
215        // sort without distinct. If distinct, sort later in distinct
216        if distinct.is_none() {
217            if let Some(order_by) = order_by {
218                let reverse = match take {
219                    Some(take) => take.to_int64().unwrap() < 0,
220                    None => false
221                };
222                let sort = Self::build_order_by(model, order_by, reverse)?;
223                if !sort.is_empty() {
224                    retval.push(doc!{"$sort": sort});
225                }
226            } else if let Some(take) = take {
227                if take.to_int64().unwrap() < 0 {
228                    let sort = Self::build_order_by(model, &Self::default_desc_order(model), false)?;
229                    retval.push(doc!{"$sort": sort});
230                }
231            }
232        }
233        // $skip and $limit
234        if page_size.is_some() && page_number.is_some() {
235            retval.push(doc!{"$skip": ((page_number.unwrap().to_int64().unwrap() - 1) * page_size.unwrap().to_int64().unwrap()) as i64});
236            retval.push(doc!{"$limit": page_size.unwrap().to_int64().unwrap()});
237        } else {
238            if skip.is_some() {
239                retval.push(doc!{"$skip": skip.unwrap().to_int64().unwrap()});
240            }
241            if take.is_some() {
242                retval.push(doc!{"$limit": take.unwrap().to_int64().unwrap().abs()});
243            }
244        }
245        // distinct or select
246        // distinct ($group and $project)
247        if let Some(distinct) = distinct {
248            // $group
249            let mut group_id = doc!{};
250            for value in distinct.as_array().unwrap().iter() {
251                let val = value.as_str().unwrap();
252                group_id.insert(val, format!("${val}"));
253            }
254            let _empty = teon!({});
255            let mut group_data = Self::build_select(model, select.unwrap_or(&teon!({})), Some(distinct))?;
256            group_data.insert("_id", group_id);
257            retval.push(doc!{"$group": &group_data});
258            if group_data.get("__id").is_some() {
259                retval.push(doc!{"$addFields": {"_id": "$__id"}});
260                retval.push(doc!{"$unset": "__id"});
261            } else {
262                retval.push(doc!{"$unset": "_id"});
263            }
264            // $sort again if distinct
265            let reverse = match take {
266                Some(take) => take.to_int64().unwrap() < 0,
267                None => false
268            };
269            if let Some(order_by) = order_by {
270                let sort = Self::build_order_by(model, order_by, reverse)?;
271                if !sort.is_empty() {
272                    retval.push(doc!{"$sort": sort});
273                }
274            }
275        } else {
276            // $project
277            if let Some(select) = select {
278                if !select.as_dictionary().unwrap().is_empty() {
279                    let select_input = Self::build_select(model, select, distinct)?;
280                    if !select_input.is_empty() {
281                        retval.push(doc!{"$project": select_input})
282                    }
283                }
284            }
285        }
286        // $lookup
287        if let Some(include) = include {
288            let mut lookups = Self::build_lookups(namespace, model, include)?;
289            if !lookups.is_empty() {
290                retval.append(&mut lookups);
291            }
292        }
293        Ok(retval)
294    }
295
296    fn build_select(model: &Model, select: &Value, distinct: Option<&Value>) -> Result<Document> {
297        let map = select.as_dictionary().unwrap();
298        let true_keys: Vec<&str> = map.iter().filter(|(_k, v)| v.as_bool().unwrap() == true).map(|(k, _)| k.as_str()).collect();
299        let false_keys: Vec<&str> = map.iter().filter(|(_k, v)| v.as_bool().unwrap() == false).map(|(k, _)| k.as_str()).collect();
300        let primary_field_names = model.primary_index().unwrap().keys();
301        let mut keys: HashSet<String> = HashSet::new();
302        let save_unmentioned_keys = true_keys.is_empty();
303        model.cache().all_keys.iter().for_each(|k| {
304            let save = primary_field_names.contains(k) || (!false_keys.contains(&k.as_str()) && (true_keys.contains(&k.as_str()) || save_unmentioned_keys));
305            if save {
306                if let Some(field) = model.field(k) {
307                    let column_name = field.column_name();
308                    keys.insert(column_name.to_string());
309                } else if let Some(property) = model.property(k) {
310                    for d in property.dependencies() {
311                        let column_name = model.field(d).unwrap().name();
312                        keys.insert(column_name.to_string());
313                    }
314                }
315            }
316        });
317        let mut result = doc!{};
318        for key in keys.iter() {
319            if distinct.is_some() {
320                result.insert(Self::distinct_key(key), doc!{"$first": format!("${key}")});
321            } else {
322                result.insert(key, 1);
323            }
324        }
325        if result.get("_id").is_none() {
326            result.insert("_id", 0);
327        }
328        Ok(result)
329    }
330
331    fn build_order_by(model: &Model, order_by: &Value, reverse: bool) -> Result<Document> {
332        let mut retval = doc!{};
333        for sort in order_by.as_array().unwrap().iter() {
334            let (key, value) = Input::key_value(sort.as_dictionary().unwrap());
335            let key = model.field(key).unwrap().column_name();
336            if value.is_string() {
337                let str_val = value.as_str().unwrap();
338                if str_val == "asc" {
339                    retval.insert(key, if reverse { -1 } else { 1 });
340                } else if str_val == "desc" {
341                    retval.insert(key, if reverse { 1 } else { -1 });
342                }
343            }
344        }
345        Ok(retval)
346    }
347
348    fn build_where(namespace: &Namespace, model: &Model, value: &Value) -> Result<Document> {
349        let value_map = value.as_dictionary().unwrap();
350        let mut retval = doc!{};
351        for (key, value) in value_map.iter() {
352            let key = key.as_str();
353            match key {
354                "AND" => {
355                    let mut vals: Vec<Document> = vec![];
356                    for val in value.as_array().unwrap() {
357                        vals.push(Self::build_where(namespace, model, val)?);
358                    }
359                    retval.insert("$and", vals);
360                }
361                "OR" => {
362                    let mut vals: Vec<Document> = vec![];
363                    for val in value.as_array().unwrap() {
364                        vals.push(Self::build_where(namespace, model, val)?);
365                    }
366                    retval.insert("$or", vals);
367                }
368                "NOT" => {
369                    retval.insert("$nor", vec![Self::build_where(namespace, model, value)?]);
370                }
371                _ => {
372                    if let Some(field) = model.field(key) {
373                        let column_name = field.column_name();
374                        retval.insert(column_name, Self::build_where_item(model, field.r#type(), field.is_optional(), value)?);
375                    } else if let Some(relation) = model.relation(key) {
376                        let relation_model = namespace.model_at_path(&relation.model_path()).unwrap();
377                        let (command, inner_where) = Input::key_value(value.as_dictionary().unwrap());
378                        let _inner_where = Self::build_where(namespace, relation_model, inner_where)?;
379                        match command {
380                            "none" | "isNot" => {
381                                retval.insert(key, doc!{"$size": 0});
382                            }
383                            "some" | "is" => {
384                                retval.insert(key, doc!{"$size": 1});
385                            }
386                            "all" => {
387                                retval.insert(key, doc!{"$size": 0});
388                            }
389                            _ => {}
390                        }
391                    }
392                }
393            }
394        }
395        Ok(retval)
396    }
397
398    fn build_where_item(_model: &Model, _type: &Type, _optional: bool, value: &Value) -> Result<Bson> {
399        if let Some(map) = value.as_dictionary() {
400            Ok(Bson::Document(map.iter().filter(|(k, _)| k.as_str() != "mode").map(|(k, v)| {
401                let k = k.as_str();
402                match k {
403                    "startsWith" => {
404                        let bson_regex = BsonRegex {
405                            pattern: "^".to_string() + &*regex::escape(v.as_str().unwrap()),
406                            options: if Input::has_i_mode(map) { "i".to_string() } else { "".to_string() }
407                        };
408                        let regex = Bson::RegularExpression(bson_regex);
409                        ("$regex".to_string(), regex)
410                    },
411                    "endsWith" => {
412                        let bson_regex = BsonRegex {
413                            pattern: regex::escape(v.as_str().unwrap()) + "$",
414                            options: if Input::has_i_mode(map) { "i".to_string() } else { "".to_string() }
415                        };
416                        let regex = Bson::RegularExpression(bson_regex);
417                        ("$regex".to_string(), regex)
418                    },
419                    "contains" => {
420                        let bson_regex = BsonRegex {
421                            pattern: regex::escape(v.as_str().unwrap()),
422                            options: if Input::has_i_mode(map) { "i".to_string() } else { "".to_string() }
423                        };
424                        let regex = Bson::RegularExpression(bson_regex);
425                        ("$regex".to_string(), regex)
426                    },
427                    "matches" => {
428                        let bson_regex = BsonRegex {
429                            pattern: v.as_str().unwrap().to_string(),
430                            options: if Input::has_i_mode(map) { "i".to_string() } else { "".to_string() }
431                        };
432                        let regex = Bson::RegularExpression(bson_regex);
433                        ("$regex".to_string(), regex)
434                    },
435                    "isEmpty" => {
436                        ("$size".to_string(), Bson::from(0))
437                    },
438                    _ => (Self::build_where_key(k).as_str().unwrap().to_string(), teon_value_to_bson(v))
439                }
440            }).collect()))
441        } else {
442            Ok(teon_value_to_bson(value))
443        }
444    }
445
446    fn build_where_key(key: &str) -> Bson {
447        Bson::String(match key {
448            "equals" => "$eq",
449            "not" => "$ne",
450            "gt" => "$gt",
451            "gte" => "$gte",
452            "lt" => "$lt",
453            "lte" => "$lte",
454            "in" => "$in",
455            "notIn" => "$nin",
456            "has" => "$elemMatch",
457            "hasEvery" => "$all",
458            "hasSome" => "$in",
459            "length" => "$size",
460            _ => panic!("Unhandled key.")
461        }.to_owned())
462    }
463
464    fn build_lookups(namespace: &Namespace, model: &Model, include: &Value) -> Result<Vec<Document>> {
465        let include = include.as_dictionary().unwrap();
466        let mut retval: Vec<Document> = vec![];
467        for (key, value) in include {
468            let relation = model.relation(key).unwrap();
469            if (value.is_bool() && (value.as_bool().unwrap() == true)) || (value.is_dictionary()) {
470                if relation.has_join_table() {
471                    retval.extend(Self::build_lookup_with_join_table(namespace, model, key, relation, value)?)
472                } else {
473                    retval.extend(Self::build_lookup_without_join_table(namespace, model, key, relation, value)?)
474                }
475            }
476        }
477        Ok(retval)
478    }
479
480    fn build_lookup_with_join_table(namespace: &Namespace, model: &Model, _key: &str, relation: &Relation, value: &Value) -> Result<Vec<Document>> {
481        let mut retval = vec![];
482        let join_model = namespace.model_at_path(&relation.through_path().unwrap()).unwrap();
483        let local_relation_on_join_table = join_model.relation(relation.local().unwrap()).unwrap();
484        let foreign_relation_on_join_table = join_model.relation(relation.foreign().unwrap()).unwrap();
485        let _foreign_model_name = foreign_relation_on_join_table.model_path();
486        let (opposite_model, _opposite_relation) = namespace.opposite_relation(relation);
487        let mut outer_let_value = doc! {};
488        let mut outer_eq_values: Vec<Document> = vec![];
489        let mut inner_let_value = doc! {};
490        let mut inner_eq_values: Vec<Document> = vec![];
491        for (jt_field, local_field) in local_relation_on_join_table.iter() {
492            let jt_column_name = join_model.field(jt_field).unwrap().column_name();
493            let local_column_name = model.field(local_field).unwrap().column_name();
494            outer_let_value.insert(jt_column_name, format!("${local_column_name}"));
495            outer_eq_values.push(doc! {"$eq": [format!("${jt_column_name}"), format!("$${jt_column_name}")]});
496        }
497        for (jt_field, foreign_field) in foreign_relation_on_join_table.iter() {
498            let jt_column_name = join_model.field(jt_field).unwrap().column_name();
499            let foreign_column_name = model.field(foreign_field).unwrap().column_name();
500            inner_let_value.insert(jt_column_name, format!("${jt_column_name}"));
501            inner_eq_values.push(doc! {"$eq": [format!("${foreign_column_name}"), format!("$${jt_column_name}")]});
502        }
503        let mut original_inner_pipeline = if value.is_dictionary() {
504            Self::build(namespace, opposite_model, value)?
505        } else {
506            vec![]
507        };
508        let inner_is_reversed = Input::has_negative_take(value);
509        let original_inner_pipeline_immu = original_inner_pipeline.clone();
510        let mut inner_match = doc! {
511            "$expr": {
512                "$and": inner_eq_values
513            }
514        };
515        let original_inner_match = original_inner_pipeline.iter().find(|v| {
516            v.get("$match").is_some()
517        });
518        if original_inner_match.is_some() {
519            let original_inner_match = original_inner_match.unwrap();
520            let doc = original_inner_match.get_document("$match").unwrap();
521            for (k, v) in doc.iter() {
522                inner_match.insert(k, v);
523            }
524        }
525        let index = original_inner_pipeline.iter().position(|v| {
526            v.get("$match").is_some()
527        });
528        if index.is_some() {
529            original_inner_pipeline.remove(index.unwrap());
530            original_inner_pipeline.insert(index.unwrap(), doc! {"$match": inner_match});
531        } else {
532            original_inner_pipeline.insert(0, doc! {"$match": inner_match});
533        }
534        // group addfields unset for distinct
535        let original_inner_group = original_inner_pipeline_immu.iter().find(|v| {
536            v.get("$group").is_some()
537        });
538        let index = original_inner_pipeline.iter().position(|v| {
539            v.get("$group").is_some()
540        });
541        if index.is_some() {
542            original_inner_pipeline.remove(index.unwrap());
543        }
544        let original_inner_add_fields = original_inner_pipeline_immu.iter().find(|v| {
545            v.get("$addFields").is_some()
546        });
547        let index = original_inner_pipeline.iter().position(|v| {
548            v.get("$addFields").is_some()
549        });
550        if index.is_some() {
551            original_inner_pipeline.remove(index.unwrap());
552        }
553        let original_inner_unset = original_inner_pipeline_immu.iter().find(|v| {
554            v.get("$unset").is_some()
555        });
556        let index = original_inner_pipeline.iter().position(|v| {
557            v.get("$unset").is_some()
558        });
559        if index.is_some() {
560            original_inner_pipeline.remove(index.unwrap());
561        }
562        let original_inner_sort = original_inner_pipeline_immu.iter().find(|v| {
563            v.get("$sort").is_some()
564        });
565        let index = original_inner_pipeline.iter().position(|v| {
566            v.get("$sort").is_some()
567        });
568        if index.is_some() {
569            original_inner_pipeline.remove(index.unwrap());
570        }
571        let original_inner_skip = original_inner_pipeline_immu.iter().find(|v| {
572            v.get("$skip").is_some()
573        });
574        let index = original_inner_pipeline.iter().position(|v| {
575            v.get("$skip").is_some()
576        });
577        if index.is_some() {
578            original_inner_pipeline.remove(index.unwrap());
579        }
580        let original_inner_limit = original_inner_pipeline_immu.iter().find(|v| {
581            v.get("$limit").is_some()
582        });
583        let index = original_inner_pipeline.iter().position(|v| {
584            v.get("$limit").is_some()
585        });
586        if index.is_some() {
587            original_inner_pipeline.remove(index.unwrap());
588        }
589        let mut target = doc! {
590            "$lookup": {
591                "from": join_model.table_name(),
592                "as": relation.name(),
593                "let": outer_let_value,
594                "pipeline": [{
595                    "$match": {
596                        "$expr": {
597                            "$and": outer_eq_values
598                        }
599                    }
600                }, {
601                    "$lookup": {
602                        "from": opposite_model.table_name(),
603                        "as": relation.name(),
604                        "let": inner_let_value,
605                        "pipeline": original_inner_pipeline
606                    }
607                }, {
608                    "$unwind": {
609                        "path": format!("${}", relation.name())
610                    }
611                }, {
612                    "$replaceRoot": {
613                        "newRoot": format!("${}", relation.name())
614                    }
615                }]
616            }
617        };
618        if original_inner_group.is_some() {
619            let original_inner_group = original_inner_group.unwrap();
620            target.get_document_mut("$lookup").unwrap().get_array_mut("pipeline").unwrap().push(Bson::Document(original_inner_group.clone()));
621        }
622        if original_inner_add_fields.is_some() {
623            let original_inner_add_fields = original_inner_add_fields.unwrap();
624            target.get_document_mut("$lookup").unwrap().get_array_mut("pipeline").unwrap().push(Bson::Document(original_inner_add_fields.clone()));
625        }
626        if original_inner_unset.is_some() {
627            let original_inner_unset = original_inner_unset.unwrap();
628            target.get_document_mut("$lookup").unwrap().get_array_mut("pipeline").unwrap().push(Bson::Document(original_inner_unset.clone()));
629        }
630        if original_inner_sort.is_some() {
631            let original_inner_sort = original_inner_sort.unwrap();
632            target.get_document_mut("$lookup").unwrap().get_array_mut("pipeline").unwrap().push(Bson::Document(original_inner_sort.clone()));
633        }
634        if original_inner_skip.is_some() {
635            let original_inner_skip = original_inner_skip.unwrap();
636            target.get_document_mut("$lookup").unwrap().get_array_mut("pipeline").unwrap().push(Bson::Document(original_inner_skip.clone()));
637        }
638        if original_inner_limit.is_some() {
639            let original_inner_limit = original_inner_limit.unwrap();
640            target.get_document_mut("$lookup").unwrap().get_array_mut("pipeline").unwrap().push(Bson::Document(original_inner_limit.clone()));
641        }
642        retval.push(target);
643        if inner_is_reversed {
644            retval.push(doc! {"$set": {relation.name(): {"$reverseArray": format!("${}", relation.name())}}});
645        }
646        Ok(retval)
647    }
648
649    fn build_lookup_without_join_table(namespace: &Namespace, model: &Model, key: &str, relation: &Relation, value: &Value) -> Result<Vec<Document>> {
650        let mut retval = vec![];
651        let mut let_value = doc!{};
652        let mut eq_values: Vec<Document> = vec![];
653        let (opposite_model, _opposite_relation) = namespace.opposite_relation(relation);
654        for (field, reference) in relation.iter() {
655            let _field_name = model.field(field).unwrap().name();
656            let field_column_name = model.field(field).unwrap().column_name();
657            let reference_name = opposite_model.field(reference).unwrap().name();
658            let reference_column_name = opposite_model.field(reference).unwrap().column_name();
659            let_value.insert(reference_name, format!("${field_column_name}"));
660            eq_values.push(doc!{"$eq": [format!("${reference_column_name}"), format!("$${reference_name}")]});
661        }
662        let mut inner_pipeline = if value.is_dictionary() {
663            Self::build(namespace, opposite_model, value)?
664        } else {
665            vec![]
666        };
667        let inner_is_reversed = Input::has_negative_take(value);
668        let inner_match = inner_pipeline.iter().find(|v| v.get("$match").is_some());
669        let has_inner_match = inner_match.is_some();
670        let mut inner_match = if has_inner_match {
671            inner_match.unwrap().clone()
672        } else {
673            doc!{"$match": {}}
674        };
675        let inner_match_inner = inner_match.get_mut("$match").unwrap().as_document_mut().unwrap();
676        if inner_match_inner.get("$expr").is_none() {
677            inner_match_inner.insert("$expr", doc!{});
678        }
679        if inner_match_inner.get("$expr").unwrap().as_document().unwrap().get("$and").is_none() {
680            inner_match_inner.get_mut("$expr").unwrap().as_document_mut().unwrap().insert("$and", vec![] as Vec<Document>);
681        }
682        inner_match_inner.get_mut("$expr").unwrap().as_document_mut().unwrap().get_mut("$and").unwrap().as_array_mut().unwrap().extend(eq_values.iter().map(|item| Bson::Document(item.clone())));
683        if has_inner_match {
684            let index = inner_pipeline.iter().position(|v| v.get("$match").is_some()).unwrap();
685            inner_pipeline.remove(index);
686            inner_pipeline.insert(index, inner_match);
687        } else {
688            inner_pipeline.insert(0, inner_match);
689        }
690        let lookup = doc!{
691            "$lookup": {
692                "from": opposite_model.table_name(),
693                "as": key,
694                "let": let_value,
695                "pipeline": inner_pipeline
696            }
697        };
698        retval.push(lookup);
699        if inner_is_reversed {
700            retval.push(doc!{"$set": {relation.name(): {"$reverseArray": format!("${}", relation.name())}}});
701        }
702        Ok(retval)
703    }
704
705    fn build_unsets_for_relation_where(model: &Model, r#where: &Value) -> Result<Vec<Document>> {
706        let r#where = r#where.as_dictionary().unwrap();
707        let mut retval: Vec<Document> = vec![];
708        for (key, _) in r#where.iter() {
709            if let Some(_) = model.relation(key) {
710                retval.push(doc!{"$unset": key})
711            }
712        }
713        Ok(retval)
714    }
715
716    fn build_lookups_for_relation_where(namespace: &Namespace, model: &Model, r#where: &Value) -> Result<Vec<Document>> {
717        let r#where = r#where.as_dictionary().unwrap();
718        let mut include_input = IndexMap::new();
719        for (key, value) in r#where.iter() {
720            let relation = model.relation(key);
721            if relation.is_some() {
722                let (command, r_where) = Input::key_value(value.as_dictionary().unwrap());
723                match command {
724                    "some" | "is" => {
725                        include_input.insert(key.to_string(), teon!({
726                        "where": r_where,
727                        "take": 1
728                    }));
729                    }
730                    "none" | "isNot" => {
731                        include_input.insert(key.to_string(), teon!({
732                        "where": r_where,
733                        "take": 1
734                    }));
735                    }
736                    "all" => {
737                        include_input.insert(key.to_string(), teon!({
738                        "where": {"NOT": r_where},
739                        "take": 1
740                    }));
741                    }
742                    _ => {}
743                }
744            }
745        }
746        Ok(if !include_input.is_empty() {
747            Self::build_lookups(namespace, model, &Value::Dictionary(include_input))?
748        } else {
749            vec![]
750        })
751    }
752
753    fn distinct_key(original: impl AsRef<str>) -> String {
754        if original.as_ref() == "_id" {
755            "__id".to_string()
756        } else {
757            original.as_ref().to_string()
758        }
759    }
760
761    fn default_desc_order(model: &Model) -> Value {
762        let mut vec: Vec<Value> = vec![];
763        for item in model.primary_index().unwrap().items() {
764            vec.push(Value::Dictionary(indexmap!{item.field.clone() => Value::String("desc".to_string())}));
765        }
766        Value::Array(vec)
767    }
768}