1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
use std::error::Error;
use std::path::PathBuf;
use std::str;
use std::sync::Arc;

use futures::channel::oneshot;
use locustdb_serialization::event_buffer::EventBuffer;

use crate::engine::query_task::QueryTask;
use crate::ingest::colgen::GenTable;
use crate::ingest::csv_loader::{CSVIngestionTask, Options as LoadOptions};
use crate::mem_store::*;
use crate::perf_counter::PerfCounter;
use crate::scheduler::*;
use crate::syntax::parser;
use crate::QueryError;
use crate::QueryResult;

// Cannot implement Clone on LocustDB without changing Drop implementation.
/// Public handle to a database instance. All state lives in the shared
/// `InnerLocustDB`; dropping this handle signals the inner instance to stop
/// (see the `Drop` impl), which is why `Clone` is not implemented.
pub struct LocustDB {
    // Shared with worker threads and scheduled tasks via `Arc` clones.
    inner_locustdb: Arc<InnerLocustDB>,
}

impl LocustDB {
    /// Creates an in-memory instance using default options (`db_path` is
    /// `None` in `Options::default()`).
    pub fn memory_only() -> LocustDB {
        LocustDB::new(&Options::default())
    }

    /// Creates a new instance from `opts` and starts its worker threads.
    ///
    /// # Panics
    /// Panics if `opts` fails validation (see `Options::validate`).
    pub fn new(opts: &Options) -> LocustDB {
        opts.validate().expect("Invalid options");
        let locustdb = Arc::new(InnerLocustDB::new(opts));
        InnerLocustDB::start_worker_threads(&locustdb);
        LocustDB {
            inner_locustdb: locustdb,
        }
    }

    /// Parses and executes `query`, returning the result once the scheduled
    /// query task completes.
    ///
    /// Query-level failures (parse errors, unknown table, task construction
    /// errors) are returned as `Ok(Err(QueryError))`; the outer `Err` only
    /// occurs if the result channel is canceled (e.g. the task was dropped).
    ///
    /// * `explain` - include an execution plan explanation in the result.
    /// * `rowformat` - return results in row-major format.
    /// * `show` - indices of batches to include in debug output.
    pub async fn run_query(
        &self,
        query: &str,
        explain: bool,
        rowformat: bool,
        show: Vec<usize>,
    ) -> Result<QueryResult, oneshot::Canceled> {
        let (sender, receiver) = oneshot::channel();

        // PERF: perform compilation and table snapshot in asynchronous task?
        let query = match parser::parse_query(query) {
            Ok(query) => query,
            Err(err) => return Ok(Err(err)),
        };

        // Snapshot the table's partitions so the query runs against a
        // consistent view of the data.
        let mut data = match self.inner_locustdb.snapshot(&query.table) {
            Some(data) => data,
            None => {
                return Ok(Err(QueryError::NotImplemented(format!(
                    "Table {} does not exist!",
                    &query.table
                ))))
            }
        };

        // When sequential disk reads are enabled, queue reads for all columns
        // the query references and spawn a task to service them.
        if self.inner_locustdb.opts().seq_disk_read {
            self.inner_locustdb
                .disk_read_scheduler()
                .schedule_sequential_read(
                    &mut data,
                    &query.find_referenced_cols(),
                    self.inner_locustdb.opts().readahead,
                );
            let ldb = self.inner_locustdb.clone();
            let (read_data, _) =
                <dyn Task>::from_fn(move || ldb.disk_read_scheduler().service_reads(&ldb));
            self.inner_locustdb.schedule(read_data);
        }

        let query_task = QueryTask::new(
            query,
            rowformat,
            explain,
            show,
            data,
            self.inner_locustdb.disk_read_scheduler().clone(),
            SharedSender::new(sender),
            self.inner_locustdb.opts().batch_size,
        );

        match query_task {
            Ok(task) => {
                self.schedule(task);
                // Await the result delivered through the oneshot channel.
                let result = receiver.await?;
                Ok(result)
            }
            Err(err) => Ok(Err(err)),
        }
    }

    /// Ingests a CSV file as described by `options`, completing when the
    /// scheduled ingestion task finishes. Errors from the ingestion itself and
    /// from a canceled channel are both surfaced through the boxed error.
    pub async fn load_csv(&self, options: LoadOptions) -> Result<(), Box<dyn Error>> {
        let (sender, receiver) = oneshot::channel();
        let task = CSVIngestionTask::new(
            options,
            self.inner_locustdb.clone(),
            SharedSender::new(sender),
        );
        self.schedule(task);
        Ok(receiver.await??)
    }

    /// Ingests a pre-serialized batch of events directly into the inner
    /// instance. Despite being `async`, this currently completes synchronously.
    pub async fn ingest_efficient(&self, events: EventBuffer) {
        self.inner_locustdb.ingest_efficient(events);
    }

    /// Generates synthetic table data: schedules one generation task per
    /// partition and waits for all of them to complete.
    pub async fn gen_table(&self, opts: GenTable) -> Result<(), oneshot::Canceled> {
        let mut receivers = Vec::new();
        // Shared so each task closure can read the generation options.
        let opts = Arc::new(opts);
        for partition in 0..opts.partitions {
            let opts = opts.clone();
            let inner = self.inner_locustdb.clone();
            let (task, receiver) =
                <dyn Task>::from_fn(move || inner.gen_partition(&opts, partition as u64));
            self.schedule(task);
            receivers.push(receiver);
        }
        for receiver in receivers {
            receiver.await?;
        }
        Ok(())
    }

    /// Parses `query` and returns its AST as a debug-formatted string, or the
    /// debug representation of the parse error.
    pub fn ast(&self, query: &str) -> String {
        match parser::parse_query(query) {
            Ok(query) => format!("{:#?}", query),
            Err(err) => format!("{:?}", err),
        }
    }

    /// Returns column names in `table` matching `query`.
    pub fn search_column_names(&self, table: &str, query: &str) -> Vec<String> {
        self.inner_locustdb.search_column_names(table, query)
    }

    /// Loads all tables from disk into memory: schedules a bulk load for every
    /// table, spawns `read_threads` read-servicing tasks, waits for them, and
    /// returns a memory-tree summary (depth 2) of the loaded state.
    pub async fn bulk_load(&self) -> Result<Vec<MemTreeTable>, oneshot::Canceled> {
        for table in self.inner_locustdb.full_snapshot() {
            self.inner_locustdb
                .disk_read_scheduler()
                .schedule_bulk_load(table, self.inner_locustdb.opts().readahead);
        }
        let mut receivers = Vec::new();
        for _ in 0..self.inner_locustdb.opts().read_threads {
            let ldb = self.inner_locustdb.clone();
            let (read_data, receiver) =
                <dyn Task>::from_fn(move || ldb.disk_read_scheduler().service_reads(&ldb));
            self.inner_locustdb.schedule(read_data);
            receivers.push(receiver);
        }
        for receiver in receivers {
            receiver.await?;
        }
        self.mem_tree(2, None).await
    }

    /// Discards any pending tasks and restarts the worker threads.
    pub fn recover(&self) {
        self.inner_locustdb.drop_pending_tasks();
        InnerLocustDB::start_worker_threads(&self.inner_locustdb);
    }

    /// Returns a memory-tree summary of in-memory data down to `depth`,
    /// optionally restricted to a single `table`. Runs as a scheduled task.
    pub async fn mem_tree(&self, depth: usize, table: Option<String>) -> Result<Vec<MemTreeTable>, oneshot::Canceled> {
        let inner = self.inner_locustdb.clone();
        let (task, receiver) = <dyn Task>::from_fn(move || inner.mem_tree(depth, table.clone()));
        self.schedule(task);
        receiver.await
    }

    /// Collects per-table statistics. Runs as a scheduled task.
    pub async fn table_stats(&self) -> Result<Vec<TableStats>, oneshot::Canceled> {
        let inner = self.inner_locustdb.clone();
        let (task, receiver) = <dyn Task>::from_fn(move || inner.stats());
        self.schedule(task);
        receiver.await
    }

    /// Submits `task` to the inner scheduler for execution on a worker thread.
    pub fn schedule<T: Task + 'static>(&self, task: T) {
        self.inner_locustdb.schedule(task)
    }

    /// Returns the performance counters of this instance.
    pub fn perf_counter(&self) -> &PerfCounter {
        self.inner_locustdb.perf_counter()
    }

    /// Flushes the write-ahead log and blocks until the flush completes.
    ///
    /// # Panics
    /// Panics if the flush thread itself panics (the `join` unwraps).
    pub fn force_flush(&self) {
        let inner = self.inner_locustdb.clone();
        // NOTE(review): flush runs on a dedicated thread and is joined
        // immediately — presumably to isolate it from the caller's thread
        // state; confirm before simplifying to a direct call.
        std::thread::spawn(move || inner.wal_flush())
            .join()
            .unwrap();
    }

    /// Evicts cached data and returns a count reported by the inner instance
    /// (presumably bytes or entries evicted — not visible from here).
    pub fn evict_cache(&self) -> usize {
        self.inner_locustdb.evict_cache()
    }
}

/// Configuration for a [`LocustDB`] instance. Validated by `Options::validate`
/// when constructing a new instance.
#[derive(Clone)]
pub struct Options {
    /// Number of worker threads (must be > 0).
    pub threads: usize,
    /// Number of tasks spawned to service disk reads during bulk loads
    /// (must be > 0).
    pub read_threads: usize,
    /// On-disk storage location; `None` runs the database in memory only.
    pub db_path: Option<PathBuf>,
    /// Memory budget for tables, in bytes.
    pub mem_size_limit_tables: usize,
    /// Whether to LZ4-compress in-memory data (assumed — semantics live in
    /// the mem_store module; confirm there).
    pub mem_lz4: bool,
    /// Readahead budget in bytes passed to the disk read scheduler.
    pub readahead: usize,
    /// If true, queries schedule sequential disk reads for referenced columns.
    pub seq_disk_read: bool,
    /// Maximum size of WAL in bytes before triggering compaction
    pub max_wal_size_bytes: u64,
    /// Maximum size of partition
    pub max_partition_size_bytes: u64,
    /// Combine partitions when the size of every original partition is less than this factor of the combined partition size
    pub partition_combine_factor: u64,
    /// Maximum length of temporary buffer used in streaming stages during query execution
    pub batch_size: usize,
    /// Maximum number of rows in a partitions. Not implemented.
    pub max_partition_length: usize,
}

impl Default for Options {
    /// Default configuration: purely in-memory (no `db_path`), worker and
    /// read pools sized to the number of logical CPUs, LZ4 compression on.
    fn default() -> Options {
        // Both thread pools default to the machine's logical CPU count.
        let cpu_count = num_cpus::get();
        Options {
            // Threading.
            threads: cpu_count,
            read_threads: cpu_count,
            // Storage: in-memory by default.
            db_path: None,
            seq_disk_read: false,
            // Memory limits and compression.
            mem_size_limit_tables: 8 * 1024 * 1024 * 1024, // 8 GiB
            mem_lz4: true,
            readahead: 256 * 1024 * 1024, // 256 MiB
            // WAL and partitioning.
            max_wal_size_bytes: 64 * 1024 * 1024, // 64 MiB
            max_partition_size_bytes: 8 * 1024 * 1024, // 8 MiB
            partition_combine_factor: 4,
            max_partition_length: 1024 * 1024,
            // Query execution.
            batch_size: 1024,
        }
    }
}

impl Options {
    /// Checks configuration invariants, returning a human-readable message
    /// describing the first violated constraint.
    ///
    /// # Errors
    /// Returns `Err` if any thread count is zero, or if `batch_size` is zero
    /// or not a multiple of 8.
    fn validate(&self) -> Result<(), String> {
        if self.threads == 0 {
            return Err("threads must be greater than 0".to_string());
        }
        if self.read_threads == 0 {
            return Err("read_threads must be greater than 0".to_string());
        }
        // NOTE(review): this check is intentionally(?) disabled — confirm
        // whether partition_combine_factor == 0 is handled downstream.
        // if self.partition_combine_factor == 0 {
        //     return Err("partition_combine_factor must be greater than 0".to_string());
        // }
        // 0 % 8 == 0, so a zero batch size would slip past the multiple-of-8
        // check even though a zero-length streaming buffer is never valid.
        if self.batch_size == 0 {
            return Err("batch_size must be greater than 0".to_string());
        }
        if self.batch_size % 8 != 0 {
            return Err("batch_size must be a multiple of 8".to_string());
        }
        Ok(())
    }
}

impl Drop for LocustDB {
    /// Signals the inner instance to stop when the handle is dropped. This
    /// shutdown-on-drop behavior is the reason `LocustDB` cannot implement
    /// `Clone` (see the note above the struct definition).
    fn drop(&mut self) {
        self.inner_locustdb.stop();
    }
}