use std::collections::BTreeSet;

use matrix_sdk_base::{
    event_cache::store::EventCacheStoreLockGuard,
    linked_chunk::{LinkedChunkId, Position},
};
use ruma::OwnedEventId;

use super::{
    EventCacheError,
    room::events::{Event, EventLinkedChunk},
};

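/// Deduplicate `new_events` against themselves and against the events already
/// known to the event cache, both in the in-memory linked chunk and in the
/// store.
///
/// Returns a [`DeduplicationOutcome`] describing which of the new events are
/// duplicates and where they were found.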
pub async fn filter_duplicate_events(
    store_guard: &EventCacheStoreLockGuard,
    linked_chunk_id: LinkedChunkId<'_>,
    linked_chunk: &EventLinkedChunk,
    mut new_events: Vec<Event>,
) -> Result<DeduplicationOutcome, EventCacheError> {
    {
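        // First, deduplicate the new events among themselves: keep only the
        // first occurrence of each event ID, and drop events that have no
        // event ID at all.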
        let mut event_ids = BTreeSet::new();

        new_events.retain(|event| {
            event.event_id().is_some_and(|event_id| event_ids.insert(event_id))
        });
    }

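    // Then ask the store which of the remaining events are already known for
    // this linked chunk, and at which position.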
    let duplicated_event_ids = store_guard
        .filter_duplicated_events(
            linked_chunk_id,
            new_events.iter().filter_map(|event| event.event_id()).collect(),
        )
        .await?;

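    // Split the duplicates into two groups: those whose chunk is currently
    // loaded in the in-memory linked chunk, and those that only live in the
    // store.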
    let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
        let in_memory_chunk_identifiers =
            linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();

        let mut in_memory = vec![];
        let mut in_store = vec![];

        for (duplicated_event_id, position) in duplicated_event_ids {
            if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
                in_memory.push((duplicated_event_id, position));
            } else {
                in_store.push((duplicated_event_id, position));
            }
        }

        (in_memory, in_store)
    };

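    // The new events are "all duplicates" only if there is at least one of
    // them and every single one was found either in memory or in the store.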
    let at_least_one_event = !new_events.is_empty();
    let all_duplicates = (in_memory_duplicated_event_ids.len()
        + in_store_duplicated_event_ids.len())
        == new_events.len();
    let non_empty_all_duplicates = at_least_one_event && all_duplicates;

    Ok(DeduplicationOutcome {
        all_events: new_events,
        in_memory_duplicated_event_ids,
        in_store_duplicated_event_ids,
        non_empty_all_duplicates,
    })
}

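/// The result of [`filter_duplicate_events`].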
pub(super) struct DeduplicationOutcome {
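    /// All the new events, after deduplication among themselves. Events that
    /// duplicate already-known events are still present in this list.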
    pub all_events: Vec<Event>,

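    /// Duplicated events whose chunk is currently loaded in the in-memory
    /// linked chunk, along with their position there.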
    pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

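    /// Duplicated events that are only present in the store, along with their
    /// position there.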
    pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

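    /// Whether there was at least one new event and all of them are
    /// duplicates of already-known events.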
    pub non_empty_all_duplicates: bool,
}

#[cfg(test)]
#[cfg(not(target_family = "wasm"))]
mod tests {
    use std::ops::Not as _;

    use matrix_sdk_base::{
        deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
        linked_chunk::ChunkIdentifier,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{EventId, owned_event_id, serde::Raw, user_id};

    use super::*;

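    /// Build a minimal text `TimelineEvent` with the given event ID.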
    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

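        // Prefill the store with two chunks: chunk 42 holds events 0 and 1,
        // chunk 0 holds events 2 and 3.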
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        {
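            // All four new events are already known: event 0 via the store,
            // events 1, 2 and 3 via the in-memory linked chunk (and the
            // store). `non_empty_all_duplicates` must therefore be set.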
            let mut linked_chunk = EventLinkedChunk::new();
            linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);

            let outcome = filter_duplicate_events(
                event_cache_store_guard,
                LinkedChunkId::Room(room_id),
                &linked_chunk,
                vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
            )
            .await
            .unwrap();

            assert!(outcome.non_empty_all_duplicates);
        }

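        // This time, event 4 is brand new: not all the new events are
        // duplicates anymore, so `non_empty_all_duplicates` must be unset.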
        let mut linked_chunk = EventLinkedChunk::new();
        linked_chunk.push_events([event_2.clone(), event_3.clone()]);

        let outcome = filter_duplicate_events(
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![event_0, event_1, event_2, event_3, event_4],
        )
        .await
        .unwrap();

        assert!(outcome.non_empty_all_duplicates.not());

        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
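        // Events 2 and 3 are found in chunk 0, whose identifier matches a
        // chunk loaded in the in-memory linked chunk, so they count as
        // in-memory duplicates.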
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
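        // Events 0 and 1 are found in chunk 42, which is not loaded in
        // memory, so they count as in-store duplicates.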
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
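        // `ev4` deliberately has no event ID: the deduplicator must drop it.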
        let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());

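        // Prefill the store: chunk 42 holds `ev1`, chunk 43 holds `ev2`.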
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        let linked_chunk = EventLinkedChunk::new();

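        // The in-memory linked chunk contains no events, so `ev1` and `ev2`
        // can only be reported as in-store duplicates; `ev3` is new and `ev4`
        // has no event ID.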
        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates,
        } = filter_duplicate_events(
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![ev1, ev2, ev3, ev4],
        )
        .await
        .unwrap();

        assert!(non_empty_all_duplicates.not());

        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }
}