Skip to main content

zenoh_shm/api/provider/
shm_provider.rs

//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
14
15use std::{
16    error::Error,
17    future::{Future, IntoFuture},
18    marker::PhantomData,
19    mem::MaybeUninit,
20    num::NonZeroUsize,
21    pin::Pin,
22    sync::{atomic::Ordering, Mutex},
23    time::Duration,
24};
25
26use async_trait::async_trait;
27use zenoh_core::{zlock, zresult::ZResult, Resolvable, Wait};
28
29use super::{
30    chunk::{AllocatedChunk, ChunkDescriptor},
31    shm_provider_backend::ShmProviderBackend,
32    types::{ChunkAllocResult, ZAllocError, ZLayoutAllocError, ZLayoutError},
33};
34use crate::{
35    api::{
36        buffer::{typed::Typed, zshmmut::ZShmMut},
37        protocol_implementations::posix::posix_shm_provider_backend::{
38            PosixShmProviderBackend, PosixShmProviderBackendBuilder,
39        },
40        provider::memory_layout::{MemoryLayout, TypedLayout},
41    },
42    metadata::{
43        allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
44        storage::GLOBAL_METADATA_STORAGE,
45    },
46    watchdog::{
47        confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
48        validator::GLOBAL_VALIDATOR,
49    },
50    ShmBufInfo, ShmBufInner,
51};
52
53/// The type of allocation.
54pub trait AllocLayout {
55    /// The buffer returned by the allocation.
56    type Buffer;
57    /// Returns the memory layouts of the allocation.
58    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError>;
59    /// Wraps the raw allocated buffer.
60    ///
61    /// # Safety
62    ///
63    /// The buffer must have the layout returned by [`Self::memory_layout`]
64    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer;
65}
66
67impl<T: TryInto<MemoryLayout>> AllocLayout for T
68where
69    T::Error: Into<ZLayoutError>,
70{
71    type Buffer = ZShmMut;
72
73    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
74        self.try_into().map_err(Into::into)
75    }
76
77    /// # Safety
78    ///
79    /// The buffer must have the layout returned by `memory_layout`
80    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
81        buffer
82    }
83}
84
85impl<T> AllocLayout for TypedLayout<T> {
86    type Buffer = Typed<MaybeUninit<T>, ZShmMut>;
87
88    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
89        Ok(MemoryLayout::for_type::<T>())
90    }
91
92    /// # Safety
93    ///
94    /// The buffer must have the layout returned by `memory_layout`
95    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
96        // SAFETY: same precondition
97        unsafe { Typed::new_unchecked(buffer) }
98    }
99}
100
101/// A layout for allocations.
102///
103/// This is a pre-calculated layout suitable for making series of similar allocations
104/// adopted for particular ShmProvider
105#[zenoh_macros::unstable_doc]
106#[derive(Debug)]
107pub struct PrecomputedLayout<'a, Backend, Layout> {
108    size: NonZeroUsize,
109    provider_layout: MemoryLayout,
110    provider: &'a ShmProvider<Backend>,
111    _phantom: PhantomData<Layout>,
112}
113
114impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
115where
116    Backend: ShmProviderBackend,
117{
118    /// Allocate the new buffer with this layout
119    #[zenoh_macros::unstable_doc]
120    pub fn alloc<'b>(&'b self) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout> {
121        PrecomputedAllocBuilder {
122            layout: self,
123            policy: JustAlloc,
124        }
125    }
126}
127
128impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
129where
130    Backend: ShmProviderBackend,
131{
132    fn new(
133        provider: &'a ShmProvider<Backend>,
134        required_layout: MemoryLayout,
135    ) -> Result<Self, ZLayoutError> {
136        // NOTE: Depending on internal implementation, provider's backend might relayout
137        // the allocations for bigger alignment (ex. 4-byte aligned allocation to 8-bytes aligned)
138
139        // calculate required layout size
140        let size = required_layout.size();
141
142        // Obtain provider's layout for our layout
143        let provider_layout = provider
144            .backend
145            .layout_for(required_layout)
146            .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
147
148        Ok(Self {
149            size,
150            provider_layout,
151            provider,
152            _phantom: PhantomData,
153        })
154    }
155}
156
157/// Trait for allocation policies
158#[zenoh_macros::unstable_doc]
159pub trait AllocPolicy<Backend> {
160    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult;
161}
162
163/// Trait for async allocation policies
164#[zenoh_macros::unstable_doc]
165#[async_trait]
166pub trait AsyncAllocPolicy<Backend>: Send {
167    async fn alloc_async(
168        &self,
169        layout: &MemoryLayout,
170        provider: &ShmProvider<Backend>,
171    ) -> ChunkAllocResult;
172}
173
/// Marker trait for allocation policies that are safe to use.
///
/// # Safety
///
/// Implementors must be safe to use. [`Deallocate`], for example, is not: it may
/// deallocate and reuse a buffer that is currently in use. Code relying on an unsafe
/// policy has to guarantee its own safety by other means.
pub unsafe trait SafePolicy {}
182
183/// Marker trait for compile-time allocation policies
184#[zenoh_macros::unstable_doc]
185pub trait ConstPolicy {
186    const NEW: Self;
187}
188
/// A policy value that can be either constant or runtime provided.
pub trait PolicyValue<T> {
    /// Returns the value.
    fn get(&self) -> T;
}

/// Every `Copy` value is its own runtime-provided policy value.
impl<T> PolicyValue<T> for T
where
    T: Copy,
{
    fn get(&self) -> T {
        *self
    }
}
199/// A const `bool`.
200pub struct ConstBool<const VALUE: bool>;
201impl<const VALUE: bool> ConstPolicy for ConstBool<VALUE> {
202    const NEW: Self = Self;
203}
204impl<const VALUE: bool> PolicyValue<bool> for ConstBool<VALUE> {
205    fn get(&self) -> bool {
206        VALUE
207    }
208}
209/// A const `usize`.
210pub struct ConstUsize<const VALUE: usize>;
211impl<const VALUE: usize> ConstPolicy for ConstUsize<VALUE> {
212    const NEW: Self = Self;
213}
214impl<const VALUE: usize> PolicyValue<usize> for ConstUsize<VALUE> {
215    fn get(&self) -> usize {
216        VALUE
217    }
218}
219
220/// Just try to allocate
221#[zenoh_macros::unstable_doc]
222#[derive(Clone, Copy)]
223pub struct JustAlloc;
224impl<Backend> AllocPolicy<Backend> for JustAlloc
225where
226    Backend: ShmProviderBackend,
227{
228    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
229        provider.backend.alloc(layout)
230    }
231}
232unsafe impl SafePolicy for JustAlloc {}
233impl ConstPolicy for JustAlloc {
234    const NEW: Self = Self;
235}
236
237/// Garbage collection policy.
238///
239/// Try to reclaim old buffers if allocation failed and allocate again
240/// if the largest reclaimed chuk is not smaller than the one required.
241///
242/// If `Safe` is set to false, the policy may lead to reallocate memory currently in-use.
243#[zenoh_macros::unstable_doc]
244#[derive(Clone, Copy)]
245pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc, Safe = ConstBool<true>> {
246    inner_policy: InnerPolicy,
247    alt_policy: AltPolicy,
248    safe: Safe,
249}
250impl<InnerPolicy, AltPolicy, Safe> GarbageCollect<InnerPolicy, AltPolicy, Safe> {
251    /// Creates a new `GarbageCollect` policy.
252    pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy, safe: Safe) -> Self {
253        Self {
254            inner_policy,
255            alt_policy,
256            safe,
257        }
258    }
259}
260impl<Backend, InnerPolicy, AltPolicy, Safe> AllocPolicy<Backend>
261    for GarbageCollect<InnerPolicy, AltPolicy, Safe>
262where
263    Backend: ShmProviderBackend,
264    InnerPolicy: AllocPolicy<Backend>,
265    AltPolicy: AllocPolicy<Backend>,
266    Safe: PolicyValue<bool>,
267{
268    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
269        let result = self.inner_policy.alloc(layout, provider);
270        if result.is_err() {
271            // try to alloc again only if GC managed to reclaim big enough chunk
272            let collected = if self.safe.get() {
273                provider.garbage_collect()
274            } else {
275                // SAFETY: the policy doesn't implement `SafePolicy` in this case, so the user
276                // must enforce the safety himself
277                unsafe { provider.garbage_collect_unsafe() }
278            };
279            if collected >= layout.size().get() {
280                return self.alt_policy.alloc(layout, provider);
281            }
282        }
283        result
284    }
285}
286unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
287    for GarbageCollect<InnerPolicy, AltPolicy, ConstBool<true>>
288{
289}
290impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy, Safe: ConstPolicy> ConstPolicy
291    for GarbageCollect<InnerPolicy, AltPolicy, Safe>
292{
293    const NEW: Self = Self {
294        inner_policy: InnerPolicy::NEW,
295        alt_policy: AltPolicy::NEW,
296        safe: Safe::NEW,
297    };
298}
299
300/// Defragmenting policy.
301///
302/// Try to defragment if allocation failed and allocate again
303/// if the largest defragmented chuk is not smaller than the one required
304#[zenoh_macros::unstable_doc]
305#[derive(Clone, Copy)]
306pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc> {
307    inner_policy: InnerPolicy,
308    alt_policy: AltPolicy,
309}
310impl<InnerPolicy, AltPolicy> Defragment<InnerPolicy, AltPolicy> {
311    /// Creates a new `Defragment` policy.
312    pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
313        Self {
314            inner_policy,
315            alt_policy,
316        }
317    }
318}
319impl<Backend, InnerPolicy, AltPolicy> AllocPolicy<Backend> for Defragment<InnerPolicy, AltPolicy>
320where
321    Backend: ShmProviderBackend,
322    InnerPolicy: AllocPolicy<Backend>,
323    AltPolicy: AllocPolicy<Backend>,
324{
325    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
326        let result = self.inner_policy.alloc(layout, provider);
327        if let Err(ZAllocError::NeedDefragment) = result {
328            // try to alloc again only if big enough chunk was defragmented
329            if provider.defragment() >= layout.size().get() {
330                return self.alt_policy.alloc(layout, provider);
331            }
332        }
333        result
334    }
335}
336unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
337    for Defragment<InnerPolicy, AltPolicy>
338{
339}
340impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
341    for Defragment<InnerPolicy, AltPolicy>
342{
343    const NEW: Self = Self {
344        inner_policy: InnerPolicy::NEW,
345        alt_policy: AltPolicy::NEW,
346    };
347}
348
349/// Deallocating policy.
350///
351/// Forcibly deallocate up to N buffers until allocation succeeds.
352///
353/// This policy is unsafe as it may lead to reallocate memory currently in-use.
354#[zenoh_macros::unstable_doc]
355#[derive(Clone, Copy)]
356pub struct Deallocate<Limit, InnerPolicy = JustAlloc, AltPolicy = InnerPolicy> {
357    limit: Limit,
358    inner_policy: InnerPolicy,
359    alt_policy: AltPolicy,
360}
361impl<Limit, InnerPolicy, AltPolicy> Deallocate<Limit, InnerPolicy, AltPolicy> {
362    /// Creates a new `Deallocate` policy.
363    pub fn new(limit: Limit, inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
364        Self {
365            limit,
366            inner_policy,
367            alt_policy,
368        }
369    }
370}
371impl<Backend, Limit, InnerPolicy, AltPolicy> AllocPolicy<Backend>
372    for Deallocate<Limit, InnerPolicy, AltPolicy>
373where
374    Backend: ShmProviderBackend,
375    Limit: PolicyValue<usize>,
376    InnerPolicy: AllocPolicy<Backend>,
377    AltPolicy: AllocPolicy<Backend>,
378{
379    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
380        for _ in 0..self.limit.get() {
381            match self.inner_policy.alloc(layout, provider) {
382                res @ (Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory)) => {
383                    let Some(chunk) = zlock!(provider.busy_list).pop() else {
384                        return res;
385                    };
386                    provider.backend.free(&chunk.descriptor());
387                }
388                res => return res,
389            }
390        }
391        self.alt_policy.alloc(layout, provider)
392    }
393}
394impl<Limit: ConstPolicy, InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
395    for Deallocate<Limit, InnerPolicy, AltPolicy>
396{
397    const NEW: Self = Self {
398        limit: Limit::NEW,
399        inner_policy: InnerPolicy::NEW,
400        alt_policy: AltPolicy::NEW,
401    };
402}
403
404/// Blocking allocation policy.
405///
406/// This policy will block until the allocation succeeds.
407/// Both sync and async modes available.
408#[zenoh_macros::unstable_doc]
409#[derive(Clone, Copy)]
410pub struct BlockOn<InnerPolicy = JustAlloc> {
411    inner_policy: InnerPolicy,
412}
413impl<InnerPolicy> BlockOn<InnerPolicy> {
414    /// Creates a new `BlockOn` policy.
415    pub fn new(inner_policy: InnerPolicy) -> Self {
416        Self { inner_policy }
417    }
418}
419#[async_trait]
420impl<Backend, InnerPolicy> AsyncAllocPolicy<Backend> for BlockOn<InnerPolicy>
421where
422    Backend: ShmProviderBackend + Sync,
423    InnerPolicy: AllocPolicy<Backend> + Send + Sync,
424{
425    async fn alloc_async(
426        &self,
427        layout: &MemoryLayout,
428        provider: &ShmProvider<Backend>,
429    ) -> ChunkAllocResult {
430        loop {
431            match self.inner_policy.alloc(layout, provider) {
432                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
433                    // TODO: implement provider's async signalling instead of this!
434                    tokio::time::sleep(Duration::from_millis(1)).await;
435                }
436                other_result => {
437                    return other_result;
438                }
439            }
440        }
441    }
442}
443unsafe impl<InnerPolicy: SafePolicy> SafePolicy for BlockOn<InnerPolicy> {}
444impl<Backend, InnerPolicy> AllocPolicy<Backend> for BlockOn<InnerPolicy>
445where
446    Backend: ShmProviderBackend,
447    InnerPolicy: AllocPolicy<Backend>,
448{
449    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
450        loop {
451            match self.inner_policy.alloc(layout, provider) {
452                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
453                    // TODO: implement provider's async signalling instead of this!
454                    std::thread::sleep(Duration::from_millis(1));
455                }
456                other_result => {
457                    return other_result;
458                }
459            }
460        }
461    }
462}
463impl<InnerPolicy: ConstPolicy> ConstPolicy for BlockOn<InnerPolicy> {
464    const NEW: Self = Self {
465        inner_policy: InnerPolicy::NEW,
466    };
467}
468
469#[zenoh_macros::unstable_doc]
470pub struct AllocBuilder<'a, Backend, Layout, Policy = JustAlloc> {
471    provider: &'a ShmProvider<Backend>,
472    layout: Layout,
473    policy: Policy,
474}
475
476// Generic impl
477impl<'a, Backend, Layout, Policy> AllocBuilder<'a, Backend, Layout, Policy> {
478    /// Set the allocation policy
479    #[zenoh_macros::unstable_doc]
480    pub fn with_policy<OtherPolicy: SafePolicy + ConstPolicy>(
481        self,
482    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
483        // SAFETY: the policy is safe
484        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
485    }
486
487    /// Set the unsafe allocation policy
488    ///
489    /// # Safety
490    ///
491    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
492    #[zenoh_macros::unstable_doc]
493    pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
494        self,
495    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
496        // SAFETY: same function contract
497        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
498    }
499
500    /// Set the unsafe allocation policy at runtime
501    ///
502    /// # Safety
503    ///
504    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
505    #[zenoh_macros::unstable_doc]
506    pub unsafe fn with_runtime_policy<OtherPolicy>(
507        self,
508        policy: OtherPolicy,
509    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
510        AllocBuilder {
511            provider: self.provider,
512            layout: self.layout,
513            policy,
514        }
515    }
516
517    fn layout_policy(self) -> Result<(PrecomputedLayout<'a, Backend, Layout>, Policy), ZLayoutError>
518    where
519        Backend: ShmProviderBackend,
520        Layout: AllocLayout,
521    {
522        let layout = PrecomputedLayout::new(self.provider, self.layout.memory_layout()?)?;
523        Ok((layout, self.policy))
524    }
525}
526
527impl<'a, Backend: ShmProviderBackend, Layout: TryInto<MemoryLayout>>
528    AllocBuilder<'a, Backend, Layout>
529where
530    Layout::Error: Into<ZLayoutError>,
531{
532    /// Try to build an allocation layout
533    #[deprecated(
534        since = "1.5.3",
535        note = "Please use `ShmProvider::alloc_layout` method instead"
536    )]
537    #[zenoh_macros::unstable_doc]
538    pub fn into_layout(self) -> Result<PrecomputedLayout<'a, Backend, Layout>, ZLayoutError> {
539        PrecomputedLayout::new(self.provider, self.layout.try_into().map_err(Into::into)?)
540    }
541}
542
543impl<Backend, Layout: AllocLayout, Policy> Resolvable
544    for AllocBuilder<'_, Backend, Layout, Policy>
545{
546    type To = Result<Layout::Buffer, ZLayoutAllocError>;
547}
548
549impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
550    for AllocBuilder<'_, Backend, Layout, Policy>
551{
552    fn wait(self) -> <Self as Resolvable>::To {
553        let (layout, policy) = self.layout_policy()?;
554        // SAFETY: same contract used to set the policy
555        Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.wait()?)
556    }
557}
558
559impl<
560        'a,
561        Backend: ShmProviderBackend + Sync,
562        Layout: AllocLayout + Send + Sync + 'a,
563        Policy: AsyncAllocPolicy<Backend> + Sync + 'a,
564    > IntoFuture for AllocBuilder<'a, Backend, Layout, Policy>
565{
566    type Output = <Self as Resolvable>::To;
567    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
568
569    fn into_future(self) -> Self::IntoFuture {
570        Box::pin(async move {
571            let (layout, policy) = self.layout_policy()?;
572            // SAFETY: same contract used to set the policy
573            Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.await?)
574        })
575    }
576}
577
578/// Builder for making allocations with instant layout calculation
579#[zenoh_macros::unstable_doc]
580pub struct PrecomputedAllocBuilder<'b, 'a: 'b, Backend, Layout, Policy = JustAlloc> {
581    layout: &'b PrecomputedLayout<'a, Backend, Layout>,
582    policy: Policy,
583}
584
585// Generic impl
586impl<'b, 'a: 'b, Backend, Layout, Policy> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy> {
587    /// Set the allocation policy
588    #[zenoh_macros::unstable_doc]
589    pub fn with_policy<OtherPolicy: ConstPolicy>(
590        self,
591    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
592        // SAFETY: the policy is safe
593        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
594    }
595
596    /// Set the unsafe allocation policy
597    ///
598    /// # Safety
599    ///
600    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
601    #[zenoh_macros::unstable_doc]
602    pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
603        self,
604    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
605        // SAFETY: same function contract
606        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
607    }
608
609    /// Set the unsafe allocation policy at runtime
610    ///
611    /// # Safety
612    ///
613    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
614    #[zenoh_macros::unstable_doc]
615    pub unsafe fn with_runtime_policy<OtherPolicy>(
616        self,
617        policy: OtherPolicy,
618    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
619        PrecomputedAllocBuilder {
620            layout: self.layout,
621            policy,
622        }
623    }
624}
625
626impl<Backend, Layout: AllocLayout, Policy> Resolvable
627    for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
628{
629    type To = Result<Layout::Buffer, ZAllocError>;
630}
631
632impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
633    for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
634{
635    fn wait(self) -> <Self as Resolvable>::To {
636        let buffer = self.layout.provider.alloc_inner(
637            self.layout.size,
638            &self.layout.provider_layout,
639            &self.policy,
640        )?;
641        // SAFETY: the buffer has been allocated with the requested layout
642        Ok(unsafe { Layout::wrap_buffer(buffer) })
643    }
644}
645
646impl<
647        'b,
648        'a: 'b,
649        Backend: ShmProviderBackend + Sync,
650        Layout: AllocLayout + Sync,
651        Policy: AsyncAllocPolicy<Backend> + Sync + 'b,
652    > IntoFuture for PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy>
653{
654    type Output = <Self as Resolvable>::To;
655    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'b + Send>>;
656
657    fn into_future(self) -> Self::IntoFuture {
658        Box::pin(async move {
659            let buffer = self
660                .layout
661                .provider
662                .alloc_inner_async(self.layout.size, &self.layout.provider_layout, &self.policy)
663                .await?;
664            // SAFETY: the buffer has been allocated with the requested layout
665            Ok(unsafe { Layout::wrap_buffer(buffer) })
666        })
667    }
668}
669
670#[zenoh_macros::unstable_doc]
671pub struct ShmProviderBuilder;
672impl ShmProviderBuilder {
673    /// Set the backend
674    #[zenoh_macros::unstable_doc]
675    pub fn backend<Backend: ShmProviderBackend>(
676        backend: Backend,
677    ) -> ShmProviderBuilderBackend<Backend> {
678        ShmProviderBuilderBackend { backend }
679    }
680
681    /// Set the default backend
682    #[zenoh_macros::unstable_doc]
683    pub fn default_backend<Layout>(layout: Layout) -> ShmProviderBuilderWithDefaultBackend<Layout> {
684        ShmProviderBuilderWithDefaultBackend { layout }
685    }
686}
687
688#[zenoh_macros::unstable_doc]
689pub struct ShmProviderBuilderBackend<Backend>
690where
691    Backend: ShmProviderBackend,
692{
693    backend: Backend,
694}
695#[zenoh_macros::unstable_doc]
696impl<Backend> Resolvable for ShmProviderBuilderBackend<Backend>
697where
698    Backend: ShmProviderBackend,
699{
700    type To = ShmProvider<Backend>;
701}
702
703#[zenoh_macros::unstable_doc]
704impl<Backend> Wait for ShmProviderBuilderBackend<Backend>
705where
706    Backend: ShmProviderBackend,
707{
708    /// build ShmProvider
709    fn wait(self) -> <Self as Resolvable>::To {
710        ShmProvider::new(self.backend)
711    }
712}
713
714#[zenoh_macros::unstable_doc]
715pub struct ShmProviderBuilderWithDefaultBackend<Layout> {
716    layout: Layout,
717}
718
719#[zenoh_macros::unstable_doc]
720impl<Layout> Resolvable for ShmProviderBuilderWithDefaultBackend<Layout> {
721    type To = ZResult<ShmProvider<PosixShmProviderBackend>>;
722}
723
724#[zenoh_macros::unstable_doc]
725impl<Layout: TryInto<MemoryLayout>> Wait for ShmProviderBuilderWithDefaultBackend<Layout>
726where
727    Layout::Error: Error,
728    PosixShmProviderBackendBuilder<Layout>:
729        Resolvable<To = ZResult<PosixShmProviderBackend>> + Wait,
730{
731    /// build ShmProvider
732    fn wait(self) -> <Self as Resolvable>::To {
733        let backend = PosixShmProviderBackend::builder(self.layout).wait()?;
734        Ok(ShmProvider::new(backend))
735    }
736}
737
738#[derive(Debug)]
739struct BusyChunk {
740    metadata: AllocatedMetadataDescriptor,
741}
742
743impl BusyChunk {
744    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
745        Self { metadata }
746    }
747
748    fn descriptor(&self) -> ChunkDescriptor {
749        self.metadata.header().data_descriptor()
750    }
751}
752
753/// A generalized interface for shared memory data sources
754#[zenoh_macros::unstable_doc]
755#[derive(Debug)]
756pub struct ShmProvider<Backend> {
757    backend: Backend,
758    busy_list: Mutex<Vec<BusyChunk>>,
759}
760
761impl<Backend: Default> Default for ShmProvider<Backend> {
762    fn default() -> Self {
763        Self::new(Backend::default())
764    }
765}
766
767impl<Backend> ShmProvider<Backend>
768where
769    Backend: ShmProviderBackend,
770{
771    /// Allocates a buffer with a given memory layout.
772    #[zenoh_macros::unstable_doc]
773    pub fn alloc<Layout>(&self, layout: Layout) -> AllocBuilder<'_, Backend, Layout> {
774        AllocBuilder {
775            provider: self,
776            layout,
777            policy: JustAlloc,
778        }
779    }
780
781    /// Precompute the actual layout for an allocation.
782    #[zenoh_macros::unstable_doc]
783    pub fn alloc_layout<Layout: TryInto<MemoryLayout>>(
784        &self,
785        layout: Layout,
786    ) -> Result<PrecomputedLayout<'_, Backend, Layout>, ZLayoutError>
787    where
788        Layout::Error: Into<ZLayoutError>,
789    {
790        PrecomputedLayout::new(self, layout.try_into().map_err(Into::into)?)
791    }
792    /// Defragment memory
793    #[zenoh_macros::unstable_doc]
794    pub fn defragment(&self) -> usize {
795        self.backend.defragment()
796    }
797
798    /// Map externally-allocated chunk into ZShmMut.
799    ///
800    /// This method is designed to be used with push data sources.
801    /// Remember that chunk's len may be >= len!
802    #[zenoh_macros::unstable_doc]
803    pub fn map(&self, chunk: AllocatedChunk, len: usize) -> Result<ZShmMut, ZAllocError> {
804        let len = len.try_into().map_err(|_| ZAllocError::Other)?;
805
806        // allocate resources for SHM buffer
807        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
808
809        // wrap everything to ShmBufInner
810        let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
811        // SAFETY: `wrapped` is guaranteed to be valid as it represents a mapped chunk.
812        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
813    }
814
815    #[zenoh_macros::unstable_doc]
816    fn garbage_collect_impl<const SAFE: bool>(&self) -> usize {
817        fn retain_unordered<T>(vec: &mut Vec<T>, mut f: impl FnMut(&T) -> bool) {
818            let mut i = 0;
819            while i < vec.len() {
820                if f(&vec[i]) {
821                    i += 1;
822                } else {
823                    vec.swap_remove(i); // move last into place of vec[i]
824                                        // don't increment i: need to test the swapped-in element
825                }
826            }
827        }
828
829        tracing::trace!("Running Garbage Collector");
830        let mut largest = 0usize;
831        retain_unordered(&mut zlock!(self.busy_list), |chunk| {
832            let header = chunk.metadata.header();
833            if header.refcount.load(Ordering::SeqCst) == 0
834                || (!SAFE && header.watchdog_invalidated.load(Ordering::SeqCst))
835            {
836                tracing::trace!("Garbage Collecting Chunk: {:?}", chunk);
837                let descriptor_to_free = chunk.descriptor();
838                self.backend.free(&descriptor_to_free);
839                largest = largest.max(descriptor_to_free.len.get());
840                return false;
841            }
842            true
843        });
844        largest
845    }
846
847    /// Try to collect free chunks.
848    ///
849    /// Returns the size of largest collected chunk
850    #[zenoh_macros::unstable_doc]
851    pub fn garbage_collect(&self) -> usize {
852        self.garbage_collect_impl::<true>()
853    }
854
855    /// Try to collect free chunks.
856    ///
857    /// Returns the size of largest collected chunk
858    ///
859    /// # Safety
860    ///
861    /// User must ensure there is no data races with collected chunks, as some of them may still be in-use.
862    #[zenoh_macros::unstable_doc]
863    pub unsafe fn garbage_collect_unsafe(&self) -> usize {
864        self.garbage_collect_impl::<false>()
865    }
866
867    /// Bytes available for use
868    #[zenoh_macros::unstable_doc]
869    pub fn available(&self) -> usize {
870        self.backend.available()
871    }
872}
873
874// PRIVATE impls
875impl<Backend> ShmProvider<Backend> {
876    fn new(backend: Backend) -> Self {
877        Self {
878            backend,
879            busy_list: Default::default(),
880        }
881    }
882}
883
884impl<Backend> ShmProvider<Backend>
885where
886    Backend: ShmProviderBackend,
887{
888    fn alloc_inner<Policy>(
889        &self,
890        size: NonZeroUsize,
891        layout: &MemoryLayout,
892        policy: &Policy,
893    ) -> Result<ZShmMut, ZAllocError>
894    where
895        Policy: AllocPolicy<Backend>,
896    {
897        // allocate resources for SHM buffer
898        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
899
900        // allocate data chunk
901        // Perform actions depending on the Policy
902        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
903        // Don't loose this chunk as it leads to memory leak at the backend side!
904        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
905        // and it is necessary to handle that properly and pass this len to corresponding free(...)
906        let chunk = policy.alloc(layout, self)?;
907
908        // wrap allocated chunk to ShmBufInner
909        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
910        // SAFETY: `wrapped` is guaranteed to be valid as it represents an allocated chunk.
911        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
912    }
913
914    fn alloc_resources() -> Result<(AllocatedMetadataDescriptor, ConfirmedDescriptor), ZAllocError>
915    {
916        // allocate metadata
917        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
918
919        // add watchdog to confirmator
920        let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
921
922        Ok((allocated_metadata, confirmed_metadata))
923    }
924
925    fn wrap(
926        &self,
927        chunk: AllocatedChunk,
928        len: NonZeroUsize,
929        allocated_metadata: AllocatedMetadataDescriptor,
930        confirmed_metadata: ConfirmedDescriptor,
931    ) -> ShmBufInner {
932        // write additional metadata
933        // chunk descriptor
934        allocated_metadata
935            .header()
936            .set_data_descriptor(&chunk.descriptor);
937        // protocol
938        allocated_metadata
939            .header()
940            .protocol
941            .store(self.backend.id(), Ordering::Relaxed);
942
943        // add watchdog to validator
944        GLOBAL_VALIDATOR
945            .read()
946            .add(confirmed_metadata.owned.clone());
947
948        // Create buffer's info
949        let info = ShmBufInfo::new(
950            len,
951            MetadataDescriptor::from(&confirmed_metadata.owned),
952            allocated_metadata
953                .header()
954                .generation
955                .load(Ordering::SeqCst),
956        );
957
958        // Create buffer
959        let shmb = ShmBufInner {
960            metadata: confirmed_metadata,
961            buf: chunk.data,
962            info,
963        };
964
965        // Create and store busy chunk
966        zlock!(self.busy_list).push(BusyChunk::new(allocated_metadata));
967
968        shmb
969    }
970}
971
972// PRIVATE impls for Sync backend
973impl<Backend> ShmProvider<Backend>
974where
975    Backend: ShmProviderBackend + Sync,
976{
977    async fn alloc_inner_async<Policy>(
978        &self,
979        size: NonZeroUsize,
980        backend_layout: &MemoryLayout,
981        policy: &Policy,
982    ) -> Result<ZShmMut, ZAllocError>
983    where
984        Policy: AsyncAllocPolicy<Backend>,
985    {
986        // allocate resources for SHM buffer
987        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
988
989        // allocate data chunk
990        // Perform actions depending on the Policy
991        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
992        // Don't loose this chunk as it leads to memory leak at the backend side!
993        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
994        // and it is necessary to handle that properly and pass this len to corresponding free(...)
995        let chunk = policy.alloc_async(backend_layout, self).await?;
996
997        // wrap allocated chunk to ShmBufInner
998        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
999        // SAFETY: `wrapped` is guaranteed to be valid as it represents an allocated chunk.
1000        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
1001    }
1002}