// safe_network/dbs/chunk_store.rs

// Copyright 2022 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::{Error, Result};
10
11use crate::UsedSpace;
12use sn_interface::types::{Chunk, ChunkAddress};
13
14use bytes::Bytes;
15use std::path::{Path, PathBuf};
16use tokio::io::AsyncWriteExt;
17use walkdir::WalkDir;
18use xor_name::{Prefix, XorName};
19
/// Number of leading XorName bits turned into nested directories (one directory level per
/// bit) when laying out chunk files on disk.
const BIT_TREE_DEPTH: usize = 20;
/// Name of the store's directory, joined onto the root path given to `ChunkStore::new`.
const CHUNK_DB_DIR: &str = "chunkdb";
22
23/// A disk store for chunks
24#[derive(Clone)]
25pub(crate) struct ChunkStore {
26    bit_tree_depth: usize,
27    chunk_store_path: PathBuf,
28    used_space: UsedSpace,
29}
30
31impl ChunkStore {
32    /// Creates a new `ChunkStore` at location `root/CHUNK_DB_DIR`
33    ///
34    /// If the location specified already contains a ChunkStore, it is simply used
35    ///
36    /// Used space of the dir is tracked
37    pub(crate) fn new<P: AsRef<Path>>(root: P, used_space: UsedSpace) -> Result<Self> {
38        let chunk_store_path = root.as_ref().join(CHUNK_DB_DIR);
39
40        Ok(ChunkStore {
41            bit_tree_depth: BIT_TREE_DEPTH,
42            chunk_store_path,
43            used_space,
44        })
45    }
46
47    // ---------------------- helper methods ----------------------
48
49    // Helper that returns the prefix tree path of depth `bit_count` for a given xorname
50    // Example:
51    // - with a xorname with starting bits `010001110110....`
52    // - and a bit_count of `6`
53    // returns the path `CHUNK_STORE_PATH/0/1/0/0/0/1`
54    // If the provided bit count is larger than `self.bit_tree_depth`, uses `self.bit_tree_depth`
55    // to stay within the prefix tree path
56    fn prefix_tree_path(&self, xorname: XorName, bit_count: usize) -> PathBuf {
57        let bin = format!("{:b}", xorname);
58        let prefix_dir_path: PathBuf = bin
59            .chars()
60            .take(std::cmp::min(bit_count, self.bit_tree_depth))
61            .map(|c| format!("{}", c))
62            .collect();
63
64        let mut path = self.chunk_store_path.clone();
65        path.push(prefix_dir_path);
66        path
67    }
68
69    fn address_to_filepath(&self, addr: &ChunkAddress) -> Result<PathBuf> {
70        let xorname = *addr.name();
71        let filename = addr.encode_to_zbase32()?;
72        let mut path = self.prefix_tree_path(xorname, self.bit_tree_depth);
73        path.push(filename);
74        Ok(path)
75    }
76
77    fn filepath_to_address(&self, path: &str) -> Result<ChunkAddress> {
78        let filename = Path::new(path)
79            .file_name()
80            .ok_or(Error::NoFilename)?
81            .to_str()
82            .ok_or(Error::InvalidFilename)?;
83        Ok(ChunkAddress::decode_from_zbase32(filename)?)
84    }
85
86    // ---------------------- api methods ----------------------
87
88    pub(crate) fn can_add(&self, size: usize) -> bool {
89        self.used_space.can_add(size)
90    }
91
92    pub(crate) async fn write_chunk(&self, data: &Chunk) -> Result<ChunkAddress> {
93        let addr = data.address();
94        let filepath = self.address_to_filepath(addr)?;
95        if let Some(dirs) = filepath.parent() {
96            tokio::fs::create_dir_all(dirs).await?;
97        }
98
99        let mut file = tokio::fs::File::create(filepath).await?;
100        file.write_all(data.value()).await?;
101
102        self.used_space.increase(data.value().len());
103
104        Ok(*addr)
105    }
106
107    #[allow(dead_code)]
108    pub(crate) async fn delete_chunk(&self, addr: &ChunkAddress) -> Result<()> {
109        let filepath = self.address_to_filepath(addr)?;
110        let meta = tokio::fs::metadata(filepath.clone()).await?;
111        tokio::fs::remove_file(filepath).await?;
112        self.used_space.decrease(meta.len() as usize);
113        Ok(())
114    }
115
116    pub(crate) async fn read_chunk(&self, addr: &ChunkAddress) -> Result<Chunk> {
117        let file_path = self.address_to_filepath(addr)?;
118        let bytes = Bytes::from(tokio::fs::read(file_path).await?);
119        let chunk = Chunk::new(bytes);
120        Ok(chunk)
121    }
122
123    pub(crate) fn chunk_file_exists(&self, addr: &ChunkAddress) -> Result<bool> {
124        let filepath = self.address_to_filepath(addr)?;
125        Ok(filepath.exists())
126    }
127
128    pub(crate) fn list_all_files(&self) -> Result<Vec<String>> {
129        list_files_in(&self.chunk_store_path)
130    }
131
132    pub(crate) fn list_all_chunk_addresses(&self) -> Result<Vec<ChunkAddress>> {
133        let all_files = self.list_all_files()?;
134        let all_addrs = all_files
135            .iter()
136            .map(|filepath| self.filepath_to_address(filepath))
137            .collect();
138        all_addrs
139    }
140
141    #[allow(unused)]
142    /// quickly find chunks related or not to a section, might be useful when adults change sections
143    /// not used yet
144    pub(crate) fn list_files_without_prefix(&self, prefix: Prefix) -> Result<Vec<String>> {
145        let all_files = self.list_all_files()?;
146        let prefix_path = self.prefix_tree_path(prefix.name(), prefix.bit_count());
147        let outside_prefix = all_files
148            .into_iter()
149            .filter(|p| !Path::new(&p).starts_with(&prefix_path.as_path()))
150            .collect();
151        Ok(outside_prefix)
152    }
153
154    #[allow(unused)]
155    /// quickly find chunks related or not to a section, might be useful when adults change sections
156    /// not used yet
157    pub(crate) fn list_files_with_prefix(&self, prefix: Prefix) -> Result<Vec<String>> {
158        let prefix_path = self.prefix_tree_path(prefix.name(), prefix.bit_count());
159        list_files_in(prefix_path.as_path())
160    }
161}
162
163fn list_files_in(path: &Path) -> Result<Vec<String>> {
164    let files = WalkDir::new(path)
165        .into_iter()
166        .filter_map(|e| match e {
167            Ok(direntry) => Some(direntry),
168            Err(err) => {
169                warn!("ChunkStore: failed to process file entry: {}", err);
170                None
171            }
172        })
173        .filter(|e| e.file_type().is_file())
174        .map(|e| e.path().display().to_string())
175        .collect();
176    Ok(files)
177}
178
#[cfg(test)]
mod tests {
    use sn_interface::types::utils::random_bytes;

    use super::*;
    use futures::future::join_all;
    use rayon::prelude::*;
    use tempfile::{tempdir, TempDir};

    /// Creates a `ChunkStore` with unlimited capacity inside a fresh temporary directory.
    ///
    /// The returned [`TempDir`] guard deletes the directory when dropped, so callers must
    /// keep it alive (bind it to a named variable such as `_root`, not `_`) for as long as
    /// the store is in use. If it were dropped inside this helper, the directory would be
    /// deleted straight away and `write_chunk` would silently re-create it outside of any
    /// cleanup guard, leaking files in the system temp dir.
    fn init_chunk_disk_store() -> (ChunkStore, TempDir) {
        let root = tempdir().expect("Failed to create temporary directory for chunk disk store");
        let store = ChunkStore::new(root.path(), UsedSpace::new(usize::MAX))
            .expect("Failed to create chunk disk store");
        (store, root)
    }

    // NOTE(review): ignored without a recorded reason; confirm before re-enabling.
    #[tokio::test]
    #[ignore]
    async fn test_write_read_chunk() {
        let (store, _root) = init_chunk_disk_store();
        // test that a range of different chunks return the written chunk
        for _ in 0..10 {
            let chunk = Chunk::new(random_bytes(100));

            let addr = store
                .write_chunk(&chunk)
                .await
                .expect("Failed to write chunk.");

            let read_chunk = store
                .read_chunk(&addr)
                .await
                .expect("Failed to read chunk.");

            assert_eq!(chunk.value(), read_chunk.value());
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_write_read_async_multiple_chunks() {
        let (store, _root) = init_chunk_disk_store();
        let size = 100;
        let chunks: Vec<Chunk> = std::iter::repeat_with(|| Chunk::new(random_bytes(size)))
            .take(7)
            .collect();
        write_and_read_chunks(&chunks, store).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_write_read_async_multiple_identical_chunks() {
        let (store, _root) = init_chunk_disk_store();
        let chunks: Vec<Chunk> = std::iter::repeat(Chunk::new(Bytes::from("test_concurrent")))
            .take(7)
            .collect();
        write_and_read_chunks(&chunks, store).await;
    }

    /// Concurrently writes all `chunks` to `store`, reads them back concurrently, and
    /// asserts that the content of every written chunk was read.
    ///
    /// Panics if any write or read fails, so such failures surface with a clear message
    /// instead of only as a missing chunk in the final check.
    async fn write_and_read_chunks(chunks: &[Chunk], store: ChunkStore) {
        // write all chunks
        let tasks = chunks.iter().map(|c| store.write_chunk(c));
        let addrs = join_all(tasks)
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .expect("Failed to write chunk.");

        // read all chunks
        let tasks = addrs.iter().map(|addr| store.read_chunk(addr));
        let read_chunks = join_all(tasks)
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .expect("Failed to read chunk.");

        // verify all written were read
        assert!(chunks
            .par_iter()
            .all(|c| read_chunks.iter().any(|r| r.value() == c.value())))
    }
}