libblobd_direct/op/
commit_object.rs

1use super::delete_object::reap_object;
2use super::OpError;
3use super::OpResult;
4use crate::ctx::Ctx;
5use crate::incomplete_token::IncompleteToken;
6use crate::object::ObjectState;
7use crate::objects::ObjectId;
8use dashmap::mapref::entry::Entry;
9use rand::thread_rng;
10use rand::Rng;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13
14pub struct OpCommitObjectInput {
15  pub incomplete_token: IncompleteToken,
16  /// If true, the object will only be committed if there is no existing committed object with the same key; if there is an existing object, it will be left untouched, and this object will be deleted instead.
17  pub if_not_exists: bool,
18}
19
20pub struct OpCommitObjectOutput {
21  /// This will only be None if `if_not_exists` is true and an existing object with the same key was present.
22  pub object_id: Option<ObjectId>,
23}
24
25pub(crate) async fn op_commit_object(
26  ctx: Arc<Ctx>,
27  req: OpCommitObjectInput,
28) -> OpResult<OpCommitObjectOutput> {
29  let Some(obj) = ctx
30    .incomplete_objects
31    .write()
32    .remove(&req.incomplete_token.object_id)
33  else {
34    return Err(OpError::ObjectNotFound);
35  };
36
37  obj
38    .update_state_then_ensure_no_writers(ObjectState::Committed)
39    .await;
40
41  let (to_delete, new_object_id) = match ctx.committed_objects.entry(obj.key.clone()) {
42    Entry::Occupied(_) if req.if_not_exists => (Some(obj), None),
43    e => {
44      // For crash consistency, we must generate a new object ID (such that it's different from any existing incomplete or committed object's ID) for the object we're about to commit; otherwise, we may have two objects with the same key (e.g. we crash before we manage to delete the existing committed one), and if versioning isn't enabled then it isn't clear which is the winner (objects can be committed in a different order than creation).
45      let new_object_id: ObjectId = thread_rng().gen();
46      let new_obj = obj.with_new_id(new_object_id);
47      let existing = match e {
48        Entry::Occupied(mut e) => Some(e.insert(new_obj)),
49        Entry::Vacant(e) => {
50          e.insert(new_obj);
51          None
52        }
53      };
54      ctx
55        .tuples
56        .update_object_id_and_state(
57          req.incomplete_token.object_id,
58          new_object_id,
59          ObjectState::Committed,
60        )
61        .await;
62      (existing, Some(new_object_id))
63    }
64  };
65
66  // Only delete AFTER we're certain the updated object tuple we're committing has been persisted to disk.
67  // See `op_delete_object` for why this is safe to do at any time for a committed object.
68  if let Some(to_delete) = to_delete.as_ref() {
69    reap_object(&ctx, to_delete).await;
70  };
71
72  ctx
73    .metrics
74    .0
75    .commit_op_count
76    .fetch_add(1, Ordering::Relaxed);
77  ctx
78    .metrics
79    .0
80    .incomplete_object_count
81    .fetch_sub(1, Ordering::Relaxed);
82  if new_object_id.is_some() && to_delete.is_none() {
83    // We did the commit AND did NOT delete an existing object.
84    ctx
85      .metrics
86      .0
87      .committed_object_count
88      .fetch_add(1, Ordering::Relaxed);
89  }
90
91  Ok(OpCommitObjectOutput {
92    object_id: new_object_id,
93  })
94}