1use super::common::Params;
2use async_trait::async_trait;
3use bytes::{Buf, BytesMut};
4use ethers_core::types::U256;
5use futures_channel::mpsc;
6use futures_util::stream::StreamExt;
7use hashers::fx_hash::FxHasher64;
8use serde::{de::DeserializeOwned, Serialize};
9use serde_json::{value::RawValue, Deserializer};
10use std::{
11 cell::RefCell,
12 convert::Infallible,
13 hash::BuildHasherDefault,
14 io,
15 path::Path,
16 sync::{
17 atomic::{AtomicU64, Ordering},
18 Arc,
19 },
20 thread,
21};
22use thiserror::Error;
23use tokio::{
24 io::{AsyncReadExt, AsyncWriteExt, BufReader},
25 runtime,
26 sync::oneshot::{self, error::RecvError},
27};
28
29use super::common::{JsonRpcError, Request, Response};
30use crate::{errors::ProviderError, JsonRpcClient, PubsubClient};
31
/// A [`HashMap`](std::collections::HashMap) whose keys are hashed with [`FxHasher64`] instead of
/// the standard library's default hasher.
type FxHashMap<K, V> = std::collections::HashMap<K, V, BuildHasherDefault<FxHasher64>>;
33
/// Reply channel of a single in-flight request: carries either the raw JSON result or the
/// JSON-RPC error back to the caller awaiting the response.
type Pending = oneshot::Sender<Result<Box<RawValue>, JsonRpcError>>;
/// Sending half of the channel over which raw notification payloads are pushed to a subscriber.
type Subscription = mpsc::UnboundedSender<Box<RawValue>>;
36
// Unix backend: re-exports tokio's Unix socket types under the platform-neutral names
// (`Stream`, `ReadHalf`, `WriteHalf`) that the rest of this file uses.
#[cfg(unix)]
#[doc(hidden)]
mod imp {
    pub(super) use tokio::net::{
        unix::{ReadHalf, WriteHalf},
        UnixStream as Stream,
    };
}
45
// Windows backend: wraps tokio's named pipe client so that it offers the same `Stream`,
// `ReadHalf` and `WriteHalf` names (including `connect` and `split`) that the rest of this file
// uses.
#[cfg(windows)]
#[doc(hidden)]
mod imp {
    use super::*;
    use std::{
        ops::{Deref, DerefMut},
        pin::Pin,
        task::{Context, Poll},
        time::Duration,
    };
    use tokio::{
        io::{AsyncRead, AsyncWrite, ReadBuf},
        net::windows::named_pipe::{ClientOptions, NamedPipeClient},
        time::sleep,
    };
    use winapi::shared::winerror;

    /// Newtype around [`NamedPipeClient`] that adds `connect` and `split`.
    #[repr(transparent)]
    pub(super) struct Stream(pub NamedPipeClient);

    impl Deref for Stream {
        type Target = NamedPipeClient;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl DerefMut for Stream {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.0
        }
    }

    impl Stream {
        /// Opens the named pipe at `addr`.
        ///
        /// While the open fails with `ERROR_PIPE_BUSY` the attempt is repeated every 50 ms; any
        /// other error is returned to the caller.
        pub async fn connect(addr: impl AsRef<Path>) -> Result<Self, io::Error> {
            let addr = addr.as_ref().as_os_str();
            loop {
                match ClientOptions::new().open(addr) {
                    Ok(client) => break Ok(Self(client)),
                    // The pipe is busy: fall through to the sleep below and try again.
                    Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
                    Err(e) => break Err(e),
                }

                sleep(Duration::from_millis(50)).await;
            }
        }

        /// Splits the stream into a read half and a write half that both borrow `self`.
        #[allow(unsafe_code)]
        pub fn split(&mut self) -> (ReadHalf, WriteHalf) {
            // NOTE(review): this creates a second `&mut Stream` aliasing `self` through a raw
            // pointer, so both halves hold a mutable reference to the same pipe. Presumably this
            // relies on the read half only reading and the write half only writing -- confirm
            // that this is sound for `NamedPipeClient`.
            let self1 = unsafe { &mut *(self as *mut Self) };
            let self2 = self;
            (ReadHalf(self1), WriteHalf(self2))
        }
    }

    // Reads are forwarded to the wrapped `NamedPipeClient`.
    impl AsyncRead for Stream {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            let this = Pin::new(&mut self.get_mut().0);
            this.poll_read(cx, buf)
        }
    }

    // Writes are forwarded to the wrapped `NamedPipeClient`; flushing is a no-op here.
    impl AsyncWrite for Stream {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0);
            this.poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[io::IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0);
            this.poll_write_vectored(cx, bufs)
        }

        // Reports success immediately without delegating to the inner client.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        // Shutting down only flushes.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Read half returned by [`Stream::split`].
    pub(super) struct ReadHalf<'a>(pub &'a mut Stream);

    /// Write half returned by [`Stream::split`].
    pub(super) struct WriteHalf<'a>(pub &'a mut Stream);

    // Reads go straight to the `NamedPipeClient` inside the borrowed `Stream`.
    impl AsyncRead for ReadHalf<'_> {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_read(cx, buf)
        }
    }

    // Writes and flushes go straight to the `NamedPipeClient` inside the borrowed `Stream`.
    impl AsyncWrite for WriteHalf<'_> {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[io::IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_write_vectored(cx, bufs)
        }

        fn is_write_vectored(&self) -> bool {
            self.0.is_write_vectored()
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_flush(cx)
        }

        // Shutting down only flushes.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }
}
194
195use self::imp::*;
196
#[cfg_attr(unix, doc = "A JSON-RPC Client over Unix IPC.")]
#[cfg_attr(windows, doc = "A JSON-RPC Client over named pipes.")]
///
/// Clones share the same request ID counter and the same channel to the background IPC server.
///
/// # Example
///
/// ```ignore
/// // Path of the IPC endpoint to connect to.
#[cfg_attr(unix, doc = r#"let path = "/home/user/.local/share/reth/reth.ipc";"#)]
#[cfg_attr(windows, doc = r#"let path = r"\\.\pipe\reth.ipc";"#)]
/// let ipc = Ipc::connect(path).await?;
/// ```
#[derive(Debug, Clone)]
pub struct Ipc {
    /// Counter from which the ID of every outgoing request is taken.
    id: Arc<AtomicU64>,
    /// Channel over which messages are handed to the background IPC server.
    request_tx: mpsc::UnboundedSender<TransportMessage>,
}
218
/// Messages sent from [`Ipc`] handles to the background IPC server.
#[derive(Debug)]
enum TransportMessage {
    /// Write the serialized `request` to the connection and deliver the response carrying the
    /// same `id` through `sender`.
    Request { id: u64, request: Box<[u8]>, sender: Pending },
    /// Register `sink` as the receiver of notifications for subscription `id`.
    Subscribe { id: U256, sink: Subscription },
    /// Remove the registration of subscription `id`.
    Unsubscribe { id: U256 },
}
225
226impl Ipc {
227 #[cfg_attr(unix, doc = "Connects to the Unix socket at the provided path.")]
228 #[cfg_attr(windows, doc = "Connects to the named pipe at the provided path.\n")]
229 #[cfg_attr(
230 windows,
231 doc = r"Note: the path must be the fully qualified, like: `\\.\pipe\<name>`."
232 )]
233 pub async fn connect(path: impl AsRef<Path>) -> Result<Self, IpcError> {
234 let id = Arc::new(AtomicU64::new(1));
235 let (request_tx, request_rx) = mpsc::unbounded();
236
237 let stream = Stream::connect(path).await?;
238 spawn_ipc_server(stream, request_rx);
239
240 Ok(Self { id, request_tx })
241 }
242
243 fn send(&self, msg: TransportMessage) -> Result<(), IpcError> {
244 self.request_tx
245 .unbounded_send(msg)
246 .map_err(|_| IpcError::ChannelError("IPC server receiver dropped".to_string()))?;
247
248 Ok(())
249 }
250}
251
252#[async_trait]
253impl JsonRpcClient for Ipc {
254 type Error = IpcError;
255
256 async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
257 &self,
258 method: &str,
259 params: T,
260 ) -> Result<R, IpcError> {
261 let next_id = self.id.fetch_add(1, Ordering::SeqCst);
262
263 let (sender, receiver) = oneshot::channel();
265 let payload = TransportMessage::Request {
266 id: next_id,
267 request: serde_json::to_vec(&Request::new(next_id, method, params))?.into_boxed_slice(),
268 sender,
269 };
270
271 self.send(payload)?;
273
274 let res = receiver.await??;
276
277 Ok(serde_json::from_str(res.get())?)
279 }
280}
281
282impl PubsubClient for Ipc {
283 type NotificationStream = mpsc::UnboundedReceiver<Box<RawValue>>;
284
285 fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, IpcError> {
286 let (sink, stream) = mpsc::unbounded();
287 self.send(TransportMessage::Subscribe { id: id.into(), sink })?;
288 Ok(stream)
289 }
290
291 fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), IpcError> {
292 self.send(TransportMessage::Unsubscribe { id: id.into() })
293 }
294}
295
296fn spawn_ipc_server(stream: Stream, request_rx: mpsc::UnboundedReceiver<TransportMessage>) {
297 const STACK_SIZE: usize = 1 << 18;
301 let _ = thread::Builder::new()
304 .name("ipc-server-thread".to_string())
305 .stack_size(STACK_SIZE)
306 .spawn(move || {
307 let rt = runtime::Builder::new_current_thread()
308 .enable_io()
309 .build()
310 .expect("failed to create ipc-server-thread async runtime");
311
312 rt.block_on(run_ipc_server(stream, request_rx));
313 })
314 .expect("failed to spawn ipc server thread");
315}
316
317async fn run_ipc_server(mut stream: Stream, request_rx: mpsc::UnboundedReceiver<TransportMessage>) {
318 let shared = Shared {
320 pending: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(),
321 subs: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(),
322 };
323
324 let (reader, writer) = stream.split();
327 let read = shared.handle_ipc_reads(reader);
328 let write = shared.handle_ipc_writes(writer, request_rx);
329
330 if let Err(e) = futures_util::try_join!(read, write) {
332 match e {
333 IpcError::ServerExit => {}
334 err => tracing::error!(?err, "exiting IPC server due to error"),
335 }
336 }
337}
338
/// State shared between the read loop and the write loop of the IPC server.
struct Shared {
    /// Reply channels of the requests still awaiting a response, keyed by request ID.
    pending: RefCell<FxHashMap<u64, Pending>>,
    /// Notification sinks of the registered subscriptions, keyed by subscription ID.
    subs: RefCell<FxHashMap<U256, Subscription>>,
}
343
344impl Shared {
345 async fn handle_ipc_reads(&self, reader: ReadHalf<'_>) -> Result<Infallible, IpcError> {
346 let mut reader = BufReader::new(reader);
347 let mut buf = BytesMut::with_capacity(4096);
348
349 loop {
350 let read = reader.read_buf(&mut buf).await?;
352 if read == 0 {
353 return Err(IpcError::ServerExit)
355 }
356
357 let read = self.handle_bytes(&buf)?;
359 buf.advance(read);
363 }
364 }
365
366 async fn handle_ipc_writes(
367 &self,
368 mut writer: WriteHalf<'_>,
369 mut request_rx: mpsc::UnboundedReceiver<TransportMessage>,
370 ) -> Result<Infallible, IpcError> {
371 use TransportMessage::*;
372
373 while let Some(msg) = request_rx.next().await {
374 match msg {
375 Request { id, request, sender } => {
376 let prev = self.pending.borrow_mut().insert(id, sender);
377 assert!(prev.is_none(), "{}", "replaced pending IPC request (id={id})");
378
379 if let Err(err) = writer.write_all(&request).await {
380 tracing::error!("IPC connection error: {:?}", err);
381 self.pending.borrow_mut().remove(&id);
382 }
383 }
384 Subscribe { id, sink } => {
385 if self.subs.borrow_mut().insert(id, sink).is_some() {
386 tracing::warn!(
387 %id,
388 "replaced already-registered subscription"
389 );
390 }
391 }
392 Unsubscribe { id } => {
393 if self.subs.borrow_mut().remove(&id).is_none() {
394 tracing::warn!(
395 %id,
396 "attempted to unsubscribe from non-existent subscription"
397 );
398 }
399 }
400 }
401 }
402
403 Err(IpcError::ServerExit)
408 }
409
410 fn handle_bytes(&self, bytes: &BytesMut) -> Result<usize, IpcError> {
411 let mut de = Deserializer::from_slice(bytes.as_ref()).into_iter();
413 while let Some(Ok(response)) = de.next() {
414 match response {
415 Response::Success { id, result } => self.send_response(id, Ok(result.to_owned())),
416 Response::Error { id, error } => self.send_response(id, Err(error)),
417 Response::Notification { params, .. } => self.send_notification(params),
418 };
419 }
420
421 Ok(de.byte_offset())
422 }
423
424 fn send_response(&self, id: u64, result: Result<Box<RawValue>, JsonRpcError>) {
425 let response_tx = match self.pending.borrow_mut().remove(&id) {
427 Some(tx) => tx,
428 None => {
429 tracing::warn!(%id, "no pending request exists for the response ID");
430 return
431 }
432 };
433
434 let _ = response_tx.send(result.map_err(Into::into));
437 }
438
439 fn send_notification(&self, params: Params<'_>) {
442 let subs = self.subs.borrow();
444 let tx = match subs.get(¶ms.subscription) {
445 Some(tx) => tx,
446 None => {
447 tracing::warn!(
448 id = ?params.subscription,
449 "no subscription exists for the notification ID"
450 );
451 return
452 }
453 };
454
455 let _ = tx.unbounded_send(params.result.to_owned());
458 }
459}
460
/// Error returned by the IPC transport.
#[derive(Debug, Error)]
pub enum IpcError {
    /// Serializing a request or deserializing a response failed.
    #[error(transparent)]
    JsonError(#[from] serde_json::Error),

    /// An I/O operation on the connection failed.
    #[error(transparent)]
    IoError(#[from] io::Error),

    /// The request was answered with a JSON-RPC error response.
    #[error(transparent)]
    JsonRpcError(#[from] JsonRpcError),

    /// A message could not be handed to the IPC server.
    #[error("{0}")]
    ChannelError(String),

    /// The reply channel of a request was dropped before a response was delivered.
    #[error(transparent)]
    RequestCancelled(#[from] RecvError),

    /// The IPC server has stopped running.
    #[error("The IPC server has exited")]
    ServerExit,
}
488
489impl From<IpcError> for ProviderError {
490 fn from(src: IpcError) -> Self {
491 ProviderError::JsonRpcClientError(Box::new(src))
492 }
493}
494
495impl crate::RpcError for IpcError {
496 fn as_error_response(&self) -> Option<&super::JsonRpcError> {
497 if let IpcError::JsonRpcError(err) = self {
498 Some(err)
499 } else {
500 None
501 }
502 }
503
504 fn as_serde_error(&self) -> Option<&serde_json::Error> {
505 match self {
506 IpcError::JsonError(err) => Some(err),
507 _ => None,
508 }
509 }
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use ethers_core::utils::{Geth, GethInstance};
    use std::time::Duration;
    use tempfile::NamedTempFile;

    /// Spawns a Geth instance with a one second block time and connects to its IPC endpoint.
    ///
    /// The instance is returned alongside the client so that the caller keeps it alive.
    async fn connect() -> (Ipc, GethInstance) {
        // Only the path is kept; the `TempPath` value is dropped at the end of the statement.
        let path = NamedTempFile::new().unwrap().into_temp_path().to_path_buf();
        let geth = Geth::new().block_time(1u64).ipc_path(&path).spawn();

        // On Windows the endpoint is addressed as a named pipe.
        #[cfg(windows)]
        let path = format!(r"\\.\pipe\{}", path.display());
        let ipc = Ipc::connect(path).await.unwrap();

        (ipc, geth)
    }

    #[tokio::test]
    async fn request() {
        let (ipc, _geth) = connect().await;

        let before: U256 = ipc.request("eth_blockNumber", ()).await.unwrap();
        // Wait for two block times so that the block number must have moved.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let after: U256 = ipc.request("eth_blockNumber", ()).await.unwrap();
        assert!(after > before);
    }

    #[tokio::test]
    #[cfg(not(feature = "celo"))]
    async fn subscription() {
        use ethers_core::types::{Block, TxHash};

        let (ipc, _geth) = connect().await;

        // Subscribe to new block headers and listen on the returned subscription ID.
        let sub_id: U256 = ipc.request("eth_subscribe", ["newHeads"]).await.unwrap();
        let stream = ipc.subscribe(sub_id).unwrap();

        let numbers: Vec<u64> = stream
            .take(3)
            .map(|raw| {
                serde_json::from_str::<Block<TxHash>>(raw.get())
                    .unwrap()
                    .number
                    .unwrap_or_default()
                    .as_u64()
            })
            .collect()
            .await;
        // The three headers received must carry consecutive block numbers.
        assert_eq!(numbers[2], numbers[1] + 1);
        assert_eq!(numbers[1], numbers[0] + 1);
    }
}