1use crate::codec::{JsonCodec, JsonRpcCodec};
2pub use anyhow::anyhow;
3use anyhow::{Context, Result};
4use futures::sink::SinkExt;
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8extern crate log;
9use log::trace;
10use messages::{Configuration, FeatureBits, NotificationTopic};
11use options::{OptionType, UntypedConfigOption};
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use tokio::io::{AsyncRead, AsyncWrite};
17use tokio::sync::Mutex;
18use tokio_stream::StreamExt;
19use tokio_util::codec::FramedRead;
20use tokio_util::codec::FramedWrite;
21
22mod codec;
23mod logging;
24pub mod messages;
25
26#[macro_use]
27extern crate serde_json;
28
29pub mod options;
30
31pub type Error = anyhow::Error;
36
37pub struct Builder<S, I, O>
39where
40 I: AsyncRead + Unpin,
41 O: Send + AsyncWrite + Unpin,
42 S: Clone + Send,
43{
44 input: Option<I>,
45 output: Option<O>,
46
47 hooks: HashMap<String, Hook<S>>,
48 options: HashMap<String, UntypedConfigOption>,
49 option_values: HashMap<String, Option<options::Value>>,
50 rpcmethods: HashMap<String, RpcMethod<S>>,
51 setconfig_callback: Option<AsyncCallback<S>>,
52 subscriptions: HashMap<String, Subscription<S>>,
53 wildcard_subscription: Option<Subscription<S>>,
55 notifications: Vec<NotificationTopic>,
56 custommessages: Vec<u16>,
57 featurebits: FeatureBits,
58 dynamic: bool,
59 logging: bool,
61}
62
63pub struct ConfiguredPlugin<S, I, O>
68where
69 S: Clone + Send,
70{
71 init_id: serde_json::Value,
72 input: FramedRead<I, JsonRpcCodec>,
73 output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
74 options: HashMap<String, UntypedConfigOption>,
75 option_values: HashMap<String, Option<options::Value>>,
76 configuration: Configuration,
77 rpcmethods: HashMap<String, AsyncCallback<S>>,
78 setconfig_callback: Option<AsyncCallback<S>>,
79 hooks: HashMap<String, AsyncCallback<S>>,
80 subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
81 wildcard_subscription: Option<AsyncNotificationCallback<S>>,
82 #[allow(dead_code)] notifications: Vec<NotificationTopic>,
84}
85
86struct PluginDriver<S>
92where
93 S: Send + Clone,
94{
95 plugin: Plugin<S>,
96 rpcmethods: HashMap<String, AsyncCallback<S>>,
97 setconfig_callback: Option<AsyncCallback<S>>,
98
99 #[allow(dead_code)] hooks: HashMap<String, AsyncCallback<S>>,
101 subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
102 wildcard_subscription: Option<AsyncNotificationCallback<S>>,
103}
104
105#[derive(Clone)]
106pub struct Plugin<S>
107where
108 S: Clone + Send,
109{
110 state: S,
112 options: HashMap<String, UntypedConfigOption>,
114 option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
115 configuration: Configuration,
117 wait_handle: tokio::sync::broadcast::Sender<()>,
119
120 sender: tokio::sync::mpsc::Sender<serde_json::Value>,
121}
122
123impl<S, I, O> Builder<S, I, O>
124where
125 O: Send + AsyncWrite + Unpin + 'static,
126 S: Clone + Sync + Send + 'static,
127 I: AsyncRead + Send + Unpin + 'static,
128{
129 pub fn new(input: I, output: O) -> Self {
130 Self {
131 input: Some(input),
132 output: Some(output),
133 hooks: HashMap::new(),
134 subscriptions: HashMap::new(),
135 wildcard_subscription: None,
136 options: HashMap::new(),
137 option_values: HashMap::new(),
140 rpcmethods: HashMap::new(),
141 setconfig_callback: None,
142 notifications: vec![],
143 featurebits: FeatureBits::default(),
144 dynamic: false,
145 custommessages: vec![],
146 logging: true,
147 }
148 }
149
150 pub fn option<'a, V: options::OptionType<'a>>(
151 mut self,
152 opt: options::ConfigOption<'a, V>,
153 ) -> Builder<S, I, O> {
154 self.options.insert(opt.name().to_string(), opt.build());
155 self
156 }
157
158 pub fn notification(mut self, notif: messages::NotificationTopic) -> Builder<S, I, O> {
159 self.notifications.push(notif);
160 self
161 }
162
163 pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
182 where
183 C: Send + Sync + 'static,
184 C: Fn(Plugin<S>, Request) -> F + 'static,
185 F: Future<Output = Result<(), Error>> + Send + 'static,
186 {
187 let subscription = Subscription {
188 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
189 };
190
191 if topic == "*" {
192 self.wildcard_subscription = Some(subscription);
193 } else {
194 self.subscriptions.insert(topic.to_string(), subscription);
195 };
196 self
197 }
198
199 pub fn hook<C, F>(mut self, hookname: &str, callback: C) -> Self
203 where
204 C: Send + Sync + 'static,
205 C: Fn(Plugin<S>, Request) -> F + 'static,
206 F: Future<Output = Response> + Send + 'static,
207 {
208 self.hooks.insert(
209 hookname.to_string(),
210 Hook {
211 name: hookname.to_string(),
212 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
213 before: Vec::new(),
214 after: Vec::new(),
215 filters: None,
216 },
217 );
218 self
219 }
220
221 pub fn hook_from_builder(mut self, hook: HookBuilder<S>) -> Builder<S, I, O> {
225 self.hooks.insert(hook.name.clone(), hook.build());
226 self
227 }
228
229 pub fn hook_typed<C, F, Req, Resp>(mut self, hookname: &str, callback: C) -> Self
235 where
236 C: Send + Sync + 'static,
237 C: Fn(Plugin<S>, Req) -> F + 'static,
238 F: Future<Output = Result<Resp, Error>> + Send + 'static,
239 Req: DeserializeOwned + Send + 'static,
240 Resp: Serialize + Send + 'static,
241 {
242 let hookname = hookname.to_string();
243 self.hooks.insert(
244 hookname.clone(),
245 Hook {
246 name: hookname.clone(),
247 callback: Box::new(move |p, r| {
248 let typed_req = serde_json::from_value(r).unwrap_or_else(|e| {
249 let error = format!(
250 "cln-plugin: hook '{hookname}' received a request that doesn't match \
251 the expected schema. Error: {e}"
252 );
253 println!(
254 "{}",
255 serde_json::json!({"jsonrpc": "2.0",
256 "method": "log",
257 "params": {"level":"warn", "message":error}})
258 );
259 std::process::exit(1);
260 });
261 let fut = callback(p, typed_req);
262 Box::pin(async move {
263 let typed_resp = fut.await?;
264 serde_json::to_value(typed_resp).map_err(Error::from)
265 })
266 }),
267 before: Vec::new(),
268 after: Vec::new(),
269 filters: None,
270 },
271 );
272 self
273 }
274
275 pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
278 where
279 C: Send + Sync + 'static,
280 C: Fn(Plugin<S>, Request) -> F + 'static,
281 F: Future<Output = Response> + Send + 'static,
282 {
283 self.rpcmethods.insert(
284 name.to_string(),
285 RpcMethod {
286 name: name.to_string(),
287 description: description.to_string(),
288 usage: String::default(),
289 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
290 },
291 );
292 self
293 }
294
295 pub fn rpcmethod_from_builder(mut self, rpc_method: RpcMethodBuilder<S>) -> Builder<S, I, O> {
296 self.rpcmethods
297 .insert(rpc_method.name.to_string(), rpc_method.build());
298 self
299 }
300
301 pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
303 where
304 C: Send + Sync + 'static,
305 C: Fn(Plugin<S>, Request) -> F + 'static,
306 F: Future<Output = Response> + Send + 'static,
307 {
308 self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
309 self
310 }
311
312 pub fn dynamic(mut self) -> Builder<S, I, O> {
314 self.dynamic = true;
315 self
316 }
317
318 pub fn featurebits(mut self, kind: FeatureBitsKind, hex: String) -> Self {
320 match kind {
321 FeatureBitsKind::Node => self.featurebits.node = Some(hex),
322 FeatureBitsKind::Channel => self.featurebits.channel = Some(hex),
323 FeatureBitsKind::Init => self.featurebits.init = Some(hex),
324 FeatureBitsKind::Invoice => self.featurebits.invoice = Some(hex),
325 }
326 self
327 }
328
329 pub fn with_logging(mut self, log: bool) -> Builder<S, I, O> {
336 self.logging = log;
337 self
338 }
339
340 pub fn custommessages(mut self, custommessages: Vec<u16>) -> Self {
343 self.custommessages = custommessages;
344 self
345 }
346
347 pub async fn configure(mut self) -> Result<Option<ConfiguredPlugin<S, I, O>>, anyhow::Error> {
354 let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
355
356 let output = Arc::new(Mutex::new(FramedWrite::new(
363 self.output.take().unwrap(),
364 JsonCodec::default(),
365 )));
366
367 if self.logging {
370 crate::logging::init(output.clone()).await?;
371 trace!("Plugin logging initialized");
372 }
373
374 match input.next().await {
376 Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
377 output
378 .lock()
379 .await
380 .send(json!({
381 "jsonrpc": "2.0",
382 "result": self.handle_get_manifest(m),
383 "id": id,
384 }))
385 .await?
386 }
387 Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
388 None => {
389 return Err(anyhow!(
390 "Lost connection to lightning expecting getmanifest"
391 ));
392 }
393 };
394 let (init_id, configuration) = match input.next().await {
395 Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
396 (id, self.handle_init(m)?)
397 }
398
399 Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
400 None => {
401 return Ok(None);
405 }
406 };
407
408 let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
411 HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
412 rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));
413
414 let subscriptions =
415 HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
416 let all_subscription = self.wildcard_subscription.map(|s| s.callback);
417
418 Ok(Some(ConfiguredPlugin {
421 init_id,
423 input,
424 output,
425 rpcmethods,
426 setconfig_callback: self.setconfig_callback,
427 notifications: self.notifications,
428 subscriptions,
429 wildcard_subscription: all_subscription,
430 options: self.options,
431 option_values: self.option_values,
432 configuration,
433 hooks: HashMap::new(),
434 }))
435 }
436
437 pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
448 if let Some(cp) = self.configure().await? {
449 Ok(Some(cp.start(state).await?))
450 } else {
451 Ok(None)
452 }
453 }
454
455 fn handle_get_manifest(
456 &mut self,
457 _call: messages::GetManifestCall,
458 ) -> messages::GetManifestResponse {
459 let rpcmethods: Vec<_> = self
460 .rpcmethods
461 .values()
462 .map(|v| messages::RpcMethod {
463 name: v.name.clone(),
464 description: v.description.clone(),
465 usage: v.usage.clone(),
466 })
467 .collect();
468
469 let subscriptions = self
470 .subscriptions
471 .keys()
472 .map(|s| s.clone())
473 .chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
474 .collect();
475
476 let hooks: Vec<messages::Hook> = self
477 .hooks
478 .values()
479 .map(|v| messages::Hook {
480 name: v.name.clone(),
481 before: v.before.clone(),
482 after: v.after.clone(),
483 filters: v.filters.clone(),
484 })
485 .collect();
486
487 messages::GetManifestResponse {
488 options: self.options.values().cloned().collect(),
489 subscriptions,
490 hooks,
491 rpcmethods,
492 notifications: self.notifications.clone(),
493 featurebits: self.featurebits.clone(),
494 dynamic: self.dynamic,
495 nonnumericids: true,
496 custommessages: self.custommessages.clone(),
497 }
498 }
499
500 fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
501 use options::Value as OValue;
502 use serde_json::Value as JValue;
503
504 for (name, option) in self.options.iter() {
507 let json_value = call.options.get(name);
508 let default_value = option.default();
509
510 let option_value: Option<options::Value> = match (json_value, default_value) {
511 (None, None) => None,
512 (None, Some(default)) => Some(default.clone()),
513 (Some(JValue::Array(a)), _) => match a.first() {
514 Some(JValue::String(_)) => Some(OValue::StringArray(
515 a.iter().map(|x| x.as_str().unwrap().to_string()).collect(),
516 )),
517 Some(JValue::Number(_)) => Some(OValue::IntegerArray(
518 a.iter().map(|x| x.as_i64().unwrap()).collect(),
519 )),
520 _ => panic!("Array type not supported for option: {}", name),
521 },
522 (Some(JValue::String(s)), _) => Some(OValue::String(s.to_string())),
523 (Some(JValue::Number(i)), _) => Some(OValue::Integer(i.as_i64().unwrap())),
524 (Some(JValue::Bool(b)), _) => Some(OValue::Boolean(*b)),
525 _ => panic!("Type mismatch for option {}", name),
526 };
527
528 self.option_values.insert(name.to_string(), option_value);
529 }
530 Ok(call.configuration)
531 }
532}
533
534impl<S> HookBuilder<S>
535where
536 S: Send + Clone,
537{
538 pub fn new<C, F>(name: &str, callback: C) -> Self
539 where
540 C: Send + Sync + 'static,
541 C: Fn(Plugin<S>, Request) -> F + 'static,
542 F: Future<Output = Response> + Send + 'static,
543 {
544 Self {
545 name: name.to_string(),
546 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
547 before: Vec::new(),
548 after: Vec::new(),
549 filters: None,
550 }
551 }
552
553 pub fn new_typed<C, F, Req, Resp>(name: &str, callback: C) -> Self
554 where
555 C: Send + Sync + 'static,
556 C: Fn(Plugin<S>, Req) -> F + 'static,
557 F: Future<Output = Result<Resp, Error>> + Send + 'static,
558 Req: DeserializeOwned + Send + 'static,
559 Resp: Serialize + Send + 'static,
560 {
561 let hookname = name.to_string();
562 Self {
563 name: hookname.clone(),
564 callback: Box::new(move |p, r| {
565 let typed_req = serde_json::from_value(r).unwrap_or_else(|e| {
566 let error = format!(
567 "cln-plugin: hook '{hookname}' received a request that doesn't match \
568 the expected schema. Error: {e}"
569 );
570 println!(
571 "{}",
572 serde_json::json!({"jsonrpc": "2.0",
573 "method": "log",
574 "params": {"level":"warn", "message":error}})
575 );
576 std::process::exit(1);
577 });
578 let fut = callback(p, typed_req);
579 Box::pin(async move {
580 let typed_resp = fut.await?;
581 serde_json::to_value(typed_resp).map_err(Error::from)
582 })
583 }),
584 before: Vec::new(),
585 after: Vec::new(),
586 filters: None,
587 }
588 }
589
590 pub fn before(mut self, before: Vec<String>) -> Self {
591 self.before = before;
592 self
593 }
594
595 pub fn after(mut self, after: Vec<String>) -> Self {
596 self.after = after;
597 self
598 }
599
600 pub fn filters(mut self, filters: Vec<HookFilter>) -> Self {
601 if filters.is_empty() {
603 self.filters = None;
604 } else {
605 self.filters = Some(filters);
606 }
607 self
608 }
609
610 fn build(self) -> Hook<S> {
611 Hook {
612 callback: self.callback,
613 name: self.name,
614 before: self.before,
615 after: self.after,
616 filters: self.filters,
617 }
618 }
619}
620
621impl<S> RpcMethodBuilder<S>
622where
623 S: Send + Clone,
624{
625 pub fn new<C, F>(name: &str, callback: C) -> Self
626 where
627 C: Send + Sync + 'static,
628 C: Fn(Plugin<S>, Request) -> F + 'static,
629 F: Future<Output = Response> + Send + 'static,
630 {
631 Self {
632 name: name.to_string(),
633 callback: Box::new(move |p, r| Box::pin(callback(p, r))),
634 usage: None,
635 description: None,
636 }
637 }
638
639 pub fn description(mut self, description: &str) -> Self {
640 self.description = Some(description.to_string());
641 self
642 }
643
644 pub fn usage(mut self, usage: &str) -> Self {
645 self.usage = Some(usage.to_string());
646 self
647 }
648
649 fn build(self) -> RpcMethod<S> {
650 RpcMethod {
651 callback: self.callback,
652 name: self.name,
653 description: self.description.unwrap_or_default(),
654 usage: self.usage.unwrap_or_default(),
655 }
656 }
657}
658
659type Request = serde_json::Value;
662type Response = Result<serde_json::Value, Error>;
663type AsyncCallback<S> =
664 Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
665type AsyncNotificationCallback<S> = Box<
666 dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
667 + Send
668 + Sync,
669>;
670
671struct RpcMethod<S>
675where
676 S: Clone + Send,
677{
678 callback: AsyncCallback<S>,
679 description: String,
680 name: String,
681 usage: String,
682}
683
684pub struct RpcMethodBuilder<S>
685where
686 S: Clone + Send,
687{
688 callback: AsyncCallback<S>,
689 name: String,
690 description: Option<String>,
691 usage: Option<String>,
692}
693
694struct Subscription<S>
695where
696 S: Clone + Send,
697{
698 callback: AsyncNotificationCallback<S>,
699}
700
701struct Hook<S>
702where
703 S: Clone + Send,
704{
705 name: String,
706 callback: AsyncCallback<S>,
707 before: Vec<String>,
708 after: Vec<String>,
709 filters: Option<Vec<HookFilter>>,
710}
711
712pub struct HookBuilder<S>
713where
714 S: Clone + Send,
715{
716 name: String,
717 callback: AsyncCallback<S>,
718 before: Vec<String>,
719 after: Vec<String>,
720 filters: Option<Vec<HookFilter>>,
721}
722
723#[derive(Debug, Clone, Serialize)]
724#[serde(untagged)]
725pub enum HookFilter {
726 Str(String),
727 Int(i64),
728}
729
730impl<S> Plugin<S>
731where
732 S: Clone + Send,
733{
734 pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
735 self.option_values
736 .lock()
737 .unwrap()
738 .get(name)
739 .ok_or(anyhow!("No option named {}", name))
740 .cloned()
741 }
742
743 pub fn option<'a, OV: OptionType<'a>>(
744 &self,
745 config_option: &options::ConfigOption<'a, OV>,
746 ) -> Result<OV::OutputValue> {
747 let value = self.option_str(config_option.name())?;
748 Ok(OV::from_value(&value))
749 }
750
751 pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
752 *self
753 .option_values
754 .lock()
755 .unwrap()
756 .get_mut(name)
757 .ok_or(anyhow!("No option named {}", name))? = Some(value);
758 Ok(())
759 }
760
761 pub fn set_option<'a, OV: OptionType<'a>>(
762 &self,
763 config_option: &options::ConfigOption<'a, OV>,
764 value: options::Value,
765 ) -> Result<()> {
766 self.set_option_str(config_option.name(), value)?;
767 Ok(())
768 }
769}
770
771impl<S, I, O> ConfiguredPlugin<S, I, O>
772where
773 S: Send + Clone + Sync + 'static,
774 I: AsyncRead + Send + Unpin + 'static,
775 O: Send + AsyncWrite + Unpin + 'static,
776{
777 #[allow(unused_mut)]
778 pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
779 let output = self.output;
780 let input = self.input;
781 let (wait_handle, _) = tokio::sync::broadcast::channel(1);
782
783 let (sender, receiver) = tokio::sync::mpsc::channel(4);
786
787 let plugin = Plugin {
788 state,
789 options: self.options,
790 option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
791 configuration: self.configuration,
792 wait_handle,
793 sender,
794 };
795
796 let driver = PluginDriver {
797 plugin: plugin.clone(),
798 rpcmethods: self.rpcmethods,
799 setconfig_callback: self.setconfig_callback,
800 hooks: self.hooks,
801 subscriptions: self.subscriptions,
802 wildcard_subscription: self.wildcard_subscription,
803 };
804
805 output
806 .lock()
807 .await
808 .send(json!(
809 {
810 "jsonrpc": "2.0",
811 "id": self.init_id,
812 "result": crate::messages::InitResponse{disable: None}
813 }
814 ))
815 .await
816 .context("sending init response")?;
817
818 let joiner = plugin.wait_handle.clone();
819 tokio::spawn(async move {
821 if let Err(e) = driver.run(receiver, input, output).await {
822 log::warn!("Plugin loop returned error {:?}", e);
823 }
824
825 joiner.send(())
829 });
830 Ok(plugin)
831 }
832
833 #[allow(unused_mut)]
836 pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
837 self.output
838 .lock()
839 .await
840 .send(json!(
841 {
842 "jsonrpc": "2.0",
843 "id": self.init_id,
844 "result": crate::messages::InitResponse{
845 disable: Some(reason.to_string())
846 }
847 }
848 ))
849 .await
850 .context("sending init response")?;
851 Ok(())
852 }
853
854 pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
855 self.option_values
856 .get(name)
857 .ok_or(anyhow!("No option named '{}'", name))
858 .map(|c| c.clone())
859 }
860
861 pub fn option<'a, OV: OptionType<'a>>(
862 &self,
863 config_option: &options::ConfigOption<'a, OV>,
864 ) -> Result<OV::OutputValue> {
865 let value = self.option_str(config_option.name())?;
866 Ok(OV::from_value(&value))
867 }
868
869 pub fn configuration(&self) -> Configuration {
872 self.configuration.clone()
873 }
874}
875
876impl<S> PluginDriver<S>
877where
878 S: Send + Clone,
879{
880 async fn run<I, O>(
882 self,
883 mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
884 mut input: FramedRead<I, JsonRpcCodec>,
885 output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
886 ) -> Result<(), Error>
887 where
888 I: Send + AsyncReadExt + Unpin,
889 O: Send + AsyncWriteExt + Unpin,
890 {
891 loop {
892 tokio::select! {
897 e = self.dispatch_one(&mut input, &self.plugin) => {
898 if let Err(e) = e {
899 return Err(e)
900 }
901 },
902 v = receiver.recv() => {
903 output.lock().await.send(
904 v.context("internal communication error")?
905 ).await?;
906 },
907 }
908 }
909 }
910
911 async fn dispatch_one<I>(
914 &self,
915 input: &mut FramedRead<I, JsonRpcCodec>,
916 plugin: &Plugin<S>,
917 ) -> Result<(), Error>
918 where
919 I: Send + AsyncReadExt + Unpin,
920 {
921 match input.next().await {
922 Some(Ok(msg)) => {
923 trace!("Received a message: {:?}", msg);
924 match msg {
925 messages::JsonRpc::Request(_id, _p) => {
926 todo!(
927 "This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively."
928 );
929 }
930 messages::JsonRpc::Notification(_n) => {
931 todo!(
932 "As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used."
933 )
934 }
935 messages::JsonRpc::CustomRequest(id, request) => {
936 trace!("Dispatching custom method {:?}", request);
937 let method = request
938 .get("method")
939 .context("Missing 'method' in request")?
940 .as_str()
941 .context("'method' is not a string")?;
942 let callback = match method {
943 name if name.eq("setconfig") => {
944 self.setconfig_callback.as_ref().ok_or_else(|| {
945 anyhow!("No handler for method '{}' registered", method)
946 })?
947 }
948 _ => self.rpcmethods.get(method).with_context(|| {
949 anyhow!("No handler for method '{}' registered", method)
950 })?,
951 };
952 let params = request
953 .get("params")
954 .context("Missing 'params' field in request")?
955 .clone();
956
957 let plugin = plugin.clone();
958 let call = callback(plugin.clone(), params);
959
960 tokio::spawn(async move {
961 match call.await {
962 Ok(v) => plugin
963 .sender
964 .send(json!({
965 "jsonrpc": "2.0",
966 "id": id,
967 "result": v
968 }))
969 .await
970 .context("returning custom response"),
971 Err(e) => plugin
972 .sender
973 .send(json!({
974 "jsonrpc": "2.0",
975 "id": id,
976 "error": parse_error(e.to_string()),
977 }))
978 .await
979 .context("returning custom error"),
980 }
981 });
982 Ok(())
983 }
984 messages::JsonRpc::CustomNotification(request) => {
985 trace!("Dispatching custom notification {:?}", request);
987 let method = request
988 .get("method")
989 .context("Missing 'method' in request")?
990 .as_str()
991 .context("'method' is not a string")?;
992
993 let params = request
994 .get("params")
995 .context("Missing 'params' field in request")?;
996
997 match &self.wildcard_subscription {
1000 Some(cb) => {
1001 let call = cb(plugin.clone(), params.clone());
1002 tokio::spawn(async move {
1003 if let Err(e) = call.await {
1004 log::warn!("Wildcard notification handler error: '{}'", e)
1005 }
1006 });
1007 }
1008 None => {}
1009 };
1010
1011 match self.subscriptions.get(method) {
1014 Some(cb) => {
1015 let call = cb(plugin.clone(), params.clone());
1016 tokio::spawn(async move {
1017 if let Err(e) = call.await {
1018 log::warn!("Notification handler error: '{}'", e)
1019 }
1020 });
1021 }
1022 None => {
1023 if self.wildcard_subscription.is_none() {
1024 log::warn!(
1025 "No handler for notification '{}' registered",
1026 method
1027 );
1028 }
1029 }
1030 };
1031 Ok(())
1032 }
1033 }
1034 }
1035 Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
1036 None => Err(anyhow!("Error reading from master")),
1037 }
1038 }
1039}
1040
1041impl<S> Plugin<S>
1042where
1043 S: Clone + Send,
1044{
1045 pub fn options(&self) -> Vec<UntypedConfigOption> {
1046 self.options.values().cloned().collect()
1047 }
1048 pub fn configuration(&self) -> Configuration {
1049 self.configuration.clone()
1050 }
1051 pub fn state(&self) -> &S {
1052 &self.state
1053 }
1054}
1055
1056impl<S> Plugin<S>
1057where
1058 S: Send + Clone,
1059{
1060 pub async fn send_custom_notification(
1061 &self,
1062 method: String,
1063 v: serde_json::Value,
1064 ) -> Result<(), Error> {
1065 let mut params = match &v {
1068 serde_json::Value::Object(map) => map.clone(),
1069 _ => return Err(anyhow::anyhow!("params must be a JSON object")),
1070 };
1071 params.insert(method.clone(), json!(v));
1072
1073 self.sender
1074 .send(json!({
1075 "jsonrpc": "2.0",
1076 "method": method,
1077 "params": params,
1078 }))
1079 .await
1080 .context("sending custom notification")?;
1081 Ok(())
1082 }
1083
1084 pub async fn join(&self) -> Result<(), Error> {
1086 self.wait_handle
1087 .subscribe()
1088 .recv()
1089 .await
1090 .context("error waiting for shutdown")
1091 }
1092
1093 pub fn shutdown(&self) -> Result<(), Error> {
1095 self.wait_handle
1096 .send(())
1097 .context("error waiting for shutdown")?;
1098 Ok(())
1099 }
1100}
1101
1102pub enum FeatureBitsKind {
1103 Node,
1104 Channel,
1105 Invoice,
1106 Init,
1107}
1108
1109#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
1110struct RpcError {
1111 pub code: Option<i32>,
1112 pub message: String,
1113 pub data: Option<serde_json::Value>,
1114}
1115fn parse_error(error: String) -> RpcError {
1116 match serde_json::from_str::<RpcError>(&error) {
1117 Ok(o) => o,
1118 Err(_) => RpcError {
1119 code: Some(-32700),
1120 message: error,
1121 data: None,
1122 },
1123 }
1124}
1125
1126#[cfg(test)]
1127mod test {
1128 use super::*;
1129
1130 #[tokio::test]
1131 async fn init() {
1132 let state = ();
1133 let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout());
1134 let _ = builder.start(state);
1135 }
1136}