// matrix_sdk/event_cache/deduplicator.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
16//! to learn more.
17
18use std::collections::BTreeSet;
19
20use matrix_sdk_base::{
21    event_cache::store::EventCacheStoreLockGuard,
22    linked_chunk::{LinkedChunkId, Position},
23};
24use ruma::OwnedEventId;
25
26use super::{
27    EventCacheError,
28    room::events::{Event, EventLinkedChunk},
29};
30
31/// Find duplicates in the given collection of new events, and return relevant
32/// information about the duplicates found in the new events, including the
33/// events that are not loaded in memory.
34pub async fn filter_duplicate_events(
35    store_guard: &EventCacheStoreLockGuard,
36    linked_chunk_id: LinkedChunkId<'_>,
37    linked_chunk: &EventLinkedChunk,
38    mut new_events: Vec<Event>,
39) -> Result<DeduplicationOutcome, EventCacheError> {
40    // Remove all events with no ID, or that are duplicated among the new events,
41    // i.e. `new_events` contains duplicated events in itself (e.g. `[$e0, $e1,
42    // $e0]`, here `$e0` is duplicated).
43    {
44        let mut event_ids = BTreeSet::new();
45
46        new_events.retain(|event| {
47            // Only keep events with IDs, and those for which `insert` returns `true`
48            // (meaning they were not in the set).
49            event.event_id().is_some_and(|event_id| event_ids.insert(event_id))
50        });
51    }
52
53    // Let the store do its magic ✨
54    let duplicated_event_ids = store_guard
55        .filter_duplicated_events(
56            linked_chunk_id,
57            new_events.iter().filter_map(|event| event.event_id()).collect(),
58        )
59        .await?;
60
61    // Separate duplicated events in two collections: ones that are in-memory, ones
62    // that are in the store.
63    let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
64        // Collect all in-memory chunk identifiers.
65        let in_memory_chunk_identifiers =
66            linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();
67
68        let mut in_memory = vec![];
69        let mut in_store = vec![];
70
71        for (duplicated_event_id, position) in duplicated_event_ids {
72            if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
73                in_memory.push((duplicated_event_id, position));
74            } else {
75                in_store.push((duplicated_event_id, position));
76            }
77        }
78
79        (in_memory, in_store)
80    };
81
82    let at_least_one_event = !new_events.is_empty();
83    let all_duplicates = (in_memory_duplicated_event_ids.len()
84        + in_store_duplicated_event_ids.len())
85        == new_events.len();
86    let non_empty_all_duplicates = at_least_one_event && all_duplicates;
87
88    Ok(DeduplicationOutcome {
89        all_events: new_events,
90        in_memory_duplicated_event_ids,
91        in_store_duplicated_event_ids,
92        non_empty_all_duplicates,
93    })
94}
95
/// Outcome of a deduplication pass over a batch of new events: which events
/// survived the filtering, and where the duplicates among them already live
/// (loaded in memory vs only in the store).
pub(super) struct DeduplicationOutcome {
    /// All events passed to the deduplicator.
    ///
    /// All events in this collection have a valid event ID.
    ///
    /// This collection does not contain duplicated events in itself.
    pub all_events: Vec<Event>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// memory. It means they have been loaded from the store if any.
    ///
    /// Events are sorted by their position, from the newest to the oldest
    /// (position is descending).
    ///
    /// NOTE(review): the unit tests in this file assert *ascending* positions
    /// within this collection — confirm which ordering is intended.
    pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// the store. It means they have **NOT** been loaded from the store into
    /// memory yet.
    ///
    /// Events are sorted by their position, from the newest to the oldest
    /// (position is descending).
    ///
    /// NOTE(review): same ordering caveat as
    /// [`Self::in_memory_duplicated_event_ids`].
    pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Whether there's at least one new event, and all new events are
    /// duplicate.
    ///
    /// This boolean is useful to know whether we need to store a
    /// previous-batch token (gap) we received from a server-side
    /// request (sync or back-pagination), or if we should
    /// *not* store it.
    ///
    /// Since there can be empty back-paginations with a previous-batch
    /// token (that is, they don't contain any events), we need to
    /// make sure that there is *at least* one new event that has
    /// been added. Otherwise, we might conclude something wrong
    /// because a subsequent back-pagination might
    /// return non-duplicated events.
    ///
    /// If we had already seen all the duplicated events that we're trying
    /// to add, then it would be wasteful to store a previous-batch
    /// token, or even touch the linked chunk: we would repeat
    /// back-paginations for events that we have already seen, and
    /// possibly misplace them. And we should not be missing
    /// events either: the already-known events would have their own
    /// previous-batch token (it might already be consumed).
    pub non_empty_all_duplicates: bool,
}
143
#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests use the cross-process lock, so need time support.
mod tests {
    use std::ops::Not as _;

    use matrix_sdk_base::{
        deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
        linked_chunk::ChunkIdentifier,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{EventId, owned_event_id, serde::Raw, user_id};

    use super::*;

    /// Build a minimal text-message `TimelineEvent` carrying the given event
    /// ID, for use as a fixture.
    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    /// Duplicates must be classified as "in-memory" vs "in-store" depending on
    /// whether the chunk holding them is currently loaded in the
    /// `EventLinkedChunk`.
    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        // `event_0` and `event_1` are in the store.
        // `event_2` and `event_3` are in the store, but also in memory: they are
        // loaded in memory from the store.
        // `event_4` is nowhere, it's new.
        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Prefill the store: `event_0` and `event_1` go to chunk 42 (store-only),
        // `event_2` and `event_3` go to chunk 0, which is mirrored in memory below.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(0), /* must match the chunk in
                                                       * `EventLinkedChunk`, so 0. It simulates a
                                                       * lazy-load for example. */
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store into its cross-process lock, as `filter_duplicate_events`
        // expects a lock guard.
        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        {
            // When presenting with only duplicate events, some of them in the in-memory
            // chunk, all of them in the store, we should return all of them as
            // duplicates.

            let mut linked_chunk = EventLinkedChunk::new();
            linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);

            let outcome = filter_duplicate_events(
                event_cache_store_guard,
                LinkedChunkId::Room(room_id),
                &linked_chunk,
                vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
            )
            .await
            .unwrap();

            assert!(outcome.non_empty_all_duplicates);
        }

        // Now with a genuinely new event (`event_4`) in the batch: not all new
        // events are duplicates anymore.
        let mut linked_chunk = EventLinkedChunk::new();
        linked_chunk.push_events([event_2.clone(), event_3.clone()]);

        let outcome = filter_duplicate_events(
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![event_0, event_1, event_2, event_3, event_4],
        )
        .await
        .unwrap();

        assert!(outcome.non_empty_all_duplicates.not());

        // The deduplication says 5 events are valid.
        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        // From these 5 events, 2 are duplicated and have been loaded in memory.
        //
        // Note that events are sorted by their position.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        // From these 5 events, 2 are duplicated and live in the store only, they have
        // not been loaded in memory.
        //
        // Note that events are sorted by their position.
        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    /// Events without an event ID must be dropped, and duplicates must be
    /// detected against the store when nothing is loaded in memory.
    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An invalid event (doesn't have an event id.).
        let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());

        // Prefill the store with ev1 and ev2.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // Non empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store into its lock.
        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
        let event_cache_store = event_cache_store.lock().await.unwrap();
        let event_cache_store_guard = event_cache_store.as_clean().unwrap();

        // Nothing is loaded in memory: every duplicate must be reported as in-store.
        let linked_chunk = EventLinkedChunk::new();

        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates,
        } = filter_duplicate_events(
            event_cache_store_guard,
            LinkedChunkId::Room(room_id),
            &linked_chunk,
            vec![ev1, ev2, ev3, ev4],
        )
        .await
        .unwrap();

        // `ev3` is new, so not all events are duplicates.
        assert!(non_empty_all_duplicates.not());

        // `ev4` (no event ID) has been dropped; the other three remain.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }
}