1pub mod connections;
7
8#[cfg(test)]
9mod tests;
10
11pub use connections::*;
12
13use super::accounts::DEFAULT_ACCOUNT_ID;
14use super::DbPool;
15use crate::error::StorageError;
16
/// Raw row tuple for `source_contexts` SELECTs, in the column order used by
/// every query in this module:
/// (id, account_id, source_type, config_json, sync_cursor,
///  status, error_message, created_at, updated_at).
type SourceContextRow = (
    i64,            // id
    String,         // account_id
    String,         // source_type
    String,         // config_json
    Option<String>, // sync_cursor
    String,         // status
    Option<String>, // error_message
    String,         // created_at
    String,         // updated_at
);
29
/// Raw row tuple for `content_nodes` SELECTs, in the column order used by
/// every query in this module:
/// (id, account_id, source_id, relative_path, content_hash,
///  title, body_text, front_matter_json, tags, status,
///  ingested_at, updated_at).
type ContentNodeRow = (
    i64,            // id
    String,         // account_id
    i64,            // source_id
    String,         // relative_path
    String,         // content_hash
    Option<String>, // title
    String,         // body_text
    Option<String>, // front_matter_json
    Option<String>, // tags
    String,         // status
    String,         // ingested_at
    String,         // updated_at
);
45
/// Raw row tuple for `draft_seeds` SELECTs, in the column order used by
/// every query in this module:
/// (id, account_id, node_id, seed_text, archetype_suggestion,
///  engagement_weight, status, created_at, used_at).
type DraftSeedRow = (
    i64,            // id
    String,         // account_id
    i64,            // node_id
    String,         // seed_text
    Option<String>, // archetype_suggestion
    f64,            // engagement_weight
    String,         // status
    String,         // created_at
    Option<String>, // used_at
);
58
/// A configured ingestion source — one row of `source_contexts`.
///
/// Timestamp fields carry the TEXT values SQLite stores (the writers in this
/// module use `datetime('now')`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct SourceContext {
    /// Primary key.
    pub id: i64,
    /// Owning account; inserts in this module always use `DEFAULT_ACCOUNT_ID`.
    pub account_id: String,
    /// Source kind; values used here: 'local_fs', 'google_drive', 'manual'.
    pub source_type: String,
    /// Source configuration serialized as JSON text.
    pub config_json: String,
    /// Opaque incremental-sync position; `None` until `update_sync_cursor` sets it.
    pub sync_cursor: Option<String>,
    /// Lifecycle status; listing queries in this module filter on 'active'.
    pub status: String,
    /// Last error recorded via `update_source_status`, if any.
    pub error_message: Option<String>,
    /// Creation timestamp (TEXT, set by the schema — TODO confirm default).
    pub created_at: String,
    /// Last-modified timestamp, refreshed by the UPDATE statements here.
    pub updated_at: String,
}
76
/// A piece of ingested content — one row of `content_nodes`.
///
/// Uniquely identified by (`source_id`, `relative_path`) as far as
/// `upsert_content_node` is concerned.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentNode {
    /// Primary key.
    pub id: i64,
    /// Owning account; inserts in this module always use `DEFAULT_ACCOUNT_ID`.
    pub account_id: String,
    /// FK to `source_contexts.id`.
    pub source_id: i64,
    /// Path of the item relative to its source root.
    pub relative_path: String,
    /// Hash of the content; used by `upsert_content_node` to skip no-op writes.
    pub content_hash: String,
    /// Optional document title.
    pub title: Option<String>,
    /// Extracted body text.
    pub body_text: String,
    /// Optional front matter serialized as JSON text.
    pub front_matter_json: Option<String>,
    /// Optional tags (serialized form — presumably JSON or CSV; confirm with writers).
    pub tags: Option<String>,
    /// Processing status; this module reads 'pending' and writes 'processed'.
    pub status: String,
    /// Ingestion timestamp (TEXT).
    pub ingested_at: String,
    /// Last-modified timestamp, refreshed on update.
    pub updated_at: String,
}
93
/// A draft idea derived from a content node — one row of `draft_seeds`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DraftSeed {
    /// Primary key.
    pub id: i64,
    /// Owning account; inserts in this module always use `DEFAULT_ACCOUNT_ID`.
    pub account_id: String,
    /// FK to `content_nodes.id`.
    pub node_id: i64,
    /// The seed text itself.
    pub seed_text: String,
    /// Optional suggested archetype for the draft.
    pub archetype_suggestion: Option<String>,
    /// Ranking weight; pending-seed queries order by this descending.
    pub engagement_weight: f64,
    /// Lifecycle status; this module reads 'pending' and writes 'used'.
    pub status: String,
    /// Creation timestamp (TEXT).
    pub created_at: String,
    /// Set to `datetime('now')` by `mark_seed_used`; `None` until then.
    pub used_at: Option<String>,
}
107
/// Outcome of `upsert_content_node`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertResult {
    /// No row existed for (source_id, relative_path); a new node was inserted.
    Inserted,
    /// A row existed with a different `content_hash`; it was rewritten and its
    /// status reset to 'pending'.
    Updated,
    /// A row existed with an identical `content_hash`; nothing was written.
    Skipped,
}
118
119pub async fn insert_source_context(
125 pool: &DbPool,
126 source_type: &str,
127 config_json: &str,
128) -> Result<i64, StorageError> {
129 let row: (i64,) = sqlx::query_as(
130 "INSERT INTO source_contexts (account_id, source_type, config_json) \
131 VALUES (?, ?, ?) \
132 RETURNING id",
133 )
134 .bind(DEFAULT_ACCOUNT_ID)
135 .bind(source_type)
136 .bind(config_json)
137 .fetch_one(pool)
138 .await
139 .map_err(|e| StorageError::Query { source: e })?;
140
141 Ok(row.0)
142}
143
144pub async fn get_source_context(
146 pool: &DbPool,
147 id: i64,
148) -> Result<Option<SourceContext>, StorageError> {
149 let row: Option<SourceContextRow> = sqlx::query_as(
150 "SELECT id, account_id, source_type, config_json, sync_cursor, \
151 status, error_message, created_at, updated_at \
152 FROM source_contexts WHERE id = ?",
153 )
154 .bind(id)
155 .fetch_optional(pool)
156 .await
157 .map_err(|e| StorageError::Query { source: e })?;
158
159 Ok(row.map(|r| SourceContext {
160 id: r.0,
161 account_id: r.1,
162 source_type: r.2,
163 config_json: r.3,
164 sync_cursor: r.4,
165 status: r.5,
166 error_message: r.6,
167 created_at: r.7,
168 updated_at: r.8,
169 }))
170}
171
172pub async fn get_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
174 let rows: Vec<SourceContextRow> = sqlx::query_as(
175 "SELECT id, account_id, source_type, config_json, sync_cursor, \
176 status, error_message, created_at, updated_at \
177 FROM source_contexts WHERE status = 'active' ORDER BY id",
178 )
179 .fetch_all(pool)
180 .await
181 .map_err(|e| StorageError::Query { source: e })?;
182
183 Ok(rows
184 .into_iter()
185 .map(|r| SourceContext {
186 id: r.0,
187 account_id: r.1,
188 source_type: r.2,
189 config_json: r.3,
190 sync_cursor: r.4,
191 status: r.5,
192 error_message: r.6,
193 created_at: r.7,
194 updated_at: r.8,
195 })
196 .collect())
197}
198
199pub async fn update_sync_cursor(pool: &DbPool, id: i64, cursor: &str) -> Result<(), StorageError> {
201 sqlx::query(
202 "UPDATE source_contexts SET sync_cursor = ?, updated_at = datetime('now') WHERE id = ?",
203 )
204 .bind(cursor)
205 .bind(id)
206 .execute(pool)
207 .await
208 .map_err(|e| StorageError::Query { source: e })?;
209
210 Ok(())
211}
212
213pub async fn update_source_status(
215 pool: &DbPool,
216 id: i64,
217 status: &str,
218 error_message: Option<&str>,
219) -> Result<(), StorageError> {
220 sqlx::query(
221 "UPDATE source_contexts \
222 SET status = ?, error_message = ?, updated_at = datetime('now') \
223 WHERE id = ?",
224 )
225 .bind(status)
226 .bind(error_message)
227 .bind(id)
228 .execute(pool)
229 .await
230 .map_err(|e| StorageError::Query { source: e })?;
231
232 Ok(())
233}
234
#[allow(clippy::too_many_arguments)]
/// Insert or refresh the content node identified by (`source_id`, `relative_path`).
///
/// Behavior:
/// - No existing row: insert a new node under `DEFAULT_ACCOUNT_ID` (remaining
///   columns take table defaults) and return `UpsertResult::Inserted`.
/// - Existing row with the same `content_hash`: write nothing and return
///   `UpsertResult::Skipped`.
/// - Existing row with a different hash: overwrite content columns, reset
///   `status` to 'pending' so the node is re-processed, refresh `updated_at`,
///   and return `UpsertResult::Updated`.
///
/// NOTE(review): the SELECT followed by INSERT/UPDATE is not atomic — two
/// concurrent upserts for the same path could race. Presumably ingestion is
/// single-writer; confirm before relying on this under concurrency.
pub async fn upsert_content_node(
    pool: &DbPool,
    source_id: i64,
    relative_path: &str,
    content_hash: &str,
    title: Option<&str>,
    body_text: &str,
    front_matter_json: Option<&str>,
    tags: Option<&str>,
) -> Result<UpsertResult, StorageError> {
    // Look up the current row (if any) to decide between insert/update/skip.
    let existing: Option<(i64, String)> = sqlx::query_as(
        "SELECT id, content_hash FROM content_nodes \
         WHERE source_id = ? AND relative_path = ?",
    )
    .bind(source_id)
    .bind(relative_path)
    .fetch_optional(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;

    match existing {
        // Unchanged content: avoid a pointless write (and a status reset).
        Some((_id, ref existing_hash)) if existing_hash == content_hash => {
            Ok(UpsertResult::Skipped)
        }
        Some((id, _)) => {
            sqlx::query(
                "UPDATE content_nodes \
                 SET content_hash = ?, title = ?, body_text = ?, \
                 front_matter_json = ?, tags = ?, \
                 status = 'pending', updated_at = datetime('now') \
                 WHERE id = ?",
            )
            .bind(content_hash)
            .bind(title)
            .bind(body_text)
            .bind(front_matter_json)
            .bind(tags)
            .bind(id)
            .execute(pool)
            .await
            .map_err(|e| StorageError::Query { source: e })?;

            Ok(UpsertResult::Updated)
        }
        None => {
            sqlx::query(
                "INSERT INTO content_nodes \
                 (account_id, source_id, relative_path, content_hash, \
                 title, body_text, front_matter_json, tags) \
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(DEFAULT_ACCOUNT_ID)
            .bind(source_id)
            .bind(relative_path)
            .bind(content_hash)
            .bind(title)
            .bind(body_text)
            .bind(front_matter_json)
            .bind(tags)
            .execute(pool)
            .await
            .map_err(|e| StorageError::Query { source: e })?;

            Ok(UpsertResult::Inserted)
        }
    }
}
313
314pub async fn get_content_node(pool: &DbPool, id: i64) -> Result<Option<ContentNode>, StorageError> {
316 let row: Option<ContentNodeRow> = sqlx::query_as(
317 "SELECT id, account_id, source_id, relative_path, content_hash, \
318 title, body_text, front_matter_json, tags, status, \
319 ingested_at, updated_at \
320 FROM content_nodes WHERE id = ?",
321 )
322 .bind(id)
323 .fetch_optional(pool)
324 .await
325 .map_err(|e| StorageError::Query { source: e })?;
326
327 Ok(row.map(|r| ContentNode {
328 id: r.0,
329 account_id: r.1,
330 source_id: r.2,
331 relative_path: r.3,
332 content_hash: r.4,
333 title: r.5,
334 body_text: r.6,
335 front_matter_json: r.7,
336 tags: r.8,
337 status: r.9,
338 ingested_at: r.10,
339 updated_at: r.11,
340 }))
341}
342
343pub async fn get_nodes_for_source(
345 pool: &DbPool,
346 source_id: i64,
347 status_filter: Option<&str>,
348) -> Result<Vec<ContentNode>, StorageError> {
349 let rows: Vec<ContentNodeRow> = match status_filter {
350 Some(status) => {
351 sqlx::query_as(
352 "SELECT id, account_id, source_id, relative_path, content_hash, \
353 title, body_text, front_matter_json, tags, status, \
354 ingested_at, updated_at \
355 FROM content_nodes WHERE source_id = ? AND status = ? ORDER BY id",
356 )
357 .bind(source_id)
358 .bind(status)
359 .fetch_all(pool)
360 .await
361 }
362 None => {
363 sqlx::query_as(
364 "SELECT id, account_id, source_id, relative_path, content_hash, \
365 title, body_text, front_matter_json, tags, status, \
366 ingested_at, updated_at \
367 FROM content_nodes WHERE source_id = ? ORDER BY id",
368 )
369 .bind(source_id)
370 .fetch_all(pool)
371 .await
372 }
373 }
374 .map_err(|e| StorageError::Query { source: e })?;
375
376 Ok(rows
377 .into_iter()
378 .map(|r| ContentNode {
379 id: r.0,
380 account_id: r.1,
381 source_id: r.2,
382 relative_path: r.3,
383 content_hash: r.4,
384 title: r.5,
385 body_text: r.6,
386 front_matter_json: r.7,
387 tags: r.8,
388 status: r.9,
389 ingested_at: r.10,
390 updated_at: r.11,
391 })
392 .collect())
393}
394
395pub async fn insert_draft_seed(
401 pool: &DbPool,
402 node_id: i64,
403 seed_text: &str,
404 archetype_suggestion: Option<&str>,
405) -> Result<i64, StorageError> {
406 let row: (i64,) = sqlx::query_as(
407 "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion) \
408 VALUES (?, ?, ?, ?) \
409 RETURNING id",
410 )
411 .bind(DEFAULT_ACCOUNT_ID)
412 .bind(node_id)
413 .bind(seed_text)
414 .bind(archetype_suggestion)
415 .fetch_one(pool)
416 .await
417 .map_err(|e| StorageError::Query { source: e })?;
418
419 Ok(row.0)
420}
421
422pub async fn get_pending_seeds(pool: &DbPool, limit: u32) -> Result<Vec<DraftSeed>, StorageError> {
424 let rows: Vec<DraftSeedRow> = sqlx::query_as(
425 "SELECT id, account_id, node_id, seed_text, archetype_suggestion, \
426 engagement_weight, status, created_at, used_at \
427 FROM draft_seeds \
428 WHERE status = 'pending' \
429 ORDER BY engagement_weight DESC \
430 LIMIT ?",
431 )
432 .bind(limit)
433 .fetch_all(pool)
434 .await
435 .map_err(|e| StorageError::Query { source: e })?;
436
437 Ok(rows
438 .into_iter()
439 .map(|r| DraftSeed {
440 id: r.0,
441 account_id: r.1,
442 node_id: r.2,
443 seed_text: r.3,
444 archetype_suggestion: r.4,
445 engagement_weight: r.5,
446 status: r.6,
447 created_at: r.7,
448 used_at: r.8,
449 })
450 .collect())
451}
452
453pub async fn mark_seed_used(pool: &DbPool, id: i64) -> Result<(), StorageError> {
455 sqlx::query("UPDATE draft_seeds SET status = 'used', used_at = datetime('now') WHERE id = ?")
456 .bind(id)
457 .execute(pool)
458 .await
459 .map_err(|e| StorageError::Query { source: e })?;
460
461 Ok(())
462}
463
464pub async fn find_source_by_path(
466 pool: &DbPool,
467 path: &str,
468) -> Result<Option<SourceContext>, StorageError> {
469 let row: Option<SourceContextRow> = sqlx::query_as(
470 "SELECT id, account_id, source_type, config_json, sync_cursor, \
471 status, error_message, created_at, updated_at \
472 FROM source_contexts \
473 WHERE account_id = ? AND source_type = 'local_fs' AND status = 'active' \
474 AND config_json LIKE '%' || ? || '%' \
475 LIMIT 1",
476 )
477 .bind(DEFAULT_ACCOUNT_ID)
478 .bind(path)
479 .fetch_optional(pool)
480 .await
481 .map_err(|e| StorageError::Query { source: e })?;
482
483 Ok(row.map(|r| SourceContext {
484 id: r.0,
485 account_id: r.1,
486 source_type: r.2,
487 config_json: r.3,
488 sync_cursor: r.4,
489 status: r.5,
490 error_message: r.6,
491 created_at: r.7,
492 updated_at: r.8,
493 }))
494}
495
496pub async fn ensure_local_fs_source(
501 pool: &DbPool,
502 path: &str,
503 config_json: &str,
504) -> Result<i64, StorageError> {
505 if let Some(ctx) = find_source_by_path(pool, path).await? {
506 return Ok(ctx.id);
507 }
508 insert_source_context(pool, "local_fs", config_json).await
509}
510
511pub async fn get_pending_content_nodes(
517 pool: &DbPool,
518 limit: u32,
519) -> Result<Vec<ContentNode>, StorageError> {
520 let rows: Vec<ContentNodeRow> = sqlx::query_as(
521 "SELECT id, account_id, source_id, relative_path, content_hash, \
522 title, body_text, front_matter_json, tags, status, \
523 ingested_at, updated_at \
524 FROM content_nodes \
525 WHERE status = 'pending' \
526 ORDER BY ingested_at ASC \
527 LIMIT ?",
528 )
529 .bind(limit)
530 .fetch_all(pool)
531 .await
532 .map_err(|e| StorageError::Query { source: e })?;
533
534 Ok(rows
535 .into_iter()
536 .map(|r| ContentNode {
537 id: r.0,
538 account_id: r.1,
539 source_id: r.2,
540 relative_path: r.3,
541 content_hash: r.4,
542 title: r.5,
543 body_text: r.6,
544 front_matter_json: r.7,
545 tags: r.8,
546 status: r.9,
547 ingested_at: r.10,
548 updated_at: r.11,
549 })
550 .collect())
551}
552
553pub async fn mark_node_processed(pool: &DbPool, node_id: i64) -> Result<(), StorageError> {
555 sqlx::query(
556 "UPDATE content_nodes SET status = 'processed', updated_at = datetime('now') WHERE id = ?",
557 )
558 .bind(node_id)
559 .execute(pool)
560 .await
561 .map_err(|e| StorageError::Query { source: e })?;
562 Ok(())
563}
564
565pub async fn insert_draft_seed_with_weight(
567 pool: &DbPool,
568 node_id: i64,
569 seed_text: &str,
570 archetype_suggestion: Option<&str>,
571 weight: f64,
572) -> Result<i64, StorageError> {
573 let row: (i64,) = sqlx::query_as(
574 "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion, engagement_weight) \
575 VALUES (?, ?, ?, ?, ?) \
576 RETURNING id",
577 )
578 .bind(DEFAULT_ACCOUNT_ID)
579 .bind(node_id)
580 .bind(seed_text)
581 .bind(archetype_suggestion)
582 .bind(weight)
583 .fetch_one(pool)
584 .await
585 .map_err(|e| StorageError::Query { source: e })?;
586
587 Ok(row.0)
588}
589
/// A pending seed joined with the title of its originating content node;
/// produced by `get_seeds_for_context`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SeedWithContext {
    /// The seed text (`draft_seeds.seed_text`).
    pub seed_text: String,
    /// Title of the node the seed came from (`content_nodes.title`), if any.
    pub source_title: Option<String>,
    /// Suggested archetype, if any.
    pub archetype_suggestion: Option<String>,
    /// Ranking weight; results are ordered by this descending.
    pub engagement_weight: f64,
}
602
603pub async fn get_seeds_for_context(
608 pool: &DbPool,
609 limit: u32,
610) -> Result<Vec<SeedWithContext>, StorageError> {
611 let rows: Vec<(String, Option<String>, Option<String>, f64)> = sqlx::query_as(
612 "SELECT ds.seed_text, cn.title, ds.archetype_suggestion, ds.engagement_weight \
613 FROM draft_seeds ds \
614 JOIN content_nodes cn ON cn.id = ds.node_id \
615 WHERE ds.status = 'pending' \
616 ORDER BY ds.engagement_weight DESC \
617 LIMIT ?",
618 )
619 .bind(limit)
620 .fetch_all(pool)
621 .await
622 .map_err(|e| StorageError::Query { source: e })?;
623
624 Ok(rows
625 .into_iter()
626 .map(|r| SeedWithContext {
627 seed_text: r.0,
628 source_title: r.1,
629 archetype_suggestion: r.2,
630 engagement_weight: r.3,
631 })
632 .collect())
633}
634
635pub async fn ensure_google_drive_source(
640 pool: &DbPool,
641 folder_id: &str,
642 config_json: &str,
643) -> Result<i64, StorageError> {
644 if let Some(ctx) = find_source_by_folder_id(pool, folder_id).await? {
645 return Ok(ctx.id);
646 }
647 insert_source_context(pool, "google_drive", config_json).await
648}
649
650pub async fn find_source_by_folder_id(
652 pool: &DbPool,
653 folder_id: &str,
654) -> Result<Option<SourceContext>, StorageError> {
655 let row: Option<SourceContextRow> = sqlx::query_as(
656 "SELECT id, account_id, source_type, config_json, sync_cursor, \
657 status, error_message, created_at, updated_at \
658 FROM source_contexts \
659 WHERE account_id = ? AND source_type = 'google_drive' AND status = 'active' \
660 AND config_json LIKE '%' || ? || '%' \
661 LIMIT 1",
662 )
663 .bind(DEFAULT_ACCOUNT_ID)
664 .bind(folder_id)
665 .fetch_optional(pool)
666 .await
667 .map_err(|e| StorageError::Query { source: e })?;
668
669 Ok(row.map(|r| SourceContext {
670 id: r.0,
671 account_id: r.1,
672 source_type: r.2,
673 config_json: r.3,
674 sync_cursor: r.4,
675 status: r.5,
676 error_message: r.6,
677 created_at: r.7,
678 updated_at: r.8,
679 }))
680}
681
682pub async fn ensure_manual_source(pool: &DbPool) -> Result<i64, StorageError> {
687 let existing: Option<(i64,)> = sqlx::query_as(
688 "SELECT id FROM source_contexts \
689 WHERE account_id = ? AND source_type = 'manual' AND status = 'active' \
690 LIMIT 1",
691 )
692 .bind(DEFAULT_ACCOUNT_ID)
693 .fetch_optional(pool)
694 .await
695 .map_err(|e| StorageError::Query { source: e })?;
696
697 match existing {
698 Some((id,)) => Ok(id),
699 None => insert_source_context(pool, "manual", "{}").await,
700 }
701}