1use std::{
16 error::Error,
17 future::{Future, IntoFuture},
18 marker::PhantomData,
19 mem::MaybeUninit,
20 num::NonZeroUsize,
21 pin::Pin,
22 sync::{atomic::Ordering, Mutex},
23 time::Duration,
24};
25
26use async_trait::async_trait;
27use zenoh_core::{zlock, zresult::ZResult, Resolvable, Wait};
28
29use super::{
30 chunk::{AllocatedChunk, ChunkDescriptor},
31 shm_provider_backend::ShmProviderBackend,
32 types::{ChunkAllocResult, ZAllocError, ZLayoutAllocError, ZLayoutError},
33};
34use crate::{
35 api::{
36 buffer::{typed::Typed, zshmmut::ZShmMut},
37 protocol_implementations::posix::posix_shm_provider_backend::{
38 PosixShmProviderBackend, PosixShmProviderBackendBuilder,
39 },
40 provider::memory_layout::{MemoryLayout, TypedLayout},
41 },
42 metadata::{
43 allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
44 storage::GLOBAL_METADATA_STORAGE,
45 },
46 watchdog::{
47 confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
48 validator::GLOBAL_VALIDATOR,
49 },
50 ShmBufInfo, ShmBufInner,
51};
52
/// A layout description that can be turned into a [`MemoryLayout`] for allocation and that
/// knows which buffer type the allocated memory should be wrapped into.
pub trait AllocLayout {
    /// Buffer type produced by [`AllocLayout::wrap_buffer`].
    type Buffer;
    /// Consumes the layout and converts it into a [`MemoryLayout`], or reports a layout error.
    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError>;
    /// Wraps a raw allocated buffer into [`AllocLayout::Buffer`].
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not spelled out here. The `TypedLayout`
    /// implementation wraps the buffer without any check, so the buffer presumably must have
    /// been allocated with the layout returned by [`AllocLayout::memory_layout`] — confirm.
    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer;
}
66
67impl<T: TryInto<MemoryLayout>> AllocLayout for T
68where
69 T::Error: Into<ZLayoutError>,
70{
71 type Buffer = ZShmMut;
72
73 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
74 self.try_into().map_err(Into::into)
75 }
76
77 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
78 buffer
79 }
80}
81
82impl<T> AllocLayout for TypedLayout<T> {
83 type Buffer = Typed<MaybeUninit<T>, ZShmMut>;
84
85 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
86 Ok(MemoryLayout::for_type::<T>())
87 }
88
89 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
90 unsafe { Typed::new_unchecked(buffer) }
92 }
93}
94
/// A layout that has already been checked against a provider's backend, so that it can be
/// used for allocations through [`PrecomputedLayout::alloc`].
#[zenoh_macros::unstable_doc]
#[derive(Debug)]
pub struct PrecomputedLayout<'a, Backend, Layout> {
    /// Size of the originally requested layout; passed as the length of the produced buffers.
    size: NonZeroUsize,
    /// Layout returned by the backend's `layout_for` for the requested layout.
    provider_layout: MemoryLayout,
    /// Provider the allocations are made from.
    provider: &'a ShmProvider<Backend>,
    /// Carries the user-facing layout type, which selects the buffer wrapper.
    _phantom: PhantomData<Layout>,
}
106
107impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
108where
109 Backend: ShmProviderBackend,
110{
111 #[zenoh_macros::unstable_doc]
113 pub fn alloc<'b>(&'b self) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout> {
114 PrecomputedAllocBuilder {
115 layout: self,
116 policy: JustAlloc,
117 }
118 }
119}
120
121impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
122where
123 Backend: ShmProviderBackend,
124{
125 fn new(
126 provider: &'a ShmProvider<Backend>,
127 required_layout: MemoryLayout,
128 ) -> Result<Self, ZLayoutError> {
129 let size = required_layout.size();
134
135 let provider_layout = provider
137 .backend
138 .layout_for(required_layout)
139 .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
140
141 Ok(Self {
142 size,
143 provider_layout,
144 provider,
145 _phantom: PhantomData,
146 })
147 }
148}
149
/// Synchronous strategy for obtaining a chunk from a [`ShmProvider`].
#[zenoh_macros::unstable_doc]
pub trait AllocPolicy<Backend> {
    /// Attempts to allocate a chunk for `layout` from `provider`.
    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult;
}
155
/// Asynchronous strategy for obtaining a chunk from a [`ShmProvider`].
#[zenoh_macros::unstable_doc]
#[async_trait]
pub trait AsyncAllocPolicy<Backend>: Send {
    /// Attempts to allocate a chunk for `layout` from `provider`, possibly awaiting in between
    /// attempts.
    async fn alloc_async(
        &self,
        layout: &MemoryLayout,
        provider: &ShmProvider<Backend>,
    ) -> ChunkAllocResult;
}
166
/// Marker trait required by the safe `with_policy` method of [`AllocBuilder`].
///
/// # Safety
///
/// NOTE(review): the contract is not spelled out in this file. Implementors presumably
/// assert that allocating with the policy cannot invalidate buffers that are still in use —
/// confirm before implementing it for a new policy.
pub unsafe trait SafePolicy {}
175
/// A policy (or policy value) that can be constructed in a `const` context, which lets the
/// builders select it by type alone.
#[zenoh_macros::unstable_doc]
pub trait ConstPolicy {
    /// The constant instance of this policy.
    const NEW: Self;
}
181
/// A source of a policy parameter of type `T`, either a runtime value or a type-level
/// constant.
pub trait PolicyValue<T> {
    /// Returns the parameter value.
    fn get(&self) -> T;
}

/// Every `Copy` value is a policy value of its own type and yields a copy of itself.
impl<T> PolicyValue<T> for T
where
    T: Copy,
{
    fn get(&self) -> T {
        *self
    }
}
192pub struct ConstBool<const VALUE: bool>;
194impl<const VALUE: bool> ConstPolicy for ConstBool<VALUE> {
195 const NEW: Self = Self;
196}
197impl<const VALUE: bool> PolicyValue<bool> for ConstBool<VALUE> {
198 fn get(&self) -> bool {
199 VALUE
200 }
201}
202pub struct ConstUsize<const VALUE: usize>;
204impl<const VALUE: usize> ConstPolicy for ConstUsize<VALUE> {
205 const NEW: Self = Self;
206}
207impl<const VALUE: usize> PolicyValue<usize> for ConstUsize<VALUE> {
208 fn get(&self) -> usize {
209 VALUE
210 }
211}
212
213#[zenoh_macros::unstable_doc]
215#[derive(Clone, Copy)]
216pub struct JustAlloc;
217impl<Backend> AllocPolicy<Backend> for JustAlloc
218where
219 Backend: ShmProviderBackend,
220{
221 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
222 provider.backend.alloc(layout)
223 }
224}
225unsafe impl SafePolicy for JustAlloc {}
226impl ConstPolicy for JustAlloc {
227 const NEW: Self = Self;
228}
229
230#[zenoh_macros::unstable_doc]
237#[derive(Clone, Copy)]
238pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc, Safe = ConstBool<true>> {
239 inner_policy: InnerPolicy,
240 alt_policy: AltPolicy,
241 safe: Safe,
242}
243impl<InnerPolicy, AltPolicy, Safe> GarbageCollect<InnerPolicy, AltPolicy, Safe> {
244 pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy, safe: Safe) -> Self {
246 Self {
247 inner_policy,
248 alt_policy,
249 safe,
250 }
251 }
252}
253impl<Backend, InnerPolicy, AltPolicy, Safe> AllocPolicy<Backend>
254 for GarbageCollect<InnerPolicy, AltPolicy, Safe>
255where
256 Backend: ShmProviderBackend,
257 InnerPolicy: AllocPolicy<Backend>,
258 AltPolicy: AllocPolicy<Backend>,
259 Safe: PolicyValue<bool>,
260{
261 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
262 let result = self.inner_policy.alloc(layout, provider);
263 if result.is_err() {
264 let collected = if self.safe.get() {
266 provider.garbage_collect()
267 } else {
268 unsafe { provider.garbage_collect_unsafe() }
271 };
272 if collected >= layout.size().get() {
273 return self.alt_policy.alloc(layout, provider);
274 }
275 }
276 result
277 }
278}
279unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
280 for GarbageCollect<InnerPolicy, AltPolicy, ConstBool<true>>
281{
282}
283impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy, Safe: ConstPolicy> ConstPolicy
284 for GarbageCollect<InnerPolicy, AltPolicy, Safe>
285{
286 const NEW: Self = Self {
287 inner_policy: InnerPolicy::NEW,
288 alt_policy: AltPolicy::NEW,
289 safe: Safe::NEW,
290 };
291}
292
293#[zenoh_macros::unstable_doc]
298#[derive(Clone, Copy)]
299pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc> {
300 inner_policy: InnerPolicy,
301 alt_policy: AltPolicy,
302}
303impl<InnerPolicy, AltPolicy> Defragment<InnerPolicy, AltPolicy> {
304 pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
306 Self {
307 inner_policy,
308 alt_policy,
309 }
310 }
311}
312impl<Backend, InnerPolicy, AltPolicy> AllocPolicy<Backend> for Defragment<InnerPolicy, AltPolicy>
313where
314 Backend: ShmProviderBackend,
315 InnerPolicy: AllocPolicy<Backend>,
316 AltPolicy: AllocPolicy<Backend>,
317{
318 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
319 let result = self.inner_policy.alloc(layout, provider);
320 if let Err(ZAllocError::NeedDefragment) = result {
321 if provider.defragment() >= layout.size().get() {
323 return self.alt_policy.alloc(layout, provider);
324 }
325 }
326 result
327 }
328}
329unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
330 for Defragment<InnerPolicy, AltPolicy>
331{
332}
333impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
334 for Defragment<InnerPolicy, AltPolicy>
335{
336 const NEW: Self = Self {
337 inner_policy: InnerPolicy::NEW,
338 alt_policy: AltPolicy::NEW,
339 };
340}
341
342#[zenoh_macros::unstable_doc]
348#[derive(Clone, Copy)]
349pub struct Deallocate<Limit, InnerPolicy = JustAlloc, AltPolicy = InnerPolicy> {
350 limit: Limit,
351 inner_policy: InnerPolicy,
352 alt_policy: AltPolicy,
353}
354impl<Limit, InnerPolicy, AltPolicy> Deallocate<Limit, InnerPolicy, AltPolicy> {
355 pub fn new(limit: Limit, inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
357 Self {
358 limit,
359 inner_policy,
360 alt_policy,
361 }
362 }
363}
364impl<Backend, Limit, InnerPolicy, AltPolicy> AllocPolicy<Backend>
365 for Deallocate<Limit, InnerPolicy, AltPolicy>
366where
367 Backend: ShmProviderBackend,
368 Limit: PolicyValue<usize>,
369 InnerPolicy: AllocPolicy<Backend>,
370 AltPolicy: AllocPolicy<Backend>,
371{
372 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
373 for _ in 0..self.limit.get() {
374 match self.inner_policy.alloc(layout, provider) {
375 res @ (Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory)) => {
376 let Some(chunk) = zlock!(provider.busy_list).pop() else {
377 return res;
378 };
379 provider.backend.free(&chunk.descriptor());
380 }
381 res => return res,
382 }
383 }
384 self.alt_policy.alloc(layout, provider)
385 }
386}
387impl<Limit: ConstPolicy, InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
388 for Deallocate<Limit, InnerPolicy, AltPolicy>
389{
390 const NEW: Self = Self {
391 limit: Limit::NEW,
392 inner_policy: InnerPolicy::NEW,
393 alt_policy: AltPolicy::NEW,
394 };
395}
396
397#[zenoh_macros::unstable_doc]
401#[derive(Clone, Copy)]
402pub struct BlockOn<InnerPolicy = JustAlloc> {
403 inner_policy: InnerPolicy,
404}
405impl<InnerPolicy> BlockOn<InnerPolicy> {
406 pub fn new(inner_policy: InnerPolicy) -> Self {
408 Self { inner_policy }
409 }
410}
411#[async_trait]
412impl<Backend, InnerPolicy> AsyncAllocPolicy<Backend> for BlockOn<InnerPolicy>
413where
414 Backend: ShmProviderBackend + Sync,
415 InnerPolicy: AllocPolicy<Backend> + Send + Sync,
416{
417 async fn alloc_async(
418 &self,
419 layout: &MemoryLayout,
420 provider: &ShmProvider<Backend>,
421 ) -> ChunkAllocResult {
422 loop {
423 match self.inner_policy.alloc(layout, provider) {
424 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
425 tokio::time::sleep(Duration::from_millis(1)).await;
427 }
428 other_result => {
429 return other_result;
430 }
431 }
432 }
433 }
434}
435unsafe impl<InnerPolicy: SafePolicy> SafePolicy for BlockOn<InnerPolicy> {}
436impl<Backend, InnerPolicy> AllocPolicy<Backend> for BlockOn<InnerPolicy>
437where
438 Backend: ShmProviderBackend,
439 InnerPolicy: AllocPolicy<Backend>,
440{
441 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
442 loop {
443 match self.inner_policy.alloc(layout, provider) {
444 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
445 std::thread::sleep(Duration::from_millis(1));
447 }
448 other_result => {
449 return other_result;
450 }
451 }
452 }
453 }
454}
455impl<InnerPolicy: ConstPolicy> ConstPolicy for BlockOn<InnerPolicy> {
456 const NEW: Self = Self {
457 inner_policy: InnerPolicy::NEW,
458 };
459}
460
/// Builder for a single allocation from a [`ShmProvider`]: it carries the requested layout
/// and the allocation policy until the builder is resolved.
#[zenoh_macros::unstable_doc]
pub struct AllocBuilder<'a, Backend, Layout, Policy = JustAlloc> {
    /// Provider the allocation is made from.
    provider: &'a ShmProvider<Backend>,
    /// Requested layout, not yet converted or validated.
    layout: Layout,
    /// Allocation policy used when the builder is resolved.
    policy: Policy,
}
467
468impl<'a, Backend, Layout, Policy> AllocBuilder<'a, Backend, Layout, Policy> {
470 #[zenoh_macros::unstable_doc]
472 pub fn with_policy<OtherPolicy: SafePolicy + ConstPolicy>(
473 self,
474 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
475 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
477 }
478
479 #[zenoh_macros::unstable_doc]
485 pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
486 self,
487 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
488 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
490 }
491
492 #[zenoh_macros::unstable_doc]
498 pub unsafe fn with_runtime_policy<OtherPolicy>(
499 self,
500 policy: OtherPolicy,
501 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
502 AllocBuilder {
503 provider: self.provider,
504 layout: self.layout,
505 policy,
506 }
507 }
508
509 fn layout_policy(self) -> Result<(PrecomputedLayout<'a, Backend, Layout>, Policy), ZLayoutError>
510 where
511 Backend: ShmProviderBackend,
512 Layout: AllocLayout,
513 {
514 let layout = PrecomputedLayout::new(self.provider, self.layout.memory_layout()?)?;
515 Ok((layout, self.policy))
516 }
517}
518
519impl<'a, Backend: ShmProviderBackend, Layout: TryInto<MemoryLayout>>
520 AllocBuilder<'a, Backend, Layout>
521where
522 Layout::Error: Into<ZLayoutError>,
523{
524 #[deprecated(
526 since = "1.5.3",
527 note = "Please use `ShmProvider::alloc_layout` method instead"
528 )]
529 #[zenoh_macros::unstable_doc]
530 pub fn into_layout(self) -> Result<PrecomputedLayout<'a, Backend, Layout>, ZLayoutError> {
531 PrecomputedLayout::new(self.provider, self.layout.try_into().map_err(Into::into)?)
532 }
533}
534
535impl<Backend, Layout: AllocLayout, Policy> Resolvable
536 for AllocBuilder<'_, Backend, Layout, Policy>
537{
538 type To = Result<Layout::Buffer, ZLayoutAllocError>;
539}
540
541impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
542 for AllocBuilder<'_, Backend, Layout, Policy>
543{
544 fn wait(self) -> <Self as Resolvable>::To {
545 let (layout, policy) = self.layout_policy()?;
546 Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.wait()?)
548 }
549}
550
551impl<
552 'a,
553 Backend: ShmProviderBackend + Sync,
554 Layout: AllocLayout + Send + Sync + 'a,
555 Policy: AsyncAllocPolicy<Backend> + Sync + 'a,
556 > IntoFuture for AllocBuilder<'a, Backend, Layout, Policy>
557{
558 type Output = <Self as Resolvable>::To;
559 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
560
561 fn into_future(self) -> Self::IntoFuture {
562 Box::pin(async move {
563 let (layout, policy) = self.layout_policy()?;
564 Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.await?)
566 })
567 }
568}
569
/// Builder for a single allocation with an already validated [`PrecomputedLayout`].
#[zenoh_macros::unstable_doc]
pub struct PrecomputedAllocBuilder<'b, 'a: 'b, Backend, Layout, Policy = JustAlloc> {
    /// Validated layout (and, through it, the provider) the allocation is made with.
    layout: &'b PrecomputedLayout<'a, Backend, Layout>,
    /// Allocation policy used when the builder is resolved.
    policy: Policy,
}
576
577impl<'b, 'a: 'b, Backend, Layout, Policy> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy> {
579 #[zenoh_macros::unstable_doc]
581 pub fn with_policy<OtherPolicy: ConstPolicy>(
582 self,
583 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
584 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
586 }
587
588 #[zenoh_macros::unstable_doc]
594 pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
595 self,
596 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
597 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
599 }
600
601 #[zenoh_macros::unstable_doc]
607 pub unsafe fn with_runtime_policy<OtherPolicy>(
608 self,
609 policy: OtherPolicy,
610 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
611 PrecomputedAllocBuilder {
612 layout: self.layout,
613 policy,
614 }
615 }
616}
617
618impl<Backend, Layout: AllocLayout, Policy> Resolvable
619 for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
620{
621 type To = Result<Layout::Buffer, ZAllocError>;
622}
623
624impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
625 for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
626{
627 fn wait(self) -> <Self as Resolvable>::To {
628 let buffer = self.layout.provider.alloc_inner(
629 self.layout.size,
630 &self.layout.provider_layout,
631 &self.policy,
632 )?;
633 Ok(unsafe { Layout::wrap_buffer(buffer) })
635 }
636}
637
638impl<
639 'b,
640 'a: 'b,
641 Backend: ShmProviderBackend + Sync,
642 Layout: AllocLayout + Sync,
643 Policy: AsyncAllocPolicy<Backend> + Sync + 'b,
644 > IntoFuture for PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy>
645{
646 type Output = <Self as Resolvable>::To;
647 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'b + Send>>;
648
649 fn into_future(self) -> Self::IntoFuture {
650 Box::pin(async move {
651 let buffer = self
652 .layout
653 .provider
654 .alloc_inner_async(self.layout.size, &self.layout.provider_layout, &self.policy)
655 .await?;
656 Ok(unsafe { Layout::wrap_buffer(buffer) })
658 })
659 }
660}
661
/// Entry point for building a [`ShmProvider`].
#[zenoh_macros::unstable_doc]
pub struct ShmProviderBuilder;
impl ShmProviderBuilder {
    /// Starts building a provider on top of the caller-supplied `backend`.
    #[zenoh_macros::unstable_doc]
    pub fn backend<Backend: ShmProviderBackend>(
        backend: Backend,
    ) -> ShmProviderBuilderBackend<Backend> {
        ShmProviderBuilderBackend { backend }
    }

    /// Starts building a provider on the default (POSIX) backend; `layout` is handed to that
    /// backend's builder when the provider builder is resolved.
    #[zenoh_macros::unstable_doc]
    pub fn default_backend<Layout>(layout: Layout) -> ShmProviderBuilderWithDefaultBackend<Layout> {
        ShmProviderBuilderWithDefaultBackend { layout }
    }
}
679
/// Provider builder holding a caller-supplied backend.
#[zenoh_macros::unstable_doc]
pub struct ShmProviderBuilderBackend<Backend>
where
    Backend: ShmProviderBackend,
{
    /// Backend the provider will own.
    backend: Backend,
}
/// Resolving this builder cannot fail: it yields the provider directly.
#[zenoh_macros::unstable_doc]
impl<Backend> Resolvable for ShmProviderBuilderBackend<Backend>
where
    Backend: ShmProviderBackend,
{
    type To = ShmProvider<Backend>;
}

#[zenoh_macros::unstable_doc]
impl<Backend> Wait for ShmProviderBuilderBackend<Backend>
where
    Backend: ShmProviderBackend,
{
    /// Creates the provider around the stored backend.
    fn wait(self) -> <Self as Resolvable>::To {
        ShmProvider::new(self.backend)
    }
}
705
706#[zenoh_macros::unstable_doc]
707pub struct ShmProviderBuilderWithDefaultBackend<Layout> {
708 layout: Layout,
709}
710
711#[zenoh_macros::unstable_doc]
712impl<Layout> Resolvable for ShmProviderBuilderWithDefaultBackend<Layout> {
713 type To = ZResult<ShmProvider<PosixShmProviderBackend>>;
714}
715
716#[zenoh_macros::unstable_doc]
717impl<Layout: TryInto<MemoryLayout>> Wait for ShmProviderBuilderWithDefaultBackend<Layout>
718where
719 Layout::Error: Error,
720 PosixShmProviderBackendBuilder<Layout>:
721 Resolvable<To = ZResult<PosixShmProviderBackend>> + Wait,
722{
723 fn wait(self) -> <Self as Resolvable>::To {
725 let backend = PosixShmProviderBackend::builder(self.layout).wait()?;
726 Ok(ShmProvider::new(backend))
727 }
728}
729
/// Entry of the provider's busy list: the metadata of a chunk that has been handed out.
#[derive(Debug)]
struct BusyChunk {
    /// Metadata allocated for the chunk; its header stores the chunk's data descriptor.
    metadata: AllocatedMetadataDescriptor,
}

impl BusyChunk {
    /// Wraps the metadata of a handed-out chunk.
    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
        Self { metadata }
    }

    /// Reads the chunk's data descriptor back from the metadata header.
    fn descriptor(&self) -> ChunkDescriptor {
        self.metadata.header().data_descriptor()
    }
}
744
/// A shared-memory provider: it allocates chunks from a backend and keeps track of the chunks
/// it has handed out so that they can be reclaimed later.
#[zenoh_macros::unstable_doc]
#[derive(Debug)]
pub struct ShmProvider<Backend> {
    /// Backend that actually allocates, frees and defragments memory.
    backend: Backend,
    /// Chunks handed out by this provider and not yet returned to the backend.
    busy_list: Mutex<Vec<BusyChunk>>,
}
752
753impl<Backend: Default> Default for ShmProvider<Backend> {
754 fn default() -> Self {
755 Self::new(Backend::default())
756 }
757}
758
759impl<Backend> ShmProvider<Backend>
760where
761 Backend: ShmProviderBackend,
762{
763 #[zenoh_macros::unstable_doc]
765 pub fn alloc<Layout>(&self, layout: Layout) -> AllocBuilder<'_, Backend, Layout> {
766 AllocBuilder {
767 provider: self,
768 layout,
769 policy: JustAlloc,
770 }
771 }
772
773 #[zenoh_macros::unstable_doc]
775 pub fn alloc_layout<Layout: TryInto<MemoryLayout>>(
776 &self,
777 layout: Layout,
778 ) -> Result<PrecomputedLayout<'_, Backend, Layout>, ZLayoutError>
779 where
780 Layout::Error: Into<ZLayoutError>,
781 {
782 PrecomputedLayout::new(self, layout.try_into().map_err(Into::into)?)
783 }
784 #[zenoh_macros::unstable_doc]
786 pub fn defragment(&self) -> usize {
787 self.backend.defragment()
788 }
789
790 #[zenoh_macros::unstable_doc]
794 pub fn map(&self, chunk: AllocatedChunk, len: usize) -> Result<ZShmMut, ZAllocError> {
795 let len = len.try_into().map_err(|_| ZAllocError::Other)?;
796
797 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
799
800 let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
802 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
803 }
804
805 #[zenoh_macros::unstable_doc]
806 fn garbage_collect_impl<const SAFE: bool>(&self) -> usize {
807 fn retain_unordered<T>(vec: &mut Vec<T>, mut f: impl FnMut(&T) -> bool) {
808 let mut i = 0;
809 while i < vec.len() {
810 if f(&vec[i]) {
811 i += 1;
812 } else {
813 vec.swap_remove(i); }
816 }
817 }
818
819 tracing::trace!("Running Garbage Collector");
820 let mut largest = 0usize;
821 retain_unordered(&mut zlock!(self.busy_list), |chunk| {
822 let header = chunk.metadata.header();
823 if header.refcount.load(Ordering::SeqCst) == 0
824 || (!SAFE && header.watchdog_invalidated.load(Ordering::SeqCst))
825 {
826 tracing::trace!("Garbage Collecting Chunk: {:?}", chunk);
827 let descriptor_to_free = chunk.descriptor();
828 self.backend.free(&descriptor_to_free);
829 largest = largest.max(descriptor_to_free.len.get());
830 return false;
831 }
832 true
833 });
834 largest
835 }
836
837 #[zenoh_macros::unstable_doc]
840 pub fn garbage_collect(&self) -> usize {
841 self.garbage_collect_impl::<true>()
842 }
843
844 #[zenoh_macros::unstable_doc]
851 pub unsafe fn garbage_collect_unsafe(&self) -> usize {
852 self.garbage_collect_impl::<false>()
853 }
854
855 #[zenoh_macros::unstable_doc]
857 pub fn available(&self) -> usize {
858 self.backend.available()
859 }
860}
861
862impl<Backend> ShmProvider<Backend> {
864 fn new(backend: Backend) -> Self {
865 Self {
866 backend,
867 busy_list: Default::default(),
868 }
869 }
870}
871
872impl<Backend> ShmProvider<Backend>
873where
874 Backend: ShmProviderBackend,
875{
876 fn alloc_inner<Policy>(
877 &self,
878 size: NonZeroUsize,
879 layout: &MemoryLayout,
880 policy: &Policy,
881 ) -> Result<ZShmMut, ZAllocError>
882 where
883 Policy: AllocPolicy<Backend>,
884 {
885 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
887
888 let chunk = policy.alloc(layout, self)?;
895
896 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
898 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
899 }
900
901 fn alloc_resources() -> Result<(AllocatedMetadataDescriptor, ConfirmedDescriptor), ZAllocError>
902 {
903 let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
905
906 let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
908
909 Ok((allocated_metadata, confirmed_metadata))
910 }
911
912 fn wrap(
913 &self,
914 chunk: AllocatedChunk,
915 len: NonZeroUsize,
916 allocated_metadata: AllocatedMetadataDescriptor,
917 confirmed_metadata: ConfirmedDescriptor,
918 ) -> ShmBufInner {
919 allocated_metadata
922 .header()
923 .set_data_descriptor(&chunk.descriptor);
924 allocated_metadata
926 .header()
927 .protocol
928 .store(self.backend.id(), Ordering::Relaxed);
929
930 GLOBAL_VALIDATOR
932 .read()
933 .add(confirmed_metadata.owned.clone());
934
935 let info = ShmBufInfo::new(
937 len,
938 MetadataDescriptor::from(&confirmed_metadata.owned),
939 allocated_metadata
940 .header()
941 .generation
942 .load(Ordering::SeqCst),
943 );
944
945 let shmb = ShmBufInner {
947 metadata: confirmed_metadata,
948 buf: chunk.data,
949 info,
950 };
951
952 zlock!(self.busy_list).push(BusyChunk::new(allocated_metadata));
954
955 shmb
956 }
957}
958
959impl<Backend> ShmProvider<Backend>
961where
962 Backend: ShmProviderBackend + Sync,
963{
964 async fn alloc_inner_async<Policy>(
965 &self,
966 size: NonZeroUsize,
967 backend_layout: &MemoryLayout,
968 policy: &Policy,
969 ) -> Result<ZShmMut, ZAllocError>
970 where
971 Policy: AsyncAllocPolicy<Backend>,
972 {
973 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
975
976 let chunk = policy.alloc_async(backend_layout, self).await?;
983
984 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
986 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
987 }
988}