monoio/
builder.rs

1use std::{io, marker::PhantomData};
2
3#[cfg(all(target_os = "linux", feature = "iouring"))]
4use crate::driver::IoUringDriver;
5#[cfg(feature = "legacy")]
6use crate::driver::LegacyDriver;
7#[cfg(any(feature = "legacy", feature = "iouring"))]
8use crate::utils::thread_id::gen_id;
9use crate::{
10    driver::Driver,
11    time::{driver::TimeDriver, Clock},
12    Runtime,
13};
14
15// ===== basic builder structure definition =====
16
17/// Runtime builder
18pub struct RuntimeBuilder<D> {
19    // iouring entries
20    entries: Option<u32>,
21
22    #[cfg(all(target_os = "linux", feature = "iouring"))]
23    urb: io_uring::Builder,
24
25    // blocking handle
26    #[cfg(feature = "sync")]
27    blocking_handle: crate::blocking::BlockingHandle,
28    // driver mark
29    _mark: PhantomData<D>,
30}
31
32scoped_thread_local!(pub(crate) static BUILD_THREAD_ID: usize);
33
34impl<T> Default for RuntimeBuilder<T> {
35    /// Create a default runtime builder
36    #[must_use]
37    fn default() -> Self {
38        RuntimeBuilder::<T>::new()
39    }
40}
41
42impl<T> RuntimeBuilder<T> {
43    /// Create a default runtime builder
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            entries: None,
48
49            #[cfg(all(target_os = "linux", feature = "iouring"))]
50            urb: io_uring::IoUring::builder(),
51
52            #[cfg(feature = "sync")]
53            blocking_handle: crate::blocking::BlockingStrategy::Panic.into(),
54            _mark: PhantomData,
55        }
56    }
57}
58
59// ===== buildable trait and forward methods =====
60
61/// Buildable trait.
62pub trait Buildable: Sized {
63    /// Build the runtime.
64    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<Self>>;
65}
66
67#[allow(unused)]
68macro_rules! direct_build {
69    ($ty: ty) => {
70        impl RuntimeBuilder<$ty> {
71            /// Build the runtime.
72            pub fn build(self) -> io::Result<Runtime<$ty>> {
73                Buildable::build(self)
74            }
75        }
76    };
77}
78
79#[cfg(all(target_os = "linux", feature = "iouring"))]
80direct_build!(IoUringDriver);
81#[cfg(all(target_os = "linux", feature = "iouring"))]
82direct_build!(TimeDriver<IoUringDriver>);
83#[cfg(feature = "legacy")]
84direct_build!(LegacyDriver);
85#[cfg(feature = "legacy")]
86direct_build!(TimeDriver<LegacyDriver>);
87
88// ===== builder impl =====
89
90#[cfg(feature = "legacy")]
91impl Buildable for LegacyDriver {
92    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<LegacyDriver>> {
93        let thread_id = gen_id();
94        #[cfg(feature = "sync")]
95        let blocking_handle = this.blocking_handle;
96
97        BUILD_THREAD_ID.set(&thread_id, || {
98            let driver = match this.entries {
99                Some(entries) => LegacyDriver::new_with_entries(entries)?,
100                None => LegacyDriver::new()?,
101            };
102            #[cfg(feature = "sync")]
103            let context = crate::runtime::Context::new(blocking_handle);
104            #[cfg(not(feature = "sync"))]
105            let context = crate::runtime::Context::new();
106            Ok(Runtime::new(context, driver))
107        })
108    }
109}
110
111#[cfg(all(target_os = "linux", feature = "iouring"))]
112impl Buildable for IoUringDriver {
113    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<IoUringDriver>> {
114        let thread_id = gen_id();
115        #[cfg(feature = "sync")]
116        let blocking_handle = this.blocking_handle;
117
118        BUILD_THREAD_ID.set(&thread_id, || {
119            let driver = match this.entries {
120                Some(entries) => IoUringDriver::new_with_entries(&this.urb, entries)?,
121                None => IoUringDriver::new(&this.urb)?,
122            };
123            #[cfg(feature = "sync")]
124            let context = crate::runtime::Context::new(blocking_handle);
125            #[cfg(not(feature = "sync"))]
126            let context = crate::runtime::Context::new();
127            Ok(Runtime::new(context, driver))
128        })
129    }
130}
131
132impl<D> RuntimeBuilder<D> {
133    const MIN_ENTRIES: u32 = 256;
134
135    /// Set io_uring entries, min size is 256 and the default size is 1024.
136    #[must_use]
137    pub fn with_entries(mut self, entries: u32) -> Self {
138        // If entries is less than 256, it will be 256.
139        if entries < Self::MIN_ENTRIES {
140            self.entries = Some(Self::MIN_ENTRIES);
141            return self;
142        }
143        self.entries = Some(entries);
144        self
145    }
146
147    /// Replaces the default [`io_uring::Builder`], which controls the settings for the
148    /// inner `io_uring` API.
149    ///
150    /// Refer to the [`io_uring::Builder`] documentation for all the supported methods.
151
152    #[cfg(all(target_os = "linux", feature = "iouring"))]
153    #[must_use]
154    pub fn uring_builder(mut self, urb: io_uring::Builder) -> Self {
155        self.urb = urb;
156        self
157    }
158}
159
160// ===== FusionDriver =====
161
162/// Fake driver only for conditionally building.
163#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
164pub struct FusionDriver;
165
166#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
167impl RuntimeBuilder<FusionDriver> {
168    /// Build the runtime.
169    #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
170    pub fn build(self) -> io::Result<crate::FusionRuntime<IoUringDriver, LegacyDriver>> {
171        if crate::utils::detect_uring() {
172            let builder = RuntimeBuilder::<IoUringDriver> {
173                entries: self.entries,
174                urb: self.urb,
175                #[cfg(feature = "sync")]
176                blocking_handle: self.blocking_handle,
177                _mark: PhantomData,
178            };
179            info!("io_uring driver built");
180            Ok(builder.build()?.into())
181        } else {
182            let builder = RuntimeBuilder::<LegacyDriver> {
183                entries: self.entries,
184                urb: self.urb,
185                #[cfg(feature = "sync")]
186                blocking_handle: self.blocking_handle,
187                _mark: PhantomData,
188            };
189            info!("legacy driver built");
190            Ok(builder.build()?.into())
191        }
192    }
193
194    /// Build the runtime.
195    #[cfg(not(all(target_os = "linux", feature = "iouring")))]
196    pub fn build(self) -> io::Result<crate::FusionRuntime<LegacyDriver>> {
197        let builder = RuntimeBuilder::<LegacyDriver> {
198            entries: self.entries,
199            #[cfg(feature = "sync")]
200            blocking_handle: self.blocking_handle,
201            _mark: PhantomData,
202        };
203        Ok(builder.build()?.into())
204    }
205
206    /// Build the runtime.
207    #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
208    pub fn build(self) -> io::Result<crate::FusionRuntime<IoUringDriver>> {
209        let builder = RuntimeBuilder::<IoUringDriver> {
210            entries: self.entries,
211            urb: self.urb,
212            #[cfg(feature = "sync")]
213            blocking_handle: self.blocking_handle,
214            _mark: PhantomData,
215        };
216        Ok(builder.build()?.into())
217    }
218}
219
220#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
221impl RuntimeBuilder<TimeDriver<FusionDriver>> {
222    /// Build the runtime.
223    #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
224    pub fn build(
225        self,
226    ) -> io::Result<crate::FusionRuntime<TimeDriver<IoUringDriver>, TimeDriver<LegacyDriver>>> {
227        if crate::utils::detect_uring() {
228            let builder = RuntimeBuilder::<TimeDriver<IoUringDriver>> {
229                entries: self.entries,
230                urb: self.urb,
231                #[cfg(feature = "sync")]
232                blocking_handle: self.blocking_handle,
233                _mark: PhantomData,
234            };
235            info!("io_uring driver with timer built");
236            Ok(builder.build()?.into())
237        } else {
238            let builder = RuntimeBuilder::<TimeDriver<LegacyDriver>> {
239                entries: self.entries,
240                urb: self.urb,
241                #[cfg(feature = "sync")]
242                blocking_handle: self.blocking_handle,
243                _mark: PhantomData,
244            };
245            info!("legacy driver with timer built");
246            Ok(builder.build()?.into())
247        }
248    }
249
250    /// Build the runtime.
251    #[cfg(not(all(target_os = "linux", feature = "iouring")))]
252    pub fn build(self) -> io::Result<crate::FusionRuntime<TimeDriver<LegacyDriver>>> {
253        let builder = RuntimeBuilder::<TimeDriver<LegacyDriver>> {
254            entries: self.entries,
255            #[cfg(feature = "sync")]
256            blocking_handle: self.blocking_handle,
257            _mark: PhantomData,
258        };
259        Ok(builder.build()?.into())
260    }
261
262    /// Build the runtime.
263    #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
264    pub fn build(self) -> io::Result<crate::FusionRuntime<TimeDriver<IoUringDriver>>> {
265        let builder = RuntimeBuilder::<TimeDriver<IoUringDriver>> {
266            entries: self.entries,
267            urb: self.urb,
268            #[cfg(feature = "sync")]
269            blocking_handle: self.blocking_handle,
270            _mark: PhantomData,
271        };
272        Ok(builder.build()?.into())
273    }
274}
275
276// ===== enable_timer related =====
277mod time_wrap {
278    pub trait TimeWrapable {}
279}
280
281#[cfg(all(target_os = "linux", feature = "iouring"))]
282impl time_wrap::TimeWrapable for IoUringDriver {}
283#[cfg(feature = "legacy")]
284impl time_wrap::TimeWrapable for LegacyDriver {}
285#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
286impl time_wrap::TimeWrapable for FusionDriver {}
287
288impl<D: Driver> Buildable for TimeDriver<D>
289where
290    D: Buildable,
291{
292    /// Build the runtime
293    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<TimeDriver<D>>> {
294        let Runtime {
295            driver,
296            mut context,
297        } = Buildable::build(RuntimeBuilder::<D> {
298            entries: this.entries,
299            #[cfg(all(target_os = "linux", feature = "iouring"))]
300            urb: this.urb,
301            #[cfg(feature = "sync")]
302            blocking_handle: this.blocking_handle,
303            _mark: PhantomData,
304        })?;
305
306        let timer_driver = TimeDriver::new(driver, Clock::new());
307        context.time_handle = Some(timer_driver.handle.clone());
308        Ok(Runtime {
309            driver: timer_driver,
310            context,
311        })
312    }
313}
314
315impl<D: time_wrap::TimeWrapable> RuntimeBuilder<D> {
316    /// Enable all(currently only timer)
317    #[must_use]
318    pub fn enable_all(self) -> RuntimeBuilder<TimeDriver<D>> {
319        self.enable_timer()
320    }
321
322    /// Enable timer
323    #[must_use]
324    pub fn enable_timer(self) -> RuntimeBuilder<TimeDriver<D>> {
325        let Self {
326            entries,
327            #[cfg(all(target_os = "linux", feature = "iouring"))]
328            urb,
329            #[cfg(feature = "sync")]
330            blocking_handle,
331            ..
332        } = self;
333        RuntimeBuilder {
334            entries,
335            #[cfg(all(target_os = "linux", feature = "iouring"))]
336            urb,
337            #[cfg(feature = "sync")]
338            blocking_handle,
339            _mark: PhantomData,
340        }
341    }
342}
343
344impl<D> RuntimeBuilder<D> {
345    /// Attach thread pool, this will overwrite blocking strategy.
346    /// All `spawn_blocking` will be executed on given thread pool.
347    #[cfg(feature = "sync")]
348    #[must_use]
349    pub fn attach_thread_pool(
350        mut self,
351        tp: Box<dyn crate::blocking::ThreadPool + Send + 'static>,
352    ) -> Self {
353        self.blocking_handle = crate::blocking::BlockingHandle::Attached(tp);
354        self
355    }
356
357    /// Set blocking strategy, this will overwrite thread pool setting.
358    /// If `BlockingStrategy::Panic` is used, it will panic if `spawn_blocking` on this thread.
359    /// If `BlockingStrategy::ExecuteLocal` is used, it will execute with current thread, and may
360    /// cause tasks high latency.
361    /// Attaching a thread pool is recommended if `spawn_blocking` will be used.
362    #[cfg(feature = "sync")]
363    #[must_use]
364    pub fn with_blocking_strategy(mut self, strategy: crate::blocking::BlockingStrategy) -> Self {
365        self.blocking_handle = crate::blocking::BlockingHandle::Empty(strategy);
366        self
367    }
368}