bestool_psql/audit/
library.rs1use std::io::Write;
2use std::path::{Path, PathBuf};
3
4use jiff::Timestamp;
5use miette::{IntoDiagnostic, Result};
6use tracing::{debug, info};
7
8use super::{Audit, AuditEntry, AuditEntryWithTimestamp};
9
10fn parse_timestamp(s: &str) -> Result<Timestamp> {
12 if let Ok(ts) = s.parse::<Timestamp>() {
14 return Ok(ts);
15 }
16
17 jiff::civil::DateTime::strptime("%Y-%m-%d %H:%M:%S", s)
19 .or_else(|_| jiff::civil::DateTime::strptime("%Y-%m-%d", s))
20 .into_diagnostic()?
21 .to_zoned(jiff::tz::TimeZone::system())
22 .into_diagnostic()
23 .map(|zdt| zdt.timestamp())
24}
25
26#[derive(Debug, Clone, Default)]
28pub struct QueryOptions {
29 pub limit: Option<usize>,
31 pub from_oldest: bool,
33 pub since: Option<String>,
35 pub until: Option<String>,
37}
38
39#[derive(Debug, Clone, Default)]
41pub struct ExportOptions {
42 pub audit_path: Option<PathBuf>,
44 pub query_options: QueryOptions,
46 pub orphans: bool,
48}
49
50impl Audit {
51 pub fn query(&self, options: &QueryOptions) -> Result<Vec<(u64, AuditEntry)>> {
57 debug!(?options, "querying audit entries");
58
59 let mut entries = self.list()?;
61
62 if let Some(ref since_str) = options.since {
64 let since = parse_timestamp(since_str)?;
65 let since_micros = since.as_microsecond() as u64;
66 entries.retain(|(ts, _)| *ts >= since_micros);
67 }
68
69 if let Some(ref until_str) = options.until {
70 let until = parse_timestamp(until_str)?;
71 let until_micros = until.as_microsecond() as u64;
72 entries.retain(|(ts, _)| *ts <= until_micros);
73 }
74
75 if let Some(limit) = options.limit
77 && limit > 0
78 {
79 if options.from_oldest {
80 entries.truncate(limit);
82 } else {
83 let start = entries.len().saturating_sub(limit);
85 entries.drain(..start);
86 }
87 }
88
89 debug!(count = entries.len(), "returning audit entries");
90 Ok(entries)
91 }
92
93 pub fn find_orphans(audit_dir: impl AsRef<Path>) -> Result<Vec<std::path::PathBuf>> {
97 let audit_dir = audit_dir.as_ref();
98 let main_path = Self::main_db_path(audit_dir);
99
100 super::multi_process::WorkingDatabase::find_orphan_databases(&main_path)
101 }
102
103 pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
105 let path = path.as_ref();
106 debug!(?path, "opening audit database file");
107
108 let db = redb::Database::open(path).into_diagnostic()?;
109 let db = std::sync::Arc::new(db);
110
111 let repl_state = crate::repl::ReplState {
112 config: Default::default(),
113 output_file: None,
114 sys_user: String::new(),
115 db_user: String::new(),
116 expanded_mode: false,
117 write_mode: false,
118 redact_mode: false,
119 ots: None,
120 vars: std::collections::BTreeMap::new(),
121 snippets: crate::snippets::Snippets::new(),
122 transaction_state: crate::repl::TransactionState::None,
123 result_store: crate::result_store::ResultStore::new(),
124 from_snippet_or_include: false,
125 initial_content: None,
126 };
127
128 let audit = Self {
129 db,
130 repl_state: std::sync::Arc::new(std::sync::Mutex::new(repl_state)),
131 working_info: None,
132 sync_thread: None,
133 };
134
135 Ok(audit)
136 }
137}
138
139pub fn export_audit_entries(options: ExportOptions) -> Result<()> {
144 let audit_path = if let Some(path) = options.audit_path {
145 path
146 } else {
147 Audit::default_path()?
148 };
149
150 debug!(?audit_path, "using audit path");
151
152 let mut stdout = std::io::stdout().lock();
153
154 if options.orphans {
155 let orphans = Audit::find_orphans(&audit_path)?;
157
158 if orphans.is_empty() {
159 info!("no orphan databases found");
160 return Ok(());
161 }
162
163 for orphan_path in orphans {
164 info!(
165 "reading orphan: {}",
166 orphan_path.file_name().unwrap().to_string_lossy()
167 );
168
169 let audit = Audit::open_file(&orphan_path)?;
170 let entries = audit.query(&options.query_options)?;
171
172 for (timestamp, entry) in entries {
173 let entry_with_ts =
174 AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
175 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
176 writeln!(stdout, "{}", json).into_diagnostic()?;
177 }
178 }
179 } else {
180 let audit = Audit::open_file(Audit::main_db_path(&audit_path))?;
182 let entries = audit.query(&options.query_options)?;
183
184 for (timestamp, entry) in entries {
185 let entry_with_ts = AuditEntryWithTimestamp::from_entry_and_timestamp(entry, timestamp);
186 let json = serde_json::to_string(&entry_with_ts).into_diagnostic()?;
187 writeln!(stdout, "{}", json).into_diagnostic()?;
188 }
189 }
190
191 Ok(())
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197
198 #[test]
199 fn test_query_limit_from_oldest() {
200 let temp_dir = tempfile::tempdir().unwrap();
201 let db_path = temp_dir.path().join("test.redb");
202
203 let mut audit = Audit::open_empty(db_path).unwrap();
204
205 for i in 0..10 {
207 audit.add_entry(format!("SELECT {};", i)).unwrap();
208 std::thread::sleep(std::time::Duration::from_micros(10));
210 }
211
212 let opts = QueryOptions {
214 limit: Some(3),
215 from_oldest: true,
216 ..Default::default()
217 };
218 let entries = audit.query(&opts).unwrap();
219 assert_eq!(entries.len(), 3);
220 assert_eq!(entries[0].1.query, "SELECT 0;");
221 assert_eq!(entries[1].1.query, "SELECT 1;");
222 assert_eq!(entries[2].1.query, "SELECT 2;");
223 }
224
225 #[test]
226 fn test_query_limit_from_newest() {
227 let temp_dir = tempfile::tempdir().unwrap();
228 let db_path = temp_dir.path().join("test.redb");
229
230 let mut audit = Audit::open_empty(db_path).unwrap();
231
232 for i in 0..10 {
234 audit.add_entry(format!("SELECT {};", i)).unwrap();
235 std::thread::sleep(std::time::Duration::from_micros(10));
236 }
237
238 let opts = QueryOptions {
240 limit: Some(3),
241 from_oldest: false,
242 ..Default::default()
243 };
244 let entries = audit.query(&opts).unwrap();
245 assert_eq!(entries.len(), 3);
246 assert_eq!(entries[0].1.query, "SELECT 7;");
247 assert_eq!(entries[1].1.query, "SELECT 8;");
248 assert_eq!(entries[2].1.query, "SELECT 9;");
249 }
250}