monocore/orchestration/
log.rs

1use futures::Stream;
2use std::{pin::Pin, time::Duration};
3use tokio::{
4    fs,
5    io::{AsyncReadExt, AsyncSeekExt},
6    time,
7};
8
9use crate::{utils::LOG_SUBDIR, MonocoreResult};
10
11use super::Orchestrator;
12
13type BoxedStream = Pin<Box<dyn Stream<Item = MonocoreResult<String>> + Send>>;
14
15//--------------------------------------------------------------------------------------------------
16// Methods
17//--------------------------------------------------------------------------------------------------
18
19impl Orchestrator {
20    /// View logs for a specific service
21    ///
22    /// # Arguments
23    /// * `service_name` - Name of the service to view logs for
24    /// * `lines` - Optional number of lines to show from the end
25    /// * `follow` - Whether to continuously follow the log output
26    pub async fn view_logs(
27        &self,
28        service_name: &str,
29        lines: Option<usize>,
30        follow: bool,
31    ) -> MonocoreResult<BoxedStream> {
32        // Get log directory from home directory
33        let log_dir = self.home_dir.join(LOG_SUBDIR);
34
35        // Ensure log directory exists
36        if !fs::try_exists(&log_dir).await? {
37            let msg = format!("No logs found for service '{}'", service_name);
38            return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
39                msg,
40            )))));
41        }
42
43        let log_path = log_dir.join(format!("{}.stdout.log", service_name));
44
45        // Check if log file exists
46        if !fs::try_exists(&log_path).await? {
47            let msg = format!("No logs found for service '{}'", service_name);
48            return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
49                msg,
50            )))));
51        }
52
53        // Read initial content
54        let content = fs::read_to_string(&log_path).await?;
55        let initial_content = if let Some(n) = lines {
56            let lines: Vec<&str> = content.lines().collect();
57            let start = if lines.len() > n { lines.len() - n } else { 0 };
58            lines[start..].join("\n")
59        } else {
60            content
61        };
62
63        if !follow {
64            return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
65                initial_content,
66            )))));
67        }
68
69        // For follow mode, create a stream that continuously reads the file
70        let log_path_clone = log_path.clone();
71        let stream = async_stream::stream! {
72            // Send initial content
73            yield Ok(initial_content);
74
75            let mut last_size = fs::metadata(&log_path_clone).await?.len();
76            let mut interval = time::interval(Duration::from_millis(100));
77
78            loop {
79                interval.tick().await;
80
81                // Check if file still exists
82                if !fs::try_exists(&log_path_clone).await? {
83                    break;
84                }
85
86                let metadata = fs::metadata(&log_path_clone).await?;
87                let current_size = metadata.len();
88
89                if current_size > last_size {
90                    // Read only the new content
91                    let mut file = fs::File::open(&log_path_clone).await?;
92                    file.seek(std::io::SeekFrom::Start(last_size)).await?;
93                    let mut new_content = String::new();
94                    file.read_to_string(&mut new_content).await?;
95                    last_size = current_size;
96                    yield Ok(new_content);
97                }
98            }
99        };
100
101        Ok(Box::pin(stream))
102    }
103}