1#[cfg(test)]
7mod tests;
8
9use super::accounts::DEFAULT_ACCOUNT_ID;
10use super::DbPool;
11use crate::error::StorageError;
12
/// Raw row tuple for `source_contexts` queries, in SELECT column order:
/// (id, account_id, source_type, config_json, sync_cursor,
///  status, error_message, created_at, updated_at).
type SourceContextRow = (
    i64,
    String,
    String,
    String,
    Option<String>,
    String,
    Option<String>,
    String,
    String,
);
25
/// Raw row tuple for `content_nodes` queries, in SELECT column order:
/// (id, account_id, source_id, relative_path, content_hash,
///  title, body_text, front_matter_json, tags, status,
///  ingested_at, updated_at).
type ContentNodeRow = (
    i64,
    String,
    i64,
    String,
    String,
    Option<String>,
    String,
    Option<String>,
    Option<String>,
    String,
    String,
    String,
);
41
/// Raw row tuple for `draft_seeds` queries, in SELECT column order:
/// (id, account_id, node_id, seed_text, archetype_suggestion,
///  engagement_weight, status, created_at, used_at).
type DraftSeedRow = (
    i64,
    String,
    i64,
    String,
    Option<String>,
    f64,
    String,
    String,
    Option<String>,
);
54
/// A configured content source (row in `source_contexts`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct SourceContext {
    /// Primary key.
    pub id: i64,
    /// Owning account (queries here use `DEFAULT_ACCOUNT_ID`).
    pub account_id: String,
    /// Source kind; values seen in this module: "local_fs", "google_drive", "manual".
    pub source_type: String,
    /// Source configuration serialized as JSON text.
    pub config_json: String,
    /// Opaque incremental-sync cursor, if a sync has recorded one.
    pub sync_cursor: Option<String>,
    /// Lifecycle status; queries in this module filter on 'active'.
    pub status: String,
    /// Last error recorded via `update_source_status`, if any.
    pub error_message: Option<String>,
    // Timestamps are stored as SQLite datetime strings.
    pub created_at: String,
    pub updated_at: String,
}
72
/// A single ingested document (row in `content_nodes`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentNode {
    /// Primary key.
    pub id: i64,
    /// Owning account.
    pub account_id: String,
    /// FK to the `source_contexts` row this node was ingested from.
    pub source_id: i64,
    /// Path of the document relative to its source root; unique per source
    /// together with `source_id` (used as the upsert key).
    pub relative_path: String,
    /// Content hash used to skip unchanged documents on re-ingest.
    pub content_hash: String,
    /// Optional document title.
    pub title: Option<String>,
    /// Full extracted body text.
    pub body_text: String,
    /// Front matter serialized as JSON, if the document had any.
    pub front_matter_json: Option<String>,
    /// Tags, if any (serialization format not visible here — see writer).
    pub tags: Option<String>,
    /// Processing status; this module reads/writes 'pending' and 'processed'.
    pub status: String,
    // Timestamps are stored as SQLite datetime strings.
    pub ingested_at: String,
    pub updated_at: String,
}
89
/// A draft idea extracted from a content node (row in `draft_seeds`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct DraftSeed {
    /// Primary key.
    pub id: i64,
    /// Owning account.
    pub account_id: String,
    /// FK to the `content_nodes` row this seed was derived from.
    pub node_id: i64,
    /// The seed text itself.
    pub seed_text: String,
    /// Optional suggested archetype for drafting.
    pub archetype_suggestion: Option<String>,
    /// Ranking weight; pending-seed queries order by this descending.
    pub engagement_weight: f64,
    /// Lifecycle status; this module reads 'pending' and writes 'used'.
    pub status: String,
    /// Creation timestamp (SQLite datetime string).
    pub created_at: String,
    /// Set by `mark_seed_used` when the seed is consumed.
    pub used_at: Option<String>,
}
103
/// Outcome of `upsert_content_node`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertResult {
    /// No row existed for (source_id, relative_path); a new one was inserted.
    Inserted,
    /// A row existed with a different `content_hash`; it was updated and
    /// reset to status 'pending'.
    Updated,
    /// A row existed with an identical `content_hash`; nothing was written.
    Skipped,
}
114
115pub async fn insert_source_context(
121 pool: &DbPool,
122 source_type: &str,
123 config_json: &str,
124) -> Result<i64, StorageError> {
125 let row: (i64,) = sqlx::query_as(
126 "INSERT INTO source_contexts (account_id, source_type, config_json) \
127 VALUES (?, ?, ?) \
128 RETURNING id",
129 )
130 .bind(DEFAULT_ACCOUNT_ID)
131 .bind(source_type)
132 .bind(config_json)
133 .fetch_one(pool)
134 .await
135 .map_err(|e| StorageError::Query { source: e })?;
136
137 Ok(row.0)
138}
139
140pub async fn get_source_context(
142 pool: &DbPool,
143 id: i64,
144) -> Result<Option<SourceContext>, StorageError> {
145 let row: Option<SourceContextRow> = sqlx::query_as(
146 "SELECT id, account_id, source_type, config_json, sync_cursor, \
147 status, error_message, created_at, updated_at \
148 FROM source_contexts WHERE id = ?",
149 )
150 .bind(id)
151 .fetch_optional(pool)
152 .await
153 .map_err(|e| StorageError::Query { source: e })?;
154
155 Ok(row.map(|r| SourceContext {
156 id: r.0,
157 account_id: r.1,
158 source_type: r.2,
159 config_json: r.3,
160 sync_cursor: r.4,
161 status: r.5,
162 error_message: r.6,
163 created_at: r.7,
164 updated_at: r.8,
165 }))
166}
167
168pub async fn get_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
170 let rows: Vec<SourceContextRow> = sqlx::query_as(
171 "SELECT id, account_id, source_type, config_json, sync_cursor, \
172 status, error_message, created_at, updated_at \
173 FROM source_contexts WHERE status = 'active' ORDER BY id",
174 )
175 .fetch_all(pool)
176 .await
177 .map_err(|e| StorageError::Query { source: e })?;
178
179 Ok(rows
180 .into_iter()
181 .map(|r| SourceContext {
182 id: r.0,
183 account_id: r.1,
184 source_type: r.2,
185 config_json: r.3,
186 sync_cursor: r.4,
187 status: r.5,
188 error_message: r.6,
189 created_at: r.7,
190 updated_at: r.8,
191 })
192 .collect())
193}
194
195pub async fn update_sync_cursor(pool: &DbPool, id: i64, cursor: &str) -> Result<(), StorageError> {
197 sqlx::query(
198 "UPDATE source_contexts SET sync_cursor = ?, updated_at = datetime('now') WHERE id = ?",
199 )
200 .bind(cursor)
201 .bind(id)
202 .execute(pool)
203 .await
204 .map_err(|e| StorageError::Query { source: e })?;
205
206 Ok(())
207}
208
209pub async fn update_source_status(
211 pool: &DbPool,
212 id: i64,
213 status: &str,
214 error_message: Option<&str>,
215) -> Result<(), StorageError> {
216 sqlx::query(
217 "UPDATE source_contexts \
218 SET status = ?, error_message = ?, updated_at = datetime('now') \
219 WHERE id = ?",
220 )
221 .bind(status)
222 .bind(error_message)
223 .bind(id)
224 .execute(pool)
225 .await
226 .map_err(|e| StorageError::Query { source: e })?;
227
228 Ok(())
229}
230
/// Inserts or updates a content node keyed by (source_id, relative_path).
///
/// Behavior:
/// - No existing row: inserts a new one and returns `UpsertResult::Inserted`.
/// - Existing row with the same `content_hash`: writes nothing and returns
///   `UpsertResult::Skipped`.
/// - Existing row with a different hash: overwrites the content fields,
///   resets status to 'pending', and returns `UpsertResult::Updated`.
///
/// NOTE(review): the SELECT and the subsequent INSERT/UPDATE are not wrapped
/// in a transaction, so a concurrent writer could race between the check and
/// the write — presumably acceptable for a single-writer ingest flow; confirm
/// with callers or consider an `INSERT ... ON CONFLICT` upsert.
///
/// # Errors
/// Returns `StorageError::Query` if any of the statements fail.
#[allow(clippy::too_many_arguments)]
pub async fn upsert_content_node(
    pool: &DbPool,
    source_id: i64,
    relative_path: &str,
    content_hash: &str,
    title: Option<&str>,
    body_text: &str,
    front_matter_json: Option<&str>,
    tags: Option<&str>,
) -> Result<UpsertResult, StorageError> {
    // Look up the current hash (if any) for this (source, path) pair.
    let existing: Option<(i64, String)> = sqlx::query_as(
        "SELECT id, content_hash FROM content_nodes \
         WHERE source_id = ? AND relative_path = ?",
    )
    .bind(source_id)
    .bind(relative_path)
    .fetch_optional(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;

    match existing {
        // Unchanged content: avoid a pointless write (and keep `updated_at`).
        Some((_id, ref existing_hash)) if existing_hash == content_hash => {
            Ok(UpsertResult::Skipped)
        }
        // Content changed: overwrite and re-queue for processing.
        Some((id, _)) => {
            sqlx::query(
                "UPDATE content_nodes \
                 SET content_hash = ?, title = ?, body_text = ?, \
                 front_matter_json = ?, tags = ?, \
                 status = 'pending', updated_at = datetime('now') \
                 WHERE id = ?",
            )
            .bind(content_hash)
            .bind(title)
            .bind(body_text)
            .bind(front_matter_json)
            .bind(tags)
            .bind(id)
            .execute(pool)
            .await
            .map_err(|e| StorageError::Query { source: e })?;

            Ok(UpsertResult::Updated)
        }
        // First time we see this path for this source.
        None => {
            sqlx::query(
                "INSERT INTO content_nodes \
                 (account_id, source_id, relative_path, content_hash, \
                 title, body_text, front_matter_json, tags) \
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(DEFAULT_ACCOUNT_ID)
            .bind(source_id)
            .bind(relative_path)
            .bind(content_hash)
            .bind(title)
            .bind(body_text)
            .bind(front_matter_json)
            .bind(tags)
            .execute(pool)
            .await
            .map_err(|e| StorageError::Query { source: e })?;

            Ok(UpsertResult::Inserted)
        }
    }
}
309
310pub async fn get_content_node(pool: &DbPool, id: i64) -> Result<Option<ContentNode>, StorageError> {
312 let row: Option<ContentNodeRow> = sqlx::query_as(
313 "SELECT id, account_id, source_id, relative_path, content_hash, \
314 title, body_text, front_matter_json, tags, status, \
315 ingested_at, updated_at \
316 FROM content_nodes WHERE id = ?",
317 )
318 .bind(id)
319 .fetch_optional(pool)
320 .await
321 .map_err(|e| StorageError::Query { source: e })?;
322
323 Ok(row.map(|r| ContentNode {
324 id: r.0,
325 account_id: r.1,
326 source_id: r.2,
327 relative_path: r.3,
328 content_hash: r.4,
329 title: r.5,
330 body_text: r.6,
331 front_matter_json: r.7,
332 tags: r.8,
333 status: r.9,
334 ingested_at: r.10,
335 updated_at: r.11,
336 }))
337}
338
339pub async fn get_nodes_for_source(
341 pool: &DbPool,
342 source_id: i64,
343 status_filter: Option<&str>,
344) -> Result<Vec<ContentNode>, StorageError> {
345 let rows: Vec<ContentNodeRow> = match status_filter {
346 Some(status) => {
347 sqlx::query_as(
348 "SELECT id, account_id, source_id, relative_path, content_hash, \
349 title, body_text, front_matter_json, tags, status, \
350 ingested_at, updated_at \
351 FROM content_nodes WHERE source_id = ? AND status = ? ORDER BY id",
352 )
353 .bind(source_id)
354 .bind(status)
355 .fetch_all(pool)
356 .await
357 }
358 None => {
359 sqlx::query_as(
360 "SELECT id, account_id, source_id, relative_path, content_hash, \
361 title, body_text, front_matter_json, tags, status, \
362 ingested_at, updated_at \
363 FROM content_nodes WHERE source_id = ? ORDER BY id",
364 )
365 .bind(source_id)
366 .fetch_all(pool)
367 .await
368 }
369 }
370 .map_err(|e| StorageError::Query { source: e })?;
371
372 Ok(rows
373 .into_iter()
374 .map(|r| ContentNode {
375 id: r.0,
376 account_id: r.1,
377 source_id: r.2,
378 relative_path: r.3,
379 content_hash: r.4,
380 title: r.5,
381 body_text: r.6,
382 front_matter_json: r.7,
383 tags: r.8,
384 status: r.9,
385 ingested_at: r.10,
386 updated_at: r.11,
387 })
388 .collect())
389}
390
391pub async fn insert_draft_seed(
397 pool: &DbPool,
398 node_id: i64,
399 seed_text: &str,
400 archetype_suggestion: Option<&str>,
401) -> Result<i64, StorageError> {
402 let row: (i64,) = sqlx::query_as(
403 "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion) \
404 VALUES (?, ?, ?, ?) \
405 RETURNING id",
406 )
407 .bind(DEFAULT_ACCOUNT_ID)
408 .bind(node_id)
409 .bind(seed_text)
410 .bind(archetype_suggestion)
411 .fetch_one(pool)
412 .await
413 .map_err(|e| StorageError::Query { source: e })?;
414
415 Ok(row.0)
416}
417
418pub async fn get_pending_seeds(pool: &DbPool, limit: u32) -> Result<Vec<DraftSeed>, StorageError> {
420 let rows: Vec<DraftSeedRow> = sqlx::query_as(
421 "SELECT id, account_id, node_id, seed_text, archetype_suggestion, \
422 engagement_weight, status, created_at, used_at \
423 FROM draft_seeds \
424 WHERE status = 'pending' \
425 ORDER BY engagement_weight DESC \
426 LIMIT ?",
427 )
428 .bind(limit)
429 .fetch_all(pool)
430 .await
431 .map_err(|e| StorageError::Query { source: e })?;
432
433 Ok(rows
434 .into_iter()
435 .map(|r| DraftSeed {
436 id: r.0,
437 account_id: r.1,
438 node_id: r.2,
439 seed_text: r.3,
440 archetype_suggestion: r.4,
441 engagement_weight: r.5,
442 status: r.6,
443 created_at: r.7,
444 used_at: r.8,
445 })
446 .collect())
447}
448
449pub async fn mark_seed_used(pool: &DbPool, id: i64) -> Result<(), StorageError> {
451 sqlx::query("UPDATE draft_seeds SET status = 'used', used_at = datetime('now') WHERE id = ?")
452 .bind(id)
453 .execute(pool)
454 .await
455 .map_err(|e| StorageError::Query { source: e })?;
456
457 Ok(())
458}
459
460pub async fn find_source_by_path(
462 pool: &DbPool,
463 path: &str,
464) -> Result<Option<SourceContext>, StorageError> {
465 let row: Option<SourceContextRow> = sqlx::query_as(
466 "SELECT id, account_id, source_type, config_json, sync_cursor, \
467 status, error_message, created_at, updated_at \
468 FROM source_contexts \
469 WHERE account_id = ? AND source_type = 'local_fs' AND status = 'active' \
470 AND config_json LIKE '%' || ? || '%' \
471 LIMIT 1",
472 )
473 .bind(DEFAULT_ACCOUNT_ID)
474 .bind(path)
475 .fetch_optional(pool)
476 .await
477 .map_err(|e| StorageError::Query { source: e })?;
478
479 Ok(row.map(|r| SourceContext {
480 id: r.0,
481 account_id: r.1,
482 source_type: r.2,
483 config_json: r.3,
484 sync_cursor: r.4,
485 status: r.5,
486 error_message: r.6,
487 created_at: r.7,
488 updated_at: r.8,
489 }))
490}
491
492pub async fn ensure_local_fs_source(
497 pool: &DbPool,
498 path: &str,
499 config_json: &str,
500) -> Result<i64, StorageError> {
501 if let Some(ctx) = find_source_by_path(pool, path).await? {
502 return Ok(ctx.id);
503 }
504 insert_source_context(pool, "local_fs", config_json).await
505}
506
507pub async fn get_pending_content_nodes(
513 pool: &DbPool,
514 limit: u32,
515) -> Result<Vec<ContentNode>, StorageError> {
516 let rows: Vec<ContentNodeRow> = sqlx::query_as(
517 "SELECT id, account_id, source_id, relative_path, content_hash, \
518 title, body_text, front_matter_json, tags, status, \
519 ingested_at, updated_at \
520 FROM content_nodes \
521 WHERE status = 'pending' \
522 ORDER BY ingested_at ASC \
523 LIMIT ?",
524 )
525 .bind(limit)
526 .fetch_all(pool)
527 .await
528 .map_err(|e| StorageError::Query { source: e })?;
529
530 Ok(rows
531 .into_iter()
532 .map(|r| ContentNode {
533 id: r.0,
534 account_id: r.1,
535 source_id: r.2,
536 relative_path: r.3,
537 content_hash: r.4,
538 title: r.5,
539 body_text: r.6,
540 front_matter_json: r.7,
541 tags: r.8,
542 status: r.9,
543 ingested_at: r.10,
544 updated_at: r.11,
545 })
546 .collect())
547}
548
549pub async fn mark_node_processed(pool: &DbPool, node_id: i64) -> Result<(), StorageError> {
551 sqlx::query(
552 "UPDATE content_nodes SET status = 'processed', updated_at = datetime('now') WHERE id = ?",
553 )
554 .bind(node_id)
555 .execute(pool)
556 .await
557 .map_err(|e| StorageError::Query { source: e })?;
558 Ok(())
559}
560
561pub async fn insert_draft_seed_with_weight(
563 pool: &DbPool,
564 node_id: i64,
565 seed_text: &str,
566 archetype_suggestion: Option<&str>,
567 weight: f64,
568) -> Result<i64, StorageError> {
569 let row: (i64,) = sqlx::query_as(
570 "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion, engagement_weight) \
571 VALUES (?, ?, ?, ?, ?) \
572 RETURNING id",
573 )
574 .bind(DEFAULT_ACCOUNT_ID)
575 .bind(node_id)
576 .bind(seed_text)
577 .bind(archetype_suggestion)
578 .bind(weight)
579 .fetch_one(pool)
580 .await
581 .map_err(|e| StorageError::Query { source: e })?;
582
583 Ok(row.0)
584}
585
/// A pending seed joined with the title of its originating content node,
/// as returned by `get_seeds_for_context`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SeedWithContext {
    /// The seed text.
    pub seed_text: String,
    /// Title of the joined content node, if it has one.
    pub source_title: Option<String>,
    /// Optional suggested archetype for drafting.
    pub archetype_suggestion: Option<String>,
    /// Ranking weight used to order results.
    pub engagement_weight: f64,
}
598
599pub async fn get_seeds_for_context(
604 pool: &DbPool,
605 limit: u32,
606) -> Result<Vec<SeedWithContext>, StorageError> {
607 let rows: Vec<(String, Option<String>, Option<String>, f64)> = sqlx::query_as(
608 "SELECT ds.seed_text, cn.title, ds.archetype_suggestion, ds.engagement_weight \
609 FROM draft_seeds ds \
610 JOIN content_nodes cn ON cn.id = ds.node_id \
611 WHERE ds.status = 'pending' \
612 ORDER BY ds.engagement_weight DESC \
613 LIMIT ?",
614 )
615 .bind(limit)
616 .fetch_all(pool)
617 .await
618 .map_err(|e| StorageError::Query { source: e })?;
619
620 Ok(rows
621 .into_iter()
622 .map(|r| SeedWithContext {
623 seed_text: r.0,
624 source_title: r.1,
625 archetype_suggestion: r.2,
626 engagement_weight: r.3,
627 })
628 .collect())
629}
630
631pub async fn ensure_google_drive_source(
636 pool: &DbPool,
637 folder_id: &str,
638 config_json: &str,
639) -> Result<i64, StorageError> {
640 if let Some(ctx) = find_source_by_folder_id(pool, folder_id).await? {
641 return Ok(ctx.id);
642 }
643 insert_source_context(pool, "google_drive", config_json).await
644}
645
646pub async fn find_source_by_folder_id(
648 pool: &DbPool,
649 folder_id: &str,
650) -> Result<Option<SourceContext>, StorageError> {
651 let row: Option<SourceContextRow> = sqlx::query_as(
652 "SELECT id, account_id, source_type, config_json, sync_cursor, \
653 status, error_message, created_at, updated_at \
654 FROM source_contexts \
655 WHERE account_id = ? AND source_type = 'google_drive' AND status = 'active' \
656 AND config_json LIKE '%' || ? || '%' \
657 LIMIT 1",
658 )
659 .bind(DEFAULT_ACCOUNT_ID)
660 .bind(folder_id)
661 .fetch_optional(pool)
662 .await
663 .map_err(|e| StorageError::Query { source: e })?;
664
665 Ok(row.map(|r| SourceContext {
666 id: r.0,
667 account_id: r.1,
668 source_type: r.2,
669 config_json: r.3,
670 sync_cursor: r.4,
671 status: r.5,
672 error_message: r.6,
673 created_at: r.7,
674 updated_at: r.8,
675 }))
676}
677
678pub async fn ensure_manual_source(pool: &DbPool) -> Result<i64, StorageError> {
683 let existing: Option<(i64,)> = sqlx::query_as(
684 "SELECT id FROM source_contexts \
685 WHERE account_id = ? AND source_type = 'manual' AND status = 'active' \
686 LIMIT 1",
687 )
688 .bind(DEFAULT_ACCOUNT_ID)
689 .fetch_optional(pool)
690 .await
691 .map_err(|e| StorageError::Query { source: e })?;
692
693 match existing {
694 Some((id,)) => Ok(id),
695 None => insert_source_context(pool, "manual", "{}").await,
696 }
697}