1mod message;
7mod quic;
8mod tls;
9
10pub use message::*;
11pub use quic::QuicTransport;
12
13use async_trait::async_trait;
14use pollen_clock::Timestamp;
15use pollen_types::{NodeId, Result};
16use std::net::SocketAddr;
17use tokio::sync::mpsc;
18
19#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
21pub struct Envelope {
22 pub from: NodeId,
24 pub to: NodeId,
26 pub msg_type: MessageType,
28 pub payload: bytes::Bytes,
30 pub timestamp: Timestamp,
32}
33
34impl Envelope {
35 pub fn new(
37 from: NodeId,
38 to: NodeId,
39 msg_type: MessageType,
40 payload: bytes::Bytes,
41 timestamp: Timestamp,
42 ) -> Self {
43 Self {
44 from,
45 to,
46 msg_type,
47 payload,
48 timestamp,
49 }
50 }
51
52 pub fn serialize(&self) -> Result<bytes::Bytes> {
54 bincode::serialize(self)
55 .map(bytes::Bytes::from)
56 .map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
57 }
58
59 pub fn deserialize(data: &[u8]) -> Result<Self> {
61 bincode::deserialize(data)
62 .map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
63 }
64}
65
66#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
68pub enum MessageType {
69 Ping,
71 PingAck,
72 PingReq,
73 MembershipUpdate,
74
75 CrdtDelta,
77 CrdtFullSync,
78 CrdtSyncRequest,
79
80 MerkleTreeRequest,
82 MerkleTreeResponse,
83 DataRangeRequest,
84 DataRangeResponse,
85
86 TaskClaim,
88 TaskClaimAck,
89 TaskComplete,
90}
91
92#[async_trait]
94pub trait Transport: Send + Sync + 'static {
95 async fn send(&self, to: SocketAddr, envelope: Envelope) -> Result<()>;
97
98 async fn send_recv(&self, to: SocketAddr, envelope: Envelope) -> Result<Envelope>;
100
101 fn incoming(&self) -> mpsc::Receiver<Envelope>;
103
104 fn local_addr(&self) -> SocketAddr;
106
107 fn node_id(&self) -> NodeId;
109
110 async fn shutdown(&self);
112}
113
114#[derive(Clone, Debug)]
116pub struct TransportConfig {
117 pub bind_addr: SocketAddr,
119 pub node_id: NodeId,
121 pub max_connections: usize,
123 pub idle_timeout: std::time::Duration,
125}
126
127impl Default for TransportConfig {
128 fn default() -> Self {
129 Self {
130 bind_addr: "0.0.0.0:7000".parse().unwrap(),
131 node_id: NodeId::new(),
132 max_connections: 100,
133 idle_timeout: std::time::Duration::from_secs(30),
134 }
135 }
136}
137
138impl TransportConfig {
139 pub fn new(bind_addr: SocketAddr) -> Self {
141 Self {
142 bind_addr,
143 ..Default::default()
144 }
145 }
146
147 pub fn with_node_id(mut self, node_id: NodeId) -> Self {
149 self.node_id = node_id;
150 self
151 }
152}