use std::str::FromStr;
use async_trait::async_trait;
use rings_transport::core::transport::ConnectionCreation;
use rings_transport::core::transport::ConnectionInterface;
use crate::dht::Did;
use crate::error::Error;
use crate::error::Result;
use crate::measure::MeasureCounter;
use crate::message::ConnectNodeReport;
use crate::message::ConnectNodeSend;
use crate::message::Message;
use crate::message::MessagePayload;
use crate::message::PayloadSender;
use crate::swarm::Swarm;
use crate::types::Connection;
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ConnectionHandshake {
async fn prepare_connection_offer(&self, peer: Did) -> Result<(Connection, ConnectNodeSend)>;
async fn answer_remote_connection(
&self,
peer: Did,
offer_msg: &ConnectNodeSend,
) -> Result<(Connection, ConnectNodeReport)>;
async fn accept_remote_connection(
&self,
peer: Did,
answer_msg: &ConnectNodeReport,
) -> Result<Connection>;
async fn create_offer(&self, peer: Did) -> Result<(Connection, MessagePayload<Message>)>;
async fn answer_offer(
&self,
offer_payload: MessagePayload<Message>,
) -> Result<(Connection, MessagePayload<Message>)>;
async fn accept_answer(
&self,
answer_payload: MessagePayload<Message>,
) -> Result<(Did, Connection)>;
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ConnectionManager {
async fn disconnect(&self, did: Did) -> Result<()>;
async fn connect(&self, did: Did) -> Result<Connection>;
async fn connect_via(&self, did: Did, next_hop: Did) -> Result<Connection>;
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait Judegement {
async fn should_connect(&self, did: Did) -> bool;
async fn record_connect(&self, did: Did);
async fn record_disconnected(&self, did: Did);
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait JudgeConnection: Judegement + ConnectionManager {
async fn disconnect(&self, did: Did) -> Result<()> {
self.record_disconnected(did).await;
tracing::debug!("[JudegeConnection] Disconnected {:?}", &did);
ConnectionManager::disconnect(self, did).await
}
async fn connect(&self, did: Did) -> Result<Connection> {
if !self.should_connect(did).await {
return Err(Error::NodeBehaviourBad(did));
}
tracing::debug!("[JudgeConnection] Try Connect {:?}", &did);
self.record_connect(did).await;
ConnectionManager::connect(self, did).await
}
async fn connect_via(&self, did: Did, next_hop: Did) -> Result<Connection> {
if !self.should_connect(did).await {
return Err(Error::NodeBehaviourBad(did));
}
tracing::debug!("[JudgeConnection] Try Connect {:?}", &did);
self.record_connect(did).await;
ConnectionManager::connect_via(self, did, next_hop).await
}
}
impl Swarm {
pub async fn record_sent(&self, did: Did) {
if let Some(measure) = &self.measure {
measure.incr(did, MeasureCounter::Sent).await;
}
}
pub async fn record_sent_failed(&self, did: Did) {
if let Some(measure) = &self.measure {
measure.incr(did, MeasureCounter::FailedToSend).await;
}
}
pub async fn behaviour_good(&self, did: Did) -> bool {
if let Some(measure) = &self.measure {
measure.good(did).await
} else {
true
}
}
pub async fn new_connection(&self, did: Did) -> Result<Connection> {
let cid = did.to_string();
self.transport
.new_connection(&cid, self.callback.clone())
.await
.map_err(Error::Transport)?;
self.transport.get_connection(&cid).map_err(|e| e.into())
}
pub async fn get_and_check_connection(&self, did: Did) -> Option<Connection> {
let Some(c) = self.get_connection(did) else {
return None;
};
if c.is_connected().await {
return Some(c);
}
tracing::debug!(
"[get_and_check_connection] connection {did} is not connected, will be dropped"
);
if let Err(e) = self.disconnect(did).await {
tracing::error!("Failed on close connection {did}: {e:?}");
};
None
}
pub fn get_connection(&self, did: Did) -> Option<Connection> {
self.transport.get_connection(&did.to_string()).ok()
}
pub fn get_connections(&self) -> Vec<(Did, Connection)> {
self.transport
.get_connections()
.into_iter()
.filter_map(|(k, v)| Did::from_str(&k).ok().map(|did| (did, v)))
.collect()
}
pub fn get_connection_ids(&self) -> Vec<Did> {
self.transport
.get_connection_ids()
.into_iter()
.filter_map(|k| Did::from_str(&k).ok())
.collect()
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ConnectionHandshake for Swarm {
async fn prepare_connection_offer(&self, peer: Did) -> Result<(Connection, ConnectNodeSend)> {
if self.get_and_check_connection(peer).await.is_some() {
return Err(Error::AlreadyConnected);
};
let conn = self.new_connection(peer).await?;
let offer = conn.webrtc_create_offer().await.map_err(Error::Transport)?;
let offer_str = serde_json::to_string(&offer).map_err(|_| Error::SerializeToString)?;
let offer_msg = ConnectNodeSend { sdp: offer_str };
Ok((conn, offer_msg))
}
async fn answer_remote_connection(
&self,
peer: Did,
offer_msg: &ConnectNodeSend,
) -> Result<(Connection, ConnectNodeReport)> {
if self.get_and_check_connection(peer).await.is_some() {
return Err(Error::AlreadyConnected);
};
let offer = serde_json::from_str(&offer_msg.sdp).map_err(Error::Deserialize)?;
let conn = self.new_connection(peer).await?;
let answer = conn
.webrtc_answer_offer(offer)
.await
.map_err(Error::Transport)?;
let answer_str = serde_json::to_string(&answer).map_err(|_| Error::SerializeToString)?;
let answer_msg = ConnectNodeReport { sdp: answer_str };
Ok((conn, answer_msg))
}
async fn accept_remote_connection(
&self,
peer: Did,
answer_msg: &ConnectNodeReport,
) -> Result<Connection> {
let answer = serde_json::from_str(&answer_msg.sdp).map_err(Error::Deserialize)?;
let conn = self.get_connection(peer).ok_or(Error::ConnectionNotFound)?;
conn.webrtc_accept_answer(answer)
.await
.map_err(Error::Transport)?;
Ok(conn)
}
async fn create_offer(&self, peer: Did) -> Result<(Connection, MessagePayload<Message>)> {
let (conn, offer_msg) = self.prepare_connection_offer(peer).await?;
let payload = MessagePayload::new_send(
Message::ConnectNodeSend(offer_msg),
self.session_sk(),
self.did(),
peer,
)?;
Ok((conn, payload))
}
async fn answer_offer(
&self,
offer_payload: MessagePayload<Message>,
) -> Result<(Connection, MessagePayload<Message>)> {
if !offer_payload.verify() {
return Err(Error::VerifySignatureFailed);
}
let Message::ConnectNodeSend(msg) = offer_payload.data else {
return Err(Error::InvalidMessage(
"Should be ConnectNodeSend".to_string(),
));
};
let peer = offer_payload.relay.origin_sender();
let (conn, answer_msg) = self.answer_remote_connection(peer, &msg).await?;
let answer_payload = MessagePayload::new_send(
Message::ConnectNodeReport(answer_msg),
self.session_sk(),
self.did(),
self.did(),
)?;
Ok((conn, answer_payload))
}
async fn accept_answer(
&self,
answer_payload: MessagePayload<Message>,
) -> Result<(Did, Connection)> {
tracing::debug!("accept_answer: {:?}", answer_payload);
if !answer_payload.verify() {
return Err(Error::VerifySignatureFailed);
}
let Message::ConnectNodeReport(ref msg) = answer_payload.data else {
return Err(Error::InvalidMessage(
"Should be ConnectNodeReport".to_string(),
));
};
let peer = answer_payload.relay.origin_sender();
let conn = self.accept_remote_connection(peer, msg).await?;
Ok((peer, conn))
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ConnectionManager for Swarm {
async fn disconnect(&self, did: Did) -> Result<()> {
tracing::info!("[disconnect] removing from DHT {:?}", did);
self.dht.remove(did)?;
self.transport
.close_connection(&did.to_string())
.await
.map_err(|e| e.into())
}
async fn connect(&self, did: Did) -> Result<Connection> {
tracing::info!("Try connect Did {:?}", &did);
if let Some(t) = self.get_and_check_connection(did).await {
return Ok(t);
}
let conn = self.new_connection(did).await?;
let offer = conn.webrtc_create_offer().await.map_err(Error::Transport)?;
let offer_str = serde_json::to_string(&offer).map_err(|_| Error::SerializeToString)?;
let offer_msg = ConnectNodeSend { sdp: offer_str };
self.send_message(Message::ConnectNodeSend(offer_msg), did)
.await?;
Ok(conn)
}
async fn connect_via(&self, did: Did, next_hop: Did) -> Result<Connection> {
if let Some(t) = self.get_and_check_connection(did).await {
return Ok(t);
}
tracing::info!("Try connect Did {:?}", &did);
let conn = self.new_connection(did).await?;
let offer = conn.webrtc_create_offer().await.map_err(Error::Transport)?;
let offer_str = serde_json::to_string(&offer).map_err(|_| Error::SerializeToString)?;
let offer_msg = ConnectNodeSend { sdp: offer_str };
self.send_message_by_hop(Message::ConnectNodeSend(offer_msg), did, next_hop)
.await?;
Ok(conn)
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Judegement for Swarm {
async fn record_connect(&self, did: Did) {
if let Some(measure) = &self.measure {
tracing::info!("[Judgement] Record connect");
measure.incr(did, MeasureCounter::Connect).await;
}
}
async fn record_disconnected(&self, did: Did) {
if let Some(measure) = &self.measure {
tracing::info!("[Judgement] Record disconnected");
measure.incr(did, MeasureCounter::Disconnected).await;
}
}
async fn should_connect(&self, did: Did) -> bool {
self.behaviour_good(did).await
}
}