use futures::Stream;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use uni_common::{Result, UniError};
#[doc(inline)]
pub use uni_common::value::{Edge, FromValue, Node, Path, Value};
/// Timing and I/O counters carried by [`QueryResult`] and [`ExecuteResult`].
///
/// All fields are public plain data. The derived `Default` yields zero
/// durations, zero counters and `plan_cache_hit == false`.
#[derive(Debug, Clone, Default)]
pub struct QueryMetrics {
/// Parse-phase duration; overwritten by `QueryResult::update_parse_timing`.
pub parse_time: Duration,
/// Planning-phase duration (presumably; not populated in the visible code — confirm with the planner).
pub plan_time: Duration,
/// Execution-phase duration (presumably; not populated in the visible code — confirm with the executor).
pub exec_time: Duration,
/// Overall duration; overwritten by `QueryResult::update_parse_timing`.
pub total_time: Duration,
/// Row count reported as returned (populated outside the visible code).
pub rows_returned: usize,
/// Row count reported as scanned (populated outside the visible code).
pub rows_scanned: usize,
/// Byte count reported as read (populated outside the visible code).
pub bytes_read: usize,
/// Whether a cached plan was used (presumably; set outside the visible code — confirm).
pub plan_cache_hit: bool,
/// NOTE(review): presumably the number of reads served by the L0 layer — confirm against the storage runtime.
pub l0_reads: usize,
/// NOTE(review): presumably the number of reads that reached persistent storage — confirm.
pub storage_reads: usize,
/// NOTE(review): presumably the number of reads satisfied by a cache — confirm which cache.
pub cache_hits: usize,
}
/// A single result row: column names paired positionally with values.
#[derive(Debug, Clone)]
pub struct Row {
/// Column names, held behind an `Arc` so the list can be shared without copying.
pub(crate) columns: Arc<Vec<String>>,
/// Values of this row; `Row::get` resolves a column name to the value at the same position.
pub(crate) values: Vec<Value>,
}
impl Row {
    /// Builds a row from a shared column-name list and the row's values.
    ///
    /// The two inputs are stored as given; their lengths are not compared.
    pub fn new(columns: Arc<Vec<String>>, values: Vec<Value>) -> Self {
        Self { columns, values }
    }

    /// Returns the column names in positional order.
    pub fn columns(&self) -> &[String] {
        self.columns.as_slice()
    }

    /// Returns the values in positional order.
    pub fn values(&self) -> &[Value] {
        self.values.as_slice()
    }

    /// Consumes the row and hands back its values, dropping the column list handle.
    pub fn into_values(self) -> Vec<Value> {
        let Self { values, .. } = self;
        values
    }

    /// Looks up `column` by name and converts its value to `T`.
    ///
    /// The first column whose name equals `column` is used.
    ///
    /// # Errors
    ///
    /// Returns `UniError::Query` when no column has that name, and forwards
    /// any error produced by [`Row::get_idx`] for the resolved position.
    pub fn get<T: FromValue>(&self, column: &str) -> Result<T> {
        match self.columns.iter().position(|name| name == column) {
            Some(idx) => self.get_idx(idx),
            None => Err(UniError::Query {
                message: format!("Column '{}' not found", column),
                query: None,
            }),
        }
    }

    /// Converts the value at position `index` to `T`.
    ///
    /// # Errors
    ///
    /// Returns `UniError::Query` when `index` is past the end of the values,
    /// otherwise whatever `T::from_value` returns for that value.
    pub fn get_idx<T: FromValue>(&self, index: usize) -> Result<T> {
        match self.values.get(index) {
            Some(value) => T::from_value(value),
            None => Err(UniError::Query {
                message: format!("Column index {} out of bounds", index),
                query: None,
            }),
        }
    }

    /// Like [`Row::get`], but collapses every failure into `None`.
    pub fn try_get<T: FromValue>(&self, column: &str) -> Option<T> {
        match self.get::<T>(column) {
            Ok(converted) => Some(converted),
            Err(_) => None,
        }
    }

    /// Borrows the raw value stored under `column`, without conversion.
    ///
    /// Returns `None` when the name is unknown or when the row has no value
    /// at the resolved position.
    pub fn value(&self, column: &str) -> Option<&Value> {
        self.columns
            .iter()
            .position(|name| name == column)
            .and_then(|idx| self.values.get(idx))
    }

    /// Builds a borrowed name-to-value map of this row.
    ///
    /// Names and values are paired positionally and pairing stops at the
    /// shorter of the two lists. If a name occurs more than once, the value
    /// at the last occurrence is the one kept.
    pub fn as_map(&self) -> HashMap<&str, &Value> {
        let pair_count = self.columns.len().min(self.values.len());
        let mut by_name = HashMap::with_capacity(pair_count);
        for (name, value) in self.columns.iter().zip(self.values.iter()) {
            by_name.insert(name.as_str(), value);
        }
        by_name
    }

    /// Serializes the row (via [`Row::as_map`]) into a JSON value.
    ///
    /// A serialization failure is not reported; JSON `null` is returned instead.
    pub fn to_json(&self) -> serde_json::Value {
        match serde_json::to_value(self.as_map()) {
            Ok(json) => json,
            Err(_) => serde_json::Value::Null,
        }
    }
}
/// Positional access to a row's values with `row[i]`.
impl std::ops::Index<usize> for Row {
    type Output = Value;

    /// Borrows the value at `index`.
    ///
    /// # Panics
    ///
    /// Panics when `index` is out of bounds for the row's values; use
    /// `Row::get_idx` for a fallible lookup.
    fn index(&self, index: usize) -> &Self::Output {
        std::ops::Index::index(&self.values, index)
    }
}
/// Non-fatal conditions attached to a query result.
///
/// The `Display` implementation renders each variant as a human-readable message.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum QueryWarning {
/// The index `index_name` on `label` is unavailable for the stated `reason`.
IndexUnavailable {
label: String,
index_name: String,
reason: String,
},
/// No index is available for a filter on `label.property`; a full scan is used.
NoIndexForFilter {
label: String,
property: String,
},
/// RRF fusion degenerated to equal-weight fusion in a point-computation context.
RrfPointContext,
/// Free-form warning text, displayed verbatim.
Other(String),
}
impl std::fmt::Display for QueryWarning {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
QueryWarning::IndexUnavailable {
label,
index_name,
reason,
} => {
write!(
f,
"Index '{}' on label '{}' is unavailable: {}",
index_name, label, reason
)
}
QueryWarning::NoIndexForFilter { label, property } => {
write!(
f,
"No index available for filter on {}.{}, using full scan",
label, property
)
}
QueryWarning::RrfPointContext => {
write!(
f,
"RRF fusion degenerated to equal-weight fusion in point-computation context \
(no global ranking available). Consider using method: 'weighted' with explicit weights."
)
}
QueryWarning::Other(msg) => write!(f, "{}", msg),
}
}
}
/// The in-memory outcome of a query: column names, rows, warnings and metrics.
#[derive(Debug)]
pub struct QueryResult {
/// Column names of the result, held behind an `Arc`.
pub(crate) columns: Arc<Vec<String>>,
/// All rows of the result.
pub(crate) rows: Vec<Row>,
/// Warnings attached to this result; may be empty.
pub(crate) warnings: Vec<QueryWarning>,
/// Metrics attached to this result; parse/total timing can be overwritten later
/// through `update_parse_timing`.
pub(crate) metrics: QueryMetrics,
}
impl QueryResult {
    /// Assembles a result from its parts; nothing is validated or derived.
    #[doc(hidden)]
    pub fn new(
        columns: Arc<Vec<String>>,
        rows: Vec<Row>,
        warnings: Vec<QueryWarning>,
        metrics: QueryMetrics,
    ) -> Self {
        Self {
            columns,
            rows,
            warnings,
            metrics,
        }
    }

    /// Returns the column names of the result.
    pub fn columns(&self) -> &[String] {
        self.columns.as_slice()
    }

    /// Returns the number of rows held.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Returns `true` when the result holds no rows.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Borrows all rows as a slice.
    pub fn rows(&self) -> &[Row] {
        self.rows.as_slice()
    }

    /// Consumes the result and returns its rows; columns, warnings and metrics are dropped.
    pub fn into_rows(self) -> Vec<Row> {
        let Self { rows, .. } = self;
        rows
    }

    /// Iterates over the rows by reference.
    pub fn iter(&self) -> impl Iterator<Item = &Row> {
        self.rows.iter()
    }

    /// Borrows the warnings attached to this result.
    pub fn warnings(&self) -> &[QueryWarning] {
        self.warnings.as_slice()
    }

    /// Returns `true` when at least one warning is attached.
    pub fn has_warnings(&self) -> bool {
        !self.warnings().is_empty()
    }

    /// Borrows the metrics attached to this result.
    pub fn metrics(&self) -> &QueryMetrics {
        &self.metrics
    }

    /// Overwrites the parse and total timings in the metrics; every other
    /// metric is left as it was.
    #[doc(hidden)]
    pub fn update_parse_timing(
        &mut self,
        parse_time: std::time::Duration,
        total_time: std::time::Duration,
    ) {
        let metrics = &mut self.metrics;
        metrics.parse_time = parse_time;
        metrics.total_time = total_time;
    }
}
/// Owning iteration: `for row in result` yields each `Row` by value.
impl IntoIterator for QueryResult {
    type Item = Row;
    type IntoIter = std::vec::IntoIter<Row>;

    /// Consumes the result and iterates over its rows; the remaining fields are dropped.
    fn into_iter(self) -> Self::IntoIter {
        let Self { rows, .. } = self;
        rows.into_iter()
    }
}
/// Outcome of a mutating statement: an affected-row count, per-kind mutation
/// counters and metrics.
///
/// The per-kind counters are all zero when built with `ExecuteResult::new` and
/// are copied from the supplied mutation stats by `ExecuteResult::with_details`.
#[derive(Debug)]
pub struct ExecuteResult {
/// Affected-row count, exactly as supplied to the constructor.
pub(crate) affected_rows: usize,
/// Number of nodes created.
pub(crate) nodes_created: usize,
/// Number of nodes deleted.
pub(crate) nodes_deleted: usize,
/// Number of relationships created.
pub(crate) relationships_created: usize,
/// Number of relationships deleted.
pub(crate) relationships_deleted: usize,
/// Number of properties set.
pub(crate) properties_set: usize,
/// Number of labels added.
pub(crate) labels_added: usize,
/// Number of labels removed.
pub(crate) labels_removed: usize,
/// Metrics attached to this result; `QueryMetrics::default()` when built with `new`.
pub(crate) metrics: QueryMetrics,
}
impl ExecuteResult {
    /// Builds a result that carries only an affected-row count.
    ///
    /// Every per-kind counter is zero and the metrics are `QueryMetrics::default()`.
    #[doc(hidden)]
    pub fn new(affected_rows: usize) -> Self {
        let metrics = QueryMetrics::default();
        Self {
            affected_rows,
            nodes_created: 0,
            nodes_deleted: 0,
            relationships_created: 0,
            relationships_deleted: 0,
            properties_set: 0,
            labels_added: 0,
            labels_removed: 0,
            metrics,
        }
    }

    /// Builds a result whose per-kind counters are copied field by field from
    /// `stats`, together with the given `affected_rows` and `metrics`.
    #[doc(hidden)]
    pub fn with_details(
        affected_rows: usize,
        stats: &uni_store::runtime::l0::MutationStats,
        metrics: QueryMetrics,
    ) -> Self {
        Self {
            affected_rows,
            // Counters are mirrored one-to-one from the mutation stats.
            nodes_created: stats.nodes_created,
            nodes_deleted: stats.nodes_deleted,
            relationships_created: stats.relationships_created,
            relationships_deleted: stats.relationships_deleted,
            properties_set: stats.properties_set,
            labels_added: stats.labels_added,
            labels_removed: stats.labels_removed,
            metrics,
        }
    }

    /// Returns the affected-row count supplied at construction.
    pub fn affected_rows(&self) -> usize {
        self.affected_rows
    }

    /// Returns the number of nodes created.
    pub fn nodes_created(&self) -> usize {
        self.nodes_created
    }

    /// Returns the number of nodes deleted.
    pub fn nodes_deleted(&self) -> usize {
        self.nodes_deleted
    }

    /// Returns the number of relationships created.
    pub fn relationships_created(&self) -> usize {
        self.relationships_created
    }

    /// Returns the number of relationships deleted.
    pub fn relationships_deleted(&self) -> usize {
        self.relationships_deleted
    }

    /// Returns the number of properties set.
    pub fn properties_set(&self) -> usize {
        self.properties_set
    }

    /// Returns the number of labels added.
    pub fn labels_added(&self) -> usize {
        self.labels_added
    }

    /// Returns the number of labels removed.
    pub fn labels_removed(&self) -> usize {
        self.labels_removed
    }

    /// Borrows the metrics attached to this result.
    pub fn metrics(&self) -> &QueryMetrics {
        &self.metrics
    }
}
/// A cursor over query results that are delivered in batches by a stream.
pub struct QueryCursor {
/// Column names of the result, held behind an `Arc`.
pub(crate) columns: Arc<Vec<String>>,
/// Pinned, boxed, `Send` stream; each item is either a batch of rows or an error.
pub(crate) stream: Pin<Box<dyn Stream<Item = Result<Vec<Row>>> + Send>>,
}
impl QueryCursor {
    /// Wraps a column list and a batch stream into a cursor.
    #[doc(hidden)]
    pub fn new(
        columns: Arc<Vec<String>>,
        stream: Pin<Box<dyn Stream<Item = Result<Vec<Row>>> + Send>>,
    ) -> Self {
        Self { columns, stream }
    }

    /// Returns the column names of the result.
    pub fn columns(&self) -> &[String] {
        self.columns.as_slice()
    }

    /// Awaits the next item of the underlying stream.
    ///
    /// Yields `None` once the stream is finished, otherwise the next batch of
    /// rows or the error the stream produced.
    pub async fn next_batch(&mut self) -> Option<Result<Vec<Row>>> {
        use futures::StreamExt;
        StreamExt::next(&mut self.stream).await
    }

    /// Consumes the cursor and gathers every remaining row into one vector.
    ///
    /// # Errors
    ///
    /// Stops at the first error the stream yields and returns it; rows
    /// gathered up to that point are discarded.
    pub async fn collect_remaining(mut self) -> Result<Vec<Row>> {
        let mut collected = Vec::new();
        loop {
            match self.next_batch().await {
                Some(Ok(batch)) => collected.extend(batch),
                Some(Err(err)) => return Err(err),
                None => return Ok(collected),
            }
        }
    }
}