monocore/orchestration/
log.rs1use futures::Stream;
2use std::{pin::Pin, time::Duration};
3use tokio::{
4 fs,
5 io::{AsyncReadExt, AsyncSeekExt},
6 time,
7};
8
9use crate::{utils::LOG_SUBDIR, MonocoreResult};
10
11use super::Orchestrator;
12
13type BoxedStream = Pin<Box<dyn Stream<Item = MonocoreResult<String>> + Send>>;
14
15impl Orchestrator {
20 pub async fn view_logs(
27 &self,
28 service_name: &str,
29 lines: Option<usize>,
30 follow: bool,
31 ) -> MonocoreResult<BoxedStream> {
32 let log_dir = self.home_dir.join(LOG_SUBDIR);
34
35 if !fs::try_exists(&log_dir).await? {
37 let msg = format!("No logs found for service '{}'", service_name);
38 return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
39 msg,
40 )))));
41 }
42
43 let log_path = log_dir.join(format!("{}.stdout.log", service_name));
44
45 if !fs::try_exists(&log_path).await? {
47 let msg = format!("No logs found for service '{}'", service_name);
48 return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
49 msg,
50 )))));
51 }
52
53 let content = fs::read_to_string(&log_path).await?;
55 let initial_content = if let Some(n) = lines {
56 let lines: Vec<&str> = content.lines().collect();
57 let start = if lines.len() > n { lines.len() - n } else { 0 };
58 lines[start..].join("\n")
59 } else {
60 content
61 };
62
63 if !follow {
64 return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
65 initial_content,
66 )))));
67 }
68
69 let log_path_clone = log_path.clone();
71 let stream = async_stream::stream! {
72 yield Ok(initial_content);
74
75 let mut last_size = fs::metadata(&log_path_clone).await?.len();
76 let mut interval = time::interval(Duration::from_millis(100));
77
78 loop {
79 interval.tick().await;
80
81 if !fs::try_exists(&log_path_clone).await? {
83 break;
84 }
85
86 let metadata = fs::metadata(&log_path_clone).await?;
87 let current_size = metadata.len();
88
89 if current_size > last_size {
90 let mut file = fs::File::open(&log_path_clone).await?;
92 file.seek(std::io::SeekFrom::Start(last_size)).await?;
93 let mut new_content = String::new();
94 file.read_to_string(&mut new_content).await?;
95 last_size = current_size;
96 yield Ok(new_content);
97 }
98 }
99 };
100
101 Ok(Box::pin(stream))
102 }
103}