mod cmds;
mod data;
mod file_apis;
mod queries;
mod register_apis;
pub use register_apis::RegisterWriteAheadLog;
use crate::client::{connections::Session, errors::Error, ClientConfig};
use crate::utils::read_prefix_map_from_disk;
use sn_interface::messaging::{
data::{CmdError, DataQuery, RegisterQuery, ServiceMsg},
ServiceAuth, WireMsg,
};
use sn_interface::network_knowledge::prefix_map::NetworkPrefixMap;
use sn_interface::types::{Chunk, Keypair, Peer, PublicKey, RegisterAddress};
use bytes::Bytes;
use itertools::Itertools;
use rand::rngs::OsRng;
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use tokio::{
sync::{mpsc::Receiver, RwLock},
time::Duration,
};
use tracing::{debug, info};
use uluru::LRUCache;
use xor_name::XorName;
// Capacity (number of `Chunk`s) of each client's in-memory LRU chunk cache.
const CHUNK_CACHE_SIZE: usize = 50;
// Number of times the initial network probe is retried (after the first
// attempt) while a client is being created.
const NETWORK_PROBE_RETRY_COUNT: usize = 5;
// LRU cache of `Chunk`s, sized by `CHUNK_CACHE_SIZE`.
type ChunksCache = LRUCache<Chunk, CHUNK_CACHE_SIZE>;
/// Client handle for talking to the network.
///
/// Bundles the signing keypair, the network `Session`, the timeouts taken
/// from `ClientConfig`, and an in-memory chunk cache. The type derives
/// `Clone`; the error receiver and the chunk cache are wrapped in `Arc`, so
/// clones share those two values rather than copying them.
#[derive(Clone, Debug)]
pub struct Client {
/// Keypair used to sign messages (e.g. the initial network probe) and to
/// derive the client's public key.
keypair: Keypair,
/// Receiving end of the `CmdError` channel whose sender is handed to the
/// `Session` on creation. Nothing in the visible part of this file reads
/// from it, hence the `dead_code` allowance.
#[allow(dead_code)]
incoming_errors: Arc<RwLock<Receiver<CmdError>>>,
/// Network session created for this client's public key and genesis key.
session: Session,
/// Copied from `ClientConfig::query_timeout`.
/// NOTE(review): presumably the time allowed for a query to complete — the
/// code consuming it is outside this section; confirm.
pub(crate) query_timeout: Duration,
/// Copied from `ClientConfig::cmd_timeout`.
/// NOTE(review): presumably the time allowed for a cmd to complete — the
/// code consuming it is outside this section; confirm.
pub(crate) cmd_timeout: Duration,
/// LRU cache of `Chunk`s; starts out empty (`ChunksCache::default()`).
chunks_cache: Arc<RwLock<ChunksCache>>,
}
impl Client {
#[instrument(skip_all, level = "debug", name = "New client")]
pub async fn new(
config: ClientConfig,
bootstrap_nodes: BTreeSet<SocketAddr>,
optional_keypair: Option<Keypair>,
) -> Result<Self, Error> {
Client::create_with(config, bootstrap_nodes, optional_keypair, true).await
}
/// Builds a [`Client`] and probes the network before handing it back.
///
/// Steps, in order:
/// 1. Use `optional_keypair` if provided, otherwise generate a random
///    ed25519 keypair.
/// 2. Obtain a `NetworkPrefixMap`. When `read_prefixmap` is `true` the map is
///    read from `<home>/.safe/prefix_maps/<genesis key>`; if reading fails a
///    fresh map for `config.genesis_key` is used instead. When
///    `read_prefixmap` is `false` a fresh map is always used and the home
///    directory is never looked up.
/// 3. Create the `Session` and assemble the `Client`.
/// 4. Send a signed probe query (a `RegisterQuery::Get` for a random public
///    register address) to the elders of a section picked from the prefix
///    map, or to `bootstrap_nodes` if the map yields no section. A failed
///    probe is retried up to `NETWORK_PROBE_RETRY_COUNT` times, sleeping five
///    seconds before each retry and using a freshly generated probe message.
///
/// # Errors
/// * `Error::CouldNotReadHomeDir` - `read_prefixmap` is `true` and the home
///   directory cannot be determined.
/// * `Error::GenesisKeyMismatch` - the prefix map's genesis key differs from
///   `config.genesis_key`.
/// * `Error::NetworkContact` - the initial probe and every retry failed.
/// * Any error returned by `Session::new` or by serialising the probe message.
// NOTE(review): a bare `#[instrument]` records every argument via `Debug`,
// including `optional_keypair`, whereas `new` uses `skip_all` — confirm that
// is intended.
#[instrument]
pub(crate) async fn create_with(
    config: ClientConfig,
    bootstrap_nodes: BTreeSet<SocketAddr>,
    optional_keypair: Option<Keypair>,
    read_prefixmap: bool,
) -> Result<Self, Error> {
    let keypair = match optional_keypair {
        Some(id) => {
            info!("Client started for specific pk: {:?}", id.public_key());
            id
        }
        None => {
            let mut rng = OsRng;
            let keypair = Keypair::new_ed25519(&mut rng);
            info!(
                "Client started for new randomly created pk: {:?}",
                keypair.public_key()
            );
            keypair
        }
    };

    let prefix_map = if read_prefixmap {
        // The home directory is only needed for the on-disk lookup, so it is
        // resolved here rather than unconditionally.
        let home_dir = dirs_next::home_dir().ok_or(Error::CouldNotReadHomeDir)?;
        match read_prefix_map_from_disk(
            &home_dir.join(format!(".safe/prefix_maps/{:?}", config.genesis_key)),
        )
        .await
        {
            Ok(prefix_map) => prefix_map,
            Err(e) => {
                warn!("Could not read PrefixMap at '.safe/prefix_maps': {:?}", e);
                info!(
                    "Creating a fresh PrefixMap with GenesisKey {:?}",
                    config.genesis_key
                );
                NetworkPrefixMap::new(config.genesis_key)
            }
        }
    } else {
        NetworkPrefixMap::new(config.genesis_key)
    };

    // A map loaded from disk must belong to the network we were configured for.
    if config.genesis_key != prefix_map.genesis_key() {
        return Err(Error::GenesisKeyMismatch);
    }

    // The sender goes to the session; the receiver is kept on the client.
    let (err_sender, err_receiver) = tokio::sync::mpsc::channel::<CmdError>(10);

    let client_pk = keypair.public_key();

    debug!(
        "Creating new session with genesis key: {:?} ",
        config.genesis_key
    );
    debug!(
        "Creating new session with genesis key (in hex format): {} ",
        hex::encode(config.genesis_key.to_bytes())
    );

    let session = Session::new(
        client_pk,
        config.genesis_key,
        config.qp2p,
        err_sender,
        config.local_addr,
        config.cmd_ack_wait,
        prefix_map.clone(),
    )?;

    let client = Self {
        keypair,
        session,
        incoming_errors: Arc::new(RwLock::new(err_receiver)),
        query_timeout: config.query_timeout,
        cmd_timeout: config.cmd_timeout,
        chunks_cache: Arc::new(RwLock::new(ChunksCache::default())),
    };

    // Builds a signed probe: a register `Get` query aimed at a random address.
    // Returns the destination name, the auth (public key + signature) and the
    // serialised payload.
    fn generate_probe_msg(
        client: &Client,
        pk: PublicKey,
    ) -> Result<(XorName, ServiceAuth, Bytes), Error> {
        let random_dst_addr = xor_name::rand::random();
        let serialised_cmd = {
            let msg = ServiceMsg::Query(DataQuery::Register(RegisterQuery::Get(
                RegisterAddress::Public {
                    name: random_dst_addr,
                    tag: 1,
                },
            )));
            WireMsg::serialize_msg_payload(&msg)?
        };
        let signature = client.keypair.sign(&serialised_cmd);
        let auth = ServiceAuth {
            public_key: pk,
            signature,
        };
        Ok((random_dst_addr, auth, serialised_cmd))
    }

    let (random_dst_addr, auth, serialised_cmd) = generate_probe_msg(&client, client_pk)?;

    // Prefer the elders of a section known to the prefix map; fall back to the
    // supplied bootstrap addresses (given random names) when the map is empty.
    let bootstrap_nodes = {
        if let Some(sap) = prefix_map.closest_or_opposite(&xor_name::rand::random(), None) {
            sap.elders_vec()
        } else {
            bootstrap_nodes
                .iter()
                .copied()
                .map(|socket| Peer::new(xor_name::rand::random(), socket))
                .collect_vec()
        }
    };

    let mut attempts = 0;
    let mut initial_probe = client
        .session
        .make_contact_with_nodes(
            bootstrap_nodes.clone(),
            random_dst_addr,
            auth,
            serialised_cmd,
        )
        .await;

    while initial_probe.is_err() {
        // All retries are used up and the network is still unreachable: report
        // the failure instead of returning a client that never made contact.
        if attempts == NETWORK_PROBE_RETRY_COUNT {
            return Err(Error::NetworkContact);
        }

        warn!(
            "Initial probe msg to network failed. Trying again (attempt {}): {:?}",
            attempts, initial_probe
        );

        attempts += 1;

        tokio::time::sleep(Duration::from_secs(5)).await;

        // Every retry uses a new random destination and a fresh signature.
        let (random_dst_addr, auth, serialised_cmd) = generate_probe_msg(&client, client_pk)?;

        initial_probe = client
            .session
            .make_contact_with_nodes(
                bootstrap_nodes.clone(),
                random_dst_addr,
                auth,
                serialised_cmd,
            )
            .await;
    }

    Ok(client)
}
pub fn keypair(&self) -> Keypair {
self.keypair.clone()
}
/// Returns the public key of this client's keypair.
pub fn public_key(&self) -> PublicKey {
    // Borrow the stored keypair directly; going through `self.keypair()`
    // would clone the whole keypair just to read its public key.
    self.keypair.public_key()
}
}
// Tests for client construction. They build clients through the crate's
// test helpers (`create_test_client`, `create_test_client_with`).
// NOTE(review): these presumably need a reachable test network — confirm
// against the helpers, which are defined outside this file.
#[cfg(test)]
mod tests {
use super::*;
use crate::client::utils::test_utils::{
create_test_client, create_test_client_with, init_test_logger,
};
use eyre::Result;
use sn_interface::types::utils::random_bytes;
use sn_interface::types::Scope;
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
// A client can be created with the default test setup.
#[tokio::test(flavor = "multi_thread")]
async fn client_creation() -> Result<()> {
init_test_logger();
let _client = create_test_client().await?;
Ok(())
}
// Ignored. Builds a set containing a single local address (127.0.0.1:3033)
// but never creates a client from it.
// NOTE(review): as written this test asserts nothing; it presumably should
// check that client creation fails for such a bootstrap set — confirm and
// restore the assertion before un-ignoring.
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn client_nonsense_bootstrap_fails() -> Result<()> {
init_test_logger();
let mut nonsense_bootstrap = HashSet::new();
let _ = nonsense_bootstrap.insert(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
3033,
));
Ok(())
}
// A client created from a caller-supplied keypair reports that keypair's
// public key.
#[tokio::test(flavor = "multi_thread")]
async fn client_creation_with_existing_keypair() -> Result<()> {
init_test_logger();
let mut rng = OsRng;
let full_id = Keypair::new_ed25519(&mut rng);
let pk = full_id.public_key();
let client = create_test_client_with(Some(full_id), None, true).await?;
assert_eq!(pk, client.public_key());
Ok(())
}
// After 40 seconds of inactivity the client can still upload data.
#[tokio::test(flavor = "multi_thread")]
async fn long_lived_connection_survives() -> Result<()> {
init_test_logger();
let client = create_test_client().await?;
// Idle period before the upload is attempted.
tokio::time::sleep(tokio::time::Duration::from_secs(40)).await;
let bytes = random_bytes(self_encryption::MIN_ENCRYPTABLE_BYTES / 2);
let _ = client.upload(bytes, Scope::Public).await?;
Ok(())
}
// Compile-time check: the value returned by `create_test_client()` (which is
// not awaited here) must be `Send`. Nothing is executed against a network.
#[test]
fn client_is_send() {
init_test_logger();
fn require_send<T: Send>(_t: T) {}
require_send(create_test_client());
}
}