1use std::io;
10use std::sync::atomic::{AtomicU32, AtomicU64};
11use std::sync::Arc;
12
13use crate::error::Error;
14use crate::protocol::layout::BLOCK_DATA_OFFSET;
15use crate::protocol::Region;
16
#[cfg(target_arch = "x86_64")]
/// Copies `len` bytes from `src` to `dst`, using non-temporal (streaming)
/// SSE2 stores for the 16-byte-aligned middle of the destination so large
/// payloads bypass the cache. Head and tail fragments fall back to a plain
/// `memcpy`. Ends with `sfence` so the streaming stores are globally
/// visible before the function returns.
///
/// # Safety
/// - `src` must be valid for reads of `len` bytes.
/// - `dst` must be valid for writes of `len` bytes.
/// - The two ranges must not overlap.
unsafe fn nontemporal_copy(src: *const u8, dst: *mut u8, len: usize) {
    use core::arch::x86_64::{__m128i, _mm_loadu_si128, _mm_sfence, _mm_stream_si128};

    // Head: plain copy up to the first 16-byte boundary of `dst`
    // (clamped to `len` for very short copies).
    let head = dst.align_offset(16).min(len);
    if head > 0 {
        core::ptr::copy_nonoverlapping(src, dst, head);
    }

    // Middle: stream full 16-byte chunks; `dst.add(pos)` is now aligned,
    // while the source load stays unaligned-tolerant.
    let mut pos = head;
    while len - pos >= 16 {
        let chunk = _mm_loadu_si128(src.add(pos) as *const __m128i);
        _mm_stream_si128(dst.add(pos) as *mut __m128i, chunk);
        pos += 16;
    }

    // Tail: whatever is left after the last full chunk.
    if pos < len {
        core::ptr::copy_nonoverlapping(src.add(pos), dst.add(pos), len - pos);
    }

    // Order the weakly-ordered streaming stores with later stores.
    _mm_sfence();
}
55
/// Lightweight handle identifying a topic as seen by one publisher.
///
/// Copyable and comparable; carries only indices, no ownership.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Topic {
    // Index of the topic within the shared region's topic table.
    pub(crate) topic_idx: u32,
    // NOTE(review): presumably the id of the publisher that registered
    // this handle — confirm against the code that constructs `Topic`.
    pub(crate) publisher_id: u64,
}
65
/// An exclusive, in-flight reservation of a shared-memory block.
///
/// Payload bytes are written through `data_ptr` (via `as_mut_slice`,
/// `set_data`, or the `io::Write` impl) and the loan is finalized with
/// `publish`/`publish_silent`, which commit the block to the topic ring.
/// If the loan is dropped without publishing, the block is handed back
/// via `Region::free_block` (see the `Drop` impl).
pub struct Loan<'a> {
    // Region that owns the block; used for commit and free.
    pub(crate) region: &'a Arc<Region>,
    // Start of the block's payload area in shared memory.
    pub(crate) data_ptr: *mut u8,
    // Maximum number of payload bytes the block can hold.
    pub(crate) capacity: usize,
    // Number of valid payload bytes written so far; committed on publish.
    pub(crate) len: usize,
    // Index of the reserved block within the region.
    pub(crate) block_idx: u32,
    // Index of the destination topic.
    pub(crate) topic_idx: u32,
    // Write-sequence counter passed through to `Region::commit_to_ring`.
    pub(crate) write_seq_atom: &'a AtomicU64,
    // Waiter counter passed through to `Region::commit_to_ring`;
    // presumably tracks blocked readers — confirm in Region.
    pub(crate) waiters_atom: &'a AtomicU32,
    // Single-publisher fast-path flag forwarded to the commit call.
    pub(crate) single_publisher: bool,
}
87
88impl<'a> Loan<'a> {
89 pub fn as_mut_slice(&mut self) -> &mut [u8] {
91 unsafe { core::slice::from_raw_parts_mut(self.data_ptr, self.capacity) }
92 }
93
94 pub fn set_data(&mut self, data: &[u8]) -> Result<(), Error> {
100 if data.len() > self.capacity {
101 return Err(Error::DataTooLarge {
102 size: data.len(),
103 capacity: self.capacity,
104 });
105 }
106 #[cfg(target_arch = "x86_64")]
107 {
108 if data.len() >= 2_097_152 {
109 unsafe { nontemporal_copy(data.as_ptr(), self.data_ptr, data.len()) };
113 self.len = data.len();
114 return Ok(());
115 }
116 }
117 unsafe {
118 core::ptr::copy_nonoverlapping(data.as_ptr(), self.data_ptr, data.len());
119 }
120 self.len = data.len();
121 Ok(())
122 }
123
124 pub fn set_len(&mut self, len: usize) -> Result<(), Error> {
130 if len > self.capacity {
131 return Err(Error::DataTooLarge {
132 size: len,
133 capacity: self.capacity,
134 });
135 }
136 self.len = len;
137 Ok(())
138 }
139
140 pub fn capacity(&self) -> usize {
142 self.capacity
143 }
144
145 pub fn write_structured<T: crate::Pod, E: crate::Pod>(
154 &mut self,
155 header: &T,
156 entries: &[E],
157 ) -> Result<(), Error> {
158 let header_size = core::mem::size_of::<T>();
159 let array_size = core::mem::size_of_val(entries);
160 let total = header_size + array_size + 4; if total > self.capacity {
163 return Err(Error::DataTooLarge {
164 size: total,
165 capacity: self.capacity,
166 });
167 }
168
169 let buf = self.as_mut_slice();
170 unsafe {
172 core::ptr::copy_nonoverlapping(
173 header as *const T as *const u8,
174 buf.as_mut_ptr(),
175 header_size,
176 );
177 }
178 if !entries.is_empty() {
180 unsafe {
181 core::ptr::copy_nonoverlapping(
182 entries.as_ptr() as *const u8,
183 buf.as_mut_ptr().add(header_size),
184 array_size,
185 );
186 }
187 }
188 let count = entries.len() as u32;
190 buf[header_size + array_size..header_size + array_size + 4]
191 .copy_from_slice(&count.to_le_bytes());
192
193 self.set_len(total)?;
194 Ok(())
195 }
196
197 #[inline]
200 pub fn publish(self) {
201 self.commit(true);
202 core::mem::forget(self); }
204
205 #[inline]
208 pub fn publish_silent(self) {
209 self.commit(false);
210 core::mem::forget(self);
211 }
212
213 #[inline]
214 fn commit(&self, wake: bool) {
215 assert!(self.len <= u32::MAX as usize, "data_len overflow");
218 self.region.commit_to_ring(
219 self.block_idx,
220 self.len as u32,
221 self.topic_idx,
222 self.write_seq_atom,
223 self.waiters_atom,
224 wake,
225 self.single_publisher,
226 );
227 }
228}
229
impl<'a> Drop for Loan<'a> {
    // A loan dropped without publishing returns its block to the region
    // via `Region::free_block` so the slot can be reused. The publish
    // paths bypass this with `mem::forget` after committing.
    fn drop(&mut self) {
        self.region.free_block(self.block_idx);
    }
}
236
237impl<'a> io::Write for Loan<'a> {
238 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
239 let remaining = self.capacity - self.len;
240 if remaining == 0 && !buf.is_empty() {
241 return Err(io::Error::new(io::ErrorKind::WriteZero, "block full"));
242 }
243 let n = buf.len().min(remaining);
244 unsafe {
245 core::ptr::copy_nonoverlapping(buf.as_ptr(), self.data_ptr.add(self.len), n);
246 }
247 self.len += n;
248 Ok(n)
249 }
250
251 fn flush(&mut self) -> io::Result<()> {
252 Ok(())
253 }
254}
255
/// A loan of a shared-memory block whose payload is a single `Pod` value
/// of type `T`.
///
/// Written via `as_mut` or `send`, then committed with `publish`/
/// `publish_silent`; dropping without publishing frees the block.
pub struct TypedLoan<'a, T: crate::Pod> {
    // Region that owns the block; used for commit and free.
    pub(crate) region: &'a Arc<Region>,
    // Index of the reserved block within the region.
    pub(crate) block_idx: u32,
    // Index of the destination topic.
    pub(crate) topic_idx: u32,
    // Write-sequence counter forwarded to `Region::commit_to_ring`.
    pub(crate) write_seq_atom: &'a AtomicU64,
    // Waiter counter forwarded to `Region::commit_to_ring`.
    pub(crate) waiters_atom: &'a AtomicU32,
    // Single-publisher fast-path flag forwarded to the commit call.
    pub(crate) single_publisher: bool,
    // Ties `T` and exclusive (mutable) access to the loan's lifetime
    // without storing a `T` directly.
    pub(crate) _marker: core::marker::PhantomData<&'a mut T>,
}
274
275impl<'a, T: crate::Pod> TypedLoan<'a, T> {
276 #[allow(clippy::should_implement_trait)]
281 pub fn as_mut(&mut self) -> &mut T {
282 unsafe {
283 let ptr = self.region.block_ptr(self.block_idx).add(BLOCK_DATA_OFFSET);
284 &mut *(ptr as *mut T)
285 }
286 }
287
288 #[inline]
291 pub fn send(self, value: T) {
292 unsafe {
293 let ptr = self.region.block_ptr(self.block_idx).add(BLOCK_DATA_OFFSET);
294 core::ptr::write(ptr as *mut T, value);
295 }
296 self.commit(true);
297 core::mem::forget(self);
298 }
299
300 #[inline]
303 pub fn publish(self) {
304 self.commit(true);
305 core::mem::forget(self);
306 }
307
308 #[inline]
310 pub fn publish_silent(self) {
311 self.commit(false);
312 core::mem::forget(self);
313 }
314
315 #[inline]
316 fn commit(&self, wake: bool) {
317 self.region.commit_to_ring(
318 self.block_idx,
319 core::mem::size_of::<T>() as u32,
320 self.topic_idx,
321 self.write_seq_atom,
322 self.waiters_atom,
323 wake,
324 self.single_publisher,
325 );
326 }
327}
328
impl<T: crate::Pod> Drop for TypedLoan<'_, T> {
    // A typed loan dropped without `send`/`publish` returns its block to
    // the region via `Region::free_block`; the publish paths skip this
    // with `mem::forget`.
    fn drop(&mut self) {
        self.region.free_block(self.block_idx);
    }
}
335
/// A loan over a pinned (fixed-slot) block, committed in place via
/// `Region::commit_pinned` rather than through the block ring.
///
/// Unlike `Loan`, there is no `block_idx`: the slot is implied. Both
/// `publish` and `Drop` clear the `readers` counter when done.
pub struct PinnedLoan<'a> {
    // Region that owns the pinned slot; used for the commit.
    pub(crate) region: &'a Arc<Region>,
    // Start of the slot's payload area in shared memory.
    pub(crate) data_ptr: *mut u8,
    // Maximum number of payload bytes the slot can hold.
    pub(crate) capacity: usize,
    // Number of valid payload bytes written so far; committed on publish.
    pub(crate) len: usize,
    // Index of the destination topic.
    pub(crate) topic_idx: u32,
    // Write-sequence counter forwarded to `Region::commit_pinned`.
    pub(crate) write_seq_atom: &'a AtomicU64,
    // Waiter counter forwarded to `Region::commit_pinned`.
    pub(crate) waiters_atom: &'a AtomicU32,
    // Counter cleared to 0 on publish/drop; presumably marks the slot
    // as no longer held by this writer — confirm in Region.
    pub(crate) readers: &'a AtomicU32,
}
357
358impl<'a> PinnedLoan<'a> {
359 pub fn as_mut_slice(&mut self) -> &mut [u8] {
361 unsafe { core::slice::from_raw_parts_mut(self.data_ptr, self.capacity) }
362 }
363
364 pub fn set_data(&mut self, data: &[u8]) -> Result<(), Error> {
370 if data.len() > self.capacity {
371 return Err(Error::DataTooLarge {
372 size: data.len(),
373 capacity: self.capacity,
374 });
375 }
376 unsafe {
377 core::ptr::copy_nonoverlapping(data.as_ptr(), self.data_ptr, data.len());
378 }
379 self.len = data.len();
380 Ok(())
381 }
382
383 pub fn set_len(&mut self, len: usize) -> Result<(), Error> {
389 if len > self.capacity {
390 return Err(Error::DataTooLarge {
391 size: len,
392 capacity: self.capacity,
393 });
394 }
395 self.len = len;
396 Ok(())
397 }
398
399 pub fn capacity(&self) -> usize {
401 self.capacity
402 }
403
404 pub fn write_structured<T: crate::Pod, E: crate::Pod>(
413 &mut self,
414 header: &T,
415 entries: &[E],
416 ) -> Result<(), Error> {
417 let header_size = core::mem::size_of::<T>();
418 let array_size = core::mem::size_of_val(entries);
419 let total = header_size + array_size + 4; if total > self.capacity {
422 return Err(Error::DataTooLarge {
423 size: total,
424 capacity: self.capacity,
425 });
426 }
427
428 let buf = self.as_mut_slice();
429 unsafe {
431 core::ptr::copy_nonoverlapping(
432 header as *const T as *const u8,
433 buf.as_mut_ptr(),
434 header_size,
435 );
436 }
437 if !entries.is_empty() {
439 unsafe {
440 core::ptr::copy_nonoverlapping(
441 entries.as_ptr() as *const u8,
442 buf.as_mut_ptr().add(header_size),
443 array_size,
444 );
445 }
446 }
447 let count = entries.len() as u32;
449 buf[header_size + array_size..header_size + array_size + 4]
450 .copy_from_slice(&count.to_le_bytes());
451
452 self.set_len(total)?;
453 Ok(())
454 }
455
456 #[inline]
458 pub fn publish(self) {
459 assert!(self.len <= u32::MAX as usize, "data_len overflow");
460 self.region.commit_pinned(
461 self.len as u32,
462 self.topic_idx,
463 self.write_seq_atom,
464 self.waiters_atom,
465 );
466 self.readers.store(0, core::sync::atomic::Ordering::Release);
468 core::mem::forget(self);
470 }
471}
472
impl Drop for PinnedLoan<'_> {
    // Dropping without publishing clears the `readers` counter with
    // `Release` ordering, abandoning the loan without committing.
    // NOTE(review): assumes 0 means "slot not held" — confirm against
    // the reader side in Region.
    fn drop(&mut self) {
        self.readers.store(0, core::sync::atomic::Ordering::Release);
    }
}