1use std::{
6 io,
7 path::{Path, PathBuf},
8};
9
10use redb::ReadableTable;
11
12use super::{
13 tables::{ReadableTables, Tables},
14 ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate,
15 OutboardLocation, OuterResult, Store, StoreInner,
16};
17use crate::{
18 store::{mutable_mem_storage::SizeInfo, DbIter},
19 util::raw_outboard_size,
20 Hash,
21};
22
23#[derive(derive_more::Debug)]
25pub enum EntryData {
26 Complete {
28 #[debug("data")]
30 data: Vec<u8>,
31 #[debug("outboard")]
33 outboard: Vec<u8>,
34 },
35 Partial {
37 #[debug("data")]
39 data: Vec<u8>,
40 #[debug("outboard")]
42 outboard: Vec<u8>,
43 #[debug("sizes")]
45 sizes: Vec<u8>,
46 },
47}
48
49impl Store {
50 #[cfg(test)]
52 pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result<EntryStateResponse> {
53 Ok(self.0.entry_state(hash).await?)
54 }
55
56 async fn all_blobs(&self) -> io::Result<DbIter<Hash>> {
57 Ok(Box::new(self.0.all_blobs().await?.into_iter()))
58 }
59
60 pub async fn transform_entries(
63 &self,
64 transform: impl Fn(Hash, EntryData) -> Option<EntryData> + Send + Sync,
65 ) -> io::Result<()> {
66 let blobs = self.all_blobs().await?;
67 for blob in blobs {
68 let hash = blob?;
69 let entry = self.get_full_entry_state(hash).await?;
70 if let Some(entry) = entry {
71 let entry1 = transform(hash, entry);
72 self.set_full_entry_state(hash, entry1).await?;
73 }
74 }
75 Ok(())
76 }
77
78 pub(crate) async fn set_full_entry_state(
81 &self,
82 hash: Hash,
83 entry: Option<EntryData>,
84 ) -> io::Result<()> {
85 Ok(self.0.set_full_entry_state(hash, entry).await?)
86 }
87
88 pub(crate) async fn get_full_entry_state(&self, hash: Hash) -> io::Result<Option<EntryData>> {
91 Ok(self.0.get_full_entry_state(hash).await?)
92 }
93
94 pub fn owned_data_path(&self, hash: &Hash) -> PathBuf {
96 self.0.path_options.owned_data_path(hash)
97 }
98
99 pub fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
101 self.0.path_options.owned_outboard_path(hash)
102 }
103}
104
105impl StoreInner {
106 #[cfg(test)]
107 async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
108 let (tx, rx) = oneshot::channel();
109 self.tx.send(ActorMessage::EntryState { hash, tx }).await?;
110 Ok(rx.await??)
111 }
112
113 async fn set_full_entry_state(&self, hash: Hash, entry: Option<EntryData>) -> OuterResult<()> {
114 let (tx, rx) = oneshot::channel();
115 self.tx
116 .send(ActorMessage::SetFullEntryState { hash, entry, tx })
117 .await?;
118 Ok(rx.await??)
119 }
120
121 async fn get_full_entry_state(&self, hash: Hash) -> OuterResult<Option<EntryData>> {
122 let (tx, rx) = oneshot::channel();
123 self.tx
124 .send(ActorMessage::GetFullEntryState { hash, tx })
125 .await?;
126 Ok(rx.await??)
127 }
128
129 async fn all_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
130 let (tx, rx) = oneshot::channel();
131 let filter: FilterPredicate<Hash, EntryState> =
132 Box::new(|_i, k, v| Some((k.value(), v.value())));
133 self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
134 let blobs = rx.await?;
135 let res = blobs?
136 .into_iter()
137 .map(|r| {
138 r.map(|(hash, _)| hash)
139 .map_err(|e| ActorError::from(e).into())
140 })
141 .collect::<Vec<_>>();
142 Ok(res)
143 }
144}
145
/// Test-only snapshot of what the store currently holds for a single hash.
#[cfg(test)]
#[derive(Debug)]
pub(crate) struct EntryStateResponse {
    /// The in-memory handle, if the weak reference kept by the actor can still be upgraded.
    pub mem: Option<crate::store::bao_file::BaoFileHandle>,
    /// The blobs-table row, with inline data and outboard resolved to their bytes.
    pub db: Option<EntryState<Vec<u8>>>,
}
152
153impl ActorState {
154 pub(super) fn get_full_entry_state(
155 &mut self,
156 tables: &impl ReadableTables,
157 hash: Hash,
158 ) -> ActorResult<Option<EntryData>> {
159 let data_path = self.options.path.owned_data_path(&hash);
160 let outboard_path = self.options.path.owned_outboard_path(&hash);
161 let sizes_path = self.options.path.owned_sizes_path(&hash);
162 let entry = match tables.blobs().get(hash)? {
163 Some(guard) => match guard.value() {
164 EntryState::Complete {
165 data_location,
166 outboard_location,
167 } => {
168 let data = match data_location {
169 DataLocation::External(paths, size) => {
170 let path = paths.first().ok_or_else(|| {
171 ActorError::Inconsistent("external data missing".to_owned())
172 })?;
173 let res = std::fs::read(path)?;
174 if res.len() != size as usize {
175 return Err(ActorError::Inconsistent(
176 "external data size mismatch".to_owned(),
177 ));
178 }
179 res
180 }
181 DataLocation::Owned(size) => {
182 let res = std::fs::read(data_path)?;
183 if res.len() != size as usize {
184 return Err(ActorError::Inconsistent(
185 "owned data size mismatch".to_owned(),
186 ));
187 }
188 res
189 }
190 DataLocation::Inline(_) => {
191 let data = tables.inline_data().get(hash)?.ok_or_else(|| {
192 ActorError::Inconsistent("inline data missing".to_owned())
193 })?;
194 data.value().to_vec()
195 }
196 };
197 let expected_outboard_size = raw_outboard_size(data.len() as u64);
198 let outboard = match outboard_location {
199 OutboardLocation::Owned => std::fs::read(outboard_path)?,
200 OutboardLocation::Inline(_) => tables
201 .inline_outboard()
202 .get(hash)?
203 .ok_or_else(|| {
204 ActorError::Inconsistent("inline outboard missing".to_owned())
205 })?
206 .value()
207 .to_vec(),
208 OutboardLocation::NotNeeded => Vec::new(),
209 };
210 if outboard.len() != expected_outboard_size as usize {
211 return Err(ActorError::Inconsistent(
212 "outboard size mismatch".to_owned(),
213 ));
214 }
215 Some(EntryData::Complete { data, outboard })
216 }
217 EntryState::Partial { .. } => {
218 let data = std::fs::read(data_path)?;
219 let outboard = std::fs::read(outboard_path)?;
220 let sizes = std::fs::read(sizes_path)?;
221 Some(EntryData::Partial {
222 data,
223 outboard,
224 sizes,
225 })
226 }
227 },
228 None => None,
229 };
230 Ok(entry)
231 }
232
233 pub(super) fn set_full_entry_state(
234 &mut self,
235 tables: &mut Tables,
236 hash: Hash,
237 entry: Option<EntryData>,
238 ) -> ActorResult<()> {
239 let data_path = self.options.path.owned_data_path(&hash);
240 let outboard_path = self.options.path.owned_outboard_path(&hash);
241 let sizes_path = self.options.path.owned_sizes_path(&hash);
242 std::fs::remove_file(&outboard_path).ok();
244 std::fs::remove_file(&data_path).ok();
245 std::fs::remove_file(&sizes_path).ok();
246 tables.inline_data.remove(&hash)?;
247 tables.inline_outboard.remove(&hash)?;
248 let Some(entry) = entry else {
249 tables.blobs.remove(&hash)?;
250 return Ok(());
251 };
252 let entry = match entry {
254 EntryData::Complete { data, outboard } => {
255 let data_size = data.len() as u64;
256 let data_location = if data_size > self.options.inline.max_data_inlined {
257 std::fs::write(data_path, &data)?;
258 DataLocation::Owned(data_size)
259 } else {
260 tables.inline_data.insert(hash, data.as_slice())?;
261 DataLocation::Inline(())
262 };
263 let outboard_size = outboard.len() as u64;
264 let outboard_location = if outboard_size > self.options.inline.max_outboard_inlined
265 {
266 std::fs::write(outboard_path, &outboard)?;
267 OutboardLocation::Owned
268 } else if outboard_size > 0 {
269 tables.inline_outboard.insert(hash, outboard.as_slice())?;
270 OutboardLocation::Inline(())
271 } else {
272 OutboardLocation::NotNeeded
273 };
274 EntryState::Complete {
275 data_location,
276 outboard_location,
277 }
278 }
279 EntryData::Partial {
280 data,
281 outboard,
282 sizes,
283 } => {
284 std::fs::write(data_path, data)?;
285 std::fs::write(outboard_path, outboard)?;
286 std::fs::write(sizes_path, sizes)?;
287 EntryState::Partial { size: None }
288 }
289 };
290 tables.blobs.insert(hash, entry)?;
292 Ok(())
293 }
294
295 #[cfg(test)]
296 pub(super) fn entry_state(
297 &mut self,
298 tables: &impl ReadableTables,
299 hash: Hash,
300 ) -> ActorResult<EntryStateResponse> {
301 let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade());
302 let db = match tables.blobs().get(hash)? {
303 Some(entry) => Some({
304 match entry.value() {
305 EntryState::Complete {
306 data_location,
307 outboard_location,
308 } => {
309 let data_location = match data_location {
310 DataLocation::Inline(()) => {
311 let data = tables.inline_data().get(hash)?.ok_or_else(|| {
312 ActorError::Inconsistent("inline data missing".to_owned())
313 })?;
314 DataLocation::Inline(data.value().to_vec())
315 }
316 DataLocation::Owned(x) => DataLocation::Owned(x),
317 DataLocation::External(p, s) => DataLocation::External(p, s),
318 };
319 let outboard_location = match outboard_location {
320 OutboardLocation::Inline(()) => {
321 let outboard =
322 tables.inline_outboard().get(hash)?.ok_or_else(|| {
323 ActorError::Inconsistent(
324 "inline outboard missing".to_owned(),
325 )
326 })?;
327 OutboardLocation::Inline(outboard.value().to_vec())
328 }
329 OutboardLocation::Owned => OutboardLocation::Owned,
330 OutboardLocation::NotNeeded => OutboardLocation::NotNeeded,
331 };
332 EntryState::Complete {
333 data_location,
334 outboard_location,
335 }
336 }
337 EntryState::Partial { size } => EntryState::Partial { size },
338 }
339 }),
340 None => None,
341 };
342 Ok(EntryStateResponse { mem, db })
343 }
344}
345
/// What `make_partial` should do with a complete entry, as decided by its callback.
#[derive(Debug)]
pub enum MakePartialResult {
    /// Keep the entry unchanged.
    Retain,
    /// Delete the entry from the store.
    Remove,
    /// Keep only the first `n` bytes of data, turning the entry into a partial one.
    /// This has no effect when `n` is not smaller than the current data length.
    Truncate(u64),
}
356
357pub fn make_partial(
359 path: &Path,
360 f: impl Fn(Hash, u64) -> MakePartialResult + Send + Sync,
361) -> io::Result<()> {
362 tracing::info!("starting runtime for make_partial");
363 let rt = tokio::runtime::Builder::new_current_thread()
364 .enable_all()
365 .build()?;
366 rt.block_on(async move {
367 let blobs_path = path.join("blobs");
368 let store = Store::load(blobs_path).await?;
369 store
370 .transform_entries(|hash, entry| match &entry {
371 EntryData::Complete { data, outboard } => {
372 let res = f(hash, data.len() as u64);
373 tracing::info!("make_partial: {} {:?}", hash, res);
374 match res {
375 MakePartialResult::Retain => Some(entry),
376 MakePartialResult::Remove => None,
377 MakePartialResult::Truncate(size) => {
378 let current_size = data.len() as u64;
379 if size < current_size {
380 let size = size as usize;
381 let sizes = SizeInfo::complete(current_size).to_vec();
382 Some(EntryData::Partial {
383 data: data[..size].to_vec(),
384 outboard: outboard.to_vec(),
385 sizes,
386 })
387 } else {
388 Some(entry)
389 }
390 }
391 }
392 }
393 EntryData::Partial { .. } => Some(entry),
394 })
395 .await?;
396 std::io::Result::Ok(())
397 })?;
398 drop(rt);
399 tracing::info!("done with make_partial");
400 Ok(())
401}