1use super::config::Config;
13
/// Accessor over a mapped shared-memory region: a global header followed by
/// a pool of fixed-size blocks whose layout is derived from `Config`.
pub struct Region {
    // Base address of the mapping; all header/block offsets are relative to it.
    #[cfg(feature = "std")]
    base: *mut u8,
    // Layout parameters (block count/size, ring depth, stale timeout, ...).
    #[cfg(feature = "std")]
    pub(crate) config: Config,
    // Byte offset from `base` to the first block; cached at construction
    // so per-block address math avoids recomputing the layout.
    #[cfg(feature = "std")]
    pub(crate) pool_offset: usize,
    // Single-slot cache of the most recently freed block index (NO_BLOCK
    // when empty): a process-local fast path that bypasses the shared
    // free list (see `alloc_recycled` / `commit_to_ring`).
    #[cfg(feature = "std")]
    pub(crate) last_freed: core::sync::atomic::AtomicU32,
    // Without "std" the type carries no state; PhantomData keeps `Config`
    // referenced so the type parameterization is identical across features.
    #[cfg(not(feature = "std"))]
    _phantom: core::marker::PhantomData<Config>,
}
29
// SAFETY: NOTE(review) — asserted manually because the raw `base: *mut u8`
// suppresses the auto impls. Soundness presumably relies on every access to
// the shared mapping going through atomics (as the methods on `Region` do);
// confirm there are no non-atomic aliased writes anywhere else in the crate.
unsafe impl Send for Region {}
unsafe impl Sync for Region {}
35
36#[cfg(feature = "std")]
37use super::layout::*;
38#[cfg(feature = "std")]
39use crate::platform::notify;
40#[cfg(feature = "std")]
41use core::sync::atomic::{AtomicU32, AtomicU64, Ordering};
42
43#[cfg(feature = "std")]
#[cfg(feature = "std")]
impl Region {
    /// Wraps an already-mapped shared-memory region described by `config`.
    ///
    /// # Safety
    /// `base` must be a valid, suitably aligned mapping at least as large as
    /// the layout implied by `config`, and must remain mapped for the
    /// lifetime of the returned `Region`.
    pub(crate) unsafe fn from_raw(base: *mut u8, _len: usize, config: Config) -> Self {
        // Cache the block-pool offset once; every block address below is
        // computed relative to it.
        let pool_offset = block_pool_offset(&config);
        Self {
            base,
            config,
            pool_offset,
            last_freed: AtomicU32::new(NO_BLOCK),
        }
    }

    /// Raw base pointer of the mapped region.
    #[inline]
    pub(crate) fn base_ptr(&self) -> *mut u8 {
        self.base
    }

    /// Free-list head: a packed (generation, index) word at `GH_POOL_HEAD`.
    #[inline]
    pub(crate) fn pool_head(&self) -> &AtomicU64 {
        unsafe { &*(self.base.add(GH_POOL_HEAD) as *const AtomicU64) }
    }

    /// Pointer to block `idx`, or `None` if `idx` is out of range.
    /// Use for indices read back from shared memory (untrusted).
    #[inline]
    pub(crate) fn block_ptr_checked(&self, idx: u32) -> Option<*mut u8> {
        if (idx as usize) >= self.config.block_count as usize {
            return None;
        }
        Some(unsafe {
            self.base
                .add(self.pool_offset + idx as usize * self.config.block_size as usize)
        })
    }

    /// Pointer to block `idx`; the caller guarantees `idx` is in range
    /// (checked only in debug builds).
    #[inline]
    pub(crate) fn block_ptr(&self, idx: u32) -> *mut u8 {
        debug_assert!((idx as usize) < self.config.block_count as usize);
        unsafe {
            self.base
                .add(self.pool_offset + idx as usize * self.config.block_size as usize)
        }
    }

    /// Refcount word of block `idx`, or `None` if `idx` is out of range.
    #[inline]
    pub(crate) fn block_refcount_checked(&self, idx: u32) -> Option<&AtomicU32> {
        let ptr = self.block_ptr_checked(idx)?;
        Some(unsafe { &*(ptr.add(BK_REFCOUNT) as *const AtomicU32) })
    }

    /// Refcount word of block `idx`; the caller guarantees `idx` is in range.
    #[inline]
    pub(crate) fn block_refcount(&self, idx: u32) -> &AtomicU32 {
        unsafe { &*(self.block_ptr(idx).add(BK_REFCOUNT) as *const AtomicU32) }
    }

    /// Pops a block off the lock-free free list (a Treiber stack whose head
    /// word packs a generation counter next to the index to defeat ABA).
    ///
    /// Returns `Ok(Some(idx))` on success, `Ok(None)` when the pool is
    /// exhausted, and `Err(())` when the shared free list looks corrupted
    /// (an index read from shared memory is out of range).
    #[inline]
    pub(crate) fn alloc_block(&self) -> Result<Option<u32>, ()> {
        let mut head = self.pool_head().load(Ordering::Acquire);
        loop {
            let (gen, idx) = unpack(head);
            if idx == NO_BLOCK {
                return Ok(None);
            }
            if (idx as usize) >= self.config.block_count as usize {
                // Head written by another (possibly misbehaving) process.
                return Err(());
            }
            let block = self.block_ptr(idx);
            // The first word of a free block stores the next free index.
            let next = unsafe { &*(block as *const AtomicU32) }.load(Ordering::Relaxed);

            if next != NO_BLOCK && (next as usize) >= self.config.block_count as usize {
                // Corrupt link in the free chain.
                return Err(());
            }

            // Bump the generation so a concurrent pop/push cycle cannot make
            // this CAS succeed spuriously (classic ABA defense).
            let new_head = pack(gen.wrapping_add(1), next);

            // Prefetch the block we are about to hand out; the caller will
            // write its payload immediately.
            #[cfg(target_arch = "x86_64")]
            unsafe {
                core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(
                    block as *const i8,
                );
            }
            #[cfg(target_arch = "aarch64")]
            unsafe {
                core::arch::asm!(
                    "prfm pstl1keep, [{addr}]",
                    addr = in(reg) block,
                    options(nostack, preserves_flags)
                );
            }

            match self.pool_head().compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Ok(Some(idx)),
                Err(current) => head = current,
            }
        }
    }

    /// Takes the block cached in `last_freed`, if any — a cheap single-swap
    /// fast path that avoids the shared free-list CAS loop.
    #[inline]
    pub(crate) fn alloc_recycled(&self) -> Option<u32> {
        let idx = self.last_freed.swap(NO_BLOCK, Ordering::AcqRel);
        if idx != NO_BLOCK {
            Some(idx)
        } else {
            None
        }
    }

    /// Pushes block `idx` back onto the shared free list.
    /// Out-of-range indices are silently ignored (defensive: the index may
    /// originate from shared memory).
    #[inline]
    pub(crate) fn free_block(&self, idx: u32) {
        if (idx as usize) >= self.config.block_count as usize {
            return;
        }
        let mut head = self.pool_head().load(Ordering::Acquire);
        loop {
            let (gen, old_head_idx) = unpack(head);
            // Link the freed block to the current head before publishing it.
            unsafe { &*(self.block_ptr(idx) as *const AtomicU32) }
                .store(old_head_idx, Ordering::Relaxed);
            let new_head = pack(gen.wrapping_add(1), idx);
            match self.pool_head().compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(current) => head = current,
            }
        }
    }

    /// Links every block into one free chain (0 → 1 → … → last → NO_BLOCK),
    /// zeroes all refcounts, then publishes the head of the chain.
    pub(crate) fn init_free_list(&self) {
        for i in 0..self.config.block_count {
            let next = if i + 1 < self.config.block_count {
                i + 1
            } else {
                NO_BLOCK
            };
            let ptr = self.block_ptr(i);
            unsafe { &*(ptr as *const AtomicU32) }.store(next, Ordering::Relaxed);
            unsafe { &*(ptr.add(BK_REFCOUNT) as *const AtomicU32) }.store(0, Ordering::Relaxed);
        }
        // Fix: an empty pool must publish NO_BLOCK. The previous code
        // unconditionally published index 0, which `alloc_block` would then
        // report as corruption (Err) instead of exhaustion (Ok(None)).
        let first = if self.config.block_count == 0 {
            NO_BLOCK
        } else {
            0
        };
        self.pool_head().store(pack(0, first), Ordering::Release);
    }

    /// Usable payload bytes per block (block size minus the block header).
    pub(crate) fn data_capacity(&self) -> usize {
        // NOTE(review): assumes block_size >= BLOCK_DATA_OFFSET — presumably
        // validated when `Config` is built; this would underflow otherwise.
        self.config.block_size as usize - BLOCK_DATA_OFFSET
    }

    /// Publisher heartbeat timestamp (microseconds) at `GH_HEARTBEAT`.
    fn heartbeat_atom(&self) -> &AtomicU64 {
        unsafe { &*(self.base.add(GH_HEARTBEAT) as *const AtomicU64) }
    }

    /// Current wall-clock time in microseconds since the Unix epoch.
    ///
    /// # Errors
    /// `Error::ClockError` if the system clock is before the epoch.
    #[cfg(feature = "std")]
    #[allow(clippy::cast_possible_truncation)]
    fn now_micros() -> Result<u64, crate::error::Error> {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_micros() as u64)
            .map_err(|_| crate::error::Error::ClockError)
    }

    /// Advances the heartbeat to "now". `fetch_max` keeps the timestamp
    /// monotonic even if multiple writers race or the clock steps backward.
    #[cfg(feature = "std")]
    pub(crate) fn update_heartbeat(&self) -> Result<(), crate::error::Error> {
        let now = Self::now_micros()?;
        self.heartbeat_atom().fetch_max(now, Ordering::Release);
        Ok(())
    }

    /// Checks the publisher heartbeat against `config.stale_timeout`.
    ///
    /// # Errors
    /// `Error::PublisherDead` when the heartbeat is older than the timeout;
    /// `Error::ClockError` if wall time cannot be read.
    #[cfg(feature = "std")]
    #[allow(clippy::cast_possible_truncation)]
    pub(crate) fn check_heartbeat(&self) -> Result<(), crate::error::Error> {
        let hb = self.heartbeat_atom().load(Ordering::Acquire);
        let now = Self::now_micros()?;
        if now.saturating_sub(hb) > self.config.stale_timeout.as_micros() as u64 {
            return Err(crate::error::Error::PublisherDead);
        }
        Ok(())
    }

    /// Publishes a pinned-mode message: bumps the topic's write sequence,
    /// stores (seq, len) into the topic entry, and wakes waiting readers.
    #[cfg(feature = "std")]
    #[inline]
    pub(crate) fn commit_pinned(
        &self,
        data_len: u32,
        topic_idx: u32,
        write_seq_atom: &AtomicU64,
        waiters_atom: &AtomicU32,
    ) {
        let seq = write_seq_atom.fetch_add(1, Ordering::AcqRel) + 1;

        let off = topic_entry_off(topic_idx);
        let pinned_seq = unsafe { &*(self.base.add(off + TE_PINNED_SEQ) as *const AtomicU64) };
        // One word carries both values: low 32 bits of seq in the high half,
        // data length in the low half, so readers see them atomically.
        let packed = (seq & 0xFFFF_FFFF) << 32 | u64::from(data_len);
        pinned_seq.store(packed, Ordering::Release);

        if waiters_atom.load(Ordering::Acquire) > 0 {
            // Futex-wake on a 32-bit view of the sequence word.
            // NOTE(review): this aliasing assumes the futex half is the one
            // at offset 0 (little-endian layout) — confirm for big-endian.
            let seq_futex = unsafe { &*(write_seq_atom as *const AtomicU64 as *const AtomicU32) };
            notify::wake_all(seq_futex);
        }
    }

    /// Records which block currently backs the topic's pinned message.
    #[inline]
    pub(crate) fn set_pinned_block(&self, topic_idx: u32, block_idx: u32) {
        let off = topic_entry_off(topic_idx);
        unsafe { &*(self.base.add(off + TE_PINNED_BLOCK) as *const AtomicU32) }
            .store(block_idx, Ordering::Release);
    }

    /// Reads the block index backing the topic's pinned message.
    #[inline]
    pub(crate) fn pinned_block(&self, topic_idx: u32) -> u32 {
        let off = topic_entry_off(topic_idx);
        unsafe { &*(self.base.add(off + TE_PINNED_BLOCK) as *const AtomicU32) }
            .load(Ordering::Acquire)
    }

    /// Counter of readers currently holding the topic's pinned block.
    #[inline]
    pub(crate) fn pinned_readers(&self, topic_idx: u32) -> &AtomicU32 {
        let off = topic_entry_off(topic_idx);
        unsafe { &*(self.base.add(off + TE_PINNED_READERS) as *const AtomicU32) }
    }

    /// Publishes `block_idx` into the topic's ring: claims the target slot,
    /// swaps in the new block, releases the displaced one, and optionally
    /// wakes readers. With `single_publisher` the slot claim is a plain
    /// store; otherwise it is a bounded CAS spin against other publishers.
    #[cfg(feature = "std")]
    #[inline]
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit_to_ring(
        &self,
        block_idx: u32,
        data_len: u32,
        topic_idx: u32,
        write_seq_atom: &AtomicU64,
        waiters_atom: &AtomicU32,
        wake: bool,
        single_publisher: bool,
    ) {
        let seq = write_seq_atom.fetch_add(1, Ordering::AcqRel) + 1;

        // NOTE(review): masking assumes ring_depth is a nonzero power of two
        // — presumably enforced by Config validation.
        let ring_mask = self.config.ring_depth as u64 - 1;
        let slot = (seq & ring_mask) as u32;
        let entry_off = ring_entry_off(&self.config, topic_idx, slot);
        let entry_ptr = unsafe { self.base.add(entry_off) };
        let entry_seq = unsafe { &*(entry_ptr.add(RE_SEQ) as *const AtomicU64) };

        // Prefetch the entry the *next* commit will touch, overlapping the
        // miss with the work below.
        let next_slot = ((seq + 1) & ring_mask) as u32;
        let next_entry_off = ring_entry_off(&self.config, topic_idx, next_slot);
        let next_entry_ptr = unsafe { self.base.add(next_entry_off) };
        #[cfg(target_arch = "x86_64")]
        unsafe {
            core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(
                next_entry_ptr as *const i8,
            );
        }
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!(
                "prfm pstl1keep, [{addr}]",
                addr = in(reg) next_entry_ptr,
                options(nostack, preserves_flags)
            );
        }

        if single_publisher {
            // No contention possible: mark the slot in-progress directly.
            entry_seq.store(SEQ_WRITING, Ordering::Release);
        } else {
            let mut spin_count = 0u32;
            loop {
                let current = entry_seq.load(Ordering::Acquire);
                if current == SEQ_WRITING {
                    spin_count += 1;
                    if spin_count > 1_000_000 {
                        // Another publisher has held the slot far too long
                        // (likely died mid-write): give up waiting and
                        // overwrite rather than deadlock.
                        break;
                    }
                    crate::wait::yield_hint();
                    continue;
                }
                if entry_seq
                    .compare_exchange_weak(
                        current,
                        SEQ_WRITING,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    )
                    .is_ok()
                {
                    break;
                }
                crate::wait::yield_hint();
            }
        }

        // Remember the block this slot previously referenced so it can be
        // released after the new one is in place.
        let old_block_idx =
            unsafe { &*(entry_ptr.add(RE_BLOCK_IDX) as *const AtomicU32) }.load(Ordering::Relaxed);

        // The ring's reference is the initial (and only) count on the block.
        self.block_refcount(block_idx).store(1, Ordering::Release);

        unsafe { &*(entry_ptr.add(RE_BLOCK_IDX) as *const AtomicU32) }
            .store(block_idx, Ordering::Relaxed);
        unsafe { &*(entry_ptr.add(RE_DATA_LEN) as *const AtomicU32) }
            .store(data_len, Ordering::Relaxed);

        // Publishing the real sequence releases the slot and makes the
        // block/len stores above visible to readers.
        entry_seq.store(seq, Ordering::Release);

        if old_block_idx != NO_BLOCK && (old_block_idx as usize) < self.config.block_count as usize
        {
            let prev = self
                .block_refcount(old_block_idx)
                .fetch_sub(1, Ordering::AcqRel);
            if prev == 1 {
                // Last reference dropped: stash the block in the local
                // recycle slot; push whatever was there onto the free list.
                let old_recycled = self.last_freed.swap(old_block_idx, Ordering::AcqRel);
                if old_recycled != NO_BLOCK {
                    self.free_block(old_recycled);
                }
            }
        }

        if wake && waiters_atom.load(Ordering::Acquire) > 0 {
            // Futex-wake on a 32-bit view of the sequence word.
            // NOTE(review): same endianness assumption as in commit_pinned.
            let seq_futex = unsafe { &*(write_seq_atom as *const AtomicU64 as *const AtomicU32) };
            notify::wake_all(seq_futex);
        }
    }
}
431
/// Drops one reference on `block_idx`; the block returns to the free list
/// when the last reference is released.
#[cfg(feature = "std")]
pub(crate) fn release_block(region: &Region, block_idx: u32) {
    // Indices read from shared memory may be bogus; ignore anything that
    // does not map to a real block.
    if let Some(refcount) = region.block_refcount_checked(block_idx) {
        // Release on the decrement orders our payload reads before reuse;
        // the Acquire fence taken only by the final releaser pairs with it
        // before the block is recycled.
        if refcount.fetch_sub(1, Ordering::Release) == 1 {
            core::sync::atomic::fence(Ordering::Acquire);
            region.free_block(block_idx);
        }
    }
}