use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
mod memory_store;
mod traits;
use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
MappedCrossProcessLockState, TryLock,
};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
use tracing::trace;
#[cfg(any(test, feature = "testing"))]
pub use self::integration_tests::EventCacheStoreIntegrationTests;
pub use self::{
memory_store::MemoryStore,
traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
};
#[derive(Clone)]
pub struct EventCacheStoreLock {
cross_process_lock: Arc<CrossProcessLock<LockableEventCacheStore>>,
store: Arc<DynEventCacheStore>,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for EventCacheStoreLock {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
}
}
impl EventCacheStoreLock {
pub fn new<S>(store: S, holder: String) -> Self
where
S: IntoEventCacheStore,
{
let store = store.into_event_cache_store();
Self {
cross_process_lock: Arc::new(CrossProcessLock::new(
LockableEventCacheStore(store.clone()),
"default".to_owned(),
holder,
)),
store,
}
}
pub async fn lock(&self) -> Result<EventCacheStoreLockState, CrossProcessLockError> {
let lock_state =
self.cross_process_lock.spin_lock(None).await??.map(|cross_process_lock_guard| {
EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.clone() }
});
Ok(lock_state)
}
}
pub type EventCacheStoreLockState = MappedCrossProcessLockState<EventCacheStoreLockGuard>;
#[derive(Clone)]
pub struct EventCacheStoreLockGuard {
#[allow(unused)]
cross_process_lock_guard: CrossProcessLockGuard,
store: Arc<DynEventCacheStore>,
}
impl EventCacheStoreLockGuard {
pub fn clear_dirty(this: &Self) {
this.cross_process_lock_guard.clear_dirty();
}
pub fn is_dirty(this: &Self) -> bool {
this.cross_process_lock_guard.is_dirty()
}
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for EventCacheStoreLockGuard {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
}
}
impl Deref for EventCacheStoreLockGuard {
type Target = DynEventCacheStore;
fn deref(&self) -> &Self::Target {
self.store.as_ref()
}
}
#[derive(Debug, thiserror::Error)]
pub enum EventCacheStoreError {
#[error(transparent)]
Backend(Box<dyn std::error::Error + Send + Sync>),
#[error("The event cache store failed to be unlocked")]
Locked,
#[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
Unencrypted,
#[error("Error encrypting or decrypting data from the event cache store: {0}")]
Encryption(#[from] StoreEncryptionError),
#[error("Error encoding or decoding data from the event cache store: {0}")]
Codec(#[from] Utf8Error),
#[error("Error serializing or deserializing data from the event cache store: {0}")]
Serialization(#[from] serde_json::Error),
#[error(
"The database format of the event cache store changed in an incompatible way, \
current version: {0}, latest version: {1}"
)]
UnsupportedDatabaseVersion(usize, usize),
#[error("The store contains invalid data: {details}")]
InvalidData {
details: String,
},
}
impl EventCacheStoreError {
#[inline]
pub fn backend<E>(error: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
Self::Backend(Box::new(error))
}
}
impl From<EventCacheStoreError> for CrossProcessLockError {
fn from(value: EventCacheStoreError) -> Self {
Self::TryLock(Box::new(value))
}
}
pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
#[derive(Clone, Debug)]
struct LockableEventCacheStore(Arc<DynEventCacheStore>);
impl TryLock for LockableEventCacheStore {
type LockError = EventCacheStoreError;
async fn try_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}
pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
#[derive(serde::Deserialize)]
struct RelatesTo {
event_id: OwnedEventId,
rel_type: String,
}
#[derive(serde::Deserialize)]
struct EventContent {
#[serde(rename = "m.relates_to")]
rel: Option<RelatesTo>,
}
match event.get_field::<EventContent>("content") {
Ok(event_content) => {
event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
}
Err(err) => {
trace!("when extracting relation data from an event: {err}");
None
}
}
}