//! pandrs 0.1.0-beta.2
//!
//! A high-performance DataFrame library for Rust, providing a pandas-like API
//! with advanced features including SIMD optimization, parallel processing,
//! and distributed computing capabilities. (Documentation header; this file is
//! the distributed fault-tolerance example.)
#[cfg(feature = "distributed")]
use pandrs::distributed::config::DistributedConfig;
#[cfg(feature = "distributed")]
use pandrs::distributed::datafusion::{DataFusionContext, FaultTolerantDataFusionContext};
#[cfg(feature = "distributed")]
use pandrs::distributed::execution::{ExecutionPlan, Operation};
#[cfg(feature = "distributed")]
use pandrs::distributed::fault_tolerance::{FaultToleranceHandler, RecoveryStrategy, RetryPolicy};
#[cfg(feature = "distributed")]
use pandrs::error::Result;

/// Example showing fault tolerance in action with retries.
///
/// The full demonstration lives in `_disabled_main`; until the distributed
/// API stabilizes, this entry point only prints an explanatory notice.
#[cfg(feature = "distributed")]
#[allow(clippy::result_large_err)]
fn main() -> Result<()> {
    let notices = [
        "Distributed fault tolerance example is currently disabled due to ongoing API changes.",
        "This example will be re-enabled in a future release.",
    ];
    for notice in notices {
        println!("{}", notice);
    }
    Ok(())
}

/// Full fault-tolerance demonstration (currently not wired to `main`).
///
/// Kept compiling behind `#[allow(dead_code)]` so it can be restored once the
/// distributed API settles. Flow: build a context, register sample data, wrap
/// the context with an exponential-backoff fault handler, run a filter plan
/// and a SQL query, then drive `FailureSimulation` to show recovery.
#[cfg(feature = "distributed")]
#[allow(dead_code)]
#[allow(clippy::result_large_err)]
fn _disabled_main() -> Result<()> {
    // Configure the distributed processing
    let config = DistributedConfig::new()
        .with_concurrency(4)
        .with_memory_limit(Some(1024 * 1024 * 1024)) // 1GB
        .with_optimization(true);

    // Create a basic context
    let mut context = DataFusionContext::new(config);

    // Register test data (100 rows under the name "test_data"; see
    // `register_test_data` below)
    register_test_data(&mut context)?;

    // Create a fault tolerance handler with exponential backoff:
    // 100 ms initial delay doubling per retry, capped at 1 s, at most 3
    // retries; on recovery only the failed partitions are re-executed.
    let fault_handler = FaultToleranceHandler::new(
        RetryPolicy::Exponential {
            max_retries: 3,
            initial_delay_ms: 100,
            max_delay_ms: 1000,
            backoff_factor: 2.0,
        },
        RecoveryStrategy::RetryFailedPartitions,
    );

    // Create a fault-tolerant context (takes ownership of `context`)
    let ft_context = FaultTolerantDataFusionContext::new(context, fault_handler);

    // Create a plan with a simple filter: rows of "test_data" with id > 10,
    // materialized under the output name "filtered_result"
    let plan = ExecutionPlan::new(
        Operation::Filter {
            predicate: "id > 10".to_string(),
        },
        vec!["test_data".to_string()],
        "filtered_result".to_string(),
    );

    // Execute the plan with fault tolerance
    let result = ft_context.execute(&plan)?;

    // Get the execution metrics
    println!("Execution Result Metrics:\n{}", result.metrics().format());

    // Display results (collects distributed partitions into a local DataFrame)
    let df = result.collect_to_local()?;
    println!("\nResults:\n{:?}", df);

    // Run a fault-tolerant SQL query
    println!("\nRunning SQL Query with Fault Tolerance:");
    let sql_result = ft_context.sql("SELECT * FROM test_data WHERE value > 50")?;

    println!("SQL Execution Metrics:\n{}", sql_result.metrics().format());

    let sql_df = sql_result.collect_to_local()?;
    println!("\nSQL Results:\n{:?}", sql_df);

    // Demonstrate fault recovery by simulating failures
    // (FailureSimulation takes ownership of ft_context)
    println!("\nDemonstrating fault recovery with simulated failures:");
    let mut failure_simulation = FailureSimulation::new(ft_context);
    failure_simulation.run_with_simulated_failures()?;

    Ok(())
}

/// Fallback entry point when the `distributed` feature is not compiled in:
/// explains how to rebuild the example with the feature enabled.
#[cfg(not(feature = "distributed"))]
fn main() {
    let lines = [
        "This example requires the 'distributed' feature flag to be enabled.",
        "Please recompile with:",
        "  cargo run --example distributed_fault_tolerance_example --features distributed",
    ];
    for line in lines {
        println!("{}", line);
    }
}

/// Register test data for the example.
///
/// Builds a 100-row DataFrame with an `id` column (`i64`, 1..=100) and a
/// `value` column (`f64`, `id * 1.5`), splits it into record batches of 25
/// rows, and registers the resulting partition set with the context under
/// the name `"test_data"` (the name the example's plans and SQL refer to).
///
/// # Errors
/// Propagates any error from series construction, batch conversion, or
/// dataset registration; also errors (instead of panicking) if conversion
/// unexpectedly yields no batches.
#[cfg(feature = "distributed")]
#[allow(clippy::result_large_err)]
fn register_test_data(context: &mut DataFusionContext) -> Result<()> {
    // Bug fix: `Arc` was used below without being in scope anywhere in this
    // file; import it locally. The distributed-only imports stay local too,
    // and the previously nested `#[cfg(feature = "distributed")]` block was
    // redundant because the whole function is already feature-gated.
    use std::sync::Arc;

    use pandrs::distributed::datafusion::conversion::dataframe_to_record_batches;
    use pandrs::distributed::partition::{Partition, PartitionSet};

    // Create a simple dataframe with test data
    let mut df = pandrs::dataframe::DataFrame::new();

    // id: 1..=100
    df.add_column(
        "id".to_string(),
        pandrs::series::Series::new((1..=100).collect::<Vec<i64>>(), Some("id".to_string()))?,
    )?;

    // value: id * 1.5
    df.add_column(
        "value".to_string(),
        pandrs::series::Series::new(
            (1..=100).map(|x| x as f64 * 1.5).collect::<Vec<f64>>(),
            Some("value".to_string()),
        )?,
    )?;

    // Convert to partitions of 25 rows each (4 batches for the 100 rows).
    let batches = dataframe_to_record_batches(&df, 25)?;

    // Defensive: the original indexed `batches[0]`, which would panic on an
    // empty batch list. Cannot happen for the fixed frame above, but keep the
    // helper safe for reuse.
    let first_batch = batches.first().ok_or_else(|| {
        pandrs::error::Error::IoError("dataframe produced no record batches".to_string())
    })?;
    let schema = first_batch.schema();

    let partitions: Vec<_> = batches
        .iter()
        .enumerate()
        .map(|(i, batch)| Arc::new(Partition::new(i, batch.clone())))
        .collect();

    let partition_set = PartitionSet::new(partitions, schema);

    // Register the data
    context.register_dataset("test_data", partition_set)?;

    Ok(())
}

/// Simulation of failures for demonstration purposes.
///
/// Wraps a fault-tolerant context and counts query attempts so the first two
/// attempts can be made to fail deliberately (see
/// `simulate_query_with_failures`).
#[cfg(feature = "distributed")]
struct FailureSimulation {
    /// Context the simulated queries are ultimately issued through.
    context: FaultTolerantDataFusionContext,
    /// Number of simulated query attempts so far; attempts 1 and 2 return
    /// injected errors, attempt 3 onward succeeds.
    failure_count: usize,
}

#[cfg(feature = "distributed")]
impl FailureSimulation {
    /// Create a new failure simulation.
    ///
    /// Takes ownership of the fault-tolerant context; the attempt counter
    /// starts at zero so the first two simulated queries will fail.
    fn new(context: FaultTolerantDataFusionContext) -> Self {
        Self {
            context,
            failure_count: 0,
        }
    }

    /// Run an example with simulated failures.
    ///
    /// Builds a fresh fault-tolerant context (fixed retry policy: 2 retries,
    /// 500 ms delay, whole-query retry) around a clone of the inner context,
    /// issues one query through `simulate_query_with_failures`, and prints
    /// the failures recorded by the fault handler.
    ///
    /// NOTE(review): the injected errors are returned directly to this
    /// function rather than surfacing inside the fault handler, so the first
    /// simulated failure takes the `Err` arm below and the "eventually
    /// succeeded" path is never reached — confirm whether the handler is
    /// expected to wrap the simulated call when this example is re-enabled.
    #[allow(clippy::result_large_err)]
    fn run_with_simulated_failures(&mut self) -> Result<()> {
        // Set up handler with visible reporting
        let new_handler = FaultToleranceHandler::new(
            RetryPolicy::Fixed {
                max_retries: 2,
                delay_ms: 500,
            },
            RecoveryStrategy::RetryQuery,
        );

        // Wrap a clone of the underlying context so `self.context` stays usable.
        let ft_context =
            FaultTolerantDataFusionContext::new(self.context.inner().clone(), new_handler);

        // First try - will fail with our simulated error
        println!("Attempt 1: Expect failure and retry");
        let result = match self.simulate_query_with_failures(
            &ft_context,
            "SELECT * FROM test_data WHERE id > 50 AND id < 70",
        ) {
            Ok(r) => r,
            Err(e) => {
                // A simulated error is a demo outcome, not a hard failure of
                // the example, so report it and return success.
                println!("Query failed after all retries: {}", e);
                return Ok(());
            }
        };

        println!("Query eventually succeeded after failures!");
        println!("Results: {} rows", result.collect_to_local()?.shape()?.0);

        // Show the failures that were recorded by the handler
        let failures = ft_context.fault_handler().recent_failures()?;
        println!("\nRecorded Failures:");
        for (i, failure) in failures.iter().enumerate() {
            println!(
                "Failure {}: Type: {:?}, Message: {}, Recovered: {}, Retries: {}",
                i + 1,
                failure.failure_type,
                failure.error_message,
                failure.recovered,
                failure.retry_attempts
            );
        }

        Ok(())
    }

    /// Simulate a query with failures injected.
    ///
    /// Increments the per-instance attempt counter; attempt 1 returns an
    /// injected timeout error, attempt 2 an injected I/O error, and attempt 3
    /// onward delegates to the real `context.sql(query)`.
    fn simulate_query_with_failures(
        &mut self,
        context: &FaultTolerantDataFusionContext,
        query: &str,
    ) -> Result<pandrs::distributed::execution::ExecutionResult> {
        // Track simulated failures
        self.failure_count += 1;

        // Inject failure for the first two attempts
        if self.failure_count <= 2 {
            println!("Simulating failure #{}", self.failure_count);

            // Return different errors based on the failure count
            if self.failure_count == 1 {
                return Err(pandrs::error::Error::Timeout(
                    "Simulated timeout error for testing fault tolerance".to_string(),
                ));
            } else {
                return Err(pandrs::error::Error::IoError(
                    "Simulated network error for testing fault tolerance".to_string(),
                ));
            }
        }

        // After simulated failures, return success
        println!("Execution succeeding on attempt #{}", self.failure_count);
        context.sql(query)
    }
}