1use crate::error::Result;
2use crate::types::CursorState;
3
4use super::{StateConn, StateStore, pg_sql};
5
6impl StateStore {
12 pub fn get(&self, export_name: &str) -> Result<CursorState> {
13 match &self.conn {
14 StateConn::Sqlite(c) => {
15 let mut stmt = c.prepare(
16 "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = ?1",
17 )?;
18 let result = stmt.query_row([export_name], |row| {
19 Ok(CursorState {
20 export_name: export_name.to_string(),
21 last_cursor_value: row.get(0)?,
22 last_run_at: row.get(1)?,
23 })
24 });
25 match result {
26 Ok(state) => Ok(state),
27 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(CursorState {
28 export_name: export_name.to_string(),
29 last_cursor_value: None,
30 last_run_at: None,
31 }),
32 Err(e) => Err(e.into()),
33 }
34 }
35 StateConn::Postgres(client) => {
36 let mut c = client.borrow_mut();
37 match c.query_opt(
38 "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = $1",
39 &[&export_name],
40 )? {
41 Some(row) => Ok(CursorState {
42 export_name: export_name.to_string(),
43 last_cursor_value: row.get(0),
44 last_run_at: row.get(1),
45 }),
46 None => Ok(CursorState {
47 export_name: export_name.to_string(),
48 last_cursor_value: None,
49 last_run_at: None,
50 }),
51 }
52 }
53 }
54 }
55
56 pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()> {
57 let now = chrono::Utc::now().to_rfc3339();
58 let sql = "INSERT INTO export_state (export_name, last_cursor_value, last_run_at)
59 VALUES (?1, ?2, ?3)
60 ON CONFLICT(export_name) DO UPDATE SET
61 last_cursor_value = excluded.last_cursor_value,
62 last_run_at = excluded.last_run_at";
63 match &self.conn {
64 StateConn::Sqlite(c) => {
65 c.execute(sql, rusqlite::params![export_name, cursor_value, now])?;
66 }
67 StateConn::Postgres(client) => {
68 let mut c = client.borrow_mut();
69 c.execute(&pg_sql(sql), &[&export_name, &cursor_value, &now])?;
70 }
71 }
72 Ok(())
73 }
74
75 pub fn reset(&self, export_name: &str) -> Result<()> {
76 let sql = "DELETE FROM export_state WHERE export_name = ?1";
77 match &self.conn {
78 StateConn::Sqlite(c) => {
79 c.execute(sql, [export_name])?;
80 }
81 StateConn::Postgres(client) => {
82 let mut c = client.borrow_mut();
83 c.execute(&pg_sql(sql), &[&export_name])?;
84 }
85 }
86 Ok(())
87 }
88
89 pub fn list_all(&self) -> Result<Vec<CursorState>> {
90 let sql = "SELECT export_name, last_cursor_value, last_run_at FROM export_state ORDER BY export_name";
91 match &self.conn {
92 StateConn::Sqlite(c) => {
93 let mut stmt = c.prepare(sql)?;
94 let rows = stmt.query_map([], |row| {
95 Ok(CursorState {
96 export_name: row.get(0)?,
97 last_cursor_value: row.get(1)?,
98 last_run_at: row.get(2)?,
99 })
100 })?;
101 rows.collect::<std::result::Result<Vec<_>, _>>()
102 .map_err(Into::into)
103 }
104 StateConn::Postgres(client) => {
105 let mut c = client.borrow_mut();
106 let rows = c.query(sql, &[])?;
107 Ok(rows
108 .iter()
109 .map(|row| CursorState {
110 export_name: row.get(0),
111 last_cursor_value: row.get(1),
112 last_run_at: row.get(2),
113 })
114 .collect())
115 }
116 }
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh store via `StateStore::open_in_memory` for a single test.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// An export that was never written yields an empty state, not an error.
    #[test]
    fn get_unknown_returns_empty_state() {
        let s = store();
        let state = s.get("nonexistent").unwrap();
        assert_eq!(state.export_name, "nonexistent");
        assert!(state.last_cursor_value.is_none());
        assert!(state.last_run_at.is_none());
    }

    /// A cursor written with `update` is visible through `get`.
    #[test]
    fn update_then_get_returns_stored_cursor() {
        let s = store();
        s.update("orders", "2024-06-01").unwrap();
        let state = s.get("orders").unwrap();
        assert_eq!(state.last_cursor_value.as_deref(), Some("2024-06-01"));
        assert!(
            state.last_run_at.is_some(),
            "update must record last_run_at"
        );
    }

    /// A second `update` for the same export replaces the first value.
    #[test]
    fn update_overwrites_previous_cursor() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.update("orders", "200").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("200")
        );
    }

    /// After `reset`, the export no longer has a cursor value.
    #[test]
    fn reset_clears_cursor_state() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.reset("orders").unwrap();
        assert!(s.get("orders").unwrap().last_cursor_value.is_none());
    }

    /// Listing a store with no exports yields an empty vector.
    #[test]
    fn list_all_on_empty_store_returns_empty() {
        assert!(store().list_all().unwrap().is_empty());
    }

    /// Entries come back ordered by export name regardless of insert order.
    #[test]
    fn list_all_returns_entries_sorted_by_name() {
        let s = store();
        s.update("gamma", "3").unwrap();
        s.update("alpha", "1").unwrap();
        s.update("beta", "2").unwrap();
        let all = s.list_all().unwrap();
        // Compare the complete sequence: this checks the length and the middle
        // element too, and fails with a readable diff instead of an
        // out-of-bounds panic if rows are missing.
        let names: Vec<&str> = all.iter().map(|st| st.export_name.as_str()).collect();
        assert_eq!(names, ["alpha", "beta", "gamma"]);
    }

    /// Writing the same cursor value twice leaves that value stored.
    #[test]
    fn duplicate_cursor_values_are_stored_as_written() {
        let s = store();
        s.update("orders", "2024-06-01T00:00:00Z").unwrap();
        s.update("orders", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("2024-06-01T00:00:00Z")
        );
    }

    /// Nanosecond precision and a non-UTC offset survive the round trip unchanged.
    #[test]
    fn high_precision_timestamp_is_preserved_byte_for_byte() {
        let s = store();
        let ts = "2024-06-01T12:34:56.123456789+02:00";
        s.update("events", ts).unwrap();
        assert_eq!(
            s.get("events").unwrap().last_cursor_value.as_deref(),
            Some(ts)
        );
    }

    /// Dates, UUIDs, non-ASCII text, control whitespace and the empty string
    /// all round-trip exactly.
    #[test]
    fn unicode_and_binary_like_cursor_values_round_trip() {
        let s = store();
        let values = [
            "2024-06-01",
            "018f1c0b-7a34-7b54-8e16-1c5a9b3f1c2d",
            "ελληνικά 🚀 cursor",
            "v\n\t with whitespace",
            "",
        ];
        for v in values {
            s.update("t", v).unwrap();
            assert_eq!(
                s.get("t").unwrap().last_cursor_value.as_deref(),
                Some(v),
                "cursor value {v:?} must round-trip exactly"
            );
        }
    }

    /// `reset` removes both the cursor value and the last-run timestamp.
    #[test]
    fn reset_clears_cursor_state_completely() {
        let s = store();
        s.update("orders", "2024-06-01").unwrap();
        s.reset("orders").unwrap();
        let after = s.get("orders").unwrap();
        assert!(after.last_cursor_value.is_none());
        assert!(
            after.last_run_at.is_none(),
            "reset must clear last_run_at as well"
        );
    }
}