Skip to main content

tuitbot_core/storage/watchtower/
mod.rs

1//! CRUD operations for Watchtower ingestion tables.
2//!
3//! Manages source contexts, content nodes, and draft seeds for the
4//! Cold-Start Watchtower RAG pipeline.
5
6#[cfg(test)]
7mod tests;
8
9use super::accounts::DEFAULT_ACCOUNT_ID;
10use super::DbPool;
11use crate::error::StorageError;
12
/// Row type for source_contexts queries.
///
/// Tuple positions mirror the SELECT column order used throughout this
/// module: (id, account_id, source_type, config_json, sync_cursor,
/// status, error_message, created_at, updated_at).
type SourceContextRow = (
    i64,            // id
    String,         // account_id
    String,         // source_type
    String,         // config_json
    Option<String>, // sync_cursor
    String,         // status
    Option<String>, // error_message
    String,         // created_at
    String,         // updated_at
);
25
/// Row type for content_nodes queries.
///
/// Tuple positions mirror the SELECT column order used throughout this
/// module: (id, account_id, source_id, relative_path, content_hash,
/// title, body_text, front_matter_json, tags, status, ingested_at,
/// updated_at).
type ContentNodeRow = (
    i64,            // id
    String,         // account_id
    i64,            // source_id
    String,         // relative_path
    String,         // content_hash
    Option<String>, // title
    String,         // body_text
    Option<String>, // front_matter_json
    Option<String>, // tags
    String,         // status
    String,         // ingested_at
    String,         // updated_at
);
41
/// Row type for draft_seeds queries.
///
/// Tuple positions mirror the SELECT column order used throughout this
/// module: (id, account_id, node_id, seed_text, archetype_suggestion,
/// engagement_weight, status, created_at, used_at).
type DraftSeedRow = (
    i64,            // id
    String,         // account_id
    i64,            // node_id
    String,         // seed_text
    Option<String>, // archetype_suggestion
    f64,            // engagement_weight
    String,         // status
    String,         // created_at
    Option<String>, // used_at
);
54
55// ============================================================================
56// Row structs
57// ============================================================================
58
/// A registered content source.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SourceContext {
    /// Primary key.
    pub id: i64,
    /// Owning account (this module always writes `DEFAULT_ACCOUNT_ID`).
    pub account_id: String,
    /// Source kind discriminator, e.g. "local_fs", "google_drive", "manual".
    pub source_type: String,
    /// Source-specific configuration, stored as a JSON string.
    pub config_json: String,
    /// Opaque incremental-sync cursor; `None` until set via `update_sync_cursor`.
    pub sync_cursor: Option<String>,
    /// Lifecycle status; lookup queries in this module filter on 'active'.
    pub status: String,
    /// Last error message, set together with `status` in `update_source_status`.
    pub error_message: Option<String>,
    /// Creation timestamp (SQLite datetime string).
    pub created_at: String,
    /// Last-update timestamp (SQLite datetime string).
    pub updated_at: String,
}
72
/// An ingested content node from a source.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentNode {
    /// Primary key.
    pub id: i64,
    /// Owning account (this module always writes `DEFAULT_ACCOUNT_ID`).
    pub account_id: String,
    /// Parent `source_contexts.id`.
    pub source_id: i64,
    /// Path of the item within its source; paired with `source_id` as the
    /// upsert key in `upsert_content_node`.
    pub relative_path: String,
    /// Content hash used to detect unchanged files and skip re-ingestion.
    pub content_hash: String,
    /// Optional document title.
    pub title: Option<String>,
    /// Body text of the node.
    pub body_text: String,
    /// Front matter as a JSON string, when present.
    pub front_matter_json: Option<String>,
    /// Optional tags — serialized form not visible here; TODO confirm format
    /// with the ingestion writer.
    pub tags: Option<String>,
    /// Processing status: 'pending' until seed generation, then 'processed'
    /// (see `mark_node_processed`).
    pub status: String,
    /// First-ingest timestamp (SQLite datetime string).
    pub ingested_at: String,
    /// Last-update timestamp (SQLite datetime string).
    pub updated_at: String,
}
89
/// A pre-computed draft seed derived from a content node.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DraftSeed {
    /// Primary key.
    pub id: i64,
    /// Owning account (this module always writes `DEFAULT_ACCOUNT_ID`).
    pub account_id: String,
    /// Parent `content_nodes.id`.
    pub node_id: i64,
    /// The seed hook text.
    pub seed_text: String,
    /// Optional suggested archetype for the draft.
    pub archetype_suggestion: Option<String>,
    /// Weight used to rank seeds for retrieval (higher served first).
    pub engagement_weight: f64,
    /// Lifecycle status: 'pending' until consumed, then 'used'
    /// (see `mark_seed_used`).
    pub status: String,
    /// Creation timestamp (SQLite datetime string).
    pub created_at: String,
    /// When the seed was marked used; `None` while pending.
    pub used_at: Option<String>,
}
103
/// Result of an upsert operation on a content node.
///
/// Returned by [`upsert_content_node`] so the caller can tell whether any
/// write actually happened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertResult {
    /// A new node was inserted.
    Inserted,
    /// An existing node was updated (content hash changed).
    Updated,
    /// The node was skipped (content hash unchanged).
    Skipped,
}
114
115// ============================================================================
116// Source contexts
117// ============================================================================
118
119/// Insert a new source context and return its ID.
120pub async fn insert_source_context(
121    pool: &DbPool,
122    source_type: &str,
123    config_json: &str,
124) -> Result<i64, StorageError> {
125    let row: (i64,) = sqlx::query_as(
126        "INSERT INTO source_contexts (account_id, source_type, config_json) \
127         VALUES (?, ?, ?) \
128         RETURNING id",
129    )
130    .bind(DEFAULT_ACCOUNT_ID)
131    .bind(source_type)
132    .bind(config_json)
133    .fetch_one(pool)
134    .await
135    .map_err(|e| StorageError::Query { source: e })?;
136
137    Ok(row.0)
138}
139
140/// Get a source context by ID.
141pub async fn get_source_context(
142    pool: &DbPool,
143    id: i64,
144) -> Result<Option<SourceContext>, StorageError> {
145    let row: Option<SourceContextRow> = sqlx::query_as(
146        "SELECT id, account_id, source_type, config_json, sync_cursor, \
147                    status, error_message, created_at, updated_at \
148             FROM source_contexts WHERE id = ?",
149    )
150    .bind(id)
151    .fetch_optional(pool)
152    .await
153    .map_err(|e| StorageError::Query { source: e })?;
154
155    Ok(row.map(|r| SourceContext {
156        id: r.0,
157        account_id: r.1,
158        source_type: r.2,
159        config_json: r.3,
160        sync_cursor: r.4,
161        status: r.5,
162        error_message: r.6,
163        created_at: r.7,
164        updated_at: r.8,
165    }))
166}
167
168/// Get all active source contexts.
169pub async fn get_source_contexts(pool: &DbPool) -> Result<Vec<SourceContext>, StorageError> {
170    let rows: Vec<SourceContextRow> = sqlx::query_as(
171        "SELECT id, account_id, source_type, config_json, sync_cursor, \
172                    status, error_message, created_at, updated_at \
173             FROM source_contexts WHERE status = 'active' ORDER BY id",
174    )
175    .fetch_all(pool)
176    .await
177    .map_err(|e| StorageError::Query { source: e })?;
178
179    Ok(rows
180        .into_iter()
181        .map(|r| SourceContext {
182            id: r.0,
183            account_id: r.1,
184            source_type: r.2,
185            config_json: r.3,
186            sync_cursor: r.4,
187            status: r.5,
188            error_message: r.6,
189            created_at: r.7,
190            updated_at: r.8,
191        })
192        .collect())
193}
194
195/// Update the sync cursor for a source context.
196pub async fn update_sync_cursor(pool: &DbPool, id: i64, cursor: &str) -> Result<(), StorageError> {
197    sqlx::query(
198        "UPDATE source_contexts SET sync_cursor = ?, updated_at = datetime('now') WHERE id = ?",
199    )
200    .bind(cursor)
201    .bind(id)
202    .execute(pool)
203    .await
204    .map_err(|e| StorageError::Query { source: e })?;
205
206    Ok(())
207}
208
209/// Update the status (and optional error message) of a source context.
210pub async fn update_source_status(
211    pool: &DbPool,
212    id: i64,
213    status: &str,
214    error_message: Option<&str>,
215) -> Result<(), StorageError> {
216    sqlx::query(
217        "UPDATE source_contexts \
218         SET status = ?, error_message = ?, updated_at = datetime('now') \
219         WHERE id = ?",
220    )
221    .bind(status)
222    .bind(error_message)
223    .bind(id)
224    .execute(pool)
225    .await
226    .map_err(|e| StorageError::Query { source: e })?;
227
228    Ok(())
229}
230
231// ============================================================================
232// Content nodes
233// ============================================================================
234
/// Upsert a content node by (source_id, relative_path).
///
/// If the node does not exist, it is inserted. If it exists and the content
/// hash has changed, it is updated. If the hash is unchanged, the operation
/// is skipped.
///
/// On update the node's status is reset to 'pending' so downstream seed
/// generation re-processes the changed content.
///
/// NOTE(review): the SELECT-then-write sequence below is not a single
/// atomic statement; two concurrent upserts of the same
/// (source_id, relative_path) could race between the check and the write.
/// Presumably ingestion is single-writer — confirm before relying on this
/// under concurrency.
#[allow(clippy::too_many_arguments)]
pub async fn upsert_content_node(
    pool: &DbPool,
    source_id: i64,
    relative_path: &str,
    content_hash: &str,
    title: Option<&str>,
    body_text: &str,
    front_matter_json: Option<&str>,
    tags: Option<&str>,
) -> Result<UpsertResult, StorageError> {
    // Check if node exists and get its current hash.
    let existing: Option<(i64, String)> = sqlx::query_as(
        "SELECT id, content_hash FROM content_nodes \
         WHERE source_id = ? AND relative_path = ?",
    )
    .bind(source_id)
    .bind(relative_path)
    .fetch_optional(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;

    match existing {
        // Same hash: content unchanged, nothing to write.
        Some((_id, ref existing_hash)) if existing_hash == content_hash => {
            Ok(UpsertResult::Skipped)
        }
        // Hash differs: rewrite the stored copy and flag it for re-processing.
        Some((id, _)) => {
            sqlx::query(
                "UPDATE content_nodes \
                 SET content_hash = ?, title = ?, body_text = ?, \
                     front_matter_json = ?, tags = ?, \
                     status = 'pending', updated_at = datetime('now') \
                 WHERE id = ?",
            )
            .bind(content_hash)
            .bind(title)
            .bind(body_text)
            .bind(front_matter_json)
            .bind(tags)
            .bind(id)
            .execute(pool)
            .await
            .map_err(|e| StorageError::Query { source: e })?;

            Ok(UpsertResult::Updated)
        }
        // Unknown path: insert a fresh row. Status/timestamps are left to the
        // schema defaults — presumably 'pending'; confirm in the migration.
        None => {
            sqlx::query(
                "INSERT INTO content_nodes \
                 (account_id, source_id, relative_path, content_hash, \
                  title, body_text, front_matter_json, tags) \
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(DEFAULT_ACCOUNT_ID)
            .bind(source_id)
            .bind(relative_path)
            .bind(content_hash)
            .bind(title)
            .bind(body_text)
            .bind(front_matter_json)
            .bind(tags)
            .execute(pool)
            .await
            .map_err(|e| StorageError::Query { source: e })?;

            Ok(UpsertResult::Inserted)
        }
    }
}
309
310/// Get a content node by ID.
311pub async fn get_content_node(pool: &DbPool, id: i64) -> Result<Option<ContentNode>, StorageError> {
312    let row: Option<ContentNodeRow> = sqlx::query_as(
313        "SELECT id, account_id, source_id, relative_path, content_hash, \
314                    title, body_text, front_matter_json, tags, status, \
315                    ingested_at, updated_at \
316             FROM content_nodes WHERE id = ?",
317    )
318    .bind(id)
319    .fetch_optional(pool)
320    .await
321    .map_err(|e| StorageError::Query { source: e })?;
322
323    Ok(row.map(|r| ContentNode {
324        id: r.0,
325        account_id: r.1,
326        source_id: r.2,
327        relative_path: r.3,
328        content_hash: r.4,
329        title: r.5,
330        body_text: r.6,
331        front_matter_json: r.7,
332        tags: r.8,
333        status: r.9,
334        ingested_at: r.10,
335        updated_at: r.11,
336    }))
337}
338
339/// Get all content nodes for a source, optionally filtered by status.
340pub async fn get_nodes_for_source(
341    pool: &DbPool,
342    source_id: i64,
343    status_filter: Option<&str>,
344) -> Result<Vec<ContentNode>, StorageError> {
345    let rows: Vec<ContentNodeRow> = match status_filter {
346        Some(status) => {
347            sqlx::query_as(
348                "SELECT id, account_id, source_id, relative_path, content_hash, \
349                            title, body_text, front_matter_json, tags, status, \
350                            ingested_at, updated_at \
351                     FROM content_nodes WHERE source_id = ? AND status = ? ORDER BY id",
352            )
353            .bind(source_id)
354            .bind(status)
355            .fetch_all(pool)
356            .await
357        }
358        None => {
359            sqlx::query_as(
360                "SELECT id, account_id, source_id, relative_path, content_hash, \
361                            title, body_text, front_matter_json, tags, status, \
362                            ingested_at, updated_at \
363                     FROM content_nodes WHERE source_id = ? ORDER BY id",
364            )
365            .bind(source_id)
366            .fetch_all(pool)
367            .await
368        }
369    }
370    .map_err(|e| StorageError::Query { source: e })?;
371
372    Ok(rows
373        .into_iter()
374        .map(|r| ContentNode {
375            id: r.0,
376            account_id: r.1,
377            source_id: r.2,
378            relative_path: r.3,
379            content_hash: r.4,
380            title: r.5,
381            body_text: r.6,
382            front_matter_json: r.7,
383            tags: r.8,
384            status: r.9,
385            ingested_at: r.10,
386            updated_at: r.11,
387        })
388        .collect())
389}
390
391// ============================================================================
392// Draft seeds
393// ============================================================================
394
395/// Insert a new draft seed and return its ID.
396pub async fn insert_draft_seed(
397    pool: &DbPool,
398    node_id: i64,
399    seed_text: &str,
400    archetype_suggestion: Option<&str>,
401) -> Result<i64, StorageError> {
402    let row: (i64,) = sqlx::query_as(
403        "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion) \
404         VALUES (?, ?, ?, ?) \
405         RETURNING id",
406    )
407    .bind(DEFAULT_ACCOUNT_ID)
408    .bind(node_id)
409    .bind(seed_text)
410    .bind(archetype_suggestion)
411    .fetch_one(pool)
412    .await
413    .map_err(|e| StorageError::Query { source: e })?;
414
415    Ok(row.0)
416}
417
418/// Get pending draft seeds ordered by engagement weight descending.
419pub async fn get_pending_seeds(pool: &DbPool, limit: u32) -> Result<Vec<DraftSeed>, StorageError> {
420    let rows: Vec<DraftSeedRow> = sqlx::query_as(
421        "SELECT id, account_id, node_id, seed_text, archetype_suggestion, \
422                    engagement_weight, status, created_at, used_at \
423             FROM draft_seeds \
424             WHERE status = 'pending' \
425             ORDER BY engagement_weight DESC \
426             LIMIT ?",
427    )
428    .bind(limit)
429    .fetch_all(pool)
430    .await
431    .map_err(|e| StorageError::Query { source: e })?;
432
433    Ok(rows
434        .into_iter()
435        .map(|r| DraftSeed {
436            id: r.0,
437            account_id: r.1,
438            node_id: r.2,
439            seed_text: r.3,
440            archetype_suggestion: r.4,
441            engagement_weight: r.5,
442            status: r.6,
443            created_at: r.7,
444            used_at: r.8,
445        })
446        .collect())
447}
448
449/// Mark a draft seed as used.
450pub async fn mark_seed_used(pool: &DbPool, id: i64) -> Result<(), StorageError> {
451    sqlx::query("UPDATE draft_seeds SET status = 'used', used_at = datetime('now') WHERE id = ?")
452        .bind(id)
453        .execute(pool)
454        .await
455        .map_err(|e| StorageError::Query { source: e })?;
456
457    Ok(())
458}
459
460/// Find a source context by source type and path substring in config_json.
461pub async fn find_source_by_path(
462    pool: &DbPool,
463    path: &str,
464) -> Result<Option<SourceContext>, StorageError> {
465    let row: Option<SourceContextRow> = sqlx::query_as(
466        "SELECT id, account_id, source_type, config_json, sync_cursor, \
467                    status, error_message, created_at, updated_at \
468             FROM source_contexts \
469             WHERE account_id = ? AND source_type = 'local_fs' AND status = 'active' \
470               AND config_json LIKE '%' || ? || '%' \
471             LIMIT 1",
472    )
473    .bind(DEFAULT_ACCOUNT_ID)
474    .bind(path)
475    .fetch_optional(pool)
476    .await
477    .map_err(|e| StorageError::Query { source: e })?;
478
479    Ok(row.map(|r| SourceContext {
480        id: r.0,
481        account_id: r.1,
482        source_type: r.2,
483        config_json: r.3,
484        sync_cursor: r.4,
485        status: r.5,
486        error_message: r.6,
487        created_at: r.7,
488        updated_at: r.8,
489    }))
490}
491
492/// Ensure a "local_fs" source context exists for the given path, returning its ID.
493///
494/// Creates the source if it does not exist. Used by the Watchtower to register
495/// configured filesystem sources.
496pub async fn ensure_local_fs_source(
497    pool: &DbPool,
498    path: &str,
499    config_json: &str,
500) -> Result<i64, StorageError> {
501    if let Some(ctx) = find_source_by_path(pool, path).await? {
502        return Ok(ctx.id);
503    }
504    insert_source_context(pool, "local_fs", config_json).await
505}
506
507// ============================================================================
508// Winning DNA: pending nodes and seed management
509// ============================================================================
510
511/// Get content nodes with status='pending' that need seed generation.
512pub async fn get_pending_content_nodes(
513    pool: &DbPool,
514    limit: u32,
515) -> Result<Vec<ContentNode>, StorageError> {
516    let rows: Vec<ContentNodeRow> = sqlx::query_as(
517        "SELECT id, account_id, source_id, relative_path, content_hash, \
518                    title, body_text, front_matter_json, tags, status, \
519                    ingested_at, updated_at \
520             FROM content_nodes \
521             WHERE status = 'pending' \
522             ORDER BY ingested_at ASC \
523             LIMIT ?",
524    )
525    .bind(limit)
526    .fetch_all(pool)
527    .await
528    .map_err(|e| StorageError::Query { source: e })?;
529
530    Ok(rows
531        .into_iter()
532        .map(|r| ContentNode {
533            id: r.0,
534            account_id: r.1,
535            source_id: r.2,
536            relative_path: r.3,
537            content_hash: r.4,
538            title: r.5,
539            body_text: r.6,
540            front_matter_json: r.7,
541            tags: r.8,
542            status: r.9,
543            ingested_at: r.10,
544            updated_at: r.11,
545        })
546        .collect())
547}
548
549/// Mark a content node as 'processed' after seed generation.
550pub async fn mark_node_processed(pool: &DbPool, node_id: i64) -> Result<(), StorageError> {
551    sqlx::query(
552        "UPDATE content_nodes SET status = 'processed', updated_at = datetime('now') WHERE id = ?",
553    )
554    .bind(node_id)
555    .execute(pool)
556    .await
557    .map_err(|e| StorageError::Query { source: e })?;
558    Ok(())
559}
560
561/// Insert a draft seed with an explicit engagement weight.
562pub async fn insert_draft_seed_with_weight(
563    pool: &DbPool,
564    node_id: i64,
565    seed_text: &str,
566    archetype_suggestion: Option<&str>,
567    weight: f64,
568) -> Result<i64, StorageError> {
569    let row: (i64,) = sqlx::query_as(
570        "INSERT INTO draft_seeds (account_id, node_id, seed_text, archetype_suggestion, engagement_weight) \
571         VALUES (?, ?, ?, ?, ?) \
572         RETURNING id",
573    )
574    .bind(DEFAULT_ACCOUNT_ID)
575    .bind(node_id)
576    .bind(seed_text)
577    .bind(archetype_suggestion)
578    .bind(weight)
579    .fetch_one(pool)
580    .await
581    .map_err(|e| StorageError::Query { source: e })?;
582
583    Ok(row.0)
584}
585
/// Row type for seeds with their parent node context.
///
/// Produced by [`get_seeds_for_context`] for cold-start context injection.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SeedWithContext {
    /// The seed hook text.
    pub seed_text: String,
    /// Title from the parent content node.
    pub source_title: Option<String>,
    /// Suggested archetype for the seed.
    pub archetype_suggestion: Option<String>,
    /// Engagement weight for retrieval ranking.
    pub engagement_weight: f64,
}
598
599/// Retrieve draft seeds suitable for cold-start context injection.
600///
601/// Returns pending seeds joined with their parent content node's title,
602/// ordered by engagement_weight DESC.
603pub async fn get_seeds_for_context(
604    pool: &DbPool,
605    limit: u32,
606) -> Result<Vec<SeedWithContext>, StorageError> {
607    let rows: Vec<(String, Option<String>, Option<String>, f64)> = sqlx::query_as(
608        "SELECT ds.seed_text, cn.title, ds.archetype_suggestion, ds.engagement_weight \
609         FROM draft_seeds ds \
610         JOIN content_nodes cn ON cn.id = ds.node_id \
611         WHERE ds.status = 'pending' \
612         ORDER BY ds.engagement_weight DESC \
613         LIMIT ?",
614    )
615    .bind(limit)
616    .fetch_all(pool)
617    .await
618    .map_err(|e| StorageError::Query { source: e })?;
619
620    Ok(rows
621        .into_iter()
622        .map(|r| SeedWithContext {
623            seed_text: r.0,
624            source_title: r.1,
625            archetype_suggestion: r.2,
626            engagement_weight: r.3,
627        })
628        .collect())
629}
630
631/// Ensure a "google_drive" source context exists for the given folder ID, returning its ID.
632///
633/// Creates the source if it does not exist. Used by the Watchtower to register
634/// configured Google Drive sources.
635pub async fn ensure_google_drive_source(
636    pool: &DbPool,
637    folder_id: &str,
638    config_json: &str,
639) -> Result<i64, StorageError> {
640    if let Some(ctx) = find_source_by_folder_id(pool, folder_id).await? {
641        return Ok(ctx.id);
642    }
643    insert_source_context(pool, "google_drive", config_json).await
644}
645
646/// Find a source context by Google Drive folder ID in config_json.
647pub async fn find_source_by_folder_id(
648    pool: &DbPool,
649    folder_id: &str,
650) -> Result<Option<SourceContext>, StorageError> {
651    let row: Option<SourceContextRow> = sqlx::query_as(
652        "SELECT id, account_id, source_type, config_json, sync_cursor, \
653                    status, error_message, created_at, updated_at \
654             FROM source_contexts \
655             WHERE account_id = ? AND source_type = 'google_drive' AND status = 'active' \
656               AND config_json LIKE '%' || ? || '%' \
657             LIMIT 1",
658    )
659    .bind(DEFAULT_ACCOUNT_ID)
660    .bind(folder_id)
661    .fetch_optional(pool)
662    .await
663    .map_err(|e| StorageError::Query { source: e })?;
664
665    Ok(row.map(|r| SourceContext {
666        id: r.0,
667        account_id: r.1,
668        source_type: r.2,
669        config_json: r.3,
670        sync_cursor: r.4,
671        status: r.5,
672        error_message: r.6,
673        created_at: r.7,
674        updated_at: r.8,
675    }))
676}
677
678/// Ensure a "manual" source context exists for inline ingestion, returning its ID.
679///
680/// Creates the source if it does not exist. This is used by the ingest API
681/// when content is submitted directly (e.g. from Shortcuts or Telegram).
682pub async fn ensure_manual_source(pool: &DbPool) -> Result<i64, StorageError> {
683    let existing: Option<(i64,)> = sqlx::query_as(
684        "SELECT id FROM source_contexts \
685         WHERE account_id = ? AND source_type = 'manual' AND status = 'active' \
686         LIMIT 1",
687    )
688    .bind(DEFAULT_ACCOUNT_ID)
689    .fetch_optional(pool)
690    .await
691    .map_err(|e| StorageError::Query { source: e })?;
692
693    match existing {
694        Some((id,)) => Ok(id),
695        None => insert_source_context(pool, "manual", "{}").await,
696    }
697}