use std::sync::atomic::Ordering;
use async_std::sync::Arc;
use futures::{channel::mpsc::Sender, sink::SinkExt};
use log::{error, info};
use koibumi_core::{
message::{self, InvHash, Message, NetAddr, Pack, StreamNumbers},
net::SocketAddrExt,
time::Time,
};
use crate::{
connection_loop::{Context, Event, Result},
constant::{OBJECT_FUTURE_LIFETIME, OBJECT_PAST_LIFETIME},
inv_manager::Event as InvEvent,
manager::Event as BmEvent,
net::SocketAddrNode,
node_manager::{Entry as NodeEntry, Event as NodeEvent},
};
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Direction {
Incoming,
Outgoing,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum State {
Pending,
Established,
}
pub struct Connection {
broker: Sender<Event>,
addr: SocketAddrNode,
direction: Direction,
ctx: Arc<Context>,
bm_event_sender: Sender<BmEvent>,
node_sender: Sender<NodeEvent>,
inv_sender: Sender<InvEvent>,
state: State,
version_sent: bool,
version_received: bool,
verack_received: bool,
peer_version: Option<message::Version>,
}
impl Connection {
pub fn new(
broker: Sender<Event>,
addr: SocketAddrNode,
direction: Direction,
ctx: Arc<Context>,
bm_event_sender: Sender<BmEvent>,
node_sender: Sender<NodeEvent>,
inv_sender: Sender<InvEvent>,
) -> Self {
Self {
broker,
addr,
direction,
ctx,
bm_event_sender,
node_sender,
inv_sender,
state: State::Pending,
version_sent: false,
version_received: false,
verack_received: false,
peer_version: None,
}
}
pub fn direction(&self) -> Direction {
self.direction
}
pub fn ctx(&self) -> &Arc<Context> {
&self.ctx
}
pub async fn write_message<T>(&mut self, message: &T) -> Result<()>
where
T: Message + Pack,
{
let packet = message.pack(self.ctx.config().core()).unwrap();
let list = vec![packet];
self.broker
.send(Event::Write {
addr: self.addr.clone(),
list,
})
.await?;
Ok(())
}
pub fn stream_numbers(&self) -> StreamNumbers {
vec![1u32.into()].into()
}
pub fn common_stream_numbers(&self, v: &StreamNumbers) -> StreamNumbers {
self.stream_numbers().intersection(v)
}
pub async fn write_version(&mut self) -> Result<()> {
let version = message::Version::builder(
self.ctx.config().core(),
self.ctx.node_nonce(),
self.ctx.config().user_agent().clone(),
)
.stream_numbers(self.ctx.config().stream_numbers().clone())
.build();
self.write_message(&version).await?;
self.version_sent = true;
Ok(())
}
pub async fn write_version_if_not_sent(&mut self) -> Result<()> {
if self.version_sent {
return Ok(());
}
self.write_version().await?;
self.version_sent = true;
Ok(())
}
pub async fn write_verack(&mut self) -> Result<()> {
let verack = message::Verack::new();
self.write_message(&verack).await
}
pub async fn write_error(&mut self, fatal: u64, error_text: impl AsRef<[u8]>) -> Result<()> {
let error = message::Error::new(fatal.into(), error_text.as_ref().to_vec().into());
self.write_message(&error).await
}
pub async fn write_pong(&mut self) -> Result<()> {
let pong = message::Pong::new();
self.write_message(&pong).await
}
pub fn set_peer_version(&mut self, version: Option<message::Version>) {
self.peer_version = version;
}
pub async fn set_state_established(&mut self) -> Result<()> {
self.state = State::Established;
let established = self
.ctx
.established(self.direction)
.fetch_add(1, Ordering::SeqCst)
+ 1;
info!("({:?}) Established: {}", self.direction, established);
if let Err(err) = self
.broker
.send(Event::Established {
addr: self.addr.clone(),
})
.await
{
error!("{}", err);
}
self.send_bm_event_connection_counts().await;
#[allow(clippy::collapsible_if)]
if let Some(version) = &self.peer_version {
if self.direction == Direction::Outgoing {
if let Err(err) = self
.node_sender
.send(NodeEvent::ConnectionSucceeded(
self.addr.clone(),
version.user_agent().clone(),
))
.await
{
error!("{}", err);
}
} else {
if let Err(err) = self
.bm_event_sender
.send(BmEvent::Established {
addr: self.addr.clone(),
user_agent: version.user_agent().clone(),
rating: 0.into(),
})
.await
{
error!("{}", err)
}
}
}
let mut close = false;
if self.direction == Direction::Incoming {
let count = self.ctx.established(self.direction).load(Ordering::SeqCst);
if count > self.ctx.config().max_incoming_established() {
close = true;
}
}
if let Err(err) = self
.node_sender
.send(NodeEvent::Send(self.addr.clone(), close))
.await
{
error!("{}", err);
}
if !close {
if let Err(err) = self
.inv_sender
.send(InvEvent::SendBigInv {
addr: self.addr.clone(),
})
.await
{
error!("{}", err);
}
}
Ok(())
}
pub fn established(&self) -> bool {
self.state == State::Established
}
pub fn version_sent(&self) -> bool {
self.version_sent
}
pub fn version_received(&self) -> bool {
self.version_received
}
pub fn set_version_received(&mut self) {
self.version_received = true
}
pub fn verack_received(&self) -> bool {
self.verack_received
}
pub fn set_verack_received(&mut self) {
self.verack_received = true
}
pub async fn send_bm_event_connection_counts(&mut self) {
if let Err(err) = self
.bm_event_sender
.send(BmEvent::ConnectionCounts {
incoming_initiated: self
.ctx
.initiated(Direction::Incoming)
.load(Ordering::SeqCst),
incoming_connected: self
.ctx
.connected(Direction::Incoming)
.load(Ordering::SeqCst),
incoming_established: self
.ctx
.established(Direction::Incoming)
.load(Ordering::SeqCst),
outgoing_initiated: self
.ctx
.initiated(Direction::Outgoing)
.load(Ordering::SeqCst),
outgoing_connected: self
.ctx
.connected(Direction::Outgoing)
.load(Ordering::SeqCst),
outgoing_established: self
.ctx
.established(Direction::Outgoing)
.load(Ordering::SeqCst),
})
.await
{
error!("{}", err);
}
}
pub async fn add_addrs(&mut self, addrs: &[NetAddr]) {
let mut list = Vec::with_capacity(addrs.len());
for addr in addrs {
let saddr: SocketAddrExt = addr.socket_addr().clone().into();
let entry = NodeEntry::new(addr.stream_number(), saddr.into(), addr.time());
list.push(entry);
}
if let Err(err) = self.node_sender.send(NodeEvent::Add(list)).await {
error!("{}", err);
}
}
pub async fn add_inv_hashes(&mut self, list: Vec<InvHash>) {
if let Err(err) = self
.inv_sender
.send(InvEvent::Inv {
addr: self.addr.clone(),
list,
})
.await
{
error!("{}", err);
}
}
pub async fn send_objects(&mut self, list: Vec<InvHash>) {
if let Err(err) = self
.broker
.send(Event::GetObjects {
addr: self.addr.clone(),
list,
})
.await
{
error!("{}", err);
}
}
pub async fn add_object(&mut self, object: message::Object) {
let now = Time::now();
let expires_time = object.header().expires_time();
match expires_time.checked_add(OBJECT_PAST_LIFETIME) {
Some(target) => {
if now > target {
return;
}
}
None => return,
}
match now.checked_add(OBJECT_FUTURE_LIFETIME) {
Some(target) => {
if expires_time > target {
return;
}
}
None => return,
}
match object.validate_pow(self.ctx.config().core()) {
Ok(_) => {
if let Err(err) = self
.inv_sender
.send(InvEvent::Object {
addr: self.addr.clone(),
object,
})
.await
{
error!("{}", err);
}
}
Err(_) => {
if let Err(err) = self
.inv_sender
.send(InvEvent::Drop(object.inv_hash()))
.await
{
error!("{}", err);
}
}
}
}
}