subgraph/data_sources/mongo/services/update_many/
mod.rs

1use async_graphql::futures_util::StreamExt;
2use bson::{doc, to_document, Document};
3use log::debug;
4use mongodb::Database;
5
6use crate::configuration::subgraph::entities::ServiceEntityConfig;
7
8use super::Services;
9
10impl Services {
11    pub async fn update_many(
12        db: Database,
13        input: Document,
14        collection: String,
15        entity: &ServiceEntityConfig,
16    ) -> Result<Vec<Option<Document>>, async_graphql::Error> {
17        debug!("Executing Update Many");
18
19        let coll = db.collection::<Document>(&collection);
20
21        let filter = to_document(input.get("query").unwrap()).unwrap();
22        let values = to_document(input.get("values").unwrap()).unwrap();
23
24        let update_doc = Services::create_nested_fields(&values);
25
26        let mut cursor = coll
27            .find(filter.clone(), None)
28            .await
29            .map_err(|e| async_graphql::Error::new(e.to_string()))?;
30
31        let mut primary_keys = vec![];
32
33        while let Some(result) = cursor.next().await {
34            match result {
35                Ok(document) => {
36                    let primary_key_field = ServiceEntityConfig::get_primary_key_field(&entity)?;
37                    let primary_key = document.get(primary_key_field.name).unwrap();
38                    primary_keys.push(primary_key.clone());
39                }
40                Err(e) => {
41                    return Err(async_graphql::Error::new(e.to_string()));
42                }
43            }
44        }
45
46        let ids_doc = doc! {"_id": {"$in": primary_keys}};
47
48        coll.update_many(ids_doc.clone(), doc! {"$set": update_doc}, None)
49            .await
50            .map_err(|e| async_graphql::Error::new(e.to_string()))?;
51
52        let mut cursor = coll
53            .find(ids_doc.clone(), None)
54            .await
55            .map_err(|e| async_graphql::Error::new(e.to_string()))?;
56
57        let mut documents = vec![];
58        while let Some(result) = cursor.next().await {
59            match result {
60                Ok(document) => {
61                    documents.push(Some(document));
62                }
63                Err(e) => {
64                    return Err(async_graphql::Error::new(e.to_string()));
65                }
66            }
67        }
68
69        Ok(documents)
70    }
71}