morax_client/
lib.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use error_stack::ResultExt;
16use morax_api::request::AcknowledgeRequest;
17use morax_api::request::AcknowledgeResponse;
18use morax_api::request::CreateSubscriptionRequest;
19use morax_api::request::CreateSubscriptionResponse;
20use morax_api::request::CreateTopicRequest;
21use morax_api::request::CreateTopicResponse;
22use morax_api::request::PublishMessageRequest;
23use morax_api::request::PublishMessageResponse;
24use morax_api::request::PullMessageRequest;
25use morax_api::request::PullMessageResponse;
26use reqwest::Client;
27use reqwest::ClientBuilder;
28use reqwest::Response;
29use reqwest::StatusCode;
30use serde::de::DeserializeOwned;
31
/// Error context attached to every failure reported by this crate.
///
/// Wraps a human-readable message; its `Display` output is exactly that
/// message (see the `#[error("{0}")]` attribute).
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct ClientError(String);
35
/// Outcome of an HTTP call that reached the server and produced a response.
#[derive(Debug, Clone)]
pub enum HTTPResponse<T> {
    /// The response carried a success status; holds the decoded body.
    Success(T),
    /// The response carried a non-success status; holds the status and raw body.
    Error(ErrorStatus),
}
41
42impl<T> HTTPResponse<T> {
43    pub fn into_success(self) -> Option<T> {
44        match self {
45            HTTPResponse::Success(t) => Some(t),
46            HTTPResponse::Error(_) => None,
47        }
48    }
49}
50
51#[derive(Debug, Clone)]
52pub struct ErrorStatus {
53    code: StatusCode,
54    payload: Vec<u8>,
55}
56
57impl std::fmt::Display for ErrorStatus {
58    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
59        write!(
60            f,
61            "{:?} ({}): {}",
62            self.code.canonical_reason(),
63            self.code.as_u16(),
64            if self.payload.is_empty() {
65                "(no payload)".into()
66            } else {
67                String::from_utf8_lossy(&self.payload)
68            }
69        )
70    }
71}
72
/// HTTP client for the `/v1/topics` and `/v1/subscriptions` endpoints.
#[derive(Debug)]
pub struct HTTPClient {
    /// Base URL that every request path is appended to.
    endpoint: String,
    /// Underlying `reqwest` client used to send requests.
    client: Client,
}
78
79impl HTTPClient {
80    pub fn new(
81        endpoint: impl Into<String>,
82        builder: ClientBuilder,
83    ) -> error_stack::Result<Self, ClientError> {
84        let endpoint = endpoint.into();
85        let make_error = || ClientError(format!("failed to create client: {endpoint:?}"));
86
87        Ok(Self {
88            endpoint: endpoint.clone(),
89            client: builder.build().change_context_lazy(make_error)?,
90        })
91    }
92
93    pub async fn create_topic(
94        &self,
95        topic_name: String,
96        request: CreateTopicRequest,
97    ) -> error_stack::Result<HTTPResponse<CreateTopicResponse>, ClientError> {
98        let make_error = || ClientError(format!("failed to create topic: {request:?}"));
99
100        let response = self
101            .client
102            .post(format!("{}/v1/topics/{topic_name}", self.endpoint))
103            .json(&request)
104            .send()
105            .await
106            .change_context_lazy(make_error)?;
107
108        make_response(response).await
109    }
110
111    pub async fn publish(
112        &self,
113        topic_name: String,
114        request: PublishMessageRequest,
115    ) -> error_stack::Result<HTTPResponse<PublishMessageResponse>, ClientError> {
116        let make_error = || ClientError(format!("failed to publish messages: {request:?}"));
117
118        let response = self
119            .client
120            .post(format!("{}/v1/topics/{topic_name}/publish", self.endpoint))
121            .json(&request)
122            .send()
123            .await
124            .change_context_lazy(make_error)?;
125
126        make_response(response).await
127    }
128
129    pub async fn create_subscription(
130        &self,
131        subscription_name: String,
132        request: CreateSubscriptionRequest,
133    ) -> error_stack::Result<HTTPResponse<CreateSubscriptionResponse>, ClientError> {
134        let make_error = || ClientError(format!("failed to create subscription: {request:?}"));
135
136        let response = self
137            .client
138            .post(format!(
139                "{}/v1/subscriptions/{subscription_name}",
140                self.endpoint
141            ))
142            .json(&request)
143            .send()
144            .await
145            .change_context_lazy(make_error)?;
146
147        make_response(response).await
148    }
149
150    pub async fn pull(
151        &self,
152        subscription_name: String,
153        request: PullMessageRequest,
154    ) -> error_stack::Result<HTTPResponse<PullMessageResponse>, ClientError> {
155        let make_error = || ClientError(format!("failed to pull messages: {request:?}"));
156
157        let response = self
158            .client
159            .post(format!(
160                "{}/v1/subscriptions/{subscription_name}/pull",
161                self.endpoint
162            ))
163            .json(&request)
164            .send()
165            .await
166            .change_context_lazy(make_error)?;
167
168        make_response(response).await
169    }
170
171    pub async fn acknowledge(
172        &self,
173        subscription_name: String,
174        request: AcknowledgeRequest,
175    ) -> error_stack::Result<HTTPResponse<AcknowledgeResponse>, ClientError> {
176        let make_error = || ClientError(format!("failed to acknowledge: {request:?}"));
177
178        let response = self
179            .client
180            .post(format!(
181                "{}/v1/subscriptions/{subscription_name}/acknowledge",
182                self.endpoint
183            ))
184            .json(&request)
185            .send()
186            .await
187            .change_context_lazy(make_error)?;
188
189        make_response(response).await
190    }
191}
192
193async fn make_response<T: DeserializeOwned>(
194    r: Response,
195) -> error_stack::Result<HTTPResponse<T>, ClientError> {
196    let make_error = || ClientError("failed to make response".to_string());
197
198    let status = r.status();
199
200    if status.is_success() {
201        let result = r.json().await.change_context_lazy(make_error)?;
202        return Ok(HTTPResponse::Success(result));
203    }
204
205    let payload = r.bytes().await.change_context_lazy(make_error)?;
206    Ok(HTTPResponse::Error(ErrorStatus {
207        code: status,
208        payload: payload.to_vec(),
209    }))
210}