1use crate::LightningClient;
2use anyhow::Error;
3use cln_grpc::pb::{self, node_server::Node};
4use log::debug;
5use tonic::{Request, Response, Status};
6
7use super::PluginNodeServer;
8
9#[derive(Clone)]
15pub struct WrappedNodeServer {
16 inner: cln_grpc::Server,
17 node_server: PluginNodeServer,
18}
19
20impl WrappedNodeServer {
22 pub async fn new(node_server: PluginNodeServer) -> anyhow::Result<Self> {
23 let inner = cln_grpc::Server::new(&node_server.rpc_path).await?;
24 Ok(WrappedNodeServer { inner, node_server })
25 }
26}
27
28#[tonic::async_trait]
31impl Node for WrappedNodeServer {
32 async fn invoice(
33 &self,
34 req: Request<pb::InvoiceRequest>,
35 ) -> Result<Response<pb::InvoiceResponse>, Status> {
36 let req = req.into_inner();
37
38 use crate::rpc::LightningClient;
39 let mut rpc = LightningClient::new(self.node_server.rpc_path.clone());
40
41 let hints: Option<Vec<Vec<pb::RouteHop>>> = self
46 .get_routehints(&mut rpc)
47 .await
48 .map(
49 |v| {
51 v.into_iter()
52 .map(
53 |rh| rh.hops,
55 )
56 .collect()
57 },
58 )
59 .ok();
60
61 let mut pbreq: crate::requests::Invoice = match req.clone().try_into() {
62 Ok(v) => v,
63 Err(e) => {
64 return Err(Status::new(
65 tonic::Code::Internal,
66 format!(
67 "could not convert protobuf request into JSON-RPC request: {:?}",
68 e.to_string()
69 ),
70 ));
71 }
72 };
73 pbreq.dev_routes = hints.map(|v| {
74 v.into_iter()
75 .map(|e| e.into_iter().map(|ee| ee.into()).collect())
76 .collect()
77 });
78
79 pbreq.cltv = match pbreq.cltv {
80 Some(c) => Some(c), None => Some(144), };
83
84 let res: Result<crate::responses::Invoice, crate::rpc::Error> =
85 rpc.call("invoice", pbreq).await;
86
87 let res: Result<cln_grpc::pb::InvoiceResponse, tonic::Status> = res
88 .map(|r| cln_grpc::pb::InvoiceResponse::from(r))
89 .map_err(|e| {
90 tonic::Status::new(
91 tonic::Code::Internal,
92 format!("converting invoice response to grpc: {}", e),
93 )
94 });
95
96 res.map(|r| Response::new(r))
97 }
98
99 async fn getinfo(
100 &self,
101 r: Request<pb::GetinfoRequest>,
102 ) -> Result<Response<pb::GetinfoResponse>, Status> {
103 self.inner.getinfo(r).await
104 }
105
106 async fn list_offers(
107 &self,
108 r: Request<pb::ListoffersRequest>,
109 ) -> Result<Response<pb::ListoffersResponse>, Status> {
110 self.inner.list_offers(r).await
111 }
112
113 async fn offer(
114 &self,
115 r: Request<pb::OfferRequest>,
116 ) -> Result<Response<pb::OfferResponse>, Status> {
117 self.inner.offer(r).await
118 }
119
120 async fn bkpr_list_income(
121 &self,
122 r: Request<pb::BkprlistincomeRequest>,
123 ) -> Result<Response<pb::BkprlistincomeResponse>, Status> {
124 self.inner.bkpr_list_income(r).await
125 }
126
127 async fn list_peers(
128 &self,
129 r: Request<pb::ListpeersRequest>,
130 ) -> Result<Response<pb::ListpeersResponse>, Status> {
131 self.inner.list_peers(r).await
132 }
133
134 async fn list_peer_channels(
135 &self,
136 r: Request<pb::ListpeerchannelsRequest>,
137 ) -> Result<Response<pb::ListpeerchannelsResponse>, Status> {
138 self.inner.list_peer_channels(r).await
139 }
140
141 async fn list_closed_channels(
142 &self,
143 r: Request<pb::ListclosedchannelsRequest>,
144 ) -> Result<Response<pb::ListclosedchannelsResponse>, Status> {
145 self.inner.list_closed_channels(r).await
146 }
147
148 async fn list_funds(
149 &self,
150 r: Request<pb::ListfundsRequest>,
151 ) -> Result<Response<pb::ListfundsResponse>, Status> {
152 self.inner.list_funds(r).await
153 }
154
155 async fn decode_pay(
156 &self,
157 r: Request<pb::DecodepayRequest>,
158 ) -> Result<Response<pb::DecodepayResponse>, Status> {
159 self.inner.decode_pay(r).await
160 }
161
162 async fn decode(
163 &self,
164 r: Request<pb::DecodeRequest>,
165 ) -> Result<Response<pb::DecodeResponse>, Status> {
166 self.inner.decode(r).await
167 }
168
169 async fn sign_invoice(
170 &self,
171 r: Request<pb::SigninvoiceRequest>,
172 ) -> Result<Response<pb::SigninvoiceResponse>, Status> {
173 self.inner.sign_invoice(r).await
174 }
175 async fn pre_approve_keysend(
176 &self,
177 r: Request<pb::PreapprovekeysendRequest>,
178 ) -> Result<Response<pb::PreapprovekeysendResponse>, Status> {
179 self.inner.pre_approve_keysend(r).await
180 }
181
182 async fn pre_approve_invoice(
183 &self,
184 r: Request<pb::PreapproveinvoiceRequest>,
185 ) -> Result<Response<pb::PreapproveinvoiceResponse>, Status> {
186 self.inner.pre_approve_invoice(r).await
187 }
188
189 async fn send_custom_msg(
190 &self,
191 r: Request<pb::SendcustommsgRequest>,
192 ) -> Result<Response<pb::SendcustommsgResponse>, Status> {
193 self.inner.send_custom_msg(r).await
194 }
195
196 async fn send_pay(
197 &self,
198 r: Request<pb::SendpayRequest>,
199 ) -> Result<Response<pb::SendpayResponse>, Status> {
200 self.inner.send_pay(r).await
201 }
202
203 async fn list_channels(
204 &self,
205 r: Request<pb::ListchannelsRequest>,
206 ) -> Result<Response<pb::ListchannelsResponse>, Status> {
207 self.inner.list_channels(r).await
208 }
209
210 async fn add_gossip(
211 &self,
212 r: Request<pb::AddgossipRequest>,
213 ) -> Result<Response<pb::AddgossipResponse>, Status> {
214 self.inner.add_gossip(r).await
215 }
216
217 async fn auto_clean_invoice(
218 &self,
219 r: Request<pb::AutocleaninvoiceRequest>,
220 ) -> Result<Response<pb::AutocleaninvoiceResponse>, Status> {
221 self.inner.auto_clean_invoice(r).await
222 }
223
224 async fn check_message(
225 &self,
226 r: Request<pb::CheckmessageRequest>,
227 ) -> Result<Response<pb::CheckmessageResponse>, Status> {
228 self.inner.check_message(r).await
229 }
230
231 async fn close(
232 &self,
233 r: Request<pb::CloseRequest>,
234 ) -> Result<Response<pb::CloseResponse>, Status> {
235 self.inner.close(r).await
236 }
237
238 async fn connect_peer(
239 &self,
240 r: Request<pb::ConnectRequest>,
241 ) -> Result<Response<pb::ConnectResponse>, Status> {
242 self.inner.connect_peer(r).await
243 }
244
245 async fn create_invoice(
246 &self,
247 r: Request<pb::CreateinvoiceRequest>,
248 ) -> Result<Response<pb::CreateinvoiceResponse>, Status> {
249 self.inner.create_invoice(r).await
250 }
251
252 async fn datastore(
253 &self,
254 r: Request<pb::DatastoreRequest>,
255 ) -> Result<Response<pb::DatastoreResponse>, Status> {
256 self.inner.datastore(r).await
257 }
258
259 async fn create_onion(
260 &self,
261 r: Request<pb::CreateonionRequest>,
262 ) -> Result<Response<pb::CreateonionResponse>, Status> {
263 self.inner.create_onion(r).await
264 }
265
266 async fn del_datastore(
267 &self,
268 r: Request<pb::DeldatastoreRequest>,
269 ) -> Result<Response<pb::DeldatastoreResponse>, Status> {
270 self.inner.del_datastore(r).await
271 }
272
273 async fn del_expired_invoice(
274 &self,
275 r: Request<pb::DelexpiredinvoiceRequest>,
276 ) -> Result<Response<pb::DelexpiredinvoiceResponse>, Status> {
277 self.inner.del_expired_invoice(r).await
278 }
279
280 async fn del_invoice(
281 &self,
282 r: Request<pb::DelinvoiceRequest>,
283 ) -> Result<Response<pb::DelinvoiceResponse>, Status> {
284 self.inner.del_invoice(r).await
285 }
286
287 async fn list_datastore(
288 &self,
289 r: Request<pb::ListdatastoreRequest>,
290 ) -> Result<Response<pb::ListdatastoreResponse>, Status> {
291 self.inner.list_datastore(r).await
292 }
293
294 async fn list_invoices(
295 &self,
296 r: Request<pb::ListinvoicesRequest>,
297 ) -> Result<Response<pb::ListinvoicesResponse>, Status> {
298 self.inner.list_invoices(r).await
299 }
300
301 async fn send_onion(
302 &self,
303 r: Request<pb::SendonionRequest>,
304 ) -> Result<Response<pb::SendonionResponse>, Status> {
305 self.inner.send_onion(r).await
306 }
307
308 async fn list_send_pays(
309 &self,
310 r: Request<pb::ListsendpaysRequest>,
311 ) -> Result<Response<pb::ListsendpaysResponse>, Status> {
312 self.inner.list_send_pays(r).await
313 }
314
315 async fn list_transactions(
316 &self,
317 r: Request<pb::ListtransactionsRequest>,
318 ) -> Result<Response<pb::ListtransactionsResponse>, Status> {
319 self.inner.list_transactions(r).await
320 }
321
322 async fn pay(&self, r: Request<pb::PayRequest>) -> Result<Response<pb::PayResponse>, Status> {
323 self.inner.pay(r).await
324 }
325
326 async fn list_nodes(
327 &self,
328 r: Request<pb::ListnodesRequest>,
329 ) -> Result<Response<pb::ListnodesResponse>, Status> {
330 self.inner.list_nodes(r).await
331 }
332
333 async fn wait_any_invoice(
334 &self,
335 r: Request<pb::WaitanyinvoiceRequest>,
336 ) -> Result<Response<pb::WaitanyinvoiceResponse>, Status> {
337 self.inner.wait_any_invoice(r).await
338 }
339
340 async fn wait_invoice(
341 &self,
342 r: Request<pb::WaitinvoiceRequest>,
343 ) -> Result<Response<pb::WaitinvoiceResponse>, Status> {
344 self.inner.wait_invoice(r).await
345 }
346
347 async fn wait_send_pay(
348 &self,
349 r: Request<pb::WaitsendpayRequest>,
350 ) -> Result<Response<pb::WaitsendpayResponse>, Status> {
351 self.inner.wait_send_pay(r).await
352 }
353
354 async fn wait_block_height(
355 &self,
356 r: Request<pb::WaitblockheightRequest>,
357 ) -> Result<Response<pb::WaitblockheightResponse>, Status> {
358 self.inner.wait_block_height(r).await
359 }
360
361 async fn new_addr(
362 &self,
363 r: Request<pb::NewaddrRequest>,
364 ) -> Result<Response<pb::NewaddrResponse>, Status> {
365 self.inner.new_addr(r).await
366 }
367
368 async fn withdraw(
369 &self,
370 r: Request<pb::WithdrawRequest>,
371 ) -> Result<Response<pb::WithdrawResponse>, Status> {
372 self.inner.withdraw(r).await
373 }
374
375 async fn key_send(
376 &self,
377 r: Request<pb::KeysendRequest>,
378 ) -> Result<Response<pb::KeysendResponse>, Status> {
379 self.inner.key_send(r).await
380 }
381
382 async fn fund_psbt(
383 &self,
384 r: Request<pb::FundpsbtRequest>,
385 ) -> Result<Response<pb::FundpsbtResponse>, Status> {
386 self.inner.fund_psbt(r).await
387 }
388
389 async fn send_psbt(
390 &self,
391 r: Request<pb::SendpsbtRequest>,
392 ) -> Result<Response<pb::SendpsbtResponse>, Status> {
393 self.inner.send_psbt(r).await
394 }
395
396 async fn sign_psbt(
397 &self,
398 r: Request<pb::SignpsbtRequest>,
399 ) -> Result<Response<pb::SignpsbtResponse>, Status> {
400 self.inner.sign_psbt(r).await
401 }
402
403 async fn utxo_psbt(
404 &self,
405 r: Request<pb::UtxopsbtRequest>,
406 ) -> Result<Response<pb::UtxopsbtResponse>, Status> {
407 self.inner.utxo_psbt(r).await
408 }
409
410 async fn tx_discard(
411 &self,
412 r: Request<pb::TxdiscardRequest>,
413 ) -> Result<Response<pb::TxdiscardResponse>, Status> {
414 self.inner.tx_discard(r).await
415 }
416
417 async fn tx_prepare(
418 &self,
419 r: Request<pb::TxprepareRequest>,
420 ) -> Result<Response<pb::TxprepareResponse>, Status> {
421 self.inner.tx_prepare(r).await
422 }
423
424 async fn tx_send(
425 &self,
426 r: Request<pb::TxsendRequest>,
427 ) -> Result<Response<pb::TxsendResponse>, Status> {
428 self.inner.tx_send(r).await
429 }
430
431 async fn disconnect(
432 &self,
433 r: Request<pb::DisconnectRequest>,
434 ) -> Result<Response<pb::DisconnectResponse>, Status> {
435 let inner = r.into_inner();
436 let id = hex::encode(inner.id.clone());
437 debug!(
438 "Got a disconnect request for {}, try to delete it from the datastore peerlist.",
439 id
440 );
441
442 let data_res = self
446 .del_datastore(Request::new(pb::DeldatastoreRequest {
447 key: vec!["greenlight".to_string(), "peerlist".to_string(), id.clone()],
448 generation: None,
449 }))
450 .await;
451 if let Err(e) = data_res {
452 log::debug!("Could not delete peer {} from datastore: {}", id, e);
453 }
454
455 self.inner.disconnect(Request::new(inner.clone())).await
456 }
457
458 async fn feerates(
459 &self,
460 r: Request<pb::FeeratesRequest>,
461 ) -> Result<Response<pb::FeeratesResponse>, Status> {
462 self.inner.feerates(r).await
463 }
464
465 async fn fund_channel(
466 &self,
467 r: Request<pb::FundchannelRequest>,
468 ) -> Result<Response<pb::FundchannelResponse>, Status> {
469 self.inner.fund_channel(r).await
470 }
471
472 async fn get_route(
473 &self,
474 r: Request<pb::GetrouteRequest>,
475 ) -> Result<Response<pb::GetrouteResponse>, Status> {
476 self.inner.get_route(r).await
477 }
478
479 async fn list_forwards(
480 &self,
481 r: Request<pb::ListforwardsRequest>,
482 ) -> Result<Response<pb::ListforwardsResponse>, Status> {
483 self.inner.list_forwards(r).await
484 }
485
486 async fn list_pays(
487 &self,
488 r: Request<pb::ListpaysRequest>,
489 ) -> Result<Response<pb::ListpaysResponse>, Status> {
490 self.inner.list_pays(r).await
491 }
492
493 async fn ping(
494 &self,
495 r: Request<pb::PingRequest>,
496 ) -> Result<Response<pb::PingResponse>, Status> {
497 self.inner.ping(r).await
498 }
499
500 async fn set_channel(
501 &self,
502 r: Request<pb::SetchannelRequest>,
503 ) -> Result<Response<pb::SetchannelResponse>, Status> {
504 self.inner.set_channel(r).await
505 }
506
507 async fn sign_message(
508 &self,
509 r: Request<pb::SignmessageRequest>,
510 ) -> Result<Response<pb::SignmessageResponse>, Status> {
511 self.inner.sign_message(r).await
512 }
513
514 async fn stop(
515 &self,
516 r: Request<pb::StopRequest>,
517 ) -> Result<Response<pb::StopResponse>, Status> {
518 self.inner.stop(r).await
519 }
520
521 async fn static_backup(
522 &self,
523 r: Request<pb::StaticbackupRequest>,
524 ) -> Result<Response<pb::StaticbackupResponse>, Status> {
525 self.inner.static_backup(r).await
526 }
527
528 async fn list_htlcs(
529 &self,
530 r: Request<pb::ListhtlcsRequest>,
531 ) -> Result<Response<pb::ListhtlcsResponse>, Status> {
532 self.inner.list_htlcs(r).await
533 }
534
535 async fn datastore_usage(
536 &self,
537 r: Request<pb::DatastoreusageRequest>,
538 ) -> Result<Response<pb::DatastoreusageResponse>, Status> {
539 self.inner.datastore_usage(r).await
540 }
541
542 async fn fetch_invoice(
543 &self,
544 request: tonic::Request<pb::FetchinvoiceRequest>,
545 ) -> Result<tonic::Response<pb::FetchinvoiceResponse>, tonic::Status> {
546 self.inner.fetch_invoice(request).await
547 }
548
549 async fn wait(
550 &self,
551 request: tonic::Request<pb::WaitRequest>,
552 ) -> Result<tonic::Response<pb::WaitResponse>, tonic::Status> {
553 self.inner.wait(request).await
554 }
555}
556
557impl WrappedNodeServer {
558 async fn get_routehints(&self, rpc: &mut LightningClient) -> Result<Vec<pb::Routehint>, Error> {
559 use crate::responses::Peer;
560
561 let res = rpc
565 .listpeers(None)
566 .await?
567 .peers
568 .into_iter()
569 .filter(|p| p.channels.len() > 0)
570 .collect::<Vec<Peer>>();
571
572 let active: Vec<String> = res
575 .iter()
576 .map(|p| {
577 p.channels
578 .iter()
579 .filter(|c| c.state == "CHANNELD_NORMAL")
580 .filter_map(|c| c.clone().short_channel_id)
581 })
582 .flatten()
583 .collect();
584
585 use std::collections::HashMap;
591 let aliases: HashMap<String, String> = HashMap::from_iter(
592 res.iter()
593 .map(|p| {
594 p.channels
595 .iter()
596 .filter(|c| {
597 c.short_channel_id.is_some()
598 && c.alias.is_some()
599 && c.alias.as_ref().unwrap().remote.is_some()
600 })
601 .map(|c| {
602 (
603 c.short_channel_id.clone().unwrap(),
604 c.alias.clone().unwrap().remote.unwrap(),
605 )
606 })
607 })
608 .flatten(),
609 );
610
611 Ok(rpc
614 .listincoming()
615 .await?
616 .incoming
617 .into_iter()
618 .filter(|i| active.contains(&i.short_channel_id))
619 .map(|i| {
620 let base: Option<cln_rpc::primitives::Amount> =
621 i.fee_base_msat.as_str().try_into().ok();
622
623 pb::Routehint {
624 hops: vec![pb::RouteHop {
625 id: hex::decode(i.id).expect("hex-decoding node_id"),
626 short_channel_id: aliases
627 .get(&i.short_channel_id)
628 .or(Some(&i.short_channel_id))
629 .unwrap()
630 .to_owned(),
631 feebase: base.map(|b| b.into()),
632 feeprop: i.fee_proportional_millionths,
633 expirydelta: i.cltv_expiry_delta,
634 }],
635 }
636 })
637 .collect())
638 }
639}
640
641use crate::pb::{
642 node_server::Node as GlNode, Custommsg, Empty, HsmRequest, HsmResponse, IncomingPayment,
643 LogEntry, StreamCustommsgRequest, StreamIncomingFilter, StreamLogRequest,
644};
645use tokio_stream::wrappers::ReceiverStream;
646
647#[tonic::async_trait]
648impl GlNode for WrappedNodeServer {
649 type StreamCustommsgStream = ReceiverStream<Result<Custommsg, Status>>;
650 type StreamHsmRequestsStream = ReceiverStream<Result<HsmRequest, Status>>;
651 type StreamLogStream = ReceiverStream<Result<LogEntry, Status>>;
652 type StreamIncomingStream = ReceiverStream<Result<IncomingPayment, Status>>;
653
654 async fn stream_incoming(
655 &self,
656 req: tonic::Request<StreamIncomingFilter>,
657 ) -> Result<Response<Self::StreamIncomingStream>, Status> {
658 self.node_server.stream_incoming(req).await
659 }
660
661 async fn respond_hsm_request(
662 &self,
663 req: Request<HsmResponse>,
664 ) -> Result<Response<Empty>, Status> {
665 self.node_server.respond_hsm_request(req).await
666 }
667
668 async fn stream_hsm_requests(
669 &self,
670 req: Request<Empty>,
671 ) -> Result<Response<Self::StreamHsmRequestsStream>, Status> {
672 let s = self.node_server.clone();
674
675 let res = self.node_server.stream_hsm_requests(req).await;
678 tokio::spawn(async move { s.reconnect_peers().await });
679
680 res
681 }
682
683 async fn stream_log(
684 &self,
685 req: Request<StreamLogRequest>,
686 ) -> Result<Response<Self::StreamLogStream>, Status> {
687 self.node_server.stream_log(req).await
688 }
689
690 async fn stream_custommsg(
691 &self,
692 req: Request<StreamCustommsgRequest>,
693 ) -> Result<Response<Self::StreamCustommsgStream>, Status> {
694 self.node_server.stream_custommsg(req).await
695 }
696
697 async fn configure(
698 &self,
699 request: tonic::Request<crate::pb::GlConfig>,
700 ) -> Result<tonic::Response<crate::pb::Empty>, tonic::Status> {
701 self.node_server.configure(request).await
702 }
703}