bs_gl_plugin/node/mod.rs

use crate::config::Config;
use crate::pb::{self, node_server::Node};
use crate::rpc::LightningClient;
use crate::stager;
use crate::storage::StateStore;
use crate::{messages, Event};
use anyhow::{Context, Error, Result};
use base64::{engine::general_purpose, Engine as _};
use bs_gl_client::bitcoin;
use bs_gl_client::persist::State;
use bytes::BufMut;
use governor::{
    clock::MonotonicClock, state::direct::NotKeyed, state::InMemoryState, Quota, RateLimiter,
};
use lazy_static::lazy_static;
use log::{debug, error, info, trace, warn};
use serde_json::json;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::{broadcast, mpsc, Mutex, OnceCell};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::ServerTlsConfig, Code, Request, Response, Status};

mod wrapper;
pub use wrapper::WrappedNodeServer;

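/// Process-wide rate limiter for client-driven RPC calls, initialized
/// lazily on first use.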
static LIMITER: OnceCell<RateLimiter<NotKeyed, InMemoryState, MonotonicClock>> =
    OnceCell::const_new();

lazy_static! {
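    /// Counter used to assign a unique id to each attached signer stream.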
    static ref HSM_ID_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// The number of signers that are currently connected (best guess
    /// due to races). Allows us to determine whether we should
    /// initiate operations that might require signatures.
    static ref SIGNER_COUNT: AtomicUsize = AtomicUsize::new(0);
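    /// Broadcast channel used to relay events triggered by RPC calls
    /// into the node's event stream.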
    static ref RPC_BCAST: broadcast::Sender<super::Event> = broadcast::channel(4).0;

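    /// Cached copy of the serialized `configure` request, restored from
    /// the datastore on startup and replayed to attached signers.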
    static ref SERIALIZED_CONFIGURE_REQUEST: Mutex<Option<String>> = Mutex::new(None);

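    /// Set once the JSON-RPC interface has answered its first `getinfo`
    /// call.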
    static ref RPC_READY: AtomicBool = AtomicBool::new(false);
}

/// The PluginNodeServer is the interface that is exposed to client devices
/// and is in charge of coordinating the various user-controlled
/// entities. This includes dispatching incoming RPC calls to the JSON-RPC
/// interface, as well as staging requests from the HSM so that they can be
/// streamed and replied to by devices that have access to the signing keys.
#[derive(Clone)]
pub struct PluginNodeServer {
    pub tls: ServerTlsConfig,
    pub stage: Arc<stager::Stage>,
    pub rpc: Arc<Mutex<LightningClient>>,
    rpc_path: PathBuf,
    events: tokio::sync::broadcast::Sender<super::Event>,
    signer_state: Arc<Mutex<State>>,
    grpc_binding: String,
    signer_state_store: Arc<Mutex<Box<dyn StateStore>>>,
    pub ctx: crate::context::Context,
}

impl PluginNodeServer {
    pub async fn new(
        stage: Arc<stager::Stage>,
        config: Config,
        events: tokio::sync::broadcast::Sender<super::Event>,
        signer_state_store: Box<dyn StateStore>,
    ) -> Result<Self, Error> {
        let tls = ServerTlsConfig::new()
            .identity(config.identity.id)
            .client_ca_root(config.identity.ca);

        let mut rpc_path = std::env::current_dir().unwrap();
        rpc_path.push("lightning-rpc");
        info!("Connecting to lightning-rpc at {:?}", rpc_path);

        let rpc = Arc::new(Mutex::new(LightningClient::new(rpc_path.clone())));

        // Bridge the RPC_BCAST into the events queue
        let tx = events.clone();
        tokio::spawn(async move {
            let mut rx = RPC_BCAST.subscribe();
            loop {
                if let Ok(e) = rx.recv().await {
                    let _ = tx.send(e);
                }
            }
        });

        let signer_state = signer_state_store.read().await?;

        let ctx = crate::context::Context::new();

        let rrpc = rpc.clone();

        let s = PluginNodeServer {
            ctx,
            tls,
            rpc,
            stage,
            events,
            rpc_path,
            signer_state: Arc::new(Mutex::new(signer_state)),
            signer_state_store: Arc::new(Mutex::new(signer_state_store)),
            grpc_binding: config.node_grpc_binding,
        };

        tokio::spawn(async move {
            debug!("Locking grpc interface until the JSON-RPC interface becomes available.");
            use tokio::time::{sleep, Duration};

            // Hold the lock until the JSON-RPC interface is reachable,
            // releasing it at the end of this task.
            let rpc = rrpc.lock().await;
            loop {
                let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
                    rpc.call("getinfo", json!({})).await;
                match res {
                    Ok(_) => break,
                    Err(e) => {
                        warn!(
                            "JSON-RPC interface not yet available. Delaying 50ms. {:?}",
                            e
                        );
                        sleep(Duration::from_millis(50)).await;
                    }
                }
            }

            // Signal that the RPC is ready now.
            RPC_READY.store(true, Ordering::SeqCst);

            let list_datastore_req = cln_rpc::model::requests::ListdatastoreRequest {
                key: Some(vec!["glconf".to_string(), "request".to_string()]),
            };

            let res: Result<cln_rpc::model::responses::ListdatastoreResponse, crate::rpc::Error> =
                rpc.call("listdatastore", list_datastore_req).await;

            // Restore a configure request persisted in an earlier
            // session, if there is one.
            if let Ok(res) = res {
                if let Some(serialized) = res.datastore.first().and_then(|e| e.string.clone()) {
                    *SERIALIZED_CONFIGURE_REQUEST.lock().await = Some(serialized);
                }
            }

            drop(rpc);
        });

        Ok(s)
    }

    /// Wait for the rate limiter to allow another RPC call.
    pub async fn limit(&self) {
        let limiter = LIMITER
            .get_or_init(|| async {
                let quota = Quota::per_minute(core::num::NonZeroU32::new(300).unwrap());
                RateLimiter::direct_with_clock(quota, &MonotonicClock::default())
            })
            .await;

        limiter.until_ready().await
    }

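    /// Return a clone of the JSON-RPC client, waiting for the startup
    /// task to release the lock first.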
    pub async fn get_rpc(&self) -> LightningClient {
        self.rpc.lock().await.clone()
    }
}

#[tonic::async_trait]
impl Node for PluginNodeServer {
    type StreamCustommsgStream = ReceiverStream<Result<pb::Custommsg, Status>>;
    type StreamHsmRequestsStream = ReceiverStream<Result<pb::HsmRequest, Status>>;
    type StreamLogStream = ReceiverStream<Result<pb::LogEntry, Status>>;

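    /// Subscribe the client to custom messages received by the node.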
    async fn stream_custommsg(
        &self,
        _: Request<pb::StreamCustommsgRequest>,
    ) -> Result<Response<Self::StreamCustommsgStream>, Status> {
        log::debug!("Added a new listener for custommsg");
        let (tx, rx) = mpsc::channel(1);
        let mut stream = self.events.subscribe();
        // TODO: We can do better by returning the broadcast receiver
        // directly. Really we should be filtering the events by
        // type, so maybe a `.map()` on the stream can work?
        tokio::spawn(async move {
            while let Ok(msg) = stream.recv().await {
                if let Event::CustomMsg(m) = msg {
                    log::trace!("Forwarding custommsg {:?} to listener", m);
                    if let Err(e) = tx.send(Ok(m)).await {
                        log::warn!("Unable to send custommsg to listener: {:?}", e);
                        break;
                    }
                }
            }
            panic!("stream.recv loop exited...");
        });
        Ok(Response::new(ReceiverStream::new(rx)))
    }

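    /// Stream the node's log to the client: replay the existing backlog
    /// from `/tmp/log` first, then tail the file for new entries.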
    async fn stream_log(
        &self,
        _: Request<pb::StreamLogRequest>,
    ) -> Result<Response<Self::StreamLogStream>, Status> {
        match async {
            let (tx, rx) = mpsc::channel(1);
            let mut lines = linemux::MuxedLines::new()?;
            lines.add_file("/tmp/log").await?;

            // TODO: Yes, this may produce duplicate lines, when new
            // log entries are produced while we're streaming the
            // backlog out, but do we care?
            use tokio::io::{AsyncBufReadExt, BufReader};
            let file = tokio::fs::File::open("/tmp/log").await?;
            let mut file = BufReader::new(file).lines();

            tokio::spawn(async move {
                match async {
                    while let Some(line) = file.next_line().await? {
                        tx.send(Ok(pb::LogEntry {
                            line: line.trim().to_owned(),
                        }))
                        .await?
                    }

                    while let Ok(Some(line)) = lines.next_line().await {
                        tx.send(Ok(pb::LogEntry {
                            line: line.line().trim().to_string(),
                        }))
                        .await?;
                    }
                    Ok(())
                }
                .await as Result<(), anyhow::Error>
                {
                    Ok(()) => {}
                    Err(e) => {
                        warn!("error streaming logs to client: {}", e);
                    }
                }
            });
            Ok(ReceiverStream::new(rx))
        }
        .await as Result<Self::StreamLogStream, anyhow::Error>
        {
            Ok(v) => Ok(Response::new(v)),
            Err(e) => Err(Status::new(Code::Internal, e.to_string())),
        }
    }

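    /// Attach a signer: stream staged HSM requests to it, starting with
    /// a heartbeat that carries the current signer state snapshot.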
    async fn stream_hsm_requests(
        &self,
        _request: Request<pb::Empty>,
    ) -> Result<Response<Self::StreamHsmRequestsStream>, Status> {
        let hsm_id = HSM_ID_COUNT.fetch_add(1, Ordering::SeqCst);
        SIGNER_COUNT.fetch_add(1, Ordering::SeqCst);
        info!(
            "New signer with hsm_id={} attached, streaming requests",
            hsm_id
        );

        let (tx, rx) = mpsc::channel(10);
        let mut stream = self.stage.mystream().await;
        let signer_state = self.signer_state.clone();
        let ctx = self.ctx.clone();

        tokio::spawn(async move {
            trace!("hsmd hsm_id={} request processor started", hsm_id);

            {
                // We start by immediately injecting a
                // vls_protocol::Message::GetHeartbeat. This serves two
                // purposes: it sends the initial snapshot of the signer
                // state to the signer as early as possible, and it
                // triggers a pruning on the signer, if enabled. In
                // incremental mode this ensures that any subsequent,
                // presumably time-critical, messages do not have to
                // carry the large state with them.

                let state = signer_state.lock().await.clone();
                let state: Vec<bs_gl_client::pb::SignerStateEntry> = state.into();
                let state: Vec<pb::SignerStateEntry> = state
                    .into_iter()
                    .map(|s| pb::SignerStateEntry {
                        key: s.key,
                        version: s.version,
                        value: s.value,
                    })
                    .collect();

                let msg = vls_protocol::msgs::GetHeartbeat {};
                use vls_protocol::msgs::SerBolt;
                let req = crate::pb::HsmRequest {
                    // Notice that the request_counter starts at 1000, to
                    // avoid collisions.
                    request_id: 0,
                    signer_state: state,
                    raw: msg.as_vec(),
                    requests: vec![], // No pending requests yet, nothing to authorize.
                    context: None,
                };

                if let Err(e) = tx.send(Ok(req)).await {
                    log::warn!("Failed to send heartbeat message to signer: {}", e);
                }
            }

            loop {
                let mut req = match stream.next().await {
                    Err(e) => {
                        error!(
                            "Could not get next request from stage: {:?} for hsm_id={}",
                            e, hsm_id
                        );
                        break;
                    }
                    Ok(r) => r,
                };
                trace!(
                    "Sending request={} to hsm_id={}",
                    req.request.request_id,
                    hsm_id
                );

                let state = signer_state.lock().await.clone();
                let state: Vec<bs_gl_client::pb::SignerStateEntry> = state.into();

                // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map.
                let state: Vec<pb::SignerStateEntry> = state
                    .into_iter()
                    .map(|s| pb::SignerStateEntry {
                        key: s.key,
                        version: s.version,
                        value: s.value,
                    })
                    .collect();

                req.request.signer_state = state.into();
                req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect();

                // Append the cached configure request, if any, so the
                // signer can verify it alongside the pending requests.
                let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await;
                if let Some(serialized_configure_request) = &*serialized_configure_request {
                    // The cached request was serialized by this plugin,
                    // so deserializing it again should not fail.
                    let configure_request = serde_json::from_str::<crate::context::Request>(
                        serialized_configure_request,
                    )
                    .unwrap();
                    req.request.requests.push(configure_request.into());
                }

                debug!(
                    "Sending signer requests with {} requests and {} state entries",
                    req.request.requests.len(),
                    req.request.signer_state.len()
                );

                if let Err(e) = tx.send(Ok(req.request)).await {
                    warn!("Error streaming request {:?} to hsm_id={}", e, hsm_id);
                    break;
                }
            }
            info!("Signer hsm_id={} exited", hsm_id);
            SIGNER_COUNT.fetch_sub(1, Ordering::SeqCst);
        });

        trace!("Returning stream_hsm_request channel");
        Ok(Response::new(ReceiverStream::new(rx)))
    }

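    /// Merge the signer's state update into the in-memory state, persist
    /// it, and forward the response to the request waiting in the stage.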
    async fn respond_hsm_request(
        &self,
        request: Request<pb::HsmResponse>,
    ) -> Result<Response<pb::Empty>, Status> {
        let req = request.into_inner();
        // Create a state from the key-value-version tuples. Need to
        // convert here, since `pb` is duplicated in the two different
        // crates.
        let signer_state: Vec<bs_gl_client::pb::SignerStateEntry> = req
            .signer_state
            .iter()
            .map(|i| bs_gl_client::pb::SignerStateEntry {
                key: i.key.to_owned(),
                value: i.value.to_owned(),
                version: i.version,
            })
            .collect();
        let new_state: bs_gl_client::persist::State = signer_state.into();

        {
            // Apply state changes to the in-memory state
            let mut state = self.signer_state.lock().await;
            state.merge(&new_state).map_err(|e| {
                Status::new(
                    Code::Internal,
                    format!("Error updating internal state: {e}"),
                )
            })?;

            // Send changes to the signer_state_store for persistence
            self.signer_state_store
                .lock()
                .await
                .write(state.clone())
                .await
                .map_err(|e| {
                    Status::new(
                        Code::Internal,
                        format!("error persisting state changes: {}", e),
                    )
                })?;
        }

        if let Err(e) = self.stage.respond(req).await {
            warn!("Suppressing error: {:?}", e);
        }
        Ok(Response::new(pb::Empty::default()))
    }

    type StreamIncomingStream = ReceiverStream<Result<pb::IncomingPayment, Status>>;

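    /// Stream incoming payment events to the client.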
    async fn stream_incoming(
        &self,
        _req: tonic::Request<pb::StreamIncomingFilter>,
    ) -> Result<Response<Self::StreamIncomingStream>, Status> {
        // TODO See if we can just return the broadcast::Receiver
        // instead of pulling off broadcast and into an mpsc.
        let (tx, rx) = mpsc::channel(1);
        let mut bcast = self.events.subscribe();
        tokio::spawn(async move {
            while let Ok(p) = bcast.recv().await {
                if let super::Event::IncomingPayment(p) = p {
                    let _ = tx.send(Ok(p)).await;
                }
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }

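    /// Validate the node configuration sent by the client (currently the
    /// `close_to_addr`) and persist the raw configure request in the
    /// datastore so it survives restarts.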
    async fn configure(&self, req: tonic::Request<pb::GlConfig>) -> Result<Response<pb::Empty>, Status> {
        self.limit().await;
        let gl_config = req.into_inner();
        let rpc = self.get_rpc().await;

        let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
            rpc.call("getinfo", json!({})).await;

        let network = match res {
            Ok(get_info_response) => match get_info_response.network.parse() {
                Ok(v) => v,
                Err(_) => Err(Status::new(
                    Code::Unknown,
                    "Failed to parse 'network' from 'getinfo' response",
                ))?,
            },
            Err(e) => {
                return Err(Status::new(
                    Code::Unknown,
                    format!("Failed to retrieve a response from 'getinfo' while setting the node's configuration: {}", e),
                ));
            }
        };

        match bitcoin::Address::from_str(&gl_config.close_to_addr) {
            Ok(address) => {
                if address.network != network {
                    return Err(Status::new(
                        Code::Unknown,
                        format!(
                            "Network mismatch: \
                            Expected an address for {} but received an address for {}",
                            network,
                            address.network
                        ),
                    ));
                }
            }
            Err(e) => {
                return Err(Status::new(
                    Code::Unknown,
                    format!("The address {} is not valid: {}", gl_config.close_to_addr, e),
                ));
            }
        }

        let requests: Vec<crate::context::Request> =
            self.ctx.snapshot().await.into_iter().map(|r| r.into()).collect();
        // Avoid indexing into the snapshot directly: if no request was
        // recorded in the context, an index would panic the server.
        let request = requests.first().ok_or_else(|| {
            Status::new(Code::Internal, "No configure request found in the context")
        })?;
        let serialized_req = serde_json::to_string(request).map_err(|e| {
            Status::new(
                Code::Internal,
                format!("Failed to serialize the configure request: {}", e),
            )
        })?;
        let datastore_res: Result<cln_rpc::model::responses::DatastoreResponse, crate::rpc::Error> =
            rpc.call("datastore", json!({
                "key": vec![
                    "glconf".to_string(),
                    "request".to_string(),
                ],
                "string": serialized_req,
            })).await;

        match datastore_res {
            Ok(_) => {
                let mut cached_gl_config = SERIALIZED_CONFIGURE_REQUEST.lock().await;
                *cached_gl_config = Some(serialized_req);

                Ok(Response::new(pb::Empty::default()))
            }
            Err(e) => Err(Status::new(
                Code::Unknown,
                format!("Failed to store the raw configure request in the datastore: {}", e),
            )),
        }
    }
}

use cln_grpc::pb::node_server::NodeServer;

impl PluginNodeServer {
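    /// Bind the configured address and serve the gRPC interface, wrapping
    /// the CLN `NodeServer` so calls are held until the JSON-RPC
    /// interface is ready.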
    pub async fn run(self) -> Result<()> {
        let addr = self
            .grpc_binding
            .parse()
            .context("parsing the grpc binding address")?;

        let cln_node = NodeServer::new(
            WrappedNodeServer::new(self.clone())
                .await
                .context("creating NodeServer instance")?,
        );

        let router = tonic::transport::Server::builder()
            .max_frame_size(4 * 1024 * 1024) // 4MB max request size
            .tcp_keepalive(Some(tokio::time::Duration::from_secs(1)))
            .tls_config(self.tls.clone())?
            .layer(SignatureContextLayer {
                ctx: self.ctx.clone(),
            })
            .add_service(RpcWaitService::new(cln_node, self.rpc_path.clone()))
            .add_service(crate::pb::node_server::NodeServer::new(self.clone()));

        router
            .serve(addr)
            .await
            .context("grpc interface exited with error")
    }

    /// Reconnect all peers with whom we have a channel or to whom we
    /// previously connected explicitly.
    pub async fn reconnect_peers(&self) -> Result<(), Error> {
        if SIGNER_COUNT.load(Ordering::SeqCst) < 1 {
            use anyhow::anyhow;
            return Err(anyhow!(
                "Cannot reconnect peers, no signer to complete the handshake"
            ));
        }

        log::info!("Reconnecting all peers (plugin)");
        let mut rpc = cln_rpc::ClnRpc::new(self.rpc_path.clone()).await?;
        let peers = self.get_reconnect_peers().await?;
        log::info!(
            "Found {} peers to reconnect: {:?} (plugin)",
            peers.len(),
            peers.iter().map(|p| p.id.clone()).collect::<Vec<_>>()
        );

        for r in peers {
            trace!("Calling connect: {:?} (plugin)", &r.id);
            let res = rpc.call_typed(&r).await;
            trace!("Connect returned: {:?} -> {:?} (plugin)", &r.id, res);

            match res {
                Ok(r) => info!("Connection to {} established: {:?} (plugin)", &r.id, r),
                Err(e) => warn!("Could not connect to {}: {:?} (plugin)", &r.id, e),
            }
        }
        Ok(())
    }

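    /// Collect reconnect candidates from both the currently connected
    /// peers and the `greenlight`/`peerlist` datastore entry,
    /// deduplicated by peer id.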
    async fn get_reconnect_peers(
        &self,
    ) -> Result<Vec<cln_rpc::model::requests::ConnectRequest>, Error> {
        let rpc_path = self.rpc_path.clone();
        let mut rpc = cln_rpc::ClnRpc::new(rpc_path).await?;
        let peers = rpc
            .call_typed(&cln_rpc::model::requests::ListpeersRequest {
                id: None,
                level: None,
            })
            .await?;

        let mut requests: Vec<cln_rpc::model::requests::ConnectRequest> = peers
            .peers
            .iter()
            .filter(|&p| p.connected)
            .map(|p| cln_rpc::model::requests::ConnectRequest {
                id: p.id.to_string(),
                host: None,
                port: None,
            })
            .collect();

        let mut dspeers: Vec<cln_rpc::model::requests::ConnectRequest> = rpc
            .call_typed(&cln_rpc::model::requests::ListdatastoreRequest {
                key: Some(vec!["greenlight".to_string(), "peerlist".to_string()]),
            })
            .await?
            .datastore
            .iter()
            .filter_map(|x| {
                // We need to strip the unnecessary escape characters
                // that have been added by the datastore, as serde is a
                // bit picky about them. Skip entries that fail to
                // parse instead of panicking.
                let s = x.string.clone()?.replace('\\', "");
                match serde_json::from_str::<messages::Peer>(&s) {
                    Ok(peer) => Some(peer),
                    Err(e) => {
                        warn!("Skipping malformed peerlist entry {}: {}", s, e);
                        None
                    }
                }
            })
            .map(|x| cln_rpc::model::requests::ConnectRequest {
                id: x.id,
                host: Some(x.addr),
                port: None,
            })
            .collect();

        // Merge the two peer lists, deduplicating by peer id.
        requests.append(&mut dspeers);
        requests.sort_by(|a, b| a.id.cmp(&b.id));
        requests.dedup_by(|a, b| a.id.eq(&b.id));

        Ok(requests)
    }
}

use tower::{Layer, Service};

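/// Tower layer that inspects incoming gRPC requests for the Greenlight
/// authentication headers and records signed requests in the plugin
/// context, so attached signers can verify what they are asked to sign.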
#[derive(Debug, Clone)]
pub struct SignatureContextLayer {
    ctx: crate::context::Context,
}

impl SignatureContextLayer {
    pub fn new(context: crate::context::Context) -> Self {
        SignatureContextLayer { ctx: context }
    }
}

impl<S> Layer<S> for SignatureContextLayer {
    type Service = SignatureContextService<S>;

    fn layer(&self, service: S) -> Self::Service {
        SignatureContextService {
            inner: service,
            ctx: self.ctx.clone(),
        }
    }
}

/// The maximum message size we allow to buffer for a single request.
const MAX_MESSAGE_SIZE: usize = 4_000_000;

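/// Service produced by [`SignatureContextLayer`]: buffers the bodies of
/// authenticated requests so they can be recorded in the context before
/// being forwarded to the inner service.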
#[derive(Debug, Clone)]
pub struct SignatureContextService<S> {
    inner: S,
    ctx: crate::context::Context,
}

impl<S> Service<hyper::Request<hyper::Body>> for SignatureContextService<S>
where
    S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<tonic::body::BoxBody>>
        + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    type Response = S::Response;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(Into::into)
    }

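    // For each request: extract the authentication headers, and if they
    // are all present, buffer the body, record the signed request in the
    // context, and replay the buffered body to the inner service.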
    fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
        // for details on why this is necessary.
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);
        let reqctx = self.ctx.clone();

        Box::pin(async move {
            use tonic::codegen::Body;
            let (parts, mut body) = request.into_parts();

            let uri = parts.uri.path_and_query().unwrap();
            let _ = RPC_BCAST
                .clone()
                .send(super::Event::RpcCall(uri.to_string()));

            let pubkey = parts
                .headers
                .get("glauthpubkey")
                .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());

            let sig = parts
                .headers
                .get("glauthsig")
                .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());

            use bytes::Buf;
            // The timestamp is a base64-encoded big-endian u64. Guard
            // against short payloads, since `get_u64` panics if fewer
            // than 8 bytes are available.
            let timestamp: Option<u64> = parts
                .headers
                .get("glts")
                .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok())
                .filter(|s| s.len() >= 8)
                .map(|s| s.as_slice().get_u64());

            let rune = parts
                .headers
                .get("glrune")
                .and_then(|k| general_purpose::URL_SAFE.decode(k).ok());

            if let (Some(pk), Some(sig), Some(rune)) = (pubkey, sig, rune) {
                // Now that we know we'll be adding this to the
                // context we can start buffering the request.
                let mut buf = Vec::new();
                while let Some(chunk) = body.data().await {
                    let chunk = match chunk {
                        Ok(c) => c,
                        Err(e) => {
                            debug!("Error reading body of {}: {}", uri, e);
                            return Ok(tonic::Status::new(
                                tonic::Code::Internal,
                                "error reading request body",
                            )
                            .to_http());
                        }
                    };
                    // We check against MAX_MESSAGE_SIZE to avoid an
                    // unbounded message buffer.
                    if buf.len() + chunk.len() > MAX_MESSAGE_SIZE {
                        debug!("Message {} exceeds size limit", uri.path());
                        return Ok(tonic::Status::new(
                            tonic::Code::InvalidArgument,
                            "payload too large",
                        )
                        .to_http());
                    }
                    buf.put(chunk);
                }

                trace!(
                    "Got a request for {} with pubkey={}, sig={}, rune={} and body size={:?}",
                    uri,
                    hex::encode(&pk),
                    hex::encode(&sig),
                    hex::encode(&rune),
                    &buf.len(),
                );
                let req = crate::context::Request::new(
                    uri.to_string(),
                    <bytes::Bytes>::from(buf.clone()),
                    pk,
                    sig,
                    timestamp,
                    rune,
                );

                reqctx.add_request(req.clone()).await;

                let body: hyper::Body = buf.into();
                let request = hyper::Request::from_parts(parts, body);
                let res = inner.call(request).await;

                // Defer cleanup into a separate task, otherwise we'd
                // need `res` to be `Send` which we can't
                // guarantee. This is needed since adding an await
                // point splits the state machine at that point.
                tokio::spawn(async move {
                    reqctx.remove_request(req).await;
                });
                res.map_err(Into::into)
            } else {
                // No point in buffering the request, we're not going
                // to add it to the `HsmRequestContext`.
                let request = hyper::Request::from_parts(parts, body);
                inner.call(request).await.map_err(Into::into)
            }
        })
    }
}

mod rpcwait;
pub use rpcwait::RpcWaitService;