libblobd_lite/op/commit_object.rs

use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::OBJECT_OFF;
use crate::op::key_debug_str;
use crate::page::ObjectPageHeader;
use crate::page::ObjectState;
use crate::stream::StreamEvent;
use crate::stream::StreamEventType;
use off64::int::Off64AsyncReadInt;
use std::sync::Arc;
use tracing::trace;

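/// Parameters for committing an incomplete object.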
pub struct OpCommitObjectInput {
  pub incomplete_token: IncompleteToken,
}

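/// Output of a successful commit: the ID of the now-committed object.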
pub struct OpCommitObjectOutput {
  pub object_id: u64,
}

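/// Commit an incomplete object so it becomes visible under its key: the object is
/// detached from the incomplete list, linked into its bucket's chain as the new head,
/// its page header state moves from `Incomplete` to `Committed`, and an `ObjectCommit`
/// event is written to the stream.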
pub(crate) async fn op_commit_object(
  ctx: Arc<Ctx>,
  req: OpCommitObjectInput,
) -> OpResult<OpCommitObjectOutput> {
  let object_dev_offset = req.incomplete_token.object_dev_offset;

  // See IncompleteToken for why, if the token has not expired, the object definitely still exists (i.e. it's safe to read any of its metadata).
  if req
    .incomplete_token
    .has_expired(ctx.reap_objects_after_secs)
  {
    return Err(OpError::ObjectNotFound);
  };

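  // Read the object's ID and key from its header on the device, using the field offsets given by OBJECT_OFF.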
  let object_id = ctx
    .device
    .read_u64_be_at(object_dev_offset + OBJECT_OFF.id())
    .await;
  let key_len = ctx
    .device
    .read_u16_be_at(object_dev_offset + OBJECT_OFF.key_len())
    .await;
  let key = ctx
    .device
    .read_at(object_dev_offset + OBJECT_OFF.key(), key_len.into())
    .await;

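  // Stage all metadata changes in a single journal transaction while holding the state and bucket locks; the transaction is committed after these locks are dropped at the end of this block.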
  let (txn, event) = {
    let mut state = ctx.state.lock().await;

    let mut bkt = ctx.buckets.get_bucket_mut_for_key(&key).await;
    trace!(
      key = key_debug_str(&key),
      object_id,
      object_dev_offset,
      "committing object"
    );

    // Check the state while holding the lock to prevent two concurrent commits of the same object.
    let hdr = ctx
      .pages
      .read_page_header::<ObjectPageHeader>(object_dev_offset)
      .await;
    if hdr.state != ObjectState::Incomplete {
      return Err(OpError::ObjectNotFound);
    };

    // Don't begin the transaction until after the possible `return` above (otherwise our journal would wait forever for the transaction to commit).
    let mut txn = ctx.journal.begin_transaction();

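    // Without versioning, a commit replaces any existing committed object with the same key, so move that object (if any) to the deleted list first.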
    if !ctx.versioning {
      // This will create an event for any deletion, which we want (we don't want only a commit event, as then anyone reading the stream would have to track all seen keys to know when a commit deletes an existing object).
      bkt
        .move_object_to_deleted_list_if_exists(&mut txn, &mut state, None)
        .await;
    };

    // Detach from incomplete list.
    state
      .incomplete_list
      .detach(&mut txn, object_dev_offset)
      .await;

    // Get the current bucket head. We use the overlay, so we'll see any change made by the previous `move_object_to_deleted_list_if_exists` call.
    let cur_bkt_head = bkt.get_head().await;

    // Update bucket head to point to this new inode.
    bkt.mutate_head(&mut txn, object_dev_offset);

    // Mark the object as committed and update its next pointer to link it into the bucket's chain.
    ctx
      .pages
      .update_page_header::<ObjectPageHeader>(&mut txn, object_dev_offset, |o| {
        debug_assert_eq!(o.state, ObjectState::Incomplete);
        debug_assert_eq!(o.deleted_sec, None);
        o.state = ObjectState::Committed;
        o.next = cur_bkt_head;
      })
      .await;

    // Create stream event.
    let event = state.stream.create_event_on_device(&mut txn, StreamEvent {
      typ: StreamEventType::ObjectCommit,
      bucket_id: bkt.bucket_id(),
      object_id,
    });

    (txn, event)
  };

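  // The locks have been released; persist the staged changes by committing the journal transaction.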
  ctx.journal.commit_transaction(txn).await;

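  // The transaction has been committed, so the event can now be exposed to in-memory stream readers.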
  ctx.stream_in_memory.add_event_to_in_memory_list(event);

  trace!(
    key = key_debug_str(&key),
    object_id,
    object_dev_offset,
    "committed object"
  );

  Ok(OpCommitObjectOutput { object_id })
}