#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{
borrow::Cow, collections::BTreeMap, future::Future, marker::PhantomData, pin::Pin, task::Poll,
};
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Deserializer};
use soft_assert::*;
use url::Url;
use zeroize::Zeroize;
pub struct Forgejo {
url: Url,
client: Client,
}
mod generated;
#[cfg(feature = "sync")]
pub mod sync;
#[derive(thiserror::Error, Debug)]
pub enum ForgejoError {
#[error("url must have a host")]
HostRequired,
#[error("scheme must be http or https")]
HttpRequired,
#[error(transparent)]
ReqwestError(#[from] reqwest::Error),
#[error("API key should be ascii")]
KeyNotAscii,
#[error("the response from forgejo was not properly structured")]
BadStructure(#[from] StructureError),
#[error("unexpected status code {} {}", .0.as_u16(), .0.canonical_reason().unwrap_or(""))]
UnexpectedStatusCode(StatusCode),
#[error(transparent)]
ApiError(#[from] ApiError),
#[error("the provided authorization was too long to accept")]
AuthTooLong,
}
#[derive(thiserror::Error, Debug)]
pub enum StructureError {
#[error("{e}")]
Serde {
e: serde_json::Error,
contents: bytes::Bytes,
},
#[error(transparent)]
Utf8(#[from] std::str::Utf8Error),
#[error("failed to find header `{0}`")]
HeaderMissing(&'static str),
#[error("header was not ascii")]
HeaderNotAscii,
#[error("failed to parse header")]
HeaderParseFailed,
#[error("nothing was returned when a value was expected")]
EmptyResponse,
}
impl From<std::str::Utf8Error> for ForgejoError {
fn from(error: std::str::Utf8Error) -> Self {
Self::BadStructure(StructureError::Utf8(error))
}
}
#[derive(thiserror::Error, Debug)]
pub struct ApiError {
pub message: Option<String>,
pub kind: ApiErrorKind,
}
impl ApiError {
fn new(message: Option<String>, kind: ApiErrorKind) -> Self {
Self { message, kind }
}
pub fn message(&self) -> Option<&str> {
self.message.as_deref()
}
pub fn error_kind(&self) -> &ApiErrorKind {
&self.kind
}
}
impl From<ApiErrorKind> for ApiError {
fn from(kind: ApiErrorKind) -> Self {
Self {
message: None,
kind,
}
}
}
impl std::fmt::Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.message {
Some(message) => write!(f, "{}: {message}", self.kind),
None => write!(f, "{}", self.kind),
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum ApiErrorKind {
#[error("api error")]
Generic,
#[error("access denied")]
Forbidden,
#[error("invalid topics")]
InvalidTopics { invalid_topics: Option<Vec<String>> },
#[error("not found")]
NotFound { errors: Option<Vec<String>> },
#[error("repo archived")]
RepoArchived,
#[error("unauthorized")]
Unauthorized,
#[error("validation failed")]
ValidationFailed,
#[error("status code {0}")]
Other(reqwest::StatusCode),
}
impl From<structs::APIError> for ApiError {
fn from(value: structs::APIError) -> Self {
Self::new(value.message, ApiErrorKind::Generic)
}
}
impl From<structs::APIForbiddenError> for ApiError {
fn from(value: structs::APIForbiddenError) -> Self {
Self::new(value.message, ApiErrorKind::Forbidden)
}
}
impl From<structs::APIInvalidTopicsError> for ApiError {
fn from(value: structs::APIInvalidTopicsError) -> Self {
Self::new(
value.message,
ApiErrorKind::InvalidTopics {
invalid_topics: value.invalid_topics,
},
)
}
}
impl From<structs::APINotFound> for ApiError {
fn from(value: structs::APINotFound) -> Self {
Self::new(
value.message,
ApiErrorKind::NotFound {
errors: value.errors,
},
)
}
}
impl From<structs::APIRepoArchivedError> for ApiError {
fn from(value: structs::APIRepoArchivedError) -> Self {
Self::new(value.message, ApiErrorKind::RepoArchived)
}
}
impl From<structs::APIUnauthorizedError> for ApiError {
fn from(value: structs::APIUnauthorizedError) -> Self {
Self::new(value.message, ApiErrorKind::Unauthorized)
}
}
impl From<structs::APIValidationError> for ApiError {
fn from(value: structs::APIValidationError) -> Self {
Self::new(value.message, ApiErrorKind::ValidationFailed)
}
}
impl From<reqwest::StatusCode> for ApiError {
fn from(value: reqwest::StatusCode) -> Self {
match value {
reqwest::StatusCode::NOT_FOUND => ApiErrorKind::NotFound { errors: None },
reqwest::StatusCode::FORBIDDEN => ApiErrorKind::Forbidden,
reqwest::StatusCode::UNAUTHORIZED => ApiErrorKind::Unauthorized,
_ => ApiErrorKind::Other(value),
}
.into()
}
}
impl From<OAuthError> for ApiError {
fn from(value: OAuthError) -> Self {
Self::new(Some(value.error_description), ApiErrorKind::Generic)
}
}
pub enum Auth<'a> {
Token(&'a str),
OAuth2(&'a str),
Password {
username: &'a str,
password: &'a str,
mfa: Option<&'a str>,
},
None,
}
impl Auth<'_> {
fn to_headers(&self) -> Result<reqwest::header::HeaderMap, ForgejoError> {
let mut headers = reqwest::header::HeaderMap::new();
match self {
Auth::Token(token) => {
let mut header: reqwest::header::HeaderValue = format!("token {token}")
.try_into()
.map_err(|_| ForgejoError::KeyNotAscii)?;
header.set_sensitive(true);
headers.insert("Authorization", header);
}
Auth::Password {
username,
password,
mfa,
} => {
let unencoded_len = username.len() + password.len() + 1;
let unpadded_len = unencoded_len
.checked_mul(4)
.ok_or(ForgejoError::AuthTooLong)?
.div_ceil(3);
let len = unpadded_len.div_ceil(4) * 4;
let mut bytes = vec![0; len];
let mut encoder = base64ct::Encoder::<base64ct::Base64>::new(&mut bytes).unwrap();
encoder.encode(username.as_bytes()).unwrap();
encoder.encode(b":").unwrap();
encoder.encode(password.as_bytes()).unwrap();
let b64 = encoder.finish().unwrap();
let mut header: reqwest::header::HeaderValue =
format!("Basic {b64}").try_into().unwrap(); header.set_sensitive(true);
headers.insert("Authorization", header);
bytes.zeroize();
if let Some(mfa) = mfa {
let mut key_header: reqwest::header::HeaderValue =
(*mfa).try_into().map_err(|_| ForgejoError::KeyNotAscii)?;
key_header.set_sensitive(true);
headers.insert("X-FORGEJO-OTP", key_header);
}
}
Auth::OAuth2(token) => {
let mut header: reqwest::header::HeaderValue = format!("Bearer {token}")
.try_into()
.map_err(|_| ForgejoError::KeyNotAscii)?;
header.set_sensitive(true);
headers.insert("Authorization", header);
}
Auth::None => (),
}
Ok(headers)
}
}
impl Forgejo {
pub fn new(auth: Auth, url: Url) -> Result<Self, ForgejoError> {
Self::with_user_agent(auth, url, "forgejo-api-rs")
}
pub fn with_user_agent(auth: Auth, url: Url, user_agent: &str) -> Result<Self, ForgejoError> {
soft_assert!(
matches!(url.scheme(), "http" | "https"),
Err(ForgejoError::HttpRequired)
);
let client = Client::builder()
.user_agent(user_agent)
.default_headers(auth.to_headers()?)
.build()?;
Ok(Self { url, client })
}
pub async fn download_release_attachment(
&self,
owner: &str,
repo: &str,
release: i64,
attach: i64,
) -> Result<bytes::Bytes, ForgejoError> {
let release = self
.repo_get_release_attachment(owner, repo, release, attach)
.await?;
let mut url = self.url.clone();
url.path_segments_mut()
.unwrap()
.pop_if_empty()
.extend(["attachments", &release.uuid.unwrap().to_string()]);
let request = self.client.get(url).build()?;
Ok(self.client.execute(request).await?.bytes().await?)
}
pub async fn oauth_get_access_token(
&self,
body: structs::OAuthTokenRequest<'_>,
) -> Result<structs::OAuthToken, ForgejoError> {
let url = self.url.join("login/oauth/access_token").unwrap();
let request = self.client.post(url).json(&body).build()?;
let response = self.client.execute(request).await?;
match response.status() {
reqwest::StatusCode::OK => Ok(response.json().await?),
status if status.is_client_error() => {
let err = response.json::<OAuthError>().await?;
Err(ApiError::from(err).into())
}
_ => Err(ForgejoError::UnexpectedStatusCode(response.status())),
}
}
pub async fn send_request(&self, request: &RawRequest) -> Result<ApiResponse, ForgejoError> {
let mut url = self
.url
.join(&request.path)
.expect("url fail. bug in forgejo-api");
{
let mut query_pairs = url.query_pairs_mut();
if let Some(query) = &request.query {
query_pairs.extend_pairs(query.iter());
}
if let Some(page) = request.page {
query_pairs.append_pair("page", &format!("{page}"));
}
if let Some(limit) = request.limit {
query_pairs.append_pair("limit", &format!("{limit}"));
}
}
let mut reqwest_request = self.client.request(request.method.clone(), url);
reqwest_request = match &request.body {
RequestBody::Json(bytes) => reqwest_request
.body(bytes.clone())
.header(reqwest::header::CONTENT_TYPE, "application/json"),
RequestBody::Form(list) => {
let mut form = reqwest::multipart::Form::new();
for (k, v) in list {
form = form.part(
*k,
reqwest::multipart::Part::bytes(v.clone()).file_name("file"),
);
}
reqwest_request.multipart(form)
}
RequestBody::None => reqwest_request,
};
let mut reqwest_response = reqwest_request.send().await?;
let response = ApiResponse {
status_code: reqwest_response.status(),
headers: std::mem::take(reqwest_response.headers_mut()),
body: reqwest_response.bytes().await?,
};
Ok(response)
}
pub async fn hit_endpoint<E: Endpoint, R: FromResponse>(
&self,
endpoint: E,
) -> Result<R, ForgejoError> {
let (response, has_body) =
E::handle_error(self.send_request(&endpoint.make_request()).await?)?;
Ok(R::from_response(response, has_body)?)
}
}
#[derive(serde::Deserialize)]
struct OAuthError {
error_description: String,
}
pub mod structs {
pub use crate::generated::structs::*;
#[derive(serde::Serialize)]
#[serde(tag = "grant_type")]
pub enum OAuthTokenRequest<'a> {
#[serde(rename = "authorization_code")]
Confidential {
client_id: &'a str,
client_secret: &'a str,
code: &'a str,
redirect_uri: url::Url,
},
#[serde(rename = "authorization_code")]
Public {
client_id: &'a str,
code_verifier: &'a str,
code: &'a str,
redirect_uri: url::Url,
},
#[serde(rename = "refresh_token")]
Refresh {
refresh_token: &'a str,
client_id: &'a str,
client_secret: &'a str,
},
}
#[derive(serde::Deserialize)]
pub struct OAuthToken {
pub access_token: String,
pub refresh_token: String,
pub token_type: String,
pub expires_in: u32,
}
}
fn none_if_blank_url<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<Option<Url>, D::Error> {
use serde::de::{Error, Unexpected, Visitor};
use std::fmt;
struct EmptyUrlVisitor;
impl<'de> Visitor<'de> for EmptyUrlVisitor {
type Value = Option<Url>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("option")
}
#[inline]
fn visit_unit<E>(self) -> Result<Self::Value, E>
where
E: Error,
{
Ok(None)
}
#[inline]
fn visit_none<E>(self) -> Result<Self::Value, E>
where
E: Error,
{
Ok(None)
}
#[inline]
fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = serde::Deserialize::deserialize(deserializer)?;
if s.is_empty() {
return Ok(None);
}
Url::parse(&s)
.map_err(|err| {
let err_s = format!("{}", err);
Error::invalid_value(Unexpected::Str(&s), &err_s.as_str())
})
.map(Some)
}
#[inline]
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: Error,
{
if s.is_empty() {
return Ok(None);
}
Url::parse(s)
.map_err(|err| {
let err_s = format!("{err}");
Error::invalid_value(Unexpected::Str(s), &err_s.as_str())
})
.map(Some)
}
}
deserializer.deserialize_option(EmptyUrlVisitor)
}
#[allow(dead_code)] fn deserialize_ssh_url<'de, D, DE>(deserializer: D) -> Result<Url, DE>
where
D: Deserializer<'de>,
DE: serde::de::Error,
{
let raw_url: String = String::deserialize(deserializer).map_err(DE::custom)?;
parse_ssh_url(&raw_url).map_err(DE::custom)
}
fn deserialize_optional_ssh_url<'de, D, DE>(deserializer: D) -> Result<Option<Url>, DE>
where
D: Deserializer<'de>,
DE: serde::de::Error,
{
let raw_url: Option<String> = Option::deserialize(deserializer).map_err(DE::custom)?;
raw_url
.as_ref()
.map(parse_ssh_url)
.map(|res| res.map_err(DE::custom))
.transpose()
.or(Ok(None))
}
fn requested_reviewers_ignore_null<'de, D, DE>(
deserializer: D,
) -> Result<Option<Vec<structs::User>>, DE>
where
D: Deserializer<'de>,
DE: serde::de::Error,
{
let list: Option<Vec<Option<structs::User>>> =
Option::deserialize(deserializer).map_err(DE::custom)?;
Ok(list.map(|list| list.into_iter().flatten().collect::<Vec<_>>()))
}
fn parse_ssh_url(raw_url: &String) -> Result<Url, url::ParseError> {
Url::parse(raw_url).or_else(|_| {
let url = format!("ssh://{url}", url = raw_url.replace(":", "/"));
Url::parse(url.as_str())
})
}
#[test]
fn ssh_url_deserialization() {
#[derive(serde::Deserialize)]
struct SshUrl {
#[serde(deserialize_with = "deserialize_ssh_url")]
url: url::Url,
}
let full_url = r#"{ "url": "ssh://git@codeberg.org/Cyborus/forgejo-api" }"#;
let ssh_url = r#"{ "url": "git@codeberg.org:Cyborus/forgejo-api" }"#;
let full_url_de =
serde_json::from_str::<SshUrl>(full_url).expect("failed to deserialize full url");
let ssh_url_de =
serde_json::from_str::<SshUrl>(ssh_url).expect("failed to deserialize ssh url");
let expected = "ssh://git@codeberg.org/Cyborus/forgejo-api";
assert_eq!(full_url_de.url.as_str(), expected);
assert_eq!(ssh_url_de.url.as_str(), expected);
#[derive(serde::Deserialize)]
struct OptSshUrl {
#[serde(deserialize_with = "deserialize_optional_ssh_url")]
url: Option<url::Url>,
}
let null_url = r#"{ "url": null }"#;
let full_url_de = serde_json::from_str::<OptSshUrl>(full_url)
.expect("failed to deserialize optional full url");
let ssh_url_de =
serde_json::from_str::<OptSshUrl>(ssh_url).expect("failed to deserialize optional ssh url");
let null_url_de =
serde_json::from_str::<OptSshUrl>(null_url).expect("failed to deserialize null url");
let expected = Some("ssh://git@codeberg.org/Cyborus/forgejo-api");
assert_eq!(full_url_de.url.as_ref().map(|u| u.as_ref()), expected);
assert_eq!(ssh_url_de.url.as_ref().map(|u| u.as_ref()), expected);
assert!(null_url_de.url.is_none());
}
impl From<structs::DefaultMergeStyle> for structs::MergePullRequestOptionDo {
fn from(value: structs::DefaultMergeStyle) -> Self {
match value {
structs::DefaultMergeStyle::Merge => structs::MergePullRequestOptionDo::Merge,
structs::DefaultMergeStyle::Rebase => structs::MergePullRequestOptionDo::Rebase,
structs::DefaultMergeStyle::RebaseMerge => {
structs::MergePullRequestOptionDo::RebaseMerge
}
structs::DefaultMergeStyle::Squash => structs::MergePullRequestOptionDo::Squash,
structs::DefaultMergeStyle::FastForwardOnly => {
structs::MergePullRequestOptionDo::FastForwardOnly
}
}
}
}
mod sealed {
pub trait Sealed {}
}
pub trait Endpoint: sealed::Sealed {
type Response: FromResponse;
fn make_request(self) -> RawRequest;
fn handle_error(response: ApiResponse) -> Result<(ApiResponse, bool), ForgejoError>;
}
#[derive(Clone)]
pub struct RawRequest {
method: reqwest::Method,
path: Cow<'static, str>,
query: Option<Vec<(&'static str, String)>>,
body: RequestBody,
page: Option<u32>,
limit: Option<u32>,
}
impl RawRequest {
pub(crate) fn wrap<E: Endpoint<Response = R>, R>(self, client: &Forgejo) -> Request<'_, E, R> {
Request {
inner: TypedRequest {
inner: self,
__endpoint: PhantomData,
__response: PhantomData,
},
client,
}
}
#[cfg(feature = "sync")]
pub(crate) fn wrap_sync<E: Endpoint<Response = R>, R>(
self,
client: &sync::Forgejo,
) -> sync::Request<'_, E, R> {
sync::Request {
inner: TypedRequest {
inner: self,
__endpoint: PhantomData,
__response: PhantomData,
},
client,
}
}
}
pub trait FromResponse {
fn from_response(response: ApiResponse, has_body: bool) -> Result<Self, StructureError>
where
Self: Sized;
}
#[macro_export]
macro_rules! impl_from_response {
($t:ty) => {
impl $crate::FromResponse for $t {
$crate::json_impl!();
}
};
}
#[macro_export]
#[doc(hidden)]
macro_rules! json_impl {
() => {
fn from_response(
response: $crate::ApiResponse,
has_body: bool,
) -> Result<Self, $crate::StructureError> {
soft_assert::soft_assert!(has_body, Err($crate::StructureError::EmptyResponse));
serde_json::from_slice(&response.body()).map_err(|e| $crate::StructureError::Serde {
e,
contents: response.body().clone(),
})
}
};
}
impl FromResponse for String {
fn from_response(
response: crate::ApiResponse,
has_body: bool,
) -> Result<Self, crate::StructureError> {
soft_assert::soft_assert!(has_body, Err(crate::StructureError::EmptyResponse));
Ok(std::str::from_utf8(&response.body)?.to_owned())
}
}
impl FromResponse for bytes::Bytes {
fn from_response(
response: crate::ApiResponse,
has_body: bool,
) -> Result<Self, crate::StructureError> {
soft_assert::soft_assert!(has_body, Err(crate::StructureError::EmptyResponse));
Ok(response.body.clone())
}
}
impl<T: FromResponse + serde::de::DeserializeOwned> FromResponse for Vec<T> {
json_impl!();
}
impl<K, V> FromResponse for BTreeMap<K, V>
where
BTreeMap<K, V>: serde::de::DeserializeOwned,
{
json_impl!();
}
impl FromResponse for Vec<u8> {
fn from_response(
response: crate::ApiResponse,
has_body: bool,
) -> Result<Self, crate::StructureError> {
soft_assert::soft_assert!(has_body, Err(crate::StructureError::EmptyResponse));
Ok(response.body.to_vec())
}
}
impl<
T: FromResponse,
H: for<'a> TryFrom<&'a reqwest::header::HeaderMap, Error = crate::StructureError>,
> FromResponse for (H, T)
{
fn from_response(
response: crate::ApiResponse,
has_body: bool,
) -> Result<Self, crate::StructureError> {
let headers = H::try_from(&response.headers)?;
let body = T::from_response(response, has_body)?;
Ok((headers, body))
}
}
impl<T: FromResponse> FromResponse for Option<T> {
fn from_response(
response: crate::ApiResponse,
has_body: bool,
) -> Result<Self, crate::StructureError> {
if has_body {
T::from_response(response, true).map(Some)
} else {
Ok(None)
}
}
}
impl_from_response!(bool);
impl FromResponse for () {
fn from_response(_: crate::ApiResponse, _: bool) -> Result<Self, crate::StructureError> {
Ok(())
}
}
#[derive(Clone)]
pub enum RequestBody {
Json(bytes::Bytes),
Form(Vec<(&'static str, Vec<u8>)>),
None,
}
pub struct TypedRequest<E, R> {
inner: RawRequest,
__endpoint: PhantomData<E>,
__response: PhantomData<R>,
}
impl<E: Endpoint, R: FromResponse> TypedRequest<E, R> {
async fn send(&self, client: &Forgejo) -> Result<R, ForgejoError> {
let (response, has_body) = E::handle_error(client.send_request(&self.inner).await?)?;
Ok(R::from_response(response, has_body)?)
}
#[cfg(feature = "sync")]
fn send_sync(&self, client: &sync::Forgejo) -> Result<R, ForgejoError> {
let (response, has_body) = E::handle_error(client.send_request(&self.inner)?)?;
Ok(R::from_response(response, has_body)?)
}
}
pub struct ApiResponse {
status_code: StatusCode,
headers: reqwest::header::HeaderMap,
body: bytes::Bytes,
}
impl ApiResponse {
pub fn status_code(&self) -> StatusCode {
self.status_code
}
pub fn headers(&self) -> &reqwest::header::HeaderMap {
&self.headers
}
pub fn body(&self) -> &bytes::Bytes {
&self.body
}
}
pub struct Request<'a, E, R> {
inner: TypedRequest<E, R>,
client: &'a Forgejo,
}
impl<'a, E: Endpoint, R: FromResponse> Request<'a, E, R> {
pub async fn send(self) -> Result<R, ForgejoError> {
self.inner.send(self.client).await
}
pub fn response_type<T: FromResponse>(self) -> Request<'a, E, T> {
Request {
inner: TypedRequest {
inner: self.inner.inner,
__endpoint: PhantomData,
__response: PhantomData,
},
client: self.client,
}
}
pub fn page(mut self, page: u32) -> Self {
self.inner.inner.page = Some(page);
self
}
pub fn page_size(mut self, limit: u32) -> Self {
self.inner.inner.limit = Some(limit);
self
}
}
pub trait CountHeader: sealed::Sealed {
fn count(&self) -> Option<usize>;
}
pub trait PageSize: sealed::Sealed {
fn page_size(&self) -> usize;
}
impl<T> sealed::Sealed for Vec<T> {}
impl<T> PageSize for Vec<T> {
fn page_size(&self) -> usize {
self.len()
}
}
impl<'a, E: Endpoint, H: CountHeader, T: PageSize> Request<'a, E, (H, T)>
where
(H, T): FromResponse,
{
pub fn stream_pages(self) -> PageStream<'a, E, T, H> {
PageStream {
request: self,
total_seen: 0,
finished: false,
fut: None,
}
}
}
pub struct PageStream<'a, E: Endpoint, T, H> {
request: Request<'a, E, (H, T)>,
total_seen: usize,
finished: bool,
fut: Option<Pin<Box<dyn Future<Output = Result<(H, T), ForgejoError>> + Send + Sync + 'a>>>,
}
impl<'a, E: Endpoint, T: PageSize, H: CountHeader> futures::stream::Stream
for PageStream<'a, E, T, H>
where
Self: Unpin + 'a,
(H, T): FromResponse,
{
type Item = Result<T, ForgejoError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.finished {
return Poll::Ready(None);
}
match &mut self.fut {
None => {
let request = self.request.inner.inner.clone();
let client = self.request.client;
let fut = Box::pin(async move {
E::handle_error(client.send_request(&request).await?).and_then(|(res, body)| {
<(H, T)>::from_response(res, body).map_err(|e| e.into())
})
});
self.fut = Some(fut);
cx.waker().wake_by_ref();
Poll::Pending
}
Some(fut) => {
let (headers, page_content) = match fut.as_mut().poll(cx) {
Poll::Ready(Ok(response)) => response,
Poll::Ready(Err(e)) => {
self.finished = true;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => return Poll::Pending,
};
self.total_seen += page_content.page_size();
let total_count = match headers.count() {
Some(n) => n,
None => {
self.finished = true;
return Poll::Ready(Some(Err(StructureError::HeaderMissing(
"x-total-count",
)
.into())));
}
};
if self.total_seen >= total_count {
self.finished = true;
} else {
self.request.inner.inner.page =
Some(self.request.inner.inner.page.unwrap_or(1) + 1);
self.fut = None;
}
Poll::Ready(Some(Ok(page_content)))
}
}
}
}
impl<
'a,
E: Endpoint + Unpin + Send + Sync + 'a,
T: Unpin + Send + Sync + 'a,
H: CountHeader + Unpin + Send + Sync + 'a,
> Request<'a, E, (H, Vec<T>)>
where
(H, Vec<T>): FromResponse,
{
pub fn stream(
self,
) -> impl futures::Stream<Item = Result<T, ForgejoError>> + Send + Sync + use<'a, E, T, H> {
use futures::TryStreamExt;
self.stream_pages()
.map_ok(|page| futures::stream::iter(page.into_iter().map(Ok)))
.try_flatten()
}
pub async fn all(self) -> Result<Vec<T>, ForgejoError> {
use futures::TryStreamExt;
self.stream().try_collect().await
}
}
impl<'a, E: Endpoint, R: FromResponse> std::future::IntoFuture for Request<'a, E, R> {
type Output = Result<R, ForgejoError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let (response, has_body) =
E::handle_error(self.client.send_request(&self.inner.inner).await?)?;
Ok(R::from_response(response, has_body)?)
})
}
}