pandrs 0.3.2

A high-performance DataFrame library for Rust, providing a pandas-like API with advanced features including SIMD optimization, parallel processing, and distributed computing capabilities.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
//! # Execution Engine Interface
//!
//! This module defines the interface for distributed execution engines.

// Re-export backward compatibility module (temporarily disabled)
// pub mod backward_compat;
// pub use backward_compat::*;

use std::sync::Arc;

use crate::distributed::core::config::DistributedConfig;
use crate::distributed::core::partition::{Partition, PartitionSet};
use crate::error::Result;

/// Interface for distributed execution engines
///
/// An engine is the entry point to a specific execution backend. It is
/// configured once via [`ExecutionEngine::initialize`] and then hands out
/// [`ExecutionContext`]s that actually run queries. `Send + Sync` is required
/// so a single engine can be shared across threads behind an `Arc`.
pub trait ExecutionEngine: Send + Sync {
    /// Initializes the execution engine
    ///
    /// Must be called before `create_context`; implementations decide what
    /// initialization entails (not visible from this interface).
    fn initialize(&mut self, config: &DistributedConfig) -> Result<()>;

    /// Checks if the engine is initialized
    fn is_initialized(&self) -> bool;

    /// Creates a new execution context
    ///
    /// The context is boxed so callers can work with heterogeneous engines
    /// through `dyn ExecutionEngine`.
    fn create_context(&self, config: &DistributedConfig) -> Result<Box<dyn ExecutionContext>>;

    /// Clones the engine
    ///
    /// NOTE: this inherent `clone` returning `Box<dyn ExecutionEngine>` is the
    /// usual object-safe cloning pattern — `std::clone::Clone` cannot be a
    /// supertrait of an object-safe trait because its `clone` returns `Self`.
    fn clone(&self) -> Box<dyn ExecutionEngine>;
}

/// Execution context for running distributed operations
///
/// A context owns the set of registered datasets (in-memory tables, CSV and
/// Parquet files) and executes plans or SQL queries against them. Contexts are
/// obtained from [`ExecutionEngine::create_context`].
pub trait ExecutionContext: Send + Sync {
    /// Executes a plan and returns a result
    fn execute_plan(&mut self, plan: ExecutionPlan) -> Result<ExecutionResult>;

    /// Registers a partition set with the context under the given table name
    fn register_in_memory_table(&mut self, name: &str, partitions: PartitionSet) -> Result<()>;

    /// Registers a CSV file as a dataset
    ///
    /// NOTE(review): whether `path` may be a directory or glob depends on the
    /// backend — not determinable from this interface.
    fn register_csv(&mut self, name: &str, path: &str) -> Result<()>;

    /// Registers a Parquet file as a dataset
    fn register_parquet(&mut self, name: &str, path: &str) -> Result<()>;

    /// Executes a SQL query
    ///
    /// Table names referenced by the query are presumably those registered via
    /// the `register_*` methods — confirm with the concrete implementation.
    fn sql(&mut self, query: &str) -> Result<ExecutionResult>;

    /// Gets the schema of a table
    fn table_schema(&self, name: &str) -> Result<arrow::datatypes::SchemaRef>;

    /// Explains an execution plan
    ///
    /// When `with_statistics` is true the explanation should include cost or
    /// cardinality statistics (backend-defined).
    fn explain_plan(&self, plan: &ExecutionPlan, with_statistics: bool) -> Result<String>;

    /// Writes results to Parquet
    fn write_parquet(&mut self, result: &ExecutionResult, path: &str) -> Result<()>;

    /// Writes results to CSV
    fn write_csv(&mut self, result: &ExecutionResult, path: &str) -> Result<()>;

    /// Gets execution metrics
    fn metrics(&self) -> Result<ExecutionMetrics>;

    /// Clones the context
    ///
    /// Inherent object-safe clone; see the note on [`ExecutionEngine::clone`].
    fn clone(&self) -> Box<dyn ExecutionContext>;
}

/// A plan for executing operations
///
/// A plan is a single named input dataset plus an ordered pipeline of
/// [`Operation`]s applied to it. Multi-input operations (joins, unions) name
/// their other inputs inside the operation itself.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Input dataset name
    input: String,
    /// Operations to execute, applied in insertion order
    operations: Vec<Operation>,
}

impl ExecutionPlan {
    /// Creates a new execution plan reading from the named input dataset,
    /// with an initially empty operation pipeline.
    pub fn new(input: &str) -> Self {
        Self {
            input: String::from(input),
            operations: Vec::new(),
        }
    }

    /// Returns the name of the (single) input dataset.
    pub fn input(&self) -> &str {
        self.input.as_str()
    }

    /// Returns all input dataset names; currently always a single element.
    pub fn inputs(&self) -> Vec<&str> {
        vec![self.input.as_str()]
    }

    /// Appends one operation to the pipeline; returns `&mut Self` so calls
    /// can be chained.
    pub fn add_operation(&mut self, operation: Operation) -> &mut Self {
        self.operations.push(operation);
        self
    }

    /// Appends several operations at once, preserving their order; returns
    /// `&mut Self` for chaining.
    pub fn add_operations(&mut self, mut operations: Vec<Operation>) -> &mut Self {
        self.operations.append(&mut operations);
        self
    }

    /// Returns the ordered operation pipeline.
    pub fn operations(&self) -> &Vec<Operation> {
        &self.operations
    }
}

/// Types of operations that can be executed
///
/// Expressions (filters, projections, window specs) are carried as strings;
/// their exact syntax is defined by the executing engine, not by this enum.
#[derive(Debug, Clone)]
pub enum Operation {
    /// SELECT operation - Select specific columns by name
    Select(Vec<String>),

    /// FILTER operation - Filter rows based on a condition
    /// (string-encoded predicate — presumably SQL-like; confirm with the engine)
    Filter(String),

    /// JOIN operation - Join with another dataset
    Join {
        /// Right-side dataset
        right: String,
        /// Join type
        join_type: JoinType,
        /// Left join keys
        left_keys: Vec<String>,
        /// Right join keys (paired positionally with `left_keys`)
        right_keys: Vec<String>,
    },

    /// AGGREGATE operation - Group by keys and apply aggregations
    /// (tuple variant: group-by keys, then aggregation expressions)
    Aggregate(Vec<String>, Vec<AggregateExpr>),

    /// GROUP BY operation - Group by keys and apply aggregations (struct variant)
    ///
    /// NOTE(review): overlaps with `Aggregate`; both appear to express the same
    /// operation — likely kept separate for compatibility. Confirm which one
    /// each engine handles.
    GroupBy {
        /// Group by keys
        keys: Vec<String>,
        /// Aggregation expressions
        aggregates: Vec<AggregateExpr>,
    },

    /// ORDER BY operation - Sort data by the given expressions, in order
    OrderBy(Vec<SortExpr>),

    /// LIMIT operation - Limit number of rows
    Limit(usize),

    /// WINDOW operation - Apply window functions
    /// (string-encoded window expressions — format engine-defined)
    Window(Vec<String>),

    /// PROJECTION operation - Add computed columns
    /// (pairs are presumably (output name, expression) — confirm with the engine)
    Project(Vec<(String, String)>),

    /// DISTINCT operation - Remove duplicate rows
    Distinct,

    /// UNION operation - Combine with another dataset (named)
    Union(String),

    /// INTERSECT operation - Get rows present in both datasets
    Intersect(String),

    /// EXCEPT operation - Get rows in this dataset but not in another
    Except(String),

    /// Custom operation (for extensibility)
    Custom {
        /// Operation name
        name: String,
        /// Operation parameters as free-form key/value strings
        params: std::collections::HashMap<String, String>,
    },
}

/// Types of joins
///
/// Mirrors the standard SQL join taxonomy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    /// Inner join — only rows with matches on both sides
    Inner,
    /// Left outer join — all left rows, nulls for unmatched right columns
    Left,
    /// Right outer join — all right rows, nulls for unmatched left columns
    Right,
    /// Full outer join — all rows from both sides
    Full,
    /// Cross join — cartesian product (join keys ignored)
    Cross,
}

/// Expression for aggregation
///
/// The function is a free-form name; the set of valid names is defined by the
/// executing engine, not validated here.
#[derive(Debug, Clone)]
pub struct AggregateExpr {
    /// Input column
    pub column: String,
    /// Aggregation function (e.g., "sum", "avg", "count")
    pub function: String,
    /// Output column name (alias) in the result schema
    pub alias: String,
}

/// Expression for sorting
///
/// One sort key; `Operation::OrderBy` takes a list of these applied in order
/// of decreasing precedence.
#[derive(Debug, Clone)]
pub struct SortExpr {
    /// Column name
    pub column: String,
    /// Sort direction: true = ascending, false = descending
    pub ascending: bool,
    /// Whether null values sort before non-null values
    pub nulls_first: bool,
}

/// Result of executing a plan
///
/// Bundles the output data (as partitions), its Arrow schema, and the metrics
/// collected during execution.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// The resulting partitions
    partitions: PartitionSet,
    /// Result schema (shared Arrow schema reference)
    schema: arrow::datatypes::SchemaRef,
    /// Execution metrics
    metrics: ExecutionMetrics,
}

impl ExecutionResult {
    /// Creates a new execution result
    pub fn new(
        partitions: PartitionSet,
        schema: arrow::datatypes::SchemaRef,
        metrics: ExecutionMetrics,
    ) -> Self {
        Self {
            partitions,
            schema,
            metrics,
        }
    }

    /// Gets the partitions
    pub fn partitions(&self) -> &PartitionSet {
        &self.partitions
    }

    /// Gets the schema
    pub fn schema(&self) -> &arrow::datatypes::SchemaRef {
        &self.schema
    }

    /// Gets the metrics
    pub fn metrics(&self) -> &ExecutionMetrics {
        &self.metrics
    }

    /// Gets the row count
    pub fn row_count(&self) -> usize {
        self.partitions.total_rows()
    }

    /// Collects all record batches
    pub fn collect(&self) -> Result<Vec<arrow::record_batch::RecordBatch>> {
        let mut batches = Vec::new();

        for partition in self.partitions.partitions() {
            if let Some(batch) = partition.data() {
                batches.push(batch.clone());
            }
        }

        Ok(batches)
    }
}

/// Metrics about execution
///
/// All counters default to zero / empty via `Default`; builders (`with_*`) and
/// accumulators (`add_*`, `merge`) are provided on the impl.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
    /// Time taken for execution in milliseconds
    pub execution_time_ms: u64,
    /// Number of rows processed
    pub rows_processed: usize,
    /// Number of partitions processed
    pub partitions_processed: usize,
    /// Bytes processed
    pub bytes_processed: usize,
    /// Bytes output
    pub bytes_output: usize,
    /// Optional query identifier
    pub query_id: Option<String>,
    /// Number of input rows (distinct from `rows_processed`; the exact
    /// distinction is engine-defined)
    pub input_rows: usize,
    /// Number of output rows
    pub output_rows: usize,
    /// Custom metrics as free-form name/value strings
    pub custom_metrics: std::collections::HashMap<String, String>,
}

impl ExecutionMetrics {
    /// Creates a new, zeroed set of execution metrics.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the execution time in milliseconds (builder style).
    pub fn with_execution_time(mut self, time_ms: u64) -> Self {
        self.execution_time_ms = time_ms;
        self
    }

    /// Sets the number of rows processed (builder style).
    pub fn with_rows_processed(mut self, rows: usize) -> Self {
        self.rows_processed = rows;
        self
    }

    /// Sets the number of partitions processed (builder style).
    pub fn with_partitions_processed(mut self, partitions: usize) -> Self {
        self.partitions_processed = partitions;
        self
    }

    /// Sets the bytes processed (builder style).
    pub fn with_bytes_processed(mut self, bytes: usize) -> Self {
        self.bytes_processed = bytes;
        self
    }

    /// Sets the bytes output (builder style).
    pub fn with_bytes_output(mut self, bytes: usize) -> Self {
        self.bytes_output = bytes;
        self
    }

    /// Sets the query ID (builder style).
    pub fn with_query_id(mut self, id: impl Into<String>) -> Self {
        self.query_id = Some(id.into());
        self
    }

    /// Sets the number of input rows (builder style).
    pub fn with_input_rows(mut self, rows: usize) -> Self {
        self.input_rows = rows;
        self
    }

    /// Sets the number of output rows (builder style).
    pub fn with_output_rows(mut self, rows: usize) -> Self {
        self.output_rows = rows;
        self
    }

    /// Adds a custom metric, replacing any existing metric with the same name.
    pub fn with_custom_metric(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.custom_metrics.insert(name.into(), value.into());
        self
    }

    /// Gets a formatted, human-readable summary of the metrics.
    ///
    /// Optional fields (query ID, input/output rows, custom metrics) are only
    /// included when they carry a value. Custom metric lines appear in
    /// unspecified order because `HashMap` iteration order is arbitrary.
    pub fn summary(&self) -> String {
        // Format directly into the buffer instead of allocating a temporary
        // String per line with `format!`.
        use std::fmt::Write as _;

        let mut result = String::new();

        // Writing to a String is infallible, so the write! results are ignored.
        let _ = writeln!(result, "Execution time: {}ms", self.execution_time_ms);
        let _ = writeln!(result, "Rows processed: {}", self.rows_processed);
        let _ = writeln!(result, "Partitions processed: {}", self.partitions_processed);
        let _ = writeln!(result, "Bytes processed: {}", self.bytes_processed);
        let _ = writeln!(result, "Bytes output: {}", self.bytes_output);

        if let Some(query_id) = &self.query_id {
            let _ = writeln!(result, "Query ID: {}", query_id);
        }

        if self.input_rows > 0 {
            let _ = writeln!(result, "Input rows: {}", self.input_rows);
        }

        if self.output_rows > 0 {
            let _ = writeln!(result, "Output rows: {}", self.output_rows);
        }

        if !self.custom_metrics.is_empty() {
            result.push_str("Custom metrics:\n");
            for (name, value) in &self.custom_metrics {
                let _ = writeln!(result, "  {}: {}", name, value);
            }
        }

        result
    }

    /// Merges with another set of metrics: numeric counters are summed.
    ///
    /// Custom metrics from `other` override same-named entries in `self`.
    /// NOTE(review): `query_id` is left untouched — presumably intentional,
    /// since two merged runs have no single meaningful ID; confirm.
    pub fn merge(&mut self, other: &Self) {
        self.execution_time_ms += other.execution_time_ms;
        self.rows_processed += other.rows_processed;
        self.partitions_processed += other.partitions_processed;
        self.bytes_processed += other.bytes_processed;
        self.bytes_output += other.bytes_output;
        self.input_rows += other.input_rows;
        self.output_rows += other.output_rows;

        // Merge custom metrics (just override for now)
        for (name, value) in &other.custom_metrics {
            self.custom_metrics.insert(name.clone(), value.clone());
        }
    }

    /// Adds execution time to the current metrics
    pub fn add_execution_time(&mut self, time_ms: u64) {
        self.execution_time_ms += time_ms;
    }

    /// Adds rows processed to the current metrics
    pub fn add_rows_processed(&mut self, rows: usize) {
        self.rows_processed += rows;
    }

    /// Adds partitions processed to the current metrics
    pub fn add_partitions_processed(&mut self, partitions: usize) {
        self.partitions_processed += partitions;
    }

    /// Adds bytes processed to the current metrics
    pub fn add_bytes_processed(&mut self, bytes: usize) {
        self.bytes_processed += bytes;
    }

    /// Adds bytes output to the current metrics
    /// (added for consistency with the other `add_*` accumulators)
    pub fn add_bytes_output(&mut self, bytes: usize) {
        self.bytes_output += bytes;
    }
}