1use anyhow::Result;
2use cln_rpc;
3use log::{debug, warn};
4use rpc::LightningClient;
5use serde_json::json;
6use std::future::Future;
7use std::sync::Arc;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11pub mod config;
12pub mod hsm;
13mod lsp;
14pub mod messages;
15pub mod node;
16pub mod pb;
17pub mod requests;
18pub mod responses;
19pub mod rpc;
20pub mod stager;
21pub mod storage;
22#[cfg(unix)]
23mod unix;
24
25mod context;
26
27#[derive(Clone)]
28pub struct GlPlugin {
29 rpc: Arc<Mutex<LightningClient>>,
30 stage: Arc<stager::Stage>,
31 events: broadcast::Sender<Event>,
32}
33
34pub struct Builder {
40 inner: cln_plugin::Builder<GlPlugin, tokio::io::Stdin, tokio::io::Stdout>,
41 events: broadcast::Sender<Event>,
42 state: GlPlugin,
43}
44
45impl Builder {
46 pub fn subscribe_events(&self) -> broadcast::Receiver<Event> {
47 self.events.subscribe()
48 }
49 pub async fn start(self) -> Result<Option<Plugin>> {
50 self.inner.start(self.state).await
51 }
52
53 pub fn hook<C, F>(self, hookname: &str, callback: C) -> Self
54 where
55 C: Send + Sync + 'static,
56 C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
57 F: Future<Output = Result<serde_json::Value, anyhow::Error>> + Send + Sync + 'static,
58 {
59 Builder {
60 inner: self.inner.hook(hookname, callback),
61 ..self
62 }
63 }
64 pub fn subscribe<C, F>(self, hookname: &str, callback: C) -> Self
65 where
66 C: Send + Sync + 'static,
67 C: Fn(cln_plugin::Plugin<GlPlugin>, serde_json::Value) -> F + 'static,
68 F: Future<Output = Result<(), anyhow::Error>> + Send + Sync + 'static,
69 {
70 Builder {
71 inner: self.inner.subscribe(hookname, callback),
72 ..self
73 }
74 }
75
76 pub fn stage(&self) -> Arc<stager::Stage> {
77 self.state.stage.clone()
78 }
79}
80
81pub type Plugin = cln_plugin::Plugin<GlPlugin>;
82
83impl GlPlugin {
84 pub fn get_stage(&self) -> Arc<stager::Stage> {
85 self.stage.clone()
86 }
87}
88
89pub async fn init(
92 stage: Arc<stager::Stage>,
93 events: tokio::sync::broadcast::Sender<Event>,
94) -> Result<Builder> {
95 let rpc = Arc::new(Mutex::new(LightningClient::new("lightning-rpc")));
96
97 let state = GlPlugin {
98 events: events.clone(),
99 rpc,
100 stage,
101 };
102
103 let inner = cln_plugin::Builder::new(tokio::io::stdin(), tokio::io::stdout())
104 .hook("htlc_accepted", lsp::on_htlc_accepted)
105 .hook("invoice_payment", on_invoice_payment)
106 .hook("peer_connected", on_peer_connected)
107 .hook("openchannel", on_openchannel)
108 .hook("custommsg", on_custommsg);
109 Ok(Builder {
110 state,
111 inner,
112 events,
113 })
114}
115
116async fn on_custommsg(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
117 let call: messages::Custommsg = serde_json::from_value(v).unwrap();
118 debug!("Received a custommsg {:?}", &call);
119
120 let msg = pb::Custommsg {
121 peer_id: hex::decode(call.peer_id).unwrap(),
122 payload: hex::decode(call.payload).unwrap(),
123 };
124
125 if let Err(e) = plugin.state().events.clone().send(Event::CustomMsg(msg)) {
127 log::debug!("Error sending custommsg to listeners: {}", e);
128 }
129
130 Ok(json!({"result": "continue"}))
131}
132
133async fn on_peer_connected(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
137 debug!("Got a successful peer connection: {:?}", v);
138 let call = serde_json::from_value::<messages::PeerConnectedCall>(v.clone()).unwrap();
139 let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
140 let req = cln_rpc::model::requests::DatastoreRequest {
141 key: vec![
142 "greenlight".to_string(),
143 "peerlist".to_string(),
144 call.peer.id.clone(),
145 ],
146 string: Some(serde_json::to_string(&call.peer).unwrap()),
147 hex: None,
148 mode: Some(cln_rpc::model::requests::DatastoreMode::CREATE_OR_REPLACE),
149 generation: None,
150 };
151
152 let res = rpc.call_typed(&req).await;
154 debug!("Got datastore response: {:?}", res);
155 Ok(json!({"result": "continue"}))
156}
157
158async fn on_openchannel(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
159 debug!("Received an openchannel request: {:?}", v);
160 let mut rpc = cln_rpc::ClnRpc::new(plugin.configuration().rpc_file).await?;
161
162 let req = cln_rpc::model::requests::ListdatastoreRequest{
163 key: Some(vec![
164 "glconf".to_string(),
165 "request".to_string(),
166 ])
167 };
168
169 let res = rpc.call_typed(&req).await;
170 debug!("ListdatastoreRequest response: {:?}", res);
171
172 match res {
173 Ok(res) => {
174 if !res.datastore.is_empty() {
175 match &res.datastore[0].string {
176 Some(serialized_request) => {
177 match _parse_gl_config_from_serialized_request(serialized_request.to_string()) {
178 Some(gl_config) => {
179 return Ok(json!({"result": "continue", "close_to": gl_config.close_to_addr}));
180 }
181 None => {
182 debug!("Failed to parse the GlConfig from the serialized request's payload");
183 }
184 }
185 }
186 None => {
187 debug!("Got empty response from datastore for key glconf.request");
188 }
189 }
190 }
191
192 return Ok(json!({"result": "continue"}))
193 }
194 Err(e) => {
195 log::debug!("An error occurred while searching for a custom close_to address: {}", e);
196 Ok(json!({"result": "continue"}))
197 }
198 }
199}
200
201fn _parse_gl_config_from_serialized_request(request: String) -> Option<pb::GlConfig> {
202 use prost::Message;
203 let gl_conf_req: crate::context::Request = serde_json::from_str(&request).unwrap();
204 let gl_conf_req: crate::pb::PendingRequest = gl_conf_req.into();
205 let payload = &gl_conf_req.request[5..];
206 let glconfig = crate::pb::GlConfig::decode(payload);
207
208 match glconfig {
209 Ok(glconfig) => Some(glconfig),
210 Err(e) => {
211 debug!("Failed to parse glconfig from string: {:?}", e);
212 None
213 }
214 }
215}
216
217
218async fn on_invoice_payment(plugin: Plugin, v: serde_json::Value) -> Result<serde_json::Value> {
222 debug!("Got an incoming payment via invoice_payment: {:?}", v);
223 let state = plugin.state();
224 let call: messages::InvoicePaymentCall = serde_json::from_value(v).unwrap();
225
226 let rpc = state.rpc.lock().await.clone();
227 let req = requests::ListInvoices {
228 label: Some(call.payment.label.clone()),
229 invstring: None,
230 payment_hash: None,
231 };
232
233 let invoice = match rpc
234 .call::<_, responses::ListInvoices>("listinvoices", req)
235 .await
236 .unwrap()
237 .invoices
238 .pop()
239 {
240 Some(i) => i,
241 None => {
242 warn!(
243 "No invoice matching the notification label {} found",
244 call.payment.label
245 );
246 return Ok(json!({"result": "continue"}));
247 }
248 };
249
250 debug!(
251 "Retrieved matching invoice for invoice_payment: {:?}",
252 invoice
253 );
254
255 let amount: pb::Amount = call.payment.amount.try_into().unwrap();
256
257 let mut tlvs = vec![];
258
259 for t in call.payment.extratlvs.unwrap_or(vec![]) {
260 tlvs.push(t.into());
261 }
262 use crate::pb::incoming_payment::Details;
263 let details = pb::OffChainPayment {
264 label: invoice.label,
265 preimage: hex::decode(call.payment.preimage).unwrap(),
266 amount: Some(amount.try_into().unwrap()),
267 extratlvs: tlvs,
268 bolt11: invoice.bolt11,
269 payment_hash: hex::decode(invoice.payment_hash).unwrap(),
270 };
271
272 let p = pb::IncomingPayment {
273 details: Some(Details::Offchain(details)),
274 };
275
276 match state.events.clone().send(Event::IncomingPayment(p)) {
277 Ok(_) => {}
278 Err(_) => warn!("No active listener for the incoming payment"),
279 }
280 Ok(json!({"result": "continue"}))
281}
282
283#[derive(Clone, Debug)]
285pub enum Event {
286 Stop(Arc<stager::Stage>),
287
288 RpcCall(String),
290 IncomingPayment(pb::IncomingPayment),
291 CustomMsg(pb::Custommsg),
292}
293
294pub use cln_grpc as grpc;