dw_datastore/
worker.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::thread;
4
5use chrono::DateTime;
6use chrono::Duration;
7use chrono::Utc;
8
9use rusqlite::Connection;
10use rusqlite::DropBehavior;
11use rusqlite::Transaction;
12use rusqlite::TransactionBehavior;
13
14use dw_models::Bucket;
15use dw_models::Event;
16use dw_models::KeyValue;
17
18use crate::DatastoreError;
19use crate::DatastoreInstance;
20use crate::DatastoreMethod;
21
22use mpsc_requests::ResponseReceiver;
23
24type RequestSender = mpsc_requests::RequestSender<Command, Result<Response, DatastoreError>>;
25type RequestReceiver = mpsc_requests::RequestReceiver<Command, Result<Response, DatastoreError>>;
26
27#[derive(Clone)]
28pub struct Datastore {
29    requester: RequestSender,
30}
31
32impl fmt::Debug for Datastore {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        write!(f, "Datastore()")
35    }
36}
37
38/*
39 * TODO:
40 * - Allow read requests to go straight through a read-only db connection instead of requesting the
41 * worker thread for better performance?
42 * TODO: Add an seperate "Import" request which does an import with an transaction
43 */
44
45#[allow(clippy::large_enum_variant)]
46#[derive(Debug, Clone)]
47pub enum Response {
48    Empty(),
49    Bucket(Bucket),
50    BucketMap(HashMap<String, Bucket>),
51    Event(Event),
52    EventList(Vec<Event>),
53    Count(i64),
54    KeyValue(KeyValue),
55    StringVec(Vec<String>),
56}
57
58#[allow(clippy::large_enum_variant)]
59#[derive(Debug, Clone)]
60pub enum Command {
61    CreateBucket(Bucket),
62    DeleteBucket(String),
63    GetBucket(String),
64    GetBuckets(),
65    InsertEvents(String, Vec<Event>),
66    Heartbeat(String, Event, f64),
67    GetEvents(
68        String,
69        Option<DateTime<Utc>>,
70        Option<DateTime<Utc>>,
71        Option<u64>,
72    ),
73    GetEventCount(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>),
74    DeleteEventsById(String, Vec<i64>),
75    ForceCommit(),
76    InsertKeyValue(String, String),
77    GetKeyValue(String),
78    GetKeysStarting(String),
79    DeleteKeyValue(String),
80}
81
82fn _unwrap_response(
83    receiver: ResponseReceiver<Result<Response, DatastoreError>>,
84) -> Result<(), DatastoreError> {
85    match receiver.collect().unwrap() {
86        Ok(r) => match r {
87            Response::Empty() => Ok(()),
88            _ => panic!("Invalid response"),
89        },
90        Err(e) => Err(e),
91    }
92}
93
94struct DatastoreWorker {
95    responder: RequestReceiver,
96    legacy_import: bool,
97    quit: bool,
98    uncommited_events: usize,
99    commit: bool,
100    last_heartbeat: HashMap<String, Option<Event>>,
101}
102
103impl DatastoreWorker {
104    pub fn new(
105        responder: mpsc_requests::RequestReceiver<Command, Result<Response, DatastoreError>>,
106        legacy_import: bool,
107    ) -> Self {
108        DatastoreWorker {
109            responder,
110            legacy_import,
111            quit: false,
112            uncommited_events: 0,
113            commit: false,
114            last_heartbeat: HashMap::new(),
115        }
116    }
117
118    fn work_loop(&mut self, method: DatastoreMethod) {
119        // Open SQLite connection
120        let mut conn = match method {
121            DatastoreMethod::Memory() => {
122                Connection::open_in_memory().expect("Failed to create in-memory datastore")
123            }
124            DatastoreMethod::File(path) => {
125                Connection::open(path).expect("Failed to create datastore")
126            }
127        };
128        let mut ds = DatastoreInstance::new(&conn, true).unwrap();
129
130        // Ensure legacy import
131        if self.legacy_import {
132            let transaction = match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
133                Ok(transaction) => transaction,
134                Err(err) => panic!(
135                    "Unable to start immediate transaction on SQLite database! {}",
136                    err
137                ),
138            };
139            match ds.ensure_legacy_import(&transaction) {
140                Ok(_) => (),
141                Err(err) => error!("Failed to do legacy import: {:?}", err),
142            }
143            match transaction.commit() {
144                Ok(_) => (),
145                Err(err) => panic!("Failed to commit datastore transaction! {}", err),
146            }
147        }
148
149        // Start handling and respond to requests
150        loop {
151            let last_commit_time: DateTime<Utc> = Utc::now();
152            let mut transaction = conn
153                .transaction_with_behavior(TransactionBehavior::Immediate)
154                .unwrap();
155            self.uncommited_events = 0;
156            self.commit = false;
157            transaction.set_drop_behavior(DropBehavior::Commit);
158            loop {
159                let (request, response_sender) = match self.responder.poll() {
160                    Ok((req, res_sender)) => (req, res_sender),
161                    Err(_) => {
162                        // All references to responder is gone, quit
163                        info!("DB worker quitting");
164                        self.quit = true;
165                        break;
166                    }
167                };
168                let response = self.handle_request(request, &mut ds, &transaction);
169                response_sender.respond(response);
170                let now: DateTime<Utc> = Utc::now();
171                let commit_interval_passed: bool = (now - last_commit_time) > Duration::seconds(15);
172                if self.commit || commit_interval_passed || self.uncommited_events > 100 {
173                    break;
174                };
175            }
176            debug!(
177                "Commiting DB! Force commit {}, {} uncommited events",
178                self.commit, self.uncommited_events
179            );
180            match transaction.commit() {
181                Ok(_) => (),
182                Err(err) => panic!("Failed to commit datastore transaction! {}", err),
183            }
184            if self.quit {
185                break;
186            };
187        }
188        info!("DB Worker thread finished");
189    }
190
191    fn handle_request(
192        &mut self,
193        request: Command,
194        ds: &mut DatastoreInstance,
195        transaction: &Transaction,
196    ) -> Result<Response, DatastoreError> {
197        match request {
198            Command::CreateBucket(bucket) => match ds.create_bucket(&transaction, bucket) {
199                Ok(_) => {
200                    self.commit = true;
201                    Ok(Response::Empty())
202                }
203                Err(e) => Err(e),
204            },
205            Command::DeleteBucket(bucketname) => {
206                match ds.delete_bucket(&transaction, &bucketname) {
207                    Ok(_) => {
208                        self.commit = true;
209                        Ok(Response::Empty())
210                    }
211                    Err(e) => Err(e),
212                }
213            }
214            Command::GetBucket(bucketname) => match ds.get_bucket(&bucketname) {
215                Ok(b) => Ok(Response::Bucket(b)),
216                Err(e) => Err(e),
217            },
218            Command::GetBuckets() => Ok(Response::BucketMap(ds.get_buckets())),
219            Command::InsertEvents(bucketname, events) => {
220                match ds.insert_events(&transaction, &bucketname, events) {
221                    Ok(events) => {
222                        self.uncommited_events += events.len();
223                        self.last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache
224                        Ok(Response::EventList(events))
225                    }
226                    Err(e) => Err(e),
227                }
228            }
229            Command::Heartbeat(bucketname, event, pulsetime) => {
230                match ds.heartbeat(
231                    &transaction,
232                    &bucketname,
233                    event,
234                    pulsetime,
235                    &mut self.last_heartbeat,
236                ) {
237                    Ok(e) => {
238                        self.uncommited_events += 1;
239                        Ok(Response::Event(e))
240                    }
241                    Err(e) => Err(e),
242                }
243            }
244            Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
245                match ds.get_events(
246                    &transaction,
247                    &bucketname,
248                    starttime_opt,
249                    endtime_opt,
250                    limit_opt,
251                ) {
252                    Ok(el) => Ok(Response::EventList(el)),
253                    Err(e) => Err(e),
254                }
255            }
256            Command::GetEventCount(bucketname, starttime_opt, endtime_opt) => {
257                match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) {
258                    Ok(n) => Ok(Response::Count(n)),
259                    Err(e) => Err(e),
260                }
261            }
262            Command::DeleteEventsById(bucketname, event_ids) => {
263                match ds.delete_events_by_id(&transaction, &bucketname, event_ids) {
264                    Ok(()) => Ok(Response::Empty()),
265                    Err(e) => Err(e),
266                }
267            }
268            Command::ForceCommit() => {
269                self.commit = true;
270                Ok(Response::Empty())
271            }
272            Command::InsertKeyValue(key, data) => {
273                match ds.insert_key_value(&transaction, &key, &data) {
274                    Ok(()) => Ok(Response::Empty()),
275                    Err(e) => Err(e),
276                }
277            }
278            Command::GetKeyValue(key) => match ds.get_key_value(&transaction, &key) {
279                Ok(result) => Ok(Response::KeyValue(result)),
280                Err(e) => Err(e),
281            },
282            Command::GetKeysStarting(pattern) => {
283                match ds.get_keys_starting(&transaction, &pattern) {
284                    Ok(result) => Ok(Response::StringVec(result)),
285                    Err(e) => Err(e),
286                }
287            }
288            Command::DeleteKeyValue(key) => match ds.delete_key_value(&transaction, &key) {
289                Ok(()) => Ok(Response::Empty()),
290                Err(e) => Err(e),
291            },
292        }
293    }
294}
295
296impl Datastore {
297    pub fn new(dbpath: String, legacy_import: bool) -> Self {
298        let method = DatastoreMethod::File(dbpath);
299        Datastore::_new_internal(method, legacy_import)
300    }
301
302    pub fn new_in_memory(legacy_import: bool) -> Self {
303        let method = DatastoreMethod::Memory();
304        Datastore::_new_internal(method, legacy_import)
305    }
306
307    fn _new_internal(method: DatastoreMethod, legacy_import: bool) -> Self {
308        let (requester, responder) =
309            mpsc_requests::channel::<Command, Result<Response, DatastoreError>>();
310        let _thread = thread::spawn(move || {
311            let mut di = DatastoreWorker::new(responder, legacy_import);
312            di.work_loop(method);
313        });
314        Datastore { requester }
315    }
316
317    pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> {
318        let cmd = Command::CreateBucket(bucket.clone());
319        let receiver = self.requester.request(cmd).unwrap();
320        match receiver.collect().unwrap() {
321            Ok(_) => Ok(()),
322            Err(e) => Err(e),
323        }
324    }
325
326    pub fn delete_bucket(&self, bucket_id: &str) -> Result<(), DatastoreError> {
327        let cmd = Command::DeleteBucket(bucket_id.to_string());
328        let receiver = self.requester.request(cmd).unwrap();
329        match receiver.collect().unwrap() {
330            Ok(r) => match r {
331                Response::Empty() => Ok(()),
332                _ => panic!("Invalid response"),
333            },
334            Err(e) => Err(e),
335        }
336    }
337
338    pub fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
339        let cmd = Command::GetBucket(bucket_id.to_string());
340        let receiver = self.requester.request(cmd).unwrap();
341        match receiver.collect().unwrap() {
342            Ok(r) => match r {
343                Response::Bucket(b) => Ok(b),
344                _ => panic!("Invalid response"),
345            },
346            Err(e) => Err(e),
347        }
348    }
349
350    pub fn get_buckets(&self) -> Result<HashMap<String, Bucket>, DatastoreError> {
351        let cmd = Command::GetBuckets();
352        let receiver = self.requester.request(cmd).unwrap();
353        match receiver.collect().unwrap() {
354            Ok(r) => match r {
355                Response::BucketMap(bm) => Ok(bm),
356                e => Err(DatastoreError::InternalError(format!(
357                    "Invalid response: {:?}",
358                    e
359                ))),
360            },
361            Err(e) => Err(e),
362        }
363    }
364
365    pub fn insert_events(
366        &self,
367        bucket_id: &str,
368        events: &[Event],
369    ) -> Result<Vec<Event>, DatastoreError> {
370        let cmd = Command::InsertEvents(bucket_id.to_string(), events.to_vec());
371        let receiver = self.requester.request(cmd).unwrap();
372        match receiver.collect().unwrap() {
373            Ok(r) => match r {
374                Response::EventList(events) => Ok(events),
375                _ => panic!("Invalid response"),
376            },
377            Err(e) => Err(e),
378        }
379    }
380
381    pub fn heartbeat(
382        &self,
383        bucket_id: &str,
384        heartbeat: Event,
385        pulsetime: f64,
386    ) -> Result<Event, DatastoreError> {
387        let cmd = Command::Heartbeat(bucket_id.to_string(), heartbeat, pulsetime);
388        let receiver = self.requester.request(cmd).unwrap();
389        match receiver.collect().unwrap() {
390            Ok(r) => match r {
391                Response::Event(e) => Ok(e),
392                _ => panic!("Invalid response"),
393            },
394            Err(e) => Err(e),
395        }
396    }
397
398    pub fn get_events(
399        &self,
400        bucket_id: &str,
401        starttime_opt: Option<DateTime<Utc>>,
402        endtime_opt: Option<DateTime<Utc>>,
403        limit_opt: Option<u64>,
404    ) -> Result<Vec<Event>, DatastoreError> {
405        let cmd = Command::GetEvents(bucket_id.to_string(), starttime_opt, endtime_opt, limit_opt);
406        let receiver = self.requester.request(cmd).unwrap();
407        match receiver.collect().unwrap() {
408            Ok(r) => match r {
409                Response::EventList(el) => Ok(el),
410                _ => panic!("Invalid response"),
411            },
412            Err(e) => Err(e),
413        }
414    }
415
416    pub fn get_event_count(
417        &self,
418        bucket_id: &str,
419        starttime_opt: Option<DateTime<Utc>>,
420        endtime_opt: Option<DateTime<Utc>>,
421    ) -> Result<i64, DatastoreError> {
422        let cmd = Command::GetEventCount(bucket_id.to_string(), starttime_opt, endtime_opt);
423        let receiver = self.requester.request(cmd).unwrap();
424        match receiver.collect().unwrap() {
425            Ok(r) => match r {
426                Response::Count(n) => Ok(n),
427                _ => panic!("Invalid response"),
428            },
429            Err(e) => Err(e),
430        }
431    }
432
433    pub fn delete_events_by_id(
434        &self,
435        bucket_id: &str,
436        event_ids: Vec<i64>,
437    ) -> Result<(), DatastoreError> {
438        let cmd = Command::DeleteEventsById(bucket_id.to_string(), event_ids);
439        let receiver = self.requester.request(cmd).unwrap();
440        match receiver.collect().unwrap() {
441            Ok(r) => match r {
442                Response::Empty() => Ok(()),
443                _ => panic!("Invalid response"),
444            },
445            Err(e) => Err(e),
446        }
447    }
448
449    pub fn force_commit(&self) -> Result<(), DatastoreError> {
450        let cmd = Command::ForceCommit();
451        let receiver = self.requester.request(cmd).unwrap();
452        match receiver.collect().unwrap() {
453            Ok(r) => match r {
454                Response::Empty() => Ok(()),
455                _ => panic!("Invalid response"),
456            },
457            Err(e) => Err(e),
458        }
459    }
460
461    pub fn insert_key_value(&self, key: &str, data: &str) -> Result<(), DatastoreError> {
462        let cmd = Command::InsertKeyValue(key.to_string(), data.to_string());
463        let receiver = self.requester.request(cmd).unwrap();
464
465        _unwrap_response(receiver)
466    }
467
468    pub fn delete_key_value(&self, key: &str) -> Result<(), DatastoreError> {
469        let cmd = Command::DeleteKeyValue(key.to_string());
470        let receiver = self.requester.request(cmd).unwrap();
471
472        _unwrap_response(receiver)
473    }
474
475    pub fn get_key_value(&self, key: &str) -> Result<KeyValue, DatastoreError> {
476        let cmd = Command::GetKeyValue(key.to_string());
477        let receiver = self.requester.request(cmd).unwrap();
478
479        match receiver.collect().unwrap() {
480            Ok(r) => match r {
481                Response::KeyValue(value) => Ok(value),
482                _ => panic!("Invalid response"),
483            },
484            Err(e) => Err(e),
485        }
486    }
487
488    pub fn get_keys_starting(&self, pattern: &str) -> Result<Vec<String>, DatastoreError> {
489        let cmd = Command::GetKeysStarting(pattern.to_string());
490        let receiver = self.requester.request(cmd).unwrap();
491
492        match receiver.collect().unwrap() {
493            Ok(r) => match r {
494                Response::StringVec(value) => Ok(value),
495                _ => panic!("Invalid response"),
496            },
497            Err(e) => Err(e),
498        }
499    }
500}