1use std::{
16 error::Error,
17 future::{Future, IntoFuture},
18 marker::PhantomData,
19 mem::MaybeUninit,
20 num::NonZeroUsize,
21 pin::Pin,
22 sync::{atomic::Ordering, Mutex},
23 time::Duration,
24};
25
26use async_trait::async_trait;
27use zenoh_core::{zlock, zresult::ZResult, Resolvable, Wait};
28
29use super::{
30 chunk::{AllocatedChunk, ChunkDescriptor},
31 shm_provider_backend::ShmProviderBackend,
32 types::{ChunkAllocResult, ZAllocError, ZLayoutAllocError, ZLayoutError},
33};
34use crate::{
35 api::{
36 buffer::{typed::Typed, zshmmut::ZShmMut},
37 protocol_implementations::posix::posix_shm_provider_backend::{
38 PosixShmProviderBackend, PosixShmProviderBackendBuilder,
39 },
40 provider::memory_layout::{MemoryLayout, TypedLayout},
41 },
42 metadata::{
43 allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
44 storage::GLOBAL_METADATA_STORAGE,
45 },
46 watchdog::{
47 confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
48 validator::GLOBAL_VALIDATOR,
49 },
50 ShmBufInfo, ShmBufInner,
51};
52
53pub trait AllocLayout {
55 type Buffer;
57 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError>;
59 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer;
65}
66
67impl<T: TryInto<MemoryLayout>> AllocLayout for T
68where
69 T::Error: Into<ZLayoutError>,
70{
71 type Buffer = ZShmMut;
72
73 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
74 self.try_into().map_err(Into::into)
75 }
76
77 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
78 buffer
79 }
80}
81
82impl<T> AllocLayout for TypedLayout<T> {
83 type Buffer = Typed<MaybeUninit<T>, ZShmMut>;
84
85 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
86 Ok(MemoryLayout::for_type::<T>())
87 }
88
89 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
90 unsafe { Typed::new_unchecked(buffer) }
92 }
93}
94
95#[zenoh_macros::unstable_doc]
100#[derive(Debug)]
101pub struct PrecomputedLayout<'a, Backend, Layout> {
102 size: NonZeroUsize,
103 provider_layout: MemoryLayout,
104 provider: &'a ShmProvider<Backend>,
105 _phantom: PhantomData<Layout>,
106}
107
108impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
109where
110 Backend: ShmProviderBackend,
111{
112 #[zenoh_macros::unstable_doc]
114 pub fn alloc<'b>(&'b self) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout> {
115 PrecomputedAllocBuilder {
116 layout: self,
117 policy: JustAlloc,
118 }
119 }
120}
121
122impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
123where
124 Backend: ShmProviderBackend,
125{
126 fn new(
127 provider: &'a ShmProvider<Backend>,
128 required_layout: MemoryLayout,
129 ) -> Result<Self, ZLayoutError> {
130 let size = required_layout.size();
135
136 let provider_layout = provider
138 .backend
139 .layout_for(required_layout)
140 .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
141
142 Ok(Self {
143 size,
144 provider_layout,
145 provider,
146 _phantom: PhantomData,
147 })
148 }
149}
150
151#[zenoh_macros::unstable_doc]
153pub trait AllocPolicy<Backend> {
154 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult;
155}
156
157#[zenoh_macros::unstable_doc]
159#[async_trait]
160pub trait AsyncAllocPolicy<Backend>: Send {
161 async fn alloc_async(
162 &self,
163 layout: &MemoryLayout,
164 provider: &ShmProvider<Backend>,
165 ) -> ChunkAllocResult;
166}
167
/// Marker for allocation policies that may be selected through the safe
/// `with_policy` builder method.
///
/// # Safety
///
/// NOTE(review): the precise contract is not visible in this file;
/// presumably an implementor must never release memory that is still
/// referenced — confirm before adding new implementations.
pub unsafe trait SafePolicy {}
176
177#[zenoh_macros::unstable_doc]
179pub trait ConstPolicy {
180 const NEW: Self;
181}
182
/// A value of type `T` used to parameterize a policy; it may come from a
/// runtime value or from a compile-time constant.
pub trait PolicyValue<T> {
    /// Returns the value.
    fn get(&self) -> T;
}
187
188impl<T: Copy> PolicyValue<T> for T {
189 fn get(&self) -> T {
190 *self
191 }
192}
193pub struct ConstBool<const VALUE: bool>;
195impl<const VALUE: bool> ConstPolicy for ConstBool<VALUE> {
196 const NEW: Self = Self;
197}
198impl<const VALUE: bool> PolicyValue<bool> for ConstBool<VALUE> {
199 fn get(&self) -> bool {
200 VALUE
201 }
202}
203pub struct ConstUsize<const VALUE: usize>;
205impl<const VALUE: usize> ConstPolicy for ConstUsize<VALUE> {
206 const NEW: Self = Self;
207}
208impl<const VALUE: usize> PolicyValue<usize> for ConstUsize<VALUE> {
209 fn get(&self) -> usize {
210 VALUE
211 }
212}
213
214#[zenoh_macros::unstable_doc]
216#[derive(Clone, Copy)]
217pub struct JustAlloc;
218impl<Backend> AllocPolicy<Backend> for JustAlloc
219where
220 Backend: ShmProviderBackend,
221{
222 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
223 provider.backend.alloc(layout)
224 }
225}
226unsafe impl SafePolicy for JustAlloc {}
227impl ConstPolicy for JustAlloc {
228 const NEW: Self = Self;
229}
230
231#[zenoh_macros::unstable_doc]
238#[derive(Clone, Copy)]
239pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc, Safe = ConstBool<true>> {
240 inner_policy: InnerPolicy,
241 alt_policy: AltPolicy,
242 safe: Safe,
243}
244impl<InnerPolicy, AltPolicy, Safe> GarbageCollect<InnerPolicy, AltPolicy, Safe> {
245 pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy, safe: Safe) -> Self {
247 Self {
248 inner_policy,
249 alt_policy,
250 safe,
251 }
252 }
253}
254impl<Backend, InnerPolicy, AltPolicy, Safe> AllocPolicy<Backend>
255 for GarbageCollect<InnerPolicy, AltPolicy, Safe>
256where
257 Backend: ShmProviderBackend,
258 InnerPolicy: AllocPolicy<Backend>,
259 AltPolicy: AllocPolicy<Backend>,
260 Safe: PolicyValue<bool>,
261{
262 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
263 let result = self.inner_policy.alloc(layout, provider);
264 if result.is_err() {
265 let collected = if self.safe.get() {
267 provider.garbage_collect()
268 } else {
269 unsafe { provider.garbage_collect_unsafe() }
272 };
273 if collected >= layout.size().get() {
274 return self.alt_policy.alloc(layout, provider);
275 }
276 }
277 result
278 }
279}
280unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
281 for GarbageCollect<InnerPolicy, AltPolicy, ConstBool<true>>
282{
283}
284impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy, Safe: ConstPolicy> ConstPolicy
285 for GarbageCollect<InnerPolicy, AltPolicy, Safe>
286{
287 const NEW: Self = Self {
288 inner_policy: InnerPolicy::NEW,
289 alt_policy: AltPolicy::NEW,
290 safe: Safe::NEW,
291 };
292}
293
294#[zenoh_macros::unstable_doc]
299#[derive(Clone, Copy)]
300pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc> {
301 inner_policy: InnerPolicy,
302 alt_policy: AltPolicy,
303}
304impl<InnerPolicy, AltPolicy> Defragment<InnerPolicy, AltPolicy> {
305 pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
307 Self {
308 inner_policy,
309 alt_policy,
310 }
311 }
312}
313impl<Backend, InnerPolicy, AltPolicy> AllocPolicy<Backend> for Defragment<InnerPolicy, AltPolicy>
314where
315 Backend: ShmProviderBackend,
316 InnerPolicy: AllocPolicy<Backend>,
317 AltPolicy: AllocPolicy<Backend>,
318{
319 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
320 let result = self.inner_policy.alloc(layout, provider);
321 if let Err(ZAllocError::NeedDefragment) = result {
322 if provider.defragment() >= layout.size().get() {
324 return self.alt_policy.alloc(layout, provider);
325 }
326 }
327 result
328 }
329}
330unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
331 for Defragment<InnerPolicy, AltPolicy>
332{
333}
334impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
335 for Defragment<InnerPolicy, AltPolicy>
336{
337 const NEW: Self = Self {
338 inner_policy: InnerPolicy::NEW,
339 alt_policy: AltPolicy::NEW,
340 };
341}
342
343#[zenoh_macros::unstable_doc]
349#[derive(Clone, Copy)]
350pub struct Deallocate<Limit, InnerPolicy = JustAlloc, AltPolicy = InnerPolicy> {
351 limit: Limit,
352 inner_policy: InnerPolicy,
353 alt_policy: AltPolicy,
354}
355impl<Limit, InnerPolicy, AltPolicy> Deallocate<Limit, InnerPolicy, AltPolicy> {
356 pub fn new(limit: Limit, inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
358 Self {
359 limit,
360 inner_policy,
361 alt_policy,
362 }
363 }
364}
365impl<Backend, Limit, InnerPolicy, AltPolicy> AllocPolicy<Backend>
366 for Deallocate<Limit, InnerPolicy, AltPolicy>
367where
368 Backend: ShmProviderBackend,
369 Limit: PolicyValue<usize>,
370 InnerPolicy: AllocPolicy<Backend>,
371 AltPolicy: AllocPolicy<Backend>,
372{
373 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
374 for _ in 0..self.limit.get() {
375 match self.inner_policy.alloc(layout, provider) {
376 res @ (Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory)) => {
377 let Some(chunk) = zlock!(provider.busy_list).pop() else {
378 return res;
379 };
380 provider.backend.free(&chunk.descriptor());
381 }
382 res => return res,
383 }
384 }
385 self.alt_policy.alloc(layout, provider)
386 }
387}
388impl<Limit: ConstPolicy, InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
389 for Deallocate<Limit, InnerPolicy, AltPolicy>
390{
391 const NEW: Self = Self {
392 limit: Limit::NEW,
393 inner_policy: InnerPolicy::NEW,
394 alt_policy: AltPolicy::NEW,
395 };
396}
397
398#[zenoh_macros::unstable_doc]
403#[derive(Clone, Copy)]
404pub struct BlockOn<InnerPolicy = JustAlloc> {
405 inner_policy: InnerPolicy,
406}
407impl<InnerPolicy> BlockOn<InnerPolicy> {
408 pub fn new(inner_policy: InnerPolicy) -> Self {
410 Self { inner_policy }
411 }
412}
413#[async_trait]
414impl<Backend, InnerPolicy> AsyncAllocPolicy<Backend> for BlockOn<InnerPolicy>
415where
416 Backend: ShmProviderBackend + Sync,
417 InnerPolicy: AllocPolicy<Backend> + Send + Sync,
418{
419 async fn alloc_async(
420 &self,
421 layout: &MemoryLayout,
422 provider: &ShmProvider<Backend>,
423 ) -> ChunkAllocResult {
424 loop {
425 match self.inner_policy.alloc(layout, provider) {
426 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
427 tokio::time::sleep(Duration::from_millis(1)).await;
429 }
430 other_result => {
431 return other_result;
432 }
433 }
434 }
435 }
436}
437unsafe impl<InnerPolicy: SafePolicy> SafePolicy for BlockOn<InnerPolicy> {}
438impl<Backend, InnerPolicy> AllocPolicy<Backend> for BlockOn<InnerPolicy>
439where
440 Backend: ShmProviderBackend,
441 InnerPolicy: AllocPolicy<Backend>,
442{
443 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
444 loop {
445 match self.inner_policy.alloc(layout, provider) {
446 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
447 std::thread::sleep(Duration::from_millis(1));
449 }
450 other_result => {
451 return other_result;
452 }
453 }
454 }
455 }
456}
457impl<InnerPolicy: ConstPolicy> ConstPolicy for BlockOn<InnerPolicy> {
458 const NEW: Self = Self {
459 inner_policy: InnerPolicy::NEW,
460 };
461}
462
463#[zenoh_macros::unstable_doc]
464pub struct AllocBuilder<'a, Backend, Layout, Policy = JustAlloc> {
465 provider: &'a ShmProvider<Backend>,
466 layout: Layout,
467 policy: Policy,
468}
469
470impl<'a, Backend, Layout, Policy> AllocBuilder<'a, Backend, Layout, Policy> {
472 #[zenoh_macros::unstable_doc]
474 pub fn with_policy<OtherPolicy: SafePolicy + ConstPolicy>(
475 self,
476 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
477 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
479 }
480
481 #[zenoh_macros::unstable_doc]
487 pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
488 self,
489 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
490 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
492 }
493
494 #[zenoh_macros::unstable_doc]
500 pub unsafe fn with_runtime_policy<OtherPolicy>(
501 self,
502 policy: OtherPolicy,
503 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
504 AllocBuilder {
505 provider: self.provider,
506 layout: self.layout,
507 policy,
508 }
509 }
510
511 fn layout_policy(self) -> Result<(PrecomputedLayout<'a, Backend, Layout>, Policy), ZLayoutError>
512 where
513 Backend: ShmProviderBackend,
514 Layout: AllocLayout,
515 {
516 let layout = PrecomputedLayout::new(self.provider, self.layout.memory_layout()?)?;
517 Ok((layout, self.policy))
518 }
519}
520
521impl<'a, Backend: ShmProviderBackend, Layout: TryInto<MemoryLayout>>
522 AllocBuilder<'a, Backend, Layout>
523where
524 Layout::Error: Into<ZLayoutError>,
525{
526 #[deprecated(
528 since = "1.5.3",
529 note = "Please use `ShmProvider::alloc_layout` method instead"
530 )]
531 #[zenoh_macros::unstable_doc]
532 pub fn into_layout(self) -> Result<PrecomputedLayout<'a, Backend, Layout>, ZLayoutError> {
533 PrecomputedLayout::new(self.provider, self.layout.try_into().map_err(Into::into)?)
534 }
535}
536
537impl<Backend, Layout: AllocLayout, Policy> Resolvable
538 for AllocBuilder<'_, Backend, Layout, Policy>
539{
540 type To = Result<Layout::Buffer, ZLayoutAllocError>;
541}
542
543impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
544 for AllocBuilder<'_, Backend, Layout, Policy>
545{
546 fn wait(self) -> <Self as Resolvable>::To {
547 let (layout, policy) = self.layout_policy()?;
548 Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.wait()?)
550 }
551}
552
553impl<
554 'a,
555 Backend: ShmProviderBackend + Sync,
556 Layout: AllocLayout + Send + Sync + 'a,
557 Policy: AsyncAllocPolicy<Backend> + Sync + 'a,
558 > IntoFuture for AllocBuilder<'a, Backend, Layout, Policy>
559{
560 type Output = <Self as Resolvable>::To;
561 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
562
563 fn into_future(self) -> Self::IntoFuture {
564 Box::pin(async move {
565 let (layout, policy) = self.layout_policy()?;
566 Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.await?)
568 })
569 }
570}
571
572#[zenoh_macros::unstable_doc]
574pub struct PrecomputedAllocBuilder<'b, 'a: 'b, Backend, Layout, Policy = JustAlloc> {
575 layout: &'b PrecomputedLayout<'a, Backend, Layout>,
576 policy: Policy,
577}
578
579impl<'b, 'a: 'b, Backend, Layout, Policy> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy> {
581 #[zenoh_macros::unstable_doc]
583 pub fn with_policy<OtherPolicy: ConstPolicy>(
584 self,
585 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
586 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
588 }
589
590 #[zenoh_macros::unstable_doc]
596 pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
597 self,
598 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
599 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
601 }
602
603 #[zenoh_macros::unstable_doc]
609 pub unsafe fn with_runtime_policy<OtherPolicy>(
610 self,
611 policy: OtherPolicy,
612 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
613 PrecomputedAllocBuilder {
614 layout: self.layout,
615 policy,
616 }
617 }
618}
619
620impl<Backend, Layout: AllocLayout, Policy> Resolvable
621 for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
622{
623 type To = Result<Layout::Buffer, ZAllocError>;
624}
625
626impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
627 for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
628{
629 fn wait(self) -> <Self as Resolvable>::To {
630 let buffer = self.layout.provider.alloc_inner(
631 self.layout.size,
632 &self.layout.provider_layout,
633 &self.policy,
634 )?;
635 Ok(unsafe { Layout::wrap_buffer(buffer) })
637 }
638}
639
640impl<
641 'b,
642 'a: 'b,
643 Backend: ShmProviderBackend + Sync,
644 Layout: AllocLayout + Sync,
645 Policy: AsyncAllocPolicy<Backend> + Sync + 'b,
646 > IntoFuture for PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy>
647{
648 type Output = <Self as Resolvable>::To;
649 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'b + Send>>;
650
651 fn into_future(self) -> Self::IntoFuture {
652 Box::pin(async move {
653 let buffer = self
654 .layout
655 .provider
656 .alloc_inner_async(self.layout.size, &self.layout.provider_layout, &self.policy)
657 .await?;
658 Ok(unsafe { Layout::wrap_buffer(buffer) })
660 })
661 }
662}
663
664#[zenoh_macros::unstable_doc]
665pub struct ShmProviderBuilder;
666impl ShmProviderBuilder {
667 #[zenoh_macros::unstable_doc]
669 pub fn backend<Backend: ShmProviderBackend>(
670 backend: Backend,
671 ) -> ShmProviderBuilderBackend<Backend> {
672 ShmProviderBuilderBackend { backend }
673 }
674
675 #[zenoh_macros::unstable_doc]
677 pub fn default_backend<Layout>(layout: Layout) -> ShmProviderBuilderWithDefaultBackend<Layout> {
678 ShmProviderBuilderWithDefaultBackend { layout }
679 }
680}
681
682#[zenoh_macros::unstable_doc]
683pub struct ShmProviderBuilderBackend<Backend>
684where
685 Backend: ShmProviderBackend,
686{
687 backend: Backend,
688}
689#[zenoh_macros::unstable_doc]
690impl<Backend> Resolvable for ShmProviderBuilderBackend<Backend>
691where
692 Backend: ShmProviderBackend,
693{
694 type To = ShmProvider<Backend>;
695}
696
697#[zenoh_macros::unstable_doc]
698impl<Backend> Wait for ShmProviderBuilderBackend<Backend>
699where
700 Backend: ShmProviderBackend,
701{
702 fn wait(self) -> <Self as Resolvable>::To {
704 ShmProvider::new(self.backend)
705 }
706}
707
708#[zenoh_macros::unstable_doc]
709pub struct ShmProviderBuilderWithDefaultBackend<Layout> {
710 layout: Layout,
711}
712
713#[zenoh_macros::unstable_doc]
714impl<Layout> Resolvable for ShmProviderBuilderWithDefaultBackend<Layout> {
715 type To = ZResult<ShmProvider<PosixShmProviderBackend>>;
716}
717
718#[zenoh_macros::unstable_doc]
719impl<Layout: TryInto<MemoryLayout>> Wait for ShmProviderBuilderWithDefaultBackend<Layout>
720where
721 Layout::Error: Error,
722 PosixShmProviderBackendBuilder<Layout>:
723 Resolvable<To = ZResult<PosixShmProviderBackend>> + Wait,
724{
725 fn wait(self) -> <Self as Resolvable>::To {
727 let backend = PosixShmProviderBackend::builder(self.layout).wait()?;
728 Ok(ShmProvider::new(backend))
729 }
730}
731
732#[derive(Debug)]
733struct BusyChunk {
734 metadata: AllocatedMetadataDescriptor,
735}
736
737impl BusyChunk {
738 fn new(metadata: AllocatedMetadataDescriptor) -> Self {
739 Self { metadata }
740 }
741
742 fn descriptor(&self) -> ChunkDescriptor {
743 self.metadata.header().data_descriptor()
744 }
745}
746
747#[zenoh_macros::unstable_doc]
749#[derive(Debug)]
750pub struct ShmProvider<Backend> {
751 backend: Backend,
752 busy_list: Mutex<Vec<BusyChunk>>,
753}
754
755impl<Backend: Default> Default for ShmProvider<Backend> {
756 fn default() -> Self {
757 Self::new(Backend::default())
758 }
759}
760
761impl<Backend> ShmProvider<Backend>
762where
763 Backend: ShmProviderBackend,
764{
765 #[zenoh_macros::unstable_doc]
767 pub fn alloc<Layout>(&self, layout: Layout) -> AllocBuilder<'_, Backend, Layout> {
768 AllocBuilder {
769 provider: self,
770 layout,
771 policy: JustAlloc,
772 }
773 }
774
775 #[zenoh_macros::unstable_doc]
777 pub fn alloc_layout<Layout: TryInto<MemoryLayout>>(
778 &self,
779 layout: Layout,
780 ) -> Result<PrecomputedLayout<'_, Backend, Layout>, ZLayoutError>
781 where
782 Layout::Error: Into<ZLayoutError>,
783 {
784 PrecomputedLayout::new(self, layout.try_into().map_err(Into::into)?)
785 }
786 #[zenoh_macros::unstable_doc]
788 pub fn defragment(&self) -> usize {
789 self.backend.defragment()
790 }
791
792 #[zenoh_macros::unstable_doc]
797 pub fn map(&self, chunk: AllocatedChunk, len: usize) -> Result<ZShmMut, ZAllocError> {
798 let len = len.try_into().map_err(|_| ZAllocError::Other)?;
799
800 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
802
803 let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
805 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
806 }
807
808 #[zenoh_macros::unstable_doc]
809 fn garbage_collect_impl<const SAFE: bool>(&self) -> usize {
810 fn retain_unordered<T>(vec: &mut Vec<T>, mut f: impl FnMut(&T) -> bool) {
811 let mut i = 0;
812 while i < vec.len() {
813 if f(&vec[i]) {
814 i += 1;
815 } else {
816 vec.swap_remove(i); }
819 }
820 }
821
822 tracing::trace!("Running Garbage Collector");
823 let mut largest = 0usize;
824 retain_unordered(&mut zlock!(self.busy_list), |chunk| {
825 let header = chunk.metadata.header();
826 if header.refcount.load(Ordering::SeqCst) == 0
827 || (!SAFE && header.watchdog_invalidated.load(Ordering::SeqCst))
828 {
829 tracing::trace!("Garbage Collecting Chunk: {:?}", chunk);
830 let descriptor_to_free = chunk.descriptor();
831 self.backend.free(&descriptor_to_free);
832 largest = largest.max(descriptor_to_free.len.get());
833 return false;
834 }
835 true
836 });
837 largest
838 }
839
840 #[zenoh_macros::unstable_doc]
844 pub fn garbage_collect(&self) -> usize {
845 self.garbage_collect_impl::<true>()
846 }
847
848 #[zenoh_macros::unstable_doc]
856 pub unsafe fn garbage_collect_unsafe(&self) -> usize {
857 self.garbage_collect_impl::<false>()
858 }
859
860 #[zenoh_macros::unstable_doc]
862 pub fn available(&self) -> usize {
863 self.backend.available()
864 }
865}
866
867impl<Backend> ShmProvider<Backend> {
869 fn new(backend: Backend) -> Self {
870 Self {
871 backend,
872 busy_list: Default::default(),
873 }
874 }
875}
876
877impl<Backend> ShmProvider<Backend>
878where
879 Backend: ShmProviderBackend,
880{
881 fn alloc_inner<Policy>(
882 &self,
883 size: NonZeroUsize,
884 layout: &MemoryLayout,
885 policy: &Policy,
886 ) -> Result<ZShmMut, ZAllocError>
887 where
888 Policy: AllocPolicy<Backend>,
889 {
890 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
892
893 let chunk = policy.alloc(layout, self)?;
900
901 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
903 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
904 }
905
906 fn alloc_resources() -> Result<(AllocatedMetadataDescriptor, ConfirmedDescriptor), ZAllocError>
907 {
908 let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
910
911 let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
913
914 Ok((allocated_metadata, confirmed_metadata))
915 }
916
917 fn wrap(
918 &self,
919 chunk: AllocatedChunk,
920 len: NonZeroUsize,
921 allocated_metadata: AllocatedMetadataDescriptor,
922 confirmed_metadata: ConfirmedDescriptor,
923 ) -> ShmBufInner {
924 allocated_metadata
927 .header()
928 .set_data_descriptor(&chunk.descriptor);
929 allocated_metadata
931 .header()
932 .protocol
933 .store(self.backend.id(), Ordering::Relaxed);
934
935 GLOBAL_VALIDATOR
937 .read()
938 .add(confirmed_metadata.owned.clone());
939
940 let info = ShmBufInfo::new(
942 len,
943 MetadataDescriptor::from(&confirmed_metadata.owned),
944 allocated_metadata
945 .header()
946 .generation
947 .load(Ordering::SeqCst),
948 );
949
950 let shmb = ShmBufInner {
952 metadata: confirmed_metadata,
953 buf: chunk.data,
954 info,
955 };
956
957 zlock!(self.busy_list).push(BusyChunk::new(allocated_metadata));
959
960 shmb
961 }
962}
963
964impl<Backend> ShmProvider<Backend>
966where
967 Backend: ShmProviderBackend + Sync,
968{
969 async fn alloc_inner_async<Policy>(
970 &self,
971 size: NonZeroUsize,
972 backend_layout: &MemoryLayout,
973 policy: &Policy,
974 ) -> Result<ZShmMut, ZAllocError>
975 where
976 Policy: AsyncAllocPolicy<Backend>,
977 {
978 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
980
981 let chunk = policy.alloc_async(backend_layout, self).await?;
988
989 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
991 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
992 }
993}