1use std::collections::BTreeSet;
19
20use matrix_sdk_base::{
21 event_cache::store::EventCacheStoreLockGuard,
22 linked_chunk::{LinkedChunkId, Position},
23};
24use ruma::{OwnedEventId, UserId};
25
26use super::{
27 EventCacheError,
28 caches::event_linked_chunk::{Event, EventLinkedChunk},
29};
30
31pub async fn filter_duplicate_events(
35 own_user_id: &UserId,
36 store_guard: &EventCacheStoreLockGuard,
37 linked_chunk_id: LinkedChunkId<'_>,
38 linked_chunk: &EventLinkedChunk,
39 mut new_events: Vec<Event>,
40) -> Result<DeduplicationOutcome, EventCacheError> {
41 {
45 let mut event_ids = BTreeSet::new();
46
47 new_events.retain(|event| {
48 event.event_id().is_some_and(|event_id| event_ids.insert(event_id))
51 });
52 }
53
54 let duplicated_event_ids = store_guard
56 .filter_duplicated_events(
57 linked_chunk_id,
58 new_events.iter().filter_map(|event| event.event_id()).collect(),
59 )
60 .await?;
61
62 let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
65 let in_memory_chunk_identifiers =
67 linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();
68
69 let mut in_memory = vec![];
70 let mut in_store = vec![];
71
72 for (duplicated_event_id, position) in duplicated_event_ids {
73 if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
74 in_memory.push((duplicated_event_id, position));
75 } else {
76 in_store.push((duplicated_event_id, position));
77 }
78 }
79
80 (in_memory, in_store)
81 };
82
83 let at_least_one_event_not_sent_by_me =
86 new_events.iter().any(|ev| ev.sender().is_some_and(|sender| sender != own_user_id));
87
88 let all_duplicates = (in_memory_duplicated_event_ids.len()
89 + in_store_duplicated_event_ids.len())
90 == new_events.len();
91
92 let non_empty_all_duplicates = at_least_one_event_not_sent_by_me && all_duplicates;
93
94 Ok(DeduplicationOutcome {
95 all_events: new_events,
96 in_memory_duplicated_event_ids,
97 in_store_duplicated_event_ids,
98 non_empty_all_duplicates,
99 })
100}
101
102pub(super) struct DeduplicationOutcome {
103 pub all_events: Vec<Event>,
109
110 pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
116
117 pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
124
125 pub non_empty_all_duplicates: bool,
152}
153
#[cfg(test)]
#[cfg(not(target_family = "wasm"))]
mod tests {
    use std::ops::Not as _;

    use matrix_sdk_base::{
        deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
        linked_chunk::ChunkIdentifier,
    };
    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{EventId, owned_event_id, serde::Raw, user_id};

    use super::*;

    /// Build an empty text message with the given event ID, sent by
    /// `@mnt_io:matrix.org`.
    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let user_id = user_id!("@user:example.com");
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Prefill the store: events 0 and 1 live in chunk 42, events 2 and 3 in
        // chunk 0.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(
            event_cache_store,
            CrossProcessLockConfig::multi_process("hodor"),
        );
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        {
            let mut linked_chunk = EventLinkedChunk::new();
            linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);

            let outcome = filter_duplicate_events(
                user_id,
                event_cache_store_guard,
                LinkedChunkId::Room(room_id),
                &linked_chunk,
                vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
            )
            .await
            .unwrap();

            // Every event is known, and none was sent by `user_id`.
            assert!(outcome.non_empty_all_duplicates);
        }

        // A fresh in-memory linked chunk. The assertions below expect its chunk
        // to share identifier 0 with the store's second chunk.
        let mut linked_chunk = EventLinkedChunk::new();
        linked_chunk.push_events([event_2.clone(), event_3.clone()]);

        let outcome = filter_duplicate_events(
            user_id,
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![event_0, event_1, event_2, event_3, event_4],
        )
        .await
        .unwrap();

        // `event_4` is new, so not everything is a duplicate.
        assert!(outcome.non_empty_all_duplicates.not());

        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let user_id = user_id!("@user:example.com");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An event without an ID: it must be filtered out.
        let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());

        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(
            event_cache_store,
            CrossProcessLockConfig::multi_process("hodor"),
        );
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        let linked_chunk = EventLinkedChunk::new();

        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates,
        } = filter_duplicate_events(
            user_id,
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![ev1, ev2, ev3, ev4],
        )
        .await
        .unwrap();

        assert!(non_empty_all_duplicates.not());

        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }

    /// `non_empty_all_duplicates` must stay `false` in two cases:
    ///
    /// - every new event is already known, but all of them were sent by the
    ///   own user;
    /// - the batch is empty.
    #[async_test]
    async fn test_all_duplicates_sent_by_own_user_are_not_flagged() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::{event_id, room_id};

        let own_user_id = user_id!("@me:example.com");
        let room_id = room_id!("!tartiflette:reblochon.fr");
        let f = EventFactory::new().room(room_id).sender(own_user_id);

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");

        let ev1 = f.text_msg("hello").event_id(eid1).into_event();
        let ev2 = f.text_msg("world").event_id(eid2).into_event();

        let event_cache_store = Arc::new(MemoryStore::new());

        // Both events are already known to the store.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone(), ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(
            event_cache_store,
            CrossProcessLockConfig::multi_process("hodor"),
        );
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        let linked_chunk = EventLinkedChunk::new();

        let outcome = filter_duplicate_events(
            own_user_id,
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![ev1, ev2],
        )
        .await
        .unwrap();

        // Every event is a duplicate…
        assert_eq!(outcome.all_events.len(), 2);
        assert!(outcome.in_memory_duplicated_event_ids.is_empty());
        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        // …but they were all sent by the own user, so the flag stays down.
        assert!(outcome.non_empty_all_duplicates.not());

        // An empty batch is never reported as "all duplicates".
        let outcome = filter_duplicate_events(
            own_user_id,
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            Vec::new(),
        )
        .await
        .unwrap();

        assert!(outcome.all_events.is_empty());
        assert!(outcome.in_memory_duplicated_event_ids.is_empty());
        assert!(outcome.in_store_duplicated_event_ids.is_empty());
        assert!(outcome.non_empty_all_duplicates.not());
    }
}