1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#![allow(clippy::all)]
#![allow(warnings)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(clippy::needless_borrows_for_generic_args)]
#![allow(clippy::assertions_on_constants)]
// Unit tests for RpcServer::drive_connection method
// These tests focus on the core logic flow and error handling of the drive_connection method
use rpcnet::{ConnectionDriveOutcome, RpcConfig, RpcServer};
use std::sync::Arc;
use tokio::sync::oneshot;
// Mock implementation for testing drive_connection logic
// Since we can't easily mock QUIC connections, we'll test the logic we can access
#[test]
fn test_connection_drive_outcome_enum() {
// Test the ConnectionDriveOutcome enum variants
// This tests the enum used as return value from drive_connection
// Note: We can't create actual QuicConnection instances for testing without
// a full QUIC setup, but we can verify the enum structure exists
// and matches what drive_connection expects to return
// This test ensures the enum variants exist and can be matched
let test_match = |outcome: Result<ConnectionDriveOutcome, rpcnet::RpcError>| match outcome {
Ok(ConnectionDriveOutcome::ConnectionClosed) => "closed",
Ok(ConnectionDriveOutcome::HandoffReady(_)) => "handoff",
Err(_) => "error",
};
// Create a mock error to test the error case
let error_outcome = Err(rpcnet::RpcError::StreamError("test error".to_string()));
assert_eq!(test_match(error_outcome), "error");
}
#[test]
fn test_rpc_server_creation_for_drive_connection() {
// Test that we can create an RpcServer instance that has the drive_connection method
// This verifies the basic setup needed for drive_connection
let config = RpcConfig::new("test_cert.pem", "localhost:8080");
let server = RpcServer::new(config);
// Verify the server has the expected structure
// The handlers should be empty initially
assert!(Arc::strong_count(&server.handlers) >= 1);
assert!(Arc::strong_count(&server.streaming_handlers) >= 1);
}
#[tokio::test]
async fn test_shutdown_channel_creation() {
// Test the oneshot channel creation that drive_connection uses
// This tests the shutdown mechanism pattern used in drive_connection
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
// Test that we can check if shutdown was signaled
// This simulates the shutdown check in drive_connection
tokio::select! {
_ = &mut shutdown_rx => {
panic!("Shutdown should not be signaled yet");
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(1)) => {
// Expected path - shutdown not signaled
}
}
// Send shutdown signal
shutdown_tx
.send(())
.expect("Failed to send shutdown signal");
// Now shutdown should be signaled
tokio::select! {
_ = &mut shutdown_rx => {
// Expected path - shutdown signaled
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
panic!("Shutdown signal should have been received");
}
}
}
#[tokio::test]
async fn test_task_join_handle_collection() {
// Test the pattern of collecting JoinHandles used in drive_connection
// This tests the stream task management logic
use tokio::task::JoinHandle;
let mut stream_tasks: Vec<JoinHandle<()>> = Vec::new();
// Spawn some test tasks similar to how drive_connection spawns stream handlers
for i in 0..3 {
let task = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(10 * i)).await;
});
stream_tasks.push(task);
}
// Test aborting tasks (like drive_connection does on shutdown)
for task in &stream_tasks {
task.abort();
}
// Verify tasks were aborted
for task in stream_tasks {
let result = task.await;
assert!(result.is_err()); // Should be cancelled
}
}
#[test]
fn test_handlers_initialization() {
// Test the handlers structure used by drive_connection
// This verifies the RwLock and HashMap structure used for storing handlers
let config = RpcConfig::new("test_cert.pem", "localhost:8080");
let server = RpcServer::new(config);
// Test that handlers are properly initialized as empty collections
// This is the state drive_connection expects when it clones the handlers
let handlers = server.handlers.clone();
let streaming_handlers = server.streaming_handlers.clone();
// Verify they are the expected Arc<RwLock<HashMap>> types
assert!(Arc::strong_count(&handlers) >= 1);
assert!(Arc::strong_count(&streaming_handlers) >= 1);
}
#[tokio::test]
async fn test_handlers_access_pattern() {
// Test the pattern of accessing handlers that drive_connection uses
// This tests the async access to handler collections
let config = RpcConfig::new("test_cert.pem", "localhost:8080");
let server = RpcServer::new(config);
let handlers = server.handlers.clone();
let streaming_handlers = server.streaming_handlers.clone();
// Test read access (like drive_connection does)
{
let _handlers_ref = handlers.read().await;
let _streaming_handlers_ref = streaming_handlers.read().await;
// This simulates the access pattern in drive_connection
}
// Test that we can get multiple read locks simultaneously
let (_h1, _h2) = tokio::join!(handlers.read(), streaming_handlers.read());
}
#[test]
fn test_rpc_error_stream_error_creation() {
// Test creating the specific error type that drive_connection returns
// This tests the error path in drive_connection
let error_msg = "Connection stream failed";
let rpc_error = rpcnet::RpcError::StreamError(error_msg.to_string());
match rpc_error {
rpcnet::RpcError::StreamError(msg) => {
assert_eq!(msg, error_msg);
}
_ => panic!("Expected StreamError variant"),
}
}
#[test]
fn test_connection_drive_outcome_variants() {
// Test that we can work with ConnectionDriveOutcome enum
// This verifies the return types that drive_connection produces
// Test Debug formatting (used in logging)
let debug_str = format!("{:?}", rpcnet::RpcError::StreamError("test".to_string()));
assert!(debug_str.contains("StreamError"));
assert!(debug_str.contains("test"));
}
// Integration test showing the drive_connection method exists and has correct signature
#[tokio::test]
async fn test_drive_connection_method_exists() {
// This test verifies that the drive_connection method exists with the expected signature
// We can't easily test the full functionality without complex QUIC mocking,
// but we can verify the method signature and basic setup
let config = RpcConfig::new("test_cert.pem", "localhost:8080");
let server = RpcServer::new(config);
// Create a shutdown channel like drive_connection expects
let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
// Note: We can't create a real QuicConnection for testing without a full QUIC setup
// So this test mainly verifies the method exists and the basic types work
// In a real test, you would need to mock the QuicConnection or use integration tests
// Verify that the method exists by checking we can reference it
let _method_ref = RpcServer::drive_connection;
// The actual call would look like:
// let result = server.drive_connection(mock_connection, shutdown_rx).await;
// but requires a proper QuicConnection mock
}