1use std::{
16 error::Error,
17 future::{Future, IntoFuture},
18 marker::PhantomData,
19 mem::MaybeUninit,
20 num::NonZeroUsize,
21 pin::Pin,
22 sync::{atomic::Ordering, Mutex},
23 time::Duration,
24};
25
26use async_trait::async_trait;
27use zenoh_core::{zlock, zresult::ZResult, Resolvable, Wait};
28
29use super::{
30 chunk::{AllocatedChunk, ChunkDescriptor},
31 shm_provider_backend::ShmProviderBackend,
32 types::{ChunkAllocResult, ZAllocError, ZLayoutAllocError, ZLayoutError},
33};
34use crate::{
35 api::{
36 buffer::{typed::Typed, zshmmut::ZShmMut},
37 protocol_implementations::posix::posix_shm_provider_backend::{
38 PosixShmProviderBackend, PosixShmProviderBackendBuilder,
39 },
40 provider::memory_layout::{MemoryLayout, TypedLayout},
41 },
42 metadata::{
43 allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
44 storage::GLOBAL_METADATA_STORAGE,
45 },
46 watchdog::{
47 confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
48 validator::GLOBAL_VALIDATOR,
49 },
50 ShmBufInfo, ShmBufInner,
51};
52
53pub trait AllocLayout {
55 type Buffer;
57 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError>;
59 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer;
65}
66
67impl<T: TryInto<MemoryLayout>> AllocLayout for T
68where
69 T::Error: Into<ZLayoutError>,
70{
71 type Buffer = ZShmMut;
72
73 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
74 self.try_into().map_err(Into::into)
75 }
76
77 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
81 buffer
82 }
83}
84
85impl<T> AllocLayout for TypedLayout<T> {
86 type Buffer = Typed<MaybeUninit<T>, ZShmMut>;
87
88 fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
89 Ok(MemoryLayout::for_type::<T>())
90 }
91
92 unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
96 unsafe { Typed::new_unchecked(buffer) }
98 }
99}
100
101#[zenoh_macros::unstable_doc]
106#[derive(Debug)]
107pub struct PrecomputedLayout<'a, Backend, Layout> {
108 size: NonZeroUsize,
109 provider_layout: MemoryLayout,
110 provider: &'a ShmProvider<Backend>,
111 _phantom: PhantomData<Layout>,
112}
113
114impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
115where
116 Backend: ShmProviderBackend,
117{
118 #[zenoh_macros::unstable_doc]
120 pub fn alloc<'b>(&'b self) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout> {
121 PrecomputedAllocBuilder {
122 layout: self,
123 policy: JustAlloc,
124 }
125 }
126}
127
128impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
129where
130 Backend: ShmProviderBackend,
131{
132 fn new(
133 provider: &'a ShmProvider<Backend>,
134 required_layout: MemoryLayout,
135 ) -> Result<Self, ZLayoutError> {
136 let size = required_layout.size();
141
142 let provider_layout = provider
144 .backend
145 .layout_for(required_layout)
146 .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
147
148 Ok(Self {
149 size,
150 provider_layout,
151 provider,
152 _phantom: PhantomData,
153 })
154 }
155}
156
157#[zenoh_macros::unstable_doc]
159pub trait AllocPolicy<Backend> {
160 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult;
161}
162
163#[zenoh_macros::unstable_doc]
165#[async_trait]
166pub trait AsyncAllocPolicy<Backend>: Send {
167 async fn alloc_async(
168 &self,
169 layout: &MemoryLayout,
170 provider: &ShmProvider<Backend>,
171 ) -> ChunkAllocResult;
172}
173
174pub unsafe trait SafePolicy {}
182
183#[zenoh_macros::unstable_doc]
185pub trait ConstPolicy {
186 const NEW: Self;
187}
188
189pub trait PolicyValue<T> {
191 fn get(&self) -> T;
192}
193
194impl<T: Copy> PolicyValue<T> for T {
195 fn get(&self) -> T {
196 *self
197 }
198}
199pub struct ConstBool<const VALUE: bool>;
201impl<const VALUE: bool> ConstPolicy for ConstBool<VALUE> {
202 const NEW: Self = Self;
203}
204impl<const VALUE: bool> PolicyValue<bool> for ConstBool<VALUE> {
205 fn get(&self) -> bool {
206 VALUE
207 }
208}
209pub struct ConstUsize<const VALUE: usize>;
211impl<const VALUE: usize> ConstPolicy for ConstUsize<VALUE> {
212 const NEW: Self = Self;
213}
214impl<const VALUE: usize> PolicyValue<usize> for ConstUsize<VALUE> {
215 fn get(&self) -> usize {
216 VALUE
217 }
218}
219
220#[zenoh_macros::unstable_doc]
222#[derive(Clone, Copy)]
223pub struct JustAlloc;
224impl<Backend> AllocPolicy<Backend> for JustAlloc
225where
226 Backend: ShmProviderBackend,
227{
228 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
229 provider.backend.alloc(layout)
230 }
231}
232unsafe impl SafePolicy for JustAlloc {}
233impl ConstPolicy for JustAlloc {
234 const NEW: Self = Self;
235}
236
237#[zenoh_macros::unstable_doc]
244#[derive(Clone, Copy)]
245pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc, Safe = ConstBool<true>> {
246 inner_policy: InnerPolicy,
247 alt_policy: AltPolicy,
248 safe: Safe,
249}
250impl<InnerPolicy, AltPolicy, Safe> GarbageCollect<InnerPolicy, AltPolicy, Safe> {
251 pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy, safe: Safe) -> Self {
253 Self {
254 inner_policy,
255 alt_policy,
256 safe,
257 }
258 }
259}
260impl<Backend, InnerPolicy, AltPolicy, Safe> AllocPolicy<Backend>
261 for GarbageCollect<InnerPolicy, AltPolicy, Safe>
262where
263 Backend: ShmProviderBackend,
264 InnerPolicy: AllocPolicy<Backend>,
265 AltPolicy: AllocPolicy<Backend>,
266 Safe: PolicyValue<bool>,
267{
268 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
269 let result = self.inner_policy.alloc(layout, provider);
270 if result.is_err() {
271 let collected = if self.safe.get() {
273 provider.garbage_collect()
274 } else {
275 unsafe { provider.garbage_collect_unsafe() }
278 };
279 if collected >= layout.size().get() {
280 return self.alt_policy.alloc(layout, provider);
281 }
282 }
283 result
284 }
285}
286unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
287 for GarbageCollect<InnerPolicy, AltPolicy, ConstBool<true>>
288{
289}
290impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy, Safe: ConstPolicy> ConstPolicy
291 for GarbageCollect<InnerPolicy, AltPolicy, Safe>
292{
293 const NEW: Self = Self {
294 inner_policy: InnerPolicy::NEW,
295 alt_policy: AltPolicy::NEW,
296 safe: Safe::NEW,
297 };
298}
299
300#[zenoh_macros::unstable_doc]
305#[derive(Clone, Copy)]
306pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc> {
307 inner_policy: InnerPolicy,
308 alt_policy: AltPolicy,
309}
310impl<InnerPolicy, AltPolicy> Defragment<InnerPolicy, AltPolicy> {
311 pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
313 Self {
314 inner_policy,
315 alt_policy,
316 }
317 }
318}
319impl<Backend, InnerPolicy, AltPolicy> AllocPolicy<Backend> for Defragment<InnerPolicy, AltPolicy>
320where
321 Backend: ShmProviderBackend,
322 InnerPolicy: AllocPolicy<Backend>,
323 AltPolicy: AllocPolicy<Backend>,
324{
325 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
326 let result = self.inner_policy.alloc(layout, provider);
327 if let Err(ZAllocError::NeedDefragment) = result {
328 if provider.defragment() >= layout.size().get() {
330 return self.alt_policy.alloc(layout, provider);
331 }
332 }
333 result
334 }
335}
336unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
337 for Defragment<InnerPolicy, AltPolicy>
338{
339}
340impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
341 for Defragment<InnerPolicy, AltPolicy>
342{
343 const NEW: Self = Self {
344 inner_policy: InnerPolicy::NEW,
345 alt_policy: AltPolicy::NEW,
346 };
347}
348
349#[zenoh_macros::unstable_doc]
355#[derive(Clone, Copy)]
356pub struct Deallocate<Limit, InnerPolicy = JustAlloc, AltPolicy = InnerPolicy> {
357 limit: Limit,
358 inner_policy: InnerPolicy,
359 alt_policy: AltPolicy,
360}
361impl<Limit, InnerPolicy, AltPolicy> Deallocate<Limit, InnerPolicy, AltPolicy> {
362 pub fn new(limit: Limit, inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
364 Self {
365 limit,
366 inner_policy,
367 alt_policy,
368 }
369 }
370}
371impl<Backend, Limit, InnerPolicy, AltPolicy> AllocPolicy<Backend>
372 for Deallocate<Limit, InnerPolicy, AltPolicy>
373where
374 Backend: ShmProviderBackend,
375 Limit: PolicyValue<usize>,
376 InnerPolicy: AllocPolicy<Backend>,
377 AltPolicy: AllocPolicy<Backend>,
378{
379 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
380 for _ in 0..self.limit.get() {
381 match self.inner_policy.alloc(layout, provider) {
382 res @ (Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory)) => {
383 let Some(chunk) = zlock!(provider.busy_list).pop() else {
384 return res;
385 };
386 provider.backend.free(&chunk.descriptor());
387 }
388 res => return res,
389 }
390 }
391 self.alt_policy.alloc(layout, provider)
392 }
393}
394impl<Limit: ConstPolicy, InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
395 for Deallocate<Limit, InnerPolicy, AltPolicy>
396{
397 const NEW: Self = Self {
398 limit: Limit::NEW,
399 inner_policy: InnerPolicy::NEW,
400 alt_policy: AltPolicy::NEW,
401 };
402}
403
404#[zenoh_macros::unstable_doc]
409#[derive(Clone, Copy)]
410pub struct BlockOn<InnerPolicy = JustAlloc> {
411 inner_policy: InnerPolicy,
412}
413impl<InnerPolicy> BlockOn<InnerPolicy> {
414 pub fn new(inner_policy: InnerPolicy) -> Self {
416 Self { inner_policy }
417 }
418}
419#[async_trait]
420impl<Backend, InnerPolicy> AsyncAllocPolicy<Backend> for BlockOn<InnerPolicy>
421where
422 Backend: ShmProviderBackend + Sync,
423 InnerPolicy: AllocPolicy<Backend> + Send + Sync,
424{
425 async fn alloc_async(
426 &self,
427 layout: &MemoryLayout,
428 provider: &ShmProvider<Backend>,
429 ) -> ChunkAllocResult {
430 loop {
431 match self.inner_policy.alloc(layout, provider) {
432 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
433 tokio::time::sleep(Duration::from_millis(1)).await;
435 }
436 other_result => {
437 return other_result;
438 }
439 }
440 }
441 }
442}
443unsafe impl<InnerPolicy: SafePolicy> SafePolicy for BlockOn<InnerPolicy> {}
444impl<Backend, InnerPolicy> AllocPolicy<Backend> for BlockOn<InnerPolicy>
445where
446 Backend: ShmProviderBackend,
447 InnerPolicy: AllocPolicy<Backend>,
448{
449 fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
450 loop {
451 match self.inner_policy.alloc(layout, provider) {
452 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
453 std::thread::sleep(Duration::from_millis(1));
455 }
456 other_result => {
457 return other_result;
458 }
459 }
460 }
461 }
462}
463impl<InnerPolicy: ConstPolicy> ConstPolicy for BlockOn<InnerPolicy> {
464 const NEW: Self = Self {
465 inner_policy: InnerPolicy::NEW,
466 };
467}
468
469#[zenoh_macros::unstable_doc]
470pub struct AllocBuilder<'a, Backend, Layout, Policy = JustAlloc> {
471 provider: &'a ShmProvider<Backend>,
472 layout: Layout,
473 policy: Policy,
474}
475
476impl<'a, Backend, Layout, Policy> AllocBuilder<'a, Backend, Layout, Policy> {
478 #[zenoh_macros::unstable_doc]
480 pub fn with_policy<OtherPolicy: SafePolicy + ConstPolicy>(
481 self,
482 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
483 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
485 }
486
487 #[zenoh_macros::unstable_doc]
493 pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
494 self,
495 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
496 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
498 }
499
500 #[zenoh_macros::unstable_doc]
506 pub unsafe fn with_runtime_policy<OtherPolicy>(
507 self,
508 policy: OtherPolicy,
509 ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
510 AllocBuilder {
511 provider: self.provider,
512 layout: self.layout,
513 policy,
514 }
515 }
516
517 fn layout_policy(self) -> Result<(PrecomputedLayout<'a, Backend, Layout>, Policy), ZLayoutError>
518 where
519 Backend: ShmProviderBackend,
520 Layout: AllocLayout,
521 {
522 let layout = PrecomputedLayout::new(self.provider, self.layout.memory_layout()?)?;
523 Ok((layout, self.policy))
524 }
525}
526
527impl<'a, Backend: ShmProviderBackend, Layout: TryInto<MemoryLayout>>
528 AllocBuilder<'a, Backend, Layout>
529where
530 Layout::Error: Into<ZLayoutError>,
531{
532 #[deprecated(
534 since = "1.5.3",
535 note = "Please use `ShmProvider::alloc_layout` method instead"
536 )]
537 #[zenoh_macros::unstable_doc]
538 pub fn into_layout(self) -> Result<PrecomputedLayout<'a, Backend, Layout>, ZLayoutError> {
539 PrecomputedLayout::new(self.provider, self.layout.try_into().map_err(Into::into)?)
540 }
541}
542
543impl<Backend, Layout: AllocLayout, Policy> Resolvable
544 for AllocBuilder<'_, Backend, Layout, Policy>
545{
546 type To = Result<Layout::Buffer, ZLayoutAllocError>;
547}
548
549impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
550 for AllocBuilder<'_, Backend, Layout, Policy>
551{
552 fn wait(self) -> <Self as Resolvable>::To {
553 let (layout, policy) = self.layout_policy()?;
554 Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.wait()?)
556 }
557}
558
559impl<
560 'a,
561 Backend: ShmProviderBackend + Sync,
562 Layout: AllocLayout + Send + Sync + 'a,
563 Policy: AsyncAllocPolicy<Backend> + Sync + 'a,
564 > IntoFuture for AllocBuilder<'a, Backend, Layout, Policy>
565{
566 type Output = <Self as Resolvable>::To;
567 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
568
569 fn into_future(self) -> Self::IntoFuture {
570 Box::pin(async move {
571 let (layout, policy) = self.layout_policy()?;
572 Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.await?)
574 })
575 }
576}
577
578#[zenoh_macros::unstable_doc]
580pub struct PrecomputedAllocBuilder<'b, 'a: 'b, Backend, Layout, Policy = JustAlloc> {
581 layout: &'b PrecomputedLayout<'a, Backend, Layout>,
582 policy: Policy,
583}
584
585impl<'b, 'a: 'b, Backend, Layout, Policy> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy> {
587 #[zenoh_macros::unstable_doc]
589 pub fn with_policy<OtherPolicy: ConstPolicy>(
590 self,
591 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
592 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
594 }
595
596 #[zenoh_macros::unstable_doc]
602 pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
603 self,
604 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
605 unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
607 }
608
609 #[zenoh_macros::unstable_doc]
615 pub unsafe fn with_runtime_policy<OtherPolicy>(
616 self,
617 policy: OtherPolicy,
618 ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
619 PrecomputedAllocBuilder {
620 layout: self.layout,
621 policy,
622 }
623 }
624}
625
626impl<Backend, Layout: AllocLayout, Policy> Resolvable
627 for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
628{
629 type To = Result<Layout::Buffer, ZAllocError>;
630}
631
632impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
633 for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
634{
635 fn wait(self) -> <Self as Resolvable>::To {
636 let buffer = self.layout.provider.alloc_inner(
637 self.layout.size,
638 &self.layout.provider_layout,
639 &self.policy,
640 )?;
641 Ok(unsafe { Layout::wrap_buffer(buffer) })
643 }
644}
645
646impl<
647 'b,
648 'a: 'b,
649 Backend: ShmProviderBackend + Sync,
650 Layout: AllocLayout + Sync,
651 Policy: AsyncAllocPolicy<Backend> + Sync + 'b,
652 > IntoFuture for PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy>
653{
654 type Output = <Self as Resolvable>::To;
655 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'b + Send>>;
656
657 fn into_future(self) -> Self::IntoFuture {
658 Box::pin(async move {
659 let buffer = self
660 .layout
661 .provider
662 .alloc_inner_async(self.layout.size, &self.layout.provider_layout, &self.policy)
663 .await?;
664 Ok(unsafe { Layout::wrap_buffer(buffer) })
666 })
667 }
668}
669
670#[zenoh_macros::unstable_doc]
671pub struct ShmProviderBuilder;
672impl ShmProviderBuilder {
673 #[zenoh_macros::unstable_doc]
675 pub fn backend<Backend: ShmProviderBackend>(
676 backend: Backend,
677 ) -> ShmProviderBuilderBackend<Backend> {
678 ShmProviderBuilderBackend { backend }
679 }
680
681 #[zenoh_macros::unstable_doc]
683 pub fn default_backend<Layout>(layout: Layout) -> ShmProviderBuilderWithDefaultBackend<Layout> {
684 ShmProviderBuilderWithDefaultBackend { layout }
685 }
686}
687
688#[zenoh_macros::unstable_doc]
689pub struct ShmProviderBuilderBackend<Backend>
690where
691 Backend: ShmProviderBackend,
692{
693 backend: Backend,
694}
695#[zenoh_macros::unstable_doc]
696impl<Backend> Resolvable for ShmProviderBuilderBackend<Backend>
697where
698 Backend: ShmProviderBackend,
699{
700 type To = ShmProvider<Backend>;
701}
702
703#[zenoh_macros::unstable_doc]
704impl<Backend> Wait for ShmProviderBuilderBackend<Backend>
705where
706 Backend: ShmProviderBackend,
707{
708 fn wait(self) -> <Self as Resolvable>::To {
710 ShmProvider::new(self.backend)
711 }
712}
713
714#[zenoh_macros::unstable_doc]
715pub struct ShmProviderBuilderWithDefaultBackend<Layout> {
716 layout: Layout,
717}
718
719#[zenoh_macros::unstable_doc]
720impl<Layout> Resolvable for ShmProviderBuilderWithDefaultBackend<Layout> {
721 type To = ZResult<ShmProvider<PosixShmProviderBackend>>;
722}
723
724#[zenoh_macros::unstable_doc]
725impl<Layout: TryInto<MemoryLayout>> Wait for ShmProviderBuilderWithDefaultBackend<Layout>
726where
727 Layout::Error: Error,
728 PosixShmProviderBackendBuilder<Layout>:
729 Resolvable<To = ZResult<PosixShmProviderBackend>> + Wait,
730{
731 fn wait(self) -> <Self as Resolvable>::To {
733 let backend = PosixShmProviderBackend::builder(self.layout).wait()?;
734 Ok(ShmProvider::new(backend))
735 }
736}
737
738#[derive(Debug)]
739struct BusyChunk {
740 metadata: AllocatedMetadataDescriptor,
741}
742
743impl BusyChunk {
744 fn new(metadata: AllocatedMetadataDescriptor) -> Self {
745 Self { metadata }
746 }
747
748 fn descriptor(&self) -> ChunkDescriptor {
749 self.metadata.header().data_descriptor()
750 }
751}
752
753#[zenoh_macros::unstable_doc]
755#[derive(Debug)]
756pub struct ShmProvider<Backend> {
757 backend: Backend,
758 busy_list: Mutex<Vec<BusyChunk>>,
759}
760
761impl<Backend: Default> Default for ShmProvider<Backend> {
762 fn default() -> Self {
763 Self::new(Backend::default())
764 }
765}
766
767impl<Backend> ShmProvider<Backend>
768where
769 Backend: ShmProviderBackend,
770{
771 #[zenoh_macros::unstable_doc]
773 pub fn alloc<Layout>(&self, layout: Layout) -> AllocBuilder<'_, Backend, Layout> {
774 AllocBuilder {
775 provider: self,
776 layout,
777 policy: JustAlloc,
778 }
779 }
780
781 #[zenoh_macros::unstable_doc]
783 pub fn alloc_layout<Layout: TryInto<MemoryLayout>>(
784 &self,
785 layout: Layout,
786 ) -> Result<PrecomputedLayout<'_, Backend, Layout>, ZLayoutError>
787 where
788 Layout::Error: Into<ZLayoutError>,
789 {
790 PrecomputedLayout::new(self, layout.try_into().map_err(Into::into)?)
791 }
792 #[zenoh_macros::unstable_doc]
794 pub fn defragment(&self) -> usize {
795 self.backend.defragment()
796 }
797
798 #[zenoh_macros::unstable_doc]
803 pub fn map(&self, chunk: AllocatedChunk, len: usize) -> Result<ZShmMut, ZAllocError> {
804 let len = len.try_into().map_err(|_| ZAllocError::Other)?;
805
806 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
808
809 let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
811 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
813 }
814
815 #[zenoh_macros::unstable_doc]
816 fn garbage_collect_impl<const SAFE: bool>(&self) -> usize {
817 fn retain_unordered<T>(vec: &mut Vec<T>, mut f: impl FnMut(&T) -> bool) {
818 let mut i = 0;
819 while i < vec.len() {
820 if f(&vec[i]) {
821 i += 1;
822 } else {
823 vec.swap_remove(i); }
826 }
827 }
828
829 tracing::trace!("Running Garbage Collector");
830 let mut largest = 0usize;
831 retain_unordered(&mut zlock!(self.busy_list), |chunk| {
832 let header = chunk.metadata.header();
833 if header.refcount.load(Ordering::SeqCst) == 0
834 || (!SAFE && header.watchdog_invalidated.load(Ordering::SeqCst))
835 {
836 tracing::trace!("Garbage Collecting Chunk: {:?}", chunk);
837 let descriptor_to_free = chunk.descriptor();
838 self.backend.free(&descriptor_to_free);
839 largest = largest.max(descriptor_to_free.len.get());
840 return false;
841 }
842 true
843 });
844 largest
845 }
846
847 #[zenoh_macros::unstable_doc]
851 pub fn garbage_collect(&self) -> usize {
852 self.garbage_collect_impl::<true>()
853 }
854
855 #[zenoh_macros::unstable_doc]
863 pub unsafe fn garbage_collect_unsafe(&self) -> usize {
864 self.garbage_collect_impl::<false>()
865 }
866
867 #[zenoh_macros::unstable_doc]
869 pub fn available(&self) -> usize {
870 self.backend.available()
871 }
872}
873
874impl<Backend> ShmProvider<Backend> {
876 fn new(backend: Backend) -> Self {
877 Self {
878 backend,
879 busy_list: Default::default(),
880 }
881 }
882}
883
884impl<Backend> ShmProvider<Backend>
885where
886 Backend: ShmProviderBackend,
887{
888 fn alloc_inner<Policy>(
889 &self,
890 size: NonZeroUsize,
891 layout: &MemoryLayout,
892 policy: &Policy,
893 ) -> Result<ZShmMut, ZAllocError>
894 where
895 Policy: AllocPolicy<Backend>,
896 {
897 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
899
900 let chunk = policy.alloc(layout, self)?;
907
908 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
910 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
912 }
913
914 fn alloc_resources() -> Result<(AllocatedMetadataDescriptor, ConfirmedDescriptor), ZAllocError>
915 {
916 let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
918
919 let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
921
922 Ok((allocated_metadata, confirmed_metadata))
923 }
924
925 fn wrap(
926 &self,
927 chunk: AllocatedChunk,
928 len: NonZeroUsize,
929 allocated_metadata: AllocatedMetadataDescriptor,
930 confirmed_metadata: ConfirmedDescriptor,
931 ) -> ShmBufInner {
932 allocated_metadata
935 .header()
936 .set_data_descriptor(&chunk.descriptor);
937 allocated_metadata
939 .header()
940 .protocol
941 .store(self.backend.id(), Ordering::Relaxed);
942
943 GLOBAL_VALIDATOR
945 .read()
946 .add(confirmed_metadata.owned.clone());
947
948 let info = ShmBufInfo::new(
950 len,
951 MetadataDescriptor::from(&confirmed_metadata.owned),
952 allocated_metadata
953 .header()
954 .generation
955 .load(Ordering::SeqCst),
956 );
957
958 let shmb = ShmBufInner {
960 metadata: confirmed_metadata,
961 buf: chunk.data,
962 info,
963 };
964
965 zlock!(self.busy_list).push(BusyChunk::new(allocated_metadata));
967
968 shmb
969 }
970}
971
972impl<Backend> ShmProvider<Backend>
974where
975 Backend: ShmProviderBackend + Sync,
976{
977 async fn alloc_inner_async<Policy>(
978 &self,
979 size: NonZeroUsize,
980 backend_layout: &MemoryLayout,
981 policy: &Policy,
982 ) -> Result<ZShmMut, ZAllocError>
983 where
984 Policy: AsyncAllocPolicy<Backend>,
985 {
986 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
988
989 let chunk = policy.alloc_async(backend_layout, self).await?;
996
997 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
999 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
1001 }
1002}