use cassandra::future::ConnectFuture;
use cassandra::policy::retry::RetryPolicy;
use cassandra::session::Session;
use cassandra::ssl::Ssl;
use cassandra::time::TimestampGen;
use cassandra::util::Protected;
use cassandra_sys::CassCluster as _Cluster;
use cassandra_sys::cass_bool_t;
use cassandra_sys::cass_cluster_free;
use cassandra_sys::cass_cluster_new;
use cassandra_sys::cass_cluster_set_connect_timeout;
use cassandra_sys::cass_cluster_set_connection_heartbeat_interval;
use cassandra_sys::cass_cluster_set_connection_idle_timeout;
use cassandra_sys::cass_cluster_set_contact_points;
use cassandra_sys::cass_cluster_set_core_connections_per_host;
use cassandra_sys::cass_cluster_set_credentials;
use cassandra_sys::cass_cluster_set_latency_aware_routing;
use cassandra_sys::cass_cluster_set_latency_aware_routing_settings;
use cassandra_sys::cass_cluster_set_load_balance_dc_aware;
use cassandra_sys::cass_cluster_set_load_balance_round_robin;
use cassandra_sys::cass_cluster_set_max_concurrent_creation;
use cassandra_sys::cass_cluster_set_max_concurrent_requests_threshold;
use cassandra_sys::cass_cluster_set_max_connections_per_host;
use cassandra_sys::cass_cluster_set_max_requests_per_flush;
use cassandra_sys::cass_cluster_set_num_threads_io;
use cassandra_sys::cass_cluster_set_pending_requests_high_water_mark;
use cassandra_sys::cass_cluster_set_pending_requests_low_water_mark;
use cassandra_sys::cass_cluster_set_port;
use cassandra_sys::cass_cluster_set_protocol_version;
use cassandra_sys::cass_cluster_set_queue_size_event;
use cassandra_sys::cass_cluster_set_queue_size_io;
use cassandra_sys::cass_cluster_set_reconnect_wait_time;
use cassandra_sys::cass_cluster_set_request_timeout;
use cassandra_sys::cass_cluster_set_retry_policy;
use cassandra_sys::cass_cluster_set_ssl;
use cassandra_sys::cass_cluster_set_tcp_keepalive;
use cassandra_sys::cass_cluster_set_tcp_nodelay;
use cassandra_sys::cass_cluster_set_timestamp_gen;
use cassandra_sys::cass_cluster_set_token_aware_routing;
use cassandra_sys::cass_cluster_set_use_schema;
use cassandra_sys::cass_cluster_set_whitelist_filtering;
use cassandra_sys::cass_cluster_set_write_bytes_high_water_mark;
use cassandra_sys::cass_cluster_set_write_bytes_low_water_mark;
use cassandra_sys::cass_false;
use cassandra_sys::cass_future_error_code;
use cassandra_sys::cass_session_connect;
use cassandra_sys::cass_session_new;
use cassandra_sys::cass_true;
use errors::*;
use errors::*;
use std::ffi::CString;
use std::ffi::NulError;
use std::fmt;
use std::iter::Map;
use std::net::AddrParseError;
use std::net::Ipv4Addr;
use std::result;
use std::str::FromStr;
use time::Duration;
/// CQL protocol versions that can be passed to `Cluster::set_protocol_version`.
///
/// Each variant's discriminant is the integer handed to the driver when the
/// value is cast with `as i32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CqlProtocol {
    /// Protocol version 1.
    ONE = 1,
    /// Protocol version 2.
    TWO = 2,
    /// Protocol version 3.
    THREE = 3,
    /// Protocol version 4.
    FOUR = 4,
}
/// A list of IPv4 addresses that `Cluster::set_contact_points` renders to a
/// string and hands to the driver.
#[derive(Debug)]
pub struct ContactPoints(Vec<Ipv4Addr>);

impl fmt::Display for ContactPoints {
    /// Writes the addresses as a comma-separated list, e.g. `127.0.0.1,10.0.0.2`.
    ///
    /// No leading or trailing whitespace is emitted, so the output can be fed
    /// back through `FromStr`. An empty list renders as the empty string.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut addrs = self.0.iter();
        if let Some(first) = addrs.next() {
            write!(f, "{}", first)?;
            // Every later address is prefixed with the separator, so no
            // intermediate `Vec<String>` is needed.
            for addr in addrs {
                write!(f, ",{}", addr)?;
            }
        }
        Ok(())
    }
}
impl FromStr for ContactPoints {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
let points: Vec<&str> = s.split(',').collect();
let contact_points: result::Result<Vec<Ipv4Addr>, AddrParseError> = points.iter()
.map(|addr| Ipv4Addr::from_str(addr))
.collect();
Ok(ContactPoints(contact_points.unwrap()))
}
}
/// Owning wrapper around a raw driver `CassCluster` pointer.
///
/// The pointer is obtained from `cass_cluster_new` in the `Default` impl and
/// released with `cass_cluster_free` when the wrapper is dropped.
#[derive(Debug)]
pub struct Cluster(pub *mut _Cluster);
impl Drop for Cluster {
    /// Hands the wrapped pointer back to the driver via `cass_cluster_free`.
    fn drop(&mut self) {
        let raw = self.0;
        unsafe { cass_cluster_free(raw) }
    }
}
impl Protected<*mut _Cluster> for Cluster {
    /// Returns a copy of the wrapped raw pointer; ownership stays with `self`.
    fn inner(&self) -> *mut _Cluster {
        let raw = self.0;
        raw
    }

    /// Wraps an existing raw pointer; the result frees it on drop.
    fn build(inner: *mut _Cluster) -> Self {
        Cluster(inner)
    }
}
impl Default for Cluster {
    /// Creates a cluster object by calling the driver's `cass_cluster_new`.
    fn default() -> Cluster {
        let raw = unsafe { cass_cluster_new() };
        Cluster(raw)
    }
}
impl Cluster {
pub fn set_contact_points<T: Into<ContactPoints>>(&mut self, contact_points: T) -> Result<&mut Self> {
unsafe {
let s = CString::new(contact_points.into().to_string()).expect("must be utf8");
let err = cass_cluster_set_contact_points(self.0, s.as_ptr());
err.to_result(self).chain_err(|| "Could not set contact points")
}
}
/// Passes `port` (widened to `i32`) to the driver's `cass_cluster_set_port`.
///
/// # Errors
///
/// Returns the driver error chained with "Could not set port" when the
/// driver's return code converts to an error.
pub fn set_port(&mut self, port: u16) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_port(self.0, i32::from(port));
        status.to_result(self).chain_err(|| "Could not set port")
    }
}
/// Passes the raw handle of `ssl` to the driver's `cass_cluster_set_ssl`.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_ssl(&mut self, ssl: &mut Ssl) -> &Self {
    unsafe {
        let raw_ssl = ssl.inner();
        cass_cluster_set_ssl(self.0, raw_ssl);
    }
    self
}
/// Creates a new session with `cass_session_new`, starts connecting it to
/// this cluster with `cass_session_connect`, and turns the connect future's
/// error code into a `Result`.
///
/// NOTE(review): per the driver documentation `cass_future_error_code` waits
/// for the future to be set, so this call is presumably blocking — confirm.
///
/// # Errors
///
/// Returns the driver error chained with "Could not connect" when the
/// future's error code converts to an error.
pub fn connect(&mut self) -> Result<Session> {
    unsafe {
        let session = Session(cass_session_new());
        let pending = cass_session_connect(session.0, self.0);
        let connect_future = ConnectFuture::build(pending);
        let code = cass_future_error_code(connect_future.inner());
        code.to_result(session).chain_err(|| "Could not connect")
    }
}
/// Passes the discriminant of `protocol_version` to the driver's
/// `cass_cluster_set_protocol_version`.
///
/// # Errors
///
/// Returns the driver error chained with "Couldn't set protocol version"
/// when the driver's return code converts to an error.
pub fn set_protocol_version(&mut self, protocol_version: CqlProtocol) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_protocol_version(self.0, protocol_version as i32);
        status.to_result(self).chain_err(|| "Couldn't set protocol version")
    }
}
/// Passes `num_threads` to the driver's `cass_cluster_set_num_threads_io`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set thread count" when
/// the driver's return code converts to an error.
pub fn set_num_threads_io(&mut self, num_threads: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_num_threads_io(self.0, num_threads);
        status.to_result(self).chain_err(|| "couldn't set thread count")
    }
}
/// Passes `queue_size` to the driver's `cass_cluster_set_queue_size_io`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set io queue size" when
/// the driver's return code converts to an error.
pub fn set_queue_size_io(&mut self, queue_size: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_queue_size_io(self.0, queue_size);
        status.to_result(self).chain_err(|| "couldn't set io queue size")
    }
}
/// Passes `queue_size` to the driver's `cass_cluster_set_queue_size_event`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set event queue size"
/// when the driver's return code converts to an error.
pub fn set_queue_size_event(&mut self, queue_size: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_queue_size_event(self.0, queue_size);
        status.to_result(self).chain_err(|| "couldn't set event queue size")
    }
}
/// Passes `num_connections` to the driver's
/// `cass_cluster_set_core_connections_per_host`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set connections per host"
/// when the driver's return code converts to an error.
pub fn set_core_connections_per_host(&mut self, num_connections: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_core_connections_per_host(self.0, num_connections);
        status.to_result(self).chain_err(|| "couldn't set connections per host")
    }
}
/// Passes `num_connections` to the driver's
/// `cass_cluster_set_max_connections_per_host`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set max connections per
/// host" when the driver's return code converts to an error.
pub fn set_max_connections_per_host(&mut self, num_connections: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_max_connections_per_host(self.0, num_connections);
        status.to_result(self).chain_err(|| "couldn't set max connections per host")
    }
}
/// Passes `wait_time` to the driver's `cass_cluster_set_reconnect_wait_time`.
///
/// No driver result is inspected here; the method always returns `self`.
/// NOTE(review): the unit of `wait_time` is not visible here — check the
/// driver documentation.
pub fn set_reconnect_wait_time(&mut self, wait_time: u32) -> &Self {
    let raw = self.0;
    unsafe { cass_cluster_set_reconnect_wait_time(raw, wait_time) };
    self
}
/// Passes `num_connections` to the driver's
/// `cass_cluster_set_max_concurrent_creation`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set
/// max_concurrent_creation" when the driver's return code converts to an
/// error.
pub fn set_max_concurrent_creation(&mut self, num_connections: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_max_concurrent_creation(self.0, num_connections);
        status.to_result(self).chain_err(|| "couldn't set max_concurrent_creation")
    }
}
/// Passes `num_requests` to the driver's
/// `cass_cluster_set_max_concurrent_requests_threshold`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set max concurrent
/// requests threshold" when the driver's return code converts to an error.
pub fn set_max_concurrent_requests_threshold(&mut self, num_requests: u32) -> Result<&mut Self> {
    unsafe {
        cass_cluster_set_max_concurrent_requests_threshold(self.0, num_requests)
            .to_result(self)
            .chain_err(|| "couldn't set max concurrent requests threshold")
    }
}
/// Passes `num_requests` to the driver's
/// `cass_cluster_set_max_requests_per_flush`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set max requests per
/// flush" when the driver's return code converts to an error.
pub fn set_max_requests_per_flush(&mut self, num_requests: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_max_requests_per_flush(self.0, num_requests);
        status.to_result(self).chain_err(|| "couldn't set max requests per flush")
    }
}
/// Passes `num_bytes` to the driver's
/// `cass_cluster_set_write_bytes_high_water_mark`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set write bytes high
/// water mark" when the driver's return code converts to an error.
pub fn set_write_bytes_high_water_mark(&mut self, num_bytes: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_write_bytes_high_water_mark(self.0, num_bytes);
        status.to_result(self).chain_err(|| "couldn't set write bytes high water mark")
    }
}
/// Passes `num_bytes` to the driver's
/// `cass_cluster_set_write_bytes_low_water_mark`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set write bytes low water
/// mark" when the driver's return code converts to an error.
pub fn set_write_bytes_low_water_mark(&mut self, num_bytes: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_write_bytes_low_water_mark(self.0, num_bytes);
        status.to_result(self).chain_err(|| "couldn't set write bytes low water mark")
    }
}
/// Passes `num_requests` to the driver's
/// `cass_cluster_set_pending_requests_high_water_mark`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set pending requests high
/// water mark" when the driver's return code converts to an error.
pub fn set_pending_requests_high_water_mark(&mut self, num_requests: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_pending_requests_high_water_mark(self.0, num_requests);
        status.to_result(self).chain_err(|| "couldn't set pending requests high water mark")
    }
}
/// Passes `num_requests` to the driver's
/// `cass_cluster_set_pending_requests_low_water_mark`.
///
/// # Errors
///
/// Returns the driver error chained with "couldn't set pending requests low
/// water mark" when the driver's return code converts to an error.
pub fn set_pending_requests_low_water_mark(&mut self, num_requests: u32) -> Result<&mut Self> {
    unsafe {
        let status = cass_cluster_set_pending_requests_low_water_mark(self.0, num_requests);
        status.to_result(self).chain_err(|| "couldn't set pending requests low water mark")
    }
}
/// Passes `timeout`, expressed in whole milliseconds, to the driver's
/// `cass_cluster_set_connect_timeout`.
///
/// The millisecond count is converted with `as u32`, so a negative duration
/// or one that does not fit in 32 bits wraps rather than being rejected
/// (hence the lint allowances). No driver result is inspected.
#[allow(cast_possible_truncation,cast_sign_loss)]
pub fn set_connect_timeout(&mut self, timeout: Duration) -> &Self {
    let timeout_ms = timeout.num_milliseconds() as u32;
    unsafe { cass_cluster_set_connect_timeout(self.0, timeout_ms) };
    self
}
/// Passes `timeout`, expressed in whole milliseconds, to the driver's
/// `cass_cluster_set_request_timeout`.
///
/// The millisecond count is converted with `as u32`, so a negative duration
/// or one that does not fit in 32 bits wraps rather than being rejected
/// (hence the lint allowances). No driver result is inspected.
#[allow(cast_possible_truncation,cast_sign_loss)]
pub fn set_request_timeout(&mut self, timeout: Duration) -> &Self {
    let timeout_ms = timeout.num_milliseconds() as u32;
    unsafe { cass_cluster_set_request_timeout(self.0, timeout_ms) };
    self
}
/// Converts `username` and `password` to C strings and passes them to the
/// driver's `cass_cluster_set_credentials`.
///
/// # Errors
///
/// Returns "username not a valid CString" or "password not a valid CString"
/// (checked in that order) if the corresponding argument contains an
/// interior NUL byte. The driver call itself is not checked for errors.
pub fn set_credentials(&mut self, username: &str, password: &str) -> Result<&Self> {
    // Named locals keep both buffers alive for the whole driver call.
    let c_username = CString::new(username).chain_err(|| "username not a valid CString")?;
    let c_password = CString::new(password).chain_err(|| "password not a valid CString")?;
    unsafe {
        cass_cluster_set_credentials(self.0, c_username.as_ptr(), c_password.as_ptr());
    }
    Ok(self)
}
/// Calls the driver's `cass_cluster_set_load_balance_round_robin` for this
/// cluster.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_load_balance_round_robin(&mut self) -> &Self {
    let raw = self.0;
    unsafe {
        cass_cluster_set_load_balance_round_robin(raw);
    }
    self
}
pub fn set_load_balance_dc_aware<S>(&mut self, local_dc: &str, used_hosts_per_remote_dc: u32,
allow_remote_dcs_for_local_cl: cass_bool_t)
-> Result<&mut Self> {
unsafe {
{
let local_dc = CString::new(local_dc).expect("must be utf8");
cass_cluster_set_load_balance_dc_aware(self.0,
local_dc.as_ptr(),
used_hosts_per_remote_dc,
allow_remote_dcs_for_local_cl)
}
.to_result(self)
.chain_err(|| "couldn't set dc aware load balancing policy")
}
}
/// Maps `enabled` to `cass_true`/`cass_false` and passes it to the driver's
/// `cass_cluster_set_token_aware_routing`.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_token_aware_routing(&mut self, enabled: bool) -> &Self {
    let flag = if enabled { cass_true } else { cass_false };
    unsafe { cass_cluster_set_token_aware_routing(self.0, flag) };
    self
}
/// Maps `enabled` to `cass_true`/`cass_false` and passes it to the driver's
/// `cass_cluster_set_latency_aware_routing`.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_latency_aware_routing(&mut self, enabled: bool) -> &Self {
    let flag = if enabled { cass_true } else { cass_false };
    unsafe { cass_cluster_set_latency_aware_routing(self.0, flag) };
    self
}
/// Passes the latency-aware routing parameters to the driver's
/// `cass_cluster_set_latency_aware_routing_settings`.
///
/// `scale`, `retry_period` and `update_rate` are sent as whole milliseconds
/// converted with `as u64`, so negative durations wrap (hence the lint
/// allowance). `exclusion_threshold` and `min_measured` are forwarded
/// unchanged. No driver result is inspected.
#[allow(cast_sign_loss)]
pub fn set_latency_aware_routing_settings(&mut self, exclusion_threshold: f64, scale: Duration,
                                          retry_period: Duration, update_rate: Duration, min_measured: u64)
                                          -> &Self {
    let scale_ms = scale.num_milliseconds() as u64;
    let retry_period_ms = retry_period.num_milliseconds() as u64;
    let update_rate_ms = update_rate.num_milliseconds() as u64;
    unsafe {
        cass_cluster_set_latency_aware_routing_settings(self.0,
                                                        exclusion_threshold,
                                                        scale_ms,
                                                        retry_period_ms,
                                                        update_rate_ms,
                                                        min_measured);
    }
    self
}
pub fn set_whitelist_filtering(&mut self, hosts: Vec<String>) -> &Self {
unsafe {
cass_cluster_set_whitelist_filtering(self.0, hosts.join(",").as_ptr() as *const i8);
}
self
}
/// Maps `enable` to `cass_true`/`cass_false` and passes it to the driver's
/// `cass_cluster_set_tcp_nodelay`.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_tcp_nodelay(&mut self, enable: bool) -> &Self {
    let flag = if enable { cass_true } else { cass_false };
    unsafe { cass_cluster_set_tcp_nodelay(self.0, flag) };
    self
}
/// Passes the keep-alive flag and `delay`, in whole seconds, to the driver's
/// `cass_cluster_set_tcp_keepalive`.
///
/// `enable` is mapped to `cass_true`/`cass_false`. The second count is
/// converted with `as u32`, so negative or oversized durations wrap (hence
/// the lint allowances). No driver result is inspected.
#[allow(cast_possible_truncation,cast_sign_loss)]
pub fn set_tcp_keepalive(&mut self, enable: bool, delay: Duration) -> &Self {
    let flag = if enable { cass_true } else { cass_false };
    let delay_secs = delay.num_seconds() as u32;
    unsafe { cass_cluster_set_tcp_keepalive(self.0, flag, delay_secs) };
    self
}
/// Passes the raw handle of `tsg` to the driver's
/// `cass_cluster_set_timestamp_gen`.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_timestamp_gen(&mut self, tsg: &TimestampGen) -> &mut Self {
    unsafe {
        let raw_gen = TimestampGen::inner(tsg);
        cass_cluster_set_timestamp_gen(self.0, raw_gen);
    }
    self
}
/// Passes `hearbeat`, in whole seconds, to the driver's
/// `cass_cluster_set_connection_heartbeat_interval`.
///
/// The second count is converted with `as u32`, so negative or oversized
/// durations wrap (hence the lint allowances). No driver result is
/// inspected.
#[allow(cast_possible_truncation,cast_sign_loss)]
pub fn set_connection_heartbeat_interval(&mut self, hearbeat: Duration) -> &mut Self {
    let interval_secs = hearbeat.num_seconds() as u32;
    unsafe {
        cass_cluster_set_connection_heartbeat_interval(self.0, interval_secs);
    }
    self
}
/// Passes `timeout`, in whole seconds, to the driver's
/// `cass_cluster_set_connection_idle_timeout`.
///
/// The second count is converted with `as u32`, so negative or oversized
/// durations wrap (hence the lint allowances). No driver result is
/// inspected.
#[allow(cast_possible_truncation,cast_sign_loss)]
pub fn set_connection_idle_timeout(&mut self, timeout: Duration) -> &mut Self {
    let timeout_secs = timeout.num_seconds() as u32;
    unsafe {
        cass_cluster_set_connection_idle_timeout(self.0, timeout_secs);
    }
    self
}
/// Passes the raw handle of `retry_policy` to the driver's
/// `cass_cluster_set_retry_policy`.
///
/// `retry_policy` is taken by value and goes out of scope when this method
/// returns. No driver result is inspected.
pub fn set_retry_policy(&mut self, retry_policy: RetryPolicy) -> &mut Self {
    unsafe {
        let raw_policy = retry_policy.inner();
        cass_cluster_set_retry_policy(self.0, raw_policy);
    }
    self
}
/// Maps `enabled` to `cass_true`/`cass_false` and passes it to the driver's
/// `cass_cluster_set_use_schema`.
///
/// No driver result is inspected here; the method always returns `self`.
pub fn set_use_schema(&mut self, enabled: bool) -> &Self {
    let flag = if enabled { cass_true } else { cass_false };
    unsafe { cass_cluster_set_use_schema(self.0, flag) };
    self
}
}