1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
pub mod reader;
pub mod writer;

use std::{
    io::{BufWriter, Seek, SeekFrom, Write},
    sync::RwLock,
};

//use reader::Reader;
use writer::Writer;

use crate::database::{table::TableDb, KvDb};

use crate::storage::KvInterface;
use crate::types::{BvObject, BvString};

pub struct FileIO {
    writer: RwLock<Writer<BufWriter<std::fs::File>>>,
    //reader: RwLock<Reader<BufReader<std::fs::File>>>,
}

impl FileIO {
    pub fn new(f: std::fs::File) -> Self {
        FileIO {
            writer: RwLock::new(Writer::new(BufWriter::new(f.try_clone().unwrap()))),
            //reader: RwLock::new(Reader::new(BufReader::new(f))),
        }
    }

    pub fn commit_kv_db<KV: KvInterface>(&mut self, kv: &KvDb<KV>) -> std::io::Result<()>
    where
        for<'a> &'a KV: IntoIterator<Item = (&'a BvString, &'a BvObject)>,
    {
        let mut writer = self.writer.write().unwrap();
        writer.write_all(b"KVIDB")?;

        for record in (&kv.records).into_iter() {
            writer.write_kv_record(record)?;
        }

        writer.flush()?;

        Ok(())
    }

    pub fn commit_table_db(&mut self, tdb: &TableDb) -> std::io::Result<()> {
        let mut writer = self.writer.write().unwrap();
        writer.curr_pos += writer.write(b"TABLEIDB")?;

        writer.curr_pos += writer.write_header()?;

        // Table definitions
        for (name, fields) in tdb.maps.iter() {
            let len = writer.write_table(name, fields)?;
            writer.curr_pos += len as usize;
            writer.table_length += len;
        }

        // Table rows
        let mut table_rows_length = 0;
        for (name, rows) in tdb.rows.iter() {
            if !rows.is_empty() {
                let mut decl_lu_name = Vec::with_capacity(name.len() + "decl_records_start".len());
                decl_lu_name.extend(name);
                decl_lu_name.extend(b"decl_records_start");
                let pos = writer.curr_pos as u64;
                writer.decl_lu_map.insert(decl_lu_name, pos);

                table_rows_length += writer.write_decl_header()?;
                for row in rows.iter() {
                    table_rows_length += writer.write_decl_record(row)?;
                }
                writer.curr_pos += table_rows_length as usize;
                writer.table_rows_length += table_rows_length;

                writer.write_table_rows_data(name, table_rows_length, rows.len() as u64)?;
                writer.writer.seek(SeekFrom::End(0))?;
                table_rows_length = 0;
            }
        }

        writer.write_header()?;

        writer.flush()?;

        Ok(())
    }
}