bs_gl_client/node/
mod.rs

1use crate::credentials::{RuneProvider, TlsConfigProvider};
2use crate::pb::cln::node_client as cln_client;
3use crate::pb::node_client::NodeClient;
4use crate::pb::scheduler::{scheduler_client::SchedulerClient, ScheduleRequest};
5use crate::tls::TlsConfig;
6use crate::utils;
7use anyhow::{anyhow, Result};
8use log::{debug, info, trace};
9use tonic::transport::{Channel, Uri};
10use tower::ServiceBuilder;
11
12/// A client to the remotely running node on the greenlight
13/// infrastructure. It is configured to authenticate itself with the
14/// device mTLS keypair and will sign outgoing requests with the same
15/// mTLS keypair.
16pub type Client = NodeClient<service::AuthService>;
17
18pub type GClient = GenericClient<service::AuthService>;
19
20pub type ClnClient = cln_client::NodeClient<service::AuthService>;
21
22pub trait GrpcClient {
23    fn new_with_inner(inner: service::AuthService) -> Self;
24}
25
26/// A builder to configure a [`Client`] that can either connect to a
27/// node directly, assuming you have the `grpc_uri` that the node is
28/// listening on, or it can talk to the
29/// [`crate::scheduler::Scheduler`] to schedule the node and configure
30/// the [`Client`] accordingly.
31#[allow(dead_code)]
32#[derive(Clone)]
33pub struct Node {
34    node_id: Vec<u8>,
35    tls: TlsConfig,
36    rune: String,
37}
38
39impl GrpcClient for Client {
40    fn new_with_inner(inner: service::AuthService) -> Self {
41        Client::new(inner)
42    }
43}
44
45impl GrpcClient for GClient {
46    fn new_with_inner(inner: service::AuthService) -> Self {
47        GenericClient::new(inner)
48    }
49}
50
51impl GrpcClient for ClnClient {
52    fn new_with_inner(inner: service::AuthService) -> Self {
53        ClnClient::new(inner)
54    }
55}
56
57impl Node {
58    pub fn new<Creds>(node_id: Vec<u8>, creds: Creds) -> Result<Node>
59    where
60        Creds: TlsConfigProvider + RuneProvider,
61    {
62        let tls = creds.tls_config();
63        let rune = creds.rune();
64        Ok(Node {
65            node_id,
66            tls,
67            rune,
68        })
69    }
70
71    pub async fn connect<C>(&self, node_uri: String) -> Result<C>
72    where
73        C: GrpcClient,
74    {
75        let node_uri = Uri::from_maybe_shared(node_uri)?;
76        info!("Connecting to node at {}", node_uri);
77
78        // If this is not yet a node-domain address we need to also
79        // accept "localhost" as domain name from the certificate.
80        let host = node_uri.host().unwrap();
81        let tls = if host.starts_with("gl") {
82            trace!(
83                "Using real hostname {}, expecting the node to have a matching certificate",
84                host
85            );
86            self.tls.clone()
87        } else {
88            trace!(
89                "Overriding hostname, since this is not a gl node domain: {}",
90                host
91            );
92            let mut tls = self.tls.clone();
93            tls.inner = tls.inner.domain_name("localhost");
94            tls
95        };
96
97        let layer = match tls.private_key {
98            Some(k) => service::AuthLayer::new(k, self.rune.clone())?,
99            None => {
100                return Err(anyhow!(
101                    "Cannot connect a node::Client without first configuring its identity"
102                ))
103            }
104        };
105
106        let chan = tonic::transport::Endpoint::from_shared(node_uri.to_string())?
107            .tls_config(tls.inner)?
108            .tcp_keepalive(Some(crate::TCP_KEEPALIVE))
109            .http2_keep_alive_interval(crate::TCP_KEEPALIVE)
110            .keep_alive_timeout(crate::TCP_KEEPALIVE_TIMEOUT)
111            .keep_alive_while_idle(true)
112            .connect_lazy();
113        let chan = ServiceBuilder::new().layer(layer).service(chan);
114
115        Ok(C::new_with_inner(chan))
116    }
117
118    pub async fn schedule_with_uri<C>(self, scheduler_uri: String) -> Result<C>
119    where
120        C: GrpcClient,
121    {
122        debug!(
123            "Contacting scheduler at {} to get the node address",
124            scheduler_uri
125        );
126
127        let channel = Channel::from_shared(scheduler_uri)?
128            .tls_config(self.tls.inner.clone())?
129            .connect()
130            .await?;
131        let mut scheduler = SchedulerClient::new(channel);
132
133        let node_info = scheduler
134            .schedule(ScheduleRequest {
135                node_id: self.node_id.clone(),
136            })
137            .await
138            .map(|v| v.into_inner())?;
139
140        debug!("Node scheduled at {}", node_info.grpc_uri);
141
142        self.connect(node_info.grpc_uri).await
143    }
144
145    pub async fn schedule<C>(self) -> Result<C>
146    where
147        C: GrpcClient,
148    {
149        let uri = utils::scheduler_uri();
150        self.schedule_with_uri(uri).await
151    }
152}
153
154mod generic;
155mod service;
156pub use generic::GenericClient;
157
158mod stasher {
159    use bytes::Bytes;
160    use http::HeaderMap;
161    use http_body::Body;
162    use pin_project::pin_project;
163    use std::{
164        pin::Pin,
165        task::{Context, Poll},
166    };
167    use tonic::body::BoxBody;
168    use tonic::Status;
169
170    #[pin_project]
171    #[derive(Debug)]
172    pub(crate) struct StashBody {
173        value: Option<Bytes>,
174    }
175
176    impl StashBody {
177        pub(crate) fn new(val: Bytes) -> Self {
178            Self { value: Some(val) }
179        }
180    }
181
182    impl Body for StashBody {
183        type Data = Bytes;
184        type Error = Status;
185
186        fn is_end_stream(&self) -> bool {
187            self.value.is_none()
188        }
189
190        fn poll_data(
191            self: Pin<&mut Self>,
192            _cx: &mut Context<'_>,
193        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
194            Poll::Ready(self.project().value.take().map(Ok))
195        }
196
197        fn poll_trailers(
198            self: Pin<&mut Self>,
199            _cx: &mut Context<'_>,
200        ) -> Poll<Result<Option<HeaderMap>, Status>> {
201            Poll::Ready(Ok(None))
202        }
203    }
204
205    impl From<StashBody> for BoxBody {
206        fn from(v: StashBody) -> BoxBody {
207            BoxBody::new(v)
208        }
209    }
210}