subgraph/data_sources/mongo/
mod.rs

1use async_graphql::dynamic::FieldValue;
2use bson::{doc, to_document, Document};
3use log::{debug, error, trace, warn};
4use mongodb::{options::ClientOptions, Client, Database};
5
6use crate::{
7    configuration::subgraph::{
8        data_sources::mongo::MongoDataSourceConfig,
9        entities::{service_entity_field::ServiceEntityFieldConfig, ServiceEntityConfig},
10        SubGraphConfig,
11    },
12    filter_operator::FilterOperator,
13    graphql::{
14        entity::create_return_types::{ResolverResponse, ResolverResponseMeta},
15        schema::create_options_input::{DirectionEnum, OptionsInput},
16    },
17    resolver_type::ResolverType,
18    scalar_option::{to_mongo::MongoValue, ScalarOption},
19};
20
21use super::DataSource;
22
23pub mod services;
24
25#[derive(Debug, Clone)]
26pub struct MongoDataSource {
27    pub client: Client,
28    pub db: Database,
29    pub config: MongoDataSourceConfig,
30}
31
/// Description of a single eager-load join.
///
/// `MongoDataSource::create_aggregation` turns each value into a `$lookup`
/// stage followed by an `$unwind` of the joined field.
#[derive(Debug, Clone)]
pub struct EagerLoadOptions {
    /// Collection to join against (`$lookup.from`).
    pub from: String,
    /// Field on the local document used for the join (`$lookup.localField`).
    pub local_field: String,
    /// Field on the joined collection used for the join (`$lookup.foreignField`).
    pub foreign_field: String,
    /// Name under which the joined document is exposed (`$lookup.as` / `$unwind.path`).
    pub as_field: String,
}
39
40impl MongoDataSource {
41    pub async fn init(mongo_data_source_config: &MongoDataSourceConfig) -> DataSource {
42        debug!("Initializing Mongo");
43        let client_options = ClientOptions::parse(&mongo_data_source_config.uri)
44            .await
45            .expect("Failed to parse mongo client options.");
46
47        let client = Client::with_options(client_options).expect("Failed to create client");
48        let db = client.database(&mongo_data_source_config.db);
49
50        debug!("Created Mongo Data Source");
51        debug!("{:?}", client);
52        debug!("{:?}", db);
53
54        DataSource::Mongo(MongoDataSource {
55            client,
56            db,
57            config: mongo_data_source_config.clone(),
58        })
59    }
60
61    /// Recursively convert all string object ids to object ids.
62    /// Uses field definitions to determine if a field is an object id.
63    pub fn convert_object_id_string_to_object_id_from_doc(
64        filter: Document,
65        entity: &ServiceEntityConfig,
66        subgraph_config: &SubGraphConfig,
67        resolver_type: &ResolverType,
68        key: Option<String>, // Provide a key to keep track of nested fields.
69    ) -> Result<(Document, Vec<EagerLoadOptions>), async_graphql::Error> {
70        debug!("Serialize String Object IDs to Object IDs");
71        trace!("Filter: {:?}", filter);
72
73        let mut converted = filter.clone();
74        let mut combined_eager_options = vec![];
75
76        for (k, value) in filter.iter() {
77            trace!(
78                "Current Key: {:?}, Processing Key: {}, Value: {}",
79                key.clone(),
80                k,
81                value
82            );
83
84            // If it is a servie defined field, iterate through the fields to find the correct field.
85            if k == "query"
86                || k == "values"
87                || FilterOperator::list()
88                    .iter()
89                    .map(|x| x.as_str())
90                    .any(|x| x == k)
91            {
92                let document = match value.as_document() {
93                    Some(document) => document,
94                    None => {
95                        error!("Failed to get document from value");
96                        return Err(async_graphql::Error::from(
97                            "Failed to get document from value",
98                        ));
99                    }
100                };
101                // Send through recursive function to convert the object id string to object id
102                let (nested_converted, nested_eager_load_options) =
103                    match MongoDataSource::convert_object_id_string_to_object_id_from_doc(
104                        document.clone(),
105                        entity,
106                        subgraph_config,
107                        resolver_type,
108                        key.clone(),
109                    ) {
110                        Ok(nested) => nested,
111                        Err(e) => {
112                            error!(
113                                "Failed to convert object id string to object id. Error: {:?}",
114                                e
115                            );
116                            return Err(e);
117                        }
118                    };
119                converted.insert(k.clone(), nested_converted);
120                combined_eager_options.extend(nested_eager_load_options);
121                continue;
122            }
123
124            let fields = match ServiceEntityConfig::get_fields_recursive(entity, &k) {
125                Ok(fields) => fields,
126                Err(_) => {
127                    continue;
128                }
129            };
130
131            // Since searching by a single key above, the last field is guaranteed to be the field we are looking for.
132            if let Some(field) = fields.last() {
133                // Certain scalars need to be converted to mongo types.
134                // If they do, replace them in the doc.
135                if !field.eager.unwrap_or(false)
136                    || resolver_type == &ResolverType::CreateOne
137                    || resolver_type == &ResolverType::UpdateOne
138                    || resolver_type == &ResolverType::UpdateMany
139                {
140                    match field.scalar.bson_to_mongo_value(value) {
141                        Ok(mongo_value) => {
142                            if mongo_value.is_some() {
143                                match mongo_value.unwrap() {
144                                    MongoValue::ObjectID(object_id) => {
145                                        //update the cooresponding value in converted
146                                        converted.insert(k.clone(), object_id);
147                                    }
148                                    MongoValue::DateTime(date_time) => {
149                                        //update the cooresponding value in converted
150                                        converted.insert(k.clone(), date_time);
151                                    }
152                                    _ => {}
153                                }
154                            }
155                        }
156                        Err(_) => {}
157                    };
158                }
159
160                // Handle object types and eager loaded fields
161                match field.scalar {
162                    ScalarOption::Object => {
163                        trace!("Found Object Scalar");
164                        trace!("Current Key: {:?}", key.clone());
165                        let separator = if key.is_none() { "" } else { "." };
166                        let separated = format!("{}{}", separator, k);
167                        let key = Some(separated);
168                        let (nested_converted, nested_eager_load_options) =
169                            match MongoDataSource::convert_object_id_string_to_object_id_from_doc(
170                                value.as_document().unwrap().clone(),
171                                entity,
172                                subgraph_config,
173                                resolver_type,
174                                key.clone(),
175                            ) {
176                                Ok(nested) => nested,
177                                Err(e) => {
178                                    error!(
179                                        "Failed to convert object id string to object id. Error: {:?}",
180                                        e
181                                    );
182                                    return Err(e);
183                                }
184                            };
185                        trace!("Nested Converted: {:?}", nested_converted);
186                        trace!("Inserting Key: {:?}", key);
187                        converted.insert(key.as_ref().unwrap().clone(), nested_converted);
188                        combined_eager_options.extend(nested_eager_load_options);
189                    }
190                    _ => (),
191                }
192
193                // If the resolver type is not find one or find many, we don't need to handle eager loaded fields.
194                if resolver_type != &ResolverType::FindOne
195                    && resolver_type != &ResolverType::FindMany
196                {
197                    continue;
198                }
199
200                // Handle eager loaded fields
201                let eager_load_options =
202                    match MongoDataSource::handle_eager_fields(field, entity, subgraph_config) {
203                        Ok(eager_load_options) => eager_load_options,
204                        Err(e) => {
205                            error!("Failed to handle eager fields. Error: {:?}", e);
206                            return Err(e);
207                        }
208                    };
209
210                if let Some((eager_load_option, eager_entity)) = eager_load_options {
211                    // Send through recursive function to convert the object id string to object id
212                    // for eager loaded fields.
213                    let (value, nested_eager_opts) =
214                        match MongoDataSource::convert_object_id_string_to_object_id_from_doc(
215                            value.as_document().unwrap().clone(), // If eager, this will always be a document.
216                            &eager_entity,
217                            subgraph_config,
218                            resolver_type,
219                            key.clone(),
220                        ) {
221                            Ok(nested) => nested,
222                            Err(e) => {
223                                error!(
224                                    "Failed to convert object id string to object id. Error: {:?}",
225                                    e
226                                );
227                                return Err(e);
228                            }
229                        };
230                    combined_eager_options.extend(nested_eager_opts);
231
232                    // replace the key with the eager load key.
233                    converted.remove(&k);
234                    converted.insert(eager_load_option.as_field.clone(), value);
235                    combined_eager_options.push(eager_load_option);
236                }
237            }
238        }
239
240        trace!("Converted: {:?}", converted);
241        Ok((converted, combined_eager_options))
242    }
243
244    pub fn handle_eager_fields(
245        field: &ServiceEntityFieldConfig,
246        entity: &ServiceEntityConfig,
247        subgraph_config: &SubGraphConfig,
248    ) -> Result<Option<(EagerLoadOptions, ServiceEntityConfig)>, async_graphql::Error> {
249        debug!("Handle eager fields");
250        // Since searching by a single key above, the last field is guaranteed to be the field we are looking for.
251        if field.eager.is_none() {
252            trace!("Field is not eager");
253            return Ok(None);
254        }
255
256        // Get the name of the field to eager load - this can help to get the correct collection name.
257        // the join_on value will be the name of the parent entity
258        let join_on = if let Some(join_on) = field.join_on.clone() {
259            join_on
260        } else {
261            return Err(async_graphql::Error::new(format!(
262                "Eager load failed. Failed to get join_on for field: {}. Ensure propety `join_on` is present on the field definiton for this field.",
263                field.name
264            )));
265        };
266
267        let as_type = if let Some(as_type) = field.as_type.clone() {
268            as_type
269        } else {
270            return Err(async_graphql::Error::new(format!(
271                "Eager load failed. Failed to get `as_type` for field: {}",
272                field.name
273            )));
274        };
275
276        // Get the entity to reference the correct collection name.
277        let eager_entity = match subgraph_config.clone().get_entity(&as_type) {
278            Some(entity) => entity,
279            None => {
280                return Err(async_graphql::Error::new(format!(
281                    "Eager load failed. Failed to get entity for key: {}",
282                    join_on
283                )));
284            }
285        };
286
287        // Check if it has a collection name.
288        let collection_name = if let Some(ds) = eager_entity.data_source.clone() {
289            if ds.collection.is_some() {
290                ds.collection.unwrap().clone()
291            } else {
292                eager_entity.name.clone()
293            }
294        } else {
295            eager_entity.name.clone()
296        };
297
298        let join_from = if let Some(join_from) = field.join_from.clone() {
299            join_from
300        } else {
301            field.name.clone()
302        };
303
304        //replace the key with the eager loaded key.
305        let eager_key = format!("{}_{}_{}", entity.name, field.name, join_on);
306
307        let eager_load_options = EagerLoadOptions {
308            from: collection_name,
309            local_field: join_from,
310            foreign_field: join_on,
311            as_field: eager_key.clone(),
312        };
313
314        trace!("Eager load options: {:?}", eager_load_options);
315        Ok(Some((eager_load_options, eager_entity)))
316    }
317
318    pub fn finalize_input(
319        filter: Document,
320        entity: &ServiceEntityConfig,
321        subgraph_config: &SubGraphConfig,
322        resolver_type: &ResolverType,
323    ) -> Result<(Document, Vec<EagerLoadOptions>), async_graphql::Error> {
324        debug!("Finalizing Input Filters");
325        trace!("Filter: {:?}", filter);
326
327        let mut finalized = filter.clone();
328        let mut eager_filters = Vec::new();
329
330        for (key, value) in filter.iter() {
331            if key == "query" {
332                let query = value.as_document().unwrap();
333                let (query_finalized, eager_opts) = MongoDataSource::finalize_input(
334                    query.clone(),
335                    entity,
336                    subgraph_config,
337                    &resolver_type,
338                )?;
339                finalized.insert(key.clone(), query_finalized);
340                eager_filters.extend(eager_opts);
341            }
342
343            // Values is an object without filters, so we can just return it.
344            if key == "values" {
345                finalized.insert(key.clone(), value.clone());
346            }
347
348            if FilterOperator::list()
349                .iter()
350                .map(|operator| operator.as_str())
351                .collect::<Vec<&str>>()
352                .contains(&key.as_str())
353            {
354                trace!("Found filter operator key: {}", key);
355                let mut recursive_filters = Vec::new();
356                let filters = match value.as_array() {
357                    Some(filters) => filters.clone(),
358                    None => {
359                        let filters = vec![value.clone()];
360                        filters
361                    }
362                };
363                for filter in filters {
364                    let filter = filter.as_document().unwrap();
365                    let (filter_finalized, eager_opts) = MongoDataSource::finalize_input(
366                        filter.clone(),
367                        entity,
368                        subgraph_config,
369                        &resolver_type,
370                    )?;
371                    recursive_filters.push(filter_finalized);
372                    eager_filters.extend(eager_opts);
373                }
374                finalized.remove(key);
375                let filter_operator = FilterOperator::from_str(key).unwrap();
376                let operator_key = FilterOperator::get_mongo_operator(&filter_operator);
377
378                match filter_operator {
379                    FilterOperator::And => {
380                        finalized.insert(operator_key, recursive_filters);
381                    }
382                    FilterOperator::Or => {
383                        finalized.insert(operator_key, recursive_filters);
384                    }
385                    _ => {
386                        // The rest of the filter operators require the reverse format.
387                        // For example { $eq: { name: "test" } } becomes { name: { $eq: "test" } }
388                        let mut new_filter = Document::new();
389                        for filter in recursive_filters {
390                            for (key, value) in filter.iter() {
391                                let mut new_value = Document::new();
392                                new_value.insert(operator_key, value.clone());
393                                new_filter.insert(key.clone(), new_value);
394                            }
395                        }
396                        finalized.insert("$and".to_string(), vec![new_filter]);
397                    }
398                }
399            }
400
401            // Add the options back to the filter.
402            if key == "opts" {
403                finalized.insert(key.clone(), value.clone());
404            }
405        }
406
407        // Parse the provided object eager options and convert them to the correct format.
408        let eager_load_options;
409        (finalized, eager_load_options) =
410            MongoDataSource::convert_object_id_string_to_object_id_from_doc(
411                finalized,
412                entity,
413                subgraph_config,
414                &resolver_type,
415                None,
416            )?;
417        eager_filters.extend(eager_load_options);
418
419        trace!("Filter Finalized");
420        trace!("Finalized: {:?}", finalized);
421        trace!("Total Eager Load Options: {:?}", eager_filters);
422
423        Ok((finalized, eager_filters))
424    }
425
426    pub fn create_aggregation(
427        query_doc: &Document,
428        eager_load_options: Vec<EagerLoadOptions>,
429        opts_doc: Option<OptionsInput>,
430    ) -> Result<Vec<Document>, async_graphql::Error> {
431        debug!("Creating Aggregation");
432        trace!("Query Doc: {:?}", query_doc);
433        trace!("Eager Load Options: {:?}", eager_load_options);
434        trace!("Opts Doc: {:?}", opts_doc);
435        let mut pipeline = Vec::new();
436        for eager_load_option in eager_load_options {
437            let lookup = doc! {
438                "$lookup": {
439                    "from": eager_load_option.from,
440                    "localField": eager_load_option.local_field,
441                    "foreignField": eager_load_option.foreign_field,
442                    "as": eager_load_option.as_field.clone(),
443                }
444            };
445            pipeline.push(lookup);
446            let unwind = doc! {
447                "$unwind": {
448                    "path": format!("${}", eager_load_option.as_field),
449                    "preserveNullAndEmptyArrays": true,
450                }
451            };
452            pipeline.push(unwind);
453        }
454
455        let match_doc = doc! {
456            "$match": query_doc
457        };
458        pipeline.push(match_doc);
459
460        // Start the facet pipeline.
461        let mut facet_doc = doc! {
462            "total_count": [
463                {
464                    "$count": "total_count"
465                }
466            ]
467        };
468
469        let mut paginated_facet_doc = vec![];
470
471        // Handle sorting and paginating
472        if let Some(opts) = opts_doc {
473            let mut sort_doc = doc! {};
474            let mut skip = 0;
475            let mut limit = 10;
476            if let Some(sort) = opts.sort {
477                for sort_input in sort.iter() {
478                    sort_doc.insert(
479                        sort_input.field.clone(),
480                        match sort_input.direction {
481                            DirectionEnum::Asc => 1,
482                            DirectionEnum::Desc => -1,
483                        },
484                    );
485                }
486            }
487
488            // Get the limit from the opts input
489            if let Some(per_page) = opts.per_page {
490                limit = per_page;
491            }
492
493            // If opts.page and opts.per_page, calculate the new skip and limit values.
494            if let Some(page_value) = opts.page {
495                if let Some(per_page_value) = opts.per_page {
496                    skip = (page_value - 1) * per_page_value;
497                    limit = per_page_value;
498                }
499            }
500
501            trace!("Sort Doc: {:?}", sort_doc);
502
503            if !sort_doc.is_empty() {
504                let sort = doc! {
505                    "$sort": sort_doc
506                };
507                paginated_facet_doc.push(sort);
508            }
509            if skip > 0 {
510                let skip = doc! {
511                    "$skip": skip
512                };
513                paginated_facet_doc.push(skip);
514            }
515            if limit > 0 {
516                let limit = doc! {
517                    "$limit": limit
518                };
519                paginated_facet_doc.push(limit);
520            }
521        }
522
523        facet_doc.insert("documents", paginated_facet_doc);
524
525        pipeline.push(doc! {
526            "$facet": facet_doc
527        });
528
529        trace!("Pipeline: {:?}", pipeline);
530        Ok(pipeline)
531    }
532
533    pub async fn execute_operation<'a>(
534        data_source: &DataSource,
535        mut input: Document,
536        entity: ServiceEntityConfig,
537        resolver_type: ResolverType,
538        subgraph_config: &SubGraphConfig,
539    ) -> Result<Option<FieldValue<'a>>, async_graphql::Error> {
540        debug!("Executing Operation - Mongo Data Source");
541        trace!("Input: {:?}", input);
542
543        let eager_load_options;
544        (input, eager_load_options) =
545            MongoDataSource::finalize_input(input, &entity, subgraph_config, &resolver_type)?;
546
547        let db = match data_source {
548            DataSource::Mongo(ds) => ds.db.clone(),
549            _ => unreachable!(),
550        };
551
552        debug!("Database Found");
553
554        let collection_name = ServiceEntityConfig::get_mongo_collection_name(&entity);
555
556        debug!("Found Collection Name");
557        trace!("{:?}", collection_name);
558
559        match resolver_type {
560            ResolverType::FindOne => {
561                let result =
562                    services::Services::find_one(db, input, collection_name, eager_load_options)
563                        .await?;
564                let res = ResolverResponse {
565                    data: vec![FieldValue::owned_any(result)],
566                    meta: ResolverResponseMeta {
567                        request_id: uuid::Uuid::new_v4().to_string(),
568                        service_name: subgraph_config.service.name.clone(),
569                        service_version: subgraph_config.service.version.clone(),
570                        executed_at: chrono::Utc::now()
571                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
572                        count: 1,
573                        total_count: 1,
574                        page: 1,
575                        total_pages: 1,
576                        user_uuid: None,
577                    },
578                };
579                Ok(Some(FieldValue::owned_any(res)))
580            }
581            ResolverType::FindMany => {
582                let (results, total_count) = services::Services::find_many(
583                    db,
584                    input.clone(),
585                    collection_name,
586                    eager_load_options,
587                )
588                .await?;
589                let opts_doc = if input.clone().get("opts").is_some() {
590                    trace!("Options Document Found: {:?}", input.get("opts").unwrap());
591                    to_document(input.get("opts").unwrap()).unwrap()
592                } else {
593                    trace!("Options Document Not Found. Defaulting to 10 per page.");
594                    let mut d = Document::new();
595                    d.insert("per_page", 10);
596                    d.insert("page", 1);
597                    trace!("created opts: {:?}", d);
598                    d
599                };
600                let page = if let Some(page_value) = opts_doc.get("page") {
601                    page_value.as_i32().unwrap() as i64
602                } else {
603                    1
604                };
605                let total_pages = if let Some(per_page_value) = opts_doc.get("per_page") {
606                    let mut per_page = per_page_value.as_i32();
607                    if per_page.is_none() {
608                        let per_page_i64 = per_page_value.as_i64();
609                        if per_page_i64.is_none() {
610                            warn!("Invalid per_page value. Defaulting to 10.");
611                            per_page = Some(10);
612                        } else {
613                            per_page = Some(per_page_i64.unwrap() as i32);
614                        }
615                    }
616                    if total_count as i32 % per_page.unwrap() as i32 == 0 {
617                        total_count as i32 / per_page.unwrap() as i32
618                    } else {
619                        (total_count as i32 / per_page.unwrap() as i32) + 1
620                    }
621                } else {
622                    1
623                };
624
625                let res = ResolverResponse {
626                    data: results
627                        .clone()
628                        .into_iter()
629                        .map(|doc| FieldValue::owned_any(doc))
630                        .collect(),
631                    meta: ResolverResponseMeta {
632                        request_id: uuid::Uuid::new_v4().to_string(),
633                        service_name: subgraph_config.service.name.clone(),
634                        service_version: subgraph_config.service.version.clone(),
635                        executed_at: chrono::Utc::now()
636                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
637                        count: results.len() as i64,
638                        total_count: total_count as i64,
639                        page,
640                        total_pages: total_pages as i64,
641                        user_uuid: None,
642                    },
643                };
644                Ok(Some(FieldValue::owned_any(res)))
645            }
646            ResolverType::CreateOne => {
647                let result = services::Services::create_one(db, input, collection_name).await?;
648                let res = ResolverResponse {
649                    data: vec![FieldValue::owned_any(result)],
650                    meta: ResolverResponseMeta {
651                        request_id: uuid::Uuid::new_v4().to_string(),
652                        service_name: subgraph_config.service.name.clone(),
653                        service_version: subgraph_config.service.version.clone(),
654                        executed_at: chrono::Utc::now()
655                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
656                        count: 1,
657                        total_count: 1,
658                        page: 1,
659                        total_pages: 1,
660                        user_uuid: None,
661                    },
662                };
663                Ok(Some(FieldValue::owned_any(res)))
664            }
665            ResolverType::UpdateOne => {
666                let result =
667                    services::Services::update_one(db, input, collection_name, &entity).await?;
668                let res = ResolverResponse {
669                    data: vec![FieldValue::owned_any(result)],
670                    meta: ResolverResponseMeta {
671                        request_id: uuid::Uuid::new_v4().to_string(),
672                        service_name: subgraph_config.service.name.clone(),
673                        service_version: subgraph_config.service.version.clone(),
674                        executed_at: chrono::Utc::now()
675                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
676                        count: 1,
677                        total_count: 1,
678                        page: 1,
679                        total_pages: 1,
680                        user_uuid: None,
681                    },
682                };
683                Ok(Some(FieldValue::owned_any(res)))
684            }
685            ResolverType::UpdateMany => {
686                let results =
687                    services::Services::update_many(db, input, collection_name, &entity).await?;
688                let count = results.len();
689                let res = ResolverResponse {
690                    data: results
691                        .into_iter()
692                        .map(|doc| FieldValue::owned_any(doc))
693                        .collect(),
694                    meta: ResolverResponseMeta {
695                        request_id: uuid::Uuid::new_v4().to_string(),
696                        service_name: subgraph_config.service.name.clone(),
697                        service_version: subgraph_config.service.version.clone(),
698                        executed_at: chrono::Utc::now()
699                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
700                        count: count as i64,
701                        total_count: count as i64,
702                        page: 1,
703                        total_pages: 1,
704                        user_uuid: None,
705                    },
706                };
707                Ok(Some(FieldValue::owned_any(res)))
708            }
709            _ => panic!("Invalid resolver type"),
710        }
711    }
712}