1use std::{
6 io,
7 path::{Path, PathBuf},
8};
9
10use tokio::sync::oneshot;
11
12use super::{
13 tables::{ReadableTables, Tables},
14 ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate,
15 OutboardLocation, OuterResult, Store, StoreInner,
16};
17use crate::{
18 store::{mutable_mem_storage::SizeInfo, DbIter},
19 util::raw_outboard_size,
20 Hash,
21};
22use redb::ReadableTable;
23
/// The complete content of a single store entry, held in memory.
///
/// Produced by `get_full_entry_state` and consumed by `set_full_entry_state`.
/// The `#[debug("...")]` attributes make the derived `Debug` output print a
/// fixed label for each byte buffer instead of dumping its contents.
#[derive(derive_more::Debug)]
pub enum EntryData {
    /// An entry that is recorded as complete in the blobs table.
    Complete {
        /// The bytes of the blob.
        #[debug("data")]
        data: Vec<u8>,
        /// The bytes of the outboard; empty when no outboard is stored.
        #[debug("outboard")]
        outboard: Vec<u8>,
    },
    /// An entry that is recorded as partial in the blobs table.
    Partial {
        /// The content of the owned data file.
        #[debug("data")]
        data: Vec<u8>,
        /// The content of the owned outboard file.
        #[debug("outboard")]
        outboard: Vec<u8>,
        /// The content of the owned sizes file.
        #[debug("sizes")]
        sizes: Vec<u8>,
    },
}
49
50impl Store {
51 #[cfg(test)]
53 pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result<EntryStateResponse> {
54 Ok(self.0.entry_state(hash).await?)
55 }
56
57 async fn all_blobs(&self) -> io::Result<DbIter<Hash>> {
58 Ok(Box::new(self.0.all_blobs().await?.into_iter()))
59 }
60
61 pub async fn transform_entries(
64 &self,
65 transform: impl Fn(Hash, EntryData) -> Option<EntryData> + Send + Sync,
66 ) -> io::Result<()> {
67 let blobs = self.all_blobs().await?;
68 for blob in blobs {
69 let hash = blob?;
70 let entry = self.get_full_entry_state(hash).await?;
71 if let Some(entry) = entry {
72 let entry1 = transform(hash, entry);
73 self.set_full_entry_state(hash, entry1).await?;
74 }
75 }
76 Ok(())
77 }
78
79 pub(crate) async fn set_full_entry_state(
82 &self,
83 hash: Hash,
84 entry: Option<EntryData>,
85 ) -> io::Result<()> {
86 Ok(self.0.set_full_entry_state(hash, entry).await?)
87 }
88
89 pub(crate) async fn get_full_entry_state(&self, hash: Hash) -> io::Result<Option<EntryData>> {
92 Ok(self.0.get_full_entry_state(hash).await?)
93 }
94
95 pub fn owned_data_path(&self, hash: &Hash) -> PathBuf {
97 self.0.path_options.owned_data_path(hash)
98 }
99
100 pub fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
102 self.0.path_options.owned_outboard_path(hash)
103 }
104}
105
106impl StoreInner {
107 #[cfg(test)]
108 async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
109 let (tx, rx) = flume::bounded(1);
110 self.tx
111 .send_async(ActorMessage::EntryState { hash, tx })
112 .await?;
113 Ok(rx.recv_async().await??)
114 }
115
116 async fn set_full_entry_state(&self, hash: Hash, entry: Option<EntryData>) -> OuterResult<()> {
117 let (tx, rx) = flume::bounded(1);
118 self.tx
119 .send_async(ActorMessage::SetFullEntryState { hash, entry, tx })
120 .await?;
121 Ok(rx.recv_async().await??)
122 }
123
124 async fn get_full_entry_state(&self, hash: Hash) -> OuterResult<Option<EntryData>> {
125 let (tx, rx) = flume::bounded(1);
126 self.tx
127 .send_async(ActorMessage::GetFullEntryState { hash, tx })
128 .await?;
129 Ok(rx.recv_async().await??)
130 }
131
132 async fn all_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
133 let (tx, rx) = oneshot::channel();
134 let filter: FilterPredicate<Hash, EntryState> =
135 Box::new(|_i, k, v| Some((k.value(), v.value())));
136 self.tx
137 .send_async(ActorMessage::Blobs { filter, tx })
138 .await?;
139 let blobs = rx.await?;
140 let res = blobs?
141 .into_iter()
142 .map(|r| {
143 r.map(|(hash, _)| hash)
144 .map_err(|e| ActorError::from(e).into())
145 })
146 .collect::<Vec<_>>();
147 Ok(res)
148 }
149}
150
/// Test-only snapshot of what the actor knows about a single hash.
#[cfg(test)]
#[derive(Debug)]
pub(crate) struct EntryStateResponse {
    /// The live in-memory handle for the hash, if one is registered and could
    /// still be upgraded.
    pub mem: Option<crate::store::bao_file::BaoFileHandle>,
    /// The entry from the blobs table, with inline data and inline outboard
    /// loaded into the value; `None` if the table has no row for the hash.
    pub db: Option<EntryState<Vec<u8>>>,
}
157
158impl ActorState {
159 pub(super) fn get_full_entry_state(
160 &mut self,
161 tables: &impl ReadableTables,
162 hash: Hash,
163 ) -> ActorResult<Option<EntryData>> {
164 let data_path = self.options.path.owned_data_path(&hash);
165 let outboard_path = self.options.path.owned_outboard_path(&hash);
166 let sizes_path = self.options.path.owned_sizes_path(&hash);
167 let entry = match tables.blobs().get(hash)? {
168 Some(guard) => match guard.value() {
169 EntryState::Complete {
170 data_location,
171 outboard_location,
172 } => {
173 let data = match data_location {
174 DataLocation::External(paths, size) => {
175 let path = paths.first().ok_or_else(|| {
176 ActorError::Inconsistent("external data missing".to_owned())
177 })?;
178 let res = std::fs::read(path)?;
179 if res.len() != size as usize {
180 return Err(ActorError::Inconsistent(
181 "external data size mismatch".to_owned(),
182 ));
183 }
184 res
185 }
186 DataLocation::Owned(size) => {
187 let res = std::fs::read(data_path)?;
188 if res.len() != size as usize {
189 return Err(ActorError::Inconsistent(
190 "owned data size mismatch".to_owned(),
191 ));
192 }
193 res
194 }
195 DataLocation::Inline(_) => {
196 let data = tables.inline_data().get(hash)?.ok_or_else(|| {
197 ActorError::Inconsistent("inline data missing".to_owned())
198 })?;
199 data.value().to_vec()
200 }
201 };
202 let expected_outboard_size = raw_outboard_size(data.len() as u64);
203 let outboard = match outboard_location {
204 OutboardLocation::Owned => std::fs::read(outboard_path)?,
205 OutboardLocation::Inline(_) => tables
206 .inline_outboard()
207 .get(hash)?
208 .ok_or_else(|| {
209 ActorError::Inconsistent("inline outboard missing".to_owned())
210 })?
211 .value()
212 .to_vec(),
213 OutboardLocation::NotNeeded => Vec::new(),
214 };
215 if outboard.len() != expected_outboard_size as usize {
216 return Err(ActorError::Inconsistent(
217 "outboard size mismatch".to_owned(),
218 ));
219 }
220 Some(EntryData::Complete { data, outboard })
221 }
222 EntryState::Partial { .. } => {
223 let data = std::fs::read(data_path)?;
224 let outboard = std::fs::read(outboard_path)?;
225 let sizes = std::fs::read(sizes_path)?;
226 Some(EntryData::Partial {
227 data,
228 outboard,
229 sizes,
230 })
231 }
232 },
233 None => None,
234 };
235 Ok(entry)
236 }
237
238 pub(super) fn set_full_entry_state(
239 &mut self,
240 tables: &mut Tables,
241 hash: Hash,
242 entry: Option<EntryData>,
243 ) -> ActorResult<()> {
244 let data_path = self.options.path.owned_data_path(&hash);
245 let outboard_path = self.options.path.owned_outboard_path(&hash);
246 let sizes_path = self.options.path.owned_sizes_path(&hash);
247 std::fs::remove_file(&outboard_path).ok();
249 std::fs::remove_file(&data_path).ok();
250 std::fs::remove_file(&sizes_path).ok();
251 tables.inline_data.remove(&hash)?;
252 tables.inline_outboard.remove(&hash)?;
253 let Some(entry) = entry else {
254 tables.blobs.remove(&hash)?;
255 return Ok(());
256 };
257 let entry = match entry {
259 EntryData::Complete { data, outboard } => {
260 let data_size = data.len() as u64;
261 let data_location = if data_size > self.options.inline.max_data_inlined {
262 std::fs::write(data_path, &data)?;
263 DataLocation::Owned(data_size)
264 } else {
265 tables.inline_data.insert(hash, data.as_slice())?;
266 DataLocation::Inline(())
267 };
268 let outboard_size = outboard.len() as u64;
269 let outboard_location = if outboard_size > self.options.inline.max_outboard_inlined
270 {
271 std::fs::write(outboard_path, &outboard)?;
272 OutboardLocation::Owned
273 } else if outboard_size > 0 {
274 tables.inline_outboard.insert(hash, outboard.as_slice())?;
275 OutboardLocation::Inline(())
276 } else {
277 OutboardLocation::NotNeeded
278 };
279 EntryState::Complete {
280 data_location,
281 outboard_location,
282 }
283 }
284 EntryData::Partial {
285 data,
286 outboard,
287 sizes,
288 } => {
289 std::fs::write(data_path, data)?;
290 std::fs::write(outboard_path, outboard)?;
291 std::fs::write(sizes_path, sizes)?;
292 EntryState::Partial { size: None }
293 }
294 };
295 tables.blobs.insert(hash, entry)?;
297 Ok(())
298 }
299
300 #[cfg(test)]
301 pub(super) fn entry_state(
302 &mut self,
303 tables: &impl ReadableTables,
304 hash: Hash,
305 ) -> ActorResult<EntryStateResponse> {
306 let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade());
307 let db = match tables.blobs().get(hash)? {
308 Some(entry) => Some({
309 match entry.value() {
310 EntryState::Complete {
311 data_location,
312 outboard_location,
313 } => {
314 let data_location = match data_location {
315 DataLocation::Inline(()) => {
316 let data = tables.inline_data().get(hash)?.ok_or_else(|| {
317 ActorError::Inconsistent("inline data missing".to_owned())
318 })?;
319 DataLocation::Inline(data.value().to_vec())
320 }
321 DataLocation::Owned(x) => DataLocation::Owned(x),
322 DataLocation::External(p, s) => DataLocation::External(p, s),
323 };
324 let outboard_location = match outboard_location {
325 OutboardLocation::Inline(()) => {
326 let outboard =
327 tables.inline_outboard().get(hash)?.ok_or_else(|| {
328 ActorError::Inconsistent(
329 "inline outboard missing".to_owned(),
330 )
331 })?;
332 OutboardLocation::Inline(outboard.value().to_vec())
333 }
334 OutboardLocation::Owned => OutboardLocation::Owned,
335 OutboardLocation::NotNeeded => OutboardLocation::NotNeeded,
336 };
337 EntryState::Complete {
338 data_location,
339 outboard_location,
340 }
341 }
342 EntryState::Partial { size } => EntryState::Partial { size },
343 }
344 }),
345 None => None,
346 };
347 Ok(EntryStateResponse { mem, db })
348 }
349}
350
/// The decision returned by the callback passed to `make_partial` for one
/// complete entry.
#[derive(Debug)]
pub enum MakePartialResult {
    /// Keep the entry unchanged.
    Retain,
    /// Remove the entry from the store.
    Remove,
    /// Turn the entry into a partial one whose data is cut to this many bytes.
    /// A size that is not smaller than the current data size leaves the entry
    /// unchanged.
    Truncate(u64),
}
361
362pub fn make_partial(
364 path: &Path,
365 f: impl Fn(Hash, u64) -> MakePartialResult + Send + Sync,
366) -> io::Result<()> {
367 tokio::runtime::Builder::new_current_thread()
368 .build()?
369 .block_on(async move {
370 let blobs_path = path.join("blobs");
371 let store = Store::load(blobs_path).await?;
372 store
373 .transform_entries(|hash, entry| match &entry {
374 EntryData::Complete { data, outboard } => match f(hash, data.len() as u64) {
375 MakePartialResult::Retain => Some(entry),
376 MakePartialResult::Remove => None,
377 MakePartialResult::Truncate(size) => {
378 let current_size = data.len() as u64;
379 if size < current_size {
380 let size = size as usize;
381 let sizes = SizeInfo::complete(current_size).to_vec();
382 Some(EntryData::Partial {
383 data: data[..size].to_vec(),
384 outboard: outboard.to_vec(),
385 sizes,
386 })
387 } else {
388 Some(entry)
389 }
390 }
391 },
392 EntryData::Partial { .. } => Some(entry),
393 })
394 .await?;
395 Ok(())
396 })
397}