tokio_uring/buf/fixed/
pool.rs

1//! A dynamic collection of I/O buffers pre-registered with the kernel.
2//!
3//! This module provides [`FixedBufPool`], a collection that implements
4//! dynamic management of sets of interchangeable memory buffers
5//! registered with the kernel for `io-uring` operations. Asynchronous
6//! rotation of the buffers shared by multiple tasks is also supported
7//! by [`FixedBufPool`].
8//!
9//! [`FixedBufPool`]: self::FixedBufPool
10
11use super::plumbing;
12use super::FixedBuf;
13use crate::buf::IoBufMut;
14use crate::runtime::CONTEXT;
15
16use tokio::pin;
17use tokio::sync::Notify;
18
19use std::cell::RefCell;
20use std::io;
21use std::rc::Rc;
22use std::sync::Arc;
23
24/// A dynamic collection of I/O buffers pre-registered with the kernel.
25///
26/// `FixedBufPool` allows the application to manage a collection of buffers
27/// allocated in memory, that can be registered in the current `tokio-uring`
28/// context using the [`register`] method. Unlike [`FixedBufRegistry`],
29/// individual buffers are not retrieved by index; instead, an available
30/// buffer matching a specified capacity can be retrieved with the [`try_next`]
31/// method. In asynchronous contexts, the [`next`] method can be used to wait
32/// until such a buffer becomes available.
33/// This allows some flexibility in managing sets of buffers with
34/// different capacity tiers. The need to maintain lists of free buffers,
35/// however, imposes additional runtime overhead.
36///
37/// A `FixedBufPool` value is a lightweight handle for a collection of
38/// allocated buffers. Cloning of a `FixedBufPool` creates a new reference to
39/// the same collection of buffers.
40///
41/// The buffers of the collection are not deallocated until:
42/// - all `FixedBufPool` references to the collection have been dropped;
43/// - all [`FixedBuf`] handles to individual buffers in the collection have
44///   been dropped, including the buffer handles owned by any I/O operations
45///   in flight;
46/// - The `tokio-uring` [`Runtime`] the buffers are registered with
47///   has been dropped.
48///
49/// [`register`]: Self::register
50/// [`try_next`]: Self::try_next
51/// [`next`]: Self::next
52/// [`FixedBufRegistry`]: super::FixedBufRegistry
53/// [`Runtime`]: crate::Runtime
54/// [`FixedBuf`]: super::FixedBuf
55///
56/// # Examples
57///
58/// ```
59/// use tokio_uring::buf::fixed::FixedBufPool;
60/// use tokio_uring::buf::IoBuf;
61/// use std::iter;
62/// use std::mem;
63///
64/// # #[allow(non_snake_case)]
65/// # fn main() -> Result<(), std::io::Error> {
66/// # use nix::sys::resource::{getrlimit, Resource};
67/// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
68/// # let BUF_SIZE_LARGE = memlock_limit as usize / 8;
69/// # let BUF_SIZE_SMALL = memlock_limit as usize / 16;
70/// tokio_uring::start(async {
71///     let pool = FixedBufPool::new(
72///          iter::once(Vec::with_capacity(BUF_SIZE_LARGE))
73///              .chain(iter::repeat_with(|| Vec::with_capacity(BUF_SIZE_SMALL)).take(2))
74///      );
75///
76///     pool.register()?;
77///
78///     let buf = pool.try_next(BUF_SIZE_LARGE).unwrap();
79///     assert_eq!(buf.bytes_total(), BUF_SIZE_LARGE);
80///     let next = pool.try_next(BUF_SIZE_LARGE);
81///     assert!(next.is_none());
82///     let buf1 = pool.try_next(BUF_SIZE_SMALL).unwrap();
83///     assert_eq!(buf1.bytes_total(), BUF_SIZE_SMALL);
84///     let buf2 = pool.try_next(BUF_SIZE_SMALL).unwrap();
85///     assert_eq!(buf2.bytes_total(), BUF_SIZE_SMALL);
86///     let next = pool.try_next(BUF_SIZE_SMALL);
87///     assert!(next.is_none());
88///     mem::drop(buf);
89///     let buf = pool.try_next(BUF_SIZE_LARGE).unwrap();
90///     assert_eq!(buf.bytes_total(), BUF_SIZE_LARGE);
91///
92///     Ok(())
93/// })
94/// # }
95/// ```
96#[derive(Clone)]
97pub struct FixedBufPool<T: IoBufMut> {
98    inner: Rc<RefCell<plumbing::Pool<T>>>,
99}
100
101impl<T: IoBufMut> FixedBufPool<T> {
102    /// Creates a new collection of buffers from the provided allocated vectors.
103    ///
104    /// The buffers are assigned 0-based indices in the order of the iterable
105    /// input parameter. The returned collection takes up to [`UIO_MAXIOV`]
106    /// buffers from the input. Any items in excess of that amount are silently
107    /// dropped, unless the input iterator produces the vectors lazily.
108    ///
109    /// [`UIO_MAXIOV`]: libc::UIO_MAXIOV
110    ///
111    /// # Examples
112    ///
113    /// When providing uninitialized vectors for the collection, take care to
114    /// not replicate a vector with `.clone()` as that does not preserve the
115    /// capacity and the resulting buffer pointer will be rejected by the kernel.
116    /// This means that the following use of [`iter::repeat`] would not work:
117    ///
118    /// [`iter::repeat`]: std::iter::repeat
119    ///
120    /// ```should_panic
121    /// use tokio_uring::buf::fixed::FixedBufPool;
122    /// use std::iter;
123    ///
124    /// # #[allow(non_snake_case)]
125    /// # fn main() -> Result<(), std::io::Error> {
126    /// # use nix::sys::resource::{getrlimit, Resource};
127    /// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
128    /// # let NUM_BUFFERS = std::cmp::max(memlock_limit as usize / 4096 / 8, 1);
129    /// # let BUF_SIZE = 4096;
130    /// let pool = FixedBufPool::new(
131    ///     iter::repeat(Vec::with_capacity(BUF_SIZE)).take(NUM_BUFFERS)
132    /// );
133    ///
134    /// tokio_uring::start(async {
135    ///     pool.register()?;
136    ///     // ...
137    ///     Ok(())
138    /// })
139    /// # }
140    /// ```
141    ///
142    /// Instead, create the vectors with requested capacity directly:
143    ///
144    /// ```
145    /// use tokio_uring::buf::fixed::FixedBufPool;
146    /// use std::iter;
147    ///
148    /// # #[allow(non_snake_case)]
149    /// # fn main() -> Result<(), std::io::Error> {
150    /// # use nix::sys::resource::{getrlimit, Resource};
151    /// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
152    /// # let NUM_BUFFERS = std::cmp::max(memlock_limit as usize / 4096 / 8, 1);
153    /// # let BUF_SIZE = 4096;
154    /// let pool = FixedBufPool::new(
155    ///     iter::repeat_with(|| Vec::with_capacity(BUF_SIZE)).take(NUM_BUFFERS)
156    /// );
157    ///
158    /// tokio_uring::start(async {
159    ///     pool.register()?;
160    ///     // ...
161    ///     Ok(())
162    /// })
163    /// # }
164    /// ```
165    pub fn new(bufs: impl IntoIterator<Item = T>) -> Self {
166        FixedBufPool {
167            inner: Rc::new(RefCell::new(plumbing::Pool::new(bufs.into_iter()))),
168        }
169    }
170
171    /// Registers the buffers with the kernel.
172    ///
173    /// This method must be called in the context of a `tokio-uring` runtime.
174    /// The registration persists for the lifetime of the runtime, unless
175    /// revoked by the [`unregister`] method. Dropping the
176    /// `FixedBufPool` instance this method has been called on does not revoke
177    /// the registration or deallocate the buffers.
178    ///
179    /// [`unregister`]: Self::unregister
180    ///
181    /// This call can be blocked in the kernel to complete any operations
182    /// in-flight on the same `io-uring` instance. The application is
183    /// recommended to register buffers before starting any I/O operations.
184    ///
185    /// # Errors
186    ///
187    /// If a collection of buffers is currently registered in the context
188    /// of the `tokio-uring` runtime this call is made in, the function returns
189    /// an error.
190    pub fn register(&self) -> io::Result<()> {
191        CONTEXT.with(|x| {
192            x.handle()
193                .as_ref()
194                .expect("Not in a runtime context")
195                .register_buffers(Rc::clone(&self.inner) as _)
196        })
197    }
198
199    /// Unregisters this collection of buffers.
200    ///
201    /// This method must be called in the context of a `tokio-uring` runtime,
202    /// where the buffers should have been previously registered.
203    ///
204    /// This operation invalidates any `FixedBuf` handles checked out from
205    /// this registry instance. Continued use of such handles in I/O
206    /// operations may result in an error.
207    ///
208    /// # Errors
209    ///
210    /// If another collection of buffers is currently registered in the context
211    /// of the `tokio-uring` runtime this call is made in, the function returns
212    /// an error. Calling `unregister` when no `FixedBufPool` is currently
213    /// registered on this runtime also returns an error.
214    pub fn unregister(&self) -> io::Result<()> {
215        CONTEXT.with(|x| {
216            x.handle()
217                .as_ref()
218                .expect("Not in a runtime context")
219                .unregister_buffers(Rc::clone(&self.inner) as _)
220        })
221    }
222
223    /// Returns a buffer of requested capacity from this pool
224    /// that is not currently owned by any other [`FixedBuf`] handle.
225    /// If no such free buffer is available, returns `None`.
226    ///
227    /// The buffer is released to be available again once the
228    /// returned `FixedBuf` handle has been dropped. An I/O operation
229    /// using the buffer takes ownership of it and returns it once completed,
230    /// preventing shared use of the buffer while the operation is in flight.
231    ///
232    /// An application should not rely on any particular order
233    /// in which available buffers are retrieved.
234    pub fn try_next(&self, cap: usize) -> Option<FixedBuf> {
235        let mut inner = self.inner.borrow_mut();
236        inner.try_next(cap).map(|data| {
237            let pool = Rc::clone(&self.inner);
238            // Safety: the validity of buffer data is ensured by
239            // plumbing::Pool::try_next
240            unsafe { FixedBuf::new(pool, data) }
241        })
242    }
243
244    /// Resolves to a buffer of requested capacity
245    /// when it is or becomes available in this pool.
246    /// This may happen when a [`FixedBuf`] handle owning a buffer
247    /// of the same capacity is dropped.
248    ///
249    /// If no matching buffers are available and none are being released,
250    /// this asynchronous function will never resolve. Applications should take
251    /// care to wait on the returned future concurrently with some tasks that
252    /// will complete I/O operations owning the buffers, or back it up with a
253    /// timeout using, for example, `tokio::util::timeout`.
254    pub async fn next(&self, cap: usize) -> FixedBuf {
255        // Fast path: get the buffer if it's already available
256        let notify = {
257            let mut inner = self.inner.borrow_mut();
258            if let Some(data) = inner.try_next(cap) {
259                // Safety: the validity of buffer data is ensured by
260                // plumbing::Pool::try_next
261                let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) };
262                return buf;
263            }
264            inner.notify_on_next(cap)
265        };
266
267        // Poll for a buffer, engaging the `Notify` machinery.
268        self.next_when_notified(cap, notify).await
269    }
270
271    #[cold]
272    async fn next_when_notified(&self, cap: usize, notify: Arc<Notify>) -> FixedBuf {
273        let notified = notify.notified();
274        pin!(notified);
275        loop {
276            // In the single-threaded case, no buffers could get checked in
277            // between us calling `try_next` and here, so we can't miss a wakeup.
278            notified.as_mut().await;
279
280            if let Some(data) = self.inner.borrow_mut().try_next(cap) {
281                // Safety: the validity of buffer data is ensured by
282                // plumbing::Pool::try_next
283                let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) };
284                return buf;
285            }
286
287            // It's possible that the task did not get a buffer from `try_next`.
288            // The `Notify` entries are created once for each requested capacity
289            // and never removed, so this `Notify` could have been holding
290            // a permit from a buffer checked in previously when no tasks were
291            // waiting. Then a task would call `next` on this pool and receive
292            // the buffer without consuming the permit. It's also possible that
293            // a task calls `try_next` directly.
294            // Reset the `Notified` future to wait for another wakeup.
295            notified.set(notify.notified());
296        }
297    }
298}