tokio_uring/buf/fixed/pool.rs
1//! A dynamic collection of I/O buffers pre-registered with the kernel.
2//!
3//! This module provides [`FixedBufPool`], a collection that implements
4//! dynamic management of sets of interchangeable memory buffers
5//! registered with the kernel for `io-uring` operations. Asynchronous
6//! rotation of the buffers shared by multiple tasks is also supported
7//! by [`FixedBufPool`].
8//!
9//! [`FixedBufPool`]: self::FixedBufPool
10
11use super::plumbing;
12use super::FixedBuf;
13use crate::buf::IoBufMut;
14use crate::runtime::CONTEXT;
15
16use tokio::pin;
17use tokio::sync::Notify;
18
19use std::cell::RefCell;
20use std::io;
21use std::rc::Rc;
22use std::sync::Arc;
23
24/// A dynamic collection of I/O buffers pre-registered with the kernel.
25///
26/// `FixedBufPool` allows the application to manage a collection of buffers
27/// allocated in memory, that can be registered in the current `tokio-uring`
28/// context using the [`register`] method. Unlike [`FixedBufRegistry`],
29/// individual buffers are not retrieved by index; instead, an available
30/// buffer matching a specified capacity can be retrieved with the [`try_next`]
31/// method. In asynchronous contexts, the [`next`] method can be used to wait
32/// until such a buffer becomes available.
33/// This allows some flexibility in managing sets of buffers with
34/// different capacity tiers. The need to maintain lists of free buffers,
35/// however, imposes additional runtime overhead.
36///
37/// A `FixedBufPool` value is a lightweight handle for a collection of
38/// allocated buffers. Cloning of a `FixedBufPool` creates a new reference to
39/// the same collection of buffers.
40///
41/// The buffers of the collection are not deallocated until:
42/// - all `FixedBufPool` references to the collection have been dropped;
43/// - all [`FixedBuf`] handles to individual buffers in the collection have
44/// been dropped, including the buffer handles owned by any I/O operations
45/// in flight;
46/// - The `tokio-uring` [`Runtime`] the buffers are registered with
47/// has been dropped.
48///
49/// [`register`]: Self::register
50/// [`try_next`]: Self::try_next
51/// [`next`]: Self::next
52/// [`FixedBufRegistry`]: super::FixedBufRegistry
53/// [`Runtime`]: crate::Runtime
54/// [`FixedBuf`]: super::FixedBuf
55///
56/// # Examples
57///
58/// ```
59/// use tokio_uring::buf::fixed::FixedBufPool;
60/// use tokio_uring::buf::IoBuf;
61/// use std::iter;
62/// use std::mem;
63///
64/// # #[allow(non_snake_case)]
65/// # fn main() -> Result<(), std::io::Error> {
66/// # use nix::sys::resource::{getrlimit, Resource};
67/// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
68/// # let BUF_SIZE_LARGE = memlock_limit as usize / 8;
69/// # let BUF_SIZE_SMALL = memlock_limit as usize / 16;
70/// tokio_uring::start(async {
71/// let pool = FixedBufPool::new(
72/// iter::once(Vec::with_capacity(BUF_SIZE_LARGE))
73/// .chain(iter::repeat_with(|| Vec::with_capacity(BUF_SIZE_SMALL)).take(2))
74/// );
75///
76/// pool.register()?;
77///
78/// let buf = pool.try_next(BUF_SIZE_LARGE).unwrap();
79/// assert_eq!(buf.bytes_total(), BUF_SIZE_LARGE);
80/// let next = pool.try_next(BUF_SIZE_LARGE);
81/// assert!(next.is_none());
82/// let buf1 = pool.try_next(BUF_SIZE_SMALL).unwrap();
83/// assert_eq!(buf1.bytes_total(), BUF_SIZE_SMALL);
84/// let buf2 = pool.try_next(BUF_SIZE_SMALL).unwrap();
85/// assert_eq!(buf2.bytes_total(), BUF_SIZE_SMALL);
86/// let next = pool.try_next(BUF_SIZE_SMALL);
87/// assert!(next.is_none());
88/// mem::drop(buf);
89/// let buf = pool.try_next(BUF_SIZE_LARGE).unwrap();
90/// assert_eq!(buf.bytes_total(), BUF_SIZE_LARGE);
91///
92/// Ok(())
93/// })
94/// # }
95/// ```
96#[derive(Clone)]
97pub struct FixedBufPool<T: IoBufMut> {
98 inner: Rc<RefCell<plumbing::Pool<T>>>,
99}
100
101impl<T: IoBufMut> FixedBufPool<T> {
102 /// Creates a new collection of buffers from the provided allocated vectors.
103 ///
104 /// The buffers are assigned 0-based indices in the order of the iterable
105 /// input parameter. The returned collection takes up to [`UIO_MAXIOV`]
106 /// buffers from the input. Any items in excess of that amount are silently
107 /// dropped, unless the input iterator produces the vectors lazily.
108 ///
109 /// [`UIO_MAXIOV`]: libc::UIO_MAXIOV
110 ///
111 /// # Examples
112 ///
113 /// When providing uninitialized vectors for the collection, take care to
114 /// not replicate a vector with `.clone()` as that does not preserve the
115 /// capacity and the resulting buffer pointer will be rejected by the kernel.
116 /// This means that the following use of [`iter::repeat`] would not work:
117 ///
118 /// [`iter::repeat`]: std::iter::repeat
119 ///
120 /// ```should_panic
121 /// use tokio_uring::buf::fixed::FixedBufPool;
122 /// use std::iter;
123 ///
124 /// # #[allow(non_snake_case)]
125 /// # fn main() -> Result<(), std::io::Error> {
126 /// # use nix::sys::resource::{getrlimit, Resource};
127 /// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
128 /// # let NUM_BUFFERS = std::cmp::max(memlock_limit as usize / 4096 / 8, 1);
129 /// # let BUF_SIZE = 4096;
130 /// let pool = FixedBufPool::new(
131 /// iter::repeat(Vec::with_capacity(BUF_SIZE)).take(NUM_BUFFERS)
132 /// );
133 ///
134 /// tokio_uring::start(async {
135 /// pool.register()?;
136 /// // ...
137 /// Ok(())
138 /// })
139 /// # }
140 /// ```
141 ///
142 /// Instead, create the vectors with requested capacity directly:
143 ///
144 /// ```
145 /// use tokio_uring::buf::fixed::FixedBufPool;
146 /// use std::iter;
147 ///
148 /// # #[allow(non_snake_case)]
149 /// # fn main() -> Result<(), std::io::Error> {
150 /// # use nix::sys::resource::{getrlimit, Resource};
151 /// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
152 /// # let NUM_BUFFERS = std::cmp::max(memlock_limit as usize / 4096 / 8, 1);
153 /// # let BUF_SIZE = 4096;
154 /// let pool = FixedBufPool::new(
155 /// iter::repeat_with(|| Vec::with_capacity(BUF_SIZE)).take(NUM_BUFFERS)
156 /// );
157 ///
158 /// tokio_uring::start(async {
159 /// pool.register()?;
160 /// // ...
161 /// Ok(())
162 /// })
163 /// # }
164 /// ```
165 pub fn new(bufs: impl IntoIterator<Item = T>) -> Self {
166 FixedBufPool {
167 inner: Rc::new(RefCell::new(plumbing::Pool::new(bufs.into_iter()))),
168 }
169 }
170
171 /// Registers the buffers with the kernel.
172 ///
173 /// This method must be called in the context of a `tokio-uring` runtime.
174 /// The registration persists for the lifetime of the runtime, unless
175 /// revoked by the [`unregister`] method. Dropping the
176 /// `FixedBufPool` instance this method has been called on does not revoke
177 /// the registration or deallocate the buffers.
178 ///
179 /// [`unregister`]: Self::unregister
180 ///
181 /// This call can be blocked in the kernel to complete any operations
182 /// in-flight on the same `io-uring` instance. The application is
183 /// recommended to register buffers before starting any I/O operations.
184 ///
185 /// # Errors
186 ///
187 /// If a collection of buffers is currently registered in the context
188 /// of the `tokio-uring` runtime this call is made in, the function returns
189 /// an error.
190 pub fn register(&self) -> io::Result<()> {
191 CONTEXT.with(|x| {
192 x.handle()
193 .as_ref()
194 .expect("Not in a runtime context")
195 .register_buffers(Rc::clone(&self.inner) as _)
196 })
197 }
198
199 /// Unregisters this collection of buffers.
200 ///
201 /// This method must be called in the context of a `tokio-uring` runtime,
202 /// where the buffers should have been previously registered.
203 ///
204 /// This operation invalidates any `FixedBuf` handles checked out from
205 /// this registry instance. Continued use of such handles in I/O
206 /// operations may result in an error.
207 ///
208 /// # Errors
209 ///
210 /// If another collection of buffers is currently registered in the context
211 /// of the `tokio-uring` runtime this call is made in, the function returns
212 /// an error. Calling `unregister` when no `FixedBufPool` is currently
213 /// registered on this runtime also returns an error.
214 pub fn unregister(&self) -> io::Result<()> {
215 CONTEXT.with(|x| {
216 x.handle()
217 .as_ref()
218 .expect("Not in a runtime context")
219 .unregister_buffers(Rc::clone(&self.inner) as _)
220 })
221 }
222
223 /// Returns a buffer of requested capacity from this pool
224 /// that is not currently owned by any other [`FixedBuf`] handle.
225 /// If no such free buffer is available, returns `None`.
226 ///
227 /// The buffer is released to be available again once the
228 /// returned `FixedBuf` handle has been dropped. An I/O operation
229 /// using the buffer takes ownership of it and returns it once completed,
230 /// preventing shared use of the buffer while the operation is in flight.
231 ///
232 /// An application should not rely on any particular order
233 /// in which available buffers are retrieved.
234 pub fn try_next(&self, cap: usize) -> Option<FixedBuf> {
235 let mut inner = self.inner.borrow_mut();
236 inner.try_next(cap).map(|data| {
237 let pool = Rc::clone(&self.inner);
238 // Safety: the validity of buffer data is ensured by
239 // plumbing::Pool::try_next
240 unsafe { FixedBuf::new(pool, data) }
241 })
242 }
243
244 /// Resolves to a buffer of requested capacity
245 /// when it is or becomes available in this pool.
246 /// This may happen when a [`FixedBuf`] handle owning a buffer
247 /// of the same capacity is dropped.
248 ///
249 /// If no matching buffers are available and none are being released,
250 /// this asynchronous function will never resolve. Applications should take
251 /// care to wait on the returned future concurrently with some tasks that
252 /// will complete I/O operations owning the buffers, or back it up with a
253 /// timeout using, for example, `tokio::util::timeout`.
254 pub async fn next(&self, cap: usize) -> FixedBuf {
255 // Fast path: get the buffer if it's already available
256 let notify = {
257 let mut inner = self.inner.borrow_mut();
258 if let Some(data) = inner.try_next(cap) {
259 // Safety: the validity of buffer data is ensured by
260 // plumbing::Pool::try_next
261 let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) };
262 return buf;
263 }
264 inner.notify_on_next(cap)
265 };
266
267 // Poll for a buffer, engaging the `Notify` machinery.
268 self.next_when_notified(cap, notify).await
269 }
270
271 #[cold]
272 async fn next_when_notified(&self, cap: usize, notify: Arc<Notify>) -> FixedBuf {
273 let notified = notify.notified();
274 pin!(notified);
275 loop {
276 // In the single-threaded case, no buffers could get checked in
277 // between us calling `try_next` and here, so we can't miss a wakeup.
278 notified.as_mut().await;
279
280 if let Some(data) = self.inner.borrow_mut().try_next(cap) {
281 // Safety: the validity of buffer data is ensured by
282 // plumbing::Pool::try_next
283 let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) };
284 return buf;
285 }
286
287 // It's possible that the task did not get a buffer from `try_next`.
288 // The `Notify` entries are created once for each requested capacity
289 // and never removed, so this `Notify` could have been holding
290 // a permit from a buffer checked in previously when no tasks were
291 // waiting. Then a task would call `next` on this pool and receive
292 // the buffer without consuming the permit. It's also possible that
293 // a task calls `try_next` directly.
294 // Reset the `Notified` future to wait for another wakeup.
295 notified.set(notify.notified());
296 }
297 }
298}