libblobd_lite/op/
commit_object.rs1use super::OpError;
2use super::OpResult;
3use crate::ctx::Ctx;
4use crate::incomplete_token::IncompleteToken;
5use crate::object::OBJECT_OFF;
6use crate::op::key_debug_str;
7use crate::page::ObjectPageHeader;
8use crate::page::ObjectState;
9use crate::stream::StreamEvent;
10use crate::stream::StreamEventType;
11use off64::int::Off64AsyncReadInt;
12use std::sync::Arc;
13use tracing::trace;
14
15pub struct OpCommitObjectInput {
16 pub incomplete_token: IncompleteToken,
17}
18
/// Successful result of `op_commit_object`.
pub struct OpCommitObjectOutput {
  /// ID of the committed object, as read from the object's on-device
  /// metadata during the commit.
  pub object_id: u64,
}
22
23pub(crate) async fn op_commit_object(
24 ctx: Arc<Ctx>,
25 req: OpCommitObjectInput,
26) -> OpResult<OpCommitObjectOutput> {
27 let object_dev_offset = req.incomplete_token.object_dev_offset;
28
29 if req
31 .incomplete_token
32 .has_expired(ctx.reap_objects_after_secs)
33 {
34 return Err(OpError::ObjectNotFound);
35 };
36
37 let object_id = ctx
38 .device
39 .read_u64_be_at(object_dev_offset + OBJECT_OFF.id())
40 .await;
41 let key_len = ctx
42 .device
43 .read_u16_be_at(object_dev_offset + OBJECT_OFF.key_len())
44 .await;
45 let key = ctx
46 .device
47 .read_at(object_dev_offset + OBJECT_OFF.key(), key_len.into())
48 .await;
49
50 let (txn, event) = {
51 let mut state = ctx.state.lock().await;
52
53 let mut bkt = ctx.buckets.get_bucket_mut_for_key(&key).await;
54 trace!(
55 key = key_debug_str(&key),
56 object_id,
57 object_dev_offset,
58 "committing object"
59 );
60
61 let hdr = ctx
63 .pages
64 .read_page_header::<ObjectPageHeader>(object_dev_offset)
65 .await;
66 if hdr.state != ObjectState::Incomplete {
67 return Err(OpError::ObjectNotFound);
68 };
69
70 let mut txn = ctx.journal.begin_transaction();
72
73 if !ctx.versioning {
74 bkt
76 .move_object_to_deleted_list_if_exists(&mut txn, &mut state, None)
77 .await;
78 };
79
80 state
82 .incomplete_list
83 .detach(&mut txn, object_dev_offset)
84 .await;
85
86 let cur_bkt_head = bkt.get_head().await;
88
89 bkt.mutate_head(&mut txn, object_dev_offset);
91
92 ctx
94 .pages
95 .update_page_header::<ObjectPageHeader>(&mut txn, object_dev_offset, |o| {
96 debug_assert_eq!(o.state, ObjectState::Incomplete);
97 debug_assert_eq!(o.deleted_sec, None);
98 o.state = ObjectState::Committed;
99 o.next = cur_bkt_head;
100 })
101 .await;
102
103 let event = state.stream.create_event_on_device(&mut txn, StreamEvent {
105 typ: StreamEventType::ObjectCommit,
106 bucket_id: bkt.bucket_id(),
107 object_id,
108 });
109
110 (txn, event)
111 };
112
113 ctx.journal.commit_transaction(txn).await;
114
115 ctx.stream_in_memory.add_event_to_in_memory_list(event);
116
117 trace!(
118 key = key_debug_str(&key),
119 object_id,
120 object_dev_offset,
121 "committed object"
122 );
123
124 Ok(OpCommitObjectOutput { object_id })
125}