ethers_providers/rpc/transports/
ipc.rs

1use super::common::Params;
2use async_trait::async_trait;
3use bytes::{Buf, BytesMut};
4use ethers_core::types::U256;
5use futures_channel::mpsc;
6use futures_util::stream::StreamExt;
7use hashers::fx_hash::FxHasher64;
8use serde::{de::DeserializeOwned, Serialize};
9use serde_json::{value::RawValue, Deserializer};
10use std::{
11    cell::RefCell,
12    convert::Infallible,
13    hash::BuildHasherDefault,
14    io,
15    path::Path,
16    sync::{
17        atomic::{AtomicU64, Ordering},
18        Arc,
19    },
20    thread,
21};
22use thiserror::Error;
23use tokio::{
24    io::{AsyncReadExt, AsyncWriteExt, BufReader},
25    runtime,
26    sync::oneshot::{self, error::RecvError},
27};
28
29use super::common::{JsonRpcError, Request, Response};
30use crate::{errors::ProviderError, JsonRpcClient, PubsubClient};
31
/// HashMap keyed with the fast, non-cryptographic FxHash. DoS resistance is
/// unnecessary here: keys are locally generated request/subscription ids, not
/// attacker-controlled input.
type FxHashMap<K, V> = std::collections::HashMap<K, V, BuildHasherDefault<FxHasher64>>;

/// One-shot sender used to deliver a single JSON-RPC response (or error) back
/// to the task awaiting `request`.
type Pending = oneshot::Sender<Result<Box<RawValue>, JsonRpcError>>;
/// Unbounded sender feeding raw notification payloads to a subscription stream.
type Subscription = mpsc::UnboundedSender<Box<RawValue>>;
36
#[cfg(unix)]
#[doc(hidden)]
mod imp {
    // On Unix, tokio's `UnixStream` already provides exactly the interface this
    // transport needs (`connect` + `split` into read/write halves), so simply
    // re-export it under the platform-neutral names used by the rest of the file.
    pub(super) use tokio::net::{
        unix::{ReadHalf, WriteHalf},
        UnixStream as Stream,
    };
}
45
#[cfg(windows)]
#[doc(hidden)]
mod imp {
    //! Windows named-pipe shim exposing the same surface as the Unix variant:
    //! a `Stream` with `connect` and `split` into `ReadHalf`/`WriteHalf`.
    use super::*;
    use std::{
        ops::{Deref, DerefMut},
        pin::Pin,
        task::{Context, Poll},
        time::Duration,
    };
    use tokio::{
        io::{AsyncRead, AsyncWrite, ReadBuf},
        net::windows::named_pipe::{ClientOptions, NamedPipeClient},
        time::sleep,
    };
    use winapi::shared::winerror;

    /// Wrapper around [NamedPipeClient] to have the same methods as a UnixStream.
    ///
    /// Should not be exported.
    #[repr(transparent)]
    pub(super) struct Stream(pub NamedPipeClient);

    impl Deref for Stream {
        type Target = NamedPipeClient;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl DerefMut for Stream {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.0
        }
    }

    impl Stream {
        /// Opens the named pipe at `addr`, retrying every 50ms while all pipe
        /// instances are busy (`ERROR_PIPE_BUSY`). Any other error is fatal.
        pub async fn connect(addr: impl AsRef<Path>) -> Result<Self, io::Error> {
            let addr = addr.as_ref().as_os_str();
            loop {
                match ClientOptions::new().open(addr) {
                    Ok(client) => break Ok(Self(client)),
                    // busy: fall through to the sleep below, then retry
                    Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
                    Err(e) => break Err(e),
                }

                sleep(Duration::from_millis(50)).await;
            }
        }

        /// Splits the stream into read and write halves, mirroring
        /// `UnixStream::split`.
        #[allow(unsafe_code)]
        pub fn split(&mut self) -> (ReadHalf, WriteHalf) {
            // SAFETY: ReadHalf cannot write but still needs a mutable reference for polling.
            // NamedPipeClient calls its `io` using immutable references, but it's private.
            //
            // NOTE(review): this manufactures two aliasing `&mut Stream`s. The
            // stated justification is that the underlying I/O only ever goes
            // through `&self`; verify this still holds against the tokio
            // version in use before touching this code.
            let self1 = unsafe { &mut *(self as *mut Self) };
            let self2 = self;
            (ReadHalf(self1), WriteHalf(self2))
        }
    }

    impl AsyncRead for Stream {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            // delegate to the inner NamedPipeClient
            let this = Pin::new(&mut self.get_mut().0);
            this.poll_read(cx, buf)
        }
    }

    impl AsyncWrite for Stream {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0);
            this.poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[io::IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0);
            this.poll_write_vectored(cx, bufs)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            // No-op: this wrapper does not buffer writes, and NamedPipeClient
            // exposes no flush operation — presumably writes reach the pipe
            // directly; confirm if buffering is ever added.
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Borrowed read half of a [`Stream`]; see `Stream::split`.
    pub(super) struct ReadHalf<'a>(pub &'a mut Stream);

    /// Borrowed write half of a [`Stream`]; see `Stream::split`.
    pub(super) struct WriteHalf<'a>(pub &'a mut Stream);

    impl AsyncRead for ReadHalf<'_> {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_read(cx, buf)
        }
    }

    impl AsyncWrite for WriteHalf<'_> {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[io::IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_write_vectored(cx, bufs)
        }

        fn is_write_vectored(&self) -> bool {
            self.0.is_write_vectored()
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let this = Pin::new(&mut self.get_mut().0 .0);
            this.poll_flush(cx)
        }

        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }
}
194
195use self::imp::*;
196
#[cfg_attr(unix, doc = "A JSON-RPC Client over Unix IPC.")]
#[cfg_attr(windows, doc = "A JSON-RPC Client over named pipes.")]
///
/// # Example
///
/// ```no_run
/// # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
/// use ethers_providers::Ipc;
///
/// // the ipc's path
#[cfg_attr(unix, doc = r#"let path = "/home/user/.local/share/reth/reth.ipc";"#)]
#[cfg_attr(windows, doc = r#"let path = r"\\.\pipe\reth.ipc";"#)]
/// let ipc = Ipc::connect(path).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Ipc {
    /// Monotonically increasing JSON-RPC request id, shared by all clones.
    id: Arc<AtomicU64>,
    /// Channel to the background server thread that owns the socket/pipe.
    request_tx: mpsc::UnboundedSender<TransportMessage>,
}
218
/// Messages sent from the [`Ipc`] handle to the background IPC server task.
#[derive(Debug)]
enum TransportMessage {
    /// A serialized JSON-RPC request; the response is delivered via `sender`.
    Request { id: u64, request: Box<[u8]>, sender: Pending },
    /// Register `sink` to receive notifications for subscription `id`.
    Subscribe { id: U256, sink: Subscription },
    /// Remove the sink registered for subscription `id`.
    Unsubscribe { id: U256 },
}
225
226impl Ipc {
227    #[cfg_attr(unix, doc = "Connects to the Unix socket at the provided path.")]
228    #[cfg_attr(windows, doc = "Connects to the named pipe at the provided path.\n")]
229    #[cfg_attr(
230        windows,
231        doc = r"Note: the path must be the fully qualified, like: `\\.\pipe\<name>`."
232    )]
233    pub async fn connect(path: impl AsRef<Path>) -> Result<Self, IpcError> {
234        let id = Arc::new(AtomicU64::new(1));
235        let (request_tx, request_rx) = mpsc::unbounded();
236
237        let stream = Stream::connect(path).await?;
238        spawn_ipc_server(stream, request_rx);
239
240        Ok(Self { id, request_tx })
241    }
242
243    fn send(&self, msg: TransportMessage) -> Result<(), IpcError> {
244        self.request_tx
245            .unbounded_send(msg)
246            .map_err(|_| IpcError::ChannelError("IPC server receiver dropped".to_string()))?;
247
248        Ok(())
249    }
250}
251
252#[async_trait]
253impl JsonRpcClient for Ipc {
254    type Error = IpcError;
255
256    async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
257        &self,
258        method: &str,
259        params: T,
260    ) -> Result<R, IpcError> {
261        let next_id = self.id.fetch_add(1, Ordering::SeqCst);
262
263        // Create the request and initialize the response channel
264        let (sender, receiver) = oneshot::channel();
265        let payload = TransportMessage::Request {
266            id: next_id,
267            request: serde_json::to_vec(&Request::new(next_id, method, params))?.into_boxed_slice(),
268            sender,
269        };
270
271        // Send the request to the IPC server to be handled.
272        self.send(payload)?;
273
274        // Wait for the response from the IPC server.
275        let res = receiver.await??;
276
277        // Parse JSON response.
278        Ok(serde_json::from_str(res.get())?)
279    }
280}
281
282impl PubsubClient for Ipc {
283    type NotificationStream = mpsc::UnboundedReceiver<Box<RawValue>>;
284
285    fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, IpcError> {
286        let (sink, stream) = mpsc::unbounded();
287        self.send(TransportMessage::Subscribe { id: id.into(), sink })?;
288        Ok(stream)
289    }
290
291    fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), IpcError> {
292        self.send(TransportMessage::Unsubscribe { id: id.into() })
293    }
294}
295
296fn spawn_ipc_server(stream: Stream, request_rx: mpsc::UnboundedReceiver<TransportMessage>) {
297    // 256 Kb should be more than enough for this thread, as all unbounded data
298    // growth occurs on heap-allocated data structures and buffers and the call
299    // stack is not going to do anything crazy either
300    const STACK_SIZE: usize = 1 << 18;
301    // spawn a light-weight thread with a thread-local async runtime just for
302    // sending and receiving data over the IPC socket
303    let _ = thread::Builder::new()
304        .name("ipc-server-thread".to_string())
305        .stack_size(STACK_SIZE)
306        .spawn(move || {
307            let rt = runtime::Builder::new_current_thread()
308                .enable_io()
309                .build()
310                .expect("failed to create ipc-server-thread async runtime");
311
312            rt.block_on(run_ipc_server(stream, request_rx));
313        })
314        .expect("failed to spawn ipc server thread");
315}
316
317async fn run_ipc_server(mut stream: Stream, request_rx: mpsc::UnboundedReceiver<TransportMessage>) {
318    // the shared state for both reads & writes
319    let shared = Shared {
320        pending: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(),
321        subs: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(),
322    };
323
324    // split the stream and run two independent concurrently (local), thereby
325    // allowing reads and writes to occurr concurrently
326    let (reader, writer) = stream.split();
327    let read = shared.handle_ipc_reads(reader);
328    let write = shared.handle_ipc_writes(writer, request_rx);
329
330    // run both loops concurrently, until either encounts an error
331    if let Err(e) = futures_util::try_join!(read, write) {
332        match e {
333            IpcError::ServerExit => {}
334            err => tracing::error!(?err, "exiting IPC server due to error"),
335        }
336    }
337}
338
/// State shared between the read and write halves of the IPC server task.
///
/// `RefCell` (rather than a mutex) suffices because both halves run on the
/// same single-threaded runtime and no borrow is held across an `.await`.
struct Shared {
    // in-flight requests awaiting a response, keyed by JSON-RPC request id
    pending: RefCell<FxHashMap<u64, Pending>>,
    // active subscription sinks, keyed by subscription id
    subs: RefCell<FxHashMap<U256, Subscription>>,
}
343
344impl Shared {
345    async fn handle_ipc_reads(&self, reader: ReadHalf<'_>) -> Result<Infallible, IpcError> {
346        let mut reader = BufReader::new(reader);
347        let mut buf = BytesMut::with_capacity(4096);
348
349        loop {
350            // try to read the next batch of bytes into the buffer
351            let read = reader.read_buf(&mut buf).await?;
352            if read == 0 {
353                // eof, socket was closed
354                return Err(IpcError::ServerExit)
355            }
356
357            // parse the received bytes into 0-n jsonrpc messages
358            let read = self.handle_bytes(&buf)?;
359            // split off all bytes that were parsed into complete messages
360            // any remaining bytes that correspond to incomplete messages remain
361            // in the buffer
362            buf.advance(read);
363        }
364    }
365
366    async fn handle_ipc_writes(
367        &self,
368        mut writer: WriteHalf<'_>,
369        mut request_rx: mpsc::UnboundedReceiver<TransportMessage>,
370    ) -> Result<Infallible, IpcError> {
371        use TransportMessage::*;
372
373        while let Some(msg) = request_rx.next().await {
374            match msg {
375                Request { id, request, sender } => {
376                    let prev = self.pending.borrow_mut().insert(id, sender);
377                    assert!(prev.is_none(), "{}", "replaced pending IPC request (id={id})");
378
379                    if let Err(err) = writer.write_all(&request).await {
380                        tracing::error!("IPC connection error: {:?}", err);
381                        self.pending.borrow_mut().remove(&id);
382                    }
383                }
384                Subscribe { id, sink } => {
385                    if self.subs.borrow_mut().insert(id, sink).is_some() {
386                        tracing::warn!(
387                            %id,
388                            "replaced already-registered subscription"
389                        );
390                    }
391                }
392                Unsubscribe { id } => {
393                    if self.subs.borrow_mut().remove(&id).is_none() {
394                        tracing::warn!(
395                            %id,
396                            "attempted to unsubscribe from non-existent subscription"
397                        );
398                    }
399                }
400            }
401        }
402
403        // the request receiver will only be closed if the sender instance
404        // located within the transport handle is dropped, this is not truly an
405        // error but leads to the `try_join` in `run_ipc_server` to cancel the
406        // read half future
407        Err(IpcError::ServerExit)
408    }
409
410    fn handle_bytes(&self, bytes: &BytesMut) -> Result<usize, IpcError> {
411        // deserialize all complete jsonrpc responses in the buffer
412        let mut de = Deserializer::from_slice(bytes.as_ref()).into_iter();
413        while let Some(Ok(response)) = de.next() {
414            match response {
415                Response::Success { id, result } => self.send_response(id, Ok(result.to_owned())),
416                Response::Error { id, error } => self.send_response(id, Err(error)),
417                Response::Notification { params, .. } => self.send_notification(params),
418            };
419        }
420
421        Ok(de.byte_offset())
422    }
423
424    fn send_response(&self, id: u64, result: Result<Box<RawValue>, JsonRpcError>) {
425        // retrieve the channel sender for responding to the pending request
426        let response_tx = match self.pending.borrow_mut().remove(&id) {
427            Some(tx) => tx,
428            None => {
429                tracing::warn!(%id, "no pending request exists for the response ID");
430                return
431            }
432        };
433
434        // a failure to send the response indicates that the pending request has
435        // been dropped in the mean time
436        let _ = response_tx.send(result.map_err(Into::into));
437    }
438
439    /// Sends notification through the channel based on the ID of the subscription.
440    /// This handles streaming responses.
441    fn send_notification(&self, params: Params<'_>) {
442        // retrieve the channel sender for notifying the subscription stream
443        let subs = self.subs.borrow();
444        let tx = match subs.get(&params.subscription) {
445            Some(tx) => tx,
446            None => {
447                tracing::warn!(
448                    id = ?params.subscription,
449                    "no subscription exists for the notification ID"
450                );
451                return
452            }
453        };
454
455        // a failure to send the response indicates that the pending request has
456        // been dropped in the mean time (and should have been unsubscribed!)
457        let _ = tx.unbounded_send(params.result.to_owned());
458    }
459}
460
/// Error thrown when sending or receiving an IPC message.
#[derive(Debug, Error)]
pub enum IpcError {
    /// Thrown if deserialization failed
    #[error(transparent)]
    JsonError(#[from] serde_json::Error),

    /// std IO error forwarding.
    #[error(transparent)]
    IoError(#[from] io::Error),

    /// Server responded to the request with a valid JSON-RPC error response
    #[error(transparent)]
    JsonRpcError(#[from] JsonRpcError),

    /// Internal channel failed (the IPC server task dropped its receiver)
    #[error("{0}")]
    ChannelError(String),

    /// Listener for request result is gone (the oneshot sender was dropped
    /// before a response arrived)
    #[error(transparent)]
    RequestCancelled(#[from] RecvError),

    /// IPC server exited (socket EOF or transport handle dropped)
    #[error("The IPC server has exited")]
    ServerExit,
}
488
489impl From<IpcError> for ProviderError {
490    fn from(src: IpcError) -> Self {
491        ProviderError::JsonRpcClientError(Box::new(src))
492    }
493}
494
495impl crate::RpcError for IpcError {
496    fn as_error_response(&self) -> Option<&super::JsonRpcError> {
497        if let IpcError::JsonRpcError(err) = self {
498            Some(err)
499        } else {
500            None
501        }
502    }
503
504    fn as_serde_error(&self) -> Option<&serde_json::Error> {
505        match self {
506            IpcError::JsonError(err) => Some(err),
507            _ => None,
508        }
509    }
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use ethers_core::utils::{Geth, GethInstance};
    use std::time::Duration;
    use tempfile::NamedTempFile;

    /// Spawns a geth dev node exposing IPC at a fresh temp path and connects
    /// an [`Ipc`] transport to it. The `GethInstance` must be kept alive for
    /// the duration of the test (dropping it kills the node).
    async fn connect() -> (Ipc, GethInstance) {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.into_temp_path().to_path_buf();
        let geth = Geth::new().block_time(1u64).ipc_path(&path).spawn();

        // [Windows named pipes](https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes)
        // are located at `\\<machine_address>\pipe\<pipe_name>`.
        #[cfg(windows)]
        let path = format!(r"\\.\pipe\{}", path.display());
        let ipc = Ipc::connect(path).await.unwrap();

        (ipc, geth)
    }

    // Round-trips two `eth_blockNumber` requests; geth mines one block per
    // second, so the second call must observe a larger number.
    #[tokio::test]
    async fn request() {
        let (ipc, _geth) = connect().await;

        let block_num: U256 = ipc.request("eth_blockNumber", ()).await.unwrap();
        tokio::time::sleep(Duration::from_secs(2)).await;
        let block_num2: U256 = ipc.request("eth_blockNumber", ()).await.unwrap();
        assert!(block_num2 > block_num);
    }

    // Exercises the pubsub path: `eth_subscribe` returns an id, and streaming
    // on that id yields consecutive new-head notifications.
    #[tokio::test]
    #[cfg(not(feature = "celo"))]
    async fn subscription() {
        use ethers_core::types::{Block, TxHash};

        let (ipc, _geth) = connect().await;

        // Subscribing requires sending the sub request and then subscribing to
        // the returned sub_id
        let sub_id: U256 = ipc.request("eth_subscribe", ["newHeads"]).await.unwrap();
        let stream = ipc.subscribe(sub_id).unwrap();

        let blocks: Vec<u64> = stream
            .take(3)
            .map(|item| {
                let block: Block<TxHash> = serde_json::from_str(item.get()).unwrap();
                block.number.unwrap_or_default().as_u64()
            })
            .collect()
            .await;
        // `[1, 2, 3]` or `[2, 3, 4]` etc, depending on test latency
        assert_eq!(blocks[2], blocks[1] + 1);
        assert_eq!(blocks[1], blocks[0] + 1);
    }
}
567}