subgraph/data_sources/mongo/services/update_one/
mod.rs

1use async_graphql::futures_util::StreamExt;
2use bson::{doc, to_document, Document};
3use log::{debug, error};
4use mongodb::{
5    options::{FindOneAndUpdateOptions, ReturnDocument},
6    Database,
7};
8
9use crate::configuration::subgraph::entities::ServiceEntityConfig;
10
11use super::Services;
12
13impl Services {
14    pub async fn update_one(
15        db: Database,
16        input: Document,
17        collection: String,
18        entity: &ServiceEntityConfig,
19    ) -> Result<Option<Document>, async_graphql::Error> {
20        debug!("Executing Update One");
21
22        let coll = db.collection::<Document>(&collection);
23
24        let filter = to_document(input.get("query").unwrap())?;
25
26        let options = FindOneAndUpdateOptions::builder()
27            .return_document(ReturnDocument::After)
28            .upsert(true)
29            .build();
30
31        let values = to_document(input.get("values").unwrap())?;
32
33        let update_doc = Services::create_nested_fields(&values);
34
35        let mut cursor = coll.find(filter.clone(), None).await.map_err(|e| {
36            async_graphql::Error::new(format!(
37                "Error finding document to update: {}",
38                e.to_string()
39            ))
40        })?;
41
42        let mut primary_keys = Vec::new();
43
44        let primary_key_field = ServiceEntityConfig::get_primary_key_field(&entity)?;
45        while let Some(result) = cursor.next().await {
46            match result {
47                Ok(document) => {
48                    let primary_key = document.get(primary_key_field.name.clone()).unwrap();
49                    primary_keys.push(primary_key.clone());
50                }
51                Err(e) => {
52                    return Err(async_graphql::Error::new(e.to_string()));
53                }
54            }
55        }
56
57        if primary_keys.len() > 1 {
58            error!("Multiple documents found for update");
59            return Err(async_graphql::Error::new(
60                "Multiple documents found for update",
61            ));
62        }
63
64        if primary_keys.len() == 0 {
65            error!("No documents found for update");
66            return Err(async_graphql::Error::new("No documents found for update"));
67        }
68
69        let primary_key = primary_keys.get(0).unwrap();
70        let filter = doc! {primary_key_field.name: primary_key};
71
72        let document = coll
73            .find_one_and_update(filter, doc! {"$set": update_doc}, options)
74            .await
75            .map_err(|e| async_graphql::Error::new(e.to_string()))?;
76
77        debug!("Update One Result: {:?}", document);
78
79        match document {
80            Some(document) => Ok(Some(document)),
81            None => Err(async_graphql::Error::new("No Document Found")),
82        }
83    }
84}