1use crate::credentials::{RuneProvider, TlsConfigProvider};
2use crate::pb::cln::node_client as cln_client;
3use crate::pb::node_client::NodeClient;
4use crate::pb::scheduler::{scheduler_client::SchedulerClient, ScheduleRequest};
5use crate::tls::TlsConfig;
6use crate::utils;
7use anyhow::{anyhow, Result};
8use log::{debug, info, trace};
9use tonic::transport::{Channel, Uri};
10use tower::ServiceBuilder;
11
12const DEFAULT_MAX_DECODING_MESSAGE_SIZE: usize = 32 * 1024 * 1024; pub type Client = NodeClient<service::AuthService>;
19
20pub type GClient = GenericClient<service::AuthService>;
21
22pub type ClnClient = cln_client::NodeClient<service::AuthService>;
23
24pub trait GrpcClient {
25 fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self;
26}
27
28#[allow(dead_code)]
34#[derive(Clone)]
35pub struct Node {
36 node_id: Vec<u8>,
37 tls: TlsConfig,
38 rune: String,
39 max_decoding_message_size: Option<usize>,
40}
41
42impl GrpcClient for Client {
43 fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self {
44 Client::new(inner).max_decoding_message_size(max_decoding_message_size)
45 }
46}
47
48impl GrpcClient for GClient {
49 fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self {
50 GenericClient::new(inner).max_decoding_message_size(max_decoding_message_size)
51 }
52}
53
54impl GrpcClient for ClnClient {
55 fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self {
56 ClnClient::new(inner).max_decoding_message_size(max_decoding_message_size)
57 }
58}
59
60impl Node {
61 pub fn new<Creds>(node_id: Vec<u8>, creds: Creds) -> Result<Node>
62 where
63 Creds: TlsConfigProvider + RuneProvider,
64 {
65 let tls = creds.tls_config();
66 let rune = creds.rune();
67 Ok(Node {
68 node_id,
69 tls,
70 rune,
71 max_decoding_message_size: None,
72 })
73 }
74
75 pub fn with_max_decoding_message_size(mut self, size: usize) -> Self {
76 self.max_decoding_message_size = Some(size);
77 self
78 }
79
80 pub async fn connect<C>(&self, node_uri: String) -> Result<C>
81 where
82 C: GrpcClient,
83 {
84 let node_uri = Uri::from_maybe_shared(node_uri)?;
85 info!("Connecting to node at {}", node_uri);
86
87 let host = node_uri.host().unwrap();
90 let tls = if host.starts_with("gl") {
91 trace!(
92 "Using real hostname {}, expecting the node to have a matching certificate",
93 host
94 );
95 self.tls.clone()
96 } else {
97 trace!(
98 "Overriding hostname, since this is not a gl node domain: {}",
99 host
100 );
101 let mut tls = self.tls.clone();
102 tls.inner = tls.inner.domain_name("localhost");
103 tls
104 };
105
106 let layer = match tls.private_key {
107 Some(k) => service::AuthLayer::new(k, self.rune.clone())?,
108 None => {
109 return Err(anyhow!(
110 "Cannot connect a node::Client without first configuring its identity"
111 ))
112 }
113 };
114
115 let chan = tonic::transport::Endpoint::from_shared(node_uri.to_string())?
116 .tls_config(tls.inner)?
117 .tcp_keepalive(Some(crate::TCP_KEEPALIVE))
118 .http2_keep_alive_interval(crate::TCP_KEEPALIVE)
119 .keep_alive_timeout(crate::TCP_KEEPALIVE_TIMEOUT)
120 .keep_alive_while_idle(true)
121 .connect_lazy();
122 let chan = ServiceBuilder::new().layer(layer).service(chan);
123
124 let size = self
125 .max_decoding_message_size
126 .unwrap_or(DEFAULT_MAX_DECODING_MESSAGE_SIZE);
127
128 Ok(C::new_with_inner(chan, size))
129 }
130
131 pub async fn schedule_with_uri<C>(self, scheduler_uri: String) -> Result<C>
132 where
133 C: GrpcClient,
134 {
135 debug!(
136 "Contacting scheduler at {} to get the node address",
137 scheduler_uri
138 );
139
140 let channel = Channel::from_shared(scheduler_uri)?
141 .tls_config(self.tls.inner.clone())?
142 .connect()
143 .await?;
144 let mut scheduler = SchedulerClient::new(channel);
145
146 let node_info = scheduler
147 .schedule(ScheduleRequest {
148 node_id: self.node_id.clone(),
149 })
150 .await
151 .map(|v| v.into_inner())?;
152
153 debug!("Node scheduled at {}", node_info.grpc_uri);
154
155 self.connect(node_info.grpc_uri).await
156 }
157
158 pub async fn schedule<C>(self) -> Result<C>
159 where
160 C: GrpcClient,
161 {
162 let uri = utils::scheduler_uri();
163 self.schedule_with_uri(uri).await
164 }
165}
166
167mod generic;
168pub mod service;
169pub use generic::GenericClient;
170
171mod stasher {
172 use bytes::Bytes;
173 use http::HeaderMap;
174 use http_body::Body;
175 use pin_project::pin_project;
176 use std::{
177 pin::Pin,
178 task::{Context, Poll},
179 };
180 use tonic::body::BoxBody;
181 use tonic::Status;
182
183 #[pin_project]
184 #[derive(Debug)]
185 pub(crate) struct StashBody {
186 value: Option<Bytes>,
187 }
188
189 impl StashBody {
190 pub(crate) fn new(val: Bytes) -> Self {
191 Self { value: Some(val) }
192 }
193 }
194
195 impl Body for StashBody {
196 type Data = Bytes;
197 type Error = Status;
198
199 fn is_end_stream(&self) -> bool {
200 self.value.is_none()
201 }
202
203 fn poll_data(
204 self: Pin<&mut Self>,
205 _cx: &mut Context<'_>,
206 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
207 Poll::Ready(self.project().value.take().map(Ok))
208 }
209
210 fn poll_trailers(
211 self: Pin<&mut Self>,
212 _cx: &mut Context<'_>,
213 ) -> Poll<Result<Option<HeaderMap>, Status>> {
214 Poll::Ready(Ok(None))
215 }
216 }
217
218 impl From<StashBody> for BoxBody {
219 fn from(v: StashBody) -> BoxBody {
220 BoxBody::new(v)
221 }
222 }
223}