Skip to main content

hirn_exec/operators/
policy_query_read.rs

1//! `PolicyQueryReadExec` — query-scoped terminal reads for policy HirnQL statements.
2
3use std::any::Any;
4use std::fmt;
5use std::sync::Arc;
6
7use arrow_array::{BinaryArray, RecordBatch};
8use arrow_schema::SchemaRef;
9use datafusion_common::{DataFusionError, Result};
10use datafusion_execution::{SendableRecordBatchStream, TaskContext};
11use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
12use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
13use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
14
15use crate::extensions::HirnSessionExt;
16
/// Which policy statement a [`PolicyQueryReadExec`] serves.
///
/// The variant selects both the operator name reported to DataFusion and the
/// runtime call issued when the plan is executed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PolicyReadKind {
    /// List policies; the principal kind and name are passed on as optional arguments.
    ShowPolicies,
    /// Explain a policy decision; principal, resource and action must all be set.
    ExplainPolicy,
}
22
23#[derive(Debug, Clone)]
24pub struct PolicyQueryReadExec {
25    schema: SchemaRef,
26    properties: PlanProperties,
27    kind: PolicyReadKind,
28    principal_kind: Option<String>,
29    principal_name: Option<String>,
30    resource_type: Option<String>,
31    resource_name: Option<String>,
32    action: Option<String>,
33}
34
35impl PolicyQueryReadExec {
36    pub fn new(
37        schema: SchemaRef,
38        kind: PolicyReadKind,
39        principal_kind: Option<String>,
40        principal_name: Option<String>,
41        resource_type: Option<String>,
42        resource_name: Option<String>,
43        action: Option<String>,
44    ) -> Self {
45        let properties = PlanProperties::new(
46            datafusion_physical_expr::EquivalenceProperties::new(schema.clone()),
47            datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
48            EmissionType::Final,
49            Boundedness::Bounded,
50        );
51
52        Self {
53            schema,
54            properties,
55            kind,
56            principal_kind,
57            principal_name,
58            resource_type,
59            resource_name,
60            action,
61        }
62    }
63}
64
65impl DisplayAs for PolicyQueryReadExec {
66    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        write!(f, "PolicyQueryReadExec: kind={:?}", self.kind)
68    }
69}
70
71impl ExecutionPlan for PolicyQueryReadExec {
72    fn name(&self) -> &str {
73        match self.kind {
74            PolicyReadKind::ShowPolicies => "PolicyShowPoliciesExec",
75            PolicyReadKind::ExplainPolicy => "PolicyExplainPolicyExec",
76        }
77    }
78
79    fn as_any(&self) -> &dyn Any {
80        self
81    }
82
83    fn schema(&self) -> SchemaRef {
84        self.schema.clone()
85    }
86
87    fn properties(&self) -> &PlanProperties {
88        &self.properties
89    }
90
91    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
92        vec![]
93    }
94
95    fn with_new_children(
96        self: Arc<Self>,
97        children: Vec<Arc<dyn ExecutionPlan>>,
98    ) -> Result<Arc<dyn ExecutionPlan>> {
99        if !children.is_empty() {
100            return Err(DataFusionError::Plan(
101                "PolicyQueryReadExec is a leaf node and does not accept children".to_string(),
102            ));
103        }
104        Ok(self)
105    }
106
107    fn execute(
108        &self,
109        _partition: usize,
110        context: Arc<TaskContext>,
111    ) -> Result<SendableRecordBatchStream> {
112        let schema = self.schema.clone();
113        let stream_schema = schema.clone();
114        let kind = self.kind;
115        let principal_kind = self.principal_kind.clone();
116        let principal_name = self.principal_name.clone();
117        let resource_type = self.resource_type.clone();
118        let resource_name = self.resource_name.clone();
119        let action = self.action.clone();
120        let ext = context
121            .session_config()
122            .options()
123            .extensions
124            .get::<HirnSessionExt>()
125            .cloned();
126
127        let fut = async move {
128            let Some(ext) = ext else {
129                return Err(DataFusionError::Execution(
130                    "PolicyQueryReadExec requires HirnSessionExt".to_string(),
131                ));
132            };
133            let Some(runtime) = ext.query_read_runtime() else {
134                return Err(DataFusionError::Execution(
135                    "PolicyQueryReadExec requires a query read runtime in HirnSessionExt"
136                        .to_string(),
137                ));
138            };
139
140            let payload = match kind {
141                PolicyReadKind::ShowPolicies => {
142                    runtime
143                        .show_policies_json(principal_kind.as_deref(), principal_name.as_deref())
144                        .await
145                }
146                PolicyReadKind::ExplainPolicy => {
147                    let Some(principal_kind) = principal_kind.as_deref() else {
148                        return Err(DataFusionError::Execution(
149                            "PolicyQueryReadExec EXPLAIN POLICY requires a principal kind"
150                                .to_string(),
151                        ));
152                    };
153                    let Some(principal_name) = principal_name.as_deref() else {
154                        return Err(DataFusionError::Execution(
155                            "PolicyQueryReadExec EXPLAIN POLICY requires a principal name"
156                                .to_string(),
157                        ));
158                    };
159                    let Some(resource_type) = resource_type.as_deref() else {
160                        return Err(DataFusionError::Execution(
161                            "PolicyQueryReadExec EXPLAIN POLICY requires a resource type"
162                                .to_string(),
163                        ));
164                    };
165                    let Some(resource_name) = resource_name.as_deref() else {
166                        return Err(DataFusionError::Execution(
167                            "PolicyQueryReadExec EXPLAIN POLICY requires a resource name"
168                                .to_string(),
169                        ));
170                    };
171                    let Some(action) = action.as_deref() else {
172                        return Err(DataFusionError::Execution(
173                            "PolicyQueryReadExec EXPLAIN POLICY requires an action".to_string(),
174                        ));
175                    };
176                    runtime
177                        .explain_policy_json(
178                            principal_kind,
179                            principal_name,
180                            resource_type,
181                            resource_name,
182                            action,
183                        )
184                        .await
185                }
186            }
187            .map_err(|error| DataFusionError::Execution(error.to_string()))?;
188
189            Ok::<_, DataFusionError>(RecordBatch::try_new(
190                stream_schema,
191                vec![Arc::new(BinaryArray::from(vec![payload.as_slice()]))],
192            )?)
193        };
194
195        let stream = futures::stream::once(fut);
196        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
197    }
198}