1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use super::database_logger::DatabaseLogger;
use crate::address::Address;
use crate::db::message_log::MessageLog;
use crate::db::ostorage::memory_object_db::MemoryObjectDB;
use crate::db::ostorage::sled_object_db::SledObjectDB;
use crate::db::ostorage::ObjectDB;
use crate::db::relational_db::RelationalDB;
use crate::db::{Config, FsyncPolicy, Storage};
use crate::error::DBError;
use crate::identity::Identity;
use crate::messages::control_db::Database;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

/// Per-instance state for one running instance of a database: its identifiers,
/// its logger and its [RelationalDB] handle.
///
/// Cloning copies the plain fields and clones the `Arc` handles, so clones
/// share the same logger and relational database.
#[derive(Clone)]
pub struct DatabaseInstanceContext {
    /// Id of this instance; also used as a directory component of the
    /// instance's on-disk paths (see `from_database` and `scheduler_db_path`).
    pub database_instance_id: u64,
    /// Id of the database this instance belongs to (`Database::id` when built
    /// via `from_database`).
    pub database_id: u64,
    /// Identity taken from the database record when built via `from_database`.
    pub identity: Identity,
    /// Address of the database; its hex form is used as a directory component
    /// of the instance's on-disk paths.
    pub address: Address,
    /// Shared handle to the logger opened for this instance.
    pub logger: Arc<DatabaseLogger>,
    /// Shared handle to the relational database opened for this instance.
    pub relational_db: Arc<RelationalDB>,
    /// Publisher address from the database record, if any.
    pub publisher_address: Option<Address>,
}

impl DatabaseInstanceContext {
    /// Builds the context for instance `instance_id` of `database`.
    ///
    /// The relational database lives at
    /// `<root_db_path>/<address hex>/<instance_id>/database`; the log file
    /// location comes from `DatabaseLogger::filepath`. All remaining values are
    /// forwarded unchanged to [`DatabaseInstanceContext::new`].
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`DatabaseInstanceContext::new`].
    pub fn from_database(config: Config, database: &Database, instance_id: u64, root_db_path: PathBuf) -> Arc<Self> {
        let address_hex = database.address.to_hex();
        let instance_dir = instance_id.to_string();

        // <root>/<address hex>/<instance id>/database
        let mut db_path = root_db_path;
        db_path.push(&*address_hex);
        db_path.push(&*instance_dir);
        db_path.push("database");

        let log_path = DatabaseLogger::filepath(&database.address, instance_id);

        Self::new(
            config,
            instance_id,
            database.id,
            database.identity,
            database.address,
            db_path,
            &log_path,
            database.publisher_address,
        )
    }

    /// Returns `<root_db_path>/<address hex>/<instance id>/scheduler` for this
    /// instance.
    pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf {
        let address_hex = self.address.to_hex();
        let instance_dir = self.database_instance_id.to_string();

        root_db_path
            .join(&*address_hex)
            .join(&*instance_dir)
            .join("scheduler")
    }

    /// Opens the storage backends selected by `config` and assembles the context.
    ///
    /// With `Storage::Memory` there is no message log and the object store is a
    /// default [MemoryObjectDB]. With `Storage::Disk` the message log is opened
    /// at `<db_path>/mlog` and the object store at `<db_path>/odb`. The logger
    /// is opened at `log_path` before the [RelationalDB] is opened at `db_path`.
    ///
    /// # Panics
    ///
    /// Panics if opening the message log, the on-disk object store or the
    /// relational database fails.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: Config,
        database_instance_id: u64,
        database_id: u64,
        identity: Identity,
        address: Address,
        db_path: PathBuf,
        log_path: &Path,
        publisher_address: Option<Address>,
    ) -> Arc<Self> {
        // Pick both stores in a single pass over the storage mode; for disk
        // storage the message log is opened before the object store.
        let (message_log, object_store) = match config.storage {
            Storage::Memory => (None, Box::<MemoryObjectDB>::default() as Box<dyn ObjectDB + Send>),
            Storage::Disk => {
                let mlog = Arc::new(Mutex::new(MessageLog::open(db_path.join("mlog")).unwrap()));
                let ostorage = Self::make_default_ostorage(db_path.join("odb"));
                (Some(mlog), ostorage)
            }
        };
        let object_store = Arc::new(Mutex::new(object_store));

        // The logger is opened first, then the relational database.
        let logger = Arc::new(DatabaseLogger::open(log_path));
        let fsync_requested = config.fsync != FsyncPolicy::Never;
        let relational_db =
            Arc::new(RelationalDB::open(db_path, message_log, object_store, address, fsync_requested).unwrap());

        Arc::new(Self {
            database_instance_id,
            database_id,
            identity,
            address,
            logger,
            relational_db,
            publisher_address,
        })
    }

    /// Opens a [SledObjectDB] at `path` and returns it as a boxed [ObjectDB].
    ///
    /// # Panics
    ///
    /// Panics if the sled-backed store cannot be opened.
    pub(crate) fn make_default_ostorage(path: impl AsRef<Path>) -> Box<dyn ObjectDB + Send> {
        let sled_db = SledObjectDB::open(path).unwrap();
        Box::new(sled_db)
    }

    /// Size in bytes that the [MessageLog] occupies on disk, as reported by
    /// the relational database's commit log.
    pub fn message_log_size_on_disk(&self) -> Result<u64, DBError> {
        let commit_log = self.relational_db.commit_log();
        commit_log.message_log_size_on_disk()
    }

    /// Size in bytes that the [ObjectDB] occupies on disk, as reported by the
    /// relational database's commit log.
    pub fn object_db_size_on_disk(&self) -> Result<u64, DBError> {
        let commit_log = self.relational_db.commit_log();
        commit_log.object_db_size_on_disk()
    }

    /// Size of this instance's log file, as reported by the logger.
    pub fn log_file_size(&self) -> Result<u64, DBError> {
        let logger = &self.logger;
        logger.size()
    }
}