#[macro_use]
extern crate log;
extern crate protobuf;
pub mod bucket;
pub mod data_type;
pub mod errors;
pub mod object;
pub mod preflist;
pub mod secondary_index;
pub mod stream;
pub mod yokozuna;
mod connection;
mod rpb;
mod utils;
use bucket::{BucketProps, BucketTypeProps};
use connection::RiakConn;
use data_type::{DataTypeFetchResp, DataTypeFetchReq};
use errors::RiakErr;
use object::{DeleteObjectReq, FetchObjectResp, FetchObjectReq, StoreObjectReq};
use preflist::PreflistItem;
use protobuf::{Message, parse_from_bytes};
use rpb::codes;
use rpb::riak::{RpbGetBucketResp, RpbGetBucketReq, RpbGetServerInfoResp, RpbGetBucketTypeReq,
RpbResetBucketReq};
use rpb::riak_dt::DtFetchResp;
use rpb::riak_kv::{RpbGetBucketKeyPreflistResp, RpbGetBucketKeyPreflistReq, RpbGetResp,
RpbIndexResp, RpbMapRedResp, RpbMapRedReq};
use rpb::riak_search::RpbSearchQueryResp;
use rpb::riak_yokozuna::{RpbYokozunaIndexDeleteReq, RpbYokozunaIndexGetResp, RpbYokozunaIndexGetReq,
RpbYokozunaSchema, RpbYokozunaSchemaGetResp, RpbYokozunaSchemaGetReq,
RpbYokozunaSchemaPutReq};
use secondary_index::{IndexResp, IndexReq};
use std::net::ToSocketAddrs;
use stream::{BucketStream, KeyStream, SecondaryIndexStream};
use utils::{BucketPropsPrivate, DataTypeFetchRespPrivate, FetchObjectRespPrivate, IndexRespPrivate,
ProtobufBytes, SearchQueryRespPrivate, YokozunaIndexPrivate};
use yokozuna::{SearchQuery, SearchQueryResp, YokozunaIndex};
/// Timeout value that `Client::new` hands to `Client::new_with_timeout`.
///
/// NOTE(review): the unit is not visible in this file (presumably seconds) —
/// confirm against `RiakConn::new`.
static DEFAULT_TIMEOUT: u32 = 3600;
/// Client handle that owns a `RiakConn` and issues every request over it.
#[derive(Debug)]
pub struct Client {
// Connection that the methods of this type exchange messages over.
connection: RiakConn,
// Timeout value given to `RiakConn::new` at construction; overwritten by `set_timeout`.
timeout: u32,
}
impl Client {
/// Creates a client for `addr` using `DEFAULT_TIMEOUT`.
///
/// This is a thin wrapper around `Client::new_with_timeout`; any error it
/// returns is passed through unchanged.
pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Client, RiakErr> {
Client::new_with_timeout(addr, DEFAULT_TIMEOUT)
}
/// Creates a client for `addr` with an explicit `timeout`.
///
/// The address and timeout are forwarded to `RiakConn::new`; the resulting
/// connection and the timeout value are stored on the returned `Client`.
///
/// # Errors
///
/// Returns whatever `RiakErr` `RiakConn::new` reports.
pub fn new_with_timeout<A: ToSocketAddrs>(addr: A, timeout: u32) -> Result<Client, RiakErr> {
    RiakConn::new(addr, timeout).map(|conn| {
        Client {
            connection: conn,
            timeout: timeout,
        }
    })
}
/// Replaces the timeout value stored on this client.
///
/// NOTE(review): only the `timeout` field is updated here; nothing in this
/// method touches the already-created `connection`. Confirm whether the new
/// value is expected to reach `RiakConn` (for example on `reconnect`).
pub fn set_timeout(&mut self, timeout: u32) {
self.timeout = timeout;
}
/// Delegates to `RiakConn::reconnect` on the underlying connection and
/// returns its result unchanged.
pub fn reconnect(&mut self) -> Result<(), RiakErr> {
self.connection.reconnect()
}
/// Exchanges an `RpbPingReq` (with an empty payload) for an `RpbPingResp`.
///
/// The response body is discarded; only success or failure is reported.
///
/// # Errors
///
/// Returns the `RiakErr` produced by the connection exchange.
pub fn ping(&mut self) -> Result<(), RiakErr> {
    let empty_payload: Vec<u8> = Vec::new();
    self.connection
        .exchange(codes::RpbPingReq, codes::RpbPingResp, &empty_payload)
        .map(|_| ())
}
pub fn server_info(&mut self) -> Result<(String, String), RiakErr> {
let response = match self.connection.exchange(codes::RpbGetServerInfoReq,
codes::RpbGetServerInfoResp,
&vec![]) {
Ok(response) => response,
Err(error) => return Err(error),
};
let rpb_get_server_info_resp = match parse_from_bytes::<RpbGetServerInfoResp>(&response) {
Ok(response) => response,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
Ok((String::from_utf8_lossy(rpb_get_server_info_resp.get_node()).into_owned(),
String::from_utf8_lossy(rpb_get_server_info_resp.get_server_version()).into_owned()))
}
/// Builds a `BucketStream` on top of this client via `BucketStream::new`
/// and returns its result unchanged.
pub fn stream_buckets(&mut self) -> Result<BucketStream, RiakErr> {
BucketStream::new(self)
}
/// Collects everything a `BucketStream` yields into one vector.
///
/// Opens the stream with `stream_buckets` and returns the result of calling
/// `all()` on it.
///
/// # Errors
///
/// Returns the `RiakErr` from opening the stream or from `all()`.
pub fn list_buckets(&mut self) -> Result<Vec<Vec<u8>>, RiakErr> {
    self.stream_buckets().and_then(|mut stream| stream.all())
}
pub fn set_bucket_properties(&mut self, bucket_props: BucketProps) -> Result<(), RiakErr> {
let bytes = match bucket_props.write_to_bytes() {
Ok(b) => b,
Err(err) => return Err(err),
};
match self.connection.exchange(codes::RpbSetBucketReq, codes::RpbSetBucketResp, &bytes) {
Ok(_) => Ok(()),
Err(err) => Err(err),
}
}
pub fn get_bucket_properties<T: Into<Vec<u8>>>(&mut self,
bucket_name: T)
-> Result<BucketProps, RiakErr> {
let bucket_name = bucket_name.into();
let mut req = RpbGetBucketReq::new();
req.set_bucket(bucket_name.clone());
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let response = match self.connection
.exchange(codes::RpbGetBucketReq, codes::RpbGetBucketResp, &bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let mut rpb_get_bucket_resp = match parse_from_bytes::<RpbGetBucketResp>(&response) {
Ok(rpb_get_bucket_resp) => rpb_get_bucket_resp,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let rpb_bucket_props = rpb_get_bucket_resp.take_props();
let mut bucket_props = BucketProps::new(bucket_name);
bucket_props.set_props(rpb_bucket_props);
Ok(bucket_props)
}
pub fn set_bucket_type_properties(&mut self,
bucket_type_props: BucketTypeProps)
-> Result<(), RiakErr> {
let bytes = match bucket_type_props.write_to_bytes() {
Ok(b) => b,
Err(err) => return Err(err),
};
match self.connection
.exchange(codes::RpbSetBucketTypeReq, codes::RpbSetBucketResp, &bytes) {
Ok(_) => Ok(()),
Err(err) => Err(err),
}
}
pub fn get_bucket_type_properties<T: Into<Vec<u8>>>(&mut self,
bucket_type_name: T)
-> Result<BucketTypeProps, RiakErr> {
let bucket_type_name = bucket_type_name.into();
let mut req = RpbGetBucketTypeReq::new();
req.set_field_type(bucket_type_name.clone());
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let response = match self.connection
.exchange(codes::RpbGetBucketTypeReq, codes::RpbGetBucketResp, &bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let mut rpb_get_bucket_resp = match parse_from_bytes::<RpbGetBucketResp>(&response) {
Ok(rpb_get_bucket_resp) => rpb_get_bucket_resp,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let rpb_bucket_props = rpb_get_bucket_resp.take_props();
let mut bucket_type_props = BucketTypeProps::new(bucket_type_name);
bucket_type_props.set_props(rpb_bucket_props);
Ok(bucket_type_props)
}
pub fn reset_bucket<T: Into<Vec<u8>>>(&mut self,
bucket_type_name: T,
bucket_name: T)
-> Result<(), RiakErr> {
let mut request = RpbResetBucketReq::new();
request.set_field_type(bucket_type_name.into());
request.set_bucket(bucket_name.into());
let bytes = match request.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
match self.connection
.exchange(codes::RpbResetBucketReq, codes::RpbResetBucketResp, &bytes) {
Ok(_) => Ok(()),
Err(error) => Err(error),
}
}
/// Builds a `KeyStream` for `bucket` on top of this client via
/// `KeyStream::new` and returns its result unchanged.
pub fn stream_keys<T: Into<Vec<u8>>>(&mut self, bucket: T) -> Result<KeyStream, RiakErr> {
KeyStream::new(self, bucket.into())
}
/// Collects everything a `KeyStream` for `bucket` yields into one vector.
///
/// Opens the stream with `KeyStream::new` and returns the result of calling
/// `all()` on it.
///
/// # Errors
///
/// Returns the `RiakErr` from opening the stream or from `all()`.
pub fn list_keys<T: Into<Vec<u8>>>(&mut self, bucket: T) -> Result<Vec<Vec<u8>>, RiakErr> {
    KeyStream::new(self, bucket.into()).and_then(|mut stream| stream.all())
}
pub fn store_object(&mut self, req: StoreObjectReq) -> Result<(), RiakErr> {
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(err) => return Err(err),
};
match self.connection.exchange(codes::RpbPutReq, codes::RpbPutResp, &bytes) {
Ok(_) => Ok(()),
Err(err) => Err(err),
}
}
pub fn fetch_object(&mut self, req: FetchObjectReq) -> Result<FetchObjectResp, RiakErr> {
let bytes = match req.write_to_bytes() {
Ok(b) => b,
Err(err) => return Err(err),
};
let response = match self.connection
.exchange(codes::RpbGetReq, codes::RpbGetResp, &bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let rpb_get_resp = match parse_from_bytes::<RpbGetResp>(&response) {
Ok(rpb_get_resp) => rpb_get_resp,
Err(err) => return Err(RiakErr::ProtobufError(err)),
};
Ok(FetchObjectResp::new_from_rpb(rpb_get_resp))
}
pub fn delete_object(&mut self, request: DeleteObjectReq) -> Result<(), RiakErr> {
let bytes = match request.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(error),
};
match self.connection.exchange(codes::RpbDelReq, codes::RpbDelResp, &bytes) {
Ok(_) => Ok(()),
Err(error) => Err(error),
}
}
pub fn fetch_preflist<T: Into<Vec<u8>>>(&mut self,
bucket: T,
key: T)
-> Result<Vec<PreflistItem>, RiakErr> {
let mut req = RpbGetBucketKeyPreflistReq::new();
req.set_bucket(bucket.into());
req.set_key(key.into());
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let response = match self.connection
.exchange(codes::RpbGetBucketKeyPreflistReq,
codes::RpbGetBucketKeyPreflistResp,
&bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let rpb_preflist_resp = match parse_from_bytes::<RpbGetBucketKeyPreflistResp>(&response) {
Ok(parsed) => parsed,
Err(err) => return Err(RiakErr::ProtobufError(err)),
};
let mut preflist: Vec<PreflistItem> = Vec::new();
for preflist_item in rpb_preflist_resp.get_preflist() {
let node = String::from_utf8_lossy(preflist_item.get_node()).into_owned();
let converted = PreflistItem::new(preflist_item.get_partition(),
&node,
preflist_item.get_primary());
preflist.push(converted);
}
Ok(preflist)
}
pub fn set_yokozuna_schema<T: Into<Vec<u8>>>(&mut self,
name: T,
content: T)
-> Result<(), RiakErr> {
let mut schema = RpbYokozunaSchema::new();
schema.set_name(name.into());
schema.set_content(content.into());
let mut req = RpbYokozunaSchemaPutReq::new();
req.set_schema(schema);
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
match self.connection
.exchange(codes::RpbYokozunaSchemaPutReq, codes::RpbPutResp, &bytes) {
Ok(_) => Ok(()),
Err(error) => Err(error),
}
}
pub fn get_yokozuna_schema<T: Into<Vec<u8>>>(&mut self, name: T) -> Result<Vec<u8>, RiakErr> {
let mut req = RpbYokozunaSchemaGetReq::new();
req.set_name(name.into());
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let response = match self.connection.exchange(codes::RpbYokozunaSchemaGetReq,
codes::RpbYokozunaSchemaGetResp,
&bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let mut rpb_yokozuna_schema_get_resp =
match parse_from_bytes::<RpbYokozunaSchemaGetResp>(&response) {
Ok(parsed) => parsed,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let mut rpb_yokozuna_schema = rpb_yokozuna_schema_get_resp.take_schema();
Ok(rpb_yokozuna_schema.take_content())
}
pub fn set_yokozuna_index(&mut self, index: YokozunaIndex) -> Result<(), RiakErr> {
let bytes = match index.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(error),
};
match self.connection.exchange(codes::RpbYokozunaIndexPutReq, codes::RpbPutResp, &bytes) {
Ok(_) => Ok(()),
Err(error) => Err(error),
}
}
pub fn get_yokozuna_index<T: Into<Vec<u8>>>(&mut self,
name: T)
-> Result<Vec<YokozunaIndex>, RiakErr> {
let mut req = RpbYokozunaIndexGetReq::new();
req.set_name(name.into());
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let response = match self.connection.exchange(codes::RpbYokozunaIndexGetReq,
codes::RpbYokozunaIndexGetResp,
&bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let mut rpb_yokozuna_index_get_resp =
match parse_from_bytes::<RpbYokozunaIndexGetResp>(&response) {
Ok(rpb_yokozuna_index_get_resp) => rpb_yokozuna_index_get_resp,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
let mut indexes: Vec<YokozunaIndex> = Vec::new();
for rpb_yokozuna_index in rpb_yokozuna_index_get_resp.take_index().into_iter() {
let index = YokozunaIndex::new_from_rpb_yokozuna_index(rpb_yokozuna_index);
indexes.push(index);
}
Ok(indexes)
}
pub fn delete_yokozuna_index<T: Into<Vec<u8>>>(&mut self, name: T) -> Result<(), RiakErr> {
let mut req = RpbYokozunaIndexDeleteReq::new();
req.set_name(name.into());
let bytes = match req.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
match self.connection
.exchange(codes::RpbYokozunaIndexDeleteReq, codes::RpbDelResp, &bytes) {
Ok(_) => Ok(()),
Err(error) => Err(error),
}
}
pub fn search(&mut self, query: SearchQuery) -> Result<SearchQueryResp, RiakErr> {
let bytes = match query.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Err(error),
};
let response = match self.connection
.exchange(codes::RpbSearchQueryReq, codes::RpbSearchQueryResp, &bytes) {
Ok(response) => response,
Err(error) => return Err(error),
};
let rpb_search_query_resp = match parse_from_bytes::<RpbSearchQueryResp>(&response) {
Ok(rpb_search_query_resp) => rpb_search_query_resp,
Err(error) => return Err(RiakErr::ProtobufError(error)),
};
Ok(SearchQueryResp::new_from_rpb(rpb_search_query_resp))
}
/// Runs a MapReduce request and returns the response payloads in the order
/// they were received.
///
/// `request` and `content_type` (which share the type parameter `T`) are
/// placed in an `RpbMapRedReq`. The request is exchanged for an
/// `RpbMapRedResp` repeatedly; the `response` field of every message is
/// appended to the result, and the loop ends after the first message whose
/// `done` flag is set (that message's payload is included too).
///
/// NOTE(review): the serialized request goes through `exchange` on every
/// iteration, so it looks like it is re-sent for each response message.
/// If the server streams several responses for one request, later
/// iterations should probably only receive — confirm against `RiakConn`.
///
/// # Errors
///
/// Returns `RiakErr::ProtobufError` when the request cannot be serialized
/// or a response cannot be parsed, and the connection's `RiakErr` when an
/// exchange fails.
pub fn mapreduce<T: Into<Vec<u8>>>(&mut self,
                                   request: T,
                                   content_type: T)
                                   -> Result<Vec<Vec<u8>>, RiakErr> {
    let mut map_red_req = RpbMapRedReq::new();
    map_red_req.set_request(request.into());
    map_red_req.set_content_type(content_type.into());
    let payload = try!(map_red_req.write_to_bytes().map_err(RiakErr::ProtobufError));

    let mut chunks: Vec<Vec<u8>> = Vec::new();
    loop {
        let raw = try!(self.connection
            .exchange(codes::RpbMapRedReq, codes::RpbMapRedResp, &payload));
        let mut map_red_resp = try!(parse_from_bytes::<RpbMapRedResp>(&raw)
            .map_err(RiakErr::ProtobufError));

        chunks.push(map_red_resp.take_response());
        if map_red_resp.get_done() {
            return Ok(chunks);
        }
    }
}
/// Serializes `request`, exchanges it as a `DtFetchReq` for a
/// `DtFetchResp`, and converts the parsed response with
/// `DataTypeFetchResp::new_from_rpb`.
///
/// # Errors
///
/// Returns the `RiakErr` from serialization or the exchange, or
/// `RiakErr::ProtobufError` when the response cannot be parsed.
pub fn data_type_fetch(&mut self,
                       request: DataTypeFetchReq)
                       -> Result<DataTypeFetchResp, RiakErr> {
    let payload = try!(request.write_to_bytes());
    let raw = try!(self.connection
        .exchange(codes::DtFetchReq, codes::DtFetchResp, &payload));

    parse_from_bytes::<DtFetchResp>(&raw)
        .map(|rpb| DataTypeFetchResp::new_from_rpb(rpb))
        .map_err(RiakErr::ProtobufError)
}
/// Forces the stream flag on `req` to `true` and builds a
/// `SecondaryIndexStream` for it via `SecondaryIndexStream::new`, returning
/// that result unchanged.
pub fn secondary_index_request_streaming(&mut self, mut req: IndexReq) -> Result<SecondaryIndexStream, RiakErr> {
req.set_stream(true);
SecondaryIndexStream::new(self, req)
}
/// Runs a secondary-index query with the stream flag forced to `false`.
///
/// The request is serialized, exchanged as an `RpbIndexReq` for an
/// `RpbIndexResp`, and the parsed response is converted with
/// `IndexResp::new_from_rpb`.
///
/// # Errors
///
/// Returns the `RiakErr` from serialization or the exchange, or
/// `RiakErr::ProtobufError` when the response cannot be parsed.
pub fn secondary_index_request_non_streaming(&mut self, mut req: IndexReq) -> Result<IndexResp, RiakErr> {
    req.set_stream(false);
    let payload = try!(req.write_to_bytes());
    let raw = try!(self.connection
        .exchange(codes::RpbIndexReq, codes::RpbIndexResp, &payload));

    parse_from_bytes::<RpbIndexResp>(&raw)
        .map(|rpb| IndexResp::new_from_rpb(rpb))
        .map_err(RiakErr::ProtobufError)
}
}