1use std::collections::BTreeSet;
19
20use matrix_sdk_base::{
21 event_cache::store::EventCacheStoreLock,
22 linked_chunk::{LinkedChunkId, Position},
23};
24use ruma::{OwnedEventId, OwnedRoomId};
25
26use super::{
27 room::events::{Event, RoomEvents},
28 EventCacheError,
29};
30
31pub struct Deduplicator {
37 room_id: OwnedRoomId,
39 store: EventCacheStoreLock,
41}
42
43impl Deduplicator {
44 pub fn new(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
46 Self { room_id, store }
47 }
48
49 pub async fn filter_duplicate_events(
53 &self,
54 mut events: Vec<Event>,
55 room_events: &RoomEvents,
56 ) -> Result<DeduplicationOutcome, EventCacheError> {
57 {
61 let mut event_ids = BTreeSet::new();
62
63 events.retain(|event| {
64 let Some(event_id) = event.event_id() else {
65 return false;
67 };
68
69 if event_ids.contains(&event_id) {
71 return false;
72 }
73
74 event_ids.insert(event_id);
75
76 true
78 });
79 }
80
81 let store = self.store.lock().await?;
82
83 let duplicated_event_ids = store
85 .filter_duplicated_events(
86 LinkedChunkId::Room(&self.room_id),
87 events.iter().filter_map(|event| event.event_id()).collect(),
88 )
89 .await?;
90
91 let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
94 let in_memory_chunk_identifiers =
96 room_events.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();
97
98 let mut in_memory = vec![];
99 let mut in_store = vec![];
100
101 for (duplicated_event_id, position) in duplicated_event_ids {
102 if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
103 in_memory.push((duplicated_event_id, position));
104 } else {
105 in_store.push((duplicated_event_id, position));
106 }
107 }
108
109 (in_memory, in_store)
110 };
111
112 Ok(DeduplicationOutcome {
113 all_events: events,
114 in_memory_duplicated_event_ids,
115 in_store_duplicated_event_ids,
116 })
117 }
118}
119
120pub(super) struct DeduplicationOutcome {
121 pub all_events: Vec<Event>,
127
128 pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
134
135 pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
142}
143
#[cfg(test)]
#[cfg(not(target_family = "wasm"))]
mod tests {
    use std::sync::Arc;

    use matrix_sdk_base::{
        deserialized_responses::TimelineEvent,
        event_cache::store::{EventCacheStore as _, MemoryStore},
        linked_chunk::{ChunkIdentifier, Update},
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
    use ruma::{event_id, owned_event_id, room_id, serde::Raw, user_id, EventId};

    use super::*;

    /// Builds an empty text message sent by `@mnt_io:matrix.org` with the given
    /// event ID.
    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Persist two chunks: chunk 42 holds `$ev0` and `$ev1`, chunk 0 holds
        // `$ev2` and `$ev3`.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new(room_id.to_owned(), event_cache_store);

        // The in-memory events are expected to live in chunk 0 only, so `$ev2`
        // and `$ev3` are in-memory duplicates while `$ev0` and `$ev1` are
        // store-only duplicates (see the assertions below).
        let mut room_events = RoomEvents::new();
        room_events.push_events([event_2.clone(), event_3.clone()]);

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![event_0, event_1, event_2, event_3, event_4],
                &room_events,
            )
            .await
            .unwrap();

        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[async_test]
    async fn test_storage_deduplication() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An event without an event ID: it must be dropped by the deduplicator.
        let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());

        // Persist `$1` in chunk 42 and `$2` in chunk 43; `$3` is unknown to the store.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new(room_id.to_owned(), event_cache_store);

        let room_events = RoomEvents::new();
        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        } = deduplicator
            .filter_duplicate_events(vec![ev1, ev2, ev3, ev4], &room_events)
            .await
            .unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }

    #[async_test]
    async fn test_duplicates_within_the_same_batch_are_removed() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);

        // Nothing is persisted in the store, so no event can be reported as a
        // store duplicate: only the repeated `$ev0` inside the batch itself
        // must be removed.
        let event_cache_store =
            EventCacheStoreLock::new(Arc::new(MemoryStore::new()), "hodor".to_owned());
        let deduplicator =
            Deduplicator::new(room_id!("!fondue:raclette.ch").to_owned(), event_cache_store);
        let room_events = RoomEvents::new();

        let outcome = deduplicator
            .filter_duplicate_events(vec![event_0.clone(), event_1, event_0], &room_events)
            .await
            .unwrap();

        // The first occurrence of `$ev0` is kept and the order is preserved.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));

        assert!(outcome.in_memory_duplicated_event_ids.is_empty());
        assert!(outcome.in_store_duplicated_event_ids.is_empty());
    }
}