summa_core/utils/
sync.rs

1use std::fmt::{Debug, Formatter};
2use std::hash::{Hash, Hasher};
3use std::mem::ManuallyDrop;
4use std::ops::Deref;
5use std::sync::Arc;
6
7use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
8
9/// `Handler` wraps data with [Arc](std::sync::Arc) and notifies exteriors by channel when dropping
10#[derive(Debug)]
11pub struct Handler<T> {
12    data: ManuallyDrop<Arc<T>>,
13    sender: UnboundedSender<()>,
14}
15
16impl<T> Handler<T> {
17    pub fn new(data: T, sender: UnboundedSender<()>) -> Handler<T> {
18        Handler {
19            sender,
20            data: ManuallyDrop::new(Arc::new(data)),
21        }
22    }
23}
24
25impl<T> Clone for Handler<T> {
26    fn clone(&self) -> Self {
27        Handler {
28            sender: self.sender.clone(),
29            data: self.data.clone(),
30        }
31    }
32}
33
34impl<T> Drop for Handler<T> {
35    fn drop(&mut self) {
36        unsafe { ManuallyDrop::drop(&mut self.data) };
37        self.sender.send(()).ok();
38    }
39}
40
41impl<T> Deref for Handler<T> {
42    type Target = T;
43
44    fn deref(&self) -> &Self::Target {
45        &self.data
46    }
47}
48
49/// `OwningHandler` is like [Arc](std::sync::Arc) but with additional possibility to wait until
50/// the last strong reference drops and then return wrapped data.
51pub struct OwningHandler<T> {
52    handler: Handler<T>,
53    receiver: UnboundedReceiver<()>,
54}
55
56impl<T: Debug> Debug for OwningHandler<T> {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58        T::fmt(&**self.handler.data, f)
59    }
60}
61
62impl<T: Default> Default for OwningHandler<T> {
63    fn default() -> Self {
64        Self::new(T::default())
65    }
66}
67
68impl<T> OwningHandler<T> {
69    /// Wraps data with [Arc](std::sync::Arc) and creates a channel for listening drop events
70    pub fn new(data: T) -> OwningHandler<T> {
71        let (sender, receiver) = unbounded_channel();
72        let handler = Handler::new(data, sender);
73        OwningHandler { handler, receiver }
74    }
75
76    /// Takes new [Handler](crate::utils::sync::Handler)
77    pub fn handler(&self) -> Handler<T> {
78        self.handler.clone()
79    }
80
81    fn destruct(self) -> (UnboundedReceiver<()>, Handler<T>) {
82        (self.receiver, self.handler)
83    }
84
85    /// Blocks until last strong references drops and return wrapped data afterwards
86    pub async fn into_inner(self) -> T {
87        let (mut receiver, handler) = self.destruct();
88        let mut data: Arc<T> = Arc::clone(&handler.data);
89        drop(handler);
90        loop {
91            match Arc::try_unwrap(data) {
92                Ok(data) => return data,
93                Err(arc_data) => {
94                    data = arc_data;
95                    receiver.recv().await.expect("channel unexpectedly closed");
96                }
97            }
98        }
99    }
100}
101
102impl<T> Deref for OwningHandler<T> {
103    type Target = T;
104
105    fn deref(&self) -> &Self::Target {
106        &self.handler.data
107    }
108}
109
110impl<T: Hash> Hash for Handler<T> {
111    fn hash<H: Hasher>(&self, state: &mut H) {
112        self.data.hash(state)
113    }
114}
115
116impl<T: PartialEq> PartialEq<Self> for Handler<T> {
117    fn eq(&self, other: &Self) -> bool {
118        self.data.eq(&other.data)
119    }
120}
121
122impl<T: PartialEq> Eq for Handler<T> {}