1use async_graphql::dynamic::FieldValue;
2use bson::{doc, to_document, Document};
3use log::{debug, error, trace, warn};
4use mongodb::{options::ClientOptions, Client, Database};
5
6use crate::{
7 configuration::subgraph::{
8 data_sources::mongo::MongoDataSourceConfig,
9 entities::{service_entity_field::ServiceEntityFieldConfig, ServiceEntityConfig},
10 SubGraphConfig,
11 },
12 filter_operator::FilterOperator,
13 graphql::{
14 entity::create_return_types::{ResolverResponse, ResolverResponseMeta},
15 schema::create_options_input::{DirectionEnum, OptionsInput},
16 },
17 resolver_type::ResolverType,
18 scalar_option::{to_mongo::MongoValue, ScalarOption},
19};
20
21use super::DataSource;
22
23pub mod services;
24
/// A MongoDB-backed data source: the driver client, the selected database
/// handle and the configuration they were created from.
#[derive(Debug, Clone)]
pub struct MongoDataSource {
    /// Driver client built from the configured connection URI.
    pub client: Client,
    /// Handle to the database named in the configuration, obtained from `client`.
    pub db: Database,
    /// Copy of the configuration this data source was initialised with.
    pub config: MongoDataSourceConfig,
}
31
/// Parameters of a single eager-load join. `create_aggregation` turns each
/// instance into a `$lookup` stage followed by an `$unwind` stage.
#[derive(Debug, Clone)]
pub struct EagerLoadOptions {
    /// Collection to join with (`$lookup.from`).
    pub from: String,
    /// Field on the local documents to match on (`$lookup.localField`).
    pub local_field: String,
    /// Field on the joined collection to match against (`$lookup.foreignField`).
    pub foreign_field: String,
    /// Name of the output field holding the joined document (`$lookup.as`).
    pub as_field: String,
}
39
40impl MongoDataSource {
41 pub async fn init(mongo_data_source_config: &MongoDataSourceConfig) -> DataSource {
42 debug!("Initializing Mongo");
43 let client_options = ClientOptions::parse(&mongo_data_source_config.uri)
44 .await
45 .expect("Failed to parse mongo client options.");
46
47 let client = Client::with_options(client_options).expect("Failed to create client");
48 let db = client.database(&mongo_data_source_config.db);
49
50 debug!("Created Mongo Data Source");
51 debug!("{:?}", client);
52 debug!("{:?}", db);
53
54 DataSource::Mongo(MongoDataSource {
55 client,
56 db,
57 config: mongo_data_source_config.clone(),
58 })
59 }
60
61 pub fn convert_object_id_string_to_object_id_from_doc(
64 filter: Document,
65 entity: &ServiceEntityConfig,
66 subgraph_config: &SubGraphConfig,
67 resolver_type: &ResolverType,
68 key: Option<String>, ) -> Result<(Document, Vec<EagerLoadOptions>), async_graphql::Error> {
70 debug!("Serialize String Object IDs to Object IDs");
71 trace!("Filter: {:?}", filter);
72
73 let mut converted = filter.clone();
74 let mut combined_eager_options = vec![];
75
76 for (k, value) in filter.iter() {
77 trace!(
78 "Current Key: {:?}, Processing Key: {}, Value: {}",
79 key.clone(),
80 k,
81 value
82 );
83
84 if k == "query"
86 || k == "values"
87 || FilterOperator::list()
88 .iter()
89 .map(|x| x.as_str())
90 .any(|x| x == k)
91 {
92 let document = match value.as_document() {
93 Some(document) => document,
94 None => {
95 error!("Failed to get document from value");
96 return Err(async_graphql::Error::from(
97 "Failed to get document from value",
98 ));
99 }
100 };
101 let (nested_converted, nested_eager_load_options) =
103 match MongoDataSource::convert_object_id_string_to_object_id_from_doc(
104 document.clone(),
105 entity,
106 subgraph_config,
107 resolver_type,
108 key.clone(),
109 ) {
110 Ok(nested) => nested,
111 Err(e) => {
112 error!(
113 "Failed to convert object id string to object id. Error: {:?}",
114 e
115 );
116 return Err(e);
117 }
118 };
119 converted.insert(k.clone(), nested_converted);
120 combined_eager_options.extend(nested_eager_load_options);
121 continue;
122 }
123
124 let fields = match ServiceEntityConfig::get_fields_recursive(entity, &k) {
125 Ok(fields) => fields,
126 Err(_) => {
127 continue;
128 }
129 };
130
131 if let Some(field) = fields.last() {
133 if !field.eager.unwrap_or(false)
136 || resolver_type == &ResolverType::CreateOne
137 || resolver_type == &ResolverType::UpdateOne
138 || resolver_type == &ResolverType::UpdateMany
139 {
140 match field.scalar.bson_to_mongo_value(value) {
141 Ok(mongo_value) => {
142 if mongo_value.is_some() {
143 match mongo_value.unwrap() {
144 MongoValue::ObjectID(object_id) => {
145 converted.insert(k.clone(), object_id);
147 }
148 MongoValue::DateTime(date_time) => {
149 converted.insert(k.clone(), date_time);
151 }
152 _ => {}
153 }
154 }
155 }
156 Err(_) => {}
157 };
158 }
159
160 match field.scalar {
162 ScalarOption::Object => {
163 trace!("Found Object Scalar");
164 trace!("Current Key: {:?}", key.clone());
165 let separator = if key.is_none() { "" } else { "." };
166 let separated = format!("{}{}", separator, k);
167 let key = Some(separated);
168 let (nested_converted, nested_eager_load_options) =
169 match MongoDataSource::convert_object_id_string_to_object_id_from_doc(
170 value.as_document().unwrap().clone(),
171 entity,
172 subgraph_config,
173 resolver_type,
174 key.clone(),
175 ) {
176 Ok(nested) => nested,
177 Err(e) => {
178 error!(
179 "Failed to convert object id string to object id. Error: {:?}",
180 e
181 );
182 return Err(e);
183 }
184 };
185 trace!("Nested Converted: {:?}", nested_converted);
186 trace!("Inserting Key: {:?}", key);
187 converted.insert(key.as_ref().unwrap().clone(), nested_converted);
188 combined_eager_options.extend(nested_eager_load_options);
189 }
190 _ => (),
191 }
192
193 if resolver_type != &ResolverType::FindOne
195 && resolver_type != &ResolverType::FindMany
196 {
197 continue;
198 }
199
200 let eager_load_options =
202 match MongoDataSource::handle_eager_fields(field, entity, subgraph_config) {
203 Ok(eager_load_options) => eager_load_options,
204 Err(e) => {
205 error!("Failed to handle eager fields. Error: {:?}", e);
206 return Err(e);
207 }
208 };
209
210 if let Some((eager_load_option, eager_entity)) = eager_load_options {
211 let (value, nested_eager_opts) =
214 match MongoDataSource::convert_object_id_string_to_object_id_from_doc(
215 value.as_document().unwrap().clone(), &eager_entity,
217 subgraph_config,
218 resolver_type,
219 key.clone(),
220 ) {
221 Ok(nested) => nested,
222 Err(e) => {
223 error!(
224 "Failed to convert object id string to object id. Error: {:?}",
225 e
226 );
227 return Err(e);
228 }
229 };
230 combined_eager_options.extend(nested_eager_opts);
231
232 converted.remove(&k);
234 converted.insert(eager_load_option.as_field.clone(), value);
235 combined_eager_options.push(eager_load_option);
236 }
237 }
238 }
239
240 trace!("Converted: {:?}", converted);
241 Ok((converted, combined_eager_options))
242 }
243
244 pub fn handle_eager_fields(
245 field: &ServiceEntityFieldConfig,
246 entity: &ServiceEntityConfig,
247 subgraph_config: &SubGraphConfig,
248 ) -> Result<Option<(EagerLoadOptions, ServiceEntityConfig)>, async_graphql::Error> {
249 debug!("Handle eager fields");
250 if field.eager.is_none() {
252 trace!("Field is not eager");
253 return Ok(None);
254 }
255
256 let join_on = if let Some(join_on) = field.join_on.clone() {
259 join_on
260 } else {
261 return Err(async_graphql::Error::new(format!(
262 "Eager load failed. Failed to get join_on for field: {}. Ensure propety `join_on` is present on the field definiton for this field.",
263 field.name
264 )));
265 };
266
267 let as_type = if let Some(as_type) = field.as_type.clone() {
268 as_type
269 } else {
270 return Err(async_graphql::Error::new(format!(
271 "Eager load failed. Failed to get `as_type` for field: {}",
272 field.name
273 )));
274 };
275
276 let eager_entity = match subgraph_config.clone().get_entity(&as_type) {
278 Some(entity) => entity,
279 None => {
280 return Err(async_graphql::Error::new(format!(
281 "Eager load failed. Failed to get entity for key: {}",
282 join_on
283 )));
284 }
285 };
286
287 let collection_name = if let Some(ds) = eager_entity.data_source.clone() {
289 if ds.collection.is_some() {
290 ds.collection.unwrap().clone()
291 } else {
292 eager_entity.name.clone()
293 }
294 } else {
295 eager_entity.name.clone()
296 };
297
298 let join_from = if let Some(join_from) = field.join_from.clone() {
299 join_from
300 } else {
301 field.name.clone()
302 };
303
304 let eager_key = format!("{}_{}_{}", entity.name, field.name, join_on);
306
307 let eager_load_options = EagerLoadOptions {
308 from: collection_name,
309 local_field: join_from,
310 foreign_field: join_on,
311 as_field: eager_key.clone(),
312 };
313
314 trace!("Eager load options: {:?}", eager_load_options);
315 Ok(Some((eager_load_options, eager_entity)))
316 }
317
318 pub fn finalize_input(
319 filter: Document,
320 entity: &ServiceEntityConfig,
321 subgraph_config: &SubGraphConfig,
322 resolver_type: &ResolverType,
323 ) -> Result<(Document, Vec<EagerLoadOptions>), async_graphql::Error> {
324 debug!("Finalizing Input Filters");
325 trace!("Filter: {:?}", filter);
326
327 let mut finalized = filter.clone();
328 let mut eager_filters = Vec::new();
329
330 for (key, value) in filter.iter() {
331 if key == "query" {
332 let query = value.as_document().unwrap();
333 let (query_finalized, eager_opts) = MongoDataSource::finalize_input(
334 query.clone(),
335 entity,
336 subgraph_config,
337 &resolver_type,
338 )?;
339 finalized.insert(key.clone(), query_finalized);
340 eager_filters.extend(eager_opts);
341 }
342
343 if key == "values" {
345 finalized.insert(key.clone(), value.clone());
346 }
347
348 if FilterOperator::list()
349 .iter()
350 .map(|operator| operator.as_str())
351 .collect::<Vec<&str>>()
352 .contains(&key.as_str())
353 {
354 trace!("Found filter operator key: {}", key);
355 let mut recursive_filters = Vec::new();
356 let filters = match value.as_array() {
357 Some(filters) => filters.clone(),
358 None => {
359 let filters = vec![value.clone()];
360 filters
361 }
362 };
363 for filter in filters {
364 let filter = filter.as_document().unwrap();
365 let (filter_finalized, eager_opts) = MongoDataSource::finalize_input(
366 filter.clone(),
367 entity,
368 subgraph_config,
369 &resolver_type,
370 )?;
371 recursive_filters.push(filter_finalized);
372 eager_filters.extend(eager_opts);
373 }
374 finalized.remove(key);
375 let filter_operator = FilterOperator::from_str(key).unwrap();
376 let operator_key = FilterOperator::get_mongo_operator(&filter_operator);
377
378 match filter_operator {
379 FilterOperator::And => {
380 finalized.insert(operator_key, recursive_filters);
381 }
382 FilterOperator::Or => {
383 finalized.insert(operator_key, recursive_filters);
384 }
385 _ => {
386 let mut new_filter = Document::new();
389 for filter in recursive_filters {
390 for (key, value) in filter.iter() {
391 let mut new_value = Document::new();
392 new_value.insert(operator_key, value.clone());
393 new_filter.insert(key.clone(), new_value);
394 }
395 }
396 finalized.insert("$and".to_string(), vec![new_filter]);
397 }
398 }
399 }
400
401 if key == "opts" {
403 finalized.insert(key.clone(), value.clone());
404 }
405 }
406
407 let eager_load_options;
409 (finalized, eager_load_options) =
410 MongoDataSource::convert_object_id_string_to_object_id_from_doc(
411 finalized,
412 entity,
413 subgraph_config,
414 &resolver_type,
415 None,
416 )?;
417 eager_filters.extend(eager_load_options);
418
419 trace!("Filter Finalized");
420 trace!("Finalized: {:?}", finalized);
421 trace!("Total Eager Load Options: {:?}", eager_filters);
422
423 Ok((finalized, eager_filters))
424 }
425
426 pub fn create_aggregation(
427 query_doc: &Document,
428 eager_load_options: Vec<EagerLoadOptions>,
429 opts_doc: Option<OptionsInput>,
430 ) -> Result<Vec<Document>, async_graphql::Error> {
431 debug!("Creating Aggregation");
432 trace!("Query Doc: {:?}", query_doc);
433 trace!("Eager Load Options: {:?}", eager_load_options);
434 trace!("Opts Doc: {:?}", opts_doc);
435 let mut pipeline = Vec::new();
436 for eager_load_option in eager_load_options {
437 let lookup = doc! {
438 "$lookup": {
439 "from": eager_load_option.from,
440 "localField": eager_load_option.local_field,
441 "foreignField": eager_load_option.foreign_field,
442 "as": eager_load_option.as_field.clone(),
443 }
444 };
445 pipeline.push(lookup);
446 let unwind = doc! {
447 "$unwind": {
448 "path": format!("${}", eager_load_option.as_field),
449 "preserveNullAndEmptyArrays": true,
450 }
451 };
452 pipeline.push(unwind);
453 }
454
455 let match_doc = doc! {
456 "$match": query_doc
457 };
458 pipeline.push(match_doc);
459
460 let mut facet_doc = doc! {
462 "total_count": [
463 {
464 "$count": "total_count"
465 }
466 ]
467 };
468
469 let mut paginated_facet_doc = vec![];
470
471 if let Some(opts) = opts_doc {
473 let mut sort_doc = doc! {};
474 let mut skip = 0;
475 let mut limit = 10;
476 if let Some(sort) = opts.sort {
477 for sort_input in sort.iter() {
478 sort_doc.insert(
479 sort_input.field.clone(),
480 match sort_input.direction {
481 DirectionEnum::Asc => 1,
482 DirectionEnum::Desc => -1,
483 },
484 );
485 }
486 }
487
488 if let Some(per_page) = opts.per_page {
490 limit = per_page;
491 }
492
493 if let Some(page_value) = opts.page {
495 if let Some(per_page_value) = opts.per_page {
496 skip = (page_value - 1) * per_page_value;
497 limit = per_page_value;
498 }
499 }
500
501 trace!("Sort Doc: {:?}", sort_doc);
502
503 if !sort_doc.is_empty() {
504 let sort = doc! {
505 "$sort": sort_doc
506 };
507 paginated_facet_doc.push(sort);
508 }
509 if skip > 0 {
510 let skip = doc! {
511 "$skip": skip
512 };
513 paginated_facet_doc.push(skip);
514 }
515 if limit > 0 {
516 let limit = doc! {
517 "$limit": limit
518 };
519 paginated_facet_doc.push(limit);
520 }
521 }
522
523 facet_doc.insert("documents", paginated_facet_doc);
524
525 pipeline.push(doc! {
526 "$facet": facet_doc
527 });
528
529 trace!("Pipeline: {:?}", pipeline);
530 Ok(pipeline)
531 }
532
533 pub async fn execute_operation<'a>(
534 data_source: &DataSource,
535 mut input: Document,
536 entity: ServiceEntityConfig,
537 resolver_type: ResolverType,
538 subgraph_config: &SubGraphConfig,
539 ) -> Result<Option<FieldValue<'a>>, async_graphql::Error> {
540 debug!("Executing Operation - Mongo Data Source");
541 trace!("Input: {:?}", input);
542
543 let eager_load_options;
544 (input, eager_load_options) =
545 MongoDataSource::finalize_input(input, &entity, subgraph_config, &resolver_type)?;
546
547 let db = match data_source {
548 DataSource::Mongo(ds) => ds.db.clone(),
549 _ => unreachable!(),
550 };
551
552 debug!("Database Found");
553
554 let collection_name = ServiceEntityConfig::get_mongo_collection_name(&entity);
555
556 debug!("Found Collection Name");
557 trace!("{:?}", collection_name);
558
559 match resolver_type {
560 ResolverType::FindOne => {
561 let result =
562 services::Services::find_one(db, input, collection_name, eager_load_options)
563 .await?;
564 let res = ResolverResponse {
565 data: vec![FieldValue::owned_any(result)],
566 meta: ResolverResponseMeta {
567 request_id: uuid::Uuid::new_v4().to_string(),
568 service_name: subgraph_config.service.name.clone(),
569 service_version: subgraph_config.service.version.clone(),
570 executed_at: chrono::Utc::now()
571 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
572 count: 1,
573 total_count: 1,
574 page: 1,
575 total_pages: 1,
576 user_uuid: None,
577 },
578 };
579 Ok(Some(FieldValue::owned_any(res)))
580 }
581 ResolverType::FindMany => {
582 let (results, total_count) = services::Services::find_many(
583 db,
584 input.clone(),
585 collection_name,
586 eager_load_options,
587 )
588 .await?;
589 let opts_doc = if input.clone().get("opts").is_some() {
590 trace!("Options Document Found: {:?}", input.get("opts").unwrap());
591 to_document(input.get("opts").unwrap()).unwrap()
592 } else {
593 trace!("Options Document Not Found. Defaulting to 10 per page.");
594 let mut d = Document::new();
595 d.insert("per_page", 10);
596 d.insert("page", 1);
597 trace!("created opts: {:?}", d);
598 d
599 };
600 let page = if let Some(page_value) = opts_doc.get("page") {
601 page_value.as_i32().unwrap() as i64
602 } else {
603 1
604 };
605 let total_pages = if let Some(per_page_value) = opts_doc.get("per_page") {
606 let mut per_page = per_page_value.as_i32();
607 if per_page.is_none() {
608 let per_page_i64 = per_page_value.as_i64();
609 if per_page_i64.is_none() {
610 warn!("Invalid per_page value. Defaulting to 10.");
611 per_page = Some(10);
612 } else {
613 per_page = Some(per_page_i64.unwrap() as i32);
614 }
615 }
616 if total_count as i32 % per_page.unwrap() as i32 == 0 {
617 total_count as i32 / per_page.unwrap() as i32
618 } else {
619 (total_count as i32 / per_page.unwrap() as i32) + 1
620 }
621 } else {
622 1
623 };
624
625 let res = ResolverResponse {
626 data: results
627 .clone()
628 .into_iter()
629 .map(|doc| FieldValue::owned_any(doc))
630 .collect(),
631 meta: ResolverResponseMeta {
632 request_id: uuid::Uuid::new_v4().to_string(),
633 service_name: subgraph_config.service.name.clone(),
634 service_version: subgraph_config.service.version.clone(),
635 executed_at: chrono::Utc::now()
636 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
637 count: results.len() as i64,
638 total_count: total_count as i64,
639 page,
640 total_pages: total_pages as i64,
641 user_uuid: None,
642 },
643 };
644 Ok(Some(FieldValue::owned_any(res)))
645 }
646 ResolverType::CreateOne => {
647 let result = services::Services::create_one(db, input, collection_name).await?;
648 let res = ResolverResponse {
649 data: vec![FieldValue::owned_any(result)],
650 meta: ResolverResponseMeta {
651 request_id: uuid::Uuid::new_v4().to_string(),
652 service_name: subgraph_config.service.name.clone(),
653 service_version: subgraph_config.service.version.clone(),
654 executed_at: chrono::Utc::now()
655 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
656 count: 1,
657 total_count: 1,
658 page: 1,
659 total_pages: 1,
660 user_uuid: None,
661 },
662 };
663 Ok(Some(FieldValue::owned_any(res)))
664 }
665 ResolverType::UpdateOne => {
666 let result =
667 services::Services::update_one(db, input, collection_name, &entity).await?;
668 let res = ResolverResponse {
669 data: vec![FieldValue::owned_any(result)],
670 meta: ResolverResponseMeta {
671 request_id: uuid::Uuid::new_v4().to_string(),
672 service_name: subgraph_config.service.name.clone(),
673 service_version: subgraph_config.service.version.clone(),
674 executed_at: chrono::Utc::now()
675 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
676 count: 1,
677 total_count: 1,
678 page: 1,
679 total_pages: 1,
680 user_uuid: None,
681 },
682 };
683 Ok(Some(FieldValue::owned_any(res)))
684 }
685 ResolverType::UpdateMany => {
686 let results =
687 services::Services::update_many(db, input, collection_name, &entity).await?;
688 let count = results.len();
689 let res = ResolverResponse {
690 data: results
691 .into_iter()
692 .map(|doc| FieldValue::owned_any(doc))
693 .collect(),
694 meta: ResolverResponseMeta {
695 request_id: uuid::Uuid::new_v4().to_string(),
696 service_name: subgraph_config.service.name.clone(),
697 service_version: subgraph_config.service.version.clone(),
698 executed_at: chrono::Utc::now()
699 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
700 count: count as i64,
701 total_count: count as i64,
702 page: 1,
703 total_pages: 1,
704 user_uuid: None,
705 },
706 };
707 Ok(Some(FieldValue::owned_any(res)))
708 }
709 _ => panic!("Invalid resolver type"),
710 }
711 }
712}