use futures::Stream;
use std::{pin::Pin, time::Duration};
use tokio::{
fs,
io::{AsyncReadExt, AsyncSeekExt},
time,
};
use crate::{utils::LOG_SUBDIR, MonocoreResult};
use super::Orchestrator;
type BoxedStream = Pin<Box<dyn Stream<Item = MonocoreResult<String>> + Send>>;
impl Orchestrator {
pub async fn view_logs(
&self,
service_name: &str,
lines: Option<usize>,
follow: bool,
) -> MonocoreResult<BoxedStream> {
let log_dir = self.home_dir.join(LOG_SUBDIR);
if !fs::try_exists(&log_dir).await? {
let msg = format!("No logs found for service '{}'", service_name);
return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
msg,
)))));
}
let log_path = log_dir.join(format!("{}.stdout.log", service_name));
if !fs::try_exists(&log_path).await? {
let msg = format!("No logs found for service '{}'", service_name);
return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
msg,
)))));
}
let content = fs::read_to_string(&log_path).await?;
let initial_content = if let Some(n) = lines {
let lines: Vec<&str> = content.lines().collect();
let start = if lines.len() > n { lines.len() - n } else { 0 };
lines[start..].join("\n")
} else {
content
};
if !follow {
return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
initial_content,
)))));
}
let log_path_clone = log_path.clone();
let stream = async_stream::stream! {
yield Ok(initial_content);
let mut last_size = fs::metadata(&log_path_clone).await?.len();
let mut interval = time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
if !fs::try_exists(&log_path_clone).await? {
break;
}
let metadata = fs::metadata(&log_path_clone).await?;
let current_size = metadata.len();
if current_size > last_size {
let mut file = fs::File::open(&log_path_clone).await?;
file.seek(std::io::SeekFrom::Start(last_size)).await?;
let mut new_content = String::new();
file.read_to_string(&mut new_content).await?;
last_size = current_size;
yield Ok(new_content);
}
}
};
Ok(Box::pin(stream))
}
}