// matrix_sdk/event_cache/deduplicator.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
16//! to learn more.
17
18use std::collections::BTreeSet;
19
20use matrix_sdk_base::{
21    event_cache::store::EventCacheStoreLockGuard,
22    linked_chunk::{LinkedChunkId, Position},
23};
24use ruma::{OwnedEventId, UserId};
25
26use super::{
27    EventCacheError,
28    caches::event_linked_chunk::{Event, EventLinkedChunk},
29};
30
31/// Find duplicates in the given collection of new events, and return relevant
32/// information about the duplicates found in the new events, including the
33/// events that are not loaded in memory.
34pub async fn filter_duplicate_events(
35    own_user_id: &UserId,
36    store_guard: &EventCacheStoreLockGuard,
37    linked_chunk_id: LinkedChunkId<'_>,
38    linked_chunk: &EventLinkedChunk,
39    mut new_events: Vec<Event>,
40) -> Result<DeduplicationOutcome, EventCacheError> {
41    // Remove all events with no ID, or that are duplicated among the new events,
42    // i.e. `new_events` contains duplicated events in itself (e.g. `[$e0, $e1,
43    // $e0]`, here `$e0` is duplicated).
44    {
45        let mut event_ids = BTreeSet::new();
46
47        new_events.retain(|event| {
48            // Only keep events with IDs, and those for which `insert` returns `true`
49            // (meaning they were not in the set).
50            event.event_id().is_some_and(|event_id| event_ids.insert(event_id))
51        });
52    }
53
54    // Let the store do its magic ✨
55    let duplicated_event_ids = store_guard
56        .filter_duplicated_events(
57            linked_chunk_id,
58            new_events.iter().filter_map(|event| event.event_id()).collect(),
59        )
60        .await?;
61
62    // Separate duplicated events in two collections: ones that are in-memory, ones
63    // that are in the store.
64    let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
65        // Collect all in-memory chunk identifiers.
66        let in_memory_chunk_identifiers =
67            linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();
68
69        let mut in_memory = vec![];
70        let mut in_store = vec![];
71
72        for (duplicated_event_id, position) in duplicated_event_ids {
73            if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
74                in_memory.push((duplicated_event_id, position));
75            } else {
76                in_store.push((duplicated_event_id, position));
77            }
78        }
79
80        (in_memory, in_store)
81    };
82
83    // See comment of `DeduplicationOutcome::non_empty_all_duplicates` for the
84    // rationale behind the following booleans.
85    let at_least_one_event_not_sent_by_me =
86        new_events.iter().any(|ev| ev.sender().is_some_and(|sender| sender != own_user_id));
87
88    let all_duplicates = (in_memory_duplicated_event_ids.len()
89        + in_store_duplicated_event_ids.len())
90        == new_events.len();
91
92    let non_empty_all_duplicates = at_least_one_event_not_sent_by_me && all_duplicates;
93
94    Ok(DeduplicationOutcome {
95        all_events: new_events,
96        in_memory_duplicated_event_ids,
97        in_store_duplicated_event_ids,
98        non_empty_all_duplicates,
99    })
100}
101
/// Outcome of [`filter_duplicate_events`]: the deduplicated events, plus where
/// each detected duplicate currently lives (in memory vs. store-only).
pub(super) struct DeduplicationOutcome {
    /// All events passed to the deduplicator.
    ///
    /// All events in this collection have a valid event ID.
    ///
    /// This collection does not contain duplicated events in itself.
    pub all_events: Vec<Event>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// memory. It means they have been loaded from the store if any.
    ///
    /// Events are sorted by their position, from the newest to the oldest
    /// (position is descending).
    pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// the store. It means they have **NOT** been loaded from the store into
    /// memory yet.
    ///
    /// Events are sorted by their position, from the newest to the oldest
    /// (position is descending).
    pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Whether there's at least one new event sent by some other user, and all
    /// new events are duplicate.
    ///
    /// This boolean is useful to know whether we need to store a previous-batch
    /// token (gap) we received from a server-side request (sync or
    /// back-pagination), or if we should *not* store it.
    ///
    /// Since there can be empty back-paginations with a previous-batch token
    /// (that is, they don't contain any events), we need to make sure that
    /// there is *at least* one new event that has been added. Otherwise, we
    /// might conclude something wrong because a subsequent back-pagination
    /// might return non-duplicated events.
    ///
    /// Because the send queue inserts sent events in the event cache, we also
    /// need to make sure that we're *not* considering the user's own
    /// events. Indeed, there could be a sync response only containing the
    /// user's own events, that are considered duplicates because the send queue
    /// inserted them prior to receiving the response. In this case, if the sync
    /// is gappy, then the previous-batch token would be incorrectly dropped.
    ///
    /// If we had already seen all the duplicated events that we're trying to
    /// add, then it would be wasteful to store a previous-batch token, or
    /// even touch the linked chunk: we would repeat back-paginations for
    /// events that we have already seen, and possibly misplace them. And we
    /// should not be missing events either: the already-known events would have
    /// their own previous-batch token (it might already be consumed).
    pub non_empty_all_duplicates: bool,
}
153
#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests use the cross-process lock, so need time support.
mod tests {
    use std::ops::Not as _;

    use matrix_sdk_base::{
        deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
        linked_chunk::ChunkIdentifier,
    };
    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{EventId, owned_event_id, serde::Raw, user_id};

    use super::*;

    /// Build a minimal text-message `TimelineEvent` with the given event ID,
    /// sent by a fixed (non-test) user.
    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let user_id = user_id!("@user:example.com");
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        // `event_0` and `event_1` are in the store.
        // `event_2` and `event_3` are in the store, but also in memory: they are
        // loaded in memory from the store.
        // `event_4` is nowhere, it's new.
        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Prefill the store with `event_0` to `event_3`, spread over two chunks.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(0), /* must match the chunk in
                                                       * `EventLinkedChunk`, so 0. It simulates a
                                                       * lazy-load for example. */
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store into its cross-process lock and take a clean guard.
        let event_cache_store = EventCacheStoreLock::new(
            event_cache_store,
            CrossProcessLockConfig::multi_process("hodor"),
        );
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        {
            // When presenting with only duplicate events, some of them in the in-memory
            // chunk, all of them in the store, we should return all of them as
            // duplicates.

            let mut linked_chunk = EventLinkedChunk::new();
            linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);

            let outcome = filter_duplicate_events(
                user_id,
                event_cache_store_guard,
                LinkedChunkId::Room(room_id),
                &linked_chunk,
                vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
            )
            .await
            .unwrap();

            assert!(outcome.non_empty_all_duplicates);
        }

        // Now include a genuinely new event (`event_4`): not all events are
        // duplicates anymore.
        let mut linked_chunk = EventLinkedChunk::new();
        linked_chunk.push_events([event_2.clone(), event_3.clone()]);

        let outcome = filter_duplicate_events(
            user_id,
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![event_0, event_1, event_2, event_3, event_4],
        )
        .await
        .unwrap();

        assert!(outcome.non_empty_all_duplicates.not());

        // The deduplication says 5 events are valid.
        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        // From these 5 events, 2 are duplicated and have been loaded in memory.
        //
        // Note that events are sorted by their descending position.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        // From these 5 events, 2 are duplicated and live in the store only, they have
        // not been loaded in memory.
        //
        // Note that events are sorted by their descending position.
        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let user_id = user_id!("@user:example.com");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An invalid event (doesn't have an event id.).
        let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());

        // Prefill the store with ev1 and ev2.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // Non empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store into its lock.
        let event_cache_store = EventCacheStoreLock::new(
            event_cache_store,
            CrossProcessLockConfig::multi_process("hodor"),
        );
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        // Nothing is loaded in memory: all duplicates must be store-only.
        let linked_chunk = EventLinkedChunk::new();

        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates,
        } = filter_duplicate_events(
            user_id,
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![ev1, ev2, ev3, ev4],
        )
        .await
        .unwrap();

        assert!(non_empty_all_duplicates.not());

        // The ID-less `ev4` has been filtered out; the three valid events remain.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }
}
417}