1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
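//! Vinyl client library.
//!
//! Connect to a vinyl database server with `ConnectionBuilder`, then use the resulting
//! `DB` handle to insert, query, and delete protobuf records.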

#![deny(
    missing_docs,
    trivial_numeric_casts,
    unstable_features,
    unused_extern_crates,
    unused_features
)]
#![warn(unused_import_braces, unused_parens)]
#![cfg_attr(feature = "clippy", plugin(clippy(conf_file = "../../clippy.toml")))]
#![cfg_attr(
    feature = "cargo-clippy",
    allow(clippy::new_without_default, clippy::new_without_default)
)]
#![cfg_attr(
    feature = "cargo-clippy",
    warn(
        clippy::float_arithmetic,
        clippy::mut_mut,
        clippy::nonminimal_bool,
        clippy::option_map_unwrap_or,
        clippy::option_map_unwrap_or_else,
        clippy::unicode_not_nfc,
        clippy::use_self
    )
)]

/// Protobuf-generated code, including the `transport_grpc` client stubs used by this crate.
pub mod proto;

pub use vinyl_core::query;
pub use vinyl_core::ToValue;
pub use vinyl_core::{DefaultValue, Index, Record};

#[macro_use]
extern crate failure;

use failure::Error;
use grpc::ClientStubExt;
use proto::transport_grpc::{Vinyl, VinylClient};
use protobuf;
use protobuf::parse_from_bytes;
use url::Url;
use vinyl_core::proto::transport::{Request, Response};

/// An instance of the vinyl DB. Holds metadata and a connection to the database server.
///
/// Obtained from `ConnectionBuilder::connect`; every request sent through this handle
/// carries the token received at login.
pub struct DB {
    // gRPC client stub used to issue `query` calls to the server.
    client: VinylClient,
    // Token taken from the login response; set on each outgoing request.
    token: String,
}

impl DB {
    /// insert a record
    pub fn insert<T: protobuf::Message>(&self, msg: T) -> Result<T, Error> {
        let (msg, req) = vinyl_core::insert_request(msg)?;
        self.send_request(req)?;
        Ok(msg)
    }
    /// return records that match the provided query
    pub fn execute_query<T: protobuf::Message>(&self, q: query::Query) -> Result<Vec<T>, Error> {
        let req = vinyl_core::execute_query_request::<T>(q);
        let resp = self.send_request(req)?;
        let mut v = Vec::new();
        for record in resp.get_records().iter() {
            v.push(parse_from_bytes(record).unwrap());
        }
        Ok(v)
    }
    /// delete records that match the provided query
    pub fn delete_record<T: protobuf::Message, K: ToValue>(&self, pk: K) -> Result<(), Error> {
        let req = vinyl_core::delete_record::<T, K>(pk);
        self.send_request(req)?;
        Ok(())
    }

    fn send_request(&self, mut req: Request) -> Result<Response, Error> {
        req.set_token(self.token.to_string());
        let (_, resp, _) = self.client.query(grpc::RequestOptions::new(), req).wait()?;
        if !resp.error.is_empty() {
            Err(format_err!("{}", resp.error))
        } else {
            Ok(resp)
        }
    }
}

/// Build connection details and metadata before connecting to the database server.
///
/// Create with `new`, register records with `add_record`, then call `connect`.
pub struct ConnectionBuilder {
    // Parsed as a URL in `connect`; supplies host/port, username, password and the path
    // that is passed on as the keyspace.
    connection_string: String,
    // Forwarded unchanged to `vinyl_core::construct_login_request`.
    // NOTE(review): presumably a serialized protobuf descriptor — confirm against vinyl_core.
    descriptor_bytes: Vec<u8>,
    // Records accumulated through `add_record`; sent as part of the login request.
    records: Vec<Record>,
}

impl ConnectionBuilder {
    /// Start a builder from a connection string and the descriptor bytes.
    ///
    /// The builder begins with no records registered.
    pub fn new(connection_string: &str, descriptor_bytes: Vec<u8>) -> Self {
        let connection_string = String::from(connection_string);
        let records = Vec::new();
        Self {
            connection_string,
            descriptor_bytes,
            records,
        }
    }

    /// Register one more record and hand the builder back so calls can be chained.
    pub fn add_record(mut self, record: Record) -> Self {
        self.records.push(record);
        self
    }

    /// Connect to the database server and log in.
    ///
    /// The connection string is parsed as a URL. Its host and port are resolved to socket
    /// addresses (no fallback port is supplied), and the first address is used to create a
    /// client via `new_plain`. The URL's username, password and path (as the keyspace) are
    /// sent in the login request together with the descriptor bytes and registered records.
    ///
    /// # Errors
    ///
    /// Fails if the connection string is not a valid URL, no socket address can be
    /// resolved, the client cannot be created, the URL has no password, the login request
    /// cannot be built, or the login call itself fails.
    pub fn connect(self) -> Result<DB, Error> {
        let Self {
            connection_string,
            descriptor_bytes,
            records,
        } = self;

        let parsed = Url::parse(&connection_string)?;
        let endpoint = match parsed.socket_addrs(|| None)?.into_iter().next() {
            Some(endpoint) => endpoint,
            None => {
                return Err(format_err!(
                    "Couldn't resolve an ip address for the provided hostname"
                ));
            }
        };
        let host = endpoint.ip().to_string();
        let client = VinylClient::new_plain(&host, endpoint.port(), Default::default())?;

        // The password is checked only after the client has been created, so a client
        // construction failure is reported in preference to a missing password.
        let password = match parsed.password() {
            Some(password) => password,
            None => return Err(format_err!("No password provided")),
        };

        let login_request = vinyl_core::construct_login_request(
            descriptor_bytes,
            records,
            parsed.username(),
            password,
            parsed.path(),
        )?;
        let (_, login_response, _) = client
            .login(grpc::RequestOptions::new(), login_request)
            .wait()?;
        let token = login_response.token;

        Ok(DB { client, token })
    }
}