1use crate::codec::{JsonCodec, JsonRpcCodec};
2pub use anyhow::anyhow;
3use anyhow::{Context, Result};
4use futures::sink::SinkExt;
5use serde::Serialize;
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7extern crate log;
8use log::trace;
9use messages::{Configuration, FeatureBits, NotificationTopic};
10use options::{OptionType, UntypedConfigOption};
11use std::collections::HashMap;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use tokio::io::{AsyncRead, AsyncWrite};
16use tokio::sync::Mutex;
17use tokio_stream::StreamExt;
18use tokio_util::codec::FramedRead;
19use tokio_util::codec::FramedWrite;
20
21mod codec;
22mod logging;
23pub mod messages;
24
25#[macro_use]
26extern crate serde_json;
27
28pub mod options;
29
30pub type Error = anyhow::Error;
35
/// Collects everything a plugin registers before it starts talking to
/// `lightningd`: options, RPC methods, hooks, subscriptions and the flags that
/// are reported in the `getmanifest` response.
///
/// `S` is the user state that ends up inside [`Plugin`]; `I` and `O` are the
/// streams requests are read from and responses are written to.
pub struct Builder<S, I, O>
where
    I: AsyncRead + Unpin,
    O: Send + AsyncWrite + Unpin,
    S: Clone + Send,
{
    /// Request stream; wrapped in `Option` so `configure` can `take()` it.
    input: Option<I>,
    /// Response stream; wrapped in `Option` so `configure` can `take()` it.
    output: Option<O>,

    /// Registered hooks, keyed by hook name.
    hooks: HashMap<String, Hook<S>>,
    /// Registered configuration options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    /// Option values, filled in while handling the `init` request.
    option_values: HashMap<String, Option<options::Value>>,
    /// Registered RPC methods, keyed by method name.
    rpcmethods: HashMap<String, RpcMethod<S>>,
    /// Optional callback registered through `setconfig_callback`.
    setconfig_callback: Option<AsyncCallback<S>>,
    /// Notification subscriptions, keyed by topic.
    subscriptions: HashMap<String, Subscription<S>>,
    /// Subscription registered for the `"*"` topic, if any.
    wildcard_subscription: Option<Subscription<S>>,
    /// Notification topics reported in the manifest.
    notifications: Vec<NotificationTopic>,
    /// Custom message numbers reported in the manifest.
    custommessages: Vec<u16>,
    /// Feature bits reported in the manifest.
    featurebits: FeatureBits,
    /// Value of the manifest's `dynamic` flag.
    dynamic: bool,
    /// When `true`, `configure` initializes the logging module on the output.
    logging: bool,
}
61
/// A plugin that has answered `getmanifest` and received `init`, but has not
/// yet replied to `init`.
///
/// It is consumed either by `start`, which replies and begins serving
/// requests, or by `disable`, which replies with a reason for being disabled.
pub struct ConfiguredPlugin<S, I, O>
where
    S: Clone + Send,
{
    /// Id of the pending `init` request; echoed in the init response.
    init_id: serde_json::Value,
    /// Framed request stream.
    input: FramedRead<I, JsonRpcCodec>,
    /// Framed response sink, shared behind an async mutex.
    output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
    /// Registered configuration options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    /// Option values resolved while handling `init`.
    option_values: HashMap<String, Option<options::Value>>,
    /// Configuration received with the `init` request.
    configuration: Configuration,
    /// Callbacks keyed by method name; `Builder::configure` stores both RPC
    /// method and hook callbacks here.
    rpcmethods: HashMap<String, AsyncCallback<S>>,
    /// Optional callback registered through `Builder::setconfig_callback`.
    setconfig_callback: Option<AsyncCallback<S>>,
    /// `Builder::configure` always leaves this map empty (hooks are merged
    /// into `rpcmethods`).
    hooks: HashMap<String, AsyncCallback<S>>,
    /// Notification callbacks, keyed by topic.
    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
    /// Callback registered for the `"*"` topic, if any.
    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
    /// Notification topics carried over from the builder; currently unread.
    #[allow(dead_code)] notifications: Vec<NotificationTopic>,
}
84
/// Owns the callbacks and runs the request loop (see `run`), dispatching each
/// incoming message to the matching callback.
struct PluginDriver<S>
where
    S: Send + Clone,
{
    /// Handle that is cloned and passed to every callback invocation.
    plugin: Plugin<S>,
    /// Request callbacks, keyed by method name.
    rpcmethods: HashMap<String, AsyncCallback<S>>,
    /// Callback used for the `setconfig` method, if one was registered.
    setconfig_callback: Option<AsyncCallback<S>>,

    /// Stored but not read by the driver (hence the `dead_code` allowance).
    #[allow(dead_code)] hooks: HashMap<String, AsyncCallback<S>>,
    /// Notification callbacks, keyed by topic.
    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
    /// Callback invoked for every custom notification, if registered.
    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
}
103
/// Cloneable handle to a started plugin; a clone is handed to every callback.
#[derive(Clone)]
pub struct Plugin<S>
where
    S: Clone + Send,
{
    /// User-provided state, exposed through `state()`.
    state: S,
    /// Registered configuration options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    /// Current option values, shared between all clones of this handle.
    option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
    /// Configuration received with the `init` request.
    configuration: Configuration,
    /// Broadcast channel used by `join` (receiving side) and `shutdown`
    /// (sending side); the driver task also sends on it when its loop ends.
    wait_handle: tokio::sync::broadcast::Sender<()>,

    /// Queue of outgoing JSON messages; the driver loop writes them to the
    /// output sink.
    sender: tokio::sync::mpsc::Sender<serde_json::Value>,
}
121
122impl<S, I, O> Builder<S, I, O>
123where
124 O: Send + AsyncWrite + Unpin + 'static,
125 S: Clone + Sync + Send + 'static,
126 I: AsyncRead + Send + Unpin + 'static,
127{
128 pub fn new(input: I, output: O) -> Self {
129 Self {
130 input: Some(input),
131 output: Some(output),
132 hooks: HashMap::new(),
133 subscriptions: HashMap::new(),
134 wildcard_subscription: None,
135 options: HashMap::new(),
136 option_values: HashMap::new(),
139 rpcmethods: HashMap::new(),
140 setconfig_callback: None,
141 notifications: vec![],
142 featurebits: FeatureBits::default(),
143 dynamic: false,
144 custommessages: vec![],
145 logging: true,
146 }
147 }
148
149 pub fn option<'a, V: options::OptionType<'a>>(
150 mut self,
151 opt: options::ConfigOption<'a, V>,
152 ) -> Builder<S, I, O> {
153 self.options.insert(opt.name().to_string(), opt.build());
154 self
155 }
156
157 pub fn notification(mut self, notif: messages::NotificationTopic) -> Builder<S, I, O> {
158 self.notifications.push(notif);
159 self
160 }
161
162 pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
181 where
182 C: Send + Sync + 'static,
183 C: Fn(Plugin<S>, Request) -> F + 'static,
184 F: Future<Output = Result<(), Error>> + Send + 'static,
185 {
186 let subscription = Subscription {
187 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
188 };
189
190 if topic == "*" {
191 self.wildcard_subscription = Some(subscription);
192 } else {
193 self.subscriptions.insert(topic.to_string(), subscription);
194 };
195 self
196 }
197
198 pub fn hook<C, F>(mut self, hookname: &str, callback: C) -> Self
200 where
201 C: Send + Sync + 'static,
202 C: Fn(Plugin<S>, Request) -> F + 'static,
203 F: Future<Output = Response> + Send + 'static,
204 {
205 self.hooks.insert(
206 hookname.to_string(),
207 Hook {
208 name: hookname.to_string(),
209 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
210 before: Vec::new(),
211 after: Vec::new(),
212 filters: None,
213 },
214 );
215 self
216 }
217
218 pub fn hook_from_builder(mut self, hook: HookBuilder<S>) -> Builder<S, I, O> {
219 self.hooks.insert(hook.name.clone(), hook.build());
220 self
221 }
222
223 pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
226 where
227 C: Send + Sync + 'static,
228 C: Fn(Plugin<S>, Request) -> F + 'static,
229 F: Future<Output = Response> + Send + 'static,
230 {
231 self.rpcmethods.insert(
232 name.to_string(),
233 RpcMethod {
234 name: name.to_string(),
235 description: description.to_string(),
236 usage: String::default(),
237 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
238 },
239 );
240 self
241 }
242
243 pub fn rpcmethod_from_builder(mut self, rpc_method: RpcMethodBuilder<S>) -> Builder<S, I, O> {
244 self.rpcmethods
245 .insert(rpc_method.name.to_string(), rpc_method.build());
246 self
247 }
248
249 pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
251 where
252 C: Send + Sync + 'static,
253 C: Fn(Plugin<S>, Request) -> F + 'static,
254 F: Future<Output = Response> + Send + 'static,
255 {
256 self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
257 self
258 }
259
260 pub fn dynamic(mut self) -> Builder<S, I, O> {
262 self.dynamic = true;
263 self
264 }
265
266 pub fn featurebits(mut self, kind: FeatureBitsKind, hex: String) -> Self {
268 match kind {
269 FeatureBitsKind::Node => self.featurebits.node = Some(hex),
270 FeatureBitsKind::Channel => self.featurebits.channel = Some(hex),
271 FeatureBitsKind::Init => self.featurebits.init = Some(hex),
272 FeatureBitsKind::Invoice => self.featurebits.invoice = Some(hex),
273 }
274 self
275 }
276
277 pub fn with_logging(mut self, log: bool) -> Builder<S, I, O> {
284 self.logging = log;
285 self
286 }
287
288 pub fn custommessages(mut self, custommessages: Vec<u16>) -> Self {
291 self.custommessages = custommessages;
292 self
293 }
294
295 pub async fn configure(mut self) -> Result<Option<ConfiguredPlugin<S, I, O>>, anyhow::Error> {
302 let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
303
304 let output = Arc::new(Mutex::new(FramedWrite::new(
311 self.output.take().unwrap(),
312 JsonCodec::default(),
313 )));
314
315 if self.logging {
318 crate::logging::init(output.clone()).await?;
319 trace!("Plugin logging initialized");
320 }
321
322 match input.next().await {
324 Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
325 output
326 .lock()
327 .await
328 .send(json!({
329 "jsonrpc": "2.0",
330 "result": self.handle_get_manifest(m),
331 "id": id,
332 }))
333 .await?
334 }
335 Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
336 None => {
337 return Err(anyhow!(
338 "Lost connection to lightning expecting getmanifest"
339 ))
340 }
341 };
342 let (init_id, configuration) = match input.next().await {
343 Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
344 (id, self.handle_init(m)?)
345 }
346
347 Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
348 None => {
349 return Ok(None);
353 }
354 };
355
356 let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
359 HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
360 rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));
361
362 let subscriptions =
363 HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
364 let all_subscription = self.wildcard_subscription.map(|s| s.callback);
365
366 Ok(Some(ConfiguredPlugin {
369 init_id,
371 input,
372 output,
373 rpcmethods,
374 setconfig_callback: self.setconfig_callback,
375 notifications: self.notifications,
376 subscriptions,
377 wildcard_subscription: all_subscription,
378 options: self.options,
379 option_values: self.option_values,
380 configuration,
381 hooks: HashMap::new(),
382 }))
383 }
384
385 pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
396 if let Some(cp) = self.configure().await? {
397 Ok(Some(cp.start(state).await?))
398 } else {
399 Ok(None)
400 }
401 }
402
403 fn handle_get_manifest(
404 &mut self,
405 _call: messages::GetManifestCall,
406 ) -> messages::GetManifestResponse {
407 let rpcmethods: Vec<_> = self
408 .rpcmethods
409 .values()
410 .map(|v| messages::RpcMethod {
411 name: v.name.clone(),
412 description: v.description.clone(),
413 usage: v.usage.clone(),
414 })
415 .collect();
416
417 let subscriptions = self
418 .subscriptions
419 .keys()
420 .map(|s| s.clone())
421 .chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
422 .collect();
423
424 let hooks: Vec<messages::Hook> = self
425 .hooks
426 .values()
427 .map(|v| messages::Hook {
428 name: v.name.clone(),
429 before: v.before.clone(),
430 after: v.after.clone(),
431 filters: v.filters.clone(),
432 })
433 .collect();
434
435 messages::GetManifestResponse {
436 options: self.options.values().cloned().collect(),
437 subscriptions,
438 hooks,
439 rpcmethods,
440 notifications: self.notifications.clone(),
441 featurebits: self.featurebits.clone(),
442 dynamic: self.dynamic,
443 nonnumericids: true,
444 custommessages: self.custommessages.clone(),
445 }
446 }
447
448 fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
449 use options::Value as OValue;
450 use serde_json::Value as JValue;
451
452 for (name, option) in self.options.iter() {
455 let json_value = call.options.get(name);
456 let default_value = option.default();
457
458 let option_value: Option<options::Value> = match (json_value, default_value) {
459 (None, None) => None,
460 (None, Some(default)) => Some(default.clone()),
461 (Some(JValue::Array(a)), _) => match a.first() {
462 Some(JValue::String(_)) => Some(OValue::StringArray(
463 a.iter().map(|x| x.as_str().unwrap().to_string()).collect(),
464 )),
465 Some(JValue::Number(_)) => Some(OValue::IntegerArray(
466 a.iter().map(|x| x.as_i64().unwrap()).collect(),
467 )),
468 _ => panic!("Array type not supported for option: {}", name),
469 },
470 (Some(JValue::String(s)), _) => Some(OValue::String(s.to_string())),
471 (Some(JValue::Number(i)), _) => Some(OValue::Integer(i.as_i64().unwrap())),
472 (Some(JValue::Bool(b)), _) => Some(OValue::Boolean(*b)),
473 _ => panic!("Type mismatch for option {}", name),
474 };
475
476 self.option_values.insert(name.to_string(), option_value);
477 }
478 Ok(call.configuration)
479 }
480}
481
482impl<S> HookBuilder<S>
483where
484 S: Send + Clone,
485{
486 pub fn new<C, F>(name: &str, callback: C) -> Self
487 where
488 C: Send + Sync + 'static,
489 C: Fn(Plugin<S>, Request) -> F + 'static,
490 F: Future<Output = Response> + Send + 'static,
491 {
492 Self {
493 name: name.to_string(),
494 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
495 before: Vec::new(),
496 after: Vec::new(),
497 filters: None,
498 }
499 }
500
501 pub fn before(mut self, before: Vec<String>) -> Self {
502 self.before = before;
503 self
504 }
505
506 pub fn after(mut self, after: Vec<String>) -> Self {
507 self.after = after;
508 self
509 }
510
511 pub fn filters(mut self, filters: Vec<HookFilter>) -> Self {
512 if filters.is_empty() {
514 self.filters = None;
515 } else {
516 self.filters = Some(filters);
517 }
518 self
519 }
520
521 fn build(self) -> Hook<S> {
522 Hook {
523 callback: self.callback,
524 name: self.name,
525 before: self.before,
526 after: self.after,
527 filters: self.filters,
528 }
529 }
530}
531
532impl<S> RpcMethodBuilder<S>
533where
534 S: Send + Clone,
535{
536 pub fn new<C, F>(name: &str, callback: C) -> Self
537 where
538 C: Send + Sync + 'static,
539 C: Fn(Plugin<S>, Request) -> F + 'static,
540 F: Future<Output = Response> + Send + 'static,
541 {
542 Self {
543 name: name.to_string(),
544 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
545 usage: None,
546 description: None,
547 }
548 }
549
550 pub fn description(mut self, description: &str) -> Self {
551 self.description = Some(description.to_string());
552 self
553 }
554
555 pub fn usage(mut self, usage: &str) -> Self {
556 self.usage = Some(usage.to_string());
557 self
558 }
559
560 fn build(self) -> RpcMethod<S> {
561 RpcMethod {
562 callback: self.callback,
563 name: self.name,
564 description: self.description.unwrap_or_default(),
565 usage: self.usage.unwrap_or_default(),
566 }
567 }
568}
569
/// Parameters handed to a callback, as raw JSON.
type Request = serde_json::Value;
/// Outcome of a request callback: a JSON result or an error.
type Response = Result<serde_json::Value, Error>;
/// Boxed callback for RPC methods and hooks: takes a plugin handle and the
/// request, and returns a boxed future resolving to a `Response`.
type AsyncCallback<S> =
    Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
/// Boxed callback for notifications: like `AsyncCallback`, but the future
/// resolves to `Result<(), Error>` because no result value is produced.
type AsyncNotificationCallback<S> = Box<
    dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
        + Send
        + Sync,
>;
581
/// Internal record of a registered RPC method.
struct RpcMethod<S>
where
    S: Clone + Send,
{
    /// Callback invoked when the method is called.
    callback: AsyncCallback<S>,
    /// Description reported in the manifest.
    description: String,
    /// Method name reported in the manifest.
    name: String,
    /// Usage string reported in the manifest.
    usage: String,
}
594
/// Builder for an RPC method with an optional description and usage string;
/// passed to `Builder::rpcmethod_from_builder`.
pub struct RpcMethodBuilder<S>
where
    S: Clone + Send,
{
    /// Callback invoked when the method is called.
    callback: AsyncCallback<S>,
    /// Method name.
    name: String,
    /// Description; an empty string is used when unset.
    description: Option<String>,
    /// Usage string; an empty string is used when unset.
    usage: Option<String>,
}
604
/// Internal record of a notification subscription.
struct Subscription<S>
where
    S: Clone + Send,
{
    /// Callback invoked for each matching notification.
    callback: AsyncNotificationCallback<S>,
}
611
/// Internal record of a registered hook.
struct Hook<S>
where
    S: Clone + Send,
{
    /// Hook name reported in the manifest.
    name: String,
    /// Callback invoked when the hook is called.
    callback: AsyncCallback<S>,
    /// `before` list reported in the manifest.
    before: Vec<String>,
    /// `after` list reported in the manifest.
    after: Vec<String>,
    /// Optional filters reported in the manifest.
    filters: Option<Vec<HookFilter>>,
}
622
/// Builder for a hook with optional `before`/`after` lists and filters;
/// passed to `Builder::hook_from_builder`.
pub struct HookBuilder<S>
where
    S: Clone + Send,
{
    /// Hook name.
    name: String,
    /// Callback invoked when the hook is called.
    callback: AsyncCallback<S>,
    /// `before` list; empty by default.
    before: Vec<String>,
    /// `after` list; empty by default.
    after: Vec<String>,
    /// Filters; `None` by default and when an empty list is supplied.
    filters: Option<Vec<HookFilter>>,
}
633
/// A single hook filter value, either a string or an integer.
///
/// Because of `#[serde(untagged)]` the value is serialized bare, without a
/// variant tag.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum HookFilter {
    /// String filter value.
    Str(String),
    /// Integer filter value.
    Int(i64),
}
640
641impl<S> Plugin<S>
642where
643 S: Clone + Send,
644{
645 pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
646 self.option_values
647 .lock()
648 .unwrap()
649 .get(name)
650 .ok_or(anyhow!("No option named {}", name))
651 .cloned()
652 }
653
654 pub fn option<'a, OV: OptionType<'a>>(
655 &self,
656 config_option: &options::ConfigOption<'a, OV>,
657 ) -> Result<OV::OutputValue> {
658 let value = self.option_str(config_option.name())?;
659 Ok(OV::from_value(&value))
660 }
661
662 pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
663 *self
664 .option_values
665 .lock()
666 .unwrap()
667 .get_mut(name)
668 .ok_or(anyhow!("No option named {}", name))? = Some(value);
669 Ok(())
670 }
671
672 pub fn set_option<'a, OV: OptionType<'a>>(
673 &self,
674 config_option: &options::ConfigOption<'a, OV>,
675 value: options::Value,
676 ) -> Result<()> {
677 self.set_option_str(config_option.name(), value)?;
678 Ok(())
679 }
680}
681
682impl<S, I, O> ConfiguredPlugin<S, I, O>
683where
684 S: Send + Clone + Sync + 'static,
685 I: AsyncRead + Send + Unpin + 'static,
686 O: Send + AsyncWrite + Unpin + 'static,
687{
688 #[allow(unused_mut)]
689 pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
690 let output = self.output;
691 let input = self.input;
692 let (wait_handle, _) = tokio::sync::broadcast::channel(1);
693
694 let (sender, receiver) = tokio::sync::mpsc::channel(4);
697
698 let plugin = Plugin {
699 state,
700 options: self.options,
701 option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
702 configuration: self.configuration,
703 wait_handle,
704 sender,
705 };
706
707 let driver = PluginDriver {
708 plugin: plugin.clone(),
709 rpcmethods: self.rpcmethods,
710 setconfig_callback: self.setconfig_callback,
711 hooks: self.hooks,
712 subscriptions: self.subscriptions,
713 wildcard_subscription: self.wildcard_subscription,
714 };
715
716 output
717 .lock()
718 .await
719 .send(json!(
720 {
721 "jsonrpc": "2.0",
722 "id": self.init_id,
723 "result": crate::messages::InitResponse{disable: None}
724 }
725 ))
726 .await
727 .context("sending init response")?;
728
729 let joiner = plugin.wait_handle.clone();
730 tokio::spawn(async move {
732 if let Err(e) = driver.run(receiver, input, output).await {
733 log::warn!("Plugin loop returned error {:?}", e);
734 }
735
736 joiner.send(())
740 });
741 Ok(plugin)
742 }
743
744 #[allow(unused_mut)]
747 pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
748 self.output
749 .lock()
750 .await
751 .send(json!(
752 {
753 "jsonrpc": "2.0",
754 "id": self.init_id,
755 "result": crate::messages::InitResponse{
756 disable: Some(reason.to_string())
757 }
758 }
759 ))
760 .await
761 .context("sending init response")?;
762 Ok(())
763 }
764
765 pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
766 self.option_values
767 .get(name)
768 .ok_or(anyhow!("No option named '{}'", name))
769 .map(|c| c.clone())
770 }
771
772 pub fn option<'a, OV: OptionType<'a>>(
773 &self,
774 config_option: &options::ConfigOption<'a, OV>,
775 ) -> Result<OV::OutputValue> {
776 let value = self.option_str(config_option.name())?;
777 Ok(OV::from_value(&value))
778 }
779
780 pub fn configuration(&self) -> Configuration {
783 self.configuration.clone()
784 }
785}
786
787impl<S> PluginDriver<S>
788where
789 S: Send + Clone,
790{
791 async fn run<I, O>(
793 self,
794 mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
795 mut input: FramedRead<I, JsonRpcCodec>,
796 output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
797 ) -> Result<(), Error>
798 where
799 I: Send + AsyncReadExt + Unpin,
800 O: Send + AsyncWriteExt + Unpin,
801 {
802 loop {
803 tokio::select! {
808 e = self.dispatch_one(&mut input, &self.plugin) => {
809 if let Err(e) = e {
810 return Err(e)
811 }
812 },
813 v = receiver.recv() => {
814 output.lock().await.send(
815 v.context("internal communication error")?
816 ).await?;
817 },
818 }
819 }
820 }
821
822 async fn dispatch_one<I>(
825 &self,
826 input: &mut FramedRead<I, JsonRpcCodec>,
827 plugin: &Plugin<S>,
828 ) -> Result<(), Error>
829 where
830 I: Send + AsyncReadExt + Unpin,
831 {
832 match input.next().await {
833 Some(Ok(msg)) => {
834 trace!("Received a message: {:?}", msg);
835 match msg {
836 messages::JsonRpc::Request(_id, _p) => {
837 todo!("This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively.");
838 }
839 messages::JsonRpc::Notification(_n) => {
840 todo!("As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used.")
841 }
842 messages::JsonRpc::CustomRequest(id, request) => {
843 trace!("Dispatching custom method {:?}", request);
844 let method = request
845 .get("method")
846 .context("Missing 'method' in request")?
847 .as_str()
848 .context("'method' is not a string")?;
849 let callback = match method {
850 name if name.eq("setconfig") => {
851 self.setconfig_callback.as_ref().ok_or_else(|| {
852 anyhow!("No handler for method '{}' registered", method)
853 })?
854 }
855 _ => self.rpcmethods.get(method).with_context(|| {
856 anyhow!("No handler for method '{}' registered", method)
857 })?,
858 };
859 let params = request
860 .get("params")
861 .context("Missing 'params' field in request")?
862 .clone();
863
864 let plugin = plugin.clone();
865 let call = callback(plugin.clone(), params);
866
867 tokio::spawn(async move {
868 match call.await {
869 Ok(v) => plugin
870 .sender
871 .send(json!({
872 "jsonrpc": "2.0",
873 "id": id,
874 "result": v
875 }))
876 .await
877 .context("returning custom response"),
878 Err(e) => plugin
879 .sender
880 .send(json!({
881 "jsonrpc": "2.0",
882 "id": id,
883 "error": parse_error(e.to_string()),
884 }))
885 .await
886 .context("returning custom error"),
887 }
888 });
889 Ok(())
890 }
891 messages::JsonRpc::CustomNotification(request) => {
892 trace!("Dispatching custom notification {:?}", request);
894 let method = request
895 .get("method")
896 .context("Missing 'method' in request")?
897 .as_str()
898 .context("'method' is not a string")?;
899
900 let params = request
901 .get("params")
902 .context("Missing 'params' field in request")?;
903
904 match &self.wildcard_subscription {
907 Some(cb) => {
908 let call = cb(plugin.clone(), params.clone());
909 tokio::spawn(async move {
910 if let Err(e) = call.await {
911 log::warn!("Wildcard notification handler error: '{}'", e)
912 }
913 });
914 }
915 None => {}
916 };
917
918 match self.subscriptions.get(method) {
921 Some(cb) => {
922 let call = cb(plugin.clone(), params.clone());
923 tokio::spawn(async move {
924 if let Err(e) = call.await {
925 log::warn!("Notification handler error: '{}'", e)
926 }
927 });
928 }
929 None => {
930 if self.wildcard_subscription.is_none() {
931 log::warn!(
932 "No handler for notification '{}' registered",
933 method
934 );
935 }
936 }
937 };
938 Ok(())
939 }
940 }
941 }
942 Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
943 None => Err(anyhow!("Error reading from master")),
944 }
945 }
946}
947
948impl<S> Plugin<S>
949where
950 S: Clone + Send,
951{
952 pub fn options(&self) -> Vec<UntypedConfigOption> {
953 self.options.values().cloned().collect()
954 }
955 pub fn configuration(&self) -> Configuration {
956 self.configuration.clone()
957 }
958 pub fn state(&self) -> &S {
959 &self.state
960 }
961}
962
963impl<S> Plugin<S>
964where
965 S: Send + Clone,
966{
967 pub async fn send_custom_notification(
968 &self,
969 method: String,
970 v: serde_json::Value,
971 ) -> Result<(), Error> {
972 let mut params = match &v {
975 serde_json::Value::Object(map) => map.clone(),
976 _ => return Err(anyhow::anyhow!("params must be a JSON object")),
977 };
978 params.insert(method.clone(), json!(v));
979
980 self.sender
981 .send(json!({
982 "jsonrpc": "2.0",
983 "method": method,
984 "params": params,
985 }))
986 .await
987 .context("sending custom notification")?;
988 Ok(())
989 }
990
991 pub async fn join(&self) -> Result<(), Error> {
993 self.wait_handle
994 .subscribe()
995 .recv()
996 .await
997 .context("error waiting for shutdown")
998 }
999
1000 pub fn shutdown(&self) -> Result<(), Error> {
1002 self.wait_handle
1003 .send(())
1004 .context("error waiting for shutdown")?;
1005 Ok(())
1006 }
1007}
1008
/// Selects which field of the manifest's feature bits `Builder::featurebits`
/// writes to.
pub enum FeatureBitsKind {
    /// Writes the `node` field.
    Node,
    /// Writes the `channel` field.
    Channel,
    /// Writes the `invoice` field.
    Invoice,
    /// Writes the `init` field.
    Init,
}
1015
/// Error object placed in the `error` member of a reply when a request
/// callback fails.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
struct RpcError {
    /// Optional numeric error code.
    pub code: Option<i32>,
    /// Error message.
    pub message: String,
    /// Optional additional JSON data.
    pub data: Option<serde_json::Value>,
}
1022fn parse_error(error: String) -> RpcError {
1023 match serde_json::from_str::<RpcError>(&error) {
1024 Ok(o) => o,
1025 Err(_) => RpcError {
1026 code: Some(-32700),
1027 message: error,
1028 data: None,
1029 },
1030 }
1031}
1032
#[cfg(test)]
mod test {
    use super::*;

    // Checks that a builder over stdin/stdout with unit state can be
    // constructed and that `start` can be called on it.
    #[tokio::test]
    async fn init() {
        let state = ();
        let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout());
        // The future returned by `start` is dropped without being awaited,
        // so nothing is read from stdin or written to stdout here.
        let _ = builder.start(state);
    }
}
1043}