use std::str::FromStr;
use futures::{Future, IntoFuture, Stream};
use hyper::client::connect::Connect;
use hyper::{StatusCode, Uri};
use serde_derive::{Deserialize, Serialize};
use serde_json;
use crate::client::{Client, ClusterInfo, Response};
use crate::error::{ApiError, Error};
use crate::first_ok::first_ok;
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
struct AuthStatus {
pub enabled: bool,
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub enum AuthChange {
Changed,
Unchanged,
}
#[derive(Debug, Clone, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct User {
#[serde(rename = "user")]
name: String,
roles: Vec<String>,
}
impl User {
pub fn name(&self) -> &str {
&self.name
}
pub fn role_names(&self) -> &[String] {
&self.roles
}
}
#[derive(Debug, Clone, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct UserDetail {
#[serde(rename = "user")]
name: String,
roles: Vec<Role>,
}
impl UserDetail {
pub fn name(&self) -> &str {
&self.name
}
pub fn roles(&self) -> &[Role] {
&self.roles
}
}
#[derive(Debug, Clone, Deserialize, Eq, Hash, PartialEq, Serialize)]
struct Users {
users: Option<Vec<UserDetail>>,
}
#[derive(Debug, Deserialize, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct NewUser {
#[serde(rename = "user")]
name: String,
password: String,
#[serde(skip_serializing_if = "Option::is_none")]
roles: Option<Vec<String>>,
}
impl NewUser {
pub fn new<N, P>(name: N, password: P) -> Self
where
N: Into<String>,
P: Into<String>,
{
NewUser {
name: name.into(),
password: password.into(),
roles: None,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn add_role<R>(&mut self, role: R)
where
R: Into<String>,
{
match self.roles {
Some(ref mut roles) => roles.push(role.into()),
None => self.roles = Some(vec![role.into()]),
}
}
}
#[derive(Debug, Deserialize, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct UserUpdate {
#[serde(rename = "user")]
name: String,
#[serde(skip_serializing_if = "Option::is_none")]
password: Option<String>,
#[serde(rename = "grant")]
#[serde(skip_serializing_if = "Option::is_none")]
grants: Option<Vec<String>>,
#[serde(rename = "revoke")]
#[serde(skip_serializing_if = "Option::is_none")]
revocations: Option<Vec<String>>,
}
impl UserUpdate {
pub fn new<N>(name: N) -> Self
where
N: Into<String>,
{
UserUpdate {
name: name.into(),
password: None,
grants: None,
revocations: None,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn update_password<P>(&mut self, password: P)
where
P: Into<String>,
{
self.password = Some(password.into());
}
pub fn grant_role<R>(&mut self, role: R)
where
R: Into<String>,
{
match self.grants {
Some(ref mut grants) => grants.push(role.into()),
None => self.grants = Some(vec![role.into()]),
}
}
pub fn revoke_role<R>(&mut self, role: R)
where
R: Into<String>,
{
match self.revocations {
Some(ref mut revocations) => revocations.push(role.into()),
None => self.revocations = Some(vec![role.into()]),
}
}
}
#[derive(Debug, Deserialize, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct Role {
#[serde(rename = "role")]
name: String,
permissions: Permissions,
}
impl Role {
pub fn new<N>(name: N) -> Self
where
N: Into<String>,
{
Role {
name: name.into(),
permissions: Permissions::new(),
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn grant_kv_read_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
self.permissions.kv.modify_read_permission(key)
}
pub fn grant_kv_write_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
self.permissions.kv.modify_write_permission(key)
}
pub fn kv_read_permissions(&self) -> &[String] {
match self.permissions.kv.read {
Some(ref read) => read,
None => &[],
}
}
pub fn kv_write_permissions(&self) -> &[String] {
match self.permissions.kv.write {
Some(ref write) => write,
None => &[],
}
}
}
#[derive(Debug, Clone, Deserialize, Eq, Hash, PartialEq, Serialize)]
struct Roles {
roles: Option<Vec<Role>>,
}
#[derive(Debug, Deserialize, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct RoleUpdate {
#[serde(rename = "role")]
name: String,
#[serde(rename = "grant")]
#[serde(skip_serializing_if = "Option::is_none")]
grants: Option<Permissions>,
#[serde(rename = "revoke")]
#[serde(skip_serializing_if = "Option::is_none")]
revocations: Option<Permissions>,
}
impl RoleUpdate {
pub fn new<R>(role: R) -> Self
where
R: Into<String>,
{
RoleUpdate {
name: role.into(),
grants: None,
revocations: None,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn grant_kv_read_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
match self.grants {
Some(ref mut grants) => grants.kv.modify_read_permission(key),
None => {
let mut permissions = Permissions::new();
permissions.kv.modify_read_permission(key);
self.grants = Some(permissions);
}
}
}
pub fn grant_kv_write_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
match self.grants {
Some(ref mut grants) => grants.kv.modify_write_permission(key),
None => {
let mut permissions = Permissions::new();
permissions.kv.modify_write_permission(key);
self.grants = Some(permissions);
}
}
}
pub fn revoke_kv_read_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
match self.revocations {
Some(ref mut revocations) => revocations.kv.modify_read_permission(key),
None => {
let mut permissions = Permissions::new();
permissions.kv.modify_read_permission(key);
self.revocations = Some(permissions);
}
}
}
pub fn revoke_kv_write_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
match self.revocations {
Some(ref mut revocations) => revocations.kv.modify_write_permission(key),
None => {
let mut permissions = Permissions::new();
permissions.kv.modify_write_permission(key);
self.revocations = Some(permissions);
}
}
}
}
#[derive(Debug, Deserialize, Clone, Eq, Hash, PartialEq, Serialize)]
struct Permissions {
kv: Permission,
}
impl Permissions {
fn new() -> Self {
Permissions {
kv: Permission::new(),
}
}
}
#[derive(Debug, Deserialize, Clone, Eq, Hash, PartialEq, Serialize)]
struct Permission {
#[serde(skip_serializing_if = "Option::is_none")]
read: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
write: Option<Vec<String>>,
}
impl Permission {
fn new() -> Self {
Permission {
read: None,
write: None,
}
}
fn modify_read_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
match self.read {
Some(ref mut read) => read.push(key.into()),
None => self.read = Some(vec![key.into()]),
}
}
fn modify_write_permission<K>(&mut self, key: K)
where
K: Into<String>,
{
match self.write {
Some(ref mut write) => write.push(key.into()),
None => self.write = Some(vec![key.into()]),
}
}
}
pub fn create_role<C>(
client: &Client<C>,
role: Role,
) -> impl Future<Item = Response<Role>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let body = serde_json::to_string(&role)
.map_err(Error::from)
.into_future();
let url = build_url(member, &format!("/roles/{}", role.name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let params = uri.join(body);
let http_client = http_client.clone();
let response =
params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| match status {
StatusCode::OK | StatusCode::CREATED => {
match serde_json::from_slice::<Role>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
}
status => Err(Error::UnexpectedStatus(status)),
})
})
})
}
pub fn create_user<C>(
client: &Client<C>,
user: NewUser,
) -> impl Future<Item = Response<User>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let body = serde_json::to_string(&user)
.map_err(Error::from)
.into_future();
let url = build_url(member, &format!("/users/{}", user.name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let params = uri.join(body);
let http_client = http_client.clone();
let response =
params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| match status {
StatusCode::OK | StatusCode::CREATED => {
match serde_json::from_slice::<User>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
}
status => Err(Error::UnexpectedStatus(status)),
})
})
})
}
pub fn delete_role<C, N>(
client: &Client<C>,
name: N,
) -> impl Future<Item = Response<()>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
N: Into<String>,
{
let http_client = client.http_client().clone();
let name = name.into();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, &format!("/roles/{}", name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
if status == StatusCode::OK {
Ok(Response {
data: (),
cluster_info,
})
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
}
pub fn delete_user<C, N>(
client: &Client<C>,
name: N,
) -> impl Future<Item = Response<()>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
N: Into<String>,
{
let http_client = client.http_client().clone();
let name = name.into();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, &format!("/users/{}", name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
if status == StatusCode::OK {
Ok(Response {
data: (),
cluster_info,
})
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
}
pub fn disable<C>(
client: &Client<C>,
) -> impl Future<Item = Response<AuthChange>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "/enable");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
match status {
StatusCode::OK => Ok(Response {
data: AuthChange::Changed,
cluster_info,
}),
StatusCode::CONFLICT => Ok(Response {
data: AuthChange::Unchanged,
cluster_info,
}),
_ => Err(Error::UnexpectedStatus(status)),
}
})
})
}
pub fn enable<C>(
client: &Client<C>,
) -> impl Future<Item = Response<AuthChange>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "/enable");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response =
uri.and_then(move |uri| http_client.put(uri, "".to_owned()).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
match status {
StatusCode::OK => Ok(Response {
data: AuthChange::Changed,
cluster_info,
}),
StatusCode::CONFLICT => Ok(Response {
data: AuthChange::Unchanged,
cluster_info,
}),
_ => return Err(Error::UnexpectedStatus(status)),
}
})
})
}
pub fn get_role<C, N>(
client: &Client<C>,
name: N,
) -> impl Future<Item = Response<Role>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
N: Into<String>,
{
let http_client = client.http_client().clone();
let name = name.into();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, &format!("/roles/{}", name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<Role>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
})
}
pub fn get_roles<C>(
client: &Client<C>,
) -> impl Future<Item = Response<Vec<Role>>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "/roles");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<Roles>(body) {
Ok(roles) => {
let data = roles.roles.unwrap_or_else(|| Vec::with_capacity(0));
Ok(Response { data, cluster_info })
}
Err(error) => Err(Error::Serialization(error)),
}
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
})
}
pub fn get_user<C, N>(
client: &Client<C>,
name: N,
) -> impl Future<Item = Response<UserDetail>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
N: Into<String>,
{
let http_client = client.http_client().clone();
let name = name.into();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, &format!("/users/{}", name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<UserDetail>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
})
}
pub fn get_users<C>(
client: &Client<C>,
) -> impl Future<Item = Response<Vec<UserDetail>>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "/users");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<Users>(body) {
Ok(users) => {
let data = users.users.unwrap_or_else(|| Vec::with_capacity(0));
Ok(Response { data, cluster_info })
}
Err(error) => Err(Error::Serialization(error)),
}
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
})
}
pub fn status<C>(
client: &Client<C>,
) -> impl Future<Item = Response<bool>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "/enable");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<AuthStatus>(body) {
Ok(data) => Ok(Response {
data: data.enabled,
cluster_info,
}),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
})
}
pub fn update_role<C>(
client: &Client<C>,
role: RoleUpdate,
) -> impl Future<Item = Response<Role>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let body = serde_json::to_string(&role)
.map_err(Error::from)
.into_future();
let url = build_url(member, &format!("/roles/{}", role.name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let params = uri.join(body);
let http_client = http_client.clone();
let response =
params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<Role>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
})
}
pub fn update_user<C>(
client: &Client<C>,
user: UserUpdate,
) -> impl Future<Item = Response<User>, Error = Vec<Error>> + Send
where
C: Clone + Connect + Sync + 'static,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let body = serde_json::to_string(&user)
.map_err(Error::from)
.into_future();
let url = build_url(member, &format!("/users/{}", user.name));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let params = uri.join(body);
let http_client = http_client.clone();
let response =
params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<User>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
Err(Error::UnexpectedStatus(status))
}
})
})
})
}
fn build_url(endpoint: &Uri, path: &str) -> String {
format!("{}v2/auth{}", endpoint, path)
}