Skip to main content

selene_gql/runtime/
statement.rs

1//! Top-level statement executor.
2
3use std::{rc::Rc, sync::Arc, time::Instant};
4
5use selene_core::{CancellationToken, Change, NodeScanBudget, metrics};
6use selene_graph::CommitOutcome;
7
8use super::plan_cache::{SharedPlanCacheInsert, SharedPlanCacheLookup};
9use super::session::materialize_parameter_values;
10use crate::{
11    ExecutionPlan, GqlStatus, LiveIndexCatalog, OptimizeContext, PipelineOp, ProcedureRegistry,
12    SourceSpan, StatementCategory, TxOp,
13    analyze::analyze,
14    ast::Statement,
15    optimize,
16    parser::parse,
17    plan::plan_with_caps as build_plan,
18    runtime::{
19        BindingTable, BindingTableRegistry, CallPlanKey, ExecutorError, ExecutorWarning, Session,
20        TxContext, execute_plan, pipeline,
21    },
22};
23
24/// Result returned by statement-level execution.
25#[derive(Clone, Debug, PartialEq)]
26#[non_exhaustive]
27pub enum StatementOutput {
28    /// Statement completed without a row-bearing result.
29    Empty,
30    /// Statement published a graph generation.
31    Written(WriteOutcome),
32    /// Statement produced a binding table.
33    Rows(BindingTable),
34}
35
36/// Metadata returned for a statement that committed graph changes.
37#[derive(Clone, Debug, PartialEq)]
38#[non_exhaustive]
39pub struct WriteOutcome {
40    /// Optional row result for write statements with `RETURN`.
41    pub rows: Option<BindingTable>,
42    /// Changes produced by the committed write transaction.
43    pub changes: Vec<Change>,
44    /// Published graph generation.
45    pub generation: u64,
46    /// Next node ID after the commit.
47    pub next_node_id: u64,
48    /// Next edge ID after the commit.
49    pub next_edge_id: u64,
50    /// Highest sequence reported by commit-critical durable providers.
51    pub durable_at: Option<u64>,
52}
53
54impl WriteOutcome {
55    pub(crate) fn from_commit(outcome: CommitOutcome, rows: Option<BindingTable>) -> Self {
56        Self {
57            rows,
58            changes: outcome.changes,
59            generation: outcome.generation,
60            next_node_id: outcome.next_node_id,
61            next_edge_id: outcome.next_edge_id,
62            durable_at: outcome.durable_at,
63        }
64    }
65}
66
67/// Execute one planned statement against a caller-owned session.
68///
69/// The procedure registry argument is optional because statement kinds without
70/// CALL should not force embedders to construct a registry.
71#[tracing::instrument(
72    name = "selene.gql.execute_statement",
73    skip(plan, session, registry),
74    fields(category = ?plan.category)
75)]
76pub fn execute_statement(
77    plan: &ExecutionPlan,
78    session: &mut Session<'_>,
79    registry: &dyn ProcedureRegistry,
80) -> Result<StatementOutput, ExecutorError> {
81    // ISO/IEC 39075:2024 section 7.3: once `SESSION CLOSE` sets the termination
82    // flag, every subsequent GQL-request is rejected regardless of category.
83    // `execute_source` guards the source-string entry; this guard covers the
84    // sibling public path where an embedder caches an `ExecutionPlan` and
85    // re-executes it directly (the pipeline documented in the embedding guide),
86    // so the termination flag is enforced at the single statement funnel
87    // (hard rule 11) rather than only one layer up. The flag is set *during*
88    // execution of `SESSION CLOSE` itself, so this check never blocks the
89    // closing statement — only the requests that follow it.
90    if session.is_closed() {
91        return Err(ExecutorError::SessionClosed {
92            span: SourceSpan::default(),
93        });
94    }
95    if session.aborted && plan.category != StatementCategory::TransactionControl {
96        return Err(ExecutorError::InFailedTransaction {
97            span: SourceSpan::default(),
98        });
99    }
100    let started = Instant::now();
101    let counts_toward_tx =
102        plan.category != StatementCategory::TransactionControl && session.active_txn.is_some();
103    let result = match plan.category {
104        StatementCategory::ReadOnly => execute_read_only(plan, session, registry),
105        StatementCategory::Maintenance => execute_maintenance(plan, session, registry),
106        StatementCategory::DataModifying | StatementCategory::CatalogModifying => {
107            execute_write(plan, session, registry)
108        }
109        StatementCategory::TransactionControl => execute_transaction_control(plan, session),
110        StatementCategory::SessionControl => execute_session_control(plan, session, registry),
111    };
112    if counts_toward_tx {
113        if result.is_ok() {
114            session.tx_statement_count = session.tx_statement_count.saturating_add(1);
115        } else {
116            session.aborted = true;
117        }
118    }
119    record_statement_metrics(plan, started);
120    result
121}
122
123impl Session<'_> {
124    /// Parse, plan, and execute one source-string statement through this session.
125    ///
126    /// When the session has a plan cache, source strings whose cached plan was
127    /// prepared against the current graph schema version skip parse, analyze,
128    /// and plan. Short-lived sessions can additionally use the opt-in shared
129    /// non-CALL source-plan cache, keyed by graph ID, schema version, registry
130    /// version, source text, caps, and optimizer mode. Procedure-call-rooted
131    /// statements use the separate opt-in shared CALL plan cache, keyed by
132    /// graph ID, schema version, registry version, and formatter-canonical
133    /// source. Embedded in-pipeline `CALL` remains uncached.
134    /// If an active explicit transaction has uncommitted schema changes, both
135    /// caches bypass lookup and insert so analysis sees transaction-local
136    /// schema.
137    pub fn execute_source(
138        &mut self,
139        source: &str,
140        registry: &dyn ProcedureRegistry,
141    ) -> Result<StatementOutput, ExecutorError> {
142        // ISO/IEC 39075:2024 section 6 GR3 + section 7.3: once a session is
143        // closed by `SESSION CLOSE`, every subsequent GQL-request is rejected.
144        // This is the spec request boundary; embedders that drop the `Session`
145        // struct still work, but the termination flag is the conformant model.
146        if self.is_closed() {
147            return Err(ExecutorError::SessionClosed {
148                span: SourceSpan::default(),
149            });
150        }
151        let schema_version = self.graph().schema_version();
152        let registry_version = registry.registry_version();
153        let top_level_call_candidate = is_top_level_call_candidate(source);
154        let active_txn_has_schema_changes = self
155            .active_txn
156            .as_ref()
157            .is_some_and(|txn| txn.has_schema_changes());
158        if !active_txn_has_schema_changes
159            && let Some(cached) = self
160                .plan_cache
161                .as_mut()
162                .and_then(|cache| cache.get(source, schema_version))
163        {
164            return execute_statement(&cached, self, registry);
165        }
166
167        let shared_plan_cache = (!active_txn_has_schema_changes)
168            .then(|| self.shared_plan_cache.as_ref().map(Arc::clone))
169            .flatten();
170        let call_plan_cache = (!active_txn_has_schema_changes)
171            .then(|| self.call_plan_cache.as_ref().map(Arc::clone))
172            .flatten();
173        let cache_graph_id = if shared_plan_cache.is_some() || call_plan_cache.is_some() {
174            Some(self.graph().read().graph_id())
175        } else {
176            None
177        };
178        if !top_level_call_candidate
179            && let (Some(cache), Some(graph_id)) = (shared_plan_cache.as_ref(), cache_graph_id)
180            && let Some(cached) = cache.get(SharedPlanCacheLookup {
181                graph_id,
182                schema_version,
183                registry_version,
184                source,
185                caps: self.caps,
186                index_selection: self.index_selection,
187            })
188        {
189            if let Some(cache) = self.plan_cache.as_mut() {
190                cache.insert(Arc::from(source), Arc::clone(&cached), schema_version);
191            }
192            return execute_statement(&cached, self, registry);
193        }
194        if top_level_call_candidate
195            && let (Some(cache), Some(graph_id)) = (call_plan_cache.as_ref(), cache_graph_id)
196            && let Some(cached) =
197                cache.get_source(graph_id, schema_version, registry_version, source)
198        {
199            metrics::counter_inc(metrics::CALL_PLAN_CACHE_HITS_TOTAL);
200            return execute_statement(&cached, self, registry);
201        }
202
203        // Codex PR #127 auto-review P2 #2 + P2 #3: compile failures inside an
204        // explicit transaction must abort the transaction (PostgreSQL-style
205        // contract: any error → rollback). Without this, parse/analyze/plan
206        // errors would leave the explicit-tx in a usable state, diverging
207        // from `execute_inside_explicit_tx` which marks `aborted = true` on
208        // execution-time errors.
209        let statement = parse(source).map_err(|source| {
210            if self.active_txn.is_some() {
211                self.aborted = true;
212            }
213            ExecutorError::Parse { source }
214        })?;
215
216        // Codex PR #127 auto-review P2 #1: short-circuit aborted-session
217        // non-control statements before analyze/plan/cache-insert work.
218        // TX-control statements (START/COMMIT/ROLLBACK) must remain reachable
219        // on an aborted session so the caller can recover via ROLLBACK.
220        if self.aborted && !is_tx_control_statement(&statement) {
221            return Err(ExecutorError::InFailedTransaction {
222                span: SourceSpan::default(),
223            });
224        }
225
226        let call_plan_key = cache_graph_id.and_then(|graph_id| {
227            CallPlanKey::for_statement(graph_id, schema_version, registry_version, &statement)
228        });
229        if let (Some(cache), Some(key)) = (call_plan_cache.as_ref(), call_plan_key.as_ref())
230            && let Some(cached) = cache.get(key)
231        {
232            metrics::counter_inc(metrics::CALL_PLAN_CACHE_HITS_TOTAL);
233            return execute_statement(&cached, self, registry);
234        }
235
236        let graph_type = self
237            .active_txn
238            .as_ref()
239            .and_then(|txn| txn.read().meta.bound_type.as_ref().map(Arc::clone))
240            .or_else(|| self.graph().graph_type());
241        let analyzed = analyze(statement, registry, graph_type.as_deref()).map_err(|source| {
242            if self.active_txn.is_some() {
243                self.aborted = true;
244            }
245            ExecutorError::Analysis { source }
246        })?;
247        let lowered = build_plan(&analyzed, registry, &self.caps).map_err(|source| {
248            if self.active_txn.is_some() {
249                self.aborted = true;
250            }
251            ExecutorError::Plan { source }
252        })?;
253        // Optimize on the cache-MISS path only: cached plans are already
254        // optimized, so the two cache-hit early-returns above serve optimized
255        // plans at hit-cost. EXPLAIN renders the optimized inner plan for free
256        // because the optimizer recurses into PipelineOp::ExplainPlan { inner }.
257        let plan = Arc::new(self.optimize_plan(lowered));
258        let source_arc = Arc::<str>::from(source);
259        if !active_txn_has_schema_changes && let Some(cache) = self.plan_cache.as_mut() {
260            cache.insert(Arc::clone(&source_arc), Arc::clone(&plan), schema_version);
261        }
262        if let (Some(cache), Some(graph_id)) = (shared_plan_cache, cache_graph_id) {
263            cache.insert(
264                SharedPlanCacheInsert {
265                    graph_id,
266                    schema_version,
267                    registry_version,
268                    source: Arc::clone(&source_arc),
269                    caps: self.caps,
270                    index_selection: self.index_selection,
271                },
272                Arc::clone(&plan),
273            );
274        }
275        if let (Some(cache), Some(key)) = (call_plan_cache, call_plan_key) {
276            cache.insert_with_source(key, source_arc, Arc::clone(&plan));
277        }
278        execute_statement(&plan, self, registry)
279    }
280
281    /// Run the default optimizer over a freshly-lowered plan.
282    ///
283    /// When [`index_selection`](Session::without_index_selection) is enabled
284    /// (the default), the optimizer probes a snapshot-pinned
285    /// [`LiveIndexCatalog`] so label / typed / composite index access paths are
286    /// selected; Linear remains the always-correct fallback inside every rule.
287    /// When disabled, the lowered (Linear) plan is returned unchanged — the
288    /// optimizer fixed-point is skipped entirely, giving byte-identical Linear
289    /// lowering and EXPLAIN to pre-optimizer-wiring HEAD.
290    ///
291    /// The catalog is built from a single pinned `Arc<SeleneGraph>` snapshot —
292    /// the active transaction's working snapshot when inside an explicit
293    /// transaction (so txn-local index DDL is visible), else the published
294    /// snapshot. The plan-cache key is the `schema_version` epoch, which bumps
295    /// only on schema-changing commits and is published after the new snapshot
296    /// (see `selene_graph::WriteTxn::commit`); index *selection* depends only
297    /// on which indexes exist, so a structural access path stays correct for
298    /// any data mutation within an epoch.
299    fn optimize_plan(&self, lowered: ExecutionPlan) -> ExecutionPlan {
300        if !self.index_selection {
301            return lowered;
302        }
303        let snapshot = match self.active_txn.as_ref() {
304            Some(txn) => Arc::new(txn.read().clone()),
305            None => self.graph().read(),
306        };
307        let catalog = LiveIndexCatalog::new(snapshot);
308        let caps = lowered.impl_defined_caps;
309        let ctx = OptimizeContext::new(&caps).with_index_catalog(&catalog);
310        optimize(lowered, &ctx)
311    }
312}
313
314fn is_tx_control_statement(statement: &Statement) -> bool {
315    matches!(
316        statement,
317        Statement::StartTransaction { .. } | Statement::Commit { .. } | Statement::Rollback { .. }
318    )
319}
320
321fn is_top_level_call_candidate(source: &str) -> bool {
322    let source = source.trim_start();
323    let Some(prefix) = source.get(..4) else {
324        return false;
325    };
326    if !prefix.eq_ignore_ascii_case("CALL") {
327        return false;
328    }
329    source[4..].chars().next().is_none_or(char::is_whitespace)
330}
331
332fn execute_read_only(
333    plan: &ExecutionPlan,
334    session: &mut Session<'_>,
335    registry: &dyn ProcedureRegistry,
336) -> Result<StatementOutput, ExecutorError> {
337    let providers = session.graph().index_providers();
338    let snapshot = session.graph().read();
339    let session_tz = session.effective_time_zone();
340    let binding_tables = Rc::new(BindingTableRegistry::new());
341    let parameters = materialize_parameter_values(
342        &session.parameters,
343        &session.scalar_parameters,
344        &binding_tables,
345    );
346    let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
347    let warning_sink = session.warning_sink.as_ref();
348    let table = if let Some(txn) = session.active_txn.as_mut() {
349        let mut ctx = TxContext::write_with_owned_parameters_and_registry(
350            snapshot,
351            &plan.impl_defined_caps,
352            registry,
353            txn,
354            providers,
355            parameters,
356            Rc::clone(&binding_tables),
357        )
358        .with_resource_limits(
359            cancellation.as_ref(),
360            deadline,
361            row_cap,
362            node_scan_budget.as_ref(),
363        )
364        .with_warning_sink(warning_sink)
365        .with_session_time_zone(session_tz);
366        ctx.check_cancellation()?;
367        let table = execute_plan(plan, &mut ctx)?;
368        note_output_rows(plan, &ctx, table.row_count())?;
369        table
370    } else {
371        let mut ctx = TxContext::read_only_with_owned_parameters_and_registry(
372            snapshot,
373            &plan.impl_defined_caps,
374            registry,
375            providers,
376            parameters,
377            Rc::clone(&binding_tables),
378        )
379        .with_resource_limits(
380            cancellation.as_ref(),
381            deadline,
382            row_cap,
383            node_scan_budget.as_ref(),
384        )
385        .with_warning_sink(warning_sink)
386        .with_session_time_zone(session_tz);
387        ctx.check_cancellation()?;
388        let table = execute_plan(plan, &mut ctx)?;
389        note_output_rows(plan, &ctx, table.row_count())?;
390        table
391    };
392    Ok(output_from_table(plan, table))
393}
394
395fn execute_write(
396    plan: &ExecutionPlan,
397    session: &mut Session<'_>,
398    registry: &dyn ProcedureRegistry,
399) -> Result<StatementOutput, ExecutorError> {
400    if session.active_txn.is_some() {
401        return execute_inside_explicit_tx(plan, session, registry);
402    }
403    execute_auto_commit(plan, session, registry)
404}
405
406fn execute_maintenance(
407    plan: &ExecutionPlan,
408    session: &mut Session<'_>,
409    registry: &dyn ProcedureRegistry,
410) -> Result<StatementOutput, ExecutorError> {
411    if session.active_txn.is_some() {
412        return Err(ExecutorError::InvalidTransactionState {
413            detail: "maintenance procedure cannot run inside an explicit transaction",
414            span: SourceSpan::default(),
415        });
416    }
417    let providers = session.graph().index_providers();
418    let snapshot = session.graph().read();
419    let session_tz = session.effective_time_zone();
420    let binding_tables = Rc::new(BindingTableRegistry::new());
421    let parameters = materialize_parameter_values(
422        &session.parameters,
423        &session.scalar_parameters,
424        &binding_tables,
425    );
426    let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
427    let warning_sink = session.warning_sink.as_ref();
428    let mut ctx = TxContext::maintenance_with_owned_parameters_and_registry(
429        snapshot,
430        &plan.impl_defined_caps,
431        registry,
432        session.graph(),
433        providers,
434        parameters,
435        Rc::clone(&binding_tables),
436    )
437    .with_resource_limits(
438        cancellation.as_ref(),
439        deadline,
440        row_cap,
441        node_scan_budget.as_ref(),
442    )
443    .with_warning_sink(warning_sink)
444    .with_session_time_zone(session_tz);
445    ctx.check_cancellation()?;
446    let table = execute_plan(plan, &mut ctx)?;
447    note_output_rows(plan, &ctx, table.row_count())?;
448    Ok(output_from_table(plan, table))
449}
450
451fn execute_inside_explicit_tx(
452    plan: &ExecutionPlan,
453    session: &mut Session<'_>,
454    registry: &dyn ProcedureRegistry,
455) -> Result<StatementOutput, ExecutorError> {
456    let providers = session.graph().index_providers();
457    let snapshot = session.graph().read();
458    let session_tz = session.effective_time_zone();
459    let binding_tables = Rc::new(BindingTableRegistry::new());
460    let parameters = materialize_parameter_values(
461        &session.parameters,
462        &session.scalar_parameters,
463        &binding_tables,
464    );
465    let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
466    let warning_sink = session.warning_sink.as_ref();
467    let txn = session
468        .active_txn
469        .as_mut()
470        .ok_or(ExecutorError::ImplementationDefined {
471            detail: "explicit-TX path entered without active transaction",
472        })?;
473    let mut ctx = TxContext::write_with_owned_parameters_and_registry(
474        snapshot,
475        &plan.impl_defined_caps,
476        registry,
477        txn,
478        providers,
479        parameters,
480        Rc::clone(&binding_tables),
481    )
482    .with_resource_limits(
483        cancellation.as_ref(),
484        deadline,
485        row_cap,
486        node_scan_budget.as_ref(),
487    )
488    .with_warning_sink(warning_sink)
489    .with_session_time_zone(session_tz);
490    let result = ctx
491        .check_cancellation()
492        .and_then(|()| execute_plan(plan, &mut ctx))
493        .and_then(|table| {
494            note_output_rows(plan, &ctx, table.row_count())?;
495            Ok(table)
496        });
497    if result.is_err() {
498        session.aborted = true;
499    }
500    result.map(|table| output_from_table(plan, table))
501}
502
503fn execute_auto_commit(
504    plan: &ExecutionPlan,
505    session: &mut Session<'_>,
506    registry: &dyn ProcedureRegistry,
507) -> Result<StatementOutput, ExecutorError> {
508    let providers = session.graph().index_providers();
509    let snapshot = session.graph().read();
510    let principal = session.principal();
511    let session_tz = session.effective_time_zone();
512    let binding_tables = Rc::new(BindingTableRegistry::new());
513    let parameters = materialize_parameter_values(
514        &session.parameters,
515        &session.scalar_parameters,
516        &binding_tables,
517    );
518    let mut txn = session.graph().begin_write();
519    let (cancellation, deadline, row_cap, node_scan_budget) = resource_limits(session);
520    let warning_sink = session.warning_sink.as_ref();
521    let result = {
522        let mut ctx = TxContext::write_with_owned_parameters_and_registry(
523            snapshot,
524            &plan.impl_defined_caps,
525            registry,
526            &mut txn,
527            providers,
528            parameters,
529            Rc::clone(&binding_tables),
530        )
531        .with_resource_limits(
532            cancellation.as_ref(),
533            deadline,
534            row_cap,
535            node_scan_budget.as_ref(),
536        )
537        .with_warning_sink(warning_sink)
538        .with_session_time_zone(session_tz);
539        ctx.check_cancellation()
540            .and_then(|()| execute_plan(plan, &mut ctx))
541            .and_then(|table| {
542                note_output_rows(plan, &ctx, table.row_count())?;
543                Ok(table)
544            })
545    };
546    match result {
547        Ok(table) => {
548            let outcome = txn.commit_with_principal(principal).map_err(|source| {
549                ExecutorError::GraphMutation {
550                    source,
551                    span: SourceSpan::default(),
552                }
553            })?;
554            emit_commit_warnings(&outcome, session);
555            Ok(write_output_from_commit(plan, table, outcome))
556        }
557        Err(error) => {
558            txn.rollback();
559            Err(error)
560        }
561    }
562}
563
564fn emit_commit_warnings(outcome: &CommitOutcome, session: &Session<'_>) {
565    let Some(sink) = session.warning_sink.as_ref() else {
566        return;
567    };
568    for warning in &outcome.warnings {
569        sink.borrow_mut().emit(ExecutorWarning {
570            code: GqlStatus::VALIDATION_MODE_RELAXED_WRITE,
571            message: warning.warning.violation.to_string(),
572            span: SourceSpan::default(),
573        });
574    }
575}
576
577fn note_output_rows(
578    plan: &ExecutionPlan,
579    ctx: &TxContext<'_, '_>,
580    row_count: usize,
581) -> Result<(), ExecutorError> {
582    if !plan.output_schema.columns.is_empty() {
583        ctx.note_result_rows(row_count)?;
584    }
585    Ok(())
586}
587
588fn resource_limits(
589    session: &Session<'_>,
590) -> (
591    Option<CancellationToken>,
592    Option<std::time::Instant>,
593    Option<usize>,
594    Option<NodeScanBudget>,
595) {
596    (
597        session.cancellation.clone(),
598        session.deadline,
599        session.row_cap,
600        session.max_nodes_scanned.map(NodeScanBudget::new),
601    )
602}
603
604fn execute_transaction_control(
605    plan: &ExecutionPlan,
606    session: &mut Session<'_>,
607) -> Result<StatementOutput, ExecutorError> {
608    let [crate::PipelineOp::Tx(op)] = plan.pipeline.as_slice() else {
609        return Err(ExecutorError::ImplementationDefined {
610            detail: "transaction-control plan must contain exactly one TX op",
611        });
612    };
613    pipeline::tx::execute(op, session)
614}
615
616fn execute_session_control(
617    plan: &ExecutionPlan,
618    session: &mut Session<'_>,
619    registry: &dyn ProcedureRegistry,
620) -> Result<StatementOutput, ExecutorError> {
621    let [crate::PipelineOp::Session(op)] = plan.pipeline.as_slice() else {
622        return Err(ExecutorError::ImplementationDefined {
623            detail: "session-control plan must contain exactly one session op",
624        });
625    };
626    pipeline::session::execute(op, session, registry)
627}
628
629fn output_from_table(plan: &ExecutionPlan, table: BindingTable) -> StatementOutput {
630    if plan.output_schema.columns.is_empty() {
631        StatementOutput::Empty
632    } else {
633        StatementOutput::Rows(table)
634    }
635}
636
637fn write_output_from_commit(
638    plan: &ExecutionPlan,
639    table: BindingTable,
640    outcome: CommitOutcome,
641) -> StatementOutput {
642    let rows = if plan.output_schema.columns.is_empty() {
643        None
644    } else {
645        Some(table)
646    };
647    StatementOutput::Written(WriteOutcome::from_commit(outcome, rows))
648}
649
650fn record_statement_metrics(plan: &ExecutionPlan, started: Instant) {
651    let label = metrics::Label::new(metrics::STATEMENT_KIND_LABEL, statement_kind(plan));
652    metrics::counter_inc_with_label(metrics::QUERIES_TOTAL, label);
653    metrics::histogram_record_with_label(
654        metrics::QUERY_DURATION_SECONDS,
655        started.elapsed().as_secs_f64(),
656        label,
657    );
658}
659
660fn statement_kind(plan: &ExecutionPlan) -> &'static str {
661    if let Some(kind) = plan.pipeline.iter().find_map(pipeline_statement_kind) {
662        return kind;
663    }
664    match plan.category {
665        StatementCategory::ReadOnly => "query",
666        StatementCategory::DataModifying => "mutation",
667        StatementCategory::CatalogModifying => "catalog",
668        StatementCategory::Maintenance => "maintenance",
669        StatementCategory::TransactionControl => "transaction",
670        StatementCategory::SessionControl => "session",
671    }
672}
673
674fn pipeline_statement_kind(op: &PipelineOp) -> Option<&'static str> {
675    match op {
676        PipelineOp::Union { .. } | PipelineOp::Chain(_) | PipelineOp::CorrelatedChain(_) => {
677            Some("composite")
678        }
679        PipelineOp::Match(_) | PipelineOp::OptionalMatch(_) => Some("query"),
680        PipelineOp::Call(_) => Some("call"),
681        PipelineOp::CallSubquery(_) => Some("call_subquery"),
682        PipelineOp::Mutation(_) => Some("mutation"),
683        PipelineOp::Catalog(_) => Some("catalog"),
684        PipelineOp::ExplainPlan { .. } => Some("explain"),
685        PipelineOp::Tx(TxOp::Start { .. }) => Some("start_transaction"),
686        PipelineOp::Tx(TxOp::Commit { .. }) => Some("commit"),
687        PipelineOp::Tx(TxOp::Rollback { .. }) => Some("rollback"),
688        _ => None,
689    }
690}