use super::delete_object::reap_object;
use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::ObjectState;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Request payload for `op_commit_object`.
pub struct OpCommitObjectInput {
  /// Token for the incomplete object to commit. Its `object_id` is the key
  /// that `op_commit_object` removes from `ctx.incomplete_objects`.
  pub incomplete_token: IncompleteToken,
}
/// Response payload for `op_commit_object`.
pub struct OpCommitObjectOutput {
  /// The new ID allocated for the object during the commit (taken from
  /// `ctx.next_object_id`); it differs from the ID carried by the incomplete
  /// token.
  pub object_id: u64,
}
/// Promotes an incomplete object to a committed one.
///
/// Steps, in order:
/// 1. Remove the object referenced by the token from `ctx.incomplete_objects`.
/// 2. Switch its state to `ObjectState::Committed` and await
///    `update_state_then_ensure_no_writers` (presumably this waits for
///    in-flight writers to finish — confirm against its definition).
/// 3. Allocate a fresh object ID and insert the object under its key into
///    `ctx.committed_objects`, capturing any entry previously stored there.
/// 4. Record the ID/state change in `ctx.tuples`.
/// 5. Reap the previous entry for the key, if one existed.
/// 6. Update the commit/incomplete/committed metrics counters.
///
/// # Errors
///
/// Returns `OpError::ObjectNotFound` when no incomplete object is registered
/// under the token's `object_id`.
pub(crate) async fn op_commit_object(
  ctx: Arc<Ctx>,
  req: OpCommitObjectInput,
) -> OpResult<OpCommitObjectOutput> {
  let incomplete_id = req.incomplete_token.object_id;

  // The write guard is a temporary of this statement, so it is released
  // before any `.await` below.
  let obj = ctx
    .incomplete_objects
    .write()
    .remove(&incomplete_id)
    .ok_or(OpError::ObjectNotFound)?;

  obj
    .update_state_then_ensure_no_writers(ObjectState::Committed)
    .await;

  let committed_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
  let key = obj.key.clone();
  let committed = obj.with_new_id(committed_id);
  // `insert` hands back whatever was previously stored under this key.
  let replaced = ctx.committed_objects.insert(key, committed);

  ctx
    .tuples
    .update_object_id_and_state(incomplete_id, committed_id, ObjectState::Committed)
    .await;

  if let Some(previous) = &replaced {
    reap_object(&ctx, previous).await;
  }

  let metrics = &ctx.metrics.0;
  metrics.commit_op_count.fetch_add(1, Ordering::Relaxed);
  metrics
    .incomplete_object_count
    .fetch_sub(1, Ordering::Relaxed);
  // Only a brand-new key grows the committed count; a replacement swaps one
  // committed entry for another.
  if replaced.is_none() {
    metrics
      .committed_object_count
      .fetch_add(1, Ordering::Relaxed);
  }

  Ok(OpCommitObjectOutput {
    object_id: committed_id,
  })
}