mod message;
mod quic;
mod tls;
pub use message::*;
pub use quic::QuicTransport;
use async_trait::async_trait;
use pollen_clock::Timestamp;
use pollen_types::{NodeId, Result};
use std::net::SocketAddr;
use tokio::sync::mpsc;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Envelope {
pub from: NodeId,
pub to: NodeId,
pub msg_type: MessageType,
pub payload: bytes::Bytes,
pub timestamp: Timestamp,
}
impl Envelope {
pub fn new(
from: NodeId,
to: NodeId,
msg_type: MessageType,
payload: bytes::Bytes,
timestamp: Timestamp,
) -> Self {
Self {
from,
to,
msg_type,
payload,
timestamp,
}
}
pub fn serialize(&self) -> Result<bytes::Bytes> {
bincode::serialize(self)
.map(bytes::Bytes::from)
.map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
}
pub fn deserialize(data: &[u8]) -> Result<Self> {
bincode::deserialize(data)
.map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum MessageType {
Ping,
PingAck,
PingReq,
MembershipUpdate,
CrdtDelta,
CrdtFullSync,
CrdtSyncRequest,
MerkleTreeRequest,
MerkleTreeResponse,
DataRangeRequest,
DataRangeResponse,
TaskClaim,
TaskClaimAck,
TaskComplete,
}
#[async_trait]
pub trait Transport: Send + Sync + 'static {
async fn send(&self, to: SocketAddr, envelope: Envelope) -> Result<()>;
async fn send_recv(&self, to: SocketAddr, envelope: Envelope) -> Result<Envelope>;
fn incoming(&self) -> mpsc::Receiver<Envelope>;
fn local_addr(&self) -> SocketAddr;
fn node_id(&self) -> NodeId;
async fn shutdown(&self);
}
#[derive(Clone, Debug)]
pub struct TransportConfig {
pub bind_addr: SocketAddr,
pub node_id: NodeId,
pub max_connections: usize,
pub idle_timeout: std::time::Duration,
}
impl Default for TransportConfig {
fn default() -> Self {
Self {
bind_addr: "0.0.0.0:7000".parse().unwrap(),
node_id: NodeId::new(),
max_connections: 100,
idle_timeout: std::time::Duration::from_secs(30),
}
}
}
impl TransportConfig {
pub fn new(bind_addr: SocketAddr) -> Self {
Self {
bind_addr,
..Default::default()
}
}
pub fn with_node_id(mut self, node_id: NodeId) -> Self {
self.node_id = node_id;
self
}
}