1use std::fmt::Debug;
22use std::num::NonZero;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::Duration;
26
27use astarte_interfaces::schema::{MappingType, Ownership};
28use astarte_interfaces::{Properties, Schema};
29use rusqlite::ToSql;
30use rusqlite::types::{FromSql, FromSqlError};
31use serde::{Deserialize, Serialize};
32use statements::include_query;
33use tracing::{debug, error, info, instrument, trace};
34
35use self::connection::SqliteConnection;
36use self::pool::Connections;
37use super::{OptStoredProp, PropertyMapping, PropertyStore, StoreCapabilities, StoredProp};
38use crate::store::sqlite::options::SqliteOptions;
39use crate::{
40 store::PropertyState,
41 transport::mqtt::payload::{Payload, PayloadError},
42 types::{AstarteData, TypeError, de::BsonConverter},
43};
44
45pub(crate) mod connection;
46pub mod options;
47pub(crate) mod pool;
48pub(crate) mod statements;
49
/// Busy timeout for the database, expressed in milliseconds (5 seconds).
///
/// NOTE(review): presumably applied through SQLite's `busy_timeout` setting by the connection
/// code; confirm against the `connection` module.
pub const SQLITE_BUSY_TIMEOUT: u16 = {
    const TIMEOUT: Duration = Duration::from_secs(5);

    // 5000 ms fits in a `u16`, so the narrowing cast cannot truncate.
    TIMEOUT.as_millis() as u16
};
54
55pub const SQLITE_CACHE_SIZE: i16 =
59 -(Size::MiB(NonZero::<u64>::new(2).unwrap()).to_kibibytes_ceil() as i16);
60pub const SQLITE_JOURNAL_SIZE_LIMIT: Size = Size::MiB(NonZero::<u64>::new(64).unwrap());
65
66pub const SQLITE_DEFAULT_DB_MAX_SIZE: Size = Size::GiB(NonZero::<u64>::new(1).unwrap());
68
/// Upper bound for the number of pages of the database: `4294967294` (`u32::MAX - 1`).
pub const SQLITE_MAX_PAGE_COUNT: u32 = u32::MAX - 1;

/// Upper bound, in pages, for the WAL auto-checkpoint computed from the journal size.
pub const SQLITE_WAL_AUTOCHECKPOINT: u32 = 1_000;

/// Default maximum number of reader connections.
pub const DEFAULT_MAX_READERS: NonZero<usize> = NonZero::new(4).expect("4 is non-zero");
84
85#[non_exhaustive]
87#[derive(Debug, thiserror::Error)]
88pub enum SqliteError {
89 #[error("could not connect to database")]
91 Connection(#[source] rusqlite::Error),
92 #[error("couldn't set database option")]
94 Option(#[source] rusqlite::Error),
95 #[error("couldn't prepare sqlite statement")]
97 Prepare(#[source] rusqlite::Error),
98 #[error("could not start a transaction database")]
100 Transaction(#[source] rusqlite::Error),
101 #[error("couldn't run migration")]
103 Migration(#[source] rusqlite::Error),
104 #[error("could not execute query")]
106 Query(#[from] rusqlite::Error),
107 #[error("couldn't convert the stored value")]
109 Value(#[from] ValueError),
110 #[error("could not deserialize ownership")]
112 Ownership(#[from] OwnershipError),
113 #[error("couldn't set max size {ctx}")]
115 InvalidMaxSize {
116 ctx: &'static str,
118 },
119 #[error("couldn't acquire a reader permit")]
121 Reader,
122 #[error("couldn't join the connection task")]
124 Join,
125 #[error("couldn't convert passed input")]
127 Conversion(usize),
128}
129
130#[derive(Debug, thiserror::Error)]
132#[error("invalid ownership value {value}")]
133pub struct OwnershipError {
134 value: u8,
135}
136
/// Ownership of a property as it is stored in the database.
///
/// The explicit discriminants are the values written to and read from the database, so they
/// must not be changed.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
enum RecordOwnership {
    /// The property is owned by the device, stored as `0`.
    Device = 0,
    /// The property is owned by the server, stored as `1`.
    Server = 1,
}
149
150impl From<RecordOwnership> for Ownership {
151 fn from(value: RecordOwnership) -> Self {
152 match value {
153 RecordOwnership::Device => Ownership::Device,
154 RecordOwnership::Server => Ownership::Server,
155 }
156 }
157}
158
159impl From<Ownership> for RecordOwnership {
160 fn from(value: Ownership) -> Self {
161 match value {
162 Ownership::Device => RecordOwnership::Device,
163 Ownership::Server => RecordOwnership::Server,
164 }
165 }
166}
167
168impl ToSql for RecordOwnership {
169 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
170 Ok((*self as u8).into())
171 }
172}
173
174impl FromSql for RecordOwnership {
175 fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
176 let value = u8::column_result(value)?;
177
178 match value {
179 0 => Ok(RecordOwnership::Device),
180 1 => Ok(RecordOwnership::Server),
181 _ => Err(FromSqlError::Other(OwnershipError { value }.into())),
182 }
183 }
184}
185
186#[non_exhaustive]
188#[derive(Debug, thiserror::Error)]
189pub enum ValueError {
190 #[error("couldn't convert to AstarteData")]
192 Conversion(#[from] TypeError),
193 #[error("couldn't decode property from bson")]
195 Decode(#[source] PayloadError),
196 #[error("couldn't encode property from bson")]
198 Encode(#[source] PayloadError),
199 #[error("unsupported property type {0}")]
201 UnsupportedType(&'static str),
202 #[error("unsupported stored type {0}, expected [0-13]")]
204 StoredType(u8),
205}
206
207#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
209#[serde(tag = "unit", content = "value")]
210pub enum Size {
211 #[serde(rename = "kb")]
213 Kb(NonZero<u64>),
214 #[serde(rename = "mb")]
216 Mb(NonZero<u64>),
217 #[serde(rename = "gb")]
219 Gb(NonZero<u64>),
220 #[serde(rename = "kib")]
222 KiB(NonZero<u64>),
223 #[serde(rename = "mib")]
225 MiB(NonZero<u64>),
226 #[serde(rename = "gib")]
228 GiB(NonZero<u64>),
229}
230
231impl Size {
232 const ONE: NonZero<u32> = NonZero::<u32>::new(1).unwrap();
233
234 const KB: NonZero<u64> = NonZero::<u64>::new(1000).unwrap();
235 const MB: NonZero<u64> = NonZero::<u64>::new(1000 * 1000).unwrap();
236 const GB: NonZero<u64> = NonZero::<u64>::new(1000 * 1000 * 1000).unwrap();
237 const KI_B: NonZero<u64> = NonZero::<u64>::new(1024).unwrap();
238 const MI_B: NonZero<u64> = NonZero::<u64>::new(1024 * 1024).unwrap();
239 const GI_B: NonZero<u64> = NonZero::<u64>::new(1024 * 1024 * 1024).unwrap();
240
241 const fn to_bytes(self) -> NonZero<u64> {
243 match self {
244 Size::Kb(kb) => kb.saturating_mul(Self::KB),
245 Size::Mb(mb) => mb.saturating_mul(Self::MB),
246 Size::Gb(gb) => gb.saturating_mul(Self::GB),
247 Size::KiB(kib) => kib.saturating_mul(Self::KI_B),
248 Size::MiB(mib) => mib.saturating_mul(Self::MI_B),
249 Size::GiB(gib) => gib.saturating_mul(Self::GI_B),
250 }
251 }
252
253 const fn to_kibibytes_ceil(self) -> u64 {
254 self.to_bytes().get().div_ceil(1024)
255 }
256
257 #[instrument]
259 fn into_page_count(self, page_size: NonZero<u64>) -> NonZero<u32> {
260 let value = u32::try_from(self.to_bytes().get().div_euclid(page_size.get()))
261 .unwrap_or(SQLITE_MAX_PAGE_COUNT);
263
264 trace!(pages = value, "calculated pages");
265
266 NonZero::<u32>::new(value).unwrap_or(Self::ONE)
268 }
269
270 #[instrument]
273 fn into_wall_autocheckpoint(self, page_size: NonZero<u64>) -> NonZero<u32> {
274 let journal_pages = self.into_page_count(page_size);
275
276 let pages = journal_pages
277 .get()
278 .div_euclid(10)
279 .min(SQLITE_WAL_AUTOCHECKPOINT);
281
282 trace!(pages, "calculated pages");
283
284 NonZero::<u32>::new(pages).unwrap_or(Self::ONE)
286 }
287}
288
/// Value of a single property as read from the database.
#[derive(Clone)]
struct PropRecord {
    /// Encoded value of the property, `None` when no value is stored.
    value: Option<Vec<u8>>,
    /// Numeric identifier of the type of the stored value.
    stored_type: u8,
    /// Major version of the interface the property was stored with.
    interface_major: i32,
}
296
297impl PropRecord {
298 fn try_into_value(self) -> Result<Option<AstarteData>, ValueError> {
299 self.value
300 .map(|value| deserialize_prop(self.stored_type, &value))
301 .transpose()
302 }
303}
304
305impl Debug for PropRecord {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 use itertools::Itertools;
308
309 let mut d = f.debug_struct("PropRecord");
312
313 d.field("interface_major", &self.interface_major);
314
315 match &self.value {
316 Some(value) => {
317 let hex_value = value
318 .iter()
319 .format_with("", |element, f| f(&format_args!("Some({element:x})")));
320
321 d.field("value", &format_args!("{hex_value}"))
322 }
323 None => d.field("value", &self.value),
324 }
325 .finish()
326 }
327}
328
329#[derive(Debug, thiserror::Error)]
331#[error("invalid property state value {value}")]
332pub struct PropertyStateError {
333 value: u8,
334}
335
/// State of a property as it is stored in the database.
///
/// The explicit discriminants are the values written to and read from the database, so they
/// must not be changed.
#[repr(u8)]
#[derive(Debug, Copy, Clone)]
enum RecordPropertyState {
    /// Stored as `0`.
    Changed = 0,
    /// Stored as `1`.
    Completed = 1,
}
342
343impl From<PropertyState> for RecordPropertyState {
344 fn from(value: PropertyState) -> Self {
345 match value {
346 PropertyState::Changed => Self::Changed,
347 PropertyState::Completed => Self::Completed,
348 }
349 }
350}
351
352impl From<RecordPropertyState> for PropertyState {
353 fn from(value: RecordPropertyState) -> Self {
354 match value {
355 RecordPropertyState::Changed => Self::Changed,
356 RecordPropertyState::Completed => Self::Completed,
357 }
358 }
359}
360
361impl ToSql for RecordPropertyState {
362 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
363 Ok((*self as u8).into())
364 }
365}
366
367impl FromSql for RecordPropertyState {
368 fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
369 let value = u8::column_result(value)?;
370
371 match value {
372 0 => Ok(RecordPropertyState::Changed),
373 1 => Ok(RecordPropertyState::Completed),
374 _ => Err(FromSqlError::Other(PropertyStateError { value }.into())),
375 }
376 }
377}
378
379#[derive(Debug, Clone)]
381struct StoredRecord {
382 interface: String,
383 path: String,
384 value: Option<Vec<u8>>,
385 stored_type: u8,
386 interface_major: i32,
387 ownership: RecordOwnership,
388}
389
390impl StoredRecord {
391 pub(crate) fn try_into_prop(self) -> Result<Option<StoredProp>, SqliteError> {
392 let Some(value) = self.value else {
393 return Ok(None);
394 };
395
396 let value = deserialize_prop(self.stored_type, &value)?;
397
398 Ok(Some(StoredProp {
399 interface: self.interface,
400 path: self.path,
401 value,
402 interface_major: self.interface_major,
403 ownership: self.ownership.into(),
404 }))
405 }
406}
407
408impl TryFrom<StoredRecord> for OptStoredProp {
409 type Error = SqliteError;
410
411 fn try_from(record: StoredRecord) -> Result<Self, Self::Error> {
412 let value = record
413 .value
414 .map(|value| deserialize_prop(record.stored_type, &value))
415 .transpose()?;
416
417 Ok(Self {
418 interface: record.interface,
419 path: record.path,
420 value,
421 interface_major: record.interface_major,
422 ownership: record.ownership.into(),
423 })
424 }
425}
426
427fn into_stored_type(value: &AstarteData) -> Result<u8, ValueError> {
428 let mapping_type = match value {
429 AstarteData::Double(_) => 1,
430 AstarteData::Integer(_) => 2,
431 AstarteData::Boolean(_) => 3,
432 AstarteData::LongInteger(_) => 4,
433 AstarteData::String(_) => 5,
434 AstarteData::BinaryBlob(_) => 6,
435 AstarteData::DateTime(_) => 7,
436 AstarteData::DoubleArray(_) => 8,
437 AstarteData::IntegerArray(_) => 9,
438 AstarteData::BooleanArray(_) => 10,
439 AstarteData::LongIntegerArray(_) => 11,
440 AstarteData::StringArray(_) => 12,
441 AstarteData::BinaryBlobArray(_) => 13,
442 AstarteData::DateTimeArray(_) => 14,
443 };
444
445 Ok(mapping_type)
446}
447
448fn from_stored_type(value: u8) -> Result<MappingType, ValueError> {
449 let mapping_type = match value {
450 1 => MappingType::Double,
451 2 => MappingType::Integer,
452 3 => MappingType::Boolean,
453 4 => MappingType::LongInteger,
454 5 => MappingType::String,
455 6 => MappingType::BinaryBlob,
456 7 => MappingType::DateTime,
457 8 => MappingType::DoubleArray,
458 9 => MappingType::IntegerArray,
459 10 => MappingType::BooleanArray,
460 11 => MappingType::LongIntegerArray,
461 12 => MappingType::StringArray,
462 13 => MappingType::BinaryBlobArray,
463 14 => MappingType::DateTimeArray,
464 0 | 15.. => {
465 return Err(ValueError::StoredType(value));
466 }
467 };
468
469 Ok(mapping_type)
470}
471
472#[derive(Clone, Debug)]
482pub struct SqliteStore {
483 pub(crate) pool: Arc<Connections>,
484}
485
486impl SqliteStore {
487 pub fn options() -> SqliteOptions {
489 SqliteOptions::default()
490 }
491
492 async fn new(db_file: PathBuf, options: SqliteOptions) -> Result<Self, SqliteError> {
494 let sqlite_store = SqliteStore {
495 pool: Arc::new(Connections::new(db_file, options)),
496 };
497
498 sqlite_store.migrate().await?;
499
500 debug!("vacuum the database");
501
502 sqlite_store
503 .pool
504 .acquire_writer(|writer| {
505 writer
506 .execute("PRAGMA incremental_vacuum", ())
507 .map_err(SqliteError::Option)
508 })
509 .await?;
510
511 Ok(sqlite_store)
512 }
513
514 pub async fn with_writable_dir(
529 writable_path: impl AsRef<Path>,
530 options: SqliteOptions,
531 ) -> Result<Self, SqliteError> {
532 let path = writable_path.as_ref();
533
534 if let Err(error) = tokio::fs::create_dir_all(path).await {
535 error!(%error,path = %path.display(), "couldn't create writable path for database");
536 }
537
538 let db = path.join("prop-cache.db");
540
541 Self::new(db, options).await
542 }
543
544 pub async fn with_db_file(
557 database_file: impl AsRef<Path>,
558 options: SqliteOptions,
559 ) -> Result<Self, SqliteError> {
560 Self::new(database_file.as_ref().to_path_buf(), options).await
561 }
562
563 #[instrument(skip(self))]
564 async fn migrate(&self) -> Result<(), SqliteError> {
565 const MIGRATIONS: &[&str] = &[
567 include_query!("migrations/0001_init.sql"),
568 include_query!("migrations/0002_unset_property.sql"),
569 include_query!("migrations/0003_session.sql"),
570 include_query!("migrations/0004_sent_properties.sql"),
571 ];
572 const USER_VERSION: u32 = {
573 assert!(MIGRATIONS.len() < (u32::MAX as usize));
574
575 MIGRATIONS.len() as u32
576 };
577
578 self.pool
579 .acquire_writer(|writer| -> Result<(), SqliteError> {
580 let version: usize = writer
582 .get_pragma::<u32>("user_version")
583 .ok()
584 .and_then(|value| usize::try_from(value).ok())
585 .unwrap_or(0);
586
587 debug!(
588 current = version,
589 migrations = MIGRATIONS.len(),
590 "checking migrations"
591 );
592
593 if version >= MIGRATIONS.len() {
594 info!("no migration to run");
595
596 return Ok(());
597 }
598
599 for migration in &MIGRATIONS[version..] {
600 writer
601 .execute_batch(migration)
602 .map_err(SqliteError::Migration)?;
603 }
604
605 debug!(version = MIGRATIONS.len(), "setting new database version");
606
607 writer.set_pragma("user_version", &USER_VERSION)?;
608
609 info!("store migrated to new version");
610
611 Ok(())
612 })
613 .await?;
614
615 Ok(())
616 }
617}
618
619impl StoreCapabilities for SqliteStore {
620 type Retention = Self;
621 type Session = Self;
622
623 fn get_retention(&self) -> Option<&Self::Retention> {
624 Some(self)
625 }
626
627 fn get_session(&self) -> Option<&Self::Session> {
628 Some(self)
629 }
630}
631
632impl PropertyStore for SqliteStore {
633 type Err = SqliteError;
634
635 async fn store_prop(&self, prop: StoredProp<&str, &AstarteData>) -> Result<(), Self::Err> {
636 trace!(
637 interface = prop.interface,
638 path = prop.path,
639 "storing property",
640 );
641
642 let buf = Payload::new(prop.value)
643 .to_vec()
644 .map_err(ValueError::Encode)?;
645
646 let prop = StoredProp::<String, AstarteData>::from(prop);
647 self.pool
648 .acquire_writer(move |writer| writer.store_prop((&prop).into(), &buf))
649 .await?;
650
651 Ok(())
652 }
653
654 async fn update_state(
655 &self,
656 property: &PropertyMapping<'_>,
657 state: PropertyState,
658 expected: Option<AstarteData>,
659 ) -> Result<bool, Self::Err> {
660 let interface_name = property.interface_name().to_string();
661 let path = property.path().to_string();
662
663 let updated = self
664 .pool
665 .acquire_writer(move |writer| {
666 writer.update_state(&interface_name, &path, expected.as_ref(), state)
667 })
668 .await?;
669
670 Ok(updated > 0)
671 }
672
673 async fn load_prop(
674 &self,
675 property: &PropertyMapping<'_>,
676 ) -> Result<Option<AstarteData>, Self::Err> {
677 let interface_name = property.interface_name().to_string();
678 let path = property.path().to_string();
679
680 let opt_record = self
681 .pool
682 .acquire_reader(move |reader| reader.load_prop(&interface_name, &path))
683 .await?;
684
685 match opt_record {
686 Some(record) => {
687 trace!(
688 interface = property.interface_name(),
689 path = property.path(),
690 "loaded property",
691 );
692
693 if record.interface_major != property.version_major() {
695 error!(
696 "Version mismatch for property {}{} (stored {}, interface {}). Deleting.",
697 property.interface_name(),
698 property.path(),
699 record.interface_major,
700 property.version_major()
701 );
702
703 self.delete_prop(property).await?;
704
705 return Ok(None);
706 }
707
708 record.try_into_value().map_err(SqliteError::Value)
709 }
710 None => Ok(None),
711 }
712 }
713
714 async fn unset_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
715 let interface_name = property.interface_name().to_string();
716 let path = property.path().to_string();
717 self.pool
718 .acquire_writer(move |writer| writer.unset_prop(&interface_name, &path))
719 .await
720 }
721
722 async fn delete_prop(&self, property: &PropertyMapping<'_>) -> Result<(), Self::Err> {
723 let interface_name = property.interface_name().to_string();
724 let path = property.path().to_string();
725 self.pool
726 .acquire_writer(move |writer| writer.delete_prop(&interface_name, &path))
727 .await
728 }
729
730 async fn delete_expected_prop(
731 &self,
732 property: &PropertyMapping<'_>,
733 expected: Option<AstarteData>,
734 ) -> Result<bool, Self::Err> {
735 let interface_name = property.interface_name().to_string();
736 let path = property.path().to_string();
737
738 let updated = self
739 .pool
740 .acquire_writer(move |writer| {
741 writer.delete_expected_prop(&interface_name, &path, expected.as_ref())
742 })
743 .await?;
744
745 Ok(updated > 0)
746 }
747
748 async fn clear(&self) -> Result<(), Self::Err> {
749 self.pool
750 .acquire_writer(|writer| writer.clear_props())
751 .await
752 }
753
754 async fn load_all_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
755 self.pool
756 .acquire_reader(|reader| reader.load_all_props())
757 .await
758 }
759
760 async fn device_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
761 self.pool
762 .acquire_reader(|reader| reader.props_with_ownership(Ownership::Device))
763 .await
764 }
765
766 async fn server_props(&self) -> Result<Vec<StoredProp>, Self::Err> {
767 self.pool
768 .acquire_reader(|reader| reader.props_with_ownership(Ownership::Server))
769 .await
770 }
771
772 async fn interface_props(&self, interface: &Properties) -> Result<Vec<StoredProp>, Self::Err> {
773 let interface_name = interface.name().to_string();
774
775 self.pool
776 .acquire_reader(move |reader| reader.interface_props(&interface_name))
777 .await
778 }
779
780 async fn delete_interface(&self, interface: &Properties) -> Result<(), Self::Err> {
781 let interface_name = interface.name().to_string();
782
783 self.pool
784 .acquire_writer(move |writer| writer.delete_interface_props(&interface_name))
785 .await
786 }
787
788 async fn device_props_with_unset(
789 &self,
790 state: PropertyState,
791 limit: usize,
792 offset: usize,
793 ) -> Result<Vec<OptStoredProp>, Self::Err> {
794 self.pool
795 .acquire_reader(move |reader| {
796 reader.props_with_unset(Ownership::Device, state, limit, offset)
797 })
798 .await
799 }
800
801 async fn reset_state(&self, ownership: Ownership) -> Result<(), Self::Err> {
802 self.pool
803 .acquire_writer(move |writer| writer.reset_state(ownership))
804 .await
805 }
806}
807
808fn deserialize_prop(stored_type: u8, buf: &[u8]) -> Result<AstarteData, ValueError> {
810 let mapping_type = from_stored_type(stored_type)?;
811
812 let payload = Payload::from_slice(buf).map_err(ValueError::Decode)?;
813 let value = BsonConverter::new(mapping_type, payload.value);
814
815 value.try_into().map_err(ValueError::from)
816}
817
#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::store::tests::test_property_store;

    /// Runs the shared property store tests against a store in a temporary directory.
    #[tokio::test]
    async fn test_sqlite_store() {
        let tmp = tempfile::tempdir().unwrap();

        let store = SqliteStore::options()
            .with_writable_dir(tmp.path())
            .await
            .unwrap();

        test_property_store(store).await;
    }

    /// Two independent databases can be opened and used one after the other.
    #[tokio::test]
    async fn multiple_db_per_thread() {
        /// Stores a property and checks that the same value is loaded back.
        async fn store_and_load(store: SqliteStore) {
            let value = AstarteData::Integer(42);
            let prop = StoredProp {
                interface: "com.test",
                path: "/test",
                value: &value,
                interface_major: 1,
                ownership: Ownership::Device,
            };
            let mapping = PropertyMapping::from(&prop);

            store.store_prop(prop).await.unwrap();

            let loaded = store.load_prop(&mapping).await.unwrap().unwrap();

            assert_eq!(loaded, value);
        }

        let first_dir = tempfile::tempdir().unwrap();
        let second_dir = tempfile::tempdir().unwrap();

        let first = SqliteStore::options()
            .with_writable_dir(first_dir.path())
            .await
            .unwrap();

        store_and_load(first).await;

        let second = SqliteStore::options()
            .with_writable_dir(second_dir.path())
            .await
            .unwrap();

        store_and_load(second).await;
    }

    /// Setting a max page count smaller than the existing database is an error.
    #[tokio::test]
    async fn set_max_pages_cannot_shrink() {
        let tmp = tempfile::tempdir().unwrap();

        // The store is dropped at the end of the scope, before the database is opened again.
        {
            let store = SqliteStore::options()
                .with_writable_dir(tmp.path())
                .await
                .unwrap();

            let page_size: i64 = store
                .pool
                .acquire_writer(|writer| writer.get_pragma("page_size"))
                .await
                .unwrap();

            // A blob of 3 pages makes sure the database is bigger than a single page.
            let blob_len = usize::try_from(page_size).unwrap() * 3;

            store
                .store_prop(StoredProp {
                    interface: "interface",
                    path: "/path",
                    value: &AstarteData::BinaryBlob(vec![1; blob_len]),
                    interface_major: 0,
                    ownership: Ownership::Device,
                })
                .await
                .unwrap();
        }

        let err = SqliteStore::options()
            .set_max_page_count(NonZero::new(1).unwrap())
            .with_writable_dir(tmp.path())
            .await
            .unwrap_err();

        assert!(matches!(
            err,
            SqliteError::InvalidMaxSize {
                ctx: "cannot shrink the database",
            }
        ));
    }

    /// Storing more than the max page count allows fails with a disk full error.
    #[tokio::test]
    async fn store_cannot_exceed_max_pages() {
        let tmp = tempfile::tempdir().unwrap();

        let (page_size, page_count) = {
            let store = SqliteStore::options()
                .with_writable_dir(tmp.path())
                .await
                .unwrap();

            store
                .pool
                .acquire_writer(|writer| -> Result<_, SqliteError> {
                    let size = writer.get_pragma::<i64>("page_size")?;
                    let count = writer.get_pragma::<i64>("page_count")?;
                    Ok((size, count))
                })
                .await
                .unwrap()
        };

        // Limit the database to the pages it already uses.
        let max_page_count = u32::try_from(page_count)
            .ok()
            .and_then(NonZero::new)
            .unwrap();

        let store = SqliteStore::options()
            .set_max_page_count(max_page_count)
            .with_writable_dir(tmp.path())
            .await
            .unwrap();

        // One byte more than the whole database can hold.
        let blob_len = usize::try_from(page_size * page_count + 1).unwrap();

        let err = store
            .store_prop(StoredProp {
                interface: "interface",
                path: "/path",
                value: &AstarteData::BinaryBlob(vec![1; blob_len]),
                interface_major: 0,
                ownership: Ownership::Device,
            })
            .await
            .unwrap_err();

        assert!(matches!(
            err,
            SqliteError::Query(err) if err.sqlite_error_code() == Some(rusqlite::ErrorCode::DiskFull)
        ));
    }

    /// The configured max page count is the one set on the database.
    #[tokio::test]
    async fn set_max_pages() {
        let exp_count = 15;
        let max = NonZero::new(exp_count).unwrap();

        let tmp = tempfile::tempdir().unwrap();

        let store = SqliteStore::options()
            .set_max_page_count(max)
            .with_writable_dir(tmp.path())
            .await
            .unwrap();

        let page_count: u32 = store
            .pool
            .acquire_writer(|writer| writer.get_pragma("max_page_count"))
            .await
            .unwrap();

        assert_eq!(page_count, exp_count);
    }

    /// The max size of the database is converted into the max page count.
    #[tokio::test]
    async fn set_db_max_size() {
        let tmp = tempfile::tempdir().unwrap();

        let max_size = Size::MiB(NonZero::new(4).unwrap());

        let store = SqliteStore::options()
            .set_db_max_size(max_size)
            .with_writable_dir(tmp.path())
            .await
            .unwrap();

        let max_page_count: u32 = store
            .pool
            .acquire_writer(|writer| writer.get_pragma("max_page_count"))
            .await
            .unwrap();

        assert_eq!(max_page_count, 1024);
    }

    /// A max size of a single kilobyte is rejected.
    #[tokio::test]
    async fn set_db_max_size_min() {
        let tmp = tempfile::tempdir().unwrap();

        let size = Size::Kb(NonZero::<u64>::new(1).unwrap());

        SqliteStore::options()
            .set_db_max_size(size)
            .with_writable_dir(tmp.path())
            .await
            .unwrap_err();
    }

    /// A size smaller than a kibibyte is rounded up to one.
    #[test]
    fn size_to_kibibytes_ceil_min() {
        let one_kb = Size::Kb(NonZero::<u64>::new(1).unwrap());

        assert_eq!(one_kb.to_kibibytes_ceil(), 1);
    }

    /// The journal size limit sets both the limit and the WAL auto-checkpoint.
    #[tokio::test]
    async fn set_journal_size_limit() {
        let tmp = tempfile::tempdir().unwrap();

        let size = Size::MiB(NonZero::<u64>::new(1).unwrap());

        let store = SqliteStore::options()
            .set_journal_size_limit(size)
            .with_writable_dir(tmp.path())
            .await
            .unwrap();

        let journal_size: i64 = store
            .pool
            .acquire_writer(|writer| writer.get_pragma("journal_size_limit"))
            .await
            .unwrap();

        assert_eq!(journal_size, 1024 * 1024);

        let wal_autocheckpoint: u32 = store
            .pool
            .acquire_writer(|writer| writer.get_pragma("wal_autocheckpoint"))
            .await
            .unwrap();

        assert_eq!(wal_autocheckpoint, 25);
    }

    /// A journal smaller than a page still gets an auto-checkpoint of one page.
    #[tokio::test]
    async fn set_journal_size_limit_min() {
        let tmp = tempfile::tempdir().unwrap();

        let size = Size::Kb(NonZero::<u64>::new(1).unwrap());

        let store = SqliteStore::options()
            .set_journal_size_limit(size)
            .with_writable_dir(tmp.path())
            .await
            .unwrap();

        let journal_size: u32 = store
            .pool
            .acquire_writer(|writer| writer.get_pragma("journal_size_limit"))
            .await
            .unwrap();

        assert_eq!(journal_size, 1000);

        let wal_autocheckpoint: u32 = store
            .pool
            .acquire_writer(|writer| writer.get_pragma("wal_autocheckpoint"))
            .await
            .unwrap();

        assert_eq!(wal_autocheckpoint, 1);
    }

    /// The decimal units are powers of 1000.
    #[test]
    fn size_to_bytes() {
        let one = NonZero::<u64>::new(1).unwrap();

        assert_eq!(Size::Kb(one).to_bytes().get(), 1000);
        assert_eq!(Size::Mb(one).to_bytes().get(), 1000 * 1000);
        assert_eq!(Size::Gb(one).to_bytes().get(), 1000 * 1000 * 1000);
    }

    /// The binary units are powers of 1024.
    #[test]
    fn size_to_kib() {
        let one = NonZero::<u64>::new(1).unwrap();

        assert_eq!(Size::KiB(one).to_bytes().get(), 1024);
        assert_eq!(Size::MiB(one).to_bytes().get(), 1024 * 1024);
        assert_eq!(Size::GiB(one).to_bytes().get(), 1024 * 1024 * 1024);
    }

    /// The size round-trips through its JSON representation.
    #[test]
    fn should_serialize_deserialize_size() {
        let expected = r#"{"unit":"gb","value":2}"#;

        let size = Size::Gb(NonZero::new(2).unwrap());

        let json = serde_json::to_string(&size).unwrap();
        let parsed: Size = serde_json::from_str(&json).unwrap();

        assert_eq!(json, expected);
        assert_eq!(size, parsed);
    }
}