starpc 0.49.7

Streaming protobuf RPC framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
//! Integration tests for starpc, mirroring the Go/JS test patterns.
//!
//! These tests verify:
//! 1. Unary RPC
//! 2. Server streaming (5 messages)
//! 3. Client streaming
//! 4. Bidirectional streaming
//! 5. Error handling
//! 6. Wire format compatibility

use async_trait::async_trait;
use prost::Message;
use std::sync::Arc;
use std::time::Duration;

use starpc::error::{Error, Result};
use starpc::handler::Handler;
use starpc::invoker::Invoker;
use starpc::mux::Mux;
use starpc::server::Server;
use starpc::stream::{Stream, StreamExt};
use starpc::testing::{create_test_pair, SingleInMemoryOpener};
use starpc::Client;

// Simple EchoMsg for testing
#[derive(Clone, PartialEq, Message)]
struct EchoMsg {
    #[prost(string, tag = "1")]
    body: String,
}

const BODY_TXT: &str = "hello world via starpc e2e test";

/// Echo server implementation matching Go's echo/server.go
struct EchoServer;

#[async_trait]
impl Invoker for EchoServer {
    async fn invoke_method(
        &self,
        _service_id: &str,
        method_id: &str,
        stream: Box<dyn Stream>,
    ) -> (bool, Result<()>) {
        match method_id {
            "Echo" => (true, self.echo(stream).await),
            "EchoServerStream" => (true, self.echo_server_stream(stream).await),
            "EchoClientStream" => (true, self.echo_client_stream(stream).await),
            "EchoBidiStream" => (true, self.echo_bidi_stream(stream).await),
            _ => (false, Err(Error::Unimplemented)),
        }
    }
}

impl Handler for EchoServer {
    fn service_id(&self) -> &'static str {
        "echo.Echoer"
    }

    fn method_ids(&self) -> &'static [&'static str] {
        &[
            "Echo",
            "EchoServerStream",
            "EchoClientStream",
            "EchoBidiStream",
        ]
    }
}

impl EchoServer {
    /// Unary echo - returns the same message
    async fn echo(&self, stream: Box<dyn Stream>) -> Result<()> {
        let msg: EchoMsg = stream.msg_recv().await?;
        stream.msg_send(&msg).await?;
        Ok(())
    }

    /// Server streaming - sends 5 copies of the message
    async fn echo_server_stream(&self, stream: Box<dyn Stream>) -> Result<()> {
        let msg: EchoMsg = stream.msg_recv().await?;

        // Send 5 responses with delays
        for _ in 0..5 {
            if stream.context().is_cancelled() {
                return Err(Error::Cancelled);
            }
            stream.msg_send(&msg).await?;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        Ok(())
    }

    /// Client streaming - returns first message received
    async fn echo_client_stream(&self, stream: Box<dyn Stream>) -> Result<()> {
        let msg: EchoMsg = stream.msg_recv().await?;
        stream.msg_send(&msg).await?;
        Ok(())
    }

    /// Bidirectional streaming - server sends first, then echoes all messages
    async fn echo_bidi_stream(&self, stream: Box<dyn Stream>) -> Result<()> {
        // Server sends initial message
        let initial = EchoMsg {
            body: "hello from server".to_string(),
        };
        stream.msg_send(&initial).await?;

        // Echo all received messages
        loop {
            match stream.msg_recv::<EchoMsg>().await {
                Ok(msg) => {
                    if msg.body.is_empty() {
                        return Err(Error::Remote("got message with empty body".to_string()));
                    }
                    stream.msg_send(&msg).await?;
                }
                Err(Error::StreamClosed) => break,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}

/// Test infrastructure: creates connected client and server
async fn setup_e2e() -> (starpc::SrpcClient<SingleInMemoryOpener>, tokio::task::JoinHandle<()>) {
    let (opener, server_stream) = create_test_pair();

    // Set up the server
    let mux = Arc::new(Mux::new());
    mux.register(Arc::new(EchoServer)).unwrap();
    let server = Server::with_arc(mux);

    // Spawn server handler
    let server_handle = tokio::spawn(async move {
        let _ = server.handle_stream(server_stream).await;
    });

    // Create client
    let client = starpc::SrpcClient::new(opener);

    (client, server_handle)
}

// ============================================================================
// Tests matching Go's server_test.go
// ============================================================================

#[tokio::test]
async fn test_e2e_unary() {
    let (client, server_handle) = setup_e2e().await;

    // Make unary call
    let request = EchoMsg {
        body: BODY_TXT.to_string(),
    };
    let response: EchoMsg = client
        .exec_call("echo.Echoer", "Echo", &request)
        .await
        .expect("exec_call failed");

    assert_eq!(response.body, BODY_TXT);

    server_handle.abort();
}

#[tokio::test]
async fn test_e2e_server_stream() {
    let (client, server_handle) = setup_e2e().await;

    // Send request and open stream
    let request = EchoMsg {
        body: BODY_TXT.to_string(),
    };
    let data = request.encode_to_vec();
    let stream = client
        .new_stream("echo.Echoer", "EchoServerStream", Some(&data))
        .await
        .expect("new_stream failed");

    // Close send side
    stream.close_send().await.expect("close_send failed");

    // Expect to receive 5 messages
    let expected_rx = 5;
    let mut received = 0;

    loop {
        match stream.msg_recv::<EchoMsg>().await {
            Ok(msg) => {
                assert_eq!(msg.body, BODY_TXT);
                received += 1;
            }
            Err(Error::StreamClosed) => break,
            Err(e) => panic!("unexpected error: {}", e),
        }
    }

    assert_eq!(
        received, expected_rx,
        "expected {} messages, got {}",
        expected_rx, received
    );

    server_handle.abort();
}

#[tokio::test]
async fn test_e2e_client_stream() {
    let (client, server_handle) = setup_e2e().await;

    // Open stream without initial message
    let stream = client
        .new_stream("echo.Echoer", "EchoClientStream", None)
        .await
        .expect("new_stream failed");

    // Send a message
    let request = EchoMsg {
        body: BODY_TXT.to_string(),
    };
    stream.msg_send(&request).await.expect("msg_send failed");

    // Close send side
    stream.close_send().await.expect("close_send failed");

    // Receive response
    let response: EchoMsg = stream.msg_recv().await.expect("msg_recv failed");
    assert_eq!(response.body, BODY_TXT);

    stream.close().await.ok();
    server_handle.abort();
}

#[tokio::test]
async fn test_e2e_bidi_stream() {
    let (client, server_handle) = setup_e2e().await;

    // Open bidirectional stream
    let stream = client
        .new_stream("echo.Echoer", "EchoBidiStream", None)
        .await
        .expect("new_stream failed");

    // Receive server's initial message
    let initial: EchoMsg = stream.msg_recv().await.expect("msg_recv failed");
    assert_eq!(initial.body, "hello from server");

    // Send a message from client
    let client_msg = EchoMsg {
        body: "hello from client".to_string(),
    };
    stream.msg_send(&client_msg).await.expect("msg_send failed");

    // Receive echoed message
    let echoed: EchoMsg = stream.msg_recv().await.expect("msg_recv failed");
    assert_eq!(echoed.body, "hello from client");

    // Close the stream
    stream.close().await.expect("close failed");
    server_handle.abort();
}

#[tokio::test]
async fn test_e2e_multiple_bidi_messages() {
    let (client, server_handle) = setup_e2e().await;

    let stream = client
        .new_stream("echo.Echoer", "EchoBidiStream", None)
        .await
        .expect("new_stream failed");

    // Receive server's initial message
    let _: EchoMsg = stream.msg_recv().await.expect("initial recv failed");

    // Send and receive multiple messages
    for i in 0..10 {
        let msg = EchoMsg {
            body: format!("message {}", i),
        };
        stream.msg_send(&msg).await.expect("msg_send failed");

        let echoed: EchoMsg = stream.msg_recv().await.expect("msg_recv failed");
        assert_eq!(echoed.body, format!("message {}", i));
    }

    stream.close().await.expect("close failed");
    server_handle.abort();
}

#[tokio::test]
async fn test_e2e_unary_empty_message() {
    let (client, server_handle) = setup_e2e().await;

    // Send empty message
    let request = EchoMsg {
        body: String::new(),
    };
    let response: EchoMsg = client
        .exec_call("echo.Echoer", "Echo", &request)
        .await
        .expect("exec_call failed");

    assert_eq!(response.body, "");

    server_handle.abort();
}

#[tokio::test]
async fn test_e2e_unimplemented_method() {
    let (client, server_handle) = setup_e2e().await;

    let request = EchoMsg {
        body: "test".to_string(),
    };
    let result: Result<EchoMsg> = client
        .exec_call("echo.Echoer", "NonExistentMethod", &request)
        .await;

    assert!(result.is_err());

    server_handle.abort();
}

#[tokio::test]
async fn test_codec_wire_format() {
    use starpc::codec::PacketCodec;
    use starpc::proto::{packet::Body, CallData, CallStart, Packet};
    use tokio_util::codec::{Decoder, Encoder};

    let mut codec = PacketCodec::new();
    let mut buf = bytes::BytesMut::new();

    // Test CallStart encoding
    let call_start = Packet {
        body: Some(Body::CallStart(CallStart {
            rpc_service: "test.Service".into(),
            rpc_method: "TestMethod".into(),
            data: vec![1, 2, 3, 4],
            data_is_zero: false,
        })),
    };

    codec
        .encode(call_start.clone(), &mut buf)
        .expect("encode failed");

    // Verify length prefix (little-endian u32)
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    assert_eq!(len, buf.len() - 4);

    // Decode and verify
    let decoded = codec
        .decode(&mut buf)
        .expect("decode failed")
        .expect("no packet");
    assert_eq!(decoded, call_start);

    // Test CallData encoding
    buf.clear();
    let call_data = Packet {
        body: Some(Body::CallData(CallData {
            data: vec![5, 6, 7, 8],
            data_is_zero: false,
            complete: true,
            error: String::new(),
        })),
    };

    codec
        .encode(call_data.clone(), &mut buf)
        .expect("encode failed");
    let decoded = codec
        .decode(&mut buf)
        .expect("decode failed")
        .expect("no packet");
    assert_eq!(decoded, call_data);

    // Test empty data with data_is_zero flag
    buf.clear();
    let empty_data = Packet {
        body: Some(Body::CallData(CallData {
            data: vec![],
            data_is_zero: true,
            complete: false,
            error: String::new(),
        })),
    };

    codec
        .encode(empty_data.clone(), &mut buf)
        .expect("encode failed");
    let decoded = codec
        .decode(&mut buf)
        .expect("decode failed")
        .expect("no packet");
    assert_eq!(decoded, empty_data);
}

#[tokio::test]
async fn test_packet_validation() {
    use starpc::packet::Validate;
    use starpc::proto::{packet::Body, CallData, CallStart, Packet};

    // Valid CallStart
    let valid_start = Packet {
        body: Some(Body::CallStart(CallStart {
            rpc_service: "svc".into(),
            rpc_method: "method".into(),
            data: vec![],
            data_is_zero: false,
        })),
    };
    assert!(valid_start.validate().is_ok());

    // Invalid CallStart - empty method
    let invalid_start = Packet {
        body: Some(Body::CallStart(CallStart {
            rpc_service: "svc".into(),
            rpc_method: String::new(),
            data: vec![],
            data_is_zero: false,
        })),
    };
    assert!(invalid_start.validate().is_err());

    // Valid CallData with data
    let valid_data = Packet {
        body: Some(Body::CallData(CallData {
            data: vec![1, 2, 3],
            data_is_zero: false,
            complete: false,
            error: String::new(),
        })),
    };
    assert!(valid_data.validate().is_ok());

    // Invalid CallData - empty everything
    let invalid_data = Packet {
        body: Some(Body::CallData(CallData {
            data: vec![],
            data_is_zero: false,
            complete: false,
            error: String::new(),
        })),
    };
    assert!(invalid_data.validate().is_err());

    // Empty packet
    let empty_packet = Packet { body: None };
    assert!(empty_packet.validate().is_err());
}

#[tokio::test]
async fn test_mux_registration_and_lookup() {
    let mux = Mux::new();

    // Register handler
    mux.register(Arc::new(EchoServer)).unwrap();

    // Check service exists
    assert!(mux.has_service("echo.Echoer"));
    assert!(!mux.has_service("nonexistent"));

    // Check methods exist
    assert!(mux.has_service_method("echo.Echoer", "Echo"));
    assert!(mux.has_service_method("echo.Echoer", "EchoServerStream"));
    assert!(!mux.has_service_method("echo.Echoer", "NonExistent"));

    // Empty strings should return false
    assert!(!mux.has_service(""));
    assert!(!mux.has_service_method("", "Echo"));
    assert!(!mux.has_service_method("echo.Echoer", ""));
}

#[tokio::test]
async fn test_error_types() {
    use starpc::error::codes;

    // Test error predicates
    assert!(Error::Aborted.is_abort());
    assert!(Error::Cancelled.is_abort());
    assert!(!Error::StreamClosed.is_abort());

    assert!(Error::StreamClosed.is_closed());
    assert!(Error::Cancelled.is_closed());

    assert!(Error::StreamIdle.is_timeout());

    assert!(Error::Unimplemented.is_unimplemented());

    // Test error codes
    assert_eq!(codes::ERR_RPC_ABORT, "ERR_RPC_ABORT");
    assert_eq!(codes::ERR_STREAM_IDLE, "ERR_STREAM_IDLE");
}