bs_gl_plugin/node/
wrapper.rs

1use crate::LightningClient;
2use anyhow::Error;
3use cln_grpc::pb::{self, node_server::Node};
4use log::debug;
5use tonic::{Request, Response, Status};
6
7use super::PluginNodeServer;
8
9/// `WrappedNodeServer` enables us to quickly add customizations to
10/// the pure passthru of the `cln_grpc::Server`. In particular it
11/// implements the guarding against RPC commands that'd require a
12/// signature if no HSM is attached (that'd lock up our node) and
13/// providing RouteHints for disconnected and zeroconf channels too.
14#[derive(Clone)]
15pub struct WrappedNodeServer {
16    inner: cln_grpc::Server,
17    node_server: PluginNodeServer,
18}
19
20// TODO Make node into a module and add the WrappedNodeServer as a submodule.
21impl WrappedNodeServer {
22    pub async fn new(node_server: PluginNodeServer) -> anyhow::Result<Self> {
23        let inner = cln_grpc::Server::new(&node_server.rpc_path).await?;
24        Ok(WrappedNodeServer { inner, node_server })
25    }
26}
27
28// This would be so much easier if we have some form of delegation
29// already...
30#[tonic::async_trait]
31impl Node for WrappedNodeServer {
32    async fn invoice(
33        &self,
34        req: Request<pb::InvoiceRequest>,
35    ) -> Result<Response<pb::InvoiceResponse>, Status> {
36        let req = req.into_inner();
37
38        use crate::rpc::LightningClient;
39        let mut rpc = LightningClient::new(self.node_server.rpc_path.clone());
40
41        // First we get the incoming channels so we can force them to
42        // be added to the invoice. This is best effort and will be
43        // left out if the call fails, reverting to the default
44        // behavior.
45        let hints: Option<Vec<Vec<pb::RouteHop>>> = self
46            .get_routehints(&mut rpc)
47            .await
48            .map(
49                // Map Result to Result
50                |v| {
51                    v.into_iter()
52                        .map(
53                            // map each vector element
54                            |rh| rh.hops,
55                        )
56                        .collect()
57                },
58            )
59            .ok();
60
61        let mut pbreq: crate::requests::Invoice = match req.clone().try_into() {
62            Ok(v) => v,
63            Err(e) => {
64                return Err(Status::new(
65                    tonic::Code::Internal,
66                    format!(
67                        "could not convert protobuf request into JSON-RPC request: {:?}",
68                        e.to_string()
69                    ),
70                ));
71            }
72        };
73        pbreq.dev_routes = hints.map(|v| {
74            v.into_iter()
75                .map(|e| e.into_iter().map(|ee| ee.into()).collect())
76                .collect()
77        });
78
79        pbreq.cltv = match pbreq.cltv {
80            Some(c) => Some(c), // Keep any set value
81            None => Some(144),  // Use a day if not set
82        };
83
84        let res: Result<crate::responses::Invoice, crate::rpc::Error> =
85            rpc.call("invoice", pbreq).await;
86
87        let res: Result<cln_grpc::pb::InvoiceResponse, tonic::Status> = res
88            .map(|r| cln_grpc::pb::InvoiceResponse::from(r))
89            .map_err(|e| {
90                tonic::Status::new(
91                    tonic::Code::Internal,
92                    format!("converting invoice response to grpc: {}", e),
93                )
94            });
95
96        res.map(|r| Response::new(r))
97    }
98
99    async fn getinfo(
100        &self,
101        r: Request<pb::GetinfoRequest>,
102    ) -> Result<Response<pb::GetinfoResponse>, Status> {
103        self.inner.getinfo(r).await
104    }
105
106    async fn list_offers(
107        &self,
108        r: Request<pb::ListoffersRequest>,
109    ) -> Result<Response<pb::ListoffersResponse>, Status> {
110        self.inner.list_offers(r).await
111    }
112
113    async fn offer(
114        &self,
115        r: Request<pb::OfferRequest>,
116    ) -> Result<Response<pb::OfferResponse>, Status> {
117        self.inner.offer(r).await
118    }
119
120    async fn bkpr_list_income(
121        &self,
122        r: Request<pb::BkprlistincomeRequest>,
123    ) -> Result<Response<pb::BkprlistincomeResponse>, Status> {
124        self.inner.bkpr_list_income(r).await
125    }
126
127    async fn list_peers(
128        &self,
129        r: Request<pb::ListpeersRequest>,
130    ) -> Result<Response<pb::ListpeersResponse>, Status> {
131        self.inner.list_peers(r).await
132    }
133
134    async fn list_peer_channels(
135        &self,
136        r: Request<pb::ListpeerchannelsRequest>,
137    ) -> Result<Response<pb::ListpeerchannelsResponse>, Status> {
138        self.inner.list_peer_channels(r).await
139    }
140
141    async fn list_closed_channels(
142        &self,
143        r: Request<pb::ListclosedchannelsRequest>,
144    ) -> Result<Response<pb::ListclosedchannelsResponse>, Status> {
145        self.inner.list_closed_channels(r).await
146    }
147
148    async fn list_funds(
149        &self,
150        r: Request<pb::ListfundsRequest>,
151    ) -> Result<Response<pb::ListfundsResponse>, Status> {
152        self.inner.list_funds(r).await
153    }
154
155    async fn decode_pay(
156        &self,
157        r: Request<pb::DecodepayRequest>,
158    ) -> Result<Response<pb::DecodepayResponse>, Status> {
159        self.inner.decode_pay(r).await
160    }
161
162    async fn decode(
163        &self,
164        r: Request<pb::DecodeRequest>,
165    ) -> Result<Response<pb::DecodeResponse>, Status> {
166        self.inner.decode(r).await
167    }
168
169    async fn sign_invoice(
170        &self,
171        r: Request<pb::SigninvoiceRequest>,
172    ) -> Result<Response<pb::SigninvoiceResponse>, Status> {
173        self.inner.sign_invoice(r).await
174    }
175    async fn pre_approve_keysend(
176        &self,
177        r: Request<pb::PreapprovekeysendRequest>,
178    ) -> Result<Response<pb::PreapprovekeysendResponse>, Status> {
179        self.inner.pre_approve_keysend(r).await
180    }
181
182    async fn pre_approve_invoice(
183        &self,
184        r: Request<pb::PreapproveinvoiceRequest>,
185    ) -> Result<Response<pb::PreapproveinvoiceResponse>, Status> {
186        self.inner.pre_approve_invoice(r).await
187    }
188
189    async fn send_custom_msg(
190        &self,
191        r: Request<pb::SendcustommsgRequest>,
192    ) -> Result<Response<pb::SendcustommsgResponse>, Status> {
193        self.inner.send_custom_msg(r).await
194    }
195
196    async fn send_pay(
197        &self,
198        r: Request<pb::SendpayRequest>,
199    ) -> Result<Response<pb::SendpayResponse>, Status> {
200        self.inner.send_pay(r).await
201    }
202
203    async fn list_channels(
204        &self,
205        r: Request<pb::ListchannelsRequest>,
206    ) -> Result<Response<pb::ListchannelsResponse>, Status> {
207        self.inner.list_channels(r).await
208    }
209
210    async fn add_gossip(
211        &self,
212        r: Request<pb::AddgossipRequest>,
213    ) -> Result<Response<pb::AddgossipResponse>, Status> {
214        self.inner.add_gossip(r).await
215    }
216
217    async fn auto_clean_invoice(
218        &self,
219        r: Request<pb::AutocleaninvoiceRequest>,
220    ) -> Result<Response<pb::AutocleaninvoiceResponse>, Status> {
221        self.inner.auto_clean_invoice(r).await
222    }
223
224    async fn check_message(
225        &self,
226        r: Request<pb::CheckmessageRequest>,
227    ) -> Result<Response<pb::CheckmessageResponse>, Status> {
228        self.inner.check_message(r).await
229    }
230
231    async fn close(
232        &self,
233        r: Request<pb::CloseRequest>,
234    ) -> Result<Response<pb::CloseResponse>, Status> {
235        self.inner.close(r).await
236    }
237
238    async fn connect_peer(
239        &self,
240        r: Request<pb::ConnectRequest>,
241    ) -> Result<Response<pb::ConnectResponse>, Status> {
242        self.inner.connect_peer(r).await
243    }
244
245    async fn create_invoice(
246        &self,
247        r: Request<pb::CreateinvoiceRequest>,
248    ) -> Result<Response<pb::CreateinvoiceResponse>, Status> {
249        self.inner.create_invoice(r).await
250    }
251
252    async fn datastore(
253        &self,
254        r: Request<pb::DatastoreRequest>,
255    ) -> Result<Response<pb::DatastoreResponse>, Status> {
256        self.inner.datastore(r).await
257    }
258
259    async fn create_onion(
260        &self,
261        r: Request<pb::CreateonionRequest>,
262    ) -> Result<Response<pb::CreateonionResponse>, Status> {
263        self.inner.create_onion(r).await
264    }
265
266    async fn del_datastore(
267        &self,
268        r: Request<pb::DeldatastoreRequest>,
269    ) -> Result<Response<pb::DeldatastoreResponse>, Status> {
270        self.inner.del_datastore(r).await
271    }
272
273    async fn del_expired_invoice(
274        &self,
275        r: Request<pb::DelexpiredinvoiceRequest>,
276    ) -> Result<Response<pb::DelexpiredinvoiceResponse>, Status> {
277        self.inner.del_expired_invoice(r).await
278    }
279
280    async fn del_invoice(
281        &self,
282        r: Request<pb::DelinvoiceRequest>,
283    ) -> Result<Response<pb::DelinvoiceResponse>, Status> {
284        self.inner.del_invoice(r).await
285    }
286
287    async fn list_datastore(
288        &self,
289        r: Request<pb::ListdatastoreRequest>,
290    ) -> Result<Response<pb::ListdatastoreResponse>, Status> {
291        self.inner.list_datastore(r).await
292    }
293
294    async fn list_invoices(
295        &self,
296        r: Request<pb::ListinvoicesRequest>,
297    ) -> Result<Response<pb::ListinvoicesResponse>, Status> {
298        self.inner.list_invoices(r).await
299    }
300
301    async fn send_onion(
302        &self,
303        r: Request<pb::SendonionRequest>,
304    ) -> Result<Response<pb::SendonionResponse>, Status> {
305        self.inner.send_onion(r).await
306    }
307
308    async fn list_send_pays(
309        &self,
310        r: Request<pb::ListsendpaysRequest>,
311    ) -> Result<Response<pb::ListsendpaysResponse>, Status> {
312        self.inner.list_send_pays(r).await
313    }
314
315    async fn list_transactions(
316        &self,
317        r: Request<pb::ListtransactionsRequest>,
318    ) -> Result<Response<pb::ListtransactionsResponse>, Status> {
319        self.inner.list_transactions(r).await
320    }
321
322    async fn pay(&self, r: Request<pb::PayRequest>) -> Result<Response<pb::PayResponse>, Status> {
323        self.inner.pay(r).await
324    }
325
326    async fn list_nodes(
327        &self,
328        r: Request<pb::ListnodesRequest>,
329    ) -> Result<Response<pb::ListnodesResponse>, Status> {
330        self.inner.list_nodes(r).await
331    }
332
333    async fn wait_any_invoice(
334        &self,
335        r: Request<pb::WaitanyinvoiceRequest>,
336    ) -> Result<Response<pb::WaitanyinvoiceResponse>, Status> {
337        self.inner.wait_any_invoice(r).await
338    }
339
340    async fn wait_invoice(
341        &self,
342        r: Request<pb::WaitinvoiceRequest>,
343    ) -> Result<Response<pb::WaitinvoiceResponse>, Status> {
344        self.inner.wait_invoice(r).await
345    }
346
347    async fn wait_send_pay(
348        &self,
349        r: Request<pb::WaitsendpayRequest>,
350    ) -> Result<Response<pb::WaitsendpayResponse>, Status> {
351        self.inner.wait_send_pay(r).await
352    }
353
354    async fn wait_block_height(
355        &self,
356        r: Request<pb::WaitblockheightRequest>,
357    ) -> Result<Response<pb::WaitblockheightResponse>, Status> {
358        self.inner.wait_block_height(r).await
359    }
360
361    async fn new_addr(
362        &self,
363        r: Request<pb::NewaddrRequest>,
364    ) -> Result<Response<pb::NewaddrResponse>, Status> {
365        self.inner.new_addr(r).await
366    }
367
368    async fn withdraw(
369        &self,
370        r: Request<pb::WithdrawRequest>,
371    ) -> Result<Response<pb::WithdrawResponse>, Status> {
372        self.inner.withdraw(r).await
373    }
374
375    async fn key_send(
376        &self,
377        r: Request<pb::KeysendRequest>,
378    ) -> Result<Response<pb::KeysendResponse>, Status> {
379        self.inner.key_send(r).await
380    }
381
382    async fn fund_psbt(
383        &self,
384        r: Request<pb::FundpsbtRequest>,
385    ) -> Result<Response<pb::FundpsbtResponse>, Status> {
386        self.inner.fund_psbt(r).await
387    }
388
389    async fn send_psbt(
390        &self,
391        r: Request<pb::SendpsbtRequest>,
392    ) -> Result<Response<pb::SendpsbtResponse>, Status> {
393        self.inner.send_psbt(r).await
394    }
395
396    async fn sign_psbt(
397        &self,
398        r: Request<pb::SignpsbtRequest>,
399    ) -> Result<Response<pb::SignpsbtResponse>, Status> {
400        self.inner.sign_psbt(r).await
401    }
402
403    async fn utxo_psbt(
404        &self,
405        r: Request<pb::UtxopsbtRequest>,
406    ) -> Result<Response<pb::UtxopsbtResponse>, Status> {
407        self.inner.utxo_psbt(r).await
408    }
409
410    async fn tx_discard(
411        &self,
412        r: Request<pb::TxdiscardRequest>,
413    ) -> Result<Response<pb::TxdiscardResponse>, Status> {
414        self.inner.tx_discard(r).await
415    }
416
417    async fn tx_prepare(
418        &self,
419        r: Request<pb::TxprepareRequest>,
420    ) -> Result<Response<pb::TxprepareResponse>, Status> {
421        self.inner.tx_prepare(r).await
422    }
423
424    async fn tx_send(
425        &self,
426        r: Request<pb::TxsendRequest>,
427    ) -> Result<Response<pb::TxsendResponse>, Status> {
428        self.inner.tx_send(r).await
429    }
430
431    async fn disconnect(
432        &self,
433        r: Request<pb::DisconnectRequest>,
434    ) -> Result<Response<pb::DisconnectResponse>, Status> {
435        let inner = r.into_inner();
436        let id = hex::encode(inner.id.clone());
437        debug!(
438            "Got a disconnect request for {}, try to delete it from the datastore peerlist.",
439            id
440        );
441
442        // We try to delete the peer that we disconnect from from the datastore.
443        // We don't want to be overly strict on this so we don't throw an error
444        // if this does not work.
445        let data_res = self
446            .del_datastore(Request::new(pb::DeldatastoreRequest {
447                key: vec!["greenlight".to_string(), "peerlist".to_string(), id.clone()],
448                generation: None,
449            }))
450            .await;
451        if let Err(e) = data_res {
452            log::debug!("Could not delete peer {} from datastore: {}", id, e);
453        }
454
455        self.inner.disconnect(Request::new(inner.clone())).await
456    }
457
458    async fn feerates(
459        &self,
460        r: Request<pb::FeeratesRequest>,
461    ) -> Result<Response<pb::FeeratesResponse>, Status> {
462        self.inner.feerates(r).await
463    }
464
465    async fn fund_channel(
466        &self,
467        r: Request<pb::FundchannelRequest>,
468    ) -> Result<Response<pb::FundchannelResponse>, Status> {
469        self.inner.fund_channel(r).await
470    }
471
472    async fn get_route(
473        &self,
474        r: Request<pb::GetrouteRequest>,
475    ) -> Result<Response<pb::GetrouteResponse>, Status> {
476        self.inner.get_route(r).await
477    }
478
479    async fn list_forwards(
480        &self,
481        r: Request<pb::ListforwardsRequest>,
482    ) -> Result<Response<pb::ListforwardsResponse>, Status> {
483        self.inner.list_forwards(r).await
484    }
485
486    async fn list_pays(
487        &self,
488        r: Request<pb::ListpaysRequest>,
489    ) -> Result<Response<pb::ListpaysResponse>, Status> {
490        self.inner.list_pays(r).await
491    }
492
493    async fn ping(
494        &self,
495        r: Request<pb::PingRequest>,
496    ) -> Result<Response<pb::PingResponse>, Status> {
497        self.inner.ping(r).await
498    }
499
500    async fn set_channel(
501        &self,
502        r: Request<pb::SetchannelRequest>,
503    ) -> Result<Response<pb::SetchannelResponse>, Status> {
504        self.inner.set_channel(r).await
505    }
506
507    async fn sign_message(
508        &self,
509        r: Request<pb::SignmessageRequest>,
510    ) -> Result<Response<pb::SignmessageResponse>, Status> {
511        self.inner.sign_message(r).await
512    }
513
514    async fn stop(
515        &self,
516        r: Request<pb::StopRequest>,
517    ) -> Result<Response<pb::StopResponse>, Status> {
518        self.inner.stop(r).await
519    }
520
521    async fn static_backup(
522        &self,
523        r: Request<pb::StaticbackupRequest>,
524    ) -> Result<Response<pb::StaticbackupResponse>, Status> {
525        self.inner.static_backup(r).await
526    }
527
528    async fn list_htlcs(
529        &self,
530        r: Request<pb::ListhtlcsRequest>,
531    ) -> Result<Response<pb::ListhtlcsResponse>, Status> {
532        self.inner.list_htlcs(r).await
533    }
534
535    async fn datastore_usage(
536        &self,
537        r: Request<pb::DatastoreusageRequest>,
538    ) -> Result<Response<pb::DatastoreusageResponse>, Status> {
539        self.inner.datastore_usage(r).await
540    }
541
542    async fn fetch_invoice(
543        &self,
544        request: tonic::Request<pb::FetchinvoiceRequest>,
545    ) -> Result<tonic::Response<pb::FetchinvoiceResponse>, tonic::Status> {
546        self.inner.fetch_invoice(request).await
547    }
548
549    async fn wait(
550        &self,
551        request: tonic::Request<pb::WaitRequest>,
552    ) -> Result<tonic::Response<pb::WaitResponse>, tonic::Status> {
553        self.inner.wait(request).await
554    }
555}
556
557impl WrappedNodeServer {
558    async fn get_routehints(&self, rpc: &mut LightningClient) -> Result<Vec<pb::Routehint>, Error> {
559        use crate::responses::Peer;
560
561        // Get a list of active channels to peers so we can filter out
562        // offline peers or peers with unconfirmed or closing
563        // channels.
564        let res = rpc
565            .listpeers(None)
566            .await?
567            .peers
568            .into_iter()
569            .filter(|p| p.channels.len() > 0)
570            .collect::<Vec<Peer>>();
571
572        // Now project channels to their state and flatten into a vec
573        // of short_channel_ids
574        let active: Vec<String> = res
575            .iter()
576            .map(|p| {
577                p.channels
578                    .iter()
579                    .filter(|c| c.state == "CHANNELD_NORMAL")
580                    .filter_map(|c| c.clone().short_channel_id)
581            })
582            .flatten()
583            .collect();
584
585        /* Due to a bug in `listincoming` in CLN we get the real
586         * `short_channel_id`, whereas we're supposed to use the
587         * remote alias if the channel is unannounced. This patches
588         * the issue in GL, and should work transparently once we fix
589         * `listincoming`. */
590        use std::collections::HashMap;
591        let aliases: HashMap<String, String> = HashMap::from_iter(
592            res.iter()
593                .map(|p| {
594                    p.channels
595                        .iter()
596                        .filter(|c| {
597                            c.short_channel_id.is_some()
598                                && c.alias.is_some()
599                                && c.alias.as_ref().unwrap().remote.is_some()
600                        })
601                        .map(|c| {
602                            (
603                                c.short_channel_id.clone().unwrap(),
604                                c.alias.clone().unwrap().remote.unwrap(),
605                            )
606                        })
607                })
608                .flatten(),
609        );
610
611        // Now we can listincoming, filter with the above active list,
612        // and then map to become `pb::Routehint` instances
613        Ok(rpc
614            .listincoming()
615            .await?
616            .incoming
617            .into_iter()
618            .filter(|i| active.contains(&i.short_channel_id))
619            .map(|i| {
620                let base: Option<cln_rpc::primitives::Amount> =
621                    i.fee_base_msat.as_str().try_into().ok();
622
623                pb::Routehint {
624                    hops: vec![pb::RouteHop {
625                        id: hex::decode(i.id).expect("hex-decoding node_id"),
626                        short_channel_id: aliases
627                            .get(&i.short_channel_id)
628                            .or(Some(&i.short_channel_id))
629                            .unwrap()
630                            .to_owned(),
631                        feebase: base.map(|b| b.into()),
632                        feeprop: i.fee_proportional_millionths,
633                        expirydelta: i.cltv_expiry_delta,
634                    }],
635                }
636            })
637            .collect())
638    }
639}
640
641use crate::pb::{
642    node_server::Node as GlNode, Custommsg, Empty, HsmRequest, HsmResponse, IncomingPayment,
643    LogEntry, StreamCustommsgRequest, StreamIncomingFilter, StreamLogRequest,
644};
645use tokio_stream::wrappers::ReceiverStream;
646
647#[tonic::async_trait]
648impl GlNode for WrappedNodeServer {
649    type StreamCustommsgStream = ReceiverStream<Result<Custommsg, Status>>;
650    type StreamHsmRequestsStream = ReceiverStream<Result<HsmRequest, Status>>;
651    type StreamLogStream = ReceiverStream<Result<LogEntry, Status>>;
652    type StreamIncomingStream = ReceiverStream<Result<IncomingPayment, Status>>;
653
654    async fn stream_incoming(
655        &self,
656        req: tonic::Request<StreamIncomingFilter>,
657    ) -> Result<Response<Self::StreamIncomingStream>, Status> {
658        self.node_server.stream_incoming(req).await
659    }
660
661    async fn respond_hsm_request(
662        &self,
663        req: Request<HsmResponse>,
664    ) -> Result<Response<Empty>, Status> {
665        self.node_server.respond_hsm_request(req).await
666    }
667
668    async fn stream_hsm_requests(
669        &self,
670        req: Request<Empty>,
671    ) -> Result<Response<Self::StreamHsmRequestsStream>, Status> {
672        // Best Effort reconnection logic
673        let s = self.node_server.clone();
674
675        // First though call the `node_server` which records the
676        // signer being present.
677        let res = self.node_server.stream_hsm_requests(req).await;
678        tokio::spawn(async move { s.reconnect_peers().await });
679
680        res
681    }
682
683    async fn stream_log(
684        &self,
685        req: Request<StreamLogRequest>,
686    ) -> Result<Response<Self::StreamLogStream>, Status> {
687        self.node_server.stream_log(req).await
688    }
689
690    async fn stream_custommsg(
691        &self,
692        req: Request<StreamCustommsgRequest>,
693    ) -> Result<Response<Self::StreamCustommsgStream>, Status> {
694        self.node_server.stream_custommsg(req).await
695    }
696
697    async fn configure(
698        &self,
699        request: tonic::Request<crate::pb::GlConfig>,
700    ) -> Result<tonic::Response<crate::pb::Empty>, tonic::Status> {
701        self.node_server.configure(request).await
702    }
703}