Skip to main content

binary_options_tools_core_pre/utils/
tracing.rs

1use std::{fs::OpenOptions, io::Write, time::Duration};
2
3use kanal::{bounded_async, Sender};
4use serde_json::Value;
5use tokio_tungstenite::tungstenite::Message;
6use tracing::level_filters::LevelFilter;
7use tracing_subscriber::{
8    fmt::{self, MakeWriter},
9    layer::SubscriberExt,
10    util::SubscriberInitExt,
11    Layer, Registry,
12};
13
14use crate::{
15    error::{CoreError, CoreResult},
16    utils::stream::RecieverStream,
17};
18
19pub fn start_tracing(terminal: bool) -> CoreResult<()> {
20    let error_logs = OpenOptions::new()
21        .append(true)
22        .create(true)
23        .open("errors.log")?;
24
25    let sub = tracing_subscriber::registry()
26        // .with(filtered_layer)
27        .with(
28            // log-error file, to log the errors that arise
29            fmt::layer()
30                .with_ansi(false)
31                .with_writer(error_logs)
32                .with_filter(LevelFilter::WARN),
33        );
34    if terminal {
35        sub.with(fmt::Layer::default().with_filter(LevelFilter::DEBUG))
36            .try_init()
37            .map_err(|e| CoreError::Tracing(e.to_string()))?;
38    } else {
39        sub.try_init()
40            .map_err(|e| CoreError::Tracing(e.to_string()))?;
41    }
42
43    Ok(())
44}
45
46pub fn start_tracing_leveled(terminal: bool, level: LevelFilter) -> CoreResult<()> {
47    let error_logs = OpenOptions::new()
48        .append(true)
49        .create(true)
50        .open("errors.log")?;
51
52    let sub = tracing_subscriber::registry()
53        // .with(filtered_layer)
54        .with(
55            // log-error file, to log the errors that arise
56            fmt::layer()
57                .with_ansi(false)
58                .with_writer(error_logs)
59                .with_filter(LevelFilter::WARN),
60        );
61    if terminal {
62        sub.with(fmt::Layer::default().with_filter(level))
63            .try_init()
64            .map_err(|e| CoreError::Tracing(e.to_string()))?;
65    } else {
66        sub.try_init()
67            .map_err(|e| CoreError::Tracing(e.to_string()))?;
68    }
69
70    Ok(())
71}
72
73#[derive(Clone)]
74pub struct StreamWriter {
75    sender: Sender<Message>,
76}
77
78impl Write for StreamWriter {
79    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
80        if let Ok(item) = serde_json::from_slice::<Value>(buf) {
81            self.sender
82                .send(Message::text(item.to_string()))
83                .map_err(std::io::Error::other)?;
84        }
85        Ok(buf.len())
86    }
87
88    fn flush(&mut self) -> std::io::Result<()> {
89        Ok(())
90    }
91}
92
93impl<'a> MakeWriter<'a> for StreamWriter {
94    type Writer = StreamWriter;
95    fn make_writer(&'a self) -> Self::Writer {
96        self.clone()
97    }
98}
99
100pub fn stream_logs_layer(
101    level: LevelFilter,
102    timeout: Option<Duration>,
103) -> (Box<dyn Layer<Registry> + Send + Sync>, RecieverStream) {
104    let (sender, receiver) = bounded_async(128);
105    let receiver = RecieverStream::new_timed(receiver, timeout);
106    let writer = StreamWriter {
107        sender: sender.to_sync(),
108    };
109    let layer = tracing_subscriber::fmt::layer::<Registry>()
110        .json()
111        .flatten_event(true)
112        .with_writer(writer)
113        .with_filter(level)
114        .boxed();
115    (layer, receiver)
116}