hirn_exec/operators/
targeted_query_read.rs1use std::any::Any;
4use std::fmt;
5use std::sync::Arc;
6
7use arrow_array::{BinaryArray, RecordBatch};
8use arrow_schema::SchemaRef;
9use datafusion_common::{DataFusionError, Result};
10use datafusion_execution::{SendableRecordBatchStream, TaskContext};
11use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
12use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
13use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
14use hirn_query::compiler::plan_compiler::SemanticTargetKindRepr;
15
16use crate::extensions::HirnSessionExt;
17
/// Selects which targeted read a [`TargetedQueryReadExec`] performs.
///
/// The variant decides both the operator name reported to the planner and
/// which query-read runtime entry point is invoked at execution time.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TargetedReadKind {
    /// Run the runtime's `inspect_json` read against the target.
    Inspect,
    /// Run the runtime's `trace_json` read against the target.
    Trace,
}
23
24#[derive(Debug, Clone)]
25pub struct TargetedQueryReadExec {
26 schema: SchemaRef,
27 properties: PlanProperties,
28 kind: TargetedReadKind,
29 target: String,
30 target_kind: SemanticTargetKindRepr,
31}
32
33impl TargetedQueryReadExec {
34 pub fn new(
35 schema: SchemaRef,
36 kind: TargetedReadKind,
37 target: String,
38 target_kind: SemanticTargetKindRepr,
39 ) -> Self {
40 let properties = PlanProperties::new(
41 datafusion_physical_expr::EquivalenceProperties::new(schema.clone()),
42 datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
43 EmissionType::Final,
44 Boundedness::Bounded,
45 );
46
47 Self {
48 schema,
49 properties,
50 kind,
51 target,
52 target_kind,
53 }
54 }
55}
56
57impl DisplayAs for TargetedQueryReadExec {
58 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 write!(
60 f,
61 "TargetedQueryReadExec: kind={:?}, target_kind={:?}",
62 self.kind, self.target_kind
63 )
64 }
65}
66
67impl ExecutionPlan for TargetedQueryReadExec {
68 fn name(&self) -> &str {
69 match self.kind {
70 TargetedReadKind::Inspect => "TargetedInspectExec",
71 TargetedReadKind::Trace => "TargetedTraceExec",
72 }
73 }
74
75 fn as_any(&self) -> &dyn Any {
76 self
77 }
78
79 fn schema(&self) -> SchemaRef {
80 self.schema.clone()
81 }
82
83 fn properties(&self) -> &PlanProperties {
84 &self.properties
85 }
86
87 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
88 vec![]
89 }
90
91 fn with_new_children(
92 self: Arc<Self>,
93 children: Vec<Arc<dyn ExecutionPlan>>,
94 ) -> Result<Arc<dyn ExecutionPlan>> {
95 if !children.is_empty() {
96 return Err(DataFusionError::Plan(
97 "TargetedQueryReadExec is a leaf node and does not accept children".to_string(),
98 ));
99 }
100 Ok(self)
101 }
102
103 fn execute(
104 &self,
105 _partition: usize,
106 context: Arc<TaskContext>,
107 ) -> Result<SendableRecordBatchStream> {
108 let schema = self.schema.clone();
109 let stream_schema = schema.clone();
110 let kind = self.kind;
111 let target = self.target.clone();
112 let target_kind = self.target_kind;
113 let ext = context
114 .session_config()
115 .options()
116 .extensions
117 .get::<HirnSessionExt>()
118 .cloned();
119
120 let fut = async move {
121 let Some(ext) = ext else {
122 return Err(DataFusionError::Execution(
123 "TargetedQueryReadExec requires HirnSessionExt".to_string(),
124 ));
125 };
126 let Some(runtime) = ext.query_read_runtime() else {
127 return Err(DataFusionError::Execution(
128 "TargetedQueryReadExec requires a query read runtime in HirnSessionExt"
129 .to_string(),
130 ));
131 };
132 let Some(agent_id) = ext.agent_id() else {
133 return Err(DataFusionError::Execution(
134 "TargetedQueryReadExec requires an agent identity in HirnSessionExt"
135 .to_string(),
136 ));
137 };
138
139 let payload = match kind {
140 TargetedReadKind::Inspect => {
141 runtime
142 .inspect_json(&target, target_kind, agent_id, ext.allowed_namespaces())
143 .await
144 }
145 TargetedReadKind::Trace => {
146 runtime
147 .trace_json(&target, target_kind, agent_id, ext.allowed_namespaces())
148 .await
149 }
150 }
151 .map_err(|error| DataFusionError::Execution(error.to_string()))?;
152
153 build_output_batch(stream_schema, payload)
154 };
155
156 let stream = futures::stream::once(fut);
157 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
158 }
159}
160
161fn build_output_batch(schema: SchemaRef, payload: Vec<u8>) -> Result<RecordBatch> {
162 Ok(RecordBatch::try_new(
163 schema,
164 vec![Arc::new(BinaryArray::from(vec![payload.as_slice()]))],
165 )?)
166}