datafold 0.1.55

A personal database for data sovereignty with AI-powered ingestion
Documentation
use crate::datafold_node::OperationProcessor;

use crate::schema::types::operations::{Operation, Query};
use crate::server::http_server::AppState;
use actix_web::{web, HttpResponse, Responder};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuccessResponse {
    pub success: bool,
    pub message: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct MutationResponse {
    pub mutation_id: String,
}

/// Execute a query.
#[utoipa::path(
    post,
    path = "/api/query",
    tag = "query",
    request_body = serde_json::Value,
    responses(
        (status = 200, description = "Array of query result records"),
        (status = 400, description = "Bad request"),
        (status = 500, description = "Server error")
    )
)]
pub async fn execute_query(query: web::Json<Query>, state: web::Data<AppState>) -> impl Responder {
    let query_inner = query.into_inner();
    log::info!(
        "🔍 execute_query: schema={}, fields={:?}, filter={:?}",
        query_inner.schema_name,
        query_inner.fields,
        query_inner.filter
    );

    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.execute_query_json(query_inner).await {
        Ok(data) => {
            log::info!("✅ Query completed: {} records returned", data.len());
            HttpResponse::Ok().json(data)
        }
        Err(e) => {
            log::error!("❌ Query failed: {}", e);
            HttpResponse::InternalServerError()
                .json(json!({"error": format!("Failed to execute query: {}", e)}))
        }
    }
}

/// Execute a mutation.
#[utoipa::path(
    post,
    path = "/api/mutation",
    tag = "query",
    request_body = serde_json::Value,
    responses(
        (status = 200, description = "Mutation accepted", body = MutationResponse),
        (status = 400, description = "Bad request"),
        (status = 500, description = "Server error")
    )
)]
pub async fn execute_mutation(
    mutation_data: web::Json<Value>,
    state: web::Data<AppState>,
) -> impl Responder {
    log::info!("📥 Received mutation request");

    let (schema, fields_and_values, key_value, mutation_type) =
        match serde_json::from_value::<Operation>(mutation_data.into_inner()) {
            Ok(Operation::Mutation {
                schema,
                fields_and_values,
                key_value,
                mutation_type,
                source_file_name: _,
            }) => {
                log::info!(
                    "✅ Parsed mutation: schema={}, type={:?}, fields={}",
                    schema,
                    mutation_type,
                    fields_and_values.len()
                );
                (schema, fields_and_values, key_value, mutation_type)
            }
            Err(e) => {
                log::error!("❌ Failed to parse mutation: {}", e);
                return HttpResponse::BadRequest()
                    .json(json!({"error": format!("Failed to parse mutation: {}", e)}));
            }
        };

    let processor = OperationProcessor::new(state.node.read().await.clone());

    log::info!("🚀 Executing mutation via OperationProcessor");
    match processor
        .execute_mutation(schema, fields_and_values, key_value, mutation_type)
        .await
    {
        Ok(mutation_id) => {
            log::info!("✅ Mutation executed successfully: {}", mutation_id);
            HttpResponse::Ok().json(json!({"mutation_id": mutation_id, "success": true}))
        }
        Err(e) => {
            log::error!("❌ Mutation execution failed: {}", e);
            HttpResponse::InternalServerError().json(
                json!({"error": format!("Failed to execute mutation: {}", e), "success": false}),
            )
        }
    }
}

/// Execute multiple mutations in a batch for improved performance.
#[utoipa::path(
    post,
    path = "/api/mutations/batch",
    tag = "query",
    request_body = Vec<serde_json::Value>,
    responses(
        (status = 200, description = "Array of mutation IDs"),
        (status = 400, description = "Bad request"),
        (status = 500, description = "Server error")
    )
)]
pub async fn execute_mutations_batch(
    mutations_data: web::Json<Vec<Value>>,
    state: web::Data<AppState>,
) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor
        .execute_mutations_batch(mutations_data.into_inner())
        .await
    {
        Ok(mutation_ids) => HttpResponse::Ok().json(mutation_ids),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to execute batch mutations: {}", e)})),
    }
}

#[utoipa::path(
    get,
    path = "/api/transforms",
    tag = "query",
    responses(
        (status = 200, description = "Map of transform names to transform objects"),
        (status = 500, description = "Server error")
    )
)]
pub async fn list_transforms(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.list_transforms().await {
        Ok(map) => HttpResponse::Ok().json(map),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to list transforms: {}", e)})),
    }
}

#[utoipa::path(
    post,
    path = "/api/transforms/queue/{id}",
    tag = "query",
    params(
        ("id" = String, Path, description = "Transform id")
    ),
    responses(
        (status = 200, description = "Queued"),
        (status = 500, description = "Server error")
    )
)]
pub async fn add_to_transform_queue(
    path: web::Path<String>,
    state: web::Data<AppState>,
) -> impl Responder {
    let transform_id = path.into_inner();
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor
        .add_to_transform_queue(&transform_id, "manual_api_trigger")
        .await
    {
        Ok(_) => HttpResponse::Ok().json(SuccessResponse {
            success: true,
            message: format!("Transform '{}' added to queue", transform_id),
        }),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to add transform to queue: {}", e)})),
    }
}

#[utoipa::path(
    get,
    path = "/api/transforms/queue",
    tag = "query",
    responses(
        (status = 200, description = "Transform queue information object"),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_transform_queue(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_transform_queue().await {
        Ok((len, queued)) => HttpResponse::Ok().json(json!({
            "length": len,
            "queued_transforms": queued
        })),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get transform queue info: {}", e)})),
    }
}

#[utoipa::path(
    get,
    path = "/api/transforms/backfills",
    tag = "query",
    responses(
        (status = 200, description = "Array of all backfill information objects"),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_all_backfills(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_all_backfills().await {
        Ok(backfills) => HttpResponse::Ok().json(backfills),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get backfills: {}", e)})),
    }
}

#[utoipa::path(
    get,
    path = "/api/transforms/backfills/active",
    tag = "query",
    responses(
        (status = 200, description = "Array of active backfill information objects"),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_active_backfills(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_active_backfills().await {
        Ok(backfills) => HttpResponse::Ok().json(backfills),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get active backfills: {}", e)})),
    }
}

#[utoipa::path(
    get,
    path = "/api/transforms/backfills/{id}",
    tag = "query",
    params(
        ("id" = String, Path, description = "Transform ID")
    ),
    responses(
        (status = 200, description = "Backfill information object"),
        (status = 404, description = "Backfill not found"),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_backfill(path: web::Path<String>, state: web::Data<AppState>) -> impl Responder {
    let transform_id = path.into_inner();
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_backfill(&transform_id).await {
        Ok(Some(backfill)) => HttpResponse::Ok().json(backfill),
        Ok(None) => HttpResponse::NotFound()
            .json(json!({"error": format!("Backfill not found for transform: {}", transform_id)})),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get backfill: {}", e)})),
    }
}

#[utoipa::path(
    get,
    path = "/api/transforms/statistics",
    tag = "query",
    responses(
        (status = 200, description = "Transform statistics object"),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_transform_statistics(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_transform_statistics().await {
        Ok(stats) => HttpResponse::Ok().json(stats),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get transform statistics: {}", e)})),
    }
}

/// Search the native word index for a term.
#[utoipa::path(
    get,
    path = "/api/native-index/search",
    tag = "query",
    params(
        ("term" = String, Query, description = "Search term for native word index")
    ),
    responses(
        (status = 200, description = "Array of native index results", body = [crate::db_operations::IndexResult]),
        (status = 400, description = "Bad request"),
        (status = 500, description = "Server error")
    )
)]
pub async fn native_index_search(
    query: web::Query<std::collections::HashMap<String, String>>,
    state: web::Data<AppState>,
) -> impl Responder {
    info!("API: native_index_search endpoint called");

    let term = match query.get("term") {
        Some(t) if !t.trim().is_empty() => t.trim().to_string(),
        _ => {
            warn!("API: Missing or empty term parameter");
            return HttpResponse::BadRequest()
                .json(json!({"error": "Missing required 'term' query parameter"}));
        }
    };

    info!("API: Searching for term: '{}'", term);

    let processor = OperationProcessor::new(state.node.read().await.clone());

    debug!("API: Acquired database, calling native_search_all_classifications");
    match processor.native_index_search(&term).await {
        Ok(results) => {
            info!("API: Search completed, found {} results", results.len());
            HttpResponse::Ok().json(results)
        }
        Err(e) => {
            error!("API: Search failed: {}", e);
            HttpResponse::InternalServerError()
                .json(json!({"error": format!("Native index search failed: {}", e)}))
        }
    }
}
#[utoipa::path(
    get,
    path = "/api/transforms/backfills/statistics",
    tag = "query",
    responses(
        (status = 200, description = "Aggregate backfill statistics", body = crate::fold_db_core::infrastructure::backfill_tracker::BackfillStatistics),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_backfill_statistics(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_backfill_statistics().await {
        Ok(stats) => HttpResponse::Ok().json(stats),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get backfill statistics: {}", e)})),
    }
}

/// Get indexing status
#[utoipa::path(
    get,
    path = "/api/indexing/status",
    tag = "system",
    responses(
        (status = 200, description = "Current indexing status", body = IndexingStatus),
        (status = 500, description = "Server error")
    )
)]
pub async fn get_indexing_status(state: web::Data<AppState>) -> impl Responder {
    let processor = OperationProcessor::new(state.node.read().await.clone());

    match processor.get_indexing_status().await {
        Ok(status) => HttpResponse::Ok().json(status),
        Err(e) => HttpResponse::InternalServerError()
            .json(json!({"error": format!("Failed to get indexing status: {}", e)})),
    }
}

#[cfg(test)]
mod tests {}