use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response, Tag};
use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::{
CrdtOp, DocumentOp, GraphOp, MetaOp, QueryOp, SpatialOp, TextOp, TimeseriesOp, VectorOp,
};
use super::super::types::text_field;
#[derive(Debug, Clone, Copy)]
pub(super) enum PlanKind {
SingleDocument,
MultiRow,
Execution,
}
pub(super) fn extract_collection(plan: &PhysicalPlan) -> Option<&str> {
match plan {
PhysicalPlan::Document(DocumentOp::PointGet { collection, .. })
| PhysicalPlan::Vector(VectorOp::Search { collection, .. })
| PhysicalPlan::Document(DocumentOp::RangeScan { collection, .. })
| PhysicalPlan::Crdt(CrdtOp::Read { collection, .. })
| PhysicalPlan::Crdt(CrdtOp::Apply { collection, .. })
| PhysicalPlan::Vector(VectorOp::Insert { collection, .. })
| PhysicalPlan::Vector(VectorOp::BatchInsert { collection, .. })
| PhysicalPlan::Vector(VectorOp::MultiSearch { collection, .. })
| PhysicalPlan::Vector(VectorOp::Delete { collection, .. })
| PhysicalPlan::Document(DocumentOp::BatchInsert { collection, .. })
| PhysicalPlan::Document(DocumentOp::PointPut { collection, .. })
| PhysicalPlan::Document(DocumentOp::PointDelete { collection, .. })
| PhysicalPlan::Document(DocumentOp::PointUpdate { collection, .. })
| PhysicalPlan::Document(DocumentOp::Scan { collection, .. })
| PhysicalPlan::Query(QueryOp::Aggregate { collection, .. })
| PhysicalPlan::Query(QueryOp::HashJoin {
left_collection: collection,
..
})
| PhysicalPlan::Query(QueryOp::NestedLoopJoin {
left_collection: collection,
..
})
| PhysicalPlan::Graph(GraphOp::RagFusion { collection, .. })
| PhysicalPlan::Crdt(CrdtOp::SetPolicy { collection, .. })
| PhysicalPlan::Vector(VectorOp::SetParams { collection, .. })
| PhysicalPlan::Text(TextOp::Search { collection, .. })
| PhysicalPlan::Text(TextOp::HybridSearch { collection, .. })
| PhysicalPlan::Query(QueryOp::PartialAggregate { collection, .. })
| PhysicalPlan::Query(QueryOp::BroadcastJoin {
large_collection: collection,
..
})
| PhysicalPlan::Query(QueryOp::ShuffleJoin {
left_collection: collection,
..
})
| PhysicalPlan::Document(DocumentOp::BulkUpdate { collection, .. })
| PhysicalPlan::Document(DocumentOp::BulkDelete { collection, .. })
| PhysicalPlan::Document(DocumentOp::Upsert { collection, .. })
| PhysicalPlan::Document(DocumentOp::InsertSelect {
target_collection: collection,
..
})
| PhysicalPlan::Document(DocumentOp::Truncate { collection })
| PhysicalPlan::Document(DocumentOp::EstimateCount { collection, .. })
| PhysicalPlan::Timeseries(TimeseriesOp::Scan { collection, .. })
| PhysicalPlan::Timeseries(TimeseriesOp::Ingest { collection, .. })
| PhysicalPlan::Spatial(SpatialOp::Scan { collection, .. })
| PhysicalPlan::Document(DocumentOp::Register { collection, .. })
| PhysicalPlan::Document(DocumentOp::IndexLookup { collection, .. })
| PhysicalPlan::Document(DocumentOp::DropIndex { collection, .. }) => {
Some(collection.as_str())
}
PhysicalPlan::Graph(GraphOp::EdgePut { .. })
| PhysicalPlan::Graph(GraphOp::EdgeDelete { .. })
| PhysicalPlan::Graph(GraphOp::Hop { .. })
| PhysicalPlan::Graph(GraphOp::Neighbors { .. })
| PhysicalPlan::Graph(GraphOp::Path { .. })
| PhysicalPlan::Graph(GraphOp::Subgraph { .. })
| PhysicalPlan::Meta(MetaOp::WalAppend { .. })
| PhysicalPlan::Meta(MetaOp::Cancel { .. })
| PhysicalPlan::Meta(MetaOp::TransactionBatch { .. })
| PhysicalPlan::Meta(MetaOp::CreateSnapshot)
| PhysicalPlan::Meta(MetaOp::Compact)
| PhysicalPlan::Meta(MetaOp::Checkpoint)
| PhysicalPlan::Graph(GraphOp::Algo { .. })
| PhysicalPlan::Graph(GraphOp::Match { .. }) => None,
_ => None,
}
}
pub(super) fn describe_plan(plan: &PhysicalPlan) -> PlanKind {
match plan {
PhysicalPlan::Document(DocumentOp::PointGet { .. })
| PhysicalPlan::Crdt(CrdtOp::Read { .. }) => PlanKind::SingleDocument,
PhysicalPlan::Vector(VectorOp::Search { .. })
| PhysicalPlan::Document(DocumentOp::RangeScan { .. })
| PhysicalPlan::Graph(GraphOp::Hop { .. })
| PhysicalPlan::Graph(GraphOp::Neighbors { .. })
| PhysicalPlan::Graph(GraphOp::Path { .. })
| PhysicalPlan::Graph(GraphOp::Subgraph { .. })
| PhysicalPlan::Graph(GraphOp::RagFusion { .. })
| PhysicalPlan::Document(DocumentOp::Scan { .. })
| PhysicalPlan::Query(QueryOp::Aggregate { .. })
| PhysicalPlan::Query(QueryOp::HashJoin { .. })
| PhysicalPlan::Graph(GraphOp::Algo { .. })
| PhysicalPlan::Graph(GraphOp::Match { .. }) => PlanKind::MultiRow,
_ => PlanKind::Execution,
}
}
pub(super) fn payload_to_response(payload: &[u8], kind: PlanKind) -> Response {
match kind {
PlanKind::Execution => Response::Execution(Tag::new("OK")),
PlanKind::SingleDocument | PlanKind::MultiRow => {
let col_name = if matches!(kind, PlanKind::SingleDocument) {
"document"
} else {
"result"
};
let schema = Arc::new(vec![text_field(col_name)]);
if payload.is_empty() {
Response::Query(QueryResponse::new(schema, stream::empty()))
} else {
let text = crate::data::executor::response_codec::decode_payload_to_json(payload);
if matches!(kind, PlanKind::MultiRow)
&& let Ok(serde_json::Value::Array(items)) =
serde_json::from_str::<serde_json::Value>(&text)
{
let row_schema = schema.clone();
let rows: Vec<_> = items
.iter()
.map(|item| {
let mut encoder = DataRowEncoder::new(row_schema.clone());
let _ = encoder.encode_field(&item.to_string());
Ok(encoder.take_row())
})
.collect();
return Response::Query(QueryResponse::new(schema, stream::iter(rows)));
}
let mut encoder = DataRowEncoder::new(schema.clone());
if let Err(e) = encoder.encode_field(&text) {
tracing::error!(error = %e, "failed to encode field");
return Response::Execution(Tag::new("ERROR"));
}
let row = encoder.take_row();
Response::Query(QueryResponse::new(schema, stream::iter(vec![Ok(row)])))
}
}
}
}