dge_runtime/
rmq.rs

1use futures::Future;
2use lapin::options::BasicQosOptions;
3use lapin::Channel;
4use log::debug;
5use log::info;
6use log::warn;
7use serde::de::DeserializeOwned;
8use serde_json;
9
10use super::rmq_primitive::constant::*;
11use super::rmq_primitive::create_channel;
12use super::rmq_primitive::unreliable_ack_or_reject;
13use super::rmq_primitive::AckType;
14use super::rmq_primitive::Responsibility;
15use crate::Result;
16
17/// Read a message of type `InputMsg` from `input_queue`, and process it with `handler`.
18///
19/// During the processing, the `handler` can access its state of type `HandlerState`,
20/// and use the `Channel` to publish messages to other queues.
21///
22/// The `handler` can return `Ok(Responsibility::Accept)` to indicate that
23/// the handler has taken care of the message been processed,
24/// (i.e. the responsibility has transferred from RabbitMQ to the handler code),
25/// this will cause an RabbitMQ ack to be send back to the RabbitMQ server,
26/// rendering the consumption of the message.
27///
28/// The `handler` can also return `Ok<Responsibility::Reject>` to indicate that
29/// the handler rejects the message,
30/// in this case an RabbitMQ `basic.reject` will be sent back to the RabbitMQ server,
31/// and the message will be redelivered later.
32///
33/// For convenience, and `Err(_)` is treated largely in a similar way with a rejection,
34/// the difference is that an `Ok<Responsibility::Reject>` is considered as an intentional rejection,
35/// thus no warnings are logged, whereas an `Err(_)` is treated as an unintentional rejection,
36/// which will cause warnings to be logged.
37///
38/// This function never terminates.
39///
40/// If the connection to the RabbitMQ server drops, it will be retried.
41pub async fn consume_forever<InputMsg, HandlerState, HandlerResult>(
42    rmq_uri: &str,
43    input_queue: &'static str,
44    handler: fn(HandlerState, Channel, InputMsg) -> HandlerResult,
45    handler_state: HandlerState,
46    prefetch_count: u16,
47) where
48    InputMsg: DeserializeOwned + Send + 'static,
49    HandlerState: Clone + Send + 'static,
50    HandlerResult: Future<Output = Result<Responsibility>> + Send + 'static,
51{
52    loop {
53        // establish connection to rmq server and consume the queue
54        match consume_queue(
55            rmq_uri,
56            &input_queue,
57            handler,
58            handler_state.clone(),
59            prefetch_count,
60        )
61        .await
62        {
63            Ok(()) => (),
64            Err(e) => {
65                warn!(
66                    "error happened when consuming queue {}, will retry: {}",
67                    &input_queue, e
68                );
69            }
70        }
71
72        // sleep for a while before reconnecting to avoid rapid fire
73        let duration = std::time::Duration::from_millis(2000);
74        info!(
75            "sleep for {} seconds before reconnecting to queue {}",
76            &duration.as_secs(),
77            &input_queue
78        );
79        tokio::time::sleep(duration).await
80    }
81}
82
83async fn consume_queue<InputMsg, HandlerState, HandlerResult>(
84    rmq_uri: &str,
85    input_queue: &'static str,
86    handler: fn(HandlerState, Channel, InputMsg) -> HandlerResult,
87    handler_state: HandlerState,
88    prefetch_count: u16,
89) -> Result<()>
90where
91    InputMsg: DeserializeOwned + Send + 'static,
92    HandlerState: Clone + Send + 'static,
93    HandlerResult: Future<Output = Result<Responsibility>> + Send + 'static,
94{
95    // establish communication
96    info!("creating channel for consuming queue {}", input_queue);
97    let channel = create_channel(rmq_uri).await?;
98    info!("setting prefetch to be {}", prefetch_count);
99    channel
100        .basic_qos(prefetch_count, BasicQosOptions { global: false })
101        .await?;
102
103    // create consumer
104    // since each queue has exactly one consumer,
105    // we can use the queue name as the identifier
106    let consumer_tag = format!("dgec-{}", input_queue);
107    info!("creating consumer {}", &consumer_tag);
108    let consumer = channel
109        .basic_consume(
110            input_queue,
111            &consumer_tag,
112            RMQ_BASIC_CONSUME_OPTIONS,
113            lapin::types::FieldTable::default(),
114        )
115        .await?;
116
117    // consuming loop
118    info!("entering consuming loop for queue {}", input_queue);
119    for delivery in consumer {
120        let (channel, msg) = delivery?;
121        tokio::spawn(handle_one_delivery(
122            channel,
123            msg,
124            handler,
125            handler_state.clone(),
126        ));
127    }
128
129    Ok(())
130}
131
132async fn handle_one_delivery<InputMsg, HandlerState, HandlerResult>(
133    channel: Channel,
134    delivery: lapin::message::Delivery,
135    handle: fn(HandlerState, Channel, InputMsg) -> HandlerResult,
136    handler_state: HandlerState,
137) where
138    InputMsg: DeserializeOwned + Send + 'static,
139    HandlerState: Clone + Send + 'static,
140    HandlerResult: Future<Output = Result<Responsibility>> + Send + 'static,
141{
142    debug!("processing message of tag: {}", delivery.delivery_tag);
143    match serde_json::from_slice::<InputMsg>(&delivery.data) {
144        Err(e) => {
145            // json parse failed, we just warn and drop the message.
146            // in theory, there are many ways to signal the error, either to the sender, or to a human,
147            // but this shouldn't happen often, and probably due to a programming error made by a human,
148            // so we just issue a warning
149            warn!(
150                "failed to parse json when processing delivery: {}, msg will be dropped, error is: {}, data is: {:?}",
151                &delivery.delivery_tag, e, &delivery.data
152            );
153            unreliable_ack_or_reject(channel, AckType::Ack, delivery.delivery_tag).await
154        }
155        Ok(msg) => match handle(handler_state, channel.clone(), msg).await {
156            Err(e) => {
157                warn!(
158                    "an error occurred while handling message {}, will requeue it, error is: {}",
159                    &delivery.delivery_tag, e
160                );
161                unreliable_ack_or_reject(channel, AckType::Reject, delivery.delivery_tag).await
162            }
163            Ok(Responsibility::Reject) => {
164                debug!("explicitly rejecting message {}", &delivery.delivery_tag);
165                unreliable_ack_or_reject(channel, AckType::Reject, delivery.delivery_tag).await
166            }
167
168            Ok(Responsibility::Accept) => {
169                debug!("accepting message {}", &delivery.delivery_tag);
170                unreliable_ack_or_reject(channel, AckType::Ack, delivery.delivery_tag).await
171            }
172        },
173    }
174}