1use error_stack::ResultExt;
16use morax_api::request::AcknowledgeRequest;
17use morax_api::request::AcknowledgeResponse;
18use morax_api::request::CreateSubscriptionRequest;
19use morax_api::request::CreateSubscriptionResponse;
20use morax_api::request::CreateTopicRequest;
21use morax_api::request::CreateTopicResponse;
22use morax_api::request::PublishMessageRequest;
23use morax_api::request::PublishMessageResponse;
24use morax_api::request::PullMessageRequest;
25use morax_api::request::PullMessageResponse;
26use reqwest::Client;
27use reqwest::ClientBuilder;
28use reqwest::Response;
29use reqwest::StatusCode;
30use serde::de::DeserializeOwned;
31
32#[derive(Debug, thiserror::Error)]
33#[error("{0}")]
34pub struct ClientError(String);
35
36#[derive(Debug, Clone)]
37pub enum HTTPResponse<T> {
38 Success(T),
39 Error(ErrorStatus),
40}
41
42impl<T> HTTPResponse<T> {
43 pub fn into_success(self) -> Option<T> {
44 match self {
45 HTTPResponse::Success(t) => Some(t),
46 HTTPResponse::Error(_) => None,
47 }
48 }
49}
50
51#[derive(Debug, Clone)]
52pub struct ErrorStatus {
53 code: StatusCode,
54 payload: Vec<u8>,
55}
56
57impl std::fmt::Display for ErrorStatus {
58 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
59 write!(
60 f,
61 "{:?} ({}): {}",
62 self.code.canonical_reason(),
63 self.code.as_u16(),
64 if self.payload.is_empty() {
65 "(no payload)".into()
66 } else {
67 String::from_utf8_lossy(&self.payload)
68 }
69 )
70 }
71}
72
73#[derive(Debug)]
74pub struct HTTPClient {
75 endpoint: String,
76 client: Client,
77}
78
79impl HTTPClient {
80 pub fn new(
81 endpoint: impl Into<String>,
82 builder: ClientBuilder,
83 ) -> error_stack::Result<Self, ClientError> {
84 let endpoint = endpoint.into();
85 let make_error = || ClientError(format!("failed to create client: {endpoint:?}"));
86
87 Ok(Self {
88 endpoint: endpoint.clone(),
89 client: builder.build().change_context_lazy(make_error)?,
90 })
91 }
92
93 pub async fn create_topic(
94 &self,
95 topic_name: String,
96 request: CreateTopicRequest,
97 ) -> error_stack::Result<HTTPResponse<CreateTopicResponse>, ClientError> {
98 let make_error = || ClientError(format!("failed to create topic: {request:?}"));
99
100 let response = self
101 .client
102 .post(format!("{}/v1/topics/{topic_name}", self.endpoint))
103 .json(&request)
104 .send()
105 .await
106 .change_context_lazy(make_error)?;
107
108 make_response(response).await
109 }
110
111 pub async fn publish(
112 &self,
113 topic_name: String,
114 request: PublishMessageRequest,
115 ) -> error_stack::Result<HTTPResponse<PublishMessageResponse>, ClientError> {
116 let make_error = || ClientError(format!("failed to publish messages: {request:?}"));
117
118 let response = self
119 .client
120 .post(format!("{}/v1/topics/{topic_name}/publish", self.endpoint))
121 .json(&request)
122 .send()
123 .await
124 .change_context_lazy(make_error)?;
125
126 make_response(response).await
127 }
128
129 pub async fn create_subscription(
130 &self,
131 subscription_name: String,
132 request: CreateSubscriptionRequest,
133 ) -> error_stack::Result<HTTPResponse<CreateSubscriptionResponse>, ClientError> {
134 let make_error = || ClientError(format!("failed to create subscription: {request:?}"));
135
136 let response = self
137 .client
138 .post(format!(
139 "{}/v1/subscriptions/{subscription_name}",
140 self.endpoint
141 ))
142 .json(&request)
143 .send()
144 .await
145 .change_context_lazy(make_error)?;
146
147 make_response(response).await
148 }
149
150 pub async fn pull(
151 &self,
152 subscription_name: String,
153 request: PullMessageRequest,
154 ) -> error_stack::Result<HTTPResponse<PullMessageResponse>, ClientError> {
155 let make_error = || ClientError(format!("failed to pull messages: {request:?}"));
156
157 let response = self
158 .client
159 .post(format!(
160 "{}/v1/subscriptions/{subscription_name}/pull",
161 self.endpoint
162 ))
163 .json(&request)
164 .send()
165 .await
166 .change_context_lazy(make_error)?;
167
168 make_response(response).await
169 }
170
171 pub async fn acknowledge(
172 &self,
173 subscription_name: String,
174 request: AcknowledgeRequest,
175 ) -> error_stack::Result<HTTPResponse<AcknowledgeResponse>, ClientError> {
176 let make_error = || ClientError(format!("failed to acknowledge: {request:?}"));
177
178 let response = self
179 .client
180 .post(format!(
181 "{}/v1/subscriptions/{subscription_name}/acknowledge",
182 self.endpoint
183 ))
184 .json(&request)
185 .send()
186 .await
187 .change_context_lazy(make_error)?;
188
189 make_response(response).await
190 }
191}
192
193async fn make_response<T: DeserializeOwned>(
194 r: Response,
195) -> error_stack::Result<HTTPResponse<T>, ClientError> {
196 let make_error = || ClientError("failed to make response".to_string());
197
198 let status = r.status();
199
200 if status.is_success() {
201 let result = r.json().await.change_context_lazy(make_error)?;
202 return Ok(HTTPResponse::Success(result));
203 }
204
205 let payload = r.bytes().await.change_context_lazy(make_error)?;
206 Ok(HTTPResponse::Error(ErrorStatus {
207 code: status,
208 payload: payload.to_vec(),
209 }))
210}