subgraph/data_sources/mongo/services/update_one/
mod.rs1use async_graphql::futures_util::StreamExt;
2use bson::{doc, to_document, Document};
3use log::{debug, error};
4use mongodb::{
5 options::{FindOneAndUpdateOptions, ReturnDocument},
6 Database,
7};
8
9use crate::configuration::subgraph::entities::ServiceEntityConfig;
10
11use super::Services;
12
13impl Services {
14 pub async fn update_one(
15 db: Database,
16 input: Document,
17 collection: String,
18 entity: &ServiceEntityConfig,
19 ) -> Result<Option<Document>, async_graphql::Error> {
20 debug!("Executing Update One");
21
22 let coll = db.collection::<Document>(&collection);
23
24 let filter = to_document(input.get("query").unwrap())?;
25
26 let options = FindOneAndUpdateOptions::builder()
27 .return_document(ReturnDocument::After)
28 .upsert(true)
29 .build();
30
31 let values = to_document(input.get("values").unwrap())?;
32
33 let update_doc = Services::create_nested_fields(&values);
34
35 let mut cursor = coll.find(filter.clone(), None).await.map_err(|e| {
36 async_graphql::Error::new(format!(
37 "Error finding document to update: {}",
38 e.to_string()
39 ))
40 })?;
41
42 let mut primary_keys = Vec::new();
43
44 let primary_key_field = ServiceEntityConfig::get_primary_key_field(&entity)?;
45 while let Some(result) = cursor.next().await {
46 match result {
47 Ok(document) => {
48 let primary_key = document.get(primary_key_field.name.clone()).unwrap();
49 primary_keys.push(primary_key.clone());
50 }
51 Err(e) => {
52 return Err(async_graphql::Error::new(e.to_string()));
53 }
54 }
55 }
56
57 if primary_keys.len() > 1 {
58 error!("Multiple documents found for update");
59 return Err(async_graphql::Error::new(
60 "Multiple documents found for update",
61 ));
62 }
63
64 if primary_keys.len() == 0 {
65 error!("No documents found for update");
66 return Err(async_graphql::Error::new("No documents found for update"));
67 }
68
69 let primary_key = primary_keys.get(0).unwrap();
70 let filter = doc! {primary_key_field.name: primary_key};
71
72 let document = coll
73 .find_one_and_update(filter, doc! {"$set": update_doc}, options)
74 .await
75 .map_err(|e| async_graphql::Error::new(e.to_string()))?;
76
77 debug!("Update One Result: {:?}", document);
78
79 match document {
80 Some(document) => Ok(Some(document)),
81 None => Err(async_graphql::Error::new("No Document Found")),
82 }
83 }
84}