disk_based_bfs/
builder.rs

1//! Builder used to define settings for a BFS and run it.
2
3use std::path::PathBuf;
4
5use thiserror::Error;
6
7use crate::{
8    bfs::Bfs, callback::BfsCallback, expander::BfsExpander, io::LockedIO,
9    provider::BfsSettingsProvider, settings::BfsSettings,
10};
11
12/// Error type for [`BfsBuilder`].
13#[derive(Debug, Error)]
14pub enum BfsBuilderError {
15    /// The `threads` parameter was not set.
16    #[error("`threads` not set")]
17    ThreadsNotSet,
18
19    /// The `chunk_size_bytes` parameter was not set.
20    #[error("`chunk_size_bytes` not set")]
21    ChunkSizeBytesNotSet,
22
23    /// The `update_memory` parameter was not set.
24    #[error("`update_memory` not set")]
25    UpdateMemoryNotSet,
26
27    /// The `num_update_blocks` parameter was not set.
28    #[error("`num_update_blocks` not set")]
29    NumUpdateBlocksNotSet,
30
31    /// The `capacity_check_frequency` parameter was not set.
32    #[error("`capacity_check_frequency` not set")]
33    CapacityCheckFrequencyNotSet,
34
35    /// The `initial_states` parameter was not set.
36    #[error("`initial_states` not set")]
37    InitialStatesNotSet,
38
39    /// The `state_size` parameter was not set.
40    #[error("`state_size` not set")]
41    StateSizeNotSet,
42
43    /// The `root_directories` parameter was not set.
44    #[error("`root_directories` not set")]
45    RootDirectoriesNotSet,
46
47    /// The `initial_memory_limit` parameter was not set.
48    #[error("`initial_memory_limit` not set")]
49    InitialMemoryLimitNotSet,
50
51    /// The `available_disk_space_limit` parameter was not set.
52    #[error("`available_disk_space_limit` not set")]
53    AvailableDiskSpaceLimitNotSet,
54
55    /// The `update_array_threshold` parameter was not set.
56    #[error("`update_array_threshold` not set")]
57    UpdateArrayThresholdNotSet,
58
59    /// The `use_locked_io` parameter was not set.
60    #[error("`use_locked_io` not set")]
61    UseLockedIoNotSet,
62
63    /// The `sync_filesystem` parameter was not set.
64    #[error("`sync_filesystem` not set")]
65    SyncFilesystemNotSet,
66
67    /// The `compute_checksums` parameter was not set.
68    #[error("`compute_checksums` not set")]
69    ComputeChecksumsNotSet,
70
71    /// The `use_compression` parameter was not set.
72    #[error("`use_compression` not set")]
73    UseCompressionNotSet,
74
75    /// The `expander` parameter was not set.
76    #[error("`expander` not set")]
77    ExpanderNotSet,
78
79    /// The `callback` parameter was not set.
80    #[error("`callback` not set")]
81    CallbackNotSet,
82
83    /// The `settings_provider` parameter was not set.
84    #[error("`settings_provider` not set")]
85    SettingsProviderNotSet,
86
87    /// The `chunk_size_bytes` parameter was too large.
88    #[error("Chunk size ({chunk_size_bytes}) must be at most 2^29 = 536870912 bytes")]
89    ChunkSizeTooLarge {
90        /// The provided chunk size in bytes.
91        chunk_size_bytes: usize,
92    },
93
94    /// Not all chunks of the bit array are the same size.
95    #[error(
96        "State size ({state_size}) must be divisible by 8 * chunk size ({})",
97        8 * chunk_size_bytes,
98    )]
99    ChunksNotSameSize {
100        /// The provided state size.
101        state_size: u64,
102
103        /// The provided chunk size in bytes.
104        chunk_size_bytes: usize,
105    },
106
107    /// Not enough update blocks for the given number of threads and chunks.
108    #[error(
109        "Number of update blocks ({num_update_blocks}) must be greater than threads * num chunks \
110        ({})",
111        threads * num_chunks,
112    )]
113    NotEnoughUpdateBlocks {
114        /// The provided number of update blocks.
115        num_update_blocks: usize,
116
117        /// The provided number of threads.
118        threads: usize,
119
120        /// The number of chunks, calculated from the state size and chunk size.
121        num_chunks: usize,
122    },
123}
124
125/// Used to define all the parameters of a BFS and run it.
126#[derive(Debug)]
127pub struct BfsBuilder<Expander, Callback, Provider, const EXPANSION_NODES: usize> {
128    threads: Option<usize>,
129    chunk_size_bytes: Option<usize>,
130    update_memory: Option<usize>,
131    num_update_blocks: Option<usize>,
132    capacity_check_frequency: Option<usize>,
133    initial_states: Option<Vec<u64>>,
134    state_size: Option<u64>,
135    root_directories: Option<Vec<PathBuf>>,
136    initial_memory_limit: Option<usize>,
137    available_disk_space_limit: Option<u64>,
138    update_array_threshold: Option<u64>,
139    use_locked_io: Option<bool>,
140    sync_filesystem: Option<bool>,
141    compute_checksums: Option<bool>,
142    use_compression: Option<bool>,
143    expander: Option<Expander>,
144    callback: Option<Callback>,
145    settings_provider: Option<Provider>,
146}
147
148impl<Expander, Callback, Provider, const EXPANSION_NODES: usize> Default
149    for BfsBuilder<Expander, Callback, Provider, EXPANSION_NODES>
150where
151    Expander: BfsExpander<EXPANSION_NODES> + Clone + Sync,
152    Callback: BfsCallback + Clone + Sync,
153    Provider: BfsSettingsProvider + Sync,
154{
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
160impl<Expander, Callback, Provider, const EXPANSION_NODES: usize>
161    BfsBuilder<Expander, Callback, Provider, EXPANSION_NODES>
162where
163    Expander: BfsExpander<EXPANSION_NODES> + Clone + Sync,
164    Callback: BfsCallback + Clone + Sync,
165    Provider: BfsSettingsProvider + Sync,
166{
167    /// Creates a default builder with no parameters set.
168    #[must_use]
169    pub fn new() -> Self {
170        Self {
171            threads: None,
172            chunk_size_bytes: None,
173            update_memory: None,
174            num_update_blocks: None,
175            capacity_check_frequency: None,
176            initial_states: None,
177            state_size: None,
178            root_directories: None,
179            initial_memory_limit: None,
180            available_disk_space_limit: None,
181            update_array_threshold: None,
182            use_locked_io: None,
183            sync_filesystem: None,
184            compute_checksums: None,
185            use_compression: None,
186            expander: None,
187            callback: None,
188            settings_provider: None,
189        }
190    }
191
192    /// The number of threads to use in the search.
193    #[must_use]
194    pub fn threads(mut self, threads: usize) -> Self {
195        self.threads = Some(threads);
196        self
197    }
198
199    /// The size of each bit array chunk in bytes. Must be at most 2^29.
200    #[must_use]
201    pub fn chunk_size_bytes(mut self, chunk_size_bytes: usize) -> Self {
202        self.chunk_size_bytes = Some(chunk_size_bytes);
203        self
204    }
205
206    /// The amount of memory to use for storing updates (adjacent nodes) in bytes.
207    #[must_use]
208    pub fn update_memory(mut self, update_memory: usize) -> Self {
209        self.update_memory = Some(update_memory);
210        self
211    }
212
213    /// The number of blocks in which to divide the update memory.
214    #[must_use]
215    pub fn num_update_blocks(mut self, num_update_blocks: usize) -> Self {
216        self.num_update_blocks = Some(num_update_blocks);
217        self
218    }
219
220    /// The frequency (one check per `capacity_check_frequency` new nodes) at which to check the
221    /// capacity of an update block. When capacity is reached, the block will be stored and a new
222    /// block taken.
223    #[must_use]
224    pub fn capacity_check_frequency(mut self, capacity_check_frequency: usize) -> Self {
225        self.capacity_check_frequency = Some(capacity_check_frequency);
226        self
227    }
228
229    /// The nodes at depth 0, from which the search starts.
230    #[must_use]
231    pub fn initial_states(mut self, initial_states: &[u64]) -> Self {
232        self.initial_states = Some(initial_states.to_vec());
233        self
234    }
235
236    /// The number of nodes in the graph.
237    #[must_use]
238    pub fn state_size(mut self, state_size: u64) -> Self {
239        self.state_size = Some(state_size);
240        self
241    }
242
243    /// The working directories that will contain all of the intermediate data. Each directory
244    /// should be on a different hard drive.
245    #[must_use]
246    pub fn root_directories(mut self, root_directories: &[PathBuf]) -> Self {
247        self.root_directories = Some(root_directories.to_vec());
248        self
249    }
250
251    /// Approximate amount of memory in bytes to use for the initial in-memory breadth-first
252    /// search, before switching to the disk-based search.
253    #[must_use]
254    pub fn initial_memory_limit(mut self, initial_memory_limit: usize) -> Self {
255        self.initial_memory_limit = Some(initial_memory_limit);
256        self
257    }
258
259    /// The minimum amount of available disk space (in bytes) that should be kept free on each
260    /// drive.
261    #[must_use]
262    pub fn available_disk_space_limit(mut self, available_disk_space_limit: u64) -> Self {
263        self.available_disk_space_limit = Some(available_disk_space_limit);
264        self
265    }
266
267    /// The threshold at which to switch from writing an update file containing a list of node
268    /// indices, to writing a bit array of updates.
269    #[must_use]
270    pub fn update_array_threshold(mut self, update_array_threshold: u64) -> Self {
271        self.update_array_threshold = Some(update_array_threshold);
272        self
273    }
274
275    /// Whether disk I/O should be locked using a mutex to prevent multiple threads from performing
276    /// I/O on the same disk at the same time.
277    #[must_use]
278    pub fn use_locked_io(mut self, use_locked_io: bool) -> Self {
279        self.use_locked_io = Some(use_locked_io);
280        self
281    }
282
283    /// Whether to call `sync` on the filesystem before deleting files. If this is set to false and
284    /// a system failure occurs, it's possible that data could be lost and the search would need to
285    /// be restarted from the beginning.
286    #[must_use]
287    pub fn sync_filesystem(mut self, sync_filesystem: bool) -> Self {
288        self.sync_filesystem = Some(sync_filesystem);
289        self
290    }
291
292    /// Whether to compute and verify checksums for all data written to disk.
293    #[must_use]
294    pub fn compute_checksums(mut self, compute_checksums: bool) -> Self {
295        self.compute_checksums = Some(compute_checksums);
296        self
297    }
298
299    /// Whether update files and bit arrays should be compressed.
300    #[must_use]
301    pub fn use_compression(mut self, use_compression: bool) -> Self {
302        self.use_compression = Some(use_compression);
303        self
304    }
305
306    /// The implementor of the [`BfsExpander`] trait that defines the graph to traverse.
307    ///
308    /// [`BfsExpander`]: ../expander/trait.BfsExpander.html
309    #[must_use]
310    pub fn expander(mut self, expander: Expander) -> Self {
311        self.expander = Some(expander);
312        self
313    }
314
315    /// The implementor of the [`BfsCallback`] trait that defines callbacks to run during the
316    /// search.
317    ///
318    /// [`BfsCallback`]: ../callback/trait.BfsCallback.html
319    #[must_use]
320    pub fn callback(mut self, callback: Callback) -> Self {
321        self.callback = Some(callback);
322        self
323    }
324
325    /// The implementor of the [`BfsSettingsProvider`] trait that provides various additional
326    /// settings for the search.
327    ///
328    /// [`BfsSettingsProvider`]: ../provider/trait.BfsSettingsProvider.html
329    #[must_use]
330    pub fn settings_provider(mut self, settings_provider: Provider) -> Self {
331        self.settings_provider = Some(settings_provider);
332        self
333    }
334
335    /// Runs the BFS with the given settings, requiring all fields of the [`BfsBuilder`] to be set
336    /// explicitly, without using any default values.
337    pub fn run_no_defaults(self) -> Result<(), BfsBuilderError> {
338        // Limit to 2^29 bytes so that we can store 32 bit values in the update files
339        let chunk_size_bytes = self
340            .chunk_size_bytes
341            .ok_or(BfsBuilderError::ChunkSizeBytesNotSet)?;
342        if chunk_size_bytes > 1 << 29 {
343            return Err(BfsBuilderError::ChunkSizeTooLarge { chunk_size_bytes });
344        }
345
346        // Require that all chunks are the same size
347        let state_size = self.state_size.ok_or(BfsBuilderError::StateSizeNotSet)?;
348        if state_size as usize % (8 * chunk_size_bytes) != 0 {
349            return Err(BfsBuilderError::ChunksNotSameSize {
350                state_size,
351                chunk_size_bytes,
352            });
353        }
354
355        // Each thread can hold one update vec per chunk, so we need more than (threads * chunks)
356        // update vecs in total
357        let num_update_blocks = self
358            .num_update_blocks
359            .ok_or(BfsBuilderError::NumUpdateBlocksNotSet)?;
360        let threads = self.threads.ok_or(BfsBuilderError::ThreadsNotSet)?;
361        let num_chunks = state_size as usize / (8 * chunk_size_bytes);
362        if num_update_blocks <= threads * num_chunks {
363            return Err(BfsBuilderError::NotEnoughUpdateBlocks {
364                num_update_blocks,
365                threads,
366                num_chunks,
367            });
368        }
369
370        let settings = BfsSettings {
371            threads,
372            chunk_size_bytes,
373            update_memory: self
374                .update_memory
375                .ok_or(BfsBuilderError::UpdateMemoryNotSet)?,
376            num_update_blocks,
377            capacity_check_frequency: self
378                .capacity_check_frequency
379                .ok_or(BfsBuilderError::CapacityCheckFrequencyNotSet)?,
380            initial_states: self
381                .initial_states
382                .ok_or(BfsBuilderError::InitialStatesNotSet)?,
383            state_size,
384            root_directories: self
385                .root_directories
386                .ok_or(BfsBuilderError::RootDirectoriesNotSet)?,
387            initial_memory_limit: self
388                .initial_memory_limit
389                .ok_or(BfsBuilderError::InitialMemoryLimitNotSet)?,
390            available_disk_space_limit: self
391                .available_disk_space_limit
392                .ok_or(BfsBuilderError::AvailableDiskSpaceLimitNotSet)?,
393            update_array_threshold: self
394                .update_array_threshold
395                .ok_or(BfsBuilderError::UpdateArrayThresholdNotSet)?,
396            use_locked_io: self
397                .use_locked_io
398                .ok_or(BfsBuilderError::UseLockedIoNotSet)?,
399            sync_filesystem: self
400                .sync_filesystem
401                .ok_or(BfsBuilderError::SyncFilesystemNotSet)?,
402            compute_checksums: self
403                .compute_checksums
404                .ok_or(BfsBuilderError::ComputeChecksumsNotSet)?,
405            use_compression: self
406                .use_compression
407                .ok_or(BfsBuilderError::UseCompressionNotSet)?,
408            expander: self.expander.ok_or(BfsBuilderError::ExpanderNotSet)?,
409            callback: self.callback.ok_or(BfsBuilderError::CallbackNotSet)?,
410            settings_provider: self
411                .settings_provider
412                .ok_or(BfsBuilderError::SettingsProviderNotSet)?,
413        };
414
415        let locked_io = LockedIO::new(&settings);
416
417        let bfs = Bfs::new(&settings, &locked_io);
418        bfs.run();
419
420        Ok(())
421    }
422
423    /// Runs the BFS with the given settings, using default values for any parameters that were not
424    /// set, if possible.
425    ///
426    /// Default values are:
427    /// - `threads`: 1
428    /// - `update_memory`: 1 GiB
429    /// - `capacity_check_frequency`: 256
430    /// - `initial_memory_limit`: 64 MiB
431    /// - `use_locked_io`: false
432    /// - `sync_filesystem`: true
433    /// - `compute_checksums`: true
434    /// - `use_compression`: true
435    /// - `update_array_threshold`: `chunk_size_bytes`
436    /// - `num_update_blocks`: `2 * threads * num_chunks`, where
437    ///   `num_chunks = state_size / (8 * chunk_size_bytes)`
438    pub fn run(mut self) -> Result<(), BfsBuilderError> {
439        self.threads.get_or_insert(1);
440        self.update_memory.get_or_insert(1 << 30);
441        self.capacity_check_frequency.get_or_insert(1 << 8);
442        self.initial_memory_limit.get_or_insert(1 << 26);
443        self.use_locked_io.get_or_insert(false);
444        self.sync_filesystem.get_or_insert(true);
445        self.compute_checksums.get_or_insert(true);
446        self.use_compression.get_or_insert(true);
447
448        let chunk_size_bytes = self
449            .chunk_size_bytes
450            .ok_or(BfsBuilderError::ChunkSizeBytesNotSet)?;
451        self.update_array_threshold
452            .get_or_insert(chunk_size_bytes as u64);
453
454        let state_size = self.state_size.ok_or(BfsBuilderError::StateSizeNotSet)?;
455        let num_chunks = state_size as usize / (8 * chunk_size_bytes);
456        self.num_update_blocks
457            .get_or_insert(2 * self.threads.unwrap() * num_chunks);
458
459        self.run_no_defaults()
460    }
461}