1use std::collections::HashMap;
2use std::fmt;
3use std::pin::Pin;
4use std::task::Poll;
5use std::time::Duration;
6
7use bytes::{Bytes, BytesMut};
8use chrono::NaiveDateTime;
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use futures::{prelude::*, ready};
12use reqwest::{Response, StatusCode, Url};
13use serde::{Deserialize, Deserializer};
14use thiserror::Error;
15
16use crate::Lava;
17
/// Builder for a job log request.
///
/// Collects the job id and the optional `start`/`end` bounds, then turns into
/// either a raw byte stream ([`JobLogBuilder::raw`]) or a stream of parsed
/// entries ([`JobLogBuilder::log`]).
#[derive(Debug)]
pub struct JobLogBuilder<'a> {
    /// Client whose base URL and HTTP client are used for the request.
    lava: &'a Lava,
    /// Id of the job; becomes the `jobs/<id>/logs/` path segment.
    id: i64,
    /// Value for the `start` query parameter; `0` means "do not send it".
    // NOTE(review): presumably a line offset into the log — confirm against the server API.
    start: u64,
    /// Value for the `end` query parameter; `0` means "do not send it".
    end: u64,
}
25
26impl<'a> JobLogBuilder<'a> {
27 pub fn new(lava: &'a Lava, id: i64) -> Self {
28 Self {
29 lava,
30 id,
31 start: 0,
32 end: 0,
33 }
34 }
35
36 pub fn start(mut self, start: u64) -> Self {
37 self.start = start;
38 self
39 }
40
41 pub fn end(mut self, end: u64) -> Self {
42 self.end = end;
43 self
44 }
45
46 pub fn raw(self) -> JobLogRaw<'a> {
47 JobLogRaw::new(self.lava, self.id, self.start, self.end)
48 }
49
50 pub fn log(self) -> JobLog<'a> {
51 JobLog::new(self.lava, self.id, self.start, self.end)
52 }
53}
54
/// Errors produced while fetching or decoding a job log.
#[derive(Debug, Error)]
pub enum JobLogError {
    /// The HTTP request or the body transfer failed, or the server answered
    /// with an error status other than `404 Not Found`.
    #[error("Request failed: {0}")]
    RequestError(#[from] reqwest::Error),
    /// A log line could not be deserialized. Carries the offending line
    /// (lossily decoded as UTF-8) and the parser's error.
    #[error("Parse error: {0} - {1}")]
    ParseError(String, serde_norway::Error),
    /// The server answered the log request with `404 Not Found`.
    #[error("No data available")]
    NoData,
}
64
/// Progress of the HTTP exchange driven by `JobLogRaw`.
enum LogRequest {
    /// Nothing has been sent yet; the request is created on first poll.
    Initial,
    /// The request has been sent and the response is awaited.
    Request(BoxFuture<'static, reqwest::Result<Response>>),
    /// A successful response arrived; its body is being streamed.
    Stream(BoxStream<'static, reqwest::Result<Bytes>>),
    /// Terminal state: the stream only yields `None` from here on.
    Done,
}
71
72impl fmt::Debug for LogRequest {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 let fmt = match self {
75 LogRequest::Initial => "Initial",
76 LogRequest::Request(_) => "Request",
77 LogRequest::Stream(_) => "Stream",
78 LogRequest::Done => "Done",
79 };
80 f.write_str(fmt)
81 }
82}
83
/// Stream of the raw, undecoded bytes of a job's log.
///
/// The HTTP request is issued lazily on the first poll.
#[derive(Debug)]
pub struct JobLogRaw<'a> {
    /// Client providing the base URL and the HTTP client.
    lava: &'a Lava,
    /// Id of the job whose log is fetched.
    id: i64,
    /// `start` query parameter; omitted from the URL when `0`.
    start: u64,
    /// `end` query parameter; omitted from the URL when `0`.
    end: u64,
    /// Where the HTTP exchange currently stands.
    state: LogRequest,
}
92
93impl<'a> JobLogRaw<'a> {
94 fn new(lava: &'a Lava, id: i64, start: u64, end: u64) -> Self {
95 Self {
96 lava,
97 id,
98 start,
99 end,
100 state: LogRequest::Initial,
101 }
102 }
103
104 fn url(&self) -> Url {
105 let mut url = self.lava.base.clone();
106 url.path_segments_mut()
107 .unwrap()
108 .pop_if_empty()
109 .push("jobs")
110 .push(&self.id.to_string())
111 .push("logs")
112 .push("");
113
114 if self.start != 0 {
115 url.query_pairs_mut()
116 .append_pair("start", &self.start.to_string());
117 }
118
119 if self.end != 0 {
120 url.query_pairs_mut()
121 .append_pair("end", &self.end.to_string());
122 }
123 url
124 }
125}
126
127impl Stream for JobLogRaw<'_> {
128 type Item = Result<Bytes, JobLogError>;
129
130 fn poll_next(
131 self: std::pin::Pin<&mut Self>,
132 cx: &mut std::task::Context<'_>,
133 ) -> std::task::Poll<Option<Self::Item>> {
134 let me = self.get_mut();
135 loop {
136 match me.state {
137 LogRequest::Initial => {
138 let u = me.url();
139 let r = me.lava.client.get(u).send();
140 me.state = LogRequest::Request(r.boxed());
141 }
142 LogRequest::Request(ref mut r) => match ready!(r.as_mut().poll(cx)) {
143 Ok(r) => match r.error_for_status() {
144 Ok(r) => me.state = LogRequest::Stream(r.bytes_stream().boxed()),
145 Err(e) => {
146 me.state = LogRequest::Done;
147 let e = match e.status() {
148 Some(StatusCode::NOT_FOUND) => JobLogError::NoData,
149 _ => e.into(),
150 };
151 return Poll::Ready(Some(Err(e)));
152 }
153 },
154 Err(e) => return Poll::Ready(Some(Err(e.into()))),
155 },
156 LogRequest::Stream(ref mut stream) => match ready!(stream.as_mut().poll_next(cx)) {
157 Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
158 Some(Ok(b)) => {
159 return Poll::Ready(Some(Ok(b)));
160 }
161 None => {
162 me.state = LogRequest::Done;
163 return Poll::Ready(None);
164 }
165 },
166 LogRequest::Done => return Poll::Ready(None),
167 }
168 }
169 }
170
171 fn size_hint(&self) -> (usize, Option<usize>) {
172 (0, None)
173 }
174}
175
176fn deserialize_duration<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
177where
178 D: Deserializer<'de>,
179{
180 let duration = String::deserialize(d)?
181 .parse()
182 .map_err(serde::de::Error::custom)?;
183 Ok(Some(Duration::from_secs_f64(duration)))
184}
185
/// A result record carried by a log entry (see [`JobLogMsg::Result`]).
///
/// Field names mirror the keys of the serialized record.
#[derive(Debug, Clone, Deserialize)]
pub struct JobResult {
    /// Value of the `case` key.
    // NOTE(review): presumably the test case name — confirm against the server output.
    pub case: String,
    /// Value of the `definition` key.
    pub definition: String,
    /// Value of the `namespace` key, if present.
    pub namespace: Option<String>,
    /// Value of the `level` key, if present.
    pub level: Option<String>,
    /// Value of the `result` key.
    pub result: String,
    /// Duration parsed from a string of fractional seconds; `None` when the
    /// key is missing.
    #[serde(default, deserialize_with = "deserialize_duration")]
    pub duration: Option<Duration>,
    /// Value of the `extra` key; empty when the key is missing.
    #[serde(default)]
    pub extra: HashMap<String, serde_norway::Value>,
}
198
/// Payload of a log entry's `msg` field.
///
/// The enum is untagged: the variants are tried in declaration order and the
/// first one that deserializes wins, so the order below must not change.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum JobLogMsg {
    /// A single message string.
    Msg(String),
    /// A list of message strings.
    Msgs(Vec<String>),
    /// A structured result record.
    Result(JobResult),
}
206
/// Level of a log entry, deserialized from the lowercase variant name
/// (`"debug"`, `"info"`, ...).
///
/// Any other level string fails deserialization of the entry.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum JobLogLevel {
    Debug,
    Info,
    Warning,
    Error,
    Results,
    Target,
    Input,
    Feedback,
    Exception,
}
220
/// One parsed line of a job log, as yielded by [`JobLog`].
#[derive(Debug, Clone, Deserialize)]
pub struct JobLogEntry {
    /// Timestamp of the entry (no timezone information is carried).
    pub dt: NaiveDateTime,
    /// Level of the entry.
    pub lvl: JobLogLevel,
    /// Value of the `ns` key, if present.
    // NOTE(review): presumably the namespace the entry belongs to — confirm.
    pub ns: Option<String>,
    /// Message payload of the entry.
    pub msg: JobLogMsg,
}
228
/// Stream of parsed [`JobLogEntry`] values for a job's log.
///
/// Wraps a [`JobLogRaw`] byte stream and splits it into newline-terminated
/// lines, each of which is deserialized into one entry.
#[derive(Debug)]
pub struct JobLog<'a> {
    /// Chunks received from `raw` that have not been consumed as lines yet.
    buf: Vec<Bytes>,
    /// When `true`, the next poll first looks for a complete line in `buf`
    /// before asking `raw` for more data.
    from_buf: bool,
    /// Underlying byte stream.
    raw: JobLogRaw<'a>,
}
235
236impl<'a> JobLog<'a> {
237 fn new(lava: &'a Lava, id: i64, start: u64, end: u64) -> Self {
238 let raw = JobLogRaw::new(lava, id, start, end);
239 Self {
240 buf: Vec::new(),
241 from_buf: false,
242 raw,
243 }
244 }
245}
246
247impl Stream for JobLog<'_> {
248 type Item = Result<JobLogEntry, JobLogError>;
249
250 fn poll_next(
251 self: std::pin::Pin<&mut Self>,
252 cx: &mut std::task::Context<'_>,
253 ) -> Poll<Option<Self::Item>> {
254 let me = self.get_mut();
255 loop {
256 if me.from_buf {
257 let last = me.buf.last().unwrap();
258 if let Some(eol) = last.iter().position(|e| e == &b'\n') {
259 let line = if me.buf.len() == 1 {
260 if last.len() - 1 == eol {
261 me.from_buf = false;
262 me.buf.pop().unwrap()
263 } else {
264 let b = me.buf.get_mut(0).unwrap();
265 b.split_to(eol + 1)
266 }
267 } else {
268 let mut buf = BytesMut::new();
269 for b in me.buf.drain(0..me.buf.len() - 1) {
270 buf.extend_from_slice(b.as_ref());
271 }
272
273 let last = me.buf.last().unwrap();
274 if last.len() == eol {
275 me.from_buf = false;
276 buf.extend_from_slice(me.buf.pop().unwrap().as_ref());
277 } else {
278 let b = me.buf.get_mut(0).unwrap();
279 buf.extend_from_slice(b.split_to(eol + 1).as_ref());
280 }
281 buf.into()
282 };
283 let l = line.slice(1..);
284 let entry = serde_norway::from_slice(l.as_ref()).map_err(|e| {
285 let s = String::from_utf8_lossy(l.as_ref());
286 JobLogError::ParseError(s.into_owned(), e)
287 });
288 return Poll::Ready(Some(entry));
289 } else {
290 me.from_buf = false;
291 }
292 } else {
293 match ready!(Pin::new(&mut me.raw).poll_next(cx)) {
294 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
295 Some(Ok(b)) => {
296 me.from_buf = true;
297 me.buf.push(b);
298 }
299 None => return Poll::Ready(None),
300 }
301 }
302 }
303 }
304}