loro_internal/utils/
subscription.rs

1use either::Either;
2/*
3This file is modified from the original file in the following repo:
4https://github.com/zed-industries/zed
5
6
7Copyright 2022 - 2024 Zed Industries, Inc.
8
9   Licensed under the Apache License, Version 2.0 (the "License");
10   you may not use this file except in compliance with the License.
11   You may obtain a copy of the License at
12
13
14       http://www.apache.org/licenses/LICENSE-2.0
15
16
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22
23
24
25
26Apache License
27                           Version 2.0, January 2004
28                        http://www.apache.org/licenses/
29
30
31   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
32
33
34   1. Definitions.
35
36
37      "License" shall mean the terms and conditions for use, reproduction,
38      and distribution as defined by Sections 1 through 9 of this document.
39
40
41      "Licensor" shall mean the copyright owner or entity authorized by
42      the copyright owner that is granting the License.
43
44
45      "Legal Entity" shall mean the union of the acting entity and all
46      other entities that control, are controlled by, or are under common
47      control with that entity. For the purposes of this definition,
48      "control" means (i) the power, direct or indirect, to cause the
49      direction or management of such entity, whether by contract or
50      otherwise, or (ii) ownership of fifty percent (50%) or more of the
51      outstanding shares, or (iii) beneficial ownership of such entity.
52
53
54      "You" (or "Your") shall mean an individual or Legal Entity
55      exercising permissions granted by this License.
56
57
58      "Source" form shall mean the preferred form for making modifications,
59      including but not limited to software source code, documentation
60      source, and configuration files.
61
62
63      "Object" form shall mean any form resulting from mechanical
64      transformation or translation of a Source form, including but
65      not limited to compiled object code, generated documentation,
66      and conversions to other media types.
67
68
69      "Work" shall mean the work of authorship, whether in Source or
70      Object form, made available under the License, as indicated by a
71      copyright notice that is included in or attached to the work
72      (an example is provided in the Appendix below).
73
74
75      "Derivative Works" shall mean any work, whether in Source or Object
76      form, that is based on (or derived from) the Work and for which the
77      editorial revisions, annotations, elaborations, or other modifications
78      represent, as a whole, an original work of authorship. For the purposes
79      of this License, Derivative Works shall not include works that remain
80      separable from, or merely link (or bind by name) to the interfaces of,
81      the Work and Derivative Works thereof.
82
83
84      "Contribution" shall mean any work of authorship, including
85      the original version of the Work and any modifications or additions
86      to that Work or Derivative Works thereof, that is intentionally
87      submitted to Licensor for inclusion in the Work by the copyright owner
88      or by an individual or Legal Entity authorized to submit on behalf of
89      the copyright owner. For the purposes of this definition, "submitted"
90      means any form of electronic, verbal, or written communication sent
91      to the Licensor or its representatives, including but not limited to
92      communication on electronic mailing lists, source code control systems,
93      and issue tracking systems that are managed by, or on behalf of, the
94      Licensor for the purpose of discussing and improving the Work, but
95      excluding communication that is conspicuously marked or otherwise
96      designated in writing by the copyright owner as "Not a Contribution."
97
98
99      "Contributor" shall mean Licensor and any individual or Legal Entity
100      on behalf of whom a Contribution has been received by Licensor and
101      subsequently incorporated within the Work.
102
103
104   2. Grant of Copyright License. Subject to the terms and conditions of
105      this License, each Contributor hereby grants to You a perpetual,
106      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
107      copyright license to reproduce, prepare Derivative Works of,
108      publicly display, publicly perform, sublicense, and distribute the
109      Work and such Derivative Works in Source or Object form.
110
111
112   3. Grant of Patent License. Subject to the terms and conditions of
113      this License, each Contributor hereby grants to You a perpetual,
114      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
115      (except as stated in this section) patent license to make, have made,
116      use, offer to sell, sell, import, and otherwise transfer the Work,
117      where such license applies only to those patent claims licensable
118      by such Contributor that are necessarily infringed by their
119      Contribution(s) alone or by combination of their Contribution(s)
120      with the Work to which such Contribution(s) was submitted. If You
121      institute patent litigation against any entity (including a
122      cross-claim or counterclaim in a lawsuit) alleging that the Work
123      or a Contribution incorporated within the Work constitutes direct
124      or contributory patent infringement, then any patent licenses
125      granted to You under this License for that Work shall terminate
126      as of the date such litigation is filed.
127
128
129   4. Redistribution. You may reproduce and distribute copies of the
130      Work or Derivative Works thereof in any medium, with or without
131      modifications, and in Source or Object form, provided that You
132      meet the following conditions:
133
134
135      (a) You must give any other recipients of the Work or
136          Derivative Works a copy of this License; and
137
138
139      (b) You must cause any modified files to carry prominent notices
140          stating that You changed the files; and
141
142
143      (c) You must retain, in the Source form of any Derivative Works
144          that You distribute, all copyright, patent, trademark, and
145          attribution notices from the Source form of the Work,
146          excluding those notices that do not pertain to any part of
147          the Derivative Works; and
148
149
150      (d) If the Work includes a "NOTICE" text file as part of its
151          distribution, then any Derivative Works that You distribute must
152          include a readable copy of the attribution notices contained
153          within such NOTICE file, excluding those notices that do not
154          pertain to any part of the Derivative Works, in at least one
155          of the following places: within a NOTICE text file distributed
156          as part of the Derivative Works; within the Source form or
157          documentation, if provided along with the Derivative Works; or,
158          within a display generated by the Derivative Works, if and
159          wherever such third-party notices normally appear. The contents
160          of the NOTICE file are for informational purposes only and
161          do not modify the License. You may add Your own attribution
162          notices within Derivative Works that You distribute, alongside
163          or as an addendum to the NOTICE text from the Work, provided
164          that such additional attribution notices cannot be construed
165          as modifying the License.
166
167
168      You may add Your own copyright statement to Your modifications and
169      may provide additional or different license terms and conditions
170      for use, reproduction, or distribution of Your modifications, or
171      for any such Derivative Works as a whole, provided Your use,
172      reproduction, and distribution of the Work otherwise complies with
173      the conditions stated in this License.
174
175
176   5. Submission of Contributions. Unless You explicitly state otherwise,
177      any Contribution intentionally submitted for inclusion in the Work
178      by You to the Licensor shall be under the terms and conditions of
179      this License, without any additional terms or conditions.
180      Notwithstanding the above, nothing herein shall supersede or modify
181      the terms of any separate license agreement you may have executed
182      with Licensor regarding such Contributions.
183
184
185   6. Trademarks. This License does not grant permission to use the trade
186      names, trademarks, service marks, or product names of the Licensor,
187      except as required for reasonable and customary use in describing the
188      origin of the Work and reproducing the content of the NOTICE file.
189
190
191   7. Disclaimer of Warranty. Unless required by applicable law or
192      agreed to in writing, Licensor provides the Work (and each
193      Contributor provides its Contributions) on an "AS IS" BASIS,
194      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
195      implied, including, without limitation, any warranties or conditions
196      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
197      PARTICULAR PURPOSE. You are solely responsible for determining the
198      appropriateness of using or redistributing the Work and assume any
199      risks associated with Your exercise of permissions under this License.
200
201
202   8. Limitation of Liability. In no event and under no legal theory,
203      whether in tort (including negligence), contract, or otherwise,
204      unless required by applicable law (such as deliberate and grossly
205      negligent acts) or agreed to in writing, shall any Contributor be
206      liable to You for damages, including any direct, indirect, special,
207      incidental, or consequential damages of any character arising as a
208      result of this License or out of the use or inability to use the
209      Work (including but not limited to damages for loss of goodwill,
210      work stoppage, computer failure or malfunction, or any and all
211      other commercial damages or losses), even if such Contributor
212      has been advised of the possibility of such damages.
213
214
215   9. Accepting Warranty or Additional Liability. While redistributing
216      the Work or Derivative Works thereof, You may choose to offer,
217      and charge a fee for, acceptance of support, warranty, indemnity,
218      or other liability obligations and/or rights consistent with this
219      License. However, in accepting such obligations, You may act only
220      on Your own behalf and on Your sole responsibility, not on behalf
221      of any other Contributor, and only if You agree to indemnify,
222      defend, and hold each Contributor harmless for any liability
223      incurred by, or claims asserted against, such Contributor by reason
224      of your accepting any such warranty or additional liability.
225
226
227   END OF TERMS AND CONDITIONS
228
229*/
230use crate::sync::{thread, AtomicBool, Mutex};
231use smallvec::SmallVec;
232use std::collections::{BTreeMap, BTreeSet};
233use std::sync::atomic::Ordering;
234use std::sync::{Arc, Weak};
235use std::{fmt::Debug, mem};
236use thread::ThreadId;
237
/// Errors reported when dispatching to the subscribers of an emitter key.
#[derive(Debug)]
pub enum SubscriptionError {
    /// The current thread is already dispatching to the subscribers of the
    /// requested emitter key, so a nested dispatch cannot proceed.
    CannotEmitEventDueToRecursiveCall,
}

// `Display` belongs on the error type itself (not on the `Subscription`
// handle) so the error can be formatted with `{}` and used as a
// `std::error::Error`.
impl std::fmt::Display for SubscriptionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SubscriptionError")
    }
}

impl std::error::Error for SubscriptionError {}
248
/// A registry of callbacks grouped by emitter key.
///
/// The state lives behind an `Arc<Mutex<..>>`, so every clone of a
/// `SubscriberSet` refers to the same underlying registry.
pub(crate) struct SubscriberSet<EmitterKey, Callback>(
    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
);
252
253impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
254    fn clone(&self) -> Self {
255        SubscriberSet(self.0.clone())
256    }
257}
258
/// The mutable state shared by all clones of a `SubscriberSet`.
struct SubscriberSetState<EmitterKey, Callback> {
    /// Per emitter key: either the subscribers keyed by subscriber id (`Left`),
    /// or the id of the thread that has temporarily taken the subscribers out
    /// in `retain` while it invokes their callbacks (`Right`).
    subscribers: BTreeMap<EmitterKey, Either<BTreeMap<usize, Subscriber<Callback>>, ThreadId>>,
    /// Subscriptions that were dropped while their emitter's subscribers were
    /// taken out by `retain`; `retain` removes them once the callbacks finish.
    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
    /// The id handed to the next subscriber created by `insert`.
    next_subscriber_id: usize,
}
264
/// A single registered callback together with its bookkeeping.
struct Subscriber<Callback> {
    /// Set to `true` by the activation closure returned from `insert`;
    /// `retain` and `remove` skip subscribers whose flag is still `false`.
    active: Arc<AtomicBool>,
    /// The user-provided callback.
    callback: Callback,
    /// This field is used to drop the subscription when the subscriber is dropped.
    _sub: InnerSubscription,
}
271
272impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
273where
274    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
275    Callback: 'static + Send + Sync,
276{
277    pub fn new() -> Self {
278        Self(Arc::new(Mutex::new(SubscriberSetState {
279            subscribers: Default::default(),
280            dropped_subscribers: Default::default(),
281            next_subscriber_id: 0,
282        })))
283    }
284
285    /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
286    /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
287    /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
288    /// to activate the [`Subscription`].
289    pub fn insert(
290        &self,
291        emitter_key: EmitterKey,
292        callback: Callback,
293    ) -> (Subscription, impl FnOnce()) {
294        let active = Arc::new(AtomicBool::new(false));
295        let mut lock = self.0.lock().unwrap();
296        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
297        let this = Arc::downgrade(&self.0);
298        let emitter_key_1 = emitter_key.clone();
299        let inner_sub = InnerSubscription {
300            unsubscribe: Arc::new(Mutex::new(Some(Box::new(move || {
301                let Some(this) = this.upgrade() else {
302                    return;
303                };
304
305                let mut lock = this.lock().unwrap();
306                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
307                    // remove was called with this emitter_key
308                    return;
309                };
310
311                if let Either::Left(subscribers) = subscribers {
312                    subscribers.remove(&subscriber_id);
313                    if subscribers.is_empty() {
314                        lock.subscribers.remove(&emitter_key);
315                    }
316                    return;
317                }
318
319                // We didn't manage to remove the subscription, which means it was dropped
320                // while invoking the callback. Mark it as dropped so that we can remove it
321                // later.
322                lock.dropped_subscribers
323                    .insert((emitter_key, subscriber_id));
324            })))),
325        };
326        let subscription = Subscription {
327            unsubscribe: Arc::downgrade(&inner_sub.unsubscribe),
328        };
329
330        lock.subscribers
331            .entry(emitter_key_1)
332            .or_insert_with(|| Either::Left(BTreeMap::new()))
333            .as_mut()
334            .unwrap_left()
335            .insert(
336                subscriber_id,
337                Subscriber {
338                    active: active.clone(),
339                    callback,
340                    _sub: inner_sub,
341                },
342            );
343        (subscription, move || active.store(true, Ordering::Relaxed))
344    }
345
346    #[allow(unused)]
347    pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
348        let mut lock = self.0.lock().unwrap();
349        let subscribers = lock.subscribers.remove(emitter);
350        subscribers
351            .and_then(|x| x.left().map(|s| s.into_values()))
352            .into_iter()
353            .flatten()
354            .filter_map(|subscriber| {
355                if subscriber.active.load(Ordering::Relaxed) {
356                    Some(subscriber.callback)
357                } else {
358                    None
359                }
360            })
361    }
362
363    pub fn is_recursive_calling(&self, emitter: &EmitterKey) -> bool {
364        if let Some(Either::Right(thread_id)) = self.0.lock().unwrap().subscribers.get(emitter) {
365            *thread_id == thread::current().id()
366        } else {
367            false
368        }
369    }
370
371    /// Call the given callback for each subscriber to the given emitter.
372    /// If the callback returns false, the subscriber is removed.
373    pub fn retain(
374        &self,
375        emitter: &EmitterKey,
376        f: &mut dyn FnMut(&mut Callback) -> bool,
377    ) -> Result<(), SubscriptionError> {
378        let mut subscribers = {
379            let inner = loop {
380                let mut subscriber_set_state = self.0.lock().unwrap();
381                let Some(set) = subscriber_set_state.subscribers.get_mut(emitter) else {
382                    return Ok(());
383                };
384                match set {
385                    Either::Left(_) => {
386                        break std::mem::replace(set, Either::Right(thread::current().id()))
387                            .unwrap_left();
388                    }
389                    Either::Right(lock_thread) => {
390                        if thread::current().id() == *lock_thread {
391                            return Err(SubscriptionError::CannotEmitEventDueToRecursiveCall);
392                        } else {
393                            // return Ok(());
394                            drop(subscriber_set_state);
395                            #[cfg(loom)]
396                            loom::thread::yield_now();
397                            #[cfg(not(loom))]
398                            std::thread::sleep(std::time::Duration::from_millis(10));
399                        }
400                    }
401                }
402            };
403            inner
404        };
405
406        subscribers.retain(|_, subscriber| {
407            if subscriber.active.load(Ordering::Relaxed) {
408                f(&mut subscriber.callback)
409            } else {
410                true
411            }
412        });
413
414        let mut lock = self.0.lock().unwrap();
415
416        // Add any new subscribers that were added while invoking the callback.
417        if let Some(Either::Left(new_subscribers)) = lock.subscribers.remove(emitter) {
418            subscribers.extend(new_subscribers);
419        }
420
421        // Remove any dropped subscriptions that were dropped while invoking the callback.
422        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
423            if *emitter == dropped_emitter {
424                subscribers.remove(&dropped_subscription_id);
425            } else {
426                lock.dropped_subscribers
427                    .insert((dropped_emitter, dropped_subscription_id));
428            }
429        }
430
431        lock.subscribers
432            .insert(emitter.clone(), Either::Left(subscribers));
433        Ok(())
434    }
435
436    pub fn is_empty(&self) -> bool {
437        self.0.lock().unwrap().subscribers.is_empty()
438    }
439
440    pub fn may_include(&self, emitter: &EmitterKey) -> bool {
441        self.0.lock().unwrap().subscribers.contains_key(emitter)
442    }
443}
444
/// Advances the counter by one and returns the value it held before.
fn post_inc(next_subscriber_id: &mut usize) -> usize {
    let bumped = *next_subscriber_id + 1;
    mem::replace(next_subscriber_id, bumped)
}
/// The boxed unsubscribe action stored for a subscription; it runs at most
/// once (`FnOnce`).
type Callback = Box<dyn FnOnce() + 'static + Send + Sync>;
451
/// A handle to a subscription created by `SubscriberSet::insert`. When dropped,
/// the subscription is cancelled and the callback will no longer be invoked.
/// Call `detach` to give up the handle without cancelling the subscription.
#[must_use]
pub struct Subscription {
    /// Weak reference to the unsubscribe action owned by the subscriber entry;
    /// it no longer upgrades once that entry has been dropped.
    unsubscribe: Weak<Mutex<Option<Callback>>>,
}
458
459impl std::fmt::Debug for Subscription {
460    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
461        f.debug_struct("Subscription").finish()
462    }
463}
464
465impl Subscription {
466    /// Detaches the subscription from this handle. The callback will
467    /// continue to be invoked until the doc has been subscribed to
468    /// are dropped
469    pub fn detach(self) {
470        if let Some(unsubscribe) = self.unsubscribe.upgrade() {
471            unsubscribe.lock().unwrap().take();
472        }
473    }
474
475    /// Unsubscribes the subscription.
476    #[inline]
477    pub fn unsubscribe(self) {
478        drop(self)
479    }
480}
481
482impl Drop for Subscription {
483    fn drop(&mut self) {
484        if let Some(unsubscribe) = self.unsubscribe.upgrade() {
485            let unsubscribe = unsubscribe.lock().unwrap().take();
486            if let Some(unsubscribe) = unsubscribe {
487                unsubscribe();
488            }
489        }
490    }
491}
492
/// The owning side of a subscription, stored inside the subscriber entry.
///
/// It holds the only strong reference created for the unsubscribe action; the
/// `Subscription` handle keeps a weak reference to the same slot.
struct InnerSubscription {
    /// The unsubscribe action, or `None` once it has been taken.
    unsubscribe: Arc<Mutex<Option<Callback>>>,
}
496
497impl Drop for InnerSubscription {
498    fn drop(&mut self) {
499        self.unsubscribe.lock().unwrap().take();
500    }
501}
502
/// A wrapper around `SubscriberSet` that automatically handles recursive event emission.
///
/// This struct differs from `SubscriberSet` in the following ways:
/// 1. It automatically handles the `CannotEmitEventDueToRecursiveCall` error that can occur in `SubscriberSet`.
/// 2. When a recursive event emission is detected, it queues the event instead of returning an error.
/// 3. After the current event processing is complete, it automatically processes the queued events.
///
/// The intent is that every emitted event is eventually delivered, in the order it was emitted,
/// even in cases where recursive event emission would otherwise be rejected.
pub(crate) struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
    /// The underlying subscriber registry.
    subscriber_set: SubscriberSet<EmitterKey, Callback>,
    /// Payloads whose dispatch was rejected as recursive, grouped by emitter key.
    queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}
516
/// A non-owning counterpart of `SubscriberSetWithQueue`.
///
/// It holds weak references to both parts and must be upgraded before use.
pub(crate) struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
    /// Weak reference to the state of the underlying `SubscriberSet`.
    subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
    /// Weak reference to the queue of pending payloads.
    queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}
521
522impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
523    pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
524        Some(SubscriberSetWithQueue {
525            subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
526            queue: self.queue.upgrade()?,
527        })
528    }
529}
530
531impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
532where
533    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
534    Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
535    Payload: Send + Sync + Debug,
536{
537    pub fn new() -> Self {
538        Self {
539            subscriber_set: SubscriberSet::new(),
540            queue: Arc::new(Mutex::new(Default::default())),
541        }
542    }
543
544    pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
545        WeakSubscriberSetWithQueue {
546            subscriber_set: Arc::downgrade(&self.subscriber_set.0),
547            queue: Arc::downgrade(&self.queue),
548        }
549    }
550
551    pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
552        &self.subscriber_set
553    }
554
555    pub(crate) fn emit(&self, key: &EmitterKey, payload: Payload) {
556        let mut pending_events: SmallVec<[Payload; 1]> = SmallVec::new();
557        pending_events.push(payload);
558        while let Some(payload) = pending_events.pop() {
559            let result = self
560                .subscriber_set
561                .retain(key, &mut |callback| (callback)(&payload));
562            match result {
563                Ok(_) => {
564                    let mut queue = self.queue.lock().unwrap();
565                    if let Some(new_pending_events) = queue.remove(key) {
566                        pending_events.extend(new_pending_events);
567                    }
568                }
569                Err(SubscriptionError::CannotEmitEventDueToRecursiveCall) => {
570                    let mut queue = self.queue.lock().unwrap();
571                    queue.entry(key.clone()).or_default().push(payload);
572                }
573            }
574        }
575    }
576}
577
#[cfg(test)]
mod test {
    use super::*;

    /// Callback type shared by the tests below.
    type TestCallback = Box<dyn Fn(&i32) -> bool + Send + Sync>;

    /// Dropping the whole set drops the subscriber entry, so the handle's weak
    /// reference can no longer be upgraded.
    #[test]
    fn test_inner_subscription_drop() {
        let set = SubscriberSet::<i32, TestCallback>::new();
        let keep: TestCallback = Box::new(|_: &i32| true);
        let (handle, activate) = set.insert(1, keep);
        activate();
        drop(set);
        assert!(handle.unsubscribe.upgrade().is_none());
    }

    /// A callback that returns `false` is removed by `retain`, which also
    /// drops its subscriber entry.
    #[test]
    fn test_inner_subscription_drop_2() {
        let set = SubscriberSet::<i32, TestCallback>::new();
        let reject: TestCallback = Box::new(|_: &i32| false);
        let (handle, activate) = set.insert(1, reject);
        activate();
        let outcome = set.retain(&1, &mut |callback| callback(&1));
        outcome.unwrap();
        assert!(handle.unsubscribe.upgrade().is_none());
    }
}
601}