subgraph/data_sources/mongo/services/update_many/
mod.rs1use async_graphql::futures_util::StreamExt;
2use bson::{doc, to_document, Document};
3use log::debug;
4use mongodb::Database;
5
6use crate::configuration::subgraph::entities::ServiceEntityConfig;
7
8use super::Services;
9
10impl Services {
11 pub async fn update_many(
12 db: Database,
13 input: Document,
14 collection: String,
15 entity: &ServiceEntityConfig,
16 ) -> Result<Vec<Option<Document>>, async_graphql::Error> {
17 debug!("Executing Update Many");
18
19 let coll = db.collection::<Document>(&collection);
20
21 let filter = to_document(input.get("query").unwrap()).unwrap();
22 let values = to_document(input.get("values").unwrap()).unwrap();
23
24 let update_doc = Services::create_nested_fields(&values);
25
26 let mut cursor = coll
27 .find(filter.clone(), None)
28 .await
29 .map_err(|e| async_graphql::Error::new(e.to_string()))?;
30
31 let mut primary_keys = vec![];
32
33 while let Some(result) = cursor.next().await {
34 match result {
35 Ok(document) => {
36 let primary_key_field = ServiceEntityConfig::get_primary_key_field(&entity)?;
37 let primary_key = document.get(primary_key_field.name).unwrap();
38 primary_keys.push(primary_key.clone());
39 }
40 Err(e) => {
41 return Err(async_graphql::Error::new(e.to_string()));
42 }
43 }
44 }
45
46 let ids_doc = doc! {"_id": {"$in": primary_keys}};
47
48 coll.update_many(ids_doc.clone(), doc! {"$set": update_doc}, None)
49 .await
50 .map_err(|e| async_graphql::Error::new(e.to_string()))?;
51
52 let mut cursor = coll
53 .find(ids_doc.clone(), None)
54 .await
55 .map_err(|e| async_graphql::Error::new(e.to_string()))?;
56
57 let mut documents = vec![];
58 while let Some(result) = cursor.next().await {
59 match result {
60 Ok(document) => {
61 documents.push(Some(document));
62 }
63 Err(e) => {
64 return Err(async_graphql::Error::new(e.to_string()));
65 }
66 }
67 }
68
69 Ok(documents)
70 }
71}