use journal_engine::{
Facets, FileIndexCacheBuilder, FileIndexKey, IndexingLimits, QueryTimeRange,
batch_compute_file_indexes,
};
use journal_index::FieldName;
use journal_registry::{Monitor, Registry};
use std::env;
use std::path::PathBuf;
use tokio_util::sync::CancellationToken;
#[allow(unused_imports)]
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let dir = if let Some(arg) = env::args().nth(1) {
PathBuf::from(arg)
} else {
PathBuf::from("/mnt/slow-disk/journal-fixtures")
};
info!("scanning directory: {}", dir.display());
let (monitor, _event_receiver) = Monitor::new()?;
let registry = Registry::new(monitor);
registry.watch_directory(dir.to_str().unwrap())?;
let files = registry.find_files_in_range(
journal_common::Seconds(0),
journal_common::Seconds(u32::MAX),
)?;
info!("found {} journal files", files.len());
if files.is_empty() {
return Ok(());
}
let cache = FileIndexCacheBuilder::new()
.with_cache_path("/tmp/foyer-cache")
.with_memory_capacity(1000)
.with_disk_capacity(2048 * 1024 * 1024)
.with_block_size(4 * 1024 * 1024)
.build()
.await?;
info!("created file index cache");
let facets = Facets::new(&["log.severity_number".to_string()]);
let source_timestamp_field = FieldName::new("_SOURCE_REALTIME_TIMESTAMP").unwrap();
let keys: Vec<FileIndexKey> = files
.iter()
.map(|file_info| {
FileIndexKey::new(
&file_info.file,
&facets,
Some(source_timestamp_field.clone()),
)
})
.collect();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs() as u32;
let time_range = QueryTimeRange::new(now - 86400, now)?;
let cancellation = CancellationToken::new();
info!(
"computing {} file indexes, bucket duration: {}s",
keys.len(),
time_range.bucket_duration()
);
let start = std::time::Instant::now();
let responses = batch_compute_file_indexes(
&cache,
®istry,
keys,
&time_range,
cancellation,
IndexingLimits::default(),
None,
)
.await?;
let elapsed = start.elapsed();
info!("responses={}, duration={:?}", responses.len(), elapsed);
cache.close().await?;
Ok(())
}