Server

Struct Server 

Source
pub struct Server { /* private fields */ }
Expand description

Network performance test server.

The Server listens for incoming TCP control connections and handles both TCP and UDP performance test requests. All tests use a TCP control channel for coordination, with UDP data transfer happening on the same port. The server supports reverse mode testing, bandwidth limiting, interval reporting, JSON output, and can handle multiple concurrent clients.

§Features

  • TCP Control Channel: Always uses TCP for client coordination
  • TCP and UDP Data: Handle both reliable (TCP) and unreliable (UDP) performance tests
  • Reverse Mode: Send data to client for reverse throughput testing
  • Bandwidth Limiting: Control send rate in reverse mode tests
  • Interval Reporting: Display periodic statistics during tests
  • JSON Output: Machine-readable output format for automation
  • UDP Metrics: Track packet loss, jitter, and out-of-order delivery
  • Concurrent Clients: Handle multiple simultaneous test connections

§Examples

§Basic TCP Server

use rperf3::{Server, Config};

let config = Config::server(5201);
let server = Server::new(config);

println!("Starting server on port 5201...");
server.run().await?;

§UDP Server with Reverse Mode

use rperf3::{Server, Config, Protocol};

let config = Config::server(5201)
    .with_protocol(Protocol::Udp)
    .with_reverse(true); // Server will send UDP data

let server = Server::new(config);
server.run().await?;

Implementations§

Source§

impl Server

Source

pub fn new(config: Config) -> Self

Creates a new server with the given configuration.

§Arguments
  • config - The server configuration including port and protocol
§Examples
use rperf3::{Server, Config};

let config = Config::server(5201);
let server = Server::new(config);
Examples found in repository?
examples/server.rs (line 11)
4async fn main() -> Result<(), Box<dyn std::error::Error>> {
5    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
6
7    println!("Starting rperf3 server on port 5201...");
8
9    let config = Config::server(5201).with_protocol(Protocol::Tcp);
10
11    let server = Server::new(config);
12    server.run().await?;
13
14    Ok(())
15}
More examples
Hide additional examples
examples/cancellable_test.rs (line 35)
30async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
31    println!("Starting TCP server on port 5201...");
32    println!("Press Ctrl+C to stop the server gracefully.\n");
33
34    let config = Config::server(5201);
35    let server = Server::new(config);
36
37    // Clone the cancellation token to handle CTRL+C
38    let cancel_token = server.cancellation_token().clone();
39
40    // Set up CTRL+C handler
41    tokio::spawn(async move {
42        tokio::signal::ctrl_c()
43            .await
44            .expect("Failed to listen for CTRL+C");
45        println!("\nReceived CTRL+C, shutting down server gracefully...");
46        cancel_token.cancel();
47    });
48
49    server.run().await?;
50    println!("Server stopped.");
51    Ok(())
52}
examples/tcp_nodelay_test.rs (line 30)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    // Initialize logging
19    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
20
21    println!("TCP Socket Optimization Demo");
22    println!("============================\n");
23    println!("This demo tests TCP with:");
24    println!("- TCP_NODELAY enabled (Nagle's algorithm disabled)");
25    println!("- 256KB send/receive buffers");
26    println!("- Expected 10-20% improvement over default settings\n");
27
28    // Start server in background
29    let server_config = Config::server(5201).with_protocol(Protocol::Tcp);
30    let server = Server::new(server_config);
31
32    // Run server in background task
33    tokio::spawn(async move {
34        if let Err(e) = server.run().await {
35            eprintln!("Server error: {}", e);
36        }
37    });
38
39    // Give server time to start
40    time::sleep(Duration::from_millis(100)).await;
41
42    println!("Starting TCP test (5 seconds)...\n");
43
44    // Run client test
45    let client_config = Config::client("127.0.0.1".to_string(), 5201)
46        .with_protocol(Protocol::Tcp)
47        .with_duration(Duration::from_secs(5))
48        .with_interval(Duration::from_secs(1));
49
50    let client = Client::new(client_config)?;
51    client.run().await?;
52
53    // Get measurements
54    let measurements = client.get_measurements();
55    let total_bytes = measurements.total_bytes_sent + measurements.total_bytes_received;
56    let bits_per_second = measurements.total_bits_per_second();
57
58    println!("\n=== Results ===");
59    println!(
60        "Total transferred: {:.2} MB",
61        total_bytes as f64 / 1_000_000.0
62    );
63    println!("Throughput: {:.2} Mbps", bits_per_second / 1_000_000.0);
64    println!("\nSocket optimizations applied:");
65    println!("✓ TCP_NODELAY: enabled");
66    println!("✓ Send buffer: 256KB");
67    println!("✓ Recv buffer: 256KB");
68
69    Ok(())
70}
examples/test_retransmits.rs (line 24)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    env_logger::init();
14
15    let args: Vec<String> = std::env::args().collect();
16    let mode = args.get(1).map(|s| s.as_str()).unwrap_or("client");
17
18    match mode {
19        "server" => {
20            println!("Starting TCP server on port 5201...");
21            println!("Retransmits will be shown if they occur during the test.\n");
22
23            let config = Config::server(5201);
24            let server = Server::new(config);
25            server.run().await?;
26        }
27        "client" => {
28            println!("Starting TCP client test (10 seconds)...");
29            println!("Retransmits will be tracked and displayed per interval.\n");
30
31            let config = Config::client("127.0.0.1".to_string(), 5201)
32                .with_duration(Duration::from_secs(10));
33
34            let client = Client::new(config)?.with_callback(|event: ProgressEvent| match event {
35                ProgressEvent::TestStarted => {
36                    println!("Test started");
37                }
38                ProgressEvent::IntervalUpdate {
39                    interval_start,
40                    interval_end,
41                    bits_per_second,
42                    retransmits,
43                    ..
44                } => {
45                    if let Some(retrans) = retransmits {
46                        println!(
47                            "[{:.1}-{:.1}s] {:.2} Mbps  - {} retransmits detected",
48                            interval_start.as_secs_f64(),
49                            interval_end.as_secs_f64(),
50                            bits_per_second / 1_000_000.0,
51                            retrans
52                        );
53                    } else {
54                        println!(
55                            "[{:.1}-{:.1}s] {:.2} Mbps  - no retransmits",
56                            interval_start.as_secs_f64(),
57                            interval_end.as_secs_f64(),
58                            bits_per_second / 1_000_000.0
59                        );
60                    }
61                }
62                ProgressEvent::TestCompleted {
63                    total_bytes,
64                    bits_per_second,
65                    ..
66                } => {
67                    println!("\nTest completed:");
68                    println!("  Total bytes: {}", total_bytes);
69                    println!(
70                        "  Average throughput: {:.2} Mbps",
71                        bits_per_second / 1_000_000.0
72                    );
73                    println!("\nNote: Retransmit tracking is only available on Linux systems.");
74                }
75                ProgressEvent::Error(msg) => {
76                    eprintln!("Error: {}", msg);
77                }
78            });
79
80            client.run().await?;
81        }
82        _ => {
83            eprintln!("Usage: {} [server|client]", args[0]);
84            std::process::exit(1);
85        }
86    }
87
88    Ok(())
89}
Source

pub fn cancellation_token(&self) -> &CancellationToken

Returns a reference to the cancellation token.

This allows external code to cancel the running server gracefully.

§Examples
use rperf3::{Server, Config};

let config = Config::server(5201);
let server = Server::new(config);

// Get cancellation token to stop from another task
let cancel_token = server.cancellation_token().clone();

tokio::spawn(async move {
    // Run the server inside this task, e.g. `server.run().await`
});

// Later, to stop the server:
cancel_token.cancel();
Examples found in repository?
examples/cancellable_test.rs (line 38)
30async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
31    println!("Starting TCP server on port 5201...");
32    println!("Press Ctrl+C to stop the server gracefully.\n");
33
34    let config = Config::server(5201);
35    let server = Server::new(config);
36
37    // Clone the cancellation token to handle CTRL+C
38    let cancel_token = server.cancellation_token().clone();
39
40    // Set up CTRL+C handler
41    tokio::spawn(async move {
42        tokio::signal::ctrl_c()
43            .await
44            .expect("Failed to listen for CTRL+C");
45        println!("\nReceived CTRL+C, shutting down server gracefully...");
46        cancel_token.cancel();
47    });
48
49    server.run().await?;
50    println!("Server stopped.");
51    Ok(())
52}
Source

pub async fn run(&self) -> Result<()>

Starts the server and begins listening for client connections.

This method runs until the server's cancellation token is cancelled (see cancellation_token) or an error is returned, accepting and handling client connections in the meantime. For TCP, each client connection is handled in a separate task. For UDP, the server processes incoming datagrams.

§Errors

Returns an error if:

  • Cannot bind to the specified port
  • Network I/O errors occur
§Examples
use rperf3::{Server, Config};

let config = Config::server(5201);
let server = Server::new(config);

println!("Server running...");
server.run().await?;
Examples found in repository?
examples/server.rs (line 12)
4async fn main() -> Result<(), Box<dyn std::error::Error>> {
5    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
6
7    println!("Starting rperf3 server on port 5201...");
8
9    let config = Config::server(5201).with_protocol(Protocol::Tcp);
10
11    let server = Server::new(config);
12    server.run().await?;
13
14    Ok(())
15}
More examples
Hide additional examples
examples/cancellable_test.rs (line 49)
30async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
31    println!("Starting TCP server on port 5201...");
32    println!("Press Ctrl+C to stop the server gracefully.\n");
33
34    let config = Config::server(5201);
35    let server = Server::new(config);
36
37    // Clone the cancellation token to handle CTRL+C
38    let cancel_token = server.cancellation_token().clone();
39
40    // Set up CTRL+C handler
41    tokio::spawn(async move {
42        tokio::signal::ctrl_c()
43            .await
44            .expect("Failed to listen for CTRL+C");
45        println!("\nReceived CTRL+C, shutting down server gracefully...");
46        cancel_token.cancel();
47    });
48
49    server.run().await?;
50    println!("Server stopped.");
51    Ok(())
52}
examples/tcp_nodelay_test.rs (line 34)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    // Initialize logging
19    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
20
21    println!("TCP Socket Optimization Demo");
22    println!("============================\n");
23    println!("This demo tests TCP with:");
24    println!("- TCP_NODELAY enabled (Nagle's algorithm disabled)");
25    println!("- 256KB send/receive buffers");
26    println!("- Expected 10-20% improvement over default settings\n");
27
28    // Start server in background
29    let server_config = Config::server(5201).with_protocol(Protocol::Tcp);
30    let server = Server::new(server_config);
31
32    // Run server in background task
33    tokio::spawn(async move {
34        if let Err(e) = server.run().await {
35            eprintln!("Server error: {}", e);
36        }
37    });
38
39    // Give server time to start
40    time::sleep(Duration::from_millis(100)).await;
41
42    println!("Starting TCP test (5 seconds)...\n");
43
44    // Run client test
45    let client_config = Config::client("127.0.0.1".to_string(), 5201)
46        .with_protocol(Protocol::Tcp)
47        .with_duration(Duration::from_secs(5))
48        .with_interval(Duration::from_secs(1));
49
50    let client = Client::new(client_config)?;
51    client.run().await?;
52
53    // Get measurements
54    let measurements = client.get_measurements();
55    let total_bytes = measurements.total_bytes_sent + measurements.total_bytes_received;
56    let bits_per_second = measurements.total_bits_per_second();
57
58    println!("\n=== Results ===");
59    println!(
60        "Total transferred: {:.2} MB",
61        total_bytes as f64 / 1_000_000.0
62    );
63    println!("Throughput: {:.2} Mbps", bits_per_second / 1_000_000.0);
64    println!("\nSocket optimizations applied:");
65    println!("✓ TCP_NODELAY: enabled");
66    println!("✓ Send buffer: 256KB");
67    println!("✓ Recv buffer: 256KB");
68
69    Ok(())
70}
examples/test_retransmits.rs (line 25)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    env_logger::init();
14
15    let args: Vec<String> = std::env::args().collect();
16    let mode = args.get(1).map(|s| s.as_str()).unwrap_or("client");
17
18    match mode {
19        "server" => {
20            println!("Starting TCP server on port 5201...");
21            println!("Retransmits will be shown if they occur during the test.\n");
22
23            let config = Config::server(5201);
24            let server = Server::new(config);
25            server.run().await?;
26        }
27        "client" => {
28            println!("Starting TCP client test (10 seconds)...");
29            println!("Retransmits will be tracked and displayed per interval.\n");
30
31            let config = Config::client("127.0.0.1".to_string(), 5201)
32                .with_duration(Duration::from_secs(10));
33
34            let client = Client::new(config)?.with_callback(|event: ProgressEvent| match event {
35                ProgressEvent::TestStarted => {
36                    println!("Test started");
37                }
38                ProgressEvent::IntervalUpdate {
39                    interval_start,
40                    interval_end,
41                    bits_per_second,
42                    retransmits,
43                    ..
44                } => {
45                    if let Some(retrans) = retransmits {
46                        println!(
47                            "[{:.1}-{:.1}s] {:.2} Mbps  - {} retransmits detected",
48                            interval_start.as_secs_f64(),
49                            interval_end.as_secs_f64(),
50                            bits_per_second / 1_000_000.0,
51                            retrans
52                        );
53                    } else {
54                        println!(
55                            "[{:.1}-{:.1}s] {:.2} Mbps  - no retransmits",
56                            interval_start.as_secs_f64(),
57                            interval_end.as_secs_f64(),
58                            bits_per_second / 1_000_000.0
59                        );
60                    }
61                }
62                ProgressEvent::TestCompleted {
63                    total_bytes,
64                    bits_per_second,
65                    ..
66                } => {
67                    println!("\nTest completed:");
68                    println!("  Total bytes: {}", total_bytes);
69                    println!(
70                        "  Average throughput: {:.2} Mbps",
71                        bits_per_second / 1_000_000.0
72                    );
73                    println!("\nNote: Retransmit tracking is only available on Linux systems.");
74                }
75                ProgressEvent::Error(msg) => {
76                    eprintln!("Error: {}", msg);
77                }
78            });
79
80            client.run().await?;
81        }
82        _ => {
83            eprintln!("Usage: {} [server|client]", args[0]);
84            std::process::exit(1);
85        }
86    }
87
88    Ok(())
89}
Source

pub fn get_measurements(&self) -> Measurements

Retrieves the current measurements collected by the server.

Returns a snapshot of the statistics collected from client tests. This includes total bytes transferred, bandwidth measurements, and UDP-specific metrics like packet loss and jitter.

§Returns

A Measurements struct containing comprehensive test statistics.

§Examples
use rperf3::{Server, Config};

let config = Config::server(5201);
let server = Server::new(config);

// After tests have run
let measurements = server.get_measurements();
println!("Total bytes: {}", measurements.total_bytes_received);
println!("Throughput: {:.2} Mbps",
         measurements.total_bits_per_second() / 1_000_000.0);

Auto Trait Implementations§

§

impl Freeze for Server

§

impl !RefUnwindSafe for Server

§

impl Send for Server

§

impl Sync for Server

§

impl Unpin for Server

§

impl !UnwindSafe for Server

Blanket Implementations§

§

impl<T> Any for T
where T: 'static + ?Sized,

§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
§

impl<T> Borrow<T> for T
where T: ?Sized,

§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
§

impl<T> BorrowMut<T> for T
where T: ?Sized,

§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<T> From<T> for T

§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T, U> Into<U> for T
where U: From<T>,

§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V