Skip to main content

pollen_transport/
lib.rs

1//! QUIC-based transport layer for Pollen.
2//!
3//! Provides reliable, encrypted communication between cluster nodes
4//! using the QUIC protocol (RFC 9000).
5
6mod message;
7mod quic;
8mod tls;
9
10pub use message::*;
11pub use quic::QuicTransport;
12
13use async_trait::async_trait;
14use pollen_clock::Timestamp;
15use pollen_types::{NodeId, Result};
16use std::net::SocketAddr;
17use tokio::sync::mpsc;
18
19/// Message envelope for cluster communication.
20#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
21pub struct Envelope {
22    /// Sender node ID.
23    pub from: NodeId,
24    /// Recipient node ID.
25    pub to: NodeId,
26    /// Message type.
27    pub msg_type: MessageType,
28    /// Serialized payload.
29    pub payload: bytes::Bytes,
30    /// HLC timestamp.
31    pub timestamp: Timestamp,
32}
33
34impl Envelope {
35    /// Create a new envelope.
36    pub fn new(
37        from: NodeId,
38        to: NodeId,
39        msg_type: MessageType,
40        payload: bytes::Bytes,
41        timestamp: Timestamp,
42    ) -> Self {
43        Self {
44            from,
45            to,
46            msg_type,
47            payload,
48            timestamp,
49        }
50    }
51
52    /// Serialize the envelope.
53    pub fn serialize(&self) -> Result<bytes::Bytes> {
54        bincode::serialize(self)
55            .map(bytes::Bytes::from)
56            .map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
57    }
58
59    /// Deserialize an envelope.
60    pub fn deserialize(data: &[u8]) -> Result<Self> {
61        bincode::deserialize(data)
62            .map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
63    }
64}
65
66/// Types of messages exchanged between nodes.
67#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
68pub enum MessageType {
69    // Membership messages
70    Ping,
71    PingAck,
72    PingReq,
73    MembershipUpdate,
74
75    // CRDT synchronization
76    CrdtDelta,
77    CrdtFullSync,
78    CrdtSyncRequest,
79
80    // Anti-entropy
81    MerkleTreeRequest,
82    MerkleTreeResponse,
83    DataRangeRequest,
84    DataRangeResponse,
85
86    // Task coordination
87    TaskClaim,
88    TaskClaimAck,
89    TaskComplete,
90}
91
92/// Transport abstraction for cluster communication.
93#[async_trait]
94pub trait Transport: Send + Sync + 'static {
95    /// Send a message to a specific node.
96    async fn send(&self, to: SocketAddr, envelope: Envelope) -> Result<()>;
97
98    /// Send a message and wait for a response.
99    async fn send_recv(&self, to: SocketAddr, envelope: Envelope) -> Result<Envelope>;
100
101    /// Get the receiver for incoming messages.
102    fn incoming(&self) -> mpsc::Receiver<Envelope>;
103
104    /// Get the local bind address.
105    fn local_addr(&self) -> SocketAddr;
106
107    /// Get the local node ID.
108    fn node_id(&self) -> NodeId;
109
110    /// Shutdown the transport.
111    async fn shutdown(&self);
112}
113
114/// Transport configuration.
115#[derive(Clone, Debug)]
116pub struct TransportConfig {
117    /// Address to bind for listening.
118    pub bind_addr: SocketAddr,
119    /// Node ID.
120    pub node_id: NodeId,
121    /// Maximum concurrent connections.
122    pub max_connections: usize,
123    /// Idle timeout for connections.
124    pub idle_timeout: std::time::Duration,
125}
126
127impl Default for TransportConfig {
128    fn default() -> Self {
129        Self {
130            bind_addr: "0.0.0.0:7000".parse().unwrap(),
131            node_id: NodeId::new(),
132            max_connections: 100,
133            idle_timeout: std::time::Duration::from_secs(30),
134        }
135    }
136}
137
138impl TransportConfig {
139    /// Create a config with the specified bind address.
140    pub fn new(bind_addr: SocketAddr) -> Self {
141        Self {
142            bind_addr,
143            ..Default::default()
144        }
145    }
146
147    /// Set the node ID.
148    pub fn with_node_id(mut self, node_id: NodeId) -> Self {
149        self.node_id = node_id;
150        self
151    }
152}