// tuitbot_core/storage/watchtower/mod.rs
1//! CRUD operations for Watchtower ingestion tables.
2//!
3//! Manages source contexts, content nodes, draft seeds, and remote
4//! sync connections for the Cold-Start Watchtower RAG pipeline.
5
6pub mod connections;
7
8#[cfg(test)]
9mod tests;
10
11pub use connections::*;
12
13use super::accounts::DEFAULT_ACCOUNT_ID;
14use super::DbPool;
15use crate::error::StorageError;
16
/// Row type for source_contexts queries.
///
/// Column order matches the SELECT lists below:
/// (id, account_id, source_type, config_json, sync_cursor,
///  status, error_message, created_at, updated_at).
type SourceContextRow = (
    i64,            // id
    String,         // account_id
    String,         // source_type
    String,         // config_json
    Option<String>, // sync_cursor
    String,         // status
    Option<String>, // error_message
    String,         // created_at
    String,         // updated_at
);
29
/// Row type for content_nodes queries.
///
/// Column order matches the SELECT lists below:
/// (id, account_id, source_id, relative_path, content_hash, title,
///  body_text, front_matter_json, tags, status, ingested_at, updated_at).
type ContentNodeRow = (
    i64,            // id
    String,         // account_id
    i64,            // source_id
    String,         // relative_path
    String,         // content_hash
    Option<String>, // title
    String,         // body_text
    Option<String>, // front_matter_json
    Option<String>, // tags
    String,         // status
    String,         // ingested_at
    String,         // updated_at
);
45
/// Row type for draft_seeds queries.
///
/// Column order matches the SELECT lists below:
/// (id, account_id, node_id, seed_text, archetype_suggestion,
///  engagement_weight, status, created_at, used_at).
type DraftSeedRow = (
    i64,            // id
    String,         // account_id
    i64,            // node_id
    String,         // seed_text
    Option<String>, // archetype_suggestion
    f64,            // engagement_weight
    String,         // status
    String,         // created_at
    Option<String>, // used_at
);
58
59// ============================================================================
60// Row structs
61// ============================================================================
62
/// A registered content source.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SourceContext {
    /// Primary key.
    pub id: i64,
    /// Owning account; inserts in this module bind `DEFAULT_ACCOUNT_ID`.
    pub account_id: String,
    /// Source kind, e.g. "local_fs", "google_drive", or "manual".
    pub source_type: String,
    /// Source configuration serialized as JSON text.
    pub config_json: String,
    /// Opaque incremental-sync cursor; updated via `update_sync_cursor`.
    pub sync_cursor: Option<String>,
    /// Lifecycle status; queries in this module filter on 'active'.
    pub status: String,
    /// Last error recorded by `update_source_status`, if any.
    pub error_message: Option<String>,
    /// Creation timestamp (SQLite text datetime).
    pub created_at: String,
    /// Last-modified timestamp; bumped with datetime('now') on updates.
    pub updated_at: String,
}
76
/// An ingested content node from a source.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentNode {
    /// Primary key.
    pub id: i64,
    /// Owning account; inserts in this module bind `DEFAULT_ACCOUNT_ID`.
    pub account_id: String,
    /// Parent source context ID.
    pub source_id: i64,
    /// Path of the item relative to its source; unique per source for upserts.
    pub relative_path: String,
    /// Content hash used by `upsert_content_node` for change detection.
    pub content_hash: String,
    /// Optional document title.
    pub title: Option<String>,
    /// Extracted body text.
    pub body_text: String,
    /// Optional front matter serialized as JSON text.
    pub front_matter_json: Option<String>,
    /// Optional tags (stored as a single string; format defined by ingesters).
    pub tags: Option<String>,
    /// Processing status: set to 'pending' on insert/update,
    /// 'processed' by `mark_node_processed`.
    pub status: String,
    /// Ingestion timestamp (SQLite text datetime).
    pub ingested_at: String,
    /// Last-modified timestamp; bumped with datetime('now') on updates.
    pub updated_at: String,
}
93
/// A pre-computed draft seed derived from a content node.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DraftSeed {
    /// Primary key.
    pub id: i64,
    /// Owning account; inserts in this module bind `DEFAULT_ACCOUNT_ID`.
    pub account_id: String,
    /// Parent content node ID.
    pub node_id: i64,
    /// The seed hook text.
    pub seed_text: String,
    /// Optional suggested archetype for the seed.
    pub archetype_suggestion: Option<String>,
    /// Ranking weight; pending-seed queries order by this DESC.
    pub engagement_weight: f64,
    /// Lifecycle status: 'pending' until `mark_seed_used` sets 'used'.
    pub status: String,
    /// Creation timestamp (SQLite text datetime).
    pub created_at: String,
    /// Set to datetime('now') by `mark_seed_used`; None while pending.
    pub used_at: Option<String>,
}
107
/// Result of an upsert operation on a content node.
///
/// Fieldless and cheap, so it derives `Copy` (callers can match on it
/// repeatedly without cloning) plus `Hash` for use in keyed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UpsertResult {
    /// A new node was inserted.
    Inserted,
    /// An existing node was updated (content hash changed).
    Updated,
    /// The node was skipped (content hash unchanged).
    Skipped,
}
118
119// ============================================================================
120// Source contexts
121// ============================================================================
122
123/// Insert a new source context and return its ID.
124pub async fn insert_source_context(
125    pool: &DbPool,
126    source_type: &str,
127    config_json: &str,
128) -> Result<i64, StorageError> {
129    let row: (i64,) = sqlx::query_as(
130        "INSERT INTO source_contexts (account_id, source_type, config_json) \
131         VALUES (?, ?, ?) \
132         RETURNING id",
133    )
134    .bind(DEFAULT_ACCOUNT_ID)
135    .bind(source_type)
136    .bind(config_json)
137    .fetch_one(pool)
138    .await
139    .map_err(|e| StorageError::Query { source: e })?;
140
141    Ok(row.0)
142}
143
144/// Get a source context by ID.
145pub async fn get_source_context(
146    pool: &DbPool,
147    id: i64,
148) -> Result<Option<SourceContext>, StorageError> {
149    let row: Option<SourceContextRow> = sqlx::query_as(
150        "SELECT id, account_id, source_type, config_json, sync_cursor, \
151                    status, error_message, created_at, updated_at \
152             FROM source_contexts WHERE id = ?",
153    )
154    .bind(id)
155    .fetch_optional(pool)
156    .await
157    .map_err(|e| StorageError::Query { source: e })?;
158
159    Ok(row.map(|r| SourceContext {
160        id: r.0,
161        account_id: r.1,
162        source_type: r.2,
163        config_json: r.3,
164        sync_cursor: r.4,
165        status: r.5,
166        error_message: r.6,
167        created_at: r.7,
168        updated_at: r.8,
169    }))
170}
171
172/// Get all active source contexts.
173pub async fn get_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
174    let rows: Vec<SourceContextRow> = sqlx::query_as(
175        "SELECT id, account_id, source_type, config_json, sync_cursor, \
176                    status, error_message, created_at, updated_at \
177             FROM source_contexts WHERE status = 'active' ORDER BY id",
178    )
179    .fetch_all(pool)
180    .await
181    .map_err(|e| StorageError::Query { source: e })?;
182
183    Ok(rows
184        .into_iter()
185        .map(|r| SourceContext {
186            id: r.0,
187            account_id: r.1,
188            source_type: r.2,
189            config_json: r.3,
190            sync_cursor: r.4,
191            status: r.5,
192            error_message: r.6,
193            created_at: r.7,
194            updated_at: r.8,
195        })
196        .collect())
197}
198
199/// Update the sync cursor for a source context.
200pub async fn update_sync_cursor(pool: &DbPool, id: i64, cursor: &str) -> Result<(), StorageError> {
201    sqlx::query(
202        "UPDATE source_contexts SET sync_cursor = ?, updated_at = datetime('now') WHERE id = ?",
203    )
204    .bind(cursor)
205    .bind(id)
206    .execute(pool)
207    .await
208    .map_err(|e| StorageError::Query { source: e })?;
209
210    Ok(())
211}
212
213/// Update the status (and optional error message) of a source context.
214pub async fn update_source_status(
215    pool: &DbPool,
216    id: i64,
217    status: &str,
218    error_message: Option<&str>,
219) -> Result<(), StorageError> {
220    sqlx::query(
221        "UPDATE source_contexts \
222         SET status = ?, error_message = ?, updated_at = datetime('now') \
223         WHERE id = ?",
224    )
225    .bind(status)
226    .bind(error_message)
227    .bind(id)
228    .execute(pool)
229    .await
230    .map_err(|e| StorageError::Query { source: e })?;
231
232    Ok(())
233}
234
235// ============================================================================
236// Content nodes
237// ============================================================================
238
239/// Upsert a content node by (source_id, relative_path).
240///
241/// If the node does not exist, it is inserted. If it exists and the content
242/// hash has changed, it is updated. If the hash is unchanged, the operation
243/// is skipped.
244#[allow(clippy::too_many_arguments)]
245pub async fn upsert_content_node(
246    pool: &DbPool,
247    source_id: i64,
248    relative_path: &str,
249    content_hash: &str,
250    title: Option<&str>,
251    body_text: &str,
252    front_matter_json: Option<&str>,
253    tags: Option<&str>,
254) -> Result<UpsertResult, StorageError> {
255    // Check if node exists and get its current hash.
256    let existing: Option<(i64, String)> = sqlx::query_as(
257        "SELECT id, content_hash FROM content_nodes \
258         WHERE source_id = ? AND relative_path = ?",
259    )
260    .bind(source_id)
261    .bind(relative_path)
262    .fetch_optional(pool)
263    .await
264    .map_err(|e| StorageError::Query { source: e })?;
265
266    match existing {
267        Some((_id, ref existing_hash)) if existing_hash == content_hash => {
268            Ok(UpsertResult::Skipped)
269        }
270        Some((id, _)) => {
271            sqlx::query(
272                "UPDATE content_nodes \
273                 SET content_hash = ?, title = ?, body_text = ?, \
274                     front_matter_json = ?, tags = ?, \
275                     status = 'pending', updated_at = datetime('now') \
276                 WHERE id = ?",
277            )
278            .bind(content_hash)
279            .bind(title)
280            .bind(body_text)
281            .bind(front_matter_json)
282            .bind(tags)
283            .bind(id)
284            .execute(pool)
285            .await
286            .map_err(|e| StorageError::Query { source: e })?;
287
288            Ok(UpsertResult::Updated)
289        }
290        None => {
291            sqlx::query(
292                "INSERT INTO content_nodes \
293                 (account_id, source_id, relative_path, content_hash, \
294                  title, body_text, front_matter_json, tags) \
295                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
296            )
297            .bind(DEFAULT_ACCOUNT_ID)
298            .bind(source_id)
299            .bind(relative_path)
300            .bind(content_hash)
301            .bind(title)
302            .bind(body_text)
303            .bind(front_matter_json)
304            .bind(tags)
305            .execute(pool)
306            .await
307            .map_err(|e| StorageError::Query { source: e })?;
308
309            Ok(UpsertResult::Inserted)
310        }
311    }
312}
313
314/// Get a content node by ID.
315pub async fn get_content_node(pool: &DbPool, id: i64) -> Result<Option<ContentNode>, StorageError> {
316    let row: Option<ContentNodeRow> = sqlx::query_as(
317        "SELECT id, account_id, source_id, relative_path, content_hash, \
318                    title, body_text, front_matter_json, tags, status, \
319                    ingested_at, updated_at \
320             FROM content_nodes WHERE id = ?",
321    )
322    .bind(id)
323    .fetch_optional(pool)
324    .await
325    .map_err(|e| StorageError::Query { source: e })?;
326
327    Ok(row.map(|r| ContentNode {
328        id: r.0,
329        account_id: r.1,
330        source_id: r.2,
331        relative_path: r.3,
332        content_hash: r.4,
333        title: r.5,
334        body_text: r.6,
335        front_matter_json: r.7,
336        tags: r.8,
337        status: r.9,
338        ingested_at: r.10,
339        updated_at: r.11,
340    }))
341}
342
343/// Get all content nodes for a source, optionally filtered by status.
344pub async fn get_nodes_for_source(
345    pool: &DbPool,
346    source_id: i64,
347    status_filter: Option<&str>,
348) -> Result<Vec<ContentNode>, StorageError> {
349    let rows: Vec<ContentNodeRow> = match status_filter {
350        Some(status) => {
351            sqlx::query_as(
352                "SELECT id, account_id, source_id, relative_path, content_hash, \
353                            title, body_text, front_matter_json, tags, status, \
354                            ingested_at, updated_at \
355                     FROM content_nodes WHERE source_id = ? AND status = ? ORDER BY id",
356            )
357            .bind(source_id)
358            .bind(status)
359            .fetch_all(pool)
360            .await
361        }
362        None => {
363            sqlx::query_as(
364                "SELECT id, account_id, source_id, relative_path, content_hash, \
365                            title, body_text, front_matter_json, tags, status, \
366                            ingested_at, updated_at \
367                     FROM content_nodes WHERE source_id = ? ORDER BY id",
368            )
369            .bind(source_id)
370            .fetch_all(pool)
371            .await
372        }
373    }
374    .map_err(|e| StorageError::Query { source: e })?;
375
376    Ok(rows
377        .into_iter()
378        .map(|r| ContentNode {
379            id: r.0,
380            account_id: r.1,
381            source_id: r.2,
382            relative_path: r.3,
383            content_hash: r.4,
384            title: r.5,
385            body_text: r.6,
386            front_matter_json: r.7,
387            tags: r.8,
388            status: r.9,
389            ingested_at: r.10,
390            updated_at: r.11,
391        })
392        .collect())
393}
394
395// ============================================================================
396// Draft seeds
397// ============================================================================
398
399/// Insert a new draft seed and return its ID.
400pub async fn insert_draft_seed(
401    pool: &DbPool,
402    node_id: i64,
403    seed_text: &str,
404    archetype_suggestion: Option<&str>,
405) -> Result<i64, StorageError> {
406    let row: (i64,) = sqlx::query_as(
407        "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion) \
408         VALUES (?, ?, ?, ?) \
409         RETURNING id",
410    )
411    .bind(DEFAULT_ACCOUNT_ID)
412    .bind(node_id)
413    .bind(seed_text)
414    .bind(archetype_suggestion)
415    .fetch_one(pool)
416    .await
417    .map_err(|e| StorageError::Query { source: e })?;
418
419    Ok(row.0)
420}
421
422/// Get pending draft seeds ordered by engagement weight descending.
423pub async fn get_pending_seeds(pool: &DbPool, limit: u32) -> Result<Vec<DraftSeed>, StorageError> {
424    let rows: Vec<DraftSeedRow> = sqlx::query_as(
425        "SELECT id, account_id, node_id, seed_text, archetype_suggestion, \
426                    engagement_weight, status, created_at, used_at \
427             FROM draft_seeds \
428             WHERE status = 'pending' \
429             ORDER BY engagement_weight DESC \
430             LIMIT ?",
431    )
432    .bind(limit)
433    .fetch_all(pool)
434    .await
435    .map_err(|e| StorageError::Query { source: e })?;
436
437    Ok(rows
438        .into_iter()
439        .map(|r| DraftSeed {
440            id: r.0,
441            account_id: r.1,
442            node_id: r.2,
443            seed_text: r.3,
444            archetype_suggestion: r.4,
445            engagement_weight: r.5,
446            status: r.6,
447            created_at: r.7,
448            used_at: r.8,
449        })
450        .collect())
451}
452
453/// Mark a draft seed as used.
454pub async fn mark_seed_used(pool: &DbPool, id: i64) -> Result<(), StorageError> {
455    sqlx::query("UPDATE draft_seeds SET status = 'used', used_at = datetime('now') WHERE id = ?")
456        .bind(id)
457        .execute(pool)
458        .await
459        .map_err(|e| StorageError::Query { source: e })?;
460
461    Ok(())
462}
463
464/// Find a source context by source type and path substring in config_json.
465pub async fn find_source_by_path(
466    pool: &DbPool,
467    path: &str,
468) -> Result<Option<SourceContext>, StorageError> {
469    let row: Option<SourceContextRow> = sqlx::query_as(
470        "SELECT id, account_id, source_type, config_json, sync_cursor, \
471                    status, error_message, created_at, updated_at \
472             FROM source_contexts \
473             WHERE account_id = ? AND source_type = 'local_fs' AND status = 'active' \
474               AND config_json LIKE '%' || ? || '%' \
475             LIMIT 1",
476    )
477    .bind(DEFAULT_ACCOUNT_ID)
478    .bind(path)
479    .fetch_optional(pool)
480    .await
481    .map_err(|e| StorageError::Query { source: e })?;
482
483    Ok(row.map(|r| SourceContext {
484        id: r.0,
485        account_id: r.1,
486        source_type: r.2,
487        config_json: r.3,
488        sync_cursor: r.4,
489        status: r.5,
490        error_message: r.6,
491        created_at: r.7,
492        updated_at: r.8,
493    }))
494}
495
496/// Ensure a "local_fs" source context exists for the given path, returning its ID.
497///
498/// Creates the source if it does not exist. Used by the Watchtower to register
499/// configured filesystem sources.
500pub async fn ensure_local_fs_source(
501    pool: &DbPool,
502    path: &str,
503    config_json: &str,
504) -> Result<i64, StorageError> {
505    if let Some(ctx) = find_source_by_path(pool, path).await? {
506        return Ok(ctx.id);
507    }
508    insert_source_context(pool, "local_fs", config_json).await
509}
510
511// ============================================================================
512// Winning DNA: pending nodes and seed management
513// ============================================================================
514
515/// Get content nodes with status='pending' that need seed generation.
516pub async fn get_pending_content_nodes(
517    pool: &DbPool,
518    limit: u32,
519) -> Result<Vec<ContentNode>, StorageError> {
520    let rows: Vec<ContentNodeRow> = sqlx::query_as(
521        "SELECT id, account_id, source_id, relative_path, content_hash, \
522                    title, body_text, front_matter_json, tags, status, \
523                    ingested_at, updated_at \
524             FROM content_nodes \
525             WHERE status = 'pending' \
526             ORDER BY ingested_at ASC \
527             LIMIT ?",
528    )
529    .bind(limit)
530    .fetch_all(pool)
531    .await
532    .map_err(|e| StorageError::Query { source: e })?;
533
534    Ok(rows
535        .into_iter()
536        .map(|r| ContentNode {
537            id: r.0,
538            account_id: r.1,
539            source_id: r.2,
540            relative_path: r.3,
541            content_hash: r.4,
542            title: r.5,
543            body_text: r.6,
544            front_matter_json: r.7,
545            tags: r.8,
546            status: r.9,
547            ingested_at: r.10,
548            updated_at: r.11,
549        })
550        .collect())
551}
552
553/// Mark a content node as 'processed' after seed generation.
554pub async fn mark_node_processed(pool: &DbPool, node_id: i64) -> Result<(), StorageError> {
555    sqlx::query(
556        "UPDATE content_nodes SET status = 'processed', updated_at = datetime('now') WHERE id = ?",
557    )
558    .bind(node_id)
559    .execute(pool)
560    .await
561    .map_err(|e| StorageError::Query { source: e })?;
562    Ok(())
563}
564
565/// Insert a draft seed with an explicit engagement weight.
566pub async fn insert_draft_seed_with_weight(
567    pool: &DbPool,
568    node_id: i64,
569    seed_text: &str,
570    archetype_suggestion: Option<&str>,
571    weight: f64,
572) -> Result<i64, StorageError> {
573    let row: (i64,) = sqlx::query_as(
574        "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion, engagement_weight) \
575         VALUES (?, ?, ?, ?, ?) \
576         RETURNING id",
577    )
578    .bind(DEFAULT_ACCOUNT_ID)
579    .bind(node_id)
580    .bind(seed_text)
581    .bind(archetype_suggestion)
582    .bind(weight)
583    .fetch_one(pool)
584    .await
585    .map_err(|e| StorageError::Query { source: e })?;
586
587    Ok(row.0)
588}
589
/// Row type for seeds with their parent node context.
///
/// Produced by `get_seeds_for_context`, which joins draft_seeds with
/// content_nodes.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SeedWithContext {
    /// The seed hook text.
    pub seed_text: String,
    /// Title from the parent content node.
    pub source_title: Option<String>,
    /// Suggested archetype for the seed.
    pub archetype_suggestion: Option<String>,
    /// Engagement weight for retrieval ranking.
    pub engagement_weight: f64,
}
602
603/// Retrieve draft seeds suitable for cold-start context injection.
604///
605/// Returns pending seeds joined with their parent content node's title,
606/// ordered by engagement_weight DESC.
607pub async fn get_seeds_for_context(
608    pool: &DbPool,
609    limit: u32,
610) -> Result<Vec<SeedWithContext>, StorageError> {
611    let rows: Vec<(String, Option<String>, Option<String>, f64)> = sqlx::query_as(
612        "SELECT ds.seed_text, cn.title, ds.archetype_suggestion, ds.engagement_weight \
613         FROM draft_seeds ds \
614         JOIN content_nodes cn ON cn.id = ds.node_id \
615         WHERE ds.status = 'pending' \
616         ORDER BY ds.engagement_weight DESC \
617         LIMIT ?",
618    )
619    .bind(limit)
620    .fetch_all(pool)
621    .await
622    .map_err(|e| StorageError::Query { source: e })?;
623
624    Ok(rows
625        .into_iter()
626        .map(|r| SeedWithContext {
627            seed_text: r.0,
628            source_title: r.1,
629            archetype_suggestion: r.2,
630            engagement_weight: r.3,
631        })
632        .collect())
633}
634
635/// Ensure a "google_drive" source context exists for the given folder ID, returning its ID.
636///
637/// Creates the source if it does not exist. Used by the Watchtower to register
638/// configured Google Drive sources.
639pub async fn ensure_google_drive_source(
640    pool: &DbPool,
641    folder_id: &str,
642    config_json: &str,
643) -> Result<i64, StorageError> {
644    if let Some(ctx) = find_source_by_folder_id(pool, folder_id).await? {
645        return Ok(ctx.id);
646    }
647    insert_source_context(pool, "google_drive", config_json).await
648}
649
650/// Find a source context by Google Drive folder ID in config_json.
651pub async fn find_source_by_folder_id(
652    pool: &DbPool,
653    folder_id: &str,
654) -> Result<Option<SourceContext>, StorageError> {
655    let row: Option<SourceContextRow> = sqlx::query_as(
656        "SELECT id, account_id, source_type, config_json, sync_cursor, \
657                    status, error_message, created_at, updated_at \
658             FROM source_contexts \
659             WHERE account_id = ? AND source_type = 'google_drive' AND status = 'active' \
660               AND config_json LIKE '%' || ? || '%' \
661             LIMIT 1",
662    )
663    .bind(DEFAULT_ACCOUNT_ID)
664    .bind(folder_id)
665    .fetch_optional(pool)
666    .await
667    .map_err(|e| StorageError::Query { source: e })?;
668
669    Ok(row.map(|r| SourceContext {
670        id: r.0,
671        account_id: r.1,
672        source_type: r.2,
673        config_json: r.3,
674        sync_cursor: r.4,
675        status: r.5,
676        error_message: r.6,
677        created_at: r.7,
678        updated_at: r.8,
679    }))
680}
681
682/// Ensure a "manual" source context exists for inline ingestion, returning its ID.
683///
684/// Creates the source if it does not exist. This is used by the ingest API
685/// when content is submitted directly (e.g. from Shortcuts or Telegram).
686pub async fn ensure_manual_source(pool: &DbPool) -> Result<i64, StorageError> {
687    let existing: Option<(i64,)> = sqlx::query_as(
688        "SELECT id FROM source_contexts \
689         WHERE account_id = ? AND source_type = 'manual' AND status = 'active' \
690         LIMIT 1",
691    )
692    .bind(DEFAULT_ACCOUNT_ID)
693    .fetch_optional(pool)
694    .await
695    .map_err(|e| StorageError::Query { source: e })?;
696
697    match existing {
698        Some((id,)) => Ok(id),
699        None => insert_source_context(pool, "manual", "{}").await,
700    }
701}