1use std::collections::HashMap;
2use std::fmt;
3use std::thread;
4
5use chrono::DateTime;
6use chrono::Duration;
7use chrono::Utc;
8
9use rusqlite::Connection;
10use rusqlite::DropBehavior;
11use rusqlite::Transaction;
12use rusqlite::TransactionBehavior;
13
14use dw_models::Bucket;
15use dw_models::Event;
16use dw_models::KeyValue;
17
18use crate::DatastoreError;
19use crate::DatastoreInstance;
20use crate::DatastoreMethod;
21
22use mpsc_requests::ResponseReceiver;
23
24type RequestSender = mpsc_requests::RequestSender<Command, Result<Response, DatastoreError>>;
25type RequestReceiver = mpsc_requests::RequestReceiver<Command, Result<Response, DatastoreError>>;
26
27#[derive(Clone)]
28pub struct Datastore {
29 requester: RequestSender,
30}
31
32impl fmt::Debug for Datastore {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 write!(f, "Datastore()")
35 }
36}
37
38#[allow(clippy::large_enum_variant)]
46#[derive(Debug, Clone)]
47pub enum Response {
48 Empty(),
49 Bucket(Bucket),
50 BucketMap(HashMap<String, Bucket>),
51 Event(Event),
52 EventList(Vec<Event>),
53 Count(i64),
54 KeyValue(KeyValue),
55 StringVec(Vec<String>),
56}
57
58#[allow(clippy::large_enum_variant)]
59#[derive(Debug, Clone)]
60pub enum Command {
61 CreateBucket(Bucket),
62 DeleteBucket(String),
63 GetBucket(String),
64 GetBuckets(),
65 InsertEvents(String, Vec<Event>),
66 Heartbeat(String, Event, f64),
67 GetEvents(
68 String,
69 Option<DateTime<Utc>>,
70 Option<DateTime<Utc>>,
71 Option<u64>,
72 ),
73 GetEventCount(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>),
74 DeleteEventsById(String, Vec<i64>),
75 ForceCommit(),
76 InsertKeyValue(String, String),
77 GetKeyValue(String),
78 GetKeysStarting(String),
79 DeleteKeyValue(String),
80}
81
82fn _unwrap_response(
83 receiver: ResponseReceiver<Result<Response, DatastoreError>>,
84) -> Result<(), DatastoreError> {
85 match receiver.collect().unwrap() {
86 Ok(r) => match r {
87 Response::Empty() => Ok(()),
88 _ => panic!("Invalid response"),
89 },
90 Err(e) => Err(e),
91 }
92}
93
94struct DatastoreWorker {
95 responder: RequestReceiver,
96 legacy_import: bool,
97 quit: bool,
98 uncommited_events: usize,
99 commit: bool,
100 last_heartbeat: HashMap<String, Option<Event>>,
101}
102
103impl DatastoreWorker {
104 pub fn new(
105 responder: mpsc_requests::RequestReceiver<Command, Result<Response, DatastoreError>>,
106 legacy_import: bool,
107 ) -> Self {
108 DatastoreWorker {
109 responder,
110 legacy_import,
111 quit: false,
112 uncommited_events: 0,
113 commit: false,
114 last_heartbeat: HashMap::new(),
115 }
116 }
117
118 fn work_loop(&mut self, method: DatastoreMethod) {
119 let mut conn = match method {
121 DatastoreMethod::Memory() => {
122 Connection::open_in_memory().expect("Failed to create in-memory datastore")
123 }
124 DatastoreMethod::File(path) => {
125 Connection::open(path).expect("Failed to create datastore")
126 }
127 };
128 let mut ds = DatastoreInstance::new(&conn, true).unwrap();
129
130 if self.legacy_import {
132 let transaction = match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
133 Ok(transaction) => transaction,
134 Err(err) => panic!(
135 "Unable to start immediate transaction on SQLite database! {}",
136 err
137 ),
138 };
139 match ds.ensure_legacy_import(&transaction) {
140 Ok(_) => (),
141 Err(err) => error!("Failed to do legacy import: {:?}", err),
142 }
143 match transaction.commit() {
144 Ok(_) => (),
145 Err(err) => panic!("Failed to commit datastore transaction! {}", err),
146 }
147 }
148
149 loop {
151 let last_commit_time: DateTime<Utc> = Utc::now();
152 let mut transaction = conn
153 .transaction_with_behavior(TransactionBehavior::Immediate)
154 .unwrap();
155 self.uncommited_events = 0;
156 self.commit = false;
157 transaction.set_drop_behavior(DropBehavior::Commit);
158 loop {
159 let (request, response_sender) = match self.responder.poll() {
160 Ok((req, res_sender)) => (req, res_sender),
161 Err(_) => {
162 info!("DB worker quitting");
164 self.quit = true;
165 break;
166 }
167 };
168 let response = self.handle_request(request, &mut ds, &transaction);
169 response_sender.respond(response);
170 let now: DateTime<Utc> = Utc::now();
171 let commit_interval_passed: bool = (now - last_commit_time) > Duration::seconds(15);
172 if self.commit || commit_interval_passed || self.uncommited_events > 100 {
173 break;
174 };
175 }
176 debug!(
177 "Commiting DB! Force commit {}, {} uncommited events",
178 self.commit, self.uncommited_events
179 );
180 match transaction.commit() {
181 Ok(_) => (),
182 Err(err) => panic!("Failed to commit datastore transaction! {}", err),
183 }
184 if self.quit {
185 break;
186 };
187 }
188 info!("DB Worker thread finished");
189 }
190
191 fn handle_request(
192 &mut self,
193 request: Command,
194 ds: &mut DatastoreInstance,
195 transaction: &Transaction,
196 ) -> Result<Response, DatastoreError> {
197 match request {
198 Command::CreateBucket(bucket) => match ds.create_bucket(&transaction, bucket) {
199 Ok(_) => {
200 self.commit = true;
201 Ok(Response::Empty())
202 }
203 Err(e) => Err(e),
204 },
205 Command::DeleteBucket(bucketname) => {
206 match ds.delete_bucket(&transaction, &bucketname) {
207 Ok(_) => {
208 self.commit = true;
209 Ok(Response::Empty())
210 }
211 Err(e) => Err(e),
212 }
213 }
214 Command::GetBucket(bucketname) => match ds.get_bucket(&bucketname) {
215 Ok(b) => Ok(Response::Bucket(b)),
216 Err(e) => Err(e),
217 },
218 Command::GetBuckets() => Ok(Response::BucketMap(ds.get_buckets())),
219 Command::InsertEvents(bucketname, events) => {
220 match ds.insert_events(&transaction, &bucketname, events) {
221 Ok(events) => {
222 self.uncommited_events += events.len();
223 self.last_heartbeat.insert(bucketname.to_string(), None); Ok(Response::EventList(events))
225 }
226 Err(e) => Err(e),
227 }
228 }
229 Command::Heartbeat(bucketname, event, pulsetime) => {
230 match ds.heartbeat(
231 &transaction,
232 &bucketname,
233 event,
234 pulsetime,
235 &mut self.last_heartbeat,
236 ) {
237 Ok(e) => {
238 self.uncommited_events += 1;
239 Ok(Response::Event(e))
240 }
241 Err(e) => Err(e),
242 }
243 }
244 Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
245 match ds.get_events(
246 &transaction,
247 &bucketname,
248 starttime_opt,
249 endtime_opt,
250 limit_opt,
251 ) {
252 Ok(el) => Ok(Response::EventList(el)),
253 Err(e) => Err(e),
254 }
255 }
256 Command::GetEventCount(bucketname, starttime_opt, endtime_opt) => {
257 match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) {
258 Ok(n) => Ok(Response::Count(n)),
259 Err(e) => Err(e),
260 }
261 }
262 Command::DeleteEventsById(bucketname, event_ids) => {
263 match ds.delete_events_by_id(&transaction, &bucketname, event_ids) {
264 Ok(()) => Ok(Response::Empty()),
265 Err(e) => Err(e),
266 }
267 }
268 Command::ForceCommit() => {
269 self.commit = true;
270 Ok(Response::Empty())
271 }
272 Command::InsertKeyValue(key, data) => {
273 match ds.insert_key_value(&transaction, &key, &data) {
274 Ok(()) => Ok(Response::Empty()),
275 Err(e) => Err(e),
276 }
277 }
278 Command::GetKeyValue(key) => match ds.get_key_value(&transaction, &key) {
279 Ok(result) => Ok(Response::KeyValue(result)),
280 Err(e) => Err(e),
281 },
282 Command::GetKeysStarting(pattern) => {
283 match ds.get_keys_starting(&transaction, &pattern) {
284 Ok(result) => Ok(Response::StringVec(result)),
285 Err(e) => Err(e),
286 }
287 }
288 Command::DeleteKeyValue(key) => match ds.delete_key_value(&transaction, &key) {
289 Ok(()) => Ok(Response::Empty()),
290 Err(e) => Err(e),
291 },
292 }
293 }
294}
295
296impl Datastore {
297 pub fn new(dbpath: String, legacy_import: bool) -> Self {
298 let method = DatastoreMethod::File(dbpath);
299 Datastore::_new_internal(method, legacy_import)
300 }
301
302 pub fn new_in_memory(legacy_import: bool) -> Self {
303 let method = DatastoreMethod::Memory();
304 Datastore::_new_internal(method, legacy_import)
305 }
306
307 fn _new_internal(method: DatastoreMethod, legacy_import: bool) -> Self {
308 let (requester, responder) =
309 mpsc_requests::channel::<Command, Result<Response, DatastoreError>>();
310 let _thread = thread::spawn(move || {
311 let mut di = DatastoreWorker::new(responder, legacy_import);
312 di.work_loop(method);
313 });
314 Datastore { requester }
315 }
316
317 pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> {
318 let cmd = Command::CreateBucket(bucket.clone());
319 let receiver = self.requester.request(cmd).unwrap();
320 match receiver.collect().unwrap() {
321 Ok(_) => Ok(()),
322 Err(e) => Err(e),
323 }
324 }
325
326 pub fn delete_bucket(&self, bucket_id: &str) -> Result<(), DatastoreError> {
327 let cmd = Command::DeleteBucket(bucket_id.to_string());
328 let receiver = self.requester.request(cmd).unwrap();
329 match receiver.collect().unwrap() {
330 Ok(r) => match r {
331 Response::Empty() => Ok(()),
332 _ => panic!("Invalid response"),
333 },
334 Err(e) => Err(e),
335 }
336 }
337
338 pub fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
339 let cmd = Command::GetBucket(bucket_id.to_string());
340 let receiver = self.requester.request(cmd).unwrap();
341 match receiver.collect().unwrap() {
342 Ok(r) => match r {
343 Response::Bucket(b) => Ok(b),
344 _ => panic!("Invalid response"),
345 },
346 Err(e) => Err(e),
347 }
348 }
349
350 pub fn get_buckets(&self) -> Result<HashMap<String, Bucket>, DatastoreError> {
351 let cmd = Command::GetBuckets();
352 let receiver = self.requester.request(cmd).unwrap();
353 match receiver.collect().unwrap() {
354 Ok(r) => match r {
355 Response::BucketMap(bm) => Ok(bm),
356 e => Err(DatastoreError::InternalError(format!(
357 "Invalid response: {:?}",
358 e
359 ))),
360 },
361 Err(e) => Err(e),
362 }
363 }
364
365 pub fn insert_events(
366 &self,
367 bucket_id: &str,
368 events: &[Event],
369 ) -> Result<Vec<Event>, DatastoreError> {
370 let cmd = Command::InsertEvents(bucket_id.to_string(), events.to_vec());
371 let receiver = self.requester.request(cmd).unwrap();
372 match receiver.collect().unwrap() {
373 Ok(r) => match r {
374 Response::EventList(events) => Ok(events),
375 _ => panic!("Invalid response"),
376 },
377 Err(e) => Err(e),
378 }
379 }
380
381 pub fn heartbeat(
382 &self,
383 bucket_id: &str,
384 heartbeat: Event,
385 pulsetime: f64,
386 ) -> Result<Event, DatastoreError> {
387 let cmd = Command::Heartbeat(bucket_id.to_string(), heartbeat, pulsetime);
388 let receiver = self.requester.request(cmd).unwrap();
389 match receiver.collect().unwrap() {
390 Ok(r) => match r {
391 Response::Event(e) => Ok(e),
392 _ => panic!("Invalid response"),
393 },
394 Err(e) => Err(e),
395 }
396 }
397
398 pub fn get_events(
399 &self,
400 bucket_id: &str,
401 starttime_opt: Option<DateTime<Utc>>,
402 endtime_opt: Option<DateTime<Utc>>,
403 limit_opt: Option<u64>,
404 ) -> Result<Vec<Event>, DatastoreError> {
405 let cmd = Command::GetEvents(bucket_id.to_string(), starttime_opt, endtime_opt, limit_opt);
406 let receiver = self.requester.request(cmd).unwrap();
407 match receiver.collect().unwrap() {
408 Ok(r) => match r {
409 Response::EventList(el) => Ok(el),
410 _ => panic!("Invalid response"),
411 },
412 Err(e) => Err(e),
413 }
414 }
415
416 pub fn get_event_count(
417 &self,
418 bucket_id: &str,
419 starttime_opt: Option<DateTime<Utc>>,
420 endtime_opt: Option<DateTime<Utc>>,
421 ) -> Result<i64, DatastoreError> {
422 let cmd = Command::GetEventCount(bucket_id.to_string(), starttime_opt, endtime_opt);
423 let receiver = self.requester.request(cmd).unwrap();
424 match receiver.collect().unwrap() {
425 Ok(r) => match r {
426 Response::Count(n) => Ok(n),
427 _ => panic!("Invalid response"),
428 },
429 Err(e) => Err(e),
430 }
431 }
432
433 pub fn delete_events_by_id(
434 &self,
435 bucket_id: &str,
436 event_ids: Vec<i64>,
437 ) -> Result<(), DatastoreError> {
438 let cmd = Command::DeleteEventsById(bucket_id.to_string(), event_ids);
439 let receiver = self.requester.request(cmd).unwrap();
440 match receiver.collect().unwrap() {
441 Ok(r) => match r {
442 Response::Empty() => Ok(()),
443 _ => panic!("Invalid response"),
444 },
445 Err(e) => Err(e),
446 }
447 }
448
449 pub fn force_commit(&self) -> Result<(), DatastoreError> {
450 let cmd = Command::ForceCommit();
451 let receiver = self.requester.request(cmd).unwrap();
452 match receiver.collect().unwrap() {
453 Ok(r) => match r {
454 Response::Empty() => Ok(()),
455 _ => panic!("Invalid response"),
456 },
457 Err(e) => Err(e),
458 }
459 }
460
461 pub fn insert_key_value(&self, key: &str, data: &str) -> Result<(), DatastoreError> {
462 let cmd = Command::InsertKeyValue(key.to_string(), data.to_string());
463 let receiver = self.requester.request(cmd).unwrap();
464
465 _unwrap_response(receiver)
466 }
467
468 pub fn delete_key_value(&self, key: &str) -> Result<(), DatastoreError> {
469 let cmd = Command::DeleteKeyValue(key.to_string());
470 let receiver = self.requester.request(cmd).unwrap();
471
472 _unwrap_response(receiver)
473 }
474
475 pub fn get_key_value(&self, key: &str) -> Result<KeyValue, DatastoreError> {
476 let cmd = Command::GetKeyValue(key.to_string());
477 let receiver = self.requester.request(cmd).unwrap();
478
479 match receiver.collect().unwrap() {
480 Ok(r) => match r {
481 Response::KeyValue(value) => Ok(value),
482 _ => panic!("Invalid response"),
483 },
484 Err(e) => Err(e),
485 }
486 }
487
488 pub fn get_keys_starting(&self, pattern: &str) -> Result<Vec<String>, DatastoreError> {
489 let cmd = Command::GetKeysStarting(pattern.to_string());
490 let receiver = self.requester.request(cmd).unwrap();
491
492 match receiver.collect().unwrap() {
493 Ok(r) => match r {
494 Response::StringVec(value) => Ok(value),
495 _ => panic!("Invalid response"),
496 },
497 Err(e) => Err(e),
498 }
499 }
500}