Skip to main content

cln_plugin/
lib.rs

1use crate::codec::{JsonCodec, JsonRpcCodec};
2pub use anyhow::anyhow;
3use anyhow::{Context, Result};
4use futures::sink::SinkExt;
5use serde::Serialize;
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7extern crate log;
8use log::trace;
9use messages::{Configuration, FeatureBits, NotificationTopic};
10use options::{OptionType, UntypedConfigOption};
11use std::collections::HashMap;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use tokio::io::{AsyncRead, AsyncWrite};
16use tokio::sync::Mutex;
17use tokio_stream::StreamExt;
18use tokio_util::codec::FramedRead;
19use tokio_util::codec::FramedWrite;
20
21mod codec;
22mod logging;
23pub mod messages;
24
25#[macro_use]
26extern crate serde_json;
27
28pub mod options;
29
/// Need to tell us about something that went wrong? Use this error
/// type to do that. Use this alias to be safe from future changes in
/// our internal error handling, since we'll implement any necessary
/// conversions for you :-)
///
/// At the moment this is a plain alias for [`anyhow::Error`].
pub type Error = anyhow::Error;
35
/// Builder for a new plugin.
///
/// Collects the I/O endpoints plus everything the plugin wants to
/// register (options, RPC methods, hooks, subscriptions, ...) before the
/// handshake is performed by `configure` / `start`.
pub struct Builder<S, I, O>
where
    I: AsyncRead + Unpin,
    O: Send + AsyncWrite + Unpin,
    S: Clone + Send,
{
    /// Stream we read messages from; taken out of the `Option` by `configure`.
    input: Option<I>,
    /// Stream we write messages to; taken out of the `Option` by `configure`.
    output: Option<O>,

    /// Registered hooks, keyed by hook name.
    hooks: HashMap<String, Hook<S>>,
    /// Registered options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    /// Values resolved for the options; filled in by `handle_init`, not by the user.
    option_values: HashMap<String, Option<options::Value>>,
    /// Registered custom RPC methods, keyed by method name.
    rpcmethods: HashMap<String, RpcMethod<S>>,
    /// Optional handler for `setconfig` requests.
    setconfig_callback: Option<AsyncCallback<S>>,
    /// Notification subscriptions, keyed by topic.
    subscriptions: HashMap<String, Subscription<S>>,
    // Contains a Subscription if the user subscribed to "*"
    wildcard_subscription: Option<Subscription<S>>,
    /// Notification topics reported in the `getmanifest` response.
    notifications: Vec<NotificationTopic>,
    /// Custom message types reported in the `getmanifest` response.
    custommessages: Vec<u16>,
    /// Feature bits reported in the `getmanifest` response.
    featurebits: FeatureBits,
    /// Value of the "dynamic" field in the `getmanifest` response.
    dynamic: bool,
    // Do we want the plugin framework to automatically register a logging handler?
    logging: bool,
}
61
/// A plugin that has registered with the lightning daemon, and gotten
/// its options filled, however has not yet acknowledged the `init`
/// message. This is a mid-state allowing a plugin to disable itself,
/// based on the options.
pub struct ConfiguredPlugin<S, I, O>
where
    S: Clone + Send,
{
    /// JSON-RPC `id` of the pending `init` request, needed to reply to it.
    init_id: serde_json::Value,
    /// Framed reader for incoming messages.
    input: FramedRead<I, JsonRpcCodec>,
    /// Framed writer for outgoing messages, shared behind an async mutex.
    output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
    /// Registered options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    /// Option values resolved while handling `init`.
    option_values: HashMap<String, Option<options::Value>>,
    /// The `configuration` section of the `init` call.
    configuration: Configuration,
    /// Callbacks keyed by method name (`Builder::configure` also puts the hook callbacks here).
    rpcmethods: HashMap<String, AsyncCallback<S>>,
    /// Optional handler for `setconfig` requests.
    setconfig_callback: Option<AsyncCallback<S>>,
    /// Hook callbacks; `Builder::configure` currently leaves this map empty.
    hooks: HashMap<String, AsyncCallback<S>>,
    /// Notification callbacks keyed by topic.
    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
    /// Callback for the "*" subscription, if any.
    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
    #[allow(dead_code)] // unsure why rust thinks this field isn't used
    notifications: Vec<NotificationTopic>,
}
84
/// The [PluginDriver] is used to run the IO loop, reading messages
/// from the Lightning daemon, dispatching calls and notifications to
/// the plugin, and returning responses to the daemon. We also use
/// it to handle spontaneous messages like Notifications and logging
/// events.
struct PluginDriver<S>
where
    S: Send + Clone,
{
    /// Plugin handle that is cloned and passed to the callbacks.
    plugin: Plugin<S>,
    /// Request callbacks keyed by method name.
    rpcmethods: HashMap<String, AsyncCallback<S>>,
    /// Optional handler for `setconfig` requests.
    setconfig_callback: Option<AsyncCallback<S>>,

    #[allow(dead_code)] // Unused until we fill in the Hook structs.
    hooks: HashMap<String, AsyncCallback<S>>,
    /// Notification callbacks keyed by topic.
    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
    /// Callback for the "*" subscription, if any.
    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
}
103
/// Handle to a started plugin. It is `Clone`, and a clone is handed to
/// every callback that gets invoked.
#[derive(Clone)]
pub struct Plugin<S>
where
    S: Clone + Send,
{
    /// The state gets cloned for each request
    state: S,
    /// "options" field of "init" message sent by cln
    options: HashMap<String, UntypedConfigOption>,
    /// Current option values, shared between all clones of this handle.
    option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
    /// "configuration" field of "init" message sent by cln
    configuration: Configuration,
    /// A signal that allows us to wait on the plugin's shutdown.
    wait_handle: tokio::sync::broadcast::Sender<()>,

    /// Queue of JSON messages that the driver loop writes to the daemon.
    sender: tokio::sync::mpsc::Sender<serde_json::Value>,
}
121
122impl<S, I, O> Builder<S, I, O>
123where
124    O: Send + AsyncWrite + Unpin + 'static,
125    S: Clone + Sync + Send + 'static,
126    I: AsyncRead + Send + Unpin + 'static,
127{
128    pub fn new(input: I, output: O) -> Self {
129        Self {
130            input: Some(input),
131            output: Some(output),
132            hooks: HashMap::new(),
133            subscriptions: HashMap::new(),
134            wildcard_subscription: None,
135            options: HashMap::new(),
136            // Should not be configured by user.
137            // This values are set when parsing the init-call
138            option_values: HashMap::new(),
139            rpcmethods: HashMap::new(),
140            setconfig_callback: None,
141            notifications: vec![],
142            featurebits: FeatureBits::default(),
143            dynamic: false,
144            custommessages: vec![],
145            logging: true,
146        }
147    }
148
149    pub fn option<'a, V: options::OptionType<'a>>(
150        mut self,
151        opt: options::ConfigOption<'a, V>,
152    ) -> Builder<S, I, O> {
153        self.options.insert(opt.name().to_string(), opt.build());
154        self
155    }
156
157    pub fn notification(mut self, notif: messages::NotificationTopic) -> Builder<S, I, O> {
158        self.notifications.push(notif);
159        self
160    }
161
162    /// Subscribe to notifications for the given `topic`. The handler
163    /// is an async function that takes a `Plugin<S>` and the
164    /// notification as a `serde_json::Value` as inputs. Since
165    /// notifications do not expect a result the handler should only
166    /// report errors while processing. Any error reported while
167    /// processing the notification will be logged in the cln logs.
168    ///
169    /// ```
170    /// use cln_plugin::{options, Builder, Error, Plugin};
171    ///
172    /// async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> {
173    ///     println!("Got a connect notification: {}", v);
174    ///     Ok(())
175    /// }
176    ///
177    /// let b = Builder::new(tokio::io::stdin(), tokio::io::stdout())
178    ///     .subscribe("connect", connect_handler);
179    /// ```
180    pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
181    where
182        C: Send + Sync + 'static,
183        C: Fn(Plugin<S>, Request) -> F + 'static,
184        F: Future<Output = Result<(), Error>> + Send + 'static,
185    {
186        let subscription = Subscription {
187            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
188        };
189
190        if topic == "*" {
191            self.wildcard_subscription = Some(subscription);
192        } else {
193            self.subscriptions.insert(topic.to_string(), subscription);
194        };
195        self
196    }
197
198    /// Add a subscription to a given `hookname`
199    pub fn hook<C, F>(mut self, hookname: &str, callback: C) -> Self
200    where
201        C: Send + Sync + 'static,
202        C: Fn(Plugin<S>, Request) -> F + 'static,
203        F: Future<Output = Response> + Send + 'static,
204    {
205        self.hooks.insert(
206            hookname.to_string(),
207            Hook {
208                name: hookname.to_string(),
209                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
210                before: Vec::new(),
211                after: Vec::new(),
212                filters: None,
213            },
214        );
215        self
216    }
217
218    pub fn hook_from_builder(mut self, hook: HookBuilder<S>) -> Builder<S, I, O> {
219        self.hooks.insert(hook.name.clone(), hook.build());
220        self
221    }
222
223    /// Register a custom RPC method for the RPC passthrough from the
224    /// main daemon
225    pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
226    where
227        C: Send + Sync + 'static,
228        C: Fn(Plugin<S>, Request) -> F + 'static,
229        F: Future<Output = Response> + Send + 'static,
230    {
231        self.rpcmethods.insert(
232            name.to_string(),
233            RpcMethod {
234                name: name.to_string(),
235                description: description.to_string(),
236                usage: String::default(),
237                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
238            },
239        );
240        self
241    }
242
243    pub fn rpcmethod_from_builder(mut self, rpc_method: RpcMethodBuilder<S>) -> Builder<S, I, O> {
244        self.rpcmethods
245            .insert(rpc_method.name.to_string(), rpc_method.build());
246        self
247    }
248
249    /// Register a callback for setconfig to accept changes for dynamic options
250    pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
251    where
252        C: Send + Sync + 'static,
253        C: Fn(Plugin<S>, Request) -> F + 'static,
254        F: Future<Output = Response> + Send + 'static,
255    {
256        self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
257        self
258    }
259
260    /// Send true value for "dynamic" field in "getmanifest" response
261    pub fn dynamic(mut self) -> Builder<S, I, O> {
262        self.dynamic = true;
263        self
264    }
265
266    /// Sets the "featurebits" in the "getmanifest" response
267    pub fn featurebits(mut self, kind: FeatureBitsKind, hex: String) -> Self {
268        match kind {
269            FeatureBitsKind::Node => self.featurebits.node = Some(hex),
270            FeatureBitsKind::Channel => self.featurebits.channel = Some(hex),
271            FeatureBitsKind::Init => self.featurebits.init = Some(hex),
272            FeatureBitsKind::Invoice => self.featurebits.invoice = Some(hex),
273        }
274        self
275    }
276
277    /// Should the plugin automatically register a logging handler? If
278    /// not you may need to register a logging handler yourself. Be
279    /// careful not to print raw lines to `stdout` if you do, since
280    /// that'll interfere with the plugin communication. See the CLN
281    /// documentation on logging to see what logging events should
282    /// look like.
283    pub fn with_logging(mut self, log: bool) -> Builder<S, I, O> {
284        self.logging = log;
285        self
286    }
287
288    /// Tells lightningd explicitly to allow custommmessages of the provided
289    /// type
290    pub fn custommessages(mut self, custommessages: Vec<u16>) -> Self {
291        self.custommessages = custommessages;
292        self
293    }
294
295    /// Communicate with `lightningd` to tell it about our options,
296    /// RPC methods and subscribe to hooks, and then process the
297    /// initialization, configuring the plugin.
298    ///
299    /// Returns `None` if we were invoked with `--help` and thus
300    /// should exit after this handshake
301    pub async fn configure(mut self) -> Result<Option<ConfiguredPlugin<S, I, O>>, anyhow::Error> {
302        let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
303
304        // Sadly we need to wrap the output in a mutex in order to
305        // enable early logging, i.e., logging that is done before the
306        // PluginDriver is processing events during the
307        // handshake. Otherwise we could just write the log events to
308        // the event queue and have the PluginDriver be the sole owner
309        // of `Stdout`.
310        let output = Arc::new(Mutex::new(FramedWrite::new(
311            self.output.take().unwrap(),
312            JsonCodec::default(),
313        )));
314
315        // Now configure the logging, so any `log` call is wrapped
316        // in a JSON-RPC notification and sent to Core Lightning
317        if self.logging {
318            crate::logging::init(output.clone()).await?;
319            trace!("Plugin logging initialized");
320        }
321
322        // Read the `getmanifest` message:
323        match input.next().await {
324            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
325                output
326                    .lock()
327                    .await
328                    .send(json!({
329                        "jsonrpc": "2.0",
330                        "result": self.handle_get_manifest(m),
331                        "id": id,
332                    }))
333                    .await?
334            }
335            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
336            None => {
337                return Err(anyhow!(
338                    "Lost connection to lightning expecting getmanifest"
339                ))
340            }
341        };
342        let (init_id, configuration) = match input.next().await {
343            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
344                (id, self.handle_init(m)?)
345            }
346
347            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
348            None => {
349                // If we are being called with --help we will get
350                // disconnected here. That's expected, so don't
351                // complain about it.
352                return Ok(None);
353            }
354        };
355
356        // TODO Split the two hashmaps once we fill in the hook
357        // payload structs in messages.rs
358        let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
359            HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
360        rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));
361
362        let subscriptions =
363            HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
364        let all_subscription = self.wildcard_subscription.map(|s| s.callback);
365
366        // Leave the `init` reply pending, so we can disable based on
367        // the options if required.
368        Ok(Some(ConfiguredPlugin {
369            // The JSON-RPC `id` field so we can reply correctly.
370            init_id,
371            input,
372            output,
373            rpcmethods,
374            setconfig_callback: self.setconfig_callback,
375            notifications: self.notifications,
376            subscriptions,
377            wildcard_subscription: all_subscription,
378            options: self.options,
379            option_values: self.option_values,
380            configuration,
381            hooks: HashMap::new(),
382        }))
383    }
384
385    /// Build and start the plugin loop. This performs the handshake
386    /// and spawns a new task that accepts incoming messages from
387    /// Core Lightning and dispatches them to the handlers. It only
388    /// returns after completing the handshake to ensure that the
389    /// configuration and initialization was successfull.
390    ///
391    /// If `lightningd` was called with `--help` we won't get a
392    /// `Plugin` instance and return `None` instead. This signals that
393    /// we should exit, and not continue running. `start()` returns in
394    /// order to allow user code to perform cleanup if necessary.
395    pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
396        if let Some(cp) = self.configure().await? {
397            Ok(Some(cp.start(state).await?))
398        } else {
399            Ok(None)
400        }
401    }
402
403    fn handle_get_manifest(
404        &mut self,
405        _call: messages::GetManifestCall,
406    ) -> messages::GetManifestResponse {
407        let rpcmethods: Vec<_> = self
408            .rpcmethods
409            .values()
410            .map(|v| messages::RpcMethod {
411                name: v.name.clone(),
412                description: v.description.clone(),
413                usage: v.usage.clone(),
414            })
415            .collect();
416
417        let subscriptions = self
418            .subscriptions
419            .keys()
420            .map(|s| s.clone())
421            .chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
422            .collect();
423
424        let hooks: Vec<messages::Hook> = self
425            .hooks
426            .values()
427            .map(|v| messages::Hook {
428                name: v.name.clone(),
429                before: v.before.clone(),
430                after: v.after.clone(),
431                filters: v.filters.clone(),
432            })
433            .collect();
434
435        messages::GetManifestResponse {
436            options: self.options.values().cloned().collect(),
437            subscriptions,
438            hooks,
439            rpcmethods,
440            notifications: self.notifications.clone(),
441            featurebits: self.featurebits.clone(),
442            dynamic: self.dynamic,
443            nonnumericids: true,
444            custommessages: self.custommessages.clone(),
445        }
446    }
447
448    fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
449        use options::Value as OValue;
450        use serde_json::Value as JValue;
451
452        // Match up the ConfigOptions and fill in their values if we
453        // have a matching entry.
454        for (name, option) in self.options.iter() {
455            let json_value = call.options.get(name);
456            let default_value = option.default();
457
458            let option_value: Option<options::Value> = match (json_value, default_value) {
459                (None, None) => None,
460                (None, Some(default)) => Some(default.clone()),
461                (Some(JValue::Array(a)), _) => match a.first() {
462                    Some(JValue::String(_)) => Some(OValue::StringArray(
463                        a.iter().map(|x| x.as_str().unwrap().to_string()).collect(),
464                    )),
465                    Some(JValue::Number(_)) => Some(OValue::IntegerArray(
466                        a.iter().map(|x| x.as_i64().unwrap()).collect(),
467                    )),
468                    _ => panic!("Array type not supported for option: {}", name),
469                },
470                (Some(JValue::String(s)), _) => Some(OValue::String(s.to_string())),
471                (Some(JValue::Number(i)), _) => Some(OValue::Integer(i.as_i64().unwrap())),
472                (Some(JValue::Bool(b)), _) => Some(OValue::Boolean(*b)),
473                _ => panic!("Type mismatch for option {}", name),
474            };
475
476            self.option_values.insert(name.to_string(), option_value);
477        }
478        Ok(call.configuration)
479    }
480}
481
482impl<S> HookBuilder<S>
483where
484    S: Send + Clone,
485{
486    pub fn new<C, F>(name: &str, callback: C) -> Self
487    where
488        C: Send + Sync + 'static,
489        C: Fn(Plugin<S>, Request) -> F + 'static,
490        F: Future<Output = Response> + Send + 'static,
491    {
492        Self {
493            name: name.to_string(),
494            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
495            before: Vec::new(),
496            after: Vec::new(),
497            filters: None,
498        }
499    }
500
501    pub fn before(mut self, before: Vec<String>) -> Self {
502        self.before = before;
503        self
504    }
505
506    pub fn after(mut self, after: Vec<String>) -> Self {
507        self.after = after;
508        self
509    }
510
511    pub fn filters(mut self, filters: Vec<HookFilter>) -> Self {
512        // Empty Vec would filter everything, must be None instead to not get serialized
513        if filters.is_empty() {
514            self.filters = None;
515        } else {
516            self.filters = Some(filters);
517        }
518        self
519    }
520
521    fn build(self) -> Hook<S> {
522        Hook {
523            callback: self.callback,
524            name: self.name,
525            before: self.before,
526            after: self.after,
527            filters: self.filters,
528        }
529    }
530}
531
532impl<S> RpcMethodBuilder<S>
533where
534    S: Send + Clone,
535{
536    pub fn new<C, F>(name: &str, callback: C) -> Self
537    where
538        C: Send + Sync + 'static,
539        C: Fn(Plugin<S>, Request) -> F + 'static,
540        F: Future<Output = Response> + Send + 'static,
541    {
542        Self {
543            name: name.to_string(),
544            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
545            usage: None,
546            description: None,
547        }
548    }
549
550    pub fn description(mut self, description: &str) -> Self {
551        self.description = Some(description.to_string());
552        self
553    }
554
555    pub fn usage(mut self, usage: &str) -> Self {
556        self.usage = Some(usage.to_string());
557        self
558    }
559
560    fn build(self) -> RpcMethod<S> {
561        RpcMethod {
562            callback: self.callback,
563            name: self.name,
564            description: self.description.unwrap_or_default(),
565            usage: self.usage.unwrap_or_default(),
566        }
567    }
568}
569
// Just some type aliases so we don't get confused in a lisp-like sea
// of parentheses.

/// Payload handed to a callback: a raw JSON value.
type Request = serde_json::Value;
/// What a request callback resolves to: a JSON result or an error.
type Response = Result<serde_json::Value, Error>;
/// Boxed, thread-safe async handler for requests: takes the plugin handle
/// and the request and returns a boxed future resolving to a `Response`.
type AsyncCallback<S> =
    Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
/// Boxed, thread-safe async handler for notifications: like
/// `AsyncCallback`, but the future only reports success or an error.
type AsyncNotificationCallback<S> = Box<
    dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
        + Send
        + Sync,
>;
581
/// A struct collecting the metadata required to register a custom
/// rpcmethod with the main daemon upon init. It'll get deconstructed
/// into just the callback after the init.
struct RpcMethod<S>
where
    S: Clone + Send,
{
    /// Handler invoked for the method.
    callback: AsyncCallback<S>,
    /// Description reported in the `getmanifest` response.
    description: String,
    /// Name of the method, reported in the `getmanifest` response.
    name: String,
    /// Usage string reported in the `getmanifest` response.
    usage: String,
}
594
/// Builder for an RPC method with optional description and usage; pass
/// it to `Builder::rpcmethod_from_builder` to register the method.
pub struct RpcMethodBuilder<S>
where
    S: Clone + Send,
{
    /// Handler invoked for the method.
    callback: AsyncCallback<S>,
    /// Name of the method.
    name: String,
    /// Description; an unset value becomes an empty string in `build`.
    description: Option<String>,
    /// Usage string; an unset value becomes an empty string in `build`.
    usage: Option<String>,
}
604
/// A registered notification subscription; currently just wraps the
/// handler.
struct Subscription<S>
where
    S: Clone + Send,
{
    /// Handler invoked for the notification.
    callback: AsyncNotificationCallback<S>,
}
611
/// A registered hook: the handler plus the metadata reported for it in
/// the `getmanifest` response.
struct Hook<S>
where
    S: Clone + Send,
{
    /// Name of the hook.
    name: String,
    /// Handler invoked for the hook.
    callback: AsyncCallback<S>,
    /// `before` list reported in the manifest.
    before: Vec<String>,
    /// `after` list reported in the manifest.
    after: Vec<String>,
    /// Optional filters; `None` rather than an empty list when there are none.
    filters: Option<Vec<HookFilter>>,
}
622
/// Builder for a hook with optional `before`/`after` lists and filters;
/// pass it to `Builder::hook_from_builder` to register the hook.
pub struct HookBuilder<S>
where
    S: Clone + Send,
{
    /// Name of the hook.
    name: String,
    /// Handler invoked for the hook.
    callback: AsyncCallback<S>,
    /// `before` list; empty unless set via `before()`.
    before: Vec<String>,
    /// `after` list; empty unless set via `after()`.
    after: Vec<String>,
    /// Filters; `None` unless a non-empty list was set via `filters()`.
    filters: Option<Vec<HookFilter>>,
}
633
/// A single filter value attached to a hook registration. Because of
/// `#[serde(untagged)]` it serializes as the bare string or integer.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum HookFilter {
    /// A string filter value.
    Str(String),
    /// An integer filter value.
    Int(i64),
}
640
641impl<S> Plugin<S>
642where
643    S: Clone + Send,
644{
645    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
646        self.option_values
647            .lock()
648            .unwrap()
649            .get(name)
650            .ok_or(anyhow!("No option named {}", name))
651            .cloned()
652    }
653
654    pub fn option<'a, OV: OptionType<'a>>(
655        &self,
656        config_option: &options::ConfigOption<'a, OV>,
657    ) -> Result<OV::OutputValue> {
658        let value = self.option_str(config_option.name())?;
659        Ok(OV::from_value(&value))
660    }
661
662    pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
663        *self
664            .option_values
665            .lock()
666            .unwrap()
667            .get_mut(name)
668            .ok_or(anyhow!("No option named {}", name))? = Some(value);
669        Ok(())
670    }
671
672    pub fn set_option<'a, OV: OptionType<'a>>(
673        &self,
674        config_option: &options::ConfigOption<'a, OV>,
675        value: options::Value,
676    ) -> Result<()> {
677        self.set_option_str(config_option.name(), value)?;
678        Ok(())
679    }
680}
681
682impl<S, I, O> ConfiguredPlugin<S, I, O>
683where
684    S: Send + Clone + Sync + 'static,
685    I: AsyncRead + Send + Unpin + 'static,
686    O: Send + AsyncWrite + Unpin + 'static,
687{
688    #[allow(unused_mut)]
689    pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
690        let output = self.output;
691        let input = self.input;
692        let (wait_handle, _) = tokio::sync::broadcast::channel(1);
693
694        // An MPSC pair used by anything that needs to send messages
695        // to the main daemon.
696        let (sender, receiver) = tokio::sync::mpsc::channel(4);
697
698        let plugin = Plugin {
699            state,
700            options: self.options,
701            option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
702            configuration: self.configuration,
703            wait_handle,
704            sender,
705        };
706
707        let driver = PluginDriver {
708            plugin: plugin.clone(),
709            rpcmethods: self.rpcmethods,
710            setconfig_callback: self.setconfig_callback,
711            hooks: self.hooks,
712            subscriptions: self.subscriptions,
713            wildcard_subscription: self.wildcard_subscription,
714        };
715
716        output
717            .lock()
718            .await
719            .send(json!(
720                {
721                    "jsonrpc": "2.0",
722                    "id": self.init_id,
723            "result": crate::messages::InitResponse{disable: None}
724                }
725            ))
726            .await
727            .context("sending init response")?;
728
729        let joiner = plugin.wait_handle.clone();
730        // Start the PluginDriver to handle plugin IO
731        tokio::spawn(async move {
732            if let Err(e) = driver.run(receiver, input, output).await {
733                log::warn!("Plugin loop returned error {:?}", e);
734            }
735
736            // Now that we have left the reader loop its time to
737            // notify any waiting tasks. This most likely will cause
738            // the main task to exit and the plugin to terminate.
739            joiner.send(())
740        });
741        Ok(plugin)
742    }
743
744    /// Abort the plugin startup. Communicate that we're about to exit
745    /// voluntarily, and this is not an error.
746    #[allow(unused_mut)]
747    pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
748        self.output
749            .lock()
750            .await
751            .send(json!(
752                {
753                    "jsonrpc": "2.0",
754                    "id": self.init_id,
755            "result": crate::messages::InitResponse{
756            disable: Some(reason.to_string())
757            }
758                }
759            ))
760            .await
761            .context("sending init response")?;
762        Ok(())
763    }
764
765    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
766        self.option_values
767            .get(name)
768            .ok_or(anyhow!("No option named '{}'", name))
769            .map(|c| c.clone())
770    }
771
772    pub fn option<'a, OV: OptionType<'a>>(
773        &self,
774        config_option: &options::ConfigOption<'a, OV>,
775    ) -> Result<OV::OutputValue> {
776        let value = self.option_str(config_option.name())?;
777        Ok(OV::from_value(&value))
778    }
779
780    /// return the cln configuration send to the
781    /// plugin after the initialization.
782    pub fn configuration(&self) -> Configuration {
783        self.configuration.clone()
784    }
785}
786
787impl<S> PluginDriver<S>
788where
789    S: Send + Clone,
790{
791    /// Run the plugin until we get a shutdown command.
792    async fn run<I, O>(
793        self,
794        mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
795        mut input: FramedRead<I, JsonRpcCodec>,
796        output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
797    ) -> Result<(), Error>
798    where
799        I: Send + AsyncReadExt + Unpin,
800        O: Send + AsyncWriteExt + Unpin,
801    {
802        loop {
803            // If we encounter any error reading or writing from/to
804            // the master we hand them up, so we can return control to
805            // the user-code, which may require some cleanups or
806            // similar.
807            tokio::select! {
808                    e = self.dispatch_one(&mut input, &self.plugin) => {
809                        if let Err(e) = e {
810                return Err(e)
811                        }
812            },
813            v = receiver.recv() => {
814                        output.lock().await.send(
815                v.context("internal communication error")?
816                        ).await?;
817            },
818                }
819        }
820    }
821
822    /// Dispatch one server-side event and then return. Just so we
823    /// have a nicer looking `select` statement in `run` :-)
824    async fn dispatch_one<I>(
825        &self,
826        input: &mut FramedRead<I, JsonRpcCodec>,
827        plugin: &Plugin<S>,
828    ) -> Result<(), Error>
829    where
830        I: Send + AsyncReadExt + Unpin,
831    {
832        match input.next().await {
833            Some(Ok(msg)) => {
834                trace!("Received a message: {:?}", msg);
835                match msg {
836                    messages::JsonRpc::Request(_id, _p) => {
837                        todo!("This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively.");
838                    }
839                    messages::JsonRpc::Notification(_n) => {
840                        todo!("As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used.")
841                    }
842                    messages::JsonRpc::CustomRequest(id, request) => {
843                        trace!("Dispatching custom method {:?}", request);
844                        let method = request
845                            .get("method")
846                            .context("Missing 'method' in request")?
847                            .as_str()
848                            .context("'method' is not a string")?;
849                        let callback = match method {
850                            name if name.eq("setconfig") => {
851                                self.setconfig_callback.as_ref().ok_or_else(|| {
852                                    anyhow!("No handler for method '{}' registered", method)
853                                })?
854                            }
855                            _ => self.rpcmethods.get(method).with_context(|| {
856                                anyhow!("No handler for method '{}' registered", method)
857                            })?,
858                        };
859                        let params = request
860                            .get("params")
861                            .context("Missing 'params' field in request")?
862                            .clone();
863
864                        let plugin = plugin.clone();
865                        let call = callback(plugin.clone(), params);
866
867                        tokio::spawn(async move {
868                            match call.await {
869                                Ok(v) => plugin
870                                    .sender
871                                    .send(json!({
872                                    "jsonrpc": "2.0",
873                                    "id": id,
874                                    "result": v
875                                    }))
876                                    .await
877                                    .context("returning custom response"),
878                                Err(e) => plugin
879                                    .sender
880                                    .send(json!({
881                                    "jsonrpc": "2.0",
882                                    "id": id,
883                                    "error": parse_error(e.to_string()),
884                                    }))
885                                    .await
886                                    .context("returning custom error"),
887                            }
888                        });
889                        Ok(())
890                    }
891                    messages::JsonRpc::CustomNotification(request) => {
892                        // This code handles notifications
893                        trace!("Dispatching custom notification {:?}", request);
894                        let method = request
895                            .get("method")
896                            .context("Missing 'method' in request")?
897                            .as_str()
898                            .context("'method' is not a string")?;
899
900                        let params = request
901                            .get("params")
902                            .context("Missing 'params' field in request")?;
903
904                        // Send to notification to the wildcard
905                        // subscription "*" it it exists
906                        match &self.wildcard_subscription {
907                            Some(cb) => {
908                                let call = cb(plugin.clone(), params.clone());
909                                tokio::spawn(async move {
910                                    if let Err(e) = call.await {
911                                        log::warn!("Wildcard notification handler error: '{}'", e)
912                                    }
913                                });
914                            }
915                            None => {}
916                        };
917
918                        // Find the appropriate callback and process it
919                        // We'll log a warning if no handler is defined
920                        match self.subscriptions.get(method) {
921                            Some(cb) => {
922                                let call = cb(plugin.clone(), params.clone());
923                                tokio::spawn(async move {
924                                    if let Err(e) = call.await {
925                                        log::warn!("Notification handler error: '{}'", e)
926                                    }
927                                });
928                            }
929                            None => {
930                                if self.wildcard_subscription.is_none() {
931                                    log::warn!(
932                                        "No handler for notification '{}' registered",
933                                        method
934                                    );
935                                }
936                            }
937                        };
938                        Ok(())
939                    }
940                }
941            }
942            Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
943            None => Err(anyhow!("Error reading from master")),
944        }
945    }
946}
947
948impl<S> Plugin<S>
949where
950    S: Clone + Send,
951{
952    pub fn options(&self) -> Vec<UntypedConfigOption> {
953        self.options.values().cloned().collect()
954    }
955    pub fn configuration(&self) -> Configuration {
956        self.configuration.clone()
957    }
958    pub fn state(&self) -> &S {
959        &self.state
960    }
961}
962
963impl<S> Plugin<S>
964where
965    S: Send + Clone,
966{
967    pub async fn send_custom_notification(
968        &self,
969        method: String,
970        v: serde_json::Value,
971    ) -> Result<(), Error> {
972        // Modern has them inside object of same name.
973        // This is deprecated, scheduled for removal 26.09.
974        let mut params = match &v {
975            serde_json::Value::Object(map) => map.clone(),
976            _ => return Err(anyhow::anyhow!("params must be a JSON object")),
977        };
978        params.insert(method.clone(), json!(v));
979
980        self.sender
981            .send(json!({
982                "jsonrpc": "2.0",
983                "method": method,
984                "params": params,
985            }))
986            .await
987            .context("sending custom notification")?;
988        Ok(())
989    }
990
991    /// Wait for plugin shutdown
992    pub async fn join(&self) -> Result<(), Error> {
993        self.wait_handle
994            .subscribe()
995            .recv()
996            .await
997            .context("error waiting for shutdown")
998    }
999
1000    /// Request plugin shutdown
1001    pub fn shutdown(&self) -> Result<(), Error> {
1002        self.wait_handle
1003            .send(())
1004            .context("error waiting for shutdown")?;
1005        Ok(())
1006    }
1007}
1008
/// Identifies one of the four feature-bit categories known to the plugin
/// framework.
///
/// Presumably used to select which set of [`FeatureBits`] a value applies
/// to — confirm against the builder method that consumes this enum.
///
/// The type is a plain tag, so it implements the common value traits
/// (`Debug`, `Clone`, `Copy`, equality and hashing) to be usable in logs,
/// comparisons and as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FeatureBitsKind {
    /// The `Node` category.
    Node,
    /// The `Channel` category.
    Channel,
    /// The `Invoice` category.
    Invoice,
    /// The `Init` category.
    Init,
}
1015
1016#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
1017struct RpcError {
1018    pub code: Option<i32>,
1019    pub message: String,
1020    pub data: Option<serde_json::Value>,
1021}
1022fn parse_error(error: String) -> RpcError {
1023    match serde_json::from_str::<RpcError>(&error) {
1024        Ok(o) => o,
1025        Err(_) => RpcError {
1026            code: Some(-32700),
1027            message: error,
1028            data: None,
1029        },
1030    }
1031}
1032
#[cfg(test)]
mod test {
    use super::*;

    /// Smoke test: a `Builder` can be constructed over stdin/stdout and
    /// `start` can be called with a unit state. The value returned by
    /// `start` is discarded, so nothing beyond construction is exercised.
    #[tokio::test]
    async fn init() {
        let plugin_builder = Builder::new(tokio::io::stdin(), tokio::io::stdout());
        let unit_state = ();
        let _ = plugin_builder.start(unit_state);
    }
}