1use super::{SourceContext, SourceContextRow};
4use crate::error::StorageError;
5use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
6use crate::storage::DbPool;
7
8pub async fn insert_source_context_for(
14 pool: &DbPool,
15 account_id: &str,
16 source_type: &str,
17 config_json: &str,
18) -> Result<i64, StorageError> {
19 let row: (i64,) = sqlx::query_as(
20 "INSERT INTO source_contexts (account_id, source_type, config_json) \
21 VALUES (?, ?, ?) \
22 RETURNING id",
23 )
24 .bind(account_id)
25 .bind(source_type)
26 .bind(config_json)
27 .fetch_one(pool)
28 .await
29 .map_err(|e| StorageError::Query { source: e })?;
30
31 Ok(row.0)
32}
33
34pub async fn insert_source_context(
36 pool: &DbPool,
37 source_type: &str,
38 config_json: &str,
39) -> Result<i64, StorageError> {
40 insert_source_context_for(pool, DEFAULT_ACCOUNT_ID, source_type, config_json).await
41}
42
43pub async fn get_source_context(
45 pool: &DbPool,
46 id: i64,
47) -> Result<Option<SourceContext>, StorageError> {
48 let row: Option<SourceContextRow> = sqlx::query_as(
49 "SELECT id, account_id, source_type, config_json, sync_cursor, \
50 status, error_message, created_at, updated_at \
51 FROM source_contexts WHERE id = ?",
52 )
53 .bind(id)
54 .fetch_optional(pool)
55 .await
56 .map_err(|e| StorageError::Query { source: e })?;
57
58 Ok(row.map(SourceContext::from_row))
59}
60
61pub async fn get_source_contexts_for(
63 pool: &DbPool,
64 account_id: &str,
65) -> Result<Vec<SourceContext>, StorageError> {
66 let rows: Vec<SourceContextRow> = sqlx::query_as(
67 "SELECT id, account_id, source_type, config_json, sync_cursor, \
68 status, error_message, created_at, updated_at \
69 FROM source_contexts \
70 WHERE account_id = ? AND status = 'active' ORDER BY id",
71 )
72 .bind(account_id)
73 .fetch_all(pool)
74 .await
75 .map_err(|e| StorageError::Query { source: e })?;
76
77 Ok(rows.into_iter().map(SourceContext::from_row).collect())
78}
79
80pub async fn get_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
82 let rows: Vec<SourceContextRow> = sqlx::query_as(
83 "SELECT id, account_id, source_type, config_json, sync_cursor, \
84 status, error_message, created_at, updated_at \
85 FROM source_contexts WHERE status = 'active' ORDER BY id",
86 )
87 .fetch_all(pool)
88 .await
89 .map_err(|e| StorageError::Query { source: e })?;
90
91 Ok(rows.into_iter().map(SourceContext::from_row).collect())
92}
93
94pub async fn get_all_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
96 let rows: Vec<SourceContextRow> = sqlx::query_as(
97 "SELECT id, account_id, source_type, config_json, sync_cursor, \
98 status, error_message, created_at, updated_at \
99 FROM source_contexts ORDER BY id",
100 )
101 .fetch_all(pool)
102 .await
103 .map_err(|e| StorageError::Query { source: e })?;
104
105 Ok(rows.into_iter().map(SourceContext::from_row).collect())
106}
107
108pub async fn update_sync_cursor(pool: &DbPool, id: i64, cursor: &str) -> Result<(), StorageError> {
110 sqlx::query(
111 "UPDATE source_contexts SET sync_cursor = ?, updated_at = datetime('now') WHERE id = ?",
112 )
113 .bind(cursor)
114 .bind(id)
115 .execute(pool)
116 .await
117 .map_err(|e| StorageError::Query { source: e })?;
118
119 Ok(())
120}
121
122pub async fn update_source_status(
124 pool: &DbPool,
125 id: i64,
126 status: &str,
127 error_message: Option<&str>,
128) -> Result<(), StorageError> {
129 sqlx::query(
130 "UPDATE source_contexts \
131 SET status = ?, error_message = ?, updated_at = datetime('now') \
132 WHERE id = ?",
133 )
134 .bind(status)
135 .bind(error_message)
136 .bind(id)
137 .execute(pool)
138 .await
139 .map_err(|e| StorageError::Query { source: e })?;
140
141 Ok(())
142}
143
144pub async fn find_source_by_path_for(
146 pool: &DbPool,
147 account_id: &str,
148 path: &str,
149) -> Result<Option<SourceContext>, StorageError> {
150 let row: Option<SourceContextRow> = sqlx::query_as(
151 "SELECT id, account_id, source_type, config_json, sync_cursor, \
152 status, error_message, created_at, updated_at \
153 FROM source_contexts \
154 WHERE account_id = ? AND source_type = 'local_fs' AND status = 'active' \
155 AND config_json LIKE '%' || ? || '%' \
156 LIMIT 1",
157 )
158 .bind(account_id)
159 .bind(path)
160 .fetch_optional(pool)
161 .await
162 .map_err(|e| StorageError::Query { source: e })?;
163
164 Ok(row.map(SourceContext::from_row))
165}
166
167pub async fn find_source_by_path(
169 pool: &DbPool,
170 path: &str,
171) -> Result<Option<SourceContext>, StorageError> {
172 find_source_by_path_for(pool, DEFAULT_ACCOUNT_ID, path).await
173}
174
175pub async fn ensure_local_fs_source_for(
177 pool: &DbPool,
178 account_id: &str,
179 path: &str,
180 config_json: &str,
181) -> Result<i64, StorageError> {
182 if let Some(ctx) = find_source_by_path_for(pool, account_id, path).await? {
183 return Ok(ctx.id);
184 }
185 insert_source_context_for(pool, account_id, "local_fs", config_json).await
186}
187
188pub async fn ensure_local_fs_source(
190 pool: &DbPool,
191 path: &str,
192 config_json: &str,
193) -> Result<i64, StorageError> {
194 ensure_local_fs_source_for(pool, DEFAULT_ACCOUNT_ID, path, config_json).await
195}
196
197pub async fn find_source_by_folder_id_for(
199 pool: &DbPool,
200 account_id: &str,
201 folder_id: &str,
202) -> Result<Option<SourceContext>, StorageError> {
203 let row: Option<SourceContextRow> = sqlx::query_as(
204 "SELECT id, account_id, source_type, config_json, sync_cursor, \
205 status, error_message, created_at, updated_at \
206 FROM source_contexts \
207 WHERE account_id = ? AND source_type = 'google_drive' AND status = 'active' \
208 AND config_json LIKE '%' || ? || '%' \
209 LIMIT 1",
210 )
211 .bind(account_id)
212 .bind(folder_id)
213 .fetch_optional(pool)
214 .await
215 .map_err(|e| StorageError::Query { source: e })?;
216
217 Ok(row.map(SourceContext::from_row))
218}
219
220pub async fn find_source_by_folder_id(
222 pool: &DbPool,
223 folder_id: &str,
224) -> Result<Option<SourceContext>, StorageError> {
225 find_source_by_folder_id_for(pool, DEFAULT_ACCOUNT_ID, folder_id).await
226}
227
228pub async fn ensure_google_drive_source_for(
230 pool: &DbPool,
231 account_id: &str,
232 folder_id: &str,
233 config_json: &str,
234) -> Result<i64, StorageError> {
235 if let Some(ctx) = find_source_by_folder_id_for(pool, account_id, folder_id).await? {
236 return Ok(ctx.id);
237 }
238 insert_source_context_for(pool, account_id, "google_drive", config_json).await
239}
240
241pub async fn ensure_google_drive_source(
243 pool: &DbPool,
244 folder_id: &str,
245 config_json: &str,
246) -> Result<i64, StorageError> {
247 ensure_google_drive_source_for(pool, DEFAULT_ACCOUNT_ID, folder_id, config_json).await
248}
249
250pub async fn get_all_source_contexts_for(
252 pool: &DbPool,
253 account_id: &str,
254) -> Result<Vec<SourceContext>, StorageError> {
255 let rows: Vec<SourceContextRow> = sqlx::query_as(
256 "SELECT id, account_id, source_type, config_json, sync_cursor, \
257 status, error_message, created_at, updated_at \
258 FROM source_contexts \
259 WHERE account_id = ? ORDER BY id",
260 )
261 .bind(account_id)
262 .fetch_all(pool)
263 .await
264 .map_err(|e| StorageError::Query { source: e })?;
265
266 Ok(rows.into_iter().map(SourceContext::from_row).collect())
267}
268
269pub async fn ensure_manual_source_for(
271 pool: &DbPool,
272 account_id: &str,
273) -> Result<i64, StorageError> {
274 let existing: Option<(i64,)> = sqlx::query_as(
275 "SELECT id FROM source_contexts \
276 WHERE account_id = ? AND source_type = 'manual' AND status = 'active' \
277 LIMIT 1",
278 )
279 .bind(account_id)
280 .fetch_optional(pool)
281 .await
282 .map_err(|e| StorageError::Query { source: e })?;
283
284 match existing {
285 Some((id,)) => Ok(id),
286 None => insert_source_context_for(pool, account_id, "manual", "{}").await,
287 }
288}
289
290pub async fn ensure_manual_source(pool: &DbPool) -> Result<i64, StorageError> {
292 ensure_manual_source_for(pool, DEFAULT_ACCOUNT_ID).await
293}