// datafold 0.1.55
//
// A personal database for data sovereignty with AI-powered ingestion.
// Documentation
use datafold::datafold_node::node::DataFoldNode;
use datafold::datafold_node::OperationProcessor;
use datafold::schema::types::operations::MutationType;
use datafold::schema::types::KeyValue;
use datafold::testing_utils::TestDatabaseFactory;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;

/// Verifies that the node's indexing status counter moves from zero to a
/// positive value after a single `Create` mutation is executed against a
/// schema whose `content` field carries the `"word"` classification.
///
/// The counter is polled (every 100 ms, for up to 5 s) rather than read once,
/// presumably because indexing completes asynchronously relative to the
/// mutation call — confirm against the node implementation.
#[tokio::test]
async fn test_indexing_progress_tracking() {
    // Bring up a node from the factory-provided test configuration.
    let node = DataFoldNode::new(TestDatabaseFactory::create_test_node_config())
        .await
        .unwrap();

    // Schema under test: keyed on `id`, with a classified `content` field.
    let schema_definition = r#"{
        "name": "test_schema",
        "type": "Single",
        "key": {
            "fields": ["id"],
            "primary": true
        },
        "fields": ["id", "content"],
        "field_topologies": {
            "id": {
                "root": {
                    "type": "Primitive",
                    "value": "String"
                }
            },
            "content": {
                "root": {
                    "type": "Primitive",
                    "value": "String",
                    "classifications": ["word"]
                }
            }
        }
    }"#;

    // Kept as one chained expression so whatever `get_fold_db` hands back is
    // released as soon as the schema has been loaded.
    node.get_fold_db()
        .await
        .unwrap()
        .load_schema_from_json(schema_definition)
        .await
        .unwrap();

    // Nothing has been written yet, so the counter must start at zero.
    let baseline = node.get_indexing_status().await;
    assert_eq!(baseline.total_operations_processed, 0);

    // Record payload for the mutation.
    let record = HashMap::from([
        ("id".to_string(), json!("1")),
        ("content".to_string(), json!("hello world")),
    ]);

    // The processor works on its own clone of the node; the original is
    // wrapped in a shared lock and used below for status polling.
    let processor = OperationProcessor::new(node.clone());
    let shared_node = Arc::new(tokio::sync::RwLock::new(node));

    processor
        .execute_mutation(
            "test_schema".to_string(),
            record,
            KeyValue::new(Some("1".to_string()), None),
            MutationType::Create,
        )
        .await
        .unwrap();

    // Poll the status until the counter becomes positive or the deadline passes.
    let poll_interval = std::time::Duration::from_millis(100);
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);

    let mut processed = 0;
    while std::time::Instant::now() < deadline {
        // Scope the read guard so it is released before sleeping.
        let seen = {
            let guard = shared_node.read().await;
            let snapshot = guard.get_indexing_status().await;
            snapshot.total_operations_processed
        };

        if seen > 0 {
            processed = seen;
            break;
        }

        tokio::time::sleep(poll_interval).await;
    }

    println!("Total operations processed: {}", processed);
    assert!(
        processed > 0,
        "Should have processed indexing operations within timeout"
    );
}