1use crate::config_extension_ext::{
2 set_distributed_option_extension, set_distributed_option_extension_from_headers,
3};
4use crate::distributed_planner::set_distributed_task_estimator;
5use crate::networking::{set_distributed_channel_resolver, set_distributed_worker_resolver};
6use crate::passthrough_headers::set_passthrough_headers;
7use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
8use crate::work_unit_feed::set_distributed_work_unit_feed;
9use crate::{
10 ChannelResolver, DistributedConfig, TaskEstimator, WorkUnitFeed, WorkUnitFeedProvider,
11 WorkerResolver,
12};
13use arrow_ipc::CompressionType;
14use datafusion::common::DataFusionError;
15use datafusion::config::ConfigExtension;
16use datafusion::execution::{SessionState, SessionStateBuilder};
17use datafusion::physical_plan::ExecutionPlan;
18use datafusion::prelude::{SessionConfig, SessionContext};
19use datafusion_proto::physical_plan::PhysicalExtensionCodec;
20use delegate::delegate;
21use http::HeaderMap;
22use std::sync::Arc;
23
/// Extension methods for configuring distributed execution on DataFusion session types.
///
/// Every option is exposed twice: the `set_*` form mutates the receiver in place, while the
/// `with_*` form takes the receiver by value and hands it back so calls can be chained.
///
/// This file implements the trait for `SessionConfig`, `SessionStateBuilder`, `SessionState`
/// and `SessionContext`. Only the `SessionConfig` implementation contains logic; the other
/// three forward every call to the `SessionConfig` they own.
///
/// Unless stated otherwise, the fallible methods fail only when
/// `DistributedConfig::from_config_options_mut` fails.
/// NOTE(review): presumably that happens when the `DistributedConfig` extension is missing
/// from the config options — confirm in `DistributedConfig`.
pub trait DistributedExt: Sized {
    /// Chainable form of `set_distributed_option_extension`: consumes `self` and returns it
    /// once `t` has been installed.
    fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Self;

    /// Installs the custom option extension `t`.
    ///
    /// The `SessionConfig` implementation forwards to
    /// `crate::config_extension_ext::set_distributed_option_extension`; how the extension is
    /// stored and propagated is defined there.
    fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);

    /// Chainable form of `set_distributed_option_extension_from_headers`.
    ///
    /// # Errors
    /// Returns the error produced by the `set_*` form, in which case `self` is dropped.
    fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
        self,
        headers: &HeaderMap,
    ) -> Result<Self, DataFusionError>;

    /// Installs an option extension of type `T` derived from `headers`.
    ///
    /// `T` does not appear in the argument list, so callers have to name it explicitly, e.g.
    /// `cfg.set_distributed_option_extension_from_headers::<MyExt>(&headers)`.
    ///
    /// # Errors
    /// Propagates the error returned by
    /// `crate::config_extension_ext::set_distributed_option_extension_from_headers`.
    fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
        &mut self,
        headers: &HeaderMap,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_user_codec`.
    fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;

    /// Registers `codec` as the user-supplied `PhysicalExtensionCodec`.
    ///
    /// The `SessionConfig` implementation forwards to
    /// `crate::protobuf::set_distributed_user_codec`.
    fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);

    /// Chainable form of `set_distributed_user_codec_arc`.
    fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;

    /// Same as `set_distributed_user_codec`, for a codec that is already type-erased behind an
    /// `Arc`. The `SessionConfig` implementation forwards to
    /// `crate::protobuf::set_distributed_user_codec_arc`.
    fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);

    /// Chainable form of `set_distributed_worker_resolver`.
    fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
        self,
        resolver: T,
    ) -> Self;

    /// Registers `resolver` as the `WorkerResolver`.
    ///
    /// The `SessionConfig` implementation forwards to
    /// `crate::networking::set_distributed_worker_resolver`.
    fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
        &mut self,
        resolver: T,
    );

    /// Chainable form of `set_distributed_channel_resolver`.
    fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
        self,
        resolver: T,
    ) -> Self;

    /// Registers `resolver` as the `ChannelResolver`.
    ///
    /// The `SessionConfig` implementation forwards to
    /// `crate::networking::set_distributed_channel_resolver`.
    fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
        &mut self,
        resolver: T,
    );

    /// Chainable form of `set_distributed_task_estimator`.
    fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
        self,
        estimator: T,
    ) -> Self;

    /// Registers `estimator` as a `TaskEstimator`.
    ///
    /// The `SessionConfig` implementation forwards to
    /// `crate::distributed_planner::set_distributed_task_estimator`.
    fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
        &mut self,
        estimator: T,
    );

    /// Chainable form of `set_distributed_file_scan_config_bytes_per_partition`.
    fn with_distributed_file_scan_config_bytes_per_partition(
        self,
        bytes_per_partition: usize,
    ) -> Result<Self, DataFusionError>;

    /// Writes `bytes_per_partition` to `DistributedConfig::file_scan_config_bytes_per_partition`.
    fn set_distributed_file_scan_config_bytes_per_partition(
        &mut self,
        bytes_per_partition: usize,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_cardinality_effect_task_scale_factor`.
    fn with_distributed_cardinality_effect_task_scale_factor(
        self,
        factor: f64,
    ) -> Result<Self, DataFusionError>;

    /// Writes `factor` to `DistributedConfig::cardinality_task_count_factor` (note that the
    /// config field is named differently from this method). The value is stored as given; no
    /// range check is applied here.
    fn set_distributed_cardinality_effect_task_scale_factor(
        &mut self,
        factor: f64,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_metrics_collection`.
    fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;

    /// Writes `enabled` to `DistributedConfig::collect_metrics`.
    fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_children_isolator_unions`.
    fn with_distributed_children_isolator_unions(
        self,
        enabled: bool,
    ) -> Result<Self, DataFusionError>;

    /// Writes `enabled` to `DistributedConfig::children_isolator_unions`.
    fn set_distributed_children_isolator_unions(
        &mut self,
        enabled: bool,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_broadcast_joins`.
    fn with_distributed_broadcast_joins(self, enabled: bool) -> Result<Self, DataFusionError>;

    /// Writes `enabled` to `DistributedConfig::broadcast_joins`.
    fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_compression`.
    fn with_distributed_compression(
        self,
        compression: Option<CompressionType>,
    ) -> Result<Self, DataFusionError>;

    /// Stores the compression choice as a string in `DistributedConfig::compression`:
    /// `Some(CompressionType::ZSTD)` becomes `"zstd"`, `Some(CompressionType::LZ4_FRAME)`
    /// becomes `"lz4"`, and every other value (including `None`) becomes `"none"`.
    fn set_distributed_compression(
        &mut self,
        compression: Option<CompressionType>,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_shuffle_batch_size`.
    fn with_distributed_shuffle_batch_size(
        self,
        batch_size: usize,
    ) -> Result<Self, DataFusionError>;

    /// Writes `batch_size` to `DistributedConfig::shuffle_batch_size`.
    fn set_distributed_shuffle_batch_size(
        &mut self,
        batch_size: usize,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_passthrough_headers`.
    fn with_distributed_passthrough_headers(
        self,
        headers: HeaderMap,
    ) -> Result<Self, DataFusionError>;

    /// Registers `headers` as passthrough headers.
    ///
    /// # Errors
    /// The `SessionConfig` implementation returns the result of
    /// `crate::passthrough_headers::set_passthrough_headers` unchanged; the failure conditions
    /// are defined there.
    fn set_distributed_passthrough_headers(
        &mut self,
        headers: HeaderMap,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_max_tasks_per_stage`.
    fn with_distributed_max_tasks_per_stage(
        self,
        max_tasks_per_stage: usize,
    ) -> Result<Self, DataFusionError>;

    /// Writes `max_tasks_per_stage` to `DistributedConfig::max_tasks_per_stage`.
    fn set_distributed_max_tasks_per_stage(
        &mut self,
        max_tasks_per_stage: usize,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_partial_reduce`.
    fn with_distributed_partial_reduce(self, enabled: bool) -> Result<Self, DataFusionError>;

    /// Writes `enabled` to `DistributedConfig::partial_reduce`.
    fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_worker_connection_buffer_budget_bytes`.
    fn with_distributed_worker_connection_buffer_budget_bytes(
        self,
        budget_bytes: usize,
    ) -> Result<Self, DataFusionError>;

    /// Writes `budget_bytes` to `DistributedConfig::worker_connection_buffer_budget_bytes`.
    fn set_distributed_worker_connection_buffer_budget_bytes(
        &mut self,
        budget_bytes: usize,
    ) -> Result<(), DataFusionError>;

    /// Chainable form of `set_distributed_work_unit_feed`.
    fn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Self
    where
        T: ExecutionPlan + 'static,
        P: WorkUnitFeedProvider + 'static,
        P::WorkUnit: 'static,
        F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;

    /// Registers `getter`, which extracts the `WorkUnitFeed<P>` held by a plan node of
    /// concrete type `T` (or `None` if that node has none).
    ///
    /// The `SessionConfig` implementation wraps `getter` in a closure that first downcasts
    /// the `Arc<dyn ExecutionPlan>` to `T`, so `getter` only runs for nodes of that type, and
    /// passes the closure to `crate::work_unit_feed::set_distributed_work_unit_feed`.
    fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
    where
        T: ExecutionPlan + 'static,
        P: WorkUnitFeedProvider + 'static,
        P::WorkUnit: 'static,
        F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
}
587
588impl DistributedExt for SessionConfig {
589 fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) {
590 set_distributed_option_extension(self, t)
591 }
592
593 fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
594 &mut self,
595 headers: &HeaderMap,
596 ) -> Result<(), DataFusionError> {
597 set_distributed_option_extension_from_headers::<T>(self, headers)?;
598 Ok(())
599 }
600
601 fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
602 set_distributed_user_codec(self, codec)
603 }
604
605 fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>) {
606 set_distributed_user_codec_arc(self, codec)
607 }
608
609 fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
610 &mut self,
611 resolver: T,
612 ) {
613 set_distributed_worker_resolver(self, resolver);
614 }
615
616 fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
617 &mut self,
618 resolver: T,
619 ) {
620 set_distributed_channel_resolver(self, resolver);
621 }
622
623 fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
624 &mut self,
625 estimator: T,
626 ) {
627 set_distributed_task_estimator(self, estimator)
628 }
629
630 fn set_distributed_file_scan_config_bytes_per_partition(
631 &mut self,
632 bytes_per_partition: usize,
633 ) -> Result<(), DataFusionError> {
634 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
635 d_cfg.file_scan_config_bytes_per_partition = bytes_per_partition;
636 Ok(())
637 }
638
639 fn set_distributed_cardinality_effect_task_scale_factor(
640 &mut self,
641 factor: f64,
642 ) -> Result<(), DataFusionError> {
643 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
644 d_cfg.cardinality_task_count_factor = factor;
645 Ok(())
646 }
647
648 fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> {
649 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
650 d_cfg.collect_metrics = enabled;
651 Ok(())
652 }
653
654 fn set_distributed_children_isolator_unions(
655 &mut self,
656 enabled: bool,
657 ) -> Result<(), DataFusionError> {
658 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
659 d_cfg.children_isolator_unions = enabled;
660 Ok(())
661 }
662
663 fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError> {
664 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
665 d_cfg.broadcast_joins = enabled;
666 Ok(())
667 }
668
669 fn set_distributed_compression(
670 &mut self,
671 compression: Option<CompressionType>,
672 ) -> Result<(), DataFusionError> {
673 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
674 d_cfg.compression = match compression {
675 Some(CompressionType::ZSTD) => "zstd".to_string(),
676 Some(CompressionType::LZ4_FRAME) => "lz4".to_string(),
677 _ => "none".to_string(),
678 };
679 Ok(())
680 }
681
682 fn set_distributed_shuffle_batch_size(
683 &mut self,
684 batch_size: usize,
685 ) -> Result<(), DataFusionError> {
686 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
687 d_cfg.shuffle_batch_size = batch_size;
688 Ok(())
689 }
690
691 fn set_distributed_passthrough_headers(
692 &mut self,
693 headers: HeaderMap,
694 ) -> Result<(), DataFusionError> {
695 set_passthrough_headers(self, headers)
696 }
697
698 fn set_distributed_max_tasks_per_stage(
699 &mut self,
700 max_tasks_per_stage: usize,
701 ) -> Result<(), DataFusionError> {
702 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
703 d_cfg.max_tasks_per_stage = max_tasks_per_stage;
704 Ok(())
705 }
706
707 fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError> {
708 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
709 d_cfg.partial_reduce = enabled;
710 Ok(())
711 }
712
713 fn set_distributed_worker_connection_buffer_budget_bytes(
714 &mut self,
715 budget_bytes: usize,
716 ) -> Result<(), DataFusionError> {
717 let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
718 d_cfg.worker_connection_buffer_budget_bytes = budget_bytes;
719 Ok(())
720 }
721
722 fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
723 where
724 T: ExecutionPlan + 'static,
725 P: WorkUnitFeedProvider + 'static,
726 P::WorkUnit: 'static,
727 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
728 {
729 set_distributed_work_unit_feed(self, move |plan: &Arc<dyn ExecutionPlan>| {
730 plan.downcast_ref::<T>().and_then(&getter)
731 })
732 }
733
734 delegate! {
735 to self {
736 #[call(set_distributed_option_extension)]
737 #[expr($;self)]
738 fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
739
740 #[call(set_distributed_option_extension_from_headers)]
741 #[expr($?;Ok(self))]
742 fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
743
744 #[call(set_distributed_user_codec)]
745 #[expr($;self)]
746 fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
747
748 #[call(set_distributed_user_codec_arc)]
749 #[expr($;self)]
750 fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
751
752 #[call(set_distributed_worker_resolver)]
753 #[expr($;self)]
754 fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
755
756 #[call(set_distributed_channel_resolver)]
757 #[expr($;self)]
758 fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
759
760 #[call(set_distributed_task_estimator)]
761 #[expr($;self)]
762 fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
763
764 #[call(set_distributed_file_scan_config_bytes_per_partition)]
765 #[expr($?;Ok(self))]
766 fn with_distributed_file_scan_config_bytes_per_partition(mut self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
767
768 #[call(set_distributed_cardinality_effect_task_scale_factor)]
769 #[expr($?;Ok(self))]
770 fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
771
772 #[call(set_distributed_metrics_collection)]
773 #[expr($?;Ok(self))]
774 fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
775
776 #[call(set_distributed_children_isolator_unions)]
777 #[expr($?;Ok(self))]
778 fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
779
780 #[call(set_distributed_broadcast_joins)]
781 #[expr($?;Ok(self))]
782 fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
783
784 #[call(set_distributed_compression)]
785 #[expr($?;Ok(self))]
786 fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
787
788 #[call(set_distributed_shuffle_batch_size)]
789 #[expr($?;Ok(self))]
790 fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
791
792 #[call(set_distributed_passthrough_headers)]
793 #[expr($?;Ok(self))]
794 fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
795
796 #[call(set_distributed_max_tasks_per_stage)]
797 #[expr($?;Ok(self))]
798 fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
799
800 #[call(set_distributed_partial_reduce)]
801 #[expr($?;Ok(self))]
802 fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
803
804 #[call(set_distributed_worker_connection_buffer_budget_bytes)]
805 #[expr($?;Ok(self))]
806 fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
807
808 #[call(set_distributed_work_unit_feed)]
809 #[expr($;self)]
810 fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
811 where
812 T: ExecutionPlan + 'static,
813 P: WorkUnitFeedProvider + 'static,
814 P::WorkUnit: 'static,
815 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
816 }
817 }
818}
819
820impl DistributedExt for SessionStateBuilder {
821 delegate! {
822 to self.config().get_or_insert_default() {
823 fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
824 #[call(set_distributed_option_extension)]
825 #[expr($;self)]
826 fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
827
828 fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
829 #[call(set_distributed_option_extension_from_headers)]
830 #[expr($?;Ok(self))]
831 fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
832
833 fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
834 #[call(set_distributed_user_codec)]
835 #[expr($;self)]
836 fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
837
838 fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
839 #[call(set_distributed_user_codec_arc)]
840 #[expr($;self)]
841 fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
842
843 fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
844 #[call(set_distributed_worker_resolver)]
845 #[expr($;self)]
846 fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
847
848 fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
849 #[call(set_distributed_channel_resolver)]
850 #[expr($;self)]
851 fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
852
853 fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
854 #[call(set_distributed_task_estimator)]
855 #[expr($;self)]
856 fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
857
858 fn set_distributed_file_scan_config_bytes_per_partition(&mut self, bytes_per_partition: usize) -> Result<(), DataFusionError>;
859 #[call(set_distributed_file_scan_config_bytes_per_partition)]
860 #[expr($?;Ok(self))]
861 fn with_distributed_file_scan_config_bytes_per_partition(mut self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
862
863 fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
864 #[call(set_distributed_cardinality_effect_task_scale_factor)]
865 #[expr($?;Ok(self))]
866 fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
867
868 fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
869 #[call(set_distributed_metrics_collection)]
870 #[expr($?;Ok(self))]
871 fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
872
873 fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
874 #[call(set_distributed_children_isolator_unions)]
875 #[expr($?;Ok(self))]
876 fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
877
878 fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
879 #[call(set_distributed_broadcast_joins)]
880 #[expr($?;Ok(self))]
881 fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
882
883 fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
884 #[call(set_distributed_compression)]
885 #[expr($?;Ok(self))]
886 fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
887
888 fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
889 #[call(set_distributed_shuffle_batch_size)]
890 #[expr($?;Ok(self))]
891 fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
892
893 fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
894 #[call(set_distributed_passthrough_headers)]
895 #[expr($?;Ok(self))]
896 fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
897
898 fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
899 #[call(set_distributed_max_tasks_per_stage)]
900 #[expr($?;Ok(self))]
901 fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
902
903 fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
904 #[call(set_distributed_partial_reduce)]
905 #[expr($?;Ok(self))]
906 fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
907
908 fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
909 #[call(set_distributed_worker_connection_buffer_budget_bytes)]
910 #[expr($?;Ok(self))]
911 fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
912
913 fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
914 where
915 T: ExecutionPlan + 'static,
916 P: WorkUnitFeedProvider + 'static,
917 P::WorkUnit: 'static,
918 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
919 #[call(set_distributed_work_unit_feed)]
920 #[expr($;self)]
921 fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
922 where
923 T: ExecutionPlan + 'static,
924 P: WorkUnitFeedProvider + 'static,
925 P::WorkUnit: 'static,
926 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
927 }
928 }
929}
930
931impl DistributedExt for SessionState {
932 delegate! {
933 to self.config_mut() {
934 fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
935 #[call(set_distributed_option_extension)]
936 #[expr($;self)]
937 fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
938
939 fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
940 #[call(set_distributed_option_extension_from_headers)]
941 #[expr($?;Ok(self))]
942 fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
943
944 fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
945 #[call(set_distributed_user_codec)]
946 #[expr($;self)]
947 fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
948
949 fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
950 #[call(set_distributed_user_codec_arc)]
951 #[expr($;self)]
952 fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
953
954 fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
955 #[call(set_distributed_worker_resolver)]
956 #[expr($;self)]
957 fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
958
959 fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
960 #[call(set_distributed_channel_resolver)]
961 #[expr($;self)]
962 fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
963
964 fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
965 #[call(set_distributed_task_estimator)]
966 #[expr($;self)]
967 fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
968
969 fn set_distributed_file_scan_config_bytes_per_partition(&mut self, bytes_per_partition: usize) -> Result<(), DataFusionError>;
970 #[call(set_distributed_file_scan_config_bytes_per_partition)]
971 #[expr($?;Ok(self))]
972 fn with_distributed_file_scan_config_bytes_per_partition(mut self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
973
974 fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
975 #[call(set_distributed_cardinality_effect_task_scale_factor)]
976 #[expr($?;Ok(self))]
977 fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
978
979 fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
980 #[call(set_distributed_metrics_collection)]
981 #[expr($?;Ok(self))]
982 fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
983
984 fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
985 #[call(set_distributed_children_isolator_unions)]
986 #[expr($?;Ok(self))]
987 fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
988
989 fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
990 #[call(set_distributed_broadcast_joins)]
991 #[expr($?;Ok(self))]
992 fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
993
994 fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
995 #[call(set_distributed_compression)]
996 #[expr($?;Ok(self))]
997 fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
998
999 fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
1000 #[call(set_distributed_shuffle_batch_size)]
1001 #[expr($?;Ok(self))]
1002 fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
1003
1004 fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
1005 #[call(set_distributed_passthrough_headers)]
1006 #[expr($?;Ok(self))]
1007 fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
1008
1009 fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
1010 #[call(set_distributed_max_tasks_per_stage)]
1011 #[expr($?;Ok(self))]
1012 fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
1013
1014 fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1015 #[call(set_distributed_partial_reduce)]
1016 #[expr($?;Ok(self))]
1017 fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
1018
1019 fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
1020 #[call(set_distributed_worker_connection_buffer_budget_bytes)]
1021 #[expr($?;Ok(self))]
1022 fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
1023
1024 fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
1025 where
1026 T: ExecutionPlan + 'static,
1027 P: WorkUnitFeedProvider + 'static,
1028 P::WorkUnit: 'static,
1029 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1030 #[call(set_distributed_work_unit_feed)]
1031 #[expr($;self)]
1032 fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
1033 where
1034 T: ExecutionPlan + 'static,
1035 P: WorkUnitFeedProvider + 'static,
1036 P::WorkUnit: 'static,
1037 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1038 }
1039 }
1040}
1041
1042impl DistributedExt for SessionContext {
1043 delegate! {
1044 to self.state_ref().write().config_mut() {
1045 fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
1046 #[call(set_distributed_option_extension)]
1047 #[expr($;self)]
1048 fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Self;
1049
1050 fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
1051 #[call(set_distributed_option_extension_from_headers)]
1052 #[expr($?;Ok(self))]
1053 fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
1054
1055 fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
1056 #[call(set_distributed_user_codec)]
1057 #[expr($;self)]
1058 fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
1059
1060 fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
1061 #[call(set_distributed_user_codec_arc)]
1062 #[expr($;self)]
1063 fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
1064
1065 fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
1066 #[call(set_distributed_worker_resolver)]
1067 #[expr($;self)]
1068 fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
1069
1070 fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
1071 #[call(set_distributed_channel_resolver)]
1072 #[expr($;self)]
1073 fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
1074
1075 fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
1076 #[call(set_distributed_task_estimator)]
1077 #[expr($;self)]
1078 fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(self, estimator: T) -> Self;
1079
1080 fn set_distributed_file_scan_config_bytes_per_partition(&mut self, bytes_per_partition: usize) -> Result<(), DataFusionError>;
1081 #[call(set_distributed_file_scan_config_bytes_per_partition)]
1082 #[expr($?;Ok(self))]
1083 fn with_distributed_file_scan_config_bytes_per_partition(self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
1084
1085 fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
1086 #[call(set_distributed_cardinality_effect_task_scale_factor)]
1087 #[expr($?;Ok(self))]
1088 fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;
1089
1090 fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1091 #[call(set_distributed_metrics_collection)]
1092 #[expr($?;Ok(self))]
1093 fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
1094
1095 fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1096 #[call(set_distributed_children_isolator_unions)]
1097 #[expr($?;Ok(self))]
1098 fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;
1099
1100 fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1101 #[call(set_distributed_broadcast_joins)]
1102 #[expr($?;Ok(self))]
1103 fn with_distributed_broadcast_joins(self, enabled: bool) -> Result<Self, DataFusionError>;
1104
1105 fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
1106 #[call(set_distributed_compression)]
1107 #[expr($?;Ok(self))]
1108 fn with_distributed_compression(self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
1109
1110 fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
1111 #[call(set_distributed_shuffle_batch_size)]
1112 #[expr($?;Ok(self))]
1113 fn with_distributed_shuffle_batch_size(self, batch_size: usize) -> Result<Self, DataFusionError>;
1114
1115 fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
1116 #[call(set_distributed_passthrough_headers)]
1117 #[expr($?;Ok(self))]
1118 fn with_distributed_passthrough_headers(self, headers: HeaderMap) -> Result<Self, DataFusionError>;
1119
1120 fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
1121 #[call(set_distributed_max_tasks_per_stage)]
1122 #[expr($?;Ok(self))]
1123 fn with_distributed_max_tasks_per_stage(self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
1124
1125 fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1126 #[call(set_distributed_partial_reduce)]
1127 #[expr($?;Ok(self))]
1128 fn with_distributed_partial_reduce(self, enabled: bool) -> Result<Self, DataFusionError>;
1129
1130 fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
1131 #[call(set_distributed_worker_connection_buffer_budget_bytes)]
1132 #[expr($?;Ok(self))]
1133 fn with_distributed_worker_connection_buffer_budget_bytes(self, budget_bytes: usize) -> Result<Self, DataFusionError>;
1134
1135 fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
1136 where
1137 T: ExecutionPlan + 'static,
1138 P: WorkUnitFeedProvider + 'static,
1139 P::WorkUnit: 'static,
1140 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1141 #[call(set_distributed_work_unit_feed)]
1142 #[expr($;self)]
1143 fn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Self
1144 where
1145 T: ExecutionPlan + 'static,
1146 P: WorkUnitFeedProvider + 'static,
1147 P::WorkUnit: 'static,
1148 F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1149 }
1150 }
1151}