1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
pub mod conn;
pub mod man;
use crate::protocol::{Command, LogMessage, RequestPackage, Status};
use conn::ClientConnection;
use man::ConnectionManager;
use log::{LevelFilter, Log};
use r2d2::Pool;
use std::net::SocketAddr;
/// Logger that forwards `log` records to a remote endpoint through a pool of
/// client connections.
#[derive(Debug)]
pub struct RokkettLogger {
    /// Service name handed to `LogMessage::from_log_record` for every record.
    service: String,
    /// Connection pool from which a connection is taken for each outgoing record.
    pool: Pool<ConnectionManager>,
}
impl RokkettLogger {
pub fn new(addr: &str, service: &str, token: Option<&str>) -> anyhow::Result<()> {
let ipaddr: SocketAddr = addr.parse()?;
let token = match token {
Some(token) => token.to_owned(),
None => "".to_owned(),
};
ClientConnection::new(&ipaddr)?.is_valid()?;
let max_size = num_cpus::get() as u32 * 2;
let logger = RokkettLogger {
service: service.to_owned(),
pool: r2d2::Pool::builder()
.max_size(max_size)
.build(ConnectionManager::new(ipaddr, token))?,
};
log::set_boxed_logger(Box::new(logger))?;
log::set_max_level(LevelFilter::Debug);
Ok(())
}
}
impl Log for RokkettLogger {
    /// Returns `true` when the record's level is above `LevelFilter::Off`.
    ///
    /// NOTE(review): in the `log` crate every `Level` appears to order above
    /// `LevelFilter::Off`, so this likely accepts all records — confirm.
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        metadata.level() > LevelFilter::Off
    }

    /// Converts `record` into a `RequestPackage` and sends it in the background.
    ///
    /// The send happens off the calling thread (see `spawn_log_task`), so this
    /// method does not wait for the result. Failures to obtain a connection or
    /// to send are reported on stderr and otherwise ignored.
    fn log(&self, record: &log::Record) {
        if !self.enabled(record.metadata()) {
            return;
        }

        let package = RequestPackage {
            command: Command::Log(LogMessage::from_log_record(record, &self.service)),
        };
        // The background task must own its own handle to the pool.
        let pool = self.pool.clone();

        spawn_log_task(move || {
            let mut conn = match pool.get() {
                Ok(conn) => conn,
                Err(err) => {
                    eprintln!("failed to get connection from pool. err = {err}");
                    return;
                }
            };
            match conn.send(package) {
                Ok(Status::Ok) => {}
                Ok(status) => eprintln!("failed to send log. status = {status}"),
                Err(err) => eprintln!("failed to send log. err = {err}"),
            }
        });
    }

    /// No-op: background sends are detached and not tracked, so there is
    /// nothing this logger can wait on.
    fn flush(&self) {}
}

/// Runs `task` on a detached OS thread (the join handle is dropped).
#[cfg(not(feature = "tokio"))]
fn spawn_log_task<F>(task: F)
where
    F: FnOnce() + Send + 'static,
{
    std::thread::spawn(task);
}

/// Runs `task` on tokio's blocking thread pool when called from inside a
/// runtime, and on a detached OS thread otherwise.
///
/// The task takes a connection from a synchronous pool, which can block, so it
/// must not run on an async worker thread. Falling back to a plain thread
/// keeps logging from panicking when no runtime is active on the caller.
#[cfg(feature = "tokio")]
fn spawn_log_task<F>(task: F)
where
    F: FnOnce() + Send + 'static,
{
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            handle.spawn_blocking(task);
        }
        Err(_) => {
            std::thread::spawn(task);
        }
    }
}