1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
pub mod conn;
pub mod man;

use crate::protocol::{Command, LogMessage, RequestPackage, Status};
use conn::ClientConnection;
use man::ConnectionManager;

use log::{LevelFilter, Log};
use r2d2::Pool;
use std::net::SocketAddr;

/// `log` backend that forwards log records to a remote logging server.
///
/// Installed as the global logger by [`RokkettLogger::new`].
#[derive(Debug)]
pub struct RokkettLogger {
	/// Name of the application; passed along when each log message is built.
	service: String,
	/// r2d2 pool of connections to the logging server.
	pool: Pool<ConnectionManager>,
}

impl RokkettLogger {
	/// Creates a new RokkettLogger and installs it as the global `log` logger.
	///
	/// - **addr**: the address of the logging server in `<host>:<port>` format.
	///   IP literals are used as-is; host names are resolved and the first
	///   resulting address is used.
	/// - **service**: the name of the application
	/// - **token**: auth token; `None` is passed to the connection manager as
	///   an empty string
	///
	/// # Errors
	///
	/// Returns an error if `addr` cannot be parsed/resolved, if the initial
	/// test connection fails, if the connection pool cannot be built, or if a
	/// global logger has already been set.
	pub fn new(addr: &str, service: &str, token: Option<&str>) -> anyhow::Result<()> {
		// `ToSocketAddrs` first tries to parse `addr` as a socket address literal
		// and only falls back to name resolution for real host names.
		let ipaddr: SocketAddr = std::net::ToSocketAddrs::to_socket_addrs(addr)?
			.next()
			.ok_or_else(|| {
				std::io::Error::new(
					std::io::ErrorKind::InvalidInput,
					"address did not resolve to any socket address",
				)
			})?;
		let token = token.unwrap_or_default().to_owned();

		// test connection before creating a pool
		ClientConnection::new(&ipaddr)?.is_valid()?;

		// Two pooled connections per logical CPU, without a truncating cast.
		let max_size = u32::try_from(num_cpus::get())
			.unwrap_or(u32::MAX)
			.saturating_mul(2);
		let pool = r2d2::Pool::builder()
			.max_size(max_size)
			.build(ConnectionManager::new(ipaddr, token))?;

		let logger = RokkettLogger {
			service: service.to_owned(),
			pool,
		};
		log::set_boxed_logger(Box::new(logger))?;
		log::set_max_level(LevelFilter::Debug);
		Ok(())
	}
}

impl Log for RokkettLogger {
	/// Reports whether a record with this metadata would be logged.
	///
	/// Only compares the record level against `LevelFilter::Off`; finer
	/// filtering is left to the global max level configured in `log`.
	fn enabled(&self, metadata: &log::Metadata) -> bool {
		metadata.level() > LevelFilter::Off
	}

	/// Converts `record` into a request package and sends it to the logging
	/// server from a background job, so the caller is not blocked on network
	/// I/O. Failures are reported on stderr rather than through `log`.
	fn log(&self, record: &log::Record) {
		if !self.enabled(record.metadata()) {
			return;
		}

		let package = RequestPackage {
			command: Command::Log(LogMessage::from_log_record(record, &self.service)),
		};
		// The background job needs its own handle to the pool.
		let pool = self.pool.clone();

		// Both `pool.get()` and `conn.send()` are synchronous calls, so this
		// job must never run directly on an async executor thread.
		spawn_background(move || {
			let mut conn = match pool.get() {
				Ok(conn) => conn,
				Err(err) => {
					eprintln!("failed to get connection from pool. err = {err}");
					return;
				}
			};
			match conn.send(package) {
				Ok(Status::Ok) => {}
				Ok(status) => eprintln!("failed to send log. status = {status}"),
				Err(err) => eprintln!("failed to send log. err = {err}"),
			}
		});
	}

	/// No-op: every record is handed to a background job as soon as it is
	/// logged, so nothing is buffered in the logger itself.
	fn flush(&self) {}
}

/// Runs a blocking `job` on a freshly spawned OS thread.
#[cfg(not(feature = "tokio"))]
fn spawn_background(job: impl FnOnce() + Send + 'static) {
	std::thread::spawn(job);
}

/// Runs a blocking `job` on tokio's blocking thread pool when called from
/// inside a runtime, and on a plain OS thread otherwise (logging from a
/// non-runtime thread must not panic).
#[cfg(feature = "tokio")]
fn spawn_background(job: impl FnOnce() + Send + 'static) {
	match tokio::runtime::Handle::try_current() {
		Ok(handle) => {
			handle.spawn_blocking(job);
		}
		Err(_) => {
			std::thread::spawn(job);
		}
	}
}

// #[cfg(test)]
// mod tests {
// 	use super::*;

// 	#[test]
// 	fn test() {
// 		assert!(true);
// 	}
// }