thin_jsonrpc_client/
backend.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5/// Something that can be handed bytes to send out. WHen every copy of the [`crate::Client`]
6/// is dropped, this will be too.
7pub trait BackendSender: Send + Sync + 'static {
8    /// Send a message to the JSON-RPC server, emitting an error if something goes wrong.
9    /// The message should be serializable to a valid JSON-RPC object.
10    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + 'static>>;
11}
12
13/// Something that can receive bytes.
14pub trait BackendReceiver: Send + 'static {
15    /// Hand back the next message each time it's called. If this emits a [`BackendError`], we'll
16    /// stop asking for messages. If it emits `None`, then the connection has been closed gracefully.
17    /// The bytes given back should deserialize to a valid JSON-RPC object.
18    fn receive(&self) -> Pin<Box<dyn Future<Output = Option<Result<Vec<u8>, BackendError>>> + Send + 'static>>;
19}
20
21/// An error that can occur from the backend.
22pub type BackendError = Arc<dyn std::error::Error + Send + Sync + 'static>;
23
24/// A mock backend which can be used in tests or examples.
25pub mod mock {
26    use crate::raw_response::ErrorObject;
27    use crate::raw_response::RawResponse;
28
29    use super::*;
30    use std::task::Poll;
31    use std::collections::VecDeque;
32    use std::collections::HashMap;
33
34    /// Construct a new mock backend.
35    pub fn build() -> (MockBackend, MockBackendSender, MockBackendReceiver) {
36        let inner = Default::default();
37
38        (
39            MockBackend {inner: Arc::clone(&inner)},
40            MockBackendSender { inner: Arc::clone(&inner)},
41            MockBackendReceiver { inner }
42        )
43    }
44
45    /// A mock backend.
46    #[derive(Clone)]
47    pub struct MockBackend {
48        inner: Arc<std::sync::Mutex<MockBackendInner>>
49    }
50
51    /// The sender half of the mock backend.
52    pub struct MockBackendSender {
53        inner: Arc<std::sync::Mutex<MockBackendInner>>
54    }
55
56    /// The receiver half of the mock backend.
57    pub struct MockBackendReceiver {
58        inner: Arc<std::sync::Mutex<MockBackendInner>>
59    }
60
61    /// A mock request
62    #[allow(missing_docs)]
63    #[derive(serde::Deserialize, Debug, Clone)]
64    pub struct MockRequest {
65        pub jsonrpc: String,
66        pub id: Option<String>,
67        pub method: String,
68        #[serde(default = "empty_raw_value")]
69        pub params: Box<serde_json::value::RawValue>
70    }
71
72    fn empty_raw_value() -> Box<serde_json::value::RawValue> {
73        serde_json::value::RawValue::from_string("null".to_owned()).unwrap()
74    }
75
76    #[derive(Default)]
77    struct MockBackendInner {
78        handlers: HashMap<&'static str, Arc<dyn Fn(MockBackend, MockRequest) + Send + Sync + 'static>>,
79        stopped: bool,
80        send_waker: Option<std::task::Waker>,
81        send_queue: VecDeque<Vec<u8>>,
82    }
83
84    impl MockBackend {
85        /// Add a handler.
86        pub fn handler<F>(&self, name: &'static str, callback: F) -> &Self
87        where F: Fn(MockBackend, MockRequest) + Send + Sync + 'static
88        {
89            self.inner.lock().unwrap().handlers.insert(name, Arc::new(callback));
90            self
91        }
92
93        /// Send an OK notification back
94        pub fn send_ok_notification<V: serde::Serialize>(&self, value: V) {
95            let res = crate::RawResponse::ok_from_value::<_, u8>(None, value);
96            self.send_response(res)
97        }
98
99        /// Send an OK response back
100        pub fn send_ok_response<Id: ToString, V: serde::Serialize>(&self, id: Option<Id>, value: V) {
101            let res = crate::RawResponse::ok_from_value(id, value);
102            self.send_response(res)
103        }
104
105        /// Send a [`crate::RawResponse`] back.
106        pub fn send_response(&self, response: crate::RawResponse<'_>) {
107            let res = serde_json::to_string(&response).unwrap();
108            self.send_bytes(res.into_bytes())
109        }
110
111        /// Send raw bytes to be received by the [`BackendReceiver`].
112        pub fn send_bytes(&self, bytes: Vec<u8>) {
113            let mut inner = self.inner.lock().unwrap();
114            if inner.stopped {
115                return
116            }
117            inner.send_queue.push_back(bytes);
118            if let Some(waker) = inner.send_waker.take() {
119                waker.wake();
120            }
121        }
122
123        /// Shut the backend down. It'll return None when asked for
124        /// messages as soon as any existing queue has been drained.
125        pub fn shutdown(&self) {
126            let mut inner = self.inner.lock().unwrap();
127            inner.stopped = true;
128            if let Some(waker) = inner.send_waker.take() {
129                waker.wake();
130            }
131        }
132    }
133
134    impl Drop for MockBackendSender {
135        fn drop(&mut self) {
136            MockBackend { inner: self.inner.clone() }.shutdown();
137        }
138    }
139
140    impl BackendSender for MockBackendSender {
141        fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + 'static>> {
142            let Ok(req) = serde_json::from_slice::<MockRequest>(data) else {
143                eprintln!("Error decoding: {}", std::str::from_utf8(data).unwrap());
144                return Box::pin(std::future::ready(Ok(())))
145            };
146
147            let mock_backend = MockBackend { inner: self.inner.clone() };
148
149            // Complain if not 2.0
150            if req.jsonrpc != "2.0" {
151                mock_backend.send_response(RawResponse::err_from_value(req.id, ErrorObject {
152                    code: crate::raw_response::CODE_PARSE_ERROR,
153                    message: format!("\"jsonrpc\" field was not equal to \"2.0\", was {}", req.jsonrpc).into(),
154                    data: None
155                }));
156                return Box::pin(std::future::ready(Ok(())))
157            }
158
159            // Acquire lock only long enough to ge a copy our our handler, so we don't deadlock
160            // trying to call the handler or send some other response (which will also lock).
161            let maybe_handler = self.inner.lock().unwrap().handlers.get(req.method.as_str()).cloned();
162
163            if let Some(handler) = maybe_handler {
164                handler(mock_backend, req);
165            } else {
166                mock_backend.send_response(RawResponse::err_from_value(req.id, ErrorObject {
167                    code: crate::raw_response::CODE_METHOD_NOT_FOUND,
168                    message: format!("No method named '{}'", req.method).into(),
169                    data: None
170                }));
171            }
172
173            Box::pin(std::future::ready(Ok(())))
174        }
175    }
176
177    impl BackendReceiver for MockBackendReceiver {
178        fn receive(&self) -> Pin<Box<dyn Future<Output = Option<Result<Vec<u8>, BackendError>>> + Send + 'static>> {
179            let inner = self.inner.clone();
180            Box::pin(std::future::poll_fn(move |cx| {
181                let mut inner = inner.lock().unwrap();
182                if let Some(item) = inner.send_queue.pop_front() {
183                    Poll::Ready(Some(Ok(item)))
184                } else if inner.stopped {
185                    Poll::Ready(None)
186                } else {
187                    inner.send_waker = Some(cx.waker().clone());
188                    Poll::Pending
189                }
190            }))
191        }
192    }
193
194}