synd_term/client/synd_api/
mod.rs

1use std::{fmt::Debug, time::Duration};
2
3use anyhow::anyhow;
4use graphql_client::{GraphQLQuery, Response};
5use reqwest::header::{self, HeaderValue};
6use serde::{de::DeserializeOwned, Serialize};
7use synd_o11y::{health_check::Health, opentelemetry::extension::*};
8use thiserror::Error;
9use tracing::{error, Span};
10use url::Url;
11
12use crate::{
13    auth::{Credential, Verified},
14    client::synd_api::payload::ExportSubscriptionPayload,
15    config, types,
16};
17
18use self::query::subscription::SubscriptionOutput;
19
20mod scalar;
21pub use scalar::*;
22#[path = "generated/mutation.rs"]
23pub mod mutation;
24pub mod payload;
25#[path = "generated/query.rs"]
26pub mod query;
27
28#[derive(Error, Debug)]
29pub enum SubscribeFeedError {
30    #[error("invalid feed url: `{feed_url}` ({message})`")]
31    InvalidFeedUrl { feed_url: FeedUrl, message: String },
32    #[error("{feed_url} {message}")]
33    FeedUnavailable { feed_url: FeedUrl, message: String },
34}
35
36#[derive(Error, Debug)]
37pub enum SyndApiError {
38    #[error("unauthorized")]
39    Unauthorized { url: Option<Url> },
40    #[error(transparent)]
41    BuildRequest(#[from] reqwest::Error),
42    #[error("graphql error: {errors:?}")]
43    Graphql { errors: Vec<graphql_client::Error> },
44    #[error(transparent)]
45    SubscribeFeed(SubscribeFeedError),
46    #[error(transparent)]
47    Internal(#[from] anyhow::Error),
48}
49
50/// synd-api client
51#[derive(Clone)]
52pub struct Client {
53    client: reqwest::Client,
54    credential: Option<HeaderValue>,
55    endpoint: Url,
56}
57
58impl Client {
59    const GRAPHQL: &'static str = "/graphql";
60    const HEALTH_CHECK: &'static str = "/health";
61
62    pub fn new(endpoint: Url, timeout: Duration) -> anyhow::Result<Self> {
63        let client = reqwest::ClientBuilder::new()
64            .user_agent(config::client::USER_AGENT)
65            .timeout(timeout)
66            .connect_timeout(Duration::from_secs(10))
67            // this client specifically targets the syndicationd api, so accepts self signed certificates
68            .danger_accept_invalid_certs(true)
69            .build()?;
70
71        Ok(Self {
72            client,
73            endpoint,
74            credential: None,
75        })
76    }
77
78    pub(crate) fn set_credential(&mut self, cred: Verified<Credential>) {
79        let mut token = HeaderValue::try_from(match cred.into_inner() {
80            Credential::Github { access_token } => format!("github {access_token}"),
81            Credential::Google { id_token, .. } => format!("google {id_token}"),
82        })
83        .unwrap();
84        token.set_sensitive(true);
85        self.credential = Some(token);
86    }
87
88    #[tracing::instrument(skip(self))]
89    pub async fn fetch_subscription(
90        &self,
91        after: Option<String>,
92        first: Option<i64>,
93    ) -> Result<SubscriptionOutput, SyndApiError> {
94        let var = query::subscription::Variables { after, first };
95        let request = query::Subscription::build_query(var);
96        let response: query::subscription::ResponseData = self.request(&request).await?;
97        Ok(response.output)
98    }
99
100    #[tracing::instrument(skip(self))]
101    pub async fn subscribe_feed(
102        &self,
103        input: mutation::subscribe_feed::SubscribeFeedInput,
104    ) -> Result<types::Feed, SyndApiError> {
105        use crate::client::synd_api::mutation::subscribe_feed::ResponseCode;
106        let url = input.url.clone();
107        let var = mutation::subscribe_feed::Variables {
108            subscribe_input: input,
109        };
110        let request = mutation::SubscribeFeed::build_query(var);
111        let response: mutation::subscribe_feed::ResponseData = self.request(&request).await?;
112
113        match response.subscribe_feed {
114            mutation::subscribe_feed::SubscribeFeedSubscribeFeed::SubscribeFeedSuccess(success) => {
115                Ok(types::Feed::from(success.feed))
116            }
117            mutation::subscribe_feed::SubscribeFeedSubscribeFeed::SubscribeFeedError(err) => {
118                match err.status.code {
119                    ResponseCode::OK => unreachable!(),
120                    ResponseCode::INVALID_FEED_URL => Err(SyndApiError::SubscribeFeed(
121                        SubscribeFeedError::InvalidFeedUrl {
122                            feed_url: url,
123                            message: err.message,
124                        },
125                    )),
126                    ResponseCode::FEED_UNAVAILABLE => Err(SyndApiError::SubscribeFeed(
127                        SubscribeFeedError::FeedUnavailable {
128                            feed_url: url,
129                            message: err.message,
130                        },
131                    )),
132                    err_code => Err(SyndApiError::Internal(anyhow::anyhow!(
133                        "Unexpected subscribe_feed error code: {err_code:?}"
134                    ))),
135                }
136            }
137        }
138    }
139
140    #[tracing::instrument(skip(self))]
141    pub async fn unsubscribe_feed(&self, url: FeedUrl) -> Result<(), SyndApiError> {
142        let var = mutation::unsubscribe_feed::Variables {
143            unsubscribe_input: mutation::unsubscribe_feed::UnsubscribeFeedInput { url },
144        };
145        let request = mutation::UnsubscribeFeed::build_query(var);
146        let response: mutation::unsubscribe_feed::ResponseData = self.request(&request).await?;
147
148        match response.unsubscribe_feed {
149            mutation::unsubscribe_feed::UnsubscribeFeedUnsubscribeFeed::UnsubscribeFeedSuccess(
150                _,
151            ) => Ok(()),
152            mutation::unsubscribe_feed::UnsubscribeFeedUnsubscribeFeed::UnsubscribeFeedError(
153                err,
154            ) => Err(SyndApiError::Internal(anyhow!(
155                "Failed to mutate unsubscribe_feed {err:?}"
156            ))),
157        }
158    }
159
160    #[tracing::instrument(skip(self))]
161    pub async fn fetch_entries(
162        &self,
163        after: Option<String>,
164        first: i64,
165    ) -> Result<payload::FetchEntriesPayload, SyndApiError> {
166        tracing::debug!("Fetch entries...");
167
168        let var = query::entries::Variables { after, first };
169        let request = query::Entries::build_query(var);
170        let response: query::entries::ResponseData = self.request(&request).await?;
171
172        tracing::debug!("Got response");
173
174        Ok(response.output.into())
175    }
176
177    #[tracing::instrument(skip(self))]
178    pub async fn export_subscription(
179        &self,
180        after: Option<String>,
181        first: i64,
182    ) -> anyhow::Result<ExportSubscriptionPayload> {
183        let var = query::export_subscription::Variables { after, first };
184        let request = query::ExportSubscription::build_query(var);
185        let response: query::export_subscription::ResponseData = self.request(&request).await?;
186
187        Ok(response.output.into())
188    }
189
190    #[tracing::instrument(skip_all, err(Display))]
191    async fn request<Body, ResponseData>(&self, body: &Body) -> Result<ResponseData, SyndApiError>
192    where
193        Body: Serialize + Debug + ?Sized,
194        ResponseData: DeserializeOwned + Debug,
195    {
196        let mut request = self
197            .client
198            .post(self.endpoint.join(Self::GRAPHQL).unwrap())
199            .header(
200                header::AUTHORIZATION,
201                self.credential
202                    .as_ref()
203                    .expect("Credential not configured. this is a BUG")
204                    .clone(),
205            )
206            .json(body)
207            .build()
208            .map_err(SyndApiError::BuildRequest)?;
209
210        synd_o11y::opentelemetry::http::inject_with_baggage(
211            &Span::current().context(),
212            request.headers_mut(),
213            std::iter::once(synd_o11y::request_id_key_value()),
214        );
215
216        tracing::debug!(url = request.url().as_str(), "Send request");
217
218        let response: Response<ResponseData> = self
219            .client
220            .execute(request)
221            .await?
222            .error_for_status()
223            .map_err(|err| match err.status().map(|s| s.as_u16()) {
224                Some(401) => SyndApiError::Unauthorized {
225                    url: err.url().cloned(),
226                },
227                _ => SyndApiError::Internal(anyhow::Error::from(err)),
228            })?
229            .json()
230            .await?;
231
232        match (response.data, response.errors) {
233            (_, Some(errors)) if !errors.is_empty() => Err(SyndApiError::Graphql { errors }),
234            (Some(data), _) => Ok(data),
235            _ => Err(SyndApiError::Internal(anyhow!(
236                "Unexpected error. response does not contain data and errors"
237            ))),
238        }
239    }
240
241    // call health check api
242    pub async fn health(&self) -> anyhow::Result<Health> {
243        self.client
244            .get(self.endpoint.join(Self::HEALTH_CHECK).unwrap())
245            .send()
246            .await?
247            .error_for_status()?
248            .json()
249            .await
250            .map_err(anyhow::Error::from)
251    }
252}