libblobd_lite/op/
write_object.rs1use super::OpError;
2use super::OpResult;
3use crate::ctx::Ctx;
4use crate::incomplete_token::IncompleteToken;
5use crate::object::calc_object_layout;
6use crate::object::ObjectLayout;
7use crate::object::OBJECT_OFF;
8use crate::page::ObjectPageHeader;
9use crate::page::ObjectState;
10use crate::util::div_pow2;
11use crate::util::is_multiple_of_pow2;
12use futures::Stream;
13use futures::StreamExt;
14use itertools::Itertools;
15use off64::int::Off64AsyncReadInt;
16use off64::int::Off64ReadInt;
17use off64::u64;
18use off64::usz;
19use std::cmp::min;
20use std::error::Error;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::time::timeout;
24use tracing::trace;
25use tracing::warn;
26
27pub struct OpWriteObjectInput<
28 D: AsRef<[u8]>,
29 S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
30> {
31 pub offset: u64,
32 pub incomplete_token: IncompleteToken,
33 pub data_len: u64,
34 pub data_stream: S,
35}
36
/// Result of a successful `op_write_object` call. It carries no data.
pub struct OpWriteObjectOutput {}
38
39pub(crate) async fn op_write_object<
40 D: AsRef<[u8]>,
41 S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
42>(
43 ctx: Arc<Ctx>,
44 mut req: OpWriteObjectInput<D, S>,
45) -> OpResult<OpWriteObjectOutput> {
46 let len = req.data_len;
47 let object_dev_offset = req.incomplete_token.object_dev_offset;
48 trace!(
49 dev_offset = object_dev_offset,
50 offset = req.offset,
51 length = req.data_len,
52 "writing object"
53 );
54
55 if req
57 .incomplete_token
58 .has_expired(ctx.reap_objects_after_secs)
59 {
60 return Err(OpError::ObjectNotFound);
61 };
62
63 let incomplete_object_is_still_valid = || async {
64 let hdr = ctx
66 .pages
67 .read_page_header::<ObjectPageHeader>(object_dev_offset)
68 .await;
69 hdr.state == ObjectState::Incomplete
70 };
71
72 if !incomplete_object_is_still_valid().await {
73 return Err(OpError::ObjectNotFound);
74 };
75
76 if !is_multiple_of_pow2(req.offset, ctx.pages.lpage_size_pow2) {
77 return Err(OpError::UnalignedWrite);
79 };
80 if len > ctx.pages.lpage_size() {
81 return Err(OpError::InexactWriteLength);
83 };
84
85 let raw = ctx
87 .device
88 .read_at(object_dev_offset, OBJECT_OFF.key())
89 .await;
90 let object_id = raw.read_u64_be_at(OBJECT_OFF.id());
91 let size = raw.read_u40_be_at(OBJECT_OFF.size());
92 let key_len = raw.read_u16_be_at(OBJECT_OFF.key_len());
93 trace!(
94 object_id,
95 object_dev_offset,
96 size,
97 "found object to write to"
98 );
99
100 if req.offset + len > size {
101 return Err(OpError::RangeOutOfBounds);
103 };
104
105 if req.offset + len != min(req.offset + ctx.pages.lpage_size(), size) {
106 return Err(OpError::InexactWriteLength);
108 };
109
110 let ObjectLayout {
111 lpage_count,
112 tail_page_sizes_pow2,
113 } = calc_object_layout(&ctx.pages, size);
114 let off = OBJECT_OFF
115 .with_key_len(key_len)
116 .with_lpages(lpage_count)
117 .with_tail_pages(tail_page_sizes_pow2.len());
118 let write_dev_offsets = {
120 let idx = div_pow2(req.offset, ctx.pages.lpage_size_pow2);
121 if idx < lpage_count {
122 vec![(
123 ctx.pages.lpage_size(),
124 Off64AsyncReadInt::read_u48_be_at(&ctx.device, object_dev_offset + off.lpage(idx)).await,
125 )]
126 } else {
127 let raw = ctx
128 .device
129 .read_at(
130 object_dev_offset + off.tail_pages(),
131 6 * u64!(tail_page_sizes_pow2.len()),
132 )
133 .await;
134 let mut offsets = tail_page_sizes_pow2
135 .into_iter()
136 .map(|(i, sz)| (1 << sz, raw.read_u48_be_at(u64!(i) * 6)))
137 .collect_vec();
138 offsets.last_mut().map(|(page_size, _page_dev_offset)| {
140 let mod_ = size % *page_size;
141 if mod_ != 0 {
142 *page_size = mod_;
143 };
144 });
145 offsets
146 }
147 };
148
149 let mut written = 0;
150 let mut write_page_idx = 0;
151 let mut buf = Vec::new();
152 loop {
153 if !incomplete_object_is_still_valid().await {
155 return Err(OpError::ObjectNotFound);
156 };
157 let Ok(maybe_chunk) = timeout(Duration::from_secs(60), req.data_stream.next()).await else {
158 continue;
160 };
161 if let Some(chunk) = maybe_chunk {
162 buf.extend_from_slice(
163 chunk
164 .map_err(|err| OpError::DataStreamError(Box::from(err)))?
165 .as_ref(),
166 );
167 } else if buf.is_empty() {
168 break;
170 };
171 let buf_len = u64!(buf.len());
172 if written + buf_len > len {
173 warn!(
174 received = written + buf_len,
175 declared = len,
176 "stream provided more data than declared"
177 );
178 return Err(OpError::DataStreamLengthMismatch);
179 };
180
181 if !incomplete_object_is_still_valid().await {
185 return Err(OpError::ObjectNotFound);
186 };
187
188 let (amount_to_write, page_dev_offset) = write_dev_offsets[write_page_idx];
190 if buf_len < amount_to_write {
191 continue;
192 };
193 ctx
195 .device
196 .write_at(
197 page_dev_offset,
198 buf.drain(..usz!(amount_to_write)).collect_vec(),
199 )
200 .await;
201 trace!(
202 object_id,
203 previously_written = written,
204 page_write_amount = amount_to_write,
205 "wrote page"
206 );
207 written += amount_to_write;
208 write_page_idx += 1;
209 }
210 if written != len {
211 warn!(
212 received = written,
213 declared = len,
214 "stream provided fewer data than declared"
215 );
216 return Err(OpError::DataStreamLengthMismatch);
217 };
218
219 #[cfg(not(test))]
221 ctx
222 .device
223 .write_at_with_delayed_sync::<&'static [u8]>(vec![])
224 .await;
225
226 Ok(OpWriteObjectOutput {})
227}