1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
34#[derive(Debug, Clone, Eq, PartialEq, Hash)]
37pub struct ListingTableUrl {
38 url: Url,
40 prefix: Path,
42 glob: Option<Pattern>,
44}
45
46impl ListingTableUrl {
47 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103 let s = s.as_ref();
104
105 #[cfg(not(target_arch = "wasm32"))]
107 if std::path::Path::new(s).is_absolute() {
108 return Self::parse_path(s);
109 }
110
111 match Url::parse(s) {
112 Ok(url) => Self::try_new(url, None),
113 #[cfg(not(target_arch = "wasm32"))]
114 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115 Err(e) => Err(DataFusionError::External(Box::new(e))),
116 }
117 }
118
119 #[cfg(not(target_arch = "wasm32"))]
121 fn parse_path(s: &str) -> Result<Self> {
122 let (path, glob) = match split_glob_expression(s) {
123 Some((prefix, glob)) => {
124 let glob = Pattern::new(glob)
125 .map_err(|e| DataFusionError::External(Box::new(e)))?;
126 (prefix, Some(glob))
127 }
128 None => (s, None),
129 };
130
131 let url = url_from_filesystem_path(path).ok_or_else(|| {
132 DataFusionError::External(
133 format!("Failed to convert path to URL: {path}").into(),
134 )
135 })?;
136
137 Self::try_new(url, glob)
138 }
139
140 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147 let prefix = Path::from_url_path(url.path())?;
148 Ok(Self { url, prefix, glob })
149 }
150
151 pub fn scheme(&self) -> &str {
153 self.url.scheme()
154 }
155
156 pub fn prefix(&self) -> &Path {
161 &self.prefix
162 }
163
164 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166 let Some(all_segments) = self.strip_prefix(path) else {
167 return false;
168 };
169
170 let mut segments = all_segments.filter(|s| !s.contains('='));
173
174 match &self.glob {
175 Some(glob) => {
176 if ignore_subdirectory {
177 segments
178 .next()
179 .is_some_and(|file_name| glob.matches(file_name))
180 } else {
181 let stripped = segments.join(DELIMITER);
182 glob.matches(&stripped)
183 }
184 }
185 None if ignore_subdirectory => segments.count() <= 1,
189 None => true,
191 }
192 }
193
194 pub fn is_collection(&self) -> bool {
196 self.url.path().ends_with(DELIMITER)
197 }
198
199 pub fn file_extension(&self) -> Option<&str> {
212 if let Some(mut segments) = self.url.path_segments() {
213 if let Some(last_segment) = segments.next_back() {
214 if last_segment.contains(".") && !last_segment.ends_with(".") {
215 return last_segment.split('.').next_back();
216 }
217 }
218 }
219
220 None
221 }
222
223 pub fn strip_prefix<'a, 'b: 'a>(
226 &'a self,
227 path: &'b Path,
228 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231 stripped = stripped.strip_prefix(DELIMITER)?;
232 }
233 Some(stripped.split_terminator(DELIMITER))
234 }
235
236 pub async fn list_all_files<'a>(
238 &'a self,
239 ctx: &'a dyn Session,
240 store: &'a dyn ObjectStore,
241 file_extension: &'a str,
242 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243 let exec_options = &ctx.config_options().execution;
244 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245
246 let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
247 list_with_cache(ctx, store, &self.prefix).await?
248 } else {
249 match store.head(&self.prefix).await {
250 Ok(meta) => futures::stream::once(async { Ok(meta) })
251 .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
252 .boxed(),
253 Err(object_store::Error::NotFound { .. }) => {
256 list_with_cache(ctx, store, &self.prefix).await?
257 }
258 Err(e) => return Err(e.into()),
259 }
260 };
261
262 Ok(list
263 .try_filter(move |meta| {
264 let path = &meta.location;
265 let extension_match = path.as_ref().ends_with(file_extension);
266 let glob_match = self.contains(path, ignore_subdirectory);
267 futures::future::ready(extension_match && glob_match)
268 })
269 .boxed())
270 }
271
272 pub fn as_str(&self) -> &str {
274 self.as_ref()
275 }
276
277 pub fn object_store(&self) -> ObjectStoreUrl {
279 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
280 ObjectStoreUrl::parse(url).unwrap()
281 }
282
283 pub fn is_folder(&self) -> bool {
285 self.url.scheme() == "file" && self.is_collection()
286 }
287
288 pub fn get_url(&self) -> &Url {
290 &self.url
291 }
292
293 pub fn get_glob(&self) -> &Option<Pattern> {
295 &self.glob
296 }
297
298 pub fn with_glob(self, glob: &str) -> Result<Self> {
300 let glob =
301 Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
302 Self::try_new(self.url, Some(glob))
303 }
304}
305
306async fn list_with_cache<'b>(
307 ctx: &'b dyn Session,
308 store: &'b dyn ObjectStore,
309 prefix: &'b Path,
310) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
311 match ctx.runtime_env().cache_manager.get_list_files_cache() {
312 None => Ok(store
313 .list(Some(prefix))
314 .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
315 .boxed()),
316 Some(cache) => {
317 let vec = if let Some(res) = cache.get(prefix) {
318 debug!("Hit list all files cache");
319 res.as_ref().clone()
320 } else {
321 let vec = store
322 .list(Some(prefix))
323 .try_collect::<Vec<ObjectMeta>>()
324 .await?;
325 cache.put(prefix, Arc::new(vec.clone()));
326 vec
327 };
328 Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
329 }
330 }
331}
332
333#[cfg(not(target_arch = "wasm32"))]
335fn url_from_filesystem_path(s: &str) -> Option<Url> {
336 let path = std::path::Path::new(s);
337 let is_dir = match path.exists() {
338 true => path.is_dir(),
339 false => std::path::is_separator(s.chars().last()?),
341 };
342
343 let from_absolute_path = |p| {
344 let first = match is_dir {
345 true => Url::from_directory_path(p).ok(),
346 false => Url::from_file_path(p).ok(),
347 }?;
348
349 Url::parse(first.as_str()).ok()
352 };
353
354 if path.is_absolute() {
355 return from_absolute_path(path);
356 }
357
358 let absolute = std::env::current_dir().ok()?.join(path);
359 from_absolute_path(&absolute)
360}
361
362impl AsRef<str> for ListingTableUrl {
363 fn as_ref(&self) -> &str {
364 self.url.as_ref()
365 }
366}
367
368impl AsRef<Url> for ListingTableUrl {
369 fn as_ref(&self) -> &Url {
370 &self.url
371 }
372}
373
374impl std::fmt::Display for ListingTableUrl {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 self.as_str().fmt(f)
377 }
378}
379
380#[cfg(not(target_arch = "wasm32"))]
381const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
382
383#[cfg(not(target_arch = "wasm32"))]
389fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
390 let mut last_separator = 0;
391
392 for (byte_idx, char) in path.char_indices() {
393 if GLOB_START_CHARS.contains(&char) {
394 if last_separator == 0 {
395 return Some((".", path));
396 }
397 return Some(path.split_at(last_separator));
398 }
399
400 if std::path::is_separator(char) {
401 last_separator = byte_idx + char.len_utf8();
402 }
403 }
404 None
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410 use async_trait::async_trait;
411 use bytes::Bytes;
412 use datafusion_common::config::TableOptions;
413 use datafusion_common::DFSchema;
414 use datafusion_execution::config::SessionConfig;
415 use datafusion_execution::runtime_env::RuntimeEnv;
416 use datafusion_execution::TaskContext;
417 use datafusion_expr::execution_props::ExecutionProps;
418 use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
419 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
420 use datafusion_physical_plan::ExecutionPlan;
421 use object_store::{
422 GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
423 PutPayload,
424 };
425 use std::any::Any;
426 use std::collections::HashMap;
427 use std::ops::Range;
428 use tempfile::tempdir;
429
430 #[test]
431 fn test_prefix_path() {
432 let root = std::env::current_dir().unwrap();
433 let root = root.to_string_lossy();
434
435 let url = ListingTableUrl::parse(root).unwrap();
436 let child = url.prefix.child("partition").child("file");
437
438 let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
439 assert_eq!(prefix, vec!["partition", "file"]);
440
441 let url = ListingTableUrl::parse("file:///").unwrap();
442 let child = Path::parse("/foo/bar").unwrap();
443 let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
444 assert_eq!(prefix, vec!["foo", "bar"]);
445
446 let url = ListingTableUrl::parse("file:///foo").unwrap();
447 let child = Path::parse("/foob/bar").unwrap();
448 assert!(url.strip_prefix(&child).is_none());
449
450 let url = ListingTableUrl::parse("file:///foo/file").unwrap();
451 let child = Path::parse("/foo/file").unwrap();
452 assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
453
454 let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
455 assert_eq!(url.prefix.as_ref(), "foo/ bar");
456
457 let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
458 assert_eq!(url.prefix.as_ref(), "foo/bar");
459
460 let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
461 assert_eq!(url.prefix.as_ref(), "foo/😺");
462
463 let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
464 assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
465
466 let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
467 assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
468
469 let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
470 assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
471
472 let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
473 assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
474
475 let dir = tempdir().unwrap();
476 let path = dir.path().join("bar%2Ffoo");
477 std::fs::File::create(&path).unwrap();
478
479 let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
480 assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
481
482 let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
483 assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
484
485 let url =
486 ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
487 assert_eq!(url.prefix.as_ref(), "baz/test.txt");
488
489 let workdir = std::env::current_dir().unwrap();
490 let t = workdir.join("non-existent");
491 let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
492 let b = ListingTableUrl::parse("non-existent").unwrap();
493 assert_eq!(a, b);
494 assert!(a.prefix.as_ref().ends_with("non-existent"));
495
496 let t = workdir.parent().unwrap();
497 let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
498 let b = ListingTableUrl::parse("..").unwrap();
499 assert_eq!(a, b);
500
501 let t = t.join("bar");
502 let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
503 let b = ListingTableUrl::parse("../bar").unwrap();
504 assert_eq!(a, b);
505 assert!(a.prefix.as_ref().ends_with("bar"));
506
507 let t = t.join(".").join("foo").join("..").join("baz");
508 let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
509 let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
510 assert_eq!(a, b);
511 assert!(a.prefix.as_ref().ends_with("bar/baz"));
512 }
513
514 #[test]
515 fn test_prefix_s3() {
516 let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
517 assert_eq!(url.prefix.as_ref(), "foo/bar");
518
519 let path = Path::from("foo/bar/partition/foo.parquet");
520 let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
521 assert_eq!(prefix, vec!["partition", "foo.parquet"]);
522
523 let path = Path::from("other/bar/partition/foo.parquet");
524 assert!(url.strip_prefix(&path).is_none());
525 }
526
527 #[test]
528 fn test_split_glob() {
529 fn test(input: &str, expected: Option<(&str, &str)>) {
530 assert_eq!(
531 split_glob_expression(input),
532 expected,
533 "testing split_glob_expression with {input}"
534 );
535 }
536
537 test("/", None);
539 test("/a.txt", None);
540 test("/a", None);
541 test("/a/", None);
542 test("/a/b", None);
543 test("/a/b/", None);
544 test("/a/b.txt", None);
545 test("/a/b/c.txt", None);
546 test("*.txt", Some((".", "*.txt")));
548 test("/*.txt", Some(("/", "*.txt")));
549 test("/a/*b.txt", Some(("/a/", "*b.txt")));
550 test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
551 test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
552 test("/a/b*.txt", Some(("/a/", "b*.txt")));
553 test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
554
555 test(
557 "/a/b/c//alltypes_plain*.parquet",
558 Some(("/a/b/c//", "alltypes_plain*.parquet")),
559 );
560 }
561
562 #[test]
563 fn test_is_collection() {
564 fn test(input: &str, expected: bool, message: &str) {
565 let url = ListingTableUrl::parse(input).unwrap();
566 assert_eq!(url.is_collection(), expected, "{message}");
567 }
568
569 test("https://a.b.c/path/", true, "path ends with / - collection");
570 test(
571 "https://a.b.c/path/?a=b",
572 true,
573 "path ends with / - with query args - collection",
574 );
575 test(
576 "https://a.b.c/path?a=b/",
577 false,
578 "path not ends with / - query ends with / - not collection",
579 );
580 test(
581 "https://a.b.c/path/#a=b",
582 true,
583 "path ends with / - with fragment - collection",
584 );
585 test(
586 "https://a.b.c/path#a=b/",
587 false,
588 "path not ends with / - fragment ends with / - not collection",
589 );
590 }
591
592 #[test]
593 fn test_file_extension() {
594 fn test(input: &str, expected: Option<&str>, message: &str) {
595 let url = ListingTableUrl::parse(input).unwrap();
596 assert_eq!(url.file_extension(), expected, "{message}");
597 }
598
599 test("https://a.b.c/path/", None, "path ends with / - not a file");
600 test(
601 "https://a.b.c/path/?a=b",
602 None,
603 "path ends with / - with query args - not a file",
604 );
605 test(
606 "https://a.b.c/path?a=b/",
607 None,
608 "path not ends with / - query ends with / but no file extension",
609 );
610 test(
611 "https://a.b.c/path/#a=b",
612 None,
613 "path ends with / - with fragment - not a file",
614 );
615 test(
616 "https://a.b.c/path#a=b/",
617 None,
618 "path not ends with / - fragment ends with / but no file extension",
619 );
620 test(
621 "file///some/path/",
622 None,
623 "file path ends with / - not a file",
624 );
625 test(
626 "file///some/path/file",
627 None,
628 "file path does not end with - no extension",
629 );
630 test(
631 "file///some/path/file.",
632 None,
633 "file path ends with . - no value after .",
634 );
635 test(
636 "file///some/path/file.ext",
637 Some("ext"),
638 "file path ends with .ext - extension is ext",
639 );
640 }
641
642 #[tokio::test]
643 async fn test_list_files() -> Result<()> {
644 let store = MockObjectStore {
645 in_mem: object_store::memory::InMemory::new(),
646 forbidden_paths: vec!["forbidden/e.parquet".into()],
647 };
648
649 create_file(&store, "a.parquet").await;
651 create_file(&store, "/t/b.parquet").await;
652 create_file(&store, "/t/c.csv").await;
653 create_file(&store, "/t/d.csv").await;
654
655 create_file(&store, "/forbidden/e.parquet").await;
657
658 assert_eq!(
659 list_all_files("/", &store, "parquet").await?,
660 vec!["a.parquet"],
661 );
662
663 assert_eq!(
665 list_all_files("/t/", &store, "parquet").await?,
666 vec!["t/b.parquet"],
667 );
668 assert_eq!(
669 list_all_files("/t", &store, "parquet").await?,
670 vec!["t/b.parquet"],
671 );
672
673 assert_eq!(
675 list_all_files("/t", &store, "csv").await?,
676 vec!["t/c.csv", "t/d.csv"],
677 );
678 assert_eq!(
679 list_all_files("/t/", &store, "csv").await?,
680 vec!["t/c.csv", "t/d.csv"],
681 );
682
683 assert_eq!(
685 list_all_files("/NonExisting", &store, "csv").await?,
686 vec![] as Vec<String>
687 );
688 assert_eq!(
689 list_all_files("/NonExisting/", &store, "csv").await?,
690 vec![] as Vec<String>
691 );
692
693 let Err(DataFusionError::ObjectStore(err)) =
695 list_all_files("/forbidden/e.parquet", &store, "parquet").await
696 else {
697 panic!("Expected ObjectStore error");
698 };
699
700 let object_store::Error::PermissionDenied { .. } = &*err else {
701 panic!("Expected PermissionDenied error");
702 };
703
704 Ok(())
705 }
706
707 async fn create_file(object_store: &dyn ObjectStore, path: &str) {
709 object_store
710 .put(&Path::from(path), PutPayload::from_static(b"hello world"))
711 .await
712 .expect("failed to create test file");
713 }
714
715 async fn list_all_files(
719 url: &str,
720 store: &dyn ObjectStore,
721 file_extension: &str,
722 ) -> Result<Vec<String>> {
723 try_list_all_files(url, store, file_extension).await
724 }
725
726 async fn try_list_all_files(
728 url: &str,
729 store: &dyn ObjectStore,
730 file_extension: &str,
731 ) -> Result<Vec<String>> {
732 let session = MockSession::new();
733 let url = ListingTableUrl::parse(url)?;
734 let files = url
735 .list_all_files(&session, store, file_extension)
736 .await?
737 .try_collect::<Vec<_>>()
738 .await?
739 .into_iter()
740 .map(|meta| meta.location.as_ref().to_string())
741 .collect();
742 Ok(files)
743 }
744
745 #[derive(Debug)]
746 struct MockObjectStore {
747 in_mem: object_store::memory::InMemory,
748 forbidden_paths: Vec<Path>,
749 }
750
751 impl std::fmt::Display for MockObjectStore {
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 self.in_mem.fmt(f)
754 }
755 }
756
757 #[async_trait]
758 impl ObjectStore for MockObjectStore {
759 async fn put_opts(
760 &self,
761 location: &Path,
762 payload: PutPayload,
763 opts: object_store::PutOptions,
764 ) -> object_store::Result<object_store::PutResult> {
765 self.in_mem.put_opts(location, payload, opts).await
766 }
767
768 async fn put_multipart_opts(
769 &self,
770 location: &Path,
771 opts: PutMultipartOptions,
772 ) -> object_store::Result<Box<dyn MultipartUpload>> {
773 self.in_mem.put_multipart_opts(location, opts).await
774 }
775
776 async fn get_opts(
777 &self,
778 location: &Path,
779 options: GetOptions,
780 ) -> object_store::Result<GetResult> {
781 self.in_mem.get_opts(location, options).await
782 }
783
784 async fn get_ranges(
785 &self,
786 location: &Path,
787 ranges: &[Range<u64>],
788 ) -> object_store::Result<Vec<Bytes>> {
789 self.in_mem.get_ranges(location, ranges).await
790 }
791
792 async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
793 if self.forbidden_paths.contains(location) {
794 Err(object_store::Error::PermissionDenied {
795 path: location.to_string(),
796 source: "forbidden".into(),
797 })
798 } else {
799 self.in_mem.head(location).await
800 }
801 }
802
803 async fn delete(&self, location: &Path) -> object_store::Result<()> {
804 self.in_mem.delete(location).await
805 }
806
807 fn list(
808 &self,
809 prefix: Option<&Path>,
810 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
811 self.in_mem.list(prefix)
812 }
813
814 async fn list_with_delimiter(
815 &self,
816 prefix: Option<&Path>,
817 ) -> object_store::Result<ListResult> {
818 self.in_mem.list_with_delimiter(prefix).await
819 }
820
821 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
822 self.in_mem.copy(from, to).await
823 }
824
825 async fn copy_if_not_exists(
826 &self,
827 from: &Path,
828 to: &Path,
829 ) -> object_store::Result<()> {
830 self.in_mem.copy_if_not_exists(from, to).await
831 }
832 }
833
834 struct MockSession {
835 config: SessionConfig,
836 runtime_env: Arc<RuntimeEnv>,
837 }
838
839 impl MockSession {
840 fn new() -> Self {
841 Self {
842 config: SessionConfig::new(),
843 runtime_env: Arc::new(RuntimeEnv::default()),
844 }
845 }
846 }
847
848 #[async_trait::async_trait]
849 impl Session for MockSession {
850 fn session_id(&self) -> &str {
851 unimplemented!()
852 }
853
854 fn config(&self) -> &SessionConfig {
855 &self.config
856 }
857
858 async fn create_physical_plan(
859 &self,
860 _logical_plan: &LogicalPlan,
861 ) -> Result<Arc<dyn ExecutionPlan>> {
862 unimplemented!()
863 }
864
865 fn create_physical_expr(
866 &self,
867 _expr: Expr,
868 _df_schema: &DFSchema,
869 ) -> Result<Arc<dyn PhysicalExpr>> {
870 unimplemented!()
871 }
872
873 fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
874 unimplemented!()
875 }
876
877 fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
878 unimplemented!()
879 }
880
881 fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
882 unimplemented!()
883 }
884
885 fn runtime_env(&self) -> &Arc<RuntimeEnv> {
886 &self.runtime_env
887 }
888
889 fn execution_props(&self) -> &ExecutionProps {
890 unimplemented!()
891 }
892
893 fn as_any(&self) -> &dyn Any {
894 unimplemented!()
895 }
896
897 fn table_options(&self) -> &TableOptions {
898 unimplemented!()
899 }
900
901 fn table_options_mut(&mut self) -> &mut TableOptions {
902 unimplemented!()
903 }
904
905 fn task_ctx(&self) -> Arc<TaskContext> {
906 unimplemented!()
907 }
908 }
909}