// libblobd_direct/op/commit_object.rs
use super::delete_object::reap_object;
use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::ObjectState;
use crate::objects::ObjectId;
use dashmap::mapref::entry::Entry;
use rand::thread_rng;
use rand::Rng;
use std::sync::atomic::Ordering;
use std::sync::Arc;
13
14pub struct OpCommitObjectInput {
15 pub incomplete_token: IncompleteToken,
16 pub if_not_exists: bool,
18}
19
20pub struct OpCommitObjectOutput {
21 pub object_id: Option<ObjectId>,
23}
24
25pub(crate) async fn op_commit_object(
26 ctx: Arc<Ctx>,
27 req: OpCommitObjectInput,
28) -> OpResult<OpCommitObjectOutput> {
29 let Some(obj) = ctx
30 .incomplete_objects
31 .write()
32 .remove(&req.incomplete_token.object_id)
33 else {
34 return Err(OpError::ObjectNotFound);
35 };
36
37 obj
38 .update_state_then_ensure_no_writers(ObjectState::Committed)
39 .await;
40
41 let (to_delete, new_object_id) = match ctx.committed_objects.entry(obj.key.clone()) {
42 Entry::Occupied(_) if req.if_not_exists => (Some(obj), None),
43 e => {
44 let new_object_id: ObjectId = thread_rng().gen();
46 let new_obj = obj.with_new_id(new_object_id);
47 let existing = match e {
48 Entry::Occupied(mut e) => Some(e.insert(new_obj)),
49 Entry::Vacant(e) => {
50 e.insert(new_obj);
51 None
52 }
53 };
54 ctx
55 .tuples
56 .update_object_id_and_state(
57 req.incomplete_token.object_id,
58 new_object_id,
59 ObjectState::Committed,
60 )
61 .await;
62 (existing, Some(new_object_id))
63 }
64 };
65
66 if let Some(to_delete) = to_delete.as_ref() {
69 reap_object(&ctx, to_delete).await;
70 };
71
72 ctx
73 .metrics
74 .0
75 .commit_op_count
76 .fetch_add(1, Ordering::Relaxed);
77 ctx
78 .metrics
79 .0
80 .incomplete_object_count
81 .fetch_sub(1, Ordering::Relaxed);
82 if new_object_id.is_some() && to_delete.is_none() {
83 ctx
85 .metrics
86 .0
87 .committed_object_count
88 .fetch_add(1, Ordering::Relaxed);
89 }
90
91 Ok(OpCommitObjectOutput {
92 object_id: new_object_id,
93 })
94}