libblobd_direct/op/
write_object.rs1use super::OpError;
2use super::OpResult;
3use crate::ctx::Ctx;
4use crate::incomplete_token::IncompleteToken;
5use crate::object::calc_object_layout;
6use crate::object::ObjectState;
7use crate::util::ceil_pow2;
8use crate::util::div_pow2;
9use crate::util::is_multiple_of_pow2;
10use futures::Stream;
11use futures::StreamExt;
12use itertools::Itertools;
13use off64::u32;
14use off64::u64;
15use off64::usz;
16use std::cmp::max;
17use std::cmp::min;
18use std::error::Error;
19use std::iter::empty;
20use std::sync::atomic::Ordering::Relaxed;
21use std::sync::Arc;
22use tracing::trace;
23use tracing::warn;
24
25pub struct OpWriteObjectInput<
26 D: AsRef<[u8]>,
27 S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
28> {
29 pub incomplete_token: IncompleteToken,
30 pub offset: u64,
31 pub data_len: u64,
32 pub data_stream: S,
33}
34
/// Response to a successful write; it currently carries no data.
pub struct OpWriteObjectOutput {}
36
37pub(crate) async fn op_write_object<
38 D: AsRef<[u8]>,
39 S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
40>(
41 ctx: Arc<Ctx>,
42 mut req: OpWriteObjectInput<D, S>,
43) -> OpResult<OpWriteObjectOutput> {
44 let len = req.data_len;
45 let IncompleteToken { object_id, .. } = req.incomplete_token;
46
47 let Some(obj) = ctx.incomplete_objects.read().get(&object_id).cloned() else {
48 return Err(OpError::ObjectNotFound);
49 };
50
51 if !is_multiple_of_pow2(req.offset, ctx.pages.lpage_size_pow2) {
52 return Err(OpError::UnalignedWrite);
54 };
55 if len > ctx.pages.lpage_size() {
56 return Err(OpError::InexactWriteLength);
58 };
59
60 if req.offset + len > obj.size {
61 return Err(OpError::RangeOutOfBounds);
63 };
64
65 if req.offset + len != min(req.offset + ctx.pages.lpage_size(), obj.size) {
66 return Err(OpError::InexactWriteLength);
68 };
69
70 ctx.metrics.0.write_op_count.fetch_add(1, Relaxed);
71 ctx
72 .metrics
73 .0
74 .write_op_bytes_requested
75 .fetch_add(len, Relaxed);
76
77 let layout = calc_object_layout(&ctx.pages, obj.size);
78
79 let write_dev_offsets = {
81 let idx = u32!(div_pow2(req.offset, ctx.pages.lpage_size_pow2));
82 if idx < layout.lpage_count {
83 vec![(ctx.pages.lpage_size(), obj.lpage_dev_offsets[usz!(idx)])]
84 } else {
85 let mut offsets = layout
86 .tail_page_sizes_pow2
87 .into_iter()
88 .map(|(i, sz)| (1 << sz, obj.tail_page_dev_offsets[usz!(i)]))
89 .collect_vec();
90 offsets.last_mut().map(|(page_size, _page_dev_offset)| {
92 let mod_ = obj.size % *page_size;
93 if mod_ != 0 {
94 *page_size = mod_;
95 };
96 });
97 offsets
98 }
99 };
100
101 let mut written = 0;
102 let mut write_page_idx = 0;
103 let mut buf = Vec::new();
104 loop {
105 let maybe_chunk = req.data_stream.next().await;
106 if let Some(chunk) = maybe_chunk {
107 buf.extend_from_slice(
108 chunk
109 .map_err(|err| OpError::DataStreamError(Box::from(err)))?
110 .as_ref(),
111 );
112 } else if buf.is_empty() {
113 break;
115 };
116 let buf_len = u64!(buf.len());
117 if written + buf_len > len {
118 warn!(
119 received = written + buf_len,
120 declared = len,
121 "stream provided more data than declared"
122 );
123 return Err(OpError::DataStreamLengthMismatch);
124 };
125
126 let (amount_to_write, page_dev_offset) = write_dev_offsets[write_page_idx];
128 if buf_len < amount_to_write {
129 continue;
130 };
131 {
132 let lock = obj
133 .lock_for_writing_if_still_valid(ObjectState::Incomplete)
134 .await?;
135 let mut write_data = ctx.pages.allocate_uninitialised(max(
138 ctx.pages.spage_size(),
139 ceil_pow2(amount_to_write, ctx.pages.spage_size_pow2),
140 ));
141 write_data[..usz!(amount_to_write)].copy_from_slice(&buf[..usz!(amount_to_write)]);
142 buf.splice(..usz!(amount_to_write), empty());
143 let write_data_len = write_data.len();
145 ctx.device.write_at(page_dev_offset, write_data).await;
146 ctx
147 .metrics
148 .0
149 .write_op_bytes_written
150 .fetch_add(u64!(write_data_len), Relaxed);
151 trace!(
152 object_id,
153 previously_written = written,
154 page_write_amount = amount_to_write,
155 "wrote page"
156 );
157 written += amount_to_write;
158 write_page_idx += 1;
159 drop(lock);
160 };
161 }
162 if written != len {
163 warn!(
164 received = written,
165 declared = len,
166 "stream provided fewer data than declared"
167 );
168 return Err(OpError::DataStreamLengthMismatch);
169 };
170
171 ctx.device.sync().await;
172
173 Ok(OpWriteObjectOutput {})
174}