bs_gl_plugin/
lib.rs

1use anyhow::Result;
2use cln_rpc;
3use log::{debug, warn};
4use rpc::LightningClient;
5use serde_json::json;
6use std::future::Future;
7use std::sync::Arc;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11pub mod config;
12pub mod hsm;
13mod lsp;
14pub mod messages;
15pub mod node;
16pub mod pb;
17pub mod requests;
18pub mod responses;
19pub mod rpc;
20pub mod stager;
21pub mod storage;
22#[cfg(unix)]
23mod unix;
24
25mod context;
26
27#[derive(Clone)]
28pub struct GlPlugin {
29    rpc: Arc<Mutex<LightningClient>>,
30    stage: Arc<stager::Stage>,
31    events: broadcast::Sender<Event>,
32}
33
34/// A small wrapper around [`cln_plugin::Builder`] that allows us to
35/// subscribe to events outside the plugin state itself, before
36/// getting configured.
37// TODO: Switch this out once the [`cln_plugin::Builder`] no longer
38// pre-binds state
39pub struct Builder {
40    inner: cln_plugin::Builder<GlPlugin, tokio::io::Stdin, tokio::io::Stdout>,
41    events: broadcast::Sender<Event>,
42    state: GlPlugin,
43}
44
45impl Builder {
46    pub fn subscribe_events(&self) -> broadcast::Receiver<Event> {
47        self.events.subscribe()
48    }
49    pub async fn start(self) -> Result<Option<Plugin>> {
50        self.inner.start(self.state).await
51    }
52
53    pub fn hook<C, F>(self, hookname: &str, callback: C) -> Self
54    where
55        C: Send + Sync + 'static,
56        C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
57        F: Future<Output = Result<serde_json::Value, anyhow::Error>> + Send + Sync + 'static,
58    {
59        Builder {
60            inner: self.inner.hook(hookname, callback),
61            ..self
62        }
63    }
64    pub fn subscribe<C, F>(self, hookname: &str, callback: C) -> Self
65    where
66        C: Send + Sync + 'static,
67        C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
68        F: Future<Output = Result<(), anyhow::Error>> + Send + Sync + 'static,
69    {
70        Builder {
71            inner: self.inner.subscribe(hookname, callback),
72            ..self
73        }
74    }
75
76    pub fn stage(&self) -> Arc<stager::Stage> {
77        self.state.stage.clone()
78    }
79}
80
81pub type Plugin = cln_plugin::Plugin<GlPlugin>;
82
83impl GlPlugin {
84    pub fn get_stage(&self) -> Arc<stager::Stage> {
85        self.stage.clone()
86    }
87}
88
89/// Initialize the plugin, but don't start it yet. Allows attaching
90/// additional methods, hooks, and subscriptions.
91pub async fn init(
92    stage: Arc<stager::Stage>,
93    events: tokio::sync::broadcast::Sender<Event>,
94) -> Result<Builder> {
95    let rpc = Arc::new(Mutex::new(LightningClient::new("lightning-rpc")));
96
97    let state = GlPlugin {
98        events: events.clone(),
99        rpc,
100        stage,
101    };
102
103    let inner = cln_plugin::Builder::new(tokio::io::stdin(), tokio::io::stdout())
104        .hook("htlc_accepted", lsp::on_htlc_accepted)
105        .hook("invoice_payment", on_invoice_payment)
106        .hook("peer_connected", on_peer_connected)
107        .hook("openchannel", on_openchannel)
108        .hook("custommsg", on_custommsg);
109    Ok(Builder {
110        state,
111        inner,
112        events,
113    })
114}
115
116async fn on_custommsg(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
117    let call: messages::Custommsg = serde_json::from_value(v).unwrap();
118    debug!("Received a custommsg {:?}", &call);
119
120    let msg = pb::Custommsg {
121        peer_id: hex::decode(call.peer_id).unwrap(),
122        payload: hex::decode(call.payload).unwrap(),
123    };
124
125    // Custommsg delivery is best effort, so don't use the Result<>.
126    if let Err(e) = plugin.state().events.clone().send(Event::CustomMsg(msg)) {
127        log::debug!("Error sending custommsg to listeners: {}", e);
128    }
129
130    Ok(json!({"result": "continue"}))
131}
132
133/// Notification handler that receives notifications on successful
134/// peer connections, then stores them into the `datastore` for future
135/// reference.
136async fn on_peer_connected(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
137    debug!("Got a successful peer connection: {:?}", v);
138    let call = serde_json::from_value::<messages::PeerConnectedCall>(v.clone()).unwrap();
139    let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
140    let req = cln_rpc::model::requests::DatastoreRequest {
141        key: vec![
142            "greenlight".to_string(),
143            "peerlist".to_string(),
144            call.peer.id.clone(),
145        ],
146        string: Some(serde_json::to_string(&call.peer).unwrap()),
147        hex: None,
148        mode: Some(cln_rpc::model::requests::DatastoreMode::CREATE_OR_REPLACE),
149        generation: None,
150    };
151
152    // We ignore the response and continue anyways.
153    let res = rpc.call_typed(&req).await;
154    debug!("Got datastore response: {:?}", res);
155    Ok(json!({"result": "continue"}))
156}
157
158async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
159    debug!("Received an openchannel request: {:?}", v);
160    let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
161    
162    let req = cln_rpc::model::requests::ListdatastoreRequest{
163        key: Some(vec![
164            "glconf".to_string(),
165            "request".to_string(),
166        ])
167    };
168
169    let res = rpc.call_typed(&req).await;
170    debug!("ListdatastoreRequest response: {:?}", res);
171
172    match res {
173        Ok(res) => {
174            if !res.datastore.is_empty() {
175                match &res.datastore[0].string {
176                    Some(serialized_request) => {
177                        match _parse_gl_config_from_serialized_request(serialized_request.to_string()) {
178                            Some(gl_config) => {
179                                return Ok(json!({"result": "continue", "close_to":  gl_config.close_to_addr}));
180                            }
181                            None => {
182                                debug!("Failed to parse the GlConfig from the serialized request's payload");
183                            }  
184                        }
185                    }
186                    None => {
187                        debug!("Got empty response from datastore for key glconf.request");
188                    }
189                }
190            }
191
192            return Ok(json!({"result": "continue"}))
193        }
194        Err(e) => {
195            log::debug!("An error occurred while searching for a custom close_to address: {}", e);
196            Ok(json!({"result": "continue"}))
197        }
198    }
199}
200
201fn _parse_gl_config_from_serialized_request(request: String) -> Option<pb::GlConfig> {
202    use prost::Message;
203    let gl_conf_req: crate::context::Request = serde_json::from_str(&request).unwrap();
204    let gl_conf_req: crate::pb::PendingRequest = gl_conf_req.into();
205    let payload = &gl_conf_req.request[5..];
206    let glconfig = crate::pb::GlConfig::decode(payload);
207
208    match glconfig {
209        Ok(glconfig) => Some(glconfig),
210        Err(e) => {
211            debug!("Failed to parse glconfig from string: {:?}", e);
212            None
213        }
214    }
215}
216
217
218/// Notification handler that receives notifications on incoming
219/// payments, then looks up the invoice in the DB, and forwards the
220/// full information to the GRPC interface.
221async fn on_invoice_payment(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
222    debug!("Got an incoming payment via invoice_payment: {:?}", v);
223    let state = plugin.state();
224    let call: messages::InvoicePaymentCall = serde_json::from_value(v).unwrap();
225
226    let rpc = state.rpc.lock().await.clone();
227    let req = requests::ListInvoices {
228        label: Some(call.payment.label.clone()),
229        invstring: None,
230        payment_hash: None,
231    };
232
233    let invoice = match rpc
234        .call::<_, responses::ListInvoices>("listinvoices", req)
235        .await
236        .unwrap()
237        .invoices
238        .pop()
239    {
240        Some(i) => i,
241        None => {
242            warn!(
243                "No invoice matching the notification label {} found",
244                call.payment.label
245            );
246            return Ok(json!({"result": "continue"}));
247        }
248    };
249
250    debug!(
251        "Retrieved matching invoice for invoice_payment: {:?}",
252        invoice
253    );
254
255    let amount: pb::Amount = call.payment.amount.try_into().unwrap();
256
257    let mut tlvs = vec![];
258
259    for t in call.payment.extratlvs.unwrap_or(vec![]) {
260        tlvs.push(t.into());
261    }
262    use crate::pb::incoming_payment::Details;
263    let details = pb::OffChainPayment {
264        label: invoice.label,
265        preimage: hex::decode(call.payment.preimage).unwrap(),
266        amount: Some(amount.try_into().unwrap()),
267        extratlvs: tlvs,
268        bolt11: invoice.bolt11,
269        payment_hash: hex::decode(invoice.payment_hash).unwrap(),
270    };
271
272    let p = pb::IncomingPayment {
273        details: Some(Details::Offchain(details)),
274    };
275
276    match state.events.clone().send(Event::IncomingPayment(p)) {
277        Ok(_) => {}
278        Err(_) => warn!("No active listener for the incoming payment"),
279    }
280    Ok(json!({"result": "continue"}))
281}
282
283/// An event that we can observe during the operation of the plugin.
284#[derive(Clone, Debug)]
285pub enum Event {
286    Stop(Arc<stager::Stage>),
287
288    /// A grpc call. The first element is the URI of the request.
289    RpcCall(String),
290    IncomingPayment(pb::IncomingPayment),
291    CustomMsg(pb::Custommsg),
292}
293
294pub use cln_grpc as grpc;