use anyhow::bail;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncRead};
use tracing::{debug, error};
#[derive(Debug)]
pub enum SendError {
ChannelClosed,
Abnormal(anyhow::Error),
}
impl From<std::io::Error> for SendError {
fn from(error: std::io::Error) -> Self {
SendError::Abnormal(anyhow::Error::new(error))
}
}
impl std::fmt::Display for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SendError::ChannelClosed => write!(f, "ChannelClosed"),
SendError::Abnormal(e) => write!(f, "{}", e),
}
}
}
impl std::error::Error for SendError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
SendError::ChannelClosed => None,
SendError::Abnormal(e) => Some(e.root_cause()),
}
}
}
#[derive(Debug, Deserialize)]
pub struct Options {
#[serde(rename = "tailLines")]
pub tail: Option<usize>,
#[serde(default)]
pub follow: bool,
}
pub struct Sender {
sender: hyper::body::Sender,
opts: Options,
}
impl Sender {
pub fn new(sender: hyper::body::Sender, opts: Options) -> Self {
Sender { sender, opts }
}
pub fn tail(&self) -> Option<usize> {
self.opts.tail
}
pub fn follow(&self) -> bool {
self.opts.follow
}
pub async fn send(&mut self, data: String) -> Result<(), SendError> {
let b: hyper::body::Bytes = data.into();
self.sender.send_data(b).await.map_err(|e| {
if e.is_closed() {
debug!("channel closed.");
SendError::ChannelClosed
} else {
error!("channel error: {}", e);
SendError::Abnormal(anyhow::Error::new(e))
}
})
}
}
async fn tail<R: AsyncRead + std::marker::Unpin>(
lines: &mut tokio::io::Lines<tokio::io::BufReader<R>>,
sender: &mut Sender,
n: usize,
) -> Result<(), SendError> {
let mut line_buf = std::collections::VecDeque::with_capacity(n);
while let Some(line) = match lines.next_line().await {
Ok(line) => line,
Err(e) => {
let err = format!("Error reading from log: {:?}", e);
error!("{}", &err);
sender.send(err).await?;
return Err(e.into());
}
} {
if line_buf.len() == n {
line_buf.pop_front();
}
line_buf.push_back(line);
}
for mut line in line_buf {
line.push('\n');
sender.send(line).await?;
}
Ok(())
}
async fn stream_to_end<R: AsyncRead + std::marker::Unpin>(
lines: &mut tokio::io::Lines<tokio::io::BufReader<R>>,
sender: &mut Sender,
) -> Result<(), SendError> {
while let Some(mut line) = match lines.next_line().await {
Ok(line) => line,
Err(e) => {
let err = format!("Error reading from log: {:?}", e);
error!("{}", &err);
sender.send(err).await?;
return Err(e.into());
}
} {
line.push('\n');
sender.send(line).await?;
}
Ok(())
}
pub async fn stream<R: AsyncRead + std::marker::Unpin>(
handle: R,
mut sender: Sender,
) -> anyhow::Result<()> {
let buf = tokio::io::BufReader::new(handle);
let mut lines = buf.lines();
if let Some(n) = sender.tail() {
match tail(&mut lines, &mut sender, n).await {
Ok(_) => (),
Err(SendError::ChannelClosed) => return Ok(()),
Err(SendError::Abnormal(e)) => bail!(e),
}
} else {
match stream_to_end(&mut lines, &mut sender).await {
Ok(_) => (),
Err(SendError::ChannelClosed) => return Ok(()),
Err(SendError::Abnormal(e)) => bail!(e),
}
}
if sender.follow() {
loop {
match stream_to_end(&mut lines, &mut sender).await {
Ok(_) => (),
Err(SendError::ChannelClosed) => return Ok(()),
Err(SendError::Abnormal(e)) => bail!(e),
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
Ok(())
}
pub trait HandleFactory<R>: Sync + Send {
fn new_handle(&self) -> R;
}