1use super::{
4 data_file::DataFile, error::Error, util::fs::delete_directory, wal::BlockDBLog, BlockDB,
5 BlockKey,
6};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10impl BlockDB {
11 pub async fn write<B: AsRef<[u8]>>(&self, bytes: B) -> Result<BlockKey, Error> {
28 for (_, data_file) in self.data_files.read().await.clone() {
29 let reader = data_file.read().await;
30
31 if !reader.is_full() {
32 let data_file_id = reader.id.clone();
33 drop(reader);
34
35 return Ok(BlockKey {
36 data_file_id,
37 data_block_id: data_file.write().await.write(bytes.as_ref()).await?,
38 });
39 }
40 }
41
42 let new_data_file_id = DataFile::generate_id();
43 let new_data_file_path = self.path.join("data_files").join(&new_data_file_id);
44
45 match DataFile::open(
46 new_data_file_id,
47 new_data_file_path.clone(),
48 self.options_store.clone(),
49 )
50 .await
51 {
52 Ok(mut data_file) => {
53 if let Err(err) = self
54 .wal
55 .write()
56 .await
57 .log(&BlockDBLog::AddDataFile(data_file.id.clone()))
58 .await
59 {
60 let _ = data_file.delete().await;
61 return Err(Error::Walr(err));
62 }
63
64 let data_file_id = data_file.id.clone();
65 let data_file = Arc::new(RwLock::new(data_file));
66
67 self.data_files
68 .write()
69 .await
70 .insert(data_file_id.clone(), data_file.clone());
71
72 let data_block_id = data_file.write().await.write(bytes.as_ref()).await?;
73
74 Ok(BlockKey {
75 data_file_id,
76 data_block_id,
77 })
78 }
79 Err(err) => {
80 delete_directory(&new_data_file_path).await?;
81
82 Err(err)
83 }
84 }
85 }
86}