pi_logger/
ali_sls.rs

1use derivative::Derivative;
2use io::ErrorKind;
3use log::Record;
4
5use log4rs::{
6    append::Append,
7    config::{Deserialize, Deserializers},
8    encode::{
9        self, pattern::PatternEncoder, writer::simple::SimpleWriter, Encode, EncoderConfig, Style,
10    },
11};
12
13use async_httpc::{
14    AsyncHttpRequest, AsyncHttpRequestBody, AsyncHttpRequestMethod, AsyncHttpc, AsyncHttpcBuilder,
15};
16
17use pi_async_rt::rt::{
18    multi_thread::{MultiTaskRuntime, MultiTaskRuntimeBuilder, StealableTaskPool},
19    single_thread::{SingleTaskRunner, SingleTaskRuntime},
20    spawn_worker_thread, AsyncRuntime,
21};
22
23use lazy_static::lazy_static;
24use std::sync::RwLock;
25use std::{
26    fmt::{self, Debug},
27    fs::{self, File, OpenOptions},
28    io::{self, BufWriter, Error, Write},
29    path::{Path, PathBuf},
30    sync::Arc,
31    thread,
32    time::Duration,
33};
34
35/// An appender which logs to a http.
36#[derive(Derivative)]
37#[derivative(Debug)]
38pub struct SLSAppender {
39    write: HttpWrite,
40    encoder: Box<dyn Encode>,
41}
42
43impl Append for SLSAppender {
44    fn append(&self, record: &Record) -> anyhow::Result<()> {
45        let mut write = self.write.clone();
46        self.encoder.encode(&mut write, record)?;
47        write.flush()?;
48        Ok(())
49    }
50
51    fn flush(&self) {}
52}
53
54impl SLSAppender {
55    /// Creates a new `SLSAppender` builder.
56    pub fn builder() -> SLSAppenderBuilder {
57        SLSAppenderBuilder {
58            encoder: None,
59            append: true,
60        }
61    }
62}
63
64/// A builder for `SLSAppender`s.
65pub struct SLSAppenderBuilder {
66    encoder: Option<Box<dyn Encode>>,
67    append: bool,
68}
69
70impl SLSAppenderBuilder {
71    /// Sets the output encoder for the `SLSAppender`.
72    pub fn encoder(mut self, encoder: Box<dyn Encode>) -> SLSAppenderBuilder {
73        self.encoder = Some(encoder);
74        self
75    }
76
77    /// Determines if the appender will append to or truncate the output file.
78    ///
79    /// Defaults to `true`.
80    pub fn append(mut self, append: bool) -> SLSAppenderBuilder {
81        self.append = append;
82        self
83    }
84
85    /// Consumes the `SLSAppenderBuilder`, producing a `SLSAppender`.
86    /// The path argument can contain environment variables of the form $ENV{name_here},
87    /// where 'name_here' will be the name of the environment variable that
88    /// will be resolved. Note that if the variable fails to resolve,
89    /// $ENV{name_here} will NOT be replaced in the path.
90    pub fn build(
91        self,
92        url: String,
93        rt: MultiTaskRuntime<()>,
94        httpc: AsyncHttpc,
95    ) -> io::Result<SLSAppender> {
96        let http_w = HttpWrite {
97            url,
98            rt,
99            buf: Vec::new(),
100            httpc,
101        };
102
103        Ok(SLSAppender {
104            write: http_w,
105            encoder: self
106                .encoder
107                .unwrap_or_else(|| Box::new(PatternEncoder::default())),
108        })
109    }
110}
111
112#[derive(Clone)]
113struct HttpWrite {
114    buf: Vec<u8>,
115    rt: MultiTaskRuntime<()>,
116    url: String,
117    httpc: AsyncHttpc,
118}
119
120impl Debug for HttpWrite {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        Ok(())
123    }
124}
125
126impl io::Write for HttpWrite {
127    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
128        self.buf.write_all(buf);
129        Ok(buf.len())
130    }
131
132    fn flush(&mut self) -> io::Result<()> {
133        let httpc = self.httpc.clone();
134        let buf = self.buf.clone();
135        let url = self.url.clone();
136        let rt = self.rt.clone();
137        self.rt.spawn(async move {
138            http_request(httpc, url, buf, rt).await;
139        });
140        self.buf.clear();
141        Ok(())
142    }
143}
144
145impl encode::Write for HttpWrite {
146    fn set_style(&mut self, style: &Style) -> io::Result<()> {
147        // self.0.set_style(style)
148        Ok(())
149    }
150}
151
152/// 发送日志到阿里云日志服务
153pub async fn http_request(
154    httpc: AsyncHttpc,
155    url: String,
156    body: Vec<u8>,
157    rt: MultiTaskRuntime<()>,
158) -> io::Result<Vec<u8>> {
159    let body = AsyncHttpRequestBody::with_binary(body);
160    let httpc_copy = httpc.clone();
161
162    let mut resp = httpc_copy
163        .build_request(&url, AsyncHttpRequestMethod::Post)
164        .add_header("Content-Type", "application/json")
165        .add_header("x-log-apiversion", "0.6.0")
166        .add_header("x-log-bodyrawsize", "0")
167        .set_body(body)
168        .send()
169        .await?;
170    let mut bodyVec: Vec<u8> = Vec::new();
171    loop {
172        match resp.get_body().await? {
173            Some(body) => {
174                bodyVec.write_all(&*body);
175            }
176            None => {
177                return Ok(bodyVec);
178            }
179        }
180    }
181}