prowl_queue/
queue.rs

1use crate::{LinearRetry, ProwlQueueOptions, RetryMethod};
2use prowl::Notification;
3use thiserror::Error;
4use tokio::{
5    sync::mpsc::{error::SendError, unbounded_channel, UnboundedReceiver, UnboundedSender},
6    time::{sleep, Duration},
7};
8
/// A notification queue consisting of a sending half and a receiving half.
///
/// Both halves are created together by `ProwlQueue::new` (or `Default`) and are
/// handed out to the caller with `ProwlQueue::into_parts`.
pub struct ProwlQueue {
    // Receiving half: owns the channel receiver together with the queue options.
    prowl_queue_reciever: ProwlQueueReceiver,
    // Sending half: used to enqueue notifications.
    prowl_queue_sender: ProwlQueueSender,
}
13
/// Cloneable handle used to enqueue notifications onto the queue.
#[derive(Clone)]
pub struct ProwlQueueSender {
    // Sending half of the unbounded Tokio channel that carries the notifications.
    sender: UnboundedSender<Notification>,
}
18
/// Receiving half of the queue: the channel receiver paired with the queue options.
pub struct ProwlQueueReceiver {
    // Queue configuration; `async_loop` reads the retry method from it.
    options: ProwlQueueOptions,
    // Receiving half of the unbounded Tokio channel that carries the notifications.
    reciever: UnboundedReceiver<Notification>,
}
23
/// Errors returned when queueing a notification with `ProwlQueueSender::add`.
#[derive(Debug, Error)]
pub enum AddError {
    /// Tokio failed to queue the notification.
    ///
    /// Wraps Tokio's `SendError`; this variant's display text is forwarded
    /// unchanged from the wrapped error.
    #[error("{0}")]
    SendError(SendError<Notification>),
}
30
31/// Implements a Tokio mpsc backed notification queue.
32impl ProwlQueue {
33    pub fn new(options: ProwlQueueOptions) -> Self {
34        let (sender, reciever) = unbounded_channel();
35        let prowl_queue_reciever = ProwlQueueReceiver { options, reciever };
36        let prowl_queue_sender = ProwlQueueSender { sender };
37
38        Self {
39            prowl_queue_reciever,
40            prowl_queue_sender,
41        }
42    }
43
44    pub fn into_parts(self) -> (ProwlQueueSender, ProwlQueueReceiver) {
45        (self.prowl_queue_sender, self.prowl_queue_reciever)
46    }
47}
48
49impl ProwlQueueSender {
50    /// Queue a notification for sending
51    pub fn add(&self, notification: Notification) -> Result<(), Box<AddError>> {
52        self.sender
53            .send(notification)
54            .map_err(AddError::SendError)
55            .map_err(Box::new)?;
56        Ok(())
57    }
58}
59
60impl ProwlQueueReceiver {
61    /// Spawn a recv'ing loop that will continue to process and
62    /// retry notifications. Stops processing with the sender half
63    /// is dropped.
64    pub async fn async_loop(mut self) {
65        log::debug!("Notifications channel processor started.");
66        while let Some(notification) = self.reciever.recv().await {
67            let mut retry = 0;
68            'notification: loop {
69                match notification.add().await {
70                    Ok(_) => break 'notification,
71                    Err(prowl::AddError::Send(e)) => {
72                        log::warn!("Will retry notification. Try {retry} failed due to {:?}", e);
73                    }
74                    Err(e) => {
75                        // API or internal error - lets not hammer with invalid requests.
76                        // TODO: don't break if 5xx response
77                        log::error!("Terminally failed to send notification due to {:?}", e);
78                        break 'notification;
79                    }
80                }
81
82                match self.options.retry_method() {
83                    RetryMethod::Linear(linear_retry) => {
84                        if let Some(max) = linear_retry.max_retries() {
85                            if retry + 1 > *max {
86                                log::warn!("Dropping notification {:?} because it's on retry {retry} when max_retries is {}", notification, max);
87                                break 'notification;
88                            }
89                        }
90                        sleep(*linear_retry.backoff()).await;
91                    }
92                }
93
94                retry += 1;
95            }
96        }
97        log::warn!("Notification channel has been closed.");
98    }
99
100    pub fn to_unbound_receiver(self) -> UnboundedReceiver<Notification> {
101        self.reciever
102    }
103}
104
105impl Default for ProwlQueue {
106    fn default() -> Self {
107        let retry_method = LinearRetry::new(Duration::from_secs(60), None);
108        let retry_method = RetryMethod::Linear(retry_method);
109        let options = ProwlQueueOptions::new(retry_method);
110
111        let (sender, reciever) = unbounded_channel();
112        let prowl_queue_reciever = ProwlQueueReceiver { options, reciever };
113        let prowl_queue_sender = ProwlQueueSender { sender };
114
115        Self {
116            prowl_queue_reciever,
117            prowl_queue_sender,
118        }
119    }
120}