datafold/datafold_node/
http_server.rs

1use super::log_routes;
2use super::llm_query;
3use super::{query_routes, schema_routes, security_routes, system_routes};
4use crate::datafold_node::DataFoldNode;
5use crate::error::{FoldDbError, FoldDbResult};
6use crate::error_handling::http_errors;
7use crate::ingestion::routes as ingestion_routes;
8use crate::ingestion::create_progress_tracker;
9
10use crate::log_feature;
11use crate::logging::features::LogFeature;
12use actix_cors::Cors;
13use actix_files::Files;
14use actix_web::{web, App, HttpResponse, HttpServer as ActixHttpServer};
15use std::sync::Arc;
16use tokio::sync::Mutex;
17
18/// HTTP server for the DataFold node.
19///
20/// DataFoldHttpServer provides a web-based interface for external clients to interact
21/// with a DataFold node. It handles HTTP requests and can serve the built React
22/// UI,
23/// and provides REST API endpoints for schemas, queries, and mutations.
24///
25/// # Features
26///
27/// * Static file serving for the UI
28/// * REST API endpoints for schemas, queries, and mutations
29/// * Sample data management
30/// * One-click loading of sample data
31pub struct DataFoldHttpServer {
32    /// The DataFold node
33    node: Arc<tokio::sync::Mutex<DataFoldNode>>,
34    /// The HTTP server bind address
35    bind_address: String,
36}
37
38/// Shared application state for the HTTP server.
39pub struct AppState {
40    /// The DataFold node
41    pub(crate) node: Arc<tokio::sync::Mutex<DataFoldNode>>,
42}
43
44impl DataFoldHttpServer {
45    /// Create a new HTTP server.
46    ///
47    /// This method creates a new HTTP server that listens on the specified address.
48    /// It uses the provided DataFoldNode to process client requests.
49    ///
50    /// # Arguments
51    ///
52    /// * `node` - The DataFoldNode instance to use for processing requests
53    /// * `bind_address` - The address to bind to (e.g., "127.0.0.1:9001")
54    ///
55    /// # Returns
56    ///
57    /// A `FoldDbResult` containing the new DataFoldHttpServer instance.
58    ///
59    /// # Errors
60    ///
61    /// Returns a `FoldDbError` if:
62    /// * There is an error starting the HTTP server
63    pub async fn new(node: DataFoldNode, bind_address: &str) -> FoldDbResult<Self> {
64        // Initialize the enhanced logging system
65        if let Err(e) = crate::logging::LoggingSystem::init_default().await {
66            log_feature!(
67                LogFeature::HttpServer,
68                warn,
69                "Failed to initialize enhanced logging system, falling back to web logger: {}",
70                e
71            );
72            // Fall back to old web logger for backward compatibility
73            crate::web_logger::init().ok();
74        }
75
76        Ok(Self {
77            node: Arc::new(Mutex::new(node)),
78            bind_address: bind_address.to_string(),
79        })
80    }
81
82    /// Run the HTTP server.
83    ///
84    /// This method starts the HTTP server and begins accepting client connections.
85    /// It can serve the compiled React UI and provides REST API endpoints for
86    /// schemas, queries, and mutations.
87    ///
88    /// # Returns
89    ///
90    /// A `FoldDbResult` indicating success or failure.
91    ///
92    /// # Errors
93    ///
94    /// Returns a `FoldDbError` if:
95    /// * There is an error binding to the specified address
96    /// * There is an error starting the server
97    pub async fn run(&self) -> FoldDbResult<()> {
98        log_feature!(
99            LogFeature::HttpServer,
100            info,
101            "HTTP server running on {}",
102            self.bind_address
103        );
104
105        // Load schemas from schema service if configured
106        let schema_service_url = {
107            let node_guard = self.node.lock().await;
108            node_guard.config.schema_service_url.clone()
109        };
110        
111        if let Some(url) = schema_service_url {
112            // Skip loading for mock/test schema services
113            if url.starts_with("test://") || url.starts_with("mock://") {
114                log_feature!(
115                    LogFeature::Database,
116                    info,
117                    "Mock schema service detected ({}). Skipping automatic schema loading. Schemas must be loaded manually in tests.",
118                    url
119                );
120            } else {
121                log_feature!(
122                    LogFeature::Database,
123                    info,
124                    "Loading schemas from schema service at {}...",
125                    url
126                );
127                
128                let schema_manager = {
129                    let node_guard = self.node.lock().await;
130                    let db_guard = node_guard.get_fold_db()?;
131                    let manager = db_guard.schema_manager.clone();
132                    drop(db_guard);
133                    drop(node_guard);
134                    manager
135                };
136                
137                let client = crate::datafold_node::SchemaServiceClient::new(&url);
138                
139                match client.load_all_schemas(&schema_manager).await {
140                    Ok(loaded_count) => {
141                        log_feature!(
142                            LogFeature::Database,
143                            info,
144                            "Loaded {} schemas from schema service",
145                            loaded_count
146                        );
147                    }
148                    Err(e) => {
149                        log_feature!(
150                            LogFeature::Database,
151                            error,
152                            "Failed to load schemas from schema service: {}. Server will start but no schemas will be available.",
153                            e
154                        );
155                    }
156                }
157            }
158        }
159
160        // Initialize upload storage from environment config
161        let upload_storage_config = crate::storage::UploadStorageConfig::from_env()
162            .unwrap_or_default();
163        
164        let upload_storage = match upload_storage_config {
165            crate::storage::UploadStorageConfig::Local { path } => {
166                crate::storage::UploadStorage::local(path)
167            }
168            crate::storage::UploadStorageConfig::S3 { bucket, region, prefix } => {
169                // Create S3 client
170                let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
171                    .region(aws_sdk_s3::config::Region::new(region))
172                    .load()
173                    .await;
174                let s3_client = aws_sdk_s3::Client::new(&aws_config);
175                
176                crate::storage::UploadStorage::s3(bucket, prefix, s3_client)
177            }
178        };
179
180        log_feature!(
181            LogFeature::HttpServer,
182            info,
183            "Upload storage initialized: {}",
184            if upload_storage.is_local() { "Local" } else { "S3" }
185        );
186
187        // Create individual dependencies for ingestion routes
188        let node = web::Data::new(self.node.clone());
189        let progress_tracker_data = web::Data::new(create_progress_tracker());
190        let upload_storage_data = web::Data::new(upload_storage.clone());
191
192        // Create shared application state for routes that still need it
193        let app_state = web::Data::new(AppState {
194            node: self.node.clone(),
195        });
196
197        // Create LLM query state (gracefully handles missing configuration)
198        let llm_query_state = web::Data::new(llm_query::LlmQueryState::new());
199
200        // Start the HTTP server
201        let server = ActixHttpServer::new(move || {
202            // Create CORS middleware
203            let cors = Cors::default()
204                .allow_any_origin()
205                .allow_any_method()
206                .allow_any_header()
207                .max_age(3600);
208
209            // Configure custom JSON error handler
210            let json_config =
211                web::JsonConfig::default().error_handler(http_errors::json_error_handler);
212
213            App::new()
214                .wrap(cors)
215                .app_data(app_state.clone())
216                .app_data(llm_query_state.clone())
217                .app_data(node.clone())
218                .app_data(progress_tracker_data.clone())
219                .app_data(upload_storage_data.clone())
220                .app_data(json_config)
221                .service(
222                    web::scope("/api")
223                        // OpenAPI spec endpoint
224                        .route(
225                            "/openapi.json",
226                            web::get().to(|| async move {
227                                let doc = crate::datafold_node::openapi::build_openapi();
228                                HttpResponse::Ok().content_type("application/json").body(doc)
229                            }),
230                        )
231                        // Schema endpoints
232                        .route("/schemas", web::get().to(schema_routes::list_schemas))
233                        .route("/schemas/load", web::post().to(schema_routes::load_schemas))
234                        .route("/schema/{name}", web::get().to(schema_routes::get_schema))
235                        .route(
236                            "/schema/{name}/approve",
237                            web::post().to(schema_routes::approve_schema),
238                        )
239                        .route(
240                            "/schema/{name}/block",
241                            web::post().to(schema_routes::block_schema),
242                        )
243                        // Backfill endpoints
244                        .route("/backfill/{hash}", web::get().to(schema_routes::get_backfill_status))
245                        .route("/query", web::post().to(query_routes::execute_query))
246                        .route("/mutation", web::post().to(query_routes::execute_mutation))
247                        .route("/mutations/batch", web::post().to(query_routes::execute_mutations_batch))
248                        // Ingestion endpoints
249                        .route(
250                            "/ingestion/process",
251                            web::post().to(ingestion_routes::process_json),
252                        )
253                        .route(
254                            "/ingestion/upload",
255                            web::post().to(crate::ingestion::file_upload::upload_file),
256                        )
257                        .route(
258                            "/ingestion/status",
259                            web::get().to(ingestion_routes::get_status),
260                        )
261                        .route(
262                            "/ingestion/health",
263                            web::get().to(ingestion_routes::health_check),
264                        )
265                        .route(
266                            "/ingestion/config",
267                            web::get().to(ingestion_routes::get_ingestion_config),
268                        )
269                        .route(
270                            "/ingestion/config",
271                            web::post().to(ingestion_routes::save_ingestion_config),
272                        )
273                        .route(
274                            "/ingestion/validate",
275                            web::post().to(ingestion_routes::validate_json),
276                        )
277                        // Progress tracking endpoints
278                        .route(
279                            "/ingestion/progress",
280                            web::get().to(ingestion_routes::get_all_progress),
281                        )
282                        .route(
283                            "/ingestion/progress/{id}",
284                            web::get().to(ingestion_routes::get_progress),
285                        )
286                        // Transform endpoints
287                        .route("/transforms", web::get().to(query_routes::list_transforms))
288                        .route(
289                            "/transforms/queue",
290                            web::get().to(query_routes::get_transform_queue),
291                        )
292                        .route(
293                            "/transforms/queue/{id}",
294                            web::post().to(query_routes::add_to_transform_queue),
295                        )
296                        // Backfill monitoring endpoints
297                        .route(
298                            "/transforms/backfills",
299                            web::get().to(query_routes::get_all_backfills),
300                        )
301                        .route(
302                            "/transforms/backfills/active",
303                            web::get().to(query_routes::get_active_backfills),
304                        )
305                        .route(
306                            "/transforms/backfills/statistics",
307                            web::get().to(query_routes::get_backfill_statistics),
308                        )
309                        .route(
310                            "/transforms/backfills/{id}",
311                            web::get().to(query_routes::get_backfill),
312                        )
313                        .route(
314                            "/transforms/statistics",
315                            web::get().to(query_routes::get_transform_statistics),
316                        )
317                        // Native index search endpoint
318                        .route(
319                            "/native-index/search",
320                            web::get().to(query_routes::native_index_search),
321                        )
322                        // Indexing status endpoint
323                        .route(
324                            "/indexing/status",
325                            web::get().to(query_routes::get_indexing_status),
326                        )
327                        // Log endpoints
328                        .route("/logs", web::get().to(log_routes::list_logs))
329                        .route("/logs/stream", web::get().to(log_routes::stream_logs))
330                        .route("/logs/config", web::get().to(log_routes::get_config))
331                        .route(
332                            "/logs/config/reload",
333                            web::post().to(log_routes::reload_config),
334                        )
335                        .route("/logs/features", web::get().to(log_routes::get_features))
336                        .route(
337                            "/logs/level",
338                            web::put().to(log_routes::update_feature_level),
339                        )
340                        // System endpoints
341                        .route(
342                            "/system/status",
343                            web::get().to(system_routes::get_system_status),
344                        )
345                        .route(
346                            "/system/private-key",
347                            web::get().to(system_routes::get_node_private_key),
348                        )
349                        .route(
350                            "/system/public-key",
351                            web::get().to(system_routes::get_node_public_key),
352                        )
353                        .route(
354                            "/system/reset-database",
355                            web::post().to(system_routes::reset_database),
356                        )
357                        .route(
358                            "/system/reset-schema-service",
359                            web::post().to(system_routes::reset_schema_service),
360                        )
361                        .route(
362                            "/system/database-config",
363                            web::get().to(system_routes::get_database_config),
364                        )
365                        .route(
366                            "/system/database-config",
367                            web::post().to(system_routes::update_database_config),
368                        )
369                        // LLM Query endpoints
370                        .route(
371                            "/llm-query/run",
372                            web::post().to(llm_query::run_query),
373                        )
374                        .route(
375                            "/llm-query/analyze",
376                            web::post().to(llm_query::analyze_query),
377                        )
378                        .route(
379                            "/llm-query/execute",
380                            web::post().to(llm_query::execute_query_plan),
381                        )
382                        .route(
383                            "/llm-query/chat",
384                            web::post().to(llm_query::chat),
385                        )
386                        .route(
387                            "/llm-query/analyze-followup",
388                            web::post().to(llm_query::analyze_followup),
389                        )
390                        .route(
391                            "/llm-query/native-index",
392                            web::post().to(llm_query::ai_native_index_query),
393                        )
394                        .route(
395                            "/llm-query/backfill/{hash}",
396                            web::get().to(llm_query::get_backfill_status),
397                        )
398                        // Security endpoints
399                        .service(
400                            web::scope("/security")
401                                .service(
402                                    web::resource("/system-key")
403                                        .route(
404                                            web::get().to(security_routes::get_system_public_key),
405                                        ),
406                                ),
407                        )
408                )
409                // Serve the built React UI if it exists
410                .service(
411                    Files::new("/", "src/datafold_node/static-react/dist").index_file("index.html"),
412                )
413        })
414        .bind(&self.bind_address)
415        .map_err(|e| FoldDbError::Config(format!("Failed to bind HTTP server: {}", e)))?
416        .run();
417
418        // Run the server
419        server
420            .await
421            .map_err(|e| FoldDbError::Config(format!("HTTP server error: {}", e)))?;
422
423        Ok(())
424    }
425}