noosphere_common/unshared.rs
1use crate::ConditionalSend;
2use futures_util::Stream;
3
4/// NOTE: This type was adapted from https://github.com/Nullus157/async-compression/blob/main/src/unshared.rs
5/// Original implementation licensed MIT/Apache 2
6///
7/// Wraps a type and only allows unique borrowing, the main usecase is to wrap a `!Sync` type and
8/// implement `Sync` for it as this type blocks having multiple shared references to the inner
9/// value.
10///
11/// # Safety
12///
13/// We must be careful when accessing `inner`, there must be no way to create a shared reference to
14/// it from a shared reference to an `Unshared`, as that would allow creating shared references on
15/// multiple threads.
16///
17/// As an example deriving or implementing `Clone` is impossible, two threads could attempt to
18/// clone a shared `Unshared<T>` reference which would result in accessing the same inner value
19/// concurrently.
20#[repr(transparent)]
21pub struct Unshared<T>(T);
22
23impl<T> Unshared<T> {
24 /// Initialize a new [Unshared], wrapping the provided inner value
25 pub fn new(inner: T) -> Self {
26 Unshared(inner)
27 }
28
29 /// Get a mutable (unique) reference to the inner value
30 pub fn get_mut(&mut self) -> &mut T {
31 &mut self.0
32 }
33}
34
35/// Safety: See comments on main docs for `Unshared`
36unsafe impl<T> Sync for Unshared<T> {}
37
38impl<T> std::fmt::Debug for Unshared<T> {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct(core::any::type_name::<T>()).finish()
41 }
42}
43
44/// Wrapper that implements [Stream] for any [Unshared] that happens to wrap an
45/// appropriately bounded [Stream]. This is useful for making a `!Sync` stream
46/// into a `Sync` one in cases where we know it will not be shared by concurrent
47/// actors.
48///
49/// Implementation note: we do not implement [Stream] directly on [Unshared] as
50/// an expression of hygiene; only mutable borrows of the inner value should be
51/// possible in order to preserve the soundness of [Unshared].
52#[repr(transparent)]
53pub struct UnsharedStream<T>(Unshared<T>)
54where
55 T: Stream + Unpin,
56 for<'a> T::Item: ConditionalSend + 'a;
57
58impl<T> UnsharedStream<T>
59where
60 T: Stream + Unpin,
61 for<'a> T::Item: ConditionalSend + 'a,
62{
63 /// Initialize a new [UnsharedStream] wrapping a provided (presumably `!Sync`)
64 /// [Stream]
65 pub fn new(inner: T) -> Self {
66 UnsharedStream(Unshared::new(inner))
67 }
68}
69
70impl<T> Stream for UnsharedStream<T>
71where
72 T: Stream + Unpin,
73 for<'a> T::Item: ConditionalSend + 'a,
74{
75 type Item = T::Item;
76
77 fn poll_next(
78 self: std::pin::Pin<&mut Self>,
79 cx: &mut std::task::Context<'_>,
80 ) -> std::task::Poll<Option<Self::Item>> {
81 std::pin::pin!(self.get_mut().0.get_mut()).poll_next(cx)
82 }
83}