1use std::borrow::Cow;
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::atomic;
25
26use serde;
27use serde_json;
28use serde_json::value::RawValue;
29
30use super::{Request, Response};
31use crate::error::Error;
32use crate::util::HashableValue;
33use async_trait::async_trait;
34
35#[async_trait]
37pub trait Transport: Send + Sync + 'static {
38 async fn send_request(&self, r: Request<'_>) -> Result<Response, Error>;
40 async fn send_batch(&self, rs: &[Request<'_>]) -> Result<Vec<Response>, Error>;
42 fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result;
45}
46
/// A JSON-RPC client that sends its requests through a boxed [`Transport`].
///
/// Build one with `Client::with_transport`.
pub struct Client {
    /// The transport every request and batch is forwarded to.
    pub(crate) transport: Box<dyn Transport>,
    /// Counter used to produce request ids; it starts at 1 and is incremented
    /// each time a request is built.
    nonce: atomic::AtomicUsize,
}
55
56impl Client {
57 pub fn with_transport<T: Transport>(transport: T) -> Client {
59 Client {
60 transport: Box::new(transport),
61 nonce: atomic::AtomicUsize::new(1),
62 }
63 }
64
65 pub fn build_request<'a>(&self, method: &'a str, params: &'a [Box<RawValue>]) -> Request<'a> {
70 let nonce = self.nonce.fetch_add(1, atomic::Ordering::Relaxed);
71 Request {
72 method: method,
73 params: params,
74 id: serde_json::Value::from(nonce),
75 jsonrpc: Some("2.0"),
76 }
77 }
78
79 pub async fn send_request(&self, request: Request<'_>) -> Result<Response, Error> {
81 self.transport.send_request(request).await
82 }
83
84 pub async fn send_batch(
90 &self,
91 requests: &[Request<'_>],
92 ) -> Result<Vec<Option<Response>>, Error> {
93 if requests.is_empty() {
94 return Err(Error::EmptyBatch);
95 }
96
97 let responses = self.transport.send_batch(requests).await?;
100 if responses.len() > requests.len() {
101 return Err(Error::WrongBatchResponseSize);
102 }
103
104 let mut by_id = HashMap::with_capacity(requests.len());
108 for resp in responses.into_iter() {
109 let id = HashableValue(Cow::Owned(resp.id.clone()));
110 if let Some(dup) = by_id.insert(id, resp) {
111 return Err(Error::BatchDuplicateResponseId(dup.id));
112 }
113 }
114 let results = requests
116 .into_iter()
117 .map(|r| by_id.remove(&HashableValue(Cow::Borrowed(&r.id))))
118 .collect();
119
120 if let Some((id, _)) = by_id.into_iter().nth(0) {
123 return Err(Error::WrongBatchResponseId(id.0.into_owned()));
124 }
125
126 Ok(results)
127 }
128
129 pub async fn call<R: for<'a> serde::de::Deserialize<'a>>(
134 &self,
135 method: &str,
136 args: &[Box<RawValue>],
137 ) -> Result<R, Error> {
138 let request = self.build_request(method, args);
139 let id = request.id.clone();
140
141 let response = self.send_request(request).await?;
142 if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
143 return Err(Error::VersionMismatch);
144 }
145 if response.id != id {
146 return Err(Error::NonceMismatch);
147 }
148
149 Ok(response.result()?)
150 }
151}
152
153impl fmt::Debug for Client {
154 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
155 write!(f, "jsonrpc::Client(")?;
156 self.transport.fmt_target(f)?;
157 write!(f, ")")
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync;

    /// Transport stub: single requests always fail, batches return no
    /// responses, and nothing is written for the target.
    struct DummyTransport;

    #[async_trait]
    impl Transport for DummyTransport {
        async fn send_request(&self, _request: Request<'_>) -> Result<Response, Error> {
            Err(Error::NonceMismatch)
        }

        async fn send_batch(&self, _requests: &[Request<'_>]) -> Result<Vec<Response>, Error> {
            Ok(Vec::new())
        }

        fn fmt_target(&self, _formatter: &mut fmt::Formatter) -> fmt::Result {
            Ok(())
        }
    }

    /// Building requests advances the nonce by one each time and yields
    /// distinct ids.
    #[test]
    fn sanity() {
        let client = Client::with_transport(DummyTransport);
        let current_nonce = || client.nonce.load(sync::atomic::Ordering::Relaxed);

        assert_eq!(current_nonce(), 1);
        let first = client.build_request("test", &[]);
        assert_eq!(current_nonce(), 2);
        let second = client.build_request("test", &[]);
        assert_eq!(current_nonce(), 3);
        assert_ne!(first.id, second.id);
    }
}
190}