imsub_log_crate/
rabbitmq.rs1use std::ops::DerefMut;
2use std::sync::{Arc, RwLock};
3use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
4
5use lapin::{options::*, BasicProperties, Channel, Connection, ConnectionProperties, ConnectionState};
6use tracing::Level;
7use super::error::Error;
8
9use super::message::{LogMessage, LogMessage_Severity};
10
11pub struct RabbitMqClient {
12 _ampq_url: String,
13 _conn: Arc<RwLock<Connection>>,
14 queue_sender: UnboundedSender<LogMessage>,
15 _reconnection_max: u8,
16 service_name: String,
17}
18
19impl RabbitMqClient {
20 pub async fn new(ampq_url: String, service_name: String) -> Result<Self, Error> {
21 let conn = Arc::new(RwLock::new(
22 RabbitMqClient::connect(ampq_url.clone()).await?,
23 ));
24 let (tx, mut rx) = unbounded_channel::<LogMessage>();
25 let lock_conn = conn.read()?;
26 let mut channel = lock_conn.create_channel().await?;
27 drop(lock_conn);
28
29 let conn_move = conn.clone();
30 let ampq_url_move = ampq_url.clone();
31 let reconnection_max = 4;
32 let reconnection_max_move = reconnection_max;
33 tokio::spawn(async move {
34
35 while let Some(message) = rx.recv().await {
36
37 let mut retry: u8 = 0;
38 loop {
39 if retry > reconnection_max_move {
40 break
41 }
42
43 if let Err(_err) = RabbitMqClient::publish_message_static(&channel, message.clone()).await {
44 let new_conn = match RabbitMqClient::connect(ampq_url_move.clone()).await {
45 Ok(conn) => conn,
46 Err(_err) => {
47 retry += 1;
48 println!("RabbitMQ reconnection failed");
49 continue
50 }
51 };
52 channel = match new_conn.create_channel().await {
53 Ok(channel) => channel,
54 Err(_err) => {
55 retry += 1;
56 println!("RabbitMQ unable to establish a channel");
57 continue
58 }
59 };
60 let new_conn_arc = conn_move.clone();
61 {
62 match new_conn_arc.write() {
63 Ok(mut write_guard) => {
64 let write_ref = write_guard.deref_mut();
65 *write_ref = new_conn;
66 println!("Rabbit connection restored");
67 break
68 }
69 Err(_err) => {
70 retry += 1;
71 println!("RabbitMQ poisoned connection");
72 continue
73 }
74 };
75 }
76 }
77 else {
78 break
79 }
80 }
81 }
82 });
83
84 Ok(Self {
85 _ampq_url: ampq_url,
86 _conn: conn,
87 queue_sender: tx,
88 _reconnection_max: reconnection_max,
89 service_name,
90 })
91 }
92
93 async fn connect(ampq_url: String) -> Result<Connection, Error> {
94 let conn = Connection::connect(&q_url, ConnectionProperties::default()).await?;
95 Ok(conn)
96 }
97
98 pub fn check_status(&self) -> bool {
99 match self.status() {
100 Ok(state) => {
101 use ConnectionState::*;
102 match state {
103 Initial | Connecting | Connected => true,
104 Closing | Closed | Error => false,
105 }
106 },
107 Err(_err) => false
108 }
109 }
110
111 pub fn status(&self) -> Result<ConnectionState, Error> {
112 Ok(self._conn.read()?.status().state())
113 }
114
115 async fn publish_message_static(channel: &Channel, message: LogMessage) -> Result<(), Error> {
116 channel.basic_publish(
117 "imsub.logger",
118 "logger",
119 BasicPublishOptions::default(),
120 serde_json::to_string(&message)?.as_bytes(),
121 BasicProperties::default(),
122 )
123 .await?
124 .await?;
125 Ok(())
127 }
128
129 pub fn send_log(
130 &self,
131 level: &Level,
132 user_id: Option<String>,
133 method_name: Option<String>,
134 message: Option<String>,
135 details: Option<String>,
136 ) {
137 let message = LogMessage {
138 service_name: self.service_name.clone(),
139 telegram_id: user_id.unwrap_or_else(|| "-1".to_string()),
140 method_name: method_name.unwrap_or_else(|| "".to_string()),
141 severity: {
142 match *level {
143 Level::ERROR => LogMessage_Severity::Error,
144 Level::WARN => LogMessage_Severity::Warning,
145 Level::INFO => LogMessage_Severity::Info,
146 Level::DEBUG => LogMessage_Severity::Debug,
147 Level::TRACE => LogMessage_Severity::Debug,
148 }
149 },
150 message: message.unwrap_or_else(|| "".to_string()),
151 details: details.unwrap_or_else(|| "".to_string()),
152 ..Default::default()
153 };
154 if let Err(err) = self.queue_sender.send(message) {
155 println!("{:?}", err);
156 }
157 }
158}