use std::fmt::{self, Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
use futures::stream::BoxStream;
use object_store::ObjectMeta;
use object_store::{
CopyOptions, GetOptions, GetRange as ObjectStoreGetRange, GetResult, ListResult,
MultipartUpload, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
RenameOptions, path::Path,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use url::Url;
use crate::logstore::default_logstore::DefaultLogStore;
use crate::logstore::{LogStoreRef, ObjectStoreRef};
#[derive(Debug, PartialEq)]
pub(crate) enum RecordedObjectStoreOperation {
Get(RecordedPathKind),
GetOpts(RecordedPathKind),
GetRange(RecordedPathKind, Range<u64>),
GetRanges(RecordedPathKind, Vec<Range<u64>>),
Head(RecordedPathKind),
List(RecordedPathKind),
ListWithOffset(RecordedPathKind),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecordedPathKind {
Commit,
Checkpoint,
Data,
Other,
}
impl RecordedObjectStoreOperation {
pub(crate) fn is_log_replay_read(&self) -> bool {
matches!(
self,
Self::Get(RecordedPathKind::Commit | RecordedPathKind::Checkpoint)
| Self::GetOpts(RecordedPathKind::Commit | RecordedPathKind::Checkpoint)
| Self::GetRange(RecordedPathKind::Commit | RecordedPathKind::Checkpoint, _)
| Self::GetRanges(RecordedPathKind::Commit | RecordedPathKind::Checkpoint, _)
| Self::Head(RecordedPathKind::Commit | RecordedPathKind::Checkpoint)
| Self::List(RecordedPathKind::Commit | RecordedPathKind::Checkpoint)
| Self::ListWithOffset(RecordedPathKind::Commit | RecordedPathKind::Checkpoint)
)
}
}
pub(crate) struct RecordingObjectStore {
inner: ObjectStoreRef,
operations: UnboundedSender<RecordedObjectStoreOperation>,
}
impl RecordingObjectStore {
pub(crate) fn with_operations(
inner: ObjectStoreRef,
operations: UnboundedSender<RecordedObjectStoreOperation>,
) -> Self {
Self { inner, operations }
}
}
impl Display for RecordingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}
impl Debug for RecordingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Debug::fmt(&self.inner, f)
}
}
pub(crate) async fn drain_recorded_object_store_operations(
operations: &mut UnboundedReceiver<RecordedObjectStoreOperation>,
) -> Vec<RecordedObjectStoreOperation> {
tokio::task::yield_now().await;
let mut recorded = Vec::new();
while let Ok(op) = operations.try_recv() {
recorded.push(op);
}
recorded
}
pub(crate) fn recording_log_store(
base: LogStoreRef,
) -> (LogStoreRef, UnboundedReceiver<RecordedObjectStoreOperation>) {
let (operations_sender, operations) = unbounded_channel();
let wrapped_prefixed_object_store = Arc::new(RecordingObjectStore::with_operations(
base.object_store(None),
operations_sender.clone(),
));
let wrapped_root_object_store = Arc::new(RecordingObjectStore::with_operations(
base.root_object_store(None),
operations_sender,
));
let log_store = Arc::new(DefaultLogStore::new(
wrapped_prefixed_object_store,
wrapped_root_object_store,
base.config().clone(),
));
(log_store, operations)
}
fn classify_path(path: &Path) -> RecordedPathKind {
let dummy_url = Url::parse("dummy:///").unwrap();
if let Some(parsed) = dummy_url
.join(path.as_ref())
.ok()
.and_then(|url| ParsedLogPath::try_from(url).ok())
.flatten()
{
return match parsed.file_type {
LogPathFileType::Commit
| LogPathFileType::StagedCommit
| LogPathFileType::CompactedCommit { .. }
| LogPathFileType::Crc
| LogPathFileType::Unknown => RecordedPathKind::Commit,
LogPathFileType::SinglePartCheckpoint
| LogPathFileType::UuidCheckpoint
| LogPathFileType::MultiPartCheckpoint { .. } => RecordedPathKind::Checkpoint,
};
}
let path_str = path.as_ref();
if path_str == "_delta_log" || path_str.starts_with("_delta_log/") {
RecordedPathKind::Commit
} else if path_str.starts_with("part-") {
RecordedPathKind::Data
} else {
RecordedPathKind::Other
}
}
fn classify_optional_path(path: Option<&Path>) -> RecordedPathKind {
path.map(classify_path).unwrap_or(RecordedPathKind::Other)
}
#[async_trait::async_trait]
impl ObjectStore for RecordingObjectStore {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, payload, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
let path_kind = classify_path(location);
let is_plain_get = options.if_match.is_none()
&& options.if_none_match.is_none()
&& options.if_modified_since.is_none()
&& options.if_unmodified_since.is_none()
&& options.range.is_none()
&& options.version.is_none()
&& !options.head;
let operation = if options.head {
RecordedObjectStoreOperation::Head(path_kind)
} else {
match options.range.as_ref() {
Some(ObjectStoreGetRange::Bounded(range)) => {
RecordedObjectStoreOperation::GetRange(path_kind, range.clone())
}
Some(_) => RecordedObjectStoreOperation::GetOpts(path_kind),
None if is_plain_get => RecordedObjectStoreOperation::Get(path_kind),
None => RecordedObjectStoreOperation::GetOpts(path_kind),
}
};
self.operations.send(operation).unwrap();
self.inner.get_opts(location, options).await
}
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
self.operations
.send(RecordedObjectStoreOperation::GetRanges(
classify_path(location),
ranges.to_vec(),
))
.unwrap();
self.inner.get_ranges(location, ranges).await
}
fn delete_stream(
&self,
locations: BoxStream<'static, object_store::Result<Path>>,
) -> BoxStream<'static, object_store::Result<Path>> {
self.inner.delete_stream(locations)
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.operations
.send(RecordedObjectStoreOperation::List(classify_optional_path(
prefix,
)))
.unwrap();
self.inner.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.operations
.send(RecordedObjectStoreOperation::ListWithOffset(
classify_optional_path(prefix),
))
.unwrap();
self.inner.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: CopyOptions,
) -> object_store::Result<()> {
self.inner.copy_opts(from, to, options).await
}
async fn rename_opts(
&self,
from: &Path,
to: &Path,
options: RenameOptions,
) -> object_store::Result<()> {
self.inner.rename_opts(from, to, options).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn classifies_checkpoint_paths() {
let path = Path::from("_delta_log/00000000000000000010.checkpoint.parquet");
assert_eq!(classify_path(&path), RecordedPathKind::Checkpoint);
}
#[test]
fn classifies_crc_paths_as_log_replay_reads() {
let path = Path::from("_delta_log/00000000000000000010.crc");
assert!(RecordedObjectStoreOperation::Get(classify_path(&path)).is_log_replay_read(),);
}
#[test]
fn classifies_staged_commit_paths_as_log_replay_reads() {
let path = Path::from(
"_delta_log/_staged_commits/00000000000000000010.12345678-1234-1234-1234-123456789abc.json",
);
assert!(RecordedObjectStoreOperation::Get(classify_path(&path)).is_log_replay_read(),);
}
}