mod media_retention_policy;
mod media_service;
mod memory_store;
mod traits;
#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
#[cfg(not(tarpaulin_include))]
use std::fmt;
use std::{ops::Deref, sync::Arc};
use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
CrossProcessLockState, TryLock,
};
use matrix_sdk_store_encryption::Error as StoreEncryptionError;
pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner};
#[cfg(any(test, feature = "testing"))]
pub use self::integration_tests::{MediaStoreInnerIntegrationTests, MediaStoreIntegrationTests};
pub use self::{
media_retention_policy::MediaRetentionPolicy,
media_service::{IgnoreMediaRetentionPolicy, MediaService},
memory_store::MemoryMediaStore,
};
/// Error type shared by the media store APIs declared in this module.
///
/// It is the default error of this module's [`Result`] alias and converts
/// into a [`CrossProcessLockError`] through a `From` impl.
#[derive(Debug, thiserror::Error)]
pub enum MediaStoreError {
/// A type-erased, boxed error.
///
/// Marked `transparent`, so its `Display` output and error source are those
/// of the wrapped error. Build it with [`MediaStoreError::backend`].
// NOTE(review): presumably produced by the concrete storage backend, as the
// variant name suggests — confirm against the store implementations.
#[error(transparent)]
Backend(Box<dyn std::error::Error + Send + Sync>),
/// Encrypting or decrypting data from the media store failed.
///
/// Converted automatically from [`StoreEncryptionError`] via `#[from]`.
#[error("Error encrypting or decrypting data from the media store: {0}")]
Encryption(#[from] StoreEncryptionError),
/// The store contains data that is not valid.
#[error("The store contains invalid data: {details}")]
InvalidData {
/// Free-form description of the problem, interpolated into the message.
details: String,
},
/// Serializing or deserializing data from the media store failed.
///
/// Converted automatically from [`serde_json::Error`] via `#[from]`.
#[error("Error serializing or deserializing data from the media store: {0}")]
Serialization(#[from] serde_json::Error),
}
impl MediaStoreError {
    /// Wraps an arbitrary error into [`MediaStoreError::Backend`].
    ///
    /// The error is moved onto the heap and type-erased, so any
    /// `std::error::Error + Send + Sync + 'static` value is accepted.
    #[inline]
    pub fn backend<E>(error: E) -> Self
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        // Erase the concrete type first, then wrap the boxed trait object.
        let erased: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self::Backend(erased)
    }
}
impl From<MediaStoreError> for CrossProcessLockError {
fn from(value: MediaStoreError) -> Self {
Self::TryLock(Box::new(value))
}
}
/// A `Result` alias whose error type defaults to [`MediaStoreError`].
///
/// The error parameter can still be overridden, e.g. `Result<T, OtherError>`.
pub type Result<T, E = MediaStoreError> = std::result::Result<T, E>;
/// A media store paired with a cross-process lock built on top of that same
/// store.
///
/// Both fields are `Arc`s, so cloning this type only bumps reference counts:
/// clones share the same lock and the same store.
#[derive(Clone)]
pub struct MediaStoreLock {
/// The cross-process lock, driven by a [`LockableMediaStore`] wrapper.
cross_process_lock: Arc<CrossProcessLock<LockableMediaStore>>,
/// The store that [`MediaStoreLockGuard`] borrows once the lock is taken.
store: Arc<DynMediaStore>,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for MediaStoreLock {
    /// Prints only the type name; the fields are deliberately elided.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = formatter.debug_struct("MediaStoreLock");
        repr.finish_non_exhaustive()
    }
}
impl MediaStoreLock {
pub fn new<S>(store: S, holder: String) -> Self
where
S: IntoMediaStore,
{
let store = store.into_media_store();
Self {
cross_process_lock: Arc::new(CrossProcessLock::new(
LockableMediaStore(store.clone()),
"default".to_owned(),
holder,
)),
store,
}
}
pub async fn lock(&self) -> Result<MediaStoreLockGuard<'_>, CrossProcessLockError> {
let cross_process_lock_guard = match self.cross_process_lock.spin_lock(None).await?? {
CrossProcessLockState::Clean(guard) => guard,
CrossProcessLockState::Dirty(guard) => {
guard.clear_dirty();
guard
}
};
Ok(MediaStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
}
}
/// Guard returned by [`MediaStoreLock::lock`].
///
/// It dereferences to the [`DynMediaStore`], so store methods can be called
/// directly on it.
pub struct MediaStoreLockGuard<'a> {
/// The guard obtained from the cross-process lock.
// Never read in this file, hence the `allow`: it is only kept alive for as
// long as this struct lives.
// NOTE(review): presumably the lock is released when this field is dropped
// — confirm against `CrossProcessLockGuard`.
#[allow(unused)]
cross_process_lock_guard: CrossProcessLockGuard,
/// The store, borrowed from the [`MediaStoreLock`] that produced this guard.
store: &'a DynMediaStore,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for MediaStoreLockGuard<'_> {
    /// Prints only the type name; the fields are deliberately elided.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = formatter.debug_struct("MediaStoreLockGuard");
        repr.finish_non_exhaustive()
    }
}
/// Lets a guard be used wherever a `&DynMediaStore` is expected.
impl Deref for MediaStoreLockGuard<'_> {
    type Target = DynMediaStore;

    fn deref(&self) -> &Self::Target {
        // The guard already holds a plain reference to the store; hand it out.
        let target: &Self::Target = self.store;
        target
    }
}
/// Private newtype around a shared [`DynMediaStore`].
///
/// [`TryLock`] is implemented on this wrapper, which is the type handed to
/// [`CrossProcessLock`] by [`MediaStoreLock::new`].
#[derive(Clone, Debug)]
struct LockableMediaStore(Arc<DynMediaStore>);
impl TryLock for LockableMediaStore {
    type LockError = MediaStoreError;

    /// Forwards the lock attempt to the wrapped store's
    /// `try_take_leased_lock`, passing every argument through unchanged and
    /// returning its result as is.
    async fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
        let Self(store) = self;
        let attempt = store.try_take_leased_lock(lease_duration_ms, key, holder);
        attempt.await
    }
}