1use derivative::Derivative;
2use io::ErrorKind;
3use log::Record;
4
5use log4rs::{
6 append::Append,
7 config::{Deserialize, Deserializers},
8 encode::{
9 self, pattern::PatternEncoder, writer::simple::SimpleWriter, Encode, EncoderConfig, Style,
10 },
11};
12
13use async_httpc::{
14 AsyncHttpRequest, AsyncHttpRequestBody, AsyncHttpRequestMethod, AsyncHttpc, AsyncHttpcBuilder,
15};
16
17use pi_async_rt::rt::{
18 multi_thread::{MultiTaskRuntime, MultiTaskRuntimeBuilder, StealableTaskPool},
19 single_thread::{SingleTaskRunner, SingleTaskRuntime},
20 spawn_worker_thread, AsyncRuntime,
21};
22
23use lazy_static::lazy_static;
24use std::sync::RwLock;
25use std::{
26 fmt::{self, Debug},
27 fs::{self, File, OpenOptions},
28 io::{self, BufWriter, Error, Write},
29 path::{Path, PathBuf},
30 sync::Arc,
31 thread,
32 time::Duration,
33};
34
35#[derive(Derivative)]
37#[derivative(Debug)]
38pub struct SLSAppender {
39 write: HttpWrite,
40 encoder: Box<dyn Encode>,
41}
42
43impl Append for SLSAppender {
44 fn append(&self, record: &Record) -> anyhow::Result<()> {
45 let mut write = self.write.clone();
46 self.encoder.encode(&mut write, record)?;
47 write.flush()?;
48 Ok(())
49 }
50
51 fn flush(&self) {}
52}
53
54impl SLSAppender {
55 pub fn builder() -> SLSAppenderBuilder {
57 SLSAppenderBuilder {
58 encoder: None,
59 append: true,
60 }
61 }
62}
63
64pub struct SLSAppenderBuilder {
66 encoder: Option<Box<dyn Encode>>,
67 append: bool,
68}
69
70impl SLSAppenderBuilder {
71 pub fn encoder(mut self, encoder: Box<dyn Encode>) -> SLSAppenderBuilder {
73 self.encoder = Some(encoder);
74 self
75 }
76
77 pub fn append(mut self, append: bool) -> SLSAppenderBuilder {
81 self.append = append;
82 self
83 }
84
85 pub fn build(
91 self,
92 url: String,
93 rt: MultiTaskRuntime<()>,
94 httpc: AsyncHttpc,
95 ) -> io::Result<SLSAppender> {
96 let http_w = HttpWrite {
97 url,
98 rt,
99 buf: Vec::new(),
100 httpc,
101 };
102
103 Ok(SLSAppender {
104 write: http_w,
105 encoder: self
106 .encoder
107 .unwrap_or_else(|| Box::new(PatternEncoder::default())),
108 })
109 }
110}
111
112#[derive(Clone)]
113struct HttpWrite {
114 buf: Vec<u8>,
115 rt: MultiTaskRuntime<()>,
116 url: String,
117 httpc: AsyncHttpc,
118}
119
120impl Debug for HttpWrite {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 Ok(())
123 }
124}
125
126impl io::Write for HttpWrite {
127 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
128 self.buf.write_all(buf);
129 Ok(buf.len())
130 }
131
132 fn flush(&mut self) -> io::Result<()> {
133 let httpc = self.httpc.clone();
134 let buf = self.buf.clone();
135 let url = self.url.clone();
136 let rt = self.rt.clone();
137 self.rt.spawn(async move {
138 http_request(httpc, url, buf, rt).await;
139 });
140 self.buf.clear();
141 Ok(())
142 }
143}
144
145impl encode::Write for HttpWrite {
146 fn set_style(&mut self, style: &Style) -> io::Result<()> {
147 Ok(())
149 }
150}
151
152pub async fn http_request(
154 httpc: AsyncHttpc,
155 url: String,
156 body: Vec<u8>,
157 rt: MultiTaskRuntime<()>,
158) -> io::Result<Vec<u8>> {
159 let body = AsyncHttpRequestBody::with_binary(body);
160 let httpc_copy = httpc.clone();
161
162 let mut resp = httpc_copy
163 .build_request(&url, AsyncHttpRequestMethod::Post)
164 .add_header("Content-Type", "application/json")
165 .add_header("x-log-apiversion", "0.6.0")
166 .add_header("x-log-bodyrawsize", "0")
167 .set_body(body)
168 .send()
169 .await?;
170 let mut bodyVec: Vec<u8> = Vec::new();
171 loop {
172 match resp.get_body().await? {
173 Some(body) => {
174 bodyVec.write_all(&*body);
175 }
176 None => {
177 return Ok(bodyVec);
178 }
179 }
180 }
181}