1use std::collections::HashMap;
2
3use chrono::DateTime;
4use chrono::Duration;
5use chrono::NaiveDateTime;
6use chrono::Utc;
7
8use rusqlite::Connection;
9
10use serde_json::value::Value;
11
12use dw_models::Bucket;
13use dw_models::BucketMetadata;
14use dw_models::Event;
15use dw_models::KeyValue;
16
17use rusqlite::params;
18use rusqlite::types::ToSql;
19
20use super::DatastoreError;
21
22fn _get_db_version(conn: &Connection) -> i32 {
23 conn.pragma_query_value(None, "user_version", |row| row.get(0))
24 .unwrap()
25}
26
/// Schema version this code expects. When migrations are disabled, a database whose
/// `user_version` differs from this value is rejected by `DatastoreInstance::new`.
static NEWEST_DB_VERSION: i32 = 4;
36
37fn _create_tables(conn: &Connection, version: i32) -> bool {
38 let mut first_init = false;
39
40 if version < 1 {
41 first_init = true;
42 _migrate_v0_to_v1(conn);
43 }
44
45 if version < 2 {
46 _migrate_v1_to_v2(conn);
47 }
48
49 if version < 3 {
50 _migrate_v2_to_v3(conn);
51 }
52
53 if version < 4 {
54 _migrate_v3_to_v4(conn);
55 }
56
57 first_init
58}
59
60fn _migrate_v0_to_v1(conn: &Connection) {
61 conn.execute(
63 "
64 CREATE TABLE IF NOT EXISTS buckets (
65 id INTEGER PRIMARY KEY AUTOINCREMENT,
66 name TEXT UNIQUE NOT NULL,
67 type TEXT NOT NULL,
68 client TEXT NOT NULL,
69 hostname TEXT NOT NULL,
70 created TEXT NOT NULL
71 )",
72 &[] as &[&dyn ToSql],
73 )
74 .expect("Failed to create buckets table");
75
76 conn.execute(
78 "CREATE INDEX IF NOT EXISTS bucket_id_index ON buckets(id)",
79 &[] as &[&dyn ToSql],
80 )
81 .expect("Failed to create buckets index");
82
83 conn.execute(
85 "
86 CREATE TABLE IF NOT EXISTS events (
87 id INTEGER PRIMARY KEY AUTOINCREMENT,
88 bucketrow INTEGER NOT NULL,
89 starttime INTEGER NOT NULL,
90 endtime INTEGER NOT NULL,
91 data TEXT NOT NULL,
92 FOREIGN KEY (bucketrow) REFERENCES buckets(id)
93 )",
94 &[] as &[&dyn ToSql],
95 )
96 .expect("Failed to create events table");
97
98 conn.execute(
100 "CREATE INDEX IF NOT EXISTS events_bucketrow_index ON events(bucketrow)",
101 &[] as &[&dyn ToSql],
102 )
103 .expect("Failed to create events_bucketrow index");
104 conn.execute(
105 "CREATE INDEX IF NOT EXISTS events_starttime_index ON events(starttime)",
106 &[] as &[&dyn ToSql],
107 )
108 .expect("Failed to create events_starttime index");
109 conn.execute(
110 "CREATE INDEX IF NOT EXISTS events_endtime_index ON events(endtime)",
111 &[] as &[&dyn ToSql],
112 )
113 .expect("Failed to create events_endtime index");
114
115 conn.pragma_update(None, "user_version", &1)
117 .expect("Failed to update database version!");
118}
119
120fn _migrate_v1_to_v2(conn: &Connection) {
121 info!("Upgrading database to v2, adding data field to buckets");
122 conn.execute(
123 "ALTER TABLE buckets ADD COLUMN data TEXT DEFAULT '{}';",
124 &[] as &[&dyn ToSql],
125 )
126 .expect("Failed to upgrade database when adding data field to buckets");
127
128 conn.pragma_update(None, "user_version", &2)
129 .expect("Failed to update database version!");
130}
131
132fn _migrate_v2_to_v3(conn: &Connection) {
133 info!("Upgrading database to v3, replacing the broken data field for buckets");
135
136 match conn.execute(
138 "ALTER TABLE buckets RENAME COLUMN data TO data_deprecated;",
139 &[] as &[&dyn ToSql],
140 ) {
141 Ok(_) => (),
142 Err(rusqlite::Error::ExecuteReturnedResults) => (),
144 Err(e) => panic!("Unexpected error: {:?}", e),
145 };
146
147 conn.execute(
149 "ALTER TABLE buckets ADD COLUMN data TEXT NOT NULL DEFAULT '{}';",
150 &[] as &[&dyn ToSql],
151 )
152 .expect("Failed to upgrade database when adding new data field to buckets");
153
154 conn.pragma_update(None, "user_version", &3)
155 .expect("Failed to update database version!");
156}
157
158fn _migrate_v3_to_v4(conn: &Connection) {
159 info!("Upgrading database to v4, adding table for key-value storage");
160 conn.execute(
161 "CREATE TABLE key_value (
162 key TEXT PRIMARY KEY,
163 value TEXT,
164 last_modified NUMBER NOT NULL
165 );",
166 [],
167 )
168 .expect("Failed to upgrade db and add key-value storage table");
169
170 conn.pragma_update(None, "user_version", &4)
171 .expect("Failed to update database version!");
172}
173
/// In-memory handle over the SQLite datastore: keeps a cache of bucket records and
/// performs all reads and writes through a `Connection` passed to each method.
pub struct DatastoreInstance {
    /// Cached buckets keyed by `Bucket::id`; filled by `get_stored_buckets` and kept
    /// up to date by the bucket/event mutation methods.
    buckets_cache: HashMap<String, Bucket>,
    /// True when `new` created the schema from scratch; cleared by
    /// `ensure_legacy_import` once it has run.
    first_init: bool,
    /// Value of the `user_version` pragma as read at the start of `new`
    /// (i.e. before any migrations performed by `new`).
    pub db_version: i32,
}
179
180impl DatastoreInstance {
181 pub fn new(
182 conn: &Connection,
183 migrate_enabled: bool,
184 ) -> Result<DatastoreInstance, DatastoreError> {
185 let mut first_init = false;
186 let db_version = _get_db_version(&conn);
187
188 if migrate_enabled {
189 first_init = _create_tables(&conn, db_version);
190 } else if db_version < 0 {
191 return Err(DatastoreError::Uninitialized(
192 "Tried to open an uninitialized datastore with migration disabled".to_string(),
193 ));
194 } else if db_version != NEWEST_DB_VERSION {
195 return Err(DatastoreError::OldDbVersion(format!(
196 "\
197 Tried to open an database with an incompatible database version!
198 Database has version {} while the supported version is {}",
199 db_version, NEWEST_DB_VERSION
200 )));
201 }
202
203 let mut ds = DatastoreInstance {
204 buckets_cache: HashMap::new(),
205 first_init,
206 db_version,
207 };
208 ds.get_stored_buckets(&conn)?;
209 Ok(ds)
210 }
211
212 fn get_stored_buckets(&mut self, conn: &Connection) -> Result<(), DatastoreError> {
213 let mut stmt = match conn.prepare(
214 "
215 SELECT buckets.id, buckets.name, buckets.type, buckets.client,
216 buckets.hostname, buckets.created,
217 min(events.starttime), max(events.endtime),
218 buckets.data
219 FROM buckets
220 LEFT OUTER JOIN events ON buckets.id = events.bucketrow
221 GROUP BY buckets.id
222 ;",
223 ) {
224 Ok(stmt) => stmt,
225 Err(err) => {
226 return Err(DatastoreError::InternalError(format!(
227 "Failed to prepare get_stored_buckets SQL statement: {}",
228 err.to_string()
229 )))
230 }
231 };
232 let buckets = match stmt.query_map(&[] as &[&dyn ToSql], |row| {
233 let opt_start_ns: Option<i64> = row.get(6)?;
234 let opt_start = match opt_start_ns {
235 Some(starttime_ns) => {
236 let seconds: i64 = (starttime_ns / 1_000_000_000) as i64;
237 let subnanos: u32 = (starttime_ns % 1_000_000_000) as u32;
238 Some(DateTime::<Utc>::from_utc(
239 NaiveDateTime::from_timestamp(seconds, subnanos),
240 Utc,
241 ))
242 }
243 None => None,
244 };
245
246 let opt_end_ns: Option<i64> = row.get(7)?;
247 let opt_end = match opt_end_ns {
248 Some(endtime_ns) => {
249 let seconds: i64 = (endtime_ns / 1_000_000_000) as i64;
250 let subnanos: u32 = (endtime_ns % 1_000_000_000) as u32;
251 Some(DateTime::<Utc>::from_utc(
252 NaiveDateTime::from_timestamp(seconds, subnanos),
253 Utc,
254 ))
255 }
256 None => None,
257 };
258
259 let data_str: String = row.get(8)?;
261 let data_json = match serde_json::from_str(&data_str) {
262 Ok(data) => data,
263 Err(e) => {
264 return Err(rusqlite::Error::InvalidColumnName(format!(
265 "Failed to parse data to JSON: {:?}",
266 e
267 )))
268 }
269 };
270
271 Ok(Bucket {
272 bid: row.get(0)?,
273 id: row.get(1)?,
274 _type: row.get(2)?,
275 client: row.get(3)?,
276 hostname: row.get(4)?,
277 created: row.get(5)?,
278 data: data_json,
279 metadata: BucketMetadata {
280 start: opt_start,
281 end: opt_end,
282 },
283 events: None,
284 last_updated: None,
285 })
286 }) {
287 Ok(buckets) => buckets,
288 Err(err) => {
289 return Err(DatastoreError::InternalError(format!(
290 "Failed to query get_stored_buckets SQL statement: {:?}",
291 err
292 )))
293 }
294 };
295 for bucket in buckets {
296 match bucket {
297 Ok(b) => {
298 self.buckets_cache.insert(b.id.clone(), b.clone());
299 }
300 Err(e) => {
301 return Err(DatastoreError::InternalError(format!(
302 "Failed to parse bucket from SQLite, database is corrupt! {:?}",
303 e
304 )))
305 }
306 }
307 }
308 Ok(())
309 }
310
311 pub fn ensure_legacy_import(&mut self, conn: &Connection) -> Result<bool, ()> {
312 use super::legacy_import::legacy_import;
313 if !self.first_init {
314 Ok(false)
315 } else {
316 self.first_init = false;
317 match legacy_import(self, &conn) {
318 Ok(_) => {
319 info!("Successfully imported legacy database");
320 self.get_stored_buckets(&conn).unwrap();
321 Ok(true)
322 }
323 Err(err) => {
324 warn!("Failed to import legacy database: {:?}", err);
325 Err(())
326 }
327 }
328 }
329 }
330
331 pub fn create_bucket(
332 &mut self,
333 conn: &Connection,
334 mut bucket: Bucket,
335 ) -> Result<(), DatastoreError> {
336 bucket.created = match bucket.created {
337 Some(created) => Some(created),
338 None => Some(Utc::now()),
339 };
340 let mut stmt = match conn.prepare(
341 "
342 INSERT INTO buckets (name, type, client, hostname, created, data)
343 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
344 ) {
345 Ok(buckets) => buckets,
346 Err(err) => {
347 return Err(DatastoreError::InternalError(format!(
348 "Failed to prepare create_bucket SQL statement: {}",
349 err.to_string()
350 )))
351 }
352 };
353 let data = serde_json::to_string(&bucket.data).unwrap();
354 let res = stmt.execute(&[
355 &bucket.id,
356 &bucket._type,
357 &bucket.client,
358 &bucket.hostname,
359 &bucket.created as &dyn ToSql,
360 &data,
361 ]);
362
363 match res {
364 Ok(_) => {
365 info!("Created bucket {}", bucket.id);
366 let rowid: i64 = conn.last_insert_rowid();
368 bucket.bid = Some(rowid);
369 let events = bucket.events;
371 bucket.events = None;
372 self.buckets_cache.insert(bucket.id.clone(), bucket.clone());
374 if let Some(events) = events {
376 self.insert_events(conn, &bucket.id, events.take_inner())?;
377 bucket.events = None;
378 }
379 Ok(())
380 }
381 Err(err) => match err {
383 rusqlite::Error::SqliteFailure { 0: sqlerr, 1: _ } => match sqlerr.code {
384 rusqlite::ErrorCode::ConstraintViolation => {
385 Err(DatastoreError::BucketAlreadyExists(bucket.id.to_string()))
386 }
387 _ => Err(DatastoreError::InternalError(format!(
388 "Failed to execute create_bucket SQL statement: {}",
389 err
390 ))),
391 },
392 _ => Err(DatastoreError::InternalError(format!(
393 "Failed to execute create_bucket SQL statement: {}",
394 err
395 ))),
396 },
397 }
398 }
399
400 pub fn delete_bucket(
401 &mut self,
402 conn: &Connection,
403 bucket_id: &str,
404 ) -> Result<(), DatastoreError> {
405 let bucket = (self.get_bucket(&bucket_id))?;
406 match conn.execute("DELETE FROM events WHERE bucketrow = ?1", &[&bucket.bid]) {
408 Ok(_) => (),
409 Err(err) => return Err(DatastoreError::InternalError(err.to_string())),
410 }
411 match conn.execute("DELETE FROM buckets WHERE id = ?1", &[&bucket.bid]) {
413 Ok(_) => {
414 self.buckets_cache.remove(bucket_id);
415 Ok(())
416 }
417 Err(err) => match err {
418 rusqlite::Error::SqliteFailure { 0: sqlerr, 1: _ } => match sqlerr.code {
419 rusqlite::ErrorCode::ConstraintViolation => {
420 Err(DatastoreError::BucketAlreadyExists(bucket_id.to_string()))
421 }
422 _ => Err(DatastoreError::InternalError(err.to_string())),
423 },
424 _ => Err(DatastoreError::InternalError(err.to_string())),
425 },
426 }
427 }
428
429 pub fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
430 let cached_bucket = self.buckets_cache.get(bucket_id);
431 match cached_bucket {
432 Some(bucket) => Ok(bucket.clone()),
433 None => Err(DatastoreError::NoSuchBucket(bucket_id.to_string())),
434 }
435 }
436
/// Returns a clone of the whole bucket cache, keyed by bucket id.
pub fn get_buckets(&self) -> HashMap<String, Bucket> {
    self.buckets_cache.clone()
}
440
441 pub fn insert_events(
442 &mut self,
443 conn: &Connection,
444 bucket_id: &str,
445 mut events: Vec<Event>,
446 ) -> Result<Vec<Event>, DatastoreError> {
447 let mut bucket = self.get_bucket(&bucket_id)?;
448
449 let mut stmt = match conn.prepare(
450 "
451 INSERT OR REPLACE INTO events(bucketrow, id, starttime, endtime, data)
452 VALUES (?1, ?2, ?3, ?4, ?5)",
453 ) {
454 Ok(stmt) => stmt,
455 Err(err) => {
456 return Err(DatastoreError::InternalError(format!(
457 "Failed to prepare insert_events SQL statement: {}",
458 err
459 )))
460 }
461 };
462 for event in &mut events {
463 let starttime_nanos = event.timestamp.timestamp_nanos();
464 let duration_nanos = match event.duration.num_nanoseconds() {
465 Some(nanos) => nanos,
466 None => {
467 return Err(DatastoreError::InternalError(
468 "Failed to convert duration to nanoseconds".to_string(),
469 ))
470 }
471 };
472 let endtime_nanos = starttime_nanos + duration_nanos;
473 let data = serde_json::to_string(&event.data).unwrap();
474 let res = stmt.execute(&[
475 &bucket.bid.unwrap(),
476 &event.id as &dyn ToSql,
477 &starttime_nanos,
478 &endtime_nanos,
479 &data as &dyn ToSql,
480 ]);
481 match res {
482 Ok(_) => {
483 self.update_endtime(&mut bucket, &event);
484 let rowid = conn.last_insert_rowid();
485 event.id = Some(rowid);
486 }
487 Err(err) => {
488 return Err(DatastoreError::InternalError(format!(
489 "Failed to insert event: {:?}, {}",
490 event, err
491 )));
492 }
493 };
494 }
495 Ok(events)
496 }
497
498 pub fn delete_events_by_id(
499 &self,
500 conn: &Connection,
501 bucket_id: &str,
502 event_ids: Vec<i64>,
503 ) -> Result<(), DatastoreError> {
504 let bucket = self.get_bucket(&bucket_id)?;
505 let mut stmt = match conn.prepare(
506 "
507 DELETE FROM events
508 WHERE bucketrow = ?1 AND id = ?2",
509 ) {
510 Ok(stmt) => stmt,
511 Err(err) => {
512 return Err(DatastoreError::InternalError(format!(
513 "Failed to prepare insert_events SQL statement: {}",
514 err
515 )))
516 }
517 };
518 for id in event_ids {
519 let res = stmt.execute(&[&bucket.bid.unwrap(), &id as &dyn ToSql]);
520 match res {
521 Ok(_) => {}
522 Err(err) => {
523 return Err(DatastoreError::InternalError(format!(
524 "Failed to delete event with id {} in bucket {}: {:?}",
525 id, bucket_id, err
526 )));
527 }
528 };
529 }
530 Ok(())
531 }
532
533 fn update_endtime(&mut self, bucket: &mut Bucket, event: &Event) {
536 let mut update = false;
537 match bucket.metadata.start {
539 None => {
540 bucket.metadata.start = Some(event.timestamp);
541 update = true;
542 }
543 Some(current_start) => {
544 if current_start > event.timestamp {
545 bucket.metadata.start = Some(event.timestamp);
546 update = true;
547 }
548 }
549 }
550 let event_endtime = event.calculate_endtime();
552 match bucket.metadata.end {
553 None => {
554 bucket.metadata.end = Some(event_endtime);
555 update = true;
556 }
557 Some(current_end) => {
558 if current_end < event_endtime {
559 bucket.metadata.end = Some(event_endtime);
560 update = true;
561 }
562 }
563 }
564 if update {
566 self.buckets_cache.insert(bucket.id.clone(), bucket.clone());
567 }
568 }
569
570 pub fn replace_last_event(
571 &mut self,
572 conn: &Connection,
573 bucket_id: &str,
574 event: &Event,
575 ) -> Result<(), DatastoreError> {
576 let mut bucket = self.get_bucket(&bucket_id)?;
577
578 let mut stmt = match conn.prepare(
579 "
580 UPDATE events
581 SET starttime = ?2, endtime = ?3, data = ?4
582 WHERE bucketrow = ?1
583 AND endtime = (SELECT max(endtime) FROM events WHERE bucketrow = ?1)
584 ",
585 ) {
586 Ok(stmt) => stmt,
587 Err(err) => {
588 return Err(DatastoreError::InternalError(format!(
589 "Failed to prepare replace_last_event SQL statement: {}",
590 err
591 )))
592 }
593 };
594 let starttime_nanos = event.timestamp.timestamp_nanos();
595 let duration_nanos = match event.duration.num_nanoseconds() {
596 Some(nanos) => nanos,
597 None => {
598 return Err(DatastoreError::InternalError(
599 "Failed to convert duration to nanoseconds".to_string(),
600 ))
601 }
602 };
603 let endtime_nanos = starttime_nanos + duration_nanos;
604 let data = serde_json::to_string(&event.data).unwrap();
605 match stmt.execute(&[
606 &bucket.bid.unwrap(),
607 &starttime_nanos,
608 &endtime_nanos,
609 &data as &dyn ToSql,
610 ]) {
611 Ok(_) => self.update_endtime(&mut bucket, event),
612 Err(err) => {
613 return Err(DatastoreError::InternalError(format!(
614 "Failed to execute replace_last_event SQL statement: {}",
615 err
616 )))
617 }
618 };
619 Ok(())
620 }
621
622 pub fn heartbeat(
623 &mut self,
624 conn: &Connection,
625 bucket_id: &str,
626 heartbeat: Event,
627 pulsetime: f64,
628 last_heartbeat: &mut HashMap<String, Option<Event>>,
629 ) -> Result<Event, DatastoreError> {
630 self.get_bucket(&bucket_id)?;
631 if !last_heartbeat.contains_key(bucket_id) {
632 last_heartbeat.insert(bucket_id.to_string(), None);
633 }
634 let last_event = match last_heartbeat.remove(bucket_id).unwrap() {
635 Some(last_event) => last_event,
637 None => {
638 let mut last_event_vec = self.get_events(conn, &bucket_id, None, None, Some(1))?;
640 match last_event_vec.pop() {
641 Some(last_event) => last_event,
642 None => {
643 self.insert_events(conn, &bucket_id, vec![heartbeat.clone()])?;
645 return Ok(heartbeat);
646 }
647 }
648 }
649 };
650 let inserted_heartbeat = match dw_transform::heartbeat(&last_event, &heartbeat, pulsetime) {
651 Some(merged_heartbeat) => {
652 self.replace_last_event(conn, &bucket_id, &merged_heartbeat)?;
653 merged_heartbeat
654 }
655 None => {
656 debug!("Failed to merge heartbeat!");
657 self.insert_events(conn, &bucket_id, vec![heartbeat.clone()])?;
658 heartbeat
659 }
660 };
661 last_heartbeat.insert(bucket_id.to_string(), Some(inserted_heartbeat.clone()));
662 Ok(inserted_heartbeat)
663 }
664
665 pub fn get_events(
666 &mut self,
667 conn: &Connection,
668 bucket_id: &str,
669 starttime_opt: Option<DateTime<Utc>>,
670 endtime_opt: Option<DateTime<Utc>>,
671 limit_opt: Option<u64>,
672 ) -> Result<Vec<Event>, DatastoreError> {
673 let bucket = self.get_bucket(&bucket_id)?;
674
675 let mut list = Vec::new();
676
677 let starttime_filter_ns: i64 = match starttime_opt {
678 Some(dt) => dt.timestamp_nanos(),
679 None => 0,
680 };
681 let endtime_filter_ns = match endtime_opt {
682 Some(dt) => dt.timestamp_nanos() as i64,
683 None => std::i64::MAX,
684 };
685 if starttime_filter_ns > endtime_filter_ns {
686 warn!("Starttime in event query was lower than endtime!");
687 return Ok(list);
688 }
689 let limit = match limit_opt {
690 Some(l) => l as i64,
691 None => -1,
692 };
693
694 let mut stmt = match conn.prepare(
695 "
696 SELECT id, starttime, endtime, data
697 FROM events
698 WHERE bucketrow = ?1
699 AND endtime >= ?2
700 AND starttime <= ?3
701 ORDER BY starttime DESC
702 LIMIT ?4
703 ;",
704 ) {
705 Ok(stmt) => stmt,
706 Err(err) => {
707 return Err(DatastoreError::InternalError(format!(
708 "Failed to prepare get_events SQL statement: {}",
709 err
710 )))
711 }
712 };
713
714 let rows = match stmt.query_map(
715 &[
716 &bucket.bid.unwrap(),
717 &starttime_filter_ns,
718 &endtime_filter_ns,
719 &limit,
720 ],
721 |row| {
722 let id = row.get(0)?;
723 let mut starttime_ns: i64 = row.get(1)?;
724 let mut endtime_ns: i64 = row.get(2)?;
725 let data_str: String = row.get(3)?;
726
727 if starttime_ns < starttime_filter_ns {
728 starttime_ns = starttime_filter_ns
729 }
730 if endtime_ns > endtime_filter_ns {
731 endtime_ns = endtime_filter_ns
732 }
733 let duration_ns = endtime_ns - starttime_ns;
734
735 let time_seconds: i64 = (starttime_ns / 1_000_000_000) as i64;
736 let time_subnanos: u32 = (starttime_ns % 1_000_000_000) as u32;
737 let data: serde_json::map::Map<String, Value> =
738 serde_json::from_str(&data_str).unwrap();
739
740 Ok(Event {
741 id: Some(id),
742 timestamp: DateTime::<Utc>::from_utc(
743 NaiveDateTime::from_timestamp(time_seconds, time_subnanos),
744 Utc,
745 ),
746 duration: Duration::nanoseconds(duration_ns),
747 data,
748 })
749 },
750 ) {
751 Ok(rows) => rows,
752 Err(err) => {
753 return Err(DatastoreError::InternalError(format!(
754 "Failed to map get_events SQL statement: {}",
755 err
756 )))
757 }
758 };
759 for row in rows {
760 match row {
761 Ok(event) => list.push(event),
762 Err(err) => warn!("Corrupt event in bucket {}: {}", bucket_id, err),
763 };
764 }
765
766 Ok(list)
767 }
768
769 pub fn get_event_count(
770 &self,
771 conn: &Connection,
772 bucket_id: &str,
773 starttime_opt: Option<DateTime<Utc>>,
774 endtime_opt: Option<DateTime<Utc>>,
775 ) -> Result<i64, DatastoreError> {
776 let bucket = self.get_bucket(&bucket_id)?;
777
778 let starttime_filter_ns = match starttime_opt {
779 Some(dt) => dt.timestamp_nanos() as i64,
780 None => 0,
781 };
782 let endtime_filter_ns = match endtime_opt {
783 Some(dt) => dt.timestamp_nanos() as i64,
784 None => std::i64::MAX,
785 };
786 if starttime_filter_ns >= endtime_filter_ns {
787 warn!("Endtime in event query was same or lower than starttime!");
788 return Ok(0);
789 }
790
791 let mut stmt = match conn.prepare(
792 "
793 SELECT count(*) FROM events
794 WHERE bucketrow = ?1
795 AND endtime >= ?2
796 AND starttime <= ?3",
797 ) {
798 Ok(stmt) => stmt,
799 Err(err) => {
800 return Err(DatastoreError::InternalError(format!(
801 "Failed to prepare get_event_count SQL statement: {}",
802 err
803 )))
804 }
805 };
806
807 let count = match stmt.query_row(
808 &[
809 &bucket.bid.unwrap(),
810 &starttime_filter_ns,
811 &endtime_filter_ns,
812 ],
813 |row| row.get(0),
814 ) {
815 Ok(count) => count,
816 Err(err) => {
817 return Err(DatastoreError::InternalError(format!(
818 "Failed to query get_event_count SQL statement: {}",
819 err
820 )))
821 }
822 };
823
824 Ok(count)
825 }
826
827 pub fn insert_key_value(
828 &self,
829 conn: &Connection,
830 key: &str,
831 data: &str,
832 ) -> Result<(), DatastoreError> {
833 let mut stmt = match conn.prepare(
834 "
835 INSERT OR REPLACE INTO key_value(key, value, last_modified)
836 VALUES (?1, ?2, ?3)",
837 ) {
838 Ok(stmt) => stmt,
839 Err(err) => {
840 return Err(DatastoreError::InternalError(format!(
841 "Failed to prepare insert_value SQL statement: {}",
842 err
843 )))
844 }
845 };
846 let timestamp = Utc::now().timestamp();
847 #[allow(clippy::expect_fun_call)]
848 stmt.execute(params![key, data, ×tamp])
849 .expect(&format!("Failed to insert key-value pair: {}", key));
850 Ok(())
851 }
852
853 pub fn delete_key_value(&self, conn: &Connection, key: &str) -> Result<(), DatastoreError> {
854 conn.execute("DELETE FROM key_value WHERE key = ?1", &[key])
855 .expect("Error deleting value from database");
856 Ok(())
857 }
858
859 pub fn get_key_value(&self, conn: &Connection, key: &str) -> Result<KeyValue, DatastoreError> {
860 let mut stmt = match conn.prepare(
861 "
862 SELECT * FROM key_value WHERE KEY = ?1",
863 ) {
864 Ok(stmt) => stmt,
865 Err(err) => {
866 return Err(DatastoreError::InternalError(format!(
867 "Failed to prepare get_value SQL statement: {}",
868 err
869 )))
870 }
871 };
872
873 match stmt.query_row(&[key], |row| {
874 Ok(KeyValue {
875 key: row.get(0)?,
876 value: row.get(1)?,
877 timestamp: Some(DateTime::from_utc(
878 NaiveDateTime::from_timestamp(row.get(2)?, 0),
879 Utc,
880 )),
881 })
882 }) {
883 Ok(result) => Ok(result),
884 Err(err) => match err {
885 rusqlite::Error::QueryReturnedNoRows => {
886 Err(DatastoreError::NoSuchKey(key.to_string()))
887 }
888 _ => Err(DatastoreError::InternalError(format!(
889 "Get value query failed for key {}",
890 key
891 ))),
892 },
893 }
894 }
895
896 pub fn get_keys_starting(
897 &self,
898 conn: &Connection,
899 pattern: &str,
900 ) -> Result<Vec<String>, DatastoreError> {
901 let mut stmt = match conn.prepare("SELECT key FROM key_value WHERE key LIKE ?") {
902 Ok(stmt) => stmt,
903 Err(err) => {
904 return Err(DatastoreError::InternalError(format!(
905 "Failed to prepare get_value SQL statement: {}",
906 err
907 )))
908 }
909 };
910
911 let mut output = Vec::<String>::new();
912 let result = stmt.query_map(&[pattern], |row| row.get::<usize, String>(0));
914 match result {
915 Ok(keys) => {
916 for row in keys {
917 output.push(row.unwrap());
920 }
921 Ok(output)
922 }
923 Err(err) => match err {
924 rusqlite::Error::QueryReturnedNoRows => {
925 Err(DatastoreError::NoSuchKey(pattern.to_string()))
926 }
927 _ => Err(DatastoreError::InternalError(format!(
928 "Failed to get key_value rows starting with pattern {}",
929 pattern
930 ))),
931 },
932 }
933 }
934}