// safe_network/dbs/chunk_store.rs
use super::{Error, Result};
10
11use crate::UsedSpace;
12use sn_interface::types::{Chunk, ChunkAddress};
13
14use bytes::Bytes;
15use std::path::{Path, PathBuf};
16use tokio::io::AsyncWriteExt;
17use walkdir::WalkDir;
18use xor_name::{Prefix, XorName};
19
/// Depth of the binary-prefix directory tree used to shard chunk files on
/// disk: up to this many leading bits of a chunk's XorName each become one
/// directory level (see `prefix_tree_path`).
const BIT_TREE_DEPTH: usize = 20;
/// Name of the subdirectory (under the store root) holding all chunk files.
const CHUNK_DB_DIR: &str = "chunkdb";
22
/// Disk-backed store for chunks, sharded into a binary-prefix directory tree
/// keyed by each chunk's XorName.
#[derive(Clone)]
pub(crate) struct ChunkStore {
    // Number of directory levels used when sharding by XorName bits
    // (set to BIT_TREE_DEPTH in `new`).
    bit_tree_depth: usize,
    // Root directory (<root>/chunkdb) under which all chunk files live.
    chunk_store_path: PathBuf,
    // Shared accounting of bytes stored; used to enforce the space limit.
    used_space: UsedSpace,
}
30
31impl ChunkStore {
32 pub(crate) fn new<P: AsRef<Path>>(root: P, used_space: UsedSpace) -> Result<Self> {
38 let chunk_store_path = root.as_ref().join(CHUNK_DB_DIR);
39
40 Ok(ChunkStore {
41 bit_tree_depth: BIT_TREE_DEPTH,
42 chunk_store_path,
43 used_space,
44 })
45 }
46
47 fn prefix_tree_path(&self, xorname: XorName, bit_count: usize) -> PathBuf {
57 let bin = format!("{:b}", xorname);
58 let prefix_dir_path: PathBuf = bin
59 .chars()
60 .take(std::cmp::min(bit_count, self.bit_tree_depth))
61 .map(|c| format!("{}", c))
62 .collect();
63
64 let mut path = self.chunk_store_path.clone();
65 path.push(prefix_dir_path);
66 path
67 }
68
69 fn address_to_filepath(&self, addr: &ChunkAddress) -> Result<PathBuf> {
70 let xorname = *addr.name();
71 let filename = addr.encode_to_zbase32()?;
72 let mut path = self.prefix_tree_path(xorname, self.bit_tree_depth);
73 path.push(filename);
74 Ok(path)
75 }
76
77 fn filepath_to_address(&self, path: &str) -> Result<ChunkAddress> {
78 let filename = Path::new(path)
79 .file_name()
80 .ok_or(Error::NoFilename)?
81 .to_str()
82 .ok_or(Error::InvalidFilename)?;
83 Ok(ChunkAddress::decode_from_zbase32(filename)?)
84 }
85
86 pub(crate) fn can_add(&self, size: usize) -> bool {
89 self.used_space.can_add(size)
90 }
91
92 pub(crate) async fn write_chunk(&self, data: &Chunk) -> Result<ChunkAddress> {
93 let addr = data.address();
94 let filepath = self.address_to_filepath(addr)?;
95 if let Some(dirs) = filepath.parent() {
96 tokio::fs::create_dir_all(dirs).await?;
97 }
98
99 let mut file = tokio::fs::File::create(filepath).await?;
100 file.write_all(data.value()).await?;
101
102 self.used_space.increase(data.value().len());
103
104 Ok(*addr)
105 }
106
107 #[allow(dead_code)]
108 pub(crate) async fn delete_chunk(&self, addr: &ChunkAddress) -> Result<()> {
109 let filepath = self.address_to_filepath(addr)?;
110 let meta = tokio::fs::metadata(filepath.clone()).await?;
111 tokio::fs::remove_file(filepath).await?;
112 self.used_space.decrease(meta.len() as usize);
113 Ok(())
114 }
115
116 pub(crate) async fn read_chunk(&self, addr: &ChunkAddress) -> Result<Chunk> {
117 let file_path = self.address_to_filepath(addr)?;
118 let bytes = Bytes::from(tokio::fs::read(file_path).await?);
119 let chunk = Chunk::new(bytes);
120 Ok(chunk)
121 }
122
123 pub(crate) fn chunk_file_exists(&self, addr: &ChunkAddress) -> Result<bool> {
124 let filepath = self.address_to_filepath(addr)?;
125 Ok(filepath.exists())
126 }
127
128 pub(crate) fn list_all_files(&self) -> Result<Vec<String>> {
129 list_files_in(&self.chunk_store_path)
130 }
131
132 pub(crate) fn list_all_chunk_addresses(&self) -> Result<Vec<ChunkAddress>> {
133 let all_files = self.list_all_files()?;
134 let all_addrs = all_files
135 .iter()
136 .map(|filepath| self.filepath_to_address(filepath))
137 .collect();
138 all_addrs
139 }
140
141 #[allow(unused)]
142 pub(crate) fn list_files_without_prefix(&self, prefix: Prefix) -> Result<Vec<String>> {
145 let all_files = self.list_all_files()?;
146 let prefix_path = self.prefix_tree_path(prefix.name(), prefix.bit_count());
147 let outside_prefix = all_files
148 .into_iter()
149 .filter(|p| !Path::new(&p).starts_with(&prefix_path.as_path()))
150 .collect();
151 Ok(outside_prefix)
152 }
153
154 #[allow(unused)]
155 pub(crate) fn list_files_with_prefix(&self, prefix: Prefix) -> Result<Vec<String>> {
158 let prefix_path = self.prefix_tree_path(prefix.name(), prefix.bit_count());
159 list_files_in(prefix_path.as_path())
160 }
161}
162
163fn list_files_in(path: &Path) -> Result<Vec<String>> {
164 let files = WalkDir::new(path)
165 .into_iter()
166 .filter_map(|e| match e {
167 Ok(direntry) => Some(direntry),
168 Err(err) => {
169 warn!("ChunkStore: failed to process file entry: {}", err);
170 None
171 }
172 })
173 .filter(|e| e.file_type().is_file())
174 .map(|e| e.path().display().to_string())
175 .collect();
176 Ok(files)
177}
178
#[cfg(test)]
mod tests {
    use sn_interface::types::utils::random_bytes;

    use super::*;
    use futures::future::join_all;
    use rayon::prelude::*;
    use tempfile::tempdir;

    /// Builds a store over a fresh temp dir with an effectively unlimited quota.
    fn init_chunk_disk_store() -> ChunkStore {
        let root = tempdir().expect("Failed to create temporary directory for chunk disk store");
        ChunkStore::new(root.path(), UsedSpace::new(usize::MAX))
            .expect("Failed to create chunk disk store")
    }

    #[tokio::test]
    #[ignore]
    async fn test_write_read_chunk() {
        let store = init_chunk_disk_store();
        // Round-trip several random chunks one at a time.
        for _ in 0..10 {
            let original = Chunk::new(random_bytes(100));

            let addr = store
                .write_chunk(&original)
                .await
                .expect("Failed to write chunk.");

            let retrieved = store
                .read_chunk(&addr)
                .await
                .expect("Failed to read chunk.");

            assert_eq!(original.value(), retrieved.value());
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_write_read_async_multiple_chunks() {
        let store = init_chunk_disk_store();
        // Seven distinct random chunks, written and read back concurrently.
        let chunks: Vec<Chunk> = (0..7).map(|_| Chunk::new(random_bytes(100))).collect();
        write_and_read_chunks(&chunks, store).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_write_read_async_multiple_identical_chunks() {
        let store = init_chunk_disk_store();
        // Seven copies of the same chunk exercise concurrent identical writes.
        let chunk = Chunk::new(Bytes::from("test_concurrent"));
        let chunks: Vec<Chunk> = vec![chunk; 7];
        write_and_read_chunks(&chunks, store).await;
    }

    /// Writes all `chunks` concurrently, reads them back concurrently, and
    /// asserts every original value appears among the values read.
    async fn write_and_read_chunks(chunks: &[Chunk], store: ChunkStore) {
        let write_results = join_all(chunks.iter().map(|c| store.write_chunk(c))).await;

        let read_results = join_all(
            write_results
                .iter()
                .flatten()
                .map(|addr| store.read_chunk(addr)),
        )
        .await;
        let read_chunks: Vec<&Chunk> = read_results.iter().flatten().collect();

        assert!(chunks
            .par_iter()
            .all(|c| read_chunks.iter().any(|r| r.value() == c.value())))
    }
}