tuitbot_server/routes/
sources.rs1use std::path::PathBuf;
7use std::sync::Arc;
8
9use axum::extract::{Path, State};
10use axum::Json;
11use serde::Serialize;
12use tuitbot_core::automation::WatchtowerLoop;
13use tuitbot_core::storage::watchtower as store;
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18#[derive(Serialize)]
23pub struct SourceStatusResponse {
24 pub sources: Vec<SourceStatusItem>,
25 pub deployment_mode: String,
26}
27
28#[derive(Serialize)]
29pub struct SourceStatusItem {
30 pub id: i64,
31 pub source_type: String,
32 pub status: String,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub error_message: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub sync_cursor: Option<String>,
37 pub created_at: String,
38 pub updated_at: String,
39 pub config_json: String,
40}
41
42#[derive(Serialize)]
43pub struct ReindexResponse {
44 pub status: String,
45 pub source_id: i64,
46}
47
48pub async fn source_status(
54 State(state): State<Arc<AppState>>,
55) -> Result<Json<SourceStatusResponse>, ApiError> {
56 let contexts = store::get_all_source_contexts(&state.db)
57 .await
58 .map_err(ApiError::Storage)?;
59
60 let sources = contexts
61 .into_iter()
62 .map(|ctx| SourceStatusItem {
63 id: ctx.id,
64 source_type: ctx.source_type,
65 status: ctx.status,
66 error_message: ctx.error_message,
67 sync_cursor: ctx.sync_cursor,
68 created_at: ctx.created_at,
69 updated_at: ctx.updated_at,
70 config_json: ctx.config_json,
71 })
72 .collect();
73
74 Ok(Json(SourceStatusResponse {
75 sources,
76 deployment_mode: state.deployment_mode.to_string(),
77 }))
78}
79
80pub async fn reindex_source(
86 State(state): State<Arc<AppState>>,
87 Path(source_id): Path<i64>,
88) -> Result<Json<ReindexResponse>, ApiError> {
89 let ctx = store::get_source_context(&state.db, source_id)
91 .await
92 .map_err(ApiError::Storage)?
93 .ok_or_else(|| ApiError::NotFound(format!("source {source_id} not found")))?;
94
95 if ctx.source_type != "local_fs" {
96 return Err(ApiError::BadRequest(
97 "reindex is only supported for local_fs sources".to_string(),
98 ));
99 }
100
101 let config: serde_json::Value = serde_json::from_str(&ctx.config_json)
103 .map_err(|e| ApiError::Internal(format!("invalid source config_json: {e}")))?;
104
105 let path_str = config
106 .get("path")
107 .and_then(|v| v.as_str())
108 .ok_or_else(|| ApiError::Internal("source config_json missing path".to_string()))?;
109
110 let base_path = PathBuf::from(tuitbot_core::storage::expand_tilde(path_str));
111
112 let patterns: Vec<String> = config
113 .get("file_patterns")
114 .and_then(|v| v.as_array())
115 .map(|arr| {
116 arr.iter()
117 .filter_map(|v| v.as_str().map(String::from))
118 .collect()
119 })
120 .unwrap_or_else(|| vec!["*.md".to_string(), "*.txt".to_string()]);
121
122 let pool = state.db.clone();
124 tokio::spawn(async move {
125 match WatchtowerLoop::reindex_local_source(&pool, source_id, &base_path, &patterns).await {
126 Ok(summary) => {
127 tracing::info!(
128 source_id,
129 ingested = summary.ingested,
130 skipped = summary.skipped,
131 errors = summary.errors.len(),
132 "Reindex complete"
133 );
134 }
135 Err(e) => {
136 tracing::error!(source_id, error = %e, "Reindex failed");
137 }
138 }
139 });
140
141 Ok(Json(ReindexResponse {
142 status: "reindex_started".to_string(),
143 source_id,
144 }))
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150
151 #[test]
152 fn source_status_response_serializes() {
153 let resp = SourceStatusResponse {
154 sources: vec![SourceStatusItem {
155 id: 1,
156 source_type: "local_fs".into(),
157 status: "active".into(),
158 error_message: None,
159 sync_cursor: None,
160 created_at: "2026-03-15T10:00:00Z".into(),
161 updated_at: "2026-03-15T10:00:00Z".into(),
162 config_json: "{}".into(),
163 }],
164 deployment_mode: "desktop".into(),
165 };
166 let json = serde_json::to_string(&resp).expect("serialize");
167 assert!(json.contains("local_fs"));
168 assert!(json.contains("desktop"));
169 assert!(!json.contains("error_message"));
170 assert!(!json.contains("sync_cursor"));
171 }
172
173 #[test]
174 fn source_status_item_with_error() {
175 let item = SourceStatusItem {
176 id: 2,
177 source_type: "google_drive".into(),
178 status: "error".into(),
179 error_message: Some("auth failed".into()),
180 sync_cursor: Some("cursor_123".into()),
181 created_at: "2026-03-15T10:00:00Z".into(),
182 updated_at: "2026-03-15T10:00:00Z".into(),
183 config_json: r#"{"path":"/vault"}"#.into(),
184 };
185 let json = serde_json::to_string(&item).expect("serialize");
186 assert!(json.contains("error_message"));
187 assert!(json.contains("auth failed"));
188 assert!(json.contains("sync_cursor"));
189 }
190
191 #[test]
192 fn reindex_response_serializes() {
193 let resp = ReindexResponse {
194 status: "reindex_started".into(),
195 source_id: 42,
196 };
197 let json = serde_json::to_string(&resp).expect("serialize");
198 assert!(json.contains("reindex_started"));
199 assert!(json.contains("42"));
200 }
201}