Skip to main content

talon_core/query/
changes.rs

1//! Changes handler — returns added/modified/deleted notes from the `event_log`.
2
3use std::collections::{HashMap, HashSet};
4
5use rusqlite::Connection;
6
7use crate::config::{ScopeFilter, TalonConfig};
8use crate::contracts::VaultPath;
9use crate::indexing::change_tracking;
10use crate::query::{ChangeEntry, ChangesInput, ChangesResponse, TombstoneEntry};
11
12/// Returns notes that were added, modified, or deleted since `input.since`.
13///
14/// Classification:
15/// - **added**: first `'index'` event for a path falls within the window.
16/// - **modified**: an `'index'` event within the window but an earlier `'index'`
17///   event exists before the window.
18/// - **deleted**: a `'delete'` event within the window.
19///
20/// Returns empty lists when `since` cannot be parsed.
21#[must_use]
22pub fn query_changes(
23    conn: &Connection,
24    input: &ChangesInput,
25    config: Option<&TalonConfig>,
26) -> ChangesResponse {
27    let empty = ChangesResponse {
28        vault: None,
29        added: Vec::new(),
30        modified: Vec::new(),
31        deleted: Vec::new(),
32    };
33    let Ok(since_ms) = change_tracking::parse_since(&input.since) else {
34        return empty;
35    };
36    let Some(all_events) = fetch_events(conn) else {
37        return empty;
38    };
39    let filter = config.map(|cfg| {
40        ScopeFilter::from_args(cfg, &input.scope, &input.scope_only, input.scope_all)
41            .unwrap_or_else(|_| ScopeFilter::default_for(cfg))
42    });
43    classify(
44        &all_events,
45        since_ms,
46        filter.as_ref(),
47        input.limit.get() as usize,
48    )
49}
50
51/// Loads all `event_log` rows as `(action, path, timestamp_ms)` tuples.
52fn fetch_events(conn: &Connection) -> Option<Vec<(String, String, u64)>> {
53    let Ok(mut stmt) = conn.prepare("SELECT action, path, timestamp FROM event_log ORDER BY id")
54    else {
55        return None;
56    };
57    let Ok(rows) = stmt.query_map([], |row| {
58        let action: String = row.get(0)?;
59        let path: String = row.get(1)?;
60        let ts: String = row.get(2)?;
61        Ok((action, path, ts))
62    }) else {
63        return None;
64    };
65    let Ok(events): rusqlite::Result<Vec<_>> = rows.collect() else {
66        return None;
67    };
68    Some(
69        events
70            .into_iter()
71            .filter_map(|(action, path, ts_str)| {
72                rfc3339_to_ms(&ts_str).map(|ms| (action, path, ms))
73            })
74            .collect(),
75    )
76}
77
78/// Classifies raw events into `ChangesResponse` buckets.
79fn classify(
80    all_events: &[(String, String, u64)],
81    since_ms: u64,
82    filter: Option<&ScopeFilter<'_>>,
83    limit: usize,
84) -> ChangesResponse {
85    let indexed_before: HashSet<String> = all_events
86        .iter()
87        .filter(|(action, _, ts)| action == "index" && *ts < since_ms)
88        .map(|(_, path, _)| path.clone())
89        .collect();
90
91    let mut latest_index: HashMap<String, u64> = HashMap::new();
92    let mut latest_delete: HashMap<String, u64> = HashMap::new();
93
94    for (action, path, ts_ms) in all_events {
95        if *ts_ms < since_ms {
96            continue;
97        }
98        if let Some(f) = filter
99            && !f.accepts(path)
100        {
101            continue;
102        }
103        let map = if action == "index" {
104            &mut latest_index
105        } else if action == "delete" {
106            &mut latest_delete
107        } else {
108            continue;
109        };
110        let entry = map.entry(path.clone()).or_insert(0);
111        if *ts_ms > *entry {
112            *entry = *ts_ms;
113        }
114    }
115
116    let mut added: Vec<ChangeEntry> = Vec::new();
117    let mut modified: Vec<ChangeEntry> = Vec::new();
118
119    for (path, ts_ms) in latest_index {
120        let Ok(vault_path) = VaultPath::parse(&path) else {
121            continue;
122        };
123        let entry = ChangeEntry {
124            path: vault_path,
125            indexed_at: super::mtime::format_iso8601(ts_ms).unwrap_or_default(),
126        };
127        if indexed_before.contains(&path) {
128            modified.push(entry);
129        } else {
130            added.push(entry);
131        }
132    }
133
134    let mut deleted: Vec<TombstoneEntry> = latest_delete
135        .iter()
136        .filter_map(|(path, ts_ms)| {
137            VaultPath::parse(path)
138                .ok()
139                .map(|vault_path| TombstoneEntry {
140                    path: vault_path,
141                    deleted_at: super::mtime::format_iso8601(*ts_ms).unwrap_or_default(),
142                })
143        })
144        .collect();
145
146    // ISO 8601 strings sort lexicographically the same as their epoch-ms source.
147    added.sort_by(|a, b| b.indexed_at.cmp(&a.indexed_at));
148    modified.sort_by(|a, b| b.indexed_at.cmp(&a.indexed_at));
149    deleted.sort_by(|a, b| b.deleted_at.cmp(&a.deleted_at));
150
151    let mut remaining = limit;
152    added.truncate(remaining);
153    remaining = remaining.saturating_sub(added.len());
154    modified.truncate(remaining);
155    remaining = remaining.saturating_sub(modified.len());
156    deleted.truncate(remaining);
157
158    ChangesResponse {
159        vault: None,
160        added,
161        modified,
162        deleted,
163    }
164}
165
166fn rfc3339_to_ms(s: &str) -> Option<u64> {
167    time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
168        .ok()
169        .and_then(|dt| u64::try_from(dt.unix_timestamp_nanos() / 1_000_000).ok())
170}
171
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use rusqlite::Connection;

    use super::query_changes;
    use crate::constants::CHANGES_DEFAULT_LIMIT;
    use crate::contracts::PositiveCount;
    use crate::indexing::migrations::run_migrations;
    use crate::query::ChangesInput;

    /// Window start used by every query in this module.
    const SINCE: &str = "2024-01-15T10:30:00Z";

    /// Opens an in-memory database with all migrations applied.
    fn fresh_db() -> Connection {
        let mut db = Connection::open_in_memory().unwrap();
        run_migrations(&mut db).unwrap();
        db
    }

    /// Appends a single row to `event_log`.
    fn insert_event(conn: &Connection, action: &str, path: &str, timestamp: &str) {
        let sql = "INSERT INTO event_log (action, path, timestamp) VALUES (?, ?, ?)";
        conn.execute(sql, rusqlite::params![action, path, timestamp])
            .unwrap();
    }

    /// Builds an unscoped input with a limit of 100 entries.
    fn changes_input(since: &str) -> ChangesInput {
        changes_input_with_limit(since, 100)
    }

    /// Builds an unscoped input with an explicit entry limit.
    fn changes_input_with_limit(since: &str, limit: u16) -> ChangesInput {
        let limit = PositiveCount::new(limit, "limit").unwrap();
        ChangesInput {
            since: since.to_owned(),
            scope: Vec::new(),
            scope_only: Vec::new(),
            scope_all: false,
            limit,
        }
    }

    #[test]
    fn new_index_event_classified_as_added() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T10:30:01Z");

        let changes = query_changes(&db, &changes_input(SINCE), None);

        let added: Vec<&str> = changes.added.iter().map(|e| e.path.as_str()).collect();
        assert_eq!(added, ["a.md"]);
        assert!(changes.modified.is_empty());
        assert!(changes.deleted.is_empty());
    }

    #[test]
    fn reindex_after_prior_index_classified_as_modified() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T09:00:00Z");
        insert_event(&db, "index", "a.md", "2024-01-15T10:30:01Z");

        let changes = query_changes(&db, &changes_input(SINCE), None);

        let modified: Vec<&str> = changes.modified.iter().map(|e| e.path.as_str()).collect();
        assert!(changes.added.is_empty());
        assert_eq!(modified, ["a.md"]);
        assert!(changes.deleted.is_empty());
    }

    #[test]
    fn delete_event_classified_as_deleted() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T09:00:00Z");
        insert_event(&db, "delete", "a.md", "2024-01-15T10:30:01Z");

        let changes = query_changes(&db, &changes_input(SINCE), None);

        let deleted: Vec<&str> = changes.deleted.iter().map(|e| e.path.as_str()).collect();
        assert!(changes.added.is_empty());
        assert!(changes.modified.is_empty());
        assert_eq!(deleted, ["a.md"]);
    }

    #[test]
    fn events_before_since_are_excluded() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T09:00:00Z");

        let changes = query_changes(&db, &changes_input(SINCE), None);

        assert!(changes.added.is_empty());
        assert!(changes.modified.is_empty());
        assert!(changes.deleted.is_empty());
    }

    #[test]
    fn invalid_since_returns_empty() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T10:30:01Z");

        let changes = query_changes(&db, &changes_input("not-a-timestamp"), None);

        assert!(changes.added.is_empty());
        assert!(changes.modified.is_empty());
        assert!(changes.deleted.is_empty());
    }

    #[test]
    fn entries_are_ordered_newest_first_before_limit() {
        let db = fresh_db();
        // Inserted out of chronological order on purpose.
        insert_event(&db, "index", "old.md", "2024-01-15T10:30:01Z");
        insert_event(&db, "index", "new.md", "2024-01-15T10:30:03Z");
        insert_event(&db, "index", "middle.md", "2024-01-15T10:30:02Z");

        let input = changes_input_with_limit(SINCE, 2);
        let changes = query_changes(&db, &input, None);

        let added: Vec<&str> = changes.added.iter().map(|e| e.path.as_str()).collect();
        assert_eq!(added, ["new.md", "middle.md"]);
    }

    #[test]
    fn serde_default_limit_is_change_feed_default() {
        let payload = serde_json::json!({ "since": SINCE });
        let input: ChangesInput = serde_json::from_value(payload).unwrap();

        assert_eq!(input.limit.get(), CHANGES_DEFAULT_LIMIT);
    }
}