imsub_log_crate/
rabbitmq.rs

1use std::ops::DerefMut;
2use std::sync::{Arc, RwLock};
3use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
4
5use lapin::{options::*, BasicProperties, Channel, Connection, ConnectionProperties, ConnectionState};
6use tracing::Level;
7use super::error::Error;
8
9use super::message::{LogMessage, LogMessage_Severity};
10
11pub struct RabbitMqClient {
12    _ampq_url: String,
13    _conn: Arc<RwLock<Connection>>,
14    queue_sender: UnboundedSender<LogMessage>,
15    _reconnection_max: u8,
16    service_name: String,
17}
18
19impl RabbitMqClient {
20    pub async fn new(ampq_url: String, service_name: String) -> Result<Self, Error> {
21        let conn = Arc::new(RwLock::new(
22            RabbitMqClient::connect(ampq_url.clone()).await?,
23        ));
24        let (tx, mut rx) = unbounded_channel::<LogMessage>();
25        let lock_conn = conn.read()?;
26        let mut channel = lock_conn.create_channel().await?;
27        drop(lock_conn);
28
29        let conn_move = conn.clone();
30        let ampq_url_move = ampq_url.clone();
31        let reconnection_max = 4;
32        let reconnection_max_move = reconnection_max;
33        tokio::spawn(async move {
34
35            while let Some(message) = rx.recv().await {
36
37                let mut retry: u8 = 0;
38                loop {
39                    if retry > reconnection_max_move {
40                        break
41                    }
42
43                    if let Err(_err) = RabbitMqClient::publish_message_static(&channel, message.clone()).await {
44                        let new_conn = match RabbitMqClient::connect(ampq_url_move.clone()).await {
45                            Ok(conn) => conn,
46                            Err(_err) => {
47                                retry += 1;
48                                println!("RabbitMQ reconnection failed");
49                                continue
50                            }
51                        };
52                        channel = match new_conn.create_channel().await {
53                            Ok(channel) => channel,
54                            Err(_err) => {
55                                retry += 1;
56                                println!("RabbitMQ unable to establish a channel");
57                                continue
58                            }
59                        };
60                        let new_conn_arc = conn_move.clone();
61                        {
62                            match new_conn_arc.write() {
63                                Ok(mut write_guard) => {
64                                    let write_ref = write_guard.deref_mut();
65                                    *write_ref = new_conn;
66                                    println!("Rabbit connection restored");
67                                    break
68                                }
69                                Err(_err) => {
70                                    retry += 1;
71                                    println!("RabbitMQ poisoned connection");
72                                    continue
73                                }
74                            };
75                        }
76                    }
77                    else {
78                        break
79                    }
80                }
81            }
82        });
83
84        Ok(Self {
85            _ampq_url: ampq_url,
86            _conn: conn,
87            queue_sender: tx,
88            _reconnection_max: reconnection_max,
89            service_name,
90        })
91    }
92
93    async fn connect(ampq_url: String) -> Result<Connection, Error> {
94        let conn = Connection::connect(&ampq_url, ConnectionProperties::default()).await?;
95        Ok(conn)
96    }
97
98    pub fn check_status(&self) -> bool {
99        match self.status() {
100            Ok(state) => {
101                use ConnectionState::*;
102                match state {
103                    Initial | Connecting | Connected => true,
104                    Closing | Closed | Error => false,
105                }
106            },
107            Err(_err) => false
108        }
109    }
110
111    pub fn status(&self) -> Result<ConnectionState, Error> {
112        Ok(self._conn.read()?.status().state())
113    }
114
115    async fn publish_message_static(channel: &Channel, message: LogMessage) -> Result<(), Error> {
116        channel.basic_publish(
117                "imsub.logger",
118                "logger",
119                BasicPublishOptions::default(),
120                serde_json::to_string(&message)?.as_bytes(),
121                BasicProperties::default(),
122            )
123            .await?
124            .await?;
125        // assert_eq!(confirm, Confirmation::NotRequested);
126        Ok(())
127    }
128
129    pub fn send_log(
130        &self,
131        level: &Level,
132        user_id: Option<String>,
133        method_name: Option<String>,
134        message: Option<String>,
135        details: Option<String>,
136    ) {
137        let message = LogMessage {
138            service_name: self.service_name.clone(),
139            telegram_id: user_id.unwrap_or_else(|| "-1".to_string()),
140            method_name: method_name.unwrap_or_else(|| "".to_string()),
141            severity: {
142                match *level {
143                    Level::ERROR => LogMessage_Severity::Error,
144                    Level::WARN => LogMessage_Severity::Warning,
145                    Level::INFO => LogMessage_Severity::Info,
146                    Level::DEBUG => LogMessage_Severity::Debug,
147                    Level::TRACE => LogMessage_Severity::Debug,
148                }
149            },
150            message: message.unwrap_or_else(|| "".to_string()),
151            details: details.unwrap_or_else(|| "".to_string()),
152            ..Default::default()
153        };
154        if let Err(err) = self.queue_sender.send(message) {
155            println!("{:?}", err);
156        }
157    }
158}