use std::io::Write;
use std::path::{Path, PathBuf};
use jiff::Timestamp;
use miette::{IntoDiagnostic, Result};
use tracing::{debug, info};
use super::{Audit, AuditEntry, AuditEntryWithTimestamp};
fn parse_timestamp(s: &str) -> Result<Timestamp> {
if let Ok(ts) = s.parse::<Timestamp>() {
return Ok(ts);
}
jiff::civil::DateTime::strptime("%Y-%m-%d %H:%M:%S", s)
.or_else(|_| jiff::civil::DateTime::strptime("%Y-%m-%d", s))
.into_diagnostic()?
.to_zoned(jiff::tz::TimeZone::system())
.into_diagnostic()
.map(|zdt| zdt.timestamp())
}
#[derive(Debug, Clone, Default)]
pub struct QueryOptions {
pub limit: Option<usize>,
pub from_oldest: bool,
pub since: Option<String>,
pub until: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub struct ExportOptions {
pub audit_path: Option<PathBuf>,
pub query_options: QueryOptions,
pub orphans: bool,
}
impl Audit {
pub fn query(&self, options: &QueryOptions) -> Result<Vec<(u64, AuditEntry)>> {
debug!(?options, "querying audit entries");
let mut entries = self.list()?;
if let Some(ref since_str) = options.since {
let since = parse_timestamp(since_str)?;
let since_micros = since.as_microsecond() as u64;
entries.retain(|(ts, _)| *ts >= since_micros);
}
if let Some(ref until_str) = options.until {
let until = parse_timestamp(until_str)?;
let until_micros = until.as_microsecond() as u64;
entries.retain(|(ts, _)| *ts <= until_micros);
}
if let Some(limit) = options.limit
&& limit > 0
{
if options.from_oldest {
entries.truncate(limit);
} else {
let start = entries.len().saturating_sub(limit);
entries.drain(..start);
}
}
debug!(count = entries.len(), "returning audit entries");
Ok(entries)
}
pub fn find_orphans(audit_dir: impl AsRef<Path>) -> Result<Vec<std::path::PathBuf>> {
let audit_dir = audit_dir.as_ref();
let main_path = Self::main_db_path(audit_dir);
super::multi_process::WorkingDatabase::find_orphan_databases(&main_path)
}
pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
debug!(?path, "opening audit database file");
let db = redb::Database::open(path).into_diagnostic()?;
let db = std::sync::Arc::new(db);
let repl_state = crate::repl::ReplState {
config: Default::default(),
output_file: None,
sys_user: String::new(),
db_user: String::new(),
expanded_mode: false,
write_mode: false,
redact_mode: false,
ots: None,
vars: std::collections::BTreeMap::new(),
snippets: crate::snippets::Snippets::new(),
transaction_state: crate::repl::TransactionState::None,
result_store: crate::result_store::ResultStore::new(),
from_snippet_or_include: false,
initial_content: None,
last_edit_content: None,
write_mode_active_at: None,
};
let audit = Self {
db,
repl_state: std::sync::Arc::new(std::sync::Mutex::new(repl_state)),
working_info: None,
sync_thread: None,
};
Ok(audit)
}
}
pub fn export_audit_entries(options: ExportOptions) -> Result<()> {
let audit_path = if let Some(path) = options.audit_path {
path
} else {
Audit::default_path()?
};
debug!(?audit_path, "using audit path");
let mut stdout = std::io::stdout().lock();
if options.orphans {
let orphans = Audit::find_orphans(&audit_path)?;
if orphans.is_empty() {
info!("no orphan databases found");
return Ok(());
}
for orphan_path in orphans {
info!(
"reading orphan: {}",
orphan_path.file_name().unwrap().to_string_lossy()
);
let audit = Audit::open_file(&orphan_path)?;
let entries = audit.query(&options.query_options)?;
for (timestamp, entry) in entries {
let entry_with_ts =
AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
writeln!(stdout, "{}", json).into_diagnostic()?;
}
}
} else {
let audit = Audit::open_file(Audit::main_db_path(&audit_path))?;
let entries = audit.query(&options.query_options)?;
for (timestamp, entry) in entries {
let entry_with_ts = AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
writeln!(stdout, "{}", json).into_diagnostic()?;
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_query_limit_from_oldest() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.redb");
let mut audit = Audit::open_empty(db_path).unwrap();
for i in 0..10 {
audit.add_entry(format!("SELECT {};", i)).unwrap();
std::thread::sleep(std::time::Duration::from_micros(10));
}
let opts = QueryOptions {
limit: Some(3),
from_oldest: true,
..Default::default()
};
let entries = audit.query(&opts).unwrap();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].1.query, "SELECT 0;");
assert_eq!(entries[1].1.query, "SELECT 1;");
assert_eq!(entries[2].1.query, "SELECT 2;");
}
#[test]
fn test_query_limit_from_newest() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test.redb");
let mut audit = Audit::open_empty(db_path).unwrap();
for i in 0..10 {
audit.add_entry(format!("SELECT {};", i)).unwrap();
std::thread::sleep(std::time::Duration::from_micros(10));
}
let opts = QueryOptions {
limit: Some(3),
from_oldest: false,
..Default::default()
};
let entries = audit.query(&opts).unwrap();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].1.query, "SELECT 7;");
assert_eq!(entries[1].1.query, "SELECT 8;");
assert_eq!(entries[2].1.query, "SELECT 9;");
}
}