1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
//! A Riak client for Rust.
//!
//! This client can be used to communicate with Riak clusters to send and receive objects
//! and other information. Operations are done through the `Client` struct and there are
//! several other structs designed to build data structures for sending and receiving
//! data from a Riak cluster.
//!
//! Examples:
//!
//! Storing an object is the most fundamental operation in Riak; it can be done as follows:
//!
//! ```
//! use riak::Client;
//! use riak::object::{ObjectContent, StoreObjectReq};
//!
//! // connect to Riak and ping the server
//! let mut riak = Client::new("10.0.0.2:8087").unwrap();
//! riak.ping().unwrap();
//!
//! // prepare an object
//! let contents = ObjectContent::new("I am the night, I am Batman!".as_bytes());
//!
//! // build a request to store the object
//! let bucket_name = "superheroes".to_string();
//! let mut req = StoreObjectReq::new(&bucket_name, &contents);
//! req.set_key("batman");
//!
//! // store the object
//! riak.store_object(&req).unwrap();
//! ```

#[macro_use]
extern crate log;
extern crate protobuf;

pub mod bucket;
pub mod errors;
pub mod object;

mod connection;
mod rpb;

use bucket::BucketProps;
use errors::RiakError;
use connection::RiakConn;
use object::{FetchObjectReq, StoreObjectReq, FetchObjectResp};
use protobuf::{Message, parse_from_bytes};
use rpb::codes;
use rpb::riak::{RpbGetBucketReq, RpbGetBucketResp};
use rpb::riak_kv::{RpbGetResp, RpbListBucketsResp};
use rpb::utils::{rpb_get_resp_to_fetch_object_resp, rpb_bucket_props_to_bucket_props,
                 RpbGenerator, RpbGeneratorID};
use std::net::ToSocketAddrs;
use std::string::String;

/// `Client` represents a connection to a Riak server's Protocol Buffers API.
///
/// All operations are performed through the single `RiakConn` held by this struct.
///
/// For more information see
/// <https://docs.basho.com/riak/kv/latest/developing/api/protocol-buffers/>.
#[derive(Debug)]
pub struct Client {
    // The underlying connection that every request is sent through.
    conn: RiakConn,
}

impl Client {
    /// Constructs a new `Client`.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    /// riak.ping().unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Client, RiakError> {
        debug!("creating a new Riak client");
        let conn = match RiakConn::new(addr) {
            Ok(c) => c,
            Err(err) => return Err(err),
        };

        Ok(Client { conn: conn })
    }

    /// Reconnect to the Riak server originally connected to when this client was initiated.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    /// riak.reconnect().unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn reconnect(&mut self) -> Result<(), RiakError> {
        self.conn.reconnect()
    }

    /// Sends a ping message to Riak and returns a Result.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    /// riak.ping().unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn ping(&mut self) -> Result<(), RiakError> {
        let ping_data: Vec<u8> = vec![];
        match self.conn.send(codes::RpbPingReq, codes::RpbPingResp, &ping_data) {
            Ok(_) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// Produces a list of bucket names.
    ///
    /// Note: This operation is expensive for the Riak server and should be
    /// used as rarely as possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    /// let buckets = riak.list_buckets().unwrap();
    ///
    /// for bucket in buckets.iter() {
    ///     println!("found bucket named {}", bucket);
    /// }
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn list_buckets(&mut self) -> Result<Vec<String>, RiakError> {
        let response = match self.conn.send(codes::RpbListBucketsReq,
                                            codes::RpbListBucketsResp,
                                            &Vec::new()) {
            Ok(resp) => resp,
            Err(err) => {
                debug!("failed to list buckets error was: {:?}", err);
                return Err(err);
            }
        };

        let buckets_resp = match parse_from_bytes::<RpbListBucketsResp>(&response) {
            Ok(parsed) => parsed,
            Err(err) => {
                debug!("failed to parse RpbListBucketsResp error was: {:?}", err);
                return Err(RiakError::ProtobufError(err));
            }
        };

        let buckets = buckets_resp.get_buckets();
        debug!("found buckets: {:?}", buckets);
        let mut bucket_names: Vec<String> = Vec::new();
        for b in buckets {
            let bucket = b.clone();
            let bucket_name = String::from_utf8(bucket).unwrap(); // FIXME
            debug!("found a bucket named: {}", bucket_name);
            bucket_names.push(bucket_name);
        }

        Ok(bucket_names)
    }

    /// Sets the properties for a bucket given a bucket name.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    /// use riak::bucket::BucketProps;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    /// let mut bucket_props = BucketProps::new();
    /// bucket_props.set_backend("bitcask");
    ///
    /// riak.set_bucket_properties("superheroes", &bucket_props).unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn set_bucket_properties(&mut self,
                                 bucket_name: &str,
                                 bucket_props: &BucketProps)
                                 -> Result<(), RiakError> {
        let bytes = match bucket_props.generate_rpb(bucket_name) {
            Ok(b) => b,
            Err(err) => return Err(err),
        };
        match self.conn.send(codes::RpbSetBucketReq, codes::RpbSetBucketResp, &bytes) {
            Ok(_) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// Retrieves bucket properties for a bucket given a bucket name.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    /// let bucket_props = riak.get_bucket_properties("superheroes").unwrap();
    /// println!("bucket properties for superheroes: {:?}", bucket_props);
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn get_bucket_properties(&mut self, bucket_name: &str) -> Result<BucketProps, RiakError> {
        let mut req = RpbGetBucketReq::new();
        req.set_bucket(bucket_name.to_string().into_bytes());
        // TODO implement field_type
        let serialized = match req.write_to_bytes() {
            Ok(s) => s,
            Err(err) => return Err(RiakError::ProtobufError(err)),
        };
        match self.conn.send(codes::RpbGetBucketReq, codes::RpbGetBucketResp, &serialized) {
            Ok(response) => {
                match parse_from_bytes::<RpbGetBucketResp>(&response) {
                    Ok(mut rpb_get_bucket_resp) => {
                        let rpb_bucket_props = rpb_get_bucket_resp.take_props();
                        let bucket_props = rpb_bucket_props_to_bucket_props(&rpb_bucket_props);
                        Ok(bucket_props)
                    }
                    Err(err) => {
                        debug!("failed to parse RpbGetBucketResp error was: {:?}", err);
                        Err(RiakError::ProtobufError(err))
                    }
                }
            }
            Err(err) => Err(err),
        }
    }

    /// Stores an object on the Riak server.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    /// use riak::object::{ObjectContent, StoreObjectReq};
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    ///
    /// let contents = ObjectContent::new("I am the night, I am Batman!".as_bytes());
    /// let mut req = StoreObjectReq::new("superheroes", &contents);
    /// req.set_key("batman");
    ///
    /// riak.store_object(&req).unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn store_object(&mut self, req: &StoreObjectReq) -> Result<(), RiakError> {
        let bytes = match req.generate_rpb() {
            Ok(b) => b,
            Err(err) => return Err(err),
        };
        match self.conn.send(codes::RpbPutReq, codes::RpbPutResp, &bytes) {
            Ok(_) => Ok(()),
            Err(err) => Err(err),
        }
    }

    /// Fetches an object from the Riak server.
    ///
    /// # Examples
    ///
    /// ```
    /// use riak::Client;
    /// use riak::object::FetchObjectReq;
    ///
    /// let mut riak = Client::new("10.0.0.2:8087").unwrap();
    ///
    /// let req = FetchObjectReq::new("superheroes", "batman");
    /// let object = riak.fetch_object(&req).unwrap();
    /// println!("batman object contained: {:?}", object);
    /// ```
    ///
    /// # Errors
    ///
    /// TODO
    pub fn fetch_object(&mut self, req: &FetchObjectReq) -> Result<FetchObjectResp, RiakError> {
        let bytes = match req.generate_rpb() {
            Ok(b) => b,
            Err(err) => return Err(err),
        };
        match self.conn.send(codes::RpbGetReq, codes::RpbGetResp, &bytes) {
            Ok(response) => {
                match parse_from_bytes::<RpbGetResp>(&response) {
                    Ok(mut rpb_get_resp) => {
                        let fetch_object_resp =
                            rpb_get_resp_to_fetch_object_resp(&mut rpb_get_resp);
                        Ok(fetch_object_resp)
                    }
                    Err(err) => {
                        debug!("failed to parse RpbGetResp error was: {:?}", err);
                        Err(RiakError::ProtobufError(err))
                    }
                }
            }
            Err(err) => Err(err),
        }
    }
}