pub mod map_info;
pub mod map_apis;
pub mod blob_apis;
pub mod transfer_actor;
pub mod sequence_apis;
pub mod blob_storage;
pub use self::map_info::MapInfo;
pub use self::transfer_actor::{ClientTransferValidator, SafeTransferActor};
pub use blob_storage::{BlobStorage, BlobStorageDryRun};
use crate::{
config_handler::Config,
connection_manager::{ConnectionManager, Session, Signer},
errors::Error,
};
use crdts::Dot;
use futures::lock::Mutex;
use log::{debug, info, trace, warn};
use qp2p::Config as QuicP2pConfig;
use rand::rngs::OsRng;
use sn_data_types::{Keypair, PublicKey, Token};
use sn_messaging::{
client::{Cmd, DataCmd, Message, Query, QueryResponse},
MessageId,
};
use std::{
path::Path,
str::FromStr,
{collections::HashSet, net::SocketAddr, sync::Arc},
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
/// Elder count constant exposed by the client module.
/// NOTE(review): presumably the number of elders in a section the client talks to;
/// it is not referenced in this part of the file, so confirm against its users.
pub const ELDER_SIZE: usize = 5;
/// Size constant for an immutable-data cache.
/// NOTE(review): presumably a maximum number of cached entries; the cache itself is
/// not visible here, so confirm the unit.
pub const IMMUT_DATA_CACHE_SIZE: usize = 300;
/// Size constant for Sequence CRDT replicas.
/// NOTE(review): presumably a capacity for locally held replicas; the consumer is
/// not visible here, so confirm the exact meaning.
pub const SEQUENCE_CRDT_REPLICA_SIZE: usize = 300;
/// Network client handle.
///
/// Bundles the signing keypair, the bootstrapped network session, the transfer
/// actor and the receiving end of the error-notification channel. The type is
/// `Clone`; the transfer actor and the notification receiver sit behind
/// `Arc<Mutex<..>>`, so clones share those two pieces of state.
#[derive(Clone)]
pub struct Client {
/// Keypair the client was created with, or the one generated for it when none was supplied.
keypair: Keypair,
/// Transfer actor built from the client keypair, the session's section key set and a
/// `ClientTransferValidator`.
transfer_actor: Arc<Mutex<SafeTransferActor<ClientTransferValidator, Keypair>>>,
/// `Dot` initialised in `Client::new` with a freshly generated random public key and counter 0.
/// NOTE(review): presumably consumed by the simulated farming payout code; its use is not
/// visible in this part of the file.
simulated_farming_payout_dot: Dot<PublicKey>,
/// Session returned by `attempt_bootstrap`; source of the endpoint, elders,
/// pending queries and section key used when sending messages.
session: Session,
/// Receiving half of the unbounded `Error` channel whose sender is handed to the
/// session during bootstrap.
notification_receiver: Arc<Mutex<UnboundedReceiver<Error>>>,
}
impl Client {
pub async fn new(
optional_keypair: Option<Keypair>,
config_file_path: Option<&Path>,
bootstrap_config: Option<HashSet<SocketAddr>>,
) -> Result<Self, Error> {
let mut rng = OsRng;
let (keypair, is_random_client) = match optional_keypair {
Some(id) => {
info!("Client started for specific pk: {:?}", id.public_key());
(id, false)
}
None => {
let keypair = Keypair::new_ed25519(&mut rng);
info!(
"Client started for new randomly created pk: {:?}",
keypair.public_key()
);
(keypair, true)
}
};
let (notification_sender, notification_receiver) = unbounded_channel::<Error>();
let qp2p_config = Config::new(config_file_path, bootstrap_config).qp2p;
let session = attempt_bootstrap(qp2p_config, keypair.clone(), notification_sender).await?;
let random_payment_id = Keypair::new_ed25519(&mut rng);
let random_payment_pk = random_payment_id.public_key();
let simulated_farming_payout_dot = Dot::new(random_payment_pk, 0);
let validator = ClientTransferValidator {};
let transfer_actor = Arc::new(Mutex::new(SafeTransferActor::new(
keypair.clone(),
session.section_key_set().await?.clone(),
validator,
)));
let mut client = Self {
session,
keypair,
transfer_actor,
simulated_farming_payout_dot,
notification_receiver: Arc::new(Mutex::new(notification_receiver)),
};
if cfg!(feature = "simulated-payouts") {
if is_random_client {
debug!("Attempting to trigger simulated payout");
let _ = client
.trigger_simulated_farming_payout(Token::from_str("10")?)
.await?;
} else {
warn!("No automatic simulated payout occurs for clients created for pre-existing SecretKeys")
}
}
match client.get_history().await {
Ok(_) => {}
Err(error) => {
let err = error.to_string();
warn!("{:?}", &err);
}
};
Ok(client)
}
pub async fn keypair(&self) -> Keypair {
self.keypair.clone()
}
pub async fn public_key(&self) -> PublicKey {
let id = self.keypair().await;
id.public_key()
}
async fn send_query(&self, query: Query) -> Result<QueryResponse, Error> {
debug!("Sending QueryRequest: {:?}", query);
let message = self.create_query_message(query)?;
let endpoint = self.session.endpoint()?.clone();
let elders = self.session.elders.iter().cloned().collect();
let pending_queries = self.session.pending_queries.clone();
ConnectionManager::send_query(&message, endpoint, elders, pending_queries).await
}
pub(crate) fn create_cmd_message(&self, msg_contents: Cmd) -> Result<Message, Error> {
let id = MessageId::new();
trace!("Creating cmd message with id: {:?}", id);
let target_section_pk = Some(self.session.section_key()?);
Ok(Message::Cmd {
cmd: msg_contents,
id,
target_section_pk,
})
}
pub(crate) fn create_query_message(&self, msg_contents: Query) -> Result<Message, Error> {
let id = MessageId::new();
trace!("Creating query message with id : {:?}", id);
let target_section_pk = Some(self.session.section_key()?);
Ok(Message::Query {
query: msg_contents,
id,
target_section_pk,
})
}
async fn pay_and_send_data_command(&self, cmd: DataCmd) -> Result<(), Error> {
let payment_proof = self.create_write_payment_proof(&cmd).await?;
let msg_contents = Cmd::Data {
cmd,
payment: payment_proof.clone(),
};
let message = self.create_cmd_message(msg_contents)?;
let endpoint = self.session.endpoint()?.clone();
let elders = self.session.elders.iter().cloned().collect();
let _ = ConnectionManager::send_cmd(&message, endpoint, elders).await?;
self.apply_write_payment_to_local_actor(payment_proof).await
}
}
/// Creates a `Session` for `keypair` and bootstraps it, retrying on failure.
///
/// The session is built once from `qp2p_config`, a `Signer` wrapping `keypair`,
/// and `notifier`; each attempt passes a clone of it to
/// `ConnectionManager::bootstrap`. Up to three attempts are made.
///
/// # Errors
///
/// Returns the error from `Session::new` if the session cannot be created, or
/// the error of the third failed bootstrap attempt.
pub async fn attempt_bootstrap(
    qp2p_config: QuicP2pConfig,
    keypair: Keypair,
    notifier: UnboundedSender<Error>,
) -> Result<Session, Error> {
    const MAX_ATTEMPTS: u32 = 3;

    let mut failed_attempts: u32 = 0;
    let session = Session::new(qp2p_config, Signer::new(keypair), notifier)?;

    loop {
        match ConnectionManager::bootstrap(session.clone()).await {
            Ok(bootstrapped) => break Ok(bootstrapped),
            Err(err) => {
                failed_attempts += 1;
                if failed_attempts >= MAX_ATTEMPTS {
                    // Out of retries: surface the most recent failure.
                    break Err(err);
                }
                trace!(
                    "Error connecting to network! Retrying... ({})",
                    failed_attempts
                );
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::test_utils::{create_test_client, create_test_client_with};
    use anyhow::Result;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use tokio::time::{delay_for, Duration};

    /// A test client can be created with default settings.
    #[tokio::test]
    pub async fn client_creation() -> Result<()> {
        let _connected = create_test_client().await?;
        Ok(())
    }

    /// Builds a bootstrap set holding a single loopback address.
    ///
    /// NOTE(review): the set is never handed to a client and nothing is asserted,
    /// so this ignored test currently cannot fail; confirm what it was meant to check.
    #[tokio::test]
    #[ignore]
    pub async fn client_nonsense_bootstrap_fails() -> Result<()> {
        let bogus_contact = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3033);
        let mut nonsense_bootstrap = HashSet::new();
        let _ = nonsense_bootstrap.insert(bogus_contact);
        Ok(())
    }

    /// A client created from a supplied keypair reports that keypair's public key.
    #[tokio::test]
    pub async fn client_creation_with_existing_keypair() -> Result<()> {
        let mut rng = OsRng;
        let supplied_keypair = Keypair::new_ed25519(&mut rng);
        let expected_pk = supplied_keypair.public_key();

        let client = create_test_client_with(Some(supplied_keypair)).await?;
        let reported_pk = client.public_key().await;

        assert_eq!(expected_pk, reported_pk);
        Ok(())
    }

    /// After idling for 40 seconds the client can still query a non-zero balance.
    #[tokio::test]
    pub async fn long_lived_connection_survives() -> Result<()> {
        let client = create_test_client().await?;

        delay_for(Duration::from_secs(40)).await;

        let balance = client.get_balance().await?;
        assert_ne!(balance, Token::from_nano(0));
        Ok(())
    }
}