1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use std::{
    fs::{File, OpenOptions},
    io::{Seek, SeekFrom},
    path::{Path, PathBuf},
    ptr,
};

use formatter::BitcaskFormatter;
use fs::FileType;
#[cfg(not(unix))]
use fs4::FileExt;
use storage_id::StorageId;

use crate::formatter::FILE_HEADER_SIZE;

pub mod clock;
pub mod formatter;
pub mod fs;
pub mod options;
pub mod storage_id;
pub mod tombstone;

#[cfg(test)]
#[macro_use]
extern crate assert_matches;

pub fn create_file<P: AsRef<Path>>(
    base_dir: P,
    file_type: FileType,
    storage_id: Option<StorageId>,
    formatter: &BitcaskFormatter,
    init_data_file_capacity: usize,
) -> std::io::Result<File> {
    // Round capacity down to the nearest 8-byte alignment, since the
    // data storage would not be able to take advantage of the space.
    let capacity = std::cmp::max(FILE_HEADER_SIZE, init_data_file_capacity) & !7;

    let path = file_type.get_path(&base_dir, storage_id);
    let file_name = path
        .file_name()
        .and_then(|file_name| file_name.to_str())
        .expect("File name required");

    let tmp_file_path = match path.parent() {
        Some(parent) => parent.join(format!("tmp-{file_name}")),
        None => PathBuf::from(format!("tmp-{file_name}")),
    };

    {
        // Prepare properly formatted file in a temporary file, so in case of failure it won't be corrupted.
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&tmp_file_path)?;

        fs::truncate_file(&mut file, capacity)?;

        formatter::initialize_new_file(&mut file, formatter.version())?;

        // Manually sync each file in Windows since sync-ing cannot be done for the whole directory.
        #[cfg(target_os = "windows")]
        {
            file.sync_all()?;
        }
    };

    // File renames are atomic, so we can safely rename the temporary file to the final file.
    std::fs::rename(&tmp_file_path, &path)?;

    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(false)
        .open(&path)?;
    file.seek(SeekFrom::Start(FILE_HEADER_SIZE as u64))?;

    Ok(file)
}

pub fn resize_file(file: &File, required_capacity: usize) -> std::io::Result<usize> {
    let capacity = required_capacity & !7;
    // fs4 provides some cross-platform bindings which help for Windows.
    #[cfg(not(unix))]
    file.allocate(capacity as u64)?;
    // For all unix systems WAL can just use ftruncate directly
    #[cfg(unix)]
    {
        rustix::fs::ftruncate(file, capacity as u64)?;
    }
    Ok(capacity)
}

pub fn copy_memory(src: &[u8], dst: &mut [u8]) {
    let len_src = src.len();
    assert!(dst.len() >= len_src);
    unsafe {
        ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), len_src);
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use crate::formatter::get_formatter_from_file;

    use super::*;

    use bytes::{Buf, BufMut, Bytes, BytesMut};
    use test_log::test;
    use utilities::common::get_temporary_directory_path;

    #[test]
    fn test_create_file() {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        let formatter = BitcaskFormatter::default();
        let mut file =
            create_file(&dir, FileType::DataFile, Some(storage_id), &formatter, 100).unwrap();

        let mut bs = BytesMut::with_capacity(4);
        bs.put_u32(101);

        file.write_all(&bs.freeze()).unwrap();
        file.flush().unwrap();

        let mut reopen_file = fs::open_file(dir, FileType::DataFile, Some(storage_id))
            .unwrap()
            .file;
        assert_eq!(
            formatter,
            get_formatter_from_file(&mut reopen_file).unwrap()
        );

        reopen_file
            .seek(SeekFrom::Start(FILE_HEADER_SIZE as u64))
            .unwrap();
        let mut buf = vec![0; 4];
        reopen_file.read_exact(&mut buf).unwrap();
        assert_eq!(101, Bytes::from(buf).get_u32());
    }
}