grpcio 0.12.0

The rust language implementation of gRPC, base on the gRPC c core library.
Documentation
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, i32, ptr};

use crate::{
    grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
    Deadline,
};
use libc::{self, c_char, c_int};

use crate::call::{Call, Method};
use crate::cq::CompletionQueue;
use crate::env::Environment;
use crate::error::Result;
use crate::task::CallTag;
use crate::task::Kicker;
use crate::ResourceQuota;
use crate::{CallOption, ChannelCredentials};

pub use crate::grpc_sys::{
    grpc_compression_algorithm as CompressionAlgorithms,
    grpc_compression_level as CompressionLevel, grpc_connectivity_state as ConnectivityState,
};

/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
fn format_user_agent_string(agent: &str) -> CString {
    let version = env!("CARGO_PKG_VERSION");
    let trimed_agent = agent.trim();
    let val = if trimed_agent.is_empty() {
        format!("grpc-rust/{}", version)
    } else {
        format!("{} grpc-rust/{}", trimed_agent, version)
    };
    CString::new(val).unwrap()
}

fn dur_to_ms(dur: Duration) -> i32 {
    let millis = dur.as_secs() * 1000 + dur.subsec_nanos() as u64 / 1_000_000;
    cmp::min(i32::MAX as u64, millis) as i32
}

enum Options {
    Integer(i32),
    String(CString),
    Pointer(ResourceQuota, *const grpc_arg_pointer_vtable),
}

/// The optimization target for a [`Channel`].
#[derive(Clone, Copy)]
pub enum OptTarget {
    /// Minimize latency at the cost of throughput.
    Latency,
    /// Balance latency and throughput.
    Blend,
    /// Maximize throughput at the expense of latency.
    Throughput,
}

#[derive(Clone, Copy)]
pub enum LbPolicy {
    PickFirst,
    RoundRobin,
}

/// [`Channel`] factory in order to configure the properties.
pub struct ChannelBuilder {
    env: Arc<Environment>,
    options: HashMap<Cow<'static, [u8]>, Options>,
    credentials: Option<ChannelCredentials>,
}

impl ChannelBuilder {
    /// Initialize a new [`ChannelBuilder`].
    pub fn new(env: Arc<Environment>) -> ChannelBuilder {
        ChannelBuilder {
            env,
            options: HashMap::new(),
            credentials: None,
        }
    }

    /// Set default authority to pass if none specified on call construction.
    pub fn default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder {
        let authority = CString::new(authority).unwrap();
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_DEFAULT_AUTHORITY),
            Options::String(authority),
        );
        self
    }

    /// Set resource quota by consuming a ResourceQuota
    pub fn set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder {
        unsafe {
            self.options.insert(
                Cow::Borrowed(grpcio_sys::GRPC_ARG_RESOURCE_QUOTA),
                Options::Pointer(quota, grpc_sys::grpc_resource_quota_arg_vtable()),
            );
        }
        self
    }

    /// Set maximum number of concurrent incoming streams to allow on a HTTP/2 connection.
    pub fn max_concurrent_stream(mut self, num: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_CONCURRENT_STREAMS),
            Options::Integer(num),
        );
        self
    }

    /// Set maximum message length that the channel can receive. `-1` means unlimited.
    pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
            Options::Integer(len),
        );
        self
    }

    /// Set maximum message length that the channel can send. `-1` means unlimited.
    pub fn max_send_message_len(mut self, len: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_SEND_MESSAGE_LENGTH),
            Options::Integer(len),
        );
        self
    }

    /// Set maximum time between subsequent connection attempts.
    pub fn max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECONNECT_BACKOFF_MS),
            Options::Integer(dur_to_ms(backoff)),
        );
        self
    }

    /// Set time between the first and second connection attempts.
    pub fn initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS),
            Options::Integer(dur_to_ms(backoff)),
        );
        self
    }

    /// Set initial sequence number for HTTP/2 transports.
    pub fn https_initial_seq_number(mut self, number: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER),
            Options::Integer(number),
        );
        self
    }

    /// Set amount to read ahead on individual streams. Defaults to 64KB. Larger
    /// values help throughput on high-latency connections.
    pub fn stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES),
            Options::Integer(window_size),
        );
        self
    }

    /// Set primary user agent, which goes at the start of the user-agent metadata sent on
    /// each request.
    pub fn primary_user_agent(mut self, agent: &str) -> ChannelBuilder {
        let agent_string = format_user_agent_string(agent);
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING),
            Options::String(agent_string),
        );
        self
    }

    /// Set whether to allow the use of `SO_REUSEPORT` if available. Defaults to `true`.
    pub fn reuse_port(mut self, reuse: bool) -> ChannelBuilder {
        let opt = if reuse { 1 } else { 0 };
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_ALLOW_REUSEPORT),
            Options::Integer(opt),
        );
        self
    }

    /// Set the size of slice to try and read from the wire each time.
    pub fn tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_READ_CHUNK_SIZE),
            Options::Integer(bytes),
        );
        self
    }

    /// Set the minimum size of slice to try and read from the wire each time.
    pub fn tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE),
            Options::Integer(bytes),
        );
        self
    }

    /// Set the maximum size of slice to try and read from the wire each time.
    pub fn tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE),
            Options::Integer(bytes),
        );
        self
    }

    /// How much data are we willing to queue up per stream if
    /// write_buffer_hint is set. This is an upper bound.
    pub fn http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE),
            Options::Integer(size),
        );
        self
    }

    /// How big a frame are we willing to receive via HTTP/2.
    /// Min 16384, max 16777215.
    /// Larger values give lower CPU usage for large messages, but more head of line
    /// blocking for small messages.
    pub fn http2_max_frame_size(mut self, size: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_FRAME_SIZE),
            Options::Integer(size),
        );
        self
    }

    /// Set whether to enable BDP probing.
    pub fn http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_BDP_PROBE),
            Options::Integer(enable as i32),
        );
        self
    }

    /// Minimum time between sending successive ping frames without receiving any
    /// data frame.
    pub fn http2_min_sent_ping_interval_without_data(
        mut self,
        interval: Duration,
    ) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
            Options::Integer(dur_to_ms(interval)),
        );
        self
    }

    /// Minimum allowed time between receiving successive ping frames without
    /// sending any data frame.
    pub fn http2_min_recv_ping_interval_without_data(
        mut self,
        interval: Duration,
    ) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
            Options::Integer(dur_to_ms(interval)),
        );
        self
    }

    /// How many pings can we send before needing to send a data frame or header
    /// frame? (0 indicates that an infinite number of pings can be sent without
    /// sending a data frame or header frame)
    pub fn http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA),
            Options::Integer(num),
        );
        self
    }

    /// How many misbehaving pings the server can bear before sending goaway and
    /// closing the transport? (0 indicates that the server can bear an infinite
    /// number of misbehaving pings)
    pub fn http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PING_STRIKES),
            Options::Integer(num),
        );
        self
    }

    /// If set to zero, disables use of http proxies.
    pub fn enable_http_proxy(mut self, num: bool) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_HTTP_PROXY),
            Options::Integer(num as i32),
        );
        self
    }

    /// Set default compression algorithm for the channel.
    pub fn default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM),
            Options::Integer(algo as i32),
        );
        self
    }

    /// Set default gzip compression level.
    #[cfg(feature = "nightly")]
    pub fn default_gzip_compression_level(mut self, level: usize) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_GZIP_COMPRESSION_LEVEL),
            Options::Integer(level as i32),
        );
        self
    }

    /// Set default grpc min message size to compression.
    #[cfg(feature = "nightly")]
    pub fn default_grpc_min_message_size_to_compress(
        mut self,
        lower_bound: usize,
    ) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_MIN_MESSAGE_SIZE_TO_COMPRESS),
            Options::Integer(lower_bound as i32),
        );
        self
    }

    /// Set default compression level for the channel.
    pub fn default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL),
            Options::Integer(level as i32),
        );
        self
    }

    /// After a duration of this time the client/server pings its peer to see
    /// if the transport is still alive.
    pub fn keepalive_time(mut self, timeout: Duration) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIME_MS),
            Options::Integer(dur_to_ms(timeout)),
        );
        self
    }

    /// After waiting for a duration of this time, if the keepalive ping sender does
    /// not receive the ping ack, it will close the transport.
    pub fn keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIMEOUT_MS),
            Options::Integer(dur_to_ms(timeout)),
        );
        self
    }

    /// Is it permissible to send keepalive pings without any outstanding streams.
    pub fn keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS),
            Options::Integer(allow as i32),
        );
        self
    }

    /// Set optimization target for the channel. See [`OptTarget`] for all available
    /// optimization targets. Defaults to `OptTarget::Blend`.
    pub fn optimize_for(mut self, target: OptTarget) -> ChannelBuilder {
        let val = match target {
            OptTarget::Latency => CString::new("latency"),
            OptTarget::Blend => CString::new("blend"),
            OptTarget::Throughput => CString::new("throughput"),
        };
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_OPTIMIZATION_TARGET),
            Options::String(val.unwrap()),
        );
        self
    }

    /// Set LbPolicy for channel
    ///
    /// This method allows one to set the load-balancing policy for a given channel.
    pub fn load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder {
        let val = match lb_policy {
            LbPolicy::PickFirst => CString::new("pick_first"),
            LbPolicy::RoundRobin => CString::new("round_robin"),
        };
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_LB_POLICY_NAME),
            Options::String(val.unwrap()),
        );
        self
    }

    /// Set use local subchannel pool
    ///
    /// This method allows channel use it's owned subchannel pool.
    pub fn use_local_subchannel_pool(mut self, enable: bool) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL),
            Options::Integer(enable as i32),
        );
        self
    }

    /// Enables retry functionality.  Defaults to true.  When enabled, transparent
    /// retries will be performed as appropriate, and configurable retries are
    /// enabled when they are configured via the service config. For details, see:
    ///   https://github.com/grpc/proposal/blob/master/A6-client-retries.md
    /// NOTE: Hedging functionality is not yet implemented.
    pub fn enable_retry(mut self, enable: bool) -> ChannelBuilder {
        self.options.insert(
            Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_RETRIES),
            Options::Integer(enable as i32),
        );
        self
    }

    /// Set a raw integer configuration.
    ///
    /// This method is only for bench usage, users should use the encapsulated API instead.
    #[doc(hidden)]
    pub fn raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder {
        self.options
            .insert(Cow::Owned(key.into_bytes_with_nul()), Options::Integer(val));
        self
    }

    /// Set a raw string configuration.
    ///
    /// This method is only for bench usage, users should use the encapsulated API instead.
    #[doc(hidden)]
    pub fn raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder {
        self.options
            .insert(Cow::Owned(key.into_bytes_with_nul()), Options::String(val));
        self
    }

    /// Build `ChannelArgs` from the current configuration.
    #[allow(clippy::useless_conversion)]
    #[allow(clippy::cmp_owned)]
    pub fn build_args(&self) -> ChannelArgs {
        let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
        for (i, (k, v)) in self.options.iter().enumerate() {
            let key = k.as_ptr() as *const c_char;
            match *v {
                Options::Integer(val) => unsafe {
                    // On most modern compiler and architect, c_int is the same as i32,
                    // panic directly to simplify signature.
                    assert!(
                        val <= i32::from(libc::INT_MAX) && val >= i32::from(libc::INT_MIN),
                        "{} is out of range for {:?}",
                        val,
                        CStr::from_bytes_with_nul(k).unwrap()
                    );
                    grpc_sys::grpcwrap_channel_args_set_integer(args, i, key, val as c_int)
                },
                Options::String(ref val) => unsafe {
                    grpc_sys::grpcwrap_channel_args_set_string(args, i, key, val.as_ptr())
                },
                Options::Pointer(ref quota, vtable) => unsafe {
                    grpc_sys::grpcwrap_channel_args_set_pointer_vtable(
                        args,
                        i,
                        key,
                        quota.get_ptr() as _,
                        vtable,
                    )
                },
            }
        }
        ChannelArgs { args }
    }

    fn prepare_connect_args(&mut self) -> ChannelArgs {
        if let Entry::Vacant(e) = self.options.entry(Cow::Borrowed(
            grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING,
        )) {
            e.insert(Options::String(format_user_agent_string("")));
        }
        self.build_args()
    }

    /// Build an [`Channel`] that connects to a specific address.
    pub fn connect(mut self, addr: &str) -> Channel {
        let args = self.prepare_connect_args();
        let addr = CString::new(addr).unwrap();
        let addr_ptr = addr.as_ptr();
        let mut creds = self
            .credentials
            .unwrap_or_else(ChannelCredentials::insecure);
        let channel =
            unsafe { grpcio_sys::grpc_channel_create(addr_ptr, creds.as_mut_ptr(), args.args) };

        unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
    }

    /// Build an [`Channel`] taking over an established connection from
    /// a file descriptor. The target string given is purely informative to
    /// describe the endpoint of the connection. Takes ownership of the given
    /// file descriptor and will close it when the connection is closed.
    ///
    /// This function is available on posix systems only.
    ///
    /// # Safety
    ///
    /// The file descriptor must correspond to a connected stream socket. After
    /// this call, the socket must not be accessed (read / written / closed)
    /// by other code.
    #[cfg(unix)]
    pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
        let args = self.prepare_connect_args();
        let target = CString::new(target).unwrap();
        let target_ptr = target.as_ptr();
        // Actually only insecure credentials are supported currently.
        let mut creds = self
            .credentials
            .unwrap_or_else(ChannelCredentials::insecure);
        let channel =
            grpcio_sys::grpc_channel_create_from_fd(target_ptr, fd, creds.as_mut_ptr(), args.args);

        Channel::new(self.env.pick_cq(), self.env, channel)
    }
}

#[cfg(feature = "_secure")]
mod secure_channel {
    use std::borrow::Cow;
    use std::ffi::CString;

    use crate::ChannelCredentials;

    use super::{ChannelBuilder, Options};

    const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0";

    impl ChannelBuilder {
        /// The caller of the secure_channel_create functions may override the target name used
        /// for SSL host name checking using this channel argument.
        ///
        /// This *should* be used for testing only.
        #[doc(hidden)]
        pub fn override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder {
            let target = CString::new(target).unwrap();
            self.options.insert(
                Cow::Borrowed(OPT_SSL_TARGET_NAME_OVERRIDE),
                Options::String(target),
            );
            self
        }

        /// Set the credentials used to build the connection.
        pub fn set_credentials(mut self, creds: ChannelCredentials) -> ChannelBuilder {
            self.credentials = Some(creds);
            self
        }
    }
}

pub struct ChannelArgs {
    args: *mut grpc_channel_args,
}

impl ChannelArgs {
    pub fn as_ptr(&self) -> *const grpc_channel_args {
        self.args
    }
}

impl Drop for ChannelArgs {
    fn drop(&mut self) {
        unsafe { grpc_sys::grpcwrap_channel_args_destroy(self.args) }
    }
}

struct ChannelInner {
    _env: Arc<Environment>,
    channel: *mut grpc_channel,
}

impl ChannelInner {
    // If try_to_connect is true, the channel will try to establish a connection, potentially
    // changing the state.
    fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
        let should_try = if try_to_connect { 1 } else { 0 };
        unsafe { grpc_sys::grpc_channel_check_connectivity_state(self.channel, should_try) }
    }
}

impl Drop for ChannelInner {
    fn drop(&mut self) {
        unsafe {
            grpc_sys::grpc_channel_destroy(self.channel);
        }
    }
}

/// A gRPC channel.
///
/// Channels are an abstraction of long-lived connections to remote servers. More client objects
/// can reuse the same channel.
///
/// Use [`ChannelBuilder`] to build a [`Channel`].
#[derive(Clone)]
pub struct Channel {
    inner: Arc<ChannelInner>,
    cq: CompletionQueue,
}

#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Channel {}
unsafe impl Sync for Channel {}

impl Channel {
    /// Create a new channel. Avoid using this directly and use
    /// [`ChannelBuilder`] to build a [`Channel`] instead.
    ///
    /// # Safety
    ///
    /// The given grpc_channel must correspond to an instantiated grpc core
    /// channel. Takes exclusive ownership of the channel and will close it after
    /// use.
    pub unsafe fn new(
        cq: CompletionQueue,
        env: Arc<Environment>,
        channel: *mut grpc_channel,
    ) -> Channel {
        Channel {
            inner: Arc::new(ChannelInner { _env: env, channel }),
            cq,
        }
    }

    /// If try_to_connect is true, the channel will try to establish a connection, potentially
    /// changing the state.
    pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
        self.inner.check_connectivity_state(try_to_connect)
    }

    /// Blocking wait for channel state change or deadline expiration.
    ///
    /// `check_connectivity_state` needs to be called to get the current state. Returns false
    /// means deadline excceeds before observing any state changes.
    pub fn wait_for_state_change(
        &self,
        last_observed: ConnectivityState,
        deadline: impl Into<Deadline>,
    ) -> impl Future<Output = bool> {
        let (cq_f, prom) = CallTag::action_pair();
        let prom_box = Box::new(prom);
        let tag = Box::into_raw(prom_box);
        let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
            unsafe {
                grpcio_sys::grpc_channel_watch_connectivity_state(
                    self.inner.channel,
                    last_observed,
                    deadline.into().spec(),
                    cq_ref.as_ptr(),
                    tag as *mut _,
                )
            }
            true
        } else {
            // It's already shutdown.
            false
        };
        async move { should_wait && cq_f.await.unwrap() }
    }

    /// Wait for this channel to be connected.
    ///
    /// Returns false means deadline excceeds before connection is connected.
    pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
        // Fast path, it's probably connected.
        let mut state = self.check_connectivity_state(true);
        if ConnectivityState::GRPC_CHANNEL_READY == state {
            return true;
        }
        let deadline = deadline.into();
        loop {
            if self.wait_for_state_change(state, deadline).await {
                state = self.check_connectivity_state(true);
                match state {
                    ConnectivityState::GRPC_CHANNEL_READY => return true,
                    ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
                    _ => (),
                }
                continue;
            }
            return false;
        }
    }

    /// Create a Kicker.
    pub(crate) fn create_kicker(&self) -> Result<Kicker> {
        let cq_ref = self.cq.borrow()?;
        let raw_call = unsafe {
            let ch = self.inner.channel;
            let cq = cq_ref.as_ptr();
            // Do not timeout.
            let timeout = gpr_timespec::inf_future();
            grpc_sys::grpcwrap_channel_create_call(
                ch,
                ptr::null_mut(),
                0,
                cq,
                ptr::null(),
                0,
                ptr::null(),
                0,
                timeout,
            )
        };
        let call = unsafe { Call::from_raw(raw_call, self.cq.clone()) };
        Ok(Kicker::from_call(call))
    }

    /// Create a call using the method and option.
    pub(crate) fn create_call<Req, Resp>(
        &self,
        method: &Method<Req, Resp>,
        opt: &CallOption,
    ) -> Result<Call> {
        let cq_ref = self.cq.borrow()?;
        let raw_call = unsafe {
            let ch = self.inner.channel;
            let cq = cq_ref.as_ptr();
            let method_ptr = method.name.as_ptr();
            let method_len = method.name.len();
            let timeout = opt
                .get_timeout()
                .map_or_else(gpr_timespec::inf_future, gpr_timespec::from);
            grpc_sys::grpcwrap_channel_create_call(
                ch,
                ptr::null_mut(),
                0,
                cq,
                method_ptr as *const _,
                method_len,
                ptr::null(),
                0,
                timeout,
            )
        };

        unsafe { Ok(Call::from_raw(raw_call, self.cq.clone())) }
    }

    pub(crate) fn cq(&self) -> &CompletionQueue {
        &self.cq
    }
}

#[cfg(test)]
#[cfg(feature = "nightly")]
mod tests {
    use crate::env::Environment;
    use crate::ChannelBuilder;
    use std::sync::Arc;

    #[test]
    #[cfg(feature = "nightly")]
    fn test_grpc_min_message_size_to_compress() {
        let env = Arc::new(Environment::new(1));
        let cb = ChannelBuilder::new(env);
        cb.default_grpc_min_message_size_to_compress(1);
    }
    #[test]
    #[cfg(feature = "nightly")]
    fn test_gzip_compression_level() {
        let env = Arc::new(Environment::new(1));
        let cb = ChannelBuilder::new(env);
        cb.default_gzip_compression_level(1);
    }
}