dw_datastore/datastore.rs

use std::collections::HashMap;

use chrono::DateTime;
use chrono::Duration;
use chrono::NaiveDateTime;
use chrono::Utc;

use rusqlite::Connection;

use serde_json::value::Value;

use dw_models::Bucket;
use dw_models::BucketMetadata;
use dw_models::Event;
use dw_models::KeyValue;

use rusqlite::params;
use rusqlite::types::ToSql;

use super::DatastoreError;

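/// Reads the SQLite `user_version` pragma, which stores the datastore schema version.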
fn _get_db_version(conn: &Connection) -> i32 {
    conn.pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap()
}

/*
 * ### Database version changelog ###
 * 0: Uninitialized database
 * 1: Initialized database
 * 2: Added 'data' field to 'buckets' table
 * 3: see: https://github.com/DeskWatch/dw-server-rust/pull/52
 * 4: Added 'key_value' table for storing key-value pairs
 */
static NEWEST_DB_VERSION: i32 = 4;

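/// Runs any pending schema migrations, bringing the database from `version` up to
/// `NEWEST_DB_VERSION`. Returns `true` if the database was uninitialized (version 0)
/// and had to be created from scratch.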
fn _create_tables(conn: &Connection, version: i32) -> bool {
    let mut first_init = false;

    if version < 1 {
        first_init = true;
        _migrate_v0_to_v1(conn);
    }

    if version < 2 {
        _migrate_v1_to_v2(conn);
    }

    if version < 3 {
        _migrate_v2_to_v3(conn);
    }

    if version < 4 {
        _migrate_v3_to_v4(conn);
    }

    first_init
}

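/// v0 -> v1: creates the `buckets` and `events` tables along with their indexes.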
fn _migrate_v0_to_v1(conn: &Connection) {
    /* Set up bucket table */
    conn.execute(
        "
        CREATE TABLE IF NOT EXISTS buckets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            type TEXT NOT NULL,
            client TEXT NOT NULL,
            hostname TEXT NOT NULL,
            created TEXT NOT NULL
        )",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to create buckets table");

    /* Set up index for bucket table */
    conn.execute(
        "CREATE INDEX IF NOT EXISTS bucket_id_index ON buckets(id)",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to create buckets index");

    /* Set up events table */
    conn.execute(
        "
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bucketrow INTEGER NOT NULL,
            starttime INTEGER NOT NULL,
            endtime INTEGER NOT NULL,
            data TEXT NOT NULL,
            FOREIGN KEY (bucketrow) REFERENCES buckets(id)
        )",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to create events table");

    /* Set up index for events table */
    conn.execute(
        "CREATE INDEX IF NOT EXISTS events_bucketrow_index ON events(bucketrow)",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to create events_bucketrow index");
    conn.execute(
        "CREATE INDEX IF NOT EXISTS events_starttime_index ON events(starttime)",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to create events_starttime index");
    conn.execute(
        "CREATE INDEX IF NOT EXISTS events_endtime_index ON events(endtime)",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to create events_endtime index");

    /* Update database version */
    conn.pragma_update(None, "user_version", &1)
        .expect("Failed to update database version!");
}

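/// v1 -> v2: adds the `data` column to the `buckets` table.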
fn _migrate_v1_to_v2(conn: &Connection) {
    info!("Upgrading database to v2, adding data field to buckets");
    conn.execute(
        "ALTER TABLE buckets ADD COLUMN data TEXT DEFAULT '{}';",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to upgrade database when adding data field to buckets");

    conn.pragma_update(None, "user_version", &2)
        .expect("Failed to update database version!");
}

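/// v2 -> v3: renames the broken `data` column to `data_deprecated` and recreates it
/// with a NOT NULL constraint; see the linked pull request for background.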
fn _migrate_v2_to_v3(conn: &Connection) {
    // For details about why this migration was necessary, see: https://github.com/DeskWatch/dw-server-rust/pull/52
    info!("Upgrading database to v3, replacing the broken data field for buckets");

    // Rename column, marking it as deprecated
    match conn.execute(
        "ALTER TABLE buckets RENAME COLUMN data TO data_deprecated;",
        &[] as &[&dyn ToSql],
    ) {
        Ok(_) => (),
        // This error is okay, it still has the intended effects
        Err(rusqlite::Error::ExecuteReturnedResults) => (),
        Err(e) => panic!("Unexpected error: {:?}", e),
    };

    // Create new correct column
    conn.execute(
        "ALTER TABLE buckets ADD COLUMN data TEXT NOT NULL DEFAULT '{}';",
        &[] as &[&dyn ToSql],
    )
    .expect("Failed to upgrade database when adding new data field to buckets");

    conn.pragma_update(None, "user_version", &3)
        .expect("Failed to update database version!");
}

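/// v3 -> v4: adds the `key_value` table used for key-value storage.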
fn _migrate_v3_to_v4(conn: &Connection) {
    info!("Upgrading database to v4, adding table for key-value storage");
    conn.execute(
        "CREATE TABLE key_value (
        key TEXT PRIMARY KEY,
        value TEXT,
        last_modified NUMBER NOT NULL
    );",
        [],
    )
    .expect("Failed to upgrade db and add key-value storage table");

    conn.pragma_update(None, "user_version", &4)
        .expect("Failed to update database version!");
}

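/// A single open datastore backed by a SQLite connection, keeping an in-memory cache
/// of all buckets (`buckets_cache`) so bucket metadata does not have to be re-queried.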
pub struct DatastoreInstance {
    buckets_cache: HashMap<String, Bucket>,
    first_init: bool,
    pub db_version: i32,
}

impl DatastoreInstance {
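    /// Opens (and, if `migrate_enabled` is true, creates/migrates) a datastore on the
    /// given connection, then loads all stored buckets into the cache.
    ///
    /// Example (illustrative sketch; assumes an in-memory SQLite connection):
    /// ```ignore
    /// let conn = rusqlite::Connection::open_in_memory().unwrap();
    /// let ds = DatastoreInstance::new(&conn, true).unwrap();
    /// ```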
    pub fn new(
        conn: &Connection,
        migrate_enabled: bool,
    ) -> Result<DatastoreInstance, DatastoreError> {
        let mut first_init = false;
        let db_version = _get_db_version(&conn);

        if migrate_enabled {
            first_init = _create_tables(&conn, db_version);
        } else if db_version < 1 {
            return Err(DatastoreError::Uninitialized(
                "Tried to open an uninitialized datastore with migration disabled".to_string(),
            ));
        } else if db_version != NEWEST_DB_VERSION {
            return Err(DatastoreError::OldDbVersion(format!(
                "\
                Tried to open a database with an incompatible database version!
                Database has version {} while the supported version is {}",
                db_version, NEWEST_DB_VERSION
            )));
        }

        let mut ds = DatastoreInstance {
            buckets_cache: HashMap::new(),
            first_init,
            db_version,
        };
        ds.get_stored_buckets(&conn)?;
        Ok(ds)
    }

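    /// Loads all buckets (with the earliest/latest event times of each) from the
    /// database into `buckets_cache`.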
    fn get_stored_buckets(&mut self, conn: &Connection) -> Result<(), DatastoreError> {
        let mut stmt = match conn.prepare(
            "
            SELECT  buckets.id, buckets.name, buckets.type, buckets.client,
                    buckets.hostname, buckets.created,
                    min(events.starttime), max(events.endtime),
                    buckets.data
            FROM buckets
            LEFT OUTER JOIN events ON buckets.id = events.bucketrow
            GROUP BY buckets.id
            ;",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare get_stored_buckets SQL statement: {}",
                    err.to_string()
                )))
            }
        };
        let buckets = match stmt.query_map(&[] as &[&dyn ToSql], |row| {
            let opt_start_ns: Option<i64> = row.get(6)?;
            let opt_start = match opt_start_ns {
                Some(starttime_ns) => {
                    let seconds: i64 = (starttime_ns / 1_000_000_000) as i64;
                    let subnanos: u32 = (starttime_ns % 1_000_000_000) as u32;
                    Some(DateTime::<Utc>::from_utc(
                        NaiveDateTime::from_timestamp(seconds, subnanos),
                        Utc,
                    ))
                }
                None => None,
            };

            let opt_end_ns: Option<i64> = row.get(7)?;
            let opt_end = match opt_end_ns {
                Some(endtime_ns) => {
                    let seconds: i64 = (endtime_ns / 1_000_000_000) as i64;
                    let subnanos: u32 = (endtime_ns % 1_000_000_000) as u32;
                    Some(DateTime::<Utc>::from_utc(
                        NaiveDateTime::from_timestamp(seconds, subnanos),
                        Utc,
                    ))
                }
                None => None,
            };

            // If data column is not set (possible on old installations), use an empty map as default
            let data_str: String = row.get(8)?;
            let data_json = match serde_json::from_str(&data_str) {
                Ok(data) => data,
                Err(e) => {
                    return Err(rusqlite::Error::InvalidColumnName(format!(
                        "Failed to parse data to JSON: {:?}",
                        e
                    )))
                }
            };

            Ok(Bucket {
                bid: row.get(0)?,
                id: row.get(1)?,
                _type: row.get(2)?,
                client: row.get(3)?,
                hostname: row.get(4)?,
                created: row.get(5)?,
                data: data_json,
                metadata: BucketMetadata {
                    start: opt_start,
                    end: opt_end,
                },
                events: None,
                last_updated: None,
            })
        }) {
            Ok(buckets) => buckets,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to query get_stored_buckets SQL statement: {:?}",
                    err
                )))
            }
        };
        for bucket in buckets {
            match bucket {
                Ok(b) => {
                    self.buckets_cache.insert(b.id.clone(), b.clone());
                }
                Err(e) => {
                    return Err(DatastoreError::InternalError(format!(
                        "Failed to parse bucket from SQLite, database is corrupt! {:?}",
                        e
                    )))
                }
            }
        }
        Ok(())
    }

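    /// On the first initialization of the datastore, attempts to import data from a
    /// legacy database. Returns `Ok(true)` if an import was performed, `Ok(false)` if
    /// none was needed, and `Err(())` if the import failed.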
    pub fn ensure_legacy_import(&mut self, conn: &Connection) -> Result<bool, ()> {
        use super::legacy_import::legacy_import;
        if !self.first_init {
            Ok(false)
        } else {
            self.first_init = false;
            match legacy_import(self, &conn) {
                Ok(_) => {
                    info!("Successfully imported legacy database");
                    self.get_stored_buckets(&conn).unwrap();
                    Ok(true)
                }
                Err(err) => {
                    warn!("Failed to import legacy database: {:?}", err);
                    Err(())
                }
            }
        }
    }

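    /// Creates a new bucket row, caches it, and inserts any events attached to the
    /// bucket. Returns `BucketAlreadyExists` if a bucket with the same id exists.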
    pub fn create_bucket(
        &mut self,
        conn: &Connection,
        mut bucket: Bucket,
    ) -> Result<(), DatastoreError> {
        bucket.created = match bucket.created {
            Some(created) => Some(created),
            None => Some(Utc::now()),
        };
        let mut stmt = match conn.prepare(
            "
                INSERT INTO buckets (name, type, client, hostname, created, data)
                VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
        ) {
            Ok(buckets) => buckets,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare create_bucket SQL statement: {}",
                    err.to_string()
                )))
            }
        };
        let data = serde_json::to_string(&bucket.data).unwrap();
        let res = stmt.execute(&[
            &bucket.id,
            &bucket._type,
            &bucket.client,
            &bucket.hostname,
            &bucket.created as &dyn ToSql,
            &data,
        ]);

        match res {
            Ok(_) => {
                info!("Created bucket {}", bucket.id);
                // Get and set rowid
                let rowid: i64 = conn.last_insert_rowid();
                bucket.bid = Some(rowid);
                // Take out events from struct before caching
                let events = bucket.events;
                bucket.events = None;
                // Cache bucket
                self.buckets_cache.insert(bucket.id.clone(), bucket.clone());
                // Insert events
                if let Some(events) = events {
                    self.insert_events(conn, &bucket.id, events.take_inner())?;
                    bucket.events = None;
                }
                Ok(())
            }
            // FIXME: This match is ugly, is it possible to write it in a cleaner way?
            Err(err) => match err {
                rusqlite::Error::SqliteFailure { 0: sqlerr, 1: _ } => match sqlerr.code {
                    rusqlite::ErrorCode::ConstraintViolation => {
                        Err(DatastoreError::BucketAlreadyExists(bucket.id.to_string()))
                    }
                    _ => Err(DatastoreError::InternalError(format!(
                        "Failed to execute create_bucket SQL statement: {}",
                        err
                    ))),
                },
                _ => Err(DatastoreError::InternalError(format!(
                    "Failed to execute create_bucket SQL statement: {}",
                    err
                ))),
            },
        }
    }

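    /// Deletes a bucket and all of its events, and removes it from the cache.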
    pub fn delete_bucket(
        &mut self,
        conn: &Connection,
        bucket_id: &str,
    ) -> Result<(), DatastoreError> {
        let bucket = self.get_bucket(bucket_id)?;
        // Delete all events in bucket
        match conn.execute("DELETE FROM events WHERE bucketrow = ?1", &[&bucket.bid]) {
            Ok(_) => (),
            Err(err) => return Err(DatastoreError::InternalError(err.to_string())),
        }
        // Delete bucket itself
        match conn.execute("DELETE FROM buckets WHERE id = ?1", &[&bucket.bid]) {
            Ok(_) => {
                self.buckets_cache.remove(bucket_id);
                Ok(())
            }
            Err(err) => match err {
                rusqlite::Error::SqliteFailure { 0: sqlerr, 1: _ } => match sqlerr.code {
                    rusqlite::ErrorCode::ConstraintViolation => {
                        Err(DatastoreError::BucketAlreadyExists(bucket_id.to_string()))
                    }
                    _ => Err(DatastoreError::InternalError(err.to_string())),
                },
                _ => Err(DatastoreError::InternalError(err.to_string())),
            },
        }
    }

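    /// Fetches a bucket from the in-memory cache, or `NoSuchBucket` if it doesn't exist.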
    pub fn get_bucket(&self, bucket_id: &str) -> Result<Bucket, DatastoreError> {
        let cached_bucket = self.buckets_cache.get(bucket_id);
        match cached_bucket {
            Some(bucket) => Ok(bucket.clone()),
            None => Err(DatastoreError::NoSuchBucket(bucket_id.to_string())),
        }
    }

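    /// Returns a clone of the full bucket cache, keyed by bucket id.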
    pub fn get_buckets(&self) -> HashMap<String, Bucket> {
        self.buckets_cache.clone()
    }

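    /// Inserts (or replaces, when an event id is given) events into a bucket and
    /// updates the bucket's cached start/end metadata. Returns the events with their
    /// assigned row ids.
    ///
    /// Example (illustrative sketch; `ds`, `conn` and the bucket id "my-bucket" are
    /// assumed to exist, and the `Event` fields mirror how they are read back in
    /// `get_events` below):
    /// ```ignore
    /// let event = Event {
    ///     id: None,
    ///     timestamp: Utc::now(),
    ///     duration: Duration::seconds(1),
    ///     data: serde_json::Map::new(),
    /// };
    /// let inserted = ds.insert_events(&conn, "my-bucket", vec![event])?;
    /// ```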
    pub fn insert_events(
        &mut self,
        conn: &Connection,
        bucket_id: &str,
        mut events: Vec<Event>,
    ) -> Result<Vec<Event>, DatastoreError> {
        let mut bucket = self.get_bucket(&bucket_id)?;

        let mut stmt = match conn.prepare(
            "
                INSERT OR REPLACE INTO events(bucketrow, id, starttime, endtime, data)
                VALUES (?1, ?2, ?3, ?4, ?5)",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare insert_events SQL statement: {}",
                    err
                )))
            }
        };
        for event in &mut events {
            let starttime_nanos = event.timestamp.timestamp_nanos();
            let duration_nanos = match event.duration.num_nanoseconds() {
                Some(nanos) => nanos,
                None => {
                    return Err(DatastoreError::InternalError(
                        "Failed to convert duration to nanoseconds".to_string(),
                    ))
                }
            };
            let endtime_nanos = starttime_nanos + duration_nanos;
            let data = serde_json::to_string(&event.data).unwrap();
            let res = stmt.execute(&[
                &bucket.bid.unwrap(),
                &event.id as &dyn ToSql,
                &starttime_nanos,
                &endtime_nanos,
                &data as &dyn ToSql,
            ]);
            match res {
                Ok(_) => {
                    self.update_endtime(&mut bucket, &event);
                    let rowid = conn.last_insert_rowid();
                    event.id = Some(rowid);
                }
                Err(err) => {
                    return Err(DatastoreError::InternalError(format!(
                        "Failed to insert event: {:?}, {}",
                        event, err
                    )));
                }
            };
        }
        Ok(events)
    }

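    /// Deletes the events with the given ids from a bucket.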
    pub fn delete_events_by_id(
        &self,
        conn: &Connection,
        bucket_id: &str,
        event_ids: Vec<i64>,
    ) -> Result<(), DatastoreError> {
        let bucket = self.get_bucket(&bucket_id)?;
        let mut stmt = match conn.prepare(
            "
                DELETE FROM events
                WHERE bucketrow = ?1 AND id = ?2",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare delete_events_by_id SQL statement: {}",
                    err
                )))
            }
        };
        for id in event_ids {
            let res = stmt.execute(&[&bucket.bid.unwrap(), &id as &dyn ToSql]);
            match res {
                Ok(_) => {}
                Err(err) => {
                    return Err(DatastoreError::InternalError(format!(
                        "Failed to delete event with id {} in bucket {}: {:?}",
                        id, bucket_id, err
                    )));
                }
            };
        }
        Ok(())
    }

    // TODO: Function for deleting events by timerange with limit

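    /// Widens the cached start/end metadata of a bucket to include the given event,
    /// writing the updated bucket back into `buckets_cache` if anything changed.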
    fn update_endtime(&mut self, bucket: &mut Bucket, event: &Event) {
        let mut update = false;
        /* Potentially update start */
        match bucket.metadata.start {
            None => {
                bucket.metadata.start = Some(event.timestamp);
                update = true;
            }
            Some(current_start) => {
                if current_start > event.timestamp {
                    bucket.metadata.start = Some(event.timestamp);
                    update = true;
                }
            }
        }
        /* Potentially update end */
        let event_endtime = event.calculate_endtime();
        match bucket.metadata.end {
            None => {
                bucket.metadata.end = Some(event_endtime);
                update = true;
            }
            Some(current_end) => {
                if current_end < event_endtime {
                    bucket.metadata.end = Some(event_endtime);
                    update = true;
                }
            }
        }
        /* Update buckets_cache if start or end has been updated */
        if update {
            self.buckets_cache.insert(bucket.id.clone(), bucket.clone());
        }
    }

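    /// Replaces the most recent event (by end time) in a bucket with the given event.
    /// Used when merging heartbeats.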
    pub fn replace_last_event(
        &mut self,
        conn: &Connection,
        bucket_id: &str,
        event: &Event,
    ) -> Result<(), DatastoreError> {
        let mut bucket = self.get_bucket(&bucket_id)?;

        let mut stmt = match conn.prepare(
            "
                UPDATE events
                SET starttime = ?2, endtime = ?3, data = ?4
                WHERE bucketrow = ?1
                    AND endtime = (SELECT max(endtime) FROM events WHERE bucketrow = ?1)
            ",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare replace_last_event SQL statement: {}",
                    err
                )))
            }
        };
        let starttime_nanos = event.timestamp.timestamp_nanos();
        let duration_nanos = match event.duration.num_nanoseconds() {
            Some(nanos) => nanos,
            None => {
                return Err(DatastoreError::InternalError(
                    "Failed to convert duration to nanoseconds".to_string(),
                ))
            }
        };
        let endtime_nanos = starttime_nanos + duration_nanos;
        let data = serde_json::to_string(&event.data).unwrap();
        match stmt.execute(&[
            &bucket.bid.unwrap(),
            &starttime_nanos,
            &endtime_nanos,
            &data as &dyn ToSql,
        ]) {
            Ok(_) => self.update_endtime(&mut bucket, event),
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to execute replace_last_event SQL statement: {}",
                    err
                )))
            }
        };
        Ok(())
    }

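    /// Applies a heartbeat to a bucket: if `dw_transform::heartbeat` can merge it with
    /// the previous event (given `pulsetime`), the last event is replaced with the
    /// merged event; otherwise the heartbeat is inserted as a new event.
    /// `last_heartbeat` is a per-bucket cache of the most recent event, used to avoid
    /// a database round trip on every heartbeat.
    ///
    /// Example (illustrative sketch; assumes `ds`, `conn` and a `heartbeat_event`
    /// built as in the `insert_events` example, and a bucket named "my-bucket"):
    /// ```ignore
    /// let mut last_heartbeat = HashMap::new();
    /// let stored = ds.heartbeat(&conn, "my-bucket", heartbeat_event, 60.0, &mut last_heartbeat)?;
    /// ```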
    pub fn heartbeat(
        &mut self,
        conn: &Connection,
        bucket_id: &str,
        heartbeat: Event,
        pulsetime: f64,
        last_heartbeat: &mut HashMap<String, Option<Event>>,
    ) -> Result<Event, DatastoreError> {
        self.get_bucket(&bucket_id)?;
        if !last_heartbeat.contains_key(bucket_id) {
            last_heartbeat.insert(bucket_id.to_string(), None);
        }
        let last_event = match last_heartbeat.remove(bucket_id).unwrap() {
            // last heartbeat is in cache
            Some(last_event) => last_event,
            None => {
                // last heartbeat was not in cache, fetch from DB
                let mut last_event_vec = self.get_events(conn, &bucket_id, None, None, Some(1))?;
                match last_event_vec.pop() {
                    Some(last_event) => last_event,
                    None => {
                        // There was no last event, insert and return
                        self.insert_events(conn, &bucket_id, vec![heartbeat.clone()])?;
                        return Ok(heartbeat);
                    }
                }
            }
        };
        let inserted_heartbeat = match dw_transform::heartbeat(&last_event, &heartbeat, pulsetime) {
            Some(merged_heartbeat) => {
                self.replace_last_event(conn, &bucket_id, &merged_heartbeat)?;
                merged_heartbeat
            }
            None => {
                debug!("Failed to merge heartbeat!");
                self.insert_events(conn, &bucket_id, vec![heartbeat.clone()])?;
                heartbeat
            }
        };
        last_heartbeat.insert(bucket_id.to_string(), Some(inserted_heartbeat.clone()));
        Ok(inserted_heartbeat)
    }

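    /// Fetches events from a bucket, newest first, optionally filtered by time range
    /// and limited in count. Events overlapping the range boundaries are clipped to
    /// the requested interval.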
    pub fn get_events(
        &mut self,
        conn: &Connection,
        bucket_id: &str,
        starttime_opt: Option<DateTime<Utc>>,
        endtime_opt: Option<DateTime<Utc>>,
        limit_opt: Option<u64>,
    ) -> Result<Vec<Event>, DatastoreError> {
        let bucket = self.get_bucket(&bucket_id)?;

        let mut list = Vec::new();

        let starttime_filter_ns: i64 = match starttime_opt {
            Some(dt) => dt.timestamp_nanos(),
            None => 0,
        };
        let endtime_filter_ns = match endtime_opt {
            Some(dt) => dt.timestamp_nanos() as i64,
            None => std::i64::MAX,
        };
        if starttime_filter_ns > endtime_filter_ns {
            warn!("Starttime in event query was greater than endtime!");
            return Ok(list);
        }
        let limit = match limit_opt {
            Some(l) => l as i64,
            None => -1,
        };

        let mut stmt = match conn.prepare(
            "
                SELECT id, starttime, endtime, data
                FROM events
                WHERE bucketrow = ?1
                    AND endtime >= ?2
                    AND starttime <= ?3
                ORDER BY starttime DESC
                LIMIT ?4
            ;",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare get_events SQL statement: {}",
                    err
                )))
            }
        };

        let rows = match stmt.query_map(
            &[
                &bucket.bid.unwrap(),
                &starttime_filter_ns,
                &endtime_filter_ns,
                &limit,
            ],
            |row| {
                let id = row.get(0)?;
                let mut starttime_ns: i64 = row.get(1)?;
                let mut endtime_ns: i64 = row.get(2)?;
                let data_str: String = row.get(3)?;

                // Clip events that overlap the requested range to the filter boundaries
                if starttime_ns < starttime_filter_ns {
                    starttime_ns = starttime_filter_ns
                }
                if endtime_ns > endtime_filter_ns {
                    endtime_ns = endtime_filter_ns
                }
                let duration_ns = endtime_ns - starttime_ns;

                let time_seconds: i64 = (starttime_ns / 1_000_000_000) as i64;
                let time_subnanos: u32 = (starttime_ns % 1_000_000_000) as u32;
                let data: serde_json::map::Map<String, Value> =
                    serde_json::from_str(&data_str).unwrap();

                Ok(Event {
                    id: Some(id),
                    timestamp: DateTime::<Utc>::from_utc(
                        NaiveDateTime::from_timestamp(time_seconds, time_subnanos),
                        Utc,
                    ),
                    duration: Duration::nanoseconds(duration_ns),
                    data,
                })
            },
        ) {
            Ok(rows) => rows,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to map get_events SQL statement: {}",
                    err
                )))
            }
        };
        for row in rows {
            match row {
                Ok(event) => list.push(event),
                Err(err) => warn!("Corrupt event in bucket {}: {}", bucket_id, err),
            };
        }

        Ok(list)
    }

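    /// Counts the events in a bucket that overlap the given time range.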
    pub fn get_event_count(
        &self,
        conn: &Connection,
        bucket_id: &str,
        starttime_opt: Option<DateTime<Utc>>,
        endtime_opt: Option<DateTime<Utc>>,
    ) -> Result<i64, DatastoreError> {
        let bucket = self.get_bucket(&bucket_id)?;

        let starttime_filter_ns = match starttime_opt {
            Some(dt) => dt.timestamp_nanos() as i64,
            None => 0,
        };
        let endtime_filter_ns = match endtime_opt {
            Some(dt) => dt.timestamp_nanos() as i64,
            None => std::i64::MAX,
        };
        if starttime_filter_ns >= endtime_filter_ns {
            warn!("Endtime in event query was same or lower than starttime!");
            return Ok(0);
        }

        let mut stmt = match conn.prepare(
            "
            SELECT count(*) FROM events
            WHERE bucketrow = ?1
                AND endtime >= ?2
                AND starttime <= ?3",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare get_event_count SQL statement: {}",
                    err
                )))
            }
        };

        let count = match stmt.query_row(
            &[
                &bucket.bid.unwrap(),
                &starttime_filter_ns,
                &endtime_filter_ns,
            ],
            |row| row.get(0),
        ) {
            Ok(count) => count,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to query get_event_count SQL statement: {}",
                    err
                )))
            }
        };

        Ok(count)
    }

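    /// Inserts or replaces a key-value pair, stamping it with the current Unix time.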
    pub fn insert_key_value(
        &self,
        conn: &Connection,
        key: &str,
        data: &str,
    ) -> Result<(), DatastoreError> {
        let mut stmt = match conn.prepare(
            "
                INSERT OR REPLACE INTO key_value(key, value, last_modified)
                VALUES (?1, ?2, ?3)",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare insert_key_value SQL statement: {}",
                    err
                )))
            }
        };
        let timestamp = Utc::now().timestamp();
        #[allow(clippy::expect_fun_call)]
        stmt.execute(params![key, data, &timestamp])
            .expect(&format!("Failed to insert key-value pair: {}", key));
        Ok(())
    }

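    /// Deletes a key-value pair by key.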
    pub fn delete_key_value(&self, conn: &Connection, key: &str) -> Result<(), DatastoreError> {
        conn.execute("DELETE FROM key_value WHERE key = ?1", &[key])
            .expect("Error deleting value from database");
        Ok(())
    }

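    /// Fetches a key-value pair by key, or `NoSuchKey` if it doesn't exist.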
    pub fn get_key_value(&self, conn: &Connection, key: &str) -> Result<KeyValue, DatastoreError> {
        let mut stmt = match conn.prepare(
            "
                SELECT * FROM key_value WHERE KEY = ?1",
        ) {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare get_key_value SQL statement: {}",
                    err
                )))
            }
        };

        match stmt.query_row(&[key], |row| {
            Ok(KeyValue {
                key: row.get(0)?,
                value: row.get(1)?,
                timestamp: Some(DateTime::from_utc(
                    NaiveDateTime::from_timestamp(row.get(2)?, 0),
                    Utc,
                )),
            })
        }) {
            Ok(result) => Ok(result),
            Err(err) => match err {
                rusqlite::Error::QueryReturnedNoRows => {
                    Err(DatastoreError::NoSuchKey(key.to_string()))
                }
                _ => Err(DatastoreError::InternalError(format!(
                    "Get value query failed for key {}",
                    key
                ))),
            },
        }
    }

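    /// Returns all keys in the key-value store matching the given SQL LIKE pattern
    /// (e.g. a prefix search such as "somekey.%").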
    pub fn get_keys_starting(
        &self,
        conn: &Connection,
        pattern: &str,
    ) -> Result<Vec<String>, DatastoreError> {
        let mut stmt = match conn.prepare("SELECT key FROM key_value WHERE key LIKE ?") {
            Ok(stmt) => stmt,
            Err(err) => {
                return Err(DatastoreError::InternalError(format!(
                    "Failed to prepare get_keys_starting SQL statement: {}",
                    err
                )))
            }
        };

        let mut output = Vec::<String>::new();
        // Rusqlite's `get` takes the column index and needs the expected item type annotated.
        let result = stmt.query_map(&[pattern], |row| row.get::<usize, String>(0));
        match result {
            Ok(keys) => {
                for row in keys {
                    // Unwrap to String or panic on SQL row if type is invalid. Can't happen with a
                    // properly initialized table.
                    output.push(row.unwrap());
                }
                Ok(output)
            }
            Err(err) => match err {
                rusqlite::Error::QueryReturnedNoRows => {
                    Err(DatastoreError::NoSuchKey(pattern.to_string()))
                }
                _ => Err(DatastoreError::InternalError(format!(
                    "Failed to get key_value rows starting with pattern {}",
                    pattern
                ))),
            },
        }
    }
}