1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::borrow::Cow;
use futures::{Async, Future, Poll, Stream};
use futures_state_stream::{StateStream, StreamEvent};
use query::{ResultSetStream, ExecFuture, QueryStream};
use stmt::{ForEachRow, SingleResultSet, Statement, StmtStream, ResultStreamExt};
use types::ToSql;
use {BoxableIo, SqlConnection, TdsError};

/// A transaction
pub struct Transaction<I: BoxableIo>(SqlConnection<I>);

pub fn new_transaction<I: BoxableIo>(conn: SqlConnection<I>) -> Transaction<I> {
    Transaction(conn)
}

/// A stream which is a result from an operation which is executed within a transaction
/// This simply wraps the state (which internally is a SqlConnection) in the `Transaction` struct
pub struct TransactionStream<S> {
    stream: Option<S>,
}

impl<S> TransactionStream<S> {
    pub fn new(stream: S) -> TransactionStream<S> {
        TransactionStream {
            stream: Some(stream)
        }
    }
}

impl<I: BoxableIo, S: StateStream<State = SqlConnection<I>>> StateStream for TransactionStream<S> {
    type Item = S::Item;
    type State = Transaction<I>;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<StreamEvent<Self::Item, Self::State>, Self::Error> {
        let item = match try_ready!(self.stream.as_mut().unwrap().poll()) {
            StreamEvent::Done(conn) => {
                self.stream.take();
                StreamEvent::Done(Transaction(conn))
            },
            StreamEvent::Next(next) => StreamEvent::Next(next),
        };
        Ok(Async::Ready(item))
    }
}

impl<I: BoxableIo, R> ResultStreamExt<I> for TransactionStream<R> where R: ResultStreamExt<I> + StateStream<State = SqlConnection<I>> {
    fn for_each_row<F>(self, f: F) -> ForEachRow<I, Self, F>
        where Self: StateStream<Item=QueryStream<I>, Error=<QueryStream<I> as Stream>::Error>,
              F: FnMut(<QueryStream<I> as Stream>::Item) -> Result<(), TdsError>
    {
        ForEachRow::new(self, f)
    }

    fn single(self) -> SingleResultSet<I, Self>
        where Self: Sized + StateStream<Item=ExecFuture<I>, Error=<ExecFuture<I> as Future>::Error> 
    {
        SingleResultSet::new(self)
    }
}

impl<I: BoxableIo + 'static> Transaction<I> {
    pub fn simple_exec<'a, Q>(self, query: Q) -> TransactionStream<ResultSetStream<I, ExecFuture<I>>> where Q: Into<Cow<'a, str>> {
        TransactionStream::new(self.0.simple_exec(query))
    }

    pub fn simple_query<'a, Q>(self, query: Q) -> TransactionStream<ResultSetStream<I, QueryStream<I>>> where Q: Into<Cow<'a, str>> {
        TransactionStream::new(self.0.simple_query(query))
    }

    pub fn exec<S: Into<Statement>>(self, stmt: S, params: &[&ToSql]) -> TransactionStream<StmtStream<I, ExecFuture<I>>> {
        TransactionStream::new(self.0.exec(stmt, params))
    }

    pub fn query<S: Into<Statement>>(self, stmt: S, params: &[&ToSql]) -> TransactionStream<StmtStream<I, QueryStream<I>>> {
        TransactionStream::new(self.0.query(stmt, params))
    }

    pub fn prepare<S>(&self, stmt: S) -> Statement where S: Into<Cow<'static, str>> {
        self.0.prepare(stmt.into())
    }

    /// Commits a transaction
    pub fn commit(self) -> Box<Future<Item=SqlConnection<I>, Error=TdsError>> {
        Box::new(self
            .internal_exec("COMMIT TRAN")
            .and_then(|trans| trans.finish()))
    }

    /// Rollback a transaction
    pub fn rollback(self) -> Box<Future<Item=SqlConnection<I>, Error=TdsError>> {
        Box::new(self
            .internal_exec("ROLLBACK TRAN")
            .and_then(|trans| trans.finish()))
    }

    /// convert back to a normal connection (enable auto commit)
    fn finish(self) -> Box<Future<Item=SqlConnection<I>, Error=TdsError>> {
        Box::new(self
            .internal_exec("set implicit_transactions off")
            .and_then(|trans| Ok(trans.0)))
    }

    /// executes an internal statement and checks if it succeeded
    fn internal_exec(self, sql: &str) -> Box<Future<Item=Transaction<I>, Error=TdsError>> {
        Box::new(self
            .simple_exec(sql)
            .single()
            .and_then(|(result, trans)| {
                assert_eq!(result, 0);
                Ok(trans)
            }))
    }
}