iroh_blobs/store/fs/
test_support.rs

1//! DB functions to support testing
2//!
3//! For some tests we need to modify the state of the store in ways that are not
4//! possible through the public API. This module provides functions to do that.
5use std::{
6    io,
7    path::{Path, PathBuf},
8};
9
10use redb::ReadableTable;
11
12use super::{
13    tables::{ReadableTables, Tables},
14    ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate,
15    OutboardLocation, OuterResult, Store, StoreInner,
16};
17use crate::{
18    store::{mutable_mem_storage::SizeInfo, DbIter},
19    util::raw_outboard_size,
20    Hash,
21};
22
23/// The full state of an entry, including the data.
24#[derive(derive_more::Debug)]
25pub enum EntryData {
26    /// Complete
27    Complete {
28        /// Data
29        #[debug("data")]
30        data: Vec<u8>,
31        /// Outboard
32        #[debug("outboard")]
33        outboard: Vec<u8>,
34    },
35    /// Partial
36    Partial {
37        /// Data
38        #[debug("data")]
39        data: Vec<u8>,
40        /// Outboard
41        #[debug("outboard")]
42        outboard: Vec<u8>,
43        /// Sizes
44        #[debug("sizes")]
45        sizes: Vec<u8>,
46    },
47}
48
49impl Store {
50    /// Get the complete state of an entry, both in memory and in redb.
51    #[cfg(test)]
52    pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result<EntryStateResponse> {
53        Ok(self.0.entry_state(hash).await?)
54    }
55
56    async fn all_blobs(&self) -> io::Result<DbIter<Hash>> {
57        Ok(Box::new(self.0.all_blobs().await?.into_iter()))
58    }
59
60    /// Transform all entries in the store. This is for testing and can be used to get the store
61    /// in a wrong state.
62    pub async fn transform_entries(
63        &self,
64        transform: impl Fn(Hash, EntryData) -> Option<EntryData> + Send + Sync,
65    ) -> io::Result<()> {
66        let blobs = self.all_blobs().await?;
67        for blob in blobs {
68            let hash = blob?;
69            let entry = self.get_full_entry_state(hash).await?;
70            if let Some(entry) = entry {
71                let entry1 = transform(hash, entry);
72                self.set_full_entry_state(hash, entry1).await?;
73            }
74        }
75        Ok(())
76    }
77
78    /// Set the full entry state for a hash. This is for testing and can be used to get the store
79    /// in a wrong state.
80    pub(crate) async fn set_full_entry_state(
81        &self,
82        hash: Hash,
83        entry: Option<EntryData>,
84    ) -> io::Result<()> {
85        Ok(self.0.set_full_entry_state(hash, entry).await?)
86    }
87
88    /// Set the full entry state for a hash. This is for testing and can be used to get the store
89    /// in a wrong state.
90    pub(crate) async fn get_full_entry_state(&self, hash: Hash) -> io::Result<Option<EntryData>> {
91        Ok(self.0.get_full_entry_state(hash).await?)
92    }
93
94    /// Owned data path
95    pub fn owned_data_path(&self, hash: &Hash) -> PathBuf {
96        self.0.path_options.owned_data_path(hash)
97    }
98
99    /// Owned outboard path
100    pub fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
101        self.0.path_options.owned_outboard_path(hash)
102    }
103}
104
105impl StoreInner {
106    #[cfg(test)]
107    async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
108        let (tx, rx) = oneshot::channel();
109        self.tx.send(ActorMessage::EntryState { hash, tx }).await?;
110        Ok(rx.await??)
111    }
112
113    async fn set_full_entry_state(&self, hash: Hash, entry: Option<EntryData>) -> OuterResult<()> {
114        let (tx, rx) = oneshot::channel();
115        self.tx
116            .send(ActorMessage::SetFullEntryState { hash, entry, tx })
117            .await?;
118        Ok(rx.await??)
119    }
120
121    async fn get_full_entry_state(&self, hash: Hash) -> OuterResult<Option<EntryData>> {
122        let (tx, rx) = oneshot::channel();
123        self.tx
124            .send(ActorMessage::GetFullEntryState { hash, tx })
125            .await?;
126        Ok(rx.await??)
127    }
128
129    async fn all_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
130        let (tx, rx) = oneshot::channel();
131        let filter: FilterPredicate<Hash, EntryState> =
132            Box::new(|_i, k, v| Some((k.value(), v.value())));
133        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
134        let blobs = rx.await?;
135        let res = blobs?
136            .into_iter()
137            .map(|r| {
138                r.map(|(hash, _)| hash)
139                    .map_err(|e| ActorError::from(e).into())
140            })
141            .collect::<Vec<_>>();
142        Ok(res)
143    }
144}
145
/// State of a single entry as reported by `ActorState::entry_state`.
#[cfg(test)]
#[derive(Debug)]
pub(crate) struct EntryStateResponse {
    /// The in-memory handle for the entry, if one is registered and still alive.
    pub mem: Option<crate::store::bao_file::BaoFileHandle>,
    /// The state stored in the blobs table, if any, with inline data and inline
    /// outboard filled in from their tables.
    pub db: Option<EntryState<Vec<u8>>>,
}
152
153impl ActorState {
154    pub(super) fn get_full_entry_state(
155        &mut self,
156        tables: &impl ReadableTables,
157        hash: Hash,
158    ) -> ActorResult<Option<EntryData>> {
159        let data_path = self.options.path.owned_data_path(&hash);
160        let outboard_path = self.options.path.owned_outboard_path(&hash);
161        let sizes_path = self.options.path.owned_sizes_path(&hash);
162        let entry = match tables.blobs().get(hash)? {
163            Some(guard) => match guard.value() {
164                EntryState::Complete {
165                    data_location,
166                    outboard_location,
167                } => {
168                    let data = match data_location {
169                        DataLocation::External(paths, size) => {
170                            let path = paths.first().ok_or_else(|| {
171                                ActorError::Inconsistent("external data missing".to_owned())
172                            })?;
173                            let res = std::fs::read(path)?;
174                            if res.len() != size as usize {
175                                return Err(ActorError::Inconsistent(
176                                    "external data size mismatch".to_owned(),
177                                ));
178                            }
179                            res
180                        }
181                        DataLocation::Owned(size) => {
182                            let res = std::fs::read(data_path)?;
183                            if res.len() != size as usize {
184                                return Err(ActorError::Inconsistent(
185                                    "owned data size mismatch".to_owned(),
186                                ));
187                            }
188                            res
189                        }
190                        DataLocation::Inline(_) => {
191                            let data = tables.inline_data().get(hash)?.ok_or_else(|| {
192                                ActorError::Inconsistent("inline data missing".to_owned())
193                            })?;
194                            data.value().to_vec()
195                        }
196                    };
197                    let expected_outboard_size = raw_outboard_size(data.len() as u64);
198                    let outboard = match outboard_location {
199                        OutboardLocation::Owned => std::fs::read(outboard_path)?,
200                        OutboardLocation::Inline(_) => tables
201                            .inline_outboard()
202                            .get(hash)?
203                            .ok_or_else(|| {
204                                ActorError::Inconsistent("inline outboard missing".to_owned())
205                            })?
206                            .value()
207                            .to_vec(),
208                        OutboardLocation::NotNeeded => Vec::new(),
209                    };
210                    if outboard.len() != expected_outboard_size as usize {
211                        return Err(ActorError::Inconsistent(
212                            "outboard size mismatch".to_owned(),
213                        ));
214                    }
215                    Some(EntryData::Complete { data, outboard })
216                }
217                EntryState::Partial { .. } => {
218                    let data = std::fs::read(data_path)?;
219                    let outboard = std::fs::read(outboard_path)?;
220                    let sizes = std::fs::read(sizes_path)?;
221                    Some(EntryData::Partial {
222                        data,
223                        outboard,
224                        sizes,
225                    })
226                }
227            },
228            None => None,
229        };
230        Ok(entry)
231    }
232
233    pub(super) fn set_full_entry_state(
234        &mut self,
235        tables: &mut Tables,
236        hash: Hash,
237        entry: Option<EntryData>,
238    ) -> ActorResult<()> {
239        let data_path = self.options.path.owned_data_path(&hash);
240        let outboard_path = self.options.path.owned_outboard_path(&hash);
241        let sizes_path = self.options.path.owned_sizes_path(&hash);
242        // tabula rasa
243        std::fs::remove_file(&outboard_path).ok();
244        std::fs::remove_file(&data_path).ok();
245        std::fs::remove_file(&sizes_path).ok();
246        tables.inline_data.remove(&hash)?;
247        tables.inline_outboard.remove(&hash)?;
248        let Some(entry) = entry else {
249            tables.blobs.remove(&hash)?;
250            return Ok(());
251        };
252        // write the new data and determine the new state
253        let entry = match entry {
254            EntryData::Complete { data, outboard } => {
255                let data_size = data.len() as u64;
256                let data_location = if data_size > self.options.inline.max_data_inlined {
257                    std::fs::write(data_path, &data)?;
258                    DataLocation::Owned(data_size)
259                } else {
260                    tables.inline_data.insert(hash, data.as_slice())?;
261                    DataLocation::Inline(())
262                };
263                let outboard_size = outboard.len() as u64;
264                let outboard_location = if outboard_size > self.options.inline.max_outboard_inlined
265                {
266                    std::fs::write(outboard_path, &outboard)?;
267                    OutboardLocation::Owned
268                } else if outboard_size > 0 {
269                    tables.inline_outboard.insert(hash, outboard.as_slice())?;
270                    OutboardLocation::Inline(())
271                } else {
272                    OutboardLocation::NotNeeded
273                };
274                EntryState::Complete {
275                    data_location,
276                    outboard_location,
277                }
278            }
279            EntryData::Partial {
280                data,
281                outboard,
282                sizes,
283            } => {
284                std::fs::write(data_path, data)?;
285                std::fs::write(outboard_path, outboard)?;
286                std::fs::write(sizes_path, sizes)?;
287                EntryState::Partial { size: None }
288            }
289        };
290        // finally, write the state
291        tables.blobs.insert(hash, entry)?;
292        Ok(())
293    }
294
295    #[cfg(test)]
296    pub(super) fn entry_state(
297        &mut self,
298        tables: &impl ReadableTables,
299        hash: Hash,
300    ) -> ActorResult<EntryStateResponse> {
301        let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade());
302        let db = match tables.blobs().get(hash)? {
303            Some(entry) => Some({
304                match entry.value() {
305                    EntryState::Complete {
306                        data_location,
307                        outboard_location,
308                    } => {
309                        let data_location = match data_location {
310                            DataLocation::Inline(()) => {
311                                let data = tables.inline_data().get(hash)?.ok_or_else(|| {
312                                    ActorError::Inconsistent("inline data missing".to_owned())
313                                })?;
314                                DataLocation::Inline(data.value().to_vec())
315                            }
316                            DataLocation::Owned(x) => DataLocation::Owned(x),
317                            DataLocation::External(p, s) => DataLocation::External(p, s),
318                        };
319                        let outboard_location = match outboard_location {
320                            OutboardLocation::Inline(()) => {
321                                let outboard =
322                                    tables.inline_outboard().get(hash)?.ok_or_else(|| {
323                                        ActorError::Inconsistent(
324                                            "inline outboard missing".to_owned(),
325                                        )
326                                    })?;
327                                OutboardLocation::Inline(outboard.value().to_vec())
328                            }
329                            OutboardLocation::Owned => OutboardLocation::Owned,
330                            OutboardLocation::NotNeeded => OutboardLocation::NotNeeded,
331                        };
332                        EntryState::Complete {
333                            data_location,
334                            outboard_location,
335                        }
336                    }
337                    EntryState::Partial { size } => EntryState::Partial { size },
338                }
339            }),
340            None => None,
341        };
342        Ok(EntryStateResponse { mem, db })
343    }
344}
345
/// What to do with a file pair when making partial files.
///
/// Returned by the callback passed to `make_partial`, once per complete entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MakePartialResult {
    /// Leave the entry as it is.
    Retain,
    /// Remove the entry entirely.
    Remove,
    /// Truncate the data file to the given size in bytes.
    ///
    /// `make_partial` only truncates when the size is smaller than the current
    /// data size; otherwise the entry is left as it is.
    Truncate(u64),
}
356
357/// Open a database and make it partial.
358pub fn make_partial(
359    path: &Path,
360    f: impl Fn(Hash, u64) -> MakePartialResult + Send + Sync,
361) -> io::Result<()> {
362    tracing::info!("starting runtime for make_partial");
363    let rt = tokio::runtime::Builder::new_current_thread()
364        .enable_all()
365        .build()?;
366    rt.block_on(async move {
367        let blobs_path = path.join("blobs");
368        let store = Store::load(blobs_path).await?;
369        store
370            .transform_entries(|hash, entry| match &entry {
371                EntryData::Complete { data, outboard } => {
372                    let res = f(hash, data.len() as u64);
373                    tracing::info!("make_partial: {} {:?}", hash, res);
374                    match res {
375                        MakePartialResult::Retain => Some(entry),
376                        MakePartialResult::Remove => None,
377                        MakePartialResult::Truncate(size) => {
378                            let current_size = data.len() as u64;
379                            if size < current_size {
380                                let size = size as usize;
381                                let sizes = SizeInfo::complete(current_size).to_vec();
382                                Some(EntryData::Partial {
383                                    data: data[..size].to_vec(),
384                                    outboard: outboard.to_vec(),
385                                    sizes,
386                                })
387                            } else {
388                                Some(entry)
389                            }
390                        }
391                    }
392                }
393                EntryData::Partial { .. } => Some(entry),
394            })
395            .await?;
396        std::io::Result::Ok(())
397    })?;
398    drop(rt);
399    tracing::info!("done with make_partial");
400    Ok(())
401}