matrix_sdk_sqlite/
media_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`MediaStore`].
16
17use std::{fmt, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use matrix_sdk_base::{
21    cross_process_lock::CrossProcessLockGeneration,
22    media::{
23        store::{
24            IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
25            MediaStoreInner,
26        },
27        MediaRequestParameters, UniqueKey,
28    },
29    timer,
30};
31use matrix_sdk_store_encryption::StoreCipher;
32use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri};
33use rusqlite::{params_from_iter, OptionalExtension};
34use tokio::{
35    fs,
36    sync::{Mutex, OwnedMutexGuard},
37};
38use tracing::{debug, instrument};
39
40use crate::{
41    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
42    error::{Error, Result},
43    utils::{
44        repeat_vars, time_to_timestamp, EncryptableStore, SqliteAsyncConnExt,
45        SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt,
46    },
47    OpenStoreError, Secret, SqliteStoreConfig,
48};
49
mod keys {
    // Tables

    /// Name of the table holding the media contents; also passed as the table
    /// name when encoding keys.
    pub const MEDIA: &str = "media";

    // Entries in Key-value store

    /// Key-value entry holding the serialized media retention policy.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";

    /// Key-value entry holding the serialized time of the last media cleanup.
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";
}
58
/// File name of the SQLite database backing the media store.
const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should come with a bump of this number,
/// and with matching changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 2;
68
69/// An SQLite-based media store.
70#[derive(Clone)]
71pub struct SqliteMediaStore {
72    store_cipher: Option<Arc<StoreCipher>>,
73
74    /// The pool of connections.
75    pool: SqlitePool,
76
77    /// We make the difference between connections for read operations, and for
78    /// write operations. We keep a single connection apart from write
79    /// operations. All other connections are used for read operations. The
80    /// lock is used to ensure there is one owner at a time.
81    write_connection: Arc<Mutex<SqliteAsyncConn>>,
82
83    media_service: MediaService,
84}
85
86#[cfg(not(tarpaulin_include))]
87impl fmt::Debug for SqliteMediaStore {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("SqliteMediaStore").finish_non_exhaustive()
90    }
91}
92
93impl EncryptableStore for SqliteMediaStore {
94    fn get_cypher(&self) -> Option<&StoreCipher> {
95        self.store_cipher.as_deref()
96    }
97}
98
99impl SqliteMediaStore {
100    /// Open the SQLite-based media store at the given path using the
101    /// given passphrase to encrypt private data.
102    pub async fn open(
103        path: impl AsRef<Path>,
104        passphrase: Option<&str>,
105    ) -> Result<Self, OpenStoreError> {
106        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
107    }
108
109    /// Open the SQLite-based media store at the given path using the given
110    /// key to encrypt private data.
111    pub async fn open_with_key(
112        path: impl AsRef<Path>,
113        key: Option<&[u8; 32]>,
114    ) -> Result<Self, OpenStoreError> {
115        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
116    }
117
118    /// Open the SQLite-based media store with the config open config.
119    #[instrument(skip(config), fields(path = ?config.path))]
120    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
121        debug!(?config);
122
123        let _timer = timer!("open_with_config");
124
125        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
126
127        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
128
129        let this = Self::open_with_pool(pool, config.secret).await?;
130        this.write().await?.apply_runtime_config(config.runtime_config).await?;
131
132        Ok(this)
133    }
134
135    /// Open an SQLite-based media store using the given SQLite database
136    /// pool. The given passphrase will be used to encrypt private data.
137    async fn open_with_pool(
138        pool: SqlitePool,
139        secret: Option<Secret>,
140    ) -> Result<Self, OpenStoreError> {
141        let conn = pool.get().await?;
142
143        let version = conn.db_version().await?;
144        run_migrations(&conn, version).await?;
145
146        let store_cipher = match secret {
147            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
148            None => None,
149        };
150
151        let media_service = MediaService::new();
152        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
153        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
154        media_service.restore(media_retention_policy, last_media_cleanup_time);
155
156        Ok(Self {
157            store_cipher,
158            pool,
159            // Use `conn` as our selected write connections.
160            write_connection: Arc::new(Mutex::new(conn)),
161            media_service,
162        })
163    }
164
165    // Acquire a connection for executing read operations.
166    #[instrument(skip_all)]
167    async fn read(&self) -> Result<SqliteAsyncConn> {
168        let connection = self.pool.get().await?;
169
170        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
171        // support must be enabled on a per-connection basis. Execute it every
172        // time we try to get a connection, since we can't guarantee a previous
173        // connection did enable it before.
174        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
175
176        Ok(connection)
177    }
178
179    // Acquire a connection for executing write operations.
180    #[instrument(skip_all)]
181    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
182        let connection = self.write_connection.clone().lock_owned().await;
183
184        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
185        // support must be enabled on a per-connection basis. Execute it every
186        // time we try to get a connection, since we can't guarantee a previous
187        // connection did enable it before.
188        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
189
190        Ok(connection)
191    }
192
193    pub async fn vacuum(&self) -> Result<()> {
194        self.write_connection.lock().await.vacuum().await
195    }
196
197    async fn get_db_size(&self) -> Result<Option<usize>> {
198        Ok(Some(self.pool.get().await?.get_db_size().await?))
199    }
200}
201
202/// Run migrations for the given version of the database.
203async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
204    if version == 0 {
205        debug!("Creating database");
206    } else if version < DATABASE_VERSION {
207        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
208    } else {
209        return Ok(());
210    }
211
212    // Always enable foreign keys for the current connection.
213    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
214
215    if version < 1 {
216        // First turn on WAL mode, this can't be done in the transaction, it fails with
217        // the error message: "cannot change into wal mode from within a transaction".
218        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
219        conn.with_transaction(|txn| {
220            txn.execute_batch(include_str!("../migrations/media_store/001_init.sql"))?;
221            txn.set_db_version(1)
222        })
223        .await?;
224    }
225
226    if version < 2 {
227        conn.with_transaction(|txn| {
228            txn.execute_batch(include_str!(
229                "../migrations/media_store/002_lease_locks_with_generation.sql"
230            ))?;
231            txn.set_db_version(2)
232        })
233        .await?;
234    }
235
236    Ok(())
237}
238
239#[async_trait]
240impl MediaStore for SqliteMediaStore {
241    type Error = Error;
242
243    #[instrument(skip(self))]
244    async fn try_take_leased_lock(
245        &self,
246        lease_duration_ms: u32,
247        key: &str,
248        holder: &str,
249    ) -> Result<Option<CrossProcessLockGeneration>> {
250        let key = key.to_owned();
251        let holder = holder.to_owned();
252
253        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
254        let expiration = now + lease_duration_ms as u64;
255
256        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
257        let generation = self
258            .write()
259            .await?
260            .with_transaction(move |txn| {
261                txn.query_row(
262                    "INSERT INTO lease_locks (key, holder, expiration)
263                    VALUES (?1, ?2, ?3)
264                    ON CONFLICT (key)
265                    DO
266                        UPDATE SET
267                            holder = excluded.holder,
268                            expiration = excluded.expiration,
269                            generation =
270                                CASE holder
271                                    WHEN excluded.holder THEN generation
272                                    ELSE generation + 1
273                                END
274                        WHERE
275                            holder = excluded.holder
276                            OR expiration < ?4
277                    RETURNING generation
278                    ",
279                    (key, holder, expiration, now),
280                    |row| row.get(0),
281                )
282                .optional()
283            })
284            .await?;
285
286        Ok(generation)
287    }
288
289    async fn add_media_content(
290        &self,
291        request: &MediaRequestParameters,
292        content: Vec<u8>,
293        ignore_policy: IgnoreMediaRetentionPolicy,
294    ) -> Result<()> {
295        let _timer = timer!("method");
296
297        self.media_service.add_media_content(self, request, content, ignore_policy).await
298    }
299
300    #[instrument(skip_all)]
301    async fn replace_media_key(
302        &self,
303        from: &MediaRequestParameters,
304        to: &MediaRequestParameters,
305    ) -> Result<(), Self::Error> {
306        let _timer = timer!("method");
307
308        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
309        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
310
311        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
312        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
313
314        let conn = self.write().await?;
315        conn.execute(
316            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
317            (new_uri, new_format, prev_uri, prev_format),
318        )
319        .await?;
320
321        Ok(())
322    }
323
324    #[instrument(skip_all)]
325    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
326        let _timer = timer!("method");
327
328        self.media_service.get_media_content(self, request).await
329    }
330
331    #[instrument(skip_all)]
332    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
333        let _timer = timer!("method");
334
335        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
336        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
337
338        let conn = self.write().await?;
339        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
340
341        Ok(())
342    }
343
344    #[instrument(skip(self))]
345    async fn get_media_content_for_uri(
346        &self,
347        uri: &MxcUri,
348    ) -> Result<Option<Vec<u8>>, Self::Error> {
349        let _timer = timer!("method");
350
351        self.media_service.get_media_content_for_uri(self, uri).await
352    }
353
354    #[instrument(skip(self))]
355    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
356        let _timer = timer!("method");
357
358        let uri = self.encode_key(keys::MEDIA, uri);
359
360        let conn = self.write().await?;
361        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
362
363        Ok(())
364    }
365
366    #[instrument(skip_all)]
367    async fn set_media_retention_policy(
368        &self,
369        policy: MediaRetentionPolicy,
370    ) -> Result<(), Self::Error> {
371        let _timer = timer!("method");
372
373        self.media_service.set_media_retention_policy(self, policy).await
374    }
375
376    #[instrument(skip_all)]
377    fn media_retention_policy(&self) -> MediaRetentionPolicy {
378        let _timer = timer!("method");
379
380        self.media_service.media_retention_policy()
381    }
382
383    #[instrument(skip_all)]
384    async fn set_ignore_media_retention_policy(
385        &self,
386        request: &MediaRequestParameters,
387        ignore_policy: IgnoreMediaRetentionPolicy,
388    ) -> Result<(), Self::Error> {
389        let _timer = timer!("method");
390
391        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
392    }
393
394    #[instrument(skip_all)]
395    async fn clean(&self) -> Result<(), Self::Error> {
396        let _timer = timer!("method");
397
398        self.media_service.clean(self).await
399    }
400
401    async fn optimize(&self) -> Result<(), Self::Error> {
402        Ok(self.vacuum().await?)
403    }
404
405    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
406        self.get_db_size().await
407    }
408}
409
410#[cfg_attr(target_family = "wasm", async_trait(?Send))]
411#[cfg_attr(not(target_family = "wasm"), async_trait)]
412impl MediaStoreInner for SqliteMediaStore {
413    type Error = Error;
414
415    async fn media_retention_policy_inner(
416        &self,
417    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
418        let conn = self.read().await?;
419        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
420    }
421
422    async fn set_media_retention_policy_inner(
423        &self,
424        policy: MediaRetentionPolicy,
425    ) -> Result<(), Self::Error> {
426        let conn = self.write().await?;
427        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
428        Ok(())
429    }
430
431    async fn add_media_content_inner(
432        &self,
433        request: &MediaRequestParameters,
434        data: Vec<u8>,
435        last_access: SystemTime,
436        policy: MediaRetentionPolicy,
437        ignore_policy: IgnoreMediaRetentionPolicy,
438    ) -> Result<(), Self::Error> {
439        let ignore_policy = ignore_policy.is_yes();
440        let data = self.encode_value(data)?;
441
442        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
443            return Ok(());
444        }
445
446        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
447        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
448        let timestamp = time_to_timestamp(last_access);
449
450        let conn = self.write().await?;
451        conn.execute(
452            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
453            (uri, format, data, timestamp, ignore_policy),
454        )
455        .await?;
456
457        Ok(())
458    }
459
460    async fn set_ignore_media_retention_policy_inner(
461        &self,
462        request: &MediaRequestParameters,
463        ignore_policy: IgnoreMediaRetentionPolicy,
464    ) -> Result<(), Self::Error> {
465        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
466        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
467        let ignore_policy = ignore_policy.is_yes();
468
469        let conn = self.write().await?;
470        conn.execute(
471            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
472            (ignore_policy, uri, format),
473        )
474        .await?;
475
476        Ok(())
477    }
478
479    async fn get_media_content_inner(
480        &self,
481        request: &MediaRequestParameters,
482        current_time: SystemTime,
483    ) -> Result<Option<Vec<u8>>, Self::Error> {
484        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
485        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
486        let timestamp = time_to_timestamp(current_time);
487
488        let conn = self.write().await?;
489        let data = conn
490            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
491                // Update the last access.
492                // We need to do this first so the transaction is in write mode right away.
493                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
494                txn.execute(
495                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
496                    (timestamp, &uri, &format),
497                )?;
498
499                txn.query_row::<Vec<u8>, _, _>(
500                    "SELECT data FROM media WHERE uri = ? AND format = ?",
501                    (&uri, &format),
502                    |row| row.get(0),
503                )
504                .optional()
505            })
506            .await?;
507
508        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
509    }
510
511    async fn get_media_content_for_uri_inner(
512        &self,
513        uri: &MxcUri,
514        current_time: SystemTime,
515    ) -> Result<Option<Vec<u8>>, Self::Error> {
516        let uri = self.encode_key(keys::MEDIA, uri);
517        let timestamp = time_to_timestamp(current_time);
518
519        let conn = self.write().await?;
520        let data = conn
521            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
522                // Update the last access.
523                // We need to do this first so the transaction is in write mode right away.
524                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
525                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
526
527                txn.query_row::<Vec<u8>, _, _>(
528                    "SELECT data FROM media WHERE uri = ?",
529                    (&uri,),
530                    |row| row.get(0),
531                )
532                .optional()
533            })
534            .await?;
535
536        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
537    }
538
539    async fn clean_inner(
540        &self,
541        policy: MediaRetentionPolicy,
542        current_time: SystemTime,
543    ) -> Result<(), Self::Error> {
544        if !policy.has_limitations() {
545            // We can safely skip all the checks.
546            return Ok(());
547        }
548
549        let conn = self.write().await?;
550        let removed = conn
551            .with_transaction::<_, Error, _>(move |txn| {
552                let mut removed = false;
553
554                // First, check media content that exceed the max filesize.
555                if let Some(max_file_size) = policy.computed_max_file_size() {
556                    let count = txn.execute(
557                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
558                        (max_file_size,),
559                    )?;
560
561                    if count > 0 {
562                        removed = true;
563                    }
564                }
565
566                // Then, clean up expired media content.
567                if let Some(last_access_expiry) = policy.last_access_expiry {
568                    let current_timestamp = time_to_timestamp(current_time);
569                    let expiry_secs = last_access_expiry.as_secs();
570                    let count = txn.execute(
571                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
572                        (current_timestamp, expiry_secs),
573                    )?;
574
575                    if count > 0 {
576                        removed = true;
577                    }
578                }
579
580                // Finally, if the cache size is too big, remove old items until it fits.
581                if let Some(max_cache_size) = policy.max_cache_size {
582                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
583                    // during the conversion of the result.
584                    let cache_size = txn
585                        .query_row(
586                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
587                            (),
588                            |row| {
589                                // `sum()` returns `NULL` if there are no rows.
590                                row.get::<_, Option<u64>>(0)
591                            },
592                        )?
593                        .unwrap_or_default();
594
595                    // If the cache size is overflowing or bigger than max cache size, clean up.
596                    if cache_size > max_cache_size {
597                        // Get the sizes of the media contents ordered by last access.
598                        let mut cached_stmt = txn.prepare_cached(
599                            "SELECT rowid, length(data) FROM media \
600                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
601                        )?;
602                        let content_sizes = cached_stmt
603                            .query(())?
604                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
605
606                        let mut accumulated_items_size = 0u64;
607                        let mut limit_reached = false;
608                        let mut rows_to_remove = Vec::new();
609
610                        for result in content_sizes {
611                            let (row_id, size) = match result {
612                                Ok(content_size) => content_size,
613                                Err(error) => {
614                                    return Err(error.into());
615                                }
616                            };
617
618                            if limit_reached {
619                                rows_to_remove.push(row_id);
620                                continue;
621                            }
622
623                            match accumulated_items_size.checked_add(size) {
624                                Some(acc) if acc > max_cache_size => {
625                                    // We can stop accumulating.
626                                    limit_reached = true;
627                                    rows_to_remove.push(row_id);
628                                }
629                                Some(acc) => accumulated_items_size = acc,
630                                None => {
631                                    // The accumulated size is overflowing but the setting cannot be
632                                    // bigger than usize::MAX, we can stop accumulating.
633                                    limit_reached = true;
634                                    rows_to_remove.push(row_id);
635                                }
636                            }
637                        }
638
639                        if !rows_to_remove.is_empty() {
640                            removed = true;
641                        }
642
643                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
644                            let sql_params = repeat_vars(row_ids.len());
645                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
646                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
647                            Ok(Vec::<()>::new())
648                        })?;
649                    }
650                }
651
652                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
653
654                Ok(removed)
655            })
656            .await?;
657
658        // If we removed media, defragment the database and free space on the
659        // filesystem.
660        if removed {
661            conn.vacuum().await?;
662        }
663
664        Ok(())
665    }
666
667    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
668        let conn = self.read().await?;
669        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
670    }
671}
672
#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
        time::Duration,
    };

    use matrix_sdk_base::{
        media::{
            store::{IgnoreMediaRetentionPolicy, MediaStore, MediaStoreError},
            MediaFormat, MediaRequestParameters, MediaThumbnailSettings,
        },
        media_store_inner_integration_tests, media_store_integration_tests,
        media_store_integration_tests_time,
    };
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use ruma::{events::room::MediaSource, media::Method, mxc_uri, uint};
    use tempfile::{tempdir, TempDir};

    use super::SqliteMediaStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    /// Temporary directory shared by all the stores of this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Build a fresh path inside [`TMP_DIR`], named after the next value of
    /// [`NUM`].
    fn new_media_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Open an unencrypted store in a fresh workspace.
    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
        let workspace = new_media_store_workspace();
        let path = workspace.to_str().unwrap();

        tracing::info!("using media store @ {}", path);

        let store = SqliteMediaStore::open(path, None).await.unwrap();
        Ok(store)
    }

    media_store_integration_tests!();
    media_store_integration_tests_time!();
    media_store_inner_integration_tests!();

    /// Read the raw `data` column of every media row, most recently accessed
    /// first.
    async fn get_media_store_content_sorted_by_last_access(
        media_store: &SqliteMediaStore,
    ) -> Vec<Vec<u8>> {
        let connection = media_store.read().await.expect("accessing sqlite db failed");
        let contents = connection
            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
                stmt.query(())?.mapped(|row| row.get(0)).collect()
            })
            .await;

        contents.expect("querying media cache content by last access failed")
    }

    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_media_store_workspace()).pool_max_size(42);

        let store = SqliteMediaStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_last_access() {
        let media_store = get_media_store().await.expect("creating media cache failed");

        let uri = mxc_uri!("mxc://localhost/media");
        let thumbnail_settings =
            MediaThumbnailSettings::with_method(Method::Crop, uint!(100), uint!(100));

        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(thumbnail_settings),
        };

        let content = "hello world".as_bytes().to_vec();
        let thumbnail_content = "hello…".as_bytes().to_vec();

        // Store the file first.
        media_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // The timestamps have a precision of one second, so wait to make sure
        // they differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Then store the thumbnail.
        media_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        // The thumbnail has been accessed more recently than the file.
        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // The timestamps have a precision of one second, so wait to make sure
        // they differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Reading the file bumps its last access.
        let _ = media_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        // The file has now been accessed more recently than the thumbnail.
        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }
}
804
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        media::store::MediaStoreError, media_store_inner_integration_tests,
        media_store_integration_tests, media_store_integration_tests_time,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteMediaStore;

    /// Temporary directory shared by all the stores of this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Open a store in a fresh sub-directory of [`TMP_DIR`], with a
    /// passphrase.
    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let workspace = TMP_DIR.path().join(index.to_string());
        let path = workspace.to_str().unwrap();

        tracing::info!("using media store @ {}", path);

        let store = SqliteMediaStore::open(path, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    media_store_integration_tests!();
    media_store_integration_tests_time!();
    media_store_inner_integration_tests!();
}