/// Crate-private home of the gRPC code generated for the `spectate_proto`
/// package; the `SpectateClient` and `LogEntry` types used below come from here.
pub(crate) mod spectate {
// Pulls in the sources that the tonic build step emitted for `spectate_proto`.
tonic::include_proto!("spectate_proto");
}
pub use env_logger::Target;
pub use log;
use log::trace;
use spectate::spectate_client::SpectateClient;
use spectate::LogEntry;
use std::thread;
use std::{
io,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
},
};
use tokio::runtime::Runtime;
use tonic::transport::Channel;
/// Handle that buffers raw log bytes and forwards them to a gRPC server.
///
/// Cloning is shallow for the shared parts: the receiver, runtime and client
/// sit behind `Arc<Mutex<_>>`, so every clone talks to the same channel,
/// runtime and connection.
#[derive(Debug, Clone)]
pub struct Spectate {
// Producer side of the byte channel; fed one byte at a time by `LogTarget::write`.
sender: Sender<u8>,
// Consumer side of the byte channel; drained by `Spectate::flush`.
receiver: Arc<Mutex<Receiver<u8>>>,
// Tokio runtime used to drive the client's futures to completion with `block_on`.
runtime: Arc<Mutex<Runtime>>,
// gRPC client created once by `Spectate::client_init`.
client: Arc<Mutex<SpectateClient<Channel>>>,
}
impl Default for Spectate {
fn default() -> Self {
Self::new()
}
}
impl Spectate {
fn new() -> Self {
let (sender, receiver) = channel();
let _ = env_logger::try_init();
let receiver = Arc::new(Mutex::new(receiver));
let runtime = Arc::new(Mutex::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("spectate_runtime_thread")
.on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY))
.build()
.expect("Creating spectate_runtime_thread"),
));
let client = Arc::new(Mutex::new(
Spectate::client_init(&runtime).expect("Couldn't initialize connection"),
));
Self {
sender,
receiver,
runtime,
client,
}
}
fn flush(&self) {
let receiver = self.receiver.clone();
let client = self.client.clone();
let runtime = self.runtime.clone();
let send_thread = thread::spawn(move || {
let receiver = receiver.lock().expect("");
let runtime = runtime.lock().expect("");
let mut client = client.lock().expect("");
let entries = vec![LogEntry {
data: receiver.try_iter().collect(),
}];
if !entries.is_empty() {
let send_future = client.send_records(futures_util::stream::iter(entries));
runtime.block_on(async move {
send_future.await.expect("Couldn't send records");
});
}
});
send_thread.join().expect("join send_thread");
}
pub fn target() -> Target {
Target::Pipe(Box::new(LogTarget(Spectate::new())))
}
fn client_init(
runtime: &Arc<Mutex<Runtime>>,
) -> Result<SpectateClient<Channel>, Box<dyn std::error::Error>> {
let runtime = runtime.clone();
let runtime_thread = thread::spawn(move || {
let runtime = runtime.lock().expect("lock runtime");
trace!("Connecting to grpc server");
runtime.block_on(async {
SpectateClient::connect("http://[::1]:50051")
.await
.expect("couldn't connect")
})
});
let client = runtime_thread.join().expect("Couldn't join runtime_thread");
Ok(client)
}
}
/// Newtype around a [`Spectate`] handle; its `io::Write` impl is what
/// `Spectate::target` boxes into `Target::Pipe`.
#[derive(Debug, Clone)]
struct LogTarget(Spectate);
// No inherent methods are defined; all behaviour comes from the `io::Write` impl.
impl LogTarget {}
impl io::Write for LogTarget {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
for char in buf {
self.0.sender.send(*char).ok();
}
self.flush()?;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush();
Ok(())
}
}
/// Priority value handed to `set_current_thread_priority` from the runtime's
/// thread-start hook; on Unix it becomes the `prio` argument of `setpriority`.
const WORKER_PRIORITY: i32 = 10;
/// Applies `priority` to the caller through `libc::setpriority`.
///
/// The call passes `which = 0` and `who = 0`; the returned status code is
/// discarded, so a failure goes unreported.
// NOTE(review): whether this affects only the calling thread or the whole
// process is platform-dependent — confirm for each supported target.
#[cfg(unix)]
fn set_current_thread_priority(priority: i32) {
    // SAFETY: `setpriority` only receives plain integers and dereferences no
    // pointers, so there is no memory-safety precondition to uphold.
    let _ = unsafe { libc::setpriority(0, 0, priority) };
}
/// Fallback for non-Unix targets: thread priority is left untouched and a
/// warning is logged instead.
///
/// The requested priority is accepted only to keep the signature identical to
/// the Unix implementation.
// NOTE(review): `dead_code` is allowed presumably for builds that never call
// this fallback — confirm it is still needed.
#[allow(dead_code)]
#[cfg(not(unix))]
fn set_current_thread_priority(_priority: i32) {
    // Path-qualified because only `log::trace` is imported at the top of the
    // file; a bare `warn!` does not resolve.
    log::warn!("Setting worker thread priority not supported on this platform");
}