sos_system_messages/
system_messages.rs

1use crate::Error;
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use serde_with::{serde_as, DisplayFromStr};
5use std::{cmp::Ordering, collections::HashMap};
6use time::OffsetDateTime;
7use tokio::sync::broadcast;
8use urn::Urn;
9
/// Boxed storage provider.
///
/// A type-erased [`SystemMessageStorage`] implementation whose
/// associated error type is `E`. The trait object is required
/// to be `Send + Sync + 'static`.
pub type SystemMessageStorageProvider<E> =
    Box<dyn SystemMessageStorage<Error = E> + Send + Sync + 'static>;
13
/// Manages system messages.
///
/// Read-side view over a collection of [`SysMessage`] values keyed
/// by [`Urn`]: loading them, inspecting them and subscribing to
/// [`SysMessageCount`] notifications.
#[async_trait]
pub trait SystemMessageManager {
    /// Error type.
    ///
    /// Must be constructible from this crate's [`Error`].
    type Error: std::error::Error
        + std::fmt::Debug
        + From<Error>
        + Send
        + Sync
        + 'static;

    /// Load system messages for an account into memory.
    async fn load_system_messages(&mut self) -> Result<(), Self::Error>;

    /// Subscribe to the broadcast channel.
    ///
    /// The returned receiver yields [`SysMessageCount`] values.
    fn subscribe(&self) -> broadcast::Receiver<SysMessageCount>;

    /// Number of system messages.
    fn len(&self) -> usize;

    /// Whether the system messages collection is empty.
    fn is_empty(&self) -> bool;

    /// Message counts (total and unread, see [`SysMessageCount`]).
    fn counts(&self) -> SysMessageCount;

    /// Iterator of the system messages as `(key, message)` pairs.
    fn iter(&self) -> impl Iterator<Item = (&Urn, &SysMessage)>;

    /// Get a message by key, `None` when the key is not present.
    fn get(&self, key: &Urn) -> Option<&SysMessage>;

    /// Sorted list of system messages.
    ///
    /// The [`SystemMessages`] implementation orders entries using
    /// the `Ord` implementation of [`SysMessage`].
    fn sorted_list(&self) -> Vec<(&Urn, &SysMessage)>;
}
49
/// Storage for system messages.
///
/// Write-side operations (insert, remove, mark, clear) plus a
/// listing operation that returns the complete
/// [`SystemMessageMap`].
#[async_trait]
pub trait SystemMessageStorage {
    /// Error type.
    ///
    /// Must be constructible from this crate's [`Error`].
    type Error: std::error::Error
        + std::fmt::Debug
        + From<Error>
        + Send
        + Sync
        + 'static;

    /// List system messages for an account.
    async fn list_system_messages(
        &self,
    ) -> Result<SystemMessageMap, Self::Error>;

    /// Add a system message to an account.
    ///
    /// `key` identifies the message and `message` is the value
    /// stored for it.
    async fn insert_system_message(
        &mut self,
        key: Urn,
        message: SysMessage,
    ) -> Result<(), Self::Error>;

    /// Remove a system message from an account.
    async fn remove_system_message(
        &mut self,
        key: &Urn,
    ) -> Result<(), Self::Error>;

    /// Mark a system message as read or unread.
    ///
    /// `is_read` is the new read state: `true` for read, `false`
    /// for unread.
    async fn mark_system_message(
        &mut self,
        key: &Urn,
        is_read: bool,
    ) -> Result<(), Self::Error>;

    /// Delete all system messages for an account.
    async fn clear_system_messages(&mut self) -> Result<(), Self::Error>;
}
89
/// System messages count.
///
/// Snapshot produced by [`SystemMessageManager::counts`] and sent
/// to subscribers of the broadcast channel. In the
/// [`SystemMessages`] implementation, unread messages whose level
/// is `Progress` or `Done` are included in `unread` but in none of
/// the per-level counters.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct SysMessageCount {
    /// Total number of messages (read and unread).
    pub total: usize,
    /// Number of unread messages, regardless of level.
    pub unread: usize,
    /// Number of unread info messages.
    pub unread_info: usize,
    /// Number of unread warn messages.
    pub unread_warn: usize,
    /// Number of unread error messages.
    pub unread_error: usize,
}
104
/// Level for system messages.
///
/// Serialized in lowercase (`"info"`, `"warn"`, `"error"`,
/// `"progress"`, `"done"`). The derived ordering follows the
/// declaration order of the variants, so
/// `Info < Warn < Error < Progress < Done`.
#[derive(
    Debug,
    Default,
    Clone,
    Serialize,
    Deserialize,
    Ord,
    PartialOrd,
    Eq,
    PartialEq,
)]
#[serde(rename_all = "lowercase")]
pub enum SysMessageLevel {
    /// Informational message (the default level).
    #[default]
    Info,
    /// Warning message.
    Warn,
    /// Error message.
    Error,
    /// Progress message such as an upload or download.
    Progress,
    /// Completed operation (eg: upload or download).
    Done,
}
131
/// System message notification.
///
/// Higher priority messages are sorted before
/// lower priority messages; if priorities are
/// equal, the more recently created message sorts first.
///
/// Field names are serialized in camelCase (for example
/// `sub_title` becomes `subTitle` and `is_read` becomes `isRead`).
///
/// Note that the derived equality compares every field whereas
/// the ordering only looks at `priority` and `created`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SysMessage {
    /// Optional identifier for the message.
    ///
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub id: Option<u64>,
    /// Date and time the message was created.
    pub created: OffsetDateTime,
    /// Message priority; larger values sort first.
    pub priority: usize,
    /// Title for the message.
    pub title: String,
    /// Sub title byline for the message.
    pub sub_title: Option<String>,
    /// Content of the message.
    ///
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    /// Indicates if the message has been read.
    pub is_read: bool,
    /// Level indicator.
    pub level: SysMessageLevel,
}
159impl SysMessage {
160    /// Create a new message.
161    pub fn new(title: String, content: String) -> Self {
162        Self {
163            id: None,
164            created: OffsetDateTime::now_utc(),
165            priority: 0,
166            title,
167            sub_title: None,
168            content: Some(content),
169            is_read: false,
170            level: Default::default(),
171        }
172    }
173
174    /// Create a new message with the given priority and level.
175    pub fn new_priority(
176        title: String,
177        content: String,
178        priority: usize,
179        level: SysMessageLevel,
180    ) -> Self {
181        Self {
182            id: None,
183            created: OffsetDateTime::now_utc(),
184            priority,
185            title,
186            sub_title: None,
187            content: Some(content),
188            is_read: false,
189            level,
190        }
191    }
192}
193
194impl PartialOrd for SysMessage {
195    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
196        Some(self.cmp(other))
197    }
198}
199
200impl Ord for SysMessage {
201    fn cmp(&self, other: &Self) -> Ordering {
202        match other.priority.cmp(&self.priority) {
203            std::cmp::Ordering::Equal => other.created.cmp(&self.created),
204            result => result,
205        }
206    }
207}
208
/// Collection of system messages.
///
/// Newtype over a `HashMap` from [`Urn`] to [`SysMessage`]. When
/// (de)serialized the keys go through `DisplayFromStr`, that is
/// they are written using the `Display` form of the `Urn` and
/// parsed back with `FromStr`.
#[serde_as]
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct SystemMessageMap(
    #[serde_as(as = "HashMap<DisplayFromStr, _>")]
    pub  HashMap<Urn, SysMessage>,
);
216
217impl From<HashMap<Urn, SysMessage>> for SystemMessageMap {
218    fn from(value: HashMap<Urn, SysMessage>) -> Self {
219        Self(value)
220    }
221}
222
223impl SystemMessageMap {
224    /// Borrowed iterator.
225    pub fn iter(
226        &self,
227    ) -> std::collections::hash_map::Iter<'_, Urn, SysMessage> {
228        self.0.iter()
229    }
230
231    /// Owned iterator.
232    pub fn into_iter(
233        self,
234    ) -> std::collections::hash_map::IntoIter<Urn, SysMessage> {
235        self.0.into_iter()
236    }
237}
238
/// Persistent system message notifications.
///
/// Combines a boxed storage provider with an in-memory copy of
/// the messages and a broadcast channel used to publish
/// [`SysMessageCount`] updates.
pub struct SystemMessages<E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<Error>
        + Send
        + Sync
        + 'static,
{
    // Backing storage that every write is forwarded to.
    provider: SystemMessageStorageProvider<E>,
    // In-memory messages; replaced wholesale by `load_system_messages`.
    messages: SystemMessageMap,
    // Sender side of the counts broadcast; receivers come from `subscribe`.
    channel: broadcast::Sender<SysMessageCount>,
}
253
254impl<E> SystemMessages<E>
255where
256    E: std::error::Error
257        + std::fmt::Debug
258        + From<Error>
259        + Send
260        + Sync
261        + 'static,
262{
263    /// Create new system messages.
264    pub fn new(provider: SystemMessageStorageProvider<E>) -> Self {
265        let (channel, _) = broadcast::channel(8);
266        Self {
267            provider,
268            messages: Default::default(),
269            channel,
270        }
271    }
272
273    fn send_counts(&self) {
274        if let Err(e) = self.channel.send(self.counts()) {
275            tracing::error!(error = %e, "system_messages::send");
276        }
277    }
278}
279
280#[async_trait]
281impl<E> SystemMessageManager for SystemMessages<E>
282where
283    E: std::error::Error
284        + std::fmt::Debug
285        + From<Error>
286        + Send
287        + Sync
288        + 'static,
289{
290    type Error = E;
291
292    async fn load_system_messages(&mut self) -> Result<(), E> {
293        self.messages = self.provider.list_system_messages().await?;
294        Ok(())
295    }
296
297    fn subscribe(&self) -> broadcast::Receiver<SysMessageCount> {
298        self.channel.subscribe()
299    }
300
301    fn len(&self) -> usize {
302        self.messages.0.len()
303    }
304
305    fn is_empty(&self) -> bool {
306        self.messages.0.is_empty()
307    }
308
309    fn counts(&self) -> SysMessageCount {
310        let mut counts = SysMessageCount { total: self.messages.0.len(), ..Default::default() };
311        for item in self.messages.0.values() {
312            if !item.is_read {
313                counts.unread += 1;
314                if matches!(item.level, SysMessageLevel::Info) {
315                    counts.unread_info += 1;
316                }
317                if matches!(item.level, SysMessageLevel::Warn) {
318                    counts.unread_warn += 1;
319                }
320                if matches!(item.level, SysMessageLevel::Error) {
321                    counts.unread_error += 1;
322                }
323            }
324        }
325        counts
326    }
327
328    fn iter(&self) -> impl Iterator<Item = (&Urn, &SysMessage)> {
329        self.messages.iter()
330    }
331
332    fn get(&self, key: &Urn) -> Option<&SysMessage> {
333        self.messages.0.get(key)
334    }
335
336    fn sorted_list(&self) -> Vec<(&Urn, &SysMessage)> {
337        let mut messages: Vec<_> = self.messages.iter().collect();
338        messages.sort_by(|a, b| a.1.cmp(b.1));
339        messages
340    }
341}
342
343#[async_trait]
344impl<E> SystemMessageStorage for SystemMessages<E>
345where
346    E: std::error::Error
347        + std::fmt::Debug
348        + From<Error>
349        + Send
350        + Sync
351        + 'static,
352{
353    type Error = E;
354
355    async fn list_system_messages(
356        &self,
357    ) -> Result<SystemMessageMap, Self::Error> {
358        self.provider.list_system_messages().await
359    }
360
361    async fn insert_system_message(
362        &mut self,
363        key: Urn,
364        message: SysMessage,
365    ) -> Result<(), Self::Error> {
366        self.messages.0.insert(key.clone(), message.clone());
367        self.provider.insert_system_message(key, message).await?;
368        self.send_counts();
369        Ok(())
370    }
371
372    async fn remove_system_message(
373        &mut self,
374        key: &Urn,
375    ) -> Result<(), Self::Error> {
376        self.messages.0.remove(key);
377        self.provider.remove_system_message(key).await?;
378        self.send_counts();
379        Ok(())
380    }
381
382    async fn mark_system_message(
383        &mut self,
384        key: &Urn,
385        is_read: bool,
386    ) -> Result<(), Self::Error> {
387        if let Some(message) = self.messages.0.get_mut(key) {
388            message.is_read = true;
389            self.provider.mark_system_message(key, is_read).await?;
390            self.send_counts();
391            Ok(())
392        } else {
393            Err(Error::NoSysMessage(key.to_string()).into())
394        }
395    }
396
397    async fn clear_system_messages(&mut self) -> Result<(), Self::Error> {
398        self.messages = Default::default();
399        self.provider.clear_system_messages().await?;
400        self.send_counts();
401        Ok(())
402    }
403}