1use std::{
16 collections::VecDeque,
17 future::{Future, IntoFuture},
18 marker::PhantomData,
19 num::NonZeroUsize,
20 pin::Pin,
21 sync::{atomic::Ordering, Arc, Mutex},
22 time::Duration,
23};
24
25use async_trait::async_trait;
26use zenoh_core::{Resolvable, Wait};
27use zenoh_result::ZResult;
28
29use super::{
30 chunk::{AllocatedChunk, ChunkDescriptor},
31 shm_provider_backend::ShmProviderBackend,
32 types::{
33 AllocAlignment, BufAllocResult, BufLayoutAllocResult, ChunkAllocResult, MemoryLayout,
34 ZAllocError, ZLayoutAllocError, ZLayoutError,
35 },
36};
37use crate::{
38 api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID},
39 metadata::{
40 allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
41 storage::GLOBAL_METADATA_STORAGE,
42 },
43 watchdog::{
44 confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
45 validator::GLOBAL_VALIDATOR,
46 },
47 ShmBufInfo, ShmBufInner,
48};
49
50#[derive(Debug)]
51struct BusyChunk {
52 metadata: AllocatedMetadataDescriptor,
53}
54
55impl BusyChunk {
56 fn new(metadata: AllocatedMetadataDescriptor) -> Self {
57 Self { metadata }
58 }
59
60 fn descriptor(&self) -> ChunkDescriptor {
61 self.metadata.header().data_descriptor()
62 }
63}
64
65struct AllocData<'a, IDSource, Backend>
66where
67 IDSource: ProtocolIDSource,
68 Backend: ShmProviderBackend,
69{
70 size: usize,
71 alignment: AllocAlignment,
72 provider: &'a ShmProvider<IDSource, Backend>,
73}
74
75#[zenoh_macros::unstable_doc]
76pub struct AllocLayoutSizedBuilder<'a, IDSource, Backend>(AllocData<'a, IDSource, Backend>)
77where
78 IDSource: ProtocolIDSource,
79 Backend: ShmProviderBackend;
80
81impl<'a, IDSource, Backend> AllocLayoutSizedBuilder<'a, IDSource, Backend>
82where
83 IDSource: ProtocolIDSource,
84 Backend: ShmProviderBackend,
85{
86 fn new(provider: &'a ShmProvider<IDSource, Backend>, size: usize) -> Self {
87 Self(AllocData {
88 provider,
89 size,
90 alignment: AllocAlignment::default(),
91 })
92 }
93
94 #[zenoh_macros::unstable_doc]
96 pub fn with_alignment(self, alignment: AllocAlignment) -> Self {
97 Self(AllocData {
98 provider: self.0.provider,
99 size: self.0.size,
100 alignment,
101 })
102 }
103
104 #[zenoh_macros::unstable_doc]
106 pub fn into_layout(self) -> Result<AllocLayout<'a, IDSource, Backend>, ZLayoutError> {
107 AllocLayout::new(self.0)
108 }
109
110 #[zenoh_macros::unstable_doc]
112 pub fn with_policy<Policy>(self) -> ProviderAllocBuilder<'a, IDSource, Backend, Policy> {
113 ProviderAllocBuilder {
114 data: self.0,
115 _phantom: PhantomData,
116 }
117 }
118}
119
120#[zenoh_macros::unstable_doc]
121impl<IDSource, Backend> Resolvable for AllocLayoutSizedBuilder<'_, IDSource, Backend>
122where
123 IDSource: ProtocolIDSource,
124 Backend: ShmProviderBackend,
125{
126 type To = BufLayoutAllocResult;
127}
128
129impl<'a, IDSource, Backend> Wait for AllocLayoutSizedBuilder<'a, IDSource, Backend>
131where
132 IDSource: ProtocolIDSource,
133 Backend: ShmProviderBackend,
134{
135 fn wait(self) -> <Self as Resolvable>::To {
136 let builder = ProviderAllocBuilder::<'a, IDSource, Backend, JustAlloc> {
137 data: self.0,
138 _phantom: PhantomData,
139 };
140 builder.wait()
141 }
142}
143
144#[zenoh_macros::unstable_doc]
148#[derive(Debug)]
149pub struct AllocLayout<'a, IDSource, Backend>
150where
151 IDSource: ProtocolIDSource,
152 Backend: ShmProviderBackend,
153{
154 size: NonZeroUsize,
155 provider_layout: MemoryLayout,
156 provider: &'a ShmProvider<IDSource, Backend>,
157}
158
159impl<'a, IDSource, Backend> AllocLayout<'a, IDSource, Backend>
160where
161 IDSource: ProtocolIDSource,
162 Backend: ShmProviderBackend,
163{
164 #[zenoh_macros::unstable_doc]
166 pub fn alloc(&'a self) -> LayoutAllocBuilder<'a, IDSource, Backend> {
167 LayoutAllocBuilder {
168 layout: self,
169 _phantom: PhantomData,
170 }
171 }
172
173 fn new(data: AllocData<'a, IDSource, Backend>) -> Result<Self, ZLayoutError> {
174 let layout = MemoryLayout::new(data.size, data.alignment)
179 .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?;
180 let size = layout.size();
181
182 let provider_layout = data
184 .provider
185 .backend
186 .layout_for(layout)
187 .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
188
189 Ok(Self {
190 size,
191 provider_layout,
192 provider: data.provider,
193 })
194 }
195}
196
197#[zenoh_macros::unstable_doc]
199pub trait ForceDeallocPolicy {
200 fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
201 provider: &ShmProvider<IDSource, Backend>,
202 ) -> bool;
203}
204
205#[zenoh_macros::unstable_doc]
207pub struct DeallocOptimal;
208impl ForceDeallocPolicy for DeallocOptimal {
209 fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
210 provider: &ShmProvider<IDSource, Backend>,
211 ) -> bool {
212 let mut guard = provider.busy_list.lock().unwrap();
213 let chunk_to_dealloc = match guard.remove(1) {
214 Some(val) => val,
215 None => match guard.pop_front() {
216 Some(val) => val,
217 None => return false,
218 },
219 };
220 drop(guard);
221
222 provider.backend.free(&chunk_to_dealloc.descriptor());
223 true
224 }
225}
226
227#[zenoh_macros::unstable_doc]
229pub struct DeallocYoungest;
230impl ForceDeallocPolicy for DeallocYoungest {
231 fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
232 provider: &ShmProvider<IDSource, Backend>,
233 ) -> bool {
234 match provider.busy_list.lock().unwrap().pop_back() {
235 Some(val) => {
236 provider.backend.free(&val.descriptor());
237 true
238 }
239 None => false,
240 }
241 }
242}
243
244#[zenoh_macros::unstable_doc]
246pub struct DeallocEldest;
247impl ForceDeallocPolicy for DeallocEldest {
248 fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
249 provider: &ShmProvider<IDSource, Backend>,
250 ) -> bool {
251 match provider.busy_list.lock().unwrap().pop_front() {
252 Some(val) => {
253 provider.backend.free(&val.descriptor());
254 true
255 }
256 None => false,
257 }
258 }
259}
260
261#[zenoh_macros::unstable_doc]
263pub trait AllocPolicy {
264 fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
265 layout: &MemoryLayout,
266 provider: &ShmProvider<IDSource, Backend>,
267 ) -> ChunkAllocResult;
268}
269
270#[zenoh_macros::unstable_doc]
272#[async_trait]
273pub trait AsyncAllocPolicy: Send {
274 async fn alloc_async<IDSource: ProtocolIDSource, Backend: ShmProviderBackend + Sync>(
275 layout: &MemoryLayout,
276 provider: &ShmProvider<IDSource, Backend>,
277 ) -> ChunkAllocResult;
278}
279
280#[zenoh_macros::unstable_doc]
282pub struct JustAlloc;
283impl AllocPolicy for JustAlloc {
284 fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
285 layout: &MemoryLayout,
286 provider: &ShmProvider<IDSource, Backend>,
287 ) -> ChunkAllocResult {
288 provider.backend.alloc(layout)
289 }
290}
291
292#[zenoh_macros::unstable_doc]
296pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc>
297where
298 InnerPolicy: AllocPolicy,
299 AltPolicy: AllocPolicy,
300{
301 _phantom: PhantomData<InnerPolicy>,
302 _phantom2: PhantomData<AltPolicy>,
303}
304impl<InnerPolicy, AltPolicy> AllocPolicy for GarbageCollect<InnerPolicy, AltPolicy>
305where
306 InnerPolicy: AllocPolicy,
307 AltPolicy: AllocPolicy,
308{
309 fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
310 layout: &MemoryLayout,
311 provider: &ShmProvider<IDSource, Backend>,
312 ) -> ChunkAllocResult {
313 let result = InnerPolicy::alloc(layout, provider);
314 if result.is_err() {
315 if provider.garbage_collect() >= layout.size().get() {
317 return AltPolicy::alloc(layout, provider);
318 }
319 }
320 result
321 }
322}
323
324#[zenoh_macros::unstable_doc]
328pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc>
329where
330 InnerPolicy: AllocPolicy,
331 AltPolicy: AllocPolicy,
332{
333 _phantom: PhantomData<InnerPolicy>,
334 _phantom2: PhantomData<AltPolicy>,
335}
336impl<InnerPolicy, AltPolicy> AllocPolicy for Defragment<InnerPolicy, AltPolicy>
337where
338 InnerPolicy: AllocPolicy,
339 AltPolicy: AllocPolicy,
340{
341 fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
342 layout: &MemoryLayout,
343 provider: &ShmProvider<IDSource, Backend>,
344 ) -> ChunkAllocResult {
345 let result = InnerPolicy::alloc(layout, provider);
346 if let Err(ZAllocError::NeedDefragment) = result {
347 if provider.defragment() >= layout.size().get() {
349 return AltPolicy::alloc(layout, provider);
350 }
351 }
352 result
353 }
354}
355
356#[zenoh_macros::unstable_doc]
359pub struct Deallocate<
360 const N: usize,
361 InnerPolicy = JustAlloc,
362 AltPolicy = InnerPolicy,
363 DeallocatePolicy = DeallocOptimal,
364> where
365 InnerPolicy: AllocPolicy,
366 AltPolicy: AllocPolicy,
367 DeallocatePolicy: ForceDeallocPolicy,
368{
369 _phantom: PhantomData<InnerPolicy>,
370 _phantom2: PhantomData<AltPolicy>,
371 _phantom3: PhantomData<DeallocatePolicy>,
372}
373impl<const N: usize, InnerPolicy, AltPolicy, DeallocatePolicy> AllocPolicy
374 for Deallocate<N, InnerPolicy, AltPolicy, DeallocatePolicy>
375where
376 InnerPolicy: AllocPolicy,
377 AltPolicy: AllocPolicy,
378 DeallocatePolicy: ForceDeallocPolicy,
379{
380 fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
381 layout: &MemoryLayout,
382 provider: &ShmProvider<IDSource, Backend>,
383 ) -> ChunkAllocResult {
384 let mut result = InnerPolicy::alloc(layout, provider);
385 for _ in 0..N {
386 match result {
387 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
388 if !DeallocatePolicy::dealloc(provider) {
389 return result;
390 }
391 }
392 _ => {
393 return result;
394 }
395 }
396 result = AltPolicy::alloc(layout, provider);
397 }
398 result
399 }
400}
401
402#[zenoh_macros::unstable_doc]
406pub struct BlockOn<InnerPolicy = JustAlloc>
407where
408 InnerPolicy: AllocPolicy,
409{
410 _phantom: PhantomData<InnerPolicy>,
411}
412
413#[async_trait]
414impl<InnerPolicy> AsyncAllocPolicy for BlockOn<InnerPolicy>
415where
416 InnerPolicy: AllocPolicy + Send,
417{
418 async fn alloc_async<IDSource: ProtocolIDSource, Backend: ShmProviderBackend + Sync>(
419 layout: &MemoryLayout,
420 provider: &ShmProvider<IDSource, Backend>,
421 ) -> ChunkAllocResult {
422 loop {
423 match InnerPolicy::alloc(layout, provider) {
424 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
425 tokio::time::sleep(Duration::from_millis(1)).await;
427 }
428 other_result => {
429 return other_result;
430 }
431 }
432 }
433 }
434}
435impl<InnerPolicy> AllocPolicy for BlockOn<InnerPolicy>
436where
437 InnerPolicy: AllocPolicy,
438{
439 fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
440 layout: &MemoryLayout,
441 provider: &ShmProvider<IDSource, Backend>,
442 ) -> ChunkAllocResult {
443 loop {
444 match InnerPolicy::alloc(layout, provider) {
445 Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
446 std::thread::sleep(Duration::from_millis(1));
448 }
449 other_result => {
450 return other_result;
451 }
452 }
453 }
454 }
455}
456
457#[zenoh_macros::unstable_doc]
510pub struct ProviderAllocBuilder<
511 'a,
512 IDSource: ProtocolIDSource,
513 Backend: ShmProviderBackend,
514 Policy = JustAlloc,
515> {
516 data: AllocData<'a, IDSource, Backend>,
517 _phantom: PhantomData<Policy>,
518}
519
520impl<'a, IDSource, Backend, Policy> ProviderAllocBuilder<'a, IDSource, Backend, Policy>
522where
523 IDSource: ProtocolIDSource,
524 Backend: ShmProviderBackend,
525{
526 #[zenoh_macros::unstable_doc]
528 pub fn with_policy<OtherPolicy>(
529 self,
530 ) -> ProviderAllocBuilder<'a, IDSource, Backend, OtherPolicy> {
531 ProviderAllocBuilder {
532 data: self.data,
533 _phantom: PhantomData,
534 }
535 }
536}
537
538impl<IDSource, Backend, Policy> Resolvable for ProviderAllocBuilder<'_, IDSource, Backend, Policy>
539where
540 IDSource: ProtocolIDSource,
541 Backend: ShmProviderBackend,
542{
543 type To = BufLayoutAllocResult;
544}
545
546impl<IDSource, Backend, Policy> Wait for ProviderAllocBuilder<'_, IDSource, Backend, Policy>
548where
549 IDSource: ProtocolIDSource,
550 Backend: ShmProviderBackend,
551 Policy: AllocPolicy,
552{
553 fn wait(self) -> <Self as Resolvable>::To {
554 let layout = AllocLayout::new(self.data).map_err(ZLayoutAllocError::Layout)?;
555
556 layout
557 .alloc()
558 .with_policy::<Policy>()
559 .wait()
560 .map_err(ZLayoutAllocError::Alloc)
561 }
562}
563
564impl<'a, IDSource, Backend, Policy> IntoFuture
566 for ProviderAllocBuilder<'a, IDSource, Backend, Policy>
567where
568 IDSource: ProtocolIDSource,
569 Backend: ShmProviderBackend + Sync,
570 Policy: AsyncAllocPolicy,
571{
572 type Output = <Self as Resolvable>::To;
573 type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
574
575 fn into_future(self) -> Self::IntoFuture {
576 Box::pin(
577 async move {
578 let layout = AllocLayout::new(self.data).map_err(ZLayoutAllocError::Layout)?;
579 layout
580 .alloc()
581 .with_policy::<Policy>()
582 .await
583 .map_err(ZLayoutAllocError::Alloc)
584 }
585 .into_future(),
586 )
587 }
588}
589
590#[zenoh_macros::unstable_doc]
592pub struct LayoutAllocBuilder<
593 'a,
594 IDSource: ProtocolIDSource,
595 Backend: ShmProviderBackend,
596 Policy = JustAlloc,
597> {
598 layout: &'a AllocLayout<'a, IDSource, Backend>,
599 _phantom: PhantomData<Policy>,
600}
601
602impl<'a, IDSource, Backend, Policy> LayoutAllocBuilder<'a, IDSource, Backend, Policy>
604where
605 IDSource: ProtocolIDSource,
606 Backend: ShmProviderBackend,
607{
608 #[zenoh_macros::unstable_doc]
610 pub fn with_policy<OtherPolicy>(
611 self,
612 ) -> LayoutAllocBuilder<'a, IDSource, Backend, OtherPolicy> {
613 LayoutAllocBuilder {
614 layout: self.layout,
615 _phantom: PhantomData,
616 }
617 }
618}
619
620impl<IDSource, Backend, Policy> Resolvable for LayoutAllocBuilder<'_, IDSource, Backend, Policy>
621where
622 IDSource: ProtocolIDSource,
623 Backend: ShmProviderBackend,
624{
625 type To = BufAllocResult;
626}
627
628impl<IDSource, Backend, Policy> Wait for LayoutAllocBuilder<'_, IDSource, Backend, Policy>
630where
631 IDSource: ProtocolIDSource,
632 Backend: ShmProviderBackend,
633 Policy: AllocPolicy,
634{
635 fn wait(self) -> <Self as Resolvable>::To {
636 self.layout
637 .provider
638 .alloc_inner::<Policy>(self.layout.size, &self.layout.provider_layout)
639 }
640}
641
642impl<'a, IDSource, Backend, Policy> IntoFuture for LayoutAllocBuilder<'a, IDSource, Backend, Policy>
644where
645 IDSource: ProtocolIDSource,
646 Backend: ShmProviderBackend + Sync,
647 Policy: AsyncAllocPolicy,
648{
649 type Output = <Self as Resolvable>::To;
650 type IntoFuture = Pin<Box<dyn Future<Output = <Self as Resolvable>::To> + 'a + Send>>;
651
652 fn into_future(self) -> Self::IntoFuture {
653 Box::pin(
654 async move {
655 self.layout
656 .provider
657 .alloc_inner_async::<Policy>(self.layout.size, &self.layout.provider_layout)
658 .await
659 }
660 .into_future(),
661 )
662 }
663}
664
665#[zenoh_macros::unstable_doc]
666pub struct ShmProviderBuilder;
667impl ShmProviderBuilder {
668 #[zenoh_macros::unstable_doc]
670 pub fn builder() -> Self {
671 Self
672 }
673
674 #[zenoh_macros::unstable_doc]
676 pub fn protocol_id<const ID: ProtocolID>(self) -> ShmProviderBuilderID<StaticProtocolID<ID>> {
677 ShmProviderBuilderID::<StaticProtocolID<ID>> {
678 id: StaticProtocolID,
679 }
680 }
681
682 #[zenoh_macros::unstable_doc]
684 pub fn dynamic_protocol_id(self, id: ProtocolID) -> ShmProviderBuilderID<DynamicProtocolID> {
685 ShmProviderBuilderID::<DynamicProtocolID> {
686 id: DynamicProtocolID::new(id),
687 }
688 }
689}
690
691#[zenoh_macros::unstable_doc]
692pub struct ShmProviderBuilderID<IDSource: ProtocolIDSource> {
693 id: IDSource,
694}
695impl<IDSource: ProtocolIDSource> ShmProviderBuilderID<IDSource> {
696 #[zenoh_macros::unstable_doc]
698 pub fn backend<Backend: ShmProviderBackend>(
699 self,
700 backend: Backend,
701 ) -> ShmProviderBuilderBackendID<IDSource, Backend> {
702 ShmProviderBuilderBackendID {
703 backend,
704 id: self.id,
705 }
706 }
707}
708
709#[zenoh_macros::unstable_doc]
710pub struct ShmProviderBuilderBackendID<IDSource, Backend>
711where
712 IDSource: ProtocolIDSource,
713 Backend: ShmProviderBackend,
714{
715 backend: Backend,
716 id: IDSource,
717}
718#[zenoh_macros::unstable_doc]
719impl<IDSource, Backend> Resolvable for ShmProviderBuilderBackendID<IDSource, Backend>
720where
721 IDSource: ProtocolIDSource,
722 Backend: ShmProviderBackend,
723{
724 type To = ShmProvider<IDSource, Backend>;
725}
726
727#[zenoh_macros::unstable_doc]
728impl<IDSource, Backend> Wait for ShmProviderBuilderBackendID<IDSource, Backend>
729where
730 IDSource: ProtocolIDSource,
731 Backend: ShmProviderBackend,
732{
733 fn wait(self) -> <Self as Resolvable>::To {
735 ShmProvider::new(self.backend, self.id)
736 }
737}
738
739#[zenoh_macros::unstable_doc]
741pub trait ProtocolIDSource: Send + Sync {
742 fn id(&self) -> ProtocolID;
743}
744
745#[zenoh_macros::unstable_doc]
749#[derive(Default)]
750pub struct StaticProtocolID<const ID: ProtocolID>;
751impl<const ID: ProtocolID> ProtocolIDSource for StaticProtocolID<ID> {
752 fn id(&self) -> ProtocolID {
753 ID
754 }
755}
756
757#[zenoh_macros::unstable_doc]
761pub struct DynamicProtocolID {
762 id: ProtocolID,
763}
764impl DynamicProtocolID {
765 #[zenoh_macros::unstable_doc]
766 pub fn new(id: ProtocolID) -> Self {
767 Self { id }
768 }
769}
770impl ProtocolIDSource for DynamicProtocolID {
771 fn id(&self) -> ProtocolID {
772 self.id
773 }
774}
775unsafe impl Send for DynamicProtocolID {}
776unsafe impl Sync for DynamicProtocolID {}
777
778#[zenoh_macros::unstable_doc]
780#[derive(Debug)]
781pub struct ShmProvider<IDSource, Backend>
782where
783 IDSource: ProtocolIDSource,
784 Backend: ShmProviderBackend,
785{
786 backend: Backend,
787 busy_list: Mutex<VecDeque<BusyChunk>>,
788 id: IDSource,
789}
790
791impl<IDSource, Backend> ShmProvider<IDSource, Backend>
792where
793 IDSource: ProtocolIDSource,
794 Backend: ShmProviderBackend,
795{
796 #[zenoh_macros::unstable_doc]
798 pub fn alloc(&self, size: usize) -> AllocLayoutSizedBuilder<IDSource, Backend> {
799 AllocLayoutSizedBuilder::new(self, size)
800 }
801
802 #[zenoh_macros::unstable_doc]
804 pub fn defragment(&self) -> usize {
805 self.backend.defragment()
806 }
807
808 #[zenoh_macros::unstable_doc]
812 pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult<ZShmMut> {
813 let len = len.try_into()?;
814
815 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
817
818 let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
820 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
821 }
822
823 #[zenoh_macros::unstable_doc]
826 pub fn garbage_collect(&self) -> usize {
827 fn is_free_chunk(chunk: &BusyChunk) -> bool {
828 let header = chunk.metadata.header();
829 if header.refcount.load(Ordering::SeqCst) != 0 {
830 return header.watchdog_invalidated.load(Ordering::SeqCst);
831 }
832 true
833 }
834
835 tracing::trace!("Running Garbage Collector");
836
837 let mut largest = 0usize;
838 let mut guard = self.busy_list.lock().unwrap();
839 guard.retain(|maybe_free| {
840 if is_free_chunk(maybe_free) {
841 tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free);
842 let descriptor_to_free = maybe_free.descriptor();
843 self.backend.free(&descriptor_to_free);
844 largest = largest.max(descriptor_to_free.len.get());
845 return false;
846 }
847 true
848 });
849 drop(guard);
850
851 largest
852 }
853
854 #[zenoh_macros::unstable_doc]
856 pub fn available(&self) -> usize {
857 self.backend.available()
858 }
859}
860
861impl<IDSource, Backend> ShmProvider<IDSource, Backend>
863where
864 IDSource: ProtocolIDSource,
865 Backend: ShmProviderBackend,
866{
867 fn new(backend: Backend, id: IDSource) -> Self {
868 Self {
869 backend,
870 busy_list: Mutex::new(VecDeque::default()),
871 id,
872 }
873 }
874
875 fn alloc_inner<Policy>(&self, size: NonZeroUsize, layout: &MemoryLayout) -> BufAllocResult
876 where
877 Policy: AllocPolicy,
878 {
879 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
881
882 let chunk = Policy::alloc(layout, self)?;
889
890 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
892 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
893 }
894
895 fn alloc_resources() -> ZResult<(AllocatedMetadataDescriptor, ConfirmedDescriptor)> {
896 let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
898
899 let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
901
902 Ok((allocated_metadata, confirmed_metadata))
903 }
904
905 fn wrap(
906 &self,
907 chunk: AllocatedChunk,
908 len: NonZeroUsize,
909 allocated_metadata: AllocatedMetadataDescriptor,
910 confirmed_metadata: ConfirmedDescriptor,
911 ) -> ShmBufInner {
912 allocated_metadata
915 .header()
916 .set_data_descriptor(&chunk.descriptor);
917 allocated_metadata
919 .header()
920 .protocol
921 .store(self.id.id(), Ordering::Relaxed);
922
923 GLOBAL_VALIDATOR
925 .read()
926 .add(confirmed_metadata.owned.clone());
927
928 let info = ShmBufInfo::new(
930 len,
931 MetadataDescriptor::from(&confirmed_metadata.owned),
932 allocated_metadata
933 .header()
934 .generation
935 .load(Ordering::SeqCst),
936 );
937
938 let shmb = ShmBufInner {
940 metadata: Arc::new(confirmed_metadata),
941 buf: chunk.data,
942 info,
943 };
944
945 self.busy_list
947 .lock()
948 .unwrap()
949 .push_back(BusyChunk::new(allocated_metadata));
950
951 shmb
952 }
953}
954
955impl<IDSource, Backend> ShmProvider<IDSource, Backend>
957where
958 IDSource: ProtocolIDSource,
959 Backend: ShmProviderBackend + Sync,
960{
961 async fn alloc_inner_async<Policy>(
962 &self,
963 size: NonZeroUsize,
964 backend_layout: &MemoryLayout,
965 ) -> BufAllocResult
966 where
967 Policy: AsyncAllocPolicy,
968 {
969 let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
971
972 let chunk = Policy::alloc_async(backend_layout, self).await?;
979
980 let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
982 Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
983 }
984}