1use std::{rc::Rc, sync::Arc, time::Instant};
4
5use selene_core::{CancellationToken, Change, NodeScanBudget, metrics};
6use selene_graph::CommitOutcome;
7
8use super::plan_cache::{SharedPlanCacheInsert, SharedPlanCacheLookup};
9use super::session::materialize_parameter_values;
10use crate::{
11 ExecutionPlan, GqlStatus, LiveIndexCatalog, OptimizeContext, PipelineOp, ProcedureRegistry,
12 SourceSpan, StatementCategory, TxOp,
13 analyze::analyze,
14 ast::Statement,
15 optimize,
16 parser::parse,
17 plan::plan_with_caps as build_plan,
18 runtime::{
19 BindingTable, BindingTableRegistry, CallPlanKey, ExecutorError, ExecutorWarning, Session,
20 TxContext, execute_plan, pipeline,
21 },
22};
23
24#[derive(Clone, Debug, PartialEq)]
26#[non_exhaustive]
27pub enum StatementOutput {
28 Empty,
30 Written(WriteOutcome),
32 Rows(BindingTable),
34}
35
36#[derive(Clone, Debug, PartialEq)]
38#[non_exhaustive]
39pub struct WriteOutcome {
40 pub rows: Option<BindingTable>,
42 pub changes: Vec<Change>,
44 pub generation: u64,
46 pub next_node_id: u64,
48 pub next_edge_id: u64,
50 pub durable_at: Option<u64>,
52}
53
54impl WriteOutcome {
55 pub(crate) fn from_commit(outcome: CommitOutcome, rows: Option<BindingTable>) -> Self {
56 Self {
57 rows,
58 changes: outcome.changes,
59 generation: outcome.generation,
60 next_node_id: outcome.next_node_id,
61 next_edge_id: outcome.next_edge_id,
62 durable_at: outcome.durable_at,
63 }
64 }
65}
66
67#[tracing::instrument(
72 name = "selene.gql.execute_statement",
73 skip(plan, session, registry),
74 fields(category = ?plan.category)
75)]
76pub fn execute_statement(
77 plan: &ExecutionPlan,
78 session: &mut Session<'_>,
79 registry: &dyn ProcedureRegistry,
80) -> Result<StatementOutput, ExecutorError> {
81 if session.is_closed() {
91 return Err(ExecutorError::SessionClosed {
92 span: SourceSpan::default(),
93 });
94 }
95 if session.aborted && plan.category != StatementCategory::TransactionControl {
96 return Err(ExecutorError::InFailedTransaction {
97 span: SourceSpan::default(),
98 });
99 }
100 let started = Instant::now();
101 let counts_toward_tx =
102 plan.category != StatementCategory::TransactionControl && session.active_txn.is_some();
103 let result = match plan.category {
104 StatementCategory::ReadOnly => execute_read_only(plan, session, registry),
105 StatementCategory::Maintenance => execute_maintenance(plan, session, registry),
106 StatementCategory::DataModifying | StatementCategory::CatalogModifying => {
107 execute_write(plan, session, registry)
108 }
109 StatementCategory::TransactionControl => execute_transaction_control(plan, session),
110 StatementCategory::SessionControl => execute_session_control(plan, session, registry),
111 };
112 if counts_toward_tx {
113 if result.is_ok() {
114 session.tx_statement_count = session.tx_statement_count.saturating_add(1);
115 } else {
116 session.aborted = true;
117 }
118 }
119 record_statement_metrics(plan, started);
120 result
121}
122
123impl Session<'_> {
124 pub fn execute_source(
138 &mut self,
139 source: &str,
140 registry: &dyn ProcedureRegistry,
141 ) -> Result<StatementOutput, ExecutorError> {
142 if self.is_closed() {
147 return Err(ExecutorError::SessionClosed {
148 span: SourceSpan::default(),
149 });
150 }
151 let schema_version = self.graph().schema_version();
152 let registry_version = registry.registry_version();
153 let top_level_call_candidate = is_top_level_call_candidate(source);
154 let active_txn_has_schema_changes = self
155 .active_txn
156 .as_ref()
157 .is_some_and(|txn| txn.has_schema_changes());
158 if !active_txn_has_schema_changes
159 && let Some(cached) = self
160 .plan_cache
161 .as_mut()
162 .and_then(|cache| cache.get(source, schema_version))
163 {
164 return execute_statement(&cached, self, registry);
165 }
166
167 let shared_plan_cache = (!active_txn_has_schema_changes)
168 .then(|| self.shared_plan_cache.as_ref().map(Arc::clone))
169 .flatten();
170 let call_plan_cache = (!active_txn_has_schema_changes)
171 .then(|| self.call_plan_cache.as_ref().map(Arc::clone))
172 .flatten();
173 let cache_graph_id = if shared_plan_cache.is_some() || call_plan_cache.is_some() {
174 Some(self.graph().read().graph_id())
175 } else {
176 None
177 };
178 if !top_level_call_candidate
179 && let (Some(cache), Some(graph_id)) = (shared_plan_cache.as_ref(), cache_graph_id)
180 && let Some(cached) = cache.get(SharedPlanCacheLookup {
181 graph_id,
182 schema_version,
183 registry_version,
184 source,
185 caps: self.caps,
186 index_selection: self.index_selection,
187 })
188 {
189 if let Some(cache) = self.plan_cache.as_mut() {
190 cache.insert(Arc::from(source), Arc::clone(&cached), schema_version);
191 }
192 return execute_statement(&cached, self, registry);
193 }
194 if top_level_call_candidate
195 && let (Some(cache), Some(graph_id)) = (call_plan_cache.as_ref(), cache_graph_id)
196 && let Some(cached) =
197 cache.get_source(graph_id, schema_version, registry_version, source)
198 {
199 metrics::counter_inc(metrics::CALL_PLAN_CACHE_HITS_TOTAL);
200 return execute_statement(&cached, self, registry);
201 }
202
203 let statement = parse(source).map_err(|source| {
210 if self.active_txn.is_some() {
211 self.aborted = true;
212 }
213 ExecutorError::Parse { source }
214 })?;
215
216 if self.aborted && !is_tx_control_statement(&statement) {
221 return Err(ExecutorError::InFailedTransaction {
222 span: SourceSpan::default(),
223 });
224 }
225
226 let call_plan_key = cache_graph_id.and_then(|graph_id| {
227 CallPlanKey::for_statement(graph_id, schema_version, registry_version, &statement)
228 });
229 if let (Some(cache), Some(key)) = (call_plan_cache.as_ref(), call_plan_key.as_ref())
230 && let Some(cached) = cache.get(key)
231 {
232 metrics::counter_inc(metrics::CALL_PLAN_CACHE_HITS_TOTAL);
233 return execute_statement(&cached, self, registry);
234 }
235
236 let graph_type = self
237 .active_txn
238 .as_ref()
239 .and_then(|txn| txn.read().meta.bound_type.as_ref().map(Arc::clone))
240 .or_else(|| self.graph().graph_type());
241 let analyzed = analyze(statement, registry, graph_type.as_deref()).map_err(|source| {
242 if self.active_txn.is_some() {
243 self.aborted = true;
244 }
245 ExecutorError::Analysis { source }
246 })?;
247 let lowered = build_plan(&analyzed, registry, &self.caps).map_err(|source| {
248 if self.active_txn.is_some() {
249 self.aborted = true;
250 }
251 ExecutorError::Plan { source }
252 })?;
253 let plan = Arc::new(self.optimize_plan(lowered));
258 let source_arc = Arc::<str>::from(source);
259 if !active_txn_has_schema_changes && let Some(cache) = self.plan_cache.as_mut() {
260 cache.insert(Arc::clone(&source_arc), Arc::clone(&plan), schema_version);
261 }
262 if let (Some(cache), Some(graph_id)) = (shared_plan_cache, cache_graph_id) {
263 cache.insert(
264 SharedPlanCacheInsert {
265 graph_id,
266 schema_version,
267 registry_version,
268 source: Arc::clone(&source_arc),
269 caps: self.caps,
270 index_selection: self.index_selection,
271 },
272 Arc::clone(&plan),
273 );
274 }
275 if let (Some(cache), Some(key)) = (call_plan_cache, call_plan_key) {
276 cache.insert_with_source(key, source_arc, Arc::clone(&plan));
277 }
278 execute_statement(&plan, self, registry)
279 }
280
281 fn optimize_plan(&self, lowered: ExecutionPlan) -> ExecutionPlan {
300 if !self.index_selection {
301 return lowered;
302 }
303 let snapshot = match self.active_txn.as_ref() {
304 Some(txn) => Arc::new(txn.read().clone()),
305 None => self.graph().read(),
306 };
307 let catalog = LiveIndexCatalog::new(snapshot);
308 let caps = lowered.impl_defined_caps;
309 let ctx = OptimizeContext::new(&caps).with_index_catalog(&catalog);
310 optimize(lowered, &ctx)
311 }
312}
313
314fn is_tx_control_statement(statement: &Statement) -> bool {
315 matches!(
316 statement,
317 Statement::StartTransaction { .. } | Statement::Commit { .. } | Statement::Rollback { .. }
318 )
319}
320
/// Cheap textual check for "this source probably starts with a `CALL`
/// statement", done without parsing.
///
/// After skipping leading whitespace, the text must begin with the keyword
/// `CALL` (ASCII case-insensitive), and that keyword must be followed either
/// by the end of the text or by a whitespace character. Anything else,
/// including `CALL(` or `CALLS`, yields `false`.
fn is_top_level_call_candidate(source: &str) -> bool {
    let trimmed = source.trim_start();
    match trimmed.get(..4) {
        // `get(..4)` succeeded, so byte 4 is a char boundary and the slice
        // below cannot panic.
        Some(keyword) if keyword.eq_ignore_ascii_case("CALL") => {
            match trimmed[4..].chars().next() {
                Some(following) => following.is_whitespace(),
                None => true,
            }
        }
        // Fewer than four bytes, a multi-byte char straddling byte 4, or a
        // different leading word.
        _ => false,
    }
}
331
332fn execute_read_only(
333 plan: &ExecutionPlan,
334 session: &mut Session<'_>,
335 registry: &dyn ProcedureRegistry,
336) -> Result<StatementOutput, ExecutorError> {
337 let providers = session.graph().index_providers();
338 let snapshot = session.graph().read();
339 let session_tz = session.effective_time_zone();
340 let binding_tables = Rc::new(BindingTableRegistry::new());
341 let parameters = materialize_parameter_values(
342 &session.parameters,
343 &session.scalar_parameters,
344 &binding_tables,
345 );
346 let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
347 let warning_sink = session.warning_sink.as_ref();
348 let table = if let Some(txn) = session.active_txn.as_mut() {
349 let mut ctx = TxContext::write_with_owned_parameters_and_registry(
350 snapshot,
351 &plan.impl_defined_caps,
352 registry,
353 txn,
354 providers,
355 parameters,
356 Rc::clone(&binding_tables),
357 )
358 .with_resource_limits(
359 cancellation.as_ref(),
360 deadline,
361 row_cap,
362 node_scan_budget.as_ref(),
363 )
364 .with_warning_sink(warning_sink)
365 .with_session_time_zone(session_tz);
366 ctx.check_cancellation()?;
367 let table = execute_plan(plan, &mut ctx)?;
368 note_output_rows(plan, &ctx, table.row_count())?;
369 table
370 } else {
371 let mut ctx = TxContext::read_only_with_owned_parameters_and_registry(
372 snapshot,
373 &plan.impl_defined_caps,
374 registry,
375 providers,
376 parameters,
377 Rc::clone(&binding_tables),
378 )
379 .with_resource_limits(
380 cancellation.as_ref(),
381 deadline,
382 row_cap,
383 node_scan_budget.as_ref(),
384 )
385 .with_warning_sink(warning_sink)
386 .with_session_time_zone(session_tz);
387 ctx.check_cancellation()?;
388 let table = execute_plan(plan, &mut ctx)?;
389 note_output_rows(plan, &ctx, table.row_count())?;
390 table
391 };
392 Ok(output_from_table(plan, table))
393}
394
395fn execute_write(
396 plan: &ExecutionPlan,
397 session: &mut Session<'_>,
398 registry: &dyn ProcedureRegistry,
399) -> Result<StatementOutput, ExecutorError> {
400 if session.active_txn.is_some() {
401 return execute_inside_explicit_tx(plan, session, registry);
402 }
403 execute_auto_commit(plan, session, registry)
404}
405
406fn execute_maintenance(
407 plan: &ExecutionPlan,
408 session: &mut Session<'_>,
409 registry: &dyn ProcedureRegistry,
410) -> Result<StatementOutput, ExecutorError> {
411 if session.active_txn.is_some() {
412 return Err(ExecutorError::InvalidTransactionState {
413 detail: "maintenance procedure cannot run inside an explicit transaction",
414 span: SourceSpan::default(),
415 });
416 }
417 let providers = session.graph().index_providers();
418 let snapshot = session.graph().read();
419 let session_tz = session.effective_time_zone();
420 let binding_tables = Rc::new(BindingTableRegistry::new());
421 let parameters = materialize_parameter_values(
422 &session.parameters,
423 &session.scalar_parameters,
424 &binding_tables,
425 );
426 let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
427 let warning_sink = session.warning_sink.as_ref();
428 let mut ctx = TxContext::maintenance_with_owned_parameters_and_registry(
429 snapshot,
430 &plan.impl_defined_caps,
431 registry,
432 session.graph(),
433 providers,
434 parameters,
435 Rc::clone(&binding_tables),
436 )
437 .with_resource_limits(
438 cancellation.as_ref(),
439 deadline,
440 row_cap,
441 node_scan_budget.as_ref(),
442 )
443 .with_warning_sink(warning_sink)
444 .with_session_time_zone(session_tz);
445 ctx.check_cancellation()?;
446 let table = execute_plan(plan, &mut ctx)?;
447 note_output_rows(plan, &ctx, table.row_count())?;
448 Ok(output_from_table(plan, table))
449}
450
451fn execute_inside_explicit_tx(
452 plan: &ExecutionPlan,
453 session: &mut Session<'_>,
454 registry: &dyn ProcedureRegistry,
455) -> Result<StatementOutput, ExecutorError> {
456 let providers = session.graph().index_providers();
457 let snapshot = session.graph().read();
458 let session_tz = session.effective_time_zone();
459 let binding_tables = Rc::new(BindingTableRegistry::new());
460 let parameters = materialize_parameter_values(
461 &session.parameters,
462 &session.scalar_parameters,
463 &binding_tables,
464 );
465 let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
466 let warning_sink = session.warning_sink.as_ref();
467 let txn = session
468 .active_txn
469 .as_mut()
470 .ok_or(ExecutorError::ImplementationDefined {
471 detail: "explicit-TX path entered without active transaction",
472 })?;
473 let mut ctx = TxContext::write_with_owned_parameters_and_registry(
474 snapshot,
475 &plan.impl_defined_caps,
476 registry,
477 txn,
478 providers,
479 parameters,
480 Rc::clone(&binding_tables),
481 )
482 .with_resource_limits(
483 cancellation.as_ref(),
484 deadline,
485 row_cap,
486 node_scan_budget.as_ref(),
487 )
488 .with_warning_sink(warning_sink)
489 .with_session_time_zone(session_tz);
490 let result = ctx
491 .check_cancellation()
492 .and_then(|()| execute_plan(plan, &mut ctx))
493 .and_then(|table| {
494 note_output_rows(plan, &ctx, table.row_count())?;
495 Ok(table)
496 });
497 if result.is_err() {
498 session.aborted = true;
499 }
500 result.map(|table| output_from_table(plan, table))
501}
502
503fn execute_auto_commit(
504 plan: &ExecutionPlan,
505 session: &mut Session<'_>,
506 registry: &dyn ProcedureRegistry,
507) -> Result<StatementOutput, ExecutorError> {
508 let providers = session.graph().index_providers();
509 let snapshot = session.graph().read();
510 let principal = session.principal();
511 let session_tz = session.effective_time_zone();
512 let binding_tables = Rc::new(BindingTableRegistry::new());
513 let parameters = materialize_parameter_values(
514 &session.parameters,
515 &session.scalar_parameters,
516 &binding_tables,
517 );
518 let mut txn = session.graph().begin_write();
519 let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
520 let warning_sink = session.warning_sink.as_ref();
521 let result = {
522 let mut ctx = TxContext::write_with_owned_parameters_and_registry(
523 snapshot,
524 &plan.impl_defined_caps,
525 registry,
526 &mut txn,
527 providers,
528 parameters,
529 Rc::clone(&binding_tables),
530 )
531 .with_resource_limits(
532 cancellation.as_ref(),
533 deadline,
534 row_cap,
535 node_scan_budget.as_ref(),
536 )
537 .with_warning_sink(warning_sink)
538 .with_session_time_zone(session_tz);
539 ctx.check_cancellation()
540 .and_then(|()| execute_plan(plan, &mut ctx))
541 .and_then(|table| {
542 note_output_rows(plan, &ctx, table.row_count())?;
543 Ok(table)
544 })
545 };
546 match result {
547 Ok(table) => {
548 let outcome = txn.commit_with_principal(principal).map_err(|source| {
549 ExecutorError::GraphMutation {
550 source,
551 span: SourceSpan::default(),
552 }
553 })?;
554 emit_commit_warnings(&outcome, session);
555 Ok(write_output_from_commit(plan, table, outcome))
556 }
557 Err(error) => {
558 txn.rollback();
559 Err(error)
560 }
561 }
562}
563
564fn emit_commit_warnings(outcome: &CommitOutcome, session: &Session<'_>) {
565 let Some(sink) = session.warning_sink.as_ref() else {
566 return;
567 };
568 for warning in &outcome.warnings {
569 sink.borrow_mut().emit(ExecutorWarning {
570 code: GqlStatus::VALIDATION_MODE_RELAXED_WRITE,
571 message: warning.warning.violation.to_string(),
572 span: SourceSpan::default(),
573 });
574 }
575}
576
577fn note_output_rows(
578 plan: &ExecutionPlan,
579 ctx: &TxContext<'_, '_>,
580 row_count: usize,
581) -> Result<(), ExecutorError> {
582 if !plan.output_schema.columns.is_empty() {
583 ctx.note_result_rows(row_count)?;
584 }
585 Ok(())
586}
587
588fn resource_limits(
589 session: &Session<'_>,
590) -> (
591 Option<CancellationToken>,
592 Option<std::time::Instant>,
593 Option<usize>,
594 Option<NodeScanBudget>,
595) {
596 (
597 session.cancellation.clone(),
598 session.deadline,
599 session.row_cap,
600 session.max_nodes_scanned.map(NodeScanBudget::new),
601 )
602}
603
604fn execute_transaction_control(
605 plan: &ExecutionPlan,
606 session: &mut Session<'_>,
607) -> Result<StatementOutput, ExecutorError> {
608 let [crate::PipelineOp::Tx(op)] = plan.pipeline.as_slice() else {
609 return Err(ExecutorError::ImplementationDefined {
610 detail: "transaction-control plan must contain exactly one TX op",
611 });
612 };
613 pipeline::tx::execute(op, session)
614}
615
616fn execute_session_control(
617 plan: &ExecutionPlan,
618 session: &mut Session<'_>,
619 registry: &dyn ProcedureRegistry,
620) -> Result<StatementOutput, ExecutorError> {
621 let [crate::PipelineOp::Session(op)] = plan.pipeline.as_slice() else {
622 return Err(ExecutorError::ImplementationDefined {
623 detail: "session-control plan must contain exactly one session op",
624 });
625 };
626 pipeline::session::execute(op, session, registry)
627}
628
629fn output_from_table(plan: &ExecutionPlan, table: BindingTable) -> StatementOutput {
630 if plan.output_schema.columns.is_empty() {
631 StatementOutput::Empty
632 } else {
633 StatementOutput::Rows(table)
634 }
635}
636
637fn write_output_from_commit(
638 plan: &ExecutionPlan,
639 table: BindingTable,
640 outcome: CommitOutcome,
641) -> StatementOutput {
642 let rows = if plan.output_schema.columns.is_empty() {
643 None
644 } else {
645 Some(table)
646 };
647 StatementOutput::Written(WriteOutcome::from_commit(outcome, rows))
648}
649
650fn record_statement_metrics(plan: &ExecutionPlan, started: Instant) {
651 let label = metrics::Label::new(metrics::STATEMENT_KIND_LABEL, statement_kind(plan));
652 metrics::counter_inc_with_label(metrics::QUERIES_TOTAL, label);
653 metrics::histogram_record_with_label(
654 metrics::QUERY_DURATION_SECONDS,
655 started.elapsed().as_secs_f64(),
656 label,
657 );
658}
659
660fn statement_kind(plan: &ExecutionPlan) -> &'static str {
661 if let Some(kind) = plan.pipeline.iter().find_map(pipeline_statement_kind) {
662 return kind;
663 }
664 match plan.category {
665 StatementCategory::ReadOnly => "query",
666 StatementCategory::DataModifying => "mutation",
667 StatementCategory::CatalogModifying => "catalog",
668 StatementCategory::Maintenance => "maintenance",
669 StatementCategory::TransactionControl => "transaction",
670 StatementCategory::SessionControl => "session",
671 }
672}
673
674fn pipeline_statement_kind(op: &PipelineOp) -> Option<&'static str> {
675 match op {
676 PipelineOp::Union { .. } | PipelineOp::Chain(_) | PipelineOp::CorrelatedChain(_) => {
677 Some("composite")
678 }
679 PipelineOp::Match(_) | PipelineOp::OptionalMatch(_) => Some("query"),
680 PipelineOp::Call(_) => Some("call"),
681 PipelineOp::CallSubquery(_) => Some("call_subquery"),
682 PipelineOp::Mutation(_) => Some("mutation"),
683 PipelineOp::Catalog(_) => Some("catalog"),
684 PipelineOp::ExplainPlan { .. } => Some("explain"),
685 PipelineOp::Tx(TxOp::Start { .. }) => Some("start_transaction"),
686 PipelineOp::Tx(TxOp::Commit { .. }) => Some("commit"),
687 PipelineOp::Tx(TxOp::Rollback { .. }) => Some("rollback"),
688 _ => None,
689 }
690}