1use futures_core::Stream;
9use futures_util::StreamExt;
10use eventsource_stream::Eventsource;
11use uuid::Uuid;
12use crate::{HarmontClient, HarmontError, Result};
13
/// Which stream a log chunk belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamKind {
    /// Standard output; also the fallback for unrecognised wire codes.
    Stdout,
    /// Standard error (wire code `1`).
    Stderr,
    /// Wire code `2`.
    // NOTE(review): presumably non-output metadata about the job — confirm with the server docs.
    Meta,
}

impl StreamKind {
    /// Converts the numeric stream code found in a chunk payload into a variant.
    ///
    /// `1` becomes [`StreamKind::Stderr`] and `2` becomes [`StreamKind::Meta`];
    /// every other value, including negatives, becomes [`StreamKind::Stdout`].
    fn from_i64(n: i64) -> Self {
        if n == 1 {
            Self::Stderr
        } else if n == 2 {
            Self::Meta
        } else {
            Self::Stdout
        }
    }
}
23
24#[derive(Debug, Clone)]
27pub struct LogChunk {
28 pub seq: i64,
29 pub stream: StreamKind,
30 pub content: String,
31 pub ts_unix_ns: Option<i64>,
32}
33
34#[derive(Debug, Clone)]
36pub enum LogEvent {
37 History(Vec<LogChunk>),
39 Chunk(LogChunk),
41 Done,
43}
44
45#[derive(serde::Deserialize)]
46struct RawChunk { seq: i64, stream_kind: i64, content: String, ts: Option<i64> }
47
48impl From<RawChunk> for LogChunk {
49 fn from(r: RawChunk) -> Self {
50 LogChunk { seq: r.seq, stream: StreamKind::from_i64(r.stream_kind),
51 content: r.content, ts_unix_ns: r.ts }
52 }
53}
54
55pub(crate) fn decode_event(event: &str, data: &str) -> Result<Option<LogEvent>> {
57 match event {
58 "chunk" => {
59 let r: RawChunk = serde_json::from_str(data)
60 .map_err(|e| HarmontError::LogStream(e.to_string()))?;
61 Ok(Some(LogEvent::Chunk(r.into())))
62 }
63 "history" => {
64 #[derive(serde::Deserialize)]
65 struct H { chunks: Vec<RawChunk> }
66 let h: H = serde_json::from_str(data)
67 .map_err(|e| HarmontError::LogStream(e.to_string()))?;
68 Ok(Some(LogEvent::History(h.chunks.into_iter().map(Into::into).collect())))
69 }
70 "done" => Ok(Some(LogEvent::Done)),
71 _ => Ok(None),
72 }
73}
74
75#[derive(Debug, Clone, serde::Deserialize)]
77pub struct LogToken {
78 pub token: String,
79 pub expires_at: chrono::DateTime<chrono::Utc>,
80}
81
82impl HarmontClient {
83 pub async fn log_token(&self, org: &str, pipeline: &str, number: i64) -> Result<LogToken> {
85 let url = format!(
86 "{}/api/v0/organizations/{org}/pipelines/{pipeline}/builds/{number}/log-token",
87 self.base
88 );
89 let resp = self.http.get(&url).send().await?;
90 self.parse_json(resp).await
91 }
92
93 pub async fn stream_job_logs(
97 &self, log_base: &str, job_id: Uuid, token: &str,
98 ) -> Result<impl Stream<Item = Result<LogEvent>>> {
99 let url = format!("{log_base}/v0/jobs/{job_id}/logs?token={token}");
100 let resp = self.http
101 .get(&url)
102 .header(reqwest::header::ACCEPT, "text/event-stream")
103 .send()
104 .await?;
105 if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
106 return Err(HarmontError::Unauthorized);
107 }
108 if !resp.status().is_success() {
109 return Err(HarmontError::LogStream(format!("status {}", resp.status())));
110 }
111 let stream = resp.bytes_stream().eventsource().filter_map(|item| async move {
112 match item {
113 Ok(ev) => decode_event(&ev.event, &ev.data).transpose(),
114 Err(e) => Some(Err(HarmontError::LogStream(e.to_string()))),
115 }
116 });
117 Ok(stream)
118 }
119}
120
121#[cfg(test)]
122mod tests {
123 use super::*;
124
125 #[test]
126 fn decodes_chunk_event() {
127 let data = r#"{"seq":7,"stream_kind":1,"content":"boom\n","ts":1234}"#;
128 let ev = decode_event("chunk", data).expect("some").expect("ok");
129 match ev {
130 LogEvent::Chunk(c) => {
131 assert_eq!(c.seq, 7);
132 assert_eq!(c.stream, StreamKind::Stderr);
133 assert_eq!(c.content, "boom\n");
134 assert_eq!(c.ts_unix_ns, Some(1234));
135 }
136 _ => panic!("expected chunk"),
137 }
138 }
139
140 #[test]
141 fn decodes_done_event() {
142 let ev = decode_event("done", "{}").expect("some").expect("ok");
143 assert!(matches!(ev, LogEvent::Done));
144 }
145
146 #[test]
147 fn ignores_unknown_event() {
148 assert!(decode_event("comment", "irrelevant").expect("ok").is_none());
149 }
150
151 #[test]
152 fn decodes_history_batch() {
153 let data = r#"{"chunks":[{"seq":0,"stream_kind":0,"content":"hi","ts":null}]}"#;
154 let ev = decode_event("history", data).expect("some").expect("ok");
155 match ev { LogEvent::History(v) => assert_eq!(v.len(), 1), _ => panic!("expected history") }
156 }
157}