Skip to main content

harmont_cloud/
logs.rs

1//! Live job log streaming over SSE.
2//!
3//! The log stream is **not** part of the OpenAPI spec, so it is hand-written
4//! here. Flow: mint a build-scoped token via [`HarmontClient::log_token`],
5//! then [`HarmontClient::stream_job_logs`] yields [`LogEvent`]s per job until
6//! the terminal `done` event.
7
8use futures_core::Stream;
9use futures_util::StreamExt;
10use eventsource_stream::Eventsource;
11use uuid::Uuid;
12use crate::{HarmontClient, HarmontError, Result};
13
14/// stdout / stderr / meta, per the server's `stream_kind` (0/1/2).
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum StreamKind { Stdout, Stderr, Meta }
17
18impl StreamKind {
19    fn from_i64(n: i64) -> Self {
20        match n { 1 => StreamKind::Stderr, 2 => StreamKind::Meta, _ => StreamKind::Stdout }
21    }
22}
23
24/// One log chunk. `content` may hold multiple lines or a partial line —
25/// callers buffer and split on `\n` themselves.
26#[derive(Debug, Clone)]
27pub struct LogChunk {
28    pub seq: i64,
29    pub stream: StreamKind,
30    pub content: String,
31    pub ts_unix_ns: Option<i64>,
32}
33
34/// A decoded SSE event from the job log stream.
35#[derive(Debug, Clone)]
36pub enum LogEvent {
37    /// Replay of prior chunks, sent once on connect.
38    History(Vec<LogChunk>),
39    /// A live chunk.
40    Chunk(LogChunk),
41    /// Terminal: the job reached a terminal state; the stream is finished.
42    Done,
43}
44
45#[derive(serde::Deserialize)]
46struct RawChunk { seq: i64, stream_kind: i64, content: String, ts: Option<i64> }
47
48impl From<RawChunk> for LogChunk {
49    fn from(r: RawChunk) -> Self {
50        LogChunk { seq: r.seq, stream: StreamKind::from_i64(r.stream_kind),
51                   content: r.content, ts_unix_ns: r.ts }
52    }
53}
54
55/// Decode one SSE (event, data) pair. Returns `Ok(None)` for ignored events.
56pub(crate) fn decode_event(event: &str, data: &str) -> Result<Option<LogEvent>> {
57    match event {
58        "chunk" => {
59            let r: RawChunk = serde_json::from_str(data)
60                .map_err(|e| HarmontError::LogStream(e.to_string()))?;
61            Ok(Some(LogEvent::Chunk(r.into())))
62        }
63        "history" => {
64            #[derive(serde::Deserialize)]
65            struct H { chunks: Vec<RawChunk> }
66            let h: H = serde_json::from_str(data)
67                .map_err(|e| HarmontError::LogStream(e.to_string()))?;
68            Ok(Some(LogEvent::History(h.chunks.into_iter().map(Into::into).collect())))
69        }
70        "done" => Ok(Some(LogEvent::Done)),
71        _ => Ok(None),
72    }
73}
74
75/// A minted, build-scoped log token.
76#[derive(Debug, Clone, serde::Deserialize)]
77pub struct LogToken {
78    pub token: String,
79    pub expires_at: chrono::DateTime<chrono::Utc>,
80}
81
82impl HarmontClient {
83    /// Mint a build-scoped HMAC log token (authorizes every job in the build).
84    pub async fn log_token(&self, org: &str, pipeline: &str, number: i64) -> Result<LogToken> {
85        let url = format!(
86            "{}/api/v0/organizations/{org}/pipelines/{pipeline}/builds/{number}/log-token",
87            self.base
88        );
89        let resp = self.http.get(&url).send().await?;
90        self.parse_json(resp).await
91    }
92
93    /// Stream a single job's logs until the `done` event. `log_base` is the
94    /// host serving `/v0/jobs/{id}/logs` (the web edge, e.g. the API base);
95    /// `token` comes from [`Self::log_token`].
96    pub async fn stream_job_logs(
97        &self, log_base: &str, job_id: Uuid, token: &str,
98    ) -> Result<impl Stream<Item = Result<LogEvent>>> {
99        let url = format!("{log_base}/v0/jobs/{job_id}/logs?token={token}");
100        let resp = self.http
101            .get(&url)
102            .header(reqwest::header::ACCEPT, "text/event-stream")
103            .send()
104            .await?;
105        if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
106            return Err(HarmontError::Unauthorized);
107        }
108        if !resp.status().is_success() {
109            return Err(HarmontError::LogStream(format!("status {}", resp.status())));
110        }
111        let stream = resp.bytes_stream().eventsource().filter_map(|item| async move {
112            match item {
113                Ok(ev) => decode_event(&ev.event, &ev.data).transpose(),
114                Err(e) => Some(Err(HarmontError::LogStream(e.to_string()))),
115            }
116        });
117        Ok(stream)
118    }
119}
120
121#[cfg(test)]
122mod tests {
123    use super::*;
124
125    #[test]
126    fn decodes_chunk_event() {
127        let data = r#"{"seq":7,"stream_kind":1,"content":"boom\n","ts":1234}"#;
128        let ev = decode_event("chunk", data).expect("some").expect("ok");
129        match ev {
130            LogEvent::Chunk(c) => {
131                assert_eq!(c.seq, 7);
132                assert_eq!(c.stream, StreamKind::Stderr);
133                assert_eq!(c.content, "boom\n");
134                assert_eq!(c.ts_unix_ns, Some(1234));
135            }
136            _ => panic!("expected chunk"),
137        }
138    }
139
140    #[test]
141    fn decodes_done_event() {
142        let ev = decode_event("done", "{}").expect("some").expect("ok");
143        assert!(matches!(ev, LogEvent::Done));
144    }
145
146    #[test]
147    fn ignores_unknown_event() {
148        assert!(decode_event("comment", "irrelevant").expect("ok").is_none());
149    }
150
151    #[test]
152    fn decodes_history_batch() {
153        let data = r#"{"chunks":[{"seq":0,"stream_kind":0,"content":"hi","ts":null}]}"#;
154        let ev = decode_event("history", data).expect("some").expect("ok");
155        match ev { LogEvent::History(v) => assert_eq!(v.len(), 1), _ => panic!("expected history") }
156    }
157}