Skip to main content

tuitbot_core/storage/watchtower/
sources.rs

1//! CRUD operations for source_contexts.
2
3use super::{SourceContext, SourceContextRow};
4use crate::error::StorageError;
5use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
6use crate::storage::DbPool;
7
8// ============================================================================
9// Account-scoped source context functions
10// ============================================================================
11
12/// Insert a new source context for a specific account and return its ID.
13pub async fn insert_source_context_for(
14    pool: &DbPool,
15    account_id: &str,
16    source_type: &str,
17    config_json: &str,
18) -> Result<i64, StorageError> {
19    let row: (i64,) = sqlx::query_as(
20        "INSERT INTO source_contexts (account_id, source_type, config_json) \
21         VALUES (?, ?, ?) \
22         RETURNING id",
23    )
24    .bind(account_id)
25    .bind(source_type)
26    .bind(config_json)
27    .fetch_one(pool)
28    .await
29    .map_err(|e| StorageError::Query { source: e })?;
30
31    Ok(row.0)
32}
33
34/// Insert a new source context and return its ID.
35pub async fn insert_source_context(
36    pool: &DbPool,
37    source_type: &str,
38    config_json: &str,
39) -> Result<i64, StorageError> {
40    insert_source_context_for(pool, DEFAULT_ACCOUNT_ID, source_type, config_json).await
41}
42
43/// Get a source context by ID.
44pub async fn get_source_context(
45    pool: &DbPool,
46    id: i64,
47) -> Result<Option<SourceContext>, StorageError> {
48    let row: Option<SourceContextRow> = sqlx::query_as(
49        "SELECT id, account_id, source_type, config_json, sync_cursor, \
50                    status, error_message, created_at, updated_at \
51             FROM source_contexts WHERE id = ?",
52    )
53    .bind(id)
54    .fetch_optional(pool)
55    .await
56    .map_err(|e| StorageError::Query { source: e })?;
57
58    Ok(row.map(SourceContext::from_row))
59}
60
61/// Get all active source contexts for a specific account.
62pub async fn get_source_contexts_for(
63    pool: &DbPool,
64    account_id: &str,
65) -> Result<Vec<SourceContext>, StorageError> {
66    let rows: Vec<SourceContextRow> = sqlx::query_as(
67        "SELECT id, account_id, source_type, config_json, sync_cursor, \
68                    status, error_message, created_at, updated_at \
69             FROM source_contexts \
70             WHERE account_id = ? AND status = 'active' ORDER BY id",
71    )
72    .bind(account_id)
73    .fetch_all(pool)
74    .await
75    .map_err(|e| StorageError::Query { source: e })?;
76
77    Ok(rows.into_iter().map(SourceContext::from_row).collect())
78}
79
80/// Get all active source contexts.
81pub async fn get_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
82    let rows: Vec<SourceContextRow> = sqlx::query_as(
83        "SELECT id, account_id, source_type, config_json, sync_cursor, \
84                    status, error_message, created_at, updated_at \
85             FROM source_contexts WHERE status = 'active' ORDER BY id",
86    )
87    .fetch_all(pool)
88    .await
89    .map_err(|e| StorageError::Query { source: e })?;
90
91    Ok(rows.into_iter().map(SourceContext::from_row).collect())
92}
93
94/// Get all source contexts regardless of status (for status APIs).
95pub async fn get_all_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
96    let rows: Vec<SourceContextRow> = sqlx::query_as(
97        "SELECT id, account_id, source_type, config_json, sync_cursor, \
98                    status, error_message, created_at, updated_at \
99             FROM source_contexts ORDER BY id",
100    )
101    .fetch_all(pool)
102    .await
103    .map_err(|e| StorageError::Query { source: e })?;
104
105    Ok(rows.into_iter().map(SourceContext::from_row).collect())
106}
107
108/// Update the sync cursor for a source context.
109pub async fn update_sync_cursor(pool: &DbPool, id: i64, cursor: &str) -> Result<(), StorageError> {
110    sqlx::query(
111        "UPDATE source_contexts SET sync_cursor = ?, updated_at = datetime('now') WHERE id = ?",
112    )
113    .bind(cursor)
114    .bind(id)
115    .execute(pool)
116    .await
117    .map_err(|e| StorageError::Query { source: e })?;
118
119    Ok(())
120}
121
122/// Update the status (and optional error message) of a source context.
123pub async fn update_source_status(
124    pool: &DbPool,
125    id: i64,
126    status: &str,
127    error_message: Option<&str>,
128) -> Result<(), StorageError> {
129    sqlx::query(
130        "UPDATE source_contexts \
131         SET status = ?, error_message = ?, updated_at = datetime('now') \
132         WHERE id = ?",
133    )
134    .bind(status)
135    .bind(error_message)
136    .bind(id)
137    .execute(pool)
138    .await
139    .map_err(|e| StorageError::Query { source: e })?;
140
141    Ok(())
142}
143
144/// Find a source context by path substring for a specific account.
145pub async fn find_source_by_path_for(
146    pool: &DbPool,
147    account_id: &str,
148    path: &str,
149) -> Result<Option<SourceContext>, StorageError> {
150    let row: Option<SourceContextRow> = sqlx::query_as(
151        "SELECT id, account_id, source_type, config_json, sync_cursor, \
152                    status, error_message, created_at, updated_at \
153             FROM source_contexts \
154             WHERE account_id = ? AND source_type = 'local_fs' AND status = 'active' \
155               AND config_json LIKE '%' || ? || '%' \
156             LIMIT 1",
157    )
158    .bind(account_id)
159    .bind(path)
160    .fetch_optional(pool)
161    .await
162    .map_err(|e| StorageError::Query { source: e })?;
163
164    Ok(row.map(SourceContext::from_row))
165}
166
167/// Find a source context by source type and path substring in config_json.
168pub async fn find_source_by_path(
169    pool: &DbPool,
170    path: &str,
171) -> Result<Option<SourceContext>, StorageError> {
172    find_source_by_path_for(pool, DEFAULT_ACCOUNT_ID, path).await
173}
174
175/// Ensure a "local_fs" source context exists for a specific account, returning its ID.
176pub async fn ensure_local_fs_source_for(
177    pool: &DbPool,
178    account_id: &str,
179    path: &str,
180    config_json: &str,
181) -> Result<i64, StorageError> {
182    if let Some(ctx) = find_source_by_path_for(pool, account_id, path).await? {
183        return Ok(ctx.id);
184    }
185    insert_source_context_for(pool, account_id, "local_fs", config_json).await
186}
187
188/// Ensure a "local_fs" source context exists for the given path, returning its ID.
189pub async fn ensure_local_fs_source(
190    pool: &DbPool,
191    path: &str,
192    config_json: &str,
193) -> Result<i64, StorageError> {
194    ensure_local_fs_source_for(pool, DEFAULT_ACCOUNT_ID, path, config_json).await
195}
196
197/// Find a source context by Google Drive folder ID for a specific account.
198pub async fn find_source_by_folder_id_for(
199    pool: &DbPool,
200    account_id: &str,
201    folder_id: &str,
202) -> Result<Option<SourceContext>, StorageError> {
203    let row: Option<SourceContextRow> = sqlx::query_as(
204        "SELECT id, account_id, source_type, config_json, sync_cursor, \
205                    status, error_message, created_at, updated_at \
206             FROM source_contexts \
207             WHERE account_id = ? AND source_type = 'google_drive' AND status = 'active' \
208               AND config_json LIKE '%' || ? || '%' \
209             LIMIT 1",
210    )
211    .bind(account_id)
212    .bind(folder_id)
213    .fetch_optional(pool)
214    .await
215    .map_err(|e| StorageError::Query { source: e })?;
216
217    Ok(row.map(SourceContext::from_row))
218}
219
220/// Find a source context by Google Drive folder ID in config_json.
221pub async fn find_source_by_folder_id(
222    pool: &DbPool,
223    folder_id: &str,
224) -> Result<Option<SourceContext>, StorageError> {
225    find_source_by_folder_id_for(pool, DEFAULT_ACCOUNT_ID, folder_id).await
226}
227
228/// Ensure a "google_drive" source context exists for a specific account, returning its ID.
229pub async fn ensure_google_drive_source_for(
230    pool: &DbPool,
231    account_id: &str,
232    folder_id: &str,
233    config_json: &str,
234) -> Result<i64, StorageError> {
235    if let Some(ctx) = find_source_by_folder_id_for(pool, account_id, folder_id).await? {
236        return Ok(ctx.id);
237    }
238    insert_source_context_for(pool, account_id, "google_drive", config_json).await
239}
240
241/// Ensure a "google_drive" source context exists for the given folder ID, returning its ID.
242pub async fn ensure_google_drive_source(
243    pool: &DbPool,
244    folder_id: &str,
245    config_json: &str,
246) -> Result<i64, StorageError> {
247    ensure_google_drive_source_for(pool, DEFAULT_ACCOUNT_ID, folder_id, config_json).await
248}
249
250/// Get all source contexts for a specific account regardless of status.
251pub async fn get_all_source_contexts_for(
252    pool: &DbPool,
253    account_id: &str,
254) -> Result<Vec<SourceContext>, StorageError> {
255    let rows: Vec<SourceContextRow> = sqlx::query_as(
256        "SELECT id, account_id, source_type, config_json, sync_cursor, \
257                    status, error_message, created_at, updated_at \
258             FROM source_contexts \
259             WHERE account_id = ? ORDER BY id",
260    )
261    .bind(account_id)
262    .fetch_all(pool)
263    .await
264    .map_err(|e| StorageError::Query { source: e })?;
265
266    Ok(rows.into_iter().map(SourceContext::from_row).collect())
267}
268
269/// Ensure a "manual" source context exists for a specific account, returning its ID.
270pub async fn ensure_manual_source_for(
271    pool: &DbPool,
272    account_id: &str,
273) -> Result<i64, StorageError> {
274    let existing: Option<(i64,)> = sqlx::query_as(
275        "SELECT id FROM source_contexts \
276         WHERE account_id = ? AND source_type = 'manual' AND status = 'active' \
277         LIMIT 1",
278    )
279    .bind(account_id)
280    .fetch_optional(pool)
281    .await
282    .map_err(|e| StorageError::Query { source: e })?;
283
284    match existing {
285        Some((id,)) => Ok(id),
286        None => insert_source_context_for(pool, account_id, "manual", "{}").await,
287    }
288}
289
290/// Ensure a "manual" source context exists for inline ingestion, returning its ID.
291pub async fn ensure_manual_source(pool: &DbPool) -> Result<i64, StorageError> {
292    ensure_manual_source_for(pool, DEFAULT_ACCOUNT_ID).await
293}