orbit_client/
lib.rs

1#![warn(clippy::all, clippy::pedantic, clippy::nursery)]
2
3use async_fn_stream::try_fn_stream;
4use futures::{stream::StreamExt, Stream};
5use orbit_types::{ErrorResponse, Progress};
6use reqwest::{header, Response, StatusCode};
7use reqwest_eventsource::{Event, RequestBuilderExt};
8use url::Url;
9
10pub struct Client {
11	base_url: Url,
12	token: String,
13	client: reqwest::Client,
14}
15
16#[derive(Debug, thiserror::Error)]
17pub enum Error {
18	#[error(transparent)]
19	Stream(#[from] reqwest_eventsource::Error),
20
21	#[error("{0}")]
22	Transport(#[from] reqwest::Error),
23
24	#[error("Invalid authentication token")]
25	Unauthorized,
26
27	#[error("Could not find the requested site")]
28	SiteNotFound,
29
30	#[error("The server returned an invalid response.")]
31	InvalidResponse(StatusCode, Response),
32
33	#[error("The server returned an invalid event: {0}")]
34	InvalidEvent(String),
35
36	#[error("Could not decode the event data")]
37	Decoding(#[from] serde_json::Error),
38}
39
40impl Client {
41	/// Create a new client.
42	#[must_use]
43	pub fn new(base_url: Url, token: String) -> Self {
44		Self {
45			token,
46			base_url,
47			client: reqwest::Client::new(),
48		}
49	}
50
51	/// Deploy a site.
52	#[allow(clippy::missing_panics_doc)]
53	pub fn deploy(
54		&self,
55		name: &str,
56		r#ref: Option<&str>,
57	) -> impl Stream<Item = Result<Result<Progress, orbit_types::Error>, Error>> {
58		let mut stream = self
59			.client
60			.post(
61				self.base_url
62					.join(&format!("/sites/{name}/deploy"))
63					.unwrap(),
64			)
65			.query(&[("ref", r#ref)])
66			.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
67			.eventsource()
68			.unwrap();
69
70		try_fn_stream(|emitter| async move {
71			while let Some(event) = stream.next().await {
72				let event = match event {
73					Ok(Event::Open) => continue,
74					Ok(Event::Message(message)) => message,
75					Err(reqwest_eventsource::Error::InvalidStatusCode(status_code, response)) => {
76						match status_code {
77							StatusCode::NOT_FOUND => return Err(Error::SiteNotFound),
78							StatusCode::UNAUTHORIZED => return Err(Error::Unauthorized),
79							_ => return Err(Error::InvalidResponse(status_code, response)),
80						}
81					},
82					Err(reqwest_eventsource::Error::StreamEnded) => return Ok(()),
83					Err(reqwest_eventsource::Error::Transport(err)) => return Err(err.into()),
84					Err(err) => return Err(err.into()),
85				};
86
87				let response = match event.id.as_ref() {
88					"log" => Ok(Progress::Log(serde_json::from_str(&event.data)?)),
89					"stage" => Ok(Progress::Stage(serde_json::from_str(&event.data)?)),
90					"error" => Err(serde_json::from_str::<ErrorResponse>(&event.data)?.error),
91					_ => return Err(Error::InvalidEvent(format!("{}: {}", event.id, event.data))),
92				};
93
94				emitter.emit(response).await;
95			}
96
97			unreachable!("The stream should not end without a StreamEnded error");
98		})
99	}
100}