loro_internal/utils/subscription.rs
1use either::Either;
2/*
3This file is modified from the original file in the following repo:
4https://github.com/zed-industries/zed
5
6
7Copyright 2022 - 2024 Zed Industries, Inc.
8
9 Licensed under the Apache License, Version 2.0 (the "License");
10 you may not use this file except in compliance with the License.
11 You may obtain a copy of the License at
12
13
14 http://www.apache.org/licenses/LICENSE-2.0
15
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22
23
24
25
26Apache License
27 Version 2.0, January 2004
28 http://www.apache.org/licenses/
29
30
31 TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
32
33
34 1. Definitions.
35
36
37 "License" shall mean the terms and conditions for use, reproduction,
38 and distribution as defined by Sections 1 through 9 of this document.
39
40
41 "Licensor" shall mean the copyright owner or entity authorized by
42 the copyright owner that is granting the License.
43
44
45 "Legal Entity" shall mean the union of the acting entity and all
46 other entities that control, are controlled by, or are under common
47 control with that entity. For the purposes of this definition,
48 "control" means (i) the power, direct or indirect, to cause the
49 direction or management of such entity, whether by contract or
50 otherwise, or (ii) ownership of fifty percent (50%) or more of the
51 outstanding shares, or (iii) beneficial ownership of such entity.
52
53
54 "You" (or "Your") shall mean an individual or Legal Entity
55 exercising permissions granted by this License.
56
57
58 "Source" form shall mean the preferred form for making modifications,
59 including but not limited to software source code, documentation
60 source, and configuration files.
61
62
63 "Object" form shall mean any form resulting from mechanical
64 transformation or translation of a Source form, including but
65 not limited to compiled object code, generated documentation,
66 and conversions to other media types.
67
68
69 "Work" shall mean the work of authorship, whether in Source or
70 Object form, made available under the License, as indicated by a
71 copyright notice that is included in or attached to the work
72 (an example is provided in the Appendix below).
73
74
75 "Derivative Works" shall mean any work, whether in Source or Object
76 form, that is based on (or derived from) the Work and for which the
77 editorial revisions, annotations, elaborations, or other modifications
78 represent, as a whole, an original work of authorship. For the purposes
79 of this License, Derivative Works shall not include works that remain
80 separable from, or merely link (or bind by name) to the interfaces of,
81 the Work and Derivative Works thereof.
82
83
84 "Contribution" shall mean any work of authorship, including
85 the original version of the Work and any modifications or additions
86 to that Work or Derivative Works thereof, that is intentionally
87 submitted to Licensor for inclusion in the Work by the copyright owner
88 or by an individual or Legal Entity authorized to submit on behalf of
89 the copyright owner. For the purposes of this definition, "submitted"
90 means any form of electronic, verbal, or written communication sent
91 to the Licensor or its representatives, including but not limited to
92 communication on electronic mailing lists, source code control systems,
93 and issue tracking systems that are managed by, or on behalf of, the
94 Licensor for the purpose of discussing and improving the Work, but
95 excluding communication that is conspicuously marked or otherwise
96 designated in writing by the copyright owner as "Not a Contribution."
97
98
99 "Contributor" shall mean Licensor and any individual or Legal Entity
100 on behalf of whom a Contribution has been received by Licensor and
101 subsequently incorporated within the Work.
102
103
104 2. Grant of Copyright License. Subject to the terms and conditions of
105 this License, each Contributor hereby grants to You a perpetual,
106 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
107 copyright license to reproduce, prepare Derivative Works of,
108 publicly display, publicly perform, sublicense, and distribute the
109 Work and such Derivative Works in Source or Object form.
110
111
112 3. Grant of Patent License. Subject to the terms and conditions of
113 this License, each Contributor hereby grants to You a perpetual,
114 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
115 (except as stated in this section) patent license to make, have made,
116 use, offer to sell, sell, import, and otherwise transfer the Work,
117 where such license applies only to those patent claims licensable
118 by such Contributor that are necessarily infringed by their
119 Contribution(s) alone or by combination of their Contribution(s)
120 with the Work to which such Contribution(s) was submitted. If You
121 institute patent litigation against any entity (including a
122 cross-claim or counterclaim in a lawsuit) alleging that the Work
123 or a Contribution incorporated within the Work constitutes direct
124 or contributory patent infringement, then any patent licenses
125 granted to You under this License for that Work shall terminate
126 as of the date such litigation is filed.
127
128
129 4. Redistribution. You may reproduce and distribute copies of the
130 Work or Derivative Works thereof in any medium, with or without
131 modifications, and in Source or Object form, provided that You
132 meet the following conditions:
133
134
135 (a) You must give any other recipients of the Work or
136 Derivative Works a copy of this License; and
137
138
139 (b) You must cause any modified files to carry prominent notices
140 stating that You changed the files; and
141
142
143 (c) You must retain, in the Source form of any Derivative Works
144 that You distribute, all copyright, patent, trademark, and
145 attribution notices from the Source form of the Work,
146 excluding those notices that do not pertain to any part of
147 the Derivative Works; and
148
149
150 (d) If the Work includes a "NOTICE" text file as part of its
151 distribution, then any Derivative Works that You distribute must
152 include a readable copy of the attribution notices contained
153 within such NOTICE file, excluding those notices that do not
154 pertain to any part of the Derivative Works, in at least one
155 of the following places: within a NOTICE text file distributed
156 as part of the Derivative Works; within the Source form or
157 documentation, if provided along with the Derivative Works; or,
158 within a display generated by the Derivative Works, if and
159 wherever such third-party notices normally appear. The contents
160 of the NOTICE file are for informational purposes only and
161 do not modify the License. You may add Your own attribution
162 notices within Derivative Works that You distribute, alongside
163 or as an addendum to the NOTICE text from the Work, provided
164 that such additional attribution notices cannot be construed
165 as modifying the License.
166
167
168 You may add Your own copyright statement to Your modifications and
169 may provide additional or different license terms and conditions
170 for use, reproduction, or distribution of Your modifications, or
171 for any such Derivative Works as a whole, provided Your use,
172 reproduction, and distribution of the Work otherwise complies with
173 the conditions stated in this License.
174
175
176 5. Submission of Contributions. Unless You explicitly state otherwise,
177 any Contribution intentionally submitted for inclusion in the Work
178 by You to the Licensor shall be under the terms and conditions of
179 this License, without any additional terms or conditions.
180 Notwithstanding the above, nothing herein shall supersede or modify
181 the terms of any separate license agreement you may have executed
182 with Licensor regarding such Contributions.
183
184
185 6. Trademarks. This License does not grant permission to use the trade
186 names, trademarks, service marks, or product names of the Licensor,
187 except as required for reasonable and customary use in describing the
188 origin of the Work and reproducing the content of the NOTICE file.
189
190
191 7. Disclaimer of Warranty. Unless required by applicable law or
192 agreed to in writing, Licensor provides the Work (and each
193 Contributor provides its Contributions) on an "AS IS" BASIS,
194 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
195 implied, including, without limitation, any warranties or conditions
196 of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
197 PARTICULAR PURPOSE. You are solely responsible for determining the
198 appropriateness of using or redistributing the Work and assume any
199 risks associated with Your exercise of permissions under this License.
200
201
202 8. Limitation of Liability. In no event and under no legal theory,
203 whether in tort (including negligence), contract, or otherwise,
204 unless required by applicable law (such as deliberate and grossly
205 negligent acts) or agreed to in writing, shall any Contributor be
206 liable to You for damages, including any direct, indirect, special,
207 incidental, or consequential damages of any character arising as a
208 result of this License or out of the use or inability to use the
209 Work (including but not limited to damages for loss of goodwill,
210 work stoppage, computer failure or malfunction, or any and all
211 other commercial damages or losses), even if such Contributor
212 has been advised of the possibility of such damages.
213
214
215 9. Accepting Warranty or Additional Liability. While redistributing
216 the Work or Derivative Works thereof, You may choose to offer,
217 and charge a fee for, acceptance of support, warranty, indemnity,
218 or other liability obligations and/or rights consistent with this
219 License. However, in accepting such obligations, You may act only
220 on Your own behalf and on Your sole responsibility, not on behalf
221 of any other Contributor, and only if You agree to indemnify,
222 defend, and hold each Contributor harmless for any liability
223 incurred by, or claims asserted against, such Contributor by reason
224 of your accepting any such warranty or additional liability.
225
226
227 END OF TERMS AND CONDITIONS
228
229*/
230use crate::sync::{thread, AtomicBool, Mutex};
231use smallvec::SmallVec;
232use std::collections::{BTreeMap, BTreeSet};
233use std::sync::atomic::Ordering;
234use std::sync::{Arc, Weak};
235use std::{fmt::Debug, mem};
236use thread::ThreadId;
237
/// Errors that can occur while notifying subscribers.
#[derive(Debug)]
pub enum SubscriptionError {
    /// The current thread is already in the middle of notifying the subscribers of the
    /// same emitter key, so the event cannot be delivered right now.
    CannotEmitEventDueToRecursiveCall,
}

// `Display` is implemented for the error type (not for the `Subscription` handle): the
// message names the error, and error values are what callers format.
impl std::fmt::Display for SubscriptionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SubscriptionError")
    }
}
248
249pub struct SubscriberSet<EmitterKey, Callback>(
250 Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
251);
252
253impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
254 fn clone(&self) -> Self {
255 SubscriberSet(self.0.clone())
256 }
257}
258
259struct SubscriberSetState<EmitterKey, Callback> {
260 subscribers: BTreeMap<EmitterKey, Either<BTreeMap<usize, Subscriber<Callback>>, ThreadId>>,
261 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
262 next_subscriber_id: usize,
263}
264
265struct Subscriber<Callback> {
266 active: Arc<AtomicBool>,
267 callback: Callback,
268 /// This field is used to drop the subscription when the subscriber is dropped.
269 _sub: InnerSubscription,
270}
271
272impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
273where
274 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
275 Callback: 'static + Send + Sync,
276{
277 pub fn new() -> Self {
278 Self(Arc::new(Mutex::new(SubscriberSetState {
279 subscribers: Default::default(),
280 dropped_subscribers: Default::default(),
281 next_subscriber_id: 0,
282 })))
283 }
284
285 /// Inserts a new [`Subscription`] for the given `emitter_key`.
286 ///
287 /// By default, subscriptions are inert, meaning that they won't be listed when calling
288 /// `[SubscriberSet::remove]` or `[SubscriberSet::retain]`. This method returns a tuple
289 /// of a [`Subscription`] and an `impl FnOnce`, and you can use the latter to activate
290 /// the [`Subscription`].
291 pub fn insert(
292 &self,
293 emitter_key: EmitterKey,
294 callback: Callback,
295 ) -> (Subscription, impl FnOnce()) {
296 let active = Arc::new(AtomicBool::new(false));
297 let mut lock = self.0.lock().unwrap();
298 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
299 let this = Arc::downgrade(&self.0);
300 let emitter_key_1 = emitter_key.clone();
301 let inner_sub = InnerSubscription {
302 unsubscribe: Arc::new(Mutex::new(Some(Box::new(move || {
303 let Some(this) = this.upgrade() else {
304 return;
305 };
306
307 let mut lock = this.lock().unwrap();
308 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
309 // remove was called with this emitter_key
310 return;
311 };
312
313 if let Either::Left(subscribers) = subscribers {
314 subscribers.remove(&subscriber_id);
315 if subscribers.is_empty() {
316 lock.subscribers.remove(&emitter_key);
317 }
318 return;
319 }
320
321 // We didn't manage to remove the subscription, which means it was dropped
322 // while invoking the callback. Mark it as dropped so that we can remove it
323 // later.
324 lock.dropped_subscribers
325 .insert((emitter_key, subscriber_id));
326 })))),
327 };
328 let subscription = Subscription {
329 unsubscribe: Arc::downgrade(&inner_sub.unsubscribe),
330 };
331
332 lock.subscribers
333 .entry(emitter_key_1)
334 .or_insert_with(|| Either::Left(BTreeMap::new()))
335 .as_mut()
336 .unwrap_left()
337 .insert(
338 subscriber_id,
339 Subscriber {
340 active: active.clone(),
341 callback,
342 _sub: inner_sub,
343 },
344 );
345 (subscription, move || active.store(true, Ordering::Relaxed))
346 }
347
348 #[allow(unused)]
349 pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
350 let mut lock = self.0.lock().unwrap();
351 let subscribers = lock.subscribers.remove(emitter);
352 subscribers
353 .and_then(|x| x.left().map(|s| s.into_values()))
354 .into_iter()
355 .flatten()
356 .filter_map(|subscriber| {
357 if subscriber.active.load(Ordering::Relaxed) {
358 Some(subscriber.callback)
359 } else {
360 None
361 }
362 })
363 }
364
365 pub fn is_recursive_calling(&self, emitter: &EmitterKey) -> bool {
366 if let Some(Either::Right(thread_id)) = self.0.lock().unwrap().subscribers.get(emitter) {
367 *thread_id == thread::current().id()
368 } else {
369 false
370 }
371 }
372
373 /// Call the given callback for each subscriber to the given emitter.
374 /// If the callback returns false, the subscriber is removed.
375 pub fn retain(
376 &self,
377 emitter: &EmitterKey,
378 f: &mut dyn FnMut(&mut Callback) -> bool,
379 ) -> Result<(), SubscriptionError> {
380 let mut subscribers = {
381 let inner = loop {
382 let mut subscriber_set_state = self.0.lock().unwrap();
383 let Some(set) = subscriber_set_state.subscribers.get_mut(emitter) else {
384 return Ok(());
385 };
386 match set {
387 Either::Left(_) => {
388 break std::mem::replace(set, Either::Right(thread::current().id()))
389 .unwrap_left();
390 }
391 Either::Right(lock_thread) => {
392 if thread::current().id() == *lock_thread {
393 return Err(SubscriptionError::CannotEmitEventDueToRecursiveCall);
394 } else {
395 // return Ok(());
396 drop(subscriber_set_state);
397 #[cfg(loom)]
398 loom::thread::yield_now();
399 #[cfg(not(loom))]
400 std::thread::sleep(std::time::Duration::from_millis(10));
401 }
402 }
403 }
404 };
405 inner
406 };
407
408 subscribers.retain(|_, subscriber| {
409 if subscriber.active.load(Ordering::Relaxed) {
410 f(&mut subscriber.callback)
411 } else {
412 true
413 }
414 });
415
416 let mut lock = self.0.lock().unwrap();
417
418 // Add any new subscribers that were added while invoking the callback.
419 if let Some(Either::Left(new_subscribers)) = lock.subscribers.remove(emitter) {
420 subscribers.extend(new_subscribers);
421 }
422
423 // Remove any dropped subscriptions that were dropped while invoking the callback.
424 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
425 if *emitter == dropped_emitter {
426 subscribers.remove(&dropped_subscription_id);
427 } else {
428 lock.dropped_subscribers
429 .insert((dropped_emitter, dropped_subscription_id));
430 }
431 }
432
433 lock.subscribers
434 .insert(emitter.clone(), Either::Left(subscribers));
435 Ok(())
436 }
437
438 pub fn is_empty(&self) -> bool {
439 self.0.lock().unwrap().subscribers.is_empty()
440 }
441
442 pub fn may_include(&self, emitter: &EmitterKey) -> bool {
443 self.0.lock().unwrap().subscribers.contains_key(emitter)
444 }
445}
446
447impl<EmitterKey, Callback> Default for SubscriberSet<EmitterKey, Callback>
448where
449 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
450 Callback: 'static + Send + Sync,
451{
452 fn default() -> Self {
453 Self::new()
454 }
455}
456
457impl<EmitterKey, Callback> std::fmt::Debug for SubscriberSet<EmitterKey, Callback>
458where
459 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
460 Callback: 'static + Send + Sync,
461{
462 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
463 let lock = self.0.lock().unwrap();
464 f.debug_struct("SubscriberSet")
465 .field("subscriber_count", &lock.subscribers.len())
466 .field("dropped_subscribers_count", &lock.dropped_subscribers.len())
467 .field("next_subscriber_id", &lock.next_subscriber_id)
468 .finish()
469 }
470}
471
/// Returns the current value of the counter and then advances it by one
/// (the equivalent of C's `counter++`).
fn post_inc(next_subscriber_id: &mut usize) -> usize {
    let current = *next_subscriber_id;
    *next_subscriber_id = current + 1;
    current
}
/// Boxed one-shot closure that removes a subscriber from its set when invoked.
type Callback = Box<dyn FnOnce() + Send + Sync + 'static>;
478
479/// A handle to a subscription created by GPUI. When dropped, the subscription
480/// is cancelled and the callback will no longer be invoked.
481#[must_use]
482pub struct Subscription {
483 unsubscribe: Weak<Mutex<Option<Callback>>>,
484}
485
486impl std::fmt::Debug for Subscription {
487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488 f.debug_struct("Subscription").finish()
489 }
490}
491
492impl Subscription {
493 /// Detaches the subscription from this handle. The callback will
494 /// continue to be invoked until the doc has been subscribed to
495 /// are dropped
496 pub fn detach(self) {
497 if let Some(unsubscribe) = self.unsubscribe.upgrade() {
498 unsubscribe.lock().unwrap().take();
499 }
500 }
501
502 /// Unsubscribes the subscription.
503 #[inline]
504 pub fn unsubscribe(self) {
505 drop(self)
506 }
507}
508
509impl Drop for Subscription {
510 fn drop(&mut self) {
511 if let Some(unsubscribe) = self.unsubscribe.upgrade() {
512 let unsubscribe = unsubscribe.lock().unwrap().take();
513 if let Some(unsubscribe) = unsubscribe {
514 unsubscribe();
515 }
516 }
517 }
518}
519
520struct InnerSubscription {
521 unsubscribe: Arc<Mutex<Option<Callback>>>,
522}
523
524impl Drop for InnerSubscription {
525 fn drop(&mut self) {
526 self.unsubscribe.lock().unwrap().take();
527 }
528}
529
530/// A wrapper around `SubscriberSet` that automatically handles recursive event emission.
531///
532/// This struct differs from `SubscriberSet` in the following ways:
533/// 1. It automatically handles the `CannotEmitEventDueToRecursiveCall` error that can occur in `SubscriberSet`.
534/// 2. When a recursive event emission is detected, it queues the event instead of throwing an error.
535/// 3. After the current event processing is complete, it automatically processes the queued events.
536///
537/// This behavior ensures that all events are processed in the order they were emitted, even in cases
538/// where recursive event emission would normally cause an error.
539#[derive(Clone)]
540pub struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
541 subscriber_set: SubscriberSet<EmitterKey, Callback>,
542 queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
543}
544
545impl<EmitterKey, Callback, Payload> Debug
546 for SubscriberSetWithQueue<EmitterKey, Callback, Payload>
547{
548 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
549 f.debug_struct("SubscriberSetWithQueue").finish()
550 }
551}
552
553impl<EmitterKey, Callback, Payload> Default
554 for SubscriberSetWithQueue<EmitterKey, Callback, Payload>
555where
556 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
557 Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
558 Payload: Send + Sync + Debug,
559{
560 fn default() -> Self {
561 Self::new()
562 }
563}
564
565pub struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
566 subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
567 queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
568}
569
570impl<EmitterKey, Callback, Payload> Debug
571 for WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload>
572{
573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574 f.debug_struct("WeakSubscriberSetWithQueue").finish()
575 }
576}
577
578impl<EmitterKey, Callback, Payload> Clone
579 for WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload>
580{
581 fn clone(&self) -> Self {
582 Self {
583 subscriber_set: self.subscriber_set.clone(),
584 queue: self.queue.clone(),
585 }
586 }
587}
588
589impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
590 pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
591 Some(SubscriberSetWithQueue {
592 subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
593 queue: self.queue.upgrade()?,
594 })
595 }
596}
597
598impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
599where
600 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
601 Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
602 Payload: Send + Sync + Debug,
603{
604 pub fn new() -> Self {
605 Self {
606 subscriber_set: SubscriberSet::new(),
607 queue: Arc::new(Mutex::new(Default::default())),
608 }
609 }
610
611 pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
612 WeakSubscriberSetWithQueue {
613 subscriber_set: Arc::downgrade(&self.subscriber_set.0),
614 queue: Arc::downgrade(&self.queue),
615 }
616 }
617
618 pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
619 &self.subscriber_set
620 }
621
622 pub fn emit(&self, key: &EmitterKey, payload: Payload) {
623 let mut pending_events: SmallVec<[Payload; 1]> = SmallVec::new();
624 pending_events.push(payload);
625 while let Some(payload) = pending_events.pop() {
626 let result = self
627 .subscriber_set
628 .retain(key, &mut |callback| (callback)(&payload));
629 match result {
630 Ok(_) => {
631 let mut queue = self.queue.lock().unwrap();
632 if let Some(new_pending_events) = queue.remove(key) {
633 pending_events.extend(new_pending_events);
634 }
635 }
636 Err(SubscriptionError::CannotEmitEventDueToRecursiveCall) => {
637 let mut queue = self.queue.lock().unwrap();
638 queue.entry(key.clone()).or_default().push(payload);
639 }
640 }
641 }
642 }
643}
644
#[cfg(test)]
mod test {
    use super::*;

    /// Callback type shared by the tests below.
    type TestCallback = Box<dyn Fn(&i32) -> bool + Send + Sync>;

    /// Dropping the whole set drops its subscribers, so the handle's weak reference to
    /// the unsubscribe slot can no longer be upgraded.
    #[test]
    fn test_inner_subscription_drop() {
        let set = SubscriberSet::<i32, TestCallback>::new();
        let (handle, activate) = set.insert(1, Box::new(move |_: &i32| true));
        activate();
        drop(set);
        let slot = handle.unsubscribe.upgrade();
        assert!(slot.is_none());
    }

    /// A callback returning `false` gets its subscriber removed by `retain`, which also
    /// releases the unsubscribe slot the handle points to.
    #[test]
    fn test_inner_subscription_drop_2() {
        let set = SubscriberSet::<i32, TestCallback>::new();
        let (handle, activate) = set.insert(1, Box::new(move |_: &i32| false));
        activate();
        let outcome = set.retain(&1, &mut |callback| callback(&1));
        outcome.unwrap();
        let slot = handle.unsubscribe.upgrade();
        assert!(slot.is_none());
    }
}
668}