1use crate::credentials::{RuneProvider, TlsConfigProvider};
2use crate::pb::cln::node_client as cln_client;
3use crate::pb::node_client::NodeClient;
4use crate::pb::scheduler::{scheduler_client::SchedulerClient, ScheduleRequest};
5use crate::tls::TlsConfig;
6use crate::utils;
7use anyhow::{anyhow, Result};
8use log::{debug, info, trace};
9use tonic::transport::{Channel, Uri};
10use tower::ServiceBuilder;
11
12pub type Client = NodeClient<service::AuthService>;
17
18pub type GClient = GenericClient<service::AuthService>;
19
20pub type ClnClient = cln_client::NodeClient<service::AuthService>;
21
22pub trait GrpcClient {
23 fn new_with_inner(inner: service::AuthService) -> Self;
24}
25
26#[allow(dead_code)]
32#[derive(Clone)]
33pub struct Node {
34 node_id: Vec<u8>,
35 tls: TlsConfig,
36 rune: String,
37}
38
39impl GrpcClient for Client {
40 fn new_with_inner(inner: service::AuthService) -> Self {
41 Client::new(inner)
42 }
43}
44
45impl GrpcClient for GClient {
46 fn new_with_inner(inner: service::AuthService) -> Self {
47 GenericClient::new(inner)
48 }
49}
50
51impl GrpcClient for ClnClient {
52 fn new_with_inner(inner: service::AuthService) -> Self {
53 ClnClient::new(inner)
54 }
55}
56
57impl Node {
58 pub fn new<Creds>(node_id: Vec<u8>, creds: Creds) -> Result<Node>
59 where
60 Creds: TlsConfigProvider + RuneProvider,
61 {
62 let tls = creds.tls_config();
63 let rune = creds.rune();
64 Ok(Node {
65 node_id,
66 tls,
67 rune,
68 })
69 }
70
71 pub async fn connect<C>(&self, node_uri: String) -> Result<C>
72 where
73 C: GrpcClient,
74 {
75 let node_uri = Uri::from_maybe_shared(node_uri)?;
76 info!("Connecting to node at {}", node_uri);
77
78 let host = node_uri.host().unwrap();
81 let tls = if host.starts_with("gl") {
82 trace!(
83 "Using real hostname {}, expecting the node to have a matching certificate",
84 host
85 );
86 self.tls.clone()
87 } else {
88 trace!(
89 "Overriding hostname, since this is not a gl node domain: {}",
90 host
91 );
92 let mut tls = self.tls.clone();
93 tls.inner = tls.inner.domain_name("localhost");
94 tls
95 };
96
97 let layer = match tls.private_key {
98 Some(k) => service::AuthLayer::new(k, self.rune.clone())?,
99 None => {
100 return Err(anyhow!(
101 "Cannot connect a node::Client without first configuring its identity"
102 ))
103 }
104 };
105
106 let chan = tonic::transport::Endpoint::from_shared(node_uri.to_string())?
107 .tls_config(tls.inner)?
108 .tcp_keepalive(Some(crate::TCP_KEEPALIVE))
109 .http2_keep_alive_interval(crate::TCP_KEEPALIVE)
110 .keep_alive_timeout(crate::TCP_KEEPALIVE_TIMEOUT)
111 .keep_alive_while_idle(true)
112 .connect_lazy();
113 let chan = ServiceBuilder::new().layer(layer).service(chan);
114
115 Ok(C::new_with_inner(chan))
116 }
117
118 pub async fn schedule_with_uri<C>(self, scheduler_uri: String) -> Result<C>
119 where
120 C: GrpcClient,
121 {
122 debug!(
123 "Contacting scheduler at {} to get the node address",
124 scheduler_uri
125 );
126
127 let channel = Channel::from_shared(scheduler_uri)?
128 .tls_config(self.tls.inner.clone())?
129 .connect()
130 .await?;
131 let mut scheduler = SchedulerClient::new(channel);
132
133 let node_info = scheduler
134 .schedule(ScheduleRequest {
135 node_id: self.node_id.clone(),
136 })
137 .await
138 .map(|v| v.into_inner())?;
139
140 debug!("Node scheduled at {}", node_info.grpc_uri);
141
142 self.connect(node_info.grpc_uri).await
143 }
144
145 pub async fn schedule<C>(self) -> Result<C>
146 where
147 C: GrpcClient,
148 {
149 let uri = utils::scheduler_uri();
150 self.schedule_with_uri(uri).await
151 }
152}
153
154mod generic;
155mod service;
156pub use generic::GenericClient;
157
158mod stasher {
159 use bytes::Bytes;
160 use http::HeaderMap;
161 use http_body::Body;
162 use pin_project::pin_project;
163 use std::{
164 pin::Pin,
165 task::{Context, Poll},
166 };
167 use tonic::body::BoxBody;
168 use tonic::Status;
169
170 #[pin_project]
171 #[derive(Debug)]
172 pub(crate) struct StashBody {
173 value: Option<Bytes>,
174 }
175
176 impl StashBody {
177 pub(crate) fn new(val: Bytes) -> Self {
178 Self { value: Some(val) }
179 }
180 }
181
182 impl Body for StashBody {
183 type Data = Bytes;
184 type Error = Status;
185
186 fn is_end_stream(&self) -> bool {
187 self.value.is_none()
188 }
189
190 fn poll_data(
191 self: Pin<&mut Self>,
192 _cx: &mut Context<'_>,
193 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
194 Poll::Ready(self.project().value.take().map(Ok))
195 }
196
197 fn poll_trailers(
198 self: Pin<&mut Self>,
199 _cx: &mut Context<'_>,
200 ) -> Poll<Result<Option<HeaderMap>, Status>> {
201 Poll::Ready(Ok(None))
202 }
203 }
204
205 impl From<StashBody> for BoxBody {
206 fn from(v: StashBody) -> BoxBody {
207 BoxBody::new(v)
208 }
209 }
210}