block_db/
write.rs

1// Authors: Robert Lopez
2
3use super::{
4    data_file::DataFile, error::Error, util::fs::delete_directory, wal::BlockDBLog, BlockDB,
5    BlockKey,
6};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10impl BlockDB {
11    /// Write bytes into a `DataFile`, creating a new `DataBlock`.
12    ///
13    /// Returns `BlockKey { data_file_id, data_block_id }`.
14    ///
15    /// See [Write Distribution](https://gitlab.com/robertlopezdev/block-db/-/blob/main/README.md#write-distribution) in the README for more information.
16    ///
17    /// ---
18    /// - **Atomic**
19    /// - **Non-corruptible**
20    ///
21    /// ---
22    /// Example
23    /// ``` let block_db = BlockDB::open("./data", None).await?;
24    ///
25    /// let BlockKey { data_file_id, data_block_id } = block_db.write(b"Hello World").await?;
26    /// ```
27    pub async fn write<B: AsRef<[u8]>>(&self, bytes: B) -> Result<BlockKey, Error> {
28        for (_, data_file) in self.data_files.read().await.clone() {
29            let reader = data_file.read().await;
30
31            if !reader.is_full() {
32                let data_file_id = reader.id.clone();
33                drop(reader);
34
35                return Ok(BlockKey {
36                    data_file_id,
37                    data_block_id: data_file.write().await.write(bytes.as_ref()).await?,
38                });
39            }
40        }
41
42        let new_data_file_id = DataFile::generate_id();
43        let new_data_file_path = self.path.join("data_files").join(&new_data_file_id);
44
45        match DataFile::open(
46            new_data_file_id,
47            new_data_file_path.clone(),
48            self.options_store.clone(),
49        )
50        .await
51        {
52            Ok(mut data_file) => {
53                if let Err(err) = self
54                    .wal
55                    .write()
56                    .await
57                    .log(&BlockDBLog::AddDataFile(data_file.id.clone()))
58                    .await
59                {
60                    let _ = data_file.delete().await;
61                    return Err(Error::Walr(err));
62                }
63
64                let data_file_id = data_file.id.clone();
65                let data_file = Arc::new(RwLock::new(data_file));
66
67                self.data_files
68                    .write()
69                    .await
70                    .insert(data_file_id.clone(), data_file.clone());
71
72                let data_block_id = data_file.write().await.write(bytes.as_ref()).await?;
73
74                Ok(BlockKey {
75                    data_file_id,
76                    data_block_id,
77                })
78            }
79            Err(err) => {
80                delete_directory(&new_data_file_path).await?;
81
82                Err(err)
83            }
84        }
85    }
86}