1use std::collections::HashMap;
51use std::sync::{Arc, LazyLock};
52
53use bytes::Bytes;
54#[cfg(feature = "datafusion")]
55use datafusion::datasource::object_store::ObjectStoreUrl;
56use delta_kernel::engine::default::executor::tokio::{
57 TokioBackgroundExecutor, TokioMultiThreadExecutor,
58};
59use delta_kernel::engine::default::DefaultEngine;
60use delta_kernel::log_segment::LogSegment;
61use delta_kernel::path::{LogPathFileType, ParsedLogPath};
62use delta_kernel::{AsAny, Engine};
63use futures::StreamExt;
64use object_store::ObjectStoreScheme;
65use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
66use regex::Regex;
67use serde::de::{Error, SeqAccess, Visitor};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use serde_json::Deserializer;
71use tokio::runtime::RuntimeFlavor;
72use tokio::task::spawn_blocking;
73use tracing::*;
74use url::Url;
75use uuid::Uuid;
76
77use crate::kernel::transaction::TransactionError;
78use crate::kernel::Action;
79use crate::{DeltaResult, DeltaTableError};
80
81pub use self::config::StorageConfig;
82pub use self::factories::{
83 logstore_factories, object_store_factories, store_for, LogStoreFactory,
84 LogStoreFactoryRegistry, ObjectStoreFactory, ObjectStoreFactoryRegistry,
85};
86pub use self::storage::utils::commit_uri_from_version;
87pub use self::storage::{
88 DefaultObjectStoreRegistry, DeltaIOStorageBackend, IORuntime, ObjectStoreRef,
89 ObjectStoreRegistry, ObjectStoreRetryExt,
90};
91pub use ::object_store;
93
94pub mod config;
95pub(crate) mod default_logstore;
96pub(crate) mod factories;
97pub(crate) mod storage;
98
/// Internal extension of [`LogStoreFactory`] that applies the configured store
/// decorations before handing both store views to the factory.
trait LogStoreFactoryExt {
    /// Build a [`LogStore`] for `location`, deriving the prefixed (decorated)
    /// store from `root_store` via `options` internally.
    ///
    /// * `root_store` - object store rooted at the storage root.
    /// * `location` - fully qualified URL of the table root.
    /// * `options` - resolved storage configuration used to decorate the store.
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef>;
}
116
117impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
118 fn with_options_internal(
119 &self,
120 root_store: ObjectStoreRef,
121 location: &Url,
122 options: &StorageConfig,
123 ) -> DeltaResult<LogStoreRef> {
124 let prefixed_store = options.decorate_store(root_store.clone(), location)?;
125 let log_store =
126 self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
127 Ok(log_store)
128 }
129}
130
/// Forward the extension trait through `Arc` so factories held as `Arc<T>`
/// behave identically to bare factories.
impl<T: LogStoreFactory> LogStoreFactoryExt for Arc<T> {
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef> {
        // Delegate to the blanket impl on the inner type.
        T::with_options_internal(self, root_store, location, options)
    }
}
141
/// Create the default [`LogStore`] implementation from pre-built object stores.
///
/// * `prefixed_store` - object store scoped to the table location.
/// * `root_store` - object store rooted at the storage root.
/// * `location` - fully qualified URL of the table root.
/// * `options` - resolved storage configuration, kept in the store's config.
pub fn default_logstore(
    prefixed_store: ObjectStoreRef,
    root_store: ObjectStoreRef,
    location: &Url,
    options: &StorageConfig,
) -> Arc<dyn LogStore> {
    Arc::new(default_logstore::DefaultLogStore::new(
        prefixed_store,
        root_store,
        LogStoreConfig {
            location: location.clone(),
            options: options.clone(),
        },
    ))
}
158
/// Shared reference to a [`LogStore`].
pub type LogStoreRef = Arc<dyn LogStore>;

/// Relative path of the delta log directory inside a table root.
static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));

/// Matches delta log file names: `<20-digit version>.json` commits and
/// `<20-digit version>.checkpoint[.<part>].parquet` checkpoints.
pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
166
167pub fn logstore_for(location: Url, storage_config: StorageConfig) -> DeltaResult<LogStoreRef> {
180 let scheme = Url::parse(&format!("{}://", location.scheme()))
182 .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
183
184 if let Some(entry) = object_store_factories().get(&scheme) {
185 debug!("Found a storage provider for {scheme} ({location})");
186 let (root_store, _prefix) = entry.value().parse_url_opts(&location, &storage_config)?;
187 return logstore_with(root_store, location, storage_config);
188 }
189
190 Err(DeltaTableError::InvalidTableLocation(location.into()))
191}
192
193pub fn logstore_with(
195 root_store: ObjectStoreRef,
196 location: Url,
197 storage_config: StorageConfig,
198) -> DeltaResult<LogStoreRef> {
199 let scheme = Url::parse(&format!("{}://", location.scheme()))
200 .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
201
202 if let Some(factory) = logstore_factories().get(&scheme) {
203 debug!("Found a logstore provider for {scheme}");
204 return factory
205 .value()
206 .with_options_internal(root_store, &location, &storage_config);
207 }
208
209 error!("Could not find a logstore for the scheme {scheme}");
210 Err(DeltaTableError::InvalidTableLocation(
211 location.clone().into(),
212 ))
213}
214
/// Payload handed to [`LogStore::write_commit_entry`]; which variant is used
/// depends on the log store implementation's commit protocol.
#[derive(Clone)]
pub enum CommitOrBytes {
    /// Path of a temporary commit file to be moved into its final location.
    TmpCommit(Path),
    /// Raw bytes of the commit to be written by the log store.
    LogBytes(Bytes),
}
223
/// Result of [`LogStore::peek_next_commit`].
#[derive(Debug)]
pub enum PeekCommit {
    /// A newer commit exists: its version and parsed actions.
    New(i64, Vec<Action>),
    /// No commit beyond the requested version exists yet.
    UpToDate,
}
233
/// Configuration parameters of a log store.
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
    /// Fully qualified URL of the table root.
    pub location: Url,
    /// Resolved storage options used to configure and decorate backend stores.
    pub options: StorageConfig,
}
242
impl LogStoreConfig {
    /// Apply the configured store decorations to `store`.
    ///
    /// Uses this config's own `location` as table root when `table_root` is
    /// `None`.
    pub fn decorate_store<T: ObjectStore + Clone>(
        &self,
        store: T,
        table_root: Option<&url::Url>,
    ) -> DeltaResult<Box<dyn ObjectStore>> {
        let table_url = table_root.unwrap_or(&self.location);
        self.options.decorate_store(store, table_url)
    }

    /// Registry of object store factories known to this process.
    pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry {
        self::factories::object_store_factories()
    }
}
257
/// Durable access to the commit log of a Delta table.
///
/// Implementations abstract over the storage backend and its commit protocol
/// (e.g. rename-based temp commits vs. writing raw log bytes, see
/// [`CommitOrBytes`]).
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + AsAny {
    /// Human-readable name of the log store implementation.
    fn name(&self) -> String;

    /// Refresh internally cached state, if any. Default is a no-op.
    async fn refresh(&self) -> DeltaResult<()> {
        Ok(())
    }

    /// Read the commit file for `version`.
    ///
    /// Returns `Ok(None)` when no commit for that version exists.
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;

    /// Publish the commit for `version`.
    ///
    /// `operation_id` identifies the operation this commit belongs to.
    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Clean up after a commit attempt for `version` that will not complete.
    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Latest version of the table, searching from `start_version`.
    async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

    /// Check whether a commit exists directly after `current_version`.
    ///
    /// Returns [`PeekCommit::UpToDate`] when no such commit exists, otherwise
    /// the next version together with its parsed actions.
    async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
        let next_version = current_version + 1;
        let commit_log_bytes = match self.read_commit_entry(next_version).await {
            Ok(Some(bytes)) => Ok(bytes),
            Ok(None) => return Ok(PeekCommit::UpToDate),
            Err(err) => Err(err),
        }?;

        let actions = crate::logstore::get_actions(next_version, &commit_log_bytes);
        Ok(PeekCommit::New(next_version, actions?))
    }

    /// Object store scoped to the table location (paths are relative to the
    /// table root, e.g. `_delta_log/...`).
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Object store rooted at the storage root, i.e. the undecorated store the
    /// table-scoped store was derived from.
    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Build a delta-kernel [`Engine`] over the root object store.
    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        let store = self.root_object_store(operation_id);
        get_engine(store)
    }

    /// Resolve a store-relative [`Path`] to a full URI string under the
    /// configured table location.
    fn to_uri(&self, location: &Path) -> String {
        let root = &self.config().location;
        to_uri(root, location)
    }

    /// URI of the table root.
    fn root_uri(&self) -> String {
        self.to_uri(&Path::from(""))
    }

    /// Store-relative path of the delta log directory (`_delta_log`).
    fn log_path(&self) -> &Path {
        &DELTA_LOG_PATH
    }

    #[deprecated(
        since = "0.1.0",
        note = "DO NOT USE: Just a stop gap to support lakefs during kernel migration"
    )]
    fn transaction_url(&self, _operation_id: Uuid, base: &Url) -> DeltaResult<Url> {
        Ok(base.clone())
    }

    /// Check whether the configured location holds a delta table, i.e. the
    /// `_delta_log` directory contains at least one commit or checkpoint file.
    ///
    /// CRC/temp files alone do not qualify; a missing log directory yields
    /// `Ok(false)`.
    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        let object_store = self.object_store(None);
        // Dummy base URL only used so ParsedLogPath can classify the file name.
        let dummy_url = Url::parse("http://example.com").unwrap();
        let log_path = Path::from("_delta_log");

        let mut stream = object_store.list(Some(&log_path));
        while let Some(res) = stream.next().await {
            match res {
                Ok(meta) => {
                    let file_url = dummy_url.join(meta.location.as_ref()).unwrap();
                    if let Ok(Some(parsed_path)) = ParsedLogPath::try_from(file_url) {
                        // Any commit or checkpoint flavor proves this is a table.
                        if matches!(
                            parsed_path.file_type,
                            LogPathFileType::Commit
                                | LogPathFileType::SinglePartCheckpoint
                                | LogPathFileType::UuidCheckpoint(_)
                                | LogPathFileType::MultiPartCheckpoint { .. }
                                | LogPathFileType::CompactedCommit { .. }
                        ) {
                            return Ok(true);
                        }
                    }
                    continue;
                }
                Err(ObjectStoreError::NotFound { .. }) => return Ok(false),
                Err(err) => return Err(err.into()),
            }
        }

        Ok(false)
    }

    /// Configuration this log store was created with.
    fn config(&self) -> &LogStoreConfig;

    /// Datafusion registry key for this table's object store.
    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        crate::logstore::object_store_url(&self.config().location)
    }
}
396
397pub(crate) trait LogStoreExt: LogStore {
399 fn table_root_url(&self) -> Url {
404 let mut base = self.config().location.clone();
405 if !base.path().ends_with("/") {
406 base.set_path(&format!("{}/", base.path()));
407 }
408 base
409 }
410
411 fn log_root_url(&self) -> Url {
416 self.table_root_url().join("_delta_log/").unwrap()
417 }
418}
419
420impl<T: LogStore + ?Sized> LogStoreExt for T {}
421
/// Forward every [`LogStore`] method through `Arc` so a shared log store can
/// be used wherever a bare implementation is expected. All methods delegate
/// to the inner type unchanged.
#[async_trait::async_trait]
impl<T: LogStore + ?Sized> LogStore for Arc<T> {
    fn name(&self) -> String {
        T::name(self)
    }

    async fn refresh(&self) -> DeltaResult<()> {
        T::refresh(self).await
    }

    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
        T::read_commit_entry(self, version).await
    }

    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::write_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::abort_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64> {
        T::get_latest_version(self, start_version).await
    }

    async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
        T::peek_next_commit(self, current_version).await
    }

    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::object_store(self, operation_id)
    }

    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::root_object_store(self, operation_id)
    }

    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        T::engine(self, operation_id)
    }

    fn to_uri(&self, location: &Path) -> String {
        T::to_uri(self, location)
    }

    fn root_uri(&self) -> String {
        T::root_uri(self)
    }

    fn log_path(&self) -> &Path {
        T::log_path(self)
    }

    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        T::is_delta_table_location(self).await
    }

    fn config(&self) -> &LogStoreConfig {
        T::config(self)
    }

    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        T::object_store_url(self)
    }
}
499
/// Build a delta-kernel [`Engine`] over `store`, choosing a tokio executor
/// matching the current runtime flavor.
///
/// # Panics
/// Panics when called outside a tokio runtime (`Handle::current`) or on a
/// runtime flavor other than multi-thread / current-thread.
pub(crate) fn get_engine(store: Arc<dyn ObjectStore>) -> Arc<dyn Engine> {
    let handle = tokio::runtime::Handle::current();
    match handle.runtime_flavor() {
        // Multi-threaded runtime: hand the handle to the kernel executor.
        RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new(
            store,
            Arc::new(TokioMultiThreadExecutor::new(handle)),
        )),
        // Current-thread runtime: use a dedicated background executor thread.
        RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new(
            store,
            Arc::new(TokioBackgroundExecutor::new()),
        )),
        _ => panic!("unsupported runtime flavor"),
    }
}
514
/// Derive a datafusion [`ObjectStoreUrl`] registry key from a table location.
///
/// Scheme, host and path are flattened into a single `delta-rs://` authority
/// (path delimiters and colons replaced with `-`) so distinct table locations
/// map to distinct keys.
#[cfg(feature = "datafusion")]
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;
    ObjectStoreUrl::parse(format!(
        "delta-rs://{}-{}{}",
        location.scheme(),
        location.host_str().unwrap_or("-"),
        location.path().replace(DELIMITER, "-").replace(':', "-")
    ))
    .expect("Invalid object store url.")
}
526
527pub(crate) fn object_store_path(table_root: &Url) -> DeltaResult<Path> {
530 Ok(match ObjectStoreScheme::parse(table_root) {
531 Ok((_, path)) => path,
532 _ => Path::parse(table_root.path())?,
533 })
534}
535
/// Resolve a store-relative [`Path`] to a full URI string under `root`.
///
/// For `file://` roots the scheme is stripped so the result is a plain
/// filesystem path (on Windows the third slash before the drive letter is
/// removed as well). For all other schemes the URL form is kept.
pub fn to_uri(root: &Url, location: &Path) -> String {
    match root.scheme() {
        "file" => {
            #[cfg(windows)]
            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            )
            .replace("file:///", "");
            #[cfg(unix)]
            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            )
            .replace("file://", "");
            uri
        }
        _ => {
            // Avoid appending a trailing slash for an empty location so the
            // root URI is returned unchanged.
            if location.as_ref().is_empty() || location.as_ref() == "/" {
                root.as_ref().to_string()
            } else {
                format!("{}/{}", root.as_ref(), location.as_ref())
            }
        }
    }
}
565
566pub fn get_actions(
568 version: i64,
569 commit_log_bytes: &bytes::Bytes,
570) -> Result<Vec<Action>, DeltaTableError> {
571 debug!("parsing commit with version {version}...");
572 Deserializer::from_slice(commit_log_bytes)
573 .into_iter::<Action>()
574 .map(|result| {
575 result.map_err(|e| {
576 let line = format!("Error at line {}, column {}", e.line(), e.column());
577 DeltaTableError::InvalidJsonLog {
578 json_err: e,
579 line,
580 version,
581 }
582 })
583 })
584 .collect()
585}
586
impl std::fmt::Debug for dyn LogStore + '_ {
    /// Format as `Name(root_uri)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}({})", self.name(), self.root_uri())
    }
}
593
/// Serialize as a two-element sequence: `[location string, raw options map]`.
/// Mirrored by the manual [`Deserialize`] impl below.
impl Serialize for LogStoreConfig {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut seq = serializer.serialize_seq(None)?;
        seq.serialize_element(&self.location.to_string())?;
        seq.serialize_element(&self.options.raw)?;
        seq.end()
    }
}
605
606impl<'de> Deserialize<'de> for LogStoreConfig {
607 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
608 where
609 D: serde::Deserializer<'de>,
610 {
611 struct LogStoreConfigVisitor {}
612
613 impl<'de> Visitor<'de> for LogStoreConfigVisitor {
614 type Value = LogStoreConfig;
615
616 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
617 formatter.write_str("struct LogStoreConfig")
618 }
619
620 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
621 where
622 A: SeqAccess<'de>,
623 {
624 let location_str: String = seq
625 .next_element()?
626 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
627 let options: HashMap<String, String> = seq
628 .next_element()?
629 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
630 let location = Url::parse(&location_str).map_err(A::Error::custom)?;
631 Ok(LogStoreConfig {
632 location,
633 options: StorageConfig::parse_options(options).map_err(A::Error::custom)?,
634 })
635 }
636 }
637
638 deserializer.deserialize_seq(LogStoreConfigVisitor {})
639 }
640}
641
642pub fn extract_version_from_filename(name: &str) -> Option<i64> {
644 DELTA_LOG_REGEX
645 .captures(name)
646 .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap())
647}
648
/// Default implementation for resolving the latest table version via the
/// delta-kernel [`LogSegment`] machinery.
///
/// Negative `current_version` values are clamped to 0. Returns
/// [`DeltaTableError::InvalidVersion`] when the kernel reports that the
/// requested version does not exist.
pub async fn get_latest_version(
    log_store: &dyn LogStore,
    current_version: i64,
) -> DeltaResult<i64> {
    // Treat negative versions as "start from the beginning of the log".
    let current_version = if current_version < 0 {
        0
    } else {
        current_version
    };

    let storage = log_store.engine(None).storage_handler();
    let log_root = log_store.log_root_url();

    // LogSegment construction performs blocking IO through the storage
    // handler, so run it off the async worker thread.
    let segment = spawn_blocking(move || {
        LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))?
    .map_err(|e| {
        // The kernel only exposes a missing version through its error message
        // text, so match on the message to translate into InvalidVersion.
        if e.to_string()
            .contains(&format!("to have version {current_version}"))
        {
            DeltaTableError::InvalidVersion(current_version)
        } else {
            DeltaTableError::Generic(e.to_string())
        }
    })?;

    Ok(segment.end_version as i64)
}
680
/// Read the commit file for `version` from `storage`.
///
/// Returns `Ok(None)` when the commit file does not exist, which callers use
/// to detect the end of the log; all other storage errors are propagated.
#[instrument(skip(storage), fields(version = version, path = %commit_uri_from_version(version)))]
pub async fn read_commit_entry(
    storage: &dyn ObjectStore,
    version: i64,
) -> DeltaResult<Option<Bytes>> {
    let commit_uri = commit_uri_from_version(version);
    match storage.get(&commit_uri).await {
        Ok(res) => {
            let bytes = res.bytes().await?;
            debug!(size = bytes.len(), "commit entry read successfully");
            Ok(Some(bytes))
        }
        Err(ObjectStoreError::NotFound { .. }) => {
            debug!("commit entry not found");
            Ok(None)
        }
        Err(err) => {
            error!(error = %err, version = version, "failed to read commit entry");
            Err(err.into())
        }
    }
}
704
/// Atomically move a temporary commit file into its final versioned location.
///
/// Relies on `rename_if_not_exists` so that a concurrent writer who already
/// created the same version surfaces as
/// [`TransactionError::VersionAlreadyExists`].
#[instrument(skip(storage), fields(version = version, tmp_path = %tmp_commit, commit_path = %commit_uri_from_version(version)))]
pub async fn write_commit_entry(
    storage: &dyn ObjectStore,
    version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    storage
        .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
        .await
        .map_err(|err| -> TransactionError {
            match err {
                ObjectStoreError::AlreadyExists { .. } => {
                    warn!("commit entry already exists");
                    TransactionError::VersionAlreadyExists(version)
                }
                _ => {
                    error!(error = %err, "failed to write commit entry");
                    TransactionError::from(err)
                }
            }
        })?;
    debug!("commit entry written successfully");
    Ok(())
}
732
/// Remove a temporary commit file after a failed commit attempt, retrying the
/// delete up to 15 times before giving up.
#[instrument(skip(storage), fields(version = _version, tmp_path = %tmp_commit))]
pub async fn abort_commit_entry(
    storage: &dyn ObjectStore,
    _version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    storage.delete_with_retries(tmp_commit, 15).await?;
    debug!("commit entry aborted successfully");
    Ok(())
}
744
#[cfg(test)]
pub(crate) mod tests {
    use futures::TryStreamExt;

    use super::*;

    /// Schemes without a registered factory must be rejected.
    #[test]
    fn logstore_with_invalid_url() {
        let location = Url::parse("nonexistent://table").unwrap();

        let store = logstore_for(location, StorageConfig::default());
        assert!(store.is_err());
    }

    /// The in-memory scheme resolves to a working log store.
    #[test]
    fn logstore_with_memory() {
        let location = Url::parse("memory:///table").unwrap();
        let store = logstore_for(location, StorageConfig::default());
        assert!(store.is_ok());
    }

    /// Same as above, but with a dedicated IO runtime configured.
    #[test]
    fn logstore_with_memory_and_rt() {
        let location = Url::parse("memory:///table").unwrap();
        let store = logstore_for(
            location,
            StorageConfig::default().with_io_runtime(IORuntime::default()),
        );
        assert!(store.is_ok());
    }

    /// table_root_url/log_root_url must both end with a trailing slash.
    #[test]
    fn test_logstore_ext() {
        let location = Url::parse("memory:///table").unwrap();
        let store = logstore_for(location, StorageConfig::default()).unwrap();
        let table_url = store.table_root_url();
        assert!(table_url.path().ends_with('/'));
        let log_url = store.log_root_url();
        assert!(log_url.path().ends_with("_delta_log/"));
    }

    /// An empty location, or one containing only temp files, is not a table.
    #[tokio::test]
    async fn test_is_location_a_table() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table"));

        // A failed/temporary commit file alone must not qualify as a table.
        let payload = PutPayload::from_static(b"test-drivin");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/_commit_failed.tmp"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table"));
    }

    /// A single commit file makes the location a delta table.
    #[tokio::test]
    async fn test_is_location_a_table_commit() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Write a version-zero commit file into the log directory.
        let payload = PutPayload::from_static(b"test");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.json"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }

    /// A single checkpoint file also makes the location a delta table.
    #[tokio::test]
    async fn test_is_location_a_table_checkpoint() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Write a version-zero checkpoint file into the log directory.
        let payload = PutPayload::from_static(b"test");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.checkpoint.parquet"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }

    /// CRC files alone do not make a table; a commit file does.
    #[tokio::test]
    async fn test_is_location_a_table_crc() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Seed the log directory with various CRC-style files only.
        let payload = PutPayload::from_static(b"test");

        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/.00000000000000000000.crc.crc"),
                payload.clone(),
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");

        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/.00000000000000000000.json.crc"),
                payload.clone(),
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");

        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.crc"),
                payload.clone(),
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");

        // Only after an actual commit file appears is this a table.
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.json"),
                payload.clone(),
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");

        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }

    /// peek_next_commit must surface JSON parse failures as errors.
    #[tokio::test]
    async fn test_peek_with_invalid_json() -> DeltaResult<()> {
        use crate::logstore::object_store::memory::InMemory;
        let memory_store = Arc::new(InMemory::new());
        let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");

        let log_content = r#"{invalid_json"#;

        memory_store
            .put(&log_path, log_content.into())
            .await
            .expect("Failed to write log file");

        let table_uri = url::Url::parse("memory:///delta-table").unwrap();
        let table = crate::DeltaTableBuilder::from_uri(table_uri.clone())
            .unwrap()
            .with_storage_backend(memory_store, table_uri)
            .build()?;

        let result = table.log_store().peek_next_commit(0).await;
        assert!(result.is_err());
        Ok(())
    }

    /// Helper: collect all object paths under `prefix` into a Vec.
    pub(crate) async fn flatten_list_stream(
        storage: &object_store::DynObjectStore,
        prefix: Option<&Path>,
    ) -> object_store::Result<Vec<Path>> {
        storage
            .list(prefix)
            .map_ok(|meta| meta.location)
            .try_collect::<Vec<Path>>()
            .await
    }
}
976
#[cfg(all(test, feature = "datafusion"))]
mod datafusion_tests {
    use super::*;
    use url::Url;

    /// Distinct table locations must map to distinct datafusion registry keys.
    #[tokio::test]
    async fn test_unique_object_store_url() {
        for (location_1, location_2) in [
            // Differing path.
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            // Differing scheme.
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            // Differing host.
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
        ] {
            let url_1 = Url::parse(location_1).unwrap();
            let url_2 = Url::parse(location_2).unwrap();

            assert_ne!(
                object_store_url(&url_1).as_str(),
                object_store_url(&url_2).as_str(),
            );
        }
    }

    /// get_actions reports the line/column of the first malformed entry.
    #[tokio::test]
    async fn test_get_actions_malformed_json() {
        let malformed_json = bytes::Bytes::from(
            r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1234567890, "dataChange": true}}
{"invalid json without closing brace"#,
        );

        let result = get_actions(0, &malformed_json);

        match result {
            Err(DeltaTableError::InvalidJsonLog {
                line,
                version,
                json_err,
            }) => {
                assert_eq!(version, 0);
                assert!(line.contains("line 2"));
                assert!(json_err.is_eof());
            }
            other => panic!("Expected InvalidJsonLog error, got {:?}", other),
        }
    }
}