use std::io::{self, Write};
use async_trait::async_trait;
use bytes::Bytes;
use pingora::{
server::{ListenFds, ShutdownWatch},
services::Service,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing_subscriber::fmt::MakeWriter;
#[derive(Debug, Clone)]
pub struct StdoutWriter(UnboundedSender<Bytes>);
impl io::Write for StdoutWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Ok(()) = self.0.send(Bytes::copy_from_slice(buf)) {
return Ok(buf.len());
}
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
io::stdout().flush()
}
}
#[derive(Debug)]
pub struct ProxyLog {
stdout: StdoutWriter,
}
impl ProxyLog {
pub fn new(sender: &UnboundedSender<Bytes>) -> Self {
ProxyLog {
stdout: StdoutWriter(sender.clone()),
}
}
}
impl<'a> MakeWriter<'a> for ProxyLog {
type Writer = StdoutWriter;
fn make_writer(&'a self) -> Self::Writer {
self.stdout.clone()
}
}
pub struct ProxyLoggerReceiver {
receiver: UnboundedReceiver<Bytes>,
}
impl ProxyLoggerReceiver {
pub fn new(receiver: UnboundedReceiver<Bytes>) -> Self {
ProxyLoggerReceiver { receiver }
}
}
#[async_trait]
impl Service for ProxyLoggerReceiver {
async fn start_service(&mut self, _fds: Option<ListenFds>, _shutdown: ShutdownWatch) {
loop {
if let Some(buf) = self.receiver.recv().await {
io::stdout().write_all(&buf).unwrap();
}
}
}
fn name(&self) -> &str {
"ProxyLogger"
}
fn threads(&self) -> Option<usize> {
Some(1)
}
}