use super::{
MongodbConfigStore,
serde::{now_as_datetime, transform_from_mongo, transform_to_mongo, try_to_object_id, try_to_uuid},
};
use crate::storage::Error;
use mongodb::{ClientSession, Collection, bson::Document, results::UpdateResult};
use pbbson::Model;
use pbbson::bson::{Bson, doc};
use std::fmt::Debug;
use std::str::FromStr;
/// Replaces the stored document for `model` in `bucket` and propagates relation
/// changes to the inverse side of the relations configured for that bucket.
///
/// Steps, in order:
/// 1. The model id is parsed with `try_to_object_id`, falling back to
///    `try_to_uuid`, and the currently stored document is loaded through
///    `find_in_session`.
/// 2. `updatedAt` is set to the current time and the model (after
///    `transform_to_mongo`) replaces the document whose `_id` matches.
/// 3. For every `belongs_to` relation whose local foreign key changed, this
///    model's id is removed from the inverse array of the previously referenced
///    remote document; afterwards it is added to the inverse array of the newly
///    referenced one. Relations are skipped when the foreign key is not an
///    `ObjectId`, when no inverse is configured, or when the remote's inverse
///    field is not an array.
/// 4. For every `has_many` relation with an inverse, remote documents dropped
///    from the local array get their inverse field set to `null`, and remote
///    documents added to it get their inverse field set to this model's id.
///    Only string ids are considered.
///
/// Remote documents touched in steps 3 and 4 also receive `updatedByAccountId`
/// (taken from the model's `updatedByAccountId`, else `createdByAccountId`)
/// when such a value is available. Every database call runs inside `session`
/// when one is supplied.
///
/// # Errors
///
/// Returns an error if the model id cannot be read or parsed, if a string
/// foreign key cannot be parsed by `try_to_object_id`, if `find_in_session`
/// fails, if any database operation fails, or if one of the transform helpers
/// fails. The writes are issued one after another, so an error part-way
/// through leaves the earlier writes in place unless the caller's session
/// rolls them back.
pub async fn update<B: Clone + Debug + FromStr + Send + Sync + ToString>(
    this: &MongodbConfigStore<B>,
    mut session: Option<&mut ClientSession>,
    bucket: B,
    model: Model,
) -> Result<Model, Error> {
    let bucket_name = bucket.to_string();
    let coll: Collection<Document> = this.db.collection(&bucket_name);

    // Read the id once; it is tried as an ObjectId first and as a UUID second.
    let raw_id = model.id()?;
    let model_id: Bson = match try_to_object_id(&raw_id) {
        Ok(id) => id.into(),
        Err(_) => try_to_uuid(&raw_id)?.into(),
    };

    // Snapshot of the stored document, used below to detect relation changes.
    let existing = this
        .find_in_session(&mut session, bucket, &model_id)
        .await?;

    let mut model = model;
    let now = now_as_datetime();
    model.set_datetime("updatedAt", now);
    let local_model = transform_to_mongo(model.clone())?;

    // Must be captured before `local_model` is handed to `replace_one`.
    let maybe_updated_by_account_id = match local_model.get("updatedByAccountId") {
        Some(account_id) => Some(account_id.clone()),
        None => model.get("createdByAccountId").cloned(),
    };

    let filter = doc! {"_id": &model_id};
    let _res: UpdateResult = {
        let op = coll.replace_one(filter, local_model);
        if let Some(ref mut session) = session {
            op.session(&mut **session).await?
        } else {
            op.await?
        }
    };

    let belongs_tos = this
        .belongs_tos_by_bucket
        .get(&bucket_name)
        .cloned()
        .unwrap_or_default();

    // Pass 1: unlink this model from the remote documents it no longer references.
    for belongs_to in belongs_tos.iter() {
        // NOTE(review): only the incoming key is normalised; a stored key held
        // as a string never compares equal to it and is skipped below because
        // it is not an ObjectId. Confirm stored keys are always ObjectIds.
        let new_fk = normalize_fk(model.get(&belongs_to.local))?;
        let Some(existing_fk_value) = existing.get(&belongs_to.local) else {
            continue;
        };
        if Some(existing_fk_value) == new_fk.as_ref() {
            continue;
        }
        let Some(existing_fk_object_id) = existing_fk_value.as_object_id() else {
            continue;
        };
        // The remote is loaded before the inverse check, so a failing lookup is
        // reported even for relations without an inverse.
        let remote_model = this
            .find_in_session(&mut session, belongs_to.remote.clone(), &existing_fk_object_id.into())
            .await?;
        let Some(inverse) = belongs_to.inverse.as_ref() else {
            continue;
        };
        let Some(Bson::Array(array)) = remote_model.get(inverse.clone()) else {
            continue;
        };

        let remaining: Vec<_> = array.iter().filter(|remote_fk| *remote_fk != &model_id).collect();
        let mut remote_model = remote_model.clone();
        remote_model.insert(inverse.clone(), remaining);
        if let Some(ref account_id) = maybe_updated_by_account_id {
            remote_model.insert("updatedByAccountId", account_id.clone());
        }

        let filter = doc! {"_id": &existing_fk_object_id};
        let remote_name = belongs_to.remote.to_string();
        let remote_coll: Collection<Document> = this.db.collection(&remote_name);
        let res: UpdateResult = {
            let op = remote_coll.replace_one(filter, remote_model);
            if let Some(ref mut session) = session {
                op.session(&mut **session).await?
            } else {
                op.await?
            }
        };
        if res.modified_count > 0 {
            log::debug!("Updated {} to remove fk", remote_name);
        }
    }

    // Pass 2: link this model into the remote documents it now references.
    for belongs_to in belongs_tos.iter() {
        let existing_fk = existing.get(&belongs_to.local);
        let Some(new_fk_value) = normalize_fk(model.get(&belongs_to.local))? else {
            continue;
        };
        if existing_fk == Some(&new_fk_value) {
            continue;
        }
        let Some(new_fk_object_id) = new_fk_value.as_object_id() else {
            continue;
        };
        let remote_model = this
            .find_in_session(&mut session, belongs_to.remote.clone(), &new_fk_object_id.into())
            .await?;
        let Some(inverse) = belongs_to.inverse.as_ref() else {
            continue;
        };
        let Some(Bson::Array(array)) = remote_model.get(inverse.clone()) else {
            continue;
        };

        let mut linked = array.clone();
        // Keep the inverse array duplicate-free when the remote already lists
        // this model (e.g. a retried update).
        if !linked.contains(&model_id) {
            linked.push(model_id.clone());
        }
        let mut remote_model = remote_model.clone();
        remote_model.insert(inverse.clone(), linked);
        if let Some(ref account_id) = maybe_updated_by_account_id {
            remote_model.insert("updatedByAccountId", account_id.clone());
        }

        let filter = doc! {"_id": &new_fk_object_id};
        let remote_name = belongs_to.remote.to_string();
        let remote_coll: Collection<Document> = this.db.collection(&remote_name);
        let res: UpdateResult = {
            let op = remote_coll.replace_one(filter, remote_model);
            if let Some(ref mut session) = session {
                op.session(&mut **session).await?
            } else {
                op.await?
            }
        };
        if res.modified_count > 0 {
            log::debug!("Updated {} to add fk", remote_name);
        }
    }

    // Pass 3: has_many relations — point each affected remote's inverse field
    // at this model, or clear it.
    for has_many in this
        .has_manys_by_bucket
        .get(&bucket_name)
        .cloned()
        .unwrap_or_default()
    {
        let existing_local_value = existing.get(&has_many.local);
        let local_value = model.get(&has_many.local);
        let (Some(Bson::Array(existing_array)), Some(Bson::Array(array))) =
            (existing_local_value, local_value)
        else {
            // NOTE(review): this also fires when the field is simply absent on
            // either side — confirm an error-level log is intended for that.
            log::error!("Unknown fk type: {local_value:?}");
            continue;
        };
        let Some(inverse) = has_many.inverse.clone() else {
            continue;
        };
        let remote_name = has_many.remote.to_string();
        let remote_coll: Collection<Document> = this.db.collection(&remote_name);

        // Ids present before but not any more: clear the remote's inverse field.
        for existing_id in existing_array.iter() {
            if array.contains(existing_id) {
                continue;
            }
            let Bson::String(raw_fk) = existing_id else {
                continue;
            };
            let fk_object_id = try_to_object_id(raw_fk)?;
            let mut set = doc! {&inverse: Bson::Null, "updatedAt": now};
            if let Some(ref account_id) = maybe_updated_by_account_id {
                set.insert("updatedByAccountId", account_id.clone());
            }
            let res = update_one_in_session(
                &remote_coll,
                &mut session,
                doc! {"_id": &fk_object_id},
                doc! {"$set": set},
            )
            .await?;
            if res.modified_count > 0 {
                log::debug!("Updated {} to remove fk", remote_name);
            }
        }

        // Ids that are new in the array: point the remote's inverse field here.
        for id in array.iter() {
            if existing_array.contains(id) {
                continue;
            }
            let Bson::String(raw_fk) = id else {
                continue;
            };
            let fk_object_id = try_to_object_id(raw_fk)?;
            let mut set = doc! {&inverse: &model_id, "updatedAt": now};
            if let Some(ref account_id) = maybe_updated_by_account_id {
                set.insert("updatedByAccountId", account_id.clone());
            }
            let res = update_one_in_session(
                &remote_coll,
                &mut session,
                doc! {"_id": &fk_object_id},
                doc! {"$set": set},
            )
            .await?;
            if res.modified_count > 0 {
                log::debug!("Updated {} to add fk", remote_name);
            }
        }
    }

    transform_from_mongo(model)
}

/// Returns an owned copy of `raw` in which a string value has been converted
/// to a `Bson::ObjectId` via `try_to_object_id`; every other value (and
/// `None`) is passed through unchanged.
fn normalize_fk(raw: Option<&Bson>) -> Result<Option<Bson>, Error> {
    match raw {
        Some(Bson::String(s)) => Ok(Some(Bson::ObjectId(try_to_object_id(s)?))),
        other => Ok(other.cloned()),
    }
}

/// Runs `update_one(filter, update)` on `coll`, inside `session` when one is
/// present, and returns the driver's result.
async fn update_one_in_session(
    coll: &Collection<Document>,
    session: &mut Option<&mut ClientSession>,
    filter: Document,
    update: Document,
) -> Result<UpdateResult, Error> {
    let op = coll.update_one(filter, update);
    let res = match session {
        Some(active) => op.session(&mut **active).await?,
        None => op.await?,
    };
    Ok(res)
}