1use std::any::TypeId;
2use std::{borrow::Cow, collections::HashMap, future::Future, sync::Arc};
3
4use asteroid_mq_model::{EndpointAddr, MessageStateUpdate, TopicCode};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::protocol::message::*;
9use crate::protocol::node::raft::state_machine::topic::config::TopicConfig;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DurableMessage {
13 pub message: Message,
14 pub status: HashMap<EndpointAddr, MessageStatusKind>,
15 pub time: DateTime<Utc>,
16}
17
18#[derive(Debug, Clone, Copy)]
19pub struct DurableMessageQuery {
20 pub limit: u32,
21 pub offset: u32,
22}
23
24impl DurableMessageQuery {
25 pub fn new(limit: u32, offset: u32) -> Self {
26 Self { limit, offset }
27 }
28 pub fn next_page(&self) -> Self {
29 Self {
30 limit: self.limit,
31 offset: self.offset + self.limit,
32 }
33 }
34}
35
36pub use asteroid_mq_model::MessageDurableConfig;
37
38#[derive(Debug)]
39pub struct DurableError {
40 pub context: Cow<'static, str>,
41 pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
42}
43
44impl std::fmt::Display for DurableError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "DurableError: {}", self.context)?;
47 if let Some(source) = &self.source {
48 write!(f, "with source: {}", source)?;
49 }
50 Ok(())
51 }
52}
53
54impl std::error::Error for DurableError {}
55
56impl DurableError {
57 pub fn new_local(context: &'static str) -> Self {
58 Self {
59 context: context.into(),
60 source: None,
61 }
62 }
63 pub fn with_source(
64 context: &'static str,
65 source: impl std::error::Error + Send + Sync + 'static,
66 ) -> Self {
67 Self {
68 context: context.into(),
69 source: Some(Box::new(source)),
70 }
71 }
72}
73
74#[derive(Debug, Clone)]
75pub(crate) enum DurableCommand {
76 Create(Message),
77 UpdateStatus(MessageStateUpdate),
78 Archive(MessageId),
79}
80#[derive(Clone)]
81pub struct DurableService {
82 provider: Cow<'static, str>,
83 provider_type: TypeId,
84 inner: Arc<dyn sealed::DurabilityObjectTrait>,
85}
86
87impl std::fmt::Debug for DurableService {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("DurableService")
90 .field("provider", &self.provider)
91 .finish()
92 }
93}
94impl DurableService {
95 pub fn new<T>(inner: T) -> Self
96 where
97 T: Durable + 'static,
98 {
99 Self {
100 provider: std::any::type_name::<T>().into(),
101 provider_type: TypeId::of::<T>(),
102 inner: Arc::new(inner),
103 }
104 }
105 pub fn downcast_ref<T: Durable>(&self) -> Option<&T> {
106 if self.provider_type == TypeId::of::<T>() {
107 unsafe {
108 Some(
109 &*(self.inner.as_ref() as *const dyn sealed::DurabilityObjectTrait as *const T),
110 )
111 }
112 } else {
113 None
114 }
115 }
116 #[inline(always)]
117 pub async fn save(
118 &self,
119 topic: TopicCode,
120 message: DurableMessage,
121 ) -> Result<(), DurableError> {
122 self.inner.save(topic, message).await
123 }
124 pub async fn update_status(
125 &self,
126 topic: TopicCode,
127 update: MessageStateUpdate,
128 ) -> Result<(), DurableError> {
129 self.inner.update_status(topic, update).await
130 }
131 #[inline(always)]
132 pub async fn archive(
133 &self,
134 topic: TopicCode,
135 message_id: MessageId,
136 ) -> Result<(), DurableError> {
137 self.inner.archive(topic, message_id).await
138 }
139 #[inline(always)]
140 pub async fn retrieve(
141 &self,
142 topic: TopicCode,
143 message_id: MessageId,
144 ) -> Result<DurableMessage, DurableError> {
145 self.inner.retrieve(topic, message_id).await
146 }
147 #[inline(always)]
148 pub async fn batch_retrieve(
149 &self,
150 topic: TopicCode,
151 query: DurableMessageQuery,
152 ) -> Result<Vec<DurableMessage>, DurableError> {
153 self.inner.batch_retrieve(topic, query).await
154 }
155 #[inline(always)]
156 pub async fn create_topic(&self, topic: TopicConfig) -> Result<(), DurableError> {
157 self.inner.create_topic(topic).await
158 }
159 #[inline(always)]
160 pub async fn delete_topic(&self, topic: TopicCode) -> Result<(), DurableError> {
161 self.inner.delete_topic(topic).await
162 }
163 #[inline(always)]
164 pub async fn topic_code_list(&self) -> Result<Vec<TopicCode>, DurableError> {
165 self.inner.topic_code_list().await
166 }
167 pub async fn topic_list(&self) -> Result<Vec<TopicConfig>, DurableError> {
168 self.inner.topic_list().await
169 }
170}
171
172pub trait Durable: Send + Sync + 'static {
173 fn save(
174 &self,
175 topic: TopicCode,
176 message: DurableMessage,
177 ) -> impl Future<Output = Result<(), DurableError>> + Send;
178 fn update_status(
179 &self,
180 topic: TopicCode,
181 update: MessageStateUpdate,
182 ) -> impl Future<Output = Result<(), DurableError>> + Send;
183
184 fn retrieve(
185 &self,
186 topic: TopicCode,
187 message_id: MessageId,
188 ) -> impl Future<Output = Result<DurableMessage, DurableError>> + Send;
189 fn batch_retrieve(
191 &self,
192 topic: TopicCode,
193 query: DurableMessageQuery,
194 ) -> impl Future<Output = Result<Vec<DurableMessage>, DurableError>> + Send;
195 fn archive(
196 &self,
197 topic: TopicCode,
198 message_id: MessageId,
199 ) -> impl Future<Output = Result<(), DurableError>> + Send;
200 fn create_topic(
201 &self,
202 topic: TopicConfig,
203 ) -> impl Future<Output = Result<(), DurableError>> + Send;
204 fn delete_topic(
205 &self,
206 topic: TopicCode,
207 ) -> impl Future<Output = Result<(), DurableError>> + Send;
208 fn topic_code_list(&self) -> impl Future<Output = Result<Vec<TopicCode>, DurableError>> + Send;
209 fn topic_list(&self) -> impl Future<Output = Result<Vec<TopicConfig>, DurableError>> + Send;
210}
211
212mod sealed {
213 use std::{future::Future, pin::Pin};
214
215 use asteroid_mq_model::MessageStateUpdate;
216
217 use crate::{
218 prelude::TopicCode,
219 protocol::{message::*, node::raft::state_machine::topic::config::TopicConfig},
220 };
221
222 use super::{Durable, DurableError, DurableMessage, DurableMessageQuery};
223
224 pub(super) trait DurabilityObjectTrait: Send + Sync + 'static {
225 fn save(
226 &self,
227 topic: TopicCode,
228 message: DurableMessage,
229 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
230 fn retrieve(
231 &self,
232 topic: TopicCode,
233 message_id: MessageId,
234 ) -> Pin<Box<dyn Future<Output = Result<DurableMessage, DurableError>> + Send + '_>>;
235 fn update_status(
236 &self,
237 topic: TopicCode,
238 update: MessageStateUpdate,
239 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
240 fn batch_retrieve(
241 &self,
242 topic: TopicCode,
243 query: DurableMessageQuery,
244 ) -> Pin<Box<dyn Future<Output = Result<Vec<DurableMessage>, DurableError>> + Send + '_>>;
245 fn archive(
246 &self,
247 topic: TopicCode,
248 message_id: MessageId,
249 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
250 fn create_topic(
251 &self,
252 topic: TopicConfig,
253 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
254 fn delete_topic(
255 &self,
256 topic: TopicCode,
257 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
258 fn topic_code_list(
259 &self,
260 ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicCode>, DurableError>> + Send + '_>>;
261 fn topic_list(
262 &self,
263 ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicConfig>, DurableError>> + Send + '_>>;
264 }
265
266 impl<T> DurabilityObjectTrait for T
267 where
268 T: Durable,
269 {
270 #[inline(always)]
271 fn save(
272 &self,
273 topic: TopicCode,
274 message: DurableMessage,
275 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
276 Box::pin(self.save(topic, message))
277 }
278
279 #[inline(always)]
280 fn retrieve(
281 &self,
282 topic: TopicCode,
283 message_id: MessageId,
284 ) -> Pin<Box<dyn Future<Output = Result<DurableMessage, DurableError>> + Send + '_>>
285 {
286 Box::pin(self.retrieve(topic, message_id))
287 }
288 #[inline(always)]
289 fn batch_retrieve(
290 &self,
291 topic: TopicCode,
292 query: DurableMessageQuery,
293 ) -> Pin<Box<dyn Future<Output = Result<Vec<DurableMessage>, DurableError>> + Send + '_>>
294 {
295 Box::pin(self.batch_retrieve(topic, query))
296 }
297 #[inline(always)]
298 fn archive(
299 &self,
300 topic: TopicCode,
301 message_id: MessageId,
302 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
303 Box::pin(self.archive(topic, message_id))
304 }
305
306 #[inline(always)]
307 fn update_status(
308 &self,
309 topic: TopicCode,
310 update: MessageStateUpdate,
311 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
312 Box::pin(self.update_status(topic, update))
313 }
314 #[inline(always)]
315 fn create_topic(
316 &self,
317 topic: TopicConfig,
318 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
319 Box::pin(self.create_topic(topic))
320 }
321 #[inline(always)]
322 fn delete_topic(
323 &self,
324 topic: TopicCode,
325 ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
326 Box::pin(self.delete_topic(topic))
327 }
328 #[inline(always)]
329 fn topic_code_list(
330 &self,
331 ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicCode>, DurableError>> + Send + '_>>
332 {
333 Box::pin(self.topic_code_list())
334 }
335 #[inline(always)]
336 fn topic_list(
337 &self,
338 ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicConfig>, DurableError>> + Send + '_>>
339 {
340 Box::pin(self.topic_list())
341 }
342 }
343}