iroh_bytes/store/fs/
test_support.rs

1//! DB functions to support testing
2//!
3//! For some tests we need to modify the state of the store in ways that are not
4//! possible through the public API. This module provides functions to do that.
5use std::{
6    io,
7    path::{Path, PathBuf},
8};
9
10use tokio::sync::oneshot;
11
12use super::{
13    tables::{ReadableTables, Tables},
14    ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate,
15    OutboardLocation, OuterResult, Store, StoreInner,
16};
17use crate::{
18    store::{mutable_mem_storage::SizeInfo, DbIter},
19    util::raw_outboard_size,
20    Hash,
21};
22use redb::ReadableTable;
23
24/// The full state of an entry, including the data.
25#[derive(derive_more::Debug)]
26pub enum EntryData {
27    /// Complete
28    Complete {
29        /// Data
30        #[debug("data")]
31        data: Vec<u8>,
32        /// Outboard
33        #[debug("outboard")]
34        outboard: Vec<u8>,
35    },
36    /// Partial
37    Partial {
38        /// Data
39        #[debug("data")]
40        data: Vec<u8>,
41        /// Outboard
42        #[debug("outboard")]
43        outboard: Vec<u8>,
44        /// Sizes
45        #[debug("sizes")]
46        sizes: Vec<u8>,
47    },
48}
49
50impl Store {
51    /// Get the complete state of an entry, both in memory and in redb.
52    #[cfg(test)]
53    pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result<EntryStateResponse> {
54        Ok(self.0.entry_state(hash).await?)
55    }
56
57    async fn all_blobs(&self) -> io::Result<DbIter<Hash>> {
58        Ok(Box::new(self.0.all_blobs().await?.into_iter()))
59    }
60
61    /// Transform all entries in the store. This is for testing and can be used to get the store
62    /// in a wrong state.
63    pub async fn transform_entries(
64        &self,
65        transform: impl Fn(Hash, EntryData) -> Option<EntryData> + Send + Sync,
66    ) -> io::Result<()> {
67        let blobs = self.all_blobs().await?;
68        for blob in blobs {
69            let hash = blob?;
70            let entry = self.get_full_entry_state(hash).await?;
71            if let Some(entry) = entry {
72                let entry1 = transform(hash, entry);
73                self.set_full_entry_state(hash, entry1).await?;
74            }
75        }
76        Ok(())
77    }
78
79    /// Set the full entry state for a hash. This is for testing and can be used to get the store
80    /// in a wrong state.
81    pub(crate) async fn set_full_entry_state(
82        &self,
83        hash: Hash,
84        entry: Option<EntryData>,
85    ) -> io::Result<()> {
86        Ok(self.0.set_full_entry_state(hash, entry).await?)
87    }
88
89    /// Set the full entry state for a hash. This is for testing and can be used to get the store
90    /// in a wrong state.
91    pub(crate) async fn get_full_entry_state(&self, hash: Hash) -> io::Result<Option<EntryData>> {
92        Ok(self.0.get_full_entry_state(hash).await?)
93    }
94
95    /// Owned data path
96    pub fn owned_data_path(&self, hash: &Hash) -> PathBuf {
97        self.0.path_options.owned_data_path(hash)
98    }
99
100    /// Owned outboard path
101    pub fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
102        self.0.path_options.owned_outboard_path(hash)
103    }
104}
105
106impl StoreInner {
107    #[cfg(test)]
108    async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
109        let (tx, rx) = flume::bounded(1);
110        self.tx
111            .send_async(ActorMessage::EntryState { hash, tx })
112            .await?;
113        Ok(rx.recv_async().await??)
114    }
115
116    async fn set_full_entry_state(&self, hash: Hash, entry: Option<EntryData>) -> OuterResult<()> {
117        let (tx, rx) = flume::bounded(1);
118        self.tx
119            .send_async(ActorMessage::SetFullEntryState { hash, entry, tx })
120            .await?;
121        Ok(rx.recv_async().await??)
122    }
123
124    async fn get_full_entry_state(&self, hash: Hash) -> OuterResult<Option<EntryData>> {
125        let (tx, rx) = flume::bounded(1);
126        self.tx
127            .send_async(ActorMessage::GetFullEntryState { hash, tx })
128            .await?;
129        Ok(rx.recv_async().await??)
130    }
131
132    async fn all_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
133        let (tx, rx) = oneshot::channel();
134        let filter: FilterPredicate<Hash, EntryState> =
135            Box::new(|_i, k, v| Some((k.value(), v.value())));
136        self.tx
137            .send_async(ActorMessage::Blobs { filter, tx })
138            .await?;
139        let blobs = rx.await?;
140        let res = blobs?
141            .into_iter()
142            .map(|r| {
143                r.map(|(hash, _)| hash)
144                    .map_err(|e| ActorError::from(e).into())
145            })
146            .collect::<Vec<_>>();
147        Ok(res)
148    }
149}
150
/// Result of an `entry_state` query: the in-memory and the database view of one hash.
#[cfg(test)]
#[derive(Debug)]
pub(crate) struct EntryStateResponse {
    /// The in-memory handle for the entry, if one exists and is still alive.
    pub mem: Option<crate::store::bao_file::BaoFileHandle>,
    /// The entry state stored in the blobs table, with inline data and outboard
    /// filled in from their tables; `None` if the table has no row for the hash.
    pub db: Option<EntryState<Vec<u8>>>,
}
157
158impl ActorState {
159    pub(super) fn get_full_entry_state(
160        &mut self,
161        tables: &impl ReadableTables,
162        hash: Hash,
163    ) -> ActorResult<Option<EntryData>> {
164        let data_path = self.options.path.owned_data_path(&hash);
165        let outboard_path = self.options.path.owned_outboard_path(&hash);
166        let sizes_path = self.options.path.owned_sizes_path(&hash);
167        let entry = match tables.blobs().get(hash)? {
168            Some(guard) => match guard.value() {
169                EntryState::Complete {
170                    data_location,
171                    outboard_location,
172                } => {
173                    let data = match data_location {
174                        DataLocation::External(paths, size) => {
175                            let path = paths.first().ok_or_else(|| {
176                                ActorError::Inconsistent("external data missing".to_owned())
177                            })?;
178                            let res = std::fs::read(path)?;
179                            if res.len() != size as usize {
180                                return Err(ActorError::Inconsistent(
181                                    "external data size mismatch".to_owned(),
182                                ));
183                            }
184                            res
185                        }
186                        DataLocation::Owned(size) => {
187                            let res = std::fs::read(data_path)?;
188                            if res.len() != size as usize {
189                                return Err(ActorError::Inconsistent(
190                                    "owned data size mismatch".to_owned(),
191                                ));
192                            }
193                            res
194                        }
195                        DataLocation::Inline(_) => {
196                            let data = tables.inline_data().get(hash)?.ok_or_else(|| {
197                                ActorError::Inconsistent("inline data missing".to_owned())
198                            })?;
199                            data.value().to_vec()
200                        }
201                    };
202                    let expected_outboard_size = raw_outboard_size(data.len() as u64);
203                    let outboard = match outboard_location {
204                        OutboardLocation::Owned => std::fs::read(outboard_path)?,
205                        OutboardLocation::Inline(_) => tables
206                            .inline_outboard()
207                            .get(hash)?
208                            .ok_or_else(|| {
209                                ActorError::Inconsistent("inline outboard missing".to_owned())
210                            })?
211                            .value()
212                            .to_vec(),
213                        OutboardLocation::NotNeeded => Vec::new(),
214                    };
215                    if outboard.len() != expected_outboard_size as usize {
216                        return Err(ActorError::Inconsistent(
217                            "outboard size mismatch".to_owned(),
218                        ));
219                    }
220                    Some(EntryData::Complete { data, outboard })
221                }
222                EntryState::Partial { .. } => {
223                    let data = std::fs::read(data_path)?;
224                    let outboard = std::fs::read(outboard_path)?;
225                    let sizes = std::fs::read(sizes_path)?;
226                    Some(EntryData::Partial {
227                        data,
228                        outboard,
229                        sizes,
230                    })
231                }
232            },
233            None => None,
234        };
235        Ok(entry)
236    }
237
238    pub(super) fn set_full_entry_state(
239        &mut self,
240        tables: &mut Tables,
241        hash: Hash,
242        entry: Option<EntryData>,
243    ) -> ActorResult<()> {
244        let data_path = self.options.path.owned_data_path(&hash);
245        let outboard_path = self.options.path.owned_outboard_path(&hash);
246        let sizes_path = self.options.path.owned_sizes_path(&hash);
247        // tabula rasa
248        std::fs::remove_file(&outboard_path).ok();
249        std::fs::remove_file(&data_path).ok();
250        std::fs::remove_file(&sizes_path).ok();
251        tables.inline_data.remove(&hash)?;
252        tables.inline_outboard.remove(&hash)?;
253        let Some(entry) = entry else {
254            tables.blobs.remove(&hash)?;
255            return Ok(());
256        };
257        // write the new data and determine the new state
258        let entry = match entry {
259            EntryData::Complete { data, outboard } => {
260                let data_size = data.len() as u64;
261                let data_location = if data_size > self.options.inline.max_data_inlined {
262                    std::fs::write(data_path, &data)?;
263                    DataLocation::Owned(data_size)
264                } else {
265                    tables.inline_data.insert(hash, data.as_slice())?;
266                    DataLocation::Inline(())
267                };
268                let outboard_size = outboard.len() as u64;
269                let outboard_location = if outboard_size > self.options.inline.max_outboard_inlined
270                {
271                    std::fs::write(outboard_path, &outboard)?;
272                    OutboardLocation::Owned
273                } else if outboard_size > 0 {
274                    tables.inline_outboard.insert(hash, outboard.as_slice())?;
275                    OutboardLocation::Inline(())
276                } else {
277                    OutboardLocation::NotNeeded
278                };
279                EntryState::Complete {
280                    data_location,
281                    outboard_location,
282                }
283            }
284            EntryData::Partial {
285                data,
286                outboard,
287                sizes,
288            } => {
289                std::fs::write(data_path, data)?;
290                std::fs::write(outboard_path, outboard)?;
291                std::fs::write(sizes_path, sizes)?;
292                EntryState::Partial { size: None }
293            }
294        };
295        // finally, write the state
296        tables.blobs.insert(hash, entry)?;
297        Ok(())
298    }
299
300    #[cfg(test)]
301    pub(super) fn entry_state(
302        &mut self,
303        tables: &impl ReadableTables,
304        hash: Hash,
305    ) -> ActorResult<EntryStateResponse> {
306        let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade());
307        let db = match tables.blobs().get(hash)? {
308            Some(entry) => Some({
309                match entry.value() {
310                    EntryState::Complete {
311                        data_location,
312                        outboard_location,
313                    } => {
314                        let data_location = match data_location {
315                            DataLocation::Inline(()) => {
316                                let data = tables.inline_data().get(hash)?.ok_or_else(|| {
317                                    ActorError::Inconsistent("inline data missing".to_owned())
318                                })?;
319                                DataLocation::Inline(data.value().to_vec())
320                            }
321                            DataLocation::Owned(x) => DataLocation::Owned(x),
322                            DataLocation::External(p, s) => DataLocation::External(p, s),
323                        };
324                        let outboard_location = match outboard_location {
325                            OutboardLocation::Inline(()) => {
326                                let outboard =
327                                    tables.inline_outboard().get(hash)?.ok_or_else(|| {
328                                        ActorError::Inconsistent(
329                                            "inline outboard missing".to_owned(),
330                                        )
331                                    })?;
332                                OutboardLocation::Inline(outboard.value().to_vec())
333                            }
334                            OutboardLocation::Owned => OutboardLocation::Owned,
335                            OutboardLocation::NotNeeded => OutboardLocation::NotNeeded,
336                        };
337                        EntryState::Complete {
338                            data_location,
339                            outboard_location,
340                        }
341                    }
342                    EntryState::Partial { size } => EntryState::Partial { size },
343                }
344            }),
345            None => None,
346        };
347        Ok(EntryStateResponse { mem, db })
348    }
349}
350
/// What to do with an entry when making a store partial.
///
/// Returned by the callback passed to `make_partial` for every complete entry.
#[derive(Debug)]
pub enum MakePartialResult {
    /// Leave the entry as it is.
    Retain,
    /// Remove the entry entirely.
    Remove,
    /// Truncate the data to the given size in bytes.
    Truncate(u64),
}
361
362/// Open a database and make it partial.
363pub fn make_partial(
364    path: &Path,
365    f: impl Fn(Hash, u64) -> MakePartialResult + Send + Sync,
366) -> io::Result<()> {
367    tokio::runtime::Builder::new_current_thread()
368        .build()?
369        .block_on(async move {
370            let blobs_path = path.join("blobs");
371            let store = Store::load(blobs_path).await?;
372            store
373                .transform_entries(|hash, entry| match &entry {
374                    EntryData::Complete { data, outboard } => match f(hash, data.len() as u64) {
375                        MakePartialResult::Retain => Some(entry),
376                        MakePartialResult::Remove => None,
377                        MakePartialResult::Truncate(size) => {
378                            let current_size = data.len() as u64;
379                            if size < current_size {
380                                let size = size as usize;
381                                let sizes = SizeInfo::complete(current_size).to_vec();
382                                Some(EntryData::Partial {
383                                    data: data[..size].to_vec(),
384                                    outboard: outboard.to_vec(),
385                                    sizes,
386                                })
387                            } else {
388                                Some(entry)
389                            }
390                        }
391                    },
392                    EntryData::Partial { .. } => Some(entry),
393                })
394                .await?;
395            Ok(())
396        })
397}