bestool_psql/audit/
library.rs1use std::io::Write;
2use std::path::{Path, PathBuf};
3
4use jiff::Timestamp;
5use miette::{IntoDiagnostic, Result};
6use tracing::{debug, info};
7
8use super::{Audit, AuditEntry, AuditEntryWithTimestamp};
9
10fn parse_timestamp(s: &str) -> Result<Timestamp> {
12 if let Ok(ts) = s.parse::<Timestamp>() {
14 return Ok(ts);
15 }
16
17 jiff::civil::DateTime::strptime("%Y-%m-%d %H:%M:%S", s)
19 .or_else(|_| jiff::civil::DateTime::strptime("%Y-%m-%d", s))
20 .into_diagnostic()?
21 .to_zoned(jiff::tz::TimeZone::system())
22 .into_diagnostic()
23 .map(|zdt| zdt.timestamp())
24}
25
/// Filters applied by [`Audit::query`] when listing audit entries.
///
/// The default value applies no filtering at all.
#[derive(Clone, Debug, Default)]
pub struct QueryOptions {
    /// Maximum number of entries to return. `None` and `Some(0)` both mean
    /// "no limit".
    pub limit: Option<usize>,
    /// When a limit applies: `true` keeps the first `limit` entries of the
    /// listing, `false` keeps the last `limit` entries.
    pub from_oldest: bool,
    /// Inclusive lower time bound, as a string understood by
    /// `parse_timestamp`.
    pub since: Option<String>,
    /// Inclusive upper time bound, as a string understood by
    /// `parse_timestamp`.
    pub until: Option<String>,
}
38
39#[derive(Debug, Clone, Default)]
41pub struct ExportOptions {
42 pub audit_path: Option<PathBuf>,
44 pub query_options: QueryOptions,
46 pub orphans: bool,
48}
49
50impl Audit {
51 pub fn query(&self, options: &QueryOptions) -> Result<Vec<(u64, AuditEntry)>> {
57 debug!(?options, "querying audit entries");
58
59 let mut entries = self.list()?;
61
62 if let Some(ref since_str) = options.since {
64 let since = parse_timestamp(since_str)?;
65 let since_micros = since.as_microsecond() as u64;
66 entries.retain(|(ts, _)| *ts >= since_micros);
67 }
68
69 if let Some(ref until_str) = options.until {
70 let until = parse_timestamp(until_str)?;
71 let until_micros = until.as_microsecond() as u64;
72 entries.retain(|(ts, _)| *ts <= until_micros);
73 }
74
75 if let Some(limit) = options.limit
77 && limit > 0
78 {
79 if options.from_oldest {
80 entries.truncate(limit);
82 } else {
83 let start = entries.len().saturating_sub(limit);
85 entries.drain(..start);
86 }
87 }
88
89 debug!(count = entries.len(), "returning audit entries");
90 Ok(entries)
91 }
92
93 pub fn find_orphans(audit_dir: impl AsRef<Path>) -> Result<Vec<std::path::PathBuf>> {
97 let audit_dir = audit_dir.as_ref();
98 let main_path = Self::main_db_path(audit_dir);
99
100 super::multi_process::WorkingDatabase::find_orphan_databases(&main_path)
101 }
102
103 pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
105 let path = path.as_ref();
106 debug!(?path, "opening audit database file");
107
108 let db = redb::Database::open(path).into_diagnostic()?;
109 let db = std::sync::Arc::new(db);
110
111 let repl_state = crate::repl::ReplState {
112 config: Default::default(),
113 output_file: None,
114 sys_user: String::new(),
115 db_user: String::new(),
116 expanded_mode: false,
117 write_mode: false,
118 redact_mode: false,
119 ots: None,
120 vars: std::collections::BTreeMap::new(),
121 snippets: crate::snippets::Snippets::new(),
122 transaction_state: crate::repl::TransactionState::None,
123 result_store: crate::result_store::ResultStore::new(),
124 from_snippet_or_include: false,
125 initial_content: None,
126 write_mode_active_at: None,
127 };
128
129 let audit = Self {
130 db,
131 repl_state: std::sync::Arc::new(std::sync::Mutex::new(repl_state)),
132 working_info: None,
133 sync_thread: None,
134 };
135
136 Ok(audit)
137 }
138}
139
140pub fn export_audit_entries(options: ExportOptions) -> Result<()> {
145 let audit_path = if let Some(path) = options.audit_path {
146 path
147 } else {
148 Audit::default_path()?
149 };
150
151 debug!(?audit_path, "using audit path");
152
153 let mut stdout = std::io::stdout().lock();
154
155 if options.orphans {
156 let orphans = Audit::find_orphans(&audit_path)?;
158
159 if orphans.is_empty() {
160 info!("no orphan databases found");
161 return Ok(());
162 }
163
164 for orphan_path in orphans {
165 info!(
166 "reading orphan: {}",
167 orphan_path.file_name().unwrap().to_string_lossy()
168 );
169
170 let audit = Audit::open_file(&orphan_path)?;
171 let entries = audit.query(&options.query_options)?;
172
173 for (timestamp, entry) in entries {
174 let entry_with_ts =
175 AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
176 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
177 writeln!(stdout, "{}", json).into_diagnostic()?;
178 }
179 }
180 } else {
181 let audit = Audit::open_file(Audit::main_db_path(&audit_path))?;
183 let entries = audit.query(&options.query_options)?;
184
185 for (timestamp, entry) in entries {
186 let entry_with_ts = AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
187 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
188 writeln!(stdout, "{}", json).into_diagnostic()?;
189 }
190 }
191
192 Ok(())
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an empty audit database in a fresh temporary directory and
    /// records `count` queries of the form `SELECT <i>;`.
    ///
    /// The temporary directory is returned alongside the audit so that it
    /// stays alive for the duration of the test.
    fn seeded_audit(count: usize) -> (tempfile::TempDir, Audit) {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("test.redb");
        let mut audit = Audit::open_empty(db_path).unwrap();

        for i in 0..count {
            audit.add_entry(format!("SELECT {};", i)).unwrap();
            // Presumably spaces the entries out so each gets a distinct
            // microsecond timestamp — confirm against `add_entry`.
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        (temp_dir, audit)
    }

    #[test]
    fn test_query_limit_from_oldest() {
        let (_temp_dir, audit) = seeded_audit(10);

        let opts = QueryOptions {
            limit: Some(3),
            from_oldest: true,
            ..Default::default()
        };
        let entries = audit.query(&opts).unwrap();

        assert_eq!(entries.len(), 3);
        for (idx, expected) in ["SELECT 0;", "SELECT 1;", "SELECT 2;"].iter().enumerate() {
            assert_eq!(entries[idx].1.query, *expected);
        }
    }

    #[test]
    fn test_query_limit_from_newest() {
        let (_temp_dir, audit) = seeded_audit(10);

        let opts = QueryOptions {
            limit: Some(3),
            from_oldest: false,
            ..Default::default()
        };
        let entries = audit.query(&opts).unwrap();

        assert_eq!(entries.len(), 3);
        for (idx, expected) in ["SELECT 7;", "SELECT 8;", "SELECT 9;"].iter().enumerate() {
            assert_eq!(entries[idx].1.query, *expected);
        }
    }
}