datafold 0.1.55

A personal database for data sovereignty with AI-powered ingestion
Documentation
use crate::datafold_node::OperationProcessor;
use crate::server::http_server::AppState;
use actix_web::{web, HttpResponse, Responder, Result};
use futures_util::stream::StreamExt;
use serde::{Deserialize, Serialize};
use tokio_stream::wrappers::BroadcastStream; // Keep for backward compatibility

#[derive(Serialize, Deserialize, utoipa::ToSchema)]
pub struct LogLevelUpdate {
    pub feature: String,
    pub level: String,
}

#[derive(Serialize, Deserialize, utoipa::ToSchema)]
pub struct LogConfigResponse {
    pub message: String,
    pub current_level: String,
}

#[derive(Deserialize)]
pub struct ListLogsQuery {
    pub since: Option<i64>,
}

/// List logs with optional pagination
#[utoipa::path(
    get,
    path = "/api/logs",
    tag = "logs",
    params(
        ("since" = Option<i64>, Query, description = "Timestamp to list logs from")
    ),
    responses((status = 200, description = "List logs", body = serde_json::Value))
)]
pub async fn list_logs(
    query: web::Query<ListLogsQuery>,
    state: web::Data<AppState>,
) -> impl Responder {
    let temp_processor_node = state.node.read().await.clone();
    let processor = OperationProcessor::new(temp_processor_node);

    let logs = processor.list_logs(query.since, Some(1000)).await;

    HttpResponse::Ok().json(serde_json::json!({
        "logs": logs,
        "count": logs.len(),
        "timestamp": std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }))
}

/// Stream logs via Server-Sent Events (backward compatibility)
#[utoipa::path(
    get,
    path = "/api/logs/stream",
    tag = "logs",
    responses((status = 200, description = "Stream logs"))
)]
pub async fn stream_logs() -> impl Responder {
    // Subscribe to new WebOutput via logging module
    let rx = match crate::logging::subscribe() {
        Some(r) => r,
        None => return HttpResponse::InternalServerError().finish(),
    };
    
    // The WebOutput now broadcasts JSON strings (LogEntry serialized)
    // We wrap them in SSE format: "data: {JSON}\n\n"
    let stream = BroadcastStream::new(rx).filter_map(|msg| async move {
        match msg {
            Ok(json_str) => Some(Ok::<web::Bytes, actix_web::Error>(web::Bytes::from(
                format!("data: {}\n\n", json_str),
            ))),
            Err(_) => None, // Broadcast error (lagging, etc)
        }
    });

    HttpResponse::Ok()
        .insert_header(("Content-Type", "text/event-stream"))
        .streaming(stream)
}


/// Get current logging configuration
#[utoipa::path(
    get,
    path = "/api/logs/config",
    tag = "logs",
    responses((status = 200, description = "Logging configuration", body = LogConfigResponse))
)]
pub async fn get_config(state: web::Data<AppState>) -> Result<impl Responder> {
    let temp_processor_node = state.node.read().await.clone();
    let processor = OperationProcessor::new(temp_processor_node);

    if let Some(config) = processor.get_log_config().await {
        Ok(HttpResponse::Ok().json(serde_json::json!({
            "config": config
        })))
    } else {
        let current_level = log::max_level().to_string();
        Ok(HttpResponse::Ok().json(LogConfigResponse {
            message: "Basic logging configuration".to_string(),
            current_level,
        }))
    }
}

/// Update feature-specific log level at runtime
#[utoipa::path(
    put,
    path = "/api/logs/level",
    tag = "logs",
    request_body = LogLevelUpdate,
    responses(
        (status = 200, description = "Updated"),
        (status = 400, description = "Bad request"),
        (status = 500, description = "Server error")
    )
)]
pub async fn update_feature_level(
    level_update: web::Json<LogLevelUpdate>,
    state: web::Data<AppState>,
) -> Result<impl Responder> {
    let valid_levels = ["TRACE", "DEBUG", "INFO", "WARN", "ERROR"];
    if !valid_levels.contains(&level_update.level.as_str()) {
        return Ok(HttpResponse::BadRequest().json(serde_json::json!({
            "error": format!("Invalid log level: {}", level_update.level)
        })));
    }

    let temp_processor_node = state.node.read().await.clone();
    let processor = OperationProcessor::new(temp_processor_node);

    match processor.update_log_feature_level(&level_update.feature, &level_update.level).await {
        Ok(_) => {
            Ok(HttpResponse::Ok().json(serde_json::json!({
                "success": true,
                "message": format!("Updated {} log level to {}", level_update.feature, level_update.level)
            })))
        }
        Err(e) => {
            Ok(HttpResponse::InternalServerError().json(serde_json::json!({
                "error": format!("Failed to update log level: {}", e)
            })))
        }
    }
}

/// Reload logging configuration from file
#[utoipa::path(
    post,
    path = "/api/logs/config/reload",
    tag = "logs",
    responses((status = 200, description = "Reloaded"), (status = 400, description = "Bad request"))
)]
pub async fn reload_config(state: web::Data<AppState>) -> Result<impl Responder> {
    let temp_processor_node = state.node.read().await.clone();
    let processor = OperationProcessor::new(temp_processor_node);

    match processor.reload_log_config("config/logging.toml").await {
        Ok(_) => Ok(HttpResponse::Ok().json(serde_json::json!({
            "success": true,
            "message": "Configuration reloaded successfully"
        }))),
        Err(e) => Ok(HttpResponse::BadRequest().json(serde_json::json!({
            "error": format!("Failed to reload configuration: {}", e)
        }))),
    }
}

/// Get available log features and their current levels
#[utoipa::path(
    get,
    path = "/api/logs/features",
    tag = "logs",
    responses((status = 200, description = "Features", body = serde_json::Value))
)]
pub async fn get_features(state: web::Data<AppState>) -> Result<impl Responder> {
    let temp_processor_node = state.node.read().await.clone();
    let processor = OperationProcessor::new(temp_processor_node);

    if let Some(features) = processor.get_log_features().await {
        Ok(HttpResponse::Ok().json(serde_json::json!({
            "features": features,
            "available_levels": ["TRACE", "DEBUG", "INFO", "WARN", "ERROR"]
        })))
    } else {
        let current_level = log::max_level().to_string();
        Ok(HttpResponse::Ok().json(serde_json::json!({
            "features": {
                "transform": current_level,
                "network": current_level,
                "database": current_level,
                "schema": current_level,
                "query": current_level,
                "mutation": current_level,
                "permissions": current_level,
                "http_server": current_level,
                "tcp_server": current_level,
                "ingestion": current_level
            },
            "available_levels": ["TRACE", "DEBUG", "INFO", "WARN", "ERROR"]
        })))
    }
}