Skip to main content

zenoh_shm/api/provider/
shm_provider.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{
16    error::Error,
17    future::{Future, IntoFuture},
18    marker::PhantomData,
19    mem::MaybeUninit,
20    num::NonZeroUsize,
21    pin::Pin,
22    sync::{atomic::Ordering, Mutex},
23    time::Duration,
24};
25
26use async_trait::async_trait;
27use zenoh_core::{zlock, zresult::ZResult, Resolvable, Wait};
28
29use super::{
30    chunk::{AllocatedChunk, ChunkDescriptor},
31    shm_provider_backend::ShmProviderBackend,
32    types::{ChunkAllocResult, ZAllocError, ZLayoutAllocError, ZLayoutError},
33};
34use crate::{
35    api::{
36        buffer::{typed::Typed, zshmmut::ZShmMut},
37        protocol_implementations::posix::posix_shm_provider_backend::{
38            PosixShmProviderBackend, PosixShmProviderBackendBuilder,
39        },
40        provider::memory_layout::{MemoryLayout, TypedLayout},
41    },
42    metadata::{
43        allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
44        storage::GLOBAL_METADATA_STORAGE,
45    },
46    watchdog::{
47        confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
48        validator::GLOBAL_VALIDATOR,
49    },
50    ShmBufInfo, ShmBufInner,
51};
52
53/// The type of allocation.
54pub trait AllocLayout {
55    /// The buffer returned by the allocation.
56    type Buffer;
57    /// Returns the memory layouts of the allocation.
58    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError>;
59    /// Wraps the raw allocated buffer.
60    ///
61    /// # Safety
62    ///
63    /// The buffer must have the layout returned by [`Self::memory_layout`]
64    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer;
65}
66
67impl<T: TryInto<MemoryLayout>> AllocLayout for T
68where
69    T::Error: Into<ZLayoutError>,
70{
71    type Buffer = ZShmMut;
72
73    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
74        self.try_into().map_err(Into::into)
75    }
76
77    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
78        buffer
79    }
80}
81
82impl<T> AllocLayout for TypedLayout<T> {
83    type Buffer = Typed<MaybeUninit<T>, ZShmMut>;
84
85    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
86        Ok(MemoryLayout::for_type::<T>())
87    }
88
89    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
90        // SAFETY: same precondition
91        unsafe { Typed::new_unchecked(buffer) }
92    }
93}
94
95/// A layout for allocations.
96///
97/// This is a pre-calculated layout suitable for making series of similar allocations
98/// adopted for particular ShmProvider
99#[zenoh_macros::unstable_doc]
100#[derive(Debug)]
101pub struct PrecomputedLayout<'a, Backend, Layout> {
102    size: NonZeroUsize,
103    provider_layout: MemoryLayout,
104    provider: &'a ShmProvider<Backend>,
105    _phantom: PhantomData<Layout>,
106}
107
108impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
109where
110    Backend: ShmProviderBackend,
111{
112    /// Allocate the new buffer with this layout
113    #[zenoh_macros::unstable_doc]
114    pub fn alloc<'b>(&'b self) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout> {
115        PrecomputedAllocBuilder {
116            layout: self,
117            policy: JustAlloc,
118        }
119    }
120}
121
122impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
123where
124    Backend: ShmProviderBackend,
125{
126    fn new(
127        provider: &'a ShmProvider<Backend>,
128        required_layout: MemoryLayout,
129    ) -> Result<Self, ZLayoutError> {
130        // NOTE: Depending on internal implementation, provider's backend might relayout
131        // the allocations for bigger alignment (ex. 4-byte aligned allocation to 8-bytes aligned)
132
133        // calculate required layout size
134        let size = required_layout.size();
135
136        // Obtain provider's layout for our layout
137        let provider_layout = provider
138            .backend
139            .layout_for(required_layout)
140            .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
141
142        Ok(Self {
143            size,
144            provider_layout,
145            provider,
146            _phantom: PhantomData,
147        })
148    }
149}
150
151/// Trait for allocation policies
152#[zenoh_macros::unstable_doc]
153pub trait AllocPolicy<Backend> {
154    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult;
155}
156
157/// Trait for async allocation policies
158#[zenoh_macros::unstable_doc]
159#[async_trait]
160pub trait AsyncAllocPolicy<Backend>: Send {
161    async fn alloc_async(
162        &self,
163        layout: &MemoryLayout,
164        provider: &ShmProvider<Backend>,
165    ) -> ChunkAllocResult;
166}
167
/// Marker trait for allocation policies that are safe to use.
///
/// # Safety
///
/// A policy implementing this trait must be safe to use. [`Deallocate`], for example, is not:
/// it may deallocate and reuse a buffer that is currently in use. Users of an unsafe policy
/// must enforce the safety of their code by other means.
pub unsafe trait SafePolicy {}
176
177/// Marker trait for compile-time allocation policies
178#[zenoh_macros::unstable_doc]
179pub trait ConstPolicy {
180    const NEW: Self;
181}
182
/// A policy parameter whose value is either a compile-time constant or provided at runtime.
pub trait PolicyValue<T> {
    /// Returns the value of the parameter.
    fn get(&self) -> T;
}

/// Every `Copy` value acts as its own (runtime provided) policy value.
impl<T> PolicyValue<T> for T
where
    T: Copy,
{
    fn get(&self) -> T {
        let value: T = *self;
        value
    }
}
193/// A const `bool`.
194pub struct ConstBool<const VALUE: bool>;
195impl<const VALUE: bool> ConstPolicy for ConstBool<VALUE> {
196    const NEW: Self = Self;
197}
198impl<const VALUE: bool> PolicyValue<bool> for ConstBool<VALUE> {
199    fn get(&self) -> bool {
200        VALUE
201    }
202}
203/// A const `usize`.
204pub struct ConstUsize<const VALUE: usize>;
205impl<const VALUE: usize> ConstPolicy for ConstUsize<VALUE> {
206    const NEW: Self = Self;
207}
208impl<const VALUE: usize> PolicyValue<usize> for ConstUsize<VALUE> {
209    fn get(&self) -> usize {
210        VALUE
211    }
212}
213
214/// Just try to allocate
215#[zenoh_macros::unstable_doc]
216#[derive(Clone, Copy)]
217pub struct JustAlloc;
218impl<Backend> AllocPolicy<Backend> for JustAlloc
219where
220    Backend: ShmProviderBackend,
221{
222    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
223        provider.backend.alloc(layout)
224    }
225}
226unsafe impl SafePolicy for JustAlloc {}
227impl ConstPolicy for JustAlloc {
228    const NEW: Self = Self;
229}
230
231/// Garbage collection policy.
232///
233/// Try to reclaim old buffers if allocation failed and allocate again
234/// if the largest reclaimed chuk is not smaller than the one required.
235///
236/// If `Safe` is set to false, the policy may lead to reallocate memory currently in-use.
237#[zenoh_macros::unstable_doc]
238#[derive(Clone, Copy)]
239pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc, Safe = ConstBool<true>> {
240    inner_policy: InnerPolicy,
241    alt_policy: AltPolicy,
242    safe: Safe,
243}
244impl<InnerPolicy, AltPolicy, Safe> GarbageCollect<InnerPolicy, AltPolicy, Safe> {
245    /// Creates a new `GarbageCollect` policy.
246    pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy, safe: Safe) -> Self {
247        Self {
248            inner_policy,
249            alt_policy,
250            safe,
251        }
252    }
253}
254impl<Backend, InnerPolicy, AltPolicy, Safe> AllocPolicy<Backend>
255    for GarbageCollect<InnerPolicy, AltPolicy, Safe>
256where
257    Backend: ShmProviderBackend,
258    InnerPolicy: AllocPolicy<Backend>,
259    AltPolicy: AllocPolicy<Backend>,
260    Safe: PolicyValue<bool>,
261{
262    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
263        let result = self.inner_policy.alloc(layout, provider);
264        if result.is_err() {
265            // try to alloc again only if GC managed to reclaim big enough chunk
266            let collected = if self.safe.get() {
267                provider.garbage_collect()
268            } else {
269                // SAFETY: the policy doesn't implement `SafePolicy` in this case, so the user
270                // must enforce the safety himself
271                unsafe { provider.garbage_collect_unsafe() }
272            };
273            if collected >= layout.size().get() {
274                return self.alt_policy.alloc(layout, provider);
275            }
276        }
277        result
278    }
279}
280unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
281    for GarbageCollect<InnerPolicy, AltPolicy, ConstBool<true>>
282{
283}
284impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy, Safe: ConstPolicy> ConstPolicy
285    for GarbageCollect<InnerPolicy, AltPolicy, Safe>
286{
287    const NEW: Self = Self {
288        inner_policy: InnerPolicy::NEW,
289        alt_policy: AltPolicy::NEW,
290        safe: Safe::NEW,
291    };
292}
293
294/// Defragmenting policy.
295///
296/// Try to defragment if allocation failed and allocate again
297/// if the largest defragmented chuk is not smaller than the one required
298#[zenoh_macros::unstable_doc]
299#[derive(Clone, Copy)]
300pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc> {
301    inner_policy: InnerPolicy,
302    alt_policy: AltPolicy,
303}
304impl<InnerPolicy, AltPolicy> Defragment<InnerPolicy, AltPolicy> {
305    /// Creates a new `Defragment` policy.
306    pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
307        Self {
308            inner_policy,
309            alt_policy,
310        }
311    }
312}
313impl<Backend, InnerPolicy, AltPolicy> AllocPolicy<Backend> for Defragment<InnerPolicy, AltPolicy>
314where
315    Backend: ShmProviderBackend,
316    InnerPolicy: AllocPolicy<Backend>,
317    AltPolicy: AllocPolicy<Backend>,
318{
319    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
320        let result = self.inner_policy.alloc(layout, provider);
321        if let Err(ZAllocError::NeedDefragment) = result {
322            // try to alloc again only if big enough chunk was defragmented
323            if provider.defragment() >= layout.size().get() {
324                return self.alt_policy.alloc(layout, provider);
325            }
326        }
327        result
328    }
329}
330unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
331    for Defragment<InnerPolicy, AltPolicy>
332{
333}
334impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
335    for Defragment<InnerPolicy, AltPolicy>
336{
337    const NEW: Self = Self {
338        inner_policy: InnerPolicy::NEW,
339        alt_policy: AltPolicy::NEW,
340    };
341}
342
343/// Deallocating policy.
344///
345/// Forcibly deallocate up to N buffers until allocation succeeds.
346///
347/// This policy is unsafe as it may lead to reallocate memory currently in-use.
348#[zenoh_macros::unstable_doc]
349#[derive(Clone, Copy)]
350pub struct Deallocate<Limit, InnerPolicy = JustAlloc, AltPolicy = InnerPolicy> {
351    limit: Limit,
352    inner_policy: InnerPolicy,
353    alt_policy: AltPolicy,
354}
355impl<Limit, InnerPolicy, AltPolicy> Deallocate<Limit, InnerPolicy, AltPolicy> {
356    /// Creates a new `Deallocate` policy.
357    pub fn new(limit: Limit, inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
358        Self {
359            limit,
360            inner_policy,
361            alt_policy,
362        }
363    }
364}
365impl<Backend, Limit, InnerPolicy, AltPolicy> AllocPolicy<Backend>
366    for Deallocate<Limit, InnerPolicy, AltPolicy>
367where
368    Backend: ShmProviderBackend,
369    Limit: PolicyValue<usize>,
370    InnerPolicy: AllocPolicy<Backend>,
371    AltPolicy: AllocPolicy<Backend>,
372{
373    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
374        for _ in 0..self.limit.get() {
375            match self.inner_policy.alloc(layout, provider) {
376                res @ (Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory)) => {
377                    let Some(chunk) = zlock!(provider.busy_list).pop() else {
378                        return res;
379                    };
380                    provider.backend.free(&chunk.descriptor());
381                }
382                res => return res,
383            }
384        }
385        self.alt_policy.alloc(layout, provider)
386    }
387}
388impl<Limit: ConstPolicy, InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
389    for Deallocate<Limit, InnerPolicy, AltPolicy>
390{
391    const NEW: Self = Self {
392        limit: Limit::NEW,
393        inner_policy: InnerPolicy::NEW,
394        alt_policy: AltPolicy::NEW,
395    };
396}
397
398/// Blocking allocation policy.
399///
400/// This policy will block until the allocation succeeds.
401/// Both sync and async modes available.
402#[zenoh_macros::unstable_doc]
403#[derive(Clone, Copy)]
404pub struct BlockOn<InnerPolicy = JustAlloc> {
405    inner_policy: InnerPolicy,
406}
407impl<InnerPolicy> BlockOn<InnerPolicy> {
408    /// Creates a new `BlockOn` policy.
409    pub fn new(inner_policy: InnerPolicy) -> Self {
410        Self { inner_policy }
411    }
412}
413#[async_trait]
414impl<Backend, InnerPolicy> AsyncAllocPolicy<Backend> for BlockOn<InnerPolicy>
415where
416    Backend: ShmProviderBackend + Sync,
417    InnerPolicy: AllocPolicy<Backend> + Send + Sync,
418{
419    async fn alloc_async(
420        &self,
421        layout: &MemoryLayout,
422        provider: &ShmProvider<Backend>,
423    ) -> ChunkAllocResult {
424        loop {
425            match self.inner_policy.alloc(layout, provider) {
426                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
427                    // TODO: implement provider's async signalling instead of this!
428                    tokio::time::sleep(Duration::from_millis(1)).await;
429                }
430                other_result => {
431                    return other_result;
432                }
433            }
434        }
435    }
436}
437unsafe impl<InnerPolicy: SafePolicy> SafePolicy for BlockOn<InnerPolicy> {}
438impl<Backend, InnerPolicy> AllocPolicy<Backend> for BlockOn<InnerPolicy>
439where
440    Backend: ShmProviderBackend,
441    InnerPolicy: AllocPolicy<Backend>,
442{
443    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
444        loop {
445            match self.inner_policy.alloc(layout, provider) {
446                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
447                    // TODO: implement provider's async signalling instead of this!
448                    std::thread::sleep(Duration::from_millis(1));
449                }
450                other_result => {
451                    return other_result;
452                }
453            }
454        }
455    }
456}
457impl<InnerPolicy: ConstPolicy> ConstPolicy for BlockOn<InnerPolicy> {
458    const NEW: Self = Self {
459        inner_policy: InnerPolicy::NEW,
460    };
461}
462
463#[zenoh_macros::unstable_doc]
464pub struct AllocBuilder<'a, Backend, Layout, Policy = JustAlloc> {
465    provider: &'a ShmProvider<Backend>,
466    layout: Layout,
467    policy: Policy,
468}
469
470// Generic impl
471impl<'a, Backend, Layout, Policy> AllocBuilder<'a, Backend, Layout, Policy> {
472    /// Set the allocation policy
473    #[zenoh_macros::unstable_doc]
474    pub fn with_policy<OtherPolicy: SafePolicy + ConstPolicy>(
475        self,
476    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
477        // SAFETY: the policy is safe
478        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
479    }
480
481    /// Set the unsafe allocation policy
482    ///
483    /// # Safety
484    ///
485    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
486    #[zenoh_macros::unstable_doc]
487    pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
488        self,
489    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
490        // SAFETY: same function contract
491        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
492    }
493
494    /// Set the unsafe allocation policy at runtime
495    ///
496    /// # Safety
497    ///
498    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
499    #[zenoh_macros::unstable_doc]
500    pub unsafe fn with_runtime_policy<OtherPolicy>(
501        self,
502        policy: OtherPolicy,
503    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
504        AllocBuilder {
505            provider: self.provider,
506            layout: self.layout,
507            policy,
508        }
509    }
510
511    fn layout_policy(self) -> Result<(PrecomputedLayout<'a, Backend, Layout>, Policy), ZLayoutError>
512    where
513        Backend: ShmProviderBackend,
514        Layout: AllocLayout,
515    {
516        let layout = PrecomputedLayout::new(self.provider, self.layout.memory_layout()?)?;
517        Ok((layout, self.policy))
518    }
519}
520
521impl<'a, Backend: ShmProviderBackend, Layout: TryInto<MemoryLayout>>
522    AllocBuilder<'a, Backend, Layout>
523where
524    Layout::Error: Into<ZLayoutError>,
525{
526    /// Try to build an allocation layout
527    #[deprecated(
528        since = "1.5.3",
529        note = "Please use `ShmProvider::alloc_layout` method instead"
530    )]
531    #[zenoh_macros::unstable_doc]
532    pub fn into_layout(self) -> Result<PrecomputedLayout<'a, Backend, Layout>, ZLayoutError> {
533        PrecomputedLayout::new(self.provider, self.layout.try_into().map_err(Into::into)?)
534    }
535}
536
537impl<Backend, Layout: AllocLayout, Policy> Resolvable
538    for AllocBuilder<'_, Backend, Layout, Policy>
539{
540    type To = Result<Layout::Buffer, ZLayoutAllocError>;
541}
542
543impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
544    for AllocBuilder<'_, Backend, Layout, Policy>
545{
546    fn wait(self) -> <Self as Resolvable>::To {
547        let (layout, policy) = self.layout_policy()?;
548        // SAFETY: same contract used to set the policy
549        Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.wait()?)
550    }
551}
552
553impl<
554        'a,
555        Backend: ShmProviderBackend + Sync,
556        Layout: AllocLayout + Send + Sync + 'a,
557        Policy: AsyncAllocPolicy<Backend> + Sync + 'a,
558    > IntoFuture for AllocBuilder<'a, Backend, Layout, Policy>
559{
560    type Output = <Self as Resolvable>::To;
561    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
562
563    fn into_future(self) -> Self::IntoFuture {
564        Box::pin(async move {
565            let (layout, policy) = self.layout_policy()?;
566            // SAFETY: same contract used to set the policy
567            Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.await?)
568        })
569    }
570}
571
572/// Builder for making allocations with instant layout calculation
573#[zenoh_macros::unstable_doc]
574pub struct PrecomputedAllocBuilder<'b, 'a: 'b, Backend, Layout, Policy = JustAlloc> {
575    layout: &'b PrecomputedLayout<'a, Backend, Layout>,
576    policy: Policy,
577}
578
579// Generic impl
580impl<'b, 'a: 'b, Backend, Layout, Policy> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy> {
581    /// Set the allocation policy
582    #[zenoh_macros::unstable_doc]
583    pub fn with_policy<OtherPolicy: ConstPolicy>(
584        self,
585    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
586        // SAFETY: the policy is safe
587        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
588    }
589
590    /// Set the unsafe allocation policy
591    ///
592    /// # Safety
593    ///
594    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
595    #[zenoh_macros::unstable_doc]
596    pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
597        self,
598    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
599        // SAFETY: same function contract
600        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
601    }
602
603    /// Set the unsafe allocation policy at runtime
604    ///
605    /// # Safety
606    ///
607    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
608    #[zenoh_macros::unstable_doc]
609    pub unsafe fn with_runtime_policy<OtherPolicy>(
610        self,
611        policy: OtherPolicy,
612    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
613        PrecomputedAllocBuilder {
614            layout: self.layout,
615            policy,
616        }
617    }
618}
619
620impl<Backend, Layout: AllocLayout, Policy> Resolvable
621    for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
622{
623    type To = Result<Layout::Buffer, ZAllocError>;
624}
625
626impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
627    for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
628{
629    fn wait(self) -> <Self as Resolvable>::To {
630        let buffer = self.layout.provider.alloc_inner(
631            self.layout.size,
632            &self.layout.provider_layout,
633            &self.policy,
634        )?;
635        // SAFETY: the buffer has been allocated with the requested layout
636        Ok(unsafe { Layout::wrap_buffer(buffer) })
637    }
638}
639
640impl<
641        'b,
642        'a: 'b,
643        Backend: ShmProviderBackend + Sync,
644        Layout: AllocLayout + Sync,
645        Policy: AsyncAllocPolicy<Backend> + Sync + 'b,
646    > IntoFuture for PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy>
647{
648    type Output = <Self as Resolvable>::To;
649    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'b + Send>>;
650
651    fn into_future(self) -> Self::IntoFuture {
652        Box::pin(async move {
653            let buffer = self
654                .layout
655                .provider
656                .alloc_inner_async(self.layout.size, &self.layout.provider_layout, &self.policy)
657                .await?;
658            // SAFETY: the buffer has been allocated with the requested layout
659            Ok(unsafe { Layout::wrap_buffer(buffer) })
660        })
661    }
662}
663
664#[zenoh_macros::unstable_doc]
665pub struct ShmProviderBuilder;
666impl ShmProviderBuilder {
667    /// Set the backend
668    #[zenoh_macros::unstable_doc]
669    pub fn backend<Backend: ShmProviderBackend>(
670        backend: Backend,
671    ) -> ShmProviderBuilderBackend<Backend> {
672        ShmProviderBuilderBackend { backend }
673    }
674
675    /// Set the default backend
676    #[zenoh_macros::unstable_doc]
677    pub fn default_backend<Layout>(layout: Layout) -> ShmProviderBuilderWithDefaultBackend<Layout> {
678        ShmProviderBuilderWithDefaultBackend { layout }
679    }
680}
681
682#[zenoh_macros::unstable_doc]
683pub struct ShmProviderBuilderBackend<Backend>
684where
685    Backend: ShmProviderBackend,
686{
687    backend: Backend,
688}
689#[zenoh_macros::unstable_doc]
690impl<Backend> Resolvable for ShmProviderBuilderBackend<Backend>
691where
692    Backend: ShmProviderBackend,
693{
694    type To = ShmProvider<Backend>;
695}
696
697#[zenoh_macros::unstable_doc]
698impl<Backend> Wait for ShmProviderBuilderBackend<Backend>
699where
700    Backend: ShmProviderBackend,
701{
702    /// build ShmProvider
703    fn wait(self) -> <Self as Resolvable>::To {
704        ShmProvider::new(self.backend)
705    }
706}
707
708#[zenoh_macros::unstable_doc]
709pub struct ShmProviderBuilderWithDefaultBackend<Layout> {
710    layout: Layout,
711}
712
713#[zenoh_macros::unstable_doc]
714impl<Layout> Resolvable for ShmProviderBuilderWithDefaultBackend<Layout> {
715    type To = ZResult<ShmProvider<PosixShmProviderBackend>>;
716}
717
718#[zenoh_macros::unstable_doc]
719impl<Layout: TryInto<MemoryLayout>> Wait for ShmProviderBuilderWithDefaultBackend<Layout>
720where
721    Layout::Error: Error,
722    PosixShmProviderBackendBuilder<Layout>:
723        Resolvable<To = ZResult<PosixShmProviderBackend>> + Wait,
724{
725    /// build ShmProvider
726    fn wait(self) -> <Self as Resolvable>::To {
727        let backend = PosixShmProviderBackend::builder(self.layout).wait()?;
728        Ok(ShmProvider::new(backend))
729    }
730}
731
732#[derive(Debug)]
733struct BusyChunk {
734    metadata: AllocatedMetadataDescriptor,
735}
736
737impl BusyChunk {
738    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
739        Self { metadata }
740    }
741
742    fn descriptor(&self) -> ChunkDescriptor {
743        self.metadata.header().data_descriptor()
744    }
745}
746
747/// A generalized interface for shared memory data sources
748#[zenoh_macros::unstable_doc]
749#[derive(Debug)]
750pub struct ShmProvider<Backend> {
751    backend: Backend,
752    busy_list: Mutex<Vec<BusyChunk>>,
753}
754
755impl<Backend: Default> Default for ShmProvider<Backend> {
756    fn default() -> Self {
757        Self::new(Backend::default())
758    }
759}
760
761impl<Backend> ShmProvider<Backend>
762where
763    Backend: ShmProviderBackend,
764{
765    /// Allocates a buffer with a given memory layout.
766    #[zenoh_macros::unstable_doc]
767    pub fn alloc<Layout>(&self, layout: Layout) -> AllocBuilder<'_, Backend, Layout> {
768        AllocBuilder {
769            provider: self,
770            layout,
771            policy: JustAlloc,
772        }
773    }
774
775    /// Precompute the actual layout for an allocation.
776    #[zenoh_macros::unstable_doc]
777    pub fn alloc_layout<Layout: TryInto<MemoryLayout>>(
778        &self,
779        layout: Layout,
780    ) -> Result<PrecomputedLayout<'_, Backend, Layout>, ZLayoutError>
781    where
782        Layout::Error: Into<ZLayoutError>,
783    {
784        PrecomputedLayout::new(self, layout.try_into().map_err(Into::into)?)
785    }
786    /// Defragment memory
787    #[zenoh_macros::unstable_doc]
788    pub fn defragment(&self) -> usize {
789        self.backend.defragment()
790    }
791
792    /// Map externally-allocated chunk into ZShmMut.
793    ///
794    /// This method is designed to be used with push data sources.
795    /// Remember that chunk's len may be >= len!
796    #[zenoh_macros::unstable_doc]
797    pub fn map(&self, chunk: AllocatedChunk, len: usize) -> Result<ZShmMut, ZAllocError> {
798        let len = len.try_into().map_err(|_| ZAllocError::Other)?;
799
800        // allocate resources for SHM buffer
801        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
802
803        // wrap everything to ShmBufInner
804        let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
805        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
806    }
807
808    #[zenoh_macros::unstable_doc]
809    fn garbage_collect_impl<const SAFE: bool>(&self) -> usize {
810        fn retain_unordered<T>(vec: &mut Vec<T>, mut f: impl FnMut(&T) -> bool) {
811            let mut i = 0;
812            while i < vec.len() {
813                if f(&vec[i]) {
814                    i += 1;
815                } else {
816                    vec.swap_remove(i); // move last into place of vec[i]
817                                        // don't increment i: need to test the swapped-in element
818                }
819            }
820        }
821
822        tracing::trace!("Running Garbage Collector");
823        let mut largest = 0usize;
824        retain_unordered(&mut zlock!(self.busy_list), |chunk| {
825            let header = chunk.metadata.header();
826            if header.refcount.load(Ordering::SeqCst) == 0
827                || (!SAFE && header.watchdog_invalidated.load(Ordering::SeqCst))
828            {
829                tracing::trace!("Garbage Collecting Chunk: {:?}", chunk);
830                let descriptor_to_free = chunk.descriptor();
831                self.backend.free(&descriptor_to_free);
832                largest = largest.max(descriptor_to_free.len.get());
833                return false;
834            }
835            true
836        });
837        largest
838    }
839
840    /// Try to collect free chunks.
841    ///
842    /// Returns the size of largest collected chunk
843    #[zenoh_macros::unstable_doc]
844    pub fn garbage_collect(&self) -> usize {
845        self.garbage_collect_impl::<true>()
846    }
847
848    /// Try to collect free chunks.
849    ///
850    /// Returns the size of largest collected chunk
851    ///
852    /// # Safety
853    ///
854    /// User must ensure there is no data races with collected chunks, as some of them may still be in-use.
855    #[zenoh_macros::unstable_doc]
856    pub unsafe fn garbage_collect_unsafe(&self) -> usize {
857        self.garbage_collect_impl::<false>()
858    }
859
860    /// Bytes available for use
861    #[zenoh_macros::unstable_doc]
862    pub fn available(&self) -> usize {
863        self.backend.available()
864    }
865}
866
867// PRIVATE impls
868impl<Backend> ShmProvider<Backend> {
869    fn new(backend: Backend) -> Self {
870        Self {
871            backend,
872            busy_list: Default::default(),
873        }
874    }
875}
876
877impl<Backend> ShmProvider<Backend>
878where
879    Backend: ShmProviderBackend,
880{
881    fn alloc_inner<Policy>(
882        &self,
883        size: NonZeroUsize,
884        layout: &MemoryLayout,
885        policy: &Policy,
886    ) -> Result<ZShmMut, ZAllocError>
887    where
888        Policy: AllocPolicy<Backend>,
889    {
890        // allocate resources for SHM buffer
891        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
892
893        // allocate data chunk
894        // Perform actions depending on the Policy
895        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
896        // Don't loose this chunk as it leads to memory leak at the backend side!
897        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
898        // and it is necessary to handle that properly and pass this len to corresponding free(...)
899        let chunk = policy.alloc(layout, self)?;
900
901        // wrap allocated chunk to ShmBufInner
902        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
903        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
904    }
905
906    fn alloc_resources() -> Result<(AllocatedMetadataDescriptor, ConfirmedDescriptor), ZAllocError>
907    {
908        // allocate metadata
909        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
910
911        // add watchdog to confirmator
912        let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
913
914        Ok((allocated_metadata, confirmed_metadata))
915    }
916
917    fn wrap(
918        &self,
919        chunk: AllocatedChunk,
920        len: NonZeroUsize,
921        allocated_metadata: AllocatedMetadataDescriptor,
922        confirmed_metadata: ConfirmedDescriptor,
923    ) -> ShmBufInner {
924        // write additional metadata
925        // chunk descriptor
926        allocated_metadata
927            .header()
928            .set_data_descriptor(&chunk.descriptor);
929        // protocol
930        allocated_metadata
931            .header()
932            .protocol
933            .store(self.backend.id(), Ordering::Relaxed);
934
935        // add watchdog to validator
936        GLOBAL_VALIDATOR
937            .read()
938            .add(confirmed_metadata.owned.clone());
939
940        // Create buffer's info
941        let info = ShmBufInfo::new(
942            len,
943            MetadataDescriptor::from(&confirmed_metadata.owned),
944            allocated_metadata
945                .header()
946                .generation
947                .load(Ordering::SeqCst),
948        );
949
950        // Create buffer
951        let shmb = ShmBufInner {
952            metadata: confirmed_metadata,
953            buf: chunk.data,
954            info,
955        };
956
957        // Create and store busy chunk
958        zlock!(self.busy_list).push(BusyChunk::new(allocated_metadata));
959
960        shmb
961    }
962}
963
964// PRIVATE impls for Sync backend
965impl<Backend> ShmProvider<Backend>
966where
967    Backend: ShmProviderBackend + Sync,
968{
969    async fn alloc_inner_async<Policy>(
970        &self,
971        size: NonZeroUsize,
972        backend_layout: &MemoryLayout,
973        policy: &Policy,
974    ) -> Result<ZShmMut, ZAllocError>
975    where
976        Policy: AsyncAllocPolicy<Backend>,
977    {
978        // allocate resources for SHM buffer
979        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
980
981        // allocate data chunk
982        // Perform actions depending on the Policy
983        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
984        // Don't loose this chunk as it leads to memory leak at the backend side!
985        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
986        // and it is necessary to handle that properly and pass this len to corresponding free(...)
987        let chunk = policy.alloc_async(backend_layout, self).await?;
988
989        // wrap allocated chunk to ShmBufInner
990        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
991        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
992    }
993}