// broker-tokio 0.2.16
//
// tokio for broker
// Documentation
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};

use std::fmt;
#[cfg(not(loom))]
use std::sync::Arc;

/// Builds a Tokio `Runtime` with custom configuration values.
///
/// Configuration methods can be chained to set the individual values. The
/// `Runtime` is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See the method-level documentation for details on the various
/// configuration settings.
///
/// [`build`]: #method.build
/// [`Builder::new`]: #method.new
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// fn main() {
///     // build runtime
///     let runtime = Builder::new()
///         .threaded_scheduler()
///         .core_threads(4)
///         .thread_name("my-custom-name")
///         .thread_stack_size(3 * 1024 * 1024)
///         .build()
///         .unwrap();
///
///     // use runtime ...
/// }
/// ```
pub struct Builder {
    /// The task execution model to use; selects which runtime `build`
    /// constructs.
    kind: Kind,

    /// Whether or not to enable the I/O driver.
    enable_io: bool,

    /// Whether or not to enable the time driver.
    enable_time: bool,

    /// The number of core worker threads.
    ///
    /// Only read when building the multi-threaded runtime; the
    /// current-thread executor does not use it.
    core_threads: usize,

    /// Cap on thread usage.
    ///
    /// Passed as the limit when the blocking pool is created. The
    /// multi-threaded runtime additionally requires it to be at least
    /// `core_threads`.
    max_threads: usize,

    /// Name used for threads spawned by the runtime.
    pub(super) thread_name: String,

    /// Stack size used for threads spawned by the runtime; `None` means no
    /// stack size was configured.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// Callback to run before each worker thread stops.
    pub(super) before_stop: Option<Callback>,
}

/// The task execution model selected on a `Builder`.
#[derive(Debug, Clone, Copy)]
enum Kind {
    /// No task execution; this is the value `Builder::new` starts with.
    Shell,
    /// Selected by `basic_scheduler`; only available with the `rt-core` feature.
    #[cfg(feature = "rt-core")]
    Basic,
    /// Selected by `threaded_scheduler`; only available with the `rt-threaded` feature.
    #[cfg(feature = "rt-threaded")]
    ThreadPool,
}

impl Builder {
    /// Returns a new runtime builder initialized with default configuration
    /// values.
    ///
    /// Configuration methods can be chained on the return value.
    pub fn new() -> Builder {
        Builder {
            // No task execution by default
            kind: Kind::Shell,

            // I/O defaults to "off"
            enable_io: false,

            // Time defaults to "off"
            enable_time: false,

            // Default to use an equal number of threads to number of CPU cores
            core_threads: crate::loom::sys::num_cpus(),

            max_threads: 512,

            // Default thread name
            thread_name: "tokio-runtime-worker".into(),

            // Do not set a stack size by default
            thread_stack_size: None,

            // No worker thread callbacks
            after_start: None,
            before_stop: None,
        }
    }

    /// Turns on both the I/O and the time driver.
    ///
    /// This is shorthand for calling `enable_io` and `enable_time` one after
    /// the other. Should Tokio gain additional components in the future,
    /// `enable_all` will enable those as well.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn enable_all(&mut self) -> &mut Self {
        // Each driver is only touched when its feature is compiled in.
        #[cfg(feature = "io-driver")]
        {
            self.enable_io();
        }

        #[cfg(feature = "time")]
        {
            self.enable_time();
        }

        self
    }

    /// Sets the number of core worker threads for the `Runtime`'s thread pool.
    ///
    /// Deprecated: the value is stored in the same setting that
    /// `core_threads` configures. Unlike `core_threads`, this method performs
    /// no zero check on `val`.
    ///
    /// This must be a number between 1 and 32,768, though it is advised to
    /// keep this value on the smaller side.
    ///
    /// The default value is the number of cores available to the system.
    #[deprecated(note = "In future will be replaced by core_threads method")]
    pub fn num_threads(&mut self, val: usize) -> &mut Self {
        let worker_count = val;
        self.core_threads = worker_count;
        self
    }

    /// Sets the number of core worker threads for the `Runtime`'s thread pool.
    ///
    /// The value should lie between 1 and 32,768, though it is advised to
    /// keep it on the smaller side.
    ///
    /// The default value is the number of cores available to the system.
    ///
    /// These threads will be always active and running.
    ///
    /// # Panics
    ///
    /// Panics if `val` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new()
    ///     .core_threads(4)
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn core_threads(&mut self, val: usize) -> &mut Self {
        // Reject a zero worker count up front.
        assert_ne!(val, 0, "Core threads cannot be zero");

        let worker_count = val;
        self.core_threads = worker_count;
        self
    }

    /// Sets the upper limit on the number of threads spawned by the `Runtime`.
    ///
    /// The limit covers every thread used by the `Runtime`, `core_threads`
    /// included. Configuring `max_threads` below `core_threads` is an invalid
    /// configuration and causes a panic when a multi-threaded `Runtime` is
    /// built.
    ///
    /// As with `core_threads`, this number should be between 1 and 32,768.
    ///
    /// The default value is 512.
    ///
    /// When the multi-threaded runtime is not used, the value acts as the
    /// limit on additional threads. Otherwise, since `core_threads` are always
    /// active, additional threads (e.g. for blocking annotations) are limited
    /// to `max_threads - core_threads`.
    ///
    /// # Panics
    ///
    /// Panics if `val` is zero.
    pub fn max_threads(&mut self, val: usize) -> &mut Self {
        // Reject a zero limit up front.
        assert_ne!(val, 0, "Thread limit cannot be zero");

        let thread_cap = val;
        self.max_threads = thread_cap;
        self
    }

    /// Sets the name given to threads spawned by the `Runtime`'s thread pool.
    ///
    /// Anything convertible into a `String` is accepted. The default name is
    /// "tokio-runtime-worker".
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let rt = runtime::Builder::new()
    ///     .thread_name("my-pool")
    ///     .build();
    /// # }
    /// ```
    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
        let name: String = val.into();
        self.thread_name = name;
        self
    }

    /// Sets the stack size, in bytes, for worker threads.
    ///
    /// If the platform specifies a minimal stack size, the actual stack size
    /// may be greater than the value given here.
    ///
    /// The default stack size for spawned threads is 2 MiB, though this
    /// particular stack size is subject to change in the future.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let rt = runtime::Builder::new()
    ///     .thread_stack_size(32 * 1024)
    ///     .build();
    /// # }
    /// ```
    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
        // `Some` marks the size as explicitly configured.
        let stack_size = Some(val);
        self.thread_stack_size = stack_size;
        self
    }

    /// Execute function `f` after each thread is started but before it starts
    /// doing work.
    ///
    /// This is intended for bookkeeping and monitoring use cases.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let runtime = runtime::Builder::new()
    ///     .on_thread_start(|| {
    ///         println!("thread started");
    ///     })
    ///     .build();
    /// # }
    /// ```
    #[cfg(not(loom))]
    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.after_start = Some(Arc::new(f));
        self
    }

    /// Execute function `f` before each thread stops.
    ///
    /// This is intended for bookkeeping and monitoring use cases.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let runtime = runtime::Builder::new()
    ///     .on_thread_stop(|| {
    ///         println!("thread stopping");
    ///     })
    ///     .build();
    /// # }
    /// ```
    #[cfg(not(loom))]
    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.before_stop = Some(Arc::new(f));
        self
    }

    /// Creates the configured `Runtime`.
    ///
    /// Which runtime is constructed depends on the scheduler selected on this
    /// builder. The returned `Runtime` instance is ready to be used.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported by the selected runtime constructor.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let mut rt = Builder::new().build().unwrap();
    ///
    /// rt.block_on(async {
    ///     println!("Hello from the Tokio runtime");
    /// });
    /// ```
    pub fn build(&mut self) -> io::Result<Runtime> {
        // `Kind` is `Copy`, so take it out before dispatching on it.
        let selected = self.kind;

        match selected {
            Kind::Shell => self.build_shell_runtime(),
            #[cfg(feature = "rt-core")]
            Kind::Basic => self.build_basic_runtime(),
            #[cfg(feature = "rt-threaded")]
            Kind::ThreadPool => self.build_threaded_runtime(),
        }
    }

    /// Builds a `Runtime` of the `Shell` kind, i.e. without a task scheduler.
    ///
    /// Returns an error if creating the I/O driver fails.
    fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
        use crate::runtime::Kind;

        let clock = time::create_clock();

        // The I/O driver comes first; it is then handed to the time driver
        // constructor together with a copy of the clock.
        let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
        let (time_driver, time_handle) =
            time::create_driver(self.enable_time, io_driver, clock.clone());

        // Blocking pool, capped at the configured thread limit.
        let thread_cap = self.max_threads;
        let blocking_pool = blocking::create_blocking_pool(self, thread_cap);
        let blocking_spawner = blocking_pool.spawner().clone();

        let handle = Handle {
            spawner: Spawner::Shell,
            io_handle,
            time_handle,
            clock,
            blocking_spawner,
        };

        Ok(Runtime {
            kind: Kind::Shell(Shell::new(time_driver)),
            handle,
            blocking_pool,
        })
    }
}

cfg_io_driver! {
    impl Builder {
        /// Turns on the I/O driver.
        ///
        /// With the driver enabled, net, process, signal, and some I/O types
        /// can be used on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            let enabled = true;
            self.enable_io = enabled;
            self
        }
    }
}

cfg_time! {
    impl Builder {
        /// Turns on the time driver.
        ///
        /// With the driver enabled, `tokio::time` can be used on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            let enabled = true;
            self.enable_time = enabled;
            self
        }
    }
}

cfg_rt_core! {
    impl Builder {
        /// Selects a simpler scheduler that runs all tasks on the current thread.
        ///
        /// The executor and all necessary drivers will be run on the current
        /// thread during `block_on` calls.
        pub fn basic_scheduler(&mut self) -> &mut Self {
            let selected = Kind::Basic;
            self.kind = selected;
            self
        }

        /// Builds a `Runtime` backed by the basic scheduler.
        ///
        /// Returns an error if creating the I/O driver fails.
        fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
            use crate::runtime::{BasicScheduler, Kind};

            let clock = time::create_clock();

            // The I/O driver comes first; it is then handed to the time
            // driver constructor together with a copy of the clock.
            let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
            let (time_driver, time_handle) =
                time::create_driver(self.enable_time, io_driver, clock.clone());

            // Put a single-threaded scheduler on top of the timer. When no
            // future is ready to make progress, the scheduler lets the timer
            // or the reactor generate new stimuli so the futures can continue.
            let scheduler = BasicScheduler::new(time_driver);
            let spawner = Spawner::Basic(scheduler.spawner());

            // Blocking pool, capped at the configured thread limit.
            let thread_cap = self.max_threads;
            let blocking_pool = blocking::create_blocking_pool(self, thread_cap);
            let blocking_spawner = blocking_pool.spawner().clone();

            let handle = Handle {
                spawner,
                io_handle,
                time_handle,
                clock,
                blocking_spawner,
            };

            Ok(Runtime {
                kind: Kind::Basic(scheduler),
                handle,
                blocking_pool,
            })
        }
    }
}

cfg_rt_threaded! {
    impl Builder {
        /// Selects the multi-threaded scheduler for executing tasks.
        pub fn threaded_scheduler(&mut self) -> &mut Self {
            let selected = Kind::ThreadPool;
            self.kind = selected;
            self
        }

        /// Builds a `Runtime` backed by the multi-threaded scheduler and
        /// spawns its workers.
        ///
        /// Returns an error if creating the I/O driver fails.
        ///
        /// # Panics
        ///
        /// Panics if `core_threads` is greater than `max_threads`.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::runtime::park::Parker;
            use crate::runtime::{Kind, ThreadPool};

            let worker_count = self.core_threads;
            let thread_cap = self.max_threads;

            // The core workers have to fit within the overall thread limit.
            assert!(thread_cap >= worker_count, "Core threads number cannot be above max limit");

            let clock = time::create_clock();

            // The I/O driver comes first; it is then handed to the time
            // driver constructor together with a copy of the clock.
            let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
            let (time_driver, time_handle) =
                time::create_driver(self.enable_time, io_driver, clock.clone());

            let parker = Parker::new(time_driver);
            let (scheduler, workers) = ThreadPool::new(worker_count, parker);
            let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

            // Blocking pool, capped at the configured thread limit.
            let blocking_pool = blocking::create_blocking_pool(self, thread_cap);
            let blocking_spawner = blocking_pool.spawner().clone();

            // The workers are spawned against the runtime handle, so it has
            // to exist before they start.
            let handle = Handle {
                spawner,
                io_handle,
                time_handle,
                clock,
                blocking_spawner,
            };
            workers.spawn(&handle);

            Ok(Runtime {
                kind: Kind::ThreadPool(scheduler),
                handle,
                blocking_pool,
            })
        }
    }
}

impl Default for Builder {
    fn default() -> Self {
        Self::new()
    }
}

impl fmt::Debug for Builder {
    /// Formats the builder's configuration for diagnostic output.
    ///
    /// The thread callbacks are not printed themselves: each one is rendered
    /// as `Some("...")` when it is set and as `None` when it is not.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Placeholder rendering for each callback. `before_stop` must be
        // derived from its own field; it was previously derived from
        // `after_start`, so the output misreported the stop callback.
        let after_start = self.after_start.as_ref().map(|_| "...");
        let before_stop = self.before_stop.as_ref().map(|_| "...");

        fmt.debug_struct("Builder")
            .field("kind", &self.kind)
            .field("core_threads", &self.core_threads)
            .field("max_threads", &self.max_threads)
            .field("thread_name", &self.thread_name)
            .field("thread_stack_size", &self.thread_stack_size)
            .field("after_start", &after_start)
            .field("before_stop", &before_stop)
            .finish()
    }
}