use super::delete_object::reap_object;
use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::ObjectState;
use dashmap::mapref::entry::Entry;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Request for `op_commit_object`.
pub struct OpCommitObjectInput {
/// Token of the incomplete object to commit; its `object_id` is the key used to
/// look the object up in (and remove it from) `ctx.incomplete_objects`.
pub incomplete_token: IncompleteToken,
/// When `true` and an object with the same key is already committed, the existing
/// object is kept, the incomplete object is reaped instead, and no new ID is assigned.
/// When `false`, any already committed object with the same key is replaced and reaped.
pub if_not_exists: bool,
}
/// Response of `op_commit_object`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OpCommitObjectOutput {
    /// ID newly assigned to the committed object, or `None` when the commit was skipped
    /// because `if_not_exists` was set and the key was already occupied.
    pub object_id: Option<u64>,
}
/// Commits a previously created incomplete object under its key.
///
/// Steps, in order:
/// 1. Removes the object identified by `req.incomplete_token.object_id` from
///    `ctx.incomplete_objects`.
/// 2. Awaits `update_state_then_ensure_no_writers(ObjectState::Committed)` on it.
/// 3. Looks up the object's key in `ctx.committed_objects`:
///    - if the key is occupied and `req.if_not_exists` is set, the existing entry is left
///      untouched and the object being committed is reaped instead;
///    - otherwise a fresh ID is taken from `ctx.next_object_id`, the object is stored under
///      its key with that ID (replacing any previous entry, which is then reaped), and
///      `ctx.tuples` is told about the ID change and the `Committed` state.
/// 4. Updates the commit / incomplete / committed counters in `ctx.metrics`.
///
/// The returned `object_id` is `Some(new_id)` when the object was stored and `None` when
/// the commit was skipped because of `if_not_exists`.
///
/// # Errors
///
/// Returns [`OpError::ObjectNotFound`] when no incomplete object matches the token.
pub(crate) async fn op_commit_object(
    ctx: Arc<Ctx>,
    req: OpCommitObjectInput,
) -> OpResult<OpCommitObjectOutput> {
    let removed = ctx
        .incomplete_objects
        .write()
        .remove(&req.incomplete_token.object_id);
    let pending = match removed {
        Some(found) => found,
        None => return Err(OpError::ObjectNotFound),
    };

    pending
        .update_state_then_ensure_no_writers(ObjectState::Committed)
        .await;

    let (reapable, assigned_id) = match ctx.committed_objects.entry(pending.key.clone()) {
        // Key already taken and the caller asked not to overwrite: discard the new object.
        Entry::Occupied(_) if req.if_not_exists => (Some(pending), None),
        slot => {
            let fresh_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
            let committed = pending.with_new_id(fresh_id);
            // The map entry is fully consumed by this inner match, i.e. before the
            // `.await` on the tuples update below.
            let displaced = match slot {
                Entry::Vacant(vacant) => {
                    vacant.insert(committed);
                    None
                }
                Entry::Occupied(mut occupied) => Some(occupied.insert(committed)),
            };
            ctx.tuples
                .update_object_id_and_state(
                    req.incomplete_token.object_id,
                    fresh_id,
                    ObjectState::Committed,
                )
                .await;
            (displaced, Some(fresh_id))
        }
    };

    // Either the rejected new object or the object that was just replaced.
    if let Some(victim) = &reapable {
        reap_object(&ctx, victim).await;
    }

    let counters = &ctx.metrics.0;
    counters.commit_op_count.fetch_add(1, Ordering::Relaxed);
    counters
        .incomplete_object_count
        .fetch_sub(1, Ordering::Relaxed);
    // Only a commit that filled a previously vacant key bumps the committed count here.
    if matches!((assigned_id, &reapable), (Some(_), None)) {
        counters
            .committed_object_count
            .fetch_add(1, Ordering::Relaxed);
    }

    Ok(OpCommitObjectOutput {
        object_id: assigned_id,
    })
}