1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::borrow::Cow;
use futures::{Async, Future, Poll, Stream};
use futures_state_stream::{StateStream, StreamEvent};
use query::{ResultSetStream, ExecFuture, QueryStream};
use stmt::{ForEachRow, SingleResultSet, Statement, StmtStream, ResultStreamExt};
use types::ToSql;
use {BoxableIo, SqlConnection, TdsError};
pub struct Transaction<I: BoxableIo>(SqlConnection<I>);
pub fn new_transaction<I: BoxableIo>(conn: SqlConnection<I>) -> Transaction<I> {
Transaction(conn)
}
pub struct TransactionStream<S> {
stream: Option<S>,
}
impl<S> TransactionStream<S> {
pub fn new(stream: S) -> TransactionStream<S> {
TransactionStream {
stream: Some(stream)
}
}
}
impl<I: BoxableIo, S: StateStream<State = SqlConnection<I>>> StateStream for TransactionStream<S> {
type Item = S::Item;
type State = Transaction<I>;
type Error = S::Error;
fn poll(&mut self) -> Poll<StreamEvent<Self::Item, Self::State>, Self::Error> {
let item = match try_ready!(self.stream.as_mut().unwrap().poll()) {
StreamEvent::Done(conn) => {
self.stream.take();
StreamEvent::Done(Transaction(conn))
},
StreamEvent::Next(next) => StreamEvent::Next(next),
};
Ok(Async::Ready(item))
}
}
impl<I: BoxableIo, R> ResultStreamExt<I> for TransactionStream<R> where R: ResultStreamExt<I> + StateStream<State = SqlConnection<I>> {
fn for_each_row<F>(self, f: F) -> ForEachRow<I, Self, F>
where Self: StateStream<Item=QueryStream<I>, Error=<QueryStream<I> as Stream>::Error>,
F: FnMut(<QueryStream<I> as Stream>::Item) -> Result<(), TdsError>
{
ForEachRow::new(self, f)
}
fn single(self) -> SingleResultSet<I, Self>
where Self: Sized + StateStream<Item=ExecFuture<I>, Error=<ExecFuture<I> as Future>::Error>
{
SingleResultSet::new(self)
}
}
impl<I: BoxableIo + 'static> Transaction<I> {
pub fn simple_exec<'a, Q>(self, query: Q) -> TransactionStream<ResultSetStream<I, ExecFuture<I>>> where Q: Into<Cow<'a, str>> {
TransactionStream::new(self.0.simple_exec(query))
}
pub fn simple_query<'a, Q>(self, query: Q) -> TransactionStream<ResultSetStream<I, QueryStream<I>>> where Q: Into<Cow<'a, str>> {
TransactionStream::new(self.0.simple_query(query))
}
pub fn exec<S: Into<Statement>>(self, stmt: S, params: &[&ToSql]) -> TransactionStream<StmtStream<I, ExecFuture<I>>> {
TransactionStream::new(self.0.exec(stmt, params))
}
pub fn query<S: Into<Statement>>(self, stmt: S, params: &[&ToSql]) -> TransactionStream<StmtStream<I, QueryStream<I>>> {
TransactionStream::new(self.0.query(stmt, params))
}
pub fn prepare<S>(&self, stmt: S) -> Statement where S: Into<Cow<'static, str>> {
self.0.prepare(stmt.into())
}
pub fn commit(self) -> Box<Future<Item=SqlConnection<I>, Error=TdsError>> {
Box::new(self
.internal_exec("COMMIT TRAN")
.and_then(|trans| trans.finish()))
}
pub fn rollback(self) -> Box<Future<Item=SqlConnection<I>, Error=TdsError>> {
Box::new(self
.internal_exec("ROLLBACK TRAN")
.and_then(|trans| trans.finish()))
}
fn finish(self) -> Box<Future<Item=SqlConnection<I>, Error=TdsError>> {
Box::new(self
.internal_exec("set implicit_transactions off")
.and_then(|trans| Ok(trans.0)))
}
fn internal_exec(self, sql: &str) -> Box<Future<Item=Transaction<I>, Error=TdsError>> {
Box::new(self
.simple_exec(sql)
.single()
.and_then(|(result, trans)| {
assert_eq!(result, 0);
Ok(trans)
}))
}
}