use conn::Conn;
use conn::futures::new_raw_query_result::NewRawQueryResult;
use conn::futures::query_result::futures::*;
use conn::futures::read_packet::ReadPacket;
use conn::stmt::InnerStmt;
use conn::stmt::new_stmt;
use conn::stmt::Stmt;
use consts;
use either::Either;
use either::Left;
use either::Right;
use errors::*;
use lib_futures::Async;
use lib_futures::Async::Ready;
use lib_futures::Future;
use lib_futures::Poll;
use lib_futures::stream::Stream;
use proto::Column;
use proto::OkPacket;
use proto::Packet;
use proto::PacketType;
use proto::Row;
use std::borrow::Cow;
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;
use value::FromRow;
use value::Value;
pub mod futures;
/// A vector of `T` values bundled with a value of type `Q`.
///
/// Every impl in this file requires `Q: QueryResult`; the `Deref` impl exposes `Q`, while the
/// values themselves are the public first field.
pub struct ResultSet<T, Q>(pub Vec<T>, Q);

impl<T, Q: QueryResult> ResultSet<T, Q> {
    /// Borrows the stored values as a slice.
    pub fn as_ref(&self) -> &[T] {
        self.0.as_slice()
    }
}
impl<T, Q: QueryResult> Deref for ResultSet<T, Q> {
type Target = Q;
fn deref(&self) -> &Self::Target {
&self.1
}
}
impl<T, Q: QueryResult> IntoIterator for ResultSet<T, Q> {
type Item = T;
type IntoIter = ::std::vec::IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
/// Tag reported by `ResultKind::protocol`.
///
/// In this file `TextResult` reports `Text` and `BinaryResult` reports `Binary`.
#[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)]
pub enum Protocol {
/// Reported by `TextResult`.
Text,
/// Reported by `BinaryResult`.
Binary,
}
/// Marker trait that selects how a `RawQueryResult` behaves.
pub trait ResultKind {
/// Value produced once the rows of a result are exhausted (see
/// `InnerResultKind::handle_next_query_result` and `InnerResultKind::handle_end`).
type Output: Sized;
/// Returns the `Protocol` tag associated with this kind.
fn protocol() -> Protocol;
}
/// Kind-specific hooks invoked by the `Stream` implementation of `RawQueryResult`.
pub trait InnerResultKind: ResultKind {
/// Decodes a non-EOF packet into the values of one row, given the column definitions.
fn read_values(packet: Packet, cols: &Arc<Vec<Column>>) -> Result<Vec<Value>>;
/// Converts a follow-up result (one that was started because the server status carried
/// `SERVER_MORE_RESULTS_EXISTS`) into this kind's output.
fn handle_next_query_result(next: RawQueryResult<Self>) -> Self::Output;
/// Builds this kind's output from the connection once there is nothing further to read.
fn handle_end(conn: Conn, inner_stmt: Option<InnerStmt>) -> Self::Output;
}
/// Result kind whose output is a `Stmt` and whose protocol tag is `Protocol::Binary`.
pub struct BinaryResult;
impl ResultKind for BinaryResult {
/// A finished binary result hands back the statement.
type Output = Stmt;
/// Always `Protocol::Binary`.
fn protocol() -> Protocol {
Protocol::Binary
}
}
impl InnerResultKind for BinaryResult {
    /// Decodes one row packet with `Value::from_bin_payload`, passing the column definitions.
    fn read_values(packet: Packet, cols: &Arc<Vec<Column>>) -> Result<Vec<Value>> {
        Value::from_bin_payload(packet.as_ref(), cols)
    }

    /// # Panics
    ///
    /// Always panics: this kind does not support a follow-up result set.
    fn handle_next_query_result(_: RawQueryResult<Self>) -> Self::Output {
        panic!("Binary protocol query can't produce multi-result set");
    }

    /// Rebuilds the `Stmt` from the inner statement and the connection.
    ///
    /// # Panics
    ///
    /// Panics if `inner_stmt` is `None`; a `Stmt` cannot be built without it.
    fn handle_end(conn: Conn, inner_stmt: Option<InnerStmt>) -> Self::Output {
        // A missing statement here is a broken invariant, so state it instead of a bare unwrap.
        let inner_stmt = inner_stmt
            .expect("Binary protocol result must carry the statement it was produced by");
        new_stmt(inner_stmt, conn)
    }
}
/// Result kind whose protocol tag is `Protocol::Text`.
pub struct TextResult;
impl ResultKind for TextResult {
/// Either the next result set (`Left`) or the connection (`Right`); see the
/// `InnerResultKind` impl for `TextResult`.
type Output = Either<TextQueryResult, Conn>;
/// Always `Protocol::Text`.
fn protocol() -> Protocol {
Protocol::Text
}
}
impl InnerResultKind for TextResult {
fn read_values(packet: Packet, cols: &Arc<Vec<Column>>) -> Result<Vec<Value>> {
Value::from_payload(packet.as_ref(), cols.len())
}
fn handle_next_query_result(next: RawQueryResult<Self>) -> Self::Output {
Left(next.into())
}
fn handle_end(conn: Conn, _: Option<InnerStmt>) -> Self::Output {
Right(conn)
}
}
/// Internal state of a `RawQueryResult`.
enum Step<K: ResultKind + ?Sized> {
/// A packet read is in flight.
ReadPacket(ReadPacket),
/// A follow-up result set is being set up.
NextResult(NewRawQueryResult<K>),
/// Nothing more to read; holds the connection until it is handed to `handle_end`.
Done(Conn),
/// The final output was already yielded; polling produces `None`.
Consumed,
}
/// What `RawQueryResult::either_poll` reports for the current `Step`.
enum Out<K: ResultKind + ?Sized> {
/// The pending packet read resolved to this connection and packet.
ReadPacket((Conn, Packet)),
/// The pending follow-up result resolved.
NextResult(RawQueryResult<K>),
/// The state is `Step::Done`; the connection is still stored in the step.
Done,
}
/// Stream of rows followed by one kind-specific output, parameterised by a `ResultKind`.
pub struct RawQueryResult<K: ResultKind + ?Sized> {
// Current state of the stream.
step: Step<K>,
// Column definitions; shared with every `Row` produced.
columns: Arc<Vec<Column>>,
// OK packet supplied at construction, exposed through `ok_packet_ref` of the wrappers.
ok_packet: Option<OkPacket>,
// Statement passed on to `handle_result_set` and `handle_end`.
inner_stmt: Option<InnerStmt>,
// Ties the struct to `K` for the type checker.
_phantom: PhantomData<K>,
}
/// Creates a `RawQueryResult` over `conn`.
///
/// With no columns the result starts in `Step::Done` and holds the connection as is.
/// Otherwise the columns, OK packet and statement are recorded in `conn.has_result` and the
/// first packet read is started.
pub fn new_raw<K: ?Sized, T>(
    mut conn: Conn,
    cols: T,
    ok_packet: Option<OkPacket>,
    inner_stmt: Option<InnerStmt>,
) -> RawQueryResult<K>
    where T: Into<Arc<Vec<Column>>>,
          K: ResultKind,
{
    let columns: Arc<Vec<Column>> = cols.into();
    let initial_step = if !columns.is_empty() {
        // Note the pending result on the connection before starting to read rows.
        conn.has_result = Some((columns.clone(), ok_packet.clone(), inner_stmt.clone()));
        let first_read = conn.read_packet();
        Step::ReadPacket(first_read)
    } else {
        Step::Done(conn)
    };
    RawQueryResult {
        step: initial_step,
        columns: columns,
        ok_packet: ok_packet,
        inner_stmt: inner_stmt,
        _phantom: PhantomData,
    }
}
impl<K: ResultKind> RawQueryResult<K> {
    /// Polls whichever future the current step holds and reports the outcome as an `Out`.
    ///
    /// Returns `Some(Out::Done)` for `Step::Done` (the connection stays in the step) and `None`
    /// for `Step::Consumed`. A pending inner future makes this return `NotReady`.
    fn either_poll(&mut self) -> Result<Async<Option<Out<K>>>> {
        let outcome = match self.step {
            Step::Consumed => None,
            Step::Done(_) => Some(Out::Done),
            Step::ReadPacket(ref mut read_packet) => {
                Some(Out::ReadPacket(try_ready!(read_packet.poll())))
            }
            Step::NextResult(ref mut next_result) => {
                Some(Out::NextResult(try_ready!(next_result.poll())))
            }
        };
        Ok(Ready(outcome))
    }
}
/// Yields every row as `Left(row)`, then exactly one `Right(output)`, then `None`.
impl<K: ResultKind + InnerResultKind> Stream for RawQueryResult<K> {
    type Item = Either<Row, K::Output>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let out = match try_ready!(self.either_poll()) {
            Some(out) => out,
            // Already consumed: the stream is over.
            None => return Ok(Ready(None)),
        };

        match out {
            Out::ReadPacket((mut conn, packet)) => {
                if !packet.is(PacketType::Eof) {
                    // Ordinary row: decode it and start reading the next packet.
                    let values = K::read_values(packet, &self.columns)?;
                    let row = Row::new(values, self.columns.clone());
                    self.step = Step::ReadPacket(conn.read_packet());
                    return Ok(Ready(Some(Left(row))));
                }

                // EOF: either another result set follows or this one is finished.
                self.step = if conn.status.contains(consts::SERVER_MORE_RESULTS_EXISTS) {
                    Step::NextResult(conn.handle_result_set::<K>(self.inner_stmt.clone()))
                } else {
                    conn.has_result = None;
                    Step::Done(conn)
                };
                // Re-enter so that the freshly installed step is driven right away.
                self.poll()
            }
            Out::NextResult(query_result) => {
                let output = K::handle_next_query_result(query_result);
                self.step = Step::Consumed;
                Ok(Ready(Some(Right(output))))
            }
            Out::Done => match mem::replace(&mut self.step, Step::Consumed) {
                Step::Done(mut conn) => {
                    conn.has_result = None;
                    let output = K::handle_end(conn, self.inner_stmt.clone());
                    Ok(Ready(Some(Right(output))))
                }
                // `Out::Done` is only produced while the step is `Step::Done`.
                _ => unreachable!(),
            },
        }
    }
}
/// Implemented by the value a query result yields after its rows, so that it can be split into
/// "another result" or "final output".
pub trait QueryResultOutput {
/// Query result type this output belongs to.
type Result: QueryResult;
/// Final value once no further result follows.
type Output;
/// Consumes `self` together with the previous result `prev`.
///
/// Returns a `Self::Result` (both impls in this file return `prev` unchanged) and either the
/// next result (`Left`) or the final output (`Right`).
fn into_next_or_output(self,
prev: Self::Result)
-> (Self::Result, Either<Self::Result, Self::Output>);
}
/// A `Stmt` is always a final output: it is returned as `Right`, never as a next result.
impl QueryResultOutput for Stmt {
type Result = BinQueryResult;
type Output = Stmt;
/// Returns `prev` untouched and `Right(self)`.
fn into_next_or_output(self,
prev: BinQueryResult)
-> (Self::Result, Either<Self::Result, Self::Output>) {
(prev, Right(self))
}
}
/// The text output already has the required shape: `Left` is the next result, `Right` the
/// connection.
impl QueryResultOutput for Either<TextQueryResult, Conn> {
type Result = TextQueryResult;
type Output = Conn;
/// Returns `prev` untouched and passes `self` through as is.
fn into_next_or_output(self,
prev: TextQueryResult)
-> (Self::Result, Either<Self::Result, Self::Output>) {
(prev, self)
}
}
/// A query result that has not been consumed yet; offers combinators that take it by value.
///
/// Each method only forwards to the matching `new_*` constructor imported from the `futures`
/// submodule; what the returned combinator does is defined there.
pub trait UnconsumedQueryResult: InnerQueryResult {
/// What polling yields instead of a row (the `Right` side of `InnerQueryResult::poll`).
type Output: QueryResultOutput;
/// Wraps this result in a `Collect` combinator for a row type `R: FromRow`.
// NOTE(review): presumably converts every row to `R` and gathers them — confirm in `futures`.
fn collect<R>(self) -> Collect<R, Self>
where Self: Sized,
R: FromRow,
{
new_collect(self)
}
/// Wraps this result in a `CollectAll` combinator.
// NOTE(review): presumably also walks follow-up result sets — confirm in `futures`.
fn collect_all(self) -> CollectAll<Self>
where Self: Sized,
{
new_collect_all(self)
}
/// Wraps this result in a `Map` combinator that is given `fun`, a `FnMut(Row) -> U`.
fn map<F, U>(self, fun: F) -> Map<F, U, Self>
where Self: Sized,
F: FnMut(Row) -> U,
{
new_map(self, fun)
}
/// Wraps this result in a `Reduce` combinator that is given the initial accumulator `init`
/// and `fun`, a `FnMut(A, Row) -> A`.
fn reduce<A, F>(self, init: A, fun: F) -> Reduce<A, F, Self>
where Self: Sized,
F: FnMut(A, Row) -> A,
{
new_reduce(self, init, fun)
}
/// Wraps this result in a `ForEach` combinator that is given `fun`, a `FnMut(Row)`.
fn for_each<F>(self, fun: F) -> ForEach<F, Self>
where Self: Sized,
F: FnMut(Row),
{
new_for_each(self, fun)
}
/// Wraps this result in a `DropResult` combinator.
// NOTE(review): presumably discards the remaining rows — confirm in `futures`.
fn drop_result(self) -> DropResult<Self>
where Self: Sized,
{
new_drop_result(self)
}
}
/// Low-level access that the public `QueryResult` and `UnconsumedQueryResult` traits build on.
pub trait InnerQueryResult: QueryResult {
/// Polls for the next item: a row (`Left`) or the result's output (`Right`).
fn poll(&mut self) -> Result<Async<Either<Row, Self::Output>>> where Self: UnconsumedQueryResult;
/// Borrows the OK packet associated with this result, if there is one.
fn ok_packet_ref(&self) -> Option<&OkPacket>;
}
/// Read-only accessors over the OK packet of a query result.
///
/// Every method returns `None` when `ok_packet_ref` returns `None`.
pub trait QueryResult {
/// Value of `OkPacket::affected_rows` for this result's OK packet.
fn affected_rows(&self) -> Option<u64>
where Self: InnerQueryResult,
{
self.ok_packet_ref().map(OkPacket::affected_rows)
}
/// Value of `OkPacket::last_insert_id` for this result's OK packet.
fn last_insert_id(&self) -> Option<u64>
where Self: InnerQueryResult,
{
self.ok_packet_ref().map(OkPacket::last_insert_id)
}
/// Value of `OkPacket::warnings` for this result's OK packet.
fn warnings(&self) -> Option<u16>
where Self: InnerQueryResult,
{
self.ok_packet_ref().map(OkPacket::warnings)
}
/// Value of `OkPacket::info_bytes` for this result's OK packet.
fn info_bytes(&self) -> Option<&[u8]>
where Self: InnerQueryResult,
{
self.ok_packet_ref().map(OkPacket::info_bytes)
}
/// Value of `OkPacket::info` for this result's OK packet.
fn info(&self) -> Option<Cow<str>>
where Self: InnerQueryResult,
{
self.ok_packet_ref().map(OkPacket::info)
}
/// Value of `OkPacket::session_state_changes_bytes`; also `None` when the packet itself
/// reports none.
fn session_state_changes_bytes(&self) -> Option<&[u8]>
where Self: InnerQueryResult,
{
self.ok_packet_ref().and_then(OkPacket::session_state_changes_bytes)
}
/// Value of `OkPacket::session_state_changes`; also `None` when the packet itself reports
/// none.
fn session_state_changes(&self) -> Option<Cow<str>>
where Self: InnerQueryResult,
{
self.ok_packet_ref().and_then(OkPacket::session_state_changes)
}
}
/// Public wrapper around a `RawQueryResult` of kind `TextResult`.
pub struct TextQueryResult(RawQueryResult<TextResult>);
// Conversion from the raw result; hidden from the generated documentation.
#[doc(hidden)]
impl From<RawQueryResult<TextResult>> for TextQueryResult {
fn from(raw_query_result: RawQueryResult<TextResult>) -> Self {
TextQueryResult(raw_query_result)
}
}
// Uses every default accessor of `QueryResult`.
impl QueryResult for TextQueryResult {}
// Uses every default combinator; the output is the one declared by `TextResult`.
impl UnconsumedQueryResult for TextQueryResult {
type Output = <TextResult as ResultKind>::Output;
}
impl InnerQueryResult for TextQueryResult {
fn poll(&mut self) -> Result<Async<Either<Row, <Self as UnconsumedQueryResult>::Output>>>
where Self: UnconsumedQueryResult,
{
let result = try_ready!(self.0.poll());
Ok(Ready(result.expect("Can't poll consumed query result")))
}
fn ok_packet_ref(&self) -> Option<&OkPacket> {
self.0.ok_packet.as_ref()
}
}
/// Public wrapper around a `RawQueryResult` of kind `BinaryResult`.
pub struct BinQueryResult(RawQueryResult<BinaryResult>);
// Conversion from the raw result; hidden from the generated documentation.
#[doc(hidden)]
impl From<RawQueryResult<BinaryResult>> for BinQueryResult {
fn from(raw_query_result: RawQueryResult<BinaryResult>) -> Self {
BinQueryResult(raw_query_result)
}
}
// Uses every default accessor of `QueryResult`.
impl QueryResult for BinQueryResult {}
// Uses every default combinator; the output is the one declared by `BinaryResult`.
impl UnconsumedQueryResult for BinQueryResult {
type Output = <BinaryResult as ResultKind>::Output;
}
impl InnerQueryResult for BinQueryResult {
#[doc(hidden)]
fn poll(&mut self) -> Result<Async<Either<Row, <Self as UnconsumedQueryResult>::Output>>>
where Self: UnconsumedQueryResult,
{
let result = try_ready!(self.0.poll());
Ok(Ready(result.expect("Can't poll consumed query result")))
}
#[doc(hidden)]
fn ok_packet_ref(&self) -> Option<&OkPacket> {
self.0.ok_packet.as_ref()
}
}