hirn_exec/operators/
policy_query_read.rs1use std::any::Any;
4use std::fmt;
5use std::sync::Arc;
6
7use arrow_array::{BinaryArray, RecordBatch};
8use arrow_schema::SchemaRef;
9use datafusion_common::{DataFusionError, Result};
10use datafusion_execution::{SendableRecordBatchStream, TaskContext};
11use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
12use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
13use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
14
15use crate::extensions::HirnSessionExt;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum PolicyReadKind {
19 ShowPolicies,
20 ExplainPolicy,
21}
22
23#[derive(Debug, Clone)]
24pub struct PolicyQueryReadExec {
25 schema: SchemaRef,
26 properties: PlanProperties,
27 kind: PolicyReadKind,
28 principal_kind: Option<String>,
29 principal_name: Option<String>,
30 resource_type: Option<String>,
31 resource_name: Option<String>,
32 action: Option<String>,
33}
34
35impl PolicyQueryReadExec {
36 pub fn new(
37 schema: SchemaRef,
38 kind: PolicyReadKind,
39 principal_kind: Option<String>,
40 principal_name: Option<String>,
41 resource_type: Option<String>,
42 resource_name: Option<String>,
43 action: Option<String>,
44 ) -> Self {
45 let properties = PlanProperties::new(
46 datafusion_physical_expr::EquivalenceProperties::new(schema.clone()),
47 datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
48 EmissionType::Final,
49 Boundedness::Bounded,
50 );
51
52 Self {
53 schema,
54 properties,
55 kind,
56 principal_kind,
57 principal_name,
58 resource_type,
59 resource_name,
60 action,
61 }
62 }
63}
64
65impl DisplayAs for PolicyQueryReadExec {
66 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 write!(f, "PolicyQueryReadExec: kind={:?}", self.kind)
68 }
69}
70
71impl ExecutionPlan for PolicyQueryReadExec {
72 fn name(&self) -> &str {
73 match self.kind {
74 PolicyReadKind::ShowPolicies => "PolicyShowPoliciesExec",
75 PolicyReadKind::ExplainPolicy => "PolicyExplainPolicyExec",
76 }
77 }
78
79 fn as_any(&self) -> &dyn Any {
80 self
81 }
82
83 fn schema(&self) -> SchemaRef {
84 self.schema.clone()
85 }
86
87 fn properties(&self) -> &PlanProperties {
88 &self.properties
89 }
90
91 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
92 vec![]
93 }
94
95 fn with_new_children(
96 self: Arc<Self>,
97 children: Vec<Arc<dyn ExecutionPlan>>,
98 ) -> Result<Arc<dyn ExecutionPlan>> {
99 if !children.is_empty() {
100 return Err(DataFusionError::Plan(
101 "PolicyQueryReadExec is a leaf node and does not accept children".to_string(),
102 ));
103 }
104 Ok(self)
105 }
106
107 fn execute(
108 &self,
109 _partition: usize,
110 context: Arc<TaskContext>,
111 ) -> Result<SendableRecordBatchStream> {
112 let schema = self.schema.clone();
113 let stream_schema = schema.clone();
114 let kind = self.kind;
115 let principal_kind = self.principal_kind.clone();
116 let principal_name = self.principal_name.clone();
117 let resource_type = self.resource_type.clone();
118 let resource_name = self.resource_name.clone();
119 let action = self.action.clone();
120 let ext = context
121 .session_config()
122 .options()
123 .extensions
124 .get::<HirnSessionExt>()
125 .cloned();
126
127 let fut = async move {
128 let Some(ext) = ext else {
129 return Err(DataFusionError::Execution(
130 "PolicyQueryReadExec requires HirnSessionExt".to_string(),
131 ));
132 };
133 let Some(runtime) = ext.query_read_runtime() else {
134 return Err(DataFusionError::Execution(
135 "PolicyQueryReadExec requires a query read runtime in HirnSessionExt"
136 .to_string(),
137 ));
138 };
139
140 let payload = match kind {
141 PolicyReadKind::ShowPolicies => {
142 runtime
143 .show_policies_json(principal_kind.as_deref(), principal_name.as_deref())
144 .await
145 }
146 PolicyReadKind::ExplainPolicy => {
147 let Some(principal_kind) = principal_kind.as_deref() else {
148 return Err(DataFusionError::Execution(
149 "PolicyQueryReadExec EXPLAIN POLICY requires a principal kind"
150 .to_string(),
151 ));
152 };
153 let Some(principal_name) = principal_name.as_deref() else {
154 return Err(DataFusionError::Execution(
155 "PolicyQueryReadExec EXPLAIN POLICY requires a principal name"
156 .to_string(),
157 ));
158 };
159 let Some(resource_type) = resource_type.as_deref() else {
160 return Err(DataFusionError::Execution(
161 "PolicyQueryReadExec EXPLAIN POLICY requires a resource type"
162 .to_string(),
163 ));
164 };
165 let Some(resource_name) = resource_name.as_deref() else {
166 return Err(DataFusionError::Execution(
167 "PolicyQueryReadExec EXPLAIN POLICY requires a resource name"
168 .to_string(),
169 ));
170 };
171 let Some(action) = action.as_deref() else {
172 return Err(DataFusionError::Execution(
173 "PolicyQueryReadExec EXPLAIN POLICY requires an action".to_string(),
174 ));
175 };
176 runtime
177 .explain_policy_json(
178 principal_kind,
179 principal_name,
180 resource_type,
181 resource_name,
182 action,
183 )
184 .await
185 }
186 }
187 .map_err(|error| DataFusionError::Execution(error.to_string()))?;
188
189 Ok::<_, DataFusionError>(RecordBatch::try_new(
190 stream_schema,
191 vec![Arc::new(BinaryArray::from(vec![payload.as_slice()]))],
192 )?)
193 };
194
195 let stream = futures::stream::once(fut);
196 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
197 }
198}