1use crate::config::Config;
2use crate::pb::{self, node_server::Node};
3use crate::rpc::LightningClient;
4use crate::storage::StateStore;
5use crate::{messages, Event};
6use crate::{stager, tramp};
7use anyhow::{Context, Error, Result};
8use base64::{engine::general_purpose, Engine as _};
9use bytes::BufMut;
10use gl_client::bitcoin::hashes::hex::ToHex;
11use gl_client::persist::State;
12use governor::{
13 clock::MonotonicClock, state::direct::NotKeyed, state::InMemoryState, Quota, RateLimiter,
14};
15use lazy_static::lazy_static;
16use log::{debug, error, info, trace, warn};
17use serde_json::json;
18use std::path::PathBuf;
19use std::sync::atomic::AtomicBool;
20use std::sync::{
21 atomic::{AtomicUsize, Ordering},
22 Arc,
23};
24use tokio::sync::{broadcast, mpsc, Mutex, OnceCell};
25use tokio_stream::wrappers::ReceiverStream;
26use tonic::{transport::ServerTlsConfig, Code, Request, Response, Status};
27mod wrapper;
28use gl_client::bitcoin;
29use std::str::FromStr;
30pub use wrapper::WrappedNodeServer;
31
32static LIMITER: OnceCell<RateLimiter<NotKeyed, InMemoryState, MonotonicClock>> =
33 OnceCell::const_new();
34
35lazy_static! {
36 static ref HSM_ID_COUNT: AtomicUsize = AtomicUsize::new(0);
37
38 static ref SIGNER_COUNT: AtomicUsize = AtomicUsize::new(0);
42 static ref RPC_BCAST: broadcast::Sender<super::Event> = broadcast::channel(4).0;
43
44 static ref SERIALIZED_CONFIGURE_REQUEST: Mutex<Option<String>> = Mutex::new(None);
45
46 static ref RPC_READY: AtomicBool = AtomicBool::new(false);
47}
48
49#[derive(Clone)]
55pub struct PluginNodeServer {
56 pub tls: ServerTlsConfig,
57 pub stage: Arc<stager::Stage>,
58 pub rpc: Arc<Mutex<LightningClient>>,
59 rpc_path: PathBuf,
60 events: tokio::sync::broadcast::Sender<super::Event>,
61 signer_state: Arc<Mutex<State>>,
62 grpc_binding: String,
63 signer_state_store: Arc<Mutex<Box<dyn StateStore>>>,
64 pub ctx: crate::context::Context,
65}
66
67impl PluginNodeServer {
68 pub async fn new(
69 stage: Arc<stager::Stage>,
70 config: Config,
71 events: tokio::sync::broadcast::Sender<super::Event>,
72 signer_state_store: Box<dyn StateStore>,
73 ) -> Result<Self, Error> {
74 let tls = ServerTlsConfig::new()
75 .identity(config.identity.id)
76 .client_ca_root(config.identity.ca);
77
78 let mut rpc_path = std::env::current_dir().unwrap();
79 rpc_path.push("lightning-rpc");
80 info!("Connecting to lightning-rpc at {:?}", rpc_path);
81
82 let rpc = Arc::new(Mutex::new(LightningClient::new(rpc_path.clone())));
83
84 let tx = events.clone();
86 tokio::spawn(async move {
87 let mut rx = RPC_BCAST.subscribe();
88 loop {
89 if let Ok(e) = rx.recv().await {
90 let _ = tx.send(e);
91 }
92 }
93 });
94
95 let signer_state = signer_state_store.read().await?;
96
97 let ctx = crate::context::Context::new();
98
99 let rrpc = rpc.clone();
100
101 let s = PluginNodeServer {
102 ctx,
103 tls,
104 rpc,
105 stage,
106 events,
107 rpc_path,
108 signer_state: Arc::new(Mutex::new(signer_state)),
109 signer_state_store: Arc::new(Mutex::new(signer_state_store)),
110 grpc_binding: config.node_grpc_binding,
111 };
112
113 tokio::spawn(async move {
114 debug!("Locking grpc interface until the JSON-RPC interface becomes available.");
115 use tokio::time::{sleep, Duration};
116
117 let rpc = rrpc.lock().await;
119 loop {
120 let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
121 rpc.call("getinfo", json!({})).await;
122 match res {
123 Ok(_) => break,
124 Err(e) => {
125 warn!(
126 "JSON-RPC interface not yet available. Delaying 50ms. {:?}",
127 e
128 );
129 sleep(Duration::from_millis(50)).await;
130 }
131 }
132 }
133
134 RPC_READY.store(true, Ordering::SeqCst);
136
137 let list_datastore_req = cln_rpc::model::requests::ListdatastoreRequest {
138 key: Some(vec!["glconf".to_string(), "request".to_string()]),
139 };
140
141 let res: Result<cln_rpc::model::responses::ListdatastoreResponse, crate::rpc::Error> =
142 rpc.call("listdatastore", list_datastore_req).await;
143
144 match res {
145 Ok(list_datastore_res) => {
146 if list_datastore_res.datastore.len() > 0 {
147 let serialized_configure_request =
148 list_datastore_res.datastore[0].string.clone();
149 match serialized_configure_request {
150 Some(serialized_configure_request) => {
151 let mut cached_serialized_configure_request =
152 SERIALIZED_CONFIGURE_REQUEST.lock().await;
153 *cached_serialized_configure_request =
154 Some(serialized_configure_request);
155 }
156 None => {}
157 }
158 }
159 }
160 Err(_) => {}
161 }
162
163 drop(rpc);
164 });
165
166 Ok(s)
167 }
168
169 pub async fn limit(&self) {
171 let limiter = LIMITER
172 .get_or_init(|| async {
173 let quota = Quota::per_minute(core::num::NonZeroU32::new(300).unwrap());
174 RateLimiter::direct_with_clock(quota, &MonotonicClock::default())
175 })
176 .await;
177
178 limiter.until_ready().await
179 }
180
181 pub async fn get_rpc(&self) -> LightningClient {
182 let rpc = self.rpc.lock().await;
183 let r = rpc.clone();
184 drop(rpc);
185 r
186 }
187}
188
189#[tonic::async_trait]
190impl Node for PluginNodeServer {
191 type StreamCustommsgStream = ReceiverStream<Result<pb::Custommsg, Status>>;
192 type StreamHsmRequestsStream = ReceiverStream<Result<pb::HsmRequest, Status>>;
193 type StreamLogStream = ReceiverStream<Result<pb::LogEntry, Status>>;
194
195 async fn stream_custommsg(
196 &self,
197 _: Request<pb::StreamCustommsgRequest>,
198 ) -> Result<Response<Self::StreamCustommsgStream>, Status> {
199 log::debug!("Added a new listener for custommsg");
200 let (tx, rx) = mpsc::channel(1);
201 let mut stream = self.events.subscribe();
202 tokio::spawn(async move {
206 while let Ok(msg) = stream.recv().await {
207 if let Event::CustomMsg(m) = msg {
208 log::trace!("Forwarding custommsg {:?} to listener", m);
209 if let Err(e) = tx.send(Ok(m)).await {
210 log::warn!("Unable to send custmmsg to listener: {:?}", e);
211 break;
212 }
213 }
214 }
215 panic!("stream.recv loop exited...");
216 });
217 return Ok(Response::new(ReceiverStream::new(rx)));
218 }
219
220 async fn stream_log(
221 &self,
222 _: Request<pb::StreamLogRequest>,
223 ) -> Result<Response<Self::StreamLogStream>, Status> {
224 match async {
225 let (tx, rx) = mpsc::channel(1);
226 let mut lines = linemux::MuxedLines::new()?;
227 lines.add_file("/tmp/log").await?;
228
229 use tokio::io::{AsyncBufReadExt, BufReader};
233 let file = tokio::fs::File::open("/tmp/log").await?;
234 let mut file = BufReader::new(file).lines();
235
236 tokio::spawn(async move {
237 match async {
238 while let Some(line) = file.next_line().await? {
239 tx.send(Ok(pb::LogEntry {
240 line: line.trim().to_owned(),
241 }))
242 .await?
243 }
244
245 while let Ok(Some(line)) = lines.next_line().await {
246 tx.send(Ok(pb::LogEntry {
247 line: line.line().trim().to_string(),
248 }))
249 .await?;
250 }
251 Ok(())
252 }
253 .await as Result<(), anyhow::Error>
254 {
255 Ok(()) => {}
256 Err(e) => {
257 warn!("error streaming logs to client: {}", e);
258 }
259 }
260 });
261 Ok(ReceiverStream::new(rx))
262 }
263 .await as Result<Self::StreamLogStream, anyhow::Error>
264 {
265 Ok(v) => Ok(Response::new(v)),
266 Err(e) => Err(Status::new(Code::Internal, e.to_string())),
267 }
268 }
269
270 async fn stream_hsm_requests(
271 &self,
272 _request: Request<pb::Empty>,
273 ) -> Result<Response<Self::StreamHsmRequestsStream>, Status> {
274 let hsm_id = HSM_ID_COUNT.fetch_add(1, Ordering::SeqCst);
275 SIGNER_COUNT.fetch_add(1, Ordering::SeqCst);
276 info!(
277 "New signer with hsm_id={} attached, streaming requests",
278 hsm_id
279 );
280
281 let (tx, rx) = mpsc::channel(10);
282 let mut stream = self.stage.mystream().await;
283 let signer_state = self.signer_state.clone();
284 let ctx = self.ctx.clone();
285
286 tokio::spawn(async move {
287 trace!("hsmd hsm_id={} request processor started", hsm_id);
288
289 {
290 let state = signer_state.lock().await.clone();
300 let state: Vec<gl_client::pb::SignerStateEntry> = state.into();
301 let state: Vec<pb::SignerStateEntry> = state
302 .into_iter()
303 .map(|s| pb::SignerStateEntry {
304 key: s.key,
305 version: s.version,
306 value: s.value,
307 })
308 .collect();
309
310 let msg = vls_protocol::msgs::GetHeartbeat {};
311 use vls_protocol::msgs::SerBolt;
312 let req = crate::pb::HsmRequest {
313 request_id: 0,
316 signer_state: state,
317 raw: msg.as_vec(),
318 requests: vec![], context: None,
320 };
321
322 if let Err(e) = tx.send(Ok(req)).await {
323 log::warn!("Failed to send heartbeat message to signer: {}", e);
324 }
325 }
326
327 loop {
328 let mut req = match stream.next().await {
329 Err(e) => {
330 error!(
331 "Could not get next request from stage: {:?} for hsm_id={}",
332 e, hsm_id
333 );
334 break;
335 }
336 Ok(r) => r,
337 };
338 trace!(
339 "Sending request={} to hsm_id={}",
340 req.request.request_id,
341 hsm_id
342 );
343
344 let state = signer_state.lock().await.clone();
345 let state: Vec<gl_client::pb::SignerStateEntry> = state.into();
346
347 let state: Vec<pb::SignerStateEntry> = state
349 .into_iter()
350 .map(|s| pb::SignerStateEntry {
351 key: s.key,
352 version: s.version,
353 value: s.value,
354 })
355 .collect();
356
357 req.request.signer_state = state.into();
358 req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect();
359
360 let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await;
361
362 match &(*serialized_configure_request) {
363 Some(serialized_configure_request) => {
364 let configure_request = serde_json::from_str::<crate::context::Request>(
365 serialized_configure_request,
366 )
367 .unwrap();
368 req.request.requests.push(configure_request.into());
369 }
370 None => {}
371 }
372
373 debug!(
374 "Sending signer requests with {} requests and {} state entries",
375 req.request.requests.len(),
376 req.request.signer_state.len()
377 );
378
379 if let Err(e) = tx.send(Ok(req.request)).await {
380 warn!("Error streaming request {:?} to hsm_id={}", e, hsm_id);
381 break;
382 }
383 }
384 info!("Signer hsm_id={} exited", hsm_id);
385 SIGNER_COUNT.fetch_sub(1, Ordering::SeqCst);
386 });
387
388 trace!("Returning stream_hsm_request channel");
389 Ok(Response::new(ReceiverStream::new(rx)))
390 }
391
392 async fn respond_hsm_request(
393 &self,
394 request: Request<pb::HsmResponse>,
395 ) -> Result<Response<pb::Empty>, Status> {
396 let req = request.into_inner();
397
398 if req.error != "" {
399 log::error!("Signer reports an error: {}", req.error);
400 log::warn!("The above error was returned instead of a response.");
401 return Ok(Response::new(pb::Empty::default()));
402 }
403
404 let signer_state: Vec<gl_client::pb::SignerStateEntry> = req
408 .signer_state
409 .iter()
410 .map(|i| gl_client::pb::SignerStateEntry {
411 key: i.key.to_owned(),
412 value: i.value.to_owned(),
413 version: i.version,
414 })
415 .collect();
416 let new_state: gl_client::persist::State = signer_state.into();
417
418 let mut state = self.signer_state.lock().await;
420 state.merge(&new_state).map_err(|e| {
421 Status::new(
422 Code::Internal,
423 format!("Error updating internal state: {e}"),
424 )
425 })?;
426
427 let store = self.signer_state_store.lock().await;
429 if let Err(e) = store.write(state.clone()).await {
430 log::warn!(
431 "The returned state could not be stored. Ignoring response for request_id={}, error={:?}",
432 req.request_id, e
433 );
434 return Ok(Response::new(pb::Empty::default()));
439 }
440
441 if let Err(e) = self.stage.respond(req).await {
442 warn!("Suppressing error: {:?}", e);
443 }
444 Ok(Response::new(pb::Empty::default()))
445 }
446
447 type StreamIncomingStream = ReceiverStream<Result<pb::IncomingPayment, Status>>;
448
449 async fn stream_incoming(
450 &self,
451 _req: tonic::Request<pb::StreamIncomingFilter>,
452 ) -> Result<Response<Self::StreamIncomingStream>, Status> {
453 let (tx, rx) = mpsc::channel(1);
456 let mut bcast = self.events.subscribe();
457 tokio::spawn(async move {
458 while let Ok(p) = bcast.recv().await {
459 match p {
460 super::Event::IncomingPayment(p) => {
461 let _ = tx.send(Ok(p)).await;
462 }
463 _ => {}
464 }
465 }
466 });
467
468 return Ok(Response::new(ReceiverStream::new(rx)));
469 }
470
471 async fn configure(
472 &self,
473 req: tonic::Request<pb::GlConfig>,
474 ) -> Result<Response<pb::Empty>, Status> {
475 self.limit().await;
476 let gl_config = req.into_inner();
477 let rpc = self.get_rpc().await;
478
479 let res: Result<crate::responses::GetInfo, crate::rpc::Error> =
480 rpc.call("getinfo", json!({})).await;
481
482 let network = match res {
483 Ok(get_info_response) => match get_info_response.network.parse() {
484 Ok(v) => v,
485 Err(_) => Err(Status::new(
486 Code::Unknown,
487 format!("Failed to parse 'network' from 'getinfo' response"),
488 ))?,
489 },
490 Err(e) => {
491 return Err(Status::new(
492 Code::Unknown,
493 format!("Failed to retrieve a response from 'getinfo' while setting the node's configuration: {}", e),
494 ));
495 }
496 };
497
498 match bitcoin::Address::from_str(&gl_config.close_to_addr) {
499 Ok(address) => {
500 if address.network != network {
501 return Err(Status::new(
502 Code::Unknown,
503 format!(
504 "Network mismatch: \
505 Expected an address for {} but received an address for {}",
506 network, address.network
507 ),
508 ));
509 }
510 }
511 Err(e) => {
512 return Err(Status::new(
513 Code::Unknown,
514 format!(
515 "The address {} is not valid: {}",
516 gl_config.close_to_addr, e
517 ),
518 ));
519 }
520 }
521
522 let requests: Vec<crate::context::Request> = self
523 .ctx
524 .snapshot()
525 .await
526 .into_iter()
527 .map(|r| r.into())
528 .collect();
529 let serialized_req = serde_json::to_string(&requests[0]).unwrap();
530 let datastore_res: Result<
531 crate::cln_rpc::model::responses::DatastoreResponse,
532 crate::rpc::Error,
533 > = rpc
534 .call(
535 "datastore",
536 json!({
537 "key": vec![
538 "glconf".to_string(),
539 "request".to_string(),
540 ],
541 "string": serialized_req,
542 }),
543 )
544 .await;
545
546 match datastore_res {
547 Ok(_) => {
548 let mut cached_gl_config = SERIALIZED_CONFIGURE_REQUEST.lock().await;
549 *cached_gl_config = Some(serialized_req);
550
551 Ok(Response::new(pb::Empty::default()))
552 }
553 Err(e) => {
554 return Err(Status::new(
555 Code::Unknown,
556 format!(
557 "Failed to store the raw configure request in the datastore: {}",
558 e
559 ),
560 ))
561 }
562 }
563 }
564
565 async fn trampoline_pay(
566 &self,
567 r: tonic::Request<pb::TrampolinePayRequest>,
568 ) -> Result<tonic::Response<pb::TrampolinePayResponse>, Status> {
569 match tramp::trampolinepay(r.into_inner(), self.rpc_path.clone())
570 .await
571 .map(|res| {
572 <cln_rpc::model::responses::PayResponse as Into<cln_grpc::pb::PayResponse>>::into(
573 res,
574 )
575 }) {
576 Ok(res) => Ok(tonic::Response::new(pb::TrampolinePayResponse {
577 payment_preimage: res.payment_preimage,
578 payment_hash: res.payment_hash,
579 created_at: res.created_at,
580 parts: res.parts,
581 amount_msat: res.amount_msat.unwrap_or_default().msat,
582 amount_sent_msat: res.amount_sent_msat.unwrap_or_default().msat,
583 destination: res.destination.unwrap_or_default(),
584 })),
585 Err(e) => Err(Status::new(Code::Unknown, e.to_string())),
586 }
587 }
588}
589
590use cln_grpc::pb::node_server::NodeServer;
591
592impl PluginNodeServer {
593 pub async fn run(self) -> Result<()> {
594 let addr = self.grpc_binding.parse().unwrap();
595
596 let cln_node = NodeServer::new(
597 WrappedNodeServer::new(self.clone())
598 .await
599 .context("creating NodeServer instance")?,
600 );
601
602 let router = tonic::transport::Server::builder()
603 .max_frame_size(4 * 1024 * 1024) .tcp_keepalive(Some(tokio::time::Duration::from_secs(1)))
605 .tls_config(self.tls.clone())?
606 .layer(SignatureContextLayer {
607 ctx: self.ctx.clone(),
608 })
609 .add_service(RpcWaitService::new(cln_node, self.rpc_path.clone()))
610 .add_service(crate::pb::node_server::NodeServer::new(self.clone()));
611
612 router
613 .serve(addr)
614 .await
615 .context("grpc interface exited with error")
616 }
617
618 pub async fn reconnect_peers(&self) -> Result<(), Error> {
621 if SIGNER_COUNT.load(Ordering::SeqCst) < 1 {
622 use anyhow::anyhow;
623 return Err(anyhow!(
624 "Cannot reconnect peers, no signer to complete the handshake"
625 ));
626 }
627
628 log::info!("Reconnecting all peers (plugin)");
629 let mut rpc = cln_rpc::ClnRpc::new(self.rpc_path.clone()).await?;
630 let peers = self.get_reconnect_peers().await?;
631 log::info!(
632 "Found {} peers to reconnect: {:?} (plugin)",
633 peers.len(),
634 peers.iter().map(|p| p.id.clone())
635 );
636
637 for r in peers {
638 trace!("Calling connect: {:?} (plugin)", &r.id);
639 let res = rpc.call_typed(&r).await;
640 trace!("Connect returned: {:?} -> {:?} (plugin)", &r.id, res);
641
642 match res {
643 Ok(r) => info!("Connection to {} established: {:?} (plugin)", &r.id, r),
644 Err(e) => warn!("Could not connect to {}: {:?} (plugin)", &r.id, e),
645 }
646 }
647 return Ok(());
648 }
649
650 async fn get_reconnect_peers(
651 &self,
652 ) -> Result<Vec<cln_rpc::model::requests::ConnectRequest>, Error> {
653 let rpc_path = self.rpc_path.clone();
654 let mut rpc = cln_rpc::ClnRpc::new(rpc_path).await?;
655 let peers = rpc
656 .call_typed(&cln_rpc::model::requests::ListpeersRequest {
657 id: None,
658 level: None,
659 })
660 .await?;
661
662 let mut requests: Vec<cln_rpc::model::requests::ConnectRequest> = peers
663 .peers
664 .iter()
665 .filter(|&p| p.connected)
666 .map(|p| cln_rpc::model::requests::ConnectRequest {
667 id: p.id.to_string(),
668 host: None,
669 port: None,
670 })
671 .collect();
672
673 let mut dspeers: Vec<cln_rpc::model::requests::ConnectRequest> = rpc
674 .call_typed(&cln_rpc::model::requests::ListdatastoreRequest {
675 key: Some(vec!["greenlight".to_string(), "peerlist".to_string()]),
676 })
677 .await?
678 .datastore
679 .iter()
680 .map(|x| {
681 let mut s = x.string.clone().unwrap();
685 s = s.replace('\\', "");
686 serde_json::from_str::<messages::Peer>(&s).unwrap()
687 })
688 .map(|x| cln_rpc::model::requests::ConnectRequest {
689 id: x.id,
690 host: Some(x.addr),
691 port: None,
692 })
693 .collect();
694
695 requests.append(&mut dspeers);
697 requests.sort_by(|a, b| a.id.cmp(&b.id));
698 requests.dedup_by(|a, b| a.id.eq(&b.id));
699
700 Ok(requests)
701 }
702}
703
704use tower::{Layer, Service};
705
706#[derive(Debug, Clone)]
707pub struct SignatureContextLayer {
708 ctx: crate::context::Context,
709}
710
711impl SignatureContextLayer {
712 pub fn new(context: crate::context::Context) -> Self {
713 SignatureContextLayer { ctx: context }
714 }
715}
716
717impl<S> Layer<S> for SignatureContextLayer {
718 type Service = SignatureContextService<S>;
719
720 fn layer(&self, service: S) -> Self::Service {
721 SignatureContextService {
722 inner: service,
723 ctx: self.ctx.clone(),
724 }
725 }
726}
727
728const MAX_MESSAGE_SIZE: usize = 4000000;
730
731#[derive(Debug, Clone)]
732pub struct SignatureContextService<S> {
733 inner: S,
734 ctx: crate::context::Context,
735}
736
737impl<S> Service<hyper::Request<hyper::Body>> for SignatureContextService<S>
738where
739 S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<tonic::body::BoxBody>>
740 + Clone
741 + Send
742 + 'static,
743 S::Future: Send + 'static,
744 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
745{
746 type Response = S::Response;
747 type Error = Box<dyn std::error::Error + Send + Sync>;
748 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
749
750 fn poll_ready(
751 &mut self,
752 cx: &mut std::task::Context<'_>,
753 ) -> std::task::Poll<Result<(), Self::Error>> {
754 self.inner.poll_ready(cx).map_err(Into::into)
755 }
756
757 fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
758 let clone = self.inner.clone();
762 let mut inner = std::mem::replace(&mut self.inner, clone);
763 let reqctx = self.ctx.clone();
764
765 Box::pin(async move {
766 use tonic::codegen::Body;
767 let (parts, mut body) = request.into_parts();
768
769 let uri = parts.uri.path_and_query().unwrap();
770 let _ = RPC_BCAST
771 .clone()
772 .send(super::Event::RpcCall(uri.to_string()));
773
774 let pubkey = parts
775 .headers
776 .get("glauthpubkey")
777 .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());
778
779 let sig = parts
780 .headers
781 .get("glauthsig")
782 .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok());
783
784 use bytes::Buf;
785 let timestamp: Option<u64> = parts
786 .headers
787 .get("glts")
788 .and_then(|k| general_purpose::STANDARD_NO_PAD.decode(k).ok())
789 .map(|s| s.as_slice().get_u64());
790
791 let rune = parts
792 .headers
793 .get("glrune")
794 .and_then(|k| general_purpose::URL_SAFE.decode(k).ok());
795
796 if let (Some(pk), Some(sig), Some(rune)) = (pubkey, sig, rune) {
797 let mut buf = Vec::new();
800 while let Some(chunk) = body.data().await {
801 let chunk = chunk.unwrap();
802 if buf.len() + chunk.len() > MAX_MESSAGE_SIZE {
805 debug!("Message {} exceeds size limit", uri.path().to_string());
806 return Ok(tonic::Status::new(
807 tonic::Code::InvalidArgument,
808 format!("payload too large"),
809 )
810 .to_http());
811 }
812 buf.put(chunk);
813 }
814
815 trace!(
816 "Got a request for {} with pubkey={}, sig={}, rune={} and body size={:?}",
817 uri,
818 hex::encode(&pk),
819 hex::encode(&sig),
820 hex::encode(&rune),
821 &buf.len(),
822 );
823 let req = crate::context::Request::new(
824 uri.to_string(),
825 <bytes::Bytes>::from(buf.clone()),
826 pk,
827 sig,
828 timestamp,
829 rune,
830 );
831
832 reqctx.add_request(req.clone()).await;
833
834 let body: hyper::Body = buf.into();
835 let request = hyper::Request::from_parts(parts, body);
836 let res = inner.call(request).await;
837
838 tokio::spawn(async move {
843 reqctx.remove_request(req).await;
844 });
845 res.map_err(Into::into)
846 } else {
847 let request = hyper::Request::from_parts(parts, body);
850 inner.call(request).await.map_err(Into::into)
851 }
852 })
853 }
854}
855
856mod rpcwait;
857pub use rpcwait::RpcWaitService;