Skip to main content

lava_api/
joblog.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::pin::Pin;
4use std::task::Poll;
5use std::time::Duration;
6
7use bytes::{Bytes, BytesMut};
8use chrono::NaiveDateTime;
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use futures::{prelude::*, ready};
12use reqwest::{Response, StatusCode, Url};
13use serde::{Deserialize, Deserializer};
14use thiserror::Error;
15
16use crate::Lava;
17
18#[derive(Debug)]
19pub struct JobLogBuilder<'a> {
20    lava: &'a Lava,
21    id: i64,
22    start: u64,
23    end: u64,
24}
25
26impl<'a> JobLogBuilder<'a> {
27    pub fn new(lava: &'a Lava, id: i64) -> Self {
28        Self {
29            lava,
30            id,
31            start: 0,
32            end: 0,
33        }
34    }
35
36    pub fn start(mut self, start: u64) -> Self {
37        self.start = start;
38        self
39    }
40
41    pub fn end(mut self, end: u64) -> Self {
42        self.end = end;
43        self
44    }
45
46    pub fn raw(self) -> JobLogRaw<'a> {
47        JobLogRaw::new(self.lava, self.id, self.start, self.end)
48    }
49
50    pub fn log(self) -> JobLog<'a> {
51        JobLog::new(self.lava, self.id, self.start, self.end)
52    }
53}
54
55#[derive(Debug, Error)]
56pub enum JobLogError {
57    #[error("Request failed: {0}")]
58    RequestError(#[from] reqwest::Error),
59    #[error("Parse error: {0} - {1}")]
60    ParseError(String, serde_norway::Error),
61    #[error("No data available")]
62    NoData,
63}
64
65enum LogRequest {
66    Initial,
67    Request(BoxFuture<'static, reqwest::Result<Response>>),
68    Stream(BoxStream<'static, reqwest::Result<Bytes>>),
69    Done,
70}
71
72impl fmt::Debug for LogRequest {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        let fmt = match self {
75            LogRequest::Initial => "Initial",
76            LogRequest::Request(_) => "Request",
77            LogRequest::Stream(_) => "Stream",
78            LogRequest::Done => "Done",
79        };
80        f.write_str(fmt)
81    }
82}
83
84#[derive(Debug)]
85pub struct JobLogRaw<'a> {
86    lava: &'a Lava,
87    id: i64,
88    start: u64,
89    end: u64,
90    state: LogRequest,
91}
92
93impl<'a> JobLogRaw<'a> {
94    fn new(lava: &'a Lava, id: i64, start: u64, end: u64) -> Self {
95        Self {
96            lava,
97            id,
98            start,
99            end,
100            state: LogRequest::Initial,
101        }
102    }
103
104    fn url(&self) -> Url {
105        let mut url = self.lava.base.clone();
106        url.path_segments_mut()
107            .unwrap()
108            .pop_if_empty()
109            .push("jobs")
110            .push(&self.id.to_string())
111            .push("logs")
112            .push("");
113
114        if self.start != 0 {
115            url.query_pairs_mut()
116                .append_pair("start", &self.start.to_string());
117        }
118
119        if self.end != 0 {
120            url.query_pairs_mut()
121                .append_pair("end", &self.end.to_string());
122        }
123        url
124    }
125}
126
127impl Stream for JobLogRaw<'_> {
128    type Item = Result<Bytes, JobLogError>;
129
130    fn poll_next(
131        self: std::pin::Pin<&mut Self>,
132        cx: &mut std::task::Context<'_>,
133    ) -> std::task::Poll<Option<Self::Item>> {
134        let me = self.get_mut();
135        loop {
136            match me.state {
137                LogRequest::Initial => {
138                    let u = me.url();
139                    let r = me.lava.client.get(u).send();
140                    me.state = LogRequest::Request(r.boxed());
141                }
142                LogRequest::Request(ref mut r) => match ready!(r.as_mut().poll(cx)) {
143                    Ok(r) => match r.error_for_status() {
144                        Ok(r) => me.state = LogRequest::Stream(r.bytes_stream().boxed()),
145                        Err(e) => {
146                            me.state = LogRequest::Done;
147                            let e = match e.status() {
148                                Some(StatusCode::NOT_FOUND) => JobLogError::NoData,
149                                _ => e.into(),
150                            };
151                            return Poll::Ready(Some(Err(e)));
152                        }
153                    },
154                    Err(e) => return Poll::Ready(Some(Err(e.into()))),
155                },
156                LogRequest::Stream(ref mut stream) => match ready!(stream.as_mut().poll_next(cx)) {
157                    Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
158                    Some(Ok(b)) => {
159                        return Poll::Ready(Some(Ok(b)));
160                    }
161                    None => {
162                        me.state = LogRequest::Done;
163                        return Poll::Ready(None);
164                    }
165                },
166                LogRequest::Done => return Poll::Ready(None),
167            }
168        }
169    }
170
171    fn size_hint(&self) -> (usize, Option<usize>) {
172        (0, None)
173    }
174}
175
176fn deserialize_duration<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
177where
178    D: Deserializer<'de>,
179{
180    let duration = String::deserialize(d)?
181        .parse()
182        .map_err(serde::de::Error::custom)?;
183    Ok(Some(Duration::from_secs_f64(duration)))
184}
185
186#[derive(Debug, Clone, Deserialize)]
187pub struct JobResult {
188    pub case: String,
189    pub definition: String,
190    pub namespace: Option<String>,
191    pub level: Option<String>,
192    pub result: String,
193    #[serde(default, deserialize_with = "deserialize_duration")]
194    pub duration: Option<Duration>,
195    #[serde(default)]
196    pub extra: HashMap<String, serde_norway::Value>,
197}
198
199#[derive(Debug, Clone, Deserialize)]
200#[serde(untagged)]
201pub enum JobLogMsg {
202    Msg(String),
203    Msgs(Vec<String>),
204    Result(JobResult),
205}
206
207#[derive(Debug, Clone, Deserialize)]
208#[serde(rename_all = "lowercase")]
209pub enum JobLogLevel {
210    Debug,
211    Info,
212    Warning,
213    Error,
214    Results,
215    Target,
216    Input,
217    Feedback,
218    Exception,
219}
220
221#[derive(Debug, Clone, Deserialize)]
222pub struct JobLogEntry {
223    pub dt: NaiveDateTime,
224    pub lvl: JobLogLevel,
225    pub ns: Option<String>,
226    pub msg: JobLogMsg,
227}
228
229#[derive(Debug)]
230pub struct JobLog<'a> {
231    buf: Vec<Bytes>,
232    from_buf: bool,
233    raw: JobLogRaw<'a>,
234}
235
236impl<'a> JobLog<'a> {
237    fn new(lava: &'a Lava, id: i64, start: u64, end: u64) -> Self {
238        let raw = JobLogRaw::new(lava, id, start, end);
239        Self {
240            buf: Vec::new(),
241            from_buf: false,
242            raw,
243        }
244    }
245}
246
247impl Stream for JobLog<'_> {
248    type Item = Result<JobLogEntry, JobLogError>;
249
250    fn poll_next(
251        self: std::pin::Pin<&mut Self>,
252        cx: &mut std::task::Context<'_>,
253    ) -> Poll<Option<Self::Item>> {
254        let me = self.get_mut();
255        loop {
256            if me.from_buf {
257                let last = me.buf.last().unwrap();
258                if let Some(eol) = last.iter().position(|e| e == &b'\n') {
259                    let line = if me.buf.len() == 1 {
260                        if last.len() - 1 == eol {
261                            me.from_buf = false;
262                            me.buf.pop().unwrap()
263                        } else {
264                            let b = me.buf.get_mut(0).unwrap();
265                            b.split_to(eol + 1)
266                        }
267                    } else {
268                        let mut buf = BytesMut::new();
269                        for b in me.buf.drain(0..me.buf.len() - 1) {
270                            buf.extend_from_slice(b.as_ref());
271                        }
272
273                        let last = me.buf.last().unwrap();
274                        if last.len() == eol {
275                            me.from_buf = false;
276                            buf.extend_from_slice(me.buf.pop().unwrap().as_ref());
277                        } else {
278                            let b = me.buf.get_mut(0).unwrap();
279                            buf.extend_from_slice(b.split_to(eol + 1).as_ref());
280                        }
281                        buf.into()
282                    };
283                    let l = line.slice(1..);
284                    let entry = serde_norway::from_slice(l.as_ref()).map_err(|e| {
285                        let s = String::from_utf8_lossy(l.as_ref());
286                        JobLogError::ParseError(s.into_owned(), e)
287                    });
288                    return Poll::Ready(Some(entry));
289                } else {
290                    me.from_buf = false;
291                }
292            } else {
293                match ready!(Pin::new(&mut me.raw).poll_next(cx)) {
294                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
295                    Some(Ok(b)) => {
296                        me.from_buf = true;
297                        me.buf.push(b);
298                    }
299                    None => return Poll::Ready(None),
300                }
301            }
302        }
303    }
304}