datafold 0.1.55

A personal database for data sovereignty with AI-powered ingestion
Documentation
use datafold::fold_db_core::FoldDB;
use datafold::schema::SchemaState;
use serde_json::json;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;

mod common;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_heart_rate_average_dsl() {
    // 1. Setup FoldDB
    let temp_dir = TempDir::new().expect("Failed to create temp directory");
    let test_db_path = temp_dir.path().to_str().expect("Failed to convert path");
    let mut fold_db = FoldDB::new(test_db_path)
        .await
        .expect("Failed to create FoldDB");
    let transform_manager = fold_db.transform_manager();
    let db_ops = transform_manager.db_ops.clone();

    // 2. Define Schema with DSL for Average
    let schema_json = json!({
        "name": "DailyHealthSummary",
        "key": {
            "range_field": "user_id"
        },
        "fields": {
            "user_id": {},
            "raw_readings": {},
            "avg_bpm": {}
        },
        "transform_fields": {
            // DSL: Access array -> split into items -> calculate average
            "avg_bpm": "DailyHealthSummary.raw_readings.split_array().average()"
        },
        "field_topologies": {
            "user_id": { "root": { "type": "Primitive", "value": "String", "classifications": ["word"] } },
            "raw_readings": { "root": { "type": "Array", "value": { "type": "Primitive", "value": "Number", "classifications": [] } } },
            "avg_bpm": { "root": { "type": "Primitive", "value": "String", "classifications": ["word"] } }
        }
    });

    let schema_str = serde_json::to_string(&schema_json).unwrap();
    fold_db
        .schema_manager()
        .load_schema_from_json(&schema_str)
        .await
        .expect("Failed to load schema");

    // Wait for transform registration (async via message bus)
    // The transform ID for declarative schema is the schema name itself
    let transform_id = "DailyHealthSummary";
    let mut registered = false;
    for i in 0..50 {
        // Wait up to 5 seconds
        if transform_manager
            .transform_exists(transform_id)
            .expect("Failed to check transform")
        {
            registered = true;
            println!(
                "DEBUG: Transform '{}' registered after {} iterations",
                transform_id, i
            );
            break;
        }
        thread::sleep(Duration::from_millis(100));
    }
    assert!(registered, "Transform was not registered!");

    // Approve schema
    db_ops
        .store_schema_state("DailyHealthSummary", &SchemaState::Approved)
        .await
        .expect("Failed to approve schema");

    // 3. Ingest Data (Simulate Mutation)
    // User 1: [60, 65, 70, 75, 80, 85, 90, 85, 80, 75, 70, 65] -> Avg: 75
    let raw_readings_1 = vec![60, 65, 70, 75, 80, 85, 90, 85, 80, 75, 70, 65];
    let mutation_1 = common::create_test_mutation(
        &schema_json,
        json!({
            "schema_name": "DailyHealthSummary",
            "uuid": "mutation_1",
            "pub_key": "user_123", // Using user_id as pub_key for simplicity in test helper
            "fields_and_values": {
                "user_id": "user_123",
                "raw_readings": raw_readings_1
            }
        }),
    );

    // User 2: [55, 58, 60, 62, 60, 58] -> Avg: 58.833...
    let raw_readings_2 = vec![55, 58, 60, 62, 60, 58];
    let mutation_2 = common::create_test_mutation(
        &schema_json,
        json!({
            "schema_name": "DailyHealthSummary",
            "uuid": "mutation_2",
            "pub_key": "user_456",
            "fields_and_values": {
                "user_id": "user_456",
                "raw_readings": raw_readings_2
            }
        }),
    );

    // Write mutations
    fold_db
        .mutation_manager_mut()
        .write_mutations_batch_async(vec![mutation_1, mutation_2])
        .await
        .expect("Failed to write mutations");

    // Wait for processing
    thread::sleep(Duration::from_millis(500));

    // DEBUG: Check if schema has molecule UUIDs persisted
    if let Some(schema) = db_ops.get_schema("DailyHealthSummary").await.unwrap() {
        println!(
            "DEBUG: Schema Molecule UUIDs: {:?}",
            schema.field_molecule_uuids
        );
    } else {
        println!("DEBUG: Schema not found!");
    }

    // DEBUG: Verify that user_id was indexed to ensure system is working
    let user_check = fold_db
        .native_search_all_classifications("user_123")
        .await
        .unwrap();
    if user_check.is_empty() {
        println!("WARNING: user_123 not found in index! System might be slow or broken.");
    } else {
        println!("DEBUG: user_123 found in index. Ingestion works.");
    }

    // Poll for results (async pipeline: Mutation -> Transform -> Index) usually, but here checking mutation effect would rely on storage inspection)
    // Since we don't have a direct "read row" API easily accessible in integration tests (usually goes through query engines),
    // we can verify utilizing the transform manager or checking the underlying storage if we had access.
    // However, existing tests mostly rely on checking if things didn't crash or checking index state.
    // Let's verify by trying to "search" for the calculated value if it was indexed?
    // The schema defines "word" classification for `avg_bpm`. So "75" should be indexable.

    // Using `db_ops` to fetch the atom for "avg_bpm" for a specific user is complex because atoms are UUID based.
    // Instead, let's verify via the Native Index search if "75" returns user_123.
    // Note: The `avg_bpm` field has "word" classification.

    // We need to know the specific partition key structure.
    // But for this test, we can trust the `TypedEngine` unit tests for calculation correctness.
    // To be thorough, let's add a debug print or a specific check if possible.
    // Actually, `transform_execution_test.rs` doesn't verify values, just state.
    // Let's use `fold_db.query_executor` if available? No, it's private or behind feature flags often.

    // Alternative: We can inspect the `db_ops` directly if we can construct the key.
    // Or we can rely on `schema_manager.get_schema` having updated stats? No.

    // Let's search for "75" using the public API
    // Note: The specific string representation of 75.0 might differ, but our reducer outputs "75" for integers.
    // We expect user_123 (Simulated by verifying we get *some* result, as we don't have easy access to inspect the exact return structure without more deps)

    // Poll for results (async pipeline: Mutation -> Transform -> Index)
    let mut found = false;
    for i in 0..50 {
        // Wait up to 5 seconds
        let search_results = fold_db
            .native_search_all_classifications("75")
            .await
            .expect("Search failed");

        if !search_results.is_empty() {
            found = true;
            break;
        }

        thread::sleep(Duration::from_millis(100));
        if i % 10 == 0 {
            println!("Waiting for indexing... attempt {}", i + 1);
        }
    }

    assert!(
        found,
        "Should find result for calculated average '75' after waiting"
    );

    // For the decimal one: 353 / 6 = 58.8333...
    // Our reducer converts to string. It doesn't truncate decimals unless it ends in .0.
    // So searching for exact string might be hard without knowing precision.
    // "58.833333333333336" likely.

    // Let's simply verify "75" first to confirm the pipeline works.

    fold_db.close().expect("Failed to close");
}