gl_plugin/
lib.rs

1use anyhow::Result;
2use cln_rpc;
3use log::{debug, warn};
4use rpc::LightningClient;
5use serde_json::json;
6use std::future::Future;
7use std::sync::Arc;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11mod awaitables;
12pub mod config;
13pub mod hsm;
14mod lsp;
15pub mod messages;
16pub mod node;
17pub mod pb;
18pub mod requests;
19pub mod responses;
20pub mod rpc;
21pub mod stager;
22pub mod storage;
23pub mod tlv;
24mod tramp;
25#[cfg(unix)]
26mod unix;
27
28mod context;
29
/// Shared plugin state handed to every hook callback registered in
/// this file. Cloning is cheap: all members are reference-counted
/// handles or a channel sender.
#[derive(Clone)]
pub struct GlPlugin {
    /// JSON-RPC client guarded by an async mutex; `on_invoice_payment`
    /// clones it out of the lock to issue `listinvoices`.
    rpc: Arc<Mutex<LightningClient>>,
    /// Handle to the request stage, exposed via `get_stage`.
    stage: Arc<stager::Stage>,
    /// Sender side of the broadcast channel on which hooks publish
    /// [`Event`]s.
    events: broadcast::Sender<Event>,
}
36
37/// A small wrapper around [`cln_plugin::Builder`] that allows us to
38/// subscribe to events outside the plugin state itself, before
39/// getting configured.
40// TODO: Switch this out once the [`cln_plugin::Builder`] no longer
41// pre-binds state
42pub struct Builder {
43    inner: cln_plugin::Builder<GlPlugin, tokio::io::Stdin, tokio::io::Stdout>,
44    events: broadcast::Sender<Event>,
45    state: GlPlugin,
46}
47
48impl Builder {
49    pub fn subscribe_events(&self) -> broadcast::Receiver<Event> {
50        self.events.subscribe()
51    }
52    pub async fn start(self) -> Result<Option<Plugin>> {
53        self.inner.start(self.state).await
54    }
55
56    pub fn hook<C, F>(self, hookname: &str, callback: C) -> Self
57    where
58        C: Send + Sync + 'static,
59        C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
60        F: Future<Output = Result<serde_json::Value, anyhow::Error>> + Send + Sync + 'static,
61    {
62        Builder {
63            inner: self.inner.hook(hookname, callback),
64            ..self
65        }
66    }
67    pub fn subscribe<C, F>(self, hookname: &str, callback: C) -> Self
68    where
69        C: Send + Sync + 'static,
70        C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
71        F: Future<Output = Result<(), anyhow::Error>> + Send + Sync + 'static,
72    {
73        Builder {
74            inner: self.inner.subscribe(hookname, callback),
75            ..self
76        }
77    }
78
79    pub fn stage(&self) -> Arc<stager::Stage> {
80        self.state.stage.clone()
81    }
82}
83
84pub type Plugin = cln_plugin::Plugin<GlPlugin>;
85
86impl GlPlugin {
87    pub fn get_stage(&self) -> Arc<stager::Stage> {
88        self.stage.clone()
89    }
90}
91
92/// Initialize the plugin, but don't start it yet. Allows attaching
93/// additional methods, hooks, and subscriptions.
94pub async fn init(
95    stage: Arc<stager::Stage>,
96    events: tokio::sync::broadcast::Sender<Event>,
97) -> Result<Builder> {
98    let rpc = Arc::new(Mutex::new(LightningClient::new("lightning-rpc")));
99
100    let state = GlPlugin {
101        events: events.clone(),
102        rpc,
103        stage,
104    };
105
106    let inner = cln_plugin::Builder::new(tokio::io::stdin(), tokio::io::stdout())
107        .hook("htlc_accepted", lsp::on_htlc_accepted)
108        .hook("invoice_payment", on_invoice_payment)
109        .hook("peer_connected", on_peer_connected)
110        .hook("openchannel", on_openchannel)
111        .hook("custommsg", on_custommsg);
112
113    Ok(Builder {
114        state,
115        inner,
116        events,
117    })
118}
119
120async fn on_custommsg(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
121    let call: messages::Custommsg = serde_json::from_value(v).unwrap();
122    debug!("Received a custommsg {:?}", &call);
123
124    let msg = pb::Custommsg {
125        peer_id: hex::decode(call.peer_id).unwrap(),
126        payload: hex::decode(call.payload).unwrap(),
127    };
128
129    // Custommsg delivery is best effort, so don't use the Result<>.
130    if let Err(e) = plugin.state().events.clone().send(Event::CustomMsg(msg)) {
131        log::debug!("Error sending custommsg to listeners: {}", e);
132    }
133
134    Ok(json!({"result": "continue"}))
135}
136
137/// Notification handler that receives notifications on successful
138/// peer connections, then stores them into the `datastore` for future
139/// reference.
140async fn on_peer_connected(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
141    debug!("Got a successful peer connection: {:?}", v);
142    let call = serde_json::from_value::<messages::PeerConnectedCall>(v.clone()).unwrap();
143    let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
144    let req = cln_rpc::model::requests::DatastoreRequest {
145        key: vec![
146            "greenlight".to_string(),
147            "peerlist".to_string(),
148            call.peer.id.clone(),
149        ],
150        string: Some(serde_json::to_string(&call.peer).unwrap()),
151        hex: None,
152        mode: Some(cln_rpc::model::requests::DatastoreMode::CREATE_OR_REPLACE),
153        generation: None,
154    };
155
156    // We ignore the response and continue anyways.
157    let res = rpc.call_typed(&req).await;
158    debug!("Got datastore response: {:?}", res);
159    Ok(json!({"result": "continue"}))
160}
161
162async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
163    debug!("Received an openchannel request: {:?}", v);
164    let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
165    
166    let req = cln_rpc::model::requests::ListdatastoreRequest{
167        key: Some(vec![
168            "glconf".to_string(),
169            "request".to_string(),
170        ])
171    };
172
173    let res = rpc.call_typed(&req).await;
174    debug!("ListdatastoreRequest response: {:?}", res);
175
176    match res {
177        Ok(res) => {
178            if !res.datastore.is_empty() {
179                match &res.datastore[0].string {
180                    Some(serialized_request) => {
181                        match _parse_gl_config_from_serialized_request(serialized_request.to_string()) {
182                            Some(gl_config) => {
183                                return Ok(json!({"result": "continue", "close_to":  gl_config.close_to_addr}));
184                            }
185                            None => {
186                                debug!("Failed to parse the GlConfig from the serialized request's payload");
187                            }  
188                        }
189                    }
190                    None => {
191                        debug!("Got empty response from datastore for key glconf.request");
192                    }
193                }
194            }
195
196            return Ok(json!({"result": "continue"}))
197        }
198        Err(e) => {
199            log::debug!("An error occurred while searching for a custom close_to address: {}", e);
200            Ok(json!({"result": "continue"}))
201        }
202    }
203}
204
205fn _parse_gl_config_from_serialized_request(request: String) -> Option<pb::GlConfig> {
206    use prost::Message;
207    let gl_conf_req: crate::context::Request = serde_json::from_str(&request).unwrap();
208    let gl_conf_req: crate::pb::PendingRequest = gl_conf_req.into();
209    let payload = &gl_conf_req.request[5..];
210    let glconfig = crate::pb::GlConfig::decode(payload);
211
212    match glconfig {
213        Ok(glconfig) => Some(glconfig),
214        Err(e) => {
215            debug!("Failed to parse glconfig from string: {:?}", e);
216            None
217        }
218    }
219}
220
221/// Notification handler that receives notifications on incoming
222/// payments, then looks up the invoice in the DB, and forwards the
223/// full information to the GRPC interface.
224async fn on_invoice_payment(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
225    log::info!("Got an incoming payment via invoice_payment: {:?}", v);
226    let state = plugin.state();
227    let call: messages::InvoicePaymentCall = match serde_json::from_value(v) {
228        Ok(v) => v,
229        Err(e) => {
230            log::error!("Could not decode the invoice_payment_call: {e}");
231            return Ok(json!({"result": "continue"}));
232        }
233    };
234
235    let rpc = state.rpc.lock().await.clone();
236    let req = requests::ListInvoices {
237        label: Some(call.payment.label.clone()),
238        invstring: None,
239        payment_hash: None,
240    };
241
242    let invoice = match rpc
243        .call::<_, responses::ListInvoices>("listinvoices", req)
244        .await
245        .unwrap()
246        .invoices
247        .pop()
248    {
249        Some(i) => i,
250        None => {
251            warn!(
252                "No invoice matching the notification label {} found",
253                call.payment.label
254            );
255            return Ok(json!({"result": "continue"}));
256        }
257    };
258
259    debug!(
260        "Retrieved matching invoice for invoice_payment: {:?}",
261        invoice
262    );
263
264    let amount: pb::Amount = call.payment.amount.try_into().unwrap();
265
266    let mut tlvs = vec![];
267
268    for t in call.payment.extratlvs.unwrap_or(vec![]) {
269        tlvs.push(t.into());
270    }
271    use crate::pb::incoming_payment::Details;
272    let details = pb::OffChainPayment {
273        label: invoice.label,
274        preimage: hex::decode(call.payment.preimage).unwrap(),
275        amount: Some(amount.try_into().unwrap()),
276        extratlvs: tlvs,
277        bolt11: invoice.bolt11,
278        payment_hash: hex::decode(invoice.payment_hash).unwrap(),
279    };
280
281    let p = pb::IncomingPayment {
282        details: Some(Details::Offchain(details)),
283    };
284
285    match state.events.clone().send(Event::IncomingPayment(p)) {
286        Ok(_) => {}
287        Err(_) => warn!("No active listener for the incoming payment"),
288    }
289    Ok(json!({"result": "continue"}))
290}
291
/// An event that we can observe during the operation of the plugin.
/// Events are distributed over a `tokio::sync::broadcast` channel, see
/// `Builder::subscribe_events`.
#[derive(Clone, Debug)]
pub enum Event {
    // NOTE(review): not emitted anywhere in this file; presumably
    // signals shutdown and carries the stage -- confirm against the
    // emitter.
    Stop(Arc<stager::Stage>),

    /// A grpc call. The first element is the URI of the request.
    RpcCall(String),
    /// An incoming payment, published by the `invoice_payment` hook.
    IncomingPayment(pb::IncomingPayment),
    /// A custom message received from a peer, published by the
    /// `custommsg` hook.
    CustomMsg(pb::Custommsg),
}
302
303pub use cln_grpc as grpc;