binary_options_tools_core_pre/utils/
tracing.rs1use std::{fs::OpenOptions, io::Write, time::Duration};
2
3use kanal::{bounded_async, Sender};
4use serde_json::Value;
5use tokio_tungstenite::tungstenite::Message;
6use tracing::level_filters::LevelFilter;
7use tracing_subscriber::{
8 fmt::{self, MakeWriter},
9 layer::SubscriberExt,
10 util::SubscriberInitExt,
11 Layer, Registry,
12};
13
14use crate::{
15 error::{CoreError, CoreResult},
16 utils::stream::RecieverStream,
17};
18
19pub fn start_tracing(terminal: bool) -> CoreResult<()> {
20 let error_logs = OpenOptions::new()
21 .append(true)
22 .create(true)
23 .open("errors.log")?;
24
25 let sub = tracing_subscriber::registry()
26 .with(
28 fmt::layer()
30 .with_ansi(false)
31 .with_writer(error_logs)
32 .with_filter(LevelFilter::WARN),
33 );
34 if terminal {
35 sub.with(fmt::Layer::default().with_filter(LevelFilter::DEBUG))
36 .try_init()
37 .map_err(|e| CoreError::Tracing(e.to_string()))?;
38 } else {
39 sub.try_init()
40 .map_err(|e| CoreError::Tracing(e.to_string()))?;
41 }
42
43 Ok(())
44}
45
46pub fn start_tracing_leveled(terminal: bool, level: LevelFilter) -> CoreResult<()> {
47 let error_logs = OpenOptions::new()
48 .append(true)
49 .create(true)
50 .open("errors.log")?;
51
52 let sub = tracing_subscriber::registry()
53 .with(
55 fmt::layer()
57 .with_ansi(false)
58 .with_writer(error_logs)
59 .with_filter(LevelFilter::WARN),
60 );
61 if terminal {
62 sub.with(fmt::Layer::default().with_filter(level))
63 .try_init()
64 .map_err(|e| CoreError::Tracing(e.to_string()))?;
65 } else {
66 sub.try_init()
67 .map_err(|e| CoreError::Tracing(e.to_string()))?;
68 }
69
70 Ok(())
71}
72
73#[derive(Clone)]
74pub struct StreamWriter {
75 sender: Sender<Message>,
76}
77
78impl Write for StreamWriter {
79 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
80 if let Ok(item) = serde_json::from_slice::<Value>(buf) {
81 self.sender
82 .send(Message::text(item.to_string()))
83 .map_err(std::io::Error::other)?;
84 }
85 Ok(buf.len())
86 }
87
88 fn flush(&mut self) -> std::io::Result<()> {
89 Ok(())
90 }
91}
92
93impl<'a> MakeWriter<'a> for StreamWriter {
94 type Writer = StreamWriter;
95 fn make_writer(&'a self) -> Self::Writer {
96 self.clone()
97 }
98}
99
100pub fn stream_logs_layer(
101 level: LevelFilter,
102 timeout: Option<Duration>,
103) -> (Box<dyn Layer<Registry> + Send + Sync>, RecieverStream) {
104 let (sender, receiver) = bounded_async(128);
105 let receiver = RecieverStream::new_timed(receiver, timeout);
106 let writer = StreamWriter {
107 sender: sender.to_sync(),
108 };
109 let layer = tracing_subscriber::fmt::layer::<Registry>()
110 .json()
111 .flatten_event(true)
112 .with_writer(writer)
113 .with_filter(level)
114 .boxed();
115 (layer, receiver)
116}