libblobd_direct/op/
commit_object.rs1use super::delete_object::reap_object;
2use super::OpError;
3use super::OpResult;
4use crate::ctx::Ctx;
5use crate::incomplete_token::IncompleteToken;
6use crate::object::ObjectState;
7use dashmap::mapref::entry::Entry;
8use std::sync::atomic::Ordering;
9use std::sync::Arc;
10
11pub struct OpCommitObjectInput {
12 pub incomplete_token: IncompleteToken,
13 pub if_not_exists: bool,
15}
16
17pub struct OpCommitObjectOutput {
18 pub object_id: Option<u64>,
20}
21
22pub(crate) async fn op_commit_object(
23 ctx: Arc<Ctx>,
24 req: OpCommitObjectInput,
25) -> OpResult<OpCommitObjectOutput> {
26 let Some(obj) = ctx.incomplete_objects.write().remove(&req.incomplete_token.object_id) else {
27 return Err(OpError::ObjectNotFound);
28 };
29
30 obj
31 .update_state_then_ensure_no_writers(ObjectState::Committed)
32 .await;
33
34 let (to_delete, new_object_id) = match ctx.committed_objects.entry(obj.key.clone()) {
35 Entry::Occupied(_) if req.if_not_exists => (Some(obj), None),
36 e => {
37 let new_object_id = ctx.next_object_id.fetch_add(1, Ordering::Relaxed);
39 let new_obj = obj.with_new_id(new_object_id);
40 let existing = match e {
41 Entry::Occupied(mut e) => Some(e.insert(new_obj)),
42 Entry::Vacant(e) => {
43 e.insert(new_obj);
44 None
45 }
46 };
47 ctx
48 .tuples
49 .update_object_id_and_state(
50 req.incomplete_token.object_id,
51 new_object_id,
52 ObjectState::Committed,
53 )
54 .await;
55 (existing, Some(new_object_id))
56 }
57 };
58
59 if let Some(to_delete) = to_delete.as_ref() {
62 reap_object(&ctx, to_delete).await;
63 };
64
65 ctx
66 .metrics
67 .0
68 .commit_op_count
69 .fetch_add(1, Ordering::Relaxed);
70 ctx
71 .metrics
72 .0
73 .incomplete_object_count
74 .fetch_sub(1, Ordering::Relaxed);
75 if new_object_id.is_some() && to_delete.is_none() {
76 ctx
78 .metrics
79 .0
80 .committed_object_count
81 .fetch_add(1, Ordering::Relaxed);
82 }
83
84 Ok(OpCommitObjectOutput {
85 object_id: new_object_id,
86 })
87}