use std::collections::{HashMap, HashSet};
use rusqlite::Connection;
use crate::config::{ScopeFilter, TalonConfig};
use crate::contracts::VaultPath;
use crate::indexing::change_tracking;
use crate::query::{ChangeEntry, ChangesInput, ChangesResponse, TombstoneEntry};
/// Builds the change feed for everything recorded in `event_log` at or after
/// `input.since`.
///
/// When `config` is supplied, the scope options in `input` are turned into a
/// [`ScopeFilter`]. If `ScopeFilter::from_args` rejects them, the filter falls
/// back to `ScopeFilter::default_for(cfg)`. Without a config, no scope filtering
/// is applied.
///
/// An unparseable `since` or a failure reading `event_log` yields an empty
/// response (no `vault`, no entries) rather than an error.
#[must_use]
pub fn query_changes(
    conn: &Connection,
    input: &ChangesInput,
    config: Option<&TalonConfig>,
) -> ChangesResponse {
    // The event log is only read once `since` has parsed successfully.
    let window = change_tracking::parse_since(&input.since)
        .ok()
        .and_then(|since_ms| fetch_events(conn).map(|events| (since_ms, events)));
    let Some((since_ms, events)) = window else {
        return ChangesResponse {
            vault: None,
            added: Vec::new(),
            modified: Vec::new(),
            deleted: Vec::new(),
        };
    };

    let scope_filter = config.map(|cfg| {
        ScopeFilter::from_args(cfg, &input.scope, &input.scope_only, input.scope_all)
            .unwrap_or_else(|_| ScopeFilter::default_for(cfg))
    });
    let max_entries = input.limit.get() as usize;

    classify(&events, since_ms, scope_filter.as_ref(), max_entries)
}
/// Loads every `event_log` row, ordered by `id`, as `(action, path, timestamp_ms)`.
///
/// Returns `None` if the statement cannot be prepared or executed, or if any
/// row fails to read. Rows whose `timestamp` text `rfc3339_to_ms` cannot
/// convert are skipped rather than failing the whole load.
fn fetch_events(conn: &Connection) -> Option<Vec<(String, String, u64)>> {
    let mut stmt = conn
        .prepare("SELECT action, path, timestamp FROM event_log ORDER BY id")
        .ok()?;
    let raw_rows: Vec<(String, String, String)> = stmt
        .query_map([], |row| {
            let kind: String = row.get(0)?;
            let file: String = row.get(1)?;
            let stamp: String = row.get(2)?;
            Ok((kind, file, stamp))
        })
        .ok()?
        .collect::<rusqlite::Result<_>>()
        .ok()?;

    let mut parsed = Vec::with_capacity(raw_rows.len());
    for (kind, file, stamp) in raw_rows {
        if let Some(ms) = rfc3339_to_ms(&stamp) {
            parsed.push((kind, file, ms));
        }
    }
    Some(parsed)
}
/// Reduces the event log to a net change set relative to `since_ms`.
///
/// Only `index` and `delete` actions are considered; other actions are ignored.
/// Each path is classified by its *last* in-scope event at or after `since_ms`,
/// where a later position in `all_events` breaks timestamp ties. As a result, a
/// path appears in at most one of the three lists:
/// - last event is `delete` → `deleted`, stamped with that event's time;
/// - last event is `index` and the path's last event before `since_ms` was also
///   an `index` (the path existed at `since_ms`) → `modified`;
/// - last event is `index` otherwise → `added`.
///
/// Window events for paths rejected by `filter` are ignored. Paths that
/// `VaultPath::parse` rejects are dropped. Each list is ordered newest first
/// (ties broken by path, so output is deterministic). `limit` is then shared
/// across the lists in the order added → modified → deleted.
fn classify(
    all_events: &[(String, String, u64)],
    since_ms: u64,
    filter: Option<&ScopeFilter<'_>>,
    limit: usize,
) -> ChangesResponse {
    // Last event strictly before `since_ms`, per path: the path's state at `since_ms`.
    let mut before_since: HashMap<&str, (u64, EventAction)> = HashMap::new();
    // Last in-scope event at or after `since_ms`, per path: the path's net outcome.
    let mut in_window: HashMap<&str, (u64, EventAction)> = HashMap::new();
    for (action, path, ts_ms) in all_events {
        let Some(kind) = EventAction::parse(action) else {
            continue;
        };
        if *ts_ms < since_ms {
            record_latest(&mut before_since, path, *ts_ms, kind);
        } else if filter.is_none_or(|f| f.accepts(path)) {
            record_latest(&mut in_window, path, *ts_ms, kind);
        }
    }

    // A path existed at `since_ms` only if its last earlier event was an index;
    // a path deleted before `since_ms` and re-indexed afterwards counts as added.
    let existed_at_since: HashSet<&str> = before_since
        .into_iter()
        .filter(|(_, (_, kind))| matches!(kind, EventAction::Index))
        .map(|(path, _)| path)
        .collect();

    let mut added = Vec::new();
    let mut modified = Vec::new();
    let mut deleted = Vec::new();
    for (path, (ts_ms, kind)) in in_window {
        let Ok(vault_path) = VaultPath::parse(path) else {
            continue;
        };
        let stamp = super::mtime::format_iso8601(ts_ms).unwrap_or_default();
        match kind {
            EventAction::Delete => deleted.push((
                ts_ms,
                path,
                TombstoneEntry {
                    path: vault_path,
                    deleted_at: stamp,
                },
            )),
            EventAction::Index => {
                let entry = ChangeEntry {
                    path: vault_path,
                    indexed_at: stamp,
                };
                if existed_at_since.contains(path) {
                    modified.push((ts_ms, path, entry));
                } else {
                    added.push((ts_ms, path, entry));
                }
            }
        }
    }

    // The limit is one shared budget, filled in added → modified → deleted order.
    let added = newest_first(added, limit);
    let remaining = limit.saturating_sub(added.len());
    let modified = newest_first(modified, remaining);
    let remaining = remaining.saturating_sub(modified.len());
    let deleted = newest_first(deleted, remaining);

    ChangesResponse {
        vault: None,
        added,
        modified,
        deleted,
    }
}

/// `event_log` actions that affect the change feed.
#[derive(Clone, Copy, Debug)]
enum EventAction {
    Index,
    Delete,
}

impl EventAction {
    /// Maps the stored `action` column to a variant, or `None` for unrelated actions.
    fn parse(action: &str) -> Option<Self> {
        match action {
            "index" => Some(Self::Index),
            "delete" => Some(Self::Delete),
            _ => None,
        }
    }
}

/// Records `(ts_ms, action)` as the latest event for `path` unless a strictly
/// newer one is already stored. `>=` lets the later entry (in iteration order)
/// win when timestamps are equal.
fn record_latest<'a>(
    latest: &mut HashMap<&'a str, (u64, EventAction)>,
    path: &'a str,
    ts_ms: u64,
    action: EventAction,
) {
    let slot = latest.entry(path).or_insert((ts_ms, action));
    if ts_ms >= slot.0 {
        *slot = (ts_ms, action);
    }
}

/// Orders `(timestamp_ms, path, item)` triples newest first, comparing the raw
/// millisecond values rather than formatted strings. Ties are broken by path so
/// the result, and what `cap` cuts off, is deterministic. Keeps at most `cap` items.
fn newest_first<T>(mut items: Vec<(u64, &str, T)>, cap: usize) -> Vec<T> {
    items.sort_unstable_by(|a, b| b.0.cmp(&a.0).then_with(|| a.1.cmp(b.1)));
    items.into_iter().take(cap).map(|(_, _, item)| item).collect()
}
/// Converts an RFC 3339 timestamp into milliseconds since the Unix epoch.
///
/// Returns `None` when `s` does not parse as RFC 3339. It also returns `None`
/// when the millisecond count (nanoseconds divided by 1 000 000, truncating) is
/// negative and so does not fit in a `u64`.
fn rfc3339_to_ms(s: &str) -> Option<u64> {
    use time::format_description::well_known::Rfc3339;

    let instant = time::OffsetDateTime::parse(s, &Rfc3339).ok()?;
    let millis = instant.unix_timestamp_nanos() / 1_000_000;
    u64::try_from(millis).ok()
}
#[cfg(test)]
// Setup and fixture failures in these tests are bugs, so unwrapping is intended.
#[allow(clippy::unwrap_used)]
mod tests {
    use rusqlite::Connection;

    use super::query_changes;
    use crate::constants::CHANGES_DEFAULT_LIMIT;
    use crate::contracts::PositiveCount;
    use crate::indexing::migrations::run_migrations;
    use crate::query::ChangesInput;

    /// Opens an in-memory database with all migrations applied.
    fn fresh_db() -> Connection {
        let mut db = Connection::open_in_memory().unwrap();
        run_migrations(&mut db).unwrap();
        db
    }

    /// Appends a single row to `event_log`.
    fn insert_event(conn: &Connection, action: &str, path: &str, timestamp: &str) {
        let sql = "INSERT INTO event_log (action, path, timestamp) VALUES (?, ?, ?)";
        conn.execute(sql, rusqlite::params![action, path, timestamp])
            .unwrap();
    }

    /// Unscoped input starting at `since`, capped at `limit` entries.
    fn changes_input_with_limit(since: &str, limit: u16) -> ChangesInput {
        ChangesInput {
            since: since.to_owned(),
            scope: Vec::new(),
            scope_only: Vec::new(),
            scope_all: false,
            limit: PositiveCount::new(limit, "limit").unwrap(),
        }
    }

    /// Unscoped input starting at `since`, capped at 100 entries.
    fn changes_input(since: &str) -> ChangesInput {
        changes_input_with_limit(since, 100)
    }

    #[test]
    fn new_index_event_classified_as_added() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T10:30:01Z");

        let feed = query_changes(&db, &changes_input("2024-01-15T10:30:00Z"), None);

        assert_eq!(feed.added.len(), 1);
        assert_eq!(feed.added[0].path.as_str(), "a.md");
        assert!(feed.modified.is_empty());
        assert!(feed.deleted.is_empty());
    }

    #[test]
    fn reindex_after_prior_index_classified_as_modified() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T09:00:00Z");
        insert_event(&db, "index", "a.md", "2024-01-15T10:30:01Z");

        let feed = query_changes(&db, &changes_input("2024-01-15T10:30:00Z"), None);

        assert!(feed.added.is_empty());
        assert_eq!(feed.modified.len(), 1);
        assert_eq!(feed.modified[0].path.as_str(), "a.md");
        assert!(feed.deleted.is_empty());
    }

    #[test]
    fn delete_event_classified_as_deleted() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T09:00:00Z");
        insert_event(&db, "delete", "a.md", "2024-01-15T10:30:01Z");

        let feed = query_changes(&db, &changes_input("2024-01-15T10:30:00Z"), None);

        assert!(feed.added.is_empty());
        assert!(feed.modified.is_empty());
        assert_eq!(feed.deleted.len(), 1);
        assert_eq!(feed.deleted[0].path.as_str(), "a.md");
    }

    #[test]
    fn events_before_since_are_excluded() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T09:00:00Z");

        let feed = query_changes(&db, &changes_input("2024-01-15T10:30:00Z"), None);

        assert!(feed.added.is_empty());
        assert!(feed.modified.is_empty());
        assert!(feed.deleted.is_empty());
    }

    #[test]
    fn invalid_since_returns_empty() {
        let db = fresh_db();
        insert_event(&db, "index", "a.md", "2024-01-15T10:30:01Z");

        let feed = query_changes(&db, &changes_input("not-a-timestamp"), None);

        assert!(feed.added.is_empty());
        assert!(feed.modified.is_empty());
        assert!(feed.deleted.is_empty());
    }

    #[test]
    fn entries_are_ordered_newest_first_before_limit() {
        let db = fresh_db();
        insert_event(&db, "index", "old.md", "2024-01-15T10:30:01Z");
        insert_event(&db, "index", "new.md", "2024-01-15T10:30:03Z");
        insert_event(&db, "index", "middle.md", "2024-01-15T10:30:02Z");

        let input = changes_input_with_limit("2024-01-15T10:30:00Z", 2);
        let feed = query_changes(&db, &input, None);

        let newest: Vec<&str> = feed.added.iter().map(|e| e.path.as_str()).collect();
        assert_eq!(newest, vec!["new.md", "middle.md"]);
    }

    #[test]
    fn serde_default_limit_is_change_feed_default() {
        let raw = serde_json::json!({ "since": "2024-01-15T10:30:00Z" });
        let input: ChangesInput = serde_json::from_value(raw).unwrap();
        assert_eq!(input.limit.get(), CHANGES_DEFAULT_LIMIT);
    }
}