1use std::{
2 io::{self},
3 path::PathBuf,
4};
5
6use tokio::{
7 fs::File,
8 io::{AsyncBufReadExt as _, BufReader},
9 sync::{mpsc, oneshot},
10};
11use tokio_stream::{wrappers::LinesStream, StreamExt as TokioStreamExt};
12
/// Describes which slice of a log a caller wants: how many entries to skip
/// and, optionally, how many to return after that.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LogsQuery {
    /// Number of leading entries to skip.
    offset: usize,
    /// Upper bound on the number of returned entries; `None` means unbounded.
    limit: Option<usize>,
}

impl LogsQuery {
    /// Query with no offset and no limit.
    pub fn fetch_all() -> Self {
        Self {
            offset: 0,
            limit: None,
        }
    }

    /// Query starting at the first entry and returning at most `limit` entries.
    pub fn with_limit(limit: usize) -> Self {
        Self::with_offset_and_limit(0, limit)
    }

    /// Query skipping the first `offset` entries, with no upper bound.
    pub fn with_offset(offset: usize) -> Self {
        Self {
            offset,
            ..Self::default()
        }
    }

    /// Query skipping the first `offset` entries and returning at most `limit`
    /// entries after that.
    pub fn with_offset_and_limit(offset: usize, limit: usize) -> Self {
        Self {
            offset,
            limit: Some(limit),
        }
    }
}
68
/// Output stream whose log is being requested.
#[derive(Debug)]
pub(crate) enum LogsQueryType {
    /// The standard output stream.
    Stdout,
    /// The standard error stream.
    Stderr,
}

/// Renders the stream name in upper case (`"STDOUT"` / `"STDERR"`).
///
/// `Display` is implemented rather than `ToString` so the value can be used
/// with `{}` in format strings; `to_string()` keeps working through the
/// standard library's blanket `impl<T: Display> ToString for T`.
impl std::fmt::Display for LogsQueryType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            LogsQueryType::Stdout => "STDOUT",
            LogsQueryType::Stderr => "STDERR",
        };
        f.write_str(name)
    }
}
84
/// Identifies which log destination a settings check refers to.
// NOTE(review): `dead_code` is silenced here, presumably because some variants
// are not constructed in every build configuration; confirm and document why.
#[allow(dead_code)]
pub(crate) enum LogSettingsQuery {
    /// The dedicated stdout log.
    Stdout,
    /// The dedicated stderr log.
    Stderr,
    /// The merged log.
    Merged,
}
91
92enum LogReaderCommand {
93 GetLogs {
94 logs_query_type: LogsQueryType,
95 query: LogsQuery,
96 responder: oneshot::Sender<Result<Vec<String>, LogReaderError>>,
97 },
98 Abort,
99 CheckSettings {
100 query: LogSettingsQuery,
101 responder: oneshot::Sender<bool>,
102 },
103}
104
105pub struct LogReader {
106 logs_stdout: Option<PathBuf>,
107 logs_stderr: Option<PathBuf>,
108 logs_merged: Option<PathBuf>,
109 receiver: mpsc::Receiver<LogReaderCommand>,
110}
111
112impl LogReader {
113 pub fn spawn(
114 logs_stdout: Option<PathBuf>,
115 logs_stderr: Option<PathBuf>,
116 logs_merged: Option<PathBuf>,
117 ) -> LogReaderHandle {
118 let (sender, receiver) = mpsc::channel(32);
119 let mut reader = Self {
120 logs_stdout,
121 logs_stderr,
122 logs_merged,
123 receiver,
124 };
125 tokio::spawn(async move { reader.run().await });
126 LogReaderHandle::new(sender)
127 }
128
129 async fn run(&mut self) {
130 while let Some(command) = self.receiver.recv().await {
131 match command {
132 LogReaderCommand::GetLogs {
133 logs_query_type,
134 query,
135 responder,
136 } => {
137 let result = self.get_logs(logs_query_type, query).await;
138 let _ = responder.send(result);
139 }
140 LogReaderCommand::CheckSettings { query, responder } => {
141 let response = self.is_log_type_set(&query);
142 let _ = responder.send(response);
143 }
144 LogReaderCommand::Abort => break,
145 }
146 }
147 }
148
149 async fn get_logs(
150 &self,
151 logs_query_type: LogsQueryType,
152 query: LogsQuery,
153 ) -> Result<Vec<String>, LogReaderError> {
154 let log_file_path = self
155 .get_log_file_path(&logs_query_type)
156 .ok_or(LogReaderError::LogTypeWasNotConfigured(logs_query_type))?;
157
158 let file = File::open(log_file_path).await?;
159 let reader = BufReader::new(file);
160
161 if let Some(limit) = query.limit {
162 Self::read_logs(reader, query.offset, limit).await
163 } else {
164 Self::read_logs_to_end(reader, query.offset).await
165 }
166 .map_err(Into::into)
167 }
168
169 fn get_log_file_path(&self, logs_query_type: &LogsQueryType) -> Option<&PathBuf> {
170 match logs_query_type {
171 LogsQueryType::Stdout => self.logs_stdout.as_ref(),
172 LogsQueryType::Stderr => self.logs_stderr.as_ref(),
173 }
174 .or(self.logs_merged.as_ref())
175 }
176
177 async fn read_logs_to_end(reader: BufReader<File>, offset: usize) -> io::Result<Vec<String>> {
178 let logs = LinesStream::new(reader.lines())
179 .skip(offset)
180 .then(|result| async move { result.ok() })
181 .filter_map(|line_opt| line_opt)
182 .collect()
183 .await;
184 Ok(logs)
185 }
186
187 async fn read_logs(
188 reader: BufReader<File>,
189 offset: usize,
190 limit: usize,
191 ) -> io::Result<Vec<String>> {
192 let logs = LinesStream::new(reader.lines())
193 .skip(offset)
194 .take(limit)
195 .then(|result| async move { result.ok() })
196 .filter_map(|line_opt| line_opt)
197 .collect()
198 .await;
199 Ok(logs)
200 }
201
202 fn is_log_type_set(&self, query: &LogSettingsQuery) -> bool {
203 match query {
204 LogSettingsQuery::Stdout => self.logs_stdout.as_ref(),
205 LogSettingsQuery::Stderr => self.logs_stderr.as_ref(),
206 LogSettingsQuery::Merged => self.logs_merged.as_ref(),
207 }
208 .is_some()
209 }
210}
211
212#[derive(Debug)]
213pub(crate) struct LogReaderHandle {
214 sender: mpsc::Sender<LogReaderCommand>,
215}
216
217impl LogReaderHandle {
218 fn new(sender: mpsc::Sender<LogReaderCommand>) -> Self {
219 Self { sender }
220 }
221
222 pub async fn read_logs(
223 &self,
224 logs_query_type: LogsQueryType,
225 query: LogsQuery,
226 ) -> Result<Vec<String>, LogReaderError> {
227 let (responder, receiver) = oneshot::channel();
228 let cmd = LogReaderCommand::GetLogs {
229 logs_query_type,
230 query,
231 responder,
232 };
233 let _ = self.sender.send(cmd).await;
234 match receiver.await {
235 Ok(result) => result,
236 Err(_) => Ok(vec![]), }
238 }
239
240 pub async fn abort(&self) {
241 let _ = self.sender.send(LogReaderCommand::Abort).await;
242 }
243
244 #[allow(dead_code)] pub(crate) async fn check_logs_settings(
246 &self,
247 query: LogSettingsQuery,
248 ) -> Result<bool, oneshot::error::RecvError> {
249 let (responder, receiver) = oneshot::channel();
250 let cmd: LogReaderCommand = LogReaderCommand::CheckSettings { query, responder };
251 let _ = self.sender.send(cmd).await;
252 receiver.await
253 }
254}
255
256#[derive(thiserror::Error, Debug)]
257pub(crate) enum LogReaderError {
258 #[error("Log type was not configured for process with id: {0:?}")]
259 LogTypeWasNotConfigured(LogsQueryType),
260 #[error(transparent)]
261 UnExpectedIoError(#[from] io::Error),
262}