Skip to main content

cln_plugin/
lib.rs

1use crate::codec::{JsonCodec, JsonRpcCodec};
2pub use anyhow::anyhow;
3use anyhow::{Context, Result};
4use futures::sink::SinkExt;
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8extern crate log;
9use log::trace;
10use messages::{Configuration, FeatureBits, NotificationTopic};
11use options::{OptionType, UntypedConfigOption};
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use tokio::io::{AsyncRead, AsyncWrite};
17use tokio::sync::Mutex;
18use tokio_stream::StreamExt;
19use tokio_util::codec::FramedRead;
20use tokio_util::codec::FramedWrite;
21
22mod codec;
23mod logging;
24pub mod messages;
25
26#[macro_use]
27extern crate serde_json;
28
29pub mod options;
30
31/// Need to tell us about something that went wrong? Use this error
32/// type to do that. Use this alias to be safe from future changes in
33/// our internal error handling, since we'll implement any necessary
34/// conversions for you :-)
35pub type Error = anyhow::Error;
36
37/// Builder for a new plugin.
38pub struct Builder<S, I, O>
39where
40    I: AsyncRead + Unpin,
41    O: Send + AsyncWrite + Unpin,
42    S: Clone + Send,
43{
44    input: Option<I>,
45    output: Option<O>,
46
47    hooks: HashMap<String, Hook<S>>,
48    options: HashMap<String, UntypedConfigOption>,
49    option_values: HashMap<String, Option<options::Value>>,
50    rpcmethods: HashMap<String, RpcMethod<S>>,
51    setconfig_callback: Option<AsyncCallback<S>>,
52    subscriptions: HashMap<String, Subscription<S>>,
53    // Contains a Subscription if the user subscribed to "*"
54    wildcard_subscription: Option<Subscription<S>>,
55    notifications: Vec<NotificationTopic>,
56    custommessages: Vec<u16>,
57    featurebits: FeatureBits,
58    dynamic: bool,
59    // Do we want the plugin framework to automatically register a logging handler?
60    logging: bool,
61}
62
63/// A plugin that has registered with the lightning daemon, and gotten
64/// its options filled, however has not yet acknowledged the `init`
65/// message. This is a mid-state allowing a plugin to disable itself,
66/// based on the options.
67pub struct ConfiguredPlugin<S, I, O>
68where
69    S: Clone + Send,
70{
71    init_id: serde_json::Value,
72    input: FramedRead<I, JsonRpcCodec>,
73    output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
74    options: HashMap<String, UntypedConfigOption>,
75    option_values: HashMap<String, Option<options::Value>>,
76    configuration: Configuration,
77    rpcmethods: HashMap<String, AsyncCallback<S>>,
78    setconfig_callback: Option<AsyncCallback<S>>,
79    hooks: HashMap<String, AsyncCallback<S>>,
80    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
81    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
82    #[allow(dead_code)] // unsure why rust thinks this field isn't used
83    notifications: Vec<NotificationTopic>,
84}
85
86/// The [PluginDriver] is used to run the IO loop, reading messages
87/// from the Lightning daemon, dispatching calls and notifications to
88/// the plugin, and returning responses to the the daemon. We also use
89/// it to handle spontaneous messages like Notifications and logging
90/// events.
91struct PluginDriver<S>
92where
93    S: Send + Clone,
94{
95    plugin: Plugin<S>,
96    rpcmethods: HashMap<String, AsyncCallback<S>>,
97    setconfig_callback: Option<AsyncCallback<S>>,
98
99    #[allow(dead_code)] // Unused until we fill in the Hook structs.
100    hooks: HashMap<String, AsyncCallback<S>>,
101    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
102    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
103}
104
105#[derive(Clone)]
106pub struct Plugin<S>
107where
108    S: Clone + Send,
109{
110    /// The state gets cloned for each request
111    state: S,
112    /// "options" field of "init" message sent by cln
113    options: HashMap<String, UntypedConfigOption>,
114    option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
115    /// "configuration" field of "init" message sent by cln
116    configuration: Configuration,
117    /// A signal that allows us to wait on the plugin's shutdown.
118    wait_handle: tokio::sync::broadcast::Sender<()>,
119
120    sender: tokio::sync::mpsc::Sender<serde_json::Value>,
121}
122
123impl<S, I, O> Builder<S, I, O>
124where
125    O: Send + AsyncWrite + Unpin + 'static,
126    S: Clone + Sync + Send + 'static,
127    I: AsyncRead + Send + Unpin + 'static,
128{
129    pub fn new(input: I, output: O) -> Self {
130        Self {
131            input: Some(input),
132            output: Some(output),
133            hooks: HashMap::new(),
134            subscriptions: HashMap::new(),
135            wildcard_subscription: None,
136            options: HashMap::new(),
137            // Should not be configured by user.
138            // This values are set when parsing the init-call
139            option_values: HashMap::new(),
140            rpcmethods: HashMap::new(),
141            setconfig_callback: None,
142            notifications: vec![],
143            featurebits: FeatureBits::default(),
144            dynamic: false,
145            custommessages: vec![],
146            logging: true,
147        }
148    }
149
150    pub fn option<'a, V: options::OptionType<'a>>(
151        mut self,
152        opt: options::ConfigOption<'a, V>,
153    ) -> Builder<S, I, O> {
154        self.options.insert(opt.name().to_string(), opt.build());
155        self
156    }
157
158    pub fn notification(mut self, notif: messages::NotificationTopic) -> Builder<S, I, O> {
159        self.notifications.push(notif);
160        self
161    }
162
163    /// Subscribe to notifications for the given `topic`. The handler
164    /// is an async function that takes a `Plugin<S>` and the
165    /// notification as a `serde_json::Value` as inputs. Since
166    /// notifications do not expect a result the handler should only
167    /// report errors while processing. Any error reported while
168    /// processing the notification will be logged in the cln logs.
169    ///
170    /// ```
171    /// use cln_plugin::{options, Builder, Error, Plugin};
172    ///
173    /// async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> {
174    ///     println!("Got a connect notification: {}", v);
175    ///     Ok(())
176    /// }
177    ///
178    /// let b = Builder::new(tokio::io::stdin(), tokio::io::stdout())
179    ///     .subscribe("connect", connect_handler);
180    /// ```
181    pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
182    where
183        C: Send + Sync + 'static,
184        C: Fn(Plugin<S>, Request) -> F + 'static,
185        F: Future<Output = Result<(), Error>> + Send + 'static,
186    {
187        let subscription = Subscription {
188            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
189        };
190
191        if topic == "*" {
192            self.wildcard_subscription = Some(subscription);
193        } else {
194            self.subscriptions.insert(topic.to_string(), subscription);
195        };
196        self
197    }
198
199    /// Add a hook subscription for `hookname` with a raw [`serde_json::Value`] request and response.
200    /// Prefer [`Builder::hook_typed`] for type-safe hooks, or [`Builder::hook_from_builder`] if you
201    /// need to configure `before`, `after`, or `filters`.
202    pub fn hook<C, F>(mut self, hookname: &str, callback: C) -> Self
203    where
204        C: Send + Sync + 'static,
205        C: Fn(Plugin<S>, Request) -> F + 'static,
206        F: Future<Output = Response> + Send + 'static,
207    {
208        self.hooks.insert(
209            hookname.to_string(),
210            Hook {
211                name: hookname.to_string(),
212                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
213                before: Vec::new(),
214                after: Vec::new(),
215                filters: None,
216            },
217        );
218        self
219    }
220
221    /// Add a hook subscription using a [`HookBuilder`], which allows configuring `before`, `after`,
222    /// and `filters` in addition to the callback. Use [`HookBuilder::new`] for raw
223    /// [`serde_json::Value`] hooks or [`HookBuilder::new_typed`] for type-safe hooks.
224    pub fn hook_from_builder(mut self, hook: HookBuilder<S>) -> Builder<S, I, O> {
225        self.hooks.insert(hook.name.clone(), hook.build());
226        self
227    }
228
229    /// Add a hook subscription for `hookname` with typed request and response. The request is
230    /// deserialized from JSON into `Req` and the response is serialized from `Resp` back to JSON
231    /// automatically. If deserialization of the request fails, the hook returns an error to CLN.
232    /// Use [`Builder::hook_from_builder`] with [`HookBuilder::new_typed`] if you additionally need
233    /// to configure `before`, `after`, or `filters`.
234    pub fn hook_typed<C, F, Req, Resp>(mut self, hookname: &str, callback: C) -> Self
235    where
236        C: Send + Sync + 'static,
237        C: Fn(Plugin<S>, Req) -> F + 'static,
238        F: Future<Output = Result<Resp, Error>> + Send + 'static,
239        Req: DeserializeOwned + Send + 'static,
240        Resp: Serialize + Send + 'static,
241    {
242        let hookname = hookname.to_string();
243        self.hooks.insert(
244            hookname.clone(),
245            Hook {
246                name: hookname.clone(),
247                callback: Box::new(move |p, r| {
248                    let typed_req = serde_json::from_value(r).unwrap_or_else(|e| {
249                        let error = format!(
250                            "cln-plugin: hook '{hookname}' received a request that doesn't match \
251                            the expected schema. Error: {e}"
252                        );
253                        println!(
254                            "{}",
255                            serde_json::json!({"jsonrpc": "2.0",
256                          "method": "log",
257                          "params": {"level":"warn", "message":error}})
258                        );
259                        std::process::exit(1);
260                    });
261                    let fut = callback(p, typed_req);
262                    Box::pin(async move {
263                        let typed_resp = fut.await?;
264                        serde_json::to_value(typed_resp).map_err(Error::from)
265                    })
266                }),
267                before: Vec::new(),
268                after: Vec::new(),
269                filters: None,
270            },
271        );
272        self
273    }
274
275    /// Register a custom RPC method for the RPC passthrough from the
276    /// main daemon
277    pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
278    where
279        C: Send + Sync + 'static,
280        C: Fn(Plugin<S>, Request) -> F + 'static,
281        F: Future<Output = Response> + Send + 'static,
282    {
283        self.rpcmethods.insert(
284            name.to_string(),
285            RpcMethod {
286                name: name.to_string(),
287                description: description.to_string(),
288                usage: String::default(),
289                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
290            },
291        );
292        self
293    }
294
295    pub fn rpcmethod_from_builder(mut self, rpc_method: RpcMethodBuilder<S>) -> Builder<S, I, O> {
296        self.rpcmethods
297            .insert(rpc_method.name.to_string(), rpc_method.build());
298        self
299    }
300
301    /// Register a callback for setconfig to accept changes for dynamic options
302    pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
303    where
304        C: Send + Sync + 'static,
305        C: Fn(Plugin<S>, Request) -> F + 'static,
306        F: Future<Output = Response> + Send + 'static,
307    {
308        self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
309        self
310    }
311
312    /// Send true value for "dynamic" field in "getmanifest" response
313    pub fn dynamic(mut self) -> Builder<S, I, O> {
314        self.dynamic = true;
315        self
316    }
317
318    /// Sets the "featurebits" in the "getmanifest" response
319    pub fn featurebits(mut self, kind: FeatureBitsKind, hex: String) -> Self {
320        match kind {
321            FeatureBitsKind::Node => self.featurebits.node = Some(hex),
322            FeatureBitsKind::Channel => self.featurebits.channel = Some(hex),
323            FeatureBitsKind::Init => self.featurebits.init = Some(hex),
324            FeatureBitsKind::Invoice => self.featurebits.invoice = Some(hex),
325        }
326        self
327    }
328
329    /// Should the plugin automatically register a logging handler? If
330    /// not you may need to register a logging handler yourself. Be
331    /// careful not to print raw lines to `stdout` if you do, since
332    /// that'll interfere with the plugin communication. See the CLN
333    /// documentation on logging to see what logging events should
334    /// look like.
335    pub fn with_logging(mut self, log: bool) -> Builder<S, I, O> {
336        self.logging = log;
337        self
338    }
339
340    /// Tells lightningd explicitly to allow custommmessages of the provided
341    /// type
342    pub fn custommessages(mut self, custommessages: Vec<u16>) -> Self {
343        self.custommessages = custommessages;
344        self
345    }
346
347    /// Communicate with `lightningd` to tell it about our options,
348    /// RPC methods and subscribe to hooks, and then process the
349    /// initialization, configuring the plugin.
350    ///
351    /// Returns `None` if we were invoked with `--help` and thus
352    /// should exit after this handshake
353    pub async fn configure(mut self) -> Result<Option<ConfiguredPlugin<S, I, O>>, anyhow::Error> {
354        let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
355
356        // Sadly we need to wrap the output in a mutex in order to
357        // enable early logging, i.e., logging that is done before the
358        // PluginDriver is processing events during the
359        // handshake. Otherwise we could just write the log events to
360        // the event queue and have the PluginDriver be the sole owner
361        // of `Stdout`.
362        let output = Arc::new(Mutex::new(FramedWrite::new(
363            self.output.take().unwrap(),
364            JsonCodec::default(),
365        )));
366
367        // Now configure the logging, so any `log` call is wrapped
368        // in a JSON-RPC notification and sent to Core Lightning
369        if self.logging {
370            crate::logging::init(output.clone()).await?;
371            trace!("Plugin logging initialized");
372        }
373
374        // Read the `getmanifest` message:
375        match input.next().await {
376            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
377                output
378                    .lock()
379                    .await
380                    .send(json!({
381                        "jsonrpc": "2.0",
382                        "result": self.handle_get_manifest(m),
383                        "id": id,
384                    }))
385                    .await?
386            }
387            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
388            None => {
389                return Err(anyhow!(
390                    "Lost connection to lightning expecting getmanifest"
391                ));
392            }
393        };
394        let (init_id, configuration) = match input.next().await {
395            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
396                (id, self.handle_init(m)?)
397            }
398
399            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
400            None => {
401                // If we are being called with --help we will get
402                // disconnected here. That's expected, so don't
403                // complain about it.
404                return Ok(None);
405            }
406        };
407
408        // TODO Split the two hashmaps once we fill in the hook
409        // payload structs in messages.rs
410        let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
411            HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
412        rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));
413
414        let subscriptions =
415            HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
416        let all_subscription = self.wildcard_subscription.map(|s| s.callback);
417
418        // Leave the `init` reply pending, so we can disable based on
419        // the options if required.
420        Ok(Some(ConfiguredPlugin {
421            // The JSON-RPC `id` field so we can reply correctly.
422            init_id,
423            input,
424            output,
425            rpcmethods,
426            setconfig_callback: self.setconfig_callback,
427            notifications: self.notifications,
428            subscriptions,
429            wildcard_subscription: all_subscription,
430            options: self.options,
431            option_values: self.option_values,
432            configuration,
433            hooks: HashMap::new(),
434        }))
435    }
436
437    /// Build and start the plugin loop. This performs the handshake
438    /// and spawns a new task that accepts incoming messages from
439    /// Core Lightning and dispatches them to the handlers. It only
440    /// returns after completing the handshake to ensure that the
441    /// configuration and initialization was successfull.
442    ///
443    /// If `lightningd` was called with `--help` we won't get a
444    /// `Plugin` instance and return `None` instead. This signals that
445    /// we should exit, and not continue running. `start()` returns in
446    /// order to allow user code to perform cleanup if necessary.
447    pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
448        if let Some(cp) = self.configure().await? {
449            Ok(Some(cp.start(state).await?))
450        } else {
451            Ok(None)
452        }
453    }
454
455    fn handle_get_manifest(
456        &mut self,
457        _call: messages::GetManifestCall,
458    ) -> messages::GetManifestResponse {
459        let rpcmethods: Vec<_> = self
460            .rpcmethods
461            .values()
462            .map(|v| messages::RpcMethod {
463                name: v.name.clone(),
464                description: v.description.clone(),
465                usage: v.usage.clone(),
466            })
467            .collect();
468
469        let subscriptions = self
470            .subscriptions
471            .keys()
472            .map(|s| s.clone())
473            .chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
474            .collect();
475
476        let hooks: Vec<messages::Hook> = self
477            .hooks
478            .values()
479            .map(|v| messages::Hook {
480                name: v.name.clone(),
481                before: v.before.clone(),
482                after: v.after.clone(),
483                filters: v.filters.clone(),
484            })
485            .collect();
486
487        messages::GetManifestResponse {
488            options: self.options.values().cloned().collect(),
489            subscriptions,
490            hooks,
491            rpcmethods,
492            notifications: self.notifications.clone(),
493            featurebits: self.featurebits.clone(),
494            dynamic: self.dynamic,
495            nonnumericids: true,
496            custommessages: self.custommessages.clone(),
497        }
498    }
499
500    fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
501        use options::Value as OValue;
502        use serde_json::Value as JValue;
503
504        // Match up the ConfigOptions and fill in their values if we
505        // have a matching entry.
506        for (name, option) in self.options.iter() {
507            let json_value = call.options.get(name);
508            let default_value = option.default();
509
510            let option_value: Option<options::Value> = match (json_value, default_value) {
511                (None, None) => None,
512                (None, Some(default)) => Some(default.clone()),
513                (Some(JValue::Array(a)), _) => match a.first() {
514                    Some(JValue::String(_)) => Some(OValue::StringArray(
515                        a.iter().map(|x| x.as_str().unwrap().to_string()).collect(),
516                    )),
517                    Some(JValue::Number(_)) => Some(OValue::IntegerArray(
518                        a.iter().map(|x| x.as_i64().unwrap()).collect(),
519                    )),
520                    _ => panic!("Array type not supported for option: {}", name),
521                },
522                (Some(JValue::String(s)), _) => Some(OValue::String(s.to_string())),
523                (Some(JValue::Number(i)), _) => Some(OValue::Integer(i.as_i64().unwrap())),
524                (Some(JValue::Bool(b)), _) => Some(OValue::Boolean(*b)),
525                _ => panic!("Type mismatch for option {}", name),
526            };
527
528            self.option_values.insert(name.to_string(), option_value);
529        }
530        Ok(call.configuration)
531    }
532}
533
534impl<S> HookBuilder<S>
535where
536    S: Send + Clone,
537{
538    pub fn new<C, F>(name: &str, callback: C) -> Self
539    where
540        C: Send + Sync + 'static,
541        C: Fn(Plugin<S>, Request) -> F + 'static,
542        F: Future<Output = Response> + Send + 'static,
543    {
544        Self {
545            name: name.to_string(),
546            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
547            before: Vec::new(),
548            after: Vec::new(),
549            filters: None,
550        }
551    }
552
553    pub fn new_typed<C, F, Req, Resp>(name: &str, callback: C) -> Self
554    where
555        C: Send + Sync + 'static,
556        C: Fn(Plugin<S>, Req) -> F + 'static,
557        F: Future<Output = Result<Resp, Error>> + Send + 'static,
558        Req: DeserializeOwned + Send + 'static,
559        Resp: Serialize + Send + 'static,
560    {
561        let hookname = name.to_string();
562        Self {
563            name: hookname.clone(),
564            callback: Box::new(move |p, r| {
565                let typed_req = serde_json::from_value(r).unwrap_or_else(|e| {
566                    let error = format!(
567                        "cln-plugin: hook '{hookname}' received a request that doesn't match \
568                        the expected schema. Error: {e}"
569                    );
570                    println!(
571                        "{}",
572                        serde_json::json!({"jsonrpc": "2.0",
573                          "method": "log",
574                          "params": {"level":"warn", "message":error}})
575                    );
576                    std::process::exit(1);
577                });
578                let fut = callback(p, typed_req);
579                Box::pin(async move {
580                    let typed_resp = fut.await?;
581                    serde_json::to_value(typed_resp).map_err(Error::from)
582                })
583            }),
584            before: Vec::new(),
585            after: Vec::new(),
586            filters: None,
587        }
588    }
589
590    pub fn before(mut self, before: Vec<String>) -> Self {
591        self.before = before;
592        self
593    }
594
595    pub fn after(mut self, after: Vec<String>) -> Self {
596        self.after = after;
597        self
598    }
599
600    pub fn filters(mut self, filters: Vec<HookFilter>) -> Self {
601        // Empty Vec would filter everything, must be None instead to not get serialized
602        if filters.is_empty() {
603            self.filters = None;
604        } else {
605            self.filters = Some(filters);
606        }
607        self
608    }
609
610    fn build(self) -> Hook<S> {
611        Hook {
612            callback: self.callback,
613            name: self.name,
614            before: self.before,
615            after: self.after,
616            filters: self.filters,
617        }
618    }
619}
620
621impl<S> RpcMethodBuilder<S>
622where
623    S: Send + Clone,
624{
625    pub fn new<C, F>(name: &str, callback: C) -> Self
626    where
627        C: Send + Sync + 'static,
628        C: Fn(Plugin<S>, Request) -> F + 'static,
629        F: Future<Output = Response> + Send + 'static,
630    {
631        Self {
632            name: name.to_string(),
633            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
634            usage: None,
635            description: None,
636        }
637    }
638
639    pub fn description(mut self, description: &str) -> Self {
640        self.description = Some(description.to_string());
641        self
642    }
643
644    pub fn usage(mut self, usage: &str) -> Self {
645        self.usage = Some(usage.to_string());
646        self
647    }
648
649    fn build(self) -> RpcMethod<S> {
650        RpcMethod {
651            callback: self.callback,
652            name: self.name,
653            description: self.description.unwrap_or_default(),
654            usage: self.usage.unwrap_or_default(),
655        }
656    }
657}
658
659// Just some type aliases so we don't get confused in a lisp-like sea
660// of parentheses.
661type Request = serde_json::Value;
662type Response = Result<serde_json::Value, Error>;
663type AsyncCallback<S> =
664    Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
665type AsyncNotificationCallback<S> = Box<
666    dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
667        + Send
668        + Sync,
669>;
670
671/// A struct collecting the metadata required to register a custom
672/// rpcmethod with the main daemon upon init. It'll get deconstructed
673/// into just the callback after the init.
674struct RpcMethod<S>
675where
676    S: Clone + Send,
677{
678    callback: AsyncCallback<S>,
679    description: String,
680    name: String,
681    usage: String,
682}
683
684pub struct RpcMethodBuilder<S>
685where
686    S: Clone + Send,
687{
688    callback: AsyncCallback<S>,
689    name: String,
690    description: Option<String>,
691    usage: Option<String>,
692}
693
694struct Subscription<S>
695where
696    S: Clone + Send,
697{
698    callback: AsyncNotificationCallback<S>,
699}
700
701struct Hook<S>
702where
703    S: Clone + Send,
704{
705    name: String,
706    callback: AsyncCallback<S>,
707    before: Vec<String>,
708    after: Vec<String>,
709    filters: Option<Vec<HookFilter>>,
710}
711
712pub struct HookBuilder<S>
713where
714    S: Clone + Send,
715{
716    name: String,
717    callback: AsyncCallback<S>,
718    before: Vec<String>,
719    after: Vec<String>,
720    filters: Option<Vec<HookFilter>>,
721}
722
723#[derive(Debug, Clone, Serialize)]
724#[serde(untagged)]
725pub enum HookFilter {
726    Str(String),
727    Int(i64),
728}
729
730impl<S> Plugin<S>
731where
732    S: Clone + Send,
733{
734    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
735        self.option_values
736            .lock()
737            .unwrap()
738            .get(name)
739            .ok_or(anyhow!("No option named {}", name))
740            .cloned()
741    }
742
743    pub fn option<'a, OV: OptionType<'a>>(
744        &self,
745        config_option: &options::ConfigOption<'a, OV>,
746    ) -> Result<OV::OutputValue> {
747        let value = self.option_str(config_option.name())?;
748        Ok(OV::from_value(&value))
749    }
750
751    pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
752        *self
753            .option_values
754            .lock()
755            .unwrap()
756            .get_mut(name)
757            .ok_or(anyhow!("No option named {}", name))? = Some(value);
758        Ok(())
759    }
760
761    pub fn set_option<'a, OV: OptionType<'a>>(
762        &self,
763        config_option: &options::ConfigOption<'a, OV>,
764        value: options::Value,
765    ) -> Result<()> {
766        self.set_option_str(config_option.name(), value)?;
767        Ok(())
768    }
769}
770
771impl<S, I, O> ConfiguredPlugin<S, I, O>
772where
773    S: Send + Clone + Sync + 'static,
774    I: AsyncRead + Send + Unpin + 'static,
775    O: Send + AsyncWrite + Unpin + 'static,
776{
777    #[allow(unused_mut)]
778    pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
779        let output = self.output;
780        let input = self.input;
781        let (wait_handle, _) = tokio::sync::broadcast::channel(1);
782
783        // An MPSC pair used by anything that needs to send messages
784        // to the main daemon.
785        let (sender, receiver) = tokio::sync::mpsc::channel(4);
786
787        let plugin = Plugin {
788            state,
789            options: self.options,
790            option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
791            configuration: self.configuration,
792            wait_handle,
793            sender,
794        };
795
796        let driver = PluginDriver {
797            plugin: plugin.clone(),
798            rpcmethods: self.rpcmethods,
799            setconfig_callback: self.setconfig_callback,
800            hooks: self.hooks,
801            subscriptions: self.subscriptions,
802            wildcard_subscription: self.wildcard_subscription,
803        };
804
805        output
806            .lock()
807            .await
808            .send(json!(
809                {
810                    "jsonrpc": "2.0",
811                    "id": self.init_id,
812            "result": crate::messages::InitResponse{disable: None}
813                }
814            ))
815            .await
816            .context("sending init response")?;
817
818        let joiner = plugin.wait_handle.clone();
819        // Start the PluginDriver to handle plugin IO
820        tokio::spawn(async move {
821            if let Err(e) = driver.run(receiver, input, output).await {
822                log::warn!("Plugin loop returned error {:?}", e);
823            }
824
825            // Now that we have left the reader loop its time to
826            // notify any waiting tasks. This most likely will cause
827            // the main task to exit and the plugin to terminate.
828            joiner.send(())
829        });
830        Ok(plugin)
831    }
832
833    /// Abort the plugin startup. Communicate that we're about to exit
834    /// voluntarily, and this is not an error.
835    #[allow(unused_mut)]
836    pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
837        self.output
838            .lock()
839            .await
840            .send(json!(
841                {
842                    "jsonrpc": "2.0",
843                    "id": self.init_id,
844            "result": crate::messages::InitResponse{
845            disable: Some(reason.to_string())
846            }
847                }
848            ))
849            .await
850            .context("sending init response")?;
851        Ok(())
852    }
853
854    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
855        self.option_values
856            .get(name)
857            .ok_or(anyhow!("No option named '{}'", name))
858            .map(|c| c.clone())
859    }
860
861    pub fn option<'a, OV: OptionType<'a>>(
862        &self,
863        config_option: &options::ConfigOption<'a, OV>,
864    ) -> Result<OV::OutputValue> {
865        let value = self.option_str(config_option.name())?;
866        Ok(OV::from_value(&value))
867    }
868
869    /// return the cln configuration send to the
870    /// plugin after the initialization.
871    pub fn configuration(&self) -> Configuration {
872        self.configuration.clone()
873    }
874}
875
876impl<S> PluginDriver<S>
877where
878    S: Send + Clone,
879{
880    /// Run the plugin until we get a shutdown command.
881    async fn run<I, O>(
882        self,
883        mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
884        mut input: FramedRead<I, JsonRpcCodec>,
885        output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
886    ) -> Result<(), Error>
887    where
888        I: Send + AsyncReadExt + Unpin,
889        O: Send + AsyncWriteExt + Unpin,
890    {
891        loop {
892            // If we encounter any error reading or writing from/to
893            // the master we hand them up, so we can return control to
894            // the user-code, which may require some cleanups or
895            // similar.
896            tokio::select! {
897                    e = self.dispatch_one(&mut input, &self.plugin) => {
898                        if let Err(e) = e {
899                return Err(e)
900                        }
901            },
902            v = receiver.recv() => {
903                        output.lock().await.send(
904                v.context("internal communication error")?
905                        ).await?;
906            },
907                }
908        }
909    }
910
911    /// Dispatch one server-side event and then return. Just so we
912    /// have a nicer looking `select` statement in `run` :-)
913    async fn dispatch_one<I>(
914        &self,
915        input: &mut FramedRead<I, JsonRpcCodec>,
916        plugin: &Plugin<S>,
917    ) -> Result<(), Error>
918    where
919        I: Send + AsyncReadExt + Unpin,
920    {
921        match input.next().await {
922            Some(Ok(msg)) => {
923                trace!("Received a message: {:?}", msg);
924                match msg {
925                    messages::JsonRpc::Request(_id, _p) => {
926                        todo!(
927                            "This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively."
928                        );
929                    }
930                    messages::JsonRpc::Notification(_n) => {
931                        todo!(
932                            "As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used."
933                        )
934                    }
935                    messages::JsonRpc::CustomRequest(id, request) => {
936                        trace!("Dispatching custom method {:?}", request);
937                        let method = request
938                            .get("method")
939                            .context("Missing 'method' in request")?
940                            .as_str()
941                            .context("'method' is not a string")?;
942                        let callback = match method {
943                            name if name.eq("setconfig") => {
944                                self.setconfig_callback.as_ref().ok_or_else(|| {
945                                    anyhow!("No handler for method '{}' registered", method)
946                                })?
947                            }
948                            _ => self.rpcmethods.get(method).with_context(|| {
949                                anyhow!("No handler for method '{}' registered", method)
950                            })?,
951                        };
952                        let params = request
953                            .get("params")
954                            .context("Missing 'params' field in request")?
955                            .clone();
956
957                        let plugin = plugin.clone();
958                        let call = callback(plugin.clone(), params);
959
960                        tokio::spawn(async move {
961                            match call.await {
962                                Ok(v) => plugin
963                                    .sender
964                                    .send(json!({
965                                    "jsonrpc": "2.0",
966                                    "id": id,
967                                    "result": v
968                                    }))
969                                    .await
970                                    .context("returning custom response"),
971                                Err(e) => plugin
972                                    .sender
973                                    .send(json!({
974                                    "jsonrpc": "2.0",
975                                    "id": id,
976                                    "error": parse_error(e.to_string()),
977                                    }))
978                                    .await
979                                    .context("returning custom error"),
980                            }
981                        });
982                        Ok(())
983                    }
984                    messages::JsonRpc::CustomNotification(request) => {
985                        // This code handles notifications
986                        trace!("Dispatching custom notification {:?}", request);
987                        let method = request
988                            .get("method")
989                            .context("Missing 'method' in request")?
990                            .as_str()
991                            .context("'method' is not a string")?;
992
993                        let params = request
994                            .get("params")
995                            .context("Missing 'params' field in request")?;
996
997                        // Send to notification to the wildcard
998                        // subscription "*" it it exists
999                        match &self.wildcard_subscription {
1000                            Some(cb) => {
1001                                let call = cb(plugin.clone(), params.clone());
1002                                tokio::spawn(async move {
1003                                    if let Err(e) = call.await {
1004                                        log::warn!("Wildcard notification handler error: '{}'", e)
1005                                    }
1006                                });
1007                            }
1008                            None => {}
1009                        };
1010
1011                        // Find the appropriate callback and process it
1012                        // We'll log a warning if no handler is defined
1013                        match self.subscriptions.get(method) {
1014                            Some(cb) => {
1015                                let call = cb(plugin.clone(), params.clone());
1016                                tokio::spawn(async move {
1017                                    if let Err(e) = call.await {
1018                                        log::warn!("Notification handler error: '{}'", e)
1019                                    }
1020                                });
1021                            }
1022                            None => {
1023                                if self.wildcard_subscription.is_none() {
1024                                    log::warn!(
1025                                        "No handler for notification '{}' registered",
1026                                        method
1027                                    );
1028                                }
1029                            }
1030                        };
1031                        Ok(())
1032                    }
1033                }
1034            }
1035            Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
1036            None => Err(anyhow!("Error reading from master")),
1037        }
1038    }
1039}
1040
1041impl<S> Plugin<S>
1042where
1043    S: Clone + Send,
1044{
1045    pub fn options(&self) -> Vec<UntypedConfigOption> {
1046        self.options.values().cloned().collect()
1047    }
1048    pub fn configuration(&self) -> Configuration {
1049        self.configuration.clone()
1050    }
1051    pub fn state(&self) -> &S {
1052        &self.state
1053    }
1054}
1055
1056impl<S> Plugin<S>
1057where
1058    S: Send + Clone,
1059{
1060    pub async fn send_custom_notification(
1061        &self,
1062        method: String,
1063        v: serde_json::Value,
1064    ) -> Result<(), Error> {
1065        // Modern has them inside object of same name.
1066        // This is deprecated, scheduled for removal 26.09.
1067        let mut params = match &v {
1068            serde_json::Value::Object(map) => map.clone(),
1069            _ => return Err(anyhow::anyhow!("params must be a JSON object")),
1070        };
1071        params.insert(method.clone(), json!(v));
1072
1073        self.sender
1074            .send(json!({
1075                "jsonrpc": "2.0",
1076                "method": method,
1077                "params": params,
1078            }))
1079            .await
1080            .context("sending custom notification")?;
1081        Ok(())
1082    }
1083
1084    /// Wait for plugin shutdown
1085    pub async fn join(&self) -> Result<(), Error> {
1086        self.wait_handle
1087            .subscribe()
1088            .recv()
1089            .await
1090            .context("error waiting for shutdown")
1091    }
1092
1093    /// Request plugin shutdown
1094    pub fn shutdown(&self) -> Result<(), Error> {
1095        self.wait_handle
1096            .send(())
1097            .context("error waiting for shutdown")?;
1098        Ok(())
1099    }
1100}
1101
1102pub enum FeatureBitsKind {
1103    Node,
1104    Channel,
1105    Invoice,
1106    Init,
1107}
1108
1109#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
1110struct RpcError {
1111    pub code: Option<i32>,
1112    pub message: String,
1113    pub data: Option<serde_json::Value>,
1114}
1115fn parse_error(error: String) -> RpcError {
1116    match serde_json::from_str::<RpcError>(&error) {
1117        Ok(o) => o,
1118        Err(_) => RpcError {
1119            code: Some(-32700),
1120            message: error,
1121            data: None,
1122        },
1123    }
1124}
1125
1126#[cfg(test)]
1127mod test {
1128    use super::*;
1129
1130    #[tokio::test]
1131    async fn init() {
1132        let state = ();
1133        let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout());
1134        let _ = builder.start(state);
1135    }
1136}