use mwc_libp2p::core::Multiaddr;
use mwc_libp2p::{
core::{
muxing::StreamMuxerBox,
upgrade::{SelectUpgrade, Version},
SimplePopSerializer, SimplePushSerializer,
},
dns::DnsConfig,
identity::Keypair,
mplex::MplexConfig,
noise::{self, NoiseConfig, X25519Spec},
swarm::SwarmBuilder,
yamux::YamuxConfig,
PeerId, Swarm, Transport,
};
use mwc_libp2p_tokio_socks5::Socks5TokioTcpConfig;
use mwc_libp2p::gossipsub::{
self, GossipsubEvent, IdentTopic as Topic, MessageAuthenticity, MessageId, ValidationMode,
};
use mwc_libp2p::gossipsub::{Gossipsub, MessageAcceptance, TopicHash};
use crate::mwc_core::global;
use crate::types::Error;
use crate::PeerAddr;
use async_std::task;
use chrono::Utc;
use ed25519_dalek::PublicKey as DalekPublicKey;
use futures::{future, prelude::*};
use mwc_core::core::hash::Hash;
use mwc_core::core::TxKernel;
use mwc_core::libtx::aggsig;
use mwc_libp2p::core::network::NetworkInfo;
use mwc_util::secp::pedersen::Commitment;
use mwc_util::secp::rand::Rng;
use mwc_util::secp::{ContextFlag, Message, Secp256k1, Signature};
use mwc_util::RwLock;
use mwc_util::{Mutex, OnionV3Address, OnionV3AddressError, ToHex};
use rand::seq::SliceRandom;
use std::collections::VecDeque;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Instant;
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
/// Adapter that lets the libp2p swarm spawn its background futures on the
/// tokio runtime.
struct TokioExecutor;
impl mwc_libp2p::core::Executor for TokioExecutor {
    fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
        // Fire-and-forget: the JoinHandle returned by tokio::spawn is
        // intentionally dropped, matching libp2p's Executor contract.
        tokio::spawn(future);
    }
}
/// A gossip message accepted from the libp2p network, kept in the bounded
/// `MESSAGING_RECEIVED` queue for later retrieval by the wallet/API.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReceivedMessage {
    // Unix timestamp (seconds) when the message was received.
    pub timestamp: i64,
    // Sender's onion address (derived from its Dalek public key).
    pub peer_id: String,
    // Gossipsub topic name the message arrived on.
    pub topic: String,
    // Integrity fee (in base units) that the sender paid for this message.
    pub fee: u64,
    // Message payload; expected to be a JSON string (see listener_handler).
    pub message: String,
}
// Upper bound on the number of stored received messages; oldest entries are
// evicted from the front of the queue when the limit is exceeded.
const MESSAGING_RECEIVED_LIMIT: usize = 1000;
lazy_static! {
    // The single running gossipsub swarm. None when libp2p is not running.
    // NOTE(review): the polling task in run_libp2p_node holds this lock while
    // dispatching message handlers, so other lock acquisitions must not nest
    // the opposite way around it.
    static ref LIBP2P_SWARM: Mutex<Option<Swarm<Gossipsub>>> = Mutex::new(None);
    // Known peers: source onion address -> (peer onion addresses it reported,
    // unix timestamp of the last update). Key "SELF" holds seed peers.
    static ref LIBP2P_PEERS: RwLock<HashMap<String, (Vec<String>, u64)>> =
        RwLock::new(HashMap::new());
    // This node's own libp2p peer id, set at swarm start-up.
    static ref THIS_PEER_ID: RwLock<Option<PeerId>> = RwLock::new(None);
    // Per-topic message callbacks: topic hash -> (handler fn, topic). The
    // handler returns false to reject the message.
    static ref LIBP2P_MESSAGE_HANDLERS: RwLock<HashMap<TopicHash, (fn(sender_address: &String, topic: &TopicHash, Vec<u8>, u64) -> bool, Topic)>> = RwLock::new(HashMap::new());
    // Seed peer addresses, used for reconnect when no peers are reachable.
    static ref SEED_LIST: RwLock<Vec<PeerAddr>> = RwLock::new(vec![]);
    // Topics this node listens on: topic hash -> (name, topic, minimal fee).
    static ref MESSAGING_TOPICS: RwLock<HashMap<TopicHash, (String, Topic, u64)>> = RwLock::new(HashMap::new());
    // Bounded FIFO of accepted messages (see MESSAGING_RECEIVED_LIMIT).
    static ref MESSAGING_RECEIVED: RwLock<VecDeque<ReceivedMessage>> = RwLock::new(VecDeque::new());
}
/// Max number of call timestamps kept per integrity kernel for rate limiting.
pub const INTEGRITY_CALL_HISTORY_LEN_LIMIT: usize = 10;
/// Minimal allowed average period (seconds) between messages paid by the same
/// integrity kernel; a lower observed period rejects the message.
pub const INTEGRITY_CALL_MAX_PERIOD: i64 = 15;
/// Presumably the number of blocks an integrity fee remains valid for — not
/// referenced in this file; TODO confirm against the caller.
pub const INTEGRITY_FEE_VALID_BLOCKS: u64 = 1443;
/// The integrity fee must be at least this multiple of the base fee.
pub const INTEGRITY_FEE_MIN_X: u64 = 10;
/// Returns a copy of this node's libp2p peer id, if the swarm has set one.
pub fn get_this_peer_id() -> Option<PeerId> {
    let guard = THIS_PEER_ID.read();
    guard.as_ref().cloned()
}
/// Records this node's libp2p peer id, overwriting any previous value.
pub fn set_this_peer_id(peer_id: &PeerId) {
    let mut guard = THIS_PEER_ID.write();
    *guard = Some(peer_id.clone());
}
/// Installs the running swarm into the global slot, dropping any old one.
pub fn init_libp2p_swarm(swarm: Swarm<Gossipsub>) {
    *LIBP2P_SWARM.lock() = Some(swarm);
}
/// Clears the global swarm slot, shutting libp2p accessors off.
pub fn reset_libp2p_swarm() {
    let mut guard = LIBP2P_SWARM.lock();
    *guard = None;
}
/// Registers seed peers. When `update_seed_list` is set the stored seed list
/// is replaced as well; in any case every Onion-type seed is added to the
/// known-peers map via `add_new_peer`.
pub fn set_seed_list(seed_list: &Vec<PeerAddr>, update_seed_list: bool) {
    if update_seed_list {
        *SEED_LIST.write() = seed_list.clone();
    }
    for seed in seed_list {
        // Only onion addresses are usable for libp2p-over-Tor.
        if matches!(seed, PeerAddr::Onion(_)) {
            if let Err(e) = add_new_peer(seed) {
                error!("Unable to add libp2p peer, {}", e);
            }
        }
    }
}
/// True when a libp2p swarm is currently installed (i.e. the node is running).
pub fn get_libp2p_running() -> bool {
    let guard = LIBP2P_SWARM.lock();
    guard.is_some()
}
/// Returns all registered messaging topics as (name, topic, minimal fee).
pub fn get_topics() -> Vec<(String, Topic, u64)> {
    // `.values().cloned()` is the idiomatic form of iterating entries and
    // cloning only the value.
    MESSAGING_TOPICS.read().values().cloned().collect()
}
/// Wire version for integrity messages; mainnet and test networks use
/// distinct versions so messages cannot cross networks.
fn get_message_version() -> u16 {
    match global::is_mainnet() {
        true => 1,
        false => 256,
    }
}
/// Default handler for registered messaging topics: stores well-formed,
/// sufficiently-paid messages into `MESSAGING_RECEIVED`.
///
/// Returns false (rejecting the gossip message) only when the payload is not
/// valid UTF-8 or not valid JSON. Unknown topics or a fee below the topic's
/// minimum are silently skipped but still return true.
fn listener_handler(sender_address: &String, topic: &TopicHash, data: Vec<u8>, fee: u64) -> bool {
    if let Some((topic_str, _topic, min_fee)) = MESSAGING_TOPICS.read().get(topic) {
        if fee >= *min_fee {
            // Payload must be a UTF-8 JSON document; anything else is rejected.
            let message_str = match String::from_utf8(data) {
                Ok(s) => s,
                Err(_) => return false,
            };
            if serde_json::from_str::<serde_json::Value>(&message_str).is_err() {
                return false;
            }
            debug!(
                "Get a message from {}, on topic {}, data {}, fee {}",
                sender_address, topic_str, message_str, fee
            );
            {
                let mut messages = MESSAGING_RECEIVED.write();
                // De-duplicate: drop any earlier copy of this exact message
                // from the same sender before appending the fresh one.
                messages.retain(|m| m.message != message_str || m.peer_id != *sender_address);
                messages.push_back(ReceivedMessage {
                    timestamp: Utc::now().timestamp(),
                    peer_id: sender_address.clone(),
                    topic: topic_str.clone(),
                    fee,
                    message: message_str,
                });
                // Keep the queue bounded; evict the oldest entries.
                while messages.len() > MESSAGING_RECEIVED_LIMIT {
                    messages.pop_front();
                }
            }
        }
    }
    true
}
/// Registers a messaging topic with a minimal accepted fee and subscribes the
/// swarm to it. Returns true when the topic was newly added, false when it
/// already existed (in which case its minimal fee is still updated).
pub fn add_topic(topic_str: &String, min_fee: u64) -> bool {
    let topic = Topic::new(topic_str.clone());
    // Scope the MESSAGING_TOPICS write guard to this statement: the original
    // match-scrutinee temporary kept it locked while add_topic_to_libp2p took
    // LIBP2P_MESSAGE_HANDLERS and LIBP2P_SWARM, the reverse of the order used
    // by the swarm polling task (which holds LIBP2P_SWARM and then reads
    // MESSAGING_TOPICS inside listener_handler) — a deadlock hazard.
    let is_new = MESSAGING_TOPICS
        .write()
        .insert(topic.hash(), (topic_str.clone(), topic, min_fee))
        .is_none();
    if is_new {
        add_topic_to_libp2p(topic_str, listener_handler);
    }
    is_new
}
/// Unregisters a messaging topic and unsubscribes the swarm from it.
/// Returns true when the topic existed and was removed.
pub fn remove_topic(topic_str: &String) -> bool {
    let topic = Topic::new(topic_str.clone());
    // Release the MESSAGING_TOPICS write guard before calling
    // remove_topic_from_libp2p, which locks LIBP2P_MESSAGE_HANDLERS and
    // LIBP2P_SWARM; holding both in that order inverts the locking order of
    // the swarm polling task (swarm lock -> MESSAGING_TOPICS read).
    let removed = MESSAGING_TOPICS.write().remove(&topic.hash()).is_some();
    if removed {
        remove_topic_from_libp2p(topic_str);
    }
    removed
}
/// Merges externally-obtained messages into the received queue, replacing any
/// existing entry with the same payload and sender, then trims to the limit.
pub fn inject_received_messaged(inject_msgs: Vec<ReceivedMessage>) {
    let mut queue = MESSAGING_RECEIVED.write();
    for msg in inject_msgs.into_iter() {
        // Same de-duplication rule as listener_handler: drop older copies of
        // this (message, sender) pair before appending.
        queue.retain(|existing| {
            existing.message != msg.message || existing.peer_id != msg.peer_id
        });
        queue.push_back(msg);
    }
    // Evict from the front until the queue fits the configured bound.
    let overflow = queue.len().saturating_sub(MESSAGING_RECEIVED_LIMIT);
    for _ in 0..overflow {
        queue.pop_front();
    }
}
/// Returns the received-message queue. With `delete` set the whole queue is
/// drained and handed to the caller; otherwise entries older than ten minutes
/// are pruned in place and a copy of the remainder is returned.
pub fn get_received_messages(delete: bool) -> VecDeque<ReceivedMessage> {
    let mut messages = MESSAGING_RECEIVED.write();
    if delete {
        // Move the whole queue out, leaving an empty one behind.
        std::mem::take(&mut *messages)
    } else {
        let cutoff = Utc::now().timestamp() - 600;
        messages.retain(|m| m.timestamp > cutoff);
        messages.clone()
    }
}
/// Number of messages currently held in the received queue.
pub fn get_received_messages_num() -> usize {
    let queue = MESSAGING_RECEIVED.read();
    queue.len()
}
/// Drops the handler registered for `topic` and, if one was registered,
/// unsubscribes the running swarm from the topic.
pub fn remove_topic_from_libp2p(topic: &str) {
    let topic = Topic::new(topic);
    // Drop the LIBP2P_MESSAGE_HANDLERS write guard before locking the swarm:
    // the polling task locks LIBP2P_SWARM first and then reads
    // LIBP2P_MESSAGE_HANDLERS, so holding both here in the opposite order
    // risks a deadlock.
    let was_registered = LIBP2P_MESSAGE_HANDLERS
        .write()
        .remove(&topic.hash())
        .is_some();
    if was_registered {
        if let Some(swarm) = &mut *LIBP2P_SWARM.lock() {
            match swarm.unsubscribe(&topic) {
                Ok(res) => {
                    if !res {
                        warn!("Not found expected subscribed topic {}", topic);
                    }
                }
                Err(e) => warn!("Unable to unsubscribe from the topic {}", e),
            }
        }
    }
}
/// Registers a message handler for `topic` and subscribes the running swarm
/// to it (subscription errors are logged, not returned).
pub fn add_topic_to_libp2p(
    topic: &str,
    handler: fn(sender_address: &String, topic: &TopicHash, Vec<u8>, u64) -> bool,
) {
    let topic = Topic::new(topic);
    // Register the handler in its own scope so the LIBP2P_MESSAGE_HANDLERS
    // write guard is released before LIBP2P_SWARM is locked; the swarm
    // polling task acquires those locks in the opposite order.
    {
        let mut handlers = LIBP2P_MESSAGE_HANDLERS.write();
        let _ = handlers.insert(topic.hash(), (handler, topic.clone()));
    }
    if let Some(swarm) = &mut *LIBP2P_SWARM.lock() {
        if let Err(e) = swarm.subscribe(&topic) {
            warn!("Unable to subscribe to the topic {:?}", e);
        }
    }
}
/// Publishes an already-built integrity message on `topic`. Returns the
/// gossipsub message id on success, None when the swarm is not running or the
/// publish fails (the failure is logged).
pub fn publish_message(topic: &Topic, integrity_message: Vec<u8>) -> Option<MessageId> {
    let mut guard = LIBP2P_SWARM.lock();
    let swarm = guard.as_mut()?;
    match swarm.publish(topic.clone(), integrity_message) {
        Ok(msg_id) => Some(msg_id),
        Err(e) => {
            warn!("Unable to publish libp2p message, {}", e);
            None
        }
    }
}
/// Peer ids of all currently connected libp2p peers; empty when the swarm is
/// not running.
pub fn get_libp2p_connections() -> Vec<PeerId> {
    LIBP2P_SWARM
        .lock()
        .as_ref()
        .map(|swarm| Swarm::network_info(swarm).into_peers())
        .unwrap_or_default()
}
/// Adds a peer's onion address to the "SELF" bucket of the known-peers map,
/// refreshing the bucket's timestamp. Fails when the address has no
/// extractable TOR public key.
pub fn add_new_peer(peer: &PeerAddr) -> Result<(), Error> {
    info!("libp2p adding a new peer {}", peer);
    let addr = peer.tor_address().map_err(|e| {
        Error::Libp2pError(format!(
            "Unable to retrieve TOR pk from the peer address, {}",
            e
        ))
    })?;
    let cur_time = Utc::now().timestamp() as u64;
    let mut peer_list = LIBP2P_PEERS.write();
    // Entry API: one lookup instead of the get_mut/insert pair.
    let entry = peer_list
        .entry("SELF".to_string())
        .or_insert_with(|| (vec![], cur_time));
    if !entry.0.contains(&addr) {
        entry.0.push(addr);
    }
    entry.1 = cur_time;
    Ok(())
}
/// Builds and runs the libp2p gossip node over Tor and blocks polling it
/// until `stop_mutex` holds 0 or the engine decides to restart.
///
/// * `tor_socks_port` - local Tor SOCKS5 proxy port used for dialing.
/// * `tor_secret` - 32-byte ed25519 secret; identity for both the onion
///   address and the libp2p peer id (they are asserted equal below).
/// * `libp2p_port` - local port mapped to this node's onion address.
/// * `fee_base` - base fee used to validate integrity fees on messages.
/// * `kernel_validation_fn` - callback that looks up a kernel excess on the
///   chain; used by validate_integrity_message.
/// * `stop_mutex` - polled each wakeup; value 0 requests shutdown.
///
/// On exit the global swarm slot is cleared via reset_libp2p_swarm.
pub async fn run_libp2p_node(
    tor_socks_port: u16,
    tor_secret: &[u8; 32],
    libp2p_port: u16,
    fee_base: u64,
    kernel_validation_fn: Arc<impl Fn(&Commitment) -> Result<Option<TxKernel>, Error>>,
    stop_mutex: std::sync::Arc<std::sync::Mutex<u32>>,
) -> Result<(), Error> {
    let secp = Arc::new(Secp256k1::with_caps(ContextFlag::Commit));
    // Derive this node's onion address from the Tor secret and turn it into
    // the multiaddress the swarm will listen on.
    let onion_address = OnionV3Address::from_private(tor_secret)
        .map_err(|e| Error::Libp2pError(format!("Unable to build onion address, {}", e)))?;
    let addr_str = format!(
        "/onion3/{}:{}",
        onion_address.to_string(),
        global::get_tor_libp2p_port()
    );
    let addr = addr_str
        .parse::<Multiaddr>()
        .map_err(|e| Error::Internal(format!("Unable to construct onion multiaddress, {}", e)))?;
    // Map our own onion multiaddress to the local port for the SOCKS5 config.
    let mut map = HashMap::new();
    map.insert(addr.clone(), libp2p_port);
    // The same ed25519 secret backs the libp2p identity.
    let id_keys = Keypair::ed25519_from_secret(&mut tor_secret.clone())
        .map_err(|e| Error::Libp2pError(format!("Unable to build ed25519 key pairs, {}", e)))?;
    let this_peer_id = PeerId::from_public_key(id_keys.public());
    set_this_peer_id(&this_peer_id);
    warn!("Starting libp2p, this peer: {}", this_peer_id);
    // Sanity: peer id string and onion address must match (same key).
    debug_assert_eq!(this_peer_id.to_string(), onion_address.to_string());
    // Noise XX handshake keys, authenticated with our identity key.
    let dh_keys = noise::Keypair::<X25519Spec>::new()
        .into_authentic(&id_keys)
        .map_err(|e| Error::Libp2pError(format!("Unable to build p2p keys, {}", e)))?;
    let noise = NoiseConfig::xx(dh_keys).into_authenticated();
    // TCP over the Tor SOCKS5 proxy, with our onion address mapping.
    let tcp = Socks5TokioTcpConfig::new(tor_socks_port)
        .nodelay(true)
        .onion_map(map);
    let transport = DnsConfig::new(tcp)
        .map_err(|e| Error::Libp2pError(format!("Unable to build a transport, {}", e)))?;
    // Transport stack: DNS -> upgrade V1 -> noise auth -> yamux/mplex muxing.
    let transport = transport
        .upgrade(Version::V1)
        .authenticate(noise)
        .multiplex(SelectUpgrade::new(
            YamuxConfig::default(),
            MplexConfig::new(),
        ))
        .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
        .boxed();
    // Manual validation mode: we must call report_message_validation_result
    // for every message (done in the poll loop below).
    let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
        .heartbeat_interval(Duration::from_secs(3))
        .validation_mode(ValidationMode::Strict)
        .validate_messages()
        .accept_dalek_pk_peers_only()
        .build()
        .expect("Valid gossip config");
    // Target connection count for the reconnect task below.
    let connections_number_low = gossipsub_config.mesh_n_high();
    let gossipsub: gossipsub::Gossipsub =
        gossipsub::Gossipsub::new(MessageAuthenticity::Signed(id_keys), gossipsub_config)
            .expect("Correct configuration");
    let mut swarm = SwarmBuilder::new(transport, gossipsub, this_peer_id.clone())
        .executor(Box::new(TokioExecutor))
        .build();
    Swarm::listen_on(&mut swarm, addr.clone())
        .map_err(|e| Error::Libp2pError(format!("Unable to start listening, {}", e)))?;
    let peer_topic = Topic::new(mwc_libp2p::gossipsub::PEER_TOPIC).hash();
    // Re-subscribe to any topics registered before the swarm existed.
    LIBP2P_MESSAGE_HANDLERS
        .read()
        .iter()
        .for_each(|(_topic_hash, (_fn, topic))| {
            if let Err(e) = swarm.subscribe(&topic) {
                error!("Unable initial subscribe to the topic, {:?}", e);
            }
        });
    init_libp2p_swarm(swarm);
    // Rate-limit history per integrity kernel (see validate_integrity_message).
    let mut requests_cash: HashMap<Commitment, VecDeque<i64>> = HashMap::new();
    let mut last_cash_clean = Instant::now();
    let mut last_reconnect = Instant::now();
    let secp = secp.clone();
    // Main poll loop. NOTE: the LIBP2P_SWARM lock is held for the whole poll
    // callback, including handler dispatch — other code must not take that
    // lock while holding MESSAGING_TOPICS or LIBP2P_MESSAGE_HANDLERS.
    task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
        let mut swarm = LIBP2P_SWARM.lock();
        match &mut *swarm {
            Some(swarm) => {
                // Drain all ready gossip events.
                loop {
                    let event = swarm.poll_next_unpin(cx);
                    match event {
                        Poll::Ready(Some(gossip_event)) => match gossip_event {
                            GossipsubEvent::Message {
                                propagation_source: peer_id,
                                message_id: id,
                                message,
                            } => {
                                debug!(
                                    "Get libp2p message from {}, with ID {}, topic {}, data: {}",
                                    peer_id,
                                    id,
                                    message.topic,
                                    String::from_utf8_lossy(&read_message_data(&message.data))
                                        .to_string(),
                                );
                                if message.topic == peer_topic {
                                    // Peer-exchange message: only accepted from
                                    // directly connected peers.
                                    if !Swarm::is_connected(&swarm, &peer_id) {
                                        error!(
                                            "Get topic from nodes that we are not connected to."
                                        );
                                        let gossip = swarm.get_behaviour();
                                        let _ = gossip.report_message_validation_result(
                                            &id,
                                            &peer_id,
                                            MessageAcceptance::Reject,
                                        );
                                        gossip.disconnect_peer(peer_id, true);
                                        continue;
                                    } else {
                                        // Ignore (don't propagate) but keep
                                        // processing the peer list locally.
                                        let gossip = swarm.get_behaviour();
                                        if let Err(e) = gossip.report_message_validation_result(
                                            &id,
                                            &peer_id,
                                            MessageAcceptance::Ignore,
                                        ) {
                                            error!("report_message_validation_result failed for error {}", e);
                                        }
                                    }
                                    // Decode the peer list: version, count,
                                    // then count peer-id byte vectors.
                                    let mut serializer = SimplePopSerializer::new(&message.data);
                                    if serializer.version != 1 {
                                        warn!("Get peer info data of unexpected version. Probably your client need to be upgraded");
                                        continue;
                                    }
                                    let sz = serializer.pop_u16() as usize;
                                    if sz > gossipsub::PEER_EXCHANGE_NUMBER_LIMIT {
                                        warn!("Get too many peers from {}", peer_id);
                                        let gossip = swarm.get_behaviour();
                                        gossip.disconnect_peer(peer_id, true);
                                        continue;
                                    }
                                    let mut peer_arr = vec![];
                                    for _i in 0..sz {
                                        let peer_data = serializer.pop_vec();
                                        match PeerId::from_bytes(&peer_data) {
                                            Ok(peer) => match peer.as_onion_address() {
                                                Ok(addr) => peer_arr.push(addr),
                                                Err(e) => {
                                                    error!("Get from libp2p peer without Dalek PK {}, {}", peer, e);
                                                    continue;
                                                }
                                            },
                                            Err(e) => {
                                                warn!("Unable to decode the libp2p peer form the peer update message, {}", e);
                                                continue;
                                            }
                                        }
                                    }
                                    info!("Get {} peers from {}. Will process them later when we will need to increase connection number", peer_arr.len(), peer_id);
                                    // Store the reported peers under the
                                    // sender's onion address for later dialing.
                                    if let Ok(addr) = peer_id.as_onion_address() {
                                        let mut new_peers_list = LIBP2P_PEERS.write();
                                        (*new_peers_list).insert(
                                            addr,
                                            (peer_arr, Utc::now().timestamp() as u64),
                                        );
                                    } else {
                                        error!(
                                            "Internal Error. Getting peer without onion address {}",
                                            peer_id
                                        );
                                    }
                                } else {
                                    // Regular topic: validate the integrity
                                    // fee, then dispatch to the registered
                                    // handler; its verdict decides acceptance.
                                    let gossip = swarm.get_behaviour();
                                    let acceptance = match validate_integrity_message(
                                        &peer_id,
                                        &message.data,
                                        kernel_validation_fn.clone(),
                                        &mut requests_cash,
                                        fee_base,
                                        &secp,
                                    ) {
                                        Ok((integrity_fee, sender_address)) => {
                                            if integrity_fee > 0 {
                                                let mut acceptance = MessageAcceptance::Accept;
                                                if let Some((handler, _topic)) =
                                                    LIBP2P_MESSAGE_HANDLERS
                                                        .read()
                                                        .get(&message.topic)
                                                {
                                                    if !(handler)(
                                                        &sender_address,
                                                        &message.topic,
                                                        read_message_data(&message.data),
                                                        integrity_fee,
                                                    ) {
                                                        acceptance = MessageAcceptance::Reject;
                                                    }
                                                }
                                                acceptance
                                            } else {
                                                // Zero fee means validation
                                                // failed: reject.
                                                MessageAcceptance::Reject
                                            }
                                        }
                                        Err(e) => {
                                            warn!("Message is skipped, Unable to verify the message because of some error. {:?}", e);
                                            MessageAcceptance::Ignore
                                        }
                                    };
                                    debug!("report_message_validation_result as {:?}", acceptance);
                                    let _ = gossip.report_message_validation_result(
                                        &id, &peer_id, acceptance,
                                    );
                                }
                            }
                            _ => {}
                        },
                        Poll::Ready(None) | Poll::Pending => {
                            break;
                        }
                    }
                }
                // Housekeeping 1: every 10 minutes drop rate-limit histories
                // whose latest call is older than the full history window.
                let history_time_limit = Utc::now().timestamp()
                    - INTEGRITY_CALL_HISTORY_LEN_LIMIT as i64 * INTEGRITY_CALL_MAX_PERIOD;
                let now = Instant::now();
                if last_cash_clean + Duration::from_secs(600) < now {
                    last_cash_clean = now;
                    requests_cash.retain(|_commit, history| {
                        *history.back().unwrap_or(&0) > history_time_limit
                    });
                }
                // Housekeeping 2: every 14 seconds check connectivity and dial
                // a new peer if we are below the target connection count.
                if last_reconnect + Duration::from_secs(14) < now {
                    last_reconnect = now;
                    let nw_info: NetworkInfo = Swarm::network_info(&swarm);
                    let mut rng = rand::thread_rng();
                    debug!(
                        "Processing libp2p reconnection task. Has connections: {}, {:?}",
                        nw_info.connection_counters().num_connections(),
                        nw_info.connection_counters()
                    );
                    // Too many stuck pending dials: restart the whole engine.
                    if nw_info.connection_counters().num_pending_outgoing() > 100 {
                        info!("Restarting libp2p engine...");
                        return Poll::Ready(());
                    }
                    if nw_info.connection_counters().num_connections()
                        < connections_number_low as u32
                    {
                        // Pick a random known peer address we are not already
                        // connected/dialing to, and which is not ourselves.
                        let mut address_to_connect: Option<Multiaddr> = None;
                        loop {
                            let mut libp2p_peers = LIBP2P_PEERS.write();
                            let peers: Vec<String> = libp2p_peers.keys().cloned().collect();
                            if let Some(peer_id) = peers.choose(&mut rng) {
                                if let Some(peers) = libp2p_peers.get_mut(peer_id) {
                                    if !peers.0.is_empty() {
                                        // Remove the candidate so it is tried
                                        // at most once.
                                        let tor_address =
                                            peers.0.remove(rng.gen::<usize>() % peers.0.len());
                                        let res: Result<OnionV3Address, OnionV3AddressError> =
                                            tor_address.as_str().try_into();
                                        let p = match res {
                                            Ok(onion_addr) => match onion_addr.to_ed25519() {
                                                Ok(pk) => PeerId::from_public_key(
                                                    mwc_libp2p::identity::PublicKey::Ed25519(
                                                        mwc_libp2p::identity::ed25519::PublicKey(
                                                            pk,
                                                        ),
                                                    ),
                                                ),
                                                Err(e) => {
                                                    error!("Unable to build PeerId form onion address {}, {}", tor_address, e);
                                                    continue;
                                                }
                                            },
                                            Err(e) => {
                                                error!("Unable to build PeerId form onion address {}, {}", tor_address, e);
                                                continue;
                                            }
                                        };
                                        if Swarm::is_connected(&swarm, &p)
                                            || Swarm::is_dialing(&swarm, &p) || p == this_peer_id
                                        {
                                            continue;
                                        }
                                        let address = match p.get_address() {
                                            Ok(addr) => addr,
                                            Err(e) => {
                                                warn!("Unable to get peer address to connect . Will skip it, {}", e);
                                                continue;
                                            }
                                        };
                                        let multiaddress = format!(
                                            "/onion3/{}:{}",
                                            address,
                                            global::get_tor_libp2p_port()
                                        );
                                        match multiaddress.parse::<Multiaddr>() {
                                            Ok(addr) => {
                                                address_to_connect = Some(addr);
                                                break;
                                            }
                                            Err(e) => {
                                                warn!("Unable to construct onion multiaddress from {} the peer address. Will skip it, {}", multiaddress, e);
                                                continue;
                                            }
                                        }
                                    } else {
                                        // No addresses left for this source:
                                        // forget it.
                                        libp2p_peers.remove(peer_id);
                                        continue;
                                    }
                                }
                                continue;
                            } else {
                                // Known-peers map is empty.
                                break;
                            }
                        }
                        // Nothing to dial and fully disconnected: fall back to
                        // the stored seed list.
                        if address_to_connect.is_none()
                            && nw_info.connection_counters().num_connections() == 0
                        {
                            info!("Retry connect to libp2p seeds peers...");
                            let seed_list = SEED_LIST.read().clone();
                            set_seed_list(&seed_list, false);
                        }
                        if let Some(addr) = address_to_connect {
                            match Swarm::dial_addr(swarm, addr.clone()) {
                                Ok(_) => {
                                    info!("Dialling to a new peer {}", addr);
                                }
                                Err(con_limit) => {
                                    error!("Unable deal to a new peer. Connected to {} peers, connection limit {}", con_limit.current, con_limit.limit);
                                }
                            }
                        }
                    }
                }
            }
            None => (),
        };
        // A stop value of 0 requests shutdown of the polling task.
        if *stop_mutex.lock().unwrap() == 0 {
            info!("Exiting libp2p polling task");
            Poll::Ready(())
        } else {
            Poll::Pending as Poll<()>
        }
    }));
    reset_libp2p_swarm();
    Ok(())
}
/// Validates the integrity payment attached to a gossip message.
///
/// Message layout (see build_integrity_message): version, integrity kernel
/// excess, sender's Dalek public key, signature, payload. Validation steps:
/// version check; recover a pubkey from the kernel excess; verify the
/// signature over the hash of the sender's public key against that pubkey;
/// confirm the kernel exists on-chain via `output_validation_fn`; require the
/// kernel fee to be at least `fee_base * INTEGRITY_FEE_MIN_X`; finally
/// rate-limit by tracking call timestamps per kernel excess in
/// `requests_cash`.
///
/// Returns `Ok((integrity_fee, sender_onion_address))` on success and
/// `Ok((0, ""))` on any validation failure; `Err` only propagates from the
/// on-chain lookup callback.
pub fn validate_integrity_message(
    peer_id: &PeerId,
    message: &Vec<u8>,
    output_validation_fn: Arc<impl Fn(&Commitment) -> Result<Option<TxKernel>, Error>>,
    requests_cash: &mut HashMap<Commitment, VecDeque<i64>>,
    fee_base: u64,
    secp: &Secp256k1,
) -> Result<(u64, String), Error> {
    let mut ser = SimplePopSerializer::new(message);
    // Wrong network/version: silently treated as invalid (fee 0).
    if ser.version != get_message_version() {
        debug!(
            "Get message with invalid version {} from peer {}",
            ser.version, peer_id
        );
        return Ok((0, String::new()));
    }
    // Field 1: the kernel excess commitment that paid the integrity fee.
    let integrity_kernel_excess = Commitment::from_vec(ser.pop_vec());
    let integrity_pk = match integrity_kernel_excess.to_pubkey(secp) {
        Ok(pk) => pk,
        Err(e) => {
            debug!(
                "Get invalid message from peer {}. integrity_kernel is not valid, {}",
                peer_id, e
            );
            return Ok((0, String::new()));
        }
    };
    // Field 2: the sender's Dalek (ed25519) public key.
    let sender_address_pk = match DalekPublicKey::from_bytes(&ser.pop_vec()) {
        Ok(pk) => pk,
        Err(e) => {
            debug!(
                "Get invalid message from peer {}. Unable to decode sender address PK, {}",
                peer_id, e
            );
            return Ok((0, String::new()));
        }
    };
    // The signed message is the hash of the sender's public key, binding the
    // kernel (fee payment) to this particular sender identity.
    let msg_hash = Hash::from_vec(&sender_address_pk.to_bytes());
    let msg_message = match Message::from_slice(msg_hash.as_bytes()) {
        Ok(m) => m,
        Err(e) => {
            debug!(
                "Get invalid message from peer {}. Unable to build a message, {}",
                peer_id, e
            );
            return Ok((0, String::new()));
        }
    };
    let sender_address = PeerId::onion_v3_from_pubkey(&sender_address_pk);
    // Field 3: signature over msg_message made with the kernel excess key.
    let signature = match Signature::from_compact(&secp, &ser.pop_vec()) {
        Ok(s) => s,
        Err(e) => {
            debug!(
                "Get invalid message from peer {}. Unable to read signature, {}",
                peer_id, e
            );
            return Ok((0, String::new()));
        }
    };
    match aggsig::verify_completed_sig(
        secp,
        &signature,
        &integrity_pk,
        Some(&integrity_pk),
        &msg_message,
    ) {
        Ok(()) => (),
        Err(e) => {
            debug!(
                "Get invalid message from peer {}. Integrity kernel signature is invalid, {}",
                peer_id, e
            );
            return Ok((0, String::new()));
        }
    }
    // The kernel must actually exist on the chain; this is the only step that
    // can return Err (callback failure), everything else degrades to fee 0.
    let integrity_kernel = match (output_validation_fn)(&integrity_kernel_excess)? {
        Some(r) => r.clone(),
        None => {
            debug!(
                "Get invalid message from peer {}. integrity_kernel {} is not found at the blockchain",
                peer_id, integrity_kernel_excess.to_hex()
            );
            return Ok((0, String::new()));
        }
    };
    let integrity_fee = integrity_kernel.features.get_fee_pessimistic();
    // Fee must meet the minimal multiple of the base fee.
    if integrity_fee < fee_base * INTEGRITY_FEE_MIN_X {
        debug!(
            "Get invalid message from peer {}. integrity_kernel fee is below minimal level of 10X accepted base fee",
            peer_id
        );
        return Ok((0, String::new()));
    }
    // Rate limiting: record this call's timestamp in the bounded per-kernel
    // history.
    let now = Utc::now().timestamp();
    match requests_cash.get_mut(&integrity_kernel_excess) {
        Some(calls) => {
            calls.push_back(now);
            while calls.len() > INTEGRITY_CALL_HISTORY_LEN_LIMIT {
                calls.pop_front();
            }
        }
        None => {
            let mut calls: VecDeque<i64> = VecDeque::new();
            calls.push_back(now);
            requests_cash.insert(integrity_kernel_excess.clone(), calls);
        }
    }
    // Once the history is full, reject when the average period between calls
    // for this kernel falls below the allowed minimum.
    let call_history = requests_cash.get(&integrity_kernel_excess).unwrap();
    if call_history.len() >= INTEGRITY_CALL_HISTORY_LEN_LIMIT {
        let call_period = (call_history.back().unwrap() - call_history.front().unwrap())
            / (call_history.len() - 1) as i64;
        if call_period < INTEGRITY_CALL_MAX_PERIOD {
            debug!(
                "Get invalid message from peer {}. Message sending period is {}, limit {}",
                peer_id, call_period, INTEGRITY_CALL_MAX_PERIOD
            );
            return Ok((0, String::new()));
        }
    }
    debug!(
        "Validated the message from peer {} with integrity fee {}, sender address {}",
        peer_id, integrity_fee, sender_address
    );
    return Ok((integrity_fee, sender_address));
}
/// Extracts the payload from an integrity message, skipping the kernel
/// excess, sender public key and signature fields. Returns an empty vector
/// when the message version does not match this network.
pub fn read_message_data(message: &Vec<u8>) -> Vec<u8> {
    let mut reader = SimplePopSerializer::new(message);
    if reader.version != get_message_version() {
        return Vec::new();
    }
    // Fields 1-3 (excess, sender PK, signature) precede the payload.
    for _ in 0..3 {
        reader.skip_vec();
    }
    reader.pop_vec()
}
/// Serializes an integrity message: version header, then kernel excess,
/// sender Tor public key, compact signature and the payload.
///
/// The field order must match validate_integrity_message and
/// read_message_data, which pop the fields in exactly this sequence.
pub fn build_integrity_message(
    kernel_excess: &Commitment,
    tor_pk: &DalekPublicKey,
    signature: &Signature,
    message_data: &[u8],
    secp: &Secp256k1,
) -> Result<Vec<u8>, Error> {
    let mut serializer = SimplePushSerializer::new(get_message_version());
    serializer.push_vec(&kernel_excess.0);
    serializer.push_vec(tor_pk.as_bytes());
    serializer.push_vec(&signature.serialize_compact(secp));
    serializer.push_vec(message_data);
    Ok(serializer.to_vec())
}
/// End-to-end check of build_integrity_message / validate_integrity_message /
/// read_message_data against a fixed key/kernel/signature fixture.
/// Ignored by default — presumably because the fixture depends on network
/// parameters; TODO confirm.
#[test]
#[ignore]
fn test_integrity() -> Result<(), Error> {
    use mwc_core::core::KernelFeatures;
    use mwc_util::from_hex;
    use mwc_util::secp::ContextFlag;
    // Fixture: a peer id carrying a Dalek PK and an onion address suffix.
    let peer_id = PeerId::from_bytes( &from_hex("000100220020720661bf2f0d7c81c2980db83bb973be2816cf5a0da2da9aacd0ad47d534215c001c2f6f6e696f6e332f776861745f657665725f616464726573733a3737").unwrap() ).unwrap();
    let peer_pk = peer_id.as_dalek_pubkey().unwrap();
    let onion_address = PeerId::onion_v3_from_pubkey(&peer_pk);
    let secp = Secp256k1::with_caps(ContextFlag::Full);
    // Fixture kernel excess and a matching signature over the peer PK hash.
    let integrity_kernel = Commitment::from_vec(
        from_hex("08a8f99853d65cee63c973a78a005f4646b777262440a8bfa090694a339a388865").unwrap(),
    );
    let integrity_signature = Signature::from_compact(&secp, &from_hex("102a84ec71494d69c1b4cca181b7715beea1ebd0822efb4d6440a0f2be75119b56270affac659214c27903347676c27063dc7f5f2f0c6a8441cab73d16aa7ebe").unwrap()).unwrap();
    let message: Vec<u8> = vec![1, 2, 3, 4, 3, 2, 1];
    let encoded_message = build_integrity_message(
        &integrity_kernel,
        &peer_pk,
        &integrity_signature,
        &message,
        &secp,
    )
    .unwrap();
    let mut requests_cache: HashMap<Commitment, VecDeque<i64>> = HashMap::new();
    // Validation callback that never finds the kernel on-chain.
    let empty_output_validation_fn =
        |_commit: &Commitment| -> Result<Option<TxKernel>, Error> { Ok(None) };
    let empty_output_validation_fn = Arc::new(empty_output_validation_fn);
    let fee_base: u64 = 1_000_000;
    // Callback that knows about our fixture kernel with a 10x base fee.
    let mut valid_kernels = HashMap::<Commitment, TxKernel>::new();
    let paid_integrity_fee = fee_base * 10;
    valid_kernels.insert(
        integrity_kernel,
        TxKernel::with_features(KernelFeatures::Plain {
            fee: paid_integrity_fee
                .try_into()
                .expect("Failed to convert the paid_integrity_fee"),
        }),
    );
    let output_validation_fn = |commit: &Commitment| -> Result<Option<TxKernel>, Error> {
        Ok(valid_kernels.get(commit).cloned())
    };
    let output_validation_fn = Arc::new(output_validation_fn);
    // Kernel not found on-chain -> fee 0 and no rate-limit entry recorded.
    assert_eq!(
        validate_integrity_message(
            &peer_id,
            &encoded_message,
            empty_output_validation_fn.clone(),
            &mut requests_cache,
            fee_base,
            &secp
        )
        .unwrap()
        .0,
        0
    );
    assert!(requests_cache.is_empty());
    // Happy path: fee and sender onion address recovered, one history entry.
    let res = validate_integrity_message(
        &peer_id,
        &encoded_message,
        output_validation_fn.clone(),
        &mut requests_cache,
        fee_base,
        &secp,
    )
    .unwrap();
    assert_eq!(res.0, paid_integrity_fee);
    assert_eq!(res.1, onion_address);
    assert!(requests_cache.len() == 1);
    assert!(requests_cache.get(&integrity_kernel).unwrap().len() == 1);
    requests_cache.clear();
    // A different (random) peer id fails validation: the message is signed
    // for the fixture peer's PK only.
    assert_eq!(
        validate_integrity_message(
            &PeerId::random(),
            &encoded_message,
            output_validation_fn.clone(),
            &mut requests_cache,
            fee_base,
            &secp
        )
        .unwrap()
        .0,
        0
    );
    assert!(requests_cache.len() == 0);
    // Fill the rate-limit history just below the limit; every call succeeds
    // and the history grows by one each time.
    for i in 0..(INTEGRITY_CALL_HISTORY_LEN_LIMIT - 1) {
        assert_eq!(
            validate_integrity_message(
                &peer_id,
                &encoded_message,
                output_validation_fn.clone(),
                &mut requests_cache,
                fee_base,
                &secp
            )
            .unwrap()
            .0,
            paid_integrity_fee
        );
        assert!(requests_cache.len() == 1);
        assert!(requests_cache.get(&integrity_kernel).unwrap().len() == i + 1);
    }
    // History full and all calls happened within the test run, so the average
    // period is below INTEGRITY_CALL_MAX_PERIOD -> rejected (fee 0). The
    // history stays capped at the limit on repeated attempts.
    assert_eq!(
        validate_integrity_message(
            &peer_id,
            &encoded_message,
            output_validation_fn.clone(),
            &mut requests_cache,
            fee_base,
            &secp
        )
        .unwrap()
        .0,
        0
    );
    assert!(
        requests_cache.get(&integrity_kernel).unwrap().len() == INTEGRITY_CALL_HISTORY_LEN_LIMIT
    );
    assert_eq!(
        validate_integrity_message(
            &peer_id,
            &encoded_message,
            output_validation_fn.clone(),
            &mut requests_cache,
            fee_base,
            &secp
        )
        .unwrap()
        .0,
        0
    );
    assert!(
        requests_cache.get(&integrity_kernel).unwrap().len() == INTEGRITY_CALL_HISTORY_LEN_LIMIT
    );
    assert_eq!(
        validate_integrity_message(
            &peer_id,
            &encoded_message,
            output_validation_fn.clone(),
            &mut requests_cache,
            fee_base,
            &secp
        )
        .unwrap()
        .0,
        0
    );
    assert!(
        requests_cache.get(&integrity_kernel).unwrap().len() == INTEGRITY_CALL_HISTORY_LEN_LIMIT
    );
    // Round trip: the payload extracted from the encoded message matches.
    assert_eq!(read_message_data(&encoded_message), message);
    Ok(())
}