1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
use super::plan::{QueryNode, QueryPlan};
use crate::{
error::{OrmError, OrmResult},
loading::batch_loader::BatchLoader,
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
/// Outcome of running a [`QueryPlan`] through `PlanExecutor::execute_plan`.
///
/// A failing node does not abort the run. One result can therefore carry
/// both per-node data and the errors collected along the way.
#[derive(Debug)]
pub struct ExecutionResult {
    /// JSON values (one per fetched row), keyed by the id of the node that produced them.
    pub results: HashMap<String, Vec<JsonValue>>,
    /// Timing and counting statistics gathered during the run.
    pub stats: ExecutionStats,
    /// Errors reported by individual node queries.
    pub errors: Vec<OrmError>,
}
/// Statistics about query plan execution.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutionStats {
    /// Total execution time.
    ///
    /// [`Self::add_phase_duration`] adds each phase's duration to this value.
    /// Callers may overwrite it with a separately measured wall-clock total.
    pub total_duration: Duration,
    /// Duration of each phase, in the order the phases were recorded.
    pub phase_durations: Vec<Duration>,
    /// Number of queries executed.
    pub query_count: usize,
    /// Total rows fetched.
    pub rows_fetched: usize,
    /// Number of phases executed in parallel.
    pub parallel_phases: usize,
    /// `total_duration / query_count`, filled in by [`Self::calculate_averages`].
    pub avg_query_time: Duration,
    /// Peak memory usage in megabytes, if available (`None` unless set by the caller).
    pub peak_memory_mb: Option<f64>,
}

impl ExecutionStats {
    /// Create an empty statistics record: zero durations, zero counters, no phases.
    pub fn new() -> Self {
        Self {
            total_duration: Duration::ZERO,
            phase_durations: Vec::new(),
            query_count: 0,
            rows_fetched: 0,
            parallel_phases: 0,
            avg_query_time: Duration::ZERO,
            peak_memory_mb: None,
        }
    }

    /// Set `avg_query_time` to `total_duration / query_count`.
    ///
    /// If `query_count` is zero, `avg_query_time` is left unchanged.
    /// `Duration` can only be divided by a `u32`. Counts that do not fit are
    /// therefore saturated to `u32::MAX` rather than truncated. A truncating
    /// `as u32` cast could produce a divisor of 0 and panic.
    pub fn calculate_averages(&mut self) {
        if self.query_count == 0 {
            return;
        }
        let divisor = u32::try_from(self.query_count).unwrap_or(u32::MAX);
        self.avg_query_time = self.total_duration / divisor;
    }

    /// Record one phase's duration and add it to `total_duration`.
    pub fn add_phase_duration(&mut self, duration: Duration) {
        self.phase_durations.push(duration);
        self.total_duration += duration;
    }
}

impl Default for ExecutionStats {
    /// Same as [`ExecutionStats::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Runs a [`QueryPlan`] phase by phase, spawning the nodes of multi-node
/// phases as concurrent Tokio tasks.
pub struct PlanExecutor {
    /// Batch loader handed in at construction. It is not read by the executor
    /// code in this module, hence the leading underscore.
    _batch_loader: BatchLoader,
    /// Upper bound on how many node queries of one phase are spawned together.
    max_parallel_tasks: usize,
    /// Time limit for an individual node query.
    query_timeout: Duration,
}
impl PlanExecutor {
    /// Create a plan executor with the default configuration.
    ///
    /// The defaults are at most 10 concurrently spawned node queries per phase
    /// and a 30 second per-query timeout.
    pub fn new(batch_loader: BatchLoader) -> Self {
        Self::with_config(batch_loader, 10, Duration::from_secs(30))
    }

    /// Create a plan executor with a custom configuration.
    ///
    /// * `max_parallel_tasks` sets how many node queries of one phase are
    ///   spawned together. A value of `0` is treated as `1` at execution time.
    /// * `query_timeout` sets the limit applied to every individual node query,
    ///   in both single-node and parallel phases.
    pub fn with_config(
        batch_loader: BatchLoader,
        max_parallel_tasks: usize,
        query_timeout: Duration,
    ) -> Self {
        Self {
            _batch_loader: batch_loader,
            max_parallel_tasks,
            query_timeout,
        }
    }

    /// Execute every phase of `plan` in order and collect the per-node results.
    ///
    /// Phases run sequentially. A single-node phase is awaited directly. A phase
    /// with several nodes is handed to [`Self::execute_phase_parallel`]. Node
    /// ids that a phase lists but `plan.nodes` does not contain are skipped.
    ///
    /// # Errors
    ///
    /// Per-node failures (query errors, row conversion errors, timeouts, failed
    /// tasks) do not abort execution; they are collected into
    /// [`ExecutionResult::errors`]. This function itself currently always
    /// returns `Ok`.
    pub async fn execute_plan(
        &self,
        plan: &QueryPlan,
        connection: &sqlx::PgPool,
    ) -> OrmResult<ExecutionResult> {
        let start_time = Instant::now();
        let mut results: HashMap<String, Vec<JsonValue>> = HashMap::new();
        let mut stats = ExecutionStats::new();
        let mut errors = Vec::new();

        for phase in plan.execution_phases.iter() {
            let phase_start = Instant::now();

            let outcomes: Vec<(String, OrmResult<Vec<JsonValue>>)> = match phase.len() {
                // An empty phase runs nothing and must not count as a parallel phase.
                0 => Vec::new(),
                1 => {
                    let node_id = &phase[0];
                    match plan.nodes.get(node_id) {
                        Some(node) => vec![(
                            node_id.clone(),
                            self.execute_node_query(node, connection).await,
                        )],
                        None => Vec::new(),
                    }
                }
                _ => {
                    stats.parallel_phases += 1;
                    self.execute_phase_parallel(phase, plan, connection)
                        .await
                        .into_iter()
                        .collect()
                }
            };

            for (node_id, outcome) in outcomes {
                match outcome {
                    Ok(rows) => {
                        stats.query_count += 1;
                        stats.rows_fetched += rows.len();
                        results.insert(node_id, rows);
                    }
                    Err(e) => errors.push(e),
                }
            }

            stats.add_phase_duration(phase_start.elapsed());
        }

        // add_phase_duration only sums the time spent inside phases. Replace it
        // with the wall-clock time of the whole plan before averaging.
        stats.total_duration = start_time.elapsed();
        stats.calculate_averages();

        Ok(ExecutionResult {
            results,
            stats,
            errors,
        })
    }

    /// Execute the nodes of one phase concurrently and return each node's outcome.
    ///
    /// Behaviour:
    /// * Nodes are spawned with `tokio::spawn` in chunks of at most
    ///   `max_parallel_tasks` (minimum 1). Each chunk is awaited in full before
    ///   the next chunk starts.
    /// * Every query is bounded by `query_timeout`.
    /// * A task that fails to join (panicked or cancelled) is reported as an
    ///   `OrmError::Query` for its node rather than being dropped.
    /// * Node ids missing from `plan.nodes` are skipped.
    async fn execute_phase_parallel(
        &self,
        phase: &[String],
        plan: &QueryPlan,
        connection: &sqlx::PgPool,
    ) -> HashMap<String, OrmResult<Vec<JsonValue>>> {
        let mut results = HashMap::with_capacity(phase.len());
        // `slice::chunks` panics on a chunk size of 0, which the configuration
        // setters allow, so clamp to at least one task per chunk.
        let chunk_size = self.max_parallel_tasks.max(1);
        let query_timeout = self.query_timeout;

        for chunk in phase.chunks(chunk_size) {
            let mut chunk_handles: Vec<(String, JoinHandle<OrmResult<Vec<JsonValue>>>)> =
                Vec::with_capacity(chunk.len());

            for node_id in chunk {
                if let Some(node) = plan.nodes.get(node_id) {
                    // tokio::spawn needs a 'static future, so the task owns clones
                    // of the node and the pool handle.
                    let node = node.clone();
                    let pool = connection.clone();
                    let handle = tokio::spawn(async move {
                        let query = Self::execute_node_query_static(&node, &pool);
                        Self::apply_query_timeout(&node, query_timeout, query).await
                    });
                    chunk_handles.push((node_id.clone(), handle));
                }
            }

            // Wait for the whole chunk before starting the next one.
            for (node_id, handle) in chunk_handles {
                let outcome = match handle.await {
                    Ok(outcome) => outcome,
                    Err(join_error) => Err(OrmError::Query(format!(
                        "Query task for node '{}' failed: {}",
                        node_id, join_error
                    ))),
                };
                results.insert(node_id, outcome);
            }
        }

        results
    }

    /// Execute a single node's query, bounded by the configured `query_timeout`.
    ///
    /// # Errors
    ///
    /// Returns `OrmError::Query` if the query does not finish in time.
    /// Otherwise returns whatever error the underlying query produced.
    async fn execute_node_query(
        &self,
        node: &QueryNode,
        connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        Self::apply_query_timeout(
            node,
            self.query_timeout,
            self.execute_node_query_impl(node, connection),
        )
        .await
    }

    /// Await `query` and convert an elapsed `timeout` into an `OrmError::Query`
    /// that names the node.
    ///
    /// Shared by the single-node and parallel paths so both enforce the same
    /// limit with the same message.
    async fn apply_query_timeout<F>(
        node: &QueryNode,
        timeout: Duration,
        query: F,
    ) -> OrmResult<Vec<JsonValue>>
    where
        F: std::future::Future<Output = OrmResult<Vec<JsonValue>>>,
    {
        match tokio::time::timeout(timeout, query).await {
            Ok(result) => result,
            Err(_) => Err(OrmError::Query(format!(
                "Query timeout for node '{}' after {:?}",
                node.id, timeout
            ))),
        }
    }

    /// Dispatch a node to the root or relationship query path. No timeout is applied here.
    async fn execute_node_query_impl(
        &self,
        node: &QueryNode,
        connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        if node.is_root() {
            self.execute_root_query(node, connection).await
        } else {
            self.execute_relationship_query(node, connection).await
        }
    }

    /// Execute the query for a root node (no parent relationship).
    ///
    /// Delegates to [`Self::execute_root_query_static`].
    async fn execute_root_query(
        &self,
        node: &QueryNode,
        connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        Self::execute_root_query_static(node, connection).await
    }

    /// Execute the query for a child node.
    ///
    /// Delegates to [`Self::execute_relationship_query_static`].
    async fn execute_relationship_query(
        &self,
        node: &QueryNode,
        connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        Self::execute_relationship_query_static(node, connection).await
    }

    /// Static dispatch used by spawned tasks, which cannot borrow `self`.
    async fn execute_node_query_static(
        node: &QueryNode,
        connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        if node.is_root() {
            Self::execute_root_query_static(node, connection).await
        } else {
            Self::execute_relationship_query_static(node, connection).await
        }
    }

    /// Fetch rows of `node.table` for a root node and convert each row to JSON.
    ///
    /// Every entry in `node.constraints` is appended through `where_raw`. The
    /// result is capped at `min(node.estimated_rows, 1000)` rows.
    ///
    /// # Errors
    ///
    /// * `OrmError::Database` if the query fails.
    /// * `OrmError::Serialization` if any row cannot be converted to JSON.
    async fn execute_root_query_static(
        node: &QueryNode,
        connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        use crate::query::QueryBuilder;

        let mut query = QueryBuilder::<()>::new().from(&node.table);
        // NOTE(review): constraints are spliced in as raw SQL. They must come from
        // trusted plan construction, never from user input.
        for constraint in &node.constraints {
            query = query.where_raw(constraint);
        }

        // Cap the fetch to avoid loading whole tables.
        // NOTE(review): an estimate of 0 presumably produces LIMIT 0 (no rows);
        // confirm this is intended.
        let limit = std::cmp::min(node.estimated_rows, 1000);
        query = query.limit(limit as i64);

        // NOTE(review): the bind parameters returned here are discarded and never
        // bound to the statement. This is only correct if QueryBuilder inlines every
        // value into `sql`. Confirm that, or bind `_params` before executing.
        let (sql, _params) = query.to_sql_with_params();
        let rows = sqlx::query(&sql)
            .fetch_all(connection)
            .await
            .map_err(|e| OrmError::Database(e.to_string()))?;

        rows.iter()
            .map(|row| {
                crate::loading::batch_loader::row_conversion::convert_row_to_json(row)
                    .map_err(|e| OrmError::Serialization(e.to_string()))
            })
            .collect()
    }

    /// Placeholder for child nodes; always returns an empty result.
    ///
    /// Loading related records needs the parent ids, which this executor does not
    /// have. Presumably the relationship loading system performs that load; TODO confirm.
    async fn execute_relationship_query_static(
        _node: &QueryNode,
        _connection: &sqlx::PgPool,
    ) -> OrmResult<Vec<JsonValue>> {
        Ok(Vec::new())
    }

    /// Snapshot of the executor's current configuration.
    pub fn get_stats(&self) -> ExecutorStats {
        ExecutorStats {
            max_parallel_tasks: self.max_parallel_tasks,
            query_timeout: self.query_timeout,
        }
    }

    /// Set how many node queries of one phase are spawned together.
    /// A value of `0` is treated as `1` when a phase executes.
    pub fn set_max_parallel_tasks(&mut self, max_tasks: usize) {
        self.max_parallel_tasks = max_tasks;
    }

    /// Set the time limit applied to every individual node query.
    pub fn set_query_timeout(&mut self, timeout: Duration) {
        self.query_timeout = timeout;
    }
}
/// Snapshot of a plan executor's configuration, as returned by
/// `PlanExecutor::get_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutorStats {
    /// Maximum number of node queries of one phase spawned together.
    pub max_parallel_tasks: usize,
    /// Time limit for an individual node query.
    pub query_timeout: Duration,
}