Skip to main content

hirn_exec/operators/
targeted_query_read.rs

1//! `TargetedQueryReadExec` — query-scoped terminal reads for INSPECT and TRACE.
2
3use std::any::Any;
4use std::fmt;
5use std::sync::Arc;
6
7use arrow_array::{BinaryArray, RecordBatch};
8use arrow_schema::SchemaRef;
9use datafusion_common::{DataFusionError, Result};
10use datafusion_execution::{SendableRecordBatchStream, TaskContext};
11use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
12use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
13use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
14use hirn_query::compiler::plan_compiler::SemanticTargetKindRepr;
15
16use crate::extensions::HirnSessionExt;
17
/// Selects which terminal read a [`TargetedQueryReadExec`] performs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TargetedReadKind {
    /// Dispatches to the query read runtime's `inspect_json` call.
    Inspect,
    /// Dispatches to the query read runtime's `trace_json` call.
    Trace,
}
23
24#[derive(Debug, Clone)]
25pub struct TargetedQueryReadExec {
26    schema: SchemaRef,
27    properties: PlanProperties,
28    kind: TargetedReadKind,
29    target: String,
30    target_kind: SemanticTargetKindRepr,
31}
32
33impl TargetedQueryReadExec {
34    pub fn new(
35        schema: SchemaRef,
36        kind: TargetedReadKind,
37        target: String,
38        target_kind: SemanticTargetKindRepr,
39    ) -> Self {
40        let properties = PlanProperties::new(
41            datafusion_physical_expr::EquivalenceProperties::new(schema.clone()),
42            datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
43            EmissionType::Final,
44            Boundedness::Bounded,
45        );
46
47        Self {
48            schema,
49            properties,
50            kind,
51            target,
52            target_kind,
53        }
54    }
55}
56
57impl DisplayAs for TargetedQueryReadExec {
58    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        write!(
60            f,
61            "TargetedQueryReadExec: kind={:?}, target_kind={:?}",
62            self.kind, self.target_kind
63        )
64    }
65}
66
67impl ExecutionPlan for TargetedQueryReadExec {
68    fn name(&self) -> &str {
69        match self.kind {
70            TargetedReadKind::Inspect => "TargetedInspectExec",
71            TargetedReadKind::Trace => "TargetedTraceExec",
72        }
73    }
74
75    fn as_any(&self) -> &dyn Any {
76        self
77    }
78
79    fn schema(&self) -> SchemaRef {
80        self.schema.clone()
81    }
82
83    fn properties(&self) -> &PlanProperties {
84        &self.properties
85    }
86
87    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
88        vec![]
89    }
90
91    fn with_new_children(
92        self: Arc<Self>,
93        children: Vec<Arc<dyn ExecutionPlan>>,
94    ) -> Result<Arc<dyn ExecutionPlan>> {
95        if !children.is_empty() {
96            return Err(DataFusionError::Plan(
97                "TargetedQueryReadExec is a leaf node and does not accept children".to_string(),
98            ));
99        }
100        Ok(self)
101    }
102
103    fn execute(
104        &self,
105        _partition: usize,
106        context: Arc<TaskContext>,
107    ) -> Result<SendableRecordBatchStream> {
108        let schema = self.schema.clone();
109        let stream_schema = schema.clone();
110        let kind = self.kind;
111        let target = self.target.clone();
112        let target_kind = self.target_kind;
113        let ext = context
114            .session_config()
115            .options()
116            .extensions
117            .get::<HirnSessionExt>()
118            .cloned();
119
120        let fut = async move {
121            let Some(ext) = ext else {
122                return Err(DataFusionError::Execution(
123                    "TargetedQueryReadExec requires HirnSessionExt".to_string(),
124                ));
125            };
126            let Some(runtime) = ext.query_read_runtime() else {
127                return Err(DataFusionError::Execution(
128                    "TargetedQueryReadExec requires a query read runtime in HirnSessionExt"
129                        .to_string(),
130                ));
131            };
132            let Some(agent_id) = ext.agent_id() else {
133                return Err(DataFusionError::Execution(
134                    "TargetedQueryReadExec requires an agent identity in HirnSessionExt"
135                        .to_string(),
136                ));
137            };
138
139            let payload = match kind {
140                TargetedReadKind::Inspect => {
141                    runtime
142                        .inspect_json(&target, target_kind, agent_id, ext.allowed_namespaces())
143                        .await
144                }
145                TargetedReadKind::Trace => {
146                    runtime
147                        .trace_json(&target, target_kind, agent_id, ext.allowed_namespaces())
148                        .await
149                }
150            }
151            .map_err(|error| DataFusionError::Execution(error.to_string()))?;
152
153            build_output_batch(stream_schema, payload)
154        };
155
156        let stream = futures::stream::once(fut);
157        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
158    }
159}
160
161fn build_output_batch(schema: SchemaRef, payload: Vec<u8>) -> Result<RecordBatch> {
162    Ok(RecordBatch::try_new(
163        schema,
164        vec![Arc::new(BinaryArray::from(vec![payload.as_slice()]))],
165    )?)
166}