1use anyhow::Result;
2use cln_rpc;
3use log::{debug, warn};
4use rpc::LightningClient;
5use serde_json::json;
6use std::future::Future;
7use std::sync::Arc;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11mod awaitables;
12pub mod config;
13pub mod hsm;
14mod lsp;
15pub mod messages;
16pub mod node;
17pub mod pb;
18pub mod requests;
19pub mod responses;
20pub mod rpc;
21pub mod stager;
22pub mod storage;
23pub mod tlv;
24mod tramp;
25#[cfg(unix)]
26mod unix;
27
28mod context;
29
30#[derive(Clone)]
31pub struct GlPlugin {
32 rpc: Arc<Mutex<LightningClient>>,
33 stage: Arc<stager::Stage>,
34 events: broadcast::Sender<Event>,
35}
36
37pub struct Builder {
43 inner: cln_plugin::Builder<GlPlugin, tokio::io::Stdin, tokio::io::Stdout>,
44 events: broadcast::Sender<Event>,
45 state: GlPlugin,
46}
47
48impl Builder {
49 pub fn subscribe_events(&self) -> broadcast::Receiver<Event> {
50 self.events.subscribe()
51 }
52 pub async fn start(self) -> Result<Option<Plugin>> {
53 self.inner.start(self.state).await
54 }
55
56 pub fn hook<C, F>(self, hookname: &str, callback: C) -> Self
57 where
58 C: Send + Sync + 'static,
59 C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
60 F: Future<Output = Result<serde_json::Value, anyhow::Error>> + Send + Sync + 'static,
61 {
62 Builder {
63 inner: self.inner.hook(hookname, callback),
64 ..self
65 }
66 }
67 pub fn subscribe<C, F>(self, hookname: &str, callback: C) -> Self
68 where
69 C: Send + Sync + 'static,
70 C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
71 F: Future<Output = Result<(), anyhow::Error>> + Send + Sync + 'static,
72 {
73 Builder {
74 inner: self.inner.subscribe(hookname, callback),
75 ..self
76 }
77 }
78
79 pub fn stage(&self) -> Arc<stager::Stage> {
80 self.state.stage.clone()
81 }
82}
83
84pub type Plugin = cln_plugin::Plugin<GlPlugin>;
85
86impl GlPlugin {
87 pub fn get_stage(&self) -> Arc<stager::Stage> {
88 self.stage.clone()
89 }
90}
91
92pub async fn init(
95 stage: Arc<stager::Stage>,
96 events: tokio::sync::broadcast::Sender<Event>,
97) -> Result<Builder> {
98 let rpc = Arc::new(Mutex::new(LightningClient::new("lightning-rpc")));
99
100 let state = GlPlugin {
101 events: events.clone(),
102 rpc,
103 stage,
104 };
105
106 let inner = cln_plugin::Builder::new(tokio::io::stdin(), tokio::io::stdout())
107 .hook("htlc_accepted", lsp::on_htlc_accepted)
108 .hook("invoice_payment", on_invoice_payment)
109 .hook("peer_connected", on_peer_connected)
110 .hook("openchannel", on_openchannel)
111 .hook("custommsg", on_custommsg);
112
113 Ok(Builder {
114 state,
115 inner,
116 events,
117 })
118}
119
120async fn on_custommsg(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
121 let call: messages::Custommsg = serde_json::from_value(v).unwrap();
122 debug!("Received a custommsg {:?}", &call);
123
124 let msg = pb::Custommsg {
125 peer_id: hex::decode(call.peer_id).unwrap(),
126 payload: hex::decode(call.payload).unwrap(),
127 };
128
129 if let Err(e) = plugin.state().events.clone().send(Event::CustomMsg(msg)) {
131 log::debug!("Error sending custommsg to listeners: {}", e);
132 }
133
134 Ok(json!({"result": "continue"}))
135}
136
137async fn on_peer_connected(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
141 debug!("Got a successful peer connection: {:?}", v);
142 let call = serde_json::from_value::<messages::PeerConnectedCall>(v.clone()).unwrap();
143 let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
144 let req = cln_rpc::model::requests::DatastoreRequest {
145 key: vec![
146 "greenlight".to_string(),
147 "peerlist".to_string(),
148 call.peer.id.clone(),
149 ],
150 string: Some(serde_json::to_string(&call.peer).unwrap()),
151 hex: None,
152 mode: Some(cln_rpc::model::requests::DatastoreMode::CREATE_OR_REPLACE),
153 generation: None,
154 };
155
156 let res = rpc.call_typed(&req).await;
158 debug!("Got datastore response: {:?}", res);
159 Ok(json!({"result": "continue"}))
160}
161
162async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
163 debug!("Received an openchannel request: {:?}", v);
164 let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
165
166 let req = cln_rpc::model::requests::ListdatastoreRequest{
167 key: Some(vec![
168 "glconf".to_string(),
169 "request".to_string(),
170 ])
171 };
172
173 let res = rpc.call_typed(&req).await;
174 debug!("ListdatastoreRequest response: {:?}", res);
175
176 match res {
177 Ok(res) => {
178 if !res.datastore.is_empty() {
179 match &res.datastore[0].string {
180 Some(serialized_request) => {
181 match _parse_gl_config_from_serialized_request(serialized_request.to_string()) {
182 Some(gl_config) => {
183 return Ok(json!({"result": "continue", "close_to": gl_config.close_to_addr}));
184 }
185 None => {
186 debug!("Failed to parse the GlConfig from the serialized request's payload");
187 }
188 }
189 }
190 None => {
191 debug!("Got empty response from datastore for key glconf.request");
192 }
193 }
194 }
195
196 return Ok(json!({"result": "continue"}))
197 }
198 Err(e) => {
199 log::debug!("An error occurred while searching for a custom close_to address: {}", e);
200 Ok(json!({"result": "continue"}))
201 }
202 }
203}
204
205fn _parse_gl_config_from_serialized_request(request: String) -> Option<pb::GlConfig> {
206 use prost::Message;
207 let gl_conf_req: crate::context::Request = serde_json::from_str(&request).unwrap();
208 let gl_conf_req: crate::pb::PendingRequest = gl_conf_req.into();
209 let payload = &gl_conf_req.request[5..];
210 let glconfig = crate::pb::GlConfig::decode(payload);
211
212 match glconfig {
213 Ok(glconfig) => Some(glconfig),
214 Err(e) => {
215 debug!("Failed to parse glconfig from string: {:?}", e);
216 None
217 }
218 }
219}
220
221async fn on_invoice_payment(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
225 log::info!("Got an incoming payment via invoice_payment: {:?}", v);
226 let state = plugin.state();
227 let call: messages::InvoicePaymentCall = match serde_json::from_value(v) {
228 Ok(v) => v,
229 Err(e) => {
230 log::error!("Could not decode the invoice_payment_call: {e}");
231 return Ok(json!({"result": "continue"}));
232 }
233 };
234
235 let rpc = state.rpc.lock().await.clone();
236 let req = requests::ListInvoices {
237 label: Some(call.payment.label.clone()),
238 invstring: None,
239 payment_hash: None,
240 };
241
242 let invoice = match rpc
243 .call::<_, responses::ListInvoices>("listinvoices", req)
244 .await
245 .unwrap()
246 .invoices
247 .pop()
248 {
249 Some(i) => i,
250 None => {
251 warn!(
252 "No invoice matching the notification label {} found",
253 call.payment.label
254 );
255 return Ok(json!({"result": "continue"}));
256 }
257 };
258
259 debug!(
260 "Retrieved matching invoice for invoice_payment: {:?}",
261 invoice
262 );
263
264 let amount: pb::Amount = call.payment.amount.try_into().unwrap();
265
266 let mut tlvs = vec![];
267
268 for t in call.payment.extratlvs.unwrap_or(vec![]) {
269 tlvs.push(t.into());
270 }
271 use crate::pb::incoming_payment::Details;
272 let details = pb::OffChainPayment {
273 label: invoice.label,
274 preimage: hex::decode(call.payment.preimage).unwrap(),
275 amount: Some(amount.try_into().unwrap()),
276 extratlvs: tlvs,
277 bolt11: invoice.bolt11,
278 payment_hash: hex::decode(invoice.payment_hash).unwrap(),
279 };
280
281 let p = pb::IncomingPayment {
282 details: Some(Details::Offchain(details)),
283 };
284
285 match state.events.clone().send(Event::IncomingPayment(p)) {
286 Ok(_) => {}
287 Err(_) => warn!("No active listener for the incoming payment"),
288 }
289 Ok(json!({"result": "continue"}))
290}
291
292#[derive(Clone, Debug)]
294pub enum Event {
295 Stop(Arc<stager::Stage>),
296
297 RpcCall(String),
299 IncomingPayment(pb::IncomingPayment),
300 CustomMsg(pb::Custommsg),
301}
302
303pub use cln_grpc as grpc;