kglite-bolt-server 0.10.1

Bolt v5.x protocol server for kglite knowledge graphs — pure-Rust single-binary frontend speaking the Neo4j wire protocol so the Neo4j driver ecosystem (Python/JS/Java/Go/.NET drivers, Cypher Shell, Neo4j Browser, BloodHound, LangChain Neo4jGraph) plugs in unchanged.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
//! `BoltBackend` implementation for kglite.
//!
//! Phase C.1 through C.6 ✅ shipped: handshake / session lifecycle /
//! scalar RUN+PULL / parameter decoding / Node-Rel-Path RETURN /
//! explicit transactions (BEGIN/COMMIT/ROLLBACK) + `--readonly`
//! enforcement / typed `KgError` → `Neo.{Class}.{Category}.{Title}`
//! FAILURE-code mapping (via `crate::error_map`) / `--auth basic`
//! credential validator (wired in `main.rs`) / `db.*` schema-
//! introspection procedure pass-through (works via the standard
//! Cypher CALL pipeline — Phase A.3 added the procs to kglite core).

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use async_trait::async_trait;
use boltr::error::BoltError;
use boltr::server::{
    AuthInfo, BoltBackend, BoltRecord, ResultMetadata, ResultStream, RoutingTable, SessionConfig,
    SessionHandle, SessionProperty, TransactionHandle,
};
use boltr::types::{BoltDict, BoltValue};

use kglite::api::{cypher, DirGraph, Value};

use crate::error_map::kg_to_bolt;
use crate::value_adapter;

/// Bolt backend wrapping a loaded kglite graph.
///
/// One instance is constructed at server boot and shared across all
/// connections via `Arc` inside `BoltServer::serve`.
///
/// **State model** (Phase C.5 + robustness pass RA-1):
/// - `graph` holds the canonical shared `Arc<DirGraph>` behind a
///   `Mutex`. Auto-commit reads briefly lock, `Arc::clone` the inner,
///   release. Commits lock + replace the inner Arc.
/// - `transactions` holds per-transaction working state. The outer
///   `Mutex<HashMap<...>>` is acquired only to look up / insert /
///   remove the per-tx entry; the actual tx work happens inside the
///   inner `Arc<Mutex<TxState>>`. **Lock ordering**: always outer
///   first, never the reverse. Specifically: take outer, clone the
///   Arc to the inner mutex, release outer, take inner. The outer
///   mutex is never held across a Cypher pipeline call — one
///   session's slow query no longer blocks all other sessions' tx
///   operations.
///
/// **Concurrency**:
/// - Reads (auto-commit or tx-snapshot) are wait-free apart from the
///   momentary mutex acquire to clone the Arc<DirGraph>.
/// - Mutations inside an explicit transaction run against the tx's
///   working copy under the per-tx mutex — no contention with other
///   sessions until commit.
/// - Commit takes a brief mutex on `graph` to swap the inner Arc.
/// - **OCC version checking is deferred** — the `DirGraph::version`
///   field is `pub(crate)` and not exposed via `kglite::api`. The
///   Python `Transaction` class has it; bolt-server gets it when the
///   accessor is added. For Phase C.5 the test scenarios are
///   sequential so no conflict is possible; concurrent-writer
///   stress is the next pass's concern.
///
/// **`--readonly`**: rejects `begin_transaction` outright, and the
/// auto-commit mutation gate in `execute` is unchanged. A read-only
/// server is genuinely write-rejecting; there's no read-only-tx
/// surface today.
pub struct KgliteBackend {
    /// Canonical shared graph + transaction-commit machinery,
    /// extracted to `kglite::api::session` in Phase E. Sessions
    /// snapshot via `session.snapshot()`; commits go through
    /// `session.commit(tx, check_occ)` which handles the OCC
    /// version bump + Arc swap atomically.
    session: Arc<kglite::api::session::Session>,
    /// Server-wide `--readonly` flag. Rejects begin_transaction and
    /// auto-commit mutations.
    readonly: bool,
    /// Per-transaction state. Keyed by `TransactionHandle.0`. The
    /// outer mutex is brief-acquire-only (lookup/insert/remove); the
    /// per-tx work happens inside the inner mutex. See struct doc on
    /// lock ordering.
    transactions: Arc<Mutex<HashMap<String, Arc<Mutex<TxState>>>>>,
    /// Monotonic per-server session counter.
    session_counter: AtomicU64,
    /// Monotonic per-server transaction counter.
    tx_counter: AtomicU64,
    /// "host:port" string returned in `route()`'s `RoutingTable`
    /// so cluster-aware drivers (`neo4j://` URIs) know where to
    /// reconnect. Phase F #5. Typically matches the bind address
    /// but can differ when running behind a reverse proxy
    /// (`--advertise-addr` flag on `main.rs`).
    advertised_addr: String,
}

/// Per-Bolt-transaction state. Wraps the canonical
/// [`kglite::api::session::Transaction`] (snapshot/working CoW)
/// alongside the bolt-server's session-ownership tracking.
struct TxState {
    /// The canonical CoW transaction state. `None` after
    /// commit/rollback (we move the inner out for the
    /// `Session::commit` / `Session::rollback` calls).
    inner: Option<kglite::api::session::Transaction>,
    /// Bolt session that owns this tx — used by `close_session` to
    /// roll back any in-flight tx for a dropped connection.
    session_id: String,
}

impl KgliteBackend {
    /// Construct a backend. The DirGraph is wrapped in a
    /// `session::Session` for shared-graph + commit-swap semantics.
    /// `advertised_addr` (`host:port`, no scheme) is what `route()`
    /// returns to cluster-aware drivers using `neo4j://` URIs —
    /// they'll reconnect to this address for subsequent sessions,
    /// so it must be reachable from the client's network. Usually
    /// this matches the bind address but should differ when bound
    /// to `0.0.0.0` behind a hostname or reverse proxy.
    pub fn new(graph: DirGraph, readonly: bool, advertised_addr: String) -> Self {
        Self {
            session: Arc::new(kglite::api::session::Session::new(graph)),
            readonly,
            transactions: Arc::new(Mutex::new(HashMap::new())),
            session_counter: AtomicU64::new(0),
            tx_counter: AtomicU64::new(0),
            advertised_addr,
        }
    }
}

#[async_trait]
impl BoltBackend for KgliteBackend {
    // ---- Session lifecycle (Phase C.1 ✓) ---------------------------------

    async fn create_session(&self, config: &SessionConfig) -> Result<SessionHandle, BoltError> {
        let id = self.session_counter.fetch_add(1, Ordering::Relaxed);
        let handle = SessionHandle(format!("bolt-{id}"));
        tracing::debug!(
            session_id = %handle.0,
            user_agent = %config.user_agent,
            database = ?config.database,
            "create_session"
        );
        Ok(handle)
    }

    async fn set_session_auth(
        &self,
        session: &SessionHandle,
        auth_info: AuthInfo,
    ) -> Result<(), BoltError> {
        tracing::debug!(
            session_id = %session.0,
            principal = %auth_info.principal,
            "set_session_auth (no-op until C.6)"
        );
        Ok(())
    }

    async fn close_session(&self, session: &SessionHandle) -> Result<(), BoltError> {
        // Roll back any in-flight transactions for this session.
        // Brief outer-mutex hold: scan the HashMap for matching
        // session_id (requires taking the per-tx inner lock to read
        // it), collect the handles to remove, then release the outer.
        // We DO NOT hold the outer mutex across the inner-lock reads
        // — that would re-introduce the head-of-line blocking the
        // per-tx mutex split fixed.
        let to_drop: Vec<String> = {
            let txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.iter()
                .filter_map(|(handle, state_arc)| {
                    // Each per-tx mutex is brief-held to read session_id.
                    let state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
                    (state.session_id == session.0).then(|| handle.clone())
                })
                .collect()
        };
        // Remove drops under the outer mutex.
        {
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            for handle in &to_drop {
                txs.remove(handle);
                tracing::debug!(
                    session_id = %session.0,
                    tx = %handle,
                    "rolled back in-flight transaction on session close"
                );
            }
        }
        tracing::debug!(
            session_id = %session.0,
            rolled_back = to_drop.len(),
            "close_session"
        );
        Ok(())
    }

    async fn configure_session(
        &self,
        session: &SessionHandle,
        property: SessionProperty,
    ) -> Result<(), BoltError> {
        match property {
            SessionProperty::Database(db) => {
                tracing::debug!(
                    session_id = %session.0,
                    database = %db,
                    "configure_session: database property accepted but ignored (single-graph server)"
                );
            }
        }
        Ok(())
    }

    async fn reset_session(&self, session: &SessionHandle) -> Result<(), BoltError> {
        // RESET clears any in-flight transaction (same effect as
        // close_session, but the session itself stays alive).
        let to_drop: Vec<String> = {
            let txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.iter()
                .filter_map(|(handle, state_arc)| {
                    let state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
                    (state.session_id == session.0).then(|| handle.clone())
                })
                .collect()
        };
        {
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            for handle in &to_drop {
                txs.remove(handle);
            }
        }
        tracing::debug!(
            session_id = %session.0,
            rolled_back = to_drop.len(),
            "reset_session"
        );
        Ok(())
    }

    // ---- Query execution -------------------------------------------------

    async fn execute(
        &self,
        _session: &SessionHandle,
        query: &str,
        parameters: &HashMap<String, BoltValue>,
        _extra: &BoltDict,
        transaction: Option<&TransactionHandle>,
    ) -> Result<ResultStream, BoltError> {
        // Input gates (Phase robustness RB-2). These produce clear
        // Protocol/ClientError responses so users see actionable
        // errors instead of opaque parser failures or silent partial
        // execution.

        // Empty or whitespace-only query.
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return Err(BoltError::Protocol(
                "empty Cypher query — RUN requires a non-empty statement".into(),
            ));
        }

        // Multi-statement query. The kglite parser handles one Cypher
        // statement per RUN; sending `MATCH ... ; MATCH ...` would
        // silently parse only the first statement. Reject explicitly.
        //
        // The semicolon detection is a string-level heuristic: it can
        // false-positive on a semicolon inside a string literal (rare
        // and arguably worth a clearer error too). The substring
        // approach matches how cypher-shell + most drivers signal
        // multi-statement separation.
        if _query_appears_multi_statement(trimmed) {
            return Err(BoltError::Protocol(
                "multi-statement queries not supported — send one Cypher \
                 statement per RUN message (or open a transaction and \
                 issue separate RUNs)"
                    .into(),
            ));
        }

        // Decode params (C.3). Errors here are genuine client errors
        // (bad parameter type) → Protocol → ClientError.
        let kg_params: HashMap<String, Value> = parameters
            .iter()
            .map(|(k, v)| value_adapter::from_bolt(v).map(|kv| (k.clone(), kv)))
            .collect::<Result<HashMap<_, _>, _>>()?;

        let elapsed_start = Instant::now();

        // Branch: tx execution holds the tx mutex for the whole
        // pipeline (parse/plan/execute against the same graph view).
        // Auto-commit takes a momentary snapshot of the backend.
        let (result, type_str) = if let Some(handle) = transaction.map(|t| t.0.clone()) {
            self.execute_in_tx(&handle, query, kg_params)?
        } else {
            self.execute_auto_commit(query, kg_params)?
        };

        let elapsed_ms = elapsed_start.elapsed().as_millis() as i64;

        let records: Vec<BoltRecord> = result
            .rows
            .iter()
            .map(|row| {
                row.iter()
                    .map(value_adapter::to_bolt)
                    .collect::<Result<Vec<_>, _>>()
                    .map(|values| BoltRecord { values })
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut summary = BoltDict::from([
            ("type".to_string(), BoltValue::String(type_str.to_string())),
            ("t_last".to_string(), BoltValue::Integer(elapsed_ms)),
        ]);
        if let Some(stats) = &result.stats {
            let stats_dict = BoltDict::from([
                (
                    "nodes-created".to_string(),
                    BoltValue::Integer(stats.nodes_created as i64),
                ),
                (
                    "nodes-deleted".to_string(),
                    BoltValue::Integer(stats.nodes_deleted as i64),
                ),
                (
                    "relationships-created".to_string(),
                    BoltValue::Integer(stats.relationships_created as i64),
                ),
                (
                    "relationships-deleted".to_string(),
                    BoltValue::Integer(stats.relationships_deleted as i64),
                ),
                (
                    "properties-set".to_string(),
                    BoltValue::Integer(stats.properties_set as i64),
                ),
            ]);
            summary.insert("stats".to_string(), BoltValue::Dict(stats_dict));
        }

        Ok(ResultStream {
            metadata: ResultMetadata {
                columns: result.columns,
                extra: BoltDict::new(),
            },
            records,
            summary,
        })
    }

    // ---- Transactions (Phase C.5 ✓) --------------------------------------

    async fn begin_transaction(
        &self,
        session: &SessionHandle,
        _extra: &BoltDict,
    ) -> Result<TransactionHandle, BoltError> {
        if self.readonly {
            return Err(BoltError::Forbidden(
                "server is read-only — explicit transactions rejected (--readonly flag)".into(),
            ));
        }
        let id = self.tx_counter.fetch_add(1, Ordering::Relaxed);
        let handle = TransactionHandle(format!("tx-{id}"));
        let state = TxState {
            inner: Some(self.session.begin()),
            session_id: session.0.clone(),
        };
        // Brief outer-mutex hold to insert. The Arc wrapping the
        // inner Mutex<TxState> is created here so concurrent
        // commit/rollback for OTHER txs don't block this insert.
        {
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.insert(handle.0.clone(), Arc::new(Mutex::new(state)));
        }
        tracing::debug!(
            session_id = %session.0,
            tx = %handle.0,
            "begin_transaction"
        );
        Ok(handle)
    }

    async fn commit(
        &self,
        session: &SessionHandle,
        transaction: &TransactionHandle,
    ) -> Result<BoltDict, BoltError> {
        // Brief outer-mutex hold: remove the per-tx entry from the
        // HashMap. We then check session ownership + extract working
        // under the per-tx mutex (which we own exclusively since we
        // just removed it). If ownership check fails, re-insert.
        let state_arc = {
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.remove(&transaction.0).ok_or_else(|| {
                BoltError::Transaction(format!(
                    "commit: unknown transaction handle: {}",
                    transaction.0
                ))
            })?
        };

        // Take the inner state. We hold the only Arc reference now (we
        // just removed the HashMap entry), so try_unwrap is free.
        let mut state = match Arc::try_unwrap(state_arc) {
            Ok(mutex) => mutex.into_inner().unwrap_or_else(|p| p.into_inner()),
            Err(arc) => {
                // Defensive: another holder. Drop the tx entirely
                // (we can't easily extract from a shared Arc).
                let guard = arc.lock().unwrap_or_else(|p| p.into_inner());
                TxState {
                    inner: None,
                    session_id: guard.session_id.clone(),
                }
            }
        };

        if state.session_id != session.0 {
            // Ownership mismatch — re-insert and error.
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.insert(transaction.0.clone(), Arc::new(Mutex::new(state)));
            return Err(BoltError::Transaction(format!(
                "commit: transaction {} doesn't belong to session {}",
                transaction.0, session.0
            )));
        }

        // Delegate to session::Session::commit which handles OCC +
        // Arc swap atomically. Phase E.4 wires OCC (was deferred in
        // C.5); concurrent writers now get
        // ConflictDetected → BoltError::Transaction.
        let Some(tx) = state.inner.take() else {
            // Defensive fallthrough — was already consumed.
            return Ok(BoltDict::new());
        };
        match self.session.commit(tx, /* check_occ = */ true) {
            kglite::api::session::CommitOutcome::NoWritesNoOp => {
                tracing::debug!(
                    session_id = %session.0,
                    tx = %transaction.0,
                    "commit (no-op; no mutations)"
                );
            }
            kglite::api::session::CommitOutcome::Committed { new_version } => {
                tracing::debug!(
                    session_id = %session.0,
                    tx = %transaction.0,
                    new_version,
                    "commit (with mutations)"
                );
            }
            kglite::api::session::CommitOutcome::ConflictDetected {
                current_version,
                base_version,
            } => {
                tracing::debug!(
                    session_id = %session.0,
                    tx = %transaction.0,
                    current_version,
                    base_version,
                    "commit conflict — another writer committed first"
                );
                return Err(BoltError::Transaction(format!(
                    "Transaction conflict: graph was modified by another committer \
                     since this transaction's BEGIN (base version {base_version}, \
                     current version {current_version}). Retry the transaction."
                )));
            }
        }

        Ok(BoltDict::new())
    }

    async fn rollback(
        &self,
        session: &SessionHandle,
        transaction: &TransactionHandle,
    ) -> Result<(), BoltError> {
        let state_arc = {
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.remove(&transaction.0).ok_or_else(|| {
                BoltError::Transaction(format!(
                    "rollback: unknown transaction handle: {}",
                    transaction.0
                ))
            })?
        };

        // Brief inner-mutex hold just to check ownership.
        let (session_id, had_mutations) = {
            let state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
            (
                state.session_id.clone(),
                state.inner.as_ref().is_some_and(|t| t.has_writes()),
            )
        };

        if session_id != session.0 {
            // Re-insert; tx ownership mismatch.
            let mut txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.insert(transaction.0.clone(), state_arc);
            return Err(BoltError::Transaction(format!(
                "rollback: transaction {} doesn't belong to session {}",
                transaction.0, session.0
            )));
        }
        // Delegate to session::Session::rollback. With Arc::try_unwrap
        // we extract the inner tx; on shared-Arc fallback we drop
        // (which is a rollback anyway since the Transaction's Drop
        // releases the snapshot Arc).
        if let Ok(mutex) = Arc::try_unwrap(state_arc) {
            if let Ok(mut state) = mutex.into_inner() {
                if let Some(tx) = state.inner.take() {
                    self.session.rollback(tx);
                }
            }
        }
        tracing::debug!(
            session_id = %session.0,
            tx = %transaction.0,
            had_mutations = had_mutations,
            "rollback"
        );
        Ok(())
    }

    // ---- Server metadata (Phase C.1 ✓) -----------------------------------

    async fn get_server_info(&self) -> Result<BoltDict, BoltError> {
        let version = env!("CARGO_PKG_VERSION");
        let product = format!("kglite-bolt-server/{version}");
        let bolt_agent = BoltDict::from([
            ("product".to_string(), BoltValue::String(product.clone())),
            (
                "version".to_string(),
                BoltValue::String(version.to_string()),
            ),
        ]);
        let info = BoltDict::from([
            ("server".to_string(), BoltValue::String(product)),
            ("bolt_agent".to_string(), BoltValue::Dict(bolt_agent)),
        ]);
        Ok(info)
    }

    // ---- Routing (Phase F #5: single-server self-pointing table) ----------
    //
    // Cluster-aware drivers (`neo4j://` URIs, the default scheme
    // in Neo4j 5.x drivers) send a ROUTE message at connect time
    // expecting back a `RoutingTable` with WRITE/READ/ROUTE roles.
    // For a single-server kglite-bolt-server we return the same
    // advertised address under all three roles so the driver does
    // its remaining work against this same instance. `bolt://`
    // (direct) URIs bypass routing entirely; either scheme works.

    async fn route(
        &self,
        _routing_context: &BoltDict,
        _bookmarks: &[String],
        db: Option<&str>,
    ) -> Result<RoutingTable, BoltError> {
        // Default DB name aligns with Neo4j's: "neo4j" if none
        // was negotiated at HELLO. kglite is single-database so
        // the requested name is informational here.
        let db_name = db.unwrap_or("neo4j").to_string();
        // 300s TTL — the driver re-fetches the routing table on
        // expiry. Matches Neo4j's typical default.
        let ttl = 300;
        let single_server = boltr::server::RoutingServer {
            addresses: vec![self.advertised_addr.clone()],
            role: String::new(), // populated per-role below
        };
        let mut servers = Vec::with_capacity(3);
        for role in ["WRITE", "READ", "ROUTE"] {
            servers.push(boltr::server::RoutingServer {
                addresses: single_server.addresses.clone(),
                role: role.to_string(),
            });
        }
        Ok(RoutingTable {
            ttl,
            db: db_name,
            servers,
        })
    }
}

/// Heuristic: does this query string contain a statement separator
/// outside of any string literal? Used by the multi-statement gate
/// in `execute()` (RB-2). Returns true on `MATCH (a) RETURN a; MATCH
/// (b) RETURN b`. Does NOT false-positive on `RETURN 'a;b' AS s`.
///
/// The scan tracks the active quote (Cypher allows both `'` and `"`)
/// and treats backslash as an escape. It does not handle block
/// comments `/* ... */` — kglite's parser doesn't recognize those
/// either, so a semicolon inside a comment would already be a parse
/// error before reaching this function.
fn _query_appears_multi_statement(query: &str) -> bool {
    let mut in_quote: Option<char> = None;
    let mut chars = query.chars().peekable();
    while let Some(c) = chars.next() {
        match (c, in_quote) {
            ('\\', Some(_)) => {
                // Skip the next char (escape inside a string).
                let _ = chars.next();
            }
            ('\'', None) => in_quote = Some('\''),
            ('"', None) => in_quote = Some('"'),
            (c, Some(q)) if c == q => in_quote = None,
            (';', None) => {
                // Found a semicolon outside any string. If the rest
                // of the query is just whitespace, it's a trailing
                // semicolon — allow it (common driver convention).
                let rest: String = chars.collect();
                if !rest.trim().is_empty() {
                    return true;
                }
                return false;
            }
            _ => {}
        }
    }
    false
}

impl KgliteBackend {
    /// Build the canonical `ExecuteOptions` the bolt-server uses for
    /// every query. Eager rows (`lazy_eligible: false`) — bolt-server
    /// materializes every result into BoltRecords before handing
    /// back to boltr; we don't have a lazy materializer at this
    /// layer.
    fn execute_opts<'a>(
        &self,
        kg_params: &'a HashMap<String, Value>,
    ) -> kglite::api::session::ExecuteOptions<'a> {
        kglite::api::session::ExecuteOptions {
            params: kg_params,
            deadline: None,
            max_rows: None,
            lazy_eligible: false,
            disabled_passes: None,
            embedder: None, // bolt-server doesn't wire text_score; rejected at session level
        }
    }

    /// Auto-commit path: take a snapshot, delegate to
    /// `session::execute_read`, reject mutations. Mutations in
    /// auto-commit aren't supported (drivers always wrap writes in
    /// explicit transactions in practice).
    fn execute_auto_commit(
        &self,
        query: &str,
        kg_params: HashMap<String, Value>,
    ) -> Result<(cypher::CypherResult, &'static str), BoltError> {
        // Pre-parse to decide whether this is a mutation (so we can
        // reject auto-commit mutations with a Bolt-specific error
        // message before session::execute_read rejects with a
        // generic one). The parse is cached.
        let pre_parsed = cypher::parse_cypher(query).map_err(kg_to_bolt)?;
        if cypher::is_mutation_query(&pre_parsed) {
            if self.readonly {
                return Err(BoltError::Forbidden(
                    "server is read-only — mutations rejected (--readonly flag)".into(),
                ));
            }
            return Err(BoltError::Backend(
                "auto-commit mutations not supported by kglite-bolt-server — \
                 wrap CREATE/SET/DELETE in an explicit transaction \
                 (session.begin_transaction)"
                    .into(),
            ));
        }

        let snapshot = self.session.snapshot();
        let opts = self.execute_opts(&kg_params);
        let outcome =
            kglite::api::session::execute_read(&snapshot, query, &opts).map_err(kg_to_bolt)?;
        Ok((outcome.result, "r"))
    }

    /// Tx path: take outer mutex briefly to clone the per-tx Arc,
    /// release outer, then take the inner per-tx mutex for the
    /// actual pipeline + execute. Other sessions can operate on
    /// other transactions in parallel — the only contention is
    /// within a single tx (which is sequential by Bolt semantics).
    ///
    /// Delegates the snapshot/working CoW + pipeline orchestration
    /// to `kglite::api::session::{Transaction, execute_read,
    /// execute_mut}`.
    fn execute_in_tx(
        &self,
        handle: &str,
        query: &str,
        kg_params: HashMap<String, Value>,
    ) -> Result<(cypher::CypherResult, &'static str), BoltError> {
        // Step 1: Brief outer-mutex hold to look up the per-tx Arc.
        let state_arc: Arc<Mutex<TxState>> = {
            let txs = self.transactions.lock().unwrap_or_else(|p| p.into_inner());
            txs.get(handle)
                .ok_or_else(|| {
                    BoltError::Transaction(format!("unknown transaction handle: {handle}"))
                })
                .map(Arc::clone)?
        }; // outer mutex released here

        // Step 2: Take inner per-tx mutex for the entire pipeline.
        // Other sessions' tx operations are now unblocked.
        let mut state = state_arc.lock().unwrap_or_else(|p| p.into_inner());
        let tx_inner = state.inner.as_mut().ok_or_else(|| {
            BoltError::Transaction(format!("tx {handle} already committed or rolled back"))
        })?;

        // Pre-parse for read/mut routing.
        let pre_parsed = cypher::parse_cypher(query).map_err(kg_to_bolt)?;
        let is_mutation = cypher::is_mutation_query(&pre_parsed);

        if is_mutation && self.readonly {
            // Shouldn't happen — we reject begin_transaction under
            // --readonly — but defensive.
            return Err(BoltError::Forbidden(
                "server is read-only — mutations rejected (--readonly flag)".into(),
            ));
        }

        let opts = self.execute_opts(&kg_params);

        if is_mutation {
            // Materialize working on first mutation via session::Transaction.
            let working = tx_inner.working_mut().map_err(kg_to_bolt)?;
            let outcome =
                kglite::api::session::execute_mut(working, query, &opts).map_err(kg_to_bolt)?;
            Ok((outcome.result, "w"))
        } else {
            let graph = tx_inner.current().ok_or_else(|| {
                BoltError::Backend(format!(
                    "tx {handle} lost its graph view mid-read — bolt-server internal bug"
                ))
            })?;
            let outcome =
                kglite::api::session::execute_read(graph, query, &opts).map_err(kg_to_bolt)?;
            Ok((outcome.result, "r"))
        }
    }
}