1use crate::config::Config;
2use crate::pb::{self, node_server::Node};
3use crate::rpc::LightningClient;
4use crate::stager;
5use crate::storage::StateStore;
6use crate::{messages, Event};
7use anyhow::{Context, Error, Result};
8use base64::{engine::general_purpose, Engine as _};
9use bytes::BufMut;
10use bs_gl_client::persist::State;
11use governor::{
12 clock::MonotonicClock, state::direct::NotKeyed, state::InMemoryState, Quota, RateLimiter,
13};
14use lazy_static::lazy_static;
15use log::{debug, error, info, trace, warn};
16use serde_json::json;
17use std::path::PathBuf;
18use std::sync::atomic::AtomicBool;
19use std::sync::{
20 atomic::{AtomicUsize, Ordering},
21 Arc,
22};
23use tokio::sync::{broadcast, mpsc, Mutex, OnceCell};
24use tokio_stream::wrappers::ReceiverStream;
25use tonic::{transport::ServerTlsConfig, Code, Request, Response, Status};
26mod wrapper;
27pub use wrapper::WrappedNodeServer;
28use bs_gl_client::bitcoin;
29use std::str::FromStr;
30
31
32static LIMITER: OnceCell<RateLimiter<NotKeyed, InMemoryState, MonotonicClock>> =
33 OnceCell::const_new();
34
35lazy_static! {
36 static ref HSM_ID_COUNT: AtomicUsize = AtomicUsize::new(0);
37
38 static ref SIGNER_COUNT: AtomicUsize = AtomicUsize::new(0);
42 static ref RPC_BCAST: broadcast::Sender<super::Event> = broadcast::channel(4).0;
43
44 static ref SERIALIZED_CONFIGURE_REQUEST: Mutex<Option<String>> = Mutex::new(None);
45
46 static ref RPC_READY: AtomicBool = AtomicBool::new(false);
47}
48
49#[derive(Clone)]
55pub struct PluginNodeServer {
56 pub tls: ServerTlsConfig,
57 pub stage: Arc<stager::Stage>,
58 pub rpc: Arc<Mutex<LightningClient>>,
59 rpc_path: PathBuf,
60 events: tokio::sync::broadcast::Sender<super::Event>,
61 signer_state: Arc<Mutex<State>>,
62 grpc_binding: String,
63 signer_state_store: Arc<Mutex<Box<dyn StateStore>>>,
64 pub ctx: crate::context::Context,
65}
66
67impl PluginNodeServer {
68 pub async fn new(
69 stage: Arc<stager::Stage>,
70 config: Config,
71 events: tokio::sync::broadcast::Sender<super::Event>,
72 signer_state_store: Box<dyn StateStore>,
73 ) -> Result<Self, Error> {
74 let tls = ServerTlsConfig::new()
75 .identity(config.identity.id)
76 .client_ca_root(config.identity.ca);
77
78 let mut rpc_path = std::env::current_dir().unwrap();
79 rpc_path.push("lightning-rpc");
80 info!("Connecting to lightning-rpc at {:?}", rpc_path);
81
82 let rpc = Arc::new(Mutex::new(LightningClient::new(rpc_path.clone())));
83
84 let tx = events.clone();
86 tokio::spawn(async move {
87 let mut rx = RPC_BCAST.subscribe();
88 loop {
89 if let Ok(e) = rx.recv().await {
90 let _ = tx.send(e);
91 }
92 }
93 });
94
95 let signer_state = signer_state_store.read().await?;
96
97 let ctx = crate::context::Context::new();
98
99 let rrpc = rpc.clone();
100
101 let s = PluginNodeServer {
102 ctx,
103 tls,
104 rpc,
105 stage,
106 events,
107 rpc_path,
108 signer_state: Arc::new(Mutex::new(signer_state)),
109 signer_state_store: Arc::new(Mutex::new(signer_state_store)),
110 grpc_binding: config.node_grpc_binding,
111 };
112
113 tokio::spawn(async move {
114 debug!("Locking grpc interface until the JSON-RPC interface becomes available.");
115 use tokio::time::{sleep, Duration};
116
117 let rpc = rrpc.lock().await;
119 loop {
120 let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
121 rpc.call("getinfo", json!({})).await;
122 match res {
123 Ok(_) => break,
124 Err(e) => {
125 warn!(
126 "JSON-RPC interface not yet available. Delaying 50ms. {:?}",
127 e
128 );
129 sleep(Duration::from_millis(50)).await;
130 }
131 }
132 }
133
134 RPC_READY.store(true, Ordering::SeqCst);
136
137 let list_datastore_req = cln_rpc::model::requests::ListdatastoreRequest{
138 key: Some(vec![
139 "glconf".to_string(),
140 "request".to_string()
141 ])
142 };
143
144 let res: Result<cln_rpc::model::responses::ListdatastoreResponse, crate::rpc::Error> =
145 rpc.call("listdatastore", list_datastore_req).await;
146
147 match res {
148 Ok(list_datastore_res) => {
149 if list_datastore_res.datastore.len() > 0 {
150 let serialized_configure_request = list_datastore_res.datastore[0].string.clone();
151 match serialized_configure_request {
152 Some(serialized_configure_request) => {
153 let mut cached_serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await;
154 *cached_serialized_configure_request = Some(serialized_configure_request);
155 }
156 None => {}
157 }
158 }
159 }
160 Err(_) => {}
161 }
162
163 drop(rpc);
164 });
165
166 Ok(s)
167 }
168
169 pub async fn limit(&self) {
171 let limiter = LIMITER
172 .get_or_init(|| async {
173 let quota = Quota::per_minute(core::num::NonZeroU32::new(300).unwrap());
174 RateLimiter::direct_with_clock(quota, &MonotonicClock::default())
175 })
176 .await;
177
178 limiter.until_ready().await
179 }
180
181 pub async fn get_rpc(&self) -> LightningClient {
182 let rpc = self.rpc.lock().await;
183 let r = rpc.clone();
184 drop(rpc);
185 r
186 }
187}
188
189#[tonic::async_trait]
190impl Node for PluginNodeServer {
191 type StreamCustommsgStream = ReceiverStream<Result<pb::Custommsg, Status>>;
192 type StreamHsmRequestsStream = ReceiverStream<Result<pb::HsmRequest, Status>>;
193 type StreamLogStream = ReceiverStream<Result<pb::LogEntry, Status>>;
194
195 async fn stream_custommsg(
196 &self,
197 _: Request<pb::StreamCustommsgRequest>,
198 ) -> Result<Response<Self::StreamCustommsgStream>, Status> {
199 log::debug!("Added a new listener for custommsg");
200 let (tx, rx) = mpsc::channel(1);
201 let mut stream = self.events.subscribe();
202 tokio::spawn(async move {
206 while let Ok(msg) = stream.recv().await {
207 if let Event::CustomMsg(m) = msg {
208 log::trace!("Forwarding custommsg {:?} to listener", m);
209 if let Err(e) = tx.send(Ok(m)).await {
210 log::warn!("Unable to send custmmsg to listener: {:?}", e);
211 break;
212 }
213 }
214 }
215 panic!("stream.recv loop exited...");
216 });
217 return Ok(Response::new(ReceiverStream::new(rx)));
218 }
219
220 async fn stream_log(
221 &self,
222 _: Request<pb::StreamLogRequest>,
223 ) -> Result<Response<Self::StreamLogStream>, Status> {
224 match async {
225 let (tx, rx) = mpsc::channel(1);
226 let mut lines = linemux::MuxedLines::new()?;
227 lines.add_file("/tmp/log").await?;
228
229 use tokio::io::{AsyncBufReadExt, BufReader};
233 let file = tokio::fs::File::open("/tmp/log").await?;
234 let mut file = BufReader::new(file).lines();
235
236 tokio::spawn(async move {
237 match async {
238 while let Some(line) = file.next_line().await? {
239 tx.send(Ok(pb::LogEntry {
240 line: line.trim().to_owned(),
241 }))
242 .await?
243 }
244
245 while let Ok(Some(line)) = lines.next_line().await {
246 tx.send(Ok(pb::LogEntry {
247 line: line.line().trim().to_string(),
248 }))
249 .await?;
250 }
251 Ok(())
252 }
253 .await as Result<(), anyhow::Error>
254 {
255 Ok(()) => {}
256 Err(e) => {
257 warn!("error streaming logs to client: {}", e);
258 }
259 }
260 });
261 Ok(ReceiverStream::new(rx))
262 }
263 .await as Result<Self::StreamLogStream, anyhow::Error>
264 {
265 Ok(v) => Ok(Response::new(v)),
266 Err(e) => Err(Status::new(Code::Internal, e.to_string())),
267 }
268 }
269
270 async fn stream_hsm_requests(
271 &self,
272 _request: Request<pb::Empty>,
273 ) -> Result<Response<Self::StreamHsmRequestsStream>, Status> {
274 let hsm_id = HSM_ID_COUNT.fetch_add(1, Ordering::SeqCst);
275 SIGNER_COUNT.fetch_add(1, Ordering::SeqCst);
276 info!(
277 "New signer with hsm_id={} attached, streaming requests",
278 hsm_id
279 );
280
281 let (tx, rx) = mpsc::channel(10);
282 let mut stream = self.stage.mystream().await;
283 let signer_state = self.signer_state.clone();
284 let ctx = self.ctx.clone();
285
286 tokio::spawn(async move {
287 trace!("hsmd hsm_id={} request processor started", hsm_id);
288
289 {
290 let state = signer_state.lock().await.clone();
300 let state: Vec<bs_gl_client::pb::SignerStateEntry> = state.into();
301 let state: Vec<pb::SignerStateEntry> = state
302 .into_iter()
303 .map(|s| pb::SignerStateEntry {
304 key: s.key,
305 version: s.version,
306 value: s.value,
307 })
308 .collect();
309
310 let msg = vls_protocol::msgs::GetHeartbeat {};
311 use vls_protocol::msgs::SerBolt;
312 let req = crate::pb::HsmRequest {
313 request_id: 0,
316 signer_state: state,
317 raw: msg.as_vec(),
318 requests: vec![], context: None,
320 };
321
322 if let Err(e) = tx.send(Ok(req)).await {
323 log::warn!("Failed to send heartbeat message to signer: {}", e);
324 }
325 }
326
327 loop {
328 let mut req = match stream.next().await {
329 Err(e) => {
330 error!(
331 "Could not get next request from stage: {:?} for hsm_id={}",
332 e, hsm_id
333 );
334 break;
335 }
336 Ok(r) => r,
337 };
338 trace!(
339 "Sending request={} to hsm_id={}",
340 req.request.request_id,
341 hsm_id
342 );
343
344 let state = signer_state.lock().await.clone();
345 let state: Vec<bs_gl_client::pb::SignerStateEntry> = state.into();
346
347 let state: Vec<pb::SignerStateEntry> = state
349 .into_iter()
350 .map(|s| pb::SignerStateEntry {
351 key: s.key,
352 version: s.version,
353 value: s.value,
354 })
355 .collect();
356
357 req.request.signer_state = state.into();
358 req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect();
359
360 let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await;
361
362 match &(*serialized_configure_request) {
363 Some(serialized_configure_request) => {
364 let configure_request = serde_json::from_str::<crate::context::Request>(
365 serialized_configure_request,
366 )
367 .unwrap();
368 req.request.requests.push(configure_request.into());
369 }
370 None => {}
371 }
372
373 debug!(
374 "Sending signer requests with {} requests and {} state entries",
375 req.request.requests.len(),
376 req.request.signer_state.len()
377 );
378
379 if let Err(e) = tx.send(Ok(req.request)).await {
380 warn!("Error streaming request {:?} to hsm_id={}", e, hsm_id);
381 break;
382 }
383 }
384 info!("Signer hsm_id={} exited", hsm_id);
385 SIGNER_COUNT.fetch_sub(1, Ordering::SeqCst);
386 });
387
388 trace!("Returning stream_hsm_request channel");
389 Ok(Response::new(ReceiverStream::new(rx)))
390 }
391
392 async fn respond_hsm_request(
393 &self,
394 request: Request<pb::HsmResponse>,
395 ) -> Result<Response<pb::Empty>, Status> {
396 let req = request.into_inner();
397 let signer_state: Vec<bs_gl_client::pb::SignerStateEntry> = req
401 .signer_state
402 .iter()
403 .map(|i| bs_gl_client::pb::SignerStateEntry {
404 key: i.key.to_owned(),
405 value: i.value.to_owned(),
406 version: i.version,
407 })
408 .collect();
409 let new_state: bs_gl_client::persist::State = signer_state.into();
410
411 {
412 let mut state = self.signer_state.lock().await;
414 state.merge(&new_state).map_err(|e| {
415 Status::new(
416 Code::Internal,
417 format!("Error updating internal state: {e}"),
418 )
419 })?;
420
421 self.signer_state_store
423 .lock()
424 .await
425 .write(state.clone())
426 .await
427 .map_err(|e| {
428 Status::new(
429 Code::Internal,
430 format!("error persisting state changes: {}", e),
431 )
432 })?;
433 }
434
435 if let Err(e) = self.stage.respond(req).await {
436 warn!("Suppressing error: {:?}", e);
437 }
438 Ok(Response::new(pb::Empty::default()))
439 }
440
441 type StreamIncomingStream = ReceiverStream<Result<pb::IncomingPayment, Status>>;
442
443 async fn stream_incoming(
444 &self,
445 _req: tonic::Request<pb::StreamIncomingFilter>,
446 ) -> Result<Response<Self::StreamIncomingStream>, Status> {
447 let (tx, rx) = mpsc::channel(1);
450 let mut bcast = self.events.subscribe();
451 tokio::spawn(async move {
452 while let Ok(p) = bcast.recv().await {
453 match p {
454 super::Event::IncomingPayment(p) => {
455 let _ = tx.send(Ok(p)).await;
456 }
457 _ => {}
458 }
459 }
460 });
461
462 return Ok(Response::new(ReceiverStream::new(rx)));
463 }
464
465 async fn configure(&self, req: tonic::Request<pb::GlConfig>) -> Result<Response<pb::Empty>, Status> {
466 self.limit().await;
467 let gl_config = req.into_inner();
468 let rpc = self.get_rpc().await;
469
470 let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
471 rpc.call("getinfo", json!({})).await;
472
473 let network = match res {
474 Ok(get_info_response) => match get_info_response.network.parse() {
475 Ok(v) => v,
476 Err(_) => Err(Status::new(
477 Code::Unknown,
478 format!("Failed to parse 'network' from 'getinfo' response"),
479 ))?,
480 },
481 Err(e) => {
482 return Err(Status::new(
483 Code::Unknown,
484 format!("Failed to retrieve a response from 'getinfo' while setting the node's configuration: {}", e),
485 ));
486 }
487 };
488
489 match bitcoin::Address::from_str(&gl_config.close_to_addr) {
490 Ok(address) => {
491 if address.network != network {
492 return Err(Status::new(
493 Code::Unknown,
494 format!(
495 "Network mismatch: \
496 Expected an address for {} but received an address for {}",
497 network,
498 address.network
499 ),
500 ));
501 }
502 }
503 Err(e) => {
504 return Err(Status::new(
505 Code::Unknown,
506 format!("The address {} is not valid: {}", gl_config.close_to_addr, e),
507 ));
508 }
509 }
510
511 let requests: Vec<crate::context::Request> = self.ctx.snapshot().await.into_iter().map(|r| r.into()).collect();
512 let serialized_req = serde_json::to_string(&requests[0]).unwrap();
513 let datastore_res: Result<crate::cln_rpc::model::responses::DatastoreResponse, crate::rpc::Error> =
514 rpc.call("datastore", json!({
515 "key": vec![
516 "glconf".to_string(),
517 "request".to_string(),
518 ],
519 "string": serialized_req,
520 })).await;
521
522 match datastore_res {
523 Ok(_) => {
524 let mut cached_gl_config = SERIALIZED_CONFIGURE_REQUEST.lock().await;
525 *cached_gl_config = Some(serialized_req);
526
527 Ok(Response::new(pb::Empty::default()))
528 }
529 Err(e) => {
530 return Err(Status::new(
531 Code::Unknown,
532 format!("Failed to store the raw configure request in the datastore: {}", e),
533 ))
534 }
535 }
536 }
537}
538
539use cln_grpc::pb::node_server::NodeServer;
540
541impl PluginNodeServer {
542 pub async fn run(self) -> Result<()> {
543 let addr = self.grpc_binding.parse().unwrap();
544
545 let cln_node = NodeServer::new(
546 WrappedNodeServer::new(self.clone())
547 .await
548 .context("creating NodeServer instance")?,
549 );
550
551 let router = tonic::transport::Server::builder()
552 .max_frame_size(4 * 1024 * 1024) .tcp_keepalive(Some(tokio::time::Duration::from_secs(1)))
554 .tls_config(self.tls.clone())?
555 .layer(SignatureContextLayer {
556 ctx: self.ctx.clone(),
557 })
558 .add_service(RpcWaitService::new(cln_node, self.rpc_path.clone()))
559 .add_service(crate::pb::node_server::NodeServer::new(self.clone()));
560
561 router
562 .serve(addr)
563 .await
564 .context("grpc interface exited with error")
565 }
566
567 pub async fn reconnect_peers(&self) -> Result<(), Error> {
570 if SIGNER_COUNT.load(Ordering::SeqCst) < 1 {
571 use anyhow::anyhow;
572 return Err(anyhow!(
573 "Cannot reconnect peers, no signer to complete the handshake"
574 ));
575 }
576
577 log::info!("Reconnecting all peers (plugin)");
578 let mut rpc = cln_rpc::ClnRpc::new(self.rpc_path.clone()).await?;
579 let peers = self.get_reconnect_peers().await?;
580 log::info!(
581 "Found {} peers to reconnect: {:?} (plugin)",
582 peers.len(),
583 peers.iter().map(|p| p.id.clone())
584 );
585
586 for r in peers {
587 trace!("Calling connect: {:?} (plugin)", &r.id);
588 let res = rpc.call_typed(&r).await;
589 trace!("Connect returned: {:?} -> {:?} (plugin)", &r.id, res);
590
591 match res {
592 Ok(r) => info!("Connection to {} established: {:?} (plugin)", &r.id, r),
593 Err(e) => warn!("Could not connect to {}: {:?} (plugin)", &r.id, e),
594 }
595 }
596 return Ok(());
597 }
598
599 async fn get_reconnect_peers(
600 &self,
601 ) -> Result<Vec<cln_rpc::model::requests::ConnectRequest>, Error> {
602 let rpc_path = self.rpc_path.clone();
603 let mut rpc = cln_rpc::ClnRpc::new(rpc_path).await?;
604 let peers = rpc
605 .call_typed(&cln_rpc::model::requests::ListpeersRequest {
606 id: None,
607 level: None,
608 })
609 .await?;
610
611 let mut requests: Vec<cln_rpc::model::requests::ConnectRequest> = peers
612 .peers
613 .iter()
614 .filter(|&p| p.connected)
615 .map(|p| cln_rpc::model::requests::ConnectRequest {
616 id: p.id.to_string(),
617 host: None,
618 port: None,
619 })
620 .collect();
621
622 let mut dspeers: Vec<cln_rpc::model::requests::ConnectRequest> = rpc
623 .call_typed(&cln_rpc::model::requests::ListdatastoreRequest {
624 key: Some(vec!["greenlight".to_string(), "peerlist".to_string()]),
625 })
626 .await?
627 .datastore
628 .iter()
629 .map(|x| {
630 let mut s = x.string.clone().unwrap();
634 s = s.replace('\\', "");
635 serde_json::from_str::<messages::Peer>(&s).unwrap()
636 })
637 .map(|x| cln_rpc::model::requests::ConnectRequest {
638 id: x.id,
639 host: Some(x.addr),
640 port: None,
641 })
642 .collect();
643
644 requests.append(&mut dspeers);
646 requests.sort_by(|a, b| a.id.cmp(&b.id));
647 requests.dedup_by(|a, b| a.id.eq(&b.id));
648
649 Ok(requests)
650 }
651}
652
653use tower::{Layer, Service};
654
655#[derive(Debug, Clone)]
656pub struct SignatureContextLayer {
657 ctx: crate::context::Context,
658}
659
660impl SignatureContextLayer {
661 pub fn new(context: crate::context::Context) -> Self {
662 SignatureContextLayer { ctx: context }
663 }
664}
665
666impl<S> Layer<S> for SignatureContextLayer {
667 type Service = SignatureContextService<S>;
668
669 fn layer(&self, service: S) -> Self::Service {
670 SignatureContextService {
671 inner: service,
672 ctx: self.ctx.clone(),
673 }
674 }
675}
676
/// Upper bound, in bytes, on a request body that the signature middleware
/// buffers; larger payloads are rejected with `InvalidArgument`.
const MAX_MESSAGE_SIZE: usize = 4_000_000;
679
680#[derive(Debug, Clone)]
681pub struct SignatureContextService<S> {
682 inner: S,
683 ctx: crate::context::Context,
684}
685
686impl<S> Service<hyper::Request<hyper::Body>> for SignatureContextService<S>
687where
688 S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<tonic::body::BoxBody>>
689 + Clone
690 + Send
691 + 'static,
692 S::Future: Send + 'static,
693 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
694{
695 type Response = S::Response;
696 type Error = Box<dyn std::error::Error + Send + Sync>;
697 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
698
699 fn poll_ready(
700 &mut self,
701 cx: &mut std::task::Context<'_>,
702 ) -> std::task::Poll<Result<(), Self::Error>> {
703 self.inner.poll_ready(cx).map_err(Into::into)
704 }
705
706 fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
707 let clone = self.inner.clone();
711 let mut inner = std::mem::replace(&mut self.inner, clone);
712 let reqctx = self.ctx.clone();
713
714 Box::pin(async move {
715 use tonic::codegen::Body;
716 let (parts, mut body) = request.into_parts();
717
718 let uri = parts.uri.path_and_query().unwrap();
719 let _ = RPC_BCAST
720 .clone()
721 .send(super::Event::RpcCall(uri.to_string()));
722
723 let pubkey = parts
724 .headers
725 .get("glauthpubkey")
726 .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());
727
728 let sig = parts
729 .headers
730 .get("glauthsig")
731 .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());
732
733 use bytes::Buf;
734 let timestamp: Option<u64> = parts
735 .headers
736 .get("glts")
737 .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok())
738 .map(|s| s.as_slice().get_u64());
739
740 let rune = parts
741 .headers
742 .get("glrune")
743 .and_then(|k| general_purpose::URL_SAFE.decode(k).ok());
744
745 if let (Some(pk), Some(sig), Some(rune)) = (pubkey, sig, rune) {
746 let mut buf = Vec::new();
749 while let Some(chunk) = body.data().await {
750 let chunk = chunk.unwrap();
751 if buf.len() + chunk.len() > MAX_MESSAGE_SIZE {
754 debug!("Message {} exceeds size limit", uri.path().to_string());
755 return Ok(tonic::Status::new(
756 tonic::Code::InvalidArgument,
757 format!("payload too large"),
758 )
759 .to_http());
760 }
761 buf.put(chunk);
762 }
763
764 trace!(
765 "Got a request for {} with pubkey={}, sig={}, rune={} and body size={:?}",
766 uri,
767 hex::encode(&pk),
768 hex::encode(&sig),
769 hex::encode(&rune),
770 &buf.len(),
771 );
772 let req = crate::context::Request::new(
773 uri.to_string(),
774 <bytes::Bytes>::from(buf.clone()),
775 pk,
776 sig,
777 timestamp,
778 rune,
779 );
780
781 reqctx.add_request(req.clone()).await;
782
783 let body: hyper::Body = buf.into();
784 let request = hyper::Request::from_parts(parts, body);
785 let res = inner.call(request).await;
786
787 tokio::spawn(async move {
792 reqctx.remove_request(req).await;
793 });
794 res.map_err(Into::into)
795 } else {
796 let request = hyper::Request::from_parts(parts, body);
799 inner.call(request).await.map_err(Into::into)
800 }
801 })
802 }
803}
804
805mod rpcwait;
806pub use rpcwait::RpcWaitService;