proc_heim/process/
log_reader.rs

1use std::{
2    io::{self},
3    path::PathBuf,
4};
5
6use tokio::{
7    fs::File,
8    io::{AsyncBufReadExt as _, BufReader},
9    sync::{mpsc, oneshot},
10};
11use tokio_stream::{wrappers::LinesStream, StreamExt as TokioStreamExt};
12
/// Type describing how to fetch logs from spawned process.
///
/// A query consists of an `offset` (how many leading logs to skip) and an optional `limit`
/// (the maximum number of logs to return). The default query has offset `0` and no limit.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LogsQuery {
    /// Number of logs skipped before the first returned one.
    offset: usize,
    /// Maximum number of returned logs; `None` means "no limit".
    limit: Option<usize>,
}

impl LogsQuery {
    /// Fetch all logs.
    ///
    /// Equivalent to a query with offset `0` and no limit.
    pub fn fetch_all() -> Self {
        Self::with_offset(0)
    }

    /// Fetch logs with given limit. It will retrieve a maximum of `limit` logs starting from first one.
    /// # Examples
    /// ```
    /// # use proc_heim::manager::LogsQuery;
    /// // assume logs are: [log0, log1, log2, log3, log4]
    /// LogsQuery::with_limit(2); // [log0, log1]
    /// ```
    pub fn with_limit(limit: usize) -> Self {
        Self::with_offset_and_limit(0, limit)
    }

    /// Fetch logs with given offset. It will retrieve logs starting from `offset` to the last produced log.
    /// # Examples
    /// ```
    /// # use proc_heim::manager::LogsQuery;
    /// // assume logs are: [log0, log1, log2, log3, log4]
    /// LogsQuery::with_offset(2); // [log2, log3, log4]
    /// ```
    pub fn with_offset(offset: usize) -> Self {
        Self {
            limit: None,
            offset,
        }
    }

    /// Fetch logs with given offset and limit.
    /// # Examples
    /// ```
    /// # use proc_heim::manager::LogsQuery;
    /// // assume logs are: [log0, log1, log2, log3, log4]
    /// LogsQuery::with_offset_and_limit(2, 2); // [log2, log3]
    /// ```
    pub fn with_offset_and_limit(offset: usize, limit: usize) -> Self {
        Self {
            offset,
            limit: Some(limit),
        }
    }
}
68
/// Selects which output stream's logs a read request targets.
#[derive(Debug)]
pub(crate) enum LogsQueryType {
    /// Request the stdout logs.
    Stdout,
    /// Request the stderr logs.
    Stderr,
}

/// Renders the stream name in upper case (`"STDOUT"` / `"STDERR"`).
///
/// `Display` is implemented rather than `ToString` directly: the standard library's blanket
/// `impl<T: Display> ToString for T` keeps `to_string()` working exactly as before, and the
/// type additionally becomes usable with `{}` in `format!`-style macros.
impl std::fmt::Display for LogsQueryType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            LogsQueryType::Stdout => "STDOUT",
            LogsQueryType::Stderr => "STDERR",
        };
        f.write_str(name)
    }
}
84
/// Asks whether a particular kind of log file has been configured for the reader.
#[allow(dead_code)] // used in spawner tests
pub(crate) enum LogSettingsQuery {
    /// Is a dedicated stdout log file set?
    Stdout,
    /// Is a dedicated stderr log file set?
    Stderr,
    /// Is a merged log file set?
    Merged,
}
91
92enum LogReaderCommand {
93    GetLogs {
94        logs_query_type: LogsQueryType,
95        query: LogsQuery,
96        responder: oneshot::Sender<Result<Vec<String>, LogReaderError>>,
97    },
98    Abort,
99    CheckSettings {
100        query: LogSettingsQuery,
101        responder: oneshot::Sender<bool>,
102    },
103}
104
105pub struct LogReader {
106    logs_stdout: Option<PathBuf>,
107    logs_stderr: Option<PathBuf>,
108    logs_merged: Option<PathBuf>,
109    receiver: mpsc::Receiver<LogReaderCommand>,
110}
111
112impl LogReader {
113    pub fn spawn(
114        logs_stdout: Option<PathBuf>,
115        logs_stderr: Option<PathBuf>,
116        logs_merged: Option<PathBuf>,
117    ) -> LogReaderHandle {
118        let (sender, receiver) = mpsc::channel(32);
119        let mut reader = Self {
120            logs_stdout,
121            logs_stderr,
122            logs_merged,
123            receiver,
124        };
125        tokio::spawn(async move { reader.run().await });
126        LogReaderHandle::new(sender)
127    }
128
129    async fn run(&mut self) {
130        while let Some(command) = self.receiver.recv().await {
131            match command {
132                LogReaderCommand::GetLogs {
133                    logs_query_type,
134                    query,
135                    responder,
136                } => {
137                    let result = self.get_logs(logs_query_type, query).await;
138                    let _ = responder.send(result);
139                }
140                LogReaderCommand::CheckSettings { query, responder } => {
141                    let response = self.is_log_type_set(&query);
142                    let _ = responder.send(response);
143                }
144                LogReaderCommand::Abort => break,
145            }
146        }
147    }
148
149    async fn get_logs(
150        &self,
151        logs_query_type: LogsQueryType,
152        query: LogsQuery,
153    ) -> Result<Vec<String>, LogReaderError> {
154        let log_file_path = self
155            .get_log_file_path(&logs_query_type)
156            .ok_or(LogReaderError::LogTypeWasNotConfigured(logs_query_type))?;
157
158        let file = File::open(log_file_path).await?;
159        let reader = BufReader::new(file);
160
161        if let Some(limit) = query.limit {
162            Self::read_logs(reader, query.offset, limit).await
163        } else {
164            Self::read_logs_to_end(reader, query.offset).await
165        }
166        .map_err(Into::into)
167    }
168
169    fn get_log_file_path(&self, logs_query_type: &LogsQueryType) -> Option<&PathBuf> {
170        match logs_query_type {
171            LogsQueryType::Stdout => self.logs_stdout.as_ref(),
172            LogsQueryType::Stderr => self.logs_stderr.as_ref(),
173        }
174        .or(self.logs_merged.as_ref())
175    }
176
177    async fn read_logs_to_end(reader: BufReader<File>, offset: usize) -> io::Result<Vec<String>> {
178        let logs = LinesStream::new(reader.lines())
179            .skip(offset)
180            .then(|result| async move { result.ok() })
181            .filter_map(|line_opt| line_opt)
182            .collect()
183            .await;
184        Ok(logs)
185    }
186
187    async fn read_logs(
188        reader: BufReader<File>,
189        offset: usize,
190        limit: usize,
191    ) -> io::Result<Vec<String>> {
192        let logs = LinesStream::new(reader.lines())
193            .skip(offset)
194            .take(limit)
195            .then(|result| async move { result.ok() })
196            .filter_map(|line_opt| line_opt)
197            .collect()
198            .await;
199        Ok(logs)
200    }
201
202    fn is_log_type_set(&self, query: &LogSettingsQuery) -> bool {
203        match query {
204            LogSettingsQuery::Stdout => self.logs_stdout.as_ref(),
205            LogSettingsQuery::Stderr => self.logs_stderr.as_ref(),
206            LogSettingsQuery::Merged => self.logs_merged.as_ref(),
207        }
208        .is_some()
209    }
210}
211
212#[derive(Debug)]
213pub(crate) struct LogReaderHandle {
214    sender: mpsc::Sender<LogReaderCommand>,
215}
216
217impl LogReaderHandle {
218    fn new(sender: mpsc::Sender<LogReaderCommand>) -> Self {
219        Self { sender }
220    }
221
222    pub async fn read_logs(
223        &self,
224        logs_query_type: LogsQueryType,
225        query: LogsQuery,
226    ) -> Result<Vec<String>, LogReaderError> {
227        let (responder, receiver) = oneshot::channel();
228        let cmd = LogReaderCommand::GetLogs {
229            logs_query_type,
230            query,
231            responder,
232        };
233        let _ = self.sender.send(cmd).await;
234        match receiver.await {
235            Ok(result) => result,
236            Err(_) => Ok(vec![]), // ignore error, LogReader is killed
237        }
238    }
239
240    pub async fn abort(&self) {
241        let _ = self.sender.send(LogReaderCommand::Abort).await;
242    }
243
244    #[allow(dead_code)] // used in spawner tests
245    pub(crate) async fn check_logs_settings(
246        &self,
247        query: LogSettingsQuery,
248    ) -> Result<bool, oneshot::error::RecvError> {
249        let (responder, receiver) = oneshot::channel();
250        let cmd: LogReaderCommand = LogReaderCommand::CheckSettings { query, responder };
251        let _ = self.sender.send(cmd).await;
252        receiver.await
253    }
254}
255
256#[derive(thiserror::Error, Debug)]
257pub(crate) enum LogReaderError {
258    #[error("Log type was not configured for process with id: {0:?}")]
259    LogTypeWasNotConfigured(LogsQueryType),
260    #[error(transparent)]
261    UnExpectedIoError(#[from] io::Error),
262}