1use std::collections::HashMap;
77use std::sync::Arc;
78
79use bytes::Bytes;
80use chrono::Utc;
81use conflict_checker::ConflictChecker;
82use futures::future::BoxFuture;
83use object_store::path::Path;
84use object_store::Error as ObjectStoreError;
85use serde_json::Value;
86use tracing::*;
87use uuid::Uuid;
88
89use delta_kernel::table_features::{ReaderFeature, WriterFeature};
90use serde::{Deserialize, Serialize};
91
92use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
93use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
94use crate::errors::DeltaTableError;
95use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction};
96use crate::logstore::ObjectStoreRef;
97use crate::logstore::{CommitOrBytes, LogStoreRef};
98use crate::operations::CustomExecuteHandler;
99use crate::protocol::DeltaOperation;
100use crate::table::config::TableConfig;
101use crate::table::state::DeltaTableState;
102use crate::{crate_version, DeltaResult};
103
104pub use self::conflict_checker::CommitConflictError;
105pub use self::protocol::INSTANCE as PROTOCOL;
106
107#[cfg(test)]
108pub(crate) mod application;
109mod conflict_checker;
110mod protocol;
111#[cfg(feature = "datafusion")]
112mod state;
113
114const DELTA_LOG_FOLDER: &str = "_delta_log";
115pub(crate) const DEFAULT_RETRIES: usize = 15;
116
117#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
118#[serde(rename_all = "camelCase")]
119pub struct CommitMetrics {
120 pub num_retries: u64,
122}
123
124#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "camelCase")]
126pub struct PostCommitMetrics {
127 pub new_checkpoint_created: bool,
129
130 pub num_log_files_cleaned_up: u64,
132}
133
134#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
135#[serde(rename_all = "camelCase")]
136pub struct Metrics {
137 pub num_retries: u64,
139
140 pub new_checkpoint_created: bool,
142
143 pub num_log_files_cleaned_up: u64,
145}
146
147#[derive(thiserror::Error, Debug)]
149pub enum TransactionError {
150 #[error("Tried committing existing table version: {0}")]
152 VersionAlreadyExists(i64),
153
154 #[error("Error serializing commit log to json: {json_err}")]
156 SerializeLogJson {
157 json_err: serde_json::error::Error,
159 },
160
161 #[error("Log storage error: {}", .source)]
163 ObjectStore {
164 #[from]
166 source: ObjectStoreError,
167 },
168
169 #[error("Failed to commit transaction: {0}")]
171 CommitConflict(#[from] CommitConflictError),
172
173 #[error("Failed to commit transaction: {0}")]
175 MaxCommitAttempts(i32),
176
177 #[error(
179 "The transaction includes Remove action with data change but Delta table is append-only"
180 )]
181 DeltaTableAppendOnly,
182
183 #[error("Unsupported reader features required: {0:?}")]
185 UnsupportedReaderFeatures(Vec<ReaderFeature>),
186
187 #[error("Unsupported writer features required: {0:?}")]
189 UnsupportedWriterFeatures(Vec<WriterFeature>),
190
191 #[error("Writer features must be specified for writerversion >= 7, please specify: {0:?}")]
193 WriterFeaturesRequired(WriterFeature),
194
195 #[error("Reader features must be specified for reader version >= 3, please specify: {0:?}")]
197 ReaderFeaturesRequired(ReaderFeature),
198
199 #[error("Transaction failed: {msg}")]
202 LogStoreError {
203 msg: String,
205 source: Box<dyn std::error::Error + Send + Sync + 'static>,
207 },
208}
209
210impl From<TransactionError> for DeltaTableError {
211 fn from(err: TransactionError) -> Self {
212 match err {
213 TransactionError::VersionAlreadyExists(version) => {
214 DeltaTableError::VersionAlreadyExists(version)
215 }
216 TransactionError::SerializeLogJson { json_err } => {
217 DeltaTableError::SerializeLogJson { json_err }
218 }
219 TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
220 other => DeltaTableError::Transaction { source: other },
221 }
222 }
223}
224
225#[derive(thiserror::Error, Debug)]
227pub enum CommitBuilderError {}
228
229impl From<CommitBuilderError> for DeltaTableError {
230 fn from(err: CommitBuilderError) -> Self {
231 DeltaTableError::CommitValidation { source: err }
232 }
233}
234
235pub trait TableReference: Send + Sync {
237 fn config(&self) -> TableConfig;
239
240 fn protocol(&self) -> &Protocol;
242
243 fn metadata(&self) -> &Metadata;
245
246 fn eager_snapshot(&self) -> &EagerSnapshot;
248}
249
250impl TableReference for EagerSnapshot {
251 fn protocol(&self) -> &Protocol {
252 EagerSnapshot::protocol(self)
253 }
254
255 fn metadata(&self) -> &Metadata {
256 EagerSnapshot::metadata(self)
257 }
258
259 fn config(&self) -> TableConfig {
260 self.table_config()
261 }
262
263 fn eager_snapshot(&self) -> &EagerSnapshot {
264 self
265 }
266}
267
268impl TableReference for DeltaTableState {
269 fn config(&self) -> TableConfig {
270 self.snapshot.config()
271 }
272
273 fn protocol(&self) -> &Protocol {
274 self.snapshot.protocol()
275 }
276
277 fn metadata(&self) -> &Metadata {
278 self.snapshot.metadata()
279 }
280
281 fn eager_snapshot(&self) -> &EagerSnapshot {
282 &self.snapshot
283 }
284}
285
286#[derive(Debug)]
288pub struct CommitData {
289 pub actions: Vec<Action>,
291 pub operation: DeltaOperation,
293 pub app_metadata: HashMap<String, Value>,
295 pub app_transactions: Vec<Transaction>,
297}
298
299impl CommitData {
300 pub fn new(
302 mut actions: Vec<Action>,
303 operation: DeltaOperation,
304 mut app_metadata: HashMap<String, Value>,
305 app_transactions: Vec<Transaction>,
306 ) -> Self {
307 if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
308 let mut commit_info = operation.get_commit_info();
309 commit_info.timestamp = Some(Utc::now().timestamp_millis());
310 app_metadata.insert(
311 "clientVersion".to_string(),
312 Value::String(format!("delta-rs.{}", crate_version())),
313 );
314 app_metadata.extend(commit_info.info);
315 commit_info.info = app_metadata.clone();
316 actions.push(Action::CommitInfo(commit_info))
317 }
318
319 for txn in &app_transactions {
320 actions.push(Action::Txn(txn.clone()))
321 }
322
323 CommitData {
324 actions,
325 operation,
326 app_metadata,
327 app_transactions,
328 }
329 }
330
331 pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
333 let mut jsons = Vec::<String>::new();
334 for action in &self.actions {
335 let json = serde_json::to_string(action)
336 .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
337 jsons.push(json);
338 }
339 Ok(bytes::Bytes::from(jsons.join("\n")))
340 }
341}
342
343#[derive(Clone, Debug, Copy)]
344pub struct PostCommitHookProperties {
346 create_checkpoint: bool,
347 cleanup_expired_logs: Option<bool>,
349}
350
351#[derive(Clone, Debug)]
352pub struct CommitProperties {
355 pub(crate) app_metadata: HashMap<String, Value>,
356 pub(crate) app_transaction: Vec<Transaction>,
357 max_retries: usize,
358 create_checkpoint: bool,
359 cleanup_expired_logs: Option<bool>,
360}
361
362impl Default for CommitProperties {
363 fn default() -> Self {
364 Self {
365 app_metadata: Default::default(),
366 app_transaction: Vec::new(),
367 max_retries: DEFAULT_RETRIES,
368 create_checkpoint: true,
369 cleanup_expired_logs: None,
370 }
371 }
372}
373
374impl CommitProperties {
375 pub fn with_metadata(
377 mut self,
378 metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
379 ) -> Self {
380 self.app_metadata = HashMap::from_iter(metadata);
381 self
382 }
383
384 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
386 self.max_retries = max_retries;
387 self
388 }
389
390 pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
392 self.create_checkpoint = create_checkpoint;
393 self
394 }
395
396 pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
398 self.app_transaction.push(txn);
399 self
400 }
401
402 pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
404 self.app_transaction = txn;
405 self
406 }
407
408 pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
410 self.cleanup_expired_logs = cleanup_expired_logs;
411 self
412 }
413}
414
415impl From<CommitProperties> for CommitBuilder {
416 fn from(value: CommitProperties) -> Self {
417 CommitBuilder {
418 max_retries: value.max_retries,
419 app_metadata: value.app_metadata,
420 post_commit_hook: Some(PostCommitHookProperties {
421 create_checkpoint: value.create_checkpoint,
422 cleanup_expired_logs: value.cleanup_expired_logs,
423 }),
424 app_transaction: value.app_transaction,
425 ..Default::default()
426 }
427 }
428}
429
430pub struct CommitBuilder {
432 actions: Vec<Action>,
433 app_metadata: HashMap<String, Value>,
434 app_transaction: Vec<Transaction>,
435 max_retries: usize,
436 post_commit_hook: Option<PostCommitHookProperties>,
437 post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
438 operation_id: Uuid,
439}
440
441impl Default for CommitBuilder {
442 fn default() -> Self {
443 CommitBuilder {
444 actions: Vec::new(),
445 app_metadata: HashMap::new(),
446 app_transaction: Vec::new(),
447 max_retries: DEFAULT_RETRIES,
448 post_commit_hook: None,
449 post_commit_hook_handler: None,
450 operation_id: Uuid::new_v4(),
451 }
452 }
453}
454
455impl<'a> CommitBuilder {
456 pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
458 self.actions = actions;
459 self
460 }
461
462 pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
464 self.app_metadata = app_metadata;
465 self
466 }
467
468 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
470 self.max_retries = max_retries;
471 self
472 }
473
474 pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
476 self.post_commit_hook = Some(post_commit_hook);
477 self
478 }
479
480 pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
482 self.operation_id = operation_id;
483 self
484 }
485
486 pub fn with_post_commit_hook_handler(
488 mut self,
489 handler: Option<Arc<dyn CustomExecuteHandler>>,
490 ) -> Self {
491 self.post_commit_hook_handler = handler;
492 self
493 }
494
495 pub fn build(
497 self,
498 table_data: Option<&'a dyn TableReference>,
499 log_store: LogStoreRef,
500 operation: DeltaOperation,
501 ) -> PreCommit<'a> {
502 let data = CommitData::new(
503 self.actions,
504 operation,
505 self.app_metadata,
506 self.app_transaction,
507 );
508 PreCommit {
509 log_store,
510 table_data,
511 max_retries: self.max_retries,
512 data,
513 post_commit_hook: self.post_commit_hook,
514 post_commit_hook_handler: self.post_commit_hook_handler,
515 operation_id: self.operation_id,
516 }
517 }
518}
519
520pub struct PreCommit<'a> {
522 log_store: LogStoreRef,
523 table_data: Option<&'a dyn TableReference>,
524 data: CommitData,
525 max_retries: usize,
526 post_commit_hook: Option<PostCommitHookProperties>,
527 post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
528 operation_id: Uuid,
529}
530
531impl<'a> std::future::IntoFuture for PreCommit<'a> {
532 type Output = DeltaResult<FinalizedCommit>;
533 type IntoFuture = BoxFuture<'a, Self::Output>;
534
535 fn into_future(self) -> Self::IntoFuture {
536 Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
537 }
538}
539
540impl<'a> PreCommit<'a> {
541 pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
543 let this = self;
544
545 async fn write_tmp_commit(
548 log_entry: Bytes,
549 store: ObjectStoreRef,
550 ) -> DeltaResult<CommitOrBytes> {
551 let token = uuid::Uuid::new_v4().to_string();
552 let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
553 store.put(&path, log_entry.into()).await?;
554 Ok(CommitOrBytes::TmpCommit(path))
555 }
556
557 Box::pin(async move {
558 if let Some(table_reference) = this.table_data {
559 PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
560 }
561 let log_entry = this.data.get_bytes()?;
562
563 let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
566 .contains(&this.log_store.name().as_str())
567 {
568 CommitOrBytes::LogBytes(log_entry)
569 } else {
570 write_tmp_commit(
571 log_entry,
572 this.log_store.object_store(Some(this.operation_id)),
573 )
574 .await?
575 };
576
577 Ok(PreparedCommit {
578 commit_or_bytes,
579 log_store: this.log_store,
580 table_data: this.table_data,
581 max_retries: this.max_retries,
582 data: this.data,
583 post_commit: this.post_commit_hook,
584 post_commit_hook_handler: this.post_commit_hook_handler,
585 operation_id: this.operation_id,
586 })
587 })
588 }
589}
590
591pub struct PreparedCommit<'a> {
593 commit_or_bytes: CommitOrBytes,
594 log_store: LogStoreRef,
595 data: CommitData,
596 table_data: Option<&'a dyn TableReference>,
597 max_retries: usize,
598 post_commit: Option<PostCommitHookProperties>,
599 post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
600 operation_id: Uuid,
601}
602
603impl PreparedCommit<'_> {
604 pub fn commit_or_bytes(&self) -> &CommitOrBytes {
606 &self.commit_or_bytes
607 }
608}
609
610impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
611 type Output = DeltaResult<PostCommit>;
612 type IntoFuture = BoxFuture<'a, Self::Output>;
613
614 fn into_future(self) -> Self::IntoFuture {
615 let this = self;
616
617 Box::pin(async move {
618 let commit_or_bytes = this.commit_or_bytes;
619
620 if this.table_data.is_none() {
621 this.log_store
622 .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
623 .await?;
624 return Ok(PostCommit {
625 version: 0,
626 data: this.data,
627 create_checkpoint: false,
628 cleanup_expired_logs: None,
629 log_store: this.log_store,
630 table_data: None,
631 custom_execute_handler: this.post_commit_hook_handler,
632 metrics: CommitMetrics { num_retries: 0 },
633 });
634 }
635
636 let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();
638
639 let mut attempt_number = 1;
640 let total_retries = this.max_retries + 1;
641 while attempt_number <= total_retries {
642 let latest_version = this
643 .log_store
644 .get_latest_version(read_snapshot.version())
645 .await?;
646
647 if latest_version > read_snapshot.version() {
648 if this.max_retries == 0 {
651 return Err(
652 TransactionError::MaxCommitAttempts(this.max_retries as i32).into()
653 );
654 }
655 warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store);
656 let mut steps = latest_version - read_snapshot.version();
657
658 while steps != 0 {
661 let summary = WinningCommitSummary::try_new(
662 this.log_store.as_ref(),
663 latest_version - steps,
664 (latest_version - steps) + 1,
665 )
666 .await?;
667 let transaction_info = TransactionInfo::try_new(
668 &read_snapshot,
669 this.data.operation.read_predicate(),
670 &this.data.actions,
671 this.data.operation.read_whole_table(),
672 )?;
673 let conflict_checker = ConflictChecker::new(
674 transaction_info,
675 summary,
676 Some(&this.data.operation),
677 );
678
679 match conflict_checker.check_conflicts() {
680 Ok(_) => {}
681 Err(err) => {
682 return Err(TransactionError::CommitConflict(err).into());
683 }
684 }
685 steps -= 1;
686 }
687 read_snapshot
689 .update(this.log_store.clone(), Some(latest_version))
690 .await?;
691 }
692 let version: i64 = latest_version + 1;
693
694 match this
695 .log_store
696 .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
697 .await
698 {
699 Ok(()) => {
700 return Ok(PostCommit {
701 version,
702 data: this.data,
703 create_checkpoint: this
704 .post_commit
705 .map(|v| v.create_checkpoint)
706 .unwrap_or_default(),
707 cleanup_expired_logs: this
708 .post_commit
709 .map(|v| v.cleanup_expired_logs)
710 .unwrap_or_default(),
711 log_store: this.log_store,
712 table_data: Some(Box::new(read_snapshot)),
713 custom_execute_handler: this.post_commit_hook_handler,
714 metrics: CommitMetrics {
715 num_retries: attempt_number as u64 - 1,
716 },
717 });
718 }
719 Err(TransactionError::VersionAlreadyExists(version)) => {
720 error!("The transaction {version} already exists, will retry!");
721 attempt_number += 1;
724 }
725 Err(err) => {
726 this.log_store
727 .abort_commit_entry(version, commit_or_bytes, this.operation_id)
728 .await?;
729 return Err(err.into());
730 }
731 }
732 }
733
734 Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
735 })
736 }
737}
738
739pub struct PostCommit {
741 pub version: i64,
743 pub data: CommitData,
745 create_checkpoint: bool,
746 cleanup_expired_logs: Option<bool>,
747 log_store: LogStoreRef,
748 table_data: Option<Box<dyn TableReference>>,
749 custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
750 metrics: CommitMetrics,
751}
752
753impl PostCommit {
754 async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
756 if let Some(table) = &self.table_data {
757 let post_commit_operation_id = Uuid::new_v4();
758 let mut snapshot = table.eager_snapshot().clone();
759 if self.version - snapshot.version() > 1 {
760 snapshot
763 .update(self.log_store.clone(), Some(self.version - 1))
764 .await?;
765 snapshot.advance(vec![&self.data])?;
766 } else {
767 snapshot.advance(vec![&self.data])?;
768 }
769 let mut state = DeltaTableState { snapshot };
770
771 let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
772 cleanup_logs
773 } else {
774 state.table_config().enable_expired_log_cleanup()
775 };
776
777 if let Some(custom_execute_handler) = &self.custom_execute_handler {
779 custom_execute_handler
780 .before_post_commit_hook(
781 &self.log_store,
782 cleanup_logs || self.create_checkpoint,
783 post_commit_operation_id,
784 )
785 .await?
786 }
787
788 let mut new_checkpoint_created = false;
789 if self.create_checkpoint {
790 new_checkpoint_created = self
792 .create_checkpoint(
793 &state,
794 &self.log_store,
795 self.version,
796 post_commit_operation_id,
797 )
798 .await?;
799 }
800
801 let mut num_log_files_cleaned_up: u64 = 0;
802 if cleanup_logs {
803 num_log_files_cleaned_up = cleanup_expired_logs_for(
805 self.version,
806 self.log_store.as_ref(),
807 Utc::now().timestamp_millis()
808 - state.table_config().log_retention_duration().as_millis() as i64,
809 Some(post_commit_operation_id),
810 )
811 .await? as u64;
812 if num_log_files_cleaned_up > 0 {
813 state = DeltaTableState::try_new(
814 &state.snapshot().table_root(),
815 self.log_store.object_store(None),
816 state.load_config().clone(),
817 Some(self.version),
818 )
819 .await?;
820 }
821 }
822
823 if let Some(custom_execute_handler) = &self.custom_execute_handler {
825 custom_execute_handler
826 .after_post_commit_hook(
827 &self.log_store,
828 cleanup_logs || self.create_checkpoint,
829 post_commit_operation_id,
830 )
831 .await?
832 }
833 Ok((
834 state,
835 PostCommitMetrics {
836 new_checkpoint_created,
837 num_log_files_cleaned_up,
838 },
839 ))
840 } else {
841 let state = DeltaTableState::try_new(
842 &Path::default(),
843 self.log_store.object_store(None),
844 Default::default(),
845 Some(self.version),
846 )
847 .await?;
848 Ok((
849 state,
850 PostCommitMetrics {
851 new_checkpoint_created: false,
852 num_log_files_cleaned_up: 0,
853 },
854 ))
855 }
856 }
857 async fn create_checkpoint(
858 &self,
859 table_state: &DeltaTableState,
860 log_store: &LogStoreRef,
861 version: i64,
862 operation_id: Uuid,
863 ) -> DeltaResult<bool> {
864 if !table_state.load_config().require_files {
865 warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files.");
866 return Ok(false);
867 }
868
869 let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
870 if ((version + 1) % checkpoint_interval) == 0 {
871 create_checkpoint_for(version, table_state, log_store.as_ref(), Some(operation_id))
872 .await?;
873 Ok(true)
874 } else {
875 Ok(false)
876 }
877 }
878}
879
880pub struct FinalizedCommit {
882 pub snapshot: DeltaTableState,
884
885 pub version: i64,
887
888 pub metrics: Metrics,
890}
891
892impl FinalizedCommit {
893 pub fn snapshot(&self) -> DeltaTableState {
895 self.snapshot.clone()
896 }
897 pub fn version(&self) -> i64 {
899 self.version
900 }
901}
902
903impl std::future::IntoFuture for PostCommit {
904 type Output = DeltaResult<FinalizedCommit>;
905 type IntoFuture = BoxFuture<'static, Self::Output>;
906
907 fn into_future(self) -> Self::IntoFuture {
908 let this = self;
909
910 Box::pin(async move {
911 match this.run_post_commit_hook().await {
912 Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
913 snapshot,
914 version: this.version,
915 metrics: Metrics {
916 num_retries: this.metrics.num_retries,
917 new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
918 num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
919 },
920 }),
921 Err(err) => Err(err),
922 }
923 })
924 }
925}
926
927#[cfg(test)]
928mod tests {
929 use std::sync::Arc;
930
931 use super::*;
932 use crate::logstore::{commit_uri_from_version, default_logstore::DefaultLogStore, LogStore};
933 use object_store::{memory::InMemory, ObjectStore, PutPayload};
934 use url::Url;
935
936 #[test]
937 fn test_commit_uri_from_version() {
938 let version = commit_uri_from_version(0);
939 assert_eq!(version, Path::from("_delta_log/00000000000000000000.json"));
940 let version = commit_uri_from_version(123);
941 assert_eq!(version, Path::from("_delta_log/00000000000000000123.json"))
942 }
943
944 #[tokio::test]
945 async fn test_try_commit_transaction() {
946 let store = Arc::new(InMemory::new());
947 let url = Url::parse("mem://what/is/this").unwrap();
948 let log_store = DefaultLogStore::new(
949 store.clone(),
950 crate::logstore::LogStoreConfig {
951 location: url,
952 options: Default::default(),
953 },
954 );
955 let version_path = Path::from("_delta_log/00000000000000000000.json");
956 store.put(&version_path, PutPayload::new()).await.unwrap();
957
958 let res = log_store
959 .write_commit_entry(
960 0,
961 CommitOrBytes::LogBytes(PutPayload::new().into()),
962 Uuid::new_v4(),
963 )
964 .await;
965 assert!(res.is_err());
967
968 log_store
970 .write_commit_entry(
971 1,
972 CommitOrBytes::LogBytes(PutPayload::new().into()),
973 Uuid::new_v4(),
974 )
975 .await
976 .unwrap();
977 }
978}