rpcnet 0.1.0

RPC library based on QUIC+TLS encryption
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
#![allow(clippy::all)]
#![allow(warnings)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(clippy::needless_borrows_for_generic_args)]
#![allow(clippy::assertions_on_constants)]
// Unit tests for RpcServer start() method with focus on graceful shutdown
// Testing the start() method's behavior when cancelled and stopped gracefully

use rpcnet::{RpcClient, RpcConfig, RpcServer};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::timeout;

/// Builds the `RpcConfig` shared by every test in this file.
///
/// The same value is used for both the server side (`RpcServer::new`) and the
/// client side (`RpcClient::connect`). It points at the PEM files under
/// `certs/`, uses `"localhost"` as the server name and a 100 ms keep-alive
/// interval.
///
/// NOTE(review): the second argument to `RpcConfig::new` looks like the bind
/// address; port `0` would explain why the tests read the real address back
/// via `local_addr()` — confirm against the `RpcConfig` docs.
fn create_test_config() -> RpcConfig {
    let keep_alive = Duration::from_millis(100);
    let base = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0");

    base.with_key_path("certs/test_key.pem")
        .with_server_name("localhost")
        .with_keep_alive_interval(keep_alive)
}

/// Checks that a running `start()` future can be abandoned through `tokio::select!`.
///
/// The spawned task races `start()` against a `Notify`. Once a "ping" call has
/// round-tripped successfully the test fires the notification and requires the
/// task to finish within two seconds, without panicking, yielding `Ok(())`.
#[tokio::test]
async fn test_start_method_with_cancellation() {
    let mut rpc_server = RpcServer::new(create_test_config());

    // A trivial handler so the client has something to call.
    rpc_server
        .register("ping", |_| async move { Ok(b"pong".to_vec()) })
        .await;

    // Bind before spawning: the listener's real address has to be captured
    // here because both the server and the listener are moved into the task.
    let listener = rpc_server.bind().expect("Failed to bind server");
    let addr = listener
        .local_addr()
        .expect("Failed to get server address");

    // One handle stays with the test, the other travels into the server task.
    let stop_signal = Arc::new(Notify::new());
    let stop_listener = Arc::clone(&stop_signal);

    let server_handle = tokio::spawn(async move {
        tokio::select! {
            // Normal path: the server runs until start() itself returns.
            outcome = rpc_server.start(listener) => {
                println!("Server start() completed with: {:?}", outcome);
                outcome
            }
            // Cancellation path: dropping the start() future counts as a clean stop.
            _ = stop_listener.notified() => {
                println!("Server received shutdown signal");
                Ok(()) // Return Ok to indicate graceful shutdown
            }
        }
    });

    // Let the spawned task get as far as accepting connections.
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Connecting proves the server is actually up.
    let connect_attempt = timeout(
        Duration::from_millis(1000),
        RpcClient::connect(addr, create_test_config()),
    )
    .await;
    let client = connect_attempt
        .expect("Connection timeout - server may not be running")
        .expect("Failed to connect to server");

    // One round-trip proves the registered handler is being served.
    let call_attempt = timeout(
        Duration::from_millis(1000),
        client.call("ping", b"test".to_vec()),
    )
    .await;
    let response = call_attempt.expect("Call timeout").expect("Call failed");

    assert_eq!(response, b"pong", "Server should respond correctly");

    // Ask the server task to stop.
    stop_signal.notify_one();

    // Outer layer: did the task finish in time?
    let join_outcome = timeout(Duration::from_millis(2000), server_handle)
        .await
        .expect("Server task should complete within timeout");

    // Middle layer: did the task end without panicking?
    assert!(
        join_outcome.is_ok(),
        "Server task should complete successfully: {:?}",
        join_outcome
    );
    let start_outcome = join_outcome.expect("Server task should not panic");

    // Inner layer: the value produced by the select! block.
    assert!(
        start_outcome.is_ok(),
        "start() method should return Ok(()) on graceful shutdown"
    );

    println!("✅ Server gracefully shut down after cancellation");
}

/// Checks that `start()` can be cut short by wrapping it in `tokio::time::timeout`.
///
/// The server task gives `start()` a 500 ms window and maps an elapsed timeout
/// to `Ok(())`. The client connection and "echo" call in between are
/// best-effort: failures there are only logged. The hard requirements are that
/// the server task finishes within two seconds, does not panic, and yields `Ok`.
#[tokio::test]
async fn test_start_method_with_timeout_cancellation() {
    let mut rpc_server = RpcServer::new(create_test_config());

    rpc_server
        .register("echo", |params| async move { Ok(params) })
        .await;

    let listener = rpc_server.bind().expect("Failed to bind server");
    let addr = listener
        .local_addr()
        .expect("Failed to get server address");

    // The timeout is the cancellation mechanism here: when it elapses the
    // start() future is dropped.
    let server_handle = tokio::spawn(async move {
        let timed = timeout(Duration::from_millis(500), rpc_server.start(listener)).await;

        if let Ok(outcome) = timed {
            println!("Server start() completed normally: {:?}", outcome);
            outcome
        } else {
            println!("Server start() was cancelled by timeout (expected)");
            Ok(()) // This is expected behavior
        }
    });

    // Let the server come up before poking it.
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Best-effort check that the server answers while it is still alive.
    let connect_attempt = timeout(
        Duration::from_millis(500),
        RpcClient::connect(addr, create_test_config()),
    )
    .await;

    match connect_attempt {
        Ok(Ok(client)) => {
            println!("✅ Successfully connected to server");

            // The call races the server's 500 ms window, so either outcome is fine.
            let call_attempt = timeout(
                Duration::from_millis(300),
                client.call("echo", b"test_data".to_vec()),
            )
            .await;

            if let Ok(Ok(response)) = call_attempt {
                println!(
                    "✅ Server responded: {:?}",
                    String::from_utf8_lossy(&response)
                );
                assert_eq!(response, b"test_data");
            } else {
                println!("⏰ Call timed out or failed (acceptable during shutdown)");
            }
        }
        _ => {
            println!("⏰ Connection failed (acceptable during rapid startup/shutdown)");
        }
    }

    // The server task must wrap up on its own once its window has elapsed.
    let join_outcome = timeout(Duration::from_millis(2000), server_handle)
        .await
        .expect("Server task should complete");

    assert!(
        join_outcome.is_ok(),
        "Server task should complete without panic"
    );
    let start_outcome = join_outcome.expect("Server task should not panic");
    assert!(
        start_outcome.is_ok(),
        "start() should handle cancellation gracefully"
    );

    println!("✅ Server start() method handled timeout cancellation correctly");
}

/// Checks that the server task stops cleanly while several clients may be mid-call.
///
/// Three client tasks each try to connect and invoke a handler that sleeps for
/// 200 ms. Shutdown is signalled 100 ms after the clients are spawned. Only the
/// server side is asserted on (finishes within three seconds, no panic, `Ok`).
/// The client results are tallied and printed but deliberately not asserted,
/// since they depend on timing.
#[tokio::test]
async fn test_start_method_multiple_connections_during_shutdown() {
    let mut rpc_server = RpcServer::new(create_test_config());

    // A deliberately slow handler so requests can still be in flight at shutdown.
    rpc_server
        .register("slow_task", |_| async move {
            tokio::time::sleep(Duration::from_millis(200)).await;
            Ok(b"task_completed".to_vec())
        })
        .await;

    let listener = rpc_server.bind().expect("Failed to bind server");
    let server_addr = listener
        .local_addr()
        .expect("Failed to get server address");

    let stop_signal = Arc::new(Notify::new());
    let stop_listener = Arc::clone(&stop_signal);

    // Server task: run until start() returns or the stop signal fires.
    let server_handle = tokio::spawn(async move {
        tokio::select! {
            outcome = rpc_server.start(listener) => {
                println!("Server start() completed: {:?}", outcome);
                outcome
            }
            _ = stop_listener.notified() => {
                println!("Server shutdown requested");
                Ok(())
            }
        }
    });

    // Let the server come up.
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Spawn the clients; each task resolves to `true` only if its call succeeded.
    let client_tasks: Vec<_> = (0..3)
        .map(|i| {
            let addr = server_addr;
            let config = create_test_config();

            tokio::spawn(async move {
                println!("Client {} attempting to connect", i);

                let connect_attempt = timeout(
                    Duration::from_millis(1000),
                    RpcClient::connect(addr, config),
                )
                .await;

                // Bail out early when no connection could be established.
                let client = match connect_attempt {
                    Ok(Ok(client)) => client,
                    _ => {
                        println!("Client {} failed to connect", i);
                        return false;
                    }
                };
                println!("Client {} connected successfully", i);

                let call_attempt = timeout(
                    Duration::from_millis(1000),
                    client.call("slow_task", format!("request_{}", i).into_bytes()),
                )
                .await;

                if let Ok(Ok(response)) = call_attempt {
                    println!(
                        "Client {} call succeeded: {:?}",
                        i,
                        String::from_utf8_lossy(&response)
                    );
                    true
                } else {
                    println!("Client {} call failed or timed out", i);
                    false
                }
            })
        })
        .collect();

    // Give the clients a head start so some requests are likely in flight.
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Pull the plug while clients are potentially active.
    println!("Triggering server shutdown...");
    stop_signal.notify_one();

    // Unwrap the three layers one by one: timeout, join result, select! value.
    let timed_join = timeout(Duration::from_millis(3000), server_handle).await;

    assert!(
        timed_join.is_ok(),
        "Server should shut down within timeout"
    );
    let join_outcome = timed_join.unwrap();
    assert!(
        join_outcome.is_ok(),
        "Server task should complete successfully"
    );
    let start_outcome = join_outcome.unwrap();
    assert!(
        start_outcome.is_ok(),
        "start() method should return Ok(()) on graceful shutdown"
    );

    // Drain the client tasks and count how many calls made it through.
    let mut ok_count = 0;
    for (idx, handle) in client_tasks.into_iter().enumerate() {
        let joined = timeout(Duration::from_millis(2000), handle).await;

        if let Ok(Ok(call_succeeded)) = joined {
            if call_succeeded {
                ok_count += 1;
                println!("Client {} completed successfully", idx);
            } else {
                println!("Client {} completed but call failed", idx);
            }
        } else {
            println!("Client {} task failed or timed out", idx);
        }
    }

    println!(
        "✅ Server gracefully shut down with {} successful client calls",
        ok_count
    );
    println!("✅ start() method handled multiple connections during shutdown correctly");
}

/// Checks the case where the shutdown signal has already fired before `start()` runs.
///
/// `notify_one()` is called before anything waits on the `Notify`. Per tokio's
/// `Notify` documentation that stores a permit, so the later `notified()`
/// completes right away and the `select!` resolves through the shutdown
/// branch. The whole race must finish within one second and yield `Ok`.
#[tokio::test]
async fn test_start_method_immediate_shutdown() {
    let mut rpc_server = RpcServer::new(create_test_config());

    rpc_server
        .register("test", |_| async move { Ok(b"response".to_vec()) })
        .await;

    let listener = rpc_server.bind().expect("Failed to bind server");

    let stop_signal = Arc::new(Notify::new());
    let stop_listener = Arc::clone(&stop_signal);

    // Fire the signal up front, before the server is even started.
    stop_signal.notify_one();

    // Race start() against the already-signalled Notify.
    let race = async move {
        tokio::select! {
            outcome = rpc_server.start(listener) => {
                println!("Server start() completed: {:?}", outcome);
                outcome
            }
            _ = stop_listener.notified() => {
                println!("Server shutdown signal received immediately");
                Ok(())
            }
        }
    };
    let timed = timeout(Duration::from_millis(1000), race).await;

    // An elapsed timeout here would mean the pre-fired signal was not observed.
    assert!(
        timed.is_ok(),
        "Server should complete quickly when shutdown is pre-triggered"
    );
    let start_outcome = timed.unwrap();
    assert!(
        start_outcome.is_ok(),
        "start() should handle immediate shutdown gracefully"
    );

    println!("✅ start() method handled immediate shutdown correctly");
}

/// Checks that `start()` keeps running rather than returning on its own.
///
/// Despite the test's name, no `Ok(())` return value is inspected here. The
/// only assertion is that a 10 ms `timeout` around `start()` elapses, i.e. the
/// future is still pending when the timeout drops it.
#[tokio::test]
async fn test_start_method_returns_ok_on_completion() {
    let mut rpc_server = RpcServer::new(create_test_config());

    rpc_server
        .register("simple", |_| async move { Ok(b"ok".to_vec()) })
        .await;

    let listener = rpc_server.bind().expect("Failed to bind server");

    // Give start() a very short window; the timeout drops the future when it elapses.
    let timed = timeout(Duration::from_millis(10), rpc_server.start(listener)).await;

    // `Err` means the window elapsed with start() still pending.
    assert!(
        timed.is_err(),
        "start() should run indefinitely until cancelled"
    );

    println!("✅ start() method signature and behavior verified - runs indefinitely as expected");
}