cln_plugin/
logging.rs

1use crate::codec::JsonCodec;
2use anyhow::Context;
3use futures::SinkExt;
4use serde::Serialize;
5use std::sync::Arc;
6use tokio::io::AsyncWrite;
7use tokio::sync::mpsc;
8use tokio::sync::Mutex;
9use tokio_util::codec::FramedWrite;
10
11#[derive(Clone, Debug, Serialize)]
12#[serde(rename_all = "lowercase")]
13struct LogEntry {
14    level: LogLevel,
15    message: String,
16}
17
18#[derive(Clone, Debug, Serialize)]
19#[serde(rename_all = "lowercase")]
20enum LogLevel {
21    Debug,
22    Info,
23    Warn,
24    Error,
25    Trace,
26}
27
28impl From<log::Level> for LogLevel {
29    fn from(lvl: log::Level) -> Self {
30        match lvl {
31            log::Level::Error => LogLevel::Error,
32            log::Level::Warn => LogLevel::Warn,
33            log::Level::Info => LogLevel::Info,
34            log::Level::Debug => LogLevel::Debug,
35            log::Level::Trace => LogLevel::Trace,
36        }
37    }
38}
39
40/// Start a listener that receives incoming log events, and then
41/// writes them out to `stdout`, after wrapping them in a valid
42/// JSON-RPC notification object.
43fn start_writer<O>(out: Arc<Mutex<FramedWrite<O, JsonCodec>>>) -> mpsc::UnboundedSender<LogEntry>
44where
45    O: AsyncWrite + Send + Unpin + 'static,
46{
47    let (sender, mut receiver) = mpsc::unbounded_channel::<LogEntry>();
48    tokio::spawn(async move {
49        while let Some(i) = receiver.recv().await {
50            // We continue draining the queue, even if we get some
51            // errors when forwarding. Forwarding could break due to
52            // an interrupted connection or stdout being closed, but
53            // keeping the messages in the queue is a memory leak.
54            let payload = json!({
55                "jsonrpc": "2.0",
56                "method": "log",
57                "params": i
58            });
59
60            let _ = out.lock().await.send(payload).await;
61        }
62    });
63    sender
64}
65
66/// Initialize the logger starting a flusher to the passed in sink.
67pub async fn init<O>(out: Arc<Mutex<FramedWrite<O, JsonCodec>>>) -> Result<(), anyhow::Error>
68where
69    O: AsyncWrite + Send + Unpin + 'static,
70{
71    return trace::init(out).context("initializing tracing logger");
72}
73
74mod trace {
75    use super::*;
76    use tracing::Level;
77    use tracing_subscriber::prelude::*;
78    use tracing_subscriber::Layer;
79
80    /// Initialize the logger starting a flusher to the passed in sink.
81    pub fn init<O>(out: Arc<Mutex<FramedWrite<O, JsonCodec>>>) -> Result<(), log::SetLoggerError>
82    where
83        O: AsyncWrite + Send + Unpin + 'static,
84    {
85        let filter = tracing_subscriber::filter::EnvFilter::builder()
86            .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
87            .with_env_var("CLN_PLUGIN_LOG")
88            .from_env_lossy();
89        let sender = start_writer(out);
90
91        tracing_subscriber::registry()
92            .with(filter)
93            .with(LoggingLayer::new(sender))
94            .init();
95
96        Ok(())
97    }
98
99    struct LoggingLayer {
100        sender: mpsc::UnboundedSender<LogEntry>,
101    }
102    impl LoggingLayer {
103        fn new(sender: mpsc::UnboundedSender<LogEntry>) -> Self {
104            LoggingLayer { sender }
105        }
106    }
107
108    impl<S> Layer<S> for LoggingLayer
109    where
110        S: tracing::Subscriber,
111    {
112        fn on_event(
113            &self,
114            event: &tracing::Event<'_>,
115            _ctx: tracing_subscriber::layer::Context<'_, S>,
116        ) {
117            let mut extractor = LogExtract::default();
118            event.record(&mut extractor);
119            let message = match extractor.msg {
120                Some(m) => m,
121                None => return,
122            };
123            let level = event.metadata().level().into();
124            self.sender.send(LogEntry { level, message }).unwrap();
125        }
126    }
127
128    impl From<&Level> for LogLevel {
129        fn from(l: &Level) -> LogLevel {
130            match l {
131                &Level::DEBUG => LogLevel::Debug,
132                &Level::ERROR => LogLevel::Error,
133                &Level::INFO => LogLevel::Info,
134                &Level::WARN => LogLevel::Warn,
135                &Level::TRACE => LogLevel::Trace,
136            }
137        }
138    }
139
140    /// Extracts the message from the visitor
141    #[derive(Default)]
142    struct LogExtract {
143        msg: Option<String>,
144    }
145
146    impl tracing::field::Visit for LogExtract {
147        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
148            if field.name() != "message" {
149                return;
150            }
151            self.msg = Some(format!("{:?}", value));
152        }
153    }
154}