loro_internal/utils/
subscription.rs

1use either::Either;
2/*
3This file is modified from the original file in the following repo:
4https://github.com/zed-industries/zed
5
6
7Copyright 2022 - 2024 Zed Industries, Inc.
8
9   Licensed under the Apache License, Version 2.0 (the "License");
10   you may not use this file except in compliance with the License.
11   You may obtain a copy of the License at
12
13
14       http://www.apache.org/licenses/LICENSE-2.0
15
16
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22
23
24
25
26Apache License
27                           Version 2.0, January 2004
28                        http://www.apache.org/licenses/
29
30
31   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
32
33
34   1. Definitions.
35
36
37      "License" shall mean the terms and conditions for use, reproduction,
38      and distribution as defined by Sections 1 through 9 of this document.
39
40
41      "Licensor" shall mean the copyright owner or entity authorized by
42      the copyright owner that is granting the License.
43
44
45      "Legal Entity" shall mean the union of the acting entity and all
46      other entities that control, are controlled by, or are under common
47      control with that entity. For the purposes of this definition,
48      "control" means (i) the power, direct or indirect, to cause the
49      direction or management of such entity, whether by contract or
50      otherwise, or (ii) ownership of fifty percent (50%) or more of the
51      outstanding shares, or (iii) beneficial ownership of such entity.
52
53
54      "You" (or "Your") shall mean an individual or Legal Entity
55      exercising permissions granted by this License.
56
57
58      "Source" form shall mean the preferred form for making modifications,
59      including but not limited to software source code, documentation
60      source, and configuration files.
61
62
63      "Object" form shall mean any form resulting from mechanical
64      transformation or translation of a Source form, including but
65      not limited to compiled object code, generated documentation,
66      and conversions to other media types.
67
68
69      "Work" shall mean the work of authorship, whether in Source or
70      Object form, made available under the License, as indicated by a
71      copyright notice that is included in or attached to the work
72      (an example is provided in the Appendix below).
73
74
75      "Derivative Works" shall mean any work, whether in Source or Object
76      form, that is based on (or derived from) the Work and for which the
77      editorial revisions, annotations, elaborations, or other modifications
78      represent, as a whole, an original work of authorship. For the purposes
79      of this License, Derivative Works shall not include works that remain
80      separable from, or merely link (or bind by name) to the interfaces of,
81      the Work and Derivative Works thereof.
82
83
84      "Contribution" shall mean any work of authorship, including
85      the original version of the Work and any modifications or additions
86      to that Work or Derivative Works thereof, that is intentionally
87      submitted to Licensor for inclusion in the Work by the copyright owner
88      or by an individual or Legal Entity authorized to submit on behalf of
89      the copyright owner. For the purposes of this definition, "submitted"
90      means any form of electronic, verbal, or written communication sent
91      to the Licensor or its representatives, including but not limited to
92      communication on electronic mailing lists, source code control systems,
93      and issue tracking systems that are managed by, or on behalf of, the
94      Licensor for the purpose of discussing and improving the Work, but
95      excluding communication that is conspicuously marked or otherwise
96      designated in writing by the copyright owner as "Not a Contribution."
97
98
99      "Contributor" shall mean Licensor and any individual or Legal Entity
100      on behalf of whom a Contribution has been received by Licensor and
101      subsequently incorporated within the Work.
102
103
104   2. Grant of Copyright License. Subject to the terms and conditions of
105      this License, each Contributor hereby grants to You a perpetual,
106      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
107      copyright license to reproduce, prepare Derivative Works of,
108      publicly display, publicly perform, sublicense, and distribute the
109      Work and such Derivative Works in Source or Object form.
110
111
112   3. Grant of Patent License. Subject to the terms and conditions of
113      this License, each Contributor hereby grants to You a perpetual,
114      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
115      (except as stated in this section) patent license to make, have made,
116      use, offer to sell, sell, import, and otherwise transfer the Work,
117      where such license applies only to those patent claims licensable
118      by such Contributor that are necessarily infringed by their
119      Contribution(s) alone or by combination of their Contribution(s)
120      with the Work to which such Contribution(s) was submitted. If You
121      institute patent litigation against any entity (including a
122      cross-claim or counterclaim in a lawsuit) alleging that the Work
123      or a Contribution incorporated within the Work constitutes direct
124      or contributory patent infringement, then any patent licenses
125      granted to You under this License for that Work shall terminate
126      as of the date such litigation is filed.
127
128
129   4. Redistribution. You may reproduce and distribute copies of the
130      Work or Derivative Works thereof in any medium, with or without
131      modifications, and in Source or Object form, provided that You
132      meet the following conditions:
133
134
135      (a) You must give any other recipients of the Work or
136          Derivative Works a copy of this License; and
137
138
139      (b) You must cause any modified files to carry prominent notices
140          stating that You changed the files; and
141
142
143      (c) You must retain, in the Source form of any Derivative Works
144          that You distribute, all copyright, patent, trademark, and
145          attribution notices from the Source form of the Work,
146          excluding those notices that do not pertain to any part of
147          the Derivative Works; and
148
149
150      (d) If the Work includes a "NOTICE" text file as part of its
151          distribution, then any Derivative Works that You distribute must
152          include a readable copy of the attribution notices contained
153          within such NOTICE file, excluding those notices that do not
154          pertain to any part of the Derivative Works, in at least one
155          of the following places: within a NOTICE text file distributed
156          as part of the Derivative Works; within the Source form or
157          documentation, if provided along with the Derivative Works; or,
158          within a display generated by the Derivative Works, if and
159          wherever such third-party notices normally appear. The contents
160          of the NOTICE file are for informational purposes only and
161          do not modify the License. You may add Your own attribution
162          notices within Derivative Works that You distribute, alongside
163          or as an addendum to the NOTICE text from the Work, provided
164          that such additional attribution notices cannot be construed
165          as modifying the License.
166
167
168      You may add Your own copyright statement to Your modifications and
169      may provide additional or different license terms and conditions
170      for use, reproduction, or distribution of Your modifications, or
171      for any such Derivative Works as a whole, provided Your use,
172      reproduction, and distribution of the Work otherwise complies with
173      the conditions stated in this License.
174
175
176   5. Submission of Contributions. Unless You explicitly state otherwise,
177      any Contribution intentionally submitted for inclusion in the Work
178      by You to the Licensor shall be under the terms and conditions of
179      this License, without any additional terms or conditions.
180      Notwithstanding the above, nothing herein shall supersede or modify
181      the terms of any separate license agreement you may have executed
182      with Licensor regarding such Contributions.
183
184
185   6. Trademarks. This License does not grant permission to use the trade
186      names, trademarks, service marks, or product names of the Licensor,
187      except as required for reasonable and customary use in describing the
188      origin of the Work and reproducing the content of the NOTICE file.
189
190
191   7. Disclaimer of Warranty. Unless required by applicable law or
192      agreed to in writing, Licensor provides the Work (and each
193      Contributor provides its Contributions) on an "AS IS" BASIS,
194      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
195      implied, including, without limitation, any warranties or conditions
196      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
197      PARTICULAR PURPOSE. You are solely responsible for determining the
198      appropriateness of using or redistributing the Work and assume any
199      risks associated with Your exercise of permissions under this License.
200
201
202   8. Limitation of Liability. In no event and under no legal theory,
203      whether in tort (including negligence), contract, or otherwise,
204      unless required by applicable law (such as deliberate and grossly
205      negligent acts) or agreed to in writing, shall any Contributor be
206      liable to You for damages, including any direct, indirect, special,
207      incidental, or consequential damages of any character arising as a
208      result of this License or out of the use or inability to use the
209      Work (including but not limited to damages for loss of goodwill,
210      work stoppage, computer failure or malfunction, or any and all
211      other commercial damages or losses), even if such Contributor
212      has been advised of the possibility of such damages.
213
214
215   9. Accepting Warranty or Additional Liability. While redistributing
216      the Work or Derivative Works thereof, You may choose to offer,
217      and charge a fee for, acceptance of support, warranty, indemnity,
218      or other liability obligations and/or rights consistent with this
219      License. However, in accepting such obligations, You may act only
220      on Your own behalf and on Your sole responsibility, not on behalf
221      of any other Contributor, and only if You agree to indemnify,
222      defend, and hold each Contributor harmless for any liability
223      incurred by, or claims asserted against, such Contributor by reason
224      of your accepting any such warranty or additional liability.
225
226
227   END OF TERMS AND CONDITIONS
228
229*/
230use crate::sync::{thread, AtomicBool, Mutex};
231use smallvec::SmallVec;
232use std::collections::{BTreeMap, BTreeSet};
233use std::sync::atomic::Ordering;
234use std::sync::{Arc, Weak};
235use std::{fmt::Debug, mem};
236use thread::ThreadId;
237
/// Errors that can occur while delivering events to subscribers.
#[derive(Debug)]
pub enum SubscriptionError {
    /// The calling thread is already delivering an event for the same emitter
    /// key, so a nested delivery cannot be performed.
    CannotEmitEventDueToRecursiveCall,
}

// `Display` belongs on the error type: the message it prints names the error,
// and the error is what callers format when reporting a failure.
impl std::fmt::Display for SubscriptionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SubscriptionError")
    }
}
248
249pub struct SubscriberSet<EmitterKey, Callback>(
250    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
251);
252
253impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
254    fn clone(&self) -> Self {
255        SubscriberSet(self.0.clone())
256    }
257}
258
259struct SubscriberSetState<EmitterKey, Callback> {
260    subscribers: BTreeMap<EmitterKey, Either<BTreeMap<usize, Subscriber<Callback>>, ThreadId>>,
261    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
262    next_subscriber_id: usize,
263}
264
265struct Subscriber<Callback> {
266    active: Arc<AtomicBool>,
267    callback: Callback,
268    /// This field is used to drop the subscription when the subscriber is dropped.
269    _sub: InnerSubscription,
270}
271
272impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
273where
274    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
275    Callback: 'static + Send + Sync,
276{
277    pub fn new() -> Self {
278        Self(Arc::new(Mutex::new(SubscriberSetState {
279            subscribers: Default::default(),
280            dropped_subscribers: Default::default(),
281            next_subscriber_id: 0,
282        })))
283    }
284
285    /// Inserts a new [`Subscription`] for the given `emitter_key`.
286    ///
287    /// By default, subscriptions are inert, meaning that they won't be listed when calling
288    /// `[SubscriberSet::remove]` or `[SubscriberSet::retain]`. This method returns a tuple
289    /// of a [`Subscription`] and an `impl FnOnce`, and you can use the latter to activate
290    /// the [`Subscription`].
291    pub fn insert(
292        &self,
293        emitter_key: EmitterKey,
294        callback: Callback,
295    ) -> (Subscription, impl FnOnce()) {
296        let active = Arc::new(AtomicBool::new(false));
297        let mut lock = self.0.lock().unwrap();
298        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
299        let this = Arc::downgrade(&self.0);
300        let emitter_key_1 = emitter_key.clone();
301        let inner_sub = InnerSubscription {
302            unsubscribe: Arc::new(Mutex::new(Some(Box::new(move || {
303                let Some(this) = this.upgrade() else {
304                    return;
305                };
306
307                let mut lock = this.lock().unwrap();
308                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
309                    // remove was called with this emitter_key
310                    return;
311                };
312
313                if let Either::Left(subscribers) = subscribers {
314                    subscribers.remove(&subscriber_id);
315                    if subscribers.is_empty() {
316                        lock.subscribers.remove(&emitter_key);
317                    }
318                    return;
319                }
320
321                // We didn't manage to remove the subscription, which means it was dropped
322                // while invoking the callback. Mark it as dropped so that we can remove it
323                // later.
324                lock.dropped_subscribers
325                    .insert((emitter_key, subscriber_id));
326            })))),
327        };
328        let subscription = Subscription {
329            unsubscribe: Arc::downgrade(&inner_sub.unsubscribe),
330        };
331
332        lock.subscribers
333            .entry(emitter_key_1)
334            .or_insert_with(|| Either::Left(BTreeMap::new()))
335            .as_mut()
336            .unwrap_left()
337            .insert(
338                subscriber_id,
339                Subscriber {
340                    active: active.clone(),
341                    callback,
342                    _sub: inner_sub,
343                },
344            );
345        (subscription, move || active.store(true, Ordering::Relaxed))
346    }
347
348    #[allow(unused)]
349    pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
350        let mut lock = self.0.lock().unwrap();
351        let subscribers = lock.subscribers.remove(emitter);
352        subscribers
353            .and_then(|x| x.left().map(|s| s.into_values()))
354            .into_iter()
355            .flatten()
356            .filter_map(|subscriber| {
357                if subscriber.active.load(Ordering::Relaxed) {
358                    Some(subscriber.callback)
359                } else {
360                    None
361                }
362            })
363    }
364
365    pub fn is_recursive_calling(&self, emitter: &EmitterKey) -> bool {
366        if let Some(Either::Right(thread_id)) = self.0.lock().unwrap().subscribers.get(emitter) {
367            *thread_id == thread::current().id()
368        } else {
369            false
370        }
371    }
372
373    /// Call the given callback for each subscriber to the given emitter.
374    /// If the callback returns false, the subscriber is removed.
375    pub fn retain(
376        &self,
377        emitter: &EmitterKey,
378        f: &mut dyn FnMut(&mut Callback) -> bool,
379    ) -> Result<(), SubscriptionError> {
380        let mut subscribers = {
381            let inner = loop {
382                let mut subscriber_set_state = self.0.lock().unwrap();
383                let Some(set) = subscriber_set_state.subscribers.get_mut(emitter) else {
384                    return Ok(());
385                };
386                match set {
387                    Either::Left(_) => {
388                        break std::mem::replace(set, Either::Right(thread::current().id()))
389                            .unwrap_left();
390                    }
391                    Either::Right(lock_thread) => {
392                        if thread::current().id() == *lock_thread {
393                            return Err(SubscriptionError::CannotEmitEventDueToRecursiveCall);
394                        } else {
395                            // return Ok(());
396                            drop(subscriber_set_state);
397                            #[cfg(loom)]
398                            loom::thread::yield_now();
399                            #[cfg(not(loom))]
400                            std::thread::sleep(std::time::Duration::from_millis(10));
401                        }
402                    }
403                }
404            };
405            inner
406        };
407
408        subscribers.retain(|_, subscriber| {
409            if subscriber.active.load(Ordering::Relaxed) {
410                f(&mut subscriber.callback)
411            } else {
412                true
413            }
414        });
415
416        let mut lock = self.0.lock().unwrap();
417
418        // Add any new subscribers that were added while invoking the callback.
419        if let Some(Either::Left(new_subscribers)) = lock.subscribers.remove(emitter) {
420            subscribers.extend(new_subscribers);
421        }
422
423        // Remove any dropped subscriptions that were dropped while invoking the callback.
424        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
425            if *emitter == dropped_emitter {
426                subscribers.remove(&dropped_subscription_id);
427            } else {
428                lock.dropped_subscribers
429                    .insert((dropped_emitter, dropped_subscription_id));
430            }
431        }
432
433        lock.subscribers
434            .insert(emitter.clone(), Either::Left(subscribers));
435        Ok(())
436    }
437
438    pub fn is_empty(&self) -> bool {
439        self.0.lock().unwrap().subscribers.is_empty()
440    }
441
442    pub fn may_include(&self, emitter: &EmitterKey) -> bool {
443        self.0.lock().unwrap().subscribers.contains_key(emitter)
444    }
445}
446
447impl<EmitterKey, Callback> Default for SubscriberSet<EmitterKey, Callback>
448where
449    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
450    Callback: 'static + Send + Sync,
451{
452    fn default() -> Self {
453        Self::new()
454    }
455}
456
457impl<EmitterKey, Callback> std::fmt::Debug for SubscriberSet<EmitterKey, Callback>
458where
459    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
460    Callback: 'static + Send + Sync,
461{
462    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
463        let lock = self.0.lock().unwrap();
464        f.debug_struct("SubscriberSet")
465            .field("subscriber_count", &lock.subscribers.len())
466            .field("dropped_subscribers_count", &lock.dropped_subscribers.len())
467            .field("next_subscriber_id", &lock.next_subscriber_id)
468            .finish()
469    }
470}
471
/// Returns the current value of `next_subscriber_id`, then advances it by one.
fn post_inc(next_subscriber_id: &mut usize) -> usize {
    let current = *next_subscriber_id;
    *next_subscriber_id = current + 1;
    current
}
/// A boxed, one-shot closure that cancels a subscription when called.
type Callback = Box<dyn FnOnce() + 'static + Send + Sync>;

/// A handle to a subscription registered in a `SubscriberSet`. When dropped, the
/// subscription is cancelled and the callback will no longer be invoked.
#[must_use]
pub struct Subscription {
    /// Weak reference to the slot that stores the unsubscribe closure. The handle
    /// never keeps the slot alive on its own.
    unsubscribe: Weak<Mutex<Option<Callback>>>,
}

impl std::fmt::Debug for Subscription {
    /// Prints only the type name; the stored closure is opaque.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Subscription");
        builder.finish()
    }
}

impl Subscription {
    /// Detaches the subscription from this handle.
    ///
    /// The unsubscribe closure is discarded without being run, so dropping the
    /// handle afterwards no longer cancels the subscription.
    pub fn detach(self) {
        if let Some(slot) = self.unsubscribe.upgrade() {
            // Overwriting the slot drops the closure without calling it.
            *slot.lock().unwrap() = None;
        }
    }

    /// Unsubscribes the subscription.
    ///
    /// This is the same as dropping the handle.
    #[inline]
    pub fn unsubscribe(self) {
        drop(self)
    }
}

impl Drop for Subscription {
    /// Runs the unsubscribe closure, if the slot still exists and still holds one.
    fn drop(&mut self) {
        let Some(slot) = self.unsubscribe.upgrade() else {
            return;
        };
        // Take the closure out in its own statement so that the mutex guard is
        // released before the closure is called.
        let pending = slot.lock().unwrap().take();
        if let Some(run) = pending {
            run();
        }
    }
}
519
520struct InnerSubscription {
521    unsubscribe: Arc<Mutex<Option<Callback>>>,
522}
523
524impl Drop for InnerSubscription {
525    fn drop(&mut self) {
526        self.unsubscribe.lock().unwrap().take();
527    }
528}
529
530/// A wrapper around `SubscriberSet` that automatically handles recursive event emission.
531///
532/// This struct differs from `SubscriberSet` in the following ways:
533/// 1. It automatically handles the `CannotEmitEventDueToRecursiveCall` error that can occur in `SubscriberSet`.
534/// 2. When a recursive event emission is detected, it queues the event instead of throwing an error.
535/// 3. After the current event processing is complete, it automatically processes the queued events.
536///
537/// This behavior ensures that all events are processed in the order they were emitted, even in cases
538/// where recursive event emission would normally cause an error.
539#[derive(Clone)]
540pub struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
541    subscriber_set: SubscriberSet<EmitterKey, Callback>,
542    queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
543}
544
545impl<EmitterKey, Callback, Payload> Debug
546    for SubscriberSetWithQueue<EmitterKey, Callback, Payload>
547{
548    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
549        f.debug_struct("SubscriberSetWithQueue").finish()
550    }
551}
552
553impl<EmitterKey, Callback, Payload> Default
554    for SubscriberSetWithQueue<EmitterKey, Callback, Payload>
555where
556    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
557    Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
558    Payload: Send + Sync + Debug,
559{
560    fn default() -> Self {
561        Self::new()
562    }
563}
564
565pub struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
566    subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
567    queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
568}
569
570impl<EmitterKey, Callback, Payload> Debug
571    for WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload>
572{
573    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574        f.debug_struct("WeakSubscriberSetWithQueue").finish()
575    }
576}
577
578impl<EmitterKey, Callback, Payload> Clone
579    for WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload>
580{
581    fn clone(&self) -> Self {
582        Self {
583            subscriber_set: self.subscriber_set.clone(),
584            queue: self.queue.clone(),
585        }
586    }
587}
588
589impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
590    pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
591        Some(SubscriberSetWithQueue {
592            subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
593            queue: self.queue.upgrade()?,
594        })
595    }
596}
597
598impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
599where
600    EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
601    Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
602    Payload: Send + Sync + Debug,
603{
604    pub fn new() -> Self {
605        Self {
606            subscriber_set: SubscriberSet::new(),
607            queue: Arc::new(Mutex::new(Default::default())),
608        }
609    }
610
611    pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
612        WeakSubscriberSetWithQueue {
613            subscriber_set: Arc::downgrade(&self.subscriber_set.0),
614            queue: Arc::downgrade(&self.queue),
615        }
616    }
617
618    pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
619        &self.subscriber_set
620    }
621
622    pub fn emit(&self, key: &EmitterKey, payload: Payload) {
623        let mut pending_events: SmallVec<[Payload; 1]> = SmallVec::new();
624        pending_events.push(payload);
625        while let Some(payload) = pending_events.pop() {
626            let result = self
627                .subscriber_set
628                .retain(key, &mut |callback| (callback)(&payload));
629            match result {
630                Ok(_) => {
631                    let mut queue = self.queue.lock().unwrap();
632                    if let Some(new_pending_events) = queue.remove(key) {
633                        pending_events.extend(new_pending_events);
634                    }
635                }
636                Err(SubscriptionError::CannotEmitEventDueToRecursiveCall) => {
637                    let mut queue = self.queue.lock().unwrap();
638                    queue.entry(key.clone()).or_default().push(payload);
639                }
640            }
641        }
642    }
643}
644
#[cfg(test)]
mod test {
    use super::*;

    /// Dropping the set drops its subscribers, so the handle's weak reference to
    /// the unsubscribe slot can no longer be upgraded.
    #[test]
    fn test_inner_subscription_drop() {
        let set = SubscriberSet::<i32, Box<dyn Fn(&i32) -> bool + Send + Sync>>::new();
        let (handle, activate) = set.insert(1, Box::new(move |_: &i32| true));
        activate();
        drop(set);
        let slot = handle.unsubscribe.upgrade();
        assert!(slot.is_none());
    }

    /// A callback that returns `false` gets its subscriber removed by `retain`,
    /// which likewise releases the unsubscribe slot.
    #[test]
    fn test_inner_subscription_drop_2() {
        let set = SubscriberSet::<i32, Box<dyn Fn(&i32) -> bool + Send + Sync>>::new();
        let (handle, activate) = set.insert(1, Box::new(move |_: &i32| false));
        activate();
        let outcome = set.retain(&1, &mut |callback| callback(&1));
        outcome.unwrap();
        let slot = handle.unsubscribe.upgrade();
        assert!(slot.is_none());
    }
}