bestool_psql/audit/
library.rs1use std::io::Write;
2use std::path::{Path, PathBuf};
3
4use jiff::Timestamp;
5use miette::{IntoDiagnostic, Result};
6use tracing::{debug, info};
7
8use super::{Audit, AuditEntry, AuditEntryWithTimestamp};
9
10fn parse_timestamp(s: &str) -> Result<Timestamp> {
12 if let Ok(ts) = s.parse::<Timestamp>() {
14 return Ok(ts);
15 }
16
17 jiff::civil::DateTime::strptime("%Y-%m-%d %H:%M:%S", s)
19 .or_else(|_| jiff::civil::DateTime::strptime("%Y-%m-%d", s))
20 .into_diagnostic()?
21 .to_zoned(jiff::tz::TimeZone::system())
22 .into_diagnostic()
23 .map(|zdt| zdt.timestamp())
24}
25
/// Filters applied by [`Audit::query`] when selecting audit entries.
///
/// The `Default` value applies no filtering at all.
#[derive(Debug, Clone, Default)]
pub struct QueryOptions {
    /// Maximum number of entries to return.
    ///
    /// `None` and `Some(0)` both mean "no limit".
    pub limit: Option<usize>,

    /// Which end of the entry list `limit` keeps: the first `limit` entries
    /// when `true`, the last `limit` entries when `false`.
    pub from_oldest: bool,

    /// Inclusive lower time bound, as text.
    ///
    /// Accepted forms are whatever `jiff::Timestamp` parses, or
    /// `YYYY-MM-DD HH:MM:SS` / `YYYY-MM-DD` interpreted in the system time zone.
    pub since: Option<String>,

    /// Inclusive upper time bound, in the same textual forms as `since`.
    pub until: Option<String>,
}
38
39#[derive(Debug, Clone, Default)]
41pub struct ExportOptions {
42 pub audit_path: Option<PathBuf>,
44 pub query_options: QueryOptions,
46 pub orphans: bool,
48}
49
50impl Audit {
51 pub fn query(&self, options: &QueryOptions) -> Result<Vec<(u64, AuditEntry)>> {
57 debug!(?options, "querying audit entries");
58
59 let mut entries = self.list()?;
61
62 if let Some(ref since_str) = options.since {
64 let since = parse_timestamp(since_str)?;
65 let since_micros = since.as_microsecond() as u64;
66 entries.retain(|(ts, _)| *ts >= since_micros);
67 }
68
69 if let Some(ref until_str) = options.until {
70 let until = parse_timestamp(until_str)?;
71 let until_micros = until.as_microsecond() as u64;
72 entries.retain(|(ts, _)| *ts <= until_micros);
73 }
74
75 if let Some(limit) = options.limit
77 && limit > 0
78 {
79 if options.from_oldest {
80 entries.truncate(limit);
82 } else {
83 let start = entries.len().saturating_sub(limit);
85 entries.drain(..start);
86 }
87 }
88
89 debug!(count = entries.len(), "returning audit entries");
90 Ok(entries)
91 }
92
93 pub fn find_orphans(audit_dir: impl AsRef<Path>) -> Result<Vec<std::path::PathBuf>> {
97 let audit_dir = audit_dir.as_ref();
98 let main_path = Self::main_db_path(audit_dir);
99
100 super::multi_process::WorkingDatabase::find_orphan_databases(&main_path)
101 }
102
103 pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
105 let path = path.as_ref();
106 debug!(?path, "opening audit database file");
107
108 let db = redb::Database::open(path).into_diagnostic()?;
109 let db = std::sync::Arc::new(db);
110
111 let repl_state = crate::repl::ReplState {
112 output_file: None,
113 sys_user: String::new(),
114 db_user: String::new(),
115 expanded_mode: false,
116 write_mode: false,
117 ots: None,
118 use_colours: false,
119 vars: std::collections::BTreeMap::new(),
120 snippets: crate::snippets::Snippets::new(),
121 transaction_state: crate::repl::TransactionState::None,
122 result_store: crate::result_store::ResultStore::new(),
123 };
124
125 let audit = Self {
126 db,
127 repl_state: std::sync::Arc::new(std::sync::Mutex::new(repl_state)),
128 working_info: None,
129 sync_thread: None,
130 };
131
132 Ok(audit)
133 }
134}
135
136pub fn export_audit_entries(options: ExportOptions) -> Result<()> {
141 let audit_path = if let Some(path) = options.audit_path {
142 path
143 } else {
144 Audit::default_path()?
145 };
146
147 debug!(?audit_path, "using audit path");
148
149 let mut stdout = std::io::stdout().lock();
150
151 if options.orphans {
152 let orphans = Audit::find_orphans(&audit_path)?;
154
155 if orphans.is_empty() {
156 info!("no orphan databases found");
157 return Ok(());
158 }
159
160 for orphan_path in orphans {
161 info!(
162 "reading orphan: {}",
163 orphan_path.file_name().unwrap().to_string_lossy()
164 );
165
166 let audit = Audit::open_file(&orphan_path)?;
167 let entries = audit.query(&options.query_options)?;
168
169 for (timestamp, entry) in entries {
170 let entry_with_ts =
171 AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
172 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
173 writeln!(stdout, "{}", json).into_diagnostic()?;
174 }
175 }
176 } else {
177 let audit = Audit::open_file(Audit::main_db_path(&audit_path))?;
179 let entries = audit.query(&options.query_options)?;
180
181 for (timestamp, entry) in entries {
182 let entry_with_ts = AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
183 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
184 writeln!(stdout, "{}", json).into_diagnostic()?;
185 }
186 }
187
188 Ok(())
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// With `from_oldest` set, a limit of 3 keeps the first three entries.
    #[test]
    fn test_query_limit_from_oldest() {
        let dir = tempfile::tempdir().unwrap();
        let mut audit = Audit::open_empty(dir.path().join("test.redb")).unwrap();

        for n in 0..10 {
            audit.add_entry(format!("SELECT {};", n)).unwrap();
            // Short pause between inserts, presumably so timestamp keys differ.
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        let entries = audit
            .query(&QueryOptions {
                limit: Some(3),
                from_oldest: true,
                ..Default::default()
            })
            .unwrap();

        assert_eq!(entries.len(), 3);
        for ((_, entry), expected) in entries.iter().zip(["SELECT 0;", "SELECT 1;", "SELECT 2;"]) {
            assert_eq!(entry.query, expected);
        }
    }

    /// Without `from_oldest`, a limit of 3 keeps the last three entries.
    #[test]
    fn test_query_limit_from_newest() {
        let dir = tempfile::tempdir().unwrap();
        let mut audit = Audit::open_empty(dir.path().join("test.redb")).unwrap();

        for n in 0..10 {
            audit.add_entry(format!("SELECT {};", n)).unwrap();
            // Short pause between inserts, presumably so timestamp keys differ.
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        let entries = audit
            .query(&QueryOptions {
                limit: Some(3),
                from_oldest: false,
                ..Default::default()
            })
            .unwrap();

        assert_eq!(entries.len(), 3);
        for ((_, entry), expected) in entries.iter().zip(["SELECT 7;", "SELECT 8;", "SELECT 9;"]) {
            assert_eq!(entry.query, expected);
        }
    }
}
247}