use crate::error::Result;
use crate::types::CursorState;
use super::{StateConn, StateStore, pg_sql};
impl StateStore {
pub fn get(&self, export_name: &str) -> Result<CursorState> {
match &self.conn {
StateConn::Sqlite(c) => {
let mut stmt = c.prepare(
"SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = ?1",
)?;
let result = stmt.query_row([export_name], |row| {
Ok(CursorState {
export_name: export_name.to_string(),
last_cursor_value: row.get(0)?,
last_run_at: row.get(1)?,
})
});
match result {
Ok(state) => Ok(state),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(CursorState {
export_name: export_name.to_string(),
last_cursor_value: None,
last_run_at: None,
}),
Err(e) => Err(e.into()),
}
}
StateConn::Postgres(client) => {
let mut c = client.borrow_mut();
match c.query_opt(
"SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = $1",
&[&export_name],
)? {
Some(row) => Ok(CursorState {
export_name: export_name.to_string(),
last_cursor_value: row.get(0),
last_run_at: row.get(1),
}),
None => Ok(CursorState {
export_name: export_name.to_string(),
last_cursor_value: None,
last_run_at: None,
}),
}
}
}
}
pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let sql = "INSERT INTO export_state (export_name, last_cursor_value, last_run_at)
VALUES (?1, ?2, ?3)
ON CONFLICT(export_name) DO UPDATE SET
last_cursor_value = excluded.last_cursor_value,
last_run_at = excluded.last_run_at";
match &self.conn {
StateConn::Sqlite(c) => {
c.execute(sql, rusqlite::params![export_name, cursor_value, now])?;
}
StateConn::Postgres(client) => {
let mut c = client.borrow_mut();
c.execute(&pg_sql(sql), &[&export_name, &cursor_value, &now])?;
}
}
Ok(())
}
pub fn reset(&self, export_name: &str) -> Result<()> {
let sql = "DELETE FROM export_state WHERE export_name = ?1";
match &self.conn {
StateConn::Sqlite(c) => {
c.execute(sql, [export_name])?;
}
StateConn::Postgres(client) => {
let mut c = client.borrow_mut();
c.execute(&pg_sql(sql), &[&export_name])?;
}
}
Ok(())
}
pub fn list_all(&self) -> Result<Vec<CursorState>> {
let sql = "SELECT export_name, last_cursor_value, last_run_at FROM export_state ORDER BY export_name";
match &self.conn {
StateConn::Sqlite(c) => {
let mut stmt = c.prepare(sql)?;
let rows = stmt.query_map([], |row| {
Ok(CursorState {
export_name: row.get(0)?,
last_cursor_value: row.get(1)?,
last_run_at: row.get(2)?,
})
})?;
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
}
StateConn::Postgres(client) => {
let mut c = client.borrow_mut();
let rows = c.query(sql, &[])?;
Ok(rows
.iter()
.map(|row| CursorState {
export_name: row.get(0),
last_cursor_value: row.get(1),
last_run_at: row.get(2),
})
.collect())
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn store() -> StateStore {
StateStore::open_in_memory().expect("in-memory store")
}
#[test]
fn get_unknown_returns_empty_state() {
let s = store();
let state = s.get("nonexistent").unwrap();
assert!(state.last_cursor_value.is_none());
}
#[test]
fn update_then_get_returns_stored_cursor() {
let s = store();
s.update("orders", "2024-06-01").unwrap();
assert_eq!(
s.get("orders").unwrap().last_cursor_value.as_deref(),
Some("2024-06-01")
);
}
#[test]
fn update_overwrites_previous_cursor() {
let s = store();
s.update("orders", "100").unwrap();
s.update("orders", "200").unwrap();
assert_eq!(
s.get("orders").unwrap().last_cursor_value.as_deref(),
Some("200")
);
}
#[test]
fn reset_clears_cursor_state() {
let s = store();
s.update("orders", "100").unwrap();
s.reset("orders").unwrap();
assert!(s.get("orders").unwrap().last_cursor_value.is_none());
}
#[test]
fn list_all_on_empty_store_returns_empty() {
assert!(store().list_all().unwrap().is_empty());
}
#[test]
fn list_all_returns_entries_sorted_by_name() {
let s = store();
s.update("gamma", "3").unwrap();
s.update("alpha", "1").unwrap();
s.update("beta", "2").unwrap();
let all = s.list_all().unwrap();
assert_eq!(all[0].export_name, "alpha");
assert_eq!(all[2].export_name, "gamma");
}
#[test]
fn duplicate_cursor_values_are_stored_as_written() {
let s = store();
s.update("orders", "2024-06-01T00:00:00Z").unwrap();
s.update("orders", "2024-06-01T00:00:00Z").unwrap();
assert_eq!(
s.get("orders").unwrap().last_cursor_value.as_deref(),
Some("2024-06-01T00:00:00Z")
);
}
#[test]
fn high_precision_timestamp_is_preserved_byte_for_byte() {
let s = store();
let ts = "2024-06-01T12:34:56.123456789+02:00";
s.update("events", ts).unwrap();
assert_eq!(
s.get("events").unwrap().last_cursor_value.as_deref(),
Some(ts)
);
}
#[test]
fn unicode_and_binary_like_cursor_values_round_trip() {
let s = store();
let values = [
"2024-06-01",
"018f1c0b-7a34-7b54-8e16-1c5a9b3f1c2d", "ελληνικά 🚀 cursor",
"v\n\t with whitespace",
"",
];
for v in values {
s.update("t", v).unwrap();
assert_eq!(
s.get("t").unwrap().last_cursor_value.as_deref(),
Some(v),
"cursor value {v:?} must round-trip exactly"
);
}
}
#[test]
fn reset_clears_cursor_state_completely() {
let s = store();
s.update("orders", "2024-06-01").unwrap();
s.reset("orders").unwrap();
let after = s.get("orders").unwrap();
assert!(after.last_cursor_value.is_none());
assert!(
after.last_run_at.is_none(),
"reset must clear last_run_at as well"
);
}
}