use super::Client;
use crate::{Error, Result};
use sn_interface::{
messaging::data::{
CreateRegister, DataCmd, DataQueryVariant, EditRegister, QueryResponse, RegisterCmd,
RegisterQuery, SignedRegisterCreate, SignedRegisterEdit,
},
types::{
register::{Action, Entry, EntryHash, Permissions, Policy, Register, User},
RegisterAddress as Address,
},
};
use std::collections::BTreeSet;
use xor_name::XorName;
/// Ordered batch of [`DataCmd`]s built by the register APIs in this file.
///
/// The commands are only prepared locally; they are sent, in order, when the
/// batch is handed to `Client::publish_register_ops`.
pub type RegisterWriteAheadLog = Vec<DataCmd>;
impl Client {
/// Sends every command in `wal` to the network, one at a time and in order.
///
/// # Errors
/// Returns the error of the first command that fails to send. Commands that
/// precede it have already been sent; nothing in this function undoes them.
#[instrument(skip(self), level = "debug")]
pub async fn publish_register_ops(&self, wal: RegisterWriteAheadLog) -> Result<()> {
    // `wal` is owned by this function, so each command is moved straight into
    // `send_cmd` rather than being cloned out of a borrowed vector.
    for cmd in wal {
        self.send_cmd(cmd).await?;
    }
    Ok(())
}
/// Builds the signed command that creates a Register at `name`/`tag` governed by `policy`.
///
/// Nothing is sent to the network here: the returned write-ahead log holds the single
/// create command, which the caller publishes with `publish_register_ops`.
///
/// Returns the address of the Register together with that log.
///
/// # Errors
/// Fails if the create operation cannot be serialised for signing.
#[instrument(skip(self), level = "debug")]
pub async fn create_register(
    &self,
    name: XorName,
    tag: u64,
    policy: Policy,
) -> Result<(Address, RegisterWriteAheadLog)> {
    use sn_interface::messaging::ClientAuth;

    let op = CreateRegister { name, tag, policy };

    // The client signs the serialised operation with its own keypair.
    let serialised_op = bincode::serialize(&op)?;
    let signature = self.keypair.sign(&serialised_op);
    let auth = ClientAuth {
        public_key: self.keypair.public_key(),
        signature,
    };

    let cmd = DataCmd::Register(RegisterCmd::Create {
        cmd: SignedRegisterCreate { op, auth },
        // NOTE(review): `section_sig()` signs a fixed message with a random key, so this
        // looks like a placeholder rather than a real section signature - confirm.
        section_sig: section_sig(),
    });
    debug!("Creating Register: {:?}", cmd);

    Ok((Address { name, tag }, vec![cmd]))
}
/// Prepares a signed edit that writes `entry` into the Register at `address`.
///
/// The Register is first fetched with `get_register`, the client's own key is checked
/// for `Action::Write` permission, and the write is applied to that fetched copy with
/// `children` passed straight through to `Register::write`. The resulting edit is
/// returned in a write-ahead log rather than sent; publish it with
/// `publish_register_ops`.
///
/// Returns the hash reported for the new entry together with the log.
///
/// # Errors
/// Fails if the Register cannot be fetched, the permission check or the write is
/// rejected, or the edit cannot be serialised for signing.
#[instrument(skip(self, children), level = "debug")]
pub async fn write_to_local_register(
    &self,
    address: Address,
    entry: Entry,
    children: BTreeSet<EntryHash>,
) -> Result<(EntryHash, RegisterWriteAheadLog)> {
    use sn_interface::messaging::ClientAuth;

    debug!("Writing to register at {:?}", address);
    let mut register = self.get_register(address).await?;

    // Refuse early if this client's key is not allowed to write.
    let public_key = self.keypair.public_key();
    register.check_permissions(Action::Write, Some(User::Key(public_key)))?;

    let (hash, edit) = register.write(entry, children)?;
    let op = EditRegister { address, edit };

    let serialised_op = bincode::serialize(&op)?;
    let signature = self.keypair.sign(&serialised_op);
    let auth = ClientAuth {
        public_key,
        signature,
    };

    let cmd = DataCmd::Register(RegisterCmd::Edit(SignedRegisterEdit { op, auth }));
    Ok((hash, vec![cmd]))
}
#[instrument(skip(self), level = "debug")]
pub async fn get_register(&self, address: Address) -> Result<Register> {
let query = DataQueryVariant::Register(RegisterQuery::Get(address));
let query_result = self.send_query(query.clone()).await?;
debug!("get_register result is; {query_result:?}");
match query_result.response {
QueryResponse::GetRegister(res) => res.map_err(|err| Error::ErrorMsg { source: err }),
other => Err(Error::UnexpectedQueryResponse {
query,
response: other,
}),
}
}
#[instrument(skip(self), level = "debug")]
pub async fn read_register(&self, address: Address) -> Result<BTreeSet<(EntryHash, Entry)>> {
let query = DataQueryVariant::Register(RegisterQuery::Read(address));
let query_result = self.send_query(query.clone()).await?;
match query_result.response {
QueryResponse::ReadRegister(res) => res.map_err(|err| Error::ErrorMsg { source: err }),
other => Err(Error::UnexpectedQueryResponse {
query,
response: other,
}),
}
}
#[instrument(skip(self), level = "debug")]
pub async fn get_register_entry(&self, address: Address, hash: EntryHash) -> Result<Entry> {
let query = DataQueryVariant::Register(RegisterQuery::GetEntry { address, hash });
let query_result = self.send_query(query.clone()).await?;
match query_result.response {
QueryResponse::GetRegisterEntry(res) => {
res.map_err(|err| Error::ErrorMsg { source: err })
}
other => Err(Error::UnexpectedQueryResponse {
query,
response: other,
}),
}
}
#[instrument(skip(self), level = "debug")]
pub async fn get_register_owner(&self, address: Address) -> Result<User> {
let query = DataQueryVariant::Register(RegisterQuery::GetOwner(address));
let query_result = self.send_query(query.clone()).await?;
match query_result.response {
QueryResponse::GetRegisterOwner(res) => {
res.map_err(|err| Error::ErrorMsg { source: err })
}
other => Err(Error::UnexpectedQueryResponse {
query,
response: other,
}),
}
}
#[instrument(skip(self), level = "debug")]
pub async fn get_register_permissions_for_user(
&self,
address: Address,
user: User,
) -> Result<Permissions> {
let query = DataQueryVariant::Register(RegisterQuery::GetUserPermissions { address, user });
let query_result = self.send_query(query.clone()).await?;
match query_result.response {
QueryResponse::GetRegisterUserPermissions(res) => {
res.map_err(|err| Error::ErrorMsg { source: err })
}
other => Err(Error::UnexpectedQueryResponse {
query,
response: other,
}),
}
}
#[instrument(skip(self), level = "debug")]
pub async fn get_register_policy(&self, address: Address) -> Result<Policy> {
let query = DataQueryVariant::Register(RegisterQuery::GetPolicy(address));
let query_result = self.send_query(query.clone()).await?;
match query_result.response {
QueryResponse::GetRegisterPolicy(res) => {
res.map_err(|err| Error::ErrorMsg { source: err })
}
other => Err(Error::UnexpectedQueryResponse {
query,
response: other,
}),
}
}
}
/// Builds a `SectionSig` by signing the fixed message `"hello"` with a freshly
/// generated random BLS secret key.
///
/// The returned value pairs that key's public key with the signature.
// NOTE(review): the key is random and the signed message is unrelated to any command,
// so this looks like a placeholder rather than a real section signature - confirm.
fn section_sig() -> sn_interface::messaging::SectionSig {
    use sn_interface::messaging::system::SectionSig;

    let secret_key = bls::SecretKey::random();
    SectionSig {
        public_key: secret_key.public_key(),
        signature: secret_key.sign("hello"),
    }
}
#[cfg(test)]
mod tests {
use crate::{
utils::test_utils::{create_test_client, init_logger},
Error,
};
use sn_interface::{
messaging::data::Error as ErrorMsg,
types::{
log_markers::LogMarker,
register::{Action, EntryHash, Permissions, Policy, User},
Keypair,
},
};
use eyre::{bail, eyre, Context, Result};
use rand::Rng;
use std::{
collections::{BTreeMap, BTreeSet},
time::Instant,
};
use tokio::time::{sleep, Duration};
use tracing::Instrument;
/// Prepares two register-create commands, publishes them as a single batch and
/// checks that the register can be fetched through each returned address.
#[tokio::test(flavor = "multi_thread")]
async fn test_register_batching() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("test__register_batching").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 15000;
    let owner = User::Key(client.public_key());

    // NOTE(review): both creates use the same `name` and `tag`, so `address` and
    // `address2` are equal - confirm that a second, distinct register was not intended.
    let (address, mut batch) = client.create_register(name, tag, policy(owner)).await?;
    let (address2, batch2) = client.create_register(name, tag, policy(owner)).await?;
    batch.extend(batch2);

    client.publish_register_ops(batch).await?;

    for register_address in [address, address2] {
        let register = client.get_register(register_address).await?;
        assert_eq!(*register.name(), name);
        assert_eq!(register.tag(), tag);
        assert_eq!(register.size(), 0);
        assert_eq!(register.owner(), owner);
    }

    Ok(())
}
/// Publishes one register creation, waits for a configurable delay and asserts that
/// the testnet logs contain exactly the expected number of `RegisterWrite` markers.
///
/// The delay in seconds is read from the `NETWORK_ASSERT_DELAY` environment variable
/// and falls back to `3` when the variable cannot be read.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "Testnet network_assert_ tests should be excluded from normal tests runs, they need to be run in sequence to ensure validity of checks"]
async fn register_network_assert_expected_log_counts() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("register_network_assert").entered();

    // The log state is captured before the client performs any work.
    let mut the_logs = crate::testnet_grep::NetworkLogState::new()?;

    let raw_delay = std::env::var("NETWORK_ASSERT_DELAY").unwrap_or_else(|_| "3".to_string());
    let delay = Duration::from_secs(raw_delay.parse::<u64>()?);
    debug!("Running network asserts with delay of {:?}", delay);

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 15000;
    let owner = User::Key(client.public_key());

    let (_address, batch) = client.create_register(name, tag, policy(owner)).await?;
    client.publish_register_ops(batch).await?;

    // Give the network time to process the command before inspecting the logs.
    sleep(delay).await;

    the_logs.assert_count(LogMarker::RegisterWrite, 7)?;

    Ok(())
}
/// Times 1000 consecutive prepare-and-publish writes of the same entry to one
/// register, printing the milliseconds taken by each iteration and the total.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "too heavy for CI"]
async fn measure_upload_times() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("test__measure_upload_times").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 10;
    let owner = User::Key(client.public_key());

    let (address, create_batch) = client.create_register(name, tag, policy(owner)).await?;
    client.publish_register_ops(create_batch).await?;

    let value_1 = random_register_entry();
    let mut total = 0_u128;

    for i in 0..1000_usize {
        let started = Instant::now();

        let (_value1_hash, write_batch) = client
            .write_to_local_register(address, value_1.clone(), BTreeSet::new())
            .await?;
        client.publish_register_ops(write_batch).await?;

        let elapsed = started.elapsed().as_millis();
        total += elapsed;
        println!("Iter # {}, elapsed: {}", i, elapsed);
    }

    println!("Total elapsed: {}", total);

    Ok(())
}
/// Creates and publishes a register, fetches it back and checks its name, tag, size
/// and owner; the identical sequence is then run a second time with the same
/// `name` and `tag`.
#[tokio::test(flavor = "multi_thread")]
async fn register_basics() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("test__register_basics").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 15000;
    let owner = User::Key(client.public_key());

    // Two identical rounds: create + publish + fetch + verify.
    for _round in 0..2 {
        let (address, batch) = client.create_register(name, tag, policy(owner)).await?;
        client.publish_register_ops(batch).await?;

        let register = client.get_register(address).await?;
        assert_eq!(*register.name(), name);
        assert_eq!(register.tag(), tag);
        assert_eq!(register.size(), 0);
        assert_eq!(register.owner(), owner);
    }

    Ok(())
}
/// Creates a register whose policy lists only the owner, then checks that the owner's
/// permissions allow reading and writing and that querying the permissions of an
/// unrelated key fails with `NoSuchUser` for that key.
#[tokio::test(flavor = "multi_thread")]
async fn register_permissions() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("test__register_permissions").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 15000;
    let owner = User::Key(client.public_key());

    let (address, batch) = client
        .create_register(name, tag, none_policy(owner))
        .await?;
    client
        .publish_register_ops(batch)
        .await
        .context("publish ops failed")?;

    let owner_permissions = client
        .get_register_permissions_for_user(address, owner)
        .instrument(tracing::info_span!("get owner perms"))
        .await
        .context("get user perms failed")?;
    assert_eq!(Some(true), owner_permissions.is_allowed(Action::Read));
    assert_eq!(Some(true), owner_permissions.is_allowed(Action::Write));

    // A freshly generated key was never added to the register's policy.
    let other_user = User::Key(Keypair::new_ed25519().public_key());
    let other_user_lookup = client
        .get_register_permissions_for_user(address, other_user)
        .instrument(tracing::info_span!("get other user perms"))
        .await;

    match other_user_lookup {
        Err(Error::ErrorMsg {
            source: ErrorMsg::NoSuchUser(user),
            ..
        }) => {
            assert_eq!(user, other_user);
            Ok(())
        }
        Err(err) => Err(eyre!(
            "Unexpected error returned when retrieving non-existing Register user permission: {:?}", err,
        )),
        Ok(_) => bail!("Should not be able to retrieve an entry for a random user"),
    }
}
/// Prepares several writes against the same register without publishing any of them
/// and checks that the returned entry hashes differ: for different values with no
/// children, for different values sharing a child, and for the same value with and
/// without a child.
#[tokio::test(flavor = "multi_thread")]
async fn register_hashes_dont_clash() -> Result<()> {
    init_logger();
    let start_span =
        tracing::info_span!("test__register_write_without_publish_start").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 10;
    let owner = User::Key(client.public_key());

    let (address, create_batch) = client.create_register(name, tag, policy(owner)).await?;
    client.publish_register_ops(create_batch).await?;

    // First write: prepared only, its batch is intentionally discarded.
    let value_1 = random_register_entry();
    let (value1_hash, _batch) = client
        .write_to_local_register(address, value_1.clone(), BTreeSet::new())
        .await?;

    let value_2 = random_register_entry();

    drop(start_span);
    let second_span =
        tracing::info_span!("test__register_write_without_publish__second_write").entered();

    let (value2_hash, _batch) = client
        .write_to_local_register(address, value_2.clone(), BTreeSet::new())
        .await?;
    assert_ne!(value1_hash, value2_hash);

    // Subsequent writes name the first entry as their child.
    let children = BTreeSet::from([value1_hash]);
    let value_3 = random_register_entry();

    drop(second_span);
    let _third_span =
        tracing::info_span!("test__register_write_without_publish__third_write").entered();

    let (value1_3_hash, _batch) = client
        .write_to_local_register(address, value_3.clone(), children.clone())
        .await?;
    let (value1_2_hash, _batch) = client
        .write_to_local_register(address, value_2.clone(), children.clone())
        .await?;

    assert_ne!(value1_2_hash, value1_3_hash);
    assert_ne!(value2_hash, value1_2_hash);

    Ok(())
}
/// Publishes two writes to a register and checks that both entries can be read back,
/// individually and as a set, and that looking up a random hash fails with
/// `NoSuchEntry` for that hash.
#[tokio::test(flavor = "multi_thread")]
async fn register_write() -> Result<()> {
    init_logger();
    let start_span = tracing::info_span!("test__register_write_start").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 10;
    let owner = User::Key(client.public_key());

    let (address, create_batch) = client.create_register(name, tag, policy(owner)).await?;
    client.publish_register_ops(create_batch).await?;

    // First write: published, then read back as the only entry.
    let value_1 = random_register_entry();
    let (value1_hash, first_batch) = client
        .write_to_local_register(address, value_1.clone(), BTreeSet::new())
        .await?;
    client.publish_register_ops(first_batch).await?;

    let entries = client.read_register(address).await?;
    assert_eq!(1, entries.len());
    let current = entries.iter().next();
    assert_eq!(current, Some(&(value1_hash, value_1.clone())));

    let value_2 = random_register_entry();

    drop(start_span);
    let _second_span = tracing::info_span!("test__register_write__second_write").entered();

    // Second write: a single-command batch that brings the register to two entries.
    let (value2_hash, second_batch) = client
        .write_to_local_register(address, value_2.clone(), BTreeSet::new())
        .await?;
    assert!(second_batch.len() == 1);
    client.publish_register_ops(second_batch).await?;

    let entries = client.read_register(address).await?;
    assert_eq!(2, entries.len());

    let retrieved_value_1 = client
        .get_register_entry(address, value1_hash)
        .instrument(tracing::info_span!("get_value_1"))
        .await?;
    assert_eq!(retrieved_value_1, value_1);

    let retrieved_value_2 = client
        .get_register_entry(address, value2_hash)
        .instrument(tracing::info_span!("get_value_2"))
        .await?;
    assert_eq!(retrieved_value_2, value_2);

    // A random hash is expected not to match any stored entry.
    let entry_hash = EntryHash(rand::thread_rng().gen::<[u8; 32]>());
    let missing_lookup = client
        .get_register_entry(address, entry_hash)
        .instrument(tracing::info_span!("final get"))
        .await;

    match missing_lookup {
        Ok(_data) => Err(eyre!(
            "Unexpectedly retrieved a register entry with a random hash!",
        )),
        Err(Error::ErrorMsg {
            source: ErrorMsg::NoSuchEntry(hash),
            ..
        }) => {
            assert_eq!(hash, entry_hash);
            Ok(())
        }
        Err(err) => Err(eyre!(
            "Unexpected error returned when retrieving a non-existing Register entry: {:?}",
            err,
        )),
    }
}
/// Creates and publishes a register, then checks that the owner reported by the
/// network is the client's own key.
#[tokio::test(flavor = "multi_thread")]
async fn register_owner() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("test__register_owner").entered();

    let client = create_test_client().await?;
    let name = xor_name::rand::random();
    let tag = 10;
    let owner = User::Key(client.public_key());

    let (address, create_batch) = client.create_register(name, tag, policy(owner)).await?;
    client.publish_register_ops(create_batch).await?;

    let current_owner = client.get_register_owner(address).await?;
    assert_eq!(owner, current_owner);

    Ok(())
}
/// Creates, publishes and fetches a register, attaching a context message to each
/// step so that a failure names the stage at which it occurred.
#[tokio::test(flavor = "multi_thread")]
async fn ae_checks_register_test() -> Result<()> {
    init_logger();
    let _outer_span = tracing::info_span!("ae_checks_register_test").entered();

    let client = create_test_client()
        .await
        .context("test client creation failed")?;

    let name = xor_name::rand::random();
    let tag = 15000;
    let owner = User::Key(client.public_key());

    let (address, create_batch) = client
        .create_register(name, tag, policy(owner))
        .await
        .context("Creating register failed")?;

    client
        .publish_register_ops(create_batch)
        .await
        .context("publishing reg failed")?;

    // Only the success of the fetch matters; the register itself is not inspected.
    let _register = client
        .get_register(address)
        .await
        .context("get reg failed after publish")?;

    Ok(())
}
/// Returns 32 freshly generated random bytes to use as a register entry.
fn random_register_entry() -> Vec<u8> {
    let bytes: [u8; 32] = rand::thread_rng().gen();
    Vec::from(bytes)
}
/// Builds a `Policy` for `owner` with an empty permissions map.
fn policy(owner: User) -> Policy {
    Policy {
        owner,
        permissions: BTreeMap::new(),
    }
}
/// Builds a `Policy` for `owner` whose permissions map has a single entry: `owner`
/// mapped to `Permissions::new(None)`.
fn none_policy(owner: User) -> Policy {
    let permissions = BTreeMap::from([(owner, Permissions::new(None))]);
    Policy { owner, permissions }
}
}