tendermint_rpc/client/transport/
mock.rs1use alloc::collections::BTreeMap as HashMap;
4
5use async_trait::async_trait;
6
7use crate::dialect::{v0_38, Dialect};
8use crate::{
9 client::{
10 subscription::SubscriptionTx,
11 sync::{unbounded, ChannelRx, ChannelTx},
12 transport::router::SubscriptionRouter,
13 Client,
14 },
15 event::Event,
16 prelude::*,
17 query::Query,
18 request::SimpleRequest,
19 utils::uuid_str,
20 Error, Method, Request, Response, Subscription, SubscriptionClient,
21};
22
23#[derive(Debug)]
59pub struct MockClient<M: MockRequestMatcher> {
60 matcher: M,
61 driver_tx: ChannelTx<DriverCommand>,
62}
63
64#[async_trait]
65impl<M: MockRequestMatcher> Client for MockClient<M> {
66 async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
67 where
68 R: SimpleRequest<v0_38::Dialect>,
69 {
70 self.matcher
71 .response_for(request)
72 .ok_or_else(Error::mismatch_response)?
73 .map(Into::into)
74 }
75}
76
77impl<M: MockRequestMatcher> MockClient<M> {
78 pub fn new(matcher: M) -> (Self, MockClientDriver) {
80 let (driver_tx, driver_rx) = unbounded();
81 (
82 Self { matcher, driver_tx },
83 MockClientDriver::new(driver_rx),
84 )
85 }
86
87 pub fn publish(&self, ev: &Event) {
90 self.driver_tx
91 .send(DriverCommand::Publish(Box::new(ev.clone())))
92 .unwrap();
93 }
94
95 pub fn close(self) {
97 self.driver_tx.send(DriverCommand::Terminate).unwrap();
98 }
99}
100
101#[async_trait]
102impl<M: MockRequestMatcher> SubscriptionClient for MockClient<M> {
103 async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
104 let id = uuid_str();
105 let (subs_tx, subs_rx) = unbounded();
106 let (result_tx, mut result_rx) = unbounded();
107 self.driver_tx.send(DriverCommand::Subscribe {
108 id: id.clone(),
109 query: query.clone(),
110 subscription_tx: subs_tx,
111 result_tx,
112 })?;
113 result_rx.recv().await.unwrap()?;
114 Ok(Subscription::new(id, query, subs_rx))
115 }
116
117 async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
118 let (result_tx, mut result_rx) = unbounded();
119 self.driver_tx
120 .send(DriverCommand::Unsubscribe { query, result_tx })?;
121 result_rx.recv().await.unwrap()
122 }
123
124 fn close(self) -> Result<(), Error> {
125 Ok(())
126 }
127}
128
129#[derive(Debug)]
130pub enum DriverCommand {
131 Subscribe {
132 id: String,
133 query: Query,
134 subscription_tx: SubscriptionTx,
135 result_tx: ChannelTx<Result<(), Error>>,
136 },
137 Unsubscribe {
138 query: Query,
139 result_tx: ChannelTx<Result<(), Error>>,
140 },
141 Publish(Box<Event>),
142 Terminate,
143}
144
145#[derive(Debug)]
146pub struct MockClientDriver {
147 router: SubscriptionRouter,
148 rx: ChannelRx<DriverCommand>,
149}
150
151impl MockClientDriver {
152 pub fn new(rx: ChannelRx<DriverCommand>) -> Self {
153 Self {
154 router: SubscriptionRouter::default(),
155 rx,
156 }
157 }
158
159 pub async fn run(mut self) -> Result<(), Error> {
160 loop {
161 tokio::select! {
162 Some(cmd) = self.rx.recv() => match cmd {
163 DriverCommand::Subscribe { id, query, subscription_tx, result_tx } => {
164 self.subscribe(id, query, subscription_tx, result_tx);
165 }
166 DriverCommand::Unsubscribe { query, result_tx } => {
167 self.unsubscribe(query, result_tx);
168 }
169 DriverCommand::Publish(event) => self.publish(*event),
170 DriverCommand::Terminate => return Ok(()),
171 }
172 }
173 }
174 }
175
176 fn subscribe(
177 &mut self,
178 id: String,
179 query: Query,
180 subscription_tx: SubscriptionTx,
181 result_tx: ChannelTx<Result<(), Error>>,
182 ) {
183 self.router.add(id, query, subscription_tx);
184 result_tx.send(Ok(())).unwrap();
185 }
186
187 fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx<Result<(), Error>>) {
188 self.router.remove_by_query(query);
189 result_tx.send(Ok(())).unwrap();
190 }
191
192 fn publish(&mut self, event: Event) {
193 self.router.publish_event(event);
194 }
195}
196
197pub trait MockRequestMatcher: Send + Sync {
202 fn response_for<R, S>(&self, request: R) -> Option<Result<R::Response, Error>>
204 where
205 R: Request<S>,
206 S: Dialect;
207}
208
209#[derive(Debug, Default)]
214pub struct MockRequestMethodMatcher {
215 mappings: HashMap<Method, Result<String, Error>>,
216}
217
218impl MockRequestMatcher for MockRequestMethodMatcher {
219 fn response_for<R, S>(&self, request: R) -> Option<Result<R::Response, Error>>
220 where
221 R: Request<S>,
222 S: Dialect,
223 {
224 self.mappings.get(&request.method()).map(|res| match res {
225 Ok(json) => R::Response::from_string(json),
226 Err(e) => Err(e.clone()),
227 })
228 }
229}
230
231impl MockRequestMethodMatcher {
232 #[allow(dead_code)]
237 pub fn map(mut self, method: Method, response: Result<String, Error>) -> Self {
238 self.mappings.insert(method, response);
239 self
240 }
241}
242
243#[cfg(test)]
244mod test {
245 use std::path::PathBuf;
246
247 use futures::StreamExt;
248 use tendermint::{block::Height, chain::Id};
249 use tokio::fs;
250
251 use super::*;
252 use crate::query::EventType;
253
254 async fn read_json_fixture(version: &str, name: &str) -> String {
255 fs::read_to_string(
256 PathBuf::from("./tests/kvstore_fixtures")
257 .join(version)
258 .join("incoming")
259 .join(name.to_owned() + ".json"),
260 )
261 .await
262 .unwrap()
263 }
264
265 mod v0_34 {
266 use super::*;
267 use crate::event::v0_34::DeEvent;
268
269 async fn read_event(name: &str) -> Event {
270 let msg = DeEvent::from_string(read_json_fixture("v0_34", name).await).unwrap();
271 msg.into()
272 }
273
274 #[tokio::test]
275 async fn mock_client() {
276 let abci_info_fixture = read_json_fixture("v0_34", "abci_info").await;
277 let block_fixture = read_json_fixture("v0_34", "block_at_height_10").await;
278 let matcher = MockRequestMethodMatcher::default()
279 .map(Method::AbciInfo, Ok(abci_info_fixture))
280 .map(Method::Block, Ok(block_fixture));
281 let (client, driver) = MockClient::new(matcher);
282 let driver_hdl = tokio::spawn(async move { driver.run().await });
283
284 let abci_info = client.abci_info().await.unwrap();
285 assert_eq!("{\"size\":0}".to_string(), abci_info.data);
286
287 let block = client.block(Height::from(10_u32)).await.unwrap().block;
288 assert_eq!(Height::from(10_u32), block.header.height);
289 assert_eq!("dockerchain".parse::<Id>().unwrap(), block.header.chain_id);
290
291 client.close();
292 driver_hdl.await.unwrap().unwrap();
293 }
294
295 #[tokio::test]
296 async fn mock_subscription_client() {
297 let (client, driver) = MockClient::new(MockRequestMethodMatcher::default());
298 let driver_hdl = tokio::spawn(async move { driver.run().await });
299
300 let event1 = read_event("subscribe_newblock_0").await;
301 let event2 = read_event("subscribe_newblock_1").await;
302 let event3 = read_event("subscribe_newblock_2").await;
303 let events = vec![event1, event2, event3];
304
305 let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
306 let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
307 assert_ne!(subs1.id().to_string(), subs2.id().to_string());
308
309 let subs1_events = subs1.take(3);
312 let subs2_events = subs2.take(3);
313 for ev in &events {
314 client.publish(ev);
315 }
316
317 let subs1_events = subs1_events.collect::<Vec<Result<Event, Error>>>().await;
319 let subs2_events = subs2_events.collect::<Vec<Result<Event, Error>>>().await;
320
321 assert_eq!(3, subs1_events.len());
322 assert_eq!(3, subs2_events.len());
323
324 for i in 0..3 {
325 assert!(events[i].eq(subs1_events[i].as_ref().unwrap()));
326 }
327
328 client.close();
329 driver_hdl.await.unwrap().unwrap();
330 }
331 }
332
333 mod v0_37 {
334 use super::*;
335 use crate::event::v0_37::DeEvent;
336
337 async fn read_event(name: &str) -> Event {
338 let msg = DeEvent::from_string(read_json_fixture("v0_37", name).await).unwrap();
339 msg.into()
340 }
341
342 #[tokio::test]
343 async fn mock_client() {
344 let abci_info_fixture = read_json_fixture("v0_37", "abci_info").await;
345 let block_fixture = read_json_fixture("v0_37", "block_at_height_10").await;
346 let matcher = MockRequestMethodMatcher::default()
347 .map(Method::AbciInfo, Ok(abci_info_fixture))
348 .map(Method::Block, Ok(block_fixture));
349 let (client, driver) = MockClient::new(matcher);
350 let driver_hdl = tokio::spawn(async move { driver.run().await });
351
352 let abci_info = client.abci_info().await.unwrap();
353 assert_eq!("{\"size\":9}".to_string(), abci_info.data);
354
355 let block = client.block(Height::from(10_u32)).await.unwrap().block;
356 assert_eq!(Height::from(10_u32), block.header.height);
357 assert_eq!("dockerchain".parse::<Id>().unwrap(), block.header.chain_id);
358
359 client.close();
360 driver_hdl.await.unwrap().unwrap();
361 }
362
363 #[tokio::test]
364 async fn mock_subscription_client() {
365 let (client, driver) = MockClient::new(MockRequestMethodMatcher::default());
366 let driver_hdl = tokio::spawn(async move { driver.run().await });
367
368 let event1 = read_event("subscribe_newblock_0").await;
369 let event2 = read_event("subscribe_newblock_1").await;
370 let event3 = read_event("subscribe_newblock_2").await;
371 let events = vec![event1, event2, event3];
372
373 let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
374 let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
375 assert_ne!(subs1.id().to_string(), subs2.id().to_string());
376
377 let subs1_events = subs1.take(3);
380 let subs2_events = subs2.take(3);
381 for ev in &events {
382 client.publish(ev);
383 }
384
385 let subs1_events = subs1_events.collect::<Vec<Result<Event, Error>>>().await;
387 let subs2_events = subs2_events.collect::<Vec<Result<Event, Error>>>().await;
388
389 assert_eq!(3, subs1_events.len());
390 assert_eq!(3, subs2_events.len());
391
392 for i in 0..3 {
393 assert!(events[i].eq(subs1_events[i].as_ref().unwrap()));
394 }
395
396 client.close();
397 driver_hdl.await.unwrap().unwrap();
398 }
399 }
400}