Skip to main content

tuitbot_server/routes/
sources.rs

1//! Source status and reindex endpoints.
2//!
3//! Exposes runtime status of content sources and a reindex trigger
4//! for the Watchtower pipeline.
5
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use axum::extract::{Path, State};
10use axum::Json;
11use serde::Serialize;
12use tuitbot_core::automation::WatchtowerLoop;
13use tuitbot_core::storage::watchtower as store;
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18// ---------------------------------------------------------------------------
19// Response types
20// ---------------------------------------------------------------------------
21
22#[derive(Serialize)]
23pub struct SourceStatusResponse {
24    pub sources: Vec<SourceStatusItem>,
25}
26
27#[derive(Serialize)]
28pub struct SourceStatusItem {
29    pub id: i64,
30    pub source_type: String,
31    pub status: String,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub error_message: Option<String>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub sync_cursor: Option<String>,
36    pub created_at: String,
37    pub updated_at: String,
38    pub config_json: String,
39}
40
41#[derive(Serialize)]
42pub struct ReindexResponse {
43    pub status: String,
44    pub source_id: i64,
45}
46
47// ---------------------------------------------------------------------------
48// Handlers
49// ---------------------------------------------------------------------------
50
51/// `GET /api/sources/status` — return runtime status of all content sources.
52pub async fn source_status(
53    State(state): State<Arc<AppState>>,
54) -> Result<Json<SourceStatusResponse>, ApiError> {
55    let contexts = store::get_all_source_contexts(&state.db)
56        .await
57        .map_err(ApiError::Storage)?;
58
59    let sources = contexts
60        .into_iter()
61        .map(|ctx| SourceStatusItem {
62            id: ctx.id,
63            source_type: ctx.source_type,
64            status: ctx.status,
65            error_message: ctx.error_message,
66            sync_cursor: ctx.sync_cursor,
67            created_at: ctx.created_at,
68            updated_at: ctx.updated_at,
69            config_json: ctx.config_json,
70        })
71        .collect();
72
73    Ok(Json(SourceStatusResponse { sources }))
74}
75
76/// `POST /api/sources/{id}/reindex` — trigger a full rescan of one source.
77///
78/// Validates the source exists and is a local_fs source (remote reindex is
79/// handled by the normal poll cycle). The reindex runs in a spawned task
80/// and returns immediately.
81pub async fn reindex_source(
82    State(state): State<Arc<AppState>>,
83    Path(source_id): Path<i64>,
84) -> Result<Json<ReindexResponse>, ApiError> {
85    // Verify the source exists.
86    let ctx = store::get_source_context(&state.db, source_id)
87        .await
88        .map_err(ApiError::Storage)?
89        .ok_or_else(|| ApiError::NotFound(format!("source {source_id} not found")))?;
90
91    if ctx.source_type != "local_fs" {
92        return Err(ApiError::BadRequest(
93            "reindex is only supported for local_fs sources".to_string(),
94        ));
95    }
96
97    // Extract path and patterns from config_json.
98    let config: serde_json::Value = serde_json::from_str(&ctx.config_json)
99        .map_err(|e| ApiError::Internal(format!("invalid source config_json: {e}")))?;
100
101    let path_str = config
102        .get("path")
103        .and_then(|v| v.as_str())
104        .ok_or_else(|| ApiError::Internal("source config_json missing path".to_string()))?;
105
106    let base_path = PathBuf::from(tuitbot_core::storage::expand_tilde(path_str));
107
108    let patterns: Vec<String> = config
109        .get("file_patterns")
110        .and_then(|v| v.as_array())
111        .map(|arr| {
112            arr.iter()
113                .filter_map(|v| v.as_str().map(String::from))
114                .collect()
115        })
116        .unwrap_or_else(|| vec!["*.md".to_string(), "*.txt".to_string()]);
117
118    // Spawn the reindex in a background task.
119    let pool = state.db.clone();
120    tokio::spawn(async move {
121        match WatchtowerLoop::reindex_local_source(&pool, source_id, &base_path, &patterns).await {
122            Ok(summary) => {
123                tracing::info!(
124                    source_id,
125                    ingested = summary.ingested,
126                    skipped = summary.skipped,
127                    errors = summary.errors.len(),
128                    "Reindex complete"
129                );
130            }
131            Err(e) => {
132                tracing::error!(source_id, error = %e, "Reindex failed");
133            }
134        }
135    });
136
137    Ok(Json(ReindexResponse {
138        status: "reindex_started".to_string(),
139        source_id,
140    }))
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A source with no error/cursor serializes to exactly the expected
    /// document: `None` optionals are omitted, not emitted as `null`.
    #[test]
    fn source_status_response_serializes() {
        let resp = SourceStatusResponse {
            sources: vec![SourceStatusItem {
                id: 1,
                source_type: "local_fs".into(),
                status: "active".into(),
                error_message: None,
                sync_cursor: None,
                created_at: "2026-03-15T10:00:00Z".into(),
                updated_at: "2026-03-15T10:00:00Z".into(),
                config_json: "{}".into(),
            }],
        };
        let value = serde_json::to_value(&resp).expect("serialize");

        // Whole-document comparison: a wrong value under the right key fails,
        // which a substring check would not catch.
        assert_eq!(
            value,
            json!({
                "sources": [{
                    "id": 1,
                    "source_type": "local_fs",
                    "status": "active",
                    "created_at": "2026-03-15T10:00:00Z",
                    "updated_at": "2026-03-15T10:00:00Z",
                    "config_json": "{}"
                }]
            })
        );
    }

    /// Populated optionals are serialized with their exact values, and
    /// `config_json` stays a string rather than being inlined as an object.
    #[test]
    fn source_status_item_with_error() {
        let item = SourceStatusItem {
            id: 2,
            source_type: "google_drive".into(),
            status: "error".into(),
            error_message: Some("auth failed".into()),
            sync_cursor: Some("cursor_123".into()),
            created_at: "2026-03-15T10:00:00Z".into(),
            updated_at: "2026-03-15T10:00:00Z".into(),
            config_json: r#"{"path":"/vault"}"#.into(),
        };
        let value = serde_json::to_value(&item).expect("serialize");

        assert_eq!(value["id"], json!(2));
        assert_eq!(value["status"], json!("error"));
        assert_eq!(value["error_message"], json!("auth failed"));
        assert_eq!(value["sync_cursor"], json!("cursor_123"));
        assert_eq!(value["config_json"], json!(r#"{"path":"/vault"}"#));
    }

    /// The reindex acknowledgement has exactly two fields with exact values.
    #[test]
    fn reindex_response_serializes() {
        let resp = ReindexResponse {
            status: "reindex_started".into(),
            source_id: 42,
        };
        let value = serde_json::to_value(&resp).expect("serialize");

        assert_eq!(
            value,
            json!({ "status": "reindex_started", "source_id": 42 })
        );
    }
}