Skip to main content

tuitbot_server/routes/
sources.rs

1//! Source status and reindex endpoints.
2//!
3//! Exposes runtime status of content sources and a reindex trigger
4//! for the Watchtower pipeline.
5
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use axum::extract::{Path, State};
10use axum::Json;
11use serde::Serialize;
12use tuitbot_core::automation::WatchtowerLoop;
13use tuitbot_core::storage::watchtower as store;
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18// ---------------------------------------------------------------------------
19// Response types
20// ---------------------------------------------------------------------------
21
22#[derive(Serialize)]
23pub struct SourceStatusResponse {
24    pub sources: Vec<SourceStatusItem>,
25    pub deployment_mode: String,
26}
27
28#[derive(Serialize)]
29pub struct SourceStatusItem {
30    pub id: i64,
31    pub source_type: String,
32    pub status: String,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub error_message: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub sync_cursor: Option<String>,
37    pub created_at: String,
38    pub updated_at: String,
39    pub config_json: String,
40}
41
42#[derive(Serialize)]
43pub struct ReindexResponse {
44    pub status: String,
45    pub source_id: i64,
46}
47
48// ---------------------------------------------------------------------------
49// Handlers
50// ---------------------------------------------------------------------------
51
52/// `GET /api/sources/status` — return runtime status of all content sources.
53pub async fn source_status(
54    State(state): State<Arc<AppState>>,
55) -> Result<Json<SourceStatusResponse>, ApiError> {
56    let contexts = store::get_all_source_contexts(&state.db)
57        .await
58        .map_err(ApiError::Storage)?;
59
60    let sources = contexts
61        .into_iter()
62        .map(|ctx| SourceStatusItem {
63            id: ctx.id,
64            source_type: ctx.source_type,
65            status: ctx.status,
66            error_message: ctx.error_message,
67            sync_cursor: ctx.sync_cursor,
68            created_at: ctx.created_at,
69            updated_at: ctx.updated_at,
70            config_json: ctx.config_json,
71        })
72        .collect();
73
74    Ok(Json(SourceStatusResponse {
75        sources,
76        deployment_mode: state.deployment_mode.to_string(),
77    }))
78}
79
80/// `POST /api/sources/{id}/reindex` — trigger a full rescan of one source.
81///
82/// Validates the source exists and is a local_fs source (remote reindex is
83/// handled by the normal poll cycle). The reindex runs in a spawned task
84/// and returns immediately.
85pub async fn reindex_source(
86    State(state): State<Arc<AppState>>,
87    Path(source_id): Path<i64>,
88) -> Result<Json<ReindexResponse>, ApiError> {
89    // Verify the source exists.
90    let ctx = store::get_source_context(&state.db, source_id)
91        .await
92        .map_err(ApiError::Storage)?
93        .ok_or_else(|| ApiError::NotFound(format!("source {source_id} not found")))?;
94
95    if ctx.source_type != "local_fs" {
96        return Err(ApiError::BadRequest(
97            "reindex is only supported for local_fs sources".to_string(),
98        ));
99    }
100
101    // Extract path and patterns from config_json.
102    let config: serde_json::Value = serde_json::from_str(&ctx.config_json)
103        .map_err(|e| ApiError::Internal(format!("invalid source config_json: {e}")))?;
104
105    let path_str = config
106        .get("path")
107        .and_then(|v| v.as_str())
108        .ok_or_else(|| ApiError::Internal("source config_json missing path".to_string()))?;
109
110    let base_path = PathBuf::from(tuitbot_core::storage::expand_tilde(path_str));
111
112    let patterns: Vec<String> = config
113        .get("file_patterns")
114        .and_then(|v| v.as_array())
115        .map(|arr| {
116            arr.iter()
117                .filter_map(|v| v.as_str().map(String::from))
118                .collect()
119        })
120        .unwrap_or_else(|| vec!["*.md".to_string(), "*.txt".to_string()]);
121
122    // Spawn the reindex in a background task.
123    let pool = state.db.clone();
124    tokio::spawn(async move {
125        match WatchtowerLoop::reindex_local_source(&pool, source_id, &base_path, &patterns).await {
126            Ok(summary) => {
127                tracing::info!(
128                    source_id,
129                    ingested = summary.ingested,
130                    skipped = summary.skipped,
131                    errors = summary.errors.len(),
132                    "Reindex complete"
133                );
134            }
135            Err(e) => {
136                tracing::error!(source_id, error = %e, "Reindex failed");
137            }
138        }
139    });
140
141    Ok(Json(ReindexResponse {
142        status: "reindex_started".to_string(),
143        source_id,
144    }))
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Optional fields set to `None` must be omitted entirely, and every other
    /// field must appear with its exact value.
    #[test]
    fn source_status_response_serializes() {
        let resp = SourceStatusResponse {
            sources: vec![SourceStatusItem {
                id: 1,
                source_type: "local_fs".into(),
                status: "active".into(),
                error_message: None,
                sync_cursor: None,
                created_at: "2026-03-15T10:00:00Z".into(),
                updated_at: "2026-03-15T10:00:00Z".into(),
                config_json: "{}".into(),
            }],
            deployment_mode: "desktop".into(),
        };
        let value = serde_json::to_value(&resp).expect("serialize");
        assert_eq!(
            value,
            json!({
                "sources": [{
                    "id": 1,
                    "source_type": "local_fs",
                    "status": "active",
                    "created_at": "2026-03-15T10:00:00Z",
                    "updated_at": "2026-03-15T10:00:00Z",
                    "config_json": "{}"
                }],
                "deployment_mode": "desktop"
            })
        );
    }

    /// Populated optional fields are serialized, and `config_json` stays a
    /// string rather than being re-parsed as nested JSON.
    #[test]
    fn source_status_item_with_error() {
        let item = SourceStatusItem {
            id: 2,
            source_type: "google_drive".into(),
            status: "error".into(),
            error_message: Some("auth failed".into()),
            sync_cursor: Some("cursor_123".into()),
            created_at: "2026-03-15T10:00:00Z".into(),
            updated_at: "2026-03-15T10:00:00Z".into(),
            config_json: r#"{"path":"/vault"}"#.into(),
        };
        let value = serde_json::to_value(&item).expect("serialize");
        assert_eq!(
            value,
            json!({
                "id": 2,
                "source_type": "google_drive",
                "status": "error",
                "error_message": "auth failed",
                "sync_cursor": "cursor_123",
                "created_at": "2026-03-15T10:00:00Z",
                "updated_at": "2026-03-15T10:00:00Z",
                "config_json": r#"{"path":"/vault"}"#
            })
        );
    }

    #[test]
    fn reindex_response_serializes() {
        let resp = ReindexResponse {
            status: "reindex_started".into(),
            source_id: 42,
        };
        let value = serde_json::to_value(&resp).expect("serialize");
        assert_eq!(
            value,
            json!({ "status": "reindex_started", "source_id": 42 })
        );
    }
}