1use super::log_routes;
2use super::llm_query;
3use super::{query_routes, schema_routes, security_routes, system_routes};
4use crate::datafold_node::DataFoldNode;
5use crate::error::{FoldDbError, FoldDbResult};
6use crate::error_handling::http_errors;
7use crate::ingestion::routes as ingestion_routes;
8use crate::ingestion::create_progress_tracker;
9
10use crate::log_feature;
11use crate::logging::features::LogFeature;
12use actix_cors::Cors;
13use actix_files::Files;
14use actix_web::{web, App, HttpResponse, HttpServer as ActixHttpServer};
15use std::sync::Arc;
16use tokio::sync::Mutex;
17
18pub struct DataFoldHttpServer {
32 node: Arc<tokio::sync::Mutex<DataFoldNode>>,
34 bind_address: String,
36}
37
38pub struct AppState {
40 pub(crate) node: Arc<tokio::sync::Mutex<DataFoldNode>>,
42}
43
44impl DataFoldHttpServer {
45 pub async fn new(node: DataFoldNode, bind_address: &str) -> FoldDbResult<Self> {
64 if let Err(e) = crate::logging::LoggingSystem::init_default().await {
66 log_feature!(
67 LogFeature::HttpServer,
68 warn,
69 "Failed to initialize enhanced logging system, falling back to web logger: {}",
70 e
71 );
72 crate::web_logger::init().ok();
74 }
75
76 Ok(Self {
77 node: Arc::new(Mutex::new(node)),
78 bind_address: bind_address.to_string(),
79 })
80 }
81
82 pub async fn run(&self) -> FoldDbResult<()> {
98 log_feature!(
99 LogFeature::HttpServer,
100 info,
101 "HTTP server running on {}",
102 self.bind_address
103 );
104
105 let schema_service_url = {
107 let node_guard = self.node.lock().await;
108 node_guard.config.schema_service_url.clone()
109 };
110
111 if let Some(url) = schema_service_url {
112 if url.starts_with("test://") || url.starts_with("mock://") {
114 log_feature!(
115 LogFeature::Database,
116 info,
117 "Mock schema service detected ({}). Skipping automatic schema loading. Schemas must be loaded manually in tests.",
118 url
119 );
120 } else {
121 log_feature!(
122 LogFeature::Database,
123 info,
124 "Loading schemas from schema service at {}...",
125 url
126 );
127
128 let schema_manager = {
129 let node_guard = self.node.lock().await;
130 let db_guard = node_guard.get_fold_db()?;
131 let manager = db_guard.schema_manager.clone();
132 drop(db_guard);
133 drop(node_guard);
134 manager
135 };
136
137 let client = crate::datafold_node::SchemaServiceClient::new(&url);
138
139 match client.load_all_schemas(&schema_manager).await {
140 Ok(loaded_count) => {
141 log_feature!(
142 LogFeature::Database,
143 info,
144 "Loaded {} schemas from schema service",
145 loaded_count
146 );
147 }
148 Err(e) => {
149 log_feature!(
150 LogFeature::Database,
151 error,
152 "Failed to load schemas from schema service: {}. Server will start but no schemas will be available.",
153 e
154 );
155 }
156 }
157 }
158 }
159
160 let upload_storage_config = crate::storage::UploadStorageConfig::from_env()
162 .unwrap_or_default();
163
164 let upload_storage = match upload_storage_config {
165 crate::storage::UploadStorageConfig::Local { path } => {
166 crate::storage::UploadStorage::local(path)
167 }
168 crate::storage::UploadStorageConfig::S3 { bucket, region, prefix } => {
169 let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
171 .region(aws_sdk_s3::config::Region::new(region))
172 .load()
173 .await;
174 let s3_client = aws_sdk_s3::Client::new(&aws_config);
175
176 crate::storage::UploadStorage::s3(bucket, prefix, s3_client)
177 }
178 };
179
180 log_feature!(
181 LogFeature::HttpServer,
182 info,
183 "Upload storage initialized: {}",
184 if upload_storage.is_local() { "Local" } else { "S3" }
185 );
186
187 let node = web::Data::new(self.node.clone());
189 let progress_tracker_data = web::Data::new(create_progress_tracker());
190 let upload_storage_data = web::Data::new(upload_storage.clone());
191
192 let app_state = web::Data::new(AppState {
194 node: self.node.clone(),
195 });
196
197 let llm_query_state = web::Data::new(llm_query::LlmQueryState::new());
199
200 let server = ActixHttpServer::new(move || {
202 let cors = Cors::default()
204 .allow_any_origin()
205 .allow_any_method()
206 .allow_any_header()
207 .max_age(3600);
208
209 let json_config =
211 web::JsonConfig::default().error_handler(http_errors::json_error_handler);
212
213 App::new()
214 .wrap(cors)
215 .app_data(app_state.clone())
216 .app_data(llm_query_state.clone())
217 .app_data(node.clone())
218 .app_data(progress_tracker_data.clone())
219 .app_data(upload_storage_data.clone())
220 .app_data(json_config)
221 .service(
222 web::scope("/api")
223 .route(
225 "/openapi.json",
226 web::get().to(|| async move {
227 let doc = crate::datafold_node::openapi::build_openapi();
228 HttpResponse::Ok().content_type("application/json").body(doc)
229 }),
230 )
231 .route("/schemas", web::get().to(schema_routes::list_schemas))
233 .route("/schemas/load", web::post().to(schema_routes::load_schemas))
234 .route("/schema/{name}", web::get().to(schema_routes::get_schema))
235 .route(
236 "/schema/{name}/approve",
237 web::post().to(schema_routes::approve_schema),
238 )
239 .route(
240 "/schema/{name}/block",
241 web::post().to(schema_routes::block_schema),
242 )
243 .route("/backfill/{hash}", web::get().to(schema_routes::get_backfill_status))
245 .route("/query", web::post().to(query_routes::execute_query))
246 .route("/mutation", web::post().to(query_routes::execute_mutation))
247 .route("/mutations/batch", web::post().to(query_routes::execute_mutations_batch))
248 .route(
250 "/ingestion/process",
251 web::post().to(ingestion_routes::process_json),
252 )
253 .route(
254 "/ingestion/upload",
255 web::post().to(crate::ingestion::file_upload::upload_file),
256 )
257 .route(
258 "/ingestion/status",
259 web::get().to(ingestion_routes::get_status),
260 )
261 .route(
262 "/ingestion/health",
263 web::get().to(ingestion_routes::health_check),
264 )
265 .route(
266 "/ingestion/config",
267 web::get().to(ingestion_routes::get_ingestion_config),
268 )
269 .route(
270 "/ingestion/config",
271 web::post().to(ingestion_routes::save_ingestion_config),
272 )
273 .route(
274 "/ingestion/validate",
275 web::post().to(ingestion_routes::validate_json),
276 )
277 .route(
279 "/ingestion/progress",
280 web::get().to(ingestion_routes::get_all_progress),
281 )
282 .route(
283 "/ingestion/progress/{id}",
284 web::get().to(ingestion_routes::get_progress),
285 )
286 .route("/transforms", web::get().to(query_routes::list_transforms))
288 .route(
289 "/transforms/queue",
290 web::get().to(query_routes::get_transform_queue),
291 )
292 .route(
293 "/transforms/queue/{id}",
294 web::post().to(query_routes::add_to_transform_queue),
295 )
296 .route(
298 "/transforms/backfills",
299 web::get().to(query_routes::get_all_backfills),
300 )
301 .route(
302 "/transforms/backfills/active",
303 web::get().to(query_routes::get_active_backfills),
304 )
305 .route(
306 "/transforms/backfills/statistics",
307 web::get().to(query_routes::get_backfill_statistics),
308 )
309 .route(
310 "/transforms/backfills/{id}",
311 web::get().to(query_routes::get_backfill),
312 )
313 .route(
314 "/transforms/statistics",
315 web::get().to(query_routes::get_transform_statistics),
316 )
317 .route(
319 "/native-index/search",
320 web::get().to(query_routes::native_index_search),
321 )
322 .route(
324 "/indexing/status",
325 web::get().to(query_routes::get_indexing_status),
326 )
327 .route("/logs", web::get().to(log_routes::list_logs))
329 .route("/logs/stream", web::get().to(log_routes::stream_logs))
330 .route("/logs/config", web::get().to(log_routes::get_config))
331 .route(
332 "/logs/config/reload",
333 web::post().to(log_routes::reload_config),
334 )
335 .route("/logs/features", web::get().to(log_routes::get_features))
336 .route(
337 "/logs/level",
338 web::put().to(log_routes::update_feature_level),
339 )
340 .route(
342 "/system/status",
343 web::get().to(system_routes::get_system_status),
344 )
345 .route(
346 "/system/private-key",
347 web::get().to(system_routes::get_node_private_key),
348 )
349 .route(
350 "/system/public-key",
351 web::get().to(system_routes::get_node_public_key),
352 )
353 .route(
354 "/system/reset-database",
355 web::post().to(system_routes::reset_database),
356 )
357 .route(
358 "/system/reset-schema-service",
359 web::post().to(system_routes::reset_schema_service),
360 )
361 .route(
362 "/system/database-config",
363 web::get().to(system_routes::get_database_config),
364 )
365 .route(
366 "/system/database-config",
367 web::post().to(system_routes::update_database_config),
368 )
369 .route(
371 "/llm-query/run",
372 web::post().to(llm_query::run_query),
373 )
374 .route(
375 "/llm-query/analyze",
376 web::post().to(llm_query::analyze_query),
377 )
378 .route(
379 "/llm-query/execute",
380 web::post().to(llm_query::execute_query_plan),
381 )
382 .route(
383 "/llm-query/chat",
384 web::post().to(llm_query::chat),
385 )
386 .route(
387 "/llm-query/analyze-followup",
388 web::post().to(llm_query::analyze_followup),
389 )
390 .route(
391 "/llm-query/native-index",
392 web::post().to(llm_query::ai_native_index_query),
393 )
394 .route(
395 "/llm-query/backfill/{hash}",
396 web::get().to(llm_query::get_backfill_status),
397 )
398 .service(
400 web::scope("/security")
401 .service(
402 web::resource("/system-key")
403 .route(
404 web::get().to(security_routes::get_system_public_key),
405 ),
406 ),
407 )
408 )
409 .service(
411 Files::new("/", "src/datafold_node/static-react/dist").index_file("index.html"),
412 )
413 })
414 .bind(&self.bind_address)
415 .map_err(|e| FoldDbError::Config(format!("Failed to bind HTTP server: {}", e)))?
416 .run();
417
418 server
420 .await
421 .map_err(|e| FoldDbError::Config(format!("HTTP server error: {}", e)))?;
422
423 Ok(())
424 }
425}