zenoh_shm/api/provider/
shm_provider.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{
16    collections::VecDeque,
17    future::{Future, IntoFuture},
18    marker::PhantomData,
19    num::NonZeroUsize,
20    pin::Pin,
21    sync::{atomic::Ordering, Arc, Mutex},
22    time::Duration,
23};
24
25use async_trait::async_trait;
26use zenoh_core::{Resolvable, Wait};
27use zenoh_result::ZResult;
28
29use super::{
30    chunk::{AllocatedChunk, ChunkDescriptor},
31    shm_provider_backend::ShmProviderBackend,
32    types::{
33        AllocAlignment, BufAllocResult, BufLayoutAllocResult, ChunkAllocResult, MemoryLayout,
34        ZAllocError, ZLayoutAllocError, ZLayoutError,
35    },
36};
37use crate::{
38    api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID},
39    metadata::{
40        allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
41        storage::GLOBAL_METADATA_STORAGE,
42    },
43    watchdog::{
44        confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
45        validator::GLOBAL_VALIDATOR,
46    },
47    ShmBufInfo, ShmBufInner,
48};
49
50#[derive(Debug)]
51struct BusyChunk {
52    metadata: AllocatedMetadataDescriptor,
53}
54
55impl BusyChunk {
56    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
57        Self { metadata }
58    }
59
60    fn descriptor(&self) -> ChunkDescriptor {
61        self.metadata.header().data_descriptor()
62    }
63}
64
65struct AllocData<'a, IDSource, Backend>
66where
67    IDSource: ProtocolIDSource,
68    Backend: ShmProviderBackend,
69{
70    size: usize,
71    alignment: AllocAlignment,
72    provider: &'a ShmProvider<IDSource, Backend>,
73}
74
75#[zenoh_macros::unstable_doc]
76pub struct AllocLayoutSizedBuilder<'a, IDSource, Backend>(AllocData<'a, IDSource, Backend>)
77where
78    IDSource: ProtocolIDSource,
79    Backend: ShmProviderBackend;
80
81impl<'a, IDSource, Backend> AllocLayoutSizedBuilder<'a, IDSource, Backend>
82where
83    IDSource: ProtocolIDSource,
84    Backend: ShmProviderBackend,
85{
86    fn new(provider: &'a ShmProvider<IDSource, Backend>, size: usize) -> Self {
87        Self(AllocData {
88            provider,
89            size,
90            alignment: AllocAlignment::default(),
91        })
92    }
93
94    /// Set alignment
95    #[zenoh_macros::unstable_doc]
96    pub fn with_alignment(self, alignment: AllocAlignment) -> Self {
97        Self(AllocData {
98            provider: self.0.provider,
99            size: self.0.size,
100            alignment,
101        })
102    }
103
104    /// Try to build an allocation layout
105    #[zenoh_macros::unstable_doc]
106    pub fn into_layout(self) -> Result<AllocLayout<'a, IDSource, Backend>, ZLayoutError> {
107        AllocLayout::new(self.0)
108    }
109
110    /// Set the allocation policy
111    #[zenoh_macros::unstable_doc]
112    pub fn with_policy<Policy>(self) -> ProviderAllocBuilder<'a, IDSource, Backend, Policy> {
113        ProviderAllocBuilder {
114            data: self.0,
115            _phantom: PhantomData,
116        }
117    }
118}
119
120#[zenoh_macros::unstable_doc]
121impl<IDSource, Backend> Resolvable for AllocLayoutSizedBuilder<'_, IDSource, Backend>
122where
123    IDSource: ProtocolIDSource,
124    Backend: ShmProviderBackend,
125{
126    type To = BufLayoutAllocResult;
127}
128
129// Sync alloc policy
130impl<'a, IDSource, Backend> Wait for AllocLayoutSizedBuilder<'a, IDSource, Backend>
131where
132    IDSource: ProtocolIDSource,
133    Backend: ShmProviderBackend,
134{
135    fn wait(self) -> <Self as Resolvable>::To {
136        let builder = ProviderAllocBuilder::<'a, IDSource, Backend, JustAlloc> {
137            data: self.0,
138            _phantom: PhantomData,
139        };
140        builder.wait()
141    }
142}
143
144/// A layout for allocations.
145/// This is a pre-calculated layout suitable for making series of similar allocations
146/// adopted for particular ShmProvider
147#[zenoh_macros::unstable_doc]
148#[derive(Debug)]
149pub struct AllocLayout<'a, IDSource, Backend>
150where
151    IDSource: ProtocolIDSource,
152    Backend: ShmProviderBackend,
153{
154    size: NonZeroUsize,
155    provider_layout: MemoryLayout,
156    provider: &'a ShmProvider<IDSource, Backend>,
157}
158
159impl<'a, IDSource, Backend> AllocLayout<'a, IDSource, Backend>
160where
161    IDSource: ProtocolIDSource,
162    Backend: ShmProviderBackend,
163{
164    /// Allocate the new buffer with this layout
165    #[zenoh_macros::unstable_doc]
166    pub fn alloc(&'a self) -> LayoutAllocBuilder<'a, IDSource, Backend> {
167        LayoutAllocBuilder {
168            layout: self,
169            _phantom: PhantomData,
170        }
171    }
172
173    fn new(data: AllocData<'a, IDSource, Backend>) -> Result<Self, ZLayoutError> {
174        // NOTE: Depending on internal implementation, provider's backend might relayout
175        // the allocations for bigger alignment (ex. 4-byte aligned allocation to 8-bytes aligned)
176
177        // Create layout for specified arguments
178        let layout = MemoryLayout::new(data.size, data.alignment)
179            .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?;
180        let size = layout.size();
181
182        // Obtain provider's layout for our layout
183        let provider_layout = data
184            .provider
185            .backend
186            .layout_for(layout)
187            .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
188
189        Ok(Self {
190            size,
191            provider_layout,
192            provider: data.provider,
193        })
194    }
195}
196
197/// Trait for deallocation policies.
198#[zenoh_macros::unstable_doc]
199pub trait ForceDeallocPolicy {
200    fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
201        provider: &ShmProvider<IDSource, Backend>,
202    ) -> bool;
203}
204
205/// Try to dealloc optimal (currently eldest+1) chunk
206#[zenoh_macros::unstable_doc]
207pub struct DeallocOptimal;
208impl ForceDeallocPolicy for DeallocOptimal {
209    fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
210        provider: &ShmProvider<IDSource, Backend>,
211    ) -> bool {
212        let mut guard = provider.busy_list.lock().unwrap();
213        let chunk_to_dealloc = match guard.remove(1) {
214            Some(val) => val,
215            None => match guard.pop_front() {
216                Some(val) => val,
217                None => return false,
218            },
219        };
220        drop(guard);
221
222        provider.backend.free(&chunk_to_dealloc.descriptor());
223        true
224    }
225}
226
227/// Try to dealloc youngest chunk
228#[zenoh_macros::unstable_doc]
229pub struct DeallocYoungest;
230impl ForceDeallocPolicy for DeallocYoungest {
231    fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
232        provider: &ShmProvider<IDSource, Backend>,
233    ) -> bool {
234        match provider.busy_list.lock().unwrap().pop_back() {
235            Some(val) => {
236                provider.backend.free(&val.descriptor());
237                true
238            }
239            None => false,
240        }
241    }
242}
243
244/// Try to dealloc eldest chunk
245#[zenoh_macros::unstable_doc]
246pub struct DeallocEldest;
247impl ForceDeallocPolicy for DeallocEldest {
248    fn dealloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
249        provider: &ShmProvider<IDSource, Backend>,
250    ) -> bool {
251        match provider.busy_list.lock().unwrap().pop_front() {
252            Some(val) => {
253                provider.backend.free(&val.descriptor());
254                true
255            }
256            None => false,
257        }
258    }
259}
260
261/// Trait for allocation policies
262#[zenoh_macros::unstable_doc]
263pub trait AllocPolicy {
264    fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
265        layout: &MemoryLayout,
266        provider: &ShmProvider<IDSource, Backend>,
267    ) -> ChunkAllocResult;
268}
269
270/// Trait for async allocation policies
271#[zenoh_macros::unstable_doc]
272#[async_trait]
273pub trait AsyncAllocPolicy: Send {
274    async fn alloc_async<IDSource: ProtocolIDSource, Backend: ShmProviderBackend + Sync>(
275        layout: &MemoryLayout,
276        provider: &ShmProvider<IDSource, Backend>,
277    ) -> ChunkAllocResult;
278}
279
280/// Just try to allocate
281#[zenoh_macros::unstable_doc]
282pub struct JustAlloc;
283impl AllocPolicy for JustAlloc {
284    fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
285        layout: &MemoryLayout,
286        provider: &ShmProvider<IDSource, Backend>,
287    ) -> ChunkAllocResult {
288        provider.backend.alloc(layout)
289    }
290}
291
292/// Garbage collection policy.
293/// Try to reclaim old buffers if allocation failed and allocate again
294/// if the largest reclaimed chuk is not smaller than the one required
295#[zenoh_macros::unstable_doc]
296pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc>
297where
298    InnerPolicy: AllocPolicy,
299    AltPolicy: AllocPolicy,
300{
301    _phantom: PhantomData<InnerPolicy>,
302    _phantom2: PhantomData<AltPolicy>,
303}
304impl<InnerPolicy, AltPolicy> AllocPolicy for GarbageCollect<InnerPolicy, AltPolicy>
305where
306    InnerPolicy: AllocPolicy,
307    AltPolicy: AllocPolicy,
308{
309    fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
310        layout: &MemoryLayout,
311        provider: &ShmProvider<IDSource, Backend>,
312    ) -> ChunkAllocResult {
313        let result = InnerPolicy::alloc(layout, provider);
314        if result.is_err() {
315            // try to alloc again only if GC managed to reclaim big enough chunk
316            if provider.garbage_collect() >= layout.size().get() {
317                return AltPolicy::alloc(layout, provider);
318            }
319        }
320        result
321    }
322}
323
324/// Defragmenting policy.
325/// Try to defragment if allocation failed and allocate again
326/// if the largest defragmented chuk is not smaller than the one required
327#[zenoh_macros::unstable_doc]
328pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc>
329where
330    InnerPolicy: AllocPolicy,
331    AltPolicy: AllocPolicy,
332{
333    _phantom: PhantomData<InnerPolicy>,
334    _phantom2: PhantomData<AltPolicy>,
335}
336impl<InnerPolicy, AltPolicy> AllocPolicy for Defragment<InnerPolicy, AltPolicy>
337where
338    InnerPolicy: AllocPolicy,
339    AltPolicy: AllocPolicy,
340{
341    fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
342        layout: &MemoryLayout,
343        provider: &ShmProvider<IDSource, Backend>,
344    ) -> ChunkAllocResult {
345        let result = InnerPolicy::alloc(layout, provider);
346        if let Err(ZAllocError::NeedDefragment) = result {
347            // try to alloc again only if big enough chunk was defragmented
348            if provider.defragment() >= layout.size().get() {
349                return AltPolicy::alloc(layout, provider);
350            }
351        }
352        result
353    }
354}
355
356/// Deallocating policy.
357/// Forcibly deallocate up to N buffers until allocation succeeds.
358#[zenoh_macros::unstable_doc]
359pub struct Deallocate<
360    const N: usize,
361    InnerPolicy = JustAlloc,
362    AltPolicy = InnerPolicy,
363    DeallocatePolicy = DeallocOptimal,
364> where
365    InnerPolicy: AllocPolicy,
366    AltPolicy: AllocPolicy,
367    DeallocatePolicy: ForceDeallocPolicy,
368{
369    _phantom: PhantomData<InnerPolicy>,
370    _phantom2: PhantomData<AltPolicy>,
371    _phantom3: PhantomData<DeallocatePolicy>,
372}
373impl<const N: usize, InnerPolicy, AltPolicy, DeallocatePolicy> AllocPolicy
374    for Deallocate<N, InnerPolicy, AltPolicy, DeallocatePolicy>
375where
376    InnerPolicy: AllocPolicy,
377    AltPolicy: AllocPolicy,
378    DeallocatePolicy: ForceDeallocPolicy,
379{
380    fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
381        layout: &MemoryLayout,
382        provider: &ShmProvider<IDSource, Backend>,
383    ) -> ChunkAllocResult {
384        let mut result = InnerPolicy::alloc(layout, provider);
385        for _ in 0..N {
386            match result {
387                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
388                    if !DeallocatePolicy::dealloc(provider) {
389                        return result;
390                    }
391                }
392                _ => {
393                    return result;
394                }
395            }
396            result = AltPolicy::alloc(layout, provider);
397        }
398        result
399    }
400}
401
402/// Blocking allocation policy.
403/// This policy will block until the allocation succeeds.
404/// Both sync and async modes available.
405#[zenoh_macros::unstable_doc]
406pub struct BlockOn<InnerPolicy = JustAlloc>
407where
408    InnerPolicy: AllocPolicy,
409{
410    _phantom: PhantomData<InnerPolicy>,
411}
412
413#[async_trait]
414impl<InnerPolicy> AsyncAllocPolicy for BlockOn<InnerPolicy>
415where
416    InnerPolicy: AllocPolicy + Send,
417{
418    async fn alloc_async<IDSource: ProtocolIDSource, Backend: ShmProviderBackend + Sync>(
419        layout: &MemoryLayout,
420        provider: &ShmProvider<IDSource, Backend>,
421    ) -> ChunkAllocResult {
422        loop {
423            match InnerPolicy::alloc(layout, provider) {
424                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
425                    // TODO: implement provider's async signalling instead of this!
426                    tokio::time::sleep(Duration::from_millis(1)).await;
427                }
428                other_result => {
429                    return other_result;
430                }
431            }
432        }
433    }
434}
435impl<InnerPolicy> AllocPolicy for BlockOn<InnerPolicy>
436where
437    InnerPolicy: AllocPolicy,
438{
439    fn alloc<IDSource: ProtocolIDSource, Backend: ShmProviderBackend>(
440        layout: &MemoryLayout,
441        provider: &ShmProvider<IDSource, Backend>,
442    ) -> ChunkAllocResult {
443        loop {
444            match InnerPolicy::alloc(layout, provider) {
445                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
446                    // TODO: implement provider's async signalling instead of this!
447                    std::thread::sleep(Duration::from_millis(1));
448                }
449                other_result => {
450                    return other_result;
451                }
452            }
453        }
454    }
455}
456
457// TODO: allocator API
458/*pub struct ShmAllocator<
459    'a,
460    Policy: AllocPolicy,
461    IDSource,
462    Backend: ShmProviderBackend,
463> {
464    provider: &'a ShmProvider<IDSource, Backend>,
465    allocations: lockfree::map::Map<std::ptr::NonNull<u8>, ShmBufInner>,
466    _phantom: PhantomData<Policy>,
467}
468
469impl<'a, Policy: AllocPolicy, IDSource, Backend: ShmProviderBackend>
470    ShmAllocator<'a, Policy, IDSource, Backend>
471{
472    fn allocate(&self, layout: std::alloc::Layout) -> BufAllocResult {
473        self.provider
474            .alloc_layout()
475            .size(layout.size())
476            .alignment(AllocAlignment::new(layout.align() as u32))
477            .res()?
478            .alloc()
479            .res()
480    }
481}
482
483unsafe impl<'a, Policy: AllocPolicy, IDSource, Backend: ShmProviderBackend>
484    allocator_api2::alloc::Allocator for ShmAllocator<'a, Policy, IDSource, Backend>
485{
486    fn allocate(
487        &self,
488        layout: std::alloc::Layout,
489    ) -> Result<std::ptr::NonNull<[u8]>, allocator_api2::alloc::AllocError> {
490        let allocation = self
491            .allocate(layout)
492            .map_err(|_| allocator_api2::alloc::AllocError)?;
493
494        let inner = allocation.buf.load(Ordering::Relaxed);
495        let ptr = NonNull::new(inner).ok_or(allocator_api2::alloc::AllocError)?;
496        let sl = unsafe { std::slice::from_raw_parts(inner, 2) };
497        let res = NonNull::from(sl);
498
499        self.allocations.insert(ptr, allocation);
500        Ok(res)
501    }
502
503    unsafe fn deallocate(&self, ptr: std::ptr::NonNull<u8>, _layout: std::alloc::Layout) {
504        let _ = self.allocations.remove(&ptr);
505    }
506}*/
507
508/// Builder for making allocations with instant layout calculation
509#[zenoh_macros::unstable_doc]
510pub struct ProviderAllocBuilder<
511    'a,
512    IDSource: ProtocolIDSource,
513    Backend: ShmProviderBackend,
514    Policy = JustAlloc,
515> {
516    data: AllocData<'a, IDSource, Backend>,
517    _phantom: PhantomData<Policy>,
518}
519
520// Generic impl
521impl<'a, IDSource, Backend, Policy> ProviderAllocBuilder<'a, IDSource, Backend, Policy>
522where
523    IDSource: ProtocolIDSource,
524    Backend: ShmProviderBackend,
525{
526    /// Set the allocation policy
527    #[zenoh_macros::unstable_doc]
528    pub fn with_policy<OtherPolicy>(
529        self,
530    ) -> ProviderAllocBuilder<'a, IDSource, Backend, OtherPolicy> {
531        ProviderAllocBuilder {
532            data: self.data,
533            _phantom: PhantomData,
534        }
535    }
536}
537
538impl<IDSource, Backend, Policy> Resolvable for ProviderAllocBuilder<'_, IDSource, Backend, Policy>
539where
540    IDSource: ProtocolIDSource,
541    Backend: ShmProviderBackend,
542{
543    type To = BufLayoutAllocResult;
544}
545
546// Sync alloc policy
547impl<IDSource, Backend, Policy> Wait for ProviderAllocBuilder<'_, IDSource, Backend, Policy>
548where
549    IDSource: ProtocolIDSource,
550    Backend: ShmProviderBackend,
551    Policy: AllocPolicy,
552{
553    fn wait(self) -> <Self as Resolvable>::To {
554        let layout = AllocLayout::new(self.data).map_err(ZLayoutAllocError::Layout)?;
555
556        layout
557            .alloc()
558            .with_policy::<Policy>()
559            .wait()
560            .map_err(ZLayoutAllocError::Alloc)
561    }
562}
563
564// Async alloc policy
565impl<'a, IDSource, Backend, Policy> IntoFuture
566    for ProviderAllocBuilder<'a, IDSource, Backend, Policy>
567where
568    IDSource: ProtocolIDSource,
569    Backend: ShmProviderBackend + Sync,
570    Policy: AsyncAllocPolicy,
571{
572    type Output = <Self as Resolvable>::To;
573    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
574
575    fn into_future(self) -> Self::IntoFuture {
576        Box::pin(
577            async move {
578                let layout = AllocLayout::new(self.data).map_err(ZLayoutAllocError::Layout)?;
579                layout
580                    .alloc()
581                    .with_policy::<Policy>()
582                    .await
583                    .map_err(ZLayoutAllocError::Alloc)
584            }
585            .into_future(),
586        )
587    }
588}
589
590/// Builder for making allocations through precalculated Layout
591#[zenoh_macros::unstable_doc]
592pub struct LayoutAllocBuilder<
593    'a,
594    IDSource: ProtocolIDSource,
595    Backend: ShmProviderBackend,
596    Policy = JustAlloc,
597> {
598    layout: &'a AllocLayout<'a, IDSource, Backend>,
599    _phantom: PhantomData<Policy>,
600}
601
602// Generic impl
603impl<'a, IDSource, Backend, Policy> LayoutAllocBuilder<'a, IDSource, Backend, Policy>
604where
605    IDSource: ProtocolIDSource,
606    Backend: ShmProviderBackend,
607{
608    /// Set the allocation policy
609    #[zenoh_macros::unstable_doc]
610    pub fn with_policy<OtherPolicy>(
611        self,
612    ) -> LayoutAllocBuilder<'a, IDSource, Backend, OtherPolicy> {
613        LayoutAllocBuilder {
614            layout: self.layout,
615            _phantom: PhantomData,
616        }
617    }
618}
619
620impl<IDSource, Backend, Policy> Resolvable for LayoutAllocBuilder<'_, IDSource, Backend, Policy>
621where
622    IDSource: ProtocolIDSource,
623    Backend: ShmProviderBackend,
624{
625    type To = BufAllocResult;
626}
627
628// Sync alloc policy
629impl<IDSource, Backend, Policy> Wait for LayoutAllocBuilder<'_, IDSource, Backend, Policy>
630where
631    IDSource: ProtocolIDSource,
632    Backend: ShmProviderBackend,
633    Policy: AllocPolicy,
634{
635    fn wait(self) -> <Self as Resolvable>::To {
636        self.layout
637            .provider
638            .alloc_inner::<Policy>(self.layout.size, &self.layout.provider_layout)
639    }
640}
641
642// Async alloc policy
643impl<'a, IDSource, Backend, Policy> IntoFuture for LayoutAllocBuilder<'a, IDSource, Backend, Policy>
644where
645    IDSource: ProtocolIDSource,
646    Backend: ShmProviderBackend + Sync,
647    Policy: AsyncAllocPolicy,
648{
649    type Output = <Self as Resolvable>::To;
650    type IntoFuture = Pin<Box<dyn Future<Output = <Self as Resolvable>::To> + 'a + Send>>;
651
652    fn into_future(self) -> Self::IntoFuture {
653        Box::pin(
654            async move {
655                self.layout
656                    .provider
657                    .alloc_inner_async::<Policy>(self.layout.size, &self.layout.provider_layout)
658                    .await
659            }
660            .into_future(),
661        )
662    }
663}
664
665#[zenoh_macros::unstable_doc]
666pub struct ShmProviderBuilder;
667impl ShmProviderBuilder {
668    /// Get the builder to construct ShmProvider
669    #[zenoh_macros::unstable_doc]
670    pub fn builder() -> Self {
671        Self
672    }
673
674    /// Set compile-time-evaluated protocol ID (preferred)
675    #[zenoh_macros::unstable_doc]
676    pub fn protocol_id<const ID: ProtocolID>(self) -> ShmProviderBuilderID<StaticProtocolID<ID>> {
677        ShmProviderBuilderID::<StaticProtocolID<ID>> {
678            id: StaticProtocolID,
679        }
680    }
681
682    /// Set runtime-evaluated protocol ID
683    #[zenoh_macros::unstable_doc]
684    pub fn dynamic_protocol_id(self, id: ProtocolID) -> ShmProviderBuilderID<DynamicProtocolID> {
685        ShmProviderBuilderID::<DynamicProtocolID> {
686            id: DynamicProtocolID::new(id),
687        }
688    }
689}
690
691#[zenoh_macros::unstable_doc]
692pub struct ShmProviderBuilderID<IDSource: ProtocolIDSource> {
693    id: IDSource,
694}
695impl<IDSource: ProtocolIDSource> ShmProviderBuilderID<IDSource> {
696    /// Set the backend
697    #[zenoh_macros::unstable_doc]
698    pub fn backend<Backend: ShmProviderBackend>(
699        self,
700        backend: Backend,
701    ) -> ShmProviderBuilderBackendID<IDSource, Backend> {
702        ShmProviderBuilderBackendID {
703            backend,
704            id: self.id,
705        }
706    }
707}
708
709#[zenoh_macros::unstable_doc]
710pub struct ShmProviderBuilderBackendID<IDSource, Backend>
711where
712    IDSource: ProtocolIDSource,
713    Backend: ShmProviderBackend,
714{
715    backend: Backend,
716    id: IDSource,
717}
718#[zenoh_macros::unstable_doc]
719impl<IDSource, Backend> Resolvable for ShmProviderBuilderBackendID<IDSource, Backend>
720where
721    IDSource: ProtocolIDSource,
722    Backend: ShmProviderBackend,
723{
724    type To = ShmProvider<IDSource, Backend>;
725}
726
727#[zenoh_macros::unstable_doc]
728impl<IDSource, Backend> Wait for ShmProviderBuilderBackendID<IDSource, Backend>
729where
730    IDSource: ProtocolIDSource,
731    Backend: ShmProviderBackend,
732{
733    /// build ShmProvider
734    fn wait(self) -> <Self as Resolvable>::To {
735        ShmProvider::new(self.backend, self.id)
736    }
737}
738
739/// Trait to create ProtocolID sources for ShmProvider
740#[zenoh_macros::unstable_doc]
741pub trait ProtocolIDSource: Send + Sync {
742    fn id(&self) -> ProtocolID;
743}
744
745/// Static ProtocolID source. This is a recommended API to set ProtocolID
746/// when creating ShmProvider as the ID value is statically evaluated
747/// at compile-time and can be optimized.
748#[zenoh_macros::unstable_doc]
749#[derive(Default)]
750pub struct StaticProtocolID<const ID: ProtocolID>;
751impl<const ID: ProtocolID> ProtocolIDSource for StaticProtocolID<ID> {
752    fn id(&self) -> ProtocolID {
753        ID
754    }
755}
756
757/// Dynamic ProtocolID source. This is an alternative API to set ProtocolID
758/// when creating ShmProvider for cases where ProtocolID is unknown
759/// at compile-time.
760#[zenoh_macros::unstable_doc]
761pub struct DynamicProtocolID {
762    id: ProtocolID,
763}
764impl DynamicProtocolID {
765    #[zenoh_macros::unstable_doc]
766    pub fn new(id: ProtocolID) -> Self {
767        Self { id }
768    }
769}
770impl ProtocolIDSource for DynamicProtocolID {
771    fn id(&self) -> ProtocolID {
772        self.id
773    }
774}
775unsafe impl Send for DynamicProtocolID {}
776unsafe impl Sync for DynamicProtocolID {}
777
778/// A generalized interface for shared memory data sources
779#[zenoh_macros::unstable_doc]
780#[derive(Debug)]
781pub struct ShmProvider<IDSource, Backend>
782where
783    IDSource: ProtocolIDSource,
784    Backend: ShmProviderBackend,
785{
786    backend: Backend,
787    busy_list: Mutex<VecDeque<BusyChunk>>,
788    id: IDSource,
789}
790
791impl<IDSource, Backend> ShmProvider<IDSource, Backend>
792where
793    IDSource: ProtocolIDSource,
794    Backend: ShmProviderBackend,
795{
796    /// Rich interface for making allocations
797    #[zenoh_macros::unstable_doc]
798    pub fn alloc(&self, size: usize) -> AllocLayoutSizedBuilder<IDSource, Backend> {
799        AllocLayoutSizedBuilder::new(self, size)
800    }
801
802    /// Defragment memory
803    #[zenoh_macros::unstable_doc]
804    pub fn defragment(&self) -> usize {
805        self.backend.defragment()
806    }
807
808    /// Map externally-allocated chunk into ZShmMut.
809    /// This method is designed to be used with push data sources.
810    /// Remember that chunk's len may be >= len!
811    #[zenoh_macros::unstable_doc]
812    pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult<ZShmMut> {
813        let len = len.try_into()?;
814
815        // allocate resources for SHM buffer
816        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
817
818        // wrap everything to ShmBufInner
819        let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
820        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
821    }
822
823    /// Try to collect free chunks.
824    /// Returns the size of largest collected chunk
825    #[zenoh_macros::unstable_doc]
826    pub fn garbage_collect(&self) -> usize {
827        fn is_free_chunk(chunk: &BusyChunk) -> bool {
828            let header = chunk.metadata.header();
829            if header.refcount.load(Ordering::SeqCst) != 0 {
830                return header.watchdog_invalidated.load(Ordering::SeqCst);
831            }
832            true
833        }
834
835        tracing::trace!("Running Garbage Collector");
836
837        let mut largest = 0usize;
838        let mut guard = self.busy_list.lock().unwrap();
839        guard.retain(|maybe_free| {
840            if is_free_chunk(maybe_free) {
841                tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free);
842                let descriptor_to_free = maybe_free.descriptor();
843                self.backend.free(&descriptor_to_free);
844                largest = largest.max(descriptor_to_free.len.get());
845                return false;
846            }
847            true
848        });
849        drop(guard);
850
851        largest
852    }
853
854    /// Bytes available for use
855    #[zenoh_macros::unstable_doc]
856    pub fn available(&self) -> usize {
857        self.backend.available()
858    }
859}
860
861// PRIVATE impls
862impl<IDSource, Backend> ShmProvider<IDSource, Backend>
863where
864    IDSource: ProtocolIDSource,
865    Backend: ShmProviderBackend,
866{
867    fn new(backend: Backend, id: IDSource) -> Self {
868        Self {
869            backend,
870            busy_list: Mutex::new(VecDeque::default()),
871            id,
872        }
873    }
874
875    fn alloc_inner<Policy>(&self, size: NonZeroUsize, layout: &MemoryLayout) -> BufAllocResult
876    where
877        Policy: AllocPolicy,
878    {
879        // allocate resources for SHM buffer
880        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
881
882        // allocate data chunk
883        // Perform actions depending on the Policy
884        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
885        // Don't loose this chunk as it leads to memory leak at the backend side!
886        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
887        // and it is necessary to handle that properly and pass this len to corresponding free(...)
888        let chunk = Policy::alloc(layout, self)?;
889
890        // wrap allocated chunk to ShmBufInner
891        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
892        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
893    }
894
895    fn alloc_resources() -> ZResult<(AllocatedMetadataDescriptor, ConfirmedDescriptor)> {
896        // allocate metadata
897        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
898
899        // add watchdog to confirmator
900        let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
901
902        Ok((allocated_metadata, confirmed_metadata))
903    }
904
905    fn wrap(
906        &self,
907        chunk: AllocatedChunk,
908        len: NonZeroUsize,
909        allocated_metadata: AllocatedMetadataDescriptor,
910        confirmed_metadata: ConfirmedDescriptor,
911    ) -> ShmBufInner {
912        // write additional metadata
913        // chunk descriptor
914        allocated_metadata
915            .header()
916            .set_data_descriptor(&chunk.descriptor);
917        // protocol
918        allocated_metadata
919            .header()
920            .protocol
921            .store(self.id.id(), Ordering::Relaxed);
922
923        // add watchdog to validator
924        GLOBAL_VALIDATOR
925            .read()
926            .add(confirmed_metadata.owned.clone());
927
928        // Create buffer's info
929        let info = ShmBufInfo::new(
930            len,
931            MetadataDescriptor::from(&confirmed_metadata.owned),
932            allocated_metadata
933                .header()
934                .generation
935                .load(Ordering::SeqCst),
936        );
937
938        // Create buffer
939        let shmb = ShmBufInner {
940            metadata: Arc::new(confirmed_metadata),
941            buf: chunk.data,
942            info,
943        };
944
945        // Create and store busy chunk
946        self.busy_list
947            .lock()
948            .unwrap()
949            .push_back(BusyChunk::new(allocated_metadata));
950
951        shmb
952    }
953}
954
955// PRIVATE impls for Sync backend
956impl<IDSource, Backend> ShmProvider<IDSource, Backend>
957where
958    IDSource: ProtocolIDSource,
959    Backend: ShmProviderBackend + Sync,
960{
961    async fn alloc_inner_async<Policy>(
962        &self,
963        size: NonZeroUsize,
964        backend_layout: &MemoryLayout,
965    ) -> BufAllocResult
966    where
967        Policy: AsyncAllocPolicy,
968    {
969        // allocate resources for SHM buffer
970        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
971
972        // allocate data chunk
973        // Perform actions depending on the Policy
974        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
975        // Don't loose this chunk as it leads to memory leak at the backend side!
976        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
977        // and it is necessary to handle that properly and pass this len to corresponding free(...)
978        let chunk = Policy::alloc_async(backend_layout, self).await?;
979
980        // wrap allocated chunk to ShmBufInner
981        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
982        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
983    }
984}