loro_internal/utils/subscription.rs
1use either::Either;
2/*
3This file is modified from the original file in the following repo:
4https://github.com/zed-industries/zed
5
6
7Copyright 2022 - 2024 Zed Industries, Inc.
8
9 Licensed under the Apache License, Version 2.0 (the "License");
10 you may not use this file except in compliance with the License.
11 You may obtain a copy of the License at
12
13
14 http://www.apache.org/licenses/LICENSE-2.0
15
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22
23
24
25
26Apache License
27 Version 2.0, January 2004
28 http://www.apache.org/licenses/
29
30
31 TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
32
33
34 1. Definitions.
35
36
37 "License" shall mean the terms and conditions for use, reproduction,
38 and distribution as defined by Sections 1 through 9 of this document.
39
40
41 "Licensor" shall mean the copyright owner or entity authorized by
42 the copyright owner that is granting the License.
43
44
45 "Legal Entity" shall mean the union of the acting entity and all
46 other entities that control, are controlled by, or are under common
47 control with that entity. For the purposes of this definition,
48 "control" means (i) the power, direct or indirect, to cause the
49 direction or management of such entity, whether by contract or
50 otherwise, or (ii) ownership of fifty percent (50%) or more of the
51 outstanding shares, or (iii) beneficial ownership of such entity.
52
53
54 "You" (or "Your") shall mean an individual or Legal Entity
55 exercising permissions granted by this License.
56
57
58 "Source" form shall mean the preferred form for making modifications,
59 including but not limited to software source code, documentation
60 source, and configuration files.
61
62
63 "Object" form shall mean any form resulting from mechanical
64 transformation or translation of a Source form, including but
65 not limited to compiled object code, generated documentation,
66 and conversions to other media types.
67
68
69 "Work" shall mean the work of authorship, whether in Source or
70 Object form, made available under the License, as indicated by a
71 copyright notice that is included in or attached to the work
72 (an example is provided in the Appendix below).
73
74
75 "Derivative Works" shall mean any work, whether in Source or Object
76 form, that is based on (or derived from) the Work and for which the
77 editorial revisions, annotations, elaborations, or other modifications
78 represent, as a whole, an original work of authorship. For the purposes
79 of this License, Derivative Works shall not include works that remain
80 separable from, or merely link (or bind by name) to the interfaces of,
81 the Work and Derivative Works thereof.
82
83
84 "Contribution" shall mean any work of authorship, including
85 the original version of the Work and any modifications or additions
86 to that Work or Derivative Works thereof, that is intentionally
87 submitted to Licensor for inclusion in the Work by the copyright owner
88 or by an individual or Legal Entity authorized to submit on behalf of
89 the copyright owner. For the purposes of this definition, "submitted"
90 means any form of electronic, verbal, or written communication sent
91 to the Licensor or its representatives, including but not limited to
92 communication on electronic mailing lists, source code control systems,
93 and issue tracking systems that are managed by, or on behalf of, the
94 Licensor for the purpose of discussing and improving the Work, but
95 excluding communication that is conspicuously marked or otherwise
96 designated in writing by the copyright owner as "Not a Contribution."
97
98
99 "Contributor" shall mean Licensor and any individual or Legal Entity
100 on behalf of whom a Contribution has been received by Licensor and
101 subsequently incorporated within the Work.
102
103
104 2. Grant of Copyright License. Subject to the terms and conditions of
105 this License, each Contributor hereby grants to You a perpetual,
106 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
107 copyright license to reproduce, prepare Derivative Works of,
108 publicly display, publicly perform, sublicense, and distribute the
109 Work and such Derivative Works in Source or Object form.
110
111
112 3. Grant of Patent License. Subject to the terms and conditions of
113 this License, each Contributor hereby grants to You a perpetual,
114 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
115 (except as stated in this section) patent license to make, have made,
116 use, offer to sell, sell, import, and otherwise transfer the Work,
117 where such license applies only to those patent claims licensable
118 by such Contributor that are necessarily infringed by their
119 Contribution(s) alone or by combination of their Contribution(s)
120 with the Work to which such Contribution(s) was submitted. If You
121 institute patent litigation against any entity (including a
122 cross-claim or counterclaim in a lawsuit) alleging that the Work
123 or a Contribution incorporated within the Work constitutes direct
124 or contributory patent infringement, then any patent licenses
125 granted to You under this License for that Work shall terminate
126 as of the date such litigation is filed.
127
128
129 4. Redistribution. You may reproduce and distribute copies of the
130 Work or Derivative Works thereof in any medium, with or without
131 modifications, and in Source or Object form, provided that You
132 meet the following conditions:
133
134
135 (a) You must give any other recipients of the Work or
136 Derivative Works a copy of this License; and
137
138
139 (b) You must cause any modified files to carry prominent notices
140 stating that You changed the files; and
141
142
143 (c) You must retain, in the Source form of any Derivative Works
144 that You distribute, all copyright, patent, trademark, and
145 attribution notices from the Source form of the Work,
146 excluding those notices that do not pertain to any part of
147 the Derivative Works; and
148
149
150 (d) If the Work includes a "NOTICE" text file as part of its
151 distribution, then any Derivative Works that You distribute must
152 include a readable copy of the attribution notices contained
153 within such NOTICE file, excluding those notices that do not
154 pertain to any part of the Derivative Works, in at least one
155 of the following places: within a NOTICE text file distributed
156 as part of the Derivative Works; within the Source form or
157 documentation, if provided along with the Derivative Works; or,
158 within a display generated by the Derivative Works, if and
159 wherever such third-party notices normally appear. The contents
160 of the NOTICE file are for informational purposes only and
161 do not modify the License. You may add Your own attribution
162 notices within Derivative Works that You distribute, alongside
163 or as an addendum to the NOTICE text from the Work, provided
164 that such additional attribution notices cannot be construed
165 as modifying the License.
166
167
168 You may add Your own copyright statement to Your modifications and
169 may provide additional or different license terms and conditions
170 for use, reproduction, or distribution of Your modifications, or
171 for any such Derivative Works as a whole, provided Your use,
172 reproduction, and distribution of the Work otherwise complies with
173 the conditions stated in this License.
174
175
176 5. Submission of Contributions. Unless You explicitly state otherwise,
177 any Contribution intentionally submitted for inclusion in the Work
178 by You to the Licensor shall be under the terms and conditions of
179 this License, without any additional terms or conditions.
180 Notwithstanding the above, nothing herein shall supersede or modify
181 the terms of any separate license agreement you may have executed
182 with Licensor regarding such Contributions.
183
184
185 6. Trademarks. This License does not grant permission to use the trade
186 names, trademarks, service marks, or product names of the Licensor,
187 except as required for reasonable and customary use in describing the
188 origin of the Work and reproducing the content of the NOTICE file.
189
190
191 7. Disclaimer of Warranty. Unless required by applicable law or
192 agreed to in writing, Licensor provides the Work (and each
193 Contributor provides its Contributions) on an "AS IS" BASIS,
194 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
195 implied, including, without limitation, any warranties or conditions
196 of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
197 PARTICULAR PURPOSE. You are solely responsible for determining the
198 appropriateness of using or redistributing the Work and assume any
199 risks associated with Your exercise of permissions under this License.
200
201
202 8. Limitation of Liability. In no event and under no legal theory,
203 whether in tort (including negligence), contract, or otherwise,
204 unless required by applicable law (such as deliberate and grossly
205 negligent acts) or agreed to in writing, shall any Contributor be
206 liable to You for damages, including any direct, indirect, special,
207 incidental, or consequential damages of any character arising as a
208 result of this License or out of the use or inability to use the
209 Work (including but not limited to damages for loss of goodwill,
210 work stoppage, computer failure or malfunction, or any and all
211 other commercial damages or losses), even if such Contributor
212 has been advised of the possibility of such damages.
213
214
215 9. Accepting Warranty or Additional Liability. While redistributing
216 the Work or Derivative Works thereof, You may choose to offer,
217 and charge a fee for, acceptance of support, warranty, indemnity,
218 or other liability obligations and/or rights consistent with this
219 License. However, in accepting such obligations, You may act only
220 on Your own behalf and on Your sole responsibility, not on behalf
221 of any other Contributor, and only if You agree to indemnify,
222 defend, and hold each Contributor harmless for any liability
223 incurred by, or claims asserted against, such Contributor by reason
224 of your accepting any such warranty or additional liability.
225
226
227 END OF TERMS AND CONDITIONS
228
229*/
230use crate::sync::{thread, AtomicBool, Mutex};
231use smallvec::SmallVec;
232use std::collections::{BTreeMap, BTreeSet};
233use std::sync::atomic::Ordering;
234use std::sync::{Arc, Weak};
235use std::{fmt::Debug, mem};
236use thread::ThreadId;
237
238#[derive(Debug)]
239pub enum SubscriptionError {
240 CannotEmitEventDueToRecursiveCall,
241}
242
243impl std::fmt::Display for Subscription {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 write!(f, "SubscriptionError")
246 }
247}
248
249pub(crate) struct SubscriberSet<EmitterKey, Callback>(
250 Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
251);
252
253impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
254 fn clone(&self) -> Self {
255 SubscriberSet(self.0.clone())
256 }
257}
258
259struct SubscriberSetState<EmitterKey, Callback> {
260 subscribers: BTreeMap<EmitterKey, Either<BTreeMap<usize, Subscriber<Callback>>, ThreadId>>,
261 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
262 next_subscriber_id: usize,
263}
264
265struct Subscriber<Callback> {
266 active: Arc<AtomicBool>,
267 callback: Callback,
268 /// This field is used to drop the subscription when the subscriber is dropped.
269 _sub: InnerSubscription,
270}
271
272impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
273where
274 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
275 Callback: 'static + Send + Sync,
276{
277 pub fn new() -> Self {
278 Self(Arc::new(Mutex::new(SubscriberSetState {
279 subscribers: Default::default(),
280 dropped_subscribers: Default::default(),
281 next_subscriber_id: 0,
282 })))
283 }
284
285 /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
286 /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
287 /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
288 /// to activate the [`Subscription`].
289 pub fn insert(
290 &self,
291 emitter_key: EmitterKey,
292 callback: Callback,
293 ) -> (Subscription, impl FnOnce()) {
294 let active = Arc::new(AtomicBool::new(false));
295 let mut lock = self.0.lock().unwrap();
296 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
297 let this = Arc::downgrade(&self.0);
298 let emitter_key_1 = emitter_key.clone();
299 let inner_sub = InnerSubscription {
300 unsubscribe: Arc::new(Mutex::new(Some(Box::new(move || {
301 let Some(this) = this.upgrade() else {
302 return;
303 };
304
305 let mut lock = this.lock().unwrap();
306 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
307 // remove was called with this emitter_key
308 return;
309 };
310
311 if let Either::Left(subscribers) = subscribers {
312 subscribers.remove(&subscriber_id);
313 if subscribers.is_empty() {
314 lock.subscribers.remove(&emitter_key);
315 }
316 return;
317 }
318
319 // We didn't manage to remove the subscription, which means it was dropped
320 // while invoking the callback. Mark it as dropped so that we can remove it
321 // later.
322 lock.dropped_subscribers
323 .insert((emitter_key, subscriber_id));
324 })))),
325 };
326 let subscription = Subscription {
327 unsubscribe: Arc::downgrade(&inner_sub.unsubscribe),
328 };
329
330 lock.subscribers
331 .entry(emitter_key_1)
332 .or_insert_with(|| Either::Left(BTreeMap::new()))
333 .as_mut()
334 .unwrap_left()
335 .insert(
336 subscriber_id,
337 Subscriber {
338 active: active.clone(),
339 callback,
340 _sub: inner_sub,
341 },
342 );
343 (subscription, move || active.store(true, Ordering::Relaxed))
344 }
345
346 #[allow(unused)]
347 pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
348 let mut lock = self.0.lock().unwrap();
349 let subscribers = lock.subscribers.remove(emitter);
350 subscribers
351 .and_then(|x| x.left().map(|s| s.into_values()))
352 .into_iter()
353 .flatten()
354 .filter_map(|subscriber| {
355 if subscriber.active.load(Ordering::Relaxed) {
356 Some(subscriber.callback)
357 } else {
358 None
359 }
360 })
361 }
362
363 pub fn is_recursive_calling(&self, emitter: &EmitterKey) -> bool {
364 if let Some(Either::Right(thread_id)) = self.0.lock().unwrap().subscribers.get(emitter) {
365 *thread_id == thread::current().id()
366 } else {
367 false
368 }
369 }
370
371 /// Call the given callback for each subscriber to the given emitter.
372 /// If the callback returns false, the subscriber is removed.
373 pub fn retain(
374 &self,
375 emitter: &EmitterKey,
376 f: &mut dyn FnMut(&mut Callback) -> bool,
377 ) -> Result<(), SubscriptionError> {
378 let mut subscribers = {
379 let inner = loop {
380 let mut subscriber_set_state = self.0.lock().unwrap();
381 let Some(set) = subscriber_set_state.subscribers.get_mut(emitter) else {
382 return Ok(());
383 };
384 match set {
385 Either::Left(_) => {
386 break std::mem::replace(set, Either::Right(thread::current().id()))
387 .unwrap_left();
388 }
389 Either::Right(lock_thread) => {
390 if thread::current().id() == *lock_thread {
391 return Err(SubscriptionError::CannotEmitEventDueToRecursiveCall);
392 } else {
393 // return Ok(());
394 drop(subscriber_set_state);
395 #[cfg(loom)]
396 loom::thread::yield_now();
397 #[cfg(not(loom))]
398 std::thread::sleep(std::time::Duration::from_millis(10));
399 }
400 }
401 }
402 };
403 inner
404 };
405
406 subscribers.retain(|_, subscriber| {
407 if subscriber.active.load(Ordering::Relaxed) {
408 f(&mut subscriber.callback)
409 } else {
410 true
411 }
412 });
413
414 let mut lock = self.0.lock().unwrap();
415
416 // Add any new subscribers that were added while invoking the callback.
417 if let Some(Either::Left(new_subscribers)) = lock.subscribers.remove(emitter) {
418 subscribers.extend(new_subscribers);
419 }
420
421 // Remove any dropped subscriptions that were dropped while invoking the callback.
422 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
423 if *emitter == dropped_emitter {
424 subscribers.remove(&dropped_subscription_id);
425 } else {
426 lock.dropped_subscribers
427 .insert((dropped_emitter, dropped_subscription_id));
428 }
429 }
430
431 lock.subscribers
432 .insert(emitter.clone(), Either::Left(subscribers));
433 Ok(())
434 }
435
436 pub fn is_empty(&self) -> bool {
437 self.0.lock().unwrap().subscribers.is_empty()
438 }
439
440 pub fn may_include(&self, emitter: &EmitterKey) -> bool {
441 self.0.lock().unwrap().subscribers.contains_key(emitter)
442 }
443}
444
445fn post_inc(next_subscriber_id: &mut usize) -> usize {
446 let ans = *next_subscriber_id;
447 *next_subscriber_id += 1;
448 ans
449}
450type Callback = Box<dyn FnOnce() + 'static + Send + Sync>;
451
452/// A handle to a subscription created by GPUI. When dropped, the subscription
453/// is cancelled and the callback will no longer be invoked.
454#[must_use]
455pub struct Subscription {
456 unsubscribe: Weak<Mutex<Option<Callback>>>,
457}
458
459impl std::fmt::Debug for Subscription {
460 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
461 f.debug_struct("Subscription").finish()
462 }
463}
464
465impl Subscription {
466 /// Detaches the subscription from this handle. The callback will
467 /// continue to be invoked until the doc has been subscribed to
468 /// are dropped
469 pub fn detach(self) {
470 if let Some(unsubscribe) = self.unsubscribe.upgrade() {
471 unsubscribe.lock().unwrap().take();
472 }
473 }
474
475 /// Unsubscribes the subscription.
476 #[inline]
477 pub fn unsubscribe(self) {
478 drop(self)
479 }
480}
481
482impl Drop for Subscription {
483 fn drop(&mut self) {
484 if let Some(unsubscribe) = self.unsubscribe.upgrade() {
485 let unsubscribe = unsubscribe.lock().unwrap().take();
486 if let Some(unsubscribe) = unsubscribe {
487 unsubscribe();
488 }
489 }
490 }
491}
492
493struct InnerSubscription {
494 unsubscribe: Arc<Mutex<Option<Callback>>>,
495}
496
497impl Drop for InnerSubscription {
498 fn drop(&mut self) {
499 self.unsubscribe.lock().unwrap().take();
500 }
501}
502
503/// A wrapper around `SubscriberSet` that automatically handles recursive event emission.
504///
505/// This struct differs from `SubscriberSet` in the following ways:
506/// 1. It automatically handles the `CannotEmitEventDueToRecursiveCall` error that can occur in `SubscriberSet`.
507/// 2. When a recursive event emission is detected, it queues the event instead of throwing an error.
508/// 3. After the current event processing is complete, it automatically processes the queued events.
509///
510/// This behavior ensures that all events are processed in the order they were emitted, even in cases
511/// where recursive event emission would normally cause an error.
512pub(crate) struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
513 subscriber_set: SubscriberSet<EmitterKey, Callback>,
514 queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
515}
516
517pub(crate) struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
518 subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
519 queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
520}
521
522impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
523 pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
524 Some(SubscriberSetWithQueue {
525 subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
526 queue: self.queue.upgrade()?,
527 })
528 }
529}
530
531impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
532where
533 EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
534 Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
535 Payload: Send + Sync + Debug,
536{
537 pub fn new() -> Self {
538 Self {
539 subscriber_set: SubscriberSet::new(),
540 queue: Arc::new(Mutex::new(Default::default())),
541 }
542 }
543
544 pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
545 WeakSubscriberSetWithQueue {
546 subscriber_set: Arc::downgrade(&self.subscriber_set.0),
547 queue: Arc::downgrade(&self.queue),
548 }
549 }
550
551 pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
552 &self.subscriber_set
553 }
554
555 pub(crate) fn emit(&self, key: &EmitterKey, payload: Payload) {
556 let mut pending_events: SmallVec<[Payload; 1]> = SmallVec::new();
557 pending_events.push(payload);
558 while let Some(payload) = pending_events.pop() {
559 let result = self
560 .subscriber_set
561 .retain(key, &mut |callback| (callback)(&payload));
562 match result {
563 Ok(_) => {
564 let mut queue = self.queue.lock().unwrap();
565 if let Some(new_pending_events) = queue.remove(key) {
566 pending_events.extend(new_pending_events);
567 }
568 }
569 Err(SubscriptionError::CannotEmitEventDueToRecursiveCall) => {
570 let mut queue = self.queue.lock().unwrap();
571 queue.entry(key.clone()).or_default().push(payload);
572 }
573 }
574 }
575 }
576}
577
578#[cfg(test)]
579mod test {
580 use super::*;
581
582 #[test]
583 fn test_inner_subscription_drop() {
584 let subscriber_set = SubscriberSet::<i32, Box<dyn Fn(&i32) -> bool + Send + Sync>>::new();
585 let (subscription, activate) = subscriber_set.insert(1, Box::new(move |_: &i32| true));
586 activate();
587 drop(subscriber_set);
588 assert!(subscription.unsubscribe.upgrade().is_none());
589 }
590
591 #[test]
592 fn test_inner_subscription_drop_2() {
593 let subscriber_set = SubscriberSet::<i32, Box<dyn Fn(&i32) -> bool + Send + Sync>>::new();
594 let (subscription, activate) = subscriber_set.insert(1, Box::new(move |_: &i32| false));
595 activate();
596 subscriber_set
597 .retain(&1, &mut |callback| callback(&1))
598 .unwrap();
599 assert!(subscription.unsubscribe.upgrade().is_none());
600 }
601}