1use atrg_db::DbPool;
7
8pub const CREATE_CURSOR_TABLE_SQLITE: &str = r#"
10CREATE TABLE IF NOT EXISTS atrg_jetstream_cursors (
11 consumer_id TEXT PRIMARY KEY,
12 time_us INTEGER NOT NULL,
13 updated_at INTEGER NOT NULL DEFAULT (unixepoch())
14);
15"#;
16
17pub const CREATE_CURSOR_TABLE_POSTGRES: &str = r#"
19CREATE TABLE IF NOT EXISTS atrg_jetstream_cursors (
20 consumer_id TEXT PRIMARY KEY,
21 time_us BIGINT NOT NULL,
22 updated_at BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW())::bigint
23);
24"#;
25
26pub async fn load_cursor(pool: &DbPool, consumer_id: &str) -> anyhow::Result<Option<i64>> {
29 let result: Option<i64> = match pool {
30 #[cfg(feature = "sqlite")]
31 DbPool::Sqlite(p) => {
32 sqlx::query_scalar("SELECT time_us FROM atrg_jetstream_cursors WHERE consumer_id = ?1")
33 .bind(consumer_id)
34 .fetch_optional(p)
35 .await?
36 }
37 #[cfg(feature = "postgres")]
38 DbPool::Postgres(p) => {
39 sqlx::query_scalar("SELECT time_us FROM atrg_jetstream_cursors WHERE consumer_id = $1")
40 .bind(consumer_id)
41 .fetch_optional(p)
42 .await?
43 }
44 #[allow(unreachable_patterns)]
45 _ => anyhow::bail!("no database backend enabled for this operation"),
46 };
47 Ok(result)
48}
49
50pub async fn save_cursor(pool: &DbPool, consumer_id: &str, time_us: i64) -> anyhow::Result<()> {
52 let now = std::time::SystemTime::now()
53 .duration_since(std::time::UNIX_EPOCH)
54 .unwrap_or_default()
55 .as_secs() as i64;
56
57 match pool {
58 #[cfg(feature = "sqlite")]
59 DbPool::Sqlite(p) => {
60 sqlx::query(
61 "INSERT INTO atrg_jetstream_cursors (consumer_id, time_us, updated_at) \
62 VALUES (?1, ?2, ?3) \
63 ON CONFLICT(consumer_id) DO UPDATE SET time_us = ?2, updated_at = ?3",
64 )
65 .bind(consumer_id)
66 .bind(time_us)
67 .bind(now)
68 .execute(p)
69 .await?;
70 }
71 #[cfg(feature = "postgres")]
72 DbPool::Postgres(p) => {
73 sqlx::query(
74 "INSERT INTO atrg_jetstream_cursors (consumer_id, time_us, updated_at) \
75 VALUES ($1, $2, $3) \
76 ON CONFLICT(consumer_id) DO UPDATE SET time_us = $2, updated_at = $3",
77 )
78 .bind(consumer_id)
79 .bind(time_us)
80 .bind(now)
81 .execute(p)
82 .await?;
83 }
84 #[allow(unreachable_patterns)]
85 _ => anyhow::bail!("no database backend enabled for this operation"),
86 }
87 Ok(())
88}
89
90pub async fn ensure_cursor_table(pool: &DbPool) -> anyhow::Result<()> {
95 match pool {
96 #[cfg(feature = "sqlite")]
97 DbPool::Sqlite(p) => {
98 sqlx::query(CREATE_CURSOR_TABLE_SQLITE).execute(p).await?;
99 }
100 #[cfg(feature = "postgres")]
101 DbPool::Postgres(p) => {
102 sqlx::query(CREATE_CURSOR_TABLE_POSTGRES).execute(p).await?;
103 }
104 #[allow(unreachable_patterns)]
105 _ => anyhow::bail!("no database backend enabled for this operation"),
106 }
107 tracing::debug!("ensured atrg_jetstream_cursors table exists");
108 Ok(())
109}
110
111#[cfg(test)]
112mod tests {
113 use super::*;
114
115 #[cfg(feature = "sqlite")]
116 #[tokio::test]
117 async fn test_cursor_roundtrip() {
118 let pool = atrg_db::connect("sqlite::memory:").await.unwrap();
119 if let DbPool::Sqlite(p) = &pool {
121 sqlx::query(CREATE_CURSOR_TABLE_SQLITE)
122 .execute(p)
123 .await
124 .unwrap();
125 }
126
127 let cursor = load_cursor(&pool, "test-consumer").await.unwrap();
129 assert_eq!(cursor, None);
130
131 save_cursor(&pool, "test-consumer", 1234567890)
133 .await
134 .unwrap();
135
136 let cursor = load_cursor(&pool, "test-consumer").await.unwrap();
138 assert_eq!(cursor, Some(1234567890));
139
140 save_cursor(&pool, "test-consumer", 9999999999)
142 .await
143 .unwrap();
144 let cursor = load_cursor(&pool, "test-consumer").await.unwrap();
145 assert_eq!(cursor, Some(9999999999));
146 }
147
148 #[cfg(feature = "sqlite")]
149 #[tokio::test]
150 async fn test_multiple_consumers() {
151 let pool = atrg_db::connect("sqlite::memory:").await.unwrap();
152 if let DbPool::Sqlite(p) = &pool {
153 sqlx::query(CREATE_CURSOR_TABLE_SQLITE)
154 .execute(p)
155 .await
156 .unwrap();
157 }
158
159 save_cursor(&pool, "consumer-a", 100).await.unwrap();
160 save_cursor(&pool, "consumer-b", 200).await.unwrap();
161
162 assert_eq!(load_cursor(&pool, "consumer-a").await.unwrap(), Some(100));
163 assert_eq!(load_cursor(&pool, "consumer-b").await.unwrap(), Some(200));
164 }
165
166 #[cfg(feature = "sqlite")]
167 #[tokio::test]
168 async fn test_ensure_cursor_table_idempotent() {
169 let pool = atrg_db::connect("sqlite::memory:").await.unwrap();
170
171 ensure_cursor_table(&pool).await.unwrap();
173 ensure_cursor_table(&pool).await.unwrap();
174
175 save_cursor(&pool, "idempotent-test", 42).await.unwrap();
177 assert_eq!(
178 load_cursor(&pool, "idempotent-test").await.unwrap(),
179 Some(42)
180 );
181 }
182}