use std::any::Any;
use std::marker::PhantomData;
use std::mem;
use hyper::Client;
use hyper::header::{Headers, Location};
use time::{self, Tm};
use ::error::{GraphError, Neo4jError};
use super::result::{CypherResult, ResultTrait};
use super::statement::Statement;
/// Format string handed to `time::strptime` when parsing the `expires` timestamp
/// contained in a transaction response.
const DATETIME_RFC822: &'static str = "%a, %d %b %Y %T %Z";
/// Type-level marker for a `Transaction` that has been built with `Transaction::new`
/// but not yet passed through `begin`. It is the default `State` of `Transaction`.
pub struct Created;
/// Type-level marker for a `Transaction` returned by `begin`; only this state exposes
/// `exec`, `send`, `commit`, `rollback` and `reset_timeout`.
pub struct Started;
/// Deserialized `transaction` object of a transaction response.
#[derive(Debug, Deserialize)]
struct TransactionInfo {
/// Expiry timestamp as text; parsed with `DATETIME_RFC822` by the callers in this file.
expires: String,
}
/// Deserialized body of the response received when beginning a transaction or sending
/// statements to an open one.
#[derive(Debug, Deserialize)]
struct TransactionResult {
/// URL stored in `Transaction::commit` and later used as the target of `commit`.
commit: String,
/// Transaction metadata (carries the expiry timestamp).
transaction: TransactionInfo,
/// Results of the statements that were sent.
results: Vec<CypherResult>,
/// Errors reported in the response body.
errors: Vec<Neo4jError>,
}
impl ResultTrait for TransactionResult {
fn results(&self) -> &Vec<CypherResult> {
&self.results
}
fn errors(&self) -> &Vec<Neo4jError> {
&self.errors
}
}
/// Deserialized body of the response to a commit or rollback request; unlike
/// `TransactionResult` it has no `commit` URL or `transaction` metadata.
#[derive(Deserialize)]
// NOTE(review): both fields are read through the `ResultTrait` impl, so it is unclear
// why `dead_code` needs to be allowed here — confirm whether this is still required.
#[allow(dead_code)]
struct CommitResult {
/// Results of the statements that were sent along with the request.
results: Vec<CypherResult>,
/// Errors reported in the response body.
errors: Vec<Neo4jError>,
}
impl ResultTrait for CommitResult {
fn results(&self) -> &Vec<CypherResult> {
&self.results
}
fn errors(&self) -> &Vec<Neo4jError> {
&self.errors
}
}
/// A transaction against the server, parameterised by a type-level `State` marker
/// (`Created` by default, `Started` once `begin` has succeeded) that selects which
/// methods are available.
pub struct Transaction<'a, State: Any = Created> {
/// URL statements are posted to: the endpoint given to `new`, replaced by the value of
/// the `Location` response header once the transaction has begun.
transaction: String,
/// URL used by `commit`: a copy of the endpoint until `begin` replaces it with the
/// `commit` value from the server response.
commit: String,
/// Expiry time: `time::now_utc()` at construction, afterwards the timestamp parsed from
/// the most recent transaction response.
expires: Tm,
/// HTTP client used for every request of this transaction.
client: Client,
/// Borrowed headers sent with every request.
headers: &'a Headers,
/// Statements queued but not yet sent.
statements: Vec<Statement>,
/// Zero-sized carrier of the `State` marker type.
_state: PhantomData<State>,
}
impl<'a, State: Any> Transaction<'a, State> {
pub fn add_statement<S: Into<Statement>>(&mut self, statement: S) {
self.statements.push(statement.into());
}
pub fn get_expires(&self) -> &Tm {
&self.expires
}
}
impl<'a> Transaction<'a, Created> {
    /// Builds a not-yet-started transaction targeting `endpoint`.
    ///
    /// No request is made here. Until `begin` succeeds, both the transaction URL and the
    /// commit URL are copies of `endpoint`, and `expires` holds the current UTC time.
    pub fn new(endpoint: &str, headers: &'a Headers) -> Transaction<'a, Created> {
        let url = endpoint.to_owned();
        Transaction {
            commit: url.clone(),
            transaction: url,
            expires: time::now_utc(),
            client: Client::new(),
            headers: headers,
            statements: Vec::new(),
            _state: PhantomData,
        }
    }

    /// Builder-style variant of `add_statement`: queues `statement` and hands the
    /// transaction back by value so calls can be chained.
    pub fn with_statement<S: Into<Statement>>(mut self, statement: S) -> Self {
        self.statements.push(statement.into());
        self
    }

    /// Sends the queued statements to the endpoint and turns this value into a started
    /// transaction.
    ///
    /// On success returns the started transaction (with an empty statement queue) together
    /// with the results of the statements that were sent.
    ///
    /// # Errors
    ///
    /// Returns a `GraphError` if sending the request or parsing the response fails, if the
    /// response has no `Location` header (`GraphError::Transaction`), or if the `expires`
    /// timestamp cannot be parsed.
    pub fn begin(self) -> Result<(Transaction<'a, Started>, Vec<CypherResult>), GraphError> {
        debug!("Beginning transaction");

        // Take the transaction apart up front; the pieces are reassembled below with the
        // `Started` marker.
        let Transaction { transaction: endpoint, client, headers, statements, .. } = self;

        let mut response = super::send_query(&client, &endpoint, headers, statements)?;
        let parsed: TransactionResult = super::parse_response(&mut response)?;

        // The `Location` header value becomes the URL of the started transaction.
        let location = match response.headers.get::<Location>() {
            Some(location) => location.0.to_owned(),
            None => {
                error!("No transaction URI returned from server");
                let message = "No transaction URI returned from server".to_owned();
                return Err(GraphError::Transaction(message));
            }
        };

        let expires = time::strptime(&parsed.transaction.expires, DATETIME_RFC822)?;
        debug!("Transaction started at {}, expires in {}", location, expires.rfc822z());

        let started = Transaction {
            transaction: location,
            commit: parsed.commit,
            expires: expires,
            client: client,
            headers: headers,
            statements: Vec::new(),
            _state: PhantomData,
        };
        Ok((started, parsed.results))
    }
}
impl<'a> Transaction<'a, Started> {
    /// Queues `statement` and returns `self` so calls can be chained.
    ///
    /// Nothing is sent until `send` or `commit` is called.
    pub fn with_statement<S: Into<Statement>>(&mut self, statement: S) -> &mut Self {
        self.add_statement(statement);
        self
    }

    /// Sends a single statement inside this transaction and returns its result.
    ///
    /// Statements that were queued before this call are discarded, not sent.
    ///
    /// # Errors
    ///
    /// Returns whatever `send` returns on failure, or `GraphError::Statement` if the
    /// response contained no result at all.
    pub fn exec<S: Into<Statement>>(&mut self, statement: S) -> Result<CypherResult, GraphError> {
        self.statements.clear();
        self.add_statement(statement);
        let mut results = self.send()?;
        // Build the error lazily so the success path does not allocate the message.
        results.pop()
            .ok_or_else(|| GraphError::Statement("Server returned no results".to_owned()))
    }

    /// Sends all queued statements inside this transaction and returns their results.
    ///
    /// The queue is emptied before the request is made, so the statements are gone from
    /// the transaction even if the request fails. On success `expires` is updated from
    /// the response.
    ///
    /// # Errors
    ///
    /// Returns a `GraphError` if sending the request, parsing the response, or parsing
    /// the `expires` timestamp fails.
    pub fn send(&mut self) -> Result<Vec<CypherResult>, GraphError> {
        let pending = mem::replace(&mut self.statements, Vec::new());
        self.post_statements(pending)
    }

    /// Sends any queued statements to the commit URL, consuming the transaction, and
    /// returns the results of those statements.
    ///
    /// # Errors
    ///
    /// Returns a `GraphError` if sending the request or parsing the response fails.
    pub fn commit(self) -> Result<Vec<CypherResult>, GraphError> {
        debug!("Commiting transaction {}", self.transaction);
        let mut res = super::send_query(&self.client,
                                        &self.commit,
                                        self.headers,
                                        self.statements)?;
        let result: CommitResult = super::parse_response(&mut res)?;
        debug!("Transaction commited {}", self.transaction);
        Ok(result.results)
    }

    /// Issues a `DELETE` request against the transaction URL, consuming the transaction.
    ///
    /// The response is parsed as a `CommitResult` and then dropped; queued statements
    /// are never sent.
    ///
    /// # Errors
    ///
    /// Returns a `GraphError` if sending the request or parsing the response fails.
    pub fn rollback(self) -> Result<(), GraphError> {
        debug!("Rolling back transaction {}", self.transaction);
        // The request builder takes the headers by value, hence the clone.
        let mut res = self.client.delete(&self.transaction)
            .headers(self.headers.clone())
            .send()?;
        super::parse_response::<CommitResult>(&mut res)?;
        debug!("Transaction rolled back {}", self.transaction);
        Ok(())
    }

    /// Posts an empty statement list to the transaction URL and refreshes `expires`
    /// from the response. Queued statements are left untouched.
    ///
    /// The response is parsed exactly like the one `send` receives, so problems reported
    /// while parsing it are returned instead of being silently dropped, and
    /// `get_expires` reflects the new expiry afterwards.
    ///
    /// # Errors
    ///
    /// Returns a `GraphError` if sending the request, parsing the response, or parsing
    /// the `expires` timestamp fails.
    pub fn reset_timeout(&mut self) -> Result<(), GraphError> {
        self.post_statements(Vec::new()).map(|_| ())
    }

    /// Posts `statements` to the transaction URL, stores the expiry timestamp from the
    /// response in `self.expires`, and returns the statement results.
    fn post_statements(&mut self,
                       statements: Vec<Statement>)
                       -> Result<Vec<CypherResult>, GraphError> {
        let mut res = super::send_query(&self.client,
                                        &self.transaction,
                                        self.headers,
                                        statements)?;
        let result: TransactionResult = super::parse_response(&mut res)?;
        self.expires = time::strptime(&result.transaction.expires, DATETIME_RFC822)?;
        Ok(result.results)
    }
}
// Integration tests: every test below sends real requests to `URL`, so they need a
// server reachable at that address that accepts the credentials set in `get_headers`.
#[cfg(test)]
mod tests {
use super::*;
use hyper::header::{Authorization, Basic, ContentType, Headers};
// Transaction endpoint used by all tests.
const URL: &'static str = "http://neo4j:neo4j@localhost:7474/db/data/transaction";
// Builds the headers shared by all tests: basic auth (neo4j/neo4j) and a JSON content type.
fn get_headers() -> Headers {
let mut headers = Headers::new();
headers.set(Authorization(
Basic {
username: "neo4j".to_owned(),
password: Some("neo4j".to_owned()),
}
));
headers.set(ContentType::json());
headers
}
// Beginning a transaction without statements succeeds and yields no results.
#[test]
fn begin_transaction() {
let headers = get_headers();
let transaction = Transaction::new(URL, &headers);
let result = transaction.begin().unwrap();
assert_eq!(result.1.len(), 0);
}
// A node created in a committed transaction is visible from a later transaction.
#[test]
fn create_node_and_commit() {
let headers = get_headers();
Transaction::new(URL, &headers)
.with_statement("CREATE (n:TEST_TRANSACTION_CREATE_COMMIT { name: 'Rust', safe: true })")
.begin().unwrap()
.0.commit().unwrap();
let (transaction, results) = Transaction::new(URL, &headers)
.with_statement("MATCH (n:TEST_TRANSACTION_CREATE_COMMIT) RETURN n")
.begin().unwrap();
assert_eq!(results[0].data.len(), 1);
transaction.rollback().unwrap();
// Clean up the node created above so the test can be re-run.
Transaction::new(URL, &headers)
.with_statement("MATCH (n:TEST_TRANSACTION_CREATE_COMMIT) DELETE n")
.begin().unwrap()
.0.commit().unwrap();
}
// A node created in a transaction is visible inside it, and gone after a rollback.
#[test]
fn create_node_and_rollback() {
let headers = get_headers();
let (mut transaction, _) = Transaction::new(URL, &headers)
.with_statement("CREATE (n:TEST_TRANSACTION_CREATE_ROLLBACK { name: 'Rust', safe: true })")
.begin().unwrap();
let result = transaction
.exec("MATCH (n:TEST_TRANSACTION_CREATE_ROLLBACK) RETURN n")
.unwrap();
assert_eq!(result.data.len(), 1);
transaction.rollback().unwrap();
let (transaction, results) = Transaction::new(URL, &headers)
.with_statement("MATCH (n:TEST_TRANSACTION_CREATE_ROLLBACK) RETURN n")
.begin().unwrap();
assert_eq!(results[0].data.len(), 0);
transaction.rollback().unwrap();
}
// `exec` on an already started transaction returns the result of its single statement.
#[test]
fn query_open_transaction() {
let headers = get_headers();
let (mut transaction, _) = Transaction::new(URL, &headers).begin().unwrap();
let result = transaction
.exec(
"CREATE (n:TEST_TRANSACTION_QUERY_OPEN { name: 'Rust', safe: true }) RETURN n")
.unwrap();
assert_eq!(result.data.len(), 1);
// Roll back so the created node is not persisted.
transaction.rollback().unwrap();
}
}