1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//! Utilities for tokio runtime.

use ckb_logger::debug;
use ckb_spawn::Spawn;
use ckb_stop_handler::{SignalSender, StopHandler};
use core::future::Future;
use std::{
    sync::atomic::{AtomicU32, Ordering},
    thread,
};
use tokio::runtime::Builder;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

pub use tokio;

// Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules.
// We need `Handle` impl ckb spawn trait decouple tokio dependence

/// Handle to the runtime.
#[derive(Debug, Clone)]
pub struct Handle {
    pub(crate) inner: TokioHandle,
}

impl Handle {
    /// Enter the runtime context. This allows you to construct types that must
    /// have an executor available on creation such as [`tokio::time::Sleep`] or [`tokio::net::TcpStream`].
    /// It will also allow you to call methods such as [`tokio::spawn`].
    pub fn enter<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let _enter = self.inner.enter();
        f()
    }

    /// Spawns a future onto the runtime.
    ///
    /// This spawns the given future onto the runtime's executor
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.inner.spawn(future)
    }

    /// Run a future to completion on the Tokio runtime from a synchronous context.
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
        self.inner.block_on(future)
    }

    /// Spawns a future onto the runtime blocking pool.
    ///
    /// This spawns the given future onto the runtime's blocking executor
    pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.inner.spawn_blocking(f)
    }
}

/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle
pub fn new_global_runtime() -> (Handle, StopHandler<()>) {
    let runtime = Builder::new_multi_thread()
        .enable_all()
        .thread_name("GlobalRt")
        .thread_name_fn(|| {
            static ATOMIC_ID: AtomicU32 = AtomicU32::new(0);
            let id = ATOMIC_ID
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
                    // A long thread name will cut to 15 characters in debug tools.
                    // Such as "top", "htop", "gdb" and so on.
                    // It's a kernel limit.
                    //
                    // So if we want to see the whole name in debug tools,
                    // this number should have 6 digits at most,
                    // since the prefix uses 9 characters in below code.
                    //
                    // There still has a issue:
                    // When id wraps around, we couldn't know whether the old id
                    // is released or not.
                    // But we can ignore this, because it's almost impossible.
                    if n >= 999_999 {
                        Some(0)
                    } else {
                        Some(n + 1)
                    }
                })
                .expect("impossible since the above closure must return Some(number)");
            format!("GlobalRt-{}", id)
        })
        .build()
        .expect("ckb runtime initialized");

    let handle = runtime.handle().clone();

    let (tx, rx) = oneshot::channel();
    let thread = thread::Builder::new()
        .name("GlobalRtBuilder".to_string())
        .spawn(move || {
            let ret = runtime.block_on(rx);
            debug!("global runtime finish {:?}", ret);
        })
        .expect("tokio runtime started");

    (
        Handle { inner: handle },
        StopHandler::new(SignalSender::Tokio(tx), Some(thread), "GT".to_string()),
    )
}

impl Spawn for Handle {
    fn spawn_task<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.spawn(future);
    }
}