zenoh_shm/api/provider/
shm_provider.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{
16    error::Error,
17    future::{Future, IntoFuture},
18    marker::PhantomData,
19    mem::MaybeUninit,
20    num::NonZeroUsize,
21    pin::Pin,
22    sync::{atomic::Ordering, Mutex},
23    time::Duration,
24};
25
26use async_trait::async_trait;
27use zenoh_core::{zlock, zresult::ZResult, Resolvable, Wait};
28
29use super::{
30    chunk::{AllocatedChunk, ChunkDescriptor},
31    shm_provider_backend::ShmProviderBackend,
32    types::{ChunkAllocResult, ZAllocError, ZLayoutAllocError, ZLayoutError},
33};
34use crate::{
35    api::{
36        buffer::{typed::Typed, zshmmut::ZShmMut},
37        protocol_implementations::posix::posix_shm_provider_backend::{
38            PosixShmProviderBackend, PosixShmProviderBackendBuilder,
39        },
40        provider::memory_layout::{MemoryLayout, TypedLayout},
41    },
42    metadata::{
43        allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
44        storage::GLOBAL_METADATA_STORAGE,
45    },
46    watchdog::{
47        confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
48        validator::GLOBAL_VALIDATOR,
49    },
50    ShmBufInfo, ShmBufInner,
51};
52
53/// The type of allocation.
54pub trait AllocLayout {
55    /// The buffer returned by the allocation.
56    type Buffer;
57    /// Returns the memory layouts of the allocation.
58    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError>;
59    /// Wraps the raw allocated buffer.
60    ///
61    /// # Safety
62    ///
63    /// The buffer must have the layout returned by [`Self::memory_layout`]
64    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer;
65}
66
67impl<T: TryInto<MemoryLayout>> AllocLayout for T
68where
69    T::Error: Into<ZLayoutError>,
70{
71    type Buffer = ZShmMut;
72
73    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
74        self.try_into().map_err(Into::into)
75    }
76
77    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
78        buffer
79    }
80}
81
82impl<T> AllocLayout for TypedLayout<T> {
83    type Buffer = Typed<MaybeUninit<T>, ZShmMut>;
84
85    fn memory_layout(self) -> Result<MemoryLayout, ZLayoutError> {
86        Ok(MemoryLayout::for_type::<T>())
87    }
88
89    unsafe fn wrap_buffer(buffer: ZShmMut) -> Self::Buffer {
90        // SAFETY: same precondition
91        unsafe { Typed::new_unchecked(buffer) }
92    }
93}
94
95/// A layout for allocations.
96/// This is a pre-calculated layout suitable for making series of similar allocations
97/// adopted for particular ShmProvider
98#[zenoh_macros::unstable_doc]
99#[derive(Debug)]
100pub struct PrecomputedLayout<'a, Backend, Layout> {
101    size: NonZeroUsize,
102    provider_layout: MemoryLayout,
103    provider: &'a ShmProvider<Backend>,
104    _phantom: PhantomData<Layout>,
105}
106
107impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
108where
109    Backend: ShmProviderBackend,
110{
111    /// Allocate the new buffer with this layout
112    #[zenoh_macros::unstable_doc]
113    pub fn alloc<'b>(&'b self) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout> {
114        PrecomputedAllocBuilder {
115            layout: self,
116            policy: JustAlloc,
117        }
118    }
119}
120
121impl<'a, Backend, Layout> PrecomputedLayout<'a, Backend, Layout>
122where
123    Backend: ShmProviderBackend,
124{
125    fn new(
126        provider: &'a ShmProvider<Backend>,
127        required_layout: MemoryLayout,
128    ) -> Result<Self, ZLayoutError> {
129        // NOTE: Depending on internal implementation, provider's backend might relayout
130        // the allocations for bigger alignment (ex. 4-byte aligned allocation to 8-bytes aligned)
131
132        // calculate required layout size
133        let size = required_layout.size();
134
135        // Obtain provider's layout for our layout
136        let provider_layout = provider
137            .backend
138            .layout_for(required_layout)
139            .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;
140
141        Ok(Self {
142            size,
143            provider_layout,
144            provider,
145            _phantom: PhantomData,
146        })
147    }
148}
149
150/// Trait for allocation policies
151#[zenoh_macros::unstable_doc]
152pub trait AllocPolicy<Backend> {
153    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult;
154}
155
156/// Trait for async allocation policies
157#[zenoh_macros::unstable_doc]
158#[async_trait]
159pub trait AsyncAllocPolicy<Backend>: Send {
160    async fn alloc_async(
161        &self,
162        layout: &MemoryLayout,
163        provider: &ShmProvider<Backend>,
164    ) -> ChunkAllocResult;
165}
166
167/// Marker trait for policies that are safe to use
168///
169/// # Safety
170///
171/// Policies implementing this trait must be safe to use. For example, [`Deallocate`] is not safe
172/// as it may deallocate and reuse a buffer that is currently in use. Users of unsafe policy must
173/// enforce the safety of their code by other means.
174pub unsafe trait SafePolicy {}
175
176/// Marker trait for compile-time allocation policies
177#[zenoh_macros::unstable_doc]
178pub trait ConstPolicy {
179    const NEW: Self;
180}
181
182/// A policy value that can be either constant or runtime provided.
183pub trait PolicyValue<T> {
184    fn get(&self) -> T;
185}
186
187impl<T: Copy> PolicyValue<T> for T {
188    fn get(&self) -> T {
189        *self
190    }
191}
192/// A const `bool`.
193pub struct ConstBool<const VALUE: bool>;
194impl<const VALUE: bool> ConstPolicy for ConstBool<VALUE> {
195    const NEW: Self = Self;
196}
197impl<const VALUE: bool> PolicyValue<bool> for ConstBool<VALUE> {
198    fn get(&self) -> bool {
199        VALUE
200    }
201}
202/// A const `usize`.
203pub struct ConstUsize<const VALUE: usize>;
204impl<const VALUE: usize> ConstPolicy for ConstUsize<VALUE> {
205    const NEW: Self = Self;
206}
207impl<const VALUE: usize> PolicyValue<usize> for ConstUsize<VALUE> {
208    fn get(&self) -> usize {
209        VALUE
210    }
211}
212
213/// Just try to allocate
214#[zenoh_macros::unstable_doc]
215#[derive(Clone, Copy)]
216pub struct JustAlloc;
217impl<Backend> AllocPolicy<Backend> for JustAlloc
218where
219    Backend: ShmProviderBackend,
220{
221    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
222        provider.backend.alloc(layout)
223    }
224}
225unsafe impl SafePolicy for JustAlloc {}
226impl ConstPolicy for JustAlloc {
227    const NEW: Self = Self;
228}
229
230/// Garbage collection policy.
231///
232/// Try to reclaim old buffers if allocation failed and allocate again
233/// if the largest reclaimed chuk is not smaller than the one required.
234///
235/// If `Safe` is set to false, the policy may lead to reallocate memory currently in-use.
236#[zenoh_macros::unstable_doc]
237#[derive(Clone, Copy)]
238pub struct GarbageCollect<InnerPolicy = JustAlloc, AltPolicy = JustAlloc, Safe = ConstBool<true>> {
239    inner_policy: InnerPolicy,
240    alt_policy: AltPolicy,
241    safe: Safe,
242}
243impl<InnerPolicy, AltPolicy, Safe> GarbageCollect<InnerPolicy, AltPolicy, Safe> {
244    /// Creates a new `GarbageCollect` policy.
245    pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy, safe: Safe) -> Self {
246        Self {
247            inner_policy,
248            alt_policy,
249            safe,
250        }
251    }
252}
253impl<Backend, InnerPolicy, AltPolicy, Safe> AllocPolicy<Backend>
254    for GarbageCollect<InnerPolicy, AltPolicy, Safe>
255where
256    Backend: ShmProviderBackend,
257    InnerPolicy: AllocPolicy<Backend>,
258    AltPolicy: AllocPolicy<Backend>,
259    Safe: PolicyValue<bool>,
260{
261    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
262        let result = self.inner_policy.alloc(layout, provider);
263        if result.is_err() {
264            // try to alloc again only if GC managed to reclaim big enough chunk
265            let collected = if self.safe.get() {
266                provider.garbage_collect()
267            } else {
268                // SAFETY: the policy doesn't implement `SafePolicy` in this case, so the user
269                // must enforce the safety himself
270                unsafe { provider.garbage_collect_unsafe() }
271            };
272            if collected >= layout.size().get() {
273                return self.alt_policy.alloc(layout, provider);
274            }
275        }
276        result
277    }
278}
279unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
280    for GarbageCollect<InnerPolicy, AltPolicy, ConstBool<true>>
281{
282}
283impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy, Safe: ConstPolicy> ConstPolicy
284    for GarbageCollect<InnerPolicy, AltPolicy, Safe>
285{
286    const NEW: Self = Self {
287        inner_policy: InnerPolicy::NEW,
288        alt_policy: AltPolicy::NEW,
289        safe: Safe::NEW,
290    };
291}
292
293/// Defragmenting policy.
294///
295/// Try to defragment if allocation failed and allocate again
296/// if the largest defragmented chuk is not smaller than the one required
297#[zenoh_macros::unstable_doc]
298#[derive(Clone, Copy)]
299pub struct Defragment<InnerPolicy = JustAlloc, AltPolicy = JustAlloc> {
300    inner_policy: InnerPolicy,
301    alt_policy: AltPolicy,
302}
303impl<InnerPolicy, AltPolicy> Defragment<InnerPolicy, AltPolicy> {
304    /// Creates a new `Defragment` policy.
305    pub fn new(inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
306        Self {
307            inner_policy,
308            alt_policy,
309        }
310    }
311}
312impl<Backend, InnerPolicy, AltPolicy> AllocPolicy<Backend> for Defragment<InnerPolicy, AltPolicy>
313where
314    Backend: ShmProviderBackend,
315    InnerPolicy: AllocPolicy<Backend>,
316    AltPolicy: AllocPolicy<Backend>,
317{
318    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
319        let result = self.inner_policy.alloc(layout, provider);
320        if let Err(ZAllocError::NeedDefragment) = result {
321            // try to alloc again only if big enough chunk was defragmented
322            if provider.defragment() >= layout.size().get() {
323                return self.alt_policy.alloc(layout, provider);
324            }
325        }
326        result
327    }
328}
329unsafe impl<InnerPolicy: SafePolicy, AltPolicy: SafePolicy> SafePolicy
330    for Defragment<InnerPolicy, AltPolicy>
331{
332}
333impl<InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
334    for Defragment<InnerPolicy, AltPolicy>
335{
336    const NEW: Self = Self {
337        inner_policy: InnerPolicy::NEW,
338        alt_policy: AltPolicy::NEW,
339    };
340}
341
342/// Deallocating policy.
343///
344/// Forcibly deallocate up to N buffers until allocation succeeds.
345///
346/// This policy is unsafe as it may lead to reallocate memory currently in-use.
347#[zenoh_macros::unstable_doc]
348#[derive(Clone, Copy)]
349pub struct Deallocate<Limit, InnerPolicy = JustAlloc, AltPolicy = InnerPolicy> {
350    limit: Limit,
351    inner_policy: InnerPolicy,
352    alt_policy: AltPolicy,
353}
354impl<Limit, InnerPolicy, AltPolicy> Deallocate<Limit, InnerPolicy, AltPolicy> {
355    /// Creates a new `Deallocate` policy.
356    pub fn new(limit: Limit, inner_policy: InnerPolicy, alt_policy: AltPolicy) -> Self {
357        Self {
358            limit,
359            inner_policy,
360            alt_policy,
361        }
362    }
363}
364impl<Backend, Limit, InnerPolicy, AltPolicy> AllocPolicy<Backend>
365    for Deallocate<Limit, InnerPolicy, AltPolicy>
366where
367    Backend: ShmProviderBackend,
368    Limit: PolicyValue<usize>,
369    InnerPolicy: AllocPolicy<Backend>,
370    AltPolicy: AllocPolicy<Backend>,
371{
372    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
373        for _ in 0..self.limit.get() {
374            match self.inner_policy.alloc(layout, provider) {
375                res @ (Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory)) => {
376                    let Some(chunk) = zlock!(provider.busy_list).pop() else {
377                        return res;
378                    };
379                    provider.backend.free(&chunk.descriptor());
380                }
381                res => return res,
382            }
383        }
384        self.alt_policy.alloc(layout, provider)
385    }
386}
387impl<Limit: ConstPolicy, InnerPolicy: ConstPolicy, AltPolicy: ConstPolicy> ConstPolicy
388    for Deallocate<Limit, InnerPolicy, AltPolicy>
389{
390    const NEW: Self = Self {
391        limit: Limit::NEW,
392        inner_policy: InnerPolicy::NEW,
393        alt_policy: AltPolicy::NEW,
394    };
395}
396
397/// Blocking allocation policy.
398/// This policy will block until the allocation succeeds.
399/// Both sync and async modes available.
400#[zenoh_macros::unstable_doc]
401#[derive(Clone, Copy)]
402pub struct BlockOn<InnerPolicy = JustAlloc> {
403    inner_policy: InnerPolicy,
404}
405impl<InnerPolicy> BlockOn<InnerPolicy> {
406    /// Creates a new `BlockOn` policy.
407    pub fn new(inner_policy: InnerPolicy) -> Self {
408        Self { inner_policy }
409    }
410}
411#[async_trait]
412impl<Backend, InnerPolicy> AsyncAllocPolicy<Backend> for BlockOn<InnerPolicy>
413where
414    Backend: ShmProviderBackend + Sync,
415    InnerPolicy: AllocPolicy<Backend> + Send + Sync,
416{
417    async fn alloc_async(
418        &self,
419        layout: &MemoryLayout,
420        provider: &ShmProvider<Backend>,
421    ) -> ChunkAllocResult {
422        loop {
423            match self.inner_policy.alloc(layout, provider) {
424                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
425                    // TODO: implement provider's async signalling instead of this!
426                    tokio::time::sleep(Duration::from_millis(1)).await;
427                }
428                other_result => {
429                    return other_result;
430                }
431            }
432        }
433    }
434}
435unsafe impl<InnerPolicy: SafePolicy> SafePolicy for BlockOn<InnerPolicy> {}
436impl<Backend, InnerPolicy> AllocPolicy<Backend> for BlockOn<InnerPolicy>
437where
438    Backend: ShmProviderBackend,
439    InnerPolicy: AllocPolicy<Backend>,
440{
441    fn alloc(&self, layout: &MemoryLayout, provider: &ShmProvider<Backend>) -> ChunkAllocResult {
442        loop {
443            match self.inner_policy.alloc(layout, provider) {
444                Err(ZAllocError::NeedDefragment) | Err(ZAllocError::OutOfMemory) => {
445                    // TODO: implement provider's async signalling instead of this!
446                    std::thread::sleep(Duration::from_millis(1));
447                }
448                other_result => {
449                    return other_result;
450                }
451            }
452        }
453    }
454}
455impl<InnerPolicy: ConstPolicy> ConstPolicy for BlockOn<InnerPolicy> {
456    const NEW: Self = Self {
457        inner_policy: InnerPolicy::NEW,
458    };
459}
460
461#[zenoh_macros::unstable_doc]
462pub struct AllocBuilder<'a, Backend, Layout, Policy = JustAlloc> {
463    provider: &'a ShmProvider<Backend>,
464    layout: Layout,
465    policy: Policy,
466}
467
468// Generic impl
469impl<'a, Backend, Layout, Policy> AllocBuilder<'a, Backend, Layout, Policy> {
470    /// Set the allocation policy
471    #[zenoh_macros::unstable_doc]
472    pub fn with_policy<OtherPolicy: SafePolicy + ConstPolicy>(
473        self,
474    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
475        // SAFETY: the policy is safe
476        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
477    }
478
479    /// Set the unsafe allocation policy
480    ///
481    /// # Safety
482    ///
483    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
484    #[zenoh_macros::unstable_doc]
485    pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
486        self,
487    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
488        // SAFETY: same function contract
489        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
490    }
491
492    /// Set the unsafe allocation policy at runtime
493    ///
494    /// # Safety
495    ///
496    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
497    #[zenoh_macros::unstable_doc]
498    pub unsafe fn with_runtime_policy<OtherPolicy>(
499        self,
500        policy: OtherPolicy,
501    ) -> AllocBuilder<'a, Backend, Layout, OtherPolicy> {
502        AllocBuilder {
503            provider: self.provider,
504            layout: self.layout,
505            policy,
506        }
507    }
508
509    fn layout_policy(self) -> Result<(PrecomputedLayout<'a, Backend, Layout>, Policy), ZLayoutError>
510    where
511        Backend: ShmProviderBackend,
512        Layout: AllocLayout,
513    {
514        let layout = PrecomputedLayout::new(self.provider, self.layout.memory_layout()?)?;
515        Ok((layout, self.policy))
516    }
517}
518
519impl<'a, Backend: ShmProviderBackend, Layout: TryInto<MemoryLayout>>
520    AllocBuilder<'a, Backend, Layout>
521where
522    Layout::Error: Into<ZLayoutError>,
523{
524    /// Try to build an allocation layout
525    #[deprecated(
526        since = "1.5.3",
527        note = "Please use `ShmProvider::alloc_layout` method instead"
528    )]
529    #[zenoh_macros::unstable_doc]
530    pub fn into_layout(self) -> Result<PrecomputedLayout<'a, Backend, Layout>, ZLayoutError> {
531        PrecomputedLayout::new(self.provider, self.layout.try_into().map_err(Into::into)?)
532    }
533}
534
535impl<Backend, Layout: AllocLayout, Policy> Resolvable
536    for AllocBuilder<'_, Backend, Layout, Policy>
537{
538    type To = Result<Layout::Buffer, ZLayoutAllocError>;
539}
540
541impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
542    for AllocBuilder<'_, Backend, Layout, Policy>
543{
544    fn wait(self) -> <Self as Resolvable>::To {
545        let (layout, policy) = self.layout_policy()?;
546        // SAFETY: same contract used to set the policy
547        Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.wait()?)
548    }
549}
550
551impl<
552        'a,
553        Backend: ShmProviderBackend + Sync,
554        Layout: AllocLayout + Send + Sync + 'a,
555        Policy: AsyncAllocPolicy<Backend> + Sync + 'a,
556    > IntoFuture for AllocBuilder<'a, Backend, Layout, Policy>
557{
558    type Output = <Self as Resolvable>::To;
559    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'a + Send>>;
560
561    fn into_future(self) -> Self::IntoFuture {
562        Box::pin(async move {
563            let (layout, policy) = self.layout_policy()?;
564            // SAFETY: same contract used to set the policy
565            Ok(unsafe { layout.alloc().with_runtime_policy(policy) }.await?)
566        })
567    }
568}
569
570/// Builder for making allocations with instant layout calculation
571#[zenoh_macros::unstable_doc]
572pub struct PrecomputedAllocBuilder<'b, 'a: 'b, Backend, Layout, Policy = JustAlloc> {
573    layout: &'b PrecomputedLayout<'a, Backend, Layout>,
574    policy: Policy,
575}
576
577// Generic impl
578impl<'b, 'a: 'b, Backend, Layout, Policy> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy> {
579    /// Set the allocation policy
580    #[zenoh_macros::unstable_doc]
581    pub fn with_policy<OtherPolicy: ConstPolicy>(
582        self,
583    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
584        // SAFETY: the policy is safe
585        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
586    }
587
588    /// Set the unsafe allocation policy
589    ///
590    /// # Safety
591    ///
592    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
593    #[zenoh_macros::unstable_doc]
594    pub unsafe fn with_unsafe_policy<OtherPolicy: ConstPolicy>(
595        self,
596    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
597        // SAFETY: same function contract
598        unsafe { self.with_runtime_policy(OtherPolicy::NEW) }
599    }
600
601    /// Set the unsafe allocation policy at runtime
602    ///
603    /// # Safety
604    ///
605    /// The user must ensure its use of allocated buffers are safe taking into account the policy.
606    #[zenoh_macros::unstable_doc]
607    pub unsafe fn with_runtime_policy<OtherPolicy>(
608        self,
609        policy: OtherPolicy,
610    ) -> PrecomputedAllocBuilder<'b, 'a, Backend, Layout, OtherPolicy> {
611        PrecomputedAllocBuilder {
612            layout: self.layout,
613            policy,
614        }
615    }
616}
617
618impl<Backend, Layout: AllocLayout, Policy> Resolvable
619    for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
620{
621    type To = Result<Layout::Buffer, ZAllocError>;
622}
623
624impl<Backend: ShmProviderBackend, Layout: AllocLayout, Policy: AllocPolicy<Backend>> Wait
625    for PrecomputedAllocBuilder<'_, '_, Backend, Layout, Policy>
626{
627    fn wait(self) -> <Self as Resolvable>::To {
628        let buffer = self.layout.provider.alloc_inner(
629            self.layout.size,
630            &self.layout.provider_layout,
631            &self.policy,
632        )?;
633        // SAFETY: the buffer has been allocated with the requested layout
634        Ok(unsafe { Layout::wrap_buffer(buffer) })
635    }
636}
637
638impl<
639        'b,
640        'a: 'b,
641        Backend: ShmProviderBackend + Sync,
642        Layout: AllocLayout + Sync,
643        Policy: AsyncAllocPolicy<Backend> + Sync + 'b,
644    > IntoFuture for PrecomputedAllocBuilder<'b, 'a, Backend, Layout, Policy>
645{
646    type Output = <Self as Resolvable>::To;
647    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + 'b + Send>>;
648
649    fn into_future(self) -> Self::IntoFuture {
650        Box::pin(async move {
651            let buffer = self
652                .layout
653                .provider
654                .alloc_inner_async(self.layout.size, &self.layout.provider_layout, &self.policy)
655                .await?;
656            // SAFETY: the buffer has been allocated with the requested layout
657            Ok(unsafe { Layout::wrap_buffer(buffer) })
658        })
659    }
660}
661
662#[zenoh_macros::unstable_doc]
663pub struct ShmProviderBuilder;
664impl ShmProviderBuilder {
665    /// Set the backend
666    #[zenoh_macros::unstable_doc]
667    pub fn backend<Backend: ShmProviderBackend>(
668        backend: Backend,
669    ) -> ShmProviderBuilderBackend<Backend> {
670        ShmProviderBuilderBackend { backend }
671    }
672
673    /// Set the default backend
674    #[zenoh_macros::unstable_doc]
675    pub fn default_backend<Layout>(layout: Layout) -> ShmProviderBuilderWithDefaultBackend<Layout> {
676        ShmProviderBuilderWithDefaultBackend { layout }
677    }
678}
679
680#[zenoh_macros::unstable_doc]
681pub struct ShmProviderBuilderBackend<Backend>
682where
683    Backend: ShmProviderBackend,
684{
685    backend: Backend,
686}
687#[zenoh_macros::unstable_doc]
688impl<Backend> Resolvable for ShmProviderBuilderBackend<Backend>
689where
690    Backend: ShmProviderBackend,
691{
692    type To = ShmProvider<Backend>;
693}
694
695#[zenoh_macros::unstable_doc]
696impl<Backend> Wait for ShmProviderBuilderBackend<Backend>
697where
698    Backend: ShmProviderBackend,
699{
700    /// build ShmProvider
701    fn wait(self) -> <Self as Resolvable>::To {
702        ShmProvider::new(self.backend)
703    }
704}
705
706#[zenoh_macros::unstable_doc]
707pub struct ShmProviderBuilderWithDefaultBackend<Layout> {
708    layout: Layout,
709}
710
711#[zenoh_macros::unstable_doc]
712impl<Layout> Resolvable for ShmProviderBuilderWithDefaultBackend<Layout> {
713    type To = ZResult<ShmProvider<PosixShmProviderBackend>>;
714}
715
716#[zenoh_macros::unstable_doc]
717impl<Layout: TryInto<MemoryLayout>> Wait for ShmProviderBuilderWithDefaultBackend<Layout>
718where
719    Layout::Error: Error,
720    PosixShmProviderBackendBuilder<Layout>:
721        Resolvable<To = ZResult<PosixShmProviderBackend>> + Wait,
722{
723    /// build ShmProvider
724    fn wait(self) -> <Self as Resolvable>::To {
725        let backend = PosixShmProviderBackend::builder(self.layout).wait()?;
726        Ok(ShmProvider::new(backend))
727    }
728}
729
730#[derive(Debug)]
731struct BusyChunk {
732    metadata: AllocatedMetadataDescriptor,
733}
734
735impl BusyChunk {
736    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
737        Self { metadata }
738    }
739
740    fn descriptor(&self) -> ChunkDescriptor {
741        self.metadata.header().data_descriptor()
742    }
743}
744
745/// A generalized interface for shared memory data sources
746#[zenoh_macros::unstable_doc]
747#[derive(Debug)]
748pub struct ShmProvider<Backend> {
749    backend: Backend,
750    busy_list: Mutex<Vec<BusyChunk>>,
751}
752
753impl<Backend: Default> Default for ShmProvider<Backend> {
754    fn default() -> Self {
755        Self::new(Backend::default())
756    }
757}
758
759impl<Backend> ShmProvider<Backend>
760where
761    Backend: ShmProviderBackend,
762{
763    /// Allocates a buffer with a given memory layout.
764    #[zenoh_macros::unstable_doc]
765    pub fn alloc<Layout>(&self, layout: Layout) -> AllocBuilder<'_, Backend, Layout> {
766        AllocBuilder {
767            provider: self,
768            layout,
769            policy: JustAlloc,
770        }
771    }
772
773    /// Precompute the actual layout for an allocation.
774    #[zenoh_macros::unstable_doc]
775    pub fn alloc_layout<Layout: TryInto<MemoryLayout>>(
776        &self,
777        layout: Layout,
778    ) -> Result<PrecomputedLayout<'_, Backend, Layout>, ZLayoutError>
779    where
780        Layout::Error: Into<ZLayoutError>,
781    {
782        PrecomputedLayout::new(self, layout.try_into().map_err(Into::into)?)
783    }
784    /// Defragment memory
785    #[zenoh_macros::unstable_doc]
786    pub fn defragment(&self) -> usize {
787        self.backend.defragment()
788    }
789
790    /// Map externally-allocated chunk into ZShmMut.
791    /// This method is designed to be used with push data sources.
792    /// Remember that chunk's len may be >= len!
793    #[zenoh_macros::unstable_doc]
794    pub fn map(&self, chunk: AllocatedChunk, len: usize) -> Result<ZShmMut, ZAllocError> {
795        let len = len.try_into().map_err(|_| ZAllocError::Other)?;
796
797        // allocate resources for SHM buffer
798        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
799
800        // wrap everything to ShmBufInner
801        let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
802        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
803    }
804
805    #[zenoh_macros::unstable_doc]
806    fn garbage_collect_impl<const SAFE: bool>(&self) -> usize {
807        fn retain_unordered<T>(vec: &mut Vec<T>, mut f: impl FnMut(&T) -> bool) {
808            let mut i = 0;
809            while i < vec.len() {
810                if f(&vec[i]) {
811                    i += 1;
812                } else {
813                    vec.swap_remove(i); // move last into place of vec[i]
814                                        // don't increment i: need to test the swapped-in element
815                }
816            }
817        }
818
819        tracing::trace!("Running Garbage Collector");
820        let mut largest = 0usize;
821        retain_unordered(&mut zlock!(self.busy_list), |chunk| {
822            let header = chunk.metadata.header();
823            if header.refcount.load(Ordering::SeqCst) == 0
824                || (!SAFE && header.watchdog_invalidated.load(Ordering::SeqCst))
825            {
826                tracing::trace!("Garbage Collecting Chunk: {:?}", chunk);
827                let descriptor_to_free = chunk.descriptor();
828                self.backend.free(&descriptor_to_free);
829                largest = largest.max(descriptor_to_free.len.get());
830                return false;
831            }
832            true
833        });
834        largest
835    }
836
837    /// Try to collect free chunks.
838    /// Returns the size of largest collected chunk
839    #[zenoh_macros::unstable_doc]
840    pub fn garbage_collect(&self) -> usize {
841        self.garbage_collect_impl::<true>()
842    }
843
844    /// Try to collect free chunks.
845    /// Returns the size of largest collected chunk
846    ///
847    /// # Safety
848    ///
849    /// User must ensure there is no data races with collected chunks, as some of them may still be in-use.
850    #[zenoh_macros::unstable_doc]
851    pub unsafe fn garbage_collect_unsafe(&self) -> usize {
852        self.garbage_collect_impl::<false>()
853    }
854
855    /// Bytes available for use
856    #[zenoh_macros::unstable_doc]
857    pub fn available(&self) -> usize {
858        self.backend.available()
859    }
860}
861
862// PRIVATE impls
863impl<Backend> ShmProvider<Backend> {
864    fn new(backend: Backend) -> Self {
865        Self {
866            backend,
867            busy_list: Default::default(),
868        }
869    }
870}
871
872impl<Backend> ShmProvider<Backend>
873where
874    Backend: ShmProviderBackend,
875{
876    fn alloc_inner<Policy>(
877        &self,
878        size: NonZeroUsize,
879        layout: &MemoryLayout,
880        policy: &Policy,
881    ) -> Result<ZShmMut, ZAllocError>
882    where
883        Policy: AllocPolicy<Backend>,
884    {
885        // allocate resources for SHM buffer
886        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
887
888        // allocate data chunk
889        // Perform actions depending on the Policy
890        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
891        // Don't loose this chunk as it leads to memory leak at the backend side!
892        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
893        // and it is necessary to handle that properly and pass this len to corresponding free(...)
894        let chunk = policy.alloc(layout, self)?;
895
896        // wrap allocated chunk to ShmBufInner
897        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
898        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
899    }
900
901    fn alloc_resources() -> Result<(AllocatedMetadataDescriptor, ConfirmedDescriptor), ZAllocError>
902    {
903        // allocate metadata
904        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
905
906        // add watchdog to confirmator
907        let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());
908
909        Ok((allocated_metadata, confirmed_metadata))
910    }
911
912    fn wrap(
913        &self,
914        chunk: AllocatedChunk,
915        len: NonZeroUsize,
916        allocated_metadata: AllocatedMetadataDescriptor,
917        confirmed_metadata: ConfirmedDescriptor,
918    ) -> ShmBufInner {
919        // write additional metadata
920        // chunk descriptor
921        allocated_metadata
922            .header()
923            .set_data_descriptor(&chunk.descriptor);
924        // protocol
925        allocated_metadata
926            .header()
927            .protocol
928            .store(self.backend.id(), Ordering::Relaxed);
929
930        // add watchdog to validator
931        GLOBAL_VALIDATOR
932            .read()
933            .add(confirmed_metadata.owned.clone());
934
935        // Create buffer's info
936        let info = ShmBufInfo::new(
937            len,
938            MetadataDescriptor::from(&confirmed_metadata.owned),
939            allocated_metadata
940                .header()
941                .generation
942                .load(Ordering::SeqCst),
943        );
944
945        // Create buffer
946        let shmb = ShmBufInner {
947            metadata: confirmed_metadata,
948            buf: chunk.data,
949            info,
950        };
951
952        // Create and store busy chunk
953        zlock!(self.busy_list).push(BusyChunk::new(allocated_metadata));
954
955        shmb
956    }
957}
958
959// PRIVATE impls for Sync backend
960impl<Backend> ShmProvider<Backend>
961where
962    Backend: ShmProviderBackend + Sync,
963{
964    async fn alloc_inner_async<Policy>(
965        &self,
966        size: NonZeroUsize,
967        backend_layout: &MemoryLayout,
968        policy: &Policy,
969    ) -> Result<ZShmMut, ZAllocError>
970    where
971        Policy: AsyncAllocPolicy<Backend>,
972    {
973        // allocate resources for SHM buffer
974        let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;
975
976        // allocate data chunk
977        // Perform actions depending on the Policy
978        // NOTE: it is necessary to properly map this chunk OR free it if mapping fails!
979        // Don't loose this chunk as it leads to memory leak at the backend side!
980        // NOTE: self.backend.alloc(len) returns chunk with len >= required len,
981        // and it is necessary to handle that properly and pass this len to corresponding free(...)
982        let chunk = policy.alloc_async(backend_layout, self).await?;
983
984        // wrap allocated chunk to ShmBufInner
985        let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
986        Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
987    }
988}