libblobd_direct/op/
commit_object.rs

1use super::delete_object::reap_object;
2use super::OpError;
3use super::OpResult;
4use crate::ctx::Ctx;
5use crate::incomplete_token::IncompleteToken;
6use crate::object::ObjectState;
7use dashmap::mapref::entry::Entry;
8use std::sync::atomic::Ordering;
9use std::sync::Arc;
10
11pub struct OpCommitObjectInput {
12  pub incomplete_token: IncompleteToken,
13  /// If true, the object will only be committed if there is no existing committed object with the same key; if there is an existing object, it will be left untouched, and this object will be deleted instead.
14  pub if_not_exists: bool,
15}
16
17pub struct OpCommitObjectOutput {
18  /// This will only be None if `if_not_exists` is true and an existing object with the same key was present.
19  pub object_id: Option<u64>,
20}
21
22pub(crate) async fn op_commit_object(
23  ctx: Arc<Ctx>,
24  req: OpCommitObjectInput,
25) -> OpResult<OpCommitObjectOutput> {
26  let Some(obj) = ctx.incomplete_objects.write().remove(&req.incomplete_token.object_id) else {
27    return Err(OpError::ObjectNotFound);
28  };
29
30  obj
31    .update_state_then_ensure_no_writers(ObjectState::Committed)
32    .await;
33
34  let (to_delete, new_object_id) = match ctx.committed_objects.entry(obj.key.clone()) {
35    Entry::Occupied(_) if req.if_not_exists => (Some(obj), None),
36    e => {
37      // For crash consistency, we must generate a new object ID (such that it's greater than any existing incomplete or committed object's ID) for the object we're about to commit; otherwise, we may have two objects with the same key (e.g. we crash before we manage to delete the existing committed one), and if versioning isn't enabled then it isn't clear which is the winner (objects can be committed in a different order than creation).
38      let new_object_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
39      let new_obj = obj.with_new_id(new_object_id);
40      let existing = match e {
41        Entry::Occupied(mut e) => Some(e.insert(new_obj)),
42        Entry::Vacant(e) => {
43          e.insert(new_obj);
44          None
45        }
46      };
47      ctx
48        .tuples
49        .update_object_id_and_state(
50          req.incomplete_token.object_id,
51          new_object_id,
52          ObjectState::Committed,
53        )
54        .await;
55      (existing, Some(new_object_id))
56    }
57  };
58
59  // Only delete AFTER we're certain the updated object tuple we're committing has been persisted to disk.
60  // See `op_delete_object` for why this is safe to do at any time for a committed object.
61  if let Some(to_delete) = to_delete.as_ref() {
62    reap_object(&ctx, to_delete).await;
63  };
64
65  ctx
66    .metrics
67    .0
68    .commit_op_count
69    .fetch_add(1, Ordering::Relaxed);
70  ctx
71    .metrics
72    .0
73    .incomplete_object_count
74    .fetch_sub(1, Ordering::Relaxed);
75  if new_object_id.is_some() && to_delete.is_none() {
76    // We did the commit AND did NOT delete an existing object.
77    ctx
78      .metrics
79      .0
80      .committed_object_count
81      .fetch_add(1, Ordering::Relaxed);
82  }
83
84  Ok(OpCommitObjectOutput {
85    object_id: new_object_id,
86  })
87}