use Client;
use connection::RiakConn;
use errors::RiakErr;
use protobuf::{Message, parse_from_bytes};
use rpb::codes;
use rpb::riak_kv::{RpbListBucketsResp, RpbListBucketsReq, RpbIndexResp, RpbListKeysResp,
RpbListKeysReq};
use secondary_index::{IndexReq, IndexResp};
use utils::{IndexRespPrivate, ProtobufBytes};
#[derive(Debug)]
pub struct BucketStream {
connection: RiakConn,
done: bool,
first_request_made: bool,
}
impl BucketStream {
pub fn new(client: &mut Client) -> Result<BucketStream, RiakErr> {
let connection = match RiakConn::new(client.connection.peer_addr,
client.connection.timeout) {
Ok(connection) => connection,
Err(error) => return Err(error),
};
Ok(BucketStream {
connection: connection,
done: false,
first_request_made: false,
})
}
pub fn all(&mut self) -> Result<Vec<Vec<u8>>, RiakErr> {
let mut buckets: Vec<Vec<u8>> = Vec::new();
loop {
let next_buckets = match self.next() {
Some(result) => {
match result {
Ok(next_buckets) => next_buckets,
Err(error) => return Err(error),
}
}
None => break,
};
buckets.extend(next_buckets.iter().cloned());
}
Ok(buckets)
}
pub fn next(&mut self) -> Option<Result<Vec<Vec<u8>>, RiakErr>> {
if self.done {
return None;
}
if self.first_request_made {
let response = match self.connection.receive(codes::RpbListBucketsResp) {
Ok(response) => response,
Err(error) => return Some(Err(error)),
};
let mut rpb_resp = match parse_from_bytes::<RpbListBucketsResp>(&response) {
Ok(parsed) => parsed,
Err(error) => {
match self.connection.reconnect() {
Ok(()) => (),
Err(error) => {
debug!("failure during reconnect: {:?}", error);
return Some(Err(error));
}
};
return Some(Err(RiakErr::ProtobufError(error)));
}
};
let resp_buckets = rpb_resp.take_buckets();
let mut buckets: Vec<Vec<u8>> = Vec::new();
for bucket in resp_buckets.into_iter() {
buckets.push(bucket);
}
self.done = rpb_resp.get_done();
Some(Ok(buckets))
} else {
let mut request = RpbListBucketsReq::new();
request.set_stream(true);
request.set_timeout(self.connection.timeout);
let bytes = match request.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Some(Err(RiakErr::ProtobufError(error))),
};
let response = match self.connection
.exchange(codes::RpbListBucketsReq, codes::RpbListBucketsResp, &bytes) {
Ok(response) => response,
Err(error) => {
match self.connection.reconnect() {
Ok(()) => (),
Err(error) => {
debug!("failure during reconnect: {:?}", error);
return Some(Err(error));
}
};
return Some(Err(error));
}
};
let mut rpb_resp = match parse_from_bytes::<RpbListBucketsResp>(&response) {
Ok(parsed) => parsed,
Err(error) => {
match self.connection.reconnect() {
Ok(()) => (),
Err(error) => {
debug!("failure during reconnect: {:?}", error);
return Some(Err(error));
}
};
return Some(Err(RiakErr::ProtobufError(error)));
}
};
let resp_buckets = rpb_resp.take_buckets();
let mut buckets: Vec<Vec<u8>> = Vec::new();
for bucket in resp_buckets.into_iter() {
buckets.push(bucket);
}
self.first_request_made = true;
self.done = rpb_resp.get_done();
Some(Ok(buckets))
}
}
}
#[derive(Debug)]
pub struct KeyStream {
bucket: Vec<u8>,
connection: RiakConn,
done: bool,
first_request_made: bool,
}
impl KeyStream {
pub fn new(client: &mut Client, bucket: Vec<u8>) -> Result<KeyStream, RiakErr> {
let connection = match RiakConn::new(client.connection.peer_addr,
client.connection.timeout) {
Ok(connection) => connection,
Err(error) => return Err(error),
};
Ok(KeyStream {
bucket: bucket,
connection: connection,
done: false,
first_request_made: false,
})
}
pub fn all(&mut self) -> Result<Vec<Vec<u8>>, RiakErr> {
let mut keys: Vec<Vec<u8>> = Vec::new();
loop {
let new_keys = match self.next() {
Some(result) => {
match result {
Ok(new_keys) => new_keys,
Err(error) => return Err(error),
}
}
None => break,
};
keys.extend(new_keys);
}
Ok(keys)
}
pub fn next(&mut self) -> Option<Result<Vec<Vec<u8>>, RiakErr>> {
if self.done {
return None;
}
if self.first_request_made {
let response = match self.connection.receive(codes::RpbListKeysResp) {
Ok(response) => response,
Err(error) => return Some(Err(error)),
};
let mut rpb_resp = match parse_from_bytes::<RpbListKeysResp>(&response) {
Ok(parsed) => parsed,
Err(error) => {
match self.connection.reconnect() {
Ok(()) => (),
Err(error) => {
debug!("failure during reconnect: {:?}", error);
return Some(Err(error));
}
};
return Some(Err(RiakErr::ProtobufError(error)));
}
};
let resp_keys = rpb_resp.take_keys();
let mut keys: Vec<Vec<u8>> = Vec::new();
for key in resp_keys.into_iter() {
keys.push(key);
}
self.done = rpb_resp.get_done();
Some(Ok(keys))
} else {
let mut request = RpbListKeysReq::new();
request.set_bucket(self.bucket.clone());
request.set_timeout(self.connection.timeout);
let bytes = match request.write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Some(Err(RiakErr::ProtobufError(error))),
};
let response = match self.connection
.exchange(codes::RpbListKeysReq, codes::RpbListKeysResp, &bytes) {
Ok(response) => response,
Err(error) => {
match self.connection.reconnect() {
Ok(()) => (),
Err(error) => {
debug!("failure during reconnect: {:?}", error);
return Some(Err(error));
}
};
return Some(Err(error));
}
};
let mut rpb_resp = match parse_from_bytes::<RpbListKeysResp>(&response) {
Ok(parsed) => parsed,
Err(error) => {
match self.connection.reconnect() {
Ok(()) => (),
Err(error) => {
debug!("failure during reconnect: {:?}", error);
return Some(Err(error));
}
};
return Some(Err(RiakErr::ProtobufError(error)));
}
};
let resp_keys = rpb_resp.take_keys();
let mut keys: Vec<Vec<u8>> = Vec::new();
for key in resp_keys.into_iter() {
keys.push(key);
}
self.first_request_made = true;
self.done = rpb_resp.get_done();
Some(Ok(keys))
}
}
}
#[derive(Debug)]
pub struct SecondaryIndexStream {
connection: RiakConn,
done: bool,
first_request_made: bool,
request: IndexReq,
}
impl SecondaryIndexStream {
pub fn new(client: &mut Client, request: IndexReq) -> Result<SecondaryIndexStream, RiakErr> {
let connection = match RiakConn::new(client.connection.peer_addr,
client.connection.timeout) {
Ok(connection) => connection,
Err(error) => return Err(error),
};
Ok(SecondaryIndexStream {
connection: connection,
done: false,
first_request_made: false,
request: request,
})
}
pub fn all(&mut self) -> Result<Vec<IndexResp>, RiakErr> {
let mut responses: Vec<IndexResp> = Vec::new();
while !self.done {
match self.next() {
Some(Ok(resp)) => responses.push(resp),
Some(Err(error)) => return Err(error),
None => {
self.done = true;
}
};
}
Ok(responses)
}
pub fn next(&mut self) -> Option<Result<IndexResp, RiakErr>> {
if self.done {
return None;
}
if self.first_request_made {
let response = match self.connection.receive(codes::RpbIndexResp) {
Ok(rpb_index_resp) => rpb_index_resp,
Err(error) => return Some(Err(error)),
};
let rpb_index_resp = match parse_from_bytes::<RpbIndexResp>(&response) {
Ok(rpb_index_resp) => rpb_index_resp,
Err(error) => return Some(Err(RiakErr::ProtobufError(error))),
};
self.done = rpb_index_resp.get_done();
Some(Ok(IndexResp::new_from_rpb(rpb_index_resp)))
} else {
let bytes = match self.request.clone().write_to_bytes() {
Ok(bytes) => bytes,
Err(error) => return Some(Err(error)),
};
let response = match self.connection
.exchange(codes::RpbIndexReq, codes::RpbIndexResp, &bytes) {
Ok(rpb_index_resp) => rpb_index_resp,
Err(error) => return Some(Err(error)),
};
let rpb_index_resp = match parse_from_bytes::<RpbIndexResp>(&response) {
Ok(rpb_index_resp) => rpb_index_resp,
Err(error) => return Some(Err(RiakErr::ProtobufError(error))),
};
self.done = rpb_index_resp.get_done();
Some(Ok(IndexResp::new_from_rpb(rpb_index_resp)))
}
}
}