use std::sync::Arc;
use crate::error::Error;
use crate::protocol::{IdAllocator, QueryResult, Request, Response};
use crate::transport::DispatcherHandle;
struct StreamState {
rows: std::vec::IntoIter<serde_json::Map<String, serde_json::Value>>,
cont_id: Option<String>,
done: bool,
handle: DispatcherHandle,
ids: Arc<IdAllocator>,
}
impl Drop for StreamState {
fn drop(&mut self) {
if !self.done {
if let Some(cont_id) = self.cont_id.take() {
spawn_close(self.handle.clone(), cont_id);
}
}
}
}
#[derive(Debug)]
pub struct Query {
cont_id: String,
handle: DispatcherHandle,
}
impl Query {
pub(crate) fn new(cont_id: String, handle: DispatcherHandle) -> Self {
Self { cont_id, handle }
}
pub async fn execute(&self, ids: &IdAllocator) -> crate::Result<Rows> {
self.execute_inner(ids, None).await
}
pub async fn execute_with(
&self,
ids: &IdAllocator,
params: &[serde_json::Value],
) -> crate::Result<Rows> {
self.execute_inner(ids, Some(params.to_vec())).await
}
async fn execute_inner(
&self,
ids: &IdAllocator,
params: Option<Vec<serde_json::Value>>,
) -> crate::Result<Rows> {
let id = ids.next();
let request = Request::Execute {
id: id.clone(),
cont_id: self.cont_id.clone(),
parameters: params,
};
let resp = self.handle.send(request).await?;
match resp {
Response::QueryResult(q) if q.id == id => Ok(Rows::new(q, self.handle.clone())),
Response::Error(e) => Err(crate::job_helpers::server_error(e)),
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
pub async fn execute_batch(
&self,
ids: &IdAllocator,
batches: &[&[serde_json::Value]],
) -> crate::Result<Vec<Rows>> {
let mut out = Vec::with_capacity(batches.len());
for params in batches {
out.push(self.execute_with(ids, params).await?);
}
Ok(out)
}
}
impl Drop for Query {
fn drop(&mut self) {
spawn_close(self.handle.clone(), self.cont_id.clone());
}
}
pub(crate) fn spawn_close(handle: DispatcherHandle, cont_id: String) {
let id = format!("close-{cont_id}");
crate::job_helpers::spawn_best_effort(async move {
let _ = handle.send(Request::SqlClose { id, cont_id }).await;
});
}
#[derive(Debug)]
pub struct Rows {
inner: QueryResult,
handle: DispatcherHandle,
}
impl Rows {
pub(crate) fn new(inner: QueryResult, handle: DispatcherHandle) -> Self {
Self { inner, handle }
}
#[must_use]
pub fn update_count(&self) -> Option<i64> {
if self.inner.has_results || self.inner.update_count < 0 {
None
} else {
Some(self.inner.update_count)
}
}
#[must_use]
pub fn has_results(&self) -> bool {
self.inner.has_results
}
#[must_use]
pub fn columns(&self) -> Option<&[crate::protocol::Column]> {
if self.inner.has_results {
Some(&self.inner.metadata.columns)
} else {
None
}
}
#[must_use]
pub fn execution_time(&self) -> std::time::Duration {
std::time::Duration::from_secs_f64(self.inner.execution_time / 1000.0)
}
pub fn stream(mut self) -> impl futures::Stream<Item = crate::Result<Row>> {
use futures::stream::unfold;
let cont_id = self.inner.cont_id.take();
let data = std::mem::take(&mut self.inner.data);
let handle = self.handle.clone();
let ids = Arc::new(IdAllocator::new());
unfold(
StreamState {
rows: data.into_iter(),
cont_id,
done: self.inner.is_done,
handle,
ids,
},
|mut state| async move {
if let Some(row_data) = state.rows.next() {
return Some((Ok(Row::from_map(row_data)), state));
}
if state.done {
return None;
}
let cont_id = match &state.cont_id {
Some(c) => c.clone(),
None => return None,
};
let id = state.ids.next();
let resp = state
.handle
.send(Request::SqlMore {
id: id.clone(),
cont_id,
rows: 100,
})
.await;
match resp {
Ok(Response::QueryResult(q)) if q.id == id => {
state.rows = q.data.into_iter();
state.done = q.is_done;
state.cont_id = q.cont_id;
if let Some(row_data) = state.rows.next() {
Some((Ok(Row::from_map(row_data)), state))
} else if state.done {
None
} else {
if let Some(cid) = state.cont_id.take() {
spawn_close(state.handle.clone(), cid);
}
state.done = true;
Some((
Err(Error::Internal(
"server returned empty page without is_done".into(),
)),
state,
))
}
}
Ok(Response::Error(e)) => {
Some((Err(crate::job_helpers::server_error(e)), state))
}
Ok(other) => Some((Err(crate::job_helpers::unexpected(&other)), state)),
Err(e) => Some((Err(e), state)),
}
},
)
}
pub async fn into_typed<T>(self) -> crate::Result<Vec<T>>
where
T: crate::FromRow,
{
use futures::{StreamExt, TryStreamExt};
self.stream()
.map(|res| res.and_then(|row| T::from_row(&row)))
.try_collect()
.await
}
pub fn stream_typed<T>(self) -> impl futures::Stream<Item = crate::Result<T>> + Send
where
T: crate::FromRow + Send + 'static,
{
use futures::StreamExt;
self.stream()
.map(|res| res.and_then(|row| T::from_row(&row)))
}
pub async fn into_dynamic(self) -> crate::Result<Vec<Row>> {
use futures::TryStreamExt;
self.stream().try_collect().await
}
}
impl Drop for Rows {
fn drop(&mut self) {
if !self.inner.is_done {
if let Some(cont_id) = self.inner.cont_id.take() {
spawn_close(self.handle.clone(), cont_id);
}
}
}
}
#[derive(Debug, Clone)]
pub struct Row {
data: serde_json::Map<String, serde_json::Value>,
}
impl Row {
pub fn get<T: serde::de::DeserializeOwned>(&self, column: &str) -> crate::Result<T> {
use crate::error::DecodeError;
let value = self.data.get(column).ok_or_else(|| Error::Decode {
column: Some(column.to_owned()),
source: DecodeError::MissingColumn(column.to_owned()),
})?;
T::deserialize(value).map_err(|e| Error::Decode {
column: Some(column.to_owned()),
source: DecodeError::Serde(e.to_string()),
})
}
#[must_use]
pub fn try_get<T: serde::de::DeserializeOwned>(
&self,
column: &str,
) -> Option<crate::Result<T>> {
use crate::error::DecodeError;
let value = self.data.get(column)?;
Some(T::deserialize(value).map_err(|e| Error::Decode {
column: Some(column.to_owned()),
source: DecodeError::Serde(e.to_string()),
}))
}
pub(crate) fn map(&self) -> &serde_json::Map<String, serde_json::Value> {
&self.data
}
pub(crate) fn from_map(data: serde_json::Map<String, serde_json::Value>) -> Self {
Self { data }
}
}