Skip to main content

asteroid_mq/protocol/node/
durable_message.rs

1use std::any::TypeId;
2use std::{borrow::Cow, collections::HashMap, future::Future, sync::Arc};
3
4use asteroid_mq_model::{EndpointAddr, MessageStateUpdate, TopicCode};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::protocol::message::*;
9use crate::protocol::node::raft::state_machine::topic::config::TopicConfig;
10
/// A [`Message`] bundled with the per-endpoint status map and timestamp that are
/// handed to a durable backend (see `Durable::save`) and returned by its retrieve calls.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DurableMessage {
    /// The message itself.
    pub message: Message,
    /// Status kind recorded for each endpoint address.
    pub status: HashMap<EndpointAddr, MessageStatusKind>,
    /// UTC timestamp associated with the message.
    // NOTE(review): presumably the time the message was accepted/created — confirm with the
    // code that constructs this struct.
    pub time: DateTime<Utc>,
}
17
/// Pagination window passed to `batch_retrieve`.
#[derive(Debug, Clone, Copy)]
pub struct DurableMessageQuery {
    /// Size of one page.
    pub limit: u32,
    /// Position at which the page starts.
    pub offset: u32,
}

impl DurableMessageQuery {
    /// Builds a query for a page of `limit` entries starting at `offset`.
    pub fn new(limit: u32, offset: u32) -> Self {
        Self { limit, offset }
    }

    /// Returns the query for the page that follows this one.
    ///
    /// The limit is kept and the offset advances by `limit`. The addition saturates at
    /// `u32::MAX` rather than overflowing, which would panic in debug builds and silently
    /// wrap around to an earlier page in release builds.
    pub fn next_page(&self) -> Self {
        Self {
            limit: self.limit,
            offset: self.offset.saturating_add(self.limit),
        }
    }
}
35
36pub use asteroid_mq_model::MessageDurableConfig;
37
/// Error type returned by the durable storage API in this module.
#[derive(Debug)]
pub struct DurableError {
    /// Short description of what failed.
    pub context: Cow<'static, str>,
    /// The underlying error that caused this one, if there is one.
    pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
}

impl std::fmt::Display for DurableError {
    /// Writes `DurableError: <context>`, followed by ` with source: <source>` when a
    /// source error is attached.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "DurableError: {}", self.context)?;
        if let Some(source) = &self.source {
            // The leading space keeps the context and the source text from running together.
            write!(f, " with source: {}", source)?;
        }
        Ok(())
    }
}

impl std::error::Error for DurableError {
    /// Exposes the attached source error so callers walking the error chain can reach it.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.source
            .as_deref()
            // Drop the `Send + Sync` auto traits to match the trait's return type.
            .map(|inner| inner as &(dyn std::error::Error + 'static))
    }
}

impl DurableError {
    /// Creates an error that carries only a static context message and no source.
    pub fn new_local(context: &'static str) -> Self {
        Self {
            context: context.into(),
            source: None,
        }
    }

    /// Creates an error with a static context message wrapping the `source` error.
    pub fn with_source(
        context: &'static str,
        source: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        Self {
            context: context.into(),
            source: Some(Box::new(source)),
        }
    }
}
73
/// Crate-internal description of a durable-storage operation on a message.
// NOTE(review): no consumer of this enum is visible in this file; the variants presumably map
// onto the `save`, `update_status` and `archive` operations of `Durable` — confirm at the use site.
#[derive(Debug, Clone)]
pub(crate) enum DurableCommand {
    /// Carries the [`Message`] to create.
    Create(Message),
    /// Carries the state update to apply.
    UpdateStatus(MessageStateUpdate),
    /// Carries the id of the message to archive.
    Archive(MessageId),
}
/// Cloneable, type-erased handle to a [`Durable`] implementation.
///
/// Cloning copies the `Arc`, so every clone refers to the same wrapped implementation.
#[derive(Clone)]
pub struct DurableService {
    /// `std::any::type_name` of the wrapped implementation; the only field printed by `Debug`.
    provider: Cow<'static, str>,
    /// `TypeId` of the wrapped implementation, compared in `downcast_ref`.
    provider_type: TypeId,
    /// The wrapped implementation behind a shared trait object.
    inner: Arc<dyn sealed::DurabilityObjectTrait>,
}
86
87impl std::fmt::Debug for DurableService {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("DurableService")
90            .field("provider", &self.provider)
91            .finish()
92    }
93}
94impl DurableService {
95    pub fn new<T>(inner: T) -> Self
96    where
97        T: Durable + 'static,
98    {
99        Self {
100            provider: std::any::type_name::<T>().into(),
101            provider_type: TypeId::of::<T>(),
102            inner: Arc::new(inner),
103        }
104    }
105    pub fn downcast_ref<T: Durable>(&self) -> Option<&T> {
106        if self.provider_type == TypeId::of::<T>() {
107            unsafe {
108                Some(
109                    &*(self.inner.as_ref() as *const dyn sealed::DurabilityObjectTrait as *const T),
110                )
111            }
112        } else {
113            None
114        }
115    }
116    #[inline(always)]
117    pub async fn save(
118        &self,
119        topic: TopicCode,
120        message: DurableMessage,
121    ) -> Result<(), DurableError> {
122        self.inner.save(topic, message).await
123    }
124    pub async fn update_status(
125        &self,
126        topic: TopicCode,
127        update: MessageStateUpdate,
128    ) -> Result<(), DurableError> {
129        self.inner.update_status(topic, update).await
130    }
131    #[inline(always)]
132    pub async fn archive(
133        &self,
134        topic: TopicCode,
135        message_id: MessageId,
136    ) -> Result<(), DurableError> {
137        self.inner.archive(topic, message_id).await
138    }
139    #[inline(always)]
140    pub async fn retrieve(
141        &self,
142        topic: TopicCode,
143        message_id: MessageId,
144    ) -> Result<DurableMessage, DurableError> {
145        self.inner.retrieve(topic, message_id).await
146    }
147    #[inline(always)]
148    pub async fn batch_retrieve(
149        &self,
150        topic: TopicCode,
151        query: DurableMessageQuery,
152    ) -> Result<Vec<DurableMessage>, DurableError> {
153        self.inner.batch_retrieve(topic, query).await
154    }
155    #[inline(always)]
156    pub async fn create_topic(&self, topic: TopicConfig) -> Result<(), DurableError> {
157        self.inner.create_topic(topic).await
158    }
159    #[inline(always)]
160    pub async fn delete_topic(&self, topic: TopicCode) -> Result<(), DurableError> {
161        self.inner.delete_topic(topic).await
162    }
163    #[inline(always)]
164    pub async fn topic_code_list(&self) -> Result<Vec<TopicCode>, DurableError> {
165        self.inner.topic_code_list().await
166    }
167    pub async fn topic_list(&self) -> Result<Vec<TopicConfig>, DurableError> {
168        self.inner.topic_list().await
169    }
170}
171
/// Backend interface for durable message and topic storage.
///
/// An implementation is wrapped in a [`DurableService`] through `DurableService::new`, which
/// forwards each of its methods to the method of the same name here. Every method returns a
/// `Send` future resolving to a `Result` whose error type is [`DurableError`].
pub trait Durable: Send + Sync + 'static {
    /// Stores `message` for `topic`.
    fn save(
        &self,
        topic: TopicCode,
        message: DurableMessage,
    ) -> impl Future<Output = Result<(), DurableError>> + Send;
    /// Applies the state change described by `update` for `topic`.
    fn update_status(
        &self,
        topic: TopicCode,
        update: MessageStateUpdate,
    ) -> impl Future<Output = Result<(), DurableError>> + Send;

    /// Fetches the stored message identified by `message_id` for `topic`.
    fn retrieve(
        &self,
        topic: TopicCode,
        message_id: MessageId,
    ) -> impl Future<Output = Result<DurableMessage, DurableError>> + Send;
    /// Fetches the page of messages for `topic` selected by `query`.
    ///
    /// Earlier messages should come first in the returned vector.
    fn batch_retrieve(
        &self,
        topic: TopicCode,
        query: DurableMessageQuery,
    ) -> impl Future<Output = Result<Vec<DurableMessage>, DurableError>> + Send;
    /// Archives the message identified by `message_id` for `topic`.
    // NOTE(review): what archiving does to the stored data is not specified here and is
    // presumably backend-defined — confirm.
    fn archive(
        &self,
        topic: TopicCode,
        message_id: MessageId,
    ) -> impl Future<Output = Result<(), DurableError>> + Send;
    /// Creates a topic from the given configuration.
    fn create_topic(
        &self,
        topic: TopicConfig,
    ) -> impl Future<Output = Result<(), DurableError>> + Send;
    /// Deletes the topic identified by `topic`.
    fn delete_topic(
        &self,
        topic: TopicCode,
    ) -> impl Future<Output = Result<(), DurableError>> + Send;
    /// Lists topic codes.
    // NOTE(review): presumably the codes of all topics known to the backend — confirm.
    fn topic_code_list(&self) -> impl Future<Output = Result<Vec<TopicCode>, DurableError>> + Send;
    /// Lists topic configurations.
    // NOTE(review): presumably one entry per topic known to the backend — confirm.
    fn topic_list(&self) -> impl Future<Output = Result<Vec<TopicConfig>, DurableError>> + Send;
}
211
212mod sealed {
213    use std::{future::Future, pin::Pin};
214
215    use asteroid_mq_model::MessageStateUpdate;
216
217    use crate::{
218        prelude::TopicCode,
219        protocol::{message::*, node::raft::state_machine::topic::config::TopicConfig},
220    };
221
222    use super::{Durable, DurableError, DurableMessage, DurableMessageQuery};
223
224    pub(super) trait DurabilityObjectTrait: Send + Sync + 'static {
225        fn save(
226            &self,
227            topic: TopicCode,
228            message: DurableMessage,
229        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
230        fn retrieve(
231            &self,
232            topic: TopicCode,
233            message_id: MessageId,
234        ) -> Pin<Box<dyn Future<Output = Result<DurableMessage, DurableError>> + Send + '_>>;
235        fn update_status(
236            &self,
237            topic: TopicCode,
238            update: MessageStateUpdate,
239        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
240        fn batch_retrieve(
241            &self,
242            topic: TopicCode,
243            query: DurableMessageQuery,
244        ) -> Pin<Box<dyn Future<Output = Result<Vec<DurableMessage>, DurableError>> + Send + '_>>;
245        fn archive(
246            &self,
247            topic: TopicCode,
248            message_id: MessageId,
249        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
250        fn create_topic(
251            &self,
252            topic: TopicConfig,
253        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
254        fn delete_topic(
255            &self,
256            topic: TopicCode,
257        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>>;
258        fn topic_code_list(
259            &self,
260        ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicCode>, DurableError>> + Send + '_>>;
261        fn topic_list(
262            &self,
263        ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicConfig>, DurableError>> + Send + '_>>;
264    }
265
266    impl<T> DurabilityObjectTrait for T
267    where
268        T: Durable,
269    {
270        #[inline(always)]
271        fn save(
272            &self,
273            topic: TopicCode,
274            message: DurableMessage,
275        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
276            Box::pin(self.save(topic, message))
277        }
278
279        #[inline(always)]
280        fn retrieve(
281            &self,
282            topic: TopicCode,
283            message_id: MessageId,
284        ) -> Pin<Box<dyn Future<Output = Result<DurableMessage, DurableError>> + Send + '_>>
285        {
286            Box::pin(self.retrieve(topic, message_id))
287        }
288        #[inline(always)]
289        fn batch_retrieve(
290            &self,
291            topic: TopicCode,
292            query: DurableMessageQuery,
293        ) -> Pin<Box<dyn Future<Output = Result<Vec<DurableMessage>, DurableError>> + Send + '_>>
294        {
295            Box::pin(self.batch_retrieve(topic, query))
296        }
297        #[inline(always)]
298        fn archive(
299            &self,
300            topic: TopicCode,
301            message_id: MessageId,
302        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
303            Box::pin(self.archive(topic, message_id))
304        }
305
306        #[inline(always)]
307        fn update_status(
308            &self,
309            topic: TopicCode,
310            update: MessageStateUpdate,
311        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
312            Box::pin(self.update_status(topic, update))
313        }
314        #[inline(always)]
315        fn create_topic(
316            &self,
317            topic: TopicConfig,
318        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
319            Box::pin(self.create_topic(topic))
320        }
321        #[inline(always)]
322        fn delete_topic(
323            &self,
324            topic: TopicCode,
325        ) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send + '_>> {
326            Box::pin(self.delete_topic(topic))
327        }
328        #[inline(always)]
329        fn topic_code_list(
330            &self,
331        ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicCode>, DurableError>> + Send + '_>>
332        {
333            Box::pin(self.topic_code_list())
334        }
335        #[inline(always)]
336        fn topic_list(
337            &self,
338        ) -> Pin<Box<dyn Future<Output = Result<Vec<TopicConfig>, DurableError>> + Send + '_>>
339        {
340            Box::pin(self.topic_list())
341        }
342    }
343}