bs_gl_plugin/node/
rpcwait.rs

1use tonic::transport::NamedService;
2use log::warn;
3use tower::Service;
4
/// A `Service` wrapper that holds requests back until the RPC socket exists.
///
/// The RPC socket will not be available right away, so we wrap the
/// cln-grpc service with this `Service`. On each call it checks for the
/// existence of the file at `rpc_path`, and if it doesn't exist we wait
/// for up to 5 seconds (re-checking every 500ms) for it to appear. Once
/// the file shows up, or the wait runs out, the request is handed to the
/// wrapped service either way.
#[derive(Debug, Clone)]
pub struct RpcWaitService<S> {
    /// Filesystem path whose existence is awaited before a request is forwarded.
    rpc_path: std::path::PathBuf,
    /// The wrapped service that ultimately handles every request.
    inner: S,
}
14
15impl<S> RpcWaitService<S> {
16    pub fn new(inner: S, rpc_path: std::path::PathBuf) -> Self {
17        RpcWaitService { rpc_path, inner }
18    }
19}
20
21impl<S> Service<hyper::Request<hyper::Body>> for RpcWaitService<S>
22where
23    S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<tonic::body::BoxBody>>
24        + Clone
25        + Send
26        + 'static,
27    S::Future: Send + 'static,
28{
29    type Response = S::Response;
30    type Error = S::Error;
31    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
32
33    fn poll_ready(
34        &mut self,
35        cx: &mut std::task::Context<'_>,
36    ) -> std::task::Poll<Result<(), Self::Error>> {
37        self.inner.poll_ready(cx)
38    }
39
40    fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
41        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
42        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
43        // for details on why this is necessary
44        let clone = self.inner.clone();
45        let mut inner = std::mem::replace(&mut self.inner, clone);
46        let path = self.rpc_path.clone();
47        Box::pin(async move {
48            let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
49            loop {
50                if deadline < tokio::time::Instant::now() {
51                    // Break and let it fail in the `inner.call`
52                    warn!("Deadline reached, letting the call fail");
53                    break;
54                }
55                match path.metadata() {
56                    Ok(_) => break,
57                    Err(_) => tokio::time::sleep(tokio::time::Duration::from_millis(500)).await,
58                }
59            }
60            inner.call(request).await
61        })
62    }
63}
64
impl<S> NamedService for RpcWaitService<S> {
    // The name is hard-coded instead of being taken from the wrapped
    // service `S`. Well, this is cheating a bit, but since we'll only ever
    // wrap the cln.Node service we can have this fixed.
    const NAME: &'static str = "cln.Node";
}