1use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use std::time::{Duration, Instant};
13
14use metrics;
15use tokio_util::sync::CancellationToken;
16use tracing::{info, instrument, warn};
17use uuid::Uuid;
18
19use crate::api::UniInner;
20use crate::api::impl_locy::{self, LocyRuleRegistry};
21use crate::api::session::Session;
22use uni_common::{Result, UniError};
23use uni_locy::DerivedFactSet;
24
25use crate::api::locy_result::LocyResult;
26use uni_query::{ExecuteResult, QueryCursor, QueryResult, Row, Value};
27
/// Point-in-time snapshot of a transaction's private L0 buffer, captured
/// before and after a statement so `compute_execute_result` can diff the
/// two and report affected-row counts.
struct L0Snapshot {
    // Total mutations buffered at snapshot time.
    mutation_count: usize,
    // Per-kind mutation counters; diffed against a later snapshot.
    mutation_stats: uni_store::runtime::l0::MutationStats,
}
33
/// Transaction isolation level.
///
/// Only [`IsolationLevel::Serialized`] exists today; the enum is
/// `#[non_exhaustive]` so further levels can be added without breaking callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum IsolationLevel {
    /// Fully serialized execution (the default and currently only level).
    #[default]
    Serialized,
}

impl std::fmt::Display for IsolationLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            IsolationLevel::Serialized => "Serialized",
        };
        f.write_str(name)
    }
}
54
/// Outcome of a successful [`Transaction::commit`].
#[derive(Debug)]
pub struct CommitResult {
    /// Number of mutations this transaction wrote.
    pub mutations_committed: usize,
    /// Locy rules promoted from the transaction registry to the session registry.
    pub rules_promoted: usize,
    /// Store version produced by this commit.
    pub version: u64,
    /// Store version observed when the transaction started.
    pub started_at_version: u64,
    /// WAL log sequence number of the commit record.
    pub wal_lsn: u64,
    /// Wall-clock time from transaction start to commit completion.
    pub duration: Duration,
    /// Non-fatal errors hit while promoting rules (the data commit still succeeded).
    pub rule_promotion_errors: Vec<RulePromotionError>,
}

impl CommitResult {
    /// Number of other commits that landed between this transaction's start
    /// and its own commit; 0 means no concurrent writers interleaved.
    pub fn version_gap(&self) -> u64 {
        // saturating_add guards the theoretical started_at_version == u64::MAX
        // case: the previous `started_at_version + 1` could overflow (panics in
        // debug builds) even though the outer subtraction was already saturating.
        self.version
            .saturating_sub(self.started_at_version.saturating_add(1))
    }
}

/// A rule that failed to promote to the session registry during commit.
#[derive(Debug, Clone)]
pub struct RulePromotionError {
    /// Source text of the rule, or `"<all>"` when the whole registry was affected.
    pub rule_text: String,
    /// Human-readable description of the failure.
    pub error: String,
}
88
/// A single read-write transaction bound to a [`Session`].
///
/// Writes accumulate in a private L0 buffer (`tx_l0`) and become visible only
/// on [`Transaction::commit`]; dropping without commit discards them.
pub struct Transaction {
    pub(crate) db: Arc<UniInner>,
    // Private L0 buffer holding this transaction's uncommitted mutations.
    pub(crate) tx_l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    // Session-level flag enforcing at most one writer per session;
    // cleared on commit, rollback, or drop.
    session_write_guard: Arc<std::sync::atomic::AtomicBool>,
    // The session's shared rule registry; commit promotes new rules into it.
    session_rule_registry: Arc<std::sync::RwLock<LocyRuleRegistry>>,
    // Transaction-local registry, seeded from the session's at start.
    rule_registry: Arc<std::sync::RwLock<LocyRuleRegistry>>,
    session_metrics: Arc<crate::api::session::SessionMetricsInner>,
    // True once commit or rollback has run; all further operations are rejected.
    completed: bool,
    // UUID string identifying this transaction.
    id: String,
    session_id: String,
    start_time: Instant,
    // Store version observed when the transaction began.
    started_at_version: u64,
    // Absolute expiry instant derived from the optional timeout.
    deadline: Option<Instant>,
    // Child of the session's token; cancels work started under this transaction.
    cancellation_token: CancellationToken,
    hooks: Vec<Arc<dyn crate::api::hooks::SessionHook>>,
}
130
131impl Transaction {
    /// Start a transaction with no timeout and the default isolation level.
    pub(crate) async fn new(session: &Session) -> Result<Self> {
        Self::new_with_options(session, None, IsolationLevel::default()).await
    }

    /// Start a transaction, optionally bounded by `timeout`.
    ///
    /// Claims the session's single-writer slot (failing fast if another
    /// Transaction/BulkWriter/Appender is active), creates a private L0 buffer
    /// off the current writer, and records the starting store version.
    ///
    /// # Errors
    /// - [`UniError::WriteContextAlreadyActive`] if the session already has an
    ///   active write context.
    /// - [`UniError::ReadOnly`] if the database has no writer.
    pub(crate) async fn new_with_options(
        session: &Session,
        timeout: Option<Duration>,
        _isolation: IsolationLevel, // single level exists today; accepted for forward-compat
    ) -> Result<Self> {
        // Atomically claim the per-session write slot (false -> true).
        if session
            .active_write_guard()
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return Err(UniError::WriteContextAlreadyActive {
                session_id: session.id().to_string(),
                hint: "Only one Transaction, BulkWriter, or Appender can be active per Session at a time. Commit or rollback the active one first, or create a separate Session for concurrent writes.",
            });
        }

        // Release the slot automatically if any fallible step below bails out;
        // defused via mem::forget once `tx` successfully owns the guard.
        let write_guard_cleanup = scopeguard::guard(session.active_write_guard().clone(), |g| {
            g.store(false, Ordering::SeqCst);
        });

        let db = session.db().clone();
        let writer_lock = db.writer.clone().ok_or_else(|| {
            UniError::ReadOnly {
                operation: "start_transaction".to_string(),
            }
        })?;

        // Fork a private L0 buffer and capture the store version it started from.
        let (started_at_version, tx_l0) = {
            let writer = writer_lock.read().await;
            let l0 = writer.create_transaction_l0();
            let version = l0.read().current_version;
            (version, l0)
        };

        let id = Uuid::new_v4().to_string();
        info!(transaction_id = %id, "Transaction started");

        // NOTE(review): unwrap() turns a poisoned session registry lock into a
        // panic — presumably acceptable since poisoning implies a prior panic;
        // confirm this is intended.
        let session_registry = session.rule_registry().read().unwrap().clone();

        let deadline = timeout.map(|d| Instant::now() + d);
        let cancellation_token = session.cancellation_token().child_token();

        let tx = Self {
            db,
            tx_l0,
            session_write_guard: session.active_write_guard().clone(),
            session_rule_registry: session.rule_registry().clone(),
            // The transaction gets its own registry seeded from the session's;
            // new rules are promoted back on commit.
            rule_registry: Arc::new(std::sync::RwLock::new(session_registry)),
            session_metrics: session.metrics_inner.clone(),
            completed: false,
            id,
            session_id: session.id().to_string(),
            start_time: Instant::now(),
            started_at_version,
            deadline,
            cancellation_token,
            hooks: session.hooks.values().cloned().collect(),
        };

        // All fallible work is done: from here the slot is released by
        // commit/rollback/Drop instead of the scopeguard.
        std::mem::forget(write_guard_cleanup);

        Ok(tx)
    }
213
214 #[instrument(skip(self), fields(transaction_id = %self.id))]
219 pub async fn query(&self, cypher: &str) -> Result<QueryResult> {
220 self.check_completed()?;
221 self.db
222 .execute_internal_with_tx_l0(cypher, HashMap::new(), self.tx_l0.clone())
223 .await
224 }
225
226 pub fn query_with(&self, cypher: &str) -> TxQueryBuilder<'_> {
228 TxQueryBuilder {
229 tx: self,
230 cypher: cypher.to_string(),
231 params: HashMap::new(),
232 cancellation_token: None,
233 timeout: None,
234 }
235 }
236
237 #[instrument(skip(self), fields(transaction_id = %self.id))]
242 pub async fn execute(&self, cypher: &str) -> Result<ExecuteResult> {
243 self.check_completed()?;
244 let before = self.snapshot_l0();
245 let result = self.query(cypher).await?;
246 let after = self.snapshot_l0();
247 Ok(Self::compute_execute_result(&before, &after, &result))
248 }
249
250 pub fn execute_with(&self, cypher: &str) -> ExecuteBuilder<'_> {
255 ExecuteBuilder {
256 tx: self,
257 cypher: cypher.to_string(),
258 params: HashMap::new(),
259 timeout: None,
260 }
261 }
262
263 #[instrument(skip(self, derived), fields(transaction_id = %self.id))]
271 pub async fn apply(&self, derived: DerivedFactSet) -> Result<ApplyResult> {
272 self.apply_internal(derived, false, None).await
273 }
274
275 pub fn apply_with(&self, derived: DerivedFactSet) -> ApplyBuilder<'_> {
277 ApplyBuilder {
278 tx: self,
279 derived,
280 require_fresh: false,
281 max_version_gap: None,
282 }
283 }
284
    /// Shared implementation behind [`Transaction::apply`] and [`ApplyBuilder`].
    ///
    /// `version_gap` is how many store versions have elapsed since `derived`
    /// was evaluated; `require_fresh` rejects any gap and `max_gap` bounds it.
    async fn apply_internal(
        &self,
        derived: DerivedFactSet,
        require_fresh: bool,
        max_gap: Option<u64>,
    ) -> Result<ApplyResult> {
        self.check_completed()?;
        let current_version = self.tx_l0.read().current_version;
        let version_gap = current_version.saturating_sub(derived.evaluated_at_version);

        // Freshness constraints: reject stale fact sets before touching the store.
        if require_fresh && version_gap > 0 {
            return Err(UniError::StaleDerivedFacts { version_gap });
        }
        if let Some(max) = max_gap
            && version_gap > max
        {
            return Err(UniError::StaleDerivedFacts { version_gap });
        }
        if version_gap > 0 {
            info!(
                transaction_id = %self.id,
                version_gap,
                "Applying DerivedFactSet with version gap"
            );
        }

        // Replay every mutation query from the fact set into this
        // transaction's private L0 buffer.
        let mut facts_applied = 0;
        for query in derived.mutation_queries {
            self.db
                .execute_ast_internal_with_tx_l0(
                    query,
                    "<locy-apply>",
                    HashMap::new(),
                    self.db.config.clone(),
                    self.tx_l0.clone(),
                )
                .await?;
            facts_applied += 1;
        }

        Ok(ApplyResult {
            facts_applied,
            version_gap,
        })
    }
330
331 #[instrument(skip(self, properties_list), fields(transaction_id = %self.id))]
338 pub async fn bulk_insert_vertices(
339 &self,
340 label: &str,
341 properties_list: Vec<uni_common::Properties>,
342 ) -> Result<Vec<uni_common::core::id::Vid>> {
343 self.check_completed()?;
344 let schema = self.db.schema.schema();
345 schema
346 .labels
347 .get(label)
348 .ok_or_else(|| UniError::LabelNotFound {
349 label: label.to_string(),
350 })?;
351 let writer_lock = self.db.writer.as_ref().ok_or_else(|| UniError::ReadOnly {
352 operation: "bulk_insert_vertices".to_string(),
353 })?;
354 let mut writer = writer_lock.write().await;
355 if properties_list.is_empty() {
356 return Ok(Vec::new());
357 }
358 let vids = writer
359 .allocate_vids(properties_list.len())
360 .await
361 .map_err(UniError::Internal)?;
362 let result = writer
364 .insert_vertices_batch(
365 vids.clone(),
366 properties_list,
367 vec![label.to_string()],
368 Some(&self.tx_l0),
369 )
370 .await
371 .map_err(UniError::Internal);
372 result?;
373 Ok(vids)
374 }
375
376 #[instrument(skip(self, edges), fields(transaction_id = %self.id))]
381 pub async fn bulk_insert_edges(
382 &self,
383 edge_type: &str,
384 edges: Vec<(
385 uni_common::core::id::Vid,
386 uni_common::core::id::Vid,
387 uni_common::Properties,
388 )>,
389 ) -> Result<()> {
390 self.check_completed()?;
391 let schema = self.db.schema.schema();
392 let edge_meta =
393 schema
394 .edge_types
395 .get(edge_type)
396 .ok_or_else(|| UniError::EdgeTypeNotFound {
397 edge_type: edge_type.to_string(),
398 })?;
399 let type_id = edge_meta.id;
400 let writer_lock = self.db.writer.as_ref().ok_or_else(|| UniError::ReadOnly {
401 operation: "bulk_insert_edges".to_string(),
402 })?;
403 let mut writer = writer_lock.write().await;
404 let result: Result<()> = async {
406 for (src_vid, dst_vid, props) in edges {
407 let eid = writer.next_eid(type_id).await.map_err(UniError::Internal)?;
408 writer
409 .insert_edge(
410 src_vid,
411 dst_vid,
412 type_id,
413 eid,
414 props,
415 Some(edge_type.to_string()),
416 Some(&self.tx_l0),
417 )
418 .await
419 .map_err(UniError::Internal)?;
420 }
421 Ok(())
422 }
423 .await;
424 result
425 }
426
427 pub fn bulk_writer(&self) -> crate::api::bulk::BulkWriterBuilder {
435 crate::api::bulk::BulkWriterBuilder::new_unguarded(self.db.clone())
436 }
437
438 pub fn appender(&self, label: &str) -> crate::api::appender::AppenderBuilder {
443 crate::api::appender::AppenderBuilder::new_from_tx(self.db.clone(), label)
444 }
445
446 #[instrument(skip(self), fields(transaction_id = %self.id))]
452 pub async fn locy(&self, program: &str) -> Result<LocyResult> {
453 self.check_completed()?;
454 let engine = impl_locy::LocyEngine {
457 db: &self.db,
458 tx_l0_override: Some(self.tx_l0.clone()),
459 locy_l0: Some(self.tx_l0.clone()),
460 collect_derive: false,
461 };
462 engine.evaluate(program).await
463 }
464
465 pub fn locy_with(&self, program: &str) -> crate::api::locy_builder::TxLocyBuilder<'_> {
467 crate::api::locy_builder::TxLocyBuilder::new(self, program)
468 }
469
470 #[instrument(skip(self), fields(transaction_id = %self.id))]
477 pub async fn prepare(&self, cypher: &str) -> Result<crate::api::prepared::PreparedQuery> {
478 self.check_completed()?;
479 crate::api::prepared::PreparedQuery::new(self.db.clone(), cypher).await
480 }
481
482 #[instrument(skip(self), fields(transaction_id = %self.id))]
484 pub async fn prepare_locy(&self, program: &str) -> Result<crate::api::prepared::PreparedLocy> {
485 self.check_completed()?;
486 crate::api::prepared::PreparedLocy::new(
487 self.db.clone(),
488 self.rule_registry.clone(),
489 program,
490 )
491 }
492
493 pub fn rules(&self) -> super::rule_registry::RuleRegistry<'_> {
498 super::rule_registry::RuleRegistry::new(&self.rule_registry)
499 }
500
    /// Commit this transaction: run before-commit hooks, write the private L0
    /// into the shared store + WAL, promote new Locy rules to the session,
    /// release the session write slot, record metrics, broadcast a commit
    /// notification, and finally run after-commit hooks.
    ///
    /// # Errors
    /// - Any error from a `before_commit` hook (vetoes the commit).
    /// - [`UniError::ReadOnly`] if the database has no writer.
    /// - [`UniError::CommitTimeout`] if the writer lock cannot be acquired
    ///   within 5 seconds; the transaction stays active and commit can be retried.
    /// - Any error from writing the L0 buffer to the store.
    #[instrument(skip(self), fields(transaction_id = %self.id, duration_ms), level = "info")]
    pub async fn commit(mut self) -> Result<CommitResult> {
        self.check_completed()?;

        let writer_lock = self.db.writer.as_ref().ok_or_else(|| UniError::ReadOnly {
            operation: "commit".to_string(),
        })?;

        let mutations = self.tx_l0.read().mutation_count;

        // Before-commit hooks may veto the commit by returning an error.
        if !self.hooks.is_empty() {
            let ctx = crate::api::hooks::CommitHookContext {
                session_id: self.session_id.clone(),
                tx_id: self.id.clone(),
                mutation_count: mutations,
            };
            for hook in &self.hooks {
                hook.before_commit(&ctx)?;
            }
        }

        // Deduplicated label / edge-type names touched by this transaction,
        // collected now for the commit notification sent below.
        let (labels_affected, edge_types_affected) = {
            let l0 = self.tx_l0.read();
            let labels: Vec<String> = l0
                .vertex_labels
                .values()
                .flatten()
                .cloned()
                .collect::<std::collections::HashSet<_>>()
                .into_iter()
                .collect();
            let edge_types: Vec<String> = l0
                .edge_types
                .values()
                .cloned()
                .collect::<std::collections::HashSet<_>>()
                .into_iter()
                .collect();
            (labels, edge_types)
        };

        // Bound the wait for the exclusive writer lock so a slow concurrent
        // commit surfaces as a retryable CommitTimeout instead of hanging.
        let mut writer = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            writer_lock.write(),
        )
        .await
        .map_err(|_| UniError::CommitTimeout {
            tx_id: self.id.clone(),
            hint: "Another commit is in progress and taking longer than expected. Your transaction is still active \u{2014} you can retry commit().",
        })?;
        let wal_lsn = writer.commit_transaction_l0(self.tx_l0.clone()).await?;
        // Refresh cached L0 stats from the now-current shared buffer.
        {
            let l0 = writer.l0_manager.get_current();
            let l0_guard = l0.read();
            self.db
                .cached_l0_mutation_count
                .store(l0_guard.mutation_count, Ordering::Relaxed);
            self.db
                .cached_l0_estimated_size
                .store(l0_guard.estimated_size, Ordering::Relaxed);
        }
        self.db.cached_wal_lsn.store(wal_lsn, Ordering::Relaxed);
        let version = writer.l0_manager.get_current().read().current_version;
        // Release the writer lock before the (potentially slower) bookkeeping below.
        drop(writer);

        self.completed = true;

        let duration = self.start_time.elapsed();
        tracing::Span::current().record("duration_ms", duration.as_millis());
        metrics::histogram!("uni_transaction_duration_seconds").record(duration.as_secs_f64());
        metrics::counter!("uni_transaction_commits_total").increment(1);

        // Promote rules created inside this transaction to the session
        // registry. Lock poisoning is reported via rule_promotion_errors, not
        // fatal — the data commit has already succeeded.
        let mut rule_promotion_errors = Vec::new();
        let rules_promoted = {
            match (
                self.rule_registry.read(),
                self.session_rule_registry.write(),
            ) {
                (Ok(tx_reg), Ok(mut session_reg)) => {
                    let mut promoted = 0;
                    for (name, rule) in &tx_reg.rules {
                        // Existing session rules win; only new names are promoted.
                        if !session_reg.rules.contains_key(name) {
                            session_reg.rules.insert(name.clone(), rule.clone());
                            promoted += 1;
                        }
                    }
                    promoted
                }
                (Err(e), _) => {
                    rule_promotion_errors.push(RulePromotionError {
                        rule_text: "<all>".into(),
                        error: format!("tx rule registry lock poisoned: {e}"),
                    });
                    0
                }
                (_, Err(e)) => {
                    rule_promotion_errors.push(RulePromotionError {
                        rule_text: "<all>".into(),
                        error: format!("session rule registry lock poisoned: {e}"),
                    });
                    0
                }
            }
        };

        // Release the session's single-writer slot.
        self.session_write_guard.store(false, Ordering::SeqCst);

        self.session_metrics
            .transactions_committed
            .fetch_add(1, Ordering::Relaxed);
        self.db.total_commits.fetch_add(1, Ordering::Relaxed);

        let commit_result = CommitResult {
            mutations_committed: mutations,
            rules_promoted,
            version,
            started_at_version: self.started_at_version,
            wal_lsn,
            duration,
            rule_promotion_errors,
        };

        // Broadcast to commit-notification subscribers; a send failure just
        // means nobody is listening.
        let notif = crate::api::notifications::CommitNotification {
            version,
            mutation_count: mutations,
            labels_affected,
            edge_types_affected,
            rules_promoted,
            timestamp: chrono::Utc::now(),
            tx_id: self.id.clone(),
            session_id: self.session_id.clone(),
            causal_version: self.started_at_version,
        };
        let _ = self.db.commit_tx.send(Arc::new(notif));

        // After-commit hooks are observational: panics are caught and logged
        // so they cannot fail an already-durable commit.
        if !self.hooks.is_empty() {
            let ctx = crate::api::hooks::CommitHookContext {
                session_id: self.session_id.clone(),
                tx_id: self.id.clone(),
                mutation_count: mutations,
            };
            for hook in &self.hooks {
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    hook.after_commit(&ctx, &commit_result);
                }));
                if let Err(e) = result {
                    tracing::error!("after_commit hook panicked: {:?}", e);
                }
            }
        }

        info!("Transaction committed");

        Ok(commit_result)
    }
672
673 pub fn rollback(mut self) {
679 if self.completed {
680 return;
681 }
682 self.completed = true;
683
684 self.session_write_guard.store(false, Ordering::SeqCst);
686
687 let duration = self.start_time.elapsed();
688 metrics::histogram!("uni_transaction_duration_seconds").record(duration.as_secs_f64());
689 metrics::counter!("uni_transaction_rollbacks_total").increment(1);
690
691 self.session_metrics
693 .transactions_rolled_back
694 .fetch_add(1, Ordering::Relaxed);
695
696 info!("Transaction rolled back");
697 }
698
699 pub fn is_dirty(&self) -> bool {
701 self.tx_l0.read().mutation_count > 0
702 }
703
704 pub fn id(&self) -> &str {
706 &self.id
707 }
708
709 pub fn started_at_version(&self) -> u64 {
711 self.started_at_version
712 }
713
714 #[instrument(skip(self), fields(transaction_id = %self.id))]
716 pub fn cancel(&self) {
717 self.cancellation_token.cancel();
718 }
719
720 pub fn cancellation_token(&self) -> CancellationToken {
722 self.cancellation_token.clone()
723 }
724
725 fn snapshot_l0(&self) -> L0Snapshot {
727 let l0 = self.tx_l0.read();
728 L0Snapshot {
729 mutation_count: l0.mutation_count,
730 mutation_stats: l0.mutation_stats.clone(),
731 }
732 }
733
734 fn compute_execute_result(
736 before: &L0Snapshot,
737 after: &L0Snapshot,
738 result: &QueryResult,
739 ) -> ExecuteResult {
740 let affected_rows = if result.is_empty() {
741 after.mutation_count.saturating_sub(before.mutation_count)
742 } else {
743 result.len()
744 };
745 let diff = after.mutation_stats.diff(&before.mutation_stats);
746 ExecuteResult::with_details(affected_rows, &diff, result.metrics().clone())
747 }
748
749 fn check_completed(&self) -> Result<()> {
750 if self.completed {
751 return Err(UniError::TransactionAlreadyCompleted);
752 }
753 if let Some(deadline) = self.deadline
754 && Instant::now() > deadline
755 {
756 return Err(UniError::TransactionExpired {
757 tx_id: self.id.clone(),
758 hint: "Transaction exceeded its timeout. All operations are rejected. The transaction will auto-rollback on drop.",
759 });
760 }
761 Ok(())
762 }
763}
764
765impl Drop for Transaction {
766 fn drop(&mut self) {
767 if !self.completed {
768 if self.is_dirty() {
769 warn!(
770 transaction_id = %self.id,
771 "Transaction dropped with uncommitted writes — discarding private L0"
772 );
773 }
774 self.session_write_guard.store(false, Ordering::SeqCst);
777 }
778 }
779}
780
/// Fluent builder returned by [`Transaction::execute_with`]: named parameters
/// plus an optional timeout for a single mutating statement.
pub struct ExecuteBuilder<'a> {
    tx: &'a Transaction,
    cypher: String,
    params: HashMap<String, Value>,
    timeout: Option<Duration>,
}
791
792impl<'a> ExecuteBuilder<'a> {
793 pub fn param<K: Into<String>, V: Into<Value>>(mut self, key: K, value: V) -> Self {
795 self.params.insert(key.into(), value.into());
796 self
797 }
798
799 pub fn params<'p>(mut self, params: impl IntoIterator<Item = (&'p str, Value)>) -> Self {
801 for (k, v) in params {
802 self.params.insert(k.to_string(), v);
803 }
804 self
805 }
806
807 pub fn timeout(mut self, duration: Duration) -> Self {
809 self.timeout = Some(duration);
810 self
811 }
812
813 pub async fn run(self) -> Result<ExecuteResult> {
815 self.tx.check_completed()?;
816 let before = self.tx.snapshot_l0();
817 let fut = self.tx.db.execute_internal_with_tx_l0(
818 &self.cypher,
819 self.params,
820 self.tx.tx_l0.clone(),
821 );
822 let result = if let Some(t) = self.timeout {
823 tokio::time::timeout(t, fut)
824 .await
825 .map_err(|_| UniError::Timeout {
826 timeout_ms: t.as_millis() as u64,
827 })??
828 } else {
829 fut.await?
830 };
831 let after = self.tx.snapshot_l0();
832 Ok(Transaction::compute_execute_result(
833 &before, &after, &result,
834 ))
835 }
836}
837
/// Fluent builder returned by [`Transaction::query_with`]: named parameters,
/// optional cancellation token, and optional timeout for a query.
pub struct TxQueryBuilder<'a> {
    tx: &'a Transaction,
    cypher: String,
    params: HashMap<String, Value>,
    // NOTE(review): stored but not consumed by execute/fetch_*/cursor in this
    // file — confirm whether cancellation is meant to be wired through.
    cancellation_token: Option<CancellationToken>,
    timeout: Option<Duration>,
}
846
847impl<'a> TxQueryBuilder<'a> {
848 pub fn param(mut self, name: &str, value: impl Into<Value>) -> Self {
850 self.params.insert(name.to_string(), value.into());
851 self
852 }
853
854 pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
856 self.cancellation_token = Some(token);
857 self
858 }
859
860 pub fn timeout(mut self, duration: Duration) -> Self {
862 self.timeout = Some(duration);
863 self
864 }
865
866 pub async fn execute(self) -> Result<ExecuteResult> {
868 self.tx.check_completed()?;
869 let before = self.tx.snapshot_l0();
870 let fut = self.tx.db.execute_internal_with_tx_l0(
871 &self.cypher,
872 self.params,
873 self.tx.tx_l0.clone(),
874 );
875 let result = if let Some(t) = self.timeout {
876 tokio::time::timeout(t, fut)
877 .await
878 .map_err(|_| UniError::Timeout {
879 timeout_ms: t.as_millis() as u64,
880 })??
881 } else {
882 fut.await?
883 };
884 let after = self.tx.snapshot_l0();
885 Ok(Transaction::compute_execute_result(
886 &before, &after, &result,
887 ))
888 }
889
890 pub async fn fetch_all(self) -> Result<QueryResult> {
892 self.tx.check_completed()?;
893 let fut = self.tx.db.execute_internal_with_tx_l0(
894 &self.cypher,
895 self.params,
896 self.tx.tx_l0.clone(),
897 );
898 if let Some(t) = self.timeout {
899 tokio::time::timeout(t, fut)
900 .await
901 .map_err(|_| UniError::Timeout {
902 timeout_ms: t.as_millis() as u64,
903 })?
904 } else {
905 fut.await
906 }
907 }
908
909 pub async fn fetch_one(self) -> Result<Option<Row>> {
911 let result = self.fetch_all().await?;
912 Ok(result.into_rows().into_iter().next())
913 }
914
915 pub async fn cursor(self) -> Result<QueryCursor> {
917 self.tx.check_completed()?;
918 self.tx
919 .db
920 .execute_cursor_internal_with_tx_l0(&self.cypher, self.params, self.tx.tx_l0.clone())
921 .await
922 }
923}
924
/// Outcome of applying a [`DerivedFactSet`] to a transaction.
#[derive(Debug)]
pub struct ApplyResult {
    /// Number of mutation queries replayed from the fact set.
    pub facts_applied: usize,
    /// Store versions elapsed between evaluating the facts and applying them.
    pub version_gap: u64,
}

/// Fluent builder returned by [`Transaction::apply_with`] carrying
/// freshness constraints for the apply.
pub struct ApplyBuilder<'a> {
    tx: &'a Transaction,
    derived: DerivedFactSet,
    require_fresh: bool,
    max_version_gap: Option<u64>,
}
942
943impl<'a> ApplyBuilder<'a> {
944 pub fn require_fresh(mut self) -> Self {
947 self.require_fresh = true;
948 self
949 }
950
951 pub fn max_version_gap(mut self, n: u64) -> Self {
954 self.max_version_gap = Some(n);
955 self
956 }
957
958 pub async fn run(self) -> Result<ApplyResult> {
960 self.tx
961 .apply_internal(self.derived, self.require_fresh, self.max_version_gap)
962 .await
963 }
964}