flexrc/algorithm/hybrid.rs

use core::cell::Cell;
use core::marker::PhantomData;
use core::sync::atomic;
#[cfg(feature = "track_threads")]
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::{AtomicU32, Ordering};

use static_assertions::{assert_eq_align, assert_eq_size, assert_impl_all, assert_not_impl_any};

use crate::algorithm::abort;
#[cfg(feature = "track_threads")]
use crate::algorithm::hybrid_threads::THREAD_ID;
use crate::{Algorithm, FlexRc, FlexRcInner};

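// Compile-time checks: the local and shared metadata (and the handles built on
// them) must have identical size and alignment, because the mode conversions
// below are done by casting the inner pointer in place.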
#[cfg(not(feature = "track_threads"))]
assert_eq_size!(HybridMeta<LocalMode>, u64);
#[cfg(not(feature = "track_threads"))]
assert_eq_size!(HybridMeta<SharedMode>, u64);

assert_eq_size!(HybridMeta<LocalMode>, HybridMeta<SharedMode>);
assert_eq_align!(HybridMeta<LocalMode>, HybridMeta<SharedMode>);
assert_eq_size!(LocalInner<usize>, SharedInner<usize>);
assert_eq_align!(LocalInner<usize>, SharedInner<usize>);
assert_eq_size!(LocalHybridRc<usize>, SharedHybridRc<usize>);
assert_eq_align!(LocalHybridRc<usize>, SharedHybridRc<usize>);

assert_impl_all!(SharedHybridRc<usize>: Send, Sync);
assert_not_impl_any!(LocalHybridRc<usize>: Send, Sync);

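// With `track_threads` enabled, the high bit of `thread_id` doubles as a spinlock
// flag: `fetch_or(THREAD_ID_LOCKED)` acquires it and masking with
// `THREAD_ID_UNLOCKED` releases it (see the shared-mode `try_into_other` below).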
#[cfg(feature = "track_threads")]
const THREAD_ID_LOCKED: usize = (usize::MAX >> 1) + 1;
#[cfg(feature = "track_threads")]
const THREAD_ID_UNLOCKED: usize = usize::MAX >> 1;

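// `shared_count` packs two things into one `u32`: the top bit (`LOCAL_PRESENT`)
// records that a local reference count is live, and the lower bits hold the
// shared count itself (`CLEAR_LOCAL` masks the flag off). `MAX_SHARED_COUNT` is
// kept well below the flag bit, presumably to catch overflow before it can
// disturb the flag.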
const MAX_LOCAL_COUNT: u32 = u32::MAX;
const MAX_SHARED_COUNT: u32 = u32::MAX >> 2;
const LOCAL_PRESENT: u32 = (u32::MAX >> 1) + 1;
const CLEAR_LOCAL: u32 = u32::MAX >> 1;

pub struct LocalMode;
pub struct SharedMode;

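// Shared metadata layout used by both modes. `local_count` is a plain `Cell`
// because it is only ever touched by the thread that owns the local handles;
// `shared_count` is atomic and also carries the `LOCAL_PRESENT` flag. `#[repr(C)]`
// keeps the field layout identical across the two `MODE` instantiations.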
#[repr(C)]
pub struct HybridMeta<MODE> {
    #[cfg(feature = "track_threads")]
    thread_id: AtomicUsize,
    local_count: Cell<u32>,
    shared_count: AtomicU32,
    phantom: PhantomData<MODE>,
}

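// Thread-local handle: `clone` is a plain non-atomic increment, and the type is
// neither `Send` nor `Sync` (see `assert_not_impl_any!` above).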
pub type LocalHybridRc<T> = FlexRc<HybridMeta<LocalMode>, HybridMeta<SharedMode>, T>;

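// Views of the same allocation with the "current" and "other" metadata modes
// swapped; the conversions below cast between these two pointer types.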
type LocalInner<T> = FlexRcInner<HybridMeta<LocalMode>, HybridMeta<SharedMode>, T>;
type SharedInner<T> = FlexRcInner<HybridMeta<SharedMode>, HybridMeta<LocalMode>, T>;

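// Local (single-threaded) counting algorithm. A freshly created local handle
// starts with `local_count == 1` and `shared_count == LOCAL_PRESENT` (flag set,
// zero shared handles).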
impl Algorithm<HybridMeta<LocalMode>, HybridMeta<SharedMode>> for HybridMeta<LocalMode> {
    #[inline]
    fn create() -> Self {
        Self {
            #[cfg(feature = "track_threads")]
            thread_id: AtomicUsize::new(THREAD_ID.with(|t| t.0)),
            local_count: Cell::new(1),
            shared_count: AtomicU32::new(LOCAL_PRESENT),
            phantom: PhantomData,
        }
    }

    #[inline]
    fn is_unique(&self) -> bool {
        self.local_count.get() == 1 && self.shared_count.load(Ordering::Acquire) == LOCAL_PRESENT
    }

    #[inline(always)]
    fn clone(&self) {
        let old = self.local_count.get();

        if old == MAX_LOCAL_COUNT {
            abort()
        }
        self.local_count.set(old + 1);
    }

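    // Dropping the last local handle clears the `LOCAL_PRESENT` flag with a
    // `Release` `fetch_and`; the allocation is freed (`true`) only if the previous
    // value was exactly `LOCAL_PRESENT`, i.e. no shared handles remain either.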
    #[inline(always)]
    fn drop(&self) -> bool {
        self.local_count.set(self.local_count.get() - 1);

        if self.local_count.get() == 0 {
            let old = self.shared_count.fetch_and(CLEAR_LOCAL, Ordering::Release);

            old == LOCAL_PRESENT
        } else {
            false
        }
    }

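    // Converting local -> shared always succeeds: the inner pointer is
    // reinterpreted as the shared layout (which the size/align assertions and
    // `#[repr(C)]` are there to keep valid) and the shared count is bumped via the
    // target metadata's `clone`.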
    #[inline]
    fn try_into_other<T: ?Sized>(
        &self,
        inner: *mut LocalInner<T>,
    ) -> Result<*mut SharedInner<T>, *mut LocalInner<T>> {
        let inner = inner as *mut SharedInner<T>;

        unsafe {
            (*inner).metadata.clone();
        }
        Ok(inner)
    }

    #[inline]
    fn try_to_other<T: ?Sized>(
        &self,
        inner: *mut LocalInner<T>,
    ) -> Result<*mut SharedInner<T>, *mut LocalInner<T>> {
        self.try_into_other(inner)
    }
}

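// Thread-safe handle: `clone` and `drop` operate atomically on `shared_count`.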
pub type SharedHybridRc<T> = FlexRc<HybridMeta<SharedMode>, HybridMeta<LocalMode>, T>;

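// These impls rely on the shared-mode algorithm mutating its metadata only
// through atomics (`shared_count`, plus `thread_id` under `track_threads`), so a
// `SharedHybridRc` can move between threads whenever `T: Send + Sync`.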
unsafe impl<T: Send + Sync> Send for SharedHybridRc<T> {}
unsafe impl<T: Send + Sync> Sync for SharedHybridRc<T> {}

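// Shared (atomic) counting algorithm. A handle created directly in shared mode
// starts with `shared_count == 1` and no `LOCAL_PRESENT` flag.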
impl Algorithm<HybridMeta<SharedMode>, HybridMeta<LocalMode>> for HybridMeta<SharedMode> {
    #[inline]
    fn create() -> Self {
        Self {
            #[cfg(feature = "track_threads")]
            thread_id: AtomicUsize::new(0),
            local_count: Cell::new(0),
            shared_count: AtomicU32::new(1),
            phantom: PhantomData,
        }
    }

    #[inline]
    fn is_unique(&self) -> bool {
        self.shared_count.load(Ordering::Acquire) == 1
    }

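    // A `Relaxed` increment suffices for creating another reference; abort if the
    // previous value already exceeds `MAX_SHARED_COUNT` so the counter cannot
    // overflow.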
    #[inline(always)]
    fn clone(&self) {
        let old = self.shared_count.fetch_add(1, Ordering::Relaxed);

        if old > MAX_SHARED_COUNT {
            abort()
        }
    }

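    // Arc-style teardown: `Release` decrement, then an `Acquire` fence before
    // reporting the last reference, so deallocation is not reordered ahead of
    // other threads' earlier uses of the data.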
    #[inline(always)]
    fn drop(&self) -> bool {
        if self.shared_count.fetch_sub(1, Ordering::Release) == 1 {
            atomic::fence(Ordering::Acquire);
            true
        } else {
            false
        }
    }

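    // Shared -> local with thread tracking: briefly lock `thread_id` by setting
    // its high bit, mark `LOCAL_PRESENT` on the shared count, and succeed if the
    // recorded owning thread matches the current one or no local handles existed
    // yet. On failure only the lock bit is cleared and the pointer is returned
    // unchanged.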
    #[cfg(feature = "track_threads")]
    #[inline]
    fn try_into_other<T: ?Sized>(
        &self,
        inner: *mut SharedInner<T>,
    ) -> Result<*mut LocalInner<T>, *mut SharedInner<T>> {
        let thread_id = THREAD_ID.with(|thread_id| thread_id.0);

        let old_thread_id = loop {
            let old_thread_id = self.thread_id.fetch_or(THREAD_ID_LOCKED, Ordering::Acquire);

            if old_thread_id < THREAD_ID_LOCKED {
                break old_thread_id;
            }
            std::hint::spin_loop();
        };

        let old_shared_count = self.shared_count.fetch_or(LOCAL_PRESENT, Ordering::Acquire);

        if thread_id == old_thread_id || old_shared_count < LOCAL_PRESENT {
            self.thread_id.store(thread_id, Ordering::Release);

            let inner = inner as *mut LocalInner<T>;

            unsafe {
                (*inner).metadata.clone();
            }
            Ok(inner)
        } else {
            self.thread_id
                .fetch_and(THREAD_ID_UNLOCKED, Ordering::Release);
            Err(inner)
        }
    }

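    // Without thread tracking, shared -> local succeeds only if no local handles
    // exist anywhere yet: a `fetch_or(LOCAL_PRESENT)` that returns a value below
    // the flag proves this call is the one claiming local ownership.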
    #[cfg(not(feature = "track_threads"))]
    #[inline]
    fn try_into_other<T: ?Sized>(
        &self,
        inner: *mut SharedInner<T>,
    ) -> Result<*mut LocalInner<T>, *mut SharedInner<T>> {
        if self.shared_count.fetch_or(LOCAL_PRESENT, Ordering::Acquire) < LOCAL_PRESENT {
            let inner = inner as *mut LocalInner<T>;

            unsafe {
                (*inner).metadata.clone();
            }
            Ok(inner)
        } else {
            Err(inner)
        }
    }

    #[inline]
    fn try_to_other<T: ?Sized>(
        &self,
        inner: *mut SharedInner<T>,
    ) -> Result<*mut LocalInner<T>, *mut SharedInner<T>> {
        self.try_into_other(inner)
    }
}