1use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13
14use tokio_util::sync::CancellationToken;
15use tracing::instrument;
16use uuid::Uuid;
17
18use crate::api::UniInner;
19use crate::api::hooks::{HookContext, QueryType, SessionHook};
20use crate::api::impl_locy::LocyRuleRegistry;
21use crate::api::locy_result::LocyResult;
22use crate::api::transaction::{IsolationLevel, Transaction};
23use uni_common::{Result, UniError, Value};
24use uni_query::{ExplainOutput, ProfileOutput, QueryCursor, QueryResult, Row};
25
26pub(crate) struct PlanCacheMetrics {
29 pub(crate) hits: AtomicU64,
30 pub(crate) misses: AtomicU64,
31}
32
33#[derive(Debug, Clone)]
38pub struct SessionCapabilities {
39 pub can_write: bool,
41 pub can_pin: bool,
43 pub isolation: IsolationLevel,
45 pub has_notifications: bool,
47 pub write_lease: Option<WriteLeaseSummary>,
49}
50
51#[derive(Debug, Clone)]
56pub enum WriteLeaseSummary {
57 Local,
59 DynamoDB { table: String },
61 Custom,
63}
64
65pub(crate) struct SessionMetricsInner {
67 pub(crate) queries_executed: AtomicU64,
68 pub(crate) locy_evaluations: AtomicU64,
69 pub(crate) total_query_time_us: AtomicU64,
70 pub(crate) transactions_committed: AtomicU64,
71 pub(crate) transactions_rolled_back: AtomicU64,
72 pub(crate) total_rows_returned: AtomicU64,
73 pub(crate) total_rows_scanned: AtomicU64,
74}
75
76impl SessionMetricsInner {
77 fn new() -> Self {
78 Self {
79 queries_executed: AtomicU64::new(0),
80 locy_evaluations: AtomicU64::new(0),
81 total_query_time_us: AtomicU64::new(0),
82 transactions_committed: AtomicU64::new(0),
83 transactions_rolled_back: AtomicU64::new(0),
84 total_rows_returned: AtomicU64::new(0),
85 total_rows_scanned: AtomicU64::new(0),
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct SessionMetrics {
93 pub session_id: String,
95 pub active_since: Instant,
97 pub queries_executed: u64,
99 pub locy_evaluations: u64,
101 pub total_query_time: Duration,
103 pub transactions_committed: u64,
105 pub transactions_rolled_back: u64,
107 pub total_rows_returned: u64,
109 pub total_rows_scanned: u64,
111 pub plan_cache_hits: u64,
113 pub plan_cache_misses: u64,
115 pub plan_cache_size: usize,
117}
118
119pub struct Session {
145 pub(crate) db: Arc<UniInner>,
146 original_db: Option<Arc<UniInner>>,
149 id: String,
150 params: Arc<std::sync::RwLock<HashMap<String, Value>>>,
151 rule_registry: Arc<std::sync::RwLock<LocyRuleRegistry>>,
152 active_write_guard: Arc<AtomicBool>,
155 pub(crate) metrics_inner: Arc<SessionMetricsInner>,
157 created_at: Instant,
159 cancellation_token: Arc<std::sync::RwLock<CancellationToken>>,
162 plan_cache: Arc<std::sync::Mutex<PlanCache>>,
164 plan_cache_metrics: Arc<PlanCacheMetrics>,
166 pub(crate) hooks: HashMap<String, Arc<dyn SessionHook>>,
168 pub(crate) query_timeout: Option<Duration>,
170 pub(crate) transaction_timeout: Option<Duration>,
172}
173
174impl Session {
175 pub(crate) fn new(db: Arc<UniInner>) -> Self {
177 let global_registry = db.locy_rule_registry.read().unwrap();
179 let session_registry = global_registry.clone();
180 drop(global_registry);
181
182 db.active_session_count.fetch_add(1, Ordering::Relaxed);
183
184 Self {
185 db,
186 original_db: None,
187 id: Uuid::new_v4().to_string(),
188 params: Arc::new(std::sync::RwLock::new(HashMap::new())),
189 rule_registry: Arc::new(std::sync::RwLock::new(session_registry)),
190 active_write_guard: Arc::new(AtomicBool::new(false)),
191 metrics_inner: Arc::new(SessionMetricsInner::new()),
192 created_at: Instant::now(),
193 cancellation_token: Arc::new(std::sync::RwLock::new(CancellationToken::new())),
194 plan_cache: Arc::new(std::sync::Mutex::new(PlanCache::new(1000))),
195 plan_cache_metrics: Arc::new(PlanCacheMetrics {
196 hits: AtomicU64::new(0),
197 misses: AtomicU64::new(0),
198 }),
199 hooks: HashMap::new(),
200 query_timeout: None,
201 transaction_timeout: None,
202 }
203 }
204
205 pub(crate) fn new_from_template(
207 db: Arc<UniInner>,
208 params: HashMap<String, Value>,
209 rule_registry: LocyRuleRegistry,
210 hooks: HashMap<String, Arc<dyn SessionHook>>,
211 query_timeout: Option<Duration>,
212 transaction_timeout: Option<Duration>,
213 ) -> Self {
214 db.active_session_count.fetch_add(1, Ordering::Relaxed);
215
216 Self {
217 db,
218 original_db: None,
219 id: Uuid::new_v4().to_string(),
220 params: Arc::new(std::sync::RwLock::new(params)),
221 rule_registry: Arc::new(std::sync::RwLock::new(rule_registry)),
222 active_write_guard: Arc::new(AtomicBool::new(false)),
223 metrics_inner: Arc::new(SessionMetricsInner::new()),
224 created_at: Instant::now(),
225 cancellation_token: Arc::new(std::sync::RwLock::new(CancellationToken::new())),
226 plan_cache: Arc::new(std::sync::Mutex::new(PlanCache::new(1000))),
227 plan_cache_metrics: Arc::new(PlanCacheMetrics {
228 hits: AtomicU64::new(0),
229 misses: AtomicU64::new(0),
230 }),
231 hooks,
232 query_timeout,
233 transaction_timeout,
234 }
235 }
236
237 pub fn params(&self) -> Params<'_> {
241 Params {
242 store: &self.params,
243 }
244 }
245
246 #[instrument(skip(self), fields(session_id = %self.id))]
254 pub async fn query(&self, cypher: &str) -> Result<QueryResult> {
255 let params = self.merge_params(HashMap::new());
256 self.run_before_query_hooks(cypher, QueryType::Cypher, ¶ms)?;
257 let start = Instant::now();
258 let result = self.execute_cached(cypher, params.clone()).await;
259 self.metrics_inner
260 .queries_executed
261 .fetch_add(1, Ordering::Relaxed);
262 self.db.total_queries.fetch_add(1, Ordering::Relaxed);
263 self.metrics_inner
264 .total_query_time_us
265 .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
266 if let Ok(ref qr) = result {
267 self.metrics_inner
268 .total_rows_returned
269 .fetch_add(qr.len() as u64, Ordering::Relaxed);
270 self.run_after_query_hooks(cypher, QueryType::Cypher, ¶ms, qr.metrics());
271 }
272 result
273 }
274
275 pub fn query_with(&self, cypher: &str) -> QueryBuilder<'_> {
277 QueryBuilder {
278 session: self,
279 cypher: cypher.to_string(),
280 params: HashMap::new(),
281 timeout: self.query_timeout,
282 max_memory: None,
283 cancellation_token: None,
284 }
285 }
286
287 #[instrument(skip(self), fields(session_id = %self.id))]
291 pub async fn locy(&self, program: &str) -> Result<LocyResult> {
292 self.run_before_query_hooks(program, QueryType::Locy, &HashMap::new())?;
293 let result = self.locy_with(program).run().await;
294 self.metrics_inner
295 .locy_evaluations
296 .fetch_add(1, Ordering::Relaxed);
297 result
298 }
299
300 pub fn locy_with(&self, program: &str) -> crate::api::locy_builder::LocyBuilder<'_> {
302 crate::api::locy_builder::LocyBuilder::new(self, program)
303 }
304
305 pub fn rules(&self) -> super::rule_registry::RuleRegistry<'_> {
309 super::rule_registry::RuleRegistry::new(&self.rule_registry)
310 }
311
312 #[instrument(skip(self), fields(session_id = %self.id))]
314 pub fn compile_locy(&self, program: &str) -> Result<uni_locy::CompiledProgram> {
315 let ast = uni_cypher::parse_locy(program).map_err(|e| UniError::Parse {
316 message: format!("LocyParseError: {e}"),
317 position: None,
318 line: None,
319 column: None,
320 context: None,
321 })?;
322 let registry = self.rule_registry.read().unwrap();
323 if registry.rules.is_empty() {
324 drop(registry);
325 uni_locy::compile(&ast).map_err(|e| UniError::Query {
326 message: format!("LocyCompileError: {e}"),
327 query: None,
328 })
329 } else {
330 let external_names: Vec<String> = registry.rules.keys().cloned().collect();
331 drop(registry);
332 uni_locy::compile_with_external_rules(&ast, &external_names).map_err(|e| {
333 UniError::Query {
334 message: format!("LocyCompileError: {e}"),
335 query: None,
336 }
337 })
338 }
339 }
340
341 #[instrument(skip(self), fields(session_id = %self.id))]
348 pub async fn tx(&self) -> Result<Transaction> {
349 if self.is_pinned() {
350 return Err(UniError::ReadOnly {
351 operation: "start_transaction".to_string(),
352 });
353 }
354 Transaction::new(self).await
355 }
356
357 pub fn tx_with(&self) -> TransactionBuilder<'_> {
359 TransactionBuilder {
360 session: self,
361 timeout: self.transaction_timeout,
362 isolation: IsolationLevel::default(),
363 }
364 }
365
366 #[instrument(skip(self), fields(session_id = %self.id))]
372 pub async fn pin_to_version(&mut self, snapshot_id: &str) -> Result<()> {
373 let pinned = self.live_db().at_snapshot(snapshot_id).await?;
374 if self.original_db.is_none() {
375 self.original_db = Some(self.db.clone());
376 }
377 self.db = Arc::new(pinned);
378 Ok(())
379 }
380
381 #[instrument(skip(self), fields(session_id = %self.id))]
386 pub async fn pin_to_timestamp(&mut self, ts: chrono::DateTime<chrono::Utc>) -> Result<()> {
387 let snapshot_id = self.live_db().resolve_time_travel_timestamp(ts).await?;
388 self.pin_to_version(&snapshot_id).await
389 }
390
391 pub async fn refresh(&mut self) -> Result<()> {
397 if let Some(original) = self.original_db.take() {
398 self.db = original;
399 }
400 Ok(())
401 }
402
403 pub fn is_pinned(&self) -> bool {
405 self.original_db.is_some()
406 }
407
408 fn live_db(&self) -> &Arc<UniInner> {
410 self.original_db.as_ref().unwrap_or(&self.db)
411 }
412
413 #[instrument(skip(self), fields(session_id = %self.id))]
421 pub fn cancel(&self) {
422 let mut token = self.cancellation_token.write().unwrap();
423 token.cancel();
424 *token = CancellationToken::new();
425 }
426
427 pub fn cancellation_token(&self) -> CancellationToken {
431 self.cancellation_token.read().unwrap().clone()
432 }
433
434 #[instrument(skip(self), fields(session_id = %self.id))]
441 pub async fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
442 crate::api::prepared::PreparedQuery::new(self.db.clone(), cypher).await
443 }
444
445 #[instrument(skip(self), fields(session_id = %self.id))]
447 pub async fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
448 crate::api::prepared::PreparedLocy::new(
449 self.db.clone(),
450 self.rule_registry.clone(),
451 program,
452 )
453 }
454
455 pub fn add_hook(&mut self, name: impl Into<String>, hook: impl SessionHook + 'static) {
459 self.hooks.insert(name.into(), Arc::new(hook));
460 }
461
462 pub fn remove_hook(&mut self, name: &str) -> bool {
464 self.hooks.remove(name).is_some()
465 }
466
467 pub fn list_hooks(&self) -> Vec<String> {
469 self.hooks.keys().cloned().collect()
470 }
471
472 pub fn clear_hooks(&mut self) {
474 self.hooks.clear();
475 }
476
477 pub(crate) fn run_before_query_hooks(
479 &self,
480 query_text: &str,
481 query_type: QueryType,
482 params: &HashMap<String, Value>,
483 ) -> Result<()> {
484 if self.hooks.is_empty() {
485 return Ok(());
486 }
487 let ctx = HookContext {
488 session_id: self.id.clone(),
489 query_text: query_text.to_string(),
490 query_type,
491 params: params.clone(),
492 };
493 for hook in self.hooks.values() {
494 hook.before_query(&ctx)?;
495 }
496 Ok(())
497 }
498
499 pub(crate) fn run_after_query_hooks(
501 &self,
502 query_text: &str,
503 query_type: QueryType,
504 params: &HashMap<String, Value>,
505 metrics: &uni_query::QueryMetrics,
506 ) {
507 if self.hooks.is_empty() {
508 return;
509 }
510 let ctx = HookContext {
511 session_id: self.id.clone(),
512 query_text: query_text.to_string(),
513 query_type,
514 params: params.clone(),
515 };
516 for hook in self.hooks.values() {
517 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
518 hook.after_query(&ctx, metrics);
519 }));
520 if let Err(e) = result {
521 tracing::error!("after_query hook panicked: {:?}", e);
522 }
523 }
524 }
525
526 pub fn watch(&self) -> crate::api::notifications::CommitStream {
530 let rx = self.db.commit_tx.subscribe();
531 crate::api::notifications::WatchBuilder::new(rx).build()
532 }
533
534 pub fn watch_with(&self) -> crate::api::notifications::WatchBuilder {
536 let rx = self.db.commit_tx.subscribe();
537 crate::api::notifications::WatchBuilder::new(rx)
538 }
539
540 pub fn id(&self) -> &str {
544 &self.id
545 }
546
547 pub fn capabilities(&self) -> SessionCapabilities {
551 use crate::api::multi_agent::WriteLease;
552 let write_lease = self.db.write_lease.as_ref().map(|wl| match wl {
553 WriteLease::Local => WriteLeaseSummary::Local,
554 WriteLease::DynamoDB { table } => WriteLeaseSummary::DynamoDB {
555 table: table.clone(),
556 },
557 WriteLease::Custom(_) => WriteLeaseSummary::Custom,
558 });
559 SessionCapabilities {
560 can_write: self.db.writer.is_some() && !self.is_pinned(),
561 can_pin: true,
562 isolation: IsolationLevel::default(),
563 has_notifications: true,
564 write_lease,
565 }
566 }
567
568 pub fn metrics(&self) -> SessionMetrics {
570 let m = &self.metrics_inner;
571 SessionMetrics {
572 session_id: self.id.clone(),
573 active_since: self.created_at,
574 queries_executed: m.queries_executed.load(Ordering::Relaxed),
575 locy_evaluations: m.locy_evaluations.load(Ordering::Relaxed),
576 total_query_time: Duration::from_micros(m.total_query_time_us.load(Ordering::Relaxed)),
577 transactions_committed: m.transactions_committed.load(Ordering::Relaxed),
578 transactions_rolled_back: m.transactions_rolled_back.load(Ordering::Relaxed),
579 total_rows_returned: m.total_rows_returned.load(Ordering::Relaxed),
580 total_rows_scanned: m.total_rows_scanned.load(Ordering::Relaxed),
581 plan_cache_hits: self.plan_cache_metrics.hits.load(Ordering::Relaxed),
582 plan_cache_misses: self.plan_cache_metrics.misses.load(Ordering::Relaxed),
583 plan_cache_size: self.plan_cache.lock().map(|c| c.len()).unwrap_or(0),
584 }
585 }
586
587 pub(crate) async fn execute_cached(
596 &self,
597 cypher: &str,
598 params: HashMap<String, Value>,
599 ) -> Result<QueryResult> {
600 let schema_version = self.db.schema.schema().schema_version;
601 let cache_key = plan_cache_key(cypher);
602
603 let cached = self.plan_cache.lock().ok().and_then(|mut cache| {
605 cache
606 .get(cache_key, schema_version)
607 .map(|entry| (entry.ast.clone(), entry.plan.clone()))
608 });
609
610 if let Some((_ast, plan)) = cached {
611 self.plan_cache_metrics.hits.fetch_add(1, Ordering::Relaxed);
613 return self
614 .db
615 .execute_plan_internal(plan, cypher, params, self.db.config.clone(), None)
616 .await;
617 }
618
619 self.plan_cache_metrics
621 .misses
622 .fetch_add(1, Ordering::Relaxed);
623
624 let ast = uni_cypher::parse(cypher).map_err(crate::api::impl_query::into_parse_error)?;
626
627 uni_query::validate_read_only(&ast).map_err(|_| UniError::Query {
630 message: "Session.query() is read-only. Mutation clauses (CREATE, MERGE, DELETE, SET, \
631 REMOVE) require a transaction. Use session.tx() to start one."
632 .to_string(),
633 query: Some(cypher.to_string()),
634 })?;
635
636 if matches!(ast, uni_cypher::ast::Query::TimeTravel { .. }) {
638 return self
639 .db
640 .execute_internal_with_config(cypher, params, self.db.config.clone())
641 .await;
642 }
643
644 let planner = uni_query::QueryPlanner::new(self.db.schema.schema().clone())
646 .with_params(params.clone());
647 let plan = planner
648 .plan(ast.clone())
649 .map_err(|e| crate::api::impl_query::into_query_error(e, cypher))?;
650
651 if let Ok(mut cache) = self.plan_cache.lock() {
653 cache.insert(
654 cache_key,
655 PlanCacheEntry {
656 ast,
657 plan: plan.clone(),
658 schema_version,
659 hit_count: 0,
660 },
661 );
662 }
663
664 self.db
666 .execute_plan_internal(plan, cypher, params, self.db.config.clone(), None)
667 .await
668 }
669
670 pub(crate) fn db(&self) -> &Arc<UniInner> {
672 &self.db
673 }
674
675 pub(crate) fn rule_registry(&self) -> &Arc<std::sync::RwLock<LocyRuleRegistry>> {
677 &self.rule_registry
678 }
679
680 pub(crate) fn active_write_guard(&self) -> &Arc<AtomicBool> {
682 &self.active_write_guard
683 }
684
685 pub(crate) fn merge_params(
687 &self,
688 mut query_params: HashMap<String, Value>,
689 ) -> HashMap<String, Value> {
690 let session_params = self.params.read().unwrap();
691 if !session_params.is_empty() {
692 let session_map: HashMap<String, Value> = session_params.clone();
693 if let Some(Value::Map(existing)) = query_params.get_mut("session") {
694 for (k, v) in session_map {
695 existing.entry(k).or_insert(v);
696 }
697 } else {
698 query_params.insert("session".to_string(), Value::Map(session_map));
699 }
700 }
701 query_params
702 }
703}
704
705pub struct Params<'a> {
709 store: &'a Arc<std::sync::RwLock<HashMap<String, Value>>>,
710}
711
712impl<'a> Params<'a> {
713 pub fn set<K: Into<String>, V: Into<Value>>(&self, key: K, value: V) {
715 self.store.write().unwrap().insert(key.into(), value.into());
716 }
717
718 pub fn get(&self, key: &str) -> Option<Value> {
720 self.store.read().unwrap().get(key).cloned()
721 }
722
723 pub fn unset(&self, key: &str) -> Option<Value> {
725 self.store.write().unwrap().remove(key)
726 }
727
728 pub fn get_all(&self) -> HashMap<String, Value> {
730 self.store.read().unwrap().clone()
731 }
732
733 pub fn set_all<I, K, V>(&self, params: I)
735 where
736 I: IntoIterator<Item = (K, V)>,
737 K: Into<String>,
738 V: Into<Value>,
739 {
740 let mut store = self.store.write().unwrap();
741 for (k, v) in params {
742 store.insert(k.into(), v.into());
743 }
744 }
745
746 pub fn clone_store_arc(&self) -> Arc<std::sync::RwLock<HashMap<String, Value>>> {
748 self.store.clone()
749 }
750}
751
752pub struct QueryBuilder<'a> {
754 session: &'a Session,
755 cypher: String,
756 params: HashMap<String, Value>,
757 timeout: Option<std::time::Duration>,
758 max_memory: Option<usize>,
759 cancellation_token: Option<CancellationToken>,
760}
761
762impl<'a> QueryBuilder<'a> {
763 pub fn param<K: Into<String>, V: Into<Value>>(mut self, key: K, value: V) -> Self {
765 self.params.insert(key.into(), value.into());
766 self
767 }
768
769 pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
771 for (k, v) in params {
772 self.params.insert(k.to_string(), v);
773 }
774 self
775 }
776
777 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
779 self.timeout = Some(duration);
780 self
781 }
782
783 pub fn max_memory(mut self, bytes: usize) -> Self {
785 self.max_memory = Some(bytes);
786 self
787 }
788
789 pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
791 self.cancellation_token = Some(token);
792 self
793 }
794
795 pub async fn fetch_all(self) -> Result<QueryResult> {
800 let has_overrides = self.timeout.is_some()
801 || self.max_memory.is_some()
802 || self.cancellation_token.is_some();
803 if has_overrides {
804 let ast = uni_cypher::parse(&self.cypher)
807 .map_err(crate::api::impl_query::into_parse_error)?;
808 uni_query::validate_read_only(&ast).map_err(|_| UniError::Query {
809 message: "Session.query() is read-only. Mutation clauses (CREATE, MERGE, DELETE, \
810 SET, REMOVE) require a transaction. Use session.tx() to start one."
811 .to_string(),
812 query: Some(self.cypher.clone()),
813 })?;
814
815 let mut db_config = self.session.db.config.clone();
817 if let Some(t) = self.timeout {
818 db_config.query_timeout = t;
819 }
820 if let Some(m) = self.max_memory {
821 db_config.max_query_memory = m;
822 }
823 let params = self.session.merge_params(self.params);
824 self.session
825 .db
826 .execute_internal_with_config_and_token(
827 &self.cypher,
828 params,
829 db_config,
830 self.cancellation_token,
831 )
832 .await
833 } else {
834 let params = self.session.merge_params(self.params);
836 self.session.execute_cached(&self.cypher, params).await
837 }
838 }
839
840 pub async fn fetch_one(self) -> Result<Option<Row>> {
842 let result = self.fetch_all().await?;
843 Ok(result.into_rows().into_iter().next())
844 }
845
846 pub async fn cursor(self) -> Result<QueryCursor> {
848 let mut db_config = self.session.db.config.clone();
849 if let Some(t) = self.timeout {
850 db_config.query_timeout = t;
851 }
852 if let Some(m) = self.max_memory {
853 db_config.max_query_memory = m;
854 }
855 let params = self.session.merge_params(self.params);
856 self.session
857 .db
858 .execute_cursor_internal_with_config(&self.cypher, params, db_config)
859 .await
860 }
861
862 pub async fn explain(self) -> Result<ExplainOutput> {
864 self.session.db.explain_internal(&self.cypher).await
865 }
866
867 pub async fn profile(self) -> Result<(QueryResult, ProfileOutput)> {
869 let params = self.session.merge_params(self.params);
870 self.session.db.profile_internal(&self.cypher, params).await
871 }
872}
873
874pub struct TransactionBuilder<'a> {
876 session: &'a Session,
877 timeout: Option<Duration>,
878 isolation: IsolationLevel,
879}
880
881impl<'a> TransactionBuilder<'a> {
882 pub fn timeout(mut self, d: Duration) -> Self {
885 self.timeout = Some(d);
886 self
887 }
888
889 pub fn isolation(mut self, level: IsolationLevel) -> Self {
891 self.isolation = level;
892 self
893 }
894
895 pub async fn start(self) -> Result<Transaction> {
897 if self.session.is_pinned() {
898 return Err(UniError::ReadOnly {
899 operation: "start_transaction".to_string(),
900 });
901 }
902 Transaction::new_with_options(self.session, self.timeout, self.isolation).await
903 }
904}
905
906impl Clone for Session {
907 fn clone(&self) -> Self {
913 self.db.active_session_count.fetch_add(1, Ordering::Relaxed);
914 Self {
915 db: self.db.clone(),
916 original_db: self.original_db.clone(),
917 id: Uuid::new_v4().to_string(),
918 params: Arc::new(std::sync::RwLock::new(self.params.read().unwrap().clone())),
919 rule_registry: Arc::new(std::sync::RwLock::new(
920 self.rule_registry.read().unwrap().clone(),
921 )),
922 active_write_guard: Arc::new(AtomicBool::new(false)),
923 metrics_inner: Arc::new(SessionMetricsInner::new()),
924 created_at: Instant::now(),
925 cancellation_token: Arc::new(std::sync::RwLock::new(CancellationToken::new())),
926 plan_cache: self.plan_cache.clone(),
927 plan_cache_metrics: self.plan_cache_metrics.clone(),
928 hooks: self.hooks.clone(),
929 query_timeout: self.query_timeout,
930 transaction_timeout: self.transaction_timeout,
931 }
932 }
933}
934
935impl Drop for Session {
936 fn drop(&mut self) {
937 self.db.active_session_count.fetch_sub(1, Ordering::Relaxed);
938 }
939}
940
941struct PlanCacheEntry {
945 ast: uni_query::CypherQuery,
946 plan: uni_query::LogicalPlan,
947 schema_version: u32,
948 hit_count: u64,
949}
950
951struct PlanCache {
956 entries: HashMap<u64, PlanCacheEntry>,
957 max_entries: usize,
958}
959
960impl PlanCache {
961 fn new(max_entries: usize) -> Self {
962 Self {
963 entries: HashMap::new(),
964 max_entries,
965 }
966 }
967
968 fn get(&mut self, key: u64, current_schema_version: u32) -> Option<&PlanCacheEntry> {
969 if let Some(entry) = self.entries.get_mut(&key) {
970 if entry.schema_version == current_schema_version {
971 entry.hit_count += 1;
972 return self.entries.get(&key);
973 }
974 self.entries.remove(&key);
976 }
977 None
978 }
979
980 fn insert(&mut self, key: u64, entry: PlanCacheEntry) {
981 if self.entries.len() >= self.max_entries {
982 if let Some((&evict_key, _)) = self.entries.iter().min_by_key(|(_, e)| e.hit_count) {
984 self.entries.remove(&evict_key);
985 }
986 }
987 self.entries.insert(key, entry);
988 }
989
990 fn len(&self) -> usize {
992 self.entries.len()
993 }
994}
995
996fn plan_cache_key(cypher: &str) -> u64 {
998 use std::hash::{Hash, Hasher};
999 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1000 cypher.hash(&mut hasher);
1001 hasher.finish()
1002}