1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//! # SQSListen, a simple listener for AWS SQS queue.
//!
//! It allows you to set listener to your AWS SQS queue which will ask for the available messages in the queue and call the passed handler when the message received.
//! Once message received and processed (does not matter if handler returns error or not) the message is removed from the queue.
//!
//! ## Usage
//! ```rust
//! use sqslisten::{ReceiveMessageRequest, Region, SQSListen};
//! use std::{thread, time};
//!
//! fn main() {
//!     let mut sqs_listener = SQSListen::new(Region::UsEast1);
//!     let handle = sqs_listener.listen(
//!         ReceiveMessageRequest {
//!             queue_url: "<queue_url>".to_string(),
//!             ..ReceiveMessageRequest::default()
//!         },
//!         |msg, err| {
//!             match msg {
//!                 Some(message) => println!("Message received: {:?}", message),
//!                 None => {}
//!             }
//!
//!             match err {
//!                 Some(error) => println!("Error received: {:?}", error),
//!                 None => {}
//!             }
//!
//!             return Ok(());
//!         },
//!     );

//!     let ten_seconds = time::Duration::from_millis(100000);
//!     thread::sleep(ten_seconds);
//!
//!     handle.stop();
//! }
//! ```

extern crate rusoto_core;
extern crate rusoto_sqs;

pub use rusoto_core::{DispatchSignedRequest, ProvideAwsCredentials, Region, RusotoError};
pub use rusoto_sqs::{
    DeleteMessageRequest, Message, ReceiveMessageError, ReceiveMessageRequest, ReceiveMessageResult,
};

use clokwerk::{ScheduleHandle, Scheduler, TimeUnits};
use rusoto_sqs::{Sqs, SqsClient};
use std::option::Option;
use std::time::Duration;

#[derive(Clone)]
pub struct SQSListen {
    sqs_client: SqsClient,
    queue_url: String,
}

#[derive(Debug, Clone)]
pub struct HandlerError;

trait SQSListenHandler {
    fn handle() {}
}

impl SQSListen {
    pub fn new(region: Region) -> SQSListen {
        SQSListen {
            sqs_client: SqsClient::new(region),
            queue_url: "".to_string(),
        }
    }

    pub fn new_with<P, D>(
        request_dispatcher: D,
        credentials_provider: P,
        region: Region,
    ) -> SQSListen
    where
        P: ProvideAwsCredentials + Send + Sync + 'static,
        P::Future: Send,
        D: DispatchSignedRequest + Send + Sync + 'static,
        D::Future: Send,
    {
        SQSListen {
            sqs_client: SqsClient::new_with(request_dispatcher, credentials_provider, region),
            queue_url: "".to_string(),
        }
    }

    pub fn listen<F>(&mut self, input: ReceiveMessageRequest, handler: F) -> ScheduleHandle
    where
        F: Fn(
                Option<&Message>,
                Option<RusotoError<ReceiveMessageError>>,
            ) -> Result<(), HandlerError>
            + Send
            + Sync
            + 'static,
    {
        const DEFAULT_INTERVAL: u32 = 1;

        self.queue_url = input.queue_url.clone();
        let sqs_client = self.sqs_client.clone();
        let that = self.clone();

        let interval = match input.wait_time_seconds {
            Some(wait_time) => (wait_time as u32 + 1).seconds(),
            None => DEFAULT_INTERVAL.seconds(),
        };

        let mut scheduler = Scheduler::new();
        scheduler.every(interval).run(move || {
            match sqs_client.receive_message(input.clone()).sync() {
                Ok(response) => that.process_response(&response, &handler),
                Err(err) => {
                    let _ignored = handler(None, Some(err));
                }
            }
        });
        scheduler.watch_thread(Duration::from_millis(100))
    }

    fn process_response<F>(&self, response: &ReceiveMessageResult, handler: &F)
    where
        F: Fn(
                Option<&Message>,
                Option<RusotoError<ReceiveMessageError>>,
            ) -> Result<(), HandlerError>
            + Send
            + Sync
            + 'static,
    {
        match &response.messages {
            Some(messages) => {
                for message in messages {
                    let _ignored = handler(Some(&message), None);
                    self.ack_message(&message);
                }
            }
            None => {}
        }
    }

    fn ack_message(&self, message: &Message) {
        if message.receipt_handle.is_none() {
            return;
        }

        let _ignore = self
            .sqs_client
            .delete_message(DeleteMessageRequest {
                queue_url: self.queue_url.clone(),
                receipt_handle: message.receipt_handle.clone().unwrap(),
            })
            .sync();
    }
}