use derivative::Derivative;
use io::ErrorKind;
use log::Record;
use crossbeam_channel as crossbeam;
use log4rs::{
append::Append,
config::{Deserialize, Deserializers},
encode::{
self, pattern::PatternEncoder, writer::simple::SimpleWriter, Encode, EncoderConfig, Style,
},
};
use async_httpc::{
AsyncHttpRequest, AsyncHttpRequestBody, AsyncHttpRequestMethod, AsyncHttpc, AsyncHttpcBuilder,
};
use pi_async_rt::rt::{
multi_thread::{MultiTaskRuntime, MultiTaskRuntimeBuilder, StealableTaskPool},
single_thread::{SingleTaskRunner, SingleTaskRuntime},
spawn_worker_thread, AsyncRuntime,
};
use lazy_static::lazy_static;
use std::sync::RwLock;
use std::{
fmt::{self, Debug},
fs::{self, File, OpenOptions},
io::{self, BufWriter, Error, Write},
path::{Path, PathBuf},
sync::Arc,
thread,
time::{Duration, Instant},
};
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SLSAppender {
write: BatchHttpWrite,
encoder: Box<dyn Encode>,
}
impl Append for SLSAppender {
fn append(&self, record: &Record) -> anyhow::Result<()> {
let mut write = self.write.clone();
self.encoder.encode(&mut write, record)?;
write.flush()?;
Ok(())
}
fn flush(&self) {}
}
impl SLSAppender {
pub fn builder() -> SLSAppenderBuilder {
SLSAppenderBuilder {
encoder: None,
append: true,
batch_config: None,
source: None,
}
}
}
pub struct SLSAppenderBuilder {
encoder: Option<Box<dyn Encode>>,
append: bool,
batch_config: Option<SLBatchConfig>,
source: Option<String>,
}
impl SLSAppenderBuilder {
pub fn encoder(mut self, encoder: Box<dyn Encode>) -> SLSAppenderBuilder {
self.encoder = Some(encoder);
self
}
pub fn append(mut self, append: bool) -> SLSAppenderBuilder {
self.append = append;
self
}
pub fn batch_config(mut self, config: SLBatchConfig) -> SLSAppenderBuilder {
self.batch_config = Some(config);
self
}
pub fn source(mut self, source: String) -> SLSAppenderBuilder {
self.source = Some(source);
self
}
pub fn build(
self,
url: String,
rt: MultiTaskRuntime<()>,
httpc: AsyncHttpc,
) -> io::Result<SLSAppender> {
let config = self.batch_config.unwrap_or_default();
let source = self.source.unwrap_or_else(|| "pi_logger".to_string());
let batch_write = BatchHttpWrite::new(url, rt, httpc, config, source);
Ok(SLSAppender {
write: batch_write,
encoder: self
.encoder
.unwrap_or_else(|| Box::new(PatternEncoder::default())),
})
}
}
#[derive(Clone)]
struct HttpWrite {
buf: Vec<u8>,
rt: MultiTaskRuntime<()>,
url: String,
httpc: AsyncHttpc,
}
impl Debug for HttpWrite {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
}
impl io::Write for HttpWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.write_all(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
let httpc = self.httpc.clone();
let buf = self.buf.clone();
let url = self.url.clone();
let rt = self.rt.clone();
self.rt.spawn(async move {
http_request(httpc, url, buf, rt).await;
});
self.buf.clear();
Ok(())
}
}
impl encode::Write for HttpWrite {
fn set_style(&mut self, style: &Style) -> io::Result<()> {
Ok(())
}
}
pub async fn http_request(
httpc: AsyncHttpc,
url: String,
body: Vec<u8>,
rt: MultiTaskRuntime<()>,
) -> io::Result<Vec<u8>> {
let body = AsyncHttpRequestBody::with_binary(body);
let httpc_copy = httpc.clone();
let mut resp = httpc_copy
.build_request(&url, AsyncHttpRequestMethod::Post)
.add_header("Content-Type", "application/json")
.add_header("x-log-apiversion", "0.6.0")
.add_header("x-log-bodyrawsize", "0")
.set_body(body)
.send()
.await?;
let mut bodyVec: Vec<u8> = Vec::new();
loop {
match resp.get_body().await? {
Some(body) => {
bodyVec.write_all(&*body);
}
None => {
return Ok(bodyVec);
}
}
}
}
#[derive(Clone, Debug)]
pub struct SLBatchConfig {
pub batch_bytes: usize,
pub timeout_secs: u64,
pub max_retries: usize,
pub retry_delay_ms: u64,
}
impl Default for SLBatchConfig {
fn default() -> Self {
Self {
batch_bytes: 2 * 1024 * 1024, timeout_secs: 1,
max_retries: 3,
retry_delay_ms: 1000,
}
}
}
#[derive(Clone)]
struct BatchHttpWrite {
sender: crossbeam::Sender<String>,
config: SLBatchConfig,
buffer: Vec<u8>,
source: String,
}
impl BatchHttpWrite {
fn new(
url: String,
rt: MultiTaskRuntime<()>,
httpc: AsyncHttpc,
config: SLBatchConfig,
source: String,
) -> Self {
let (sender, receiver) = crossbeam::unbounded();
start_batch_processor(receiver, url, rt, httpc, config.clone(), source.clone());
Self {
sender,
config,
buffer: Vec::new(),
source,
}
}
}
impl Debug for BatchHttpWrite {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
}
impl io::Write for BatchHttpWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
if !self.buffer.is_empty() {
let data = String::from_utf8_lossy(&self.buffer).to_string();
self.sender.send(data)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.buffer.clear();
}
Ok(())
}
}
impl encode::Write for BatchHttpWrite {
fn set_style(&mut self, _style: &Style) -> io::Result<()> {
Ok(())
}
}
fn start_batch_processor(
receiver: crossbeam::Receiver<String>,
url: String,
rt: MultiTaskRuntime<()>,
httpc: AsyncHttpc,
config: SLBatchConfig,
source: String,
) {
rt.clone().spawn(async move {
let mut buffer = Vec::with_capacity(128);
let mut last_flush = Instant::now();
let mut current_bytes = 0;
loop {
if !buffer.is_empty() &&
last_flush.elapsed() >= Duration::from_secs(config.timeout_secs) {
send_batch_with_retry(&buffer, &httpc, &url, &rt, &config, &source).await;
buffer.clear();
current_bytes = 0;
last_flush = Instant::now();
}
let mut received_any = false;
for log_entry in receiver.try_iter() {
received_any = true;
let entry_bytes = log_entry.len();
current_bytes += entry_bytes;
buffer.push(log_entry);
if current_bytes >= config.batch_bytes {
send_batch_with_retry(&buffer, &httpc, &url, &rt, &config, &source).await;
buffer.clear();
current_bytes = 0;
last_flush = Instant::now();
}
}
if !received_any && buffer.is_empty() {
rt.timeout(100).await;
} else if !received_any {
rt.timeout(10).await;
}
}
});
}
async fn send_batch_with_retry(
logs: &[String],
httpc: &AsyncHttpc,
url: &str,
rt: &MultiTaskRuntime<()>,
config: &SLBatchConfig,
source: &str,
) {
let payload = build_batch_payload(logs, source);
for retry in 0..=config.max_retries {
match http_send_batch(httpc, url, &payload).await {
Ok(resp_body) => {
if let Ok(resp_str) = String::from_utf8(resp_body) {
if resp_str.contains("\"errorCode\"") || resp_str.contains("\"error\"") {
eprintln!("[SLS Appender] Warning: SLS returned error response: {}", resp_str);
}
}
return; }
Err(e) if retry < config.max_retries => {
eprintln!("[SLS Appender] Warning: Send failed (attempt {}/{}), error: {:?}. Retrying in {}ms...",
retry + 1, config.max_retries + 1, e, config.retry_delay_ms);
rt.timeout(config.retry_delay_ms as usize).await;
}
Err(e) => {
eprintln!("[SLS Appender] Error: Failed to send logs after {} retries. Final error: {:?}",
config.max_retries + 1, e);
eprintln!("[SLS Appender] Failed to send {} log entries to: {}", logs.len(), url);
return;
}
}
}
}
fn convert_values_to_strings(value: &serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::String(s) => serde_json::Value::String(s.clone()),
serde_json::Value::Number(n) => serde_json::Value::String(n.to_string()),
serde_json::Value::Bool(b) => serde_json::Value::String(b.to_string()),
serde_json::Value::Null => serde_json::Value::String("null".to_string()),
serde_json::Value::Array(arr) => {
let converted: Vec<serde_json::Value> = arr.iter().map(convert_values_to_strings).collect();
let all_strings = converted.iter().all(|v| matches!(v, serde_json::Value::String(_)));
if all_strings {
serde_json::Value::Array(converted)
} else {
serde_json::Value::String(serde_json::to_string(&converted).unwrap_or_default())
}
}
serde_json::Value::Object(obj) => {
let mut new_obj = serde_json::Map::new();
for (k, v) in obj.iter() {
let converted = convert_values_to_strings(v);
let final_value = match &converted {
serde_json::Value::String(_) => converted,
serde_json::Value::Array(arr) => {
let all_strings = arr.iter().all(|v| matches!(v, serde_json::Value::String(_)));
if all_strings {
converted
} else {
serde_json::Value::String(serde_json::to_string(&converted).unwrap_or_default())
}
}
serde_json::Value::Object(_) => {
serde_json::Value::String(serde_json::to_string(&converted).unwrap_or_default())
}
_ => serde_json::Value::String(serde_json::to_string(&converted).unwrap_or_default()),
};
new_obj.insert(k.clone(), final_value);
}
serde_json::Value::Object(new_obj)
}
}
}
fn build_batch_payload(logs: &[String], source: &str) -> String {
let mut log_entries = Vec::new();
let mut skipped = 0;
for entry in logs.iter() {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(entry) {
let converted = convert_values_to_strings(&v);
log_entries.push(converted);
continue;
}
let mut has_valid_entry = false;
for line in entry.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
let converted = convert_values_to_strings(&v);
log_entries.push(converted);
has_valid_entry = true;
} else {
skipped += 1;
eprintln!("[SLS Appender] Warning: Failed to parse log line as JSON: {}", trimmed);
}
}
if !has_valid_entry {
skipped += 1;
}
}
if skipped > 0 {
eprintln!("[SLS Appender] Warning: Skipped {} invalid log entries", skipped);
}
serde_json::json!({
"__source__": source,
"__logs__": log_entries
}).to_string()
}
async fn http_send_batch(
httpc: &AsyncHttpc,
url: &str,
body: &str,
) -> io::Result<Vec<u8>> {
let body_bytes = body.as_bytes();
let body_req = AsyncHttpRequestBody::with_binary(body_bytes.to_vec());
let mut resp = httpc
.build_request(url, AsyncHttpRequestMethod::Post)
.add_header("Content-Type", "application/json")
.add_header("x-log-apiversion", "0.6.0")
.add_header("x-log-bodyrawsize", &body_bytes.len().to_string())
.set_body(body_req)
.send()
.await?;
let mut body_vec = Vec::new();
loop {
match resp.get_body().await? {
Some(b) => {
body_vec.write_all(&*b)?;
}
None => {
return Ok(body_vec);
}
}
}
}