use super::Client;
use errors::CoreError;
use event_loop::CoreFuture;
use futures::Future;
use futures::future::{self, Either, Loop};
use routing::{Action, ClientError, EntryAction, EntryError, MutableData, PermissionSet, User,
Value, XorName};
use rust_sodium::crypto::sign;
use std::collections::BTreeMap;
use utils::FutureExt;
const MAX_ATTEMPTS: usize = 10;
pub fn put_mdata<T: 'static>(client: &Client<T>, data: MutableData) -> Box<CoreFuture<()>> {
let client2 = client.clone();
client
.put_mdata(data.clone())
.or_else(move |error| match error {
CoreError::RoutingClientError(ClientError::DataExists) => {
Either::A(update_mdata(&client2, data))
}
error => Either::B(future::err(error)),
})
.into_box()
}
pub fn mutate_mdata_entries<T: 'static>(
client: &Client<T>,
name: XorName,
tag: u64,
actions: BTreeMap<Vec<u8>, EntryAction>,
) -> Box<CoreFuture<()>> {
let state = (0, actions);
let client = client.clone();
future::loop_fn(state, move |(attempts, actions)| {
client
.mutate_mdata_entries(name, tag, actions.clone())
.map(|_| Loop::Break(()))
.or_else(move |error| match error {
CoreError::RoutingClientError(ClientError::InvalidEntryActions(errors)) => {
if attempts < MAX_ATTEMPTS {
let actions = fix_entry_actions(actions, &errors);
Ok(Loop::Continue((attempts + 1, actions)))
} else {
Err(CoreError::RoutingClientError(
ClientError::InvalidEntryActions(errors),
))
}
}
CoreError::RequestTimeout => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, actions)))
} else {
Err(CoreError::RequestTimeout)
}
}
error => Err(error),
})
}).into_box()
}
pub fn set_mdata_user_permissions<T: 'static>(
client: &Client<T>,
name: XorName,
tag: u64,
user: User,
permissions: PermissionSet,
version: u64,
) -> Box<CoreFuture<()>> {
let state = (0, version);
let client = client.clone();
future::loop_fn(state, move |(attempts, version)| {
client
.set_mdata_user_permissions(name, tag, user, permissions, version)
.map(|_| Loop::Break(()))
.or_else(move |error| match error {
CoreError::RoutingClientError(ClientError::InvalidSuccessor(current_version)) => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, current_version + 1)))
} else {
Err(error)
}
}
CoreError::RequestTimeout => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, version)))
} else {
Err(CoreError::RequestTimeout)
}
}
error => Err(error),
})
}).into_box()
}
pub fn del_mdata_user_permissions<T: 'static>(
client: &Client<T>,
name: XorName,
tag: u64,
user: User,
version: u64,
) -> Box<CoreFuture<()>> {
let state = (0, version);
let client = client.clone();
future::loop_fn(state, move |(attempts, version)| {
client
.del_mdata_user_permissions(name, tag, user, version)
.map(|_| Loop::Break(()))
.or_else(move |error| match error {
CoreError::RoutingClientError(ClientError::NoSuchKey) => Ok(Loop::Break(())),
CoreError::RoutingClientError(ClientError::InvalidSuccessor(current_version)) => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, current_version + 1)))
} else {
Err(error)
}
}
CoreError::RequestTimeout => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, version)))
} else {
Err(CoreError::RequestTimeout)
}
}
error => Err(error),
})
}).into_box()
}
fn update_mdata<T: 'static>(client: &Client<T>, data: MutableData) -> Box<CoreFuture<()>> {
let client2 = client.clone();
let client3 = client.clone();
let f0 = client.list_mdata_entries(*data.name(), data.tag());
let f1 = client.list_mdata_permissions(*data.name(), data.tag());
let f2 = client.get_mdata_version(*data.name(), data.tag());
f0.join3(f1, f2)
.and_then(move |(entries, permissions, version)| {
update_mdata_permissions(
&client2,
*data.name(),
data.tag(),
&permissions,
data.permissions().clone(),
version + 1,
).map(move |_| (data, entries))
})
.and_then(move |(data, entries)| {
update_mdata_entries(
&client3,
*data.name(),
data.tag(),
&entries,
data.entries().clone(),
)
})
.into_box()
}
fn update_mdata_entries<T: 'static>(
client: &Client<T>,
name: XorName,
tag: u64,
current_entries: &BTreeMap<Vec<u8>, Value>,
desired_entries: BTreeMap<Vec<u8>, Value>,
) -> Box<CoreFuture<()>> {
let actions = desired_entries
.into_iter()
.filter_map(|(key, value)| if let Some(current_value) =
current_entries.get(&key)
{
if current_value.entry_version <= value.entry_version {
Some((key, EntryAction::Update(value)))
} else {
None
}
} else {
Some((key, EntryAction::Ins(value)))
})
.collect();
mutate_mdata_entries(client, name, tag, actions)
}
fn update_mdata_permissions<T: 'static>(
client: &Client<T>,
name: XorName,
tag: u64,
current_permissions: &BTreeMap<User, PermissionSet>,
desired_permissions: BTreeMap<User, PermissionSet>,
version: u64,
) -> Box<CoreFuture<()>> {
let permissions: Vec<_> = desired_permissions
.into_iter()
.map(|(user, desired_set)| if let Some(current_set) =
current_permissions.get(&user)
{
(user, union_permission_sets(current_set, &desired_set))
} else {
(user, desired_set)
})
.collect();
let state = (client.clone(), permissions, version);
future::loop_fn(state, move |(client, mut permissions, version)| {
if let Some((user, set)) = permissions.pop() {
let f = set_mdata_user_permissions(&client, name, tag, user, set, version)
.map(move |_| Loop::Continue((client, permissions, version + 1)));
Either::A(f)
} else {
Either::B(future::ok(Loop::Break(())))
}
}).into_box()
}
fn fix_entry_actions(
actions: BTreeMap<Vec<u8>, EntryAction>,
errors: &BTreeMap<Vec<u8>, EntryError>,
) -> BTreeMap<Vec<u8>, EntryAction> {
actions
.into_iter()
.filter_map(|(key, action)| if let Some(error) = errors.get(&key) {
if let Some(action) = fix_entry_action(action, error) {
Some((key, action))
} else {
None
}
} else {
Some((key, action))
})
.collect()
}
fn fix_entry_action(action: EntryAction, error: &EntryError) -> Option<EntryAction> {
match (action, *error) {
(EntryAction::Ins(value), EntryError::EntryExists(current_version)) |
(EntryAction::Update(value), EntryError::InvalidSuccessor(current_version)) => {
Some(EntryAction::Update(Value {
content: value.content,
entry_version: current_version + 1,
}))
}
(EntryAction::Update(value), EntryError::NoSuchEntry) => Some(EntryAction::Ins(value)),
(EntryAction::Del(_), EntryError::NoSuchEntry) => None,
(EntryAction::Del(_), EntryError::InvalidSuccessor(current_version)) => {
Some(EntryAction::Del(current_version + 1))
}
(action, _) => Some(action),
}
}
fn union_permission_sets(a: &PermissionSet, b: &PermissionSet) -> PermissionSet {
let actions = [
Action::Insert,
Action::Update,
Action::Delete,
Action::ManagePermissions,
];
actions.into_iter().fold(
PermissionSet::new(),
|set, &action| match (
a.is_allowed(
action,
),
b.is_allowed(
action,
),
) {
(Some(true), _) | (_, Some(true)) => set.allow(action),
(Some(false), _) | (_, Some(false)) => set.deny(action),
_ => set,
},
)
}
pub fn ins_auth_key<T: 'static>(
client: &Client<T>,
key: sign::PublicKey,
version: u64,
) -> Box<CoreFuture<()>> {
let state = (0, version);
let client = client.clone();
future::loop_fn(state, move |(attempts, version)| {
client
.ins_auth_key(key, version)
.map(|_| Loop::Break(()))
.or_else(move |error| match error {
CoreError::RoutingClientError(ClientError::InvalidSuccessor(current_version)) => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, current_version + 1)))
} else {
Err(error)
}
}
CoreError::RequestTimeout => {
if attempts < MAX_ATTEMPTS {
Ok(Loop::Continue((attempts + 1, version)))
} else {
Err(CoreError::RequestTimeout)
}
}
error => Err(error),
})
}).into_box()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fix_entry_actions() {
let mut actions = BTreeMap::new();
let _ = actions.insert(
vec![0],
EntryAction::Ins(Value {
content: vec![0],
entry_version: 0,
}),
);
let _ = actions.insert(
vec![1],
EntryAction::Ins(Value {
content: vec![1],
entry_version: 0,
}),
);
let _ = actions.insert(
vec![2],
EntryAction::Update(Value {
content: vec![2],
entry_version: 1,
}),
);
let _ = actions.insert(
vec![3],
EntryAction::Update(Value {
content: vec![3],
entry_version: 1,
}),
);
let _ = actions.insert(
vec![4],
EntryAction::Update(Value {
content: vec![4],
entry_version: 1,
}),
);
let _ = actions.insert(vec![5], EntryAction::Del(1));
let _ = actions.insert(vec![6], EntryAction::Del(1));
let _ = actions.insert(vec![7], EntryAction::Del(1));
let mut errors = BTreeMap::new();
let _ = errors.insert(vec![1], EntryError::EntryExists(2));
let _ = errors.insert(vec![3], EntryError::NoSuchEntry);
let _ = errors.insert(vec![4], EntryError::InvalidSuccessor(2));
let _ = errors.insert(vec![6], EntryError::NoSuchEntry);
let _ = errors.insert(vec![7], EntryError::InvalidSuccessor(2));
let actions = fix_entry_actions(actions, &errors);
assert_eq!(
*unwrap!(actions.get([0].as_ref())),
EntryAction::Ins(Value {
content: vec![0],
entry_version: 0,
})
);
assert_eq!(
*unwrap!(actions.get([1].as_ref())),
EntryAction::Update(Value {
content: vec![1],
entry_version: 3,
})
);
assert_eq!(
*unwrap!(actions.get([2].as_ref())),
EntryAction::Update(Value {
content: vec![2],
entry_version: 1,
})
);
assert_eq!(
*unwrap!(actions.get([3].as_ref())),
EntryAction::Ins(Value {
content: vec![3],
entry_version: 1,
})
);
assert_eq!(
*unwrap!(actions.get([4].as_ref())),
EntryAction::Update(Value {
content: vec![4],
entry_version: 3,
})
);
assert_eq!(*unwrap!(actions.get([5].as_ref())), EntryAction::Del(1));
assert!(actions.get([6].as_ref()).is_none());
assert_eq!(*unwrap!(actions.get([7].as_ref())), EntryAction::Del(3));
}
#[test]
fn test_union_permission_sets() {
let a = PermissionSet::new()
.allow(Action::Insert)
.deny(Action::Update)
.deny(Action::ManagePermissions);
let b = PermissionSet::new().allow(Action::Update).allow(
Action::Delete,
);
let c = union_permission_sets(&a, &b);
assert_eq!(c.is_allowed(Action::Insert), Some(true));
assert_eq!(c.is_allowed(Action::Update), Some(true));
assert_eq!(c.is_allowed(Action::Delete), Some(true));
assert_eq!(c.is_allowed(Action::ManagePermissions), Some(false));
}
}
#[cfg(all(test, feature = "use-mock-routing"))]
mod tests_with_mock_routing {
use super::*;
use rand;
use routing::{Action, EntryActions, MutableData};
use rust_sodium::crypto::sign;
use utils::test_utils::random_client;
#[test]
fn put_mdata_with_recovery() {
random_client(|client| {
let client2 = client.clone();
let client3 = client.clone();
let client4 = client.clone();
let name = rand::random();
let tag = 10_000;
let owners = btree_set![unwrap!(client.public_signing_key())];
let entries =
btree_map![
vec![0] => Value {
content: vec![0, 0],
entry_version: 0,
},
vec![1] => Value {
content: vec![1, 0],
entry_version: 1,
},
vec![2] => Value {
content: vec![2, 0],
entry_version: 0,
}
];
let permissions =
btree_map![
User::Anyone => PermissionSet::new().allow(Action::Insert)
];
let data0 = unwrap!(MutableData::new(
name,
tag,
permissions,
entries,
owners.clone(),
));
let entries =
btree_map![
vec![0] => Value {
content: vec![0, 1],
entry_version: 1,
},
vec![1] => Value {
content: vec![1, 1],
entry_version: 0,
},
vec![3] => Value {
content: vec![3, 1],
entry_version: 0,
}
];
let user = User::Key(sign::gen_keypair().0);
let permissions =
btree_map![
User::Anyone => PermissionSet::new().allow(Action::Insert).allow(Action::Update),
user => PermissionSet::new().allow(Action::Delete)
];
let data1 = unwrap!(MutableData::new(name, tag, permissions, entries, owners));
client
.put_mdata(data0)
.then(move |res| {
unwrap!(res);
put_mdata(&client2, data1)
})
.then(move |res| {
unwrap!(res);
client3.list_mdata_entries(name, tag)
})
.then(move |res| {
let entries = unwrap!(res);
assert_eq!(entries.len(), 4);
assert_eq!(
*unwrap!(entries.get([0].as_ref())),
Value {
content: vec![0, 1],
entry_version: 1,
}
);
assert_eq!(
*unwrap!(entries.get([1].as_ref())),
Value {
content: vec![1, 0],
entry_version: 1,
}
);
client4.list_mdata_permissions(name, tag)
})
.then(move |res| {
let permissions = unwrap!(res);
assert_eq!(permissions.len(), 2);
assert_eq!(
*unwrap!(permissions.get(&User::Anyone)),
PermissionSet::new().allow(Action::Insert).allow(
Action::Update,
)
);
assert_eq!(
*unwrap!(permissions.get(&user)),
PermissionSet::new().allow(Action::Delete)
);
Ok::<_, CoreError>(())
})
})
}
#[test]
fn mutate_mdata_entries_with_recovery() {
random_client(|client| {
let client2 = client.clone();
let client3 = client.clone();
let name = rand::random();
let tag = 10_000;
let entries =
btree_map![
vec![1] => Value {
content: vec![1],
entry_version: 0,
},
vec![2] => Value {
content: vec![2],
entry_version: 0,
},
vec![4] => Value {
content: vec![4],
entry_version: 0,
},
vec![5] => Value {
content: vec![5],
entry_version: 0,
},
vec![7] => Value {
content: vec![7],
entry_version: 0,
}
];
let owners = btree_set![unwrap!(client.public_signing_key())];
let data = unwrap!(MutableData::new(
name,
tag,
Default::default(),
entries,
owners,
));
client
.put_mdata(data)
.then(move |res| {
unwrap!(res);
let actions = EntryActions::new()
.ins(vec![0], vec![0], 0) .ins(vec![1], vec![1, 0], 0) .update(vec![2], vec![2, 0], 1) .update(vec![3], vec![3], 1) .update(vec![4], vec![4, 0], 0) .del(vec![5], 1) .del(vec![6], 1) .del(vec![7], 0) .into();
mutate_mdata_entries(&client2, name, tag, actions)
})
.then(move |res| {
unwrap!(res);
client3.list_mdata_entries(name, tag)
})
.then(move |res| {
let entries = unwrap!(res);
assert_eq!(entries.len(), 7);
assert_eq!(
*unwrap!(entries.get([0].as_ref())),
Value {
content: vec![0],
entry_version: 0,
}
);
assert_eq!(
*unwrap!(entries.get([1].as_ref())),
Value {
content: vec![1, 0],
entry_version: 1,
}
);
assert_eq!(
*unwrap!(entries.get([2].as_ref())),
Value {
content: vec![2, 0],
entry_version: 1,
}
);
assert_eq!(
*unwrap!(entries.get([3].as_ref())),
Value {
content: vec![3],
entry_version: 1,
}
);
assert_eq!(
*unwrap!(entries.get([4].as_ref())),
Value {
content: vec![4, 0],
entry_version: 1,
}
);
assert_eq!(
*unwrap!(entries.get([5].as_ref())),
Value {
content: vec![],
entry_version: 1,
}
);
assert!(entries.get([6].as_ref()).is_none());
assert_eq!(
*unwrap!(entries.get([7].as_ref())),
Value {
content: vec![],
entry_version: 1,
}
);
Ok::<_, CoreError>(())
})
})
}
#[test]
fn set_and_del_mdata_user_permissions_with_recovery() {
random_client(|client| {
let client2 = client.clone();
let client3 = client.clone();
let client4 = client.clone();
let client5 = client.clone();
let client6 = client.clone();
let name = rand::random();
let tag = 10_000;
let owners = btree_set![unwrap!(client.public_signing_key())];
let data = unwrap!(MutableData::new(
name,
tag,
Default::default(),
Default::default(),
owners,
));
let user0 = User::Key(sign::gen_keypair().0);
let user1 = User::Key(sign::gen_keypair().0);
let permissions = PermissionSet::new().allow(Action::Insert);
client
.put_mdata(data)
.then(move |res| {
unwrap!(res);
set_mdata_user_permissions(&client2, name, tag, user0, permissions, 0)
})
.then(move |res| {
unwrap!(res);
client3.list_mdata_user_permissions(name, tag, user0)
})
.then(move |res| {
let retrieved_permissions = unwrap!(res);
assert_eq!(retrieved_permissions, permissions);
del_mdata_user_permissions(&client4, name, tag, user0, 0)
})
.then(move |res| {
unwrap!(res);
client5.list_mdata_user_permissions(name, tag, user0)
})
.then(move |res| {
match res {
Err(CoreError::RoutingClientError(ClientError::NoSuchKey)) => (),
x => panic!("Unexpected {:?}", x),
}
del_mdata_user_permissions(&client6, name, tag, user1, 3)
})
.then(move |res| {
unwrap!(res);
Ok::<_, CoreError>(())
})
})
}
}