use std::collections::BTreeSet;
use matrix_sdk_base::{
event_cache::store::EventCacheStoreLockGuard,
linked_chunk::{LinkedChunkId, Position},
};
use ruma::OwnedEventId;
use super::{
EventCacheError,
room::events::{Event, EventLinkedChunk},
};
pub async fn filter_duplicate_events(
store_guard: &EventCacheStoreLockGuard,
linked_chunk_id: LinkedChunkId<'_>,
linked_chunk: &EventLinkedChunk,
mut new_events: Vec<Event>,
) -> Result<DeduplicationOutcome, EventCacheError> {
{
let mut event_ids = BTreeSet::new();
new_events.retain(|event| {
event.event_id().is_some_and(|event_id| event_ids.insert(event_id))
});
}
let duplicated_event_ids = store_guard
.filter_duplicated_events(
linked_chunk_id,
new_events.iter().filter_map(|event| event.event_id()).collect(),
)
.await?;
let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
let in_memory_chunk_identifiers =
linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();
let mut in_memory = vec![];
let mut in_store = vec![];
for (duplicated_event_id, position) in duplicated_event_ids {
if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
in_memory.push((duplicated_event_id, position));
} else {
in_store.push((duplicated_event_id, position));
}
}
(in_memory, in_store)
};
let at_least_one_event = !new_events.is_empty();
let all_duplicates = (in_memory_duplicated_event_ids.len()
+ in_store_duplicated_event_ids.len())
== new_events.len();
let non_empty_all_duplicates = at_least_one_event && all_duplicates;
Ok(DeduplicationOutcome {
all_events: new_events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
non_empty_all_duplicates,
})
}
pub(super) struct DeduplicationOutcome {
pub all_events: Vec<Event>,
pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
pub non_empty_all_duplicates: bool,
}
#[cfg(test)]
#[cfg(not(target_family = "wasm"))] mod tests {
use std::ops::Not as _;
use matrix_sdk_base::{
deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
linked_chunk::ChunkIdentifier,
};
use matrix_sdk_test::{async_test, event_factory::EventFactory};
use ruma::{EventId, owned_event_id, serde::Raw, user_id};
use super::*;
fn timeline_event(event_id: &EventId) -> TimelineEvent {
EventFactory::new()
.text_msg("")
.sender(user_id!("@mnt_io:matrix.org"))
.event_id(event_id)
.into_event()
}
#[async_test]
async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
use std::sync::Arc;
use matrix_sdk_base::{
event_cache::store::{EventCacheStore, MemoryStore},
linked_chunk::Update,
};
use ruma::room_id;
let event_id_0 = owned_event_id!("$ev0");
let event_id_1 = owned_event_id!("$ev1");
let event_id_2 = owned_event_id!("$ev2");
let event_id_3 = owned_event_id!("$ev3");
let event_id_4 = owned_event_id!("$ev4");
let event_0 = timeline_event(&event_id_0);
let event_1 = timeline_event(&event_id_1);
let event_2 = timeline_event(&event_id_2);
let event_3 = timeline_event(&event_id_3);
let event_4 = timeline_event(&event_id_4);
let event_cache_store = Arc::new(MemoryStore::new());
let room_id = room_id!("!fondue:raclette.ch");
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(42),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(42), 0),
items: vec![event_0.clone(), event_1.clone()],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(42)),
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![event_2.clone(), event_3.clone()],
},
],
)
.await
.unwrap();
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
let event_cache_store = event_cache_store.lock().await.unwrap();
let event_cache_store_guard = event_cache_store.as_clean().unwrap();
{
let mut linked_chunk = EventLinkedChunk::new();
linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);
let outcome = filter_duplicate_events(
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
)
.await
.unwrap();
assert!(outcome.non_empty_all_duplicates);
}
let mut linked_chunk = EventLinkedChunk::new();
linked_chunk.push_events([event_2.clone(), event_3.clone()]);
let outcome = filter_duplicate_events(
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
vec![event_0, event_1, event_2, event_3, event_4],
)
.await
.unwrap();
assert!(outcome.non_empty_all_duplicates.not());
assert_eq!(outcome.all_events.len(), 5);
assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));
assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
assert_eq!(
outcome.in_memory_duplicated_event_ids[0],
(event_id_2, Position::new(ChunkIdentifier::new(0), 0))
);
assert_eq!(
outcome.in_memory_duplicated_event_ids[1],
(event_id_3, Position::new(ChunkIdentifier::new(0), 1))
);
assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
assert_eq!(
outcome.in_store_duplicated_event_ids[0],
(event_id_0, Position::new(ChunkIdentifier::new(42), 0))
);
assert_eq!(
outcome.in_store_duplicated_event_ids[1],
(event_id_1, Position::new(ChunkIdentifier::new(42), 1))
);
}
#[async_test]
async fn test_storage_deduplication() {
use std::sync::Arc;
use matrix_sdk_base::{
event_cache::store::{EventCacheStore as _, MemoryStore},
linked_chunk::{ChunkIdentifier, Position, Update},
};
use matrix_sdk_test::{ALICE, BOB};
use ruma::{event_id, room_id};
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let event_cache_store = Arc::new(MemoryStore::new());
let eid1 = event_id!("$1");
let eid2 = event_id!("$2");
let eid3 = event_id!("$3");
let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(42),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(42), 0),
items: vec![ev1.clone()],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(42)),
new: ChunkIdentifier::new(43),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(43), 0),
items: vec![ev2.clone()],
},
],
)
.await
.unwrap();
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
let event_cache_store = event_cache_store.lock().await.unwrap();
let event_cache_store_guard = event_cache_store.as_clean().unwrap();
let linked_chunk = EventLinkedChunk::new();
let DeduplicationOutcome {
all_events: events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
non_empty_all_duplicates,
} = filter_duplicate_events(
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
vec![ev1, ev2, ev3, ev4],
)
.await
.unwrap();
assert!(non_empty_all_duplicates.not());
assert_eq!(events.len(), 3);
assert_eq!(events[0].event_id().as_deref(), Some(eid1));
assert_eq!(events[1].event_id().as_deref(), Some(eid2));
assert_eq!(events[2].event_id().as_deref(), Some(eid3));
assert!(in_memory_duplicated_event_ids.is_empty());
assert_eq!(in_store_duplicated_event_ids.len(), 2);
assert_eq!(
in_store_duplicated_event_ids[0],
(eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
);
assert_eq!(
in_store_duplicated_event_ids[1],
(eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
);
}
}