#![doc = include_str!("../README.md")]
#![warn(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]
use axum::BoxError;
use parking_lot::{Mutex, RwLock};
use serde::Deserialize;
use std::collections::BTreeMap;
use std::fmt::Formatter;
use std::io::LineWriter;
use std::path::Path;
use std::{
error::Error,
fmt::{Debug, Display},
fs::File,
io::{BufRead, BufReader, Write},
path::PathBuf,
sync::Arc,
time::Duration,
};
use sysinfo::MINIMUM_CPU_UPDATE_INTERVAL;
use tantivy::schema::Field;
use tantivy::{
Document, Index, IndexReader, IndexWriter, TantivyDocument,
directory::MmapDirectory,
query::QueryParser,
schema::{DateOptions, INDEXED, STORED, STRING, Schema, TEXT},
};
use time::format_description;
use tracing::{Span, instrument};
use utoipa::ToSchema;
/// Newtype whose `Display` output is the wrapped integer followed by `ms`.
struct CpuTimeDisplay(u64);

impl Display for CpuTimeDisplay {
    /// Writes the wrapped value immediately followed by the literal suffix `ms`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(millis) = self;
        write!(f, "{millis}ms")
    }
}
/// Newtype whose `Display` output is the wrapped float with two decimals and a `%` suffix.
struct CpuPercentageDisplay(f32);

impl Display for CpuPercentageDisplay {
    /// Writes the wrapped value with exactly two fractional digits, followed by `%`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(percent) = *self;
        write!(f, "{percent:.2}%")
    }
}
// One span entry of a log line, deserialized from the line's JSON.
// NOTE(review): plain `//` comments are used here instead of `///` because this type derives
// `ToSchema`, and doc comments may be copied into the generated schema - confirm before converting.
#[derive(Deserialize, ToSchema)]
pub struct LogSpan {
// Span name; `Manager::sync` matches it against "app", "redirect" and "tls".
name: String,
// Used by `Manager::sync` as the owning domain when `name` is "app".
domain: Option<String>,
// Used by `Manager::sync` as the owning domain when `name` is "redirect".
host: Option<String>,
// Used by `Manager::sync` as the owning domain when `name` is "tls".
sni: Option<String>,
}
// The part of a JSON log line that `Manager::sync` deserializes to decide which index the
// line belongs to; only `spans` is read from it.
// NOTE(review): plain `//` comments are used because this type derives `ToSchema`, and doc
// comments may be copied into the generated schema - confirm before converting.
#[derive(Deserialize, ToSchema)]
pub struct LogLine {
// Spans attached to the line, if present in the JSON.
spans: Option<Vec<LogSpan>>,
}
/// Output format selector for a search request.
///
/// Variant names are deserialized with serde's `camelCase` renaming, and the `Display`/`Debug`
/// impls in this file print them as `all`, `count` and `top`.
#[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum QueryFormat {
/// Printed as `all`; presumably served by `Manager::search_all` - verify against the caller.
All,
/// Printed as `count`; presumably served by `Manager::search_count` - verify against the caller.
Count,
/// Printed as `top`; presumably served by `Manager::search_top` - verify against the caller.
Top,
}
impl QueryFormat {
    /// Writes the lowercase variant name (`all`, `count` or `top`) to `f`.
    ///
    /// Shared by the `Display` and `Debug` impls so both render identically.
    fn format(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::All => "all",
            Self::Count => "count",
            Self::Top => "top",
        };
        f.write_str(label)
    }
}

impl Display for QueryFormat {
    /// Renders the variant as its lowercase name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Self::format(self, f)
    }
}

impl Debug for QueryFormat {
    /// Renders exactly like `Display`: the lowercase variant name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Self::format(self, f)
    }
}
/// An opened index together with its shared reader and the query parser built for it.
type IndexHandle = (Index, Arc<IndexReader>, QueryParser);

/// Owns the search indexes, the log-file cursor and the settings used by the background sync.
pub struct Manager {
    /// Span entered around every background sync run.
    span: Span,
    /// Base directory holding the `cursor`, `files`, `search` and `archive` sub-directories.
    dir: PathBuf,
    /// Schema shared by the root index and every per-domain index.
    schema: Schema,
    /// Schema fields handed to each `QueryParser` as default fields.
    fields: Vec<Field>,
    /// Lower and upper bound of the sync interval; multiplied by 1000 in `start` to get ms.
    sync_interval: (u64, u64),
    /// Retention in days; converted to hours in `start` before being passed to `sync`.
    max_ttl_days: u16,
    /// Index addressed by the domain name `root` in the search methods.
    root_index: Arc<IndexHandle>,
    /// Per-domain indexes registered through `add`, keyed by domain name.
    #[allow(clippy::type_complexity)]
    domain_indexes: Arc<RwLock<BTreeMap<String, IndexHandle>>>,
    /// Sync position: the log file being consumed and a line position within it.
    cursors: Arc<Mutex<(Option<PathBuf>, usize)>>,
    /// Domain that `sync` never routes to a per-domain index.
    api_domain: String,
    /// Whether `archive` includes source and compressed sizes in its log event.
    log_size: bool,
    /// Process id looked up in `system` when `sync` reports process metrics.
    pid: sysinfo::Pid,
    /// System information handle refreshed at the start of every sync.
    system: Arc<Mutex<sysinfo::System>>,
}
impl Manager {
/// Creates a manager rooted at `dir`, opening (or creating) the root index.
///
/// Creates `dir/cursor`, `dir/search/root` and `dir/archive/root` if they are missing and
/// restores the sync position from `dir/cursor/file` and `dir/cursor/line`.
///
/// * `dir` - base directory for cursors, log files, indexes and archives.
/// * `api_domain` - domain that `sync` never routes to a per-domain index.
/// * `sync_interval` - lower and upper bound of the sync interval (see `start`).
/// * `max_ttl_days` - retention period in days.
/// * `server_span` - optional parent span; the `monitor` span is created inside it.
/// * `log_size` - whether archive log events include file sizes.
///
/// A missing, unreadable or empty cursor file means "start from the beginning". Surrounding
/// whitespace in a cursor file is ignored.
///
/// Returns an error when a directory cannot be created, a cursor file holds invalid UTF-8 or
/// a non-numeric line position, or the index cannot be opened.
pub fn new(
    dir: PathBuf,
    api_domain: String,
    sync_interval: (u64, u64),
    max_ttl_days: u16,
    server_span: Option<Span>,
    log_size: bool,
) -> Result<Self, Box<dyn Error>> {
    let span = match server_span {
        Some(parent) => parent.in_scope(|| tracing::info_span!("monitor")),
        None => tracing::info_span!("monitor"),
    };

    let cursor_path = dir.join("cursor");
    std::fs::create_dir_all(&cursor_path)?;

    // `sync` rewrites the cursor files with create-then-write, so a crash in between leaves an
    // empty file behind. Treat empty (or whitespace-only) content like a missing cursor
    // instead of failing on it at every start.
    let file_cursor = match std::fs::read(cursor_path.join("file")) {
        Ok(bytes) => {
            let name = std::str::from_utf8(&bytes)?.trim();
            (!name.is_empty()).then(|| dir.join("files").join(name))
        }
        Err(_) => None,
    };
    let line_cursor = match std::fs::read(cursor_path.join("line")) {
        Ok(bytes) => {
            let digits = std::str::from_utf8(&bytes)?.trim();
            if digits.is_empty() {
                0
            } else {
                digits.parse::<usize>()?
            }
        }
        Err(_) => 0,
    };

    let mut schema_builder = Schema::builder();
    let ts_opts = DateOptions::from(INDEXED)
        .set_stored()
        .set_fast()
        .set_precision(tantivy::schema::DateTimePrecision::Microseconds);
    let timestamp = schema_builder.add_date_field("timestamp", ts_opts);
    let level = schema_builder.add_text_field("level", STRING | STORED);
    let json_fields = schema_builder.add_json_field("fields", TEXT | STORED);
    let spans = schema_builder.add_json_field("spans", TEXT | STORED);
    let fields = vec![timestamp, level, json_fields, spans];
    let schema = schema_builder.build();

    let index_dir = dir.join("search").join("root");
    std::fs::create_dir_all(&index_dir)?;
    let mmap_dir = MmapDirectory::open(index_dir)?;
    let index = Index::open_or_create(mmap_dir, schema.clone())?;
    let reader = index
        .reader_builder()
        .reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay);
    let query_parser = QueryParser::for_index(&index, fields.clone());

    let archive_path = dir.join("archive").join("root");
    std::fs::create_dir_all(&archive_path)?;

    let system = sysinfo::System::new_all();
    Ok(Manager {
        span,
        dir,
        schema,
        fields,
        sync_interval,
        max_ttl_days,
        root_index: Arc::new((index, Arc::new(reader.try_into()?), query_parser)),
        domain_indexes: Arc::new(RwLock::new(BTreeMap::default())),
        cursors: Arc::new(Mutex::new((file_cursor, line_cursor))),
        api_domain,
        log_size,
        pid: sysinfo::Pid::from(std::process::id() as usize),
        system: Arc::new(Mutex::new(system)),
    })
}
#[instrument(skip(self), err)]
pub fn add(&self, domain: String) -> Result<(), Box<dyn Error>> {
let index_dir = self.dir.join("search").join(&domain);
std::fs::create_dir_all(&index_dir)?;
let cursor_path = self.dir.join("archive").join(&domain);
std::fs::create_dir_all(&cursor_path)?;
let dir = MmapDirectory::open(index_dir)?;
let index = Index::open_or_create(dir, self.schema.clone())?;
let reader = index
.reader_builder()
.reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay);
let query_parser = QueryParser::for_index(&index, self.fields.clone());
let mut di = self.domain_indexes.write();
di.insert(domain, (index, Arc::new(reader.try_into()?), query_parser));
Ok(())
}
/// Runs `query` against the index of `domain` and returns every matching document as one
/// JSON string per document, each terminated by a newline.
///
/// `domain` `root` selects the root index; any other value is looked up among the domains
/// registered with `add`. `format` and `limit` are only logged. Documents that cannot be
/// loaded are left out of the result.
///
/// Returns an error when the query does not parse, the search fails, or the domain is
/// unknown (`failed to search`).
#[instrument(name = "search", skip(self, domain, query, format, limit), err)]
pub fn search_all(
    &self,
    domain: &str,
    query: &str,
    format: &QueryFormat,
    limit: &Option<usize>,
) -> Result<Vec<Result<String, BoxError>>, Box<dyn Error>> {
    match limit {
        Some(limit) => tracing::info!(query, %format, limit),
        None => tracing::info!(query, %format),
    }

    let domains = self.domain_indexes.read();
    let entry = match domain {
        "root" => Some(&*self.root_index),
        other => domains.get(other),
    };
    let Some((_, reader, parser)) = entry else {
        return Err("failed to search".into());
    };

    let parsed = parser.parse_query(query)?;
    let searcher = reader.searcher();
    let hits = searcher.search(&*parsed, &tantivy::collector::DocSetCollector)?;
    tracing::info!(ct = hits.len());

    let mut out: Vec<Result<String, BoxError>> = Vec::with_capacity(hits.len());
    out.extend(hits.iter().filter_map(|address| {
        let doc = searcher.doc::<TantivyDocument>(*address).ok()?;
        let mut line = doc.to_json(&self.schema);
        line.push('\n');
        Some(Ok(line))
    }));
    Ok(out)
}
/// Runs `query` against the index of `domain` and returns the number of matching documents.
///
/// `domain` `root` selects the root index; any other value is looked up among the domains
/// registered with `add`. `format` and `limit` are only logged.
///
/// Returns an error when the query does not parse, the search fails, or the domain is
/// unknown (`failed to search`).
#[instrument(name = "search", skip(self, domain, query, format, limit), err)]
pub fn search_count(
    &self,
    domain: &str,
    query: &str,
    format: &QueryFormat,
    limit: &Option<usize>,
) -> Result<usize, Box<dyn Error>> {
    match limit {
        Some(limit) => tracing::info!(query, %format, limit),
        None => tracing::info!(query, %format),
    }

    let domains = self.domain_indexes.read();
    let entry = match domain {
        "root" => Some(&*self.root_index),
        other => domains.get(other),
    };
    let Some((_, reader, parser)) = entry else {
        return Err("failed to search".into());
    };

    let parsed = parser.parse_query(query)?;
    let searcher = reader.searcher();
    let total = searcher.search(&*parsed, &tantivy::collector::Count)?;
    Ok(total)
}
/// Runs `query` against the index of `domain` and returns at most `limit` top-ranked
/// documents as one JSON string per document, each terminated by a newline.
///
/// `domain` `root` selects the root index; any other value is looked up among the domains
/// registered with `add`. `format` is only logged. Documents that cannot be loaded are left
/// out of the result.
///
/// A `limit` of `0` yields an empty result. It is answered without consulting the collector
/// because `TopDocs::with_limit` panics on a zero limit.
///
/// Returns an error when the query does not parse, the search fails, the domain is unknown,
/// or no `limit` was supplied (the last two both report `failed to search`).
#[instrument(name = "search", skip(self, domain, query, format, limit), err)]
pub fn search_top(
    &self,
    domain: &str,
    query: &str,
    format: &QueryFormat,
    limit: &Option<usize>,
) -> Result<Vec<Result<String, BoxError>>, Box<dyn Error>> {
    if let Some(limit) = limit {
        tracing::info!(query, %format, limit);
    } else {
        tracing::info!(query, %format);
    }

    let di = self.domain_indexes.read();
    let index = if domain == "root" {
        Some(&*self.root_index)
    } else {
        di.get(domain)
    };
    let Some((_, reader, parser)) = index else {
        return Err("failed to search".into());
    };

    // Parse first so that a malformed query is reported even when the limit is unusable.
    let query = parser.parse_query(query)?;
    let Some(limit) = *limit else {
        return Err("failed to search".into());
    };
    if limit == 0 {
        tracing::info!(ct = 0_usize);
        return Ok(Vec::new());
    }

    let searcher = reader.searcher();
    let docs = searcher.search(&*query, &tantivy::collector::TopDocs::with_limit(limit))?;
    tracing::info!(ct = docs.len());

    let mut json = Vec::with_capacity(docs.len());
    for (_, address) in &docs {
        if let Ok(doc) = searcher.doc::<TantivyDocument>(*address) {
            let mut line = doc.to_json(&self.schema);
            line.push('\n');
            json.push(Ok(line));
        }
    }
    Ok(json)
}
#[instrument(
skip(
dir,
cursors_mutex,
root_index,
domain_indexes,
schema,
api_domain,
max_ttl_hours,
log_size,
system,
pid,
),
err
)]
#[allow(
clippy::type_complexity,
clippy::too_many_lines,
clippy::too_many_arguments,
clippy::cast_precision_loss
)]
fn sync(
dir: &Path,
cursors_mutex: &Arc<Mutex<(Option<PathBuf>, usize)>>,
root_index: &Arc<(Index, Arc<IndexReader>, QueryParser)>,
domain_indexes: &Arc<RwLock<BTreeMap<String, (Index, Arc<IndexReader>, QueryParser)>>>,
schema: &Schema,
api_domain: &String,
max_ttl_hours: u32,
log_size: bool,
system: &Arc<Mutex<sysinfo::System>>,
pid: sysinfo::Pid,
) -> Result<(), Box<dyn Error>> {
let mut sys = system.lock();
sys.refresh_all();
if let Some(process) = sys.process(pid) {
let cpu_ct = sys.cpus().len();
let du = process.disk_usage();
tracing::info!(
system.cpu.count = cpu_ct,
system.cpu.usage = %CpuPercentageDisplay(sys.global_cpu_usage()),
system.memory.total = %bytesize::ByteSize(sys.total_memory()).display().si_short(),
system.memory.used = %bytesize::ByteSize(sys.used_memory()).display().si_short(),
system.memory.free = %bytesize::ByteSize(sys.free_memory()).display().si_short(),
system.memory.available = %bytesize::ByteSize(sys.available_memory()).display().si_short(),
process.cpu.usage = %CpuPercentageDisplay(process.cpu_usage() / cpu_ct as f32),
process.cpu.time = %CpuTimeDisplay(process.accumulated_cpu_time()),
process.threads = num_threads::num_threads(),
process.memory.real = %bytesize::ByteSize(process.memory()).display().si_short(),
process.memory.virtual = %bytesize::ByteSize(process.virtual_memory()).display().si_short(),
process.disk.read = %bytesize::ByteSize(du.total_read_bytes).display().si_short(),
process.disk.write = %bytesize::ByteSize(du.total_written_bytes).display().si_short(),
"info"
);
}
drop(sys);
let mut lock = cursors_mutex.lock();
let file_cursor = match &lock.0 {
Some(fc) => fc,
None => &dir.join("files"),
};
tracing::info!(
cursor.file = %file_cursor
.file_name()
.expect("log file to have name")
.to_str()
.expect("log file name to str"),
cursor.line = lock.1,
"syncing"
);
let mut log_files = std::fs::read_dir(dir.join("files"))?
.map(|res| res.map(|e| e.path()))
.collect::<Result<Vec<_>, std::io::Error>>()?;
log_files.sort();
let format = format_description::parse(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6]Z",
)?;
let kill_on_or_before = (time::UtcDateTime::now()
- time::Duration::days(i64::from(max_ttl_hours.cast_signed())))
.format(&format)?;
let del_query_str =
format!("timestamp:[1970-01-01T00:00:00.000000Z TO {kill_on_or_before}}}");
let di = domain_indexes.read();
let mut offset_map: BTreeMap<&String, BTreeMap<&PathBuf, Vec<usize>>> = BTreeMap::default();
let mut file_handles = BTreeMap::default();
for (domain, _) in di.iter() {
offset_map.insert(domain, BTreeMap::default());
}
let mut root_writer: IndexWriter = root_index.0.writer(15_000_000)?;
let mut last_log_file = file_cursor.clone();
let mut last_log_line = lock.1;
let mut lines_added = 0;
for log_file in &log_files {
if log_file < file_cursor {
continue;
}
let file_name = log_file
.file_name()
.expect("log file to have name")
.to_str()
.expect("log file name to str");
tracing::info!(
file = %file_name,
start_line = if log_file == file_cursor {
last_log_line
} else {
0
},
"preparing"
);
let mut file_lines = 0;
let file = File::open(log_file)?;
file_handles.insert(log_file.clone(), File::open(log_file)?);
let reader = BufReader::new(file);
let mut lines = reader.lines();
let i_offset = if log_file == file_cursor {
lines.nth(lock.1);
lock.1 + 1
} else {
0
};
let tmp_path = dir.join("archive").join("root").join(file_name);
let tmp_file = File::create(&tmp_path)?;
let mut tmp_file = LineWriter::new(tmp_file);
for (i, log, line) in lines.enumerate().filter_map(|(i, line)| {
if let Ok(line) = line {
if let Ok(log) = serde_json::from_str::<LogLine>(&line) {
Some((i, log, line))
} else {
tracing::error!(log = line, "failed to parse");
None
}
} else {
tracing::error!(line = i, "failed to read");
None
}
}) {
let mut accounted_for = false;
if let Some(spans) = log.spans {
for span in spans {
let domain = match span.name.as_str() {
"app" => span.domain,
"redirect" => span.host,
"tls" => span.sni,
_ => None,
};
if let Some(domain) = domain
&& &domain != api_domain
&& let Some(offsets) = offset_map.get_mut(&domain)
{
if let Some(lines) = offsets.get_mut(log_file) {
lines.push(i + i_offset);
} else {
offsets.insert(log_file, vec![i + i_offset]);
}
accounted_for = true;
break;
}
}
}
if !accounted_for {
tmp_file.write_all(line.as_bytes())?;
tmp_file.write_all(b"\n")?;
root_writer.add_document(TantivyDocument::parse_json(schema, &line)?)?;
lines_added += 1;
file_lines += 1;
}
last_log_line = i + i_offset;
}
tmp_file.flush()?;
Self::archive(dir, log_size, "root", &mut file_lines, file_name, &tmp_path)?;
last_log_file.clone_from(log_file);
}
let del_query = root_index.2.parse_query(&del_query_str)?;
let searcher = root_index.1.searcher();
let removed_lines = searcher.search(&del_query, &tantivy::collector::Count)?;
root_writer.delete_query(del_query)?;
root_writer.commit()?;
root_writer.garbage_collect_files();
root_writer.wait_merging_threads()?;
let searcher = root_index.1.searcher();
let total = searcher.num_docs();
tracing::info!(
domain = %"root",
added = lines_added,
removed = removed_lines,
total,
"indexed"
);
for (domain, offsets) in offset_map {
if let Some((index, reader, query_parser)) = di.get(domain) {
let mut lines_added = 0;
let mut writer: IndexWriter = index.writer(15_000_000)?;
for (path, line_ns) in offsets {
let mut file_lines = 0;
let file_name = path
.file_name()
.expect("log file to have name")
.to_str()
.expect("log file name to str");
let tmp_path = dir.join("archive").join(domain).join(file_name);
let tmp_file = File::create(&tmp_path)?;
let mut tmp_file = LineWriter::new(tmp_file);
if let Some(file) = file_handles.get(path) {
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut pos = 0;
for n in line_ns {
let next = n - pos;
pos = n + 1;
if let Some(line) = lines.nth(next)
&& let Ok(line) = line
{
tmp_file.write_all(line.as_bytes())?;
tmp_file.write_all(b"\n")?;
writer.add_document(TantivyDocument::parse_json(schema, &line)?)?;
lines_added += 1;
file_lines += 1;
}
}
}
tmp_file.flush()?;
Self::archive(dir, log_size, domain, &mut file_lines, file_name, &tmp_path)?;
}
let del_query = query_parser.parse_query(&del_query_str)?;
let searcher = reader.searcher();
let removed_lines = searcher.search(&del_query, &tantivy::collector::Count)?;
writer.delete_query(del_query)?;
writer.commit()?;
writer.garbage_collect_files();
writer.wait_merging_threads()?;
let searcher = reader.searcher();
let total = searcher.num_docs();
tracing::info!(
%domain,
added = lines_added,
removed = removed_lines,
total,
"indexed"
);
}
}
drop(di);
let mut file_cursor = File::create(dir.join("cursor").join("file"))?;
file_cursor.write_all(
last_log_file
.file_name()
.expect("log file to have name")
.to_str()
.expect("log file name to str")
.as_bytes(),
)?;
let mut line_cursor = File::create(dir.join("cursor").join("line"))?;
line_cursor.write_all((last_log_line + 1).to_string().as_bytes())?;
for log_file in &log_files {
if log_file < &last_log_file {
std::fs::remove_file(log_file)?;
tracing::info!(
file = %log_file
.file_name()
.expect("log file to have name")
.to_str()
.expect("log file name to str"),
"removed"
);
}
}
*lock = (Some(last_log_file), last_log_line + 1);
drop(lock);
Ok(())
}
fn archive(
dir: &Path,
log_size: bool,
domain: &str,
file_lines: &mut i32,
file_name: &str,
tmp_path: &PathBuf,
) -> Result<(), Box<dyn Error>> {
let mut dest_name: String;
let archive_file = {
let mut j = 0;
let file_name = file_name.to_owned();
loop {
dest_name = format!("{file_name}.{j}.zst");
let archive_path = dir.join("archive").join(domain).join(&dest_name);
if let Ok(file) = File::create_new(&archive_path) {
break file;
}
j += 1;
}
};
let mut encoder = zstd::Encoder::new(archive_file, 17)?;
let mut tmp_file = File::open(tmp_path)?;
let size = std::io::copy(&mut tmp_file, &mut encoder)?;
let archive_file = encoder.finish()?;
std::fs::remove_file(tmp_path)?;
let compressed = archive_file.metadata()?.len();
if log_size {
tracing::info!(%domain, file = %dest_name, lines = file_lines, size.source = %bytesize::ByteSize(size).display().si_short(), size.compressed = %bytesize::ByteSize(compressed).display().si_short(), "archived");
} else {
tracing::info!(%domain, file = %dest_name, lines = file_lines, "archived");
}
Ok(())
}
#[allow(clippy::similar_names)]
pub fn start(&self) -> Result<std::thread::JoinHandle<()>, Box<dyn Error>> {
let sync_interval_s = self.sync_interval;
let max_ttl_hours = u32::from(self.max_ttl_days * 24);
let dir = self.dir.clone();
let domain_indexes = self.domain_indexes.clone();
let monitor_span = self.span.clone();
let (mut sync_min, mut sync_max) = sync_interval_s;
sync_min *= 1000;
sync_max *= 1000;
let cursors_clone = self.cursors.clone();
let schema = self.schema.clone();
let root_index = self.root_index.clone();
let api_domain = self.api_domain.clone();
let log_size = self.log_size;
let system = self.system.clone();
let pid = self.pid;
let sync_handle = std::thread::spawn(move || {
loop {
let sync_interval_ms = rand::random_range(sync_min..sync_max);
if Duration::from_millis(sync_interval_ms) < MINIMUM_CPU_UPDATE_INTERVAL {
std::thread::sleep(MINIMUM_CPU_UPDATE_INTERVAL);
} else {
std::thread::sleep(Duration::from_millis(sync_interval_ms));
}
monitor_span.in_scope(|| {
match Self::sync(
&dir,
&cursors_clone,
&root_index,
&domain_indexes,
&schema,
&api_domain,
max_ttl_hours,
log_size,
&system,
pid,
) {
Ok(()) => {}
Err(err) => tracing::error!(%err),
}
});
}
});
Ok(sync_handle)
}
}