gl_client/node/
mod.rs

1use crate::credentials::{RuneProvider, TlsConfigProvider};
2use crate::pb::cln::node_client as cln_client;
3use crate::pb::node_client::NodeClient;
4use crate::pb::scheduler::{scheduler_client::SchedulerClient, ScheduleRequest};
5use crate::tls::TlsConfig;
6use crate::utils;
7use anyhow::{anyhow, Result};
8use log::{debug, info, trace};
9use tonic::transport::{Channel, Uri};
10use tower::ServiceBuilder;
11
/// Decoding limit applied to a client when the caller did not choose one
/// explicitly: 32 MiB, written as 32 shifted left by 20 bits.
const DEFAULT_MAX_DECODING_MESSAGE_SIZE: usize = 32 << 20;
13
/// A client to the remotely running node on the greenlight
/// infrastructure. It is configured to authenticate itself with the
/// device mTLS keypair and will sign outgoing requests with the same
/// mTLS keypair.
pub type Client = NodeClient<service::AuthService>;

/// A [`GenericClient`] whose transport is the same
/// [`service::AuthService`] that [`Client`] uses.
pub type GClient = GenericClient<service::AuthService>;

/// The node client generated for the `cln` protobuf package
/// ([`cln_client::NodeClient`]), using the same
/// [`service::AuthService`] transport as [`Client`].
pub type ClnClient = cln_client::NodeClient<service::AuthService>;
23
/// Common constructor for the client types that [`Node::connect`],
/// [`Node::schedule`] and [`Node::schedule_with_uri`] can hand back.
///
/// Being generic over this trait is what lets a single [`Node`] produce
/// a [`Client`], a [`GClient`] or a [`ClnClient`].
pub trait GrpcClient {
    /// Builds the client on top of the already authenticated service
    /// `inner`, passing `max_decoding_message_size` on as the client's
    /// decoding size limit.
    fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self;
}
27
/// A builder to configure a [`Client`] that can either connect to a
/// node directly, assuming you have the `grpc_uri` that the node is
/// listening on, or it can talk to the
/// [`crate::scheduler::Scheduler`] to schedule the node and configure
/// the [`Client`] accordingly.
// NOTE(review): every field below is read somewhere in this file, so this
// `allow` may no longer be needed — confirm before removing it.
#[allow(dead_code)]
#[derive(Clone)]
pub struct Node {
    // Identifier of the node; sent to the scheduler in the `ScheduleRequest`.
    node_id: Vec<u8>,
    // TLS configuration taken from the credentials; used for the scheduler
    // connection as well as for the connection to the node itself.
    tls: TlsConfig,
    // Rune taken from the credentials; handed to the `AuthLayer` together
    // with the private key when connecting to the node.
    rune: String,
    // Optional override for the client's maximum decoding message size;
    // `None` means `DEFAULT_MAX_DECODING_MESSAGE_SIZE` is used.
    max_decoding_message_size: Option<usize>,
}
41
42impl GrpcClient for Client {
43    fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self {
44        Client::new(inner).max_decoding_message_size(max_decoding_message_size)
45    }
46}
47
48impl GrpcClient for GClient {
49    fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self {
50        GenericClient::new(inner).max_decoding_message_size(max_decoding_message_size)
51    }
52}
53
54impl GrpcClient for ClnClient {
55    fn new_with_inner(inner: service::AuthService, max_decoding_message_size: usize) -> Self {
56        ClnClient::new(inner).max_decoding_message_size(max_decoding_message_size)
57    }
58}
59
60impl Node {
61    pub fn new<Creds>(node_id: Vec<u8>, creds: Creds) -> Result<Node>
62    where
63        Creds: TlsConfigProvider + RuneProvider,
64    {
65        let tls = creds.tls_config();
66        let rune = creds.rune();
67        Ok(Node {
68            node_id,
69            tls,
70            rune,
71            max_decoding_message_size: None,
72        })
73    }
74
75    pub fn with_max_decoding_message_size(mut self, size: usize) -> Self {
76        self.max_decoding_message_size = Some(size);
77        self
78    }
79
80    pub async fn connect<C>(&self, node_uri: String) -> Result<C>
81    where
82        C: GrpcClient,
83    {
84        let node_uri = Uri::from_maybe_shared(node_uri)?;
85        info!("Connecting to node at {}", node_uri);
86
87        // If this is not yet a node-domain address we need to also
88        // accept "localhost" as domain name from the certificate.
89        let host = node_uri.host().unwrap();
90        let tls = if host.starts_with("gl") {
91            trace!(
92                "Using real hostname {}, expecting the node to have a matching certificate",
93                host
94            );
95            self.tls.clone()
96        } else {
97            trace!(
98                "Overriding hostname, since this is not a gl node domain: {}",
99                host
100            );
101            let mut tls = self.tls.clone();
102            tls.inner = tls.inner.domain_name("localhost");
103            tls
104        };
105
106        let layer = match tls.private_key {
107            Some(k) => service::AuthLayer::new(k, self.rune.clone())?,
108            None => {
109                return Err(anyhow!(
110                    "Cannot connect a node::Client without first configuring its identity"
111                ))
112            }
113        };
114
115        let chan = tonic::transport::Endpoint::from_shared(node_uri.to_string())?
116            .tls_config(tls.inner)?
117            .tcp_keepalive(Some(crate::TCP_KEEPALIVE))
118            .http2_keep_alive_interval(crate::TCP_KEEPALIVE)
119            .keep_alive_timeout(crate::TCP_KEEPALIVE_TIMEOUT)
120            .keep_alive_while_idle(true)
121            .connect_lazy();
122        let chan = ServiceBuilder::new().layer(layer).service(chan);
123
124        let size = self
125            .max_decoding_message_size
126            .unwrap_or(DEFAULT_MAX_DECODING_MESSAGE_SIZE);
127
128        Ok(C::new_with_inner(chan, size))
129    }
130
131    pub async fn schedule_with_uri<C>(self, scheduler_uri: String) -> Result<C>
132    where
133        C: GrpcClient,
134    {
135        debug!(
136            "Contacting scheduler at {} to get the node address",
137            scheduler_uri
138        );
139
140        let channel = Channel::from_shared(scheduler_uri)?
141            .tls_config(self.tls.inner.clone())?
142            .connect()
143            .await?;
144        let mut scheduler = SchedulerClient::new(channel);
145
146        let node_info = scheduler
147            .schedule(ScheduleRequest {
148                node_id: self.node_id.clone(),
149            })
150            .await
151            .map(|v| v.into_inner())?;
152
153        debug!("Node scheduled at {}", node_info.grpc_uri);
154
155        self.connect(node_info.grpc_uri).await
156    }
157
158    pub async fn schedule<C>(self) -> Result<C>
159    where
160        C: GrpcClient,
161    {
162        let uri = utils::scheduler_uri();
163        self.schedule_with_uri(uri).await
164    }
165}
166
167mod generic;
168pub mod service;
169pub use generic::GenericClient;
170
171mod stasher {
172    use bytes::Bytes;
173    use http::HeaderMap;
174    use http_body::Body;
175    use pin_project::pin_project;
176    use std::{
177        pin::Pin,
178        task::{Context, Poll},
179    };
180    use tonic::body::BoxBody;
181    use tonic::Status;
182
183    #[pin_project]
184    #[derive(Debug)]
185    pub(crate) struct StashBody {
186        value: Option<Bytes>,
187    }
188
189    impl StashBody {
190        pub(crate) fn new(val: Bytes) -> Self {
191            Self { value: Some(val) }
192        }
193    }
194
195    impl Body for StashBody {
196        type Data = Bytes;
197        type Error = Status;
198
199        fn is_end_stream(&self) -> bool {
200            self.value.is_none()
201        }
202
203        fn poll_data(
204            self: Pin<&mut Self>,
205            _cx: &mut Context<'_>,
206        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
207            Poll::Ready(self.project().value.take().map(Ok))
208        }
209
210        fn poll_trailers(
211            self: Pin<&mut Self>,
212            _cx: &mut Context<'_>,
213        ) -> Poll<Result<Option<HeaderMap>, Status>> {
214            Poll::Ready(Ok(None))
215        }
216    }
217
218    impl From<StashBody> for BoxBody {
219        fn from(v: StashBody) -> BoxBody {
220            BoxBody::new(v)
221        }
222    }
223}