bestool_psql/audit/
library.rs1use std::io::Write;
2use std::path::{Path, PathBuf};
3
4use jiff::Timestamp;
5use miette::{IntoDiagnostic, Result};
6use tracing::{debug, info};
7
8use super::{Audit, AuditEntry, AuditEntryWithTimestamp};
9
10fn parse_timestamp(s: &str) -> Result<Timestamp> {
12 if let Ok(ts) = s.parse::<Timestamp>() {
14 return Ok(ts);
15 }
16
17 jiff::civil::DateTime::strptime("%Y-%m-%d %H:%M:%S", s)
19 .or_else(|_| jiff::civil::DateTime::strptime("%Y-%m-%d", s))
20 .into_diagnostic()?
21 .to_zoned(jiff::tz::TimeZone::system())
22 .into_diagnostic()
23 .map(|zdt| zdt.timestamp())
24}
25
/// Filters applied when querying audit entries.
///
/// The default value applies no filtering at all.
#[derive(Clone, Debug, Default)]
pub struct QueryOptions {
    /// Maximum number of entries to return. `None` or `Some(0)` means no limit.
    pub limit: Option<usize>,
    /// When a limit applies: `true` keeps the first entries of the list,
    /// `false` keeps the last ones.
    pub from_oldest: bool,
    /// Inclusive lower bound on the entry timestamp, as a string accepted by
    /// `parse_timestamp`.
    pub since: Option<String>,
    /// Inclusive upper bound on the entry timestamp, as a string accepted by
    /// `parse_timestamp`.
    pub until: Option<String>,
}
38
39#[derive(Debug, Clone, Default)]
41pub struct ExportOptions {
42 pub audit_path: Option<PathBuf>,
44 pub query_options: QueryOptions,
46 pub orphans: bool,
48}
49
50impl Audit {
51 pub fn query(&self, options: &QueryOptions) -> Result<Vec<(u64, AuditEntry)>> {
57 debug!(?options, "querying audit entries");
58
59 let mut entries = self.list()?;
61
62 if let Some(ref since_str) = options.since {
64 let since = parse_timestamp(since_str)?;
65 let since_micros = since.as_microsecond() as u64;
66 entries.retain(|(ts, _)| *ts >= since_micros);
67 }
68
69 if let Some(ref until_str) = options.until {
70 let until = parse_timestamp(until_str)?;
71 let until_micros = until.as_microsecond() as u64;
72 entries.retain(|(ts, _)| *ts <= until_micros);
73 }
74
75 if let Some(limit) = options.limit
77 && limit > 0
78 {
79 if options.from_oldest {
80 entries.truncate(limit);
82 } else {
83 let start = entries.len().saturating_sub(limit);
85 entries.drain(..start);
86 }
87 }
88
89 debug!(count = entries.len(), "returning audit entries");
90 Ok(entries)
91 }
92
93 pub fn find_orphans(audit_dir: impl AsRef<Path>) -> Result<Vec<std::path::PathBuf>> {
97 let audit_dir = audit_dir.as_ref();
98 let main_path = Self::main_db_path(audit_dir);
99
100 super::multi_process::WorkingDatabase::find_orphan_databases(&main_path)
101 }
102
103 pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
105 let path = path.as_ref();
106 debug!(?path, "opening audit database file");
107
108 let db = redb::Database::open(path).into_diagnostic()?;
109 let db = std::sync::Arc::new(db);
110
111 let repl_state = crate::repl::ReplState {
112 config: Default::default(),
113 output_file: None,
114 sys_user: String::new(),
115 db_user: String::new(),
116 expanded_mode: false,
117 write_mode: false,
118 redact_mode: false,
119 ots: None,
120 vars: std::collections::BTreeMap::new(),
121 snippets: crate::snippets::Snippets::new(),
122 transaction_state: crate::repl::TransactionState::None,
123 result_store: crate::result_store::ResultStore::new(),
124 };
125
126 let audit = Self {
127 db,
128 repl_state: std::sync::Arc::new(std::sync::Mutex::new(repl_state)),
129 working_info: None,
130 sync_thread: None,
131 };
132
133 Ok(audit)
134 }
135}
136
137pub fn export_audit_entries(options: ExportOptions) -> Result<()> {
142 let audit_path = if let Some(path) = options.audit_path {
143 path
144 } else {
145 Audit::default_path()?
146 };
147
148 debug!(?audit_path, "using audit path");
149
150 let mut stdout = std::io::stdout().lock();
151
152 if options.orphans {
153 let orphans = Audit::find_orphans(&audit_path)?;
155
156 if orphans.is_empty() {
157 info!("no orphan databases found");
158 return Ok(());
159 }
160
161 for orphan_path in orphans {
162 info!(
163 "reading orphan: {}",
164 orphan_path.file_name().unwrap().to_string_lossy()
165 );
166
167 let audit = Audit::open_file(&orphan_path)?;
168 let entries = audit.query(&options.query_options)?;
169
170 for (timestamp, entry) in entries {
171 let entry_with_ts =
172 AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
173 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
174 writeln!(stdout, "{}", json).into_diagnostic()?;
175 }
176 }
177 } else {
178 let audit = Audit::open_file(Audit::main_db_path(&audit_path))?;
180 let entries = audit.query(&options.query_options)?;
181
182 for (timestamp, entry) in entries {
183 let entry_with_ts = AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
184 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
185 writeln!(stdout, "{}", json).into_diagnostic()?;
186 }
187 }
188
189 Ok(())
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an empty audit database in a fresh temporary directory and
    /// records `count` queries of the form `SELECT <n>;` in ascending order.
    ///
    /// The temporary directory guard is returned alongside the database so
    /// the backing file stays on disk for as long as the caller holds it.
    fn seeded_audit(count: i32) -> (tempfile::TempDir, Audit) {
        let dir = tempfile::tempdir().unwrap();
        let mut audit = Audit::open_empty(dir.path().join("test.redb")).unwrap();

        for n in 0..count {
            audit.add_entry(format!("SELECT {};", n)).unwrap();
            // Short pause between inserts; presumably this keeps the
            // microsecond timestamps of consecutive entries distinct.
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        (dir, audit)
    }

    /// Asserts that `entries` holds exactly the queries in `expected`, in order.
    fn assert_queries(entries: &[(u64, AuditEntry)], expected: &[&str]) {
        assert_eq!(entries.len(), expected.len());
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(entries[idx].1.query, *want);
        }
    }

    #[test]
    fn test_query_limit_from_oldest() {
        let (_dir, audit) = seeded_audit(10);

        let opts = QueryOptions {
            from_oldest: true,
            limit: Some(3),
            ..Default::default()
        };

        let entries = audit.query(&opts).unwrap();
        assert_queries(&entries, &["SELECT 0;", "SELECT 1;", "SELECT 2;"]);
    }

    #[test]
    fn test_query_limit_from_newest() {
        let (_dir, audit) = seeded_audit(10);

        let opts = QueryOptions {
            from_oldest: false,
            limit: Some(3),
            ..Default::default()
        };

        let entries = audit.query(&opts).unwrap();
        assert_queries(&entries, &["SELECT 7;", "SELECT 8;", "SELECT 9;"]);
    }
}