Skip to main content

atrg_stream/
cursor.rs

1//! Cursor persistence for Jetstream consumer.
2//!
3//! Stores the last processed event timestamp (`time_us`) so the consumer
4//! can resume from where it left off after restart.
5
6use atrg_db::DbPool;
7
8/// SQL for creating the cursor table (SQLite).
9pub const CREATE_CURSOR_TABLE_SQLITE: &str = r#"
10CREATE TABLE IF NOT EXISTS atrg_jetstream_cursors (
11    consumer_id TEXT PRIMARY KEY,
12    time_us INTEGER NOT NULL,
13    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
14);
15"#;
16
17/// SQL for creating the cursor table (PostgreSQL).
18pub const CREATE_CURSOR_TABLE_POSTGRES: &str = r#"
19CREATE TABLE IF NOT EXISTS atrg_jetstream_cursors (
20    consumer_id TEXT PRIMARY KEY,
21    time_us BIGINT NOT NULL,
22    updated_at BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW())::bigint
23);
24"#;
25
26/// Load the last stored cursor for a consumer.
27/// Returns `None` if no cursor has been stored yet.
28pub async fn load_cursor(pool: &DbPool, consumer_id: &str) -> anyhow::Result<Option<i64>> {
29    let result: Option<i64> = match pool {
30        #[cfg(feature = "sqlite")]
31        DbPool::Sqlite(p) => {
32            sqlx::query_scalar("SELECT time_us FROM atrg_jetstream_cursors WHERE consumer_id = ?1")
33                .bind(consumer_id)
34                .fetch_optional(p)
35                .await?
36        }
37        #[cfg(feature = "postgres")]
38        DbPool::Postgres(p) => {
39            sqlx::query_scalar("SELECT time_us FROM atrg_jetstream_cursors WHERE consumer_id = $1")
40                .bind(consumer_id)
41                .fetch_optional(p)
42                .await?
43        }
44        #[allow(unreachable_patterns)]
45        _ => anyhow::bail!("no database backend enabled for this operation"),
46    };
47    Ok(result)
48}
49
50/// Save the cursor position for a consumer.
51pub async fn save_cursor(pool: &DbPool, consumer_id: &str, time_us: i64) -> anyhow::Result<()> {
52    let now = std::time::SystemTime::now()
53        .duration_since(std::time::UNIX_EPOCH)
54        .unwrap_or_default()
55        .as_secs() as i64;
56
57    match pool {
58        #[cfg(feature = "sqlite")]
59        DbPool::Sqlite(p) => {
60            sqlx::query(
61                "INSERT INTO atrg_jetstream_cursors (consumer_id, time_us, updated_at) \
62                 VALUES (?1, ?2, ?3) \
63                 ON CONFLICT(consumer_id) DO UPDATE SET time_us = ?2, updated_at = ?3",
64            )
65            .bind(consumer_id)
66            .bind(time_us)
67            .bind(now)
68            .execute(p)
69            .await?;
70        }
71        #[cfg(feature = "postgres")]
72        DbPool::Postgres(p) => {
73            sqlx::query(
74                "INSERT INTO atrg_jetstream_cursors (consumer_id, time_us, updated_at) \
75                 VALUES ($1, $2, $3) \
76                 ON CONFLICT(consumer_id) DO UPDATE SET time_us = $2, updated_at = $3",
77            )
78            .bind(consumer_id)
79            .bind(time_us)
80            .bind(now)
81            .execute(p)
82            .await?;
83        }
84        #[allow(unreachable_patterns)]
85        _ => anyhow::bail!("no database backend enabled for this operation"),
86    }
87    Ok(())
88}
89
90/// Ensure the cursor table exists for the active backend.
91///
92/// This should be called once during app startup (e.g. inside
93/// `AtrgApp::run()`) before spawning the Jetstream consumer.
94pub async fn ensure_cursor_table(pool: &DbPool) -> anyhow::Result<()> {
95    match pool {
96        #[cfg(feature = "sqlite")]
97        DbPool::Sqlite(p) => {
98            sqlx::query(CREATE_CURSOR_TABLE_SQLITE).execute(p).await?;
99        }
100        #[cfg(feature = "postgres")]
101        DbPool::Postgres(p) => {
102            sqlx::query(CREATE_CURSOR_TABLE_POSTGRES).execute(p).await?;
103        }
104        #[allow(unreachable_patterns)]
105        _ => anyhow::bail!("no database backend enabled for this operation"),
106    }
107    tracing::debug!("ensured atrg_jetstream_cursors table exists");
108    Ok(())
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114
115    #[cfg(feature = "sqlite")]
116    #[tokio::test]
117    async fn test_cursor_roundtrip() {
118        let pool = atrg_db::connect("sqlite::memory:").await.unwrap();
119        // Create table
120        if let DbPool::Sqlite(p) = &pool {
121            sqlx::query(CREATE_CURSOR_TABLE_SQLITE)
122                .execute(p)
123                .await
124                .unwrap();
125        }
126
127        // Initially no cursor
128        let cursor = load_cursor(&pool, "test-consumer").await.unwrap();
129        assert_eq!(cursor, None);
130
131        // Save cursor
132        save_cursor(&pool, "test-consumer", 1234567890)
133            .await
134            .unwrap();
135
136        // Load it back
137        let cursor = load_cursor(&pool, "test-consumer").await.unwrap();
138        assert_eq!(cursor, Some(1234567890));
139
140        // Update cursor
141        save_cursor(&pool, "test-consumer", 9999999999)
142            .await
143            .unwrap();
144        let cursor = load_cursor(&pool, "test-consumer").await.unwrap();
145        assert_eq!(cursor, Some(9999999999));
146    }
147
148    #[cfg(feature = "sqlite")]
149    #[tokio::test]
150    async fn test_multiple_consumers() {
151        let pool = atrg_db::connect("sqlite::memory:").await.unwrap();
152        if let DbPool::Sqlite(p) = &pool {
153            sqlx::query(CREATE_CURSOR_TABLE_SQLITE)
154                .execute(p)
155                .await
156                .unwrap();
157        }
158
159        save_cursor(&pool, "consumer-a", 100).await.unwrap();
160        save_cursor(&pool, "consumer-b", 200).await.unwrap();
161
162        assert_eq!(load_cursor(&pool, "consumer-a").await.unwrap(), Some(100));
163        assert_eq!(load_cursor(&pool, "consumer-b").await.unwrap(), Some(200));
164    }
165
166    #[cfg(feature = "sqlite")]
167    #[tokio::test]
168    async fn test_ensure_cursor_table_idempotent() {
169        let pool = atrg_db::connect("sqlite::memory:").await.unwrap();
170
171        // Call twice — should not error on the second call.
172        ensure_cursor_table(&pool).await.unwrap();
173        ensure_cursor_table(&pool).await.unwrap();
174
175        // Table should be usable.
176        save_cursor(&pool, "idempotent-test", 42).await.unwrap();
177        assert_eq!(
178            load_cursor(&pool, "idempotent-test").await.unwrap(),
179            Some(42)
180        );
181    }
182}