1#![warn(clippy::all, clippy::pedantic, clippy::nursery)]
2
3use async_fn_stream::try_fn_stream;
4use futures::{stream::StreamExt, Stream};
5use orbit_types::{ErrorResponse, Progress};
6use reqwest::{header, Response, StatusCode};
7use reqwest_eventsource::{Event, RequestBuilderExt};
8use url::Url;
9
10pub struct Client {
11 base_url: Url,
12 token: String,
13 client: reqwest::Client,
14}
15
16#[derive(Debug, thiserror::Error)]
17pub enum Error {
18 #[error(transparent)]
19 Stream(#[from] reqwest_eventsource::Error),
20
21 #[error("{0}")]
22 Transport(#[from] reqwest::Error),
23
24 #[error("Invalid authentication token")]
25 Unauthorized,
26
27 #[error("Could not find the requested site")]
28 SiteNotFound,
29
30 #[error("The server returned an invalid response.")]
31 InvalidResponse(StatusCode, Response),
32
33 #[error("The server returned an invalid event: {0}")]
34 InvalidEvent(String),
35
36 #[error("Could not decode the event data")]
37 Decoding(#[from] serde_json::Error),
38}
39
40impl Client {
41 #[must_use]
43 pub fn new(base_url: Url, token: String) -> Self {
44 Self {
45 token,
46 base_url,
47 client: reqwest::Client::new(),
48 }
49 }
50
51 #[allow(clippy::missing_panics_doc)]
53 pub fn deploy(
54 &self,
55 name: &str,
56 r#ref: Option<&str>,
57 ) -> impl Stream<Item = Result<Result<Progress, orbit_types::Error>, Error>> {
58 let mut stream = self
59 .client
60 .post(
61 self.base_url
62 .join(&format!("/sites/{name}/deploy"))
63 .unwrap(),
64 )
65 .query(&[("ref", r#ref)])
66 .header(header::AUTHORIZATION, format!("Bearer {}", self.token))
67 .eventsource()
68 .unwrap();
69
70 try_fn_stream(|emitter| async move {
71 while let Some(event) = stream.next().await {
72 let event = match event {
73 Ok(Event::Open) => continue,
74 Ok(Event::Message(message)) => message,
75 Err(reqwest_eventsource::Error::InvalidStatusCode(status_code, response)) => {
76 match status_code {
77 StatusCode::NOT_FOUND => return Err(Error::SiteNotFound),
78 StatusCode::UNAUTHORIZED => return Err(Error::Unauthorized),
79 _ => return Err(Error::InvalidResponse(status_code, response)),
80 }
81 },
82 Err(reqwest_eventsource::Error::StreamEnded) => return Ok(()),
83 Err(reqwest_eventsource::Error::Transport(err)) => return Err(err.into()),
84 Err(err) => return Err(err.into()),
85 };
86
87 let response = match event.id.as_ref() {
88 "log" => Ok(Progress::Log(serde_json::from_str(&event.data)?)),
89 "stage" => Ok(Progress::Stage(serde_json::from_str(&event.data)?)),
90 "error" => Err(serde_json::from_str::<ErrorResponse>(&event.data)?.error),
91 _ => return Err(Error::InvalidEvent(format!("{}: {}", event.id, event.data))),
92 };
93
94 emitter.emit(response).await;
95 }
96
97 unreachable!("The stream should not end without a StreamEnded error");
98 })
99 }
100}