1use crate::Error;
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use serde_with::{serde_as, DisplayFromStr};
5use std::{cmp::Ordering, collections::HashMap};
6use time::OffsetDateTime;
7use tokio::sync::broadcast;
8use urn::Urn;
9
10pub type SystemMessageStorageProvider<E> =
12 Box<dyn SystemMessageStorage<Error = E> + Send + Sync + 'static>;
13
14#[async_trait]
16pub trait SystemMessageManager {
17 type Error: std::error::Error
19 + std::fmt::Debug
20 + From<Error>
21 + Send
22 + Sync
23 + 'static;
24
25 async fn load_system_messages(&mut self) -> Result<(), Self::Error>;
27
28 fn subscribe(&self) -> broadcast::Receiver<SysMessageCount>;
30
31 fn len(&self) -> usize;
33
34 fn is_empty(&self) -> bool;
36
37 fn counts(&self) -> SysMessageCount;
39
40 fn iter(&self) -> impl Iterator<Item = (&Urn, &SysMessage)>;
42
43 fn get(&self, key: &Urn) -> Option<&SysMessage>;
45
46 fn sorted_list(&self) -> Vec<(&Urn, &SysMessage)>;
48}
49
50#[async_trait]
52pub trait SystemMessageStorage {
53 type Error: std::error::Error
55 + std::fmt::Debug
56 + From<Error>
57 + Send
58 + Sync
59 + 'static;
60
61 async fn list_system_messages(
63 &self,
64 ) -> Result<SystemMessageMap, Self::Error>;
65
66 async fn insert_system_message(
68 &mut self,
69 key: Urn,
70 message: SysMessage,
71 ) -> Result<(), Self::Error>;
72
73 async fn remove_system_message(
75 &mut self,
76 key: &Urn,
77 ) -> Result<(), Self::Error>;
78
79 async fn mark_system_message(
81 &mut self,
82 key: &Urn,
83 is_read: bool,
84 ) -> Result<(), Self::Error>;
85
86 async fn clear_system_messages(&mut self) -> Result<(), Self::Error>;
88}
89
90#[derive(Debug, Default, Clone, Eq, PartialEq)]
92pub struct SysMessageCount {
93 pub total: usize,
95 pub unread: usize,
97 pub unread_info: usize,
99 pub unread_warn: usize,
101 pub unread_error: usize,
103}
104
105#[derive(
107 Debug,
108 Default,
109 Clone,
110 Serialize,
111 Deserialize,
112 Ord,
113 PartialOrd,
114 Eq,
115 PartialEq,
116)]
117#[serde(rename_all = "lowercase")]
118pub enum SysMessageLevel {
119 #[default]
121 Info,
122 Warn,
124 Error,
126 Progress,
128 Done,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
138#[serde(rename_all = "camelCase")]
139pub struct SysMessage {
140 #[serde(default, skip_serializing_if = "Option::is_none")]
142 pub id: Option<u64>,
143 pub created: OffsetDateTime,
145 pub priority: usize,
147 pub title: String,
149 pub sub_title: Option<String>,
151 #[serde(default, skip_serializing_if = "Option::is_none")]
153 pub content: Option<String>,
154 pub is_read: bool,
156 pub level: SysMessageLevel,
158}
159impl SysMessage {
160 pub fn new(title: String, content: String) -> Self {
162 Self {
163 id: None,
164 created: OffsetDateTime::now_utc(),
165 priority: 0,
166 title,
167 sub_title: None,
168 content: Some(content),
169 is_read: false,
170 level: Default::default(),
171 }
172 }
173
174 pub fn new_priority(
176 title: String,
177 content: String,
178 priority: usize,
179 level: SysMessageLevel,
180 ) -> Self {
181 Self {
182 id: None,
183 created: OffsetDateTime::now_utc(),
184 priority,
185 title,
186 sub_title: None,
187 content: Some(content),
188 is_read: false,
189 level,
190 }
191 }
192}
193
194impl PartialOrd for SysMessage {
195 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
196 Some(self.cmp(other))
197 }
198}
199
200impl Ord for SysMessage {
201 fn cmp(&self, other: &Self) -> Ordering {
202 match other.priority.cmp(&self.priority) {
203 std::cmp::Ordering::Equal => other.created.cmp(&self.created),
204 result => result,
205 }
206 }
207}
208
209#[serde_as]
211#[derive(Default, Clone, Debug, Serialize, Deserialize)]
212pub struct SystemMessageMap(
213 #[serde_as(as = "HashMap<DisplayFromStr, _>")]
214 pub HashMap<Urn, SysMessage>,
215);
216
217impl From<HashMap<Urn, SysMessage>> for SystemMessageMap {
218 fn from(value: HashMap<Urn, SysMessage>) -> Self {
219 Self(value)
220 }
221}
222
223impl SystemMessageMap {
224 pub fn iter(
226 &self,
227 ) -> std::collections::hash_map::Iter<'_, Urn, SysMessage> {
228 self.0.iter()
229 }
230
231 pub fn into_iter(
233 self,
234 ) -> std::collections::hash_map::IntoIter<Urn, SysMessage> {
235 self.0.into_iter()
236 }
237}
238
239pub struct SystemMessages<E>
241where
242 E: std::error::Error
243 + std::fmt::Debug
244 + From<Error>
245 + Send
246 + Sync
247 + 'static,
248{
249 provider: SystemMessageStorageProvider<E>,
250 messages: SystemMessageMap,
251 channel: broadcast::Sender<SysMessageCount>,
252}
253
254impl<E> SystemMessages<E>
255where
256 E: std::error::Error
257 + std::fmt::Debug
258 + From<Error>
259 + Send
260 + Sync
261 + 'static,
262{
263 pub fn new(provider: SystemMessageStorageProvider<E>) -> Self {
265 let (channel, _) = broadcast::channel(8);
266 Self {
267 provider,
268 messages: Default::default(),
269 channel,
270 }
271 }
272
273 fn send_counts(&self) {
274 if let Err(e) = self.channel.send(self.counts()) {
275 tracing::error!(error = %e, "system_messages::send");
276 }
277 }
278}
279
280#[async_trait]
281impl<E> SystemMessageManager for SystemMessages<E>
282where
283 E: std::error::Error
284 + std::fmt::Debug
285 + From<Error>
286 + Send
287 + Sync
288 + 'static,
289{
290 type Error = E;
291
292 async fn load_system_messages(&mut self) -> Result<(), E> {
293 self.messages = self.provider.list_system_messages().await?;
294 Ok(())
295 }
296
297 fn subscribe(&self) -> broadcast::Receiver<SysMessageCount> {
298 self.channel.subscribe()
299 }
300
301 fn len(&self) -> usize {
302 self.messages.0.len()
303 }
304
305 fn is_empty(&self) -> bool {
306 self.messages.0.is_empty()
307 }
308
309 fn counts(&self) -> SysMessageCount {
310 let mut counts = SysMessageCount { total: self.messages.0.len(), ..Default::default() };
311 for item in self.messages.0.values() {
312 if !item.is_read {
313 counts.unread += 1;
314 if matches!(item.level, SysMessageLevel::Info) {
315 counts.unread_info += 1;
316 }
317 if matches!(item.level, SysMessageLevel::Warn) {
318 counts.unread_warn += 1;
319 }
320 if matches!(item.level, SysMessageLevel::Error) {
321 counts.unread_error += 1;
322 }
323 }
324 }
325 counts
326 }
327
328 fn iter(&self) -> impl Iterator<Item = (&Urn, &SysMessage)> {
329 self.messages.iter()
330 }
331
332 fn get(&self, key: &Urn) -> Option<&SysMessage> {
333 self.messages.0.get(key)
334 }
335
336 fn sorted_list(&self) -> Vec<(&Urn, &SysMessage)> {
337 let mut messages: Vec<_> = self.messages.iter().collect();
338 messages.sort_by(|a, b| a.1.cmp(b.1));
339 messages
340 }
341}
342
343#[async_trait]
344impl<E> SystemMessageStorage for SystemMessages<E>
345where
346 E: std::error::Error
347 + std::fmt::Debug
348 + From<Error>
349 + Send
350 + Sync
351 + 'static,
352{
353 type Error = E;
354
355 async fn list_system_messages(
356 &self,
357 ) -> Result<SystemMessageMap, Self::Error> {
358 self.provider.list_system_messages().await
359 }
360
361 async fn insert_system_message(
362 &mut self,
363 key: Urn,
364 message: SysMessage,
365 ) -> Result<(), Self::Error> {
366 self.messages.0.insert(key.clone(), message.clone());
367 self.provider.insert_system_message(key, message).await?;
368 self.send_counts();
369 Ok(())
370 }
371
372 async fn remove_system_message(
373 &mut self,
374 key: &Urn,
375 ) -> Result<(), Self::Error> {
376 self.messages.0.remove(key);
377 self.provider.remove_system_message(key).await?;
378 self.send_counts();
379 Ok(())
380 }
381
382 async fn mark_system_message(
383 &mut self,
384 key: &Urn,
385 is_read: bool,
386 ) -> Result<(), Self::Error> {
387 if let Some(message) = self.messages.0.get_mut(key) {
388 message.is_read = true;
389 self.provider.mark_system_message(key, is_read).await?;
390 self.send_counts();
391 Ok(())
392 } else {
393 Err(Error::NoSysMessage(key.to_string()).into())
394 }
395 }
396
397 async fn clear_system_messages(&mut self) -> Result<(), Self::Error> {
398 self.messages = Default::default();
399 self.provider.clear_system_messages().await?;
400 self.send_counts();
401 Ok(())
402 }
403}