1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
use futures::{Stream, StreamExt}; use prost::Message; use std::pin::Pin; use tonic::{Request, Response, Status, Streaming}; mod mock; pub use mock::{MockBody, ProstDecoder}; pub type StreamResponseInner<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync>>; pub type StreamResponse<T> = Response<StreamResponseInner<T>>; /// Generate streaming request for GRPC /// /// When testing streaming RPC implemented with tonic, it is pretty clumsy /// to build the streaming request, this function extracted test code and prost /// decoder from tonic source code and wrap it with a nice interface. With it, /// testing your streaming RPC implementation is much easier. /// /// Usage: /// ``` /// use bytes::Bytes; /// use prost::Message; /// use tonic_mock::streaming_request; /// /// // normally this should be generated from protos with prost /// #[derive(Clone, PartialEq, Message)] /// pub struct Event { /// #[prost(bytes = "bytes", tag = "1")] /// pub id: Bytes, /// #[prost(bytes = "bytes", tag = "2")] /// pub data: Bytes, /// } /// /// let event = Event { id: Bytes::from("1"), data: Bytes::from("a".repeat(10)) }; /// let mut events = vec![event.clone(), event.clone(), event]; /// let stream = tonic_mock::streaming_request(events); /// pub fn streaming_request<T>(messages: Vec<T>) -> Request<Streaming<T>> where T: Message + Default + 'static, { let body = MockBody::new(messages); let decoder: ProstDecoder<T> = ProstDecoder::new(); let stream = Streaming::new_request(decoder, body); Request::new(stream) } /// a simple wrapper to process and validate streaming response /// /// Usage: /// ``` /// use tonic::{Response, Status}; /// use futures::Stream; /// use std::pin::Pin; /// /// #[derive(Clone, PartialEq, ::prost::Message)] /// pub struct ResponsePush { /// #[prost(int32, tag = "1")] /// pub code: i32, /// } /// /// // below code is to mimic a stream response from a GRPC service /// let output = async_stream::try_stream! { /// yield ResponsePush { code: 0 }; /// yield ResponsePush { code: 1 }; /// yield ResponsePush { code: 2 }; /// }; /// let response = Response::new(Box::pin(output) as tonic_mock::StreamResponseInner<ResponsePush>); /// let rt = tokio::runtime::Runtime::new().unwrap(); /// /// // now we process the events /// rt.block_on(async { /// tonic_mock::process_streaming_response(response, |msg, i| { /// assert!(msg.is_ok()); /// assert_eq!(msg.as_ref().unwrap().code, i as i32); /// }).await; /// }); /// ``` pub async fn process_streaming_response<T, F>(response: StreamResponse<T>, f: F) where T: Message + Default + 'static, F: Fn(Result<T, Status>, usize), { let mut i: usize = 0; let mut messages = response.into_inner(); while let Some(v) = messages.next().await { f(v, i); i += 1; } } /// convert a streaming response to a Vec for simplified testing /// /// Usage: /// ``` /// use tonic::{Response, Status}; /// use futures::Stream; /// use std::pin::Pin; /// /// #[derive(Clone, PartialEq, ::prost::Message)] /// pub struct ResponsePush { /// #[prost(int32, tag = "1")] /// pub code: i32, /// } /// /// // below code is to mimic a stream response from a GRPC service /// let output = async_stream::try_stream! { /// yield ResponsePush { code: 0 }; /// yield ResponsePush { code: 1 }; /// yield ResponsePush { code: 2 }; /// }; /// let response = Response::new(Box::pin(output) as tonic_mock::StreamResponseInner<ResponsePush>); /// let rt = tokio::runtime::Runtime::new().unwrap(); /// /// // now we convert response to vec /// let result: Vec<Result<ResponsePush, Status>> = rt.block_on(async { tonic_mock::stream_to_vec(response).await }); /// for (i, v) in result.iter().enumerate() { /// assert!(v.is_ok()); /// assert_eq!(v.as_ref().unwrap().code, i as i32); /// } /// ``` pub async fn stream_to_vec<T>(response: StreamResponse<T>) -> Vec<Result<T, Status>> where T: Message + Default + 'static, { let mut result = Vec::new(); let mut messages = response.into_inner(); while let Some(v) = messages.next().await { result.push(v) } result }