1use std::collections::{HashMap, HashSet};
4
5use rusqlite::Connection;
6
7use crate::config::{ScopeFilter, TalonConfig};
8use crate::contracts::VaultPath;
9use crate::indexing::change_tracking;
10use crate::query::{ChangeEntry, ChangesInput, ChangesResponse, TombstoneEntry};
11
12#[must_use]
22pub fn query_changes(
23 conn: &Connection,
24 input: &ChangesInput,
25 config: Option<&TalonConfig>,
26) -> ChangesResponse {
27 let empty = ChangesResponse {
28 vault: None,
29 added: Vec::new(),
30 modified: Vec::new(),
31 deleted: Vec::new(),
32 };
33 let Ok(since_ms) = change_tracking::parse_since(&input.since) else {
34 return empty;
35 };
36 let Some(all_events) = fetch_events(conn) else {
37 return empty;
38 };
39 let filter = config.map(|cfg| {
40 ScopeFilter::from_args(cfg, &input.scope, &input.scope_only, input.scope_all)
41 .unwrap_or_else(|_| ScopeFilter::default_for(cfg))
42 });
43 classify(
44 &all_events,
45 since_ms,
46 filter.as_ref(),
47 input.limit.get() as usize,
48 )
49}
50
51fn fetch_events(conn: &Connection) -> Option<Vec<(String, String, u64)>> {
53 let Ok(mut stmt) = conn.prepare("SELECT action, path, timestamp FROM event_log ORDER BY id")
54 else {
55 return None;
56 };
57 let Ok(rows) = stmt.query_map([], |row| {
58 let action: String = row.get(0)?;
59 let path: String = row.get(1)?;
60 let ts: String = row.get(2)?;
61 Ok((action, path, ts))
62 }) else {
63 return None;
64 };
65 let Ok(events): rusqlite::Result<Vec<_>> = rows.collect() else {
66 return None;
67 };
68 Some(
69 events
70 .into_iter()
71 .filter_map(|(action, path, ts_str)| {
72 rfc3339_to_ms(&ts_str).map(|ms| (action, path, ms))
73 })
74 .collect(),
75 )
76}
77
78fn classify(
80 all_events: &[(String, String, u64)],
81 since_ms: u64,
82 filter: Option<&ScopeFilter<'_>>,
83 limit: usize,
84) -> ChangesResponse {
85 let indexed_before: HashSet<String> = all_events
86 .iter()
87 .filter(|(action, _, ts)| action == "index" && *ts < since_ms)
88 .map(|(_, path, _)| path.clone())
89 .collect();
90
91 let mut latest_index: HashMap<String, u64> = HashMap::new();
92 let mut latest_delete: HashMap<String, u64> = HashMap::new();
93
94 for (action, path, ts_ms) in all_events {
95 if *ts_ms < since_ms {
96 continue;
97 }
98 if let Some(f) = filter
99 && !f.accepts(path)
100 {
101 continue;
102 }
103 let map = if action == "index" {
104 &mut latest_index
105 } else if action == "delete" {
106 &mut latest_delete
107 } else {
108 continue;
109 };
110 let entry = map.entry(path.clone()).or_insert(0);
111 if *ts_ms > *entry {
112 *entry = *ts_ms;
113 }
114 }
115
116 let mut added: Vec<ChangeEntry> = Vec::new();
117 let mut modified: Vec<ChangeEntry> = Vec::new();
118
119 for (path, ts_ms) in latest_index {
120 let Ok(vault_path) = VaultPath::parse(&path) else {
121 continue;
122 };
123 let entry = ChangeEntry {
124 path: vault_path,
125 indexed_at: super::mtime::format_iso8601(ts_ms).unwrap_or_default(),
126 };
127 if indexed_before.contains(&path) {
128 modified.push(entry);
129 } else {
130 added.push(entry);
131 }
132 }
133
134 let mut deleted: Vec<TombstoneEntry> = latest_delete
135 .iter()
136 .filter_map(|(path, ts_ms)| {
137 VaultPath::parse(path)
138 .ok()
139 .map(|vault_path| TombstoneEntry {
140 path: vault_path,
141 deleted_at: super::mtime::format_iso8601(*ts_ms).unwrap_or_default(),
142 })
143 })
144 .collect();
145
146 added.sort_by(|a, b| b.indexed_at.cmp(&a.indexed_at));
148 modified.sort_by(|a, b| b.indexed_at.cmp(&a.indexed_at));
149 deleted.sort_by(|a, b| b.deleted_at.cmp(&a.deleted_at));
150
151 let mut remaining = limit;
152 added.truncate(remaining);
153 remaining = remaining.saturating_sub(added.len());
154 modified.truncate(remaining);
155 remaining = remaining.saturating_sub(modified.len());
156 deleted.truncate(remaining);
157
158 ChangesResponse {
159 vault: None,
160 added,
161 modified,
162 deleted,
163 }
164}
165
166fn rfc3339_to_ms(s: &str) -> Option<u64> {
167 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
168 .ok()
169 .and_then(|dt| u64::try_from(dt.unix_timestamp_nanos() / 1_000_000).ok())
170}
171
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use rusqlite::Connection;

    use super::query_changes;
    use crate::constants::CHANGES_DEFAULT_LIMIT;
    use crate::contracts::PositiveCount;
    use crate::indexing::migrations::run_migrations;
    use crate::query::{ChangesInput, ChangesResponse};

    /// Cutoff shared by the classification tests; fixtures sit just before or after it.
    const SINCE: &str = "2024-01-15T10:30:00Z";

    /// Opens an in-memory database with all migrations applied.
    fn open_migrated_db() -> Connection {
        let mut conn = Connection::open_in_memory().unwrap();
        run_migrations(&mut conn).unwrap();
        conn
    }

    /// Appends a single row to `event_log`.
    fn log_event(conn: &Connection, action: &str, path: &str, timestamp: &str) {
        conn.execute(
            "INSERT INTO event_log (action, path, timestamp) VALUES (?, ?, ?)",
            rusqlite::params![action, path, timestamp],
        )
        .unwrap();
    }

    /// Unscoped input with an explicit limit.
    fn changes_input_with_limit(since: &str, limit: u16) -> ChangesInput {
        ChangesInput {
            since: since.to_owned(),
            scope: Vec::new(),
            scope_only: Vec::new(),
            scope_all: false,
            limit: PositiveCount::new(limit, "limit").unwrap(),
        }
    }

    /// Unscoped input with a limit large enough not to truncate these fixtures.
    fn changes_input(since: &str) -> ChangesInput {
        changes_input_with_limit(since, 100)
    }

    /// Asserts that no bucket of the response contains anything.
    fn assert_no_changes(result: &ChangesResponse) {
        assert!(result.added.is_empty());
        assert!(result.modified.is_empty());
        assert!(result.deleted.is_empty());
    }

    #[test]
    fn new_index_event_classified_as_added() {
        let conn = open_migrated_db();
        log_event(&conn, "index", "a.md", "2024-01-15T10:30:01Z");

        let result = query_changes(&conn, &changes_input(SINCE), None);

        let added: Vec<&str> = result.added.iter().map(|e| e.path.as_str()).collect();
        assert_eq!(added, ["a.md"]);
        assert!(result.modified.is_empty());
        assert!(result.deleted.is_empty());
    }

    #[test]
    fn reindex_after_prior_index_classified_as_modified() {
        let conn = open_migrated_db();
        log_event(&conn, "index", "a.md", "2024-01-15T09:00:00Z");
        log_event(&conn, "index", "a.md", "2024-01-15T10:30:01Z");

        let result = query_changes(&conn, &changes_input(SINCE), None);

        assert!(result.added.is_empty());
        let modified: Vec<&str> = result.modified.iter().map(|e| e.path.as_str()).collect();
        assert_eq!(modified, ["a.md"]);
        assert!(result.deleted.is_empty());
    }

    #[test]
    fn delete_event_classified_as_deleted() {
        let conn = open_migrated_db();
        log_event(&conn, "index", "a.md", "2024-01-15T09:00:00Z");
        log_event(&conn, "delete", "a.md", "2024-01-15T10:30:01Z");

        let result = query_changes(&conn, &changes_input(SINCE), None);

        assert!(result.added.is_empty());
        assert!(result.modified.is_empty());
        let deleted: Vec<&str> = result.deleted.iter().map(|t| t.path.as_str()).collect();
        assert_eq!(deleted, ["a.md"]);
    }

    #[test]
    fn events_before_since_are_excluded() {
        let conn = open_migrated_db();
        log_event(&conn, "index", "a.md", "2024-01-15T09:00:00Z");

        let result = query_changes(&conn, &changes_input(SINCE), None);

        assert_no_changes(&result);
    }

    #[test]
    fn invalid_since_returns_empty() {
        let conn = open_migrated_db();
        log_event(&conn, "index", "a.md", "2024-01-15T10:30:01Z");

        let result = query_changes(&conn, &changes_input("not-a-timestamp"), None);

        assert_no_changes(&result);
    }

    #[test]
    fn entries_are_ordered_newest_first_before_limit() {
        let conn = open_migrated_db();
        log_event(&conn, "index", "old.md", "2024-01-15T10:30:01Z");
        log_event(&conn, "index", "new.md", "2024-01-15T10:30:03Z");
        log_event(&conn, "index", "middle.md", "2024-01-15T10:30:02Z");

        let input = changes_input_with_limit(SINCE, 2);
        let result = query_changes(&conn, &input, None);

        let paths: Vec<&str> = result.added.iter().map(|e| e.path.as_str()).collect();
        assert_eq!(paths, vec!["new.md", "middle.md"]);
    }

    #[test]
    fn serde_default_limit_is_change_feed_default() {
        let raw = serde_json::json!({ "since": "2024-01-15T10:30:00Z" });
        let input: ChangesInput = serde_json::from_value(raw).unwrap();

        assert_eq!(input.limit.get(), CHANGES_DEFAULT_LIMIT);
    }
}