tuitbot_server/routes/
sources.rs1use std::path::PathBuf;
7use std::sync::Arc;
8
9use axum::extract::{Path, State};
10use axum::Json;
11use serde::Serialize;
12use tuitbot_core::automation::WatchtowerLoop;
13use tuitbot_core::storage::watchtower as store;
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18#[derive(Serialize)]
23pub struct SourceStatusResponse {
24 pub sources: Vec<SourceStatusItem>,
25}
26
27#[derive(Serialize)]
28pub struct SourceStatusItem {
29 pub id: i64,
30 pub source_type: String,
31 pub status: String,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub error_message: Option<String>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub sync_cursor: Option<String>,
36 pub created_at: String,
37 pub updated_at: String,
38 pub config_json: String,
39}
40
41#[derive(Serialize)]
42pub struct ReindexResponse {
43 pub status: String,
44 pub source_id: i64,
45}
46
47pub async fn source_status(
53 State(state): State<Arc<AppState>>,
54) -> Result<Json<SourceStatusResponse>, ApiError> {
55 let contexts = store::get_all_source_contexts(&state.db)
56 .await
57 .map_err(ApiError::Storage)?;
58
59 let sources = contexts
60 .into_iter()
61 .map(|ctx| SourceStatusItem {
62 id: ctx.id,
63 source_type: ctx.source_type,
64 status: ctx.status,
65 error_message: ctx.error_message,
66 sync_cursor: ctx.sync_cursor,
67 created_at: ctx.created_at,
68 updated_at: ctx.updated_at,
69 config_json: ctx.config_json,
70 })
71 .collect();
72
73 Ok(Json(SourceStatusResponse { sources }))
74}
75
76pub async fn reindex_source(
82 State(state): State<Arc<AppState>>,
83 Path(source_id): Path<i64>,
84) -> Result<Json<ReindexResponse>, ApiError> {
85 let ctx = store::get_source_context(&state.db, source_id)
87 .await
88 .map_err(ApiError::Storage)?
89 .ok_or_else(|| ApiError::NotFound(format!("source {source_id} not found")))?;
90
91 if ctx.source_type != "local_fs" {
92 return Err(ApiError::BadRequest(
93 "reindex is only supported for local_fs sources".to_string(),
94 ));
95 }
96
97 let config: serde_json::Value = serde_json::from_str(&ctx.config_json)
99 .map_err(|e| ApiError::Internal(format!("invalid source config_json: {e}")))?;
100
101 let path_str = config
102 .get("path")
103 .and_then(|v| v.as_str())
104 .ok_or_else(|| ApiError::Internal("source config_json missing path".to_string()))?;
105
106 let base_path = PathBuf::from(tuitbot_core::storage::expand_tilde(path_str));
107
108 let patterns: Vec<String> = config
109 .get("file_patterns")
110 .and_then(|v| v.as_array())
111 .map(|arr| {
112 arr.iter()
113 .filter_map(|v| v.as_str().map(String::from))
114 .collect()
115 })
116 .unwrap_or_else(|| vec!["*.md".to_string(), "*.txt".to_string()]);
117
118 let pool = state.db.clone();
120 tokio::spawn(async move {
121 match WatchtowerLoop::reindex_local_source(&pool, source_id, &base_path, &patterns).await {
122 Ok(summary) => {
123 tracing::info!(
124 source_id,
125 ingested = summary.ingested,
126 skipped = summary.skipped,
127 errors = summary.errors.len(),
128 "Reindex complete"
129 );
130 }
131 Err(e) => {
132 tracing::error!(source_id, error = %e, "Reindex failed");
133 }
134 }
135 });
136
137 Ok(Json(ReindexResponse {
138 status: "reindex_started".to_string(),
139 source_id,
140 }))
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146
147 #[test]
148 fn source_status_response_serializes() {
149 let resp = SourceStatusResponse {
150 sources: vec![SourceStatusItem {
151 id: 1,
152 source_type: "local_fs".into(),
153 status: "active".into(),
154 error_message: None,
155 sync_cursor: None,
156 created_at: "2026-03-15T10:00:00Z".into(),
157 updated_at: "2026-03-15T10:00:00Z".into(),
158 config_json: "{}".into(),
159 }],
160 };
161 let json = serde_json::to_string(&resp).expect("serialize");
162 assert!(json.contains("local_fs"));
163 assert!(!json.contains("error_message"));
164 assert!(!json.contains("sync_cursor"));
165 }
166
167 #[test]
168 fn source_status_item_with_error() {
169 let item = SourceStatusItem {
170 id: 2,
171 source_type: "google_drive".into(),
172 status: "error".into(),
173 error_message: Some("auth failed".into()),
174 sync_cursor: Some("cursor_123".into()),
175 created_at: "2026-03-15T10:00:00Z".into(),
176 updated_at: "2026-03-15T10:00:00Z".into(),
177 config_json: r#"{"path":"/vault"}"#.into(),
178 };
179 let json = serde_json::to_string(&item).expect("serialize");
180 assert!(json.contains("error_message"));
181 assert!(json.contains("auth failed"));
182 assert!(json.contains("sync_cursor"));
183 }
184
185 #[test]
186 fn reindex_response_serializes() {
187 let resp = ReindexResponse {
188 status: "reindex_started".into(),
189 source_id: 42,
190 };
191 let json = serde_json::to_string(&resp).expect("serialize");
192 assert!(json.contains("reindex_started"));
193 assert!(json.contains("42"));
194 }
195}