Features
Security
- Secure handshake and post-handshake encryption using Elliptic Curve Diffie-Hellman (ECDH) key exchange
- TLS transport with client/server implementations and mutual authentication (mTLS)
- Certificate pinning for enhanced security in TLS connections
- Self-signed certificate generation capability for development environments
- Protection against replay attacks using timestamps and nonce verification
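The replay protection listed above combines a timestamp window with nonce tracking. The following is a minimal sketch of that general technique using only the standard library. `ReplayGuard`, its fields, and the 16-byte nonce size are hypothetical names chosen for illustration; they are not taken from the crate's API.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical replay guard (illustrative only, not the crate's API).
pub struct ReplayGuard {
    /// Maximum accepted distance between a message timestamp and the local clock.
    max_skew: Duration,
    /// Recently seen nonces, mapped to the timestamp (ms) they arrived with.
    seen: HashMap<[u8; 16], u64>,
}

impl ReplayGuard {
    pub fn new(max_skew: Duration) -> Self {
        Self { max_skew, seen: HashMap::new() }
    }

    /// Returns `true` if the message is fresh and its nonce is new.
    /// `now_ms` is the local clock in milliseconds (for example, derived
    /// from `SystemTime::now()`); it is a parameter so the check is testable.
    pub fn check(&mut self, nonce: [u8; 16], timestamp_ms: u64, now_ms: u64) -> bool {
        let skew_ms = self.max_skew.as_millis() as u64;

        // 1. Reject messages whose timestamp falls outside the window.
        if now_ms.abs_diff(timestamp_ms) > skew_ms {
            return false;
        }

        // 2. Forget nonces so old that a replay would fail step 1 anyway.
        //    This keeps the map bounded by the traffic within one window.
        self.seen.retain(|_, ts| now_ms.abs_diff(*ts) <= skew_ms);

        // 3. Reject a nonce that was already accepted inside the window.
        if self.seen.contains_key(&nonce) {
            return false;
        }
        self.seen.insert(nonce, timestamp_ms);
        true
    }
}
```

The sketch assumes the timestamp and nonce are covered by the message's authentication, so an attacker cannot rewrite them to slip a replay past the window check.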
Performance & Reliability
- Advanced backpressure mechanism to prevent server overload from slow clients
- Bounded channels with dynamic read pausing to maintain stable memory usage
- Configurable connection timeouts for all network operations with proper error handling
- Heartbeat mechanism with keep-alive ping/pong messages for connection health monitoring
- Automatic detection and cleanup of dead connections
- Client-side timeout handling with reconnection capabilities
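As a rough illustration of the bounded-channel backpressure idea above, the sketch below uses the standard library's `sync_channel` rather than the crate's own async machinery. The `Offer` enum and `offer` function are hypothetical names; the point is only that a full queue is reported to the reader so it can stop pulling data off the socket instead of buffering without limit.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Outcome of offering one inbound frame to a bounded queue.
/// (Hypothetical type for illustration.)
#[derive(Debug, PartialEq)]
pub enum Offer {
    /// The frame was queued for the handler.
    Queued,
    /// The queue is full: the reader should pause reads and retry this frame later.
    PauseReads(Vec<u8>),
    /// The consumer is gone: the connection should be closed.
    Closed,
}

/// Try to enqueue a frame without blocking.
pub fn offer(tx: &SyncSender<Vec<u8>>, frame: Vec<u8>) -> Offer {
    match tx.try_send(frame) {
        Ok(()) => Offer::Queued,
        // Hand the frame back so the caller does not lose it while paused.
        Err(TrySendError::Full(frame)) => Offer::PauseReads(frame),
        Err(TrySendError::Disconnected(_)) => Offer::Closed,
    }
}
```

With a queue created by `sync_channel(2)`, the third `offer` call returns `PauseReads` until the consumer receives an item, which is what keeps memory usage bounded when a peer is slow.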
Core Architecture
- Custom binary packet format with optional compression (LZ4, Zstd)
- Plugin-friendly dispatcher for message routing with zero-copy serialization
- Graceful shutdown support for all server implementations with configurable timeouts
- Modular transport: TCP, Unix socket, TLS, cluster sync
- Comprehensive configuration system with TOML files and environment variable overrides
- Structured logging with flexible log level control via configuration
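To make the idea of a custom binary packet format with a compression flag more concrete, here is a small sketch of one possible framing. The layout (4-byte magic, version byte, compression byte, big-endian `u32` length, payload), the `NPRO` magic value, and all type names are assumptions made for this example; the crate's actual wire format is not described in this document and may differ. No real compression is performed; the flag only records which codec a payload would use.

```rust
/// Hypothetical magic bytes and header size for this sketch.
const MAGIC: [u8; 4] = *b"NPRO";
const HEADER_LEN: usize = 10; // magic(4) + version(1) + compression(1) + length(4)

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Compression { None = 0, Lz4 = 1, Zstd = 2 }

#[derive(Debug, PartialEq)]
pub struct Packet {
    pub version: u8,
    pub compression: Compression,
    pub payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum DecodeError { TooShort, BadMagic, UnknownCompression(u8), LengthMismatch }

impl Packet {
    /// Serialize header + payload. Payloads larger than `u32::MAX` bytes
    /// are not handled in this sketch.
    pub fn encode(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(HEADER_LEN + self.payload.len());
        out.extend_from_slice(&MAGIC);
        out.push(self.version);
        out.push(self.compression as u8);
        out.extend_from_slice(&(self.payload.len() as u32).to_be_bytes());
        out.extend_from_slice(&self.payload);
        out
    }

    /// Parse exactly one packet from `buf`, validating every header field.
    pub fn decode(buf: &[u8]) -> Result<Packet, DecodeError> {
        if buf.len() < HEADER_LEN {
            return Err(DecodeError::TooShort);
        }
        if buf[..4] != MAGIC[..] {
            return Err(DecodeError::BadMagic);
        }
        let compression = match buf[5] {
            0 => Compression::None,
            1 => Compression::Lz4,
            2 => Compression::Zstd,
            other => return Err(DecodeError::UnknownCompression(other)),
        };
        let len = u32::from_be_bytes([buf[6], buf[7], buf[8], buf[9]]) as usize;
        if buf.len() - HEADER_LEN != len {
            return Err(DecodeError::LengthMismatch);
        }
        Ok(Packet { version: buf[4], compression, payload: buf[HEADER_LEN..].to_vec() })
    }
}
```

An explicit length field lets a stream reader know how many bytes to wait for before handing a frame to the dispatcher, and the magic bytes give a cheap early rejection of traffic that is not this protocol.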
Compatibility
- Cross-platform support for local transport (Windows, Linux, macOS)
- Windows-compatible alternative for Unix Domain Sockets
- Ready for microservices, databases, daemons, and system protocols
Installation
Add the library to your Cargo.toml:
[dependencies]
network-protocol = "1.0.0"
Example Usage
TCP Server with Backpressure and Structured Logging
use network_protocol::utils::logging;
use network_protocol::service::daemon::{self, ServerConfig};
use network_protocol::protocol::dispatcher::Dispatcher;
use network_protocol::error::Result;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize structured logging at the "info" level.
    logging::init_logging(Some("info"), None).expect("Failed to initialize logging");

    // Register an echo handler that returns the incoming message unchanged.
    let dispatcher = Arc::new(Dispatcher::default());
    dispatcher.register("ECHO", |msg| {
        info!(message_type = "ECHO", "Processing echo request");
        Ok(msg.clone())
    });

    let config = ServerConfig {
        address: "127.0.0.1:9000".to_string(),
        backpressure_limit: 100,
        connection_timeout: Duration::from_secs(30),
        heartbeat_interval: Duration::from_secs(15),
        shutdown_timeout: Duration::from_secs(10),
        max_connections: 1000,
    };

    // Share the server so both the shutdown task and `run` can use it.
    // This assumes `shutdown` and `run` take `&self`.
    let server = Arc::new(daemon::new_with_config(config, dispatcher));
    let shutdown_handle = Arc::clone(&server);

    // Trigger a graceful shutdown on Ctrl+C.
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
        info!("Initiating graceful shutdown...");
        shutdown_handle.shutdown(Some(Duration::from_secs(10))).await;
    });

    info!("Server starting on 127.0.0.1:9000");
    server.run().await
}
TLS Server
use network_protocol::error::Result;
// `TlsConfig` must also be in scope; its module path is not shown here.

#[tokio::main]
async fn main() -> Result<()> {
    let cert_config = TlsConfig {
        cert_path: "server_cert.pem",
        key_path: "server_key.pem",
        ca_path: Some("ca_cert.pem"),
        verify_client: true,
    };

    network_protocol::service::tls_daemon::start("127.0.0.1:9443", cert_config).await?;
    Ok(())
}
Client with Timeout Handling
use network_protocol::utils::logging;
use network_protocol::service::client::{self, ClientConfig};
use network_protocol::protocol::message::Message;
use network_protocol::error::ProtocolError;
use std::time::Duration;
use tracing::{info, error};
use tokio::time::timeout;

#[tokio::main]
async fn main() -> Result<(), ProtocolError> {
    logging::init_logging(Some("info"), None)?;

    let config = ClientConfig {
        address: "127.0.0.1:9000".to_string(),
        connection_timeout: Duration::from_secs(5),
        operation_timeout: Duration::from_secs(3),
        response_timeout: Duration::from_secs(30),
        heartbeat_interval: Duration::from_secs(15),
        auto_reconnect: true,
        max_reconnect_attempts: 3,
        reconnect_delay: Duration::from_secs(1),
    };

    // Connect, distinguishing a protocol error from a timeout.
    info!("Connecting to server...");
    let mut conn = match timeout(Duration::from_secs(5), client::connect_with_config(config)).await {
        Ok(Ok(conn)) => conn,
        Ok(Err(e)) => {
            error!(error = ?e, "Failed to connect to server");
            return Err(e);
        }
        Err(_) => {
            error!("Connection timeout");
            return Err(ProtocolError::Timeout);
        }
    };
    info!("Connected successfully");

    // Send a message, bounded by a 3-second timeout.
    match timeout(Duration::from_secs(3), conn.secure_send(Message::Echo("hello".into()))).await {
        Ok(Ok(_)) => info!("Message sent successfully"),
        Ok(Err(e)) => {
            error!(error = ?e, "Failed to send message");
            return Err(e);
        }
        Err(_) => {
            error!("Send timeout");
            return Err(ProtocolError::Timeout);
        }
    }

    // Wait for the reply, bounded by a 3-second timeout.
    let reply = match timeout(Duration::from_secs(3), conn.secure_recv()).await {
        Ok(Ok(msg)) => msg,
        Ok(Err(e)) => {
            error!(error = ?e, "Failed to receive reply");
            return Err(e);
        }
        Err(_) => {
            error!("Receive timeout");
            return Err(ProtocolError::Timeout);
        }
    };
    info!(reply = ?reply, "Received reply");

    conn.close().await?;
    Ok(())
}
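The `auto_reconnect`, `max_reconnect_attempts`, and `reconnect_delay` fields in the client configuration suggest a bounded retry policy. How the crate implements it is not shown here; the sketch below illustrates the general pattern in synchronous, standard-library-only Rust. `connect_with_retry` and its parameters are hypothetical names for this example.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Call `connect` up to `max_attempts` times, sleeping `delay` between
/// failures. Returns the first success, or the last error once the
/// attempts are exhausted. (Hypothetical helper, not the crate's API.)
pub fn connect_with_retry<T, E>(
    max_attempts: u32,
    delay: Duration,
    mut connect: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match connect(attempt) {
            Ok(conn) => return Ok(conn),
            // Out of attempts: surface the most recent error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            // Otherwise wait, then try again.
            Err(_) => {
                sleep(delay);
                attempt += 1;
            }
        }
    }
}
```

In an async client the same loop shape applies, with `tokio::time::sleep(delay).await` in place of the blocking `sleep`. A fixed delay is the simplest policy; exponential backoff is a common alternative when many clients may reconnect at once.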
TLS Client
use network_protocol::service::client::{self, TlsClientConfig};
use network_protocol::protocol::message::Message;
use network_protocol::error::Result;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
    let tls_config = TlsClientConfig {
        cert_path: Some("client_cert.pem"),
        key_path: Some("client_key.pem"),
        ca_path: Some("ca_cert.pem"),
        server_name: "example.com",
    };

    let mut conn = client::connect_tls("127.0.0.1:9443", tls_config).await?;
    info!("Connected securely to TLS server");

    conn.send(Message::Echo("secure message".into())).await?;
    let reply = conn.receive().await?;
    info!(response = ?reply, "Received secure response");

    conn.close().await?;
    Ok(())
}
Message Types
Built-in messages include:
HandshakeInit / HandshakeAck
Ping / Pong
Echo(String)
Unknown
You can extend this list with your own enums or handlers.
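The built-in `Ping` / `Pong` pair backs the heartbeat feature. As a rough sketch of how a peer might decide that a connection is dead, the example below tracks the time of the last `Pong` and compares it against a number of missed intervals. The `Heartbeat` type and the `max_missed` policy are assumptions for illustration; the crate's own detection logic is not described in this document.

```rust
use std::time::{Duration, Instant};

/// Hypothetical liveness tracker for one connection.
pub struct Heartbeat {
    /// How often a Ping is sent.
    interval: Duration,
    /// How many intervals may pass without a Pong before giving up.
    max_missed: u32,
    /// When the most recent Pong arrived.
    last_pong: Instant,
}

impl Heartbeat {
    pub fn new(interval: Duration, max_missed: u32, now: Instant) -> Self {
        Self { interval, max_missed, last_pong: now }
    }

    /// Record that a Pong arrived at `now`.
    pub fn on_pong(&mut self, now: Instant) {
        self.last_pong = now;
    }

    /// True once more than `max_missed` intervals have elapsed since the
    /// last Pong, at which point the connection can be cleaned up.
    pub fn is_dead(&self, now: Instant) -> bool {
        now.duration_since(self.last_pong) > self.interval * self.max_missed
    }
}
```

Passing `now` explicitly keeps the tracker deterministic in tests; production code would typically call `Instant::now()` at each check.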
Custom Message Handlers
Register your own handlers with the dispatcher to process different message types:
use network_protocol::protocol::dispatcher::Dispatcher;
use network_protocol::protocol::message::Message;
use network_protocol::error::Result;
use std::sync::Arc;
use tracing::info;

let dispatcher = Arc::new(Dispatcher::default());

// Reply to every ping with a pong.
dispatcher.register("PING", |_| {
    info!("Ping received, sending pong");
    Ok(Message::Pong)
});

// Echo the message back unchanged.
dispatcher.register("ECHO", |msg| {
    info!(content = ?msg, "Echo request received");
    Ok(msg.clone())
});

// Handle a custom binary payload; the reply depends on the payload size.
dispatcher.register("DATA_PROCESS", |msg| {
    if let Message::Custom(data) = msg {
        info!(bytes = data.len(), "Processing custom data");
        if data.len() > 100 {
            Ok(Message::Custom(vec![1, 0, 1]))
        } else {
            Ok(Message::Custom(vec![0, 0, 1]))
        }
    } else {
        info!("Received incorrect message type for DATA_PROCESS");
        Ok(Message::Unknown)
    }
});
The dispatcher automatically routes incoming messages based on their message_type(). You can register handlers for both built-in message types and your own custom message types.
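To illustrate routing by `message_type()`, here is a self-contained sketch of a string-keyed dispatcher. The `Message` enum, the `"CUSTOM"` type string, the `String` error type, and the `dispatch` method are stand-ins written for this example; they mirror the shape of the API used above but are not the crate's actual definitions.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Stand-in message enum for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Message { Ping, Pong, Echo(String), Custom(Vec<u8>), Unknown }

impl Message {
    /// The routing key for each variant (the strings are assumptions).
    pub fn message_type(&self) -> &'static str {
        match self {
            Message::Ping => "PING",
            Message::Pong => "PONG",
            Message::Echo(_) => "ECHO",
            Message::Custom(_) => "CUSTOM",
            Message::Unknown => "UNKNOWN",
        }
    }
}

type Handler = Box<dyn Fn(&Message) -> Result<Message, String> + Send + Sync>;

/// Minimal dispatcher: a lock-protected map from type name to handler.
#[derive(Default)]
pub struct Dispatcher {
    handlers: RwLock<HashMap<String, Handler>>,
}

impl Dispatcher {
    /// Register (or replace) the handler for one message type.
    /// Takes `&self` so the dispatcher can be shared behind an `Arc`.
    pub fn register<F>(&self, name: &str, handler: F)
    where
        F: Fn(&Message) -> Result<Message, String> + Send + Sync + 'static,
    {
        self.handlers.write().unwrap().insert(name.to_string(), Box::new(handler));
    }

    /// Look up the handler by `message_type()` and invoke it.
    pub fn dispatch(&self, msg: &Message) -> Result<Message, String> {
        let handlers = self.handlers.read().unwrap();
        match handlers.get(msg.message_type()) {
            Some(handler) => handler(msg),
            None => Err(format!("no handler registered for {}", msg.message_type())),
        }
    }
}
```

Using an `RwLock` lets many connections dispatch concurrently while registration, which is rare, takes the write lock.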
Running Tests
cargo test
Runs the full unit and integration test suite.
Benchmarking
cargo test --test perf -- --nocapture
cargo test --test perf benchmark_roundtrip_latency -- --nocapture
cargo test --test perf benchmark_throughput -- --nocapture
Performance Metrics
| Metric | Result | Environment |
|---|---|---|
| Roundtrip Latency | <1 ms avg | Local transport |
| Throughput | ~5,000 msg/sec | Standard payload |
| TLS Overhead | +2-5 ms | With certificate validation |
The library includes comprehensive benchmarking tools that measure:
- Message roundtrip latency (client → server → client)
- Maximum throughput under various conditions
- Backpressure effectiveness during high load
- Connection recovery after network failures
For detailed benchmarking documentation, see the API Reference.
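As a sketch of how a roundtrip latency and throughput measurement of this kind can be structured, the example below times an echo loop between two threads connected by in-process channels. It is not the crate's `perf` test; `bench_roundtrip`, `BenchResult`, and the 64-byte payload are assumptions for illustration, and in-process numbers are not comparable to the network figures in the table.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::{Duration, Instant};

/// Raw measurements from one benchmark run.
pub struct BenchResult {
    pub rounds: u32,
    pub total: Duration,
}

impl BenchResult {
    /// Mean time for one client -> server -> client roundtrip.
    pub fn avg_latency(&self) -> Duration {
        self.total / self.rounds
    }

    /// Completed roundtrips per second.
    pub fn throughput(&self) -> f64 {
        self.rounds as f64 / self.total.as_secs_f64()
    }
}

/// Run `rounds` sequential echo roundtrips (`rounds` must be > 0).
pub fn bench_roundtrip(rounds: u32) -> BenchResult {
    let (to_server, server_rx) = channel::<Vec<u8>>();
    let (to_client, client_rx) = channel::<Vec<u8>>();

    // Echo "server": send every received payload straight back.
    let server = thread::spawn(move || {
        for msg in server_rx {
            if to_client.send(msg).is_err() {
                break;
            }
        }
    });

    let start = Instant::now();
    for _ in 0..rounds {
        to_server.send(vec![0u8; 64]).unwrap();
        let echoed = client_rx.recv().unwrap();
        assert_eq!(echoed.len(), 64);
    }
    let total = start.elapsed();

    // Closing the sender ends the server loop so the thread can be joined.
    drop(to_server);
    server.join().unwrap();

    BenchResult { rounds, total }
}
```

Because each roundtrip waits for its reply before sending the next message, this measures latency-bound throughput; a pipelined sender would be needed to measure maximum throughput.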
Project Structure
src/
├── config.rs # Configuration structures and loading
├── core/ # Codec, packet structure
├── protocol/ # Handshake, heartbeat, message types
├── transport/ # TCP, Unix socket, Cluster (WIP)
├── service/ # Daemon + client APIs
├── utils/ # Compression, crypto, timers
Documentation | API Reference | Performance | Principles