matrix_sdk/event_cache/deduplicator.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
//! to learn more.

use std::collections::BTreeSet;

use matrix_sdk_base::{
    event_cache::store::EventCacheStoreLock,
    linked_chunk::{LinkedChunkId, Position},
};
use ruma::{OwnedEventId, OwnedRoomId};

use super::{
    room::events::{Event, RoomEvents},
    EventCacheError,
};

/// An event deduplication mechanism based on the persistent storage associated
/// with the event cache.
///
/// It queries the persistent storage to figure out whether events are
/// duplicates, making it entirely stateless.
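///
/// A minimal usage sketch (`room_id`, `store`, `new_events` and `room_events`
/// are assumed to be provided by the surrounding event cache code):
///
/// ```ignore
/// let deduplicator = Deduplicator::new(room_id, store);
/// let outcome = deduplicator
///     .filter_duplicate_events(new_events, &room_events)
///     .await?;
/// ```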
pub struct Deduplicator {
    /// The room this deduplicator applies to.
    room_id: OwnedRoomId,
    /// The actual event cache store implementation used to query events.
    store: EventCacheStoreLock,
}

impl Deduplicator {
    /// Create a new instance of a [`Deduplicator`].
    pub fn new(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
        Self { room_id, store }
    }

    /// Find duplicates in the given collection of events, and return both the
    /// valid events (those with an event id) and the event ids of duplicated
    /// events along with their position.
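    ///
    /// A sketch of how the outcome can be consumed, mirroring the fields of
    /// [`DeduplicationOutcome`] (`events` and `room_events` are assumed to be
    /// available in the calling code):
    ///
    /// ```ignore
    /// let DeduplicationOutcome {
    ///     all_events,
    ///     in_memory_duplicated_event_ids,
    ///     in_store_duplicated_event_ids,
    /// } = deduplicator.filter_duplicate_events(events, &room_events).await?;
    /// ```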
    pub async fn filter_duplicate_events(
        &self,
        mut events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> Result<DeduplicationOutcome, EventCacheError> {
        // Remove all events that have no ID, or that are duplicated inside `events`
        // itself, e.g. in `[$e0, $e1, $e0]`, the second `$e0` is a duplicate within
        // `events`.
        {
            let mut event_ids = BTreeSet::new();

            events.retain(|event| {
                let Some(event_id) = event.event_id() else {
                    // No event ID? Bye bye.
                    return false;
                };

                // Already seen this event in `events`? Bye bye.
                if event_ids.contains(&event_id) {
                    return false;
                }

                event_ids.insert(event_id);

                // Let's keep this event!
                true
            });
        }

        let store = self.store.lock().await?;

        // Let the store do its magic ✨
        let duplicated_event_ids = store
            .filter_duplicated_events(
                LinkedChunkId::Room(&self.room_id),
                events.iter().filter_map(|event| event.event_id()).collect(),
            )
            .await?;

        // Separate the duplicated events into two collections: those that are in
        // memory, and those that are only in the store.
        let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
            // Collect all in-memory chunk identifiers.
            let in_memory_chunk_identifiers =
                room_events.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();

            let mut in_memory = vec![];
            let mut in_store = vec![];

            for (duplicated_event_id, position) in duplicated_event_ids {
                if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
                    in_memory.push((duplicated_event_id, position));
                } else {
                    in_store.push((duplicated_event_id, position));
                }
            }

            (in_memory, in_store)
        };

        Ok(DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        })
    }
}

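/// The outcome of a deduplication pass: the events that remain after events
/// without an ID and in-batch duplicates have been dropped, plus the
/// identifiers and positions of the events that are already known, either in
/// memory or in the store.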
pub(super) struct DeduplicationOutcome {
    /// All events passed to the deduplicator.
    ///
    /// All events in this collection have a valid event ID.
    ///
    /// This collection does not contain any duplicated events itself.
    pub all_events: Vec<Event>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// memory. This means they have already been loaded from the store, if
    /// any.
    ///
    /// Events are sorted by their position, from the newest to the oldest
    /// (position is descending).
    pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// the store. This means they have **not** been loaded from the store into
    /// memory yet.
    ///
    /// Events are sorted by their position, from the newest to the oldest
    /// (position is descending).
    pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
}

#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests use the cross-process lock, so they need time support.
mod tests {
    use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{owned_event_id, serde::Raw, user_id, EventId};

    use super::*;

    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        // `event_0` and `event_1` are in the store.
        // `event_2` and `event_3` are in the store, but also in memory: they have been
        // loaded into memory from the store.
        // `event_4` is nowhere, it's new.
        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Prefill the store with `event_0`, `event_1`, `event_2` and `event_3`.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        // Must match the chunk in `RoomEvents`, i.e. 0. This simulates a
                        // lazy-load, for example.
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new(room_id.to_owned(), event_cache_store);
        let mut room_events = RoomEvents::new();
        room_events.push_events([event_2.clone(), event_3.clone()]);

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![event_0, event_1, event_2, event_3, event_4],
                &room_events,
            )
            .await
            .unwrap();

        // The deduplication says 5 events are valid.
        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        // From these 5 events, 2 are duplicated and have been loaded in memory.
        //
        // Note that events are sorted by their descending position.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        // From these 5 events, 2 are duplicated and live in the store only: they have
        // not been loaded into memory.
        //
        // Note that events are sorted by their descending position.
        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An invalid event (it doesn't have an event id).
        let ev4 = TimelineEvent::from_plaintext(Raw::from_json_string("{}".to_owned()).unwrap());

        // Prefill the store with ev1 and ev2.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // A non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store into its lock.
        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new(room_id.to_owned(), event_cache_store);

        let room_events = RoomEvents::new();
        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        } = deduplicator
            .filter_duplicate_events(vec![ev1, ev2, ev3, ev4], &room_events)
            .await
            .unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }
}