tendermint_rpc/client/transport/
mock.rs

1//! Mock client implementation for use in testing.
2
3use alloc::collections::BTreeMap as HashMap;
4
5use async_trait::async_trait;
6
7use crate::dialect::{v0_38, Dialect};
8use crate::{
9    client::{
10        subscription::SubscriptionTx,
11        sync::{unbounded, ChannelRx, ChannelTx},
12        transport::router::SubscriptionRouter,
13        Client,
14    },
15    event::Event,
16    prelude::*,
17    query::Query,
18    request::SimpleRequest,
19    utils::uuid_str,
20    Error, Method, Request, Response, Subscription, SubscriptionClient,
21};
22
23/// A mock client implementation for use in testing.
24///
25/// ## Examples
26///
27/// ```rust
28/// use tendermint_rpc::{Client, Method, MockClient, MockRequestMatcher, MockRequestMethodMatcher};
29///
30/// const ABCI_INFO_RESPONSE: &str = r#"{
31///   "jsonrpc": "2.0",
32///   "id": "",
33///   "result": {
34///     "response": {
35///       "data": "GaiaApp",
36///       "version": "0.17.0",
37///       "app_version": "1",
38///       "last_block_height": "488120",
39///       "last_block_app_hash": "2LnCw0fN+Zq/gs5SOuya/GRHUmtWftAqAkTUuoxl4g4="
40///     }
41///   }
42/// }"#;
43///
44/// tokio_test::block_on(async {
45///     let matcher = MockRequestMethodMatcher::default()
46///         .map(Method::AbciInfo, Ok(ABCI_INFO_RESPONSE.to_string()));
47///     let (client, driver) = MockClient::new(matcher);
48///     let driver_hdl = tokio::spawn(async move { driver.run().await });
49///
50///     let abci_info = client.abci_info().await.unwrap();
51///     println!("Got mock ABCI info: {:?}", abci_info);
52///     assert_eq!("GaiaApp".to_string(), abci_info.data);
53///
54///     client.close();
55///     driver_hdl.await.unwrap();
56/// });
57/// ```
58#[derive(Debug)]
59pub struct MockClient<M: MockRequestMatcher> {
60    matcher: M,
61    driver_tx: ChannelTx<DriverCommand>,
62}
63
64#[async_trait]
65impl<M: MockRequestMatcher> Client for MockClient<M> {
66    async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
67    where
68        R: SimpleRequest<v0_38::Dialect>,
69    {
70        self.matcher
71            .response_for(request)
72            .ok_or_else(Error::mismatch_response)?
73            .map(Into::into)
74    }
75}
76
77impl<M: MockRequestMatcher> MockClient<M> {
78    /// Create a new mock RPC client using the given request matcher.
79    pub fn new(matcher: M) -> (Self, MockClientDriver) {
80        let (driver_tx, driver_rx) = unbounded();
81        (
82            Self { matcher, driver_tx },
83            MockClientDriver::new(driver_rx),
84        )
85    }
86
87    /// Publishes the given event to all subscribers whose query exactly
88    /// matches that of the event.
89    pub fn publish(&self, ev: &Event) {
90        self.driver_tx
91            .send(DriverCommand::Publish(Box::new(ev.clone())))
92            .unwrap();
93    }
94
95    /// Signal to the mock client's driver to terminate.
96    pub fn close(self) {
97        self.driver_tx.send(DriverCommand::Terminate).unwrap();
98    }
99}
100
101#[async_trait]
102impl<M: MockRequestMatcher> SubscriptionClient for MockClient<M> {
103    async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
104        let id = uuid_str();
105        let (subs_tx, subs_rx) = unbounded();
106        let (result_tx, mut result_rx) = unbounded();
107        self.driver_tx.send(DriverCommand::Subscribe {
108            id: id.clone(),
109            query: query.clone(),
110            subscription_tx: subs_tx,
111            result_tx,
112        })?;
113        result_rx.recv().await.unwrap()?;
114        Ok(Subscription::new(id, query, subs_rx))
115    }
116
117    async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
118        let (result_tx, mut result_rx) = unbounded();
119        self.driver_tx
120            .send(DriverCommand::Unsubscribe { query, result_tx })?;
121        result_rx.recv().await.unwrap()
122    }
123
124    fn close(self) -> Result<(), Error> {
125        Ok(())
126    }
127}
128
129#[derive(Debug)]
130pub enum DriverCommand {
131    Subscribe {
132        id: String,
133        query: Query,
134        subscription_tx: SubscriptionTx,
135        result_tx: ChannelTx<Result<(), Error>>,
136    },
137    Unsubscribe {
138        query: Query,
139        result_tx: ChannelTx<Result<(), Error>>,
140    },
141    Publish(Box<Event>),
142    Terminate,
143}
144
145#[derive(Debug)]
146pub struct MockClientDriver {
147    router: SubscriptionRouter,
148    rx: ChannelRx<DriverCommand>,
149}
150
151impl MockClientDriver {
152    pub fn new(rx: ChannelRx<DriverCommand>) -> Self {
153        Self {
154            router: SubscriptionRouter::default(),
155            rx,
156        }
157    }
158
159    pub async fn run(mut self) -> Result<(), Error> {
160        loop {
161            tokio::select! {
162            Some(cmd) = self.rx.recv() => match cmd {
163                    DriverCommand::Subscribe { id, query, subscription_tx, result_tx } => {
164                        self.subscribe(id, query, subscription_tx, result_tx);
165                    }
166                    DriverCommand::Unsubscribe { query, result_tx } => {
167                        self.unsubscribe(query, result_tx);
168                    }
169                    DriverCommand::Publish(event) => self.publish(*event),
170                    DriverCommand::Terminate => return Ok(()),
171                }
172            }
173        }
174    }
175
176    fn subscribe(
177        &mut self,
178        id: String,
179        query: Query,
180        subscription_tx: SubscriptionTx,
181        result_tx: ChannelTx<Result<(), Error>>,
182    ) {
183        self.router.add(id, query, subscription_tx);
184        result_tx.send(Ok(())).unwrap();
185    }
186
187    fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx<Result<(), Error>>) {
188        self.router.remove_by_query(query);
189        result_tx.send(Ok(())).unwrap();
190    }
191
192    fn publish(&mut self, event: Event) {
193        self.router.publish_event(event);
194    }
195}
196
197/// A trait required by the [`MockClient`] that allows for different approaches
198/// to mocking responses for specific requests.
199///
200/// [`MockClient`]: struct.MockClient.html
201pub trait MockRequestMatcher: Send + Sync {
202    /// Provide the corresponding response for the given request (if any).
203    fn response_for<R, S>(&self, request: R) -> Option<Result<R::Response, Error>>
204    where
205        R: Request<S>,
206        S: Dialect;
207}
208
209/// Provides a simple [`MockRequestMatcher`] implementation that simply maps
210/// requests with specific methods to responses.
211///
212/// [`MockRequestMatcher`]: trait.MockRequestMatcher.html
213#[derive(Debug, Default)]
214pub struct MockRequestMethodMatcher {
215    mappings: HashMap<Method, Result<String, Error>>,
216}
217
218impl MockRequestMatcher for MockRequestMethodMatcher {
219    fn response_for<R, S>(&self, request: R) -> Option<Result<R::Response, Error>>
220    where
221        R: Request<S>,
222        S: Dialect,
223    {
224        self.mappings.get(&request.method()).map(|res| match res {
225            Ok(json) => R::Response::from_string(json),
226            Err(e) => Err(e.clone()),
227        })
228    }
229}
230
231impl MockRequestMethodMatcher {
232    /// Maps all incoming requests with the given method such that their
233    /// corresponding response will be `response`.
234    ///
235    /// Successful responses must be JSON-encoded.
236    #[allow(dead_code)]
237    pub fn map(mut self, method: Method, response: Result<String, Error>) -> Self {
238        self.mappings.insert(method, response);
239        self
240    }
241}
242
243#[cfg(test)]
244mod test {
245    use std::path::PathBuf;
246
247    use futures::StreamExt;
248    use tendermint::{block::Height, chain::Id};
249    use tokio::fs;
250
251    use super::*;
252    use crate::query::EventType;
253
254    async fn read_json_fixture(version: &str, name: &str) -> String {
255        fs::read_to_string(
256            PathBuf::from("./tests/kvstore_fixtures")
257                .join(version)
258                .join("incoming")
259                .join(name.to_owned() + ".json"),
260        )
261        .await
262        .unwrap()
263    }
264
265    mod v0_34 {
266        use super::*;
267        use crate::event::v0_34::DeEvent;
268
269        async fn read_event(name: &str) -> Event {
270            let msg = DeEvent::from_string(read_json_fixture("v0_34", name).await).unwrap();
271            msg.into()
272        }
273
274        #[tokio::test]
275        async fn mock_client() {
276            let abci_info_fixture = read_json_fixture("v0_34", "abci_info").await;
277            let block_fixture = read_json_fixture("v0_34", "block_at_height_10").await;
278            let matcher = MockRequestMethodMatcher::default()
279                .map(Method::AbciInfo, Ok(abci_info_fixture))
280                .map(Method::Block, Ok(block_fixture));
281            let (client, driver) = MockClient::new(matcher);
282            let driver_hdl = tokio::spawn(async move { driver.run().await });
283
284            let abci_info = client.abci_info().await.unwrap();
285            assert_eq!("{\"size\":0}".to_string(), abci_info.data);
286
287            let block = client.block(Height::from(10_u32)).await.unwrap().block;
288            assert_eq!(Height::from(10_u32), block.header.height);
289            assert_eq!("dockerchain".parse::<Id>().unwrap(), block.header.chain_id);
290
291            client.close();
292            driver_hdl.await.unwrap().unwrap();
293        }
294
295        #[tokio::test]
296        async fn mock_subscription_client() {
297            let (client, driver) = MockClient::new(MockRequestMethodMatcher::default());
298            let driver_hdl = tokio::spawn(async move { driver.run().await });
299
300            let event1 = read_event("subscribe_newblock_0").await;
301            let event2 = read_event("subscribe_newblock_1").await;
302            let event3 = read_event("subscribe_newblock_2").await;
303            let events = vec![event1, event2, event3];
304
305            let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
306            let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
307            assert_ne!(subs1.id().to_string(), subs2.id().to_string());
308
309            // We can do this because the underlying channels can buffer the
310            // messages as we publish them.
311            let subs1_events = subs1.take(3);
312            let subs2_events = subs2.take(3);
313            for ev in &events {
314                client.publish(ev);
315            }
316
317            // Here each subscription's channel is drained.
318            let subs1_events = subs1_events.collect::<Vec<Result<Event, Error>>>().await;
319            let subs2_events = subs2_events.collect::<Vec<Result<Event, Error>>>().await;
320
321            assert_eq!(3, subs1_events.len());
322            assert_eq!(3, subs2_events.len());
323
324            for i in 0..3 {
325                assert!(events[i].eq(subs1_events[i].as_ref().unwrap()));
326            }
327
328            client.close();
329            driver_hdl.await.unwrap().unwrap();
330        }
331    }
332
333    mod v0_37 {
334        use super::*;
335        use crate::event::v0_37::DeEvent;
336
337        async fn read_event(name: &str) -> Event {
338            let msg = DeEvent::from_string(read_json_fixture("v0_37", name).await).unwrap();
339            msg.into()
340        }
341
342        #[tokio::test]
343        async fn mock_client() {
344            let abci_info_fixture = read_json_fixture("v0_37", "abci_info").await;
345            let block_fixture = read_json_fixture("v0_37", "block_at_height_10").await;
346            let matcher = MockRequestMethodMatcher::default()
347                .map(Method::AbciInfo, Ok(abci_info_fixture))
348                .map(Method::Block, Ok(block_fixture));
349            let (client, driver) = MockClient::new(matcher);
350            let driver_hdl = tokio::spawn(async move { driver.run().await });
351
352            let abci_info = client.abci_info().await.unwrap();
353            assert_eq!("{\"size\":9}".to_string(), abci_info.data);
354
355            let block = client.block(Height::from(10_u32)).await.unwrap().block;
356            assert_eq!(Height::from(10_u32), block.header.height);
357            assert_eq!("dockerchain".parse::<Id>().unwrap(), block.header.chain_id);
358
359            client.close();
360            driver_hdl.await.unwrap().unwrap();
361        }
362
363        #[tokio::test]
364        async fn mock_subscription_client() {
365            let (client, driver) = MockClient::new(MockRequestMethodMatcher::default());
366            let driver_hdl = tokio::spawn(async move { driver.run().await });
367
368            let event1 = read_event("subscribe_newblock_0").await;
369            let event2 = read_event("subscribe_newblock_1").await;
370            let event3 = read_event("subscribe_newblock_2").await;
371            let events = vec![event1, event2, event3];
372
373            let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
374            let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
375            assert_ne!(subs1.id().to_string(), subs2.id().to_string());
376
377            // We can do this because the underlying channels can buffer the
378            // messages as we publish them.
379            let subs1_events = subs1.take(3);
380            let subs2_events = subs2.take(3);
381            for ev in &events {
382                client.publish(ev);
383            }
384
385            // Here each subscription's channel is drained.
386            let subs1_events = subs1_events.collect::<Vec<Result<Event, Error>>>().await;
387            let subs2_events = subs2_events.collect::<Vec<Result<Event, Error>>>().await;
388
389            assert_eq!(3, subs1_events.len());
390            assert_eq!(3, subs2_events.len());
391
392            for i in 0..3 {
393                assert!(events[i].eq(subs1_events[i].as_ref().unwrap()));
394            }
395
396            client.close();
397            driver_hdl.await.unwrap().unwrap();
398        }
399    }
400}