Skip to main content

astarte_device_sdk/store/sqlite/
mod.rs

1// This file is part of Astarte.
2//
3// Copyright 2023-2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Provides functionality for instantiating an Astarte sqlite database.
20
21use std::fmt::Debug;
22use std::num::NonZero;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::Duration;
26
27use astarte_interfaces::schema::{MappingType, Ownership};
28use astarte_interfaces::{Properties, Schema};
29use rusqlite::ToSql;
30use rusqlite::types::{FromSql, FromSqlError};
31use serde::{Deserialize, Serialize};
32use statements::include_query;
33use tracing::{debug, error, info, instrument, trace};
34
35use self::connection::SqliteConnection;
36use self::pool::Connections;
37use super::{OptStoredProp, PropertyMapping, PropertyStore, StoreCapabilities, StoredProp};
38use crate::store::sqlite::options::SqliteOptions;
39use crate::{
40    store::PropertyState,
41    transport::mqtt::payload::{Payload, PayloadError},
42    types::{AstarteData, TypeError, de::BsonConverter},
43};
44
45pub(crate) mod connection;
46pub mod options;
47pub(crate) mod pool;
48pub(crate) mod statements;
49
50/// Milliseconds for the busy timeout
51///
52/// <https://www.sqlite.org/c3ref/busy_timeout.html>
53pub const SQLITE_BUSY_TIMEOUT: u16 = Duration::from_secs(5).as_millis() as u16;
54
55/// Cache size in kibibytes
56///
57/// <https://www.sqlite.org/pragma.html#pragma_cache_size>
58pub const SQLITE_CACHE_SIZE: i16 =
59    -(Size::MiB(NonZero::<u64>::new(2).unwrap()).to_kibibytes_ceil() as i16);
60/// Max journal size
61///
62/// The default value specidfied in <https://www.sqlite.org/pragma.html#pragma_journal_size_limit> is -1
63/// which does not set an effective limit, therefore we assume a default size of 64 mebibytes
64pub const SQLITE_JOURNAL_SIZE_LIMIT: Size = Size::MiB(NonZero::<u64>::new(64).unwrap());
65
66/// Default database size
67pub const SQLITE_DEFAULT_DB_MAX_SIZE: Size = Size::GiB(NonZero::<u64>::new(1).unwrap());
68
69/// SQLite maximum number of pages in the database.
70///
71/// <https://www.sqlite.org/limits.html>
72pub const SQLITE_MAX_PAGE_COUNT: u32 = 4294967294;
73
74/// SQLite auto checkpoint limit, a checkpoint will run whenever the log
75/// is equal or above this size.
76///
77/// <https://www.sqlite.org/pragma.html#pragma_wal_autocheckpoint>
78pub const SQLITE_WAL_AUTOCHECKPOINT: u32 = 1000;
79
80/// Maximum number of reader connections to create.
81///
82/// This is the default if we cannot access the available_parallelism
83pub const DEFAULT_MAX_READERS: NonZero<usize> = NonZero::<usize>::new(4).unwrap();
84
85/// Error returned by the [`SqliteStore`].
86#[non_exhaustive]
87#[derive(Debug, thiserror::Error)]
88pub enum SqliteError {
89    /// Error returned when the database connection fails.
90    #[error("could not connect to database")]
91    Connection(#[source] rusqlite::Error),
92    /// Couldn't set SQLite option.
93    #[error("couldn't set database option")]
94    Option(#[source] rusqlite::Error),
95    /// Couldn't prepare the SQLite statement.
96    #[error("couldn't prepare sqlite statement")]
97    Prepare(#[source] rusqlite::Error),
98    /// Couldn't start a transaction.
99    #[error("could not start a transaction database")]
100    Transaction(#[source] rusqlite::Error),
101    /// Couldn't run migration
102    #[error("couldn't run migration")]
103    Migration(#[source] rusqlite::Error),
104    /// Error returned when the database query fails.
105    #[error("could not execute query")]
106    Query(#[from] rusqlite::Error),
107    /// Couldn't convert the stored value.
108    #[error("couldn't convert the stored value")]
109    Value(#[from] ValueError),
110    /// Couldn't convert ownership value
111    #[error("could not deserialize ownership")]
112    Ownership(#[from] OwnershipError),
113    /// Couldn't set max size
114    #[error("couldn't set max size {ctx}")]
115    InvalidMaxSize {
116        /// Context of the error
117        ctx: &'static str,
118    },
119    /// Couldn't acquire a reader permit
120    #[error("couldn't acquire a reader permit")]
121    Reader,
122    /// Couldn't join the connection task
123    #[error("couldn't join the connection task")]
124    Join,
125    /// Couldn't convert passed input
126    #[error("couldn't convert passed input")]
127    Conversion(usize),
128}
129
130/// Error when converting a u8 into the [`Ownership`] struct.
131#[derive(Debug, thiserror::Error)]
132#[error("invalid ownership value {value}")]
133pub struct OwnershipError {
134    value: u8,
135}
136
137/// Ownership of a property.
138///
139/// The ownership is an enum stored as an single byte integer (u8) in the SQLite database, the values
140/// for the enum are:
141/// - **Device owned**: 0
142/// - **Server owned**: 1
143#[derive(Debug, Clone, Copy)]
144#[repr(u8)]
145enum RecordOwnership {
146    Device = 0,
147    Server = 1,
148}
149
150impl From<RecordOwnership> for Ownership {
151    fn from(value: RecordOwnership) -> Self {
152        match value {
153            RecordOwnership::Device => Ownership::Device,
154            RecordOwnership::Server => Ownership::Server,
155        }
156    }
157}
158
159impl From<Ownership> for RecordOwnership {
160    fn from(value: Ownership) -> Self {
161        match value {
162            Ownership::Device => RecordOwnership::Device,
163            Ownership::Server => RecordOwnership::Server,
164        }
165    }
166}
167
168impl ToSql for RecordOwnership {
169    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
170        Ok((*self as u8).into())
171    }
172}
173
174impl FromSql for RecordOwnership {
175    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
176        let value = u8::column_result(value)?;
177
178        match value {
179            0 => Ok(RecordOwnership::Device),
180            1 => Ok(RecordOwnership::Server),
181            _ => Err(FromSqlError::Other(OwnershipError { value }.into())),
182        }
183    }
184}
185
186/// Error when de/serializing a value stored in the [`SqliteStore`].
187#[non_exhaustive]
188#[derive(Debug, thiserror::Error)]
189pub enum ValueError {
190    /// Couldn't convert to AstarteData.
191    #[error("couldn't convert to AstarteData")]
192    Conversion(#[from] TypeError),
193    /// Couldn't decode the BSON buffer.
194    #[error("couldn't decode property from bson")]
195    Decode(#[source] PayloadError),
196    /// Couldn't encode the BSON buffer.
197    #[error("couldn't encode property from bson")]
198    Encode(#[source] PayloadError),
199    /// Unsupported [`AstarteData`].
200    #[error("unsupported property type {0}")]
201    UnsupportedType(&'static str),
202    /// Unsupported [`AstarteData`].
203    #[error("unsupported stored type {0}, expected [0-13]")]
204    StoredType(u8),
205}
206
207/// Dimension of the database
208#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
209#[serde(tag = "unit", content = "value")]
210pub enum Size {
211    /// Dimension expressed in KiloBytes
212    #[serde(rename = "kb")]
213    Kb(NonZero<u64>),
214    /// Dimension expressed in MegaBytes
215    #[serde(rename = "mb")]
216    Mb(NonZero<u64>),
217    /// Dimension expressed in GigaBytes
218    #[serde(rename = "gb")]
219    Gb(NonZero<u64>),
220    /// Dimension expressed in KibiBytes
221    #[serde(rename = "kib")]
222    KiB(NonZero<u64>),
223    /// Dimension expressed in MebiBytes
224    #[serde(rename = "mib")]
225    MiB(NonZero<u64>),
226    /// Dimension expressed in GibiBytes
227    #[serde(rename = "gib")]
228    GiB(NonZero<u64>),
229}
230
231impl Size {
232    const ONE: NonZero<u32> = NonZero::<u32>::new(1).unwrap();
233
234    const KB: NonZero<u64> = NonZero::<u64>::new(1000).unwrap();
235    const MB: NonZero<u64> = NonZero::<u64>::new(1000 * 1000).unwrap();
236    const GB: NonZero<u64> = NonZero::<u64>::new(1000 * 1000 * 1000).unwrap();
237    const KI_B: NonZero<u64> = NonZero::<u64>::new(1024).unwrap();
238    const MI_B: NonZero<u64> = NonZero::<u64>::new(1024 * 1024).unwrap();
239    const GI_B: NonZero<u64> = NonZero::<u64>::new(1024 * 1024 * 1024).unwrap();
240
241    /// Convert the size to bytes
242    const fn to_bytes(self) -> NonZero<u64> {
243        match self {
244            Size::Kb(kb) => kb.saturating_mul(Self::KB),
245            Size::Mb(mb) => mb.saturating_mul(Self::MB),
246            Size::Gb(gb) => gb.saturating_mul(Self::GB),
247            Size::KiB(kib) => kib.saturating_mul(Self::KI_B),
248            Size::MiB(mib) => mib.saturating_mul(Self::MI_B),
249            Size::GiB(gib) => gib.saturating_mul(Self::GI_B),
250        }
251    }
252
253    const fn to_kibibytes_ceil(self) -> u64 {
254        self.to_bytes().get().div_ceil(1024)
255    }
256
257    /// Approximate the max page count with the page size, with a minimum of 1 page
258    #[instrument]
259    fn into_page_count(self, page_size: NonZero<u64>) -> NonZero<u32> {
260        let value = u32::try_from(self.to_bytes().get().div_euclid(page_size.get()))
261            // default value
262            .unwrap_or(SQLITE_MAX_PAGE_COUNT);
263
264        trace!(pages = value, "calculated pages");
265
266        // we must have at least one page
267        NonZero::<u32>::new(value).unwrap_or(Self::ONE)
268    }
269
270    /// Calculate the into_wall_autocheckpoint page count to be at 1/10 of the journal_size_limit
271    ///  if it's less than 1000 pages.
272    #[instrument]
273    fn into_wall_autocheckpoint(self, page_size: NonZero<u64>) -> NonZero<u32> {
274        let journal_pages = self.into_page_count(page_size);
275
276        let pages = journal_pages
277            .get()
278            .div_euclid(10)
279            // upper bound
280            .min(SQLITE_WAL_AUTOCHECKPOINT);
281
282        trace!(pages, "calculated pages");
283
284        // we must have at least one page
285        NonZero::<u32>::new(pages).unwrap_or(Self::ONE)
286    }
287}
288
289/// Result of the load_prop query
290#[derive(Clone)]
291struct PropRecord {
292    value: Option<Vec<u8>>,
293    stored_type: u8,
294    interface_major: i32,
295}
296
297impl PropRecord {
298    fn try_into_value(self) -> Result<Option<AstarteData>, ValueError> {
299        self.value
300            .map(|value| deserialize_prop(self.stored_type, &value))
301            .transpose()
302    }
303}
304
305impl Debug for PropRecord {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        use itertools::Itertools;
308
309        // Print the value as an hex string instead of an array of numbers
310
311        let mut d = f.debug_struct("PropRecord");
312
313        d.field("interface_major", &self.interface_major);
314
315        match &self.value {
316            Some(value) => {
317                let hex_value = value
318                    .iter()
319                    .format_with("", |element, f| f(&format_args!("Some({element:x})")));
320
321                d.field("value", &format_args!("{hex_value}"))
322            }
323            None => d.field("value", &self.value),
324        }
325        .finish()
326    }
327}
328
329/// Error when converting a u8 into the [`PropertyState`] struct.
330#[derive(Debug, thiserror::Error)]
331#[error("invalid property state value {value}")]
332pub struct PropertyStateError {
333    value: u8,
334}
335
336#[derive(Debug, Copy, Clone)]
337#[repr(u8)]
338enum RecordPropertyState {
339    Changed = 0,
340    Completed = 1,
341}
342
343impl From<PropertyState> for RecordPropertyState {
344    fn from(value: PropertyState) -> Self {
345        match value {
346            PropertyState::Changed => Self::Changed,
347            PropertyState::Completed => Self::Completed,
348        }
349    }
350}
351
352impl From<RecordPropertyState> for PropertyState {
353    fn from(value: RecordPropertyState) -> Self {
354        match value {
355            RecordPropertyState::Changed => Self::Changed,
356            RecordPropertyState::Completed => Self::Completed,
357        }
358    }
359}
360
361impl ToSql for RecordPropertyState {
362    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
363        Ok((*self as u8).into())
364    }
365}
366
367impl FromSql for RecordPropertyState {
368    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
369        let value = u8::column_result(value)?;
370
371        match value {
372            0 => Ok(RecordPropertyState::Changed),
373            1 => Ok(RecordPropertyState::Completed),
374            _ => Err(FromSqlError::Other(PropertyStateError { value }.into())),
375        }
376    }
377}
378
379/// Result of the load_all_props query
380#[derive(Debug, Clone)]
381struct StoredRecord {
382    interface: String,
383    path: String,
384    value: Option<Vec<u8>>,
385    stored_type: u8,
386    interface_major: i32,
387    ownership: RecordOwnership,
388}
389
390impl StoredRecord {
391    pub(crate) fn try_into_prop(self) -> Result<Option<StoredProp>, SqliteError> {
392        let Some(value) = self.value else {
393            return Ok(None);
394        };
395
396        let value = deserialize_prop(self.stored_type, &value)?;
397
398        Ok(Some(StoredProp {
399            interface: self.interface,
400            path: self.path,
401            value,
402            interface_major: self.interface_major,
403            ownership: self.ownership.into(),
404        }))
405    }
406}
407
408impl TryFrom<StoredRecord> for OptStoredProp {
409    type Error = SqliteError;
410
411    fn try_from(record: StoredRecord) -> Result<Self, Self::Error> {
412        let value = record
413            .value
414            .map(|value| deserialize_prop(record.stored_type, &value))
415            .transpose()?;
416
417        Ok(Self {
418            interface: record.interface,
419            path: record.path,
420            value,
421            interface_major: record.interface_major,
422            ownership: record.ownership.into(),
423        })
424    }
425}
426
427fn into_stored_type(value: &AstarteData) -> Result<u8, ValueError> {
428    let mapping_type = match value {
429        AstarteData::Double(_) => 1,
430        AstarteData::Integer(_) => 2,
431        AstarteData::Boolean(_) => 3,
432        AstarteData::LongInteger(_) => 4,
433        AstarteData::String(_) => 5,
434        AstarteData::BinaryBlob(_) => 6,
435        AstarteData::DateTime(_) => 7,
436        AstarteData::DoubleArray(_) => 8,
437        AstarteData::IntegerArray(_) => 9,
438        AstarteData::BooleanArray(_) => 10,
439        AstarteData::LongIntegerArray(_) => 11,
440        AstarteData::StringArray(_) => 12,
441        AstarteData::BinaryBlobArray(_) => 13,
442        AstarteData::DateTimeArray(_) => 14,
443    };
444
445    Ok(mapping_type)
446}
447
448fn from_stored_type(value: u8) -> Result<MappingType, ValueError> {
449    let mapping_type = match value {
450        1 => MappingType::Double,
451        2 => MappingType::Integer,
452        3 => MappingType::Boolean,
453        4 => MappingType::LongInteger,
454        5 => MappingType::String,
455        6 => MappingType::BinaryBlob,
456        7 => MappingType::DateTime,
457        8 => MappingType::DoubleArray,
458        9 => MappingType::IntegerArray,
459        10 => MappingType::BooleanArray,
460        11 => MappingType::LongIntegerArray,
461        12 => MappingType::StringArray,
462        13 => MappingType::BinaryBlobArray,
463        14 => MappingType::DateTimeArray,
464        0 | 15.. => {
465            return Err(ValueError::StoredType(value));
466        }
467    };
468
469    Ok(mapping_type)
470}
471
472/// Data structure providing an implementation of a sqlite database.
473///
474/// Can be used by an Astarte device to store permanently properties values and published with
475/// retention stored.
476///
477/// The properties are stored as a BSON serialized SQLite BLOB. That can be then deserialized in the
478/// respective [`AstarteData`].
479///
480/// The retention is stored as a BLOB serialized by the connection.
481#[derive(Clone, Debug)]
482pub struct SqliteStore {
483    pub(crate) pool: Arc<Connections>,
484}
485
486impl SqliteStore {
487    /// Configures the SQLite connection
488    pub fn options() -> SqliteOptions {
489        SqliteOptions::default()
490    }
491
492    /// Creates a SQLite database for the Astarte device.
493    async fn new(db_file: PathBuf, options: SqliteOptions) -> Result<Self, SqliteError> {
494        let sqlite_store = SqliteStore {
495            pool: Arc::new(Connections::new(db_file, options)),
496        };
497
498        sqlite_store.migrate().await?;
499
500        debug!("vacuum the database");
501
502        sqlite_store
503            .pool
504            .acquire_writer(|writer| {
505                writer
506                    .execute("PRAGMA incremental_vacuum", ())
507                    .map_err(SqliteError::Option)
508            })
509            .await?;
510
511        Ok(sqlite_store)
512    }
513
514    /// Connect to the SQLite database using the default db name in the writable path.
515    ///
516    /// # Example
517    ///
518    /// ```no_run
519    /// # use astarte_device_sdk::store::sqlite::{SqliteStore, options::SqliteOptions};
520    ///
521    /// #[tokio::main]
522    /// async fn main() {
523    ///     let store = SqliteStore::with_writable_dir("/val/lib/astarte/", SqliteOptions::default())
524    ///         .await
525    ///         .expect("should connect");
526    /// }
527    /// ```
528    pub async fn with_writable_dir(
529        writable_path: impl AsRef<Path>,
530        options: SqliteOptions,
531    ) -> Result<Self, SqliteError> {
532        let path = writable_path.as_ref();
533
534        if let Err(error) = tokio::fs::create_dir_all(path).await {
535            error!(%error,path = %path.display(), "couldn't create writable path for database");
536        }
537
538        // TODO: rename this since it doesn't store only properties
539        let db = path.join("prop-cache.db");
540
541        Self::new(db, options).await
542    }
543
544    /// Connect to the SQLite database give as a filename.
545    ///
546    /// # Example
547    ///
548    /// ```no_run
549    /// # use astarte_device_sdk::store::sqlite::{SqliteStore, options::SqliteOptions};
550    ///
551    /// #[tokio::main]
552    /// async fn main() {
553    ///     let store = SqliteStore::with_db_file("/val/lib/astarte/store.db", SqliteOptions::default()).await.unwrap();
554    /// }
555    /// ```
556    pub async fn with_db_file(
557        database_file: impl AsRef<Path>,
558        options: SqliteOptions,
559    ) -> Result<Self, SqliteError> {
560        Self::new(database_file.as_ref().to_path_buf(), options).await
561    }
562
563    #[instrument(skip(self))]
564    async fn migrate(&self) -> Result<(), SqliteError> {
565        // Order is important
566        const MIGRATIONS: &[&str] = &[
567            include_query!("migrations/0001_init.sql"),
568            include_query!("migrations/0002_unset_property.sql"),
569            include_query!("migrations/0003_session.sql"),
570            include_query!("migrations/0004_sent_properties.sql"),
571        ];
572        const USER_VERSION: u32 = {
573            assert!(MIGRATIONS.len() < (u32::MAX as usize));
574
575            MIGRATIONS.len() as u32
576        };
577
578        self.pool
579            .acquire_writer(|writer| -> Result<(), SqliteError> {
580                // re-run migrations on error
581                let version: usize = writer
582                    .get_pragma::<u32>("user_version")
583                    .ok()
584                    .and_then(|value| usize::try_from(value).ok())
585                    .unwrap_or(0);
586
587                debug!(
588                    current = version,
589                    migrations = MIGRATIONS.len(),
590                    "checking migrations"
591                );
592
593                if version >= MIGRATIONS.len() {
594                    info!("no migration to run");
595
596                    return Ok(());
597                }
598
599                for migration in &MIGRATIONS[version..] {
600                    writer
601                        .execute_batch(migration)
602                        .map_err(SqliteError::Migration)?;
603                }
604
605                debug!(version = MIGRATIONS.len(), "setting new database version");
606
607                writer.set_pragma("user_version", &USER_VERSION)?;
608
609                info!("store migrated to new version");
610
611                Ok(())
612            })
613            .await?;
614
615        Ok(())
616    }
617}
618
619impl StoreCapabilities for SqliteStore {
620    type Retention = Self;
621    type Session = Self;
622
623    fn get_retention(&self) -> Option<&Self::Retention> {
624        Some(self)
625    }
626
627    fn get_session(&self) -> Option<&Self::Session> {
628        Some(self)
629    }
630}
631
632impl PropertyStore for SqliteStore {
633    type Err = SqliteError;
634
635    async fn store_prop(&self, prop: StoredProp<&str, &AstarteData>) -> Result<(), Self::Err> {
636        trace!(
637            interface = prop.interface,
638            path = prop.path,
639            "storing property",
640        );
641
642        let buf = Payload::new(prop.value)
643            .to_vec()
644            .map_err(ValueError::Encode)?;
645
646        let prop = StoredProp::<String, AstarteData>::from(prop);
647        self.pool
648            .acquire_writer(move |writer| writer.store_prop((&prop).into(), &buf))
649            .await?;
650
651        Ok(())
652    }
653
654    async fn update_state(
655        &self,
656        property: &PropertyMapping<'_>,
657        state: PropertyState,
658        expected: Option<AstarteData>,
659    ) -> Result<bool, Self::Err> {
660        let interface_name = property.interface_name().to_string();
661        let path = property.path().to_string();
662
663        let updated = self
664            .pool
665            .acquire_writer(move |writer| {
666                writer.update_state(&interface_name, &path, expected.as_ref(), state)
667            })
668            .await?;
669
670        Ok(updated > 0)
671    }
672
673    async fn load_prop(
674        &self,
675        property: &PropertyMapping<'_>,
676    ) -> Result<Option<AstarteData>, Self::Err> {
677        let interface_name = property.interface_name().to_string();
678        let path = property.path().to_string();
679
680        let opt_record = self
681            .pool
682            .acquire_reader(move |reader| reader.load_prop(&interface_name, &path))
683            .await?;
684
685        match opt_record {
686            Some(record) => {
687                trace!(
688                    interface = property.interface_name(),
689                    path = property.path(),
690                    "loaded property",
691                );
692
693                // if version mismatch, delete
694                if record.interface_major != property.version_major() {
695                    error!(
696                        "Version mismatch for property {}{} (stored {}, interface {}). Deleting.",
697                        property.interface_name(),
698                        property.path(),
699                        record.interface_major,
700                        property.version_major()
701                    );
702
703                    self.delete_prop(property).await?;
704
705                    return Ok(None);
706                }
707
708                record.try_into_value().map_err(SqliteError::Value)
709            }
710            None => Ok(None),
711        }
712    }
713
714    async fn unset_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
715        let interface_name = property.interface_name().to_string();
716        let path = property.path().to_string();
717        self.pool
718            .acquire_writer(move |writer| writer.unset_prop(&interface_name, &path))
719            .await
720    }
721
722    async fn delete_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
723        let interface_name = property.interface_name().to_string();
724        let path = property.path().to_string();
725        self.pool
726            .acquire_writer(move |writer| writer.delete_prop(&interface_name, &path))
727            .await
728    }
729
730    async fn delete_expected_prop(
731        &self,
732        property: &PropertyMapping<'_>,
733        expected: Option<AstarteData>,
734    ) -> Result<bool, Self::Err> {
735        let interface_name = property.interface_name().to_string();
736        let path = property.path().to_string();
737
738        let updated = self
739            .pool
740            .acquire_writer(move |writer| {
741                writer.delete_expected_prop(&interface_name, &path, expected.as_ref())
742            })
743            .await?;
744
745        Ok(updated > 0)
746    }
747
748    async fn clear(&self) -> Result<(), Self::Err> {
749        self.pool
750            .acquire_writer(|writer| writer.clear_props())
751            .await
752    }
753
754    async fn load_all_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
755        self.pool
756            .acquire_reader(|reader| reader.load_all_props())
757            .await
758    }
759
760    async fn device_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
761        self.pool
762            .acquire_reader(|reader| reader.props_with_ownership(Ownership::Device))
763            .await
764    }
765
766    async fn server_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
767        self.pool
768            .acquire_reader(|reader| reader.props_with_ownership(Ownership::Server))
769            .await
770    }
771
772    async fn interface_props(&self, interface: &Properties) -> Result<Vec<StoredProp>, Self::Err> {
773        let interface_name = interface.name().to_string();
774
775        self.pool
776            .acquire_reader(move |reader| reader.interface_props(&interface_name))
777            .await
778    }
779
780    async fn delete_interface(&self, interface: &Properties) -> Result<(), Self::Err> {
781        let interface_name = interface.name().to_string();
782
783        self.pool
784            .acquire_writer(move |writer| writer.delete_interface_props(&interface_name))
785            .await
786    }
787
788    async fn device_props_with_unset(
789        &self,
790        state: PropertyState,
791        limit: usize,
792        offset: usize,
793    ) -> Result<Vec<OptStoredProp>, Self::Err> {
794        self.pool
795            .acquire_reader(move |reader| {
796                reader.props_with_unset(Ownership::Device, state, limit, offset)
797            })
798            .await
799    }
800
801    async fn reset_state(&self, ownership: Ownership) -> Result<(), Self::Err> {
802        self.pool
803            .acquire_writer(move |writer| writer.reset_state(ownership))
804            .await
805    }
806}
807
808/// Deserialize a property from the store.
809fn deserialize_prop(stored_type: u8, buf: &[u8]) -> Result<AstarteData, ValueError> {
810    let mapping_type = from_stored_type(stored_type)?;
811
812    let payload = Payload::from_slice(buf).map_err(ValueError::Decode)?;
813    let value = BsonConverter::new(mapping_type, payload.value);
814
815    value.try_into().map_err(ValueError::from)
816}
817
818#[cfg(test)]
819mod tests {
820    use pretty_assertions::assert_eq;
821
822    use super::*;
823    use crate::store::tests::test_property_store;
824
825    #[tokio::test]
826    async fn test_sqlite_store() {
827        let dir = tempfile::tempdir().unwrap();
828
829        let db = SqliteStore::options()
830            .with_writable_dir(dir.path())
831            .await
832            .unwrap();
833
834        test_property_store(db).await;
835    }
836
837    #[tokio::test]
838    async fn multiple_db_per_thread() {
839        let dir1 = tempfile::tempdir().unwrap();
840        let dir2 = tempfile::tempdir().unwrap();
841
842        let db1 = SqliteStore::options()
843            .with_writable_dir(dir1.path())
844            .await
845            .unwrap();
846
847        let test = |store: SqliteStore| async move {
848            let value = AstarteData::Integer(42);
849            let prop = StoredProp {
850                interface: "com.test",
851                path: "/test",
852                value: &value,
853                interface_major: 1,
854                ownership: Ownership::Device,
855            };
856            let prop_interface_data = PropertyMapping::from(&prop);
857
858            store.store_prop(prop).await.unwrap();
859            assert_eq!(
860                store
861                    .load_prop(&prop_interface_data)
862                    .await
863                    .unwrap()
864                    .unwrap(),
865                value
866            );
867        };
868
869        (test)(db1).await;
870
871        let db2 = SqliteStore::options()
872            .with_writable_dir(dir2.path())
873            .await
874            .unwrap();
875
876        (test)(db2).await;
877    }
878
879    #[tokio::test]
880    async fn set_max_pages_cannot_shrink() {
881        let dir = tempfile::tempdir().unwrap();
882
883        {
884            let db = SqliteStore::options()
885                .with_writable_dir(dir.path())
886                .await
887                .unwrap();
888
889            let page_size: i64 = db
890                .pool
891                .acquire_writer(|writer| writer.get_pragma("page_size"))
892                .await
893                .unwrap();
894
895            db.store_prop(StoredProp {
896                interface: "interface",
897                path: "/path",
898                value: &AstarteData::BinaryBlob(vec![1; usize::try_from(page_size).unwrap() * 3]),
899                interface_major: 0,
900                ownership: Ownership::Device,
901            })
902            .await
903            .unwrap();
904        }
905
906        let err = SqliteStore::options()
907            .set_max_page_count(NonZero::new(1).unwrap())
908            .with_writable_dir(dir.path())
909            .await
910            .unwrap_err();
911
912        assert!(matches!(
913            err,
914            SqliteError::InvalidMaxSize {
915                ctx,
916            } if ctx == "cannot shrink the database"
917        ));
918    }
919
920    #[tokio::test]
921    async fn store_cannot_exceed_max_pages() {
922        let dir = tempfile::tempdir().unwrap();
923
924        let (page_size, page_count) = {
925            let db = SqliteStore::options()
926                .with_writable_dir(dir.path())
927                .await
928                .unwrap();
929
930            db.pool
931                .acquire_writer(|writer| -> Result<_, SqliteError> {
932                    let size = writer.get_pragma::<i64>("page_size")?;
933                    let count = writer.get_pragma::<i64>("page_count")?;
934                    Ok((size, count))
935                })
936                .await
937                .unwrap()
938        };
939
940        let max_page_count = u32::try_from(page_count)
941            .ok()
942            .and_then(NonZero::new)
943            .unwrap();
944
945        let db = SqliteStore::options()
946            .set_max_page_count(max_page_count)
947            .with_writable_dir(dir.path())
948            .await
949            .unwrap();
950
951        let size = usize::try_from(page_size * page_count + 1).unwrap();
952
953        let err = db
954            .store_prop(StoredProp {
955                interface: "interface",
956                path: "/path",
957                value: &AstarteData::BinaryBlob(vec![1; size]),
958                interface_major: 0,
959                ownership: Ownership::Device,
960            })
961            .await
962            .unwrap_err();
963
964        assert!(matches!(
965            err,
966            SqliteError::Query(err) if err.sqlite_error_code() == Some(rusqlite::ErrorCode::DiskFull)
967        ));
968    }
969
970    #[tokio::test]
971    async fn set_max_pages() {
972        let exp_count = 15;
973        let max = NonZero::new(exp_count).unwrap();
974
975        let dir = tempfile::tempdir().unwrap();
976
977        let db = SqliteStore::options()
978            .set_max_page_count(max)
979            .with_writable_dir(dir.path())
980            .await
981            .unwrap();
982
983        let page_count: u32 = db
984            .pool
985            .acquire_writer(|writer| writer.get_pragma("max_page_count"))
986            .await
987            .unwrap();
988
989        assert_eq!(page_count, exp_count);
990    }
991
992    #[tokio::test]
993    async fn set_db_max_size() {
994        let dir = tempfile::tempdir().unwrap();
995
996        let db = SqliteStore::options()
997            .set_db_max_size(Size::MiB(NonZero::new(4).unwrap()))
998            .with_writable_dir(dir.path())
999            .await
1000            .unwrap();
1001
1002        let max_page_count: u32 = db
1003            .pool
1004            .acquire_writer(|writer| writer.get_pragma("max_page_count"))
1005            .await
1006            .unwrap();
1007
1008        assert_eq!(max_page_count, 1024);
1009    }
1010
1011    #[tokio::test]
1012    async fn set_db_max_size_min() {
1013        let dir = tempfile::tempdir().unwrap();
1014
1015        // set the max size considering the default page size of 4096 bytes
1016        // NOTE since the limit is set after the database is created we can't shrink an
1017        // already created database this means that setting a 1KB limit is currently not supported
1018        // even a 1 page limit (4096B) would not work
1019        let size = Size::Kb(NonZero::<u64>::new(1).unwrap());
1020
1021        SqliteStore::options()
1022            .set_db_max_size(size)
1023            .with_writable_dir(dir.path())
1024            .await
1025            .unwrap_err();
1026    }
1027
1028    #[test]
1029    fn size_to_kibibytes_ceil_min() {
1030        let size = Size::Kb(NonZero::<u64>::new(1).unwrap());
1031        assert_eq!(size.to_kibibytes_ceil(), 1);
1032    }
1033
1034    #[tokio::test]
1035    async fn set_journal_size_limit() {
1036        let dir = tempfile::tempdir().unwrap();
1037
1038        let size = Size::MiB(NonZero::<u64>::new(1).unwrap());
1039
1040        let db = SqliteStore::options()
1041            .set_journal_size_limit(size)
1042            .with_writable_dir(dir.path())
1043            .await
1044            .unwrap();
1045
1046        let journal_size: i64 = db
1047            .pool
1048            .acquire_writer(|writer| writer.get_pragma("journal_size_limit"))
1049            .await
1050            .unwrap();
1051        assert_eq!(journal_size, 1024 * 1024);
1052
1053        let wal_autocheckpoint: u32 = db
1054            .pool
1055            .acquire_writer(|writer| writer.get_pragma("wal_autocheckpoint"))
1056            .await
1057            .unwrap();
1058
1059        // autocheckpoin is set to a fraction of the journal_size in pages (pages / 10)
1060        // in this case
1061        //
1062        // 1MiB / 4096 = 256
1063        // 256 / 10 = 25
1064        assert_eq!(wal_autocheckpoint, 25);
1065    }
1066
1067    #[tokio::test]
1068    async fn set_journal_size_limit_min() {
1069        let dir = tempfile::tempdir().unwrap();
1070
1071        let size = Size::Kb(NonZero::<u64>::new(1).unwrap());
1072
1073        let db = SqliteStore::options()
1074            .set_journal_size_limit(size)
1075            .with_writable_dir(dir.path())
1076            .await
1077            .unwrap();
1078
1079        let journal_size: u32 = db
1080            .pool
1081            .acquire_writer(|writer| writer.get_pragma("journal_size_limit"))
1082            .await
1083            .unwrap();
1084
1085        assert_eq!(journal_size, 1000);
1086
1087        let wal_autocheckpoint: u32 = db
1088            .pool
1089            .acquire_writer(|writer| writer.get_pragma("wal_autocheckpoint"))
1090            .await
1091            .unwrap();
1092
1093        // autocheckpoin is set to a fraction of the journal_size in pages
1094        // in this case the size in pages is 0
1095        //
1096        // (1KB / 4096 = 0) but the minimum we allow is 1
1097        assert_eq!(wal_autocheckpoint, 1);
1098    }
1099
1100    #[test]
1101    fn size_to_bytes() {
1102        let size = Size::Kb(NonZero::<u64>::new(1).unwrap());
1103        assert_eq!(size.to_bytes().get(), 1000);
1104
1105        let size = Size::Mb(NonZero::<u64>::new(1).unwrap());
1106        assert_eq!(size.to_bytes().get(), 1000 * 1000);
1107
1108        let size = Size::Gb(NonZero::<u64>::new(1).unwrap());
1109        assert_eq!(size.to_bytes().get(), 1000 * 1000 * 1000);
1110    }
1111
1112    #[test]
1113    fn size_to_kib() {
1114        let size = Size::KiB(NonZero::<u64>::new(1).unwrap());
1115        assert_eq!(size.to_bytes().get(), 1024);
1116
1117        let size = Size::MiB(NonZero::<u64>::new(1).unwrap());
1118        assert_eq!(size.to_bytes().get(), 1024 * 1024);
1119
1120        let size = Size::GiB(NonZero::<u64>::new(1).unwrap());
1121        assert_eq!(size.to_bytes().get(), 1024 * 1024 * 1024);
1122    }
1123
1124    #[test]
1125    fn should_serialize_deserialize_size() {
1126        let expected = r#"{"unit":"gb","value":2}"#;
1127
1128        let size = Size::Gb(2.try_into().unwrap());
1129        let out = serde_json::to_string(&size).unwrap();
1130
1131        let deser_size: Size = serde_json::from_str(&out).unwrap();
1132
1133        assert_eq!(out, expected);
1134        assert_eq!(size, deser_size);
1135    }
1136}