1use std::collections::HashMap;
51use std::sync::{Arc, LazyLock};
52
53use bytes::Bytes;
54#[cfg(feature = "datafusion")]
55use datafusion::datasource::object_store::ObjectStoreUrl;
56use delta_kernel::engine::default::executor::tokio::{
57 TokioBackgroundExecutor, TokioMultiThreadExecutor,
58};
59use delta_kernel::engine::default::DefaultEngine;
60use delta_kernel::log_segment::LogSegment;
61use delta_kernel::path::{LogPathFileType, ParsedLogPath};
62use delta_kernel::{AsAny, Engine};
63use futures::StreamExt;
64use object_store::ObjectStoreScheme;
65use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
66use regex::Regex;
67use serde::de::{Error, SeqAccess, Visitor};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use serde_json::Deserializer;
71use tokio::runtime::RuntimeFlavor;
72use tracing::*;
73use url::Url;
74use uuid::Uuid;
75
76use crate::kernel::transaction::TransactionError;
77use crate::kernel::{spawn_blocking_with_span, Action};
78use crate::{DeltaResult, DeltaTableError};
79
80pub use self::config::StorageConfig;
81pub use self::factories::{
82 logstore_factories, object_store_factories, store_for, LogStoreFactory,
83 LogStoreFactoryRegistry, ObjectStoreFactory, ObjectStoreFactoryRegistry,
84};
85pub use self::storage::utils::commit_uri_from_version;
86pub use self::storage::{
87 DefaultObjectStoreRegistry, DeltaIOStorageBackend, IORuntime, ObjectStoreRef,
88 ObjectStoreRegistry, ObjectStoreRetryExt,
89};
90pub use ::object_store;
92
93pub mod config;
94pub(crate) mod default_logstore;
95pub(crate) mod factories;
96pub(crate) mod storage;
97
98trait LogStoreFactoryExt {
100 fn with_options_internal(
109 &self,
110 root_store: ObjectStoreRef,
111 location: &Url,
112 options: &StorageConfig,
113 ) -> DeltaResult<LogStoreRef>;
114}
115
116impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
117 fn with_options_internal(
118 &self,
119 root_store: ObjectStoreRef,
120 location: &Url,
121 options: &StorageConfig,
122 ) -> DeltaResult<LogStoreRef> {
123 let prefixed_store = options.decorate_store(root_store.clone(), location)?;
124 let log_store =
125 self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
126 Ok(log_store)
127 }
128}
129
130impl<T: LogStoreFactory> LogStoreFactoryExt for Arc<T> {
131 fn with_options_internal(
132 &self,
133 root_store: ObjectStoreRef,
134 location: &Url,
135 options: &StorageConfig,
136 ) -> DeltaResult<LogStoreRef> {
137 T::with_options_internal(self, root_store, location, options)
138 }
139}
140
141pub fn default_logstore(
143 prefixed_store: ObjectStoreRef,
144 root_store: ObjectStoreRef,
145 location: &Url,
146 options: &StorageConfig,
147) -> Arc<dyn LogStore> {
148 Arc::new(default_logstore::DefaultLogStore::new(
149 prefixed_store,
150 root_store,
151 LogStoreConfig {
152 location: location.clone(),
153 options: options.clone(),
154 },
155 ))
156}
157
158pub type LogStoreRef = Arc<dyn LogStore>;
160
161static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));
162
163pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
164 LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
165
166pub fn logstore_for(location: Url, storage_config: StorageConfig) -> DeltaResult<LogStoreRef> {
179 let scheme = Url::parse(&format!("{}://", location.scheme()))
181 .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
182
183 if let Some(entry) = object_store_factories().get(&scheme) {
184 debug!("Found a storage provider for {scheme} ({location})");
185 let (root_store, _prefix) = entry.value().parse_url_opts(&location, &storage_config)?;
186 return logstore_with(root_store, location, storage_config);
187 }
188
189 Err(DeltaTableError::InvalidTableLocation(location.into()))
190}
191
192pub fn logstore_with(
194 root_store: ObjectStoreRef,
195 location: Url,
196 storage_config: StorageConfig,
197) -> DeltaResult<LogStoreRef> {
198 let scheme = Url::parse(&format!("{}://", location.scheme()))
199 .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
200
201 if let Some(factory) = logstore_factories().get(&scheme) {
202 debug!("Found a logstore provider for {scheme}");
203 return factory
204 .value()
205 .with_options_internal(root_store, &location, &storage_config);
206 }
207
208 error!("Could not find a logstore for the scheme {scheme}");
209 Err(DeltaTableError::InvalidTableLocation(
210 location.clone().into(),
211 ))
212}
213
214#[derive(Clone)]
216pub enum CommitOrBytes {
217 TmpCommit(Path),
219 LogBytes(Bytes),
221}
222
223#[derive(Debug)]
226pub enum PeekCommit {
227 New(i64, Vec<Action>),
229 UpToDate,
231}
232
233#[derive(Debug, Clone)]
235pub struct LogStoreConfig {
236 pub location: Url,
238 pub options: StorageConfig,
240}
241
242impl LogStoreConfig {
243 pub fn decorate_store<T: ObjectStore + Clone>(
244 &self,
245 store: T,
246 table_root: Option<&url::Url>,
247 ) -> DeltaResult<Box<dyn ObjectStore>> {
248 let table_url = table_root.unwrap_or(&self.location);
249 self.options.decorate_store(store, table_url)
250 }
251
252 pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry {
253 self::factories::object_store_factories()
254 }
255}
256
257#[async_trait::async_trait]
268pub trait LogStore: Send + Sync + AsAny {
269 fn name(&self) -> String;
271
272 async fn refresh(&self) -> DeltaResult<()> {
274 Ok(())
275 }
276
277 async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;
279
280 async fn write_commit_entry(
285 &self,
286 version: i64,
287 commit_or_bytes: CommitOrBytes,
288 operation_id: Uuid,
289 ) -> Result<(), TransactionError>;
290
291 async fn abort_commit_entry(
293 &self,
294 version: i64,
295 commit_or_bytes: CommitOrBytes,
296 operation_id: Uuid,
297 ) -> Result<(), TransactionError>;
298
299 async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;
301
302 async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
304 let next_version = current_version + 1;
305 let commit_log_bytes = match self.read_commit_entry(next_version).await {
306 Ok(Some(bytes)) => Ok(bytes),
307 Ok(None) => return Ok(PeekCommit::UpToDate),
308 Err(err) => Err(err),
309 }?;
310
311 let actions = crate::logstore::get_actions(next_version, &commit_log_bytes);
312 Ok(PeekCommit::New(next_version, actions?))
313 }
314
315 fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;
317
318 fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;
319
320 fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
321 let store = self.root_object_store(operation_id);
322 get_engine(store)
323 }
324
325 fn to_uri(&self, location: &Path) -> String {
327 let root = &self.config().location;
328 to_uri(root, location)
329 }
330
331 fn root_uri(&self) -> String {
333 self.to_uri(&Path::from(""))
334 }
335
336 fn log_path(&self) -> &Path {
338 &DELTA_LOG_PATH
339 }
340
341 #[deprecated(
342 since = "0.1.0",
343 note = "DO NOT USE: Just a stop gap to support lakefs during kernel migration"
344 )]
345 fn transaction_url(&self, _operation_id: Uuid, base: &Url) -> DeltaResult<Url> {
346 Ok(base.clone())
347 }
348
349 async fn is_delta_table_location(&self) -> DeltaResult<bool> {
351 let object_store = self.object_store(None);
352 let dummy_url = Url::parse("http://example.com").unwrap();
353 let log_path = Path::from("_delta_log");
354
355 let mut stream = object_store.list(Some(&log_path));
356 while let Some(res) = stream.next().await {
357 match res {
358 Ok(meta) => {
359 let file_url = dummy_url.join(meta.location.as_ref()).unwrap();
360 if let Ok(Some(parsed_path)) = ParsedLogPath::try_from(file_url) {
361 if matches!(
362 parsed_path.file_type,
363 LogPathFileType::Commit
364 | LogPathFileType::SinglePartCheckpoint
365 | LogPathFileType::UuidCheckpoint(_)
366 | LogPathFileType::MultiPartCheckpoint { .. }
367 | LogPathFileType::CompactedCommit { .. }
368 ) {
369 return Ok(true);
370 }
371 }
372 continue;
373 }
374 Err(ObjectStoreError::NotFound { .. }) => return Ok(false),
375 Err(err) => return Err(err.into()),
376 }
377 }
378
379 Ok(false)
380 }
381
382 fn config(&self) -> &LogStoreConfig;
384
385 #[cfg(feature = "datafusion")]
386 fn object_store_url(&self) -> ObjectStoreUrl {
392 crate::logstore::object_store_url(&self.config().location)
393 }
394}
395
396pub(crate) trait LogStoreExt: LogStore {
398 fn table_root_url(&self) -> Url {
403 let mut base = self.config().location.clone();
404 if !base.path().ends_with("/") {
405 base.set_path(&format!("{}/", base.path()));
406 }
407 base
408 }
409
410 fn log_root_url(&self) -> Url {
415 self.table_root_url().join("_delta_log/").unwrap()
416 }
417}
418
419impl<T: LogStore + ?Sized> LogStoreExt for T {}
420
421#[async_trait::async_trait]
422impl<T: LogStore + ?Sized> LogStore for Arc<T> {
423 fn name(&self) -> String {
424 T::name(self)
425 }
426
427 async fn refresh(&self) -> DeltaResult<()> {
428 T::refresh(self).await
429 }
430
431 async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
432 T::read_commit_entry(self, version).await
433 }
434
435 async fn write_commit_entry(
436 &self,
437 version: i64,
438 commit_or_bytes: CommitOrBytes,
439 operation_id: Uuid,
440 ) -> Result<(), TransactionError> {
441 T::write_commit_entry(self, version, commit_or_bytes, operation_id).await
442 }
443
444 async fn abort_commit_entry(
445 &self,
446 version: i64,
447 commit_or_bytes: CommitOrBytes,
448 operation_id: Uuid,
449 ) -> Result<(), TransactionError> {
450 T::abort_commit_entry(self, version, commit_or_bytes, operation_id).await
451 }
452
453 async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64> {
454 T::get_latest_version(self, start_version).await
455 }
456
457 async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
458 T::peek_next_commit(self, current_version).await
459 }
460
461 fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
462 T::object_store(self, operation_id)
463 }
464
465 fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
466 T::root_object_store(self, operation_id)
467 }
468
469 fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
470 T::engine(self, operation_id)
471 }
472
473 fn to_uri(&self, location: &Path) -> String {
474 T::to_uri(self, location)
475 }
476
477 fn root_uri(&self) -> String {
478 T::root_uri(self)
479 }
480
481 fn log_path(&self) -> &Path {
482 T::log_path(self)
483 }
484
485 async fn is_delta_table_location(&self) -> DeltaResult<bool> {
486 T::is_delta_table_location(self).await
487 }
488
489 fn config(&self) -> &LogStoreConfig {
490 T::config(self)
491 }
492
493 #[cfg(feature = "datafusion")]
494 fn object_store_url(&self) -> ObjectStoreUrl {
495 T::object_store_url(self)
496 }
497}
498
499pub(crate) fn get_engine(store: Arc<dyn ObjectStore>) -> Arc<dyn Engine> {
500 let handle = tokio::runtime::Handle::current();
501 match handle.runtime_flavor() {
502 RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new(
503 store,
504 Arc::new(TokioMultiThreadExecutor::new(handle)),
505 )),
506 RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new(
507 store,
508 Arc::new(TokioBackgroundExecutor::new()),
509 )),
510 _ => panic!("unsupported runtime flavor"),
511 }
512}
513
/// Derive an [`ObjectStoreUrl`] of the form `delta-rs://<scheme>-<host><path>` from
/// `location`, where the host defaults to `-` and every path delimiter and `:` in the path
/// is replaced by `-`.
///
/// # Panics
///
/// Panics if the assembled string is rejected by [`ObjectStoreUrl::parse`].
#[cfg(feature = "datafusion")]
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;

    let scheme = location.scheme();
    let host = location.host_str().unwrap_or("-");
    let flattened_path = location.path().replace(DELIMITER, "-").replace(':', "-");

    ObjectStoreUrl::parse(format!("delta-rs://{}-{}{}", scheme, host, flattened_path))
        .expect("Invalid object store url.")
}
525
526pub(crate) fn object_store_path(table_root: &Url) -> DeltaResult<Path> {
529 Ok(match ObjectStoreScheme::parse(table_root) {
530 Ok((_, path)) => path,
531 _ => Path::parse(table_root.path())?,
532 })
533}
534
535pub fn to_uri(root: &Url, location: &Path) -> String {
537 match root.scheme() {
538 "file" => {
539 #[cfg(windows)]
540 let uri = format!(
541 "{}/{}",
542 root.as_ref().trim_end_matches('/'),
543 location.as_ref()
544 )
545 .replace("file:///", "");
546 #[cfg(unix)]
547 let uri = format!(
548 "{}/{}",
549 root.as_ref().trim_end_matches('/'),
550 location.as_ref()
551 )
552 .replace("file://", "");
553 uri
554 }
555 _ => {
556 if location.as_ref().is_empty() || location.as_ref() == "/" {
557 root.as_ref().to_string()
558 } else {
559 format!("{}/{}", root.as_ref(), location.as_ref())
560 }
561 }
562 }
563}
564
565pub fn get_actions(
567 version: i64,
568 commit_log_bytes: &bytes::Bytes,
569) -> Result<Vec<Action>, DeltaTableError> {
570 debug!("parsing commit with version {version}...");
571 Deserializer::from_slice(commit_log_bytes)
572 .into_iter::<Action>()
573 .map(|result| {
574 result.map_err(|e| {
575 let line = format!("Error at line {}, column {}", e.line(), e.column());
576 DeltaTableError::InvalidJsonLog {
577 json_err: e,
578 line,
579 version,
580 }
581 })
582 })
583 .collect()
584}
585
586impl std::fmt::Debug for dyn LogStore + '_ {
588 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589 write!(f, "{}({})", self.name(), self.root_uri())
590 }
591}
592
593impl Serialize for LogStoreConfig {
594 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
595 where
596 S: serde::Serializer,
597 {
598 let mut seq = serializer.serialize_seq(None)?;
599 seq.serialize_element(&self.location.to_string())?;
600 seq.serialize_element(&self.options.raw)?;
601 seq.end()
602 }
603}
604
605impl<'de> Deserialize<'de> for LogStoreConfig {
606 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
607 where
608 D: serde::Deserializer<'de>,
609 {
610 struct LogStoreConfigVisitor {}
611
612 impl<'de> Visitor<'de> for LogStoreConfigVisitor {
613 type Value = LogStoreConfig;
614
615 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
616 formatter.write_str("struct LogStoreConfig")
617 }
618
619 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
620 where
621 A: SeqAccess<'de>,
622 {
623 let location_str: String = seq
624 .next_element()?
625 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
626 let options: HashMap<String, String> = seq
627 .next_element()?
628 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
629 let location = Url::parse(&location_str).map_err(A::Error::custom)?;
630 Ok(LogStoreConfig {
631 location,
632 options: StorageConfig::parse_options(options).map_err(A::Error::custom)?,
633 })
634 }
635 }
636
637 deserializer.deserialize_seq(LogStoreConfigVisitor {})
638 }
639}
640
641pub fn extract_version_from_filename(name: &str) -> Option<i64> {
643 DELTA_LOG_REGEX
644 .captures(name)
645 .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap())
646}
647
648pub async fn get_latest_version(
650 log_store: &dyn LogStore,
651 current_version: i64,
652) -> DeltaResult<i64> {
653 let current_version = if current_version < 0 {
654 0
655 } else {
656 current_version
657 };
658
659 let storage = log_store.engine(None).storage_handler();
660 let log_root = log_store.log_root_url();
661
662 let segment = spawn_blocking_with_span(move || {
663 LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
664 })
665 .await
666 .map_err(|e| DeltaTableError::Generic(e.to_string()))?
667 .map_err(|e| {
668 if e.to_string()
669 .contains(&format!("to have version {current_version}"))
670 {
671 DeltaTableError::InvalidVersion(current_version)
672 } else {
673 DeltaTableError::Generic(e.to_string())
674 }
675 })?;
676
677 Ok(segment.end_version as i64)
678}
679
680#[instrument(skip(storage), fields(version = version, path = %commit_uri_from_version(version)))]
682pub async fn read_commit_entry(
683 storage: &dyn ObjectStore,
684 version: i64,
685) -> DeltaResult<Option<Bytes>> {
686 let commit_uri = commit_uri_from_version(version);
687 match storage.get(&commit_uri).await {
688 Ok(res) => {
689 let bytes = res.bytes().await?;
690 debug!(size = bytes.len(), "commit entry read successfully");
691 Ok(Some(bytes))
692 }
693 Err(ObjectStoreError::NotFound { .. }) => {
694 debug!("commit entry not found");
695 Ok(None)
696 }
697 Err(err) => {
698 error!(error = %err, version = version, "failed to read commit entry");
699 Err(err.into())
700 }
701 }
702}
703
704#[instrument(skip(storage), fields(version = version, tmp_path = %tmp_commit, commit_path = %commit_uri_from_version(version)))]
706pub async fn write_commit_entry(
707 storage: &dyn ObjectStore,
708 version: i64,
709 tmp_commit: &Path,
710) -> Result<(), TransactionError> {
711 storage
714 .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
715 .await
716 .map_err(|err| -> TransactionError {
717 match err {
718 ObjectStoreError::AlreadyExists { .. } => {
719 warn!("commit entry already exists");
720 TransactionError::VersionAlreadyExists(version)
721 }
722 _ => {
723 error!(error = %err, "failed to write commit entry");
724 TransactionError::from(err)
725 }
726 }
727 })?;
728 debug!("commit entry written successfully");
729 Ok(())
730}
731
732#[instrument(skip(storage), fields(version = _version, tmp_path = %tmp_commit))]
734pub async fn abort_commit_entry(
735 storage: &dyn ObjectStore,
736 _version: i64,
737 tmp_commit: &Path,
738) -> Result<(), TransactionError> {
739 storage.delete_with_retries(tmp_commit, 15).await?;
740 debug!("commit entry aborted successfully");
741 Ok(())
742}
743
#[cfg(test)]
pub(crate) mod tests {
    use ::object_store::{PutOptions, PutPayload};
    use futures::TryStreamExt;

    use super::*;

    /// Build a log store for `memory:///table` with default storage options.
    fn memory_logstore() -> DeltaResult<LogStoreRef> {
        let location = Url::parse("memory:///table").unwrap();
        logstore_for(location, StorageConfig::default())
    }

    /// Write `payload` to `path` through the log store's object store.
    async fn put_file(store: &LogStoreRef, path: &str, payload: PutPayload) {
        let _put = store
            .object_store(None)
            .put_opts(&Path::from(path), payload, PutOptions::default())
            .await
            .expect("Failed to put");
    }

    #[test]
    fn logstore_with_invalid_url() {
        let location = Url::parse("nonexistent://table").unwrap();
        assert!(logstore_for(location, StorageConfig::default()).is_err());
    }

    #[test]
    fn logstore_with_memory() {
        assert!(memory_logstore().is_ok());
    }

    #[test]
    fn logstore_with_memory_and_rt() {
        let location = Url::parse("memory:///table").unwrap();
        let config = StorageConfig::default().with_io_runtime(IORuntime::default());
        assert!(logstore_for(location, config).is_ok());
    }

    #[test]
    fn test_logstore_ext() {
        let store = memory_logstore().unwrap();
        let table_url = store.table_root_url();
        let log_url = store.log_root_url();
        assert!(table_url.path().ends_with('/'));
        assert!(log_url.path().ends_with("_delta_log/"));
    }

    #[tokio::test]
    async fn test_is_location_a_table() {
        let store = memory_logstore().expect("Failed to get logstore");
        let before = store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table");
        assert!(!before);

        // A stray temporary file in the log directory must not count as a table.
        let payload = PutPayload::from_static(b"test-drivin");
        put_file(&store, "_delta_log/_commit_failed.tmp", payload).await;
        let after = store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table");
        assert!(!after);
    }

    #[tokio::test]
    async fn test_is_location_a_table_commit() {
        let store = memory_logstore().expect("Failed to get logstore");
        let before = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(!before);

        // A commit file makes the location a table.
        let payload = PutPayload::from_static(b"test");
        put_file(&store, "_delta_log/00000000000000000000.json", payload).await;
        let after = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(after);
    }

    #[tokio::test]
    async fn test_is_location_a_table_checkpoint() {
        let store = memory_logstore().expect("Failed to get logstore");
        let before = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(!before);

        // A checkpoint file makes the location a table.
        let payload = PutPayload::from_static(b"test");
        put_file(
            &store,
            "_delta_log/00000000000000000000.checkpoint.parquet",
            payload,
        )
        .await;
        let after = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(after);
    }

    #[tokio::test]
    async fn test_is_location_a_table_crc() {
        let store = memory_logstore().expect("Failed to get logstore");
        let before = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(!before);

        // Checksum files surrounding a real commit must not prevent detection.
        let payload = PutPayload::from_static(b"test");
        let files = [
            "_delta_log/.00000000000000000000.crc.crc",
            "_delta_log/.00000000000000000000.json.crc",
            "_delta_log/00000000000000000000.crc",
            "_delta_log/00000000000000000000.json",
        ];
        for file in files {
            put_file(&store, file, payload.clone()).await;
        }

        let after = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(after);
    }

    #[tokio::test]
    async fn test_peek_with_invalid_json() -> DeltaResult<()> {
        use crate::logstore::object_store::memory::InMemory;

        let memory_store = Arc::new(InMemory::new());
        let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
        let log_content = r#"{invalid_json"#;

        memory_store
            .put(&log_path, log_content.into())
            .await
            .expect("Failed to write log file");

        let table_uri = url::Url::parse("memory:///delta-table").unwrap();
        let table = crate::DeltaTableBuilder::from_uri(table_uri.clone())
            .unwrap()
            .with_storage_backend(memory_store, table_uri)
            .build()?;

        // Version 1 holds malformed JSON, so peeking past version 0 must fail.
        let peeked = table.log_store().peek_next_commit(0).await;
        assert!(peeked.is_err());
        Ok(())
    }

    /// Collect the locations of all objects listed under `prefix` in `storage`.
    pub(crate) async fn flatten_list_stream(
        storage: &object_store::DynObjectStore,
        prefix: Option<&Path>,
    ) -> object_store::Result<Vec<Path>> {
        let locations: Vec<Path> = storage
            .list(prefix)
            .map_ok(|meta| meta.location)
            .try_collect()
            .await?;
        Ok(locations)
    }
}
975
#[cfg(all(test, feature = "datafusion"))]
mod datafusion_tests {
    use super::*;
    use url::Url;

    /// Distinct table locations must map to distinct object store URLs.
    #[tokio::test]
    async fn test_unique_object_store_url() {
        let location_pairs = [
            // Same scheme and host, different path.
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            // Different scheme, same trailing path.
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            // Same scheme and path, different host.
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
        ];

        for (location_1, location_2) in location_pairs {
            let first = object_store_url(&Url::parse(location_1).unwrap());
            let second = object_store_url(&Url::parse(location_2).unwrap());
            assert_ne!(first.as_str(), second.as_str());
        }
    }

    /// A truncated second JSON line must surface as `InvalidJsonLog` pointing at line 2.
    #[tokio::test]
    async fn test_get_actions_malformed_json() {
        let malformed_json = bytes::Bytes::from(
            r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1234567890, "dataChange": true}}
{"invalid json without closing brace"#,
        );

        match get_actions(0, &malformed_json) {
            Err(DeltaTableError::InvalidJsonLog {
                line,
                version,
                json_err,
            }) => {
                assert_eq!(version, 0);
                assert!(line.contains("line 2"));
                assert!(json_err.is_eof());
            }
            other => panic!("Expected InvalidJsonLog error, got {:?}", other),
        }
    }
}