use super::delete_object::reap_object;
use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::ObjectState;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Request payload for [`op_commit_object`].
pub struct OpCommitObjectInput {
    /// Token naming the incomplete object to commit; its `object_id` is the
    /// key used to look the object up among the incomplete objects.
    pub incomplete_token: IncompleteToken,
}
/// Successful result of [`op_commit_object`].
pub struct OpCommitObjectOutput {
    /// The new ID allocated for the object at commit time. This is not the ID
    /// carried by the incomplete token that was passed in.
    pub object_id: u64,
}
/// Commits an incomplete object so that it becomes the committed object for
/// its key.
///
/// The steps run in this order:
/// 1. The object identified by `req.incomplete_token.object_id` is removed
///    from `ctx.incomplete_objects`.
/// 2. `update_state_then_ensure_no_writers(ObjectState::Committed)` is awaited
///    on it (judging by the name, this waits for in-flight writers to finish
///    — confirm against that method).
/// 3. A fresh ID is taken from `ctx.next_object_id` and the object, re-created
///    with that ID via `with_new_id`, is inserted into `ctx.committed_objects`
///    under its key.
/// 4. `ctx.tuples` is told to replace the old ID with the new one and mark the
///    entry as committed.
/// 5. If the insert displaced an earlier committed object with the same key,
///    that object is passed to `reap_object`.
/// 6. Metrics are updated: one more commit op, one fewer incomplete object,
///    and one more committed object only when nothing was displaced.
///
/// # Errors
///
/// Returns [`OpError::ObjectNotFound`] when `ctx.incomplete_objects` has no
/// entry for the token's object ID.
pub(crate) async fn op_commit_object(
    ctx: Arc<Ctx>,
    req: OpCommitObjectInput,
) -> OpResult<OpCommitObjectOutput> {
    let incomplete_id = req.incomplete_token.object_id;

    // The write guard is a temporary of this statement, so it is released
    // before any of the awaits below.
    let object = ctx
        .incomplete_objects
        .write()
        .remove(&incomplete_id)
        .ok_or(OpError::ObjectNotFound)?;

    object
        .update_state_then_ensure_no_writers(ObjectState::Committed)
        .await;

    // Relaxed is enough here: only the uniqueness of the returned value is used.
    let committed_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);

    // Clone the key before handing the object to `with_new_id`.
    let key = object.key.clone();
    let committed = object.with_new_id(committed_id);
    let replaced = ctx.committed_objects.insert(key, committed);

    ctx.tuples
        .update_object_id_and_state(incomplete_id, committed_id, ObjectState::Committed)
        .await;

    // A previously committed object under the same key has been superseded.
    if let Some(previous) = &replaced {
        reap_object(&ctx, previous).await;
    }

    let metrics = &ctx.metrics.0;
    metrics.commit_op_count.fetch_add(1, Ordering::Relaxed);
    metrics
        .incomplete_object_count
        .fetch_sub(1, Ordering::Relaxed);
    // Overwriting an existing key leaves the committed total unchanged.
    if replaced.is_none() {
        metrics
            .committed_object_count
            .fetch_add(1, Ordering::Relaxed);
    }

    Ok(OpCommitObjectOutput {
        object_id: committed_id,
    })
}