pollen-transport 0.1.0

QUIC-based transport layer for Pollen
Documentation
//! QUIC-based transport layer for Pollen.
//!
//! Provides reliable, encrypted communication between cluster nodes
//! using the QUIC protocol (RFC 9000).

mod message;
mod quic;
mod tls;

pub use message::*;
pub use quic::QuicTransport;

use async_trait::async_trait;
use pollen_clock::Timestamp;
use pollen_types::{NodeId, Result};
use std::net::SocketAddr;
use tokio::sync::mpsc;

/// Message envelope for cluster communication.
///
/// Bundles an already-serialized payload with the metadata that accompanies
/// it: sender, recipient, message kind, and a timestamp.
///
/// The type derives `serde::Serialize` / `serde::Deserialize`, and
/// [`Envelope::serialize`] / [`Envelope::deserialize`] encode it with
/// `bincode`.
///
/// NOTE(review): with a positional format such as bincode, the field
/// declaration order below is presumably part of the encoded form — confirm
/// peer compatibility before reordering, adding, or removing fields.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Envelope {
    /// Sender node ID.
    pub from: NodeId,
    /// Recipient node ID.
    pub to: NodeId,
    /// Message type; tells the receiver how to interpret `payload`.
    pub msg_type: MessageType,
    /// Serialized payload. The envelope treats these bytes as opaque.
    pub payload: bytes::Bytes,
    /// HLC timestamp.
    pub timestamp: Timestamp,
}

impl Envelope {
    /// Create a new envelope.
    pub fn new(
        from: NodeId,
        to: NodeId,
        msg_type: MessageType,
        payload: bytes::Bytes,
        timestamp: Timestamp,
    ) -> Self {
        Self {
            from,
            to,
            msg_type,
            payload,
            timestamp,
        }
    }

    /// Serialize the envelope.
    pub fn serialize(&self) -> Result<bytes::Bytes> {
        bincode::serialize(self)
            .map(bytes::Bytes::from)
            .map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
    }

    /// Deserialize an envelope.
    pub fn deserialize(data: &[u8]) -> Result<Self> {
        bincode::deserialize(data)
            .map_err(|e| pollen_types::PollenError::Serialization(e.to_string()))
    }
}

/// Types of messages exchanged between nodes.
///
/// Every variant is a plain tag with no payload; the data belonging to a
/// message travels separately in [`Envelope::payload`]. Because the enum is
/// fieldless it is `Copy`, so it can be passed and compared by value without
/// cloning, and `Hash`, so it can be used as a key in hash-based collections
/// (for example a per-message-type handler table).
///
/// NOTE(review): the enum is serialized through serde as part of
/// [`Envelope`]; with an index-based format such as bincode the declaration
/// order of the variants is presumably part of the encoded form. Append new
/// variants at the end rather than inserting or reordering — confirm against
/// the wire-compatibility requirements.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum MessageType {
    // Membership messages
    Ping,
    PingAck,
    PingReq,
    MembershipUpdate,

    // CRDT synchronization
    CrdtDelta,
    CrdtFullSync,
    CrdtSyncRequest,

    // Anti-entropy
    MerkleTreeRequest,
    MerkleTreeResponse,
    DataRangeRequest,
    DataRangeResponse,

    // Task coordination
    TaskClaim,
    TaskClaimAck,
    TaskComplete,
}

/// Transport abstraction for cluster communication.
///
/// Implementors move [`Envelope`]s between nodes. Peers are addressed by
/// [`SocketAddr`]; the logical sender and recipient node IDs travel inside
/// the envelope itself. The `Send + Sync + 'static` bounds allow a transport
/// to be shared across threads and spawned tasks.
///
/// The async methods are declared through `#[async_trait]`.
#[async_trait]
pub trait Transport: Send + Sync + 'static {
    /// Send a message to the node listening at `to`.
    ///
    /// Resolves once the implementation considers the send finished; no
    /// reply is awaited. Fails with the crate-wide error type.
    async fn send(&self, to: SocketAddr, envelope: Envelope) -> Result<()>;

    /// Send a message to the node listening at `to` and wait for a response.
    ///
    /// Returns the response envelope on success.
    async fn send_recv(&self, to: SocketAddr, envelope: Envelope) -> Result<Envelope>;

    /// Get the receiver for incoming messages.
    ///
    /// NOTE(review): this returns an owned `mpsc::Receiver` from `&self`, and
    /// an mpsc channel has a single consumer. Whether implementations hand
    /// out the receiver only once, or create a new channel per call, is not
    /// specified here — confirm against the implementation.
    fn incoming(&self) -> mpsc::Receiver<Envelope>;

    /// Get the local bind address.
    fn local_addr(&self) -> SocketAddr;

    /// Get the local node ID.
    fn node_id(&self) -> NodeId;

    /// Shutdown the transport.
    async fn shutdown(&self);
}

/// Transport configuration.
///
/// Construct with [`TransportConfig::new`] or [`Default::default`], then
/// adjust with the `with_*` builder methods or by assigning the public
/// fields directly.
#[derive(Clone, Debug)]
pub struct TransportConfig {
    /// Address to bind for listening. Defaults to `0.0.0.0:7000`.
    pub bind_addr: SocketAddr,
    /// Node ID. Defaults to the value returned by `NodeId::new()`.
    pub node_id: NodeId,
    /// Maximum concurrent connections. Defaults to `100`.
    // NOTE(review): how (and whether) this limit is enforced is decided by the
    // transport implementation, which is not visible here.
    pub max_connections: usize,
    /// Idle timeout for connections. Defaults to 30 seconds.
    pub idle_timeout: std::time::Duration,
}

impl Default for TransportConfig {
    fn default() -> Self {
        Self {
            bind_addr: "0.0.0.0:7000".parse().unwrap(),
            node_id: NodeId::new(),
            max_connections: 100,
            idle_timeout: std::time::Duration::from_secs(30),
        }
    }
}

impl TransportConfig {
    /// Build a configuration that listens on `bind_addr`.
    ///
    /// All other fields take their values from [`Default::default`], which
    /// includes a node ID obtained from `NodeId::new()`.
    pub fn new(bind_addr: SocketAddr) -> Self {
        let defaults = Self::default();
        Self {
            bind_addr,
            ..defaults
        }
    }

    /// Replace the node ID, returning the updated configuration.
    ///
    /// Consumes `self` so calls can be chained builder-style.
    pub fn with_node_id(self, node_id: NodeId) -> Self {
        Self { node_id, ..self }
    }
}