use crate::{LinearRetry, ProwlQueueOptions, RetryMethod};
use prowl::Notification;
use thiserror::Error;
use tokio::{
    sync::mpsc::{error::SendError, unbounded_channel, UnboundedReceiver, UnboundedSender},
    time::{sleep, Duration},
};
8
9pub struct ProwlQueue {
10 prowl_queue_reciever: ProwlQueueReceiver,
11 prowl_queue_sender: ProwlQueueSender,
12}
13
14#[derive(Clone)]
15pub struct ProwlQueueSender {
16 sender: UnboundedSender<Notification>,
17}
18
19pub struct ProwlQueueReceiver {
20 options: ProwlQueueOptions,
21 reciever: UnboundedReceiver<Notification>,
22}
23
24#[derive(Debug, Error)]
25pub enum AddError {
26 #[error("{0}")]
28 SendError(SendError<Notification>),
29}
30
31impl ProwlQueue {
33 pub fn new(options: ProwlQueueOptions) -> Self {
34 let (sender, reciever) = unbounded_channel();
35 let prowl_queue_reciever = ProwlQueueReceiver { options, reciever };
36 let prowl_queue_sender = ProwlQueueSender { sender };
37
38 Self {
39 prowl_queue_reciever,
40 prowl_queue_sender,
41 }
42 }
43
44 pub fn into_parts(self) -> (ProwlQueueSender, ProwlQueueReceiver) {
45 (self.prowl_queue_sender, self.prowl_queue_reciever)
46 }
47}
48
49impl ProwlQueueSender {
50 pub fn add(&self, notification: Notification) -> Result<(), Box<AddError>> {
52 self.sender
53 .send(notification)
54 .map_err(AddError::SendError)
55 .map_err(Box::new)?;
56 Ok(())
57 }
58}
59
60impl ProwlQueueReceiver {
61 pub async fn async_loop(mut self) {
65 log::debug!("Notifications channel processor started.");
66 while let Some(notification) = self.reciever.recv().await {
67 let mut retry = 0;
68 'notification: loop {
69 match notification.add().await {
70 Ok(_) => break 'notification,
71 Err(prowl::AddError::Send(e)) => {
72 log::warn!("Will retry notification. Try {retry} failed due to {:?}", e);
73 }
74 Err(e) => {
75 log::error!("Terminally failed to send notification due to {:?}", e);
78 break 'notification;
79 }
80 }
81
82 match self.options.retry_method() {
83 RetryMethod::Linear(linear_retry) => {
84 if let Some(max) = linear_retry.max_retries() {
85 if retry + 1 > *max {
86 log::warn!("Dropping notification {:?} because it's on retry {retry} when max_retries is {}", notification, max);
87 break 'notification;
88 }
89 }
90 sleep(*linear_retry.backoff()).await;
91 }
92 }
93
94 retry += 1;
95 }
96 }
97 log::warn!("Notification channel has been closed.");
98 }
99
100 pub fn to_unbound_receiver(self) -> UnboundedReceiver<Notification> {
101 self.reciever
102 }
103}
104
105impl Default for ProwlQueue {
106 fn default() -> Self {
107 let retry_method = LinearRetry::new(Duration::from_secs(60), None);
108 let retry_method = RetryMethod::Linear(retry_method);
109 let options = ProwlQueueOptions::new(retry_method);
110
111 let (sender, reciever) = unbounded_channel();
112 let prowl_queue_reciever = ProwlQueueReceiver { options, reciever };
113 let prowl_queue_sender = ProwlQueueSender { sender };
114
115 Self {
116 prowl_queue_reciever,
117 prowl_queue_sender,
118 }
119 }
120}