1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Authors: Robert Lopez
use super::{
data_file::DataFile, error::Error, util::fs::delete_directory, wal::BlockDBLog, BlockDB,
BlockKey,
};
use std::sync::Arc;
use tokio::sync::RwLock;
impl BlockDB {
    /// Write bytes into a `DataFile`, creating a new `DataBlock`.
    ///
    /// Returns `BlockKey { data_file_id, data_block_id }`.
    ///
    /// The bytes are written into the first `DataFile` encountered in
    /// `data_files` that does not report itself as full. If no such file
    /// exists, a new `DataFile` is opened under `<path>/data_files/<id>`,
    /// recorded in the WAL, registered in `data_files`, and then written to.
    ///
    /// See [Write Distribution](https://gitlab.com/robertlopezdev/block-db/-/blob/main/README.md#write-distribution) in the README for more information.
    ///
    /// ---
    /// - **Atomic**
    /// - **Non-corruptible**
    ///
    /// ---
    /// # Errors
    ///
    /// - Any error returned while writing the block into the selected
    ///   `DataFile`.
    /// - `Error::Walr` if logging the new `DataFile` to the WAL fails; the
    ///   freshly opened `DataFile` is deleted (best effort) before returning.
    /// - The error from `DataFile::open` if a new `DataFile` cannot be opened;
    ///   its directory is removed (best effort) before returning.
    ///
    /// # Example
    /// ```ignore
    /// let block_db = BlockDB::open("./data", None).await?;
    ///
    /// let BlockKey { data_file_id, data_block_id } = block_db.write(b"Hello World").await?;
    /// ```
    pub async fn write<B: AsRef<[u8]>>(&self, bytes: B) -> Result<BlockKey, Error> {
        // The read guard on `data_files` is a temporary in the `for` head, so
        // it stays alive until the loop is left (including the early return).
        for (_, data_file) in self.data_files.read().await.clone() {
            let reader = data_file.read().await;
            if reader.is_full() {
                continue;
            }

            let data_file_id = reader.id.clone();
            // Release the shared guard first: the exclusive guard requested
            // below is on the same lock and would otherwise wait on it.
            drop(reader);

            let data_block_id = data_file.write().await.write(bytes.as_ref()).await?;
            return Ok(BlockKey {
                data_file_id,
                data_block_id,
            });
        }

        // No existing `DataFile` had room: create a new one.
        let new_data_file_id = DataFile::generate_id();
        let new_data_file_path = self.path.join("data_files").join(&new_data_file_id);

        let mut data_file = match DataFile::open(
            new_data_file_id,
            new_data_file_path.clone(),
            self.options_store.clone(),
        )
        .await
        {
            Ok(data_file) => data_file,
            Err(err) => {
                // Cleanup is best effort, mirroring the WAL-failure path below:
                // a failure to remove the directory must not replace the error
                // that explains why the `DataFile` could not be opened.
                let _ = delete_directory(&new_data_file_path).await;
                return Err(err);
            }
        };

        // Record the new `DataFile` in the WAL before it is registered in
        // `data_files`; if that fails, discard the file again.
        if let Err(err) = self
            .wal
            .write()
            .await
            .log(&BlockDBLog::AddDataFile(data_file.id.clone()))
            .await
        {
            let _ = data_file.delete().await;
            return Err(Error::Walr(err));
        }

        let data_file_id = data_file.id.clone();
        let data_file = Arc::new(RwLock::new(data_file));
        self.data_files
            .write()
            .await
            .insert(data_file_id.clone(), data_file.clone());

        let data_block_id = data_file.write().await.write(bytes.as_ref()).await?;
        Ok(BlockKey {
            data_file_id,
            data_block_id,
        })
    }
}