gl_plugin/node/mod.rs

use crate::config::Config;
use crate::pb::{self, node_server::Node};
use crate::rpc::LightningClient;
use crate::storage::StateStore;
use crate::{messages, Event};
use crate::{stager, tramp};
use anyhow::{Context, Error, Result};
use base64::{engine::general_purpose, Engine as _};
use bytes::BufMut;
use gl_client::bitcoin::hashes::hex::ToHex;
use gl_client::persist::State;
use governor::{
    clock::MonotonicClock, state::direct::NotKeyed, state::InMemoryState, Quota, RateLimiter,
};
use lazy_static::lazy_static;
use log::{debug, error, info, trace, warn};
use serde_json::json;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::{broadcast, mpsc, Mutex, OnceCell};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::ServerTlsConfig, Code, Request, Response, Status};
mod wrapper;
use gl_client::bitcoin;
use std::str::FromStr;
pub use wrapper::WrappedNodeServer;

static LIMITER: OnceCell<RateLimiter<NotKeyed, InMemoryState, MonotonicClock>> =
    OnceCell::const_new();

lazy_static! {
    /// Hands out a unique id to each signer that attaches.
    static ref HSM_ID_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// The number of signers that are currently connected (best guess
    /// due to races). Allows us to determine whether we should
    /// initiate operations that might require signatures.
    static ref SIGNER_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Bridges events triggered by RPC calls into the node event stream.
    static ref RPC_BCAST: broadcast::Sender<super::Event> = broadcast::channel(4).0;

    /// Caches the serialized `configure` request so it can be replayed
    /// to signers alongside the pending requests.
    static ref SERIALIZED_CONFIGURE_REQUEST: Mutex<Option<String>> = Mutex::new(None);

    /// Flipped to `true` once the JSON-RPC interface has become available.
    static ref RPC_READY: AtomicBool = AtomicBool::new(false);
}

/// The PluginNodeServer is the interface that is exposed to client devices
/// and is in charge of coordinating the various user-controlled
/// entities. This includes dispatching incoming RPC calls to the JSON-RPC
/// interface, as well as staging requests from the HSM so that they can be
/// streamed and replied to by devices that have access to the signing keys.
#[derive(Clone)]
pub struct PluginNodeServer {
    pub tls: ServerTlsConfig,
    pub stage: Arc<stager::Stage>,
    pub rpc: Arc<Mutex<LightningClient>>,
    rpc_path: PathBuf,
    events: tokio::sync::broadcast::Sender<super::Event>,
    signer_state: Arc<Mutex<State>>,
    grpc_binding: String,
    signer_state_store: Arc<Mutex<Box<dyn StateStore>>>,
    pub ctx: crate::context::Context,
}
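
// A minimal construction sketch (hypothetical wiring; the real caller sets
// up the `stager::Stage`, `Config`, event channel and `StateStore` elsewhere
// in the plugin, and the `Stage` constructor shown here is assumed):
//
//     let stage = Arc::new(stager::Stage::default());
//     let (events, _) = broadcast::channel(16);
//     let server = PluginNodeServer::new(stage, config, events, store).await?;
//     server.run().await?;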

impl PluginNodeServer {
    pub async fn new(
        stage: Arc<stager::Stage>,
        config: Config,
        events: tokio::sync::broadcast::Sender<super::Event>,
        signer_state_store: Box<dyn StateStore>,
    ) -> Result<Self, Error> {
        let tls = ServerTlsConfig::new()
            .identity(config.identity.id)
            .client_ca_root(config.identity.ca);

        let mut rpc_path = std::env::current_dir().unwrap();
        rpc_path.push("lightning-rpc");
        info!("Connecting to lightning-rpc at {:?}", rpc_path);

        let rpc = Arc::new(Mutex::new(LightningClient::new(rpc_path.clone())));

        // Bridge the RPC_BCAST into the events queue
        let tx = events.clone();
        tokio::spawn(async move {
            let mut rx = RPC_BCAST.subscribe();
            loop {
                if let Ok(e) = rx.recv().await {
                    let _ = tx.send(e);
                }
            }
        });

        let signer_state = signer_state_store.read().await?;

        let ctx = crate::context::Context::new();

        let rrpc = rpc.clone();

        let s = PluginNodeServer {
            ctx,
            tls,
            rpc,
            stage,
            events,
            rpc_path,
            signer_state: Arc::new(Mutex::new(signer_state)),
            signer_state_store: Arc::new(Mutex::new(signer_state_store)),
            grpc_binding: config.node_grpc_binding,
        };

        tokio::spawn(async move {
            debug!("Locking grpc interface until the JSON-RPC interface becomes available.");
            use tokio::time::{sleep, Duration};

            // Hold the RPC lock in this task so gRPC handlers block until
            // the JSON-RPC interface is available; it is released further
            // down via `drop(rpc)`.
            let rpc = rrpc.lock().await;
            loop {
                let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
                    rpc.call("getinfo", json!({})).await;
                match res {
                    Ok(_) => break,
                    Err(e) => {
                        warn!(
                            "JSON-RPC interface not yet available. Delaying 50ms. {:?}",
                            e
                        );
                        sleep(Duration::from_millis(50)).await;
                    }
                }
            }

            // Signal that the RPC is ready now.
            RPC_READY.store(true, Ordering::SeqCst);

            let list_datastore_req = cln_rpc::model::requests::ListdatastoreRequest {
                key: Some(vec!["glconf".to_string(), "request".to_string()]),
            };

            let res: Result<cln_rpc::model::responses::ListdatastoreResponse, crate::rpc::Error> =
                rpc.call("listdatastore", list_datastore_req).await;

            // Cache a previously stored `configure` request, if there is one.
            if let Ok(list_datastore_res) = res {
                if let Some(serialized_configure_request) = list_datastore_res
                    .datastore
                    .first()
                    .and_then(|e| e.string.clone())
                {
                    *SERIALIZED_CONFIGURE_REQUEST.lock().await =
                        Some(serialized_configure_request);
                }
            }

            drop(rpc);
        });

        Ok(s)
    }

    /// Wait for the rate limiter to allow a new RPC call.
    pub async fn limit(&self) {
        let limiter = LIMITER
            .get_or_init(|| async {
                let quota = Quota::per_minute(core::num::NonZeroU32::new(300).unwrap());
                RateLimiter::direct_with_clock(quota, &MonotonicClock::default())
            })
            .await;

        limiter.until_ready().await
    }
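
    // Note on the quota above: `Quota::per_minute(300)` allows bursts of up
    // to 300 calls, replenished at one call every 200ms (5 per second), and
    // `until_ready` suspends the caller until the next call is admitted.
    // Handlers such as `configure` call `self.limit().await` before hitting
    // the JSON-RPC interface.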

    /// Return a clone of the JSON-RPC client, releasing the shared lock
    /// before handing it out.
    pub async fn get_rpc(&self) -> LightningClient {
        let rpc = self.rpc.lock().await;
        let r = rpc.clone();
        drop(rpc);
        r
    }
}

#[tonic::async_trait]
impl Node for PluginNodeServer {
    type StreamCustommsgStream = ReceiverStream<Result<pb::Custommsg, Status>>;
    type StreamHsmRequestsStream = ReceiverStream<Result<pb::HsmRequest, Status>>;
    type StreamLogStream = ReceiverStream<Result<pb::LogEntry, Status>>;

    async fn stream_custommsg(
        &self,
        _: Request<pb::StreamCustommsgRequest>,
    ) -> Result<Response<Self::StreamCustommsgStream>, Status> {
        log::debug!("Added a new listener for custommsg");
        let (tx, rx) = mpsc::channel(1);
        let mut stream = self.events.subscribe();
        // TODO: We could do better by returning the broadcast receiver
        // directly. Really we should be filtering the events by type,
        // so a `.map()` on the stream could work; see the sketch after
        // this function.
        tokio::spawn(async move {
            while let Ok(msg) = stream.recv().await {
                if let Event::CustomMsg(m) = msg {
                    log::trace!("Forwarding custommsg {:?} to listener", m);
                    if let Err(e) = tx.send(Ok(m)).await {
                        log::warn!("Unable to send custommsg to listener: {:?}", e);
                        break;
                    }
                }
            }
            panic!("stream.recv loop exited...");
        });
        Ok(Response::new(ReceiverStream::new(rx)))
    }
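
    // Sketch of the event-filtering alternative mentioned in the TODO above
    // (assumes clients tolerate `BroadcastStream`'s lagging semantics, which
    // differ from the mpsc bridge used here):
    //
    //     use tokio_stream::{wrappers::BroadcastStream, StreamExt};
    //     let rx = BroadcastStream::new(self.events.subscribe())
    //         .filter_map(|ev| match ev {
    //             Ok(Event::CustomMsg(m)) => Some(Ok(m)),
    //             _ => None,
    //         });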

    async fn stream_log(
        &self,
        _: Request<pb::StreamLogRequest>,
    ) -> Result<Response<Self::StreamLogStream>, Status> {
        match async {
            let (tx, rx) = mpsc::channel(1);
            let mut lines = linemux::MuxedLines::new()?;
            lines.add_file("/tmp/log").await?;

            // TODO: Yes, this may produce duplicate lines, when new
            // log entries are produced while we're streaming the
            // backlog out, but do we care?
            use tokio::io::{AsyncBufReadExt, BufReader};
            let file = tokio::fs::File::open("/tmp/log").await?;
            let mut file = BufReader::new(file).lines();

            tokio::spawn(async move {
                match async {
                    // First stream out the backlog ...
                    while let Some(line) = file.next_line().await? {
                        tx.send(Ok(pb::LogEntry {
                            line: line.trim().to_owned(),
                        }))
                        .await?
                    }

                    // ... then follow the file for new entries.
                    while let Ok(Some(line)) = lines.next_line().await {
                        tx.send(Ok(pb::LogEntry {
                            line: line.line().trim().to_string(),
                        }))
                        .await?;
                    }
                    Ok(())
                }
                .await as Result<(), anyhow::Error>
                {
                    Ok(()) => {}
                    Err(e) => {
                        warn!("error streaming logs to client: {}", e);
                    }
                }
            });
            Ok(ReceiverStream::new(rx))
        }
        .await as Result<Self::StreamLogStream, anyhow::Error>
        {
            Ok(v) => Ok(Response::new(v)),
            Err(e) => Err(Status::new(Code::Internal, e.to_string())),
        }
    }

    async fn stream_hsm_requests(
        &self,
        _request: Request<pb::Empty>,
    ) -> Result<Response<Self::StreamHsmRequestsStream>, Status> {
        let hsm_id = HSM_ID_COUNT.fetch_add(1, Ordering::SeqCst);
        SIGNER_COUNT.fetch_add(1, Ordering::SeqCst);
        info!(
            "New signer with hsm_id={} attached, streaming requests",
            hsm_id
        );

        let (tx, rx) = mpsc::channel(10);
        let mut stream = self.stage.mystream().await;
        let signer_state = self.signer_state.clone();
        let ctx = self.ctx.clone();

        tokio::spawn(async move {
            trace!("hsmd hsm_id={} request processor started", hsm_id);

            {
                // We start by immediately injecting a
                // vls_protocol::Message::GetHeartbeat. This serves two
                // purposes: it sends the initial snapshot of the signer
                // state to the signer as early as possible, and it triggers
                // a pruning on the signer, if enabled. In incremental mode
                // this ensures that subsequent, presumably time-critical
                // messages do not have to carry the full state with them.

                let state = signer_state.lock().await.clone();
                let state: Vec<gl_client::pb::SignerStateEntry> = state.into();
                let state: Vec<pb::SignerStateEntry> = state
                    .into_iter()
                    .map(|s| pb::SignerStateEntry {
                        key: s.key,
                        version: s.version,
                        value: s.value,
                    })
                    .collect();

                let msg = vls_protocol::msgs::GetHeartbeat {};
                use vls_protocol::msgs::SerBolt;
                let req = crate::pb::HsmRequest {
                    // The stage's request counter starts at 1000, so
                    // request_id 0 cannot collide with a staged request.
                    request_id: 0,
                    signer_state: state,
                    raw: msg.as_vec(),
                    requests: vec![], // No pending requests yet, nothing to authorize.
                    context: None,
                };

                if let Err(e) = tx.send(Ok(req)).await {
                    log::warn!("Failed to send heartbeat message to signer: {}", e);
                }
            }

            loop {
                let mut req = match stream.next().await {
                    Err(e) => {
                        error!(
                            "Could not get next request from stage: {:?} for hsm_id={}",
                            e, hsm_id
                        );
                        break;
                    }
                    Ok(r) => r,
                };
                trace!(
                    "Sending request={} to hsm_id={}",
                    req.request.request_id,
                    hsm_id
                );

                let state = signer_state.lock().await.clone();
                let state: Vec<gl_client::pb::SignerStateEntry> = state.into();

                // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map.
                let state: Vec<pb::SignerStateEntry> = state
                    .into_iter()
                    .map(|s| pb::SignerStateEntry {
                        key: s.key,
                        version: s.version,
                        value: s.value,
                    })
                    .collect();

                req.request.signer_state = state.into();
                req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect();

                // Attach the cached `configure` request, if any, so the
                // signer sees it alongside the pending requests.
                let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await;
                if let Some(serialized_configure_request) = &*serialized_configure_request {
                    let configure_request = serde_json::from_str::<crate::context::Request>(
                        serialized_configure_request,
                    )
                    .unwrap();
                    req.request.requests.push(configure_request.into());
                }

                debug!(
                    "Sending signer request with {} authorizing requests and {} state entries",
                    req.request.requests.len(),
                    req.request.signer_state.len()
                );

                if let Err(e) = tx.send(Ok(req.request)).await {
                    warn!("Error streaming request {:?} to hsm_id={}", e, hsm_id);
                    break;
                }
            }
            info!("Signer hsm_id={} exited", hsm_id);
            SIGNER_COUNT.fetch_sub(1, Ordering::SeqCst);
        });

        trace!("Returning stream_hsm_request channel");
        Ok(Response::new(ReceiverStream::new(rx)))
    }

    async fn respond_hsm_request(
        &self,
        request: Request<pb::HsmResponse>,
    ) -> Result<Response<pb::Empty>, Status> {
        let req = request.into_inner();

        if !req.error.is_empty() {
            log::error!("Signer reports an error: {}", req.error);
            log::warn!("The above error was returned instead of a response.");
            return Ok(Response::new(pb::Empty::default()));
        }

        // Create a state from the key-value-version tuples. We need to
        // convert here, since `pb` is duplicated in the two different
        // crates.
        let signer_state: Vec<gl_client::pb::SignerStateEntry> = req
            .signer_state
            .iter()
            .map(|i| gl_client::pb::SignerStateEntry {
                key: i.key.to_owned(),
                value: i.value.to_owned(),
                version: i.version,
            })
            .collect();
        let new_state: gl_client::persist::State = signer_state.into();

        // Apply state changes to the in-memory state
        let mut state = self.signer_state.lock().await;
        state.merge(&new_state).map_err(|e| {
            Status::new(
                Code::Internal,
                format!("Error updating internal state: {e}"),
            )
        })?;

        // Send changes to the signer_state_store for persistence
        let store = self.signer_state_store.lock().await;
        if let Err(e) = store.write(state.clone()).await {
            log::warn!(
                "The returned state could not be stored. Ignoring response for request_id={}, error={:?}",
                req.request_id, e
            );
            /* Exit here so we don't end up committing the changes
             * to CLN, but not to the state store. That'd cause
             * drifts in states that are very hard to debug, and
             * harder to correct. */
            return Ok(Response::new(pb::Empty::default()));
        }

        if let Err(e) = self.stage.respond(req).await {
            warn!("Suppressing error: {:?}", e);
        }
        Ok(Response::new(pb::Empty::default()))
    }
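
    // Protocol round trip: a signer attaches via `stream_hsm_requests` and
    // receives staged requests together with the current signer state; it
    // replies through `respond_hsm_request`, whose state diff is merged
    // in-memory and persisted to the state store *before* the response is
    // handed back to the stage, so CLN never sees a response whose state
    // was not persisted first.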

    type StreamIncomingStream = ReceiverStream<Result<pb::IncomingPayment, Status>>;

    async fn stream_incoming(
        &self,
        _req: tonic::Request<pb::StreamIncomingFilter>,
    ) -> Result<Response<Self::StreamIncomingStream>, Status> {
        // TODO See if we can just return the broadcast::Receiver
        // instead of pulling off broadcast and into an mpsc.
        let (tx, rx) = mpsc::channel(1);
        let mut bcast = self.events.subscribe();
        tokio::spawn(async move {
            while let Ok(p) = bcast.recv().await {
                if let super::Event::IncomingPayment(p) = p {
                    let _ = tx.send(Ok(p)).await;
                }
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }

    async fn configure(
        &self,
        req: tonic::Request<pb::GlConfig>,
    ) -> Result<Response<pb::Empty>, Status> {
        self.limit().await;
        let gl_config = req.into_inner();
        let rpc = self.get_rpc().await;

        let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
            rpc.call("getinfo", json!({})).await;

        let network = match res {
            Ok(get_info_response) => match get_info_response.network.parse() {
                Ok(v) => v,
                Err(_) => {
                    return Err(Status::new(
                        Code::Unknown,
                        "Failed to parse 'network' from 'getinfo' response".to_string(),
                    ))
                }
            },
            Err(e) => {
                return Err(Status::new(
                    Code::Unknown,
                    format!("Failed to retrieve a response from 'getinfo' while setting the node's configuration: {}", e),
                ));
            }
        };

        // The close-to address must be valid and on the same network as
        // the node itself.
        match bitcoin::Address::from_str(&gl_config.close_to_addr) {
            Ok(address) => {
                if address.network != network {
                    return Err(Status::new(
                        Code::Unknown,
                        format!(
                            "Network mismatch: \
                            Expected an address for {} but received an address for {}",
                            network, address.network
                        ),
                    ));
                }
            }
            Err(e) => {
                return Err(Status::new(
                    Code::Unknown,
                    format!(
                        "The address {} is not valid: {}",
                        gl_config.close_to_addr, e
                    ),
                ));
            }
        }

        // The current `configure` call is expected to be the in-flight
        // request at the head of the context snapshot.
        let requests: Vec<crate::context::Request> = self
            .ctx
            .snapshot()
            .await
            .into_iter()
            .map(|r| r.into())
            .collect();
        let serialized_req = serde_json::to_string(&requests[0]).unwrap();
        let datastore_res: Result<
            crate::cln_rpc::model::responses::DatastoreResponse,
            crate::rpc::Error,
        > = rpc
            .call(
                "datastore",
                json!({
                    "key": vec![
                        "glconf".to_string(),
                        "request".to_string(),
                    ],
                    "string": serialized_req,
                }),
            )
            .await;

        match datastore_res {
            Ok(_) => {
                let mut cached_gl_config = SERIALIZED_CONFIGURE_REQUEST.lock().await;
                *cached_gl_config = Some(serialized_req);

                Ok(Response::new(pb::Empty::default()))
            }
            Err(e) => Err(Status::new(
                Code::Unknown,
                format!(
                    "Failed to store the raw configure request in the datastore: {}",
                    e
                ),
            )),
        }
    }

    async fn trampoline_pay(
        &self,
        r: tonic::Request<pb::TrampolinePayRequest>,
    ) -> Result<tonic::Response<pb::TrampolinePayResponse>, Status> {
        match tramp::trampolinepay(r.into_inner(), self.rpc_path.clone())
            .await
            .map(|res| {
                <cln_rpc::model::responses::PayResponse as Into<cln_grpc::pb::PayResponse>>::into(
                    res,
                )
            }) {
            Ok(res) => Ok(tonic::Response::new(pb::TrampolinePayResponse {
                payment_preimage: res.payment_preimage,
                payment_hash: res.payment_hash,
                created_at: res.created_at,
                parts: res.parts,
                amount_msat: res.amount_msat.unwrap_or_default().msat,
                amount_sent_msat: res.amount_sent_msat.unwrap_or_default().msat,
                destination: res.destination.unwrap_or_default(),
            })),
            Err(e) => Err(Status::new(Code::Unknown, e.to_string())),
        }
    }
}

use cln_grpc::pb::node_server::NodeServer;

impl PluginNodeServer {
    pub async fn run(self) -> Result<()> {
        let addr = self.grpc_binding.parse().unwrap();

        let cln_node = NodeServer::new(
            WrappedNodeServer::new(self.clone())
                .await
                .context("creating NodeServer instance")?,
        );

        let router = tonic::transport::Server::builder()
            .max_frame_size(4 * 1024 * 1024) // 4MB max request size
            .tcp_keepalive(Some(tokio::time::Duration::from_secs(1)))
            .tls_config(self.tls.clone())?
            .layer(SignatureContextLayer {
                ctx: self.ctx.clone(),
            })
            .add_service(RpcWaitService::new(cln_node, self.rpc_path.clone()))
            .add_service(crate::pb::node_server::NodeServer::new(self.clone()));

        router
            .serve(addr)
            .await
            .context("grpc interface exited with error")
    }

    /// Reconnect all peers with whom we have a channel or to whom we
    /// previously connected explicitly.
    pub async fn reconnect_peers(&self) -> Result<(), Error> {
        if SIGNER_COUNT.load(Ordering::SeqCst) < 1 {
            use anyhow::anyhow;
            return Err(anyhow!(
                "Cannot reconnect peers, no signer to complete the handshake"
            ));
        }

        log::info!("Reconnecting all peers (plugin)");
        let mut rpc = cln_rpc::ClnRpc::new(self.rpc_path.clone()).await?;
        let peers = self.get_reconnect_peers().await?;
        log::info!(
            "Found {} peers to reconnect: {:?} (plugin)",
            peers.len(),
            peers.iter().map(|p| p.id.clone()).collect::<Vec<_>>()
        );

        for r in peers {
            trace!("Calling connect: {:?} (plugin)", &r.id);
            let res = rpc.call_typed(&r).await;
            trace!("Connect returned: {:?} -> {:?} (plugin)", &r.id, res);

            match res {
                Ok(r) => info!("Connection to {} established: {:?} (plugin)", &r.id, r),
                Err(e) => warn!("Could not connect to {}: {:?} (plugin)", &r.id, e),
            }
        }
        Ok(())
    }

    async fn get_reconnect_peers(
        &self,
    ) -> Result<Vec<cln_rpc::model::requests::ConnectRequest>, Error> {
        let rpc_path = self.rpc_path.clone();
        let mut rpc = cln_rpc::ClnRpc::new(rpc_path).await?;
        let peers = rpc
            .call_typed(&cln_rpc::model::requests::ListpeersRequest {
                id: None,
                level: None,
            })
            .await?;

        let mut requests: Vec<cln_rpc::model::requests::ConnectRequest> = peers
            .peers
            .iter()
            .filter(|&p| p.connected)
            .map(|p| cln_rpc::model::requests::ConnectRequest {
                id: p.id.to_string(),
                host: None,
                port: None,
            })
            .collect();

        let mut dspeers: Vec<cln_rpc::model::requests::ConnectRequest> = rpc
            .call_typed(&cln_rpc::model::requests::ListdatastoreRequest {
                key: Some(vec!["greenlight".to_string(), "peerlist".to_string()]),
            })
            .await?
            .datastore
            .iter()
            .map(|x| {
                // We need to strip the unnecessary escape characters added
                // by the datastore, since serde is rather picky about them.
                let mut s = x.string.clone().unwrap();
                s = s.replace('\\', "");
                serde_json::from_str::<messages::Peer>(&s).unwrap()
            })
            .map(|x| cln_rpc::model::requests::ConnectRequest {
                id: x.id,
                host: Some(x.addr),
                port: None,
            })
            .collect();

        // Merge the two peer lists, deduplicating by peer id.
        requests.append(&mut dspeers);
        requests.sort_by(|a, b| a.id.cmp(&b.id));
        requests.dedup_by(|a, b| a.id.eq(&b.id));

        Ok(requests)
    }
}
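
// For reference, an entry under the ["greenlight", "peerlist"] datastore key
// is assumed to deserialize into `messages::Peer` roughly like this
// (hypothetical values, shape inferred from the `id`/`addr` fields used
// above):
//
//     {"id": "02aabb...", "addr": "192.0.2.1:9735"}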

use tower::{Layer, Service};

#[derive(Debug, Clone)]
pub struct SignatureContextLayer {
    ctx: crate::context::Context,
}

impl SignatureContextLayer {
    pub fn new(context: crate::context::Context) -> Self {
        SignatureContextLayer { ctx: context }
    }
}

impl<S> Layer<S> for SignatureContextLayer {
    type Service = SignatureContextService<S>;

    fn layer(&self, service: S) -> Self::Service {
        SignatureContextService {
            inner: service,
            ctx: self.ctx.clone(),
        }
    }
}

// The maximum message size we allow to buffer up on requests.
const MAX_MESSAGE_SIZE: usize = 4000000;
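
// Note: this cap (4_000_000 bytes) applies to the whole buffered request
// body and sits just below the 4MiB `max_frame_size` configured on the
// tonic server in `run()` (4 * 1024 * 1024 = 4_194_304 bytes), which only
// limits individual HTTP/2 frames.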

#[derive(Debug, Clone)]
pub struct SignatureContextService<S> {
    inner: S,
    ctx: crate::context::Context,
}

impl<S> Service<hyper::Request<hyper::Body>> for SignatureContextService<S>
where
    S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<tonic::body::BoxBody>>
        + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    type Response = S::Response;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(Into::into)
    }

    fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
        // This clone-and-swap is necessary because tonic internally uses
        // `tower::buffer::Buffer`. See
        // https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
        // for details on why this is necessary.
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);
        let reqctx = self.ctx.clone();

        Box::pin(async move {
            use tonic::codegen::Body;
            let (parts, mut body) = request.into_parts();

            let uri = parts.uri.path_and_query().unwrap();
            let _ = RPC_BCAST
                .clone()
                .send(super::Event::RpcCall(uri.to_string()));

            let pubkey = parts
                .headers
                .get("glauthpubkey")
                .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());

            let sig = parts
                .headers
                .get("glauthsig")
                .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());

            use bytes::Buf;
            let timestamp: Option<u64> = parts
                .headers
                .get("glts")
                .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok())
                .map(|s| s.as_slice().get_u64());

            let rune = parts
                .headers
                .get("glrune")
                .and_then(|k| general_purpose::URL_SAFE.decode(k).ok());

            if let (Some(pk), Some(sig), Some(rune)) = (pubkey, sig, rune) {
                // Now that we know we'll be adding this to the
                // context we can start buffering the request.
                let mut buf = Vec::new();
                while let Some(chunk) = body.data().await {
                    let chunk = chunk.unwrap();
                    // We enforce MAX_MESSAGE_SIZE so the message buffer
                    // cannot grow without bound.
                    if buf.len() + chunk.len() > MAX_MESSAGE_SIZE {
                        debug!("Message {} exceeds size limit", uri.path().to_string());
                        return Ok(tonic::Status::new(
                            tonic::Code::InvalidArgument,
                            "payload too large".to_string(),
                        )
                        .to_http());
                    }
                    buf.put(chunk);
                }

                trace!(
                    "Got a request for {} with pubkey={}, sig={}, rune={} and body size={:?}",
                    uri,
                    hex::encode(&pk),
                    hex::encode(&sig),
                    hex::encode(&rune),
                    &buf.len(),
                );
                let req = crate::context::Request::new(
                    uri.to_string(),
                    <bytes::Bytes>::from(buf.clone()),
                    pk,
                    sig,
                    timestamp,
                    rune,
                );

                reqctx.add_request(req.clone()).await;

                let body: hyper::Body = buf.into();
                let request = hyper::Request::from_parts(parts, body);
                let res = inner.call(request).await;

                // Defer cleanup into a separate task, otherwise we'd
                // need `res` to be `Send`, which we can't
                // guarantee. This is needed since adding an await
                // point splits the state machine at that point.
                tokio::spawn(async move {
                    reqctx.remove_request(req).await;
                });
                res.map_err(Into::into)
            } else {
                // No point in buffering the request, since we're not going
                // to add it to the `HsmRequestContext`.
                let request = hyper::Request::from_parts(parts, body);
                inner.call(request).await.map_err(Into::into)
            }
        })
    }
}
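
// For reference, the headers this middleware reads, with the encodings
// implied by the decode calls above (naming per the `get(...)` lookups):
//   glauthpubkey: base64 (STANDARD_NO_PAD) encoded client public key
//   glauthsig:    base64 (STANDARD_NO_PAD) encoded signature
//   glts:         base64 (STANDARD_NO_PAD) encoded big-endian u64 timestamp
//                 (`Buf::get_u64` reads big-endian)
//   glrune:       base64 (URL_SAFE) encoded rune
// Requests missing any of pubkey, sig, or rune are passed through without
// being added to the signing context.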

mod rpcwait;
pub use rpcwait::RpcWaitService;