use futures::stream::StreamExt;
use nautilus_connector::{execute_all, Executor, SqliteExecutor};
use nautilus_core::Value;
use nautilus_dialect::Sql;
/// Builds the executor used by the tests in this file.
///
/// Connects through the `sqlite::memory:` URL and returns the connector's
/// result unchanged, so each caller decides how to handle a failure.
async fn setup_executor() -> nautilus_connector::ConnectorResult<SqliteExecutor> {
    let url = "sqlite::memory:";
    SqliteExecutor::new(url).await
}
/// Creates the `test_users` table on `executor` unless it already exists.
///
/// Whatever the statement yields is discarded; a connector error is
/// propagated to the caller.
async fn setup_test_table(executor: &SqliteExecutor) -> nautilus_connector::ConnectorResult<()> {
    // One column per value kind the tests exercise: integer, text, real,
    // boolean and blob. Only `name` is declared NOT NULL.
    let ddl = r#"
CREATE TABLE IF NOT EXISTS test_users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
age INTEGER,
score REAL,
active BOOLEAN,
data BLOB
)
"#;
    let statement = Sql {
        text: String::from(ddl),
        params: Vec::new(),
    };
    let _ = execute_all(executor, &statement).await?;
    Ok(())
}
/// Creating an executor through `setup_executor` must succeed.
#[tokio::test]
async fn test_sqlite_executor_connection() {
    let outcome = setup_executor().await;
    let connected = outcome.is_ok();
    // On failure the error is surfaced through its `Debug` form.
    assert!(
        connected,
        "Failed to connect to SQLite: {:?}",
        outcome.err()
    );
}
/// A SELECT that matches nothing must return an empty row set.
#[tokio::test]
async fn test_execute_query_no_results() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    // No row is inserted by this test, so id 999 cannot match.
    let missing_id = Value::I64(999);
    let lookup = Sql {
        text: String::from("SELECT * FROM test_users WHERE id = ?"),
        params: vec![missing_id],
    };

    let found = execute_all(&db, &lookup)
        .await
        .expect("Failed to execute query");
    assert_eq!(found.len(), 0, "Expected no rows");
}
/// Inserts one fully populated row and reads every column back by name.
#[tokio::test]
async fn test_insert_and_select() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    let insert_stmt = Sql {
        text: String::from("INSERT INTO test_users (id, name, email, age, score, active, data) VALUES (?, ?, ?, ?, ?, ?, ?)"),
        params: vec![
            Value::I64(1),
            Value::String(String::from("Alice")),
            Value::String(String::from("alice@example.com")),
            Value::I64(30),
            Value::F64(95.5),
            Value::Bool(true),
            Value::Bytes(vec![1, 2, 3]),
        ],
    };
    execute_all(&db, &insert_stmt)
        .await
        .expect("Failed to insert");

    let select_stmt = Sql {
        text: String::from(
            "SELECT id, name, email, age, score, active, data FROM test_users WHERE id = ?",
        ),
        params: vec![Value::I64(1)],
    };
    let fetched = execute_all(&db, &select_stmt)
        .await
        .expect("Failed to select");
    assert_eq!(fetched.len(), 1, "Expected 1 row");

    // Column name paired with the value that was bound for it above.
    let expected = [
        ("id", Value::I64(1)),
        ("name", Value::String(String::from("Alice"))),
        ("email", Value::String(String::from("alice@example.com"))),
        ("age", Value::I64(30)),
        ("score", Value::F64(95.5)),
        ("active", Value::Bool(true)),
        ("data", Value::Bytes(vec![1, 2, 3])),
    ];
    let row = &fetched[0];
    for &(column, ref want) in expected.iter() {
        assert_eq!(row.get(column), Some(want));
    }
}
/// Checks positional lookup on a row, including a NULL column and an
/// out-of-range position.
#[tokio::test]
async fn test_row_positional_access() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    // `email` is not part of the insert, so the select below reads it as NULL.
    let insert_stmt = Sql {
        text: String::from(
            "INSERT INTO test_users (id, name, age, score, active) VALUES (?, ?, ?, ?, ?)",
        ),
        params: vec![
            Value::I64(2),
            Value::String(String::from("Bob")),
            Value::I64(25),
            Value::F64(88.0),
            Value::Bool(false),
        ],
    };
    execute_all(&db, &insert_stmt)
        .await
        .expect("Failed to insert");

    let select_stmt = Sql {
        text: String::from("SELECT id, name, email FROM test_users WHERE id = ?"),
        params: vec![Value::I64(2)],
    };
    let fetched = execute_all(&db, &select_stmt)
        .await
        .expect("Failed to select");
    assert_eq!(fetched.len(), 1);

    let row = &fetched[0];
    let bob = Value::String(String::from("Bob"));
    assert_eq!(row.get_by_pos(0), Some(&Value::I64(2)));
    assert_eq!(row.get_by_pos(1), Some(&bob));
    assert_eq!(row.get_by_pos(2), Some(&Value::Null));
    // Only three columns were selected, so position 3 has no value.
    assert_eq!(row.get_by_pos(3), None);
}
/// Iterating a row must yield its `(name, value)` pairs in select order.
#[tokio::test]
async fn test_row_iterator() {
    let executor = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&executor)
        .await
        .expect("Failed to setup table");

    let insert = Sql {
        text: "INSERT INTO test_users (id, name, email, age, score, active, data) VALUES (?, ?, ?, ?, ?, ?, ?)".to_string(),
        params: vec![
            Value::I64(3),
            Value::String("Charlie".to_string()),
            Value::String("charlie@example.com".to_string()),
            Value::I64(35),
            Value::F64(92.0),
            Value::Bool(true),
            Value::Bytes(vec![]),
        ],
    };
    execute_all(&executor, &insert)
        .await
        .expect("Failed to insert");

    let select = Sql {
        text: "SELECT id, name FROM test_users WHERE id = ?".to_string(),
        params: vec![Value::I64(3)],
    };
    let rows = execute_all(&executor, &select)
        .await
        .expect("Failed to select");
    // Check the row count before indexing so an empty result fails with a
    // readable assertion instead of an index-out-of-bounds panic.
    assert_eq!(rows.len(), 1, "Expected 1 row");

    let row = &rows[0];
    let columns: Vec<_> = row.iter().collect();
    assert_eq!(columns.len(), 2);
    assert_eq!(columns[0].0, "id");
    assert_eq!(columns[0].1, &Value::I64(3));
    assert_eq!(columns[1].0, "name");
    assert_eq!(columns[1].1, &Value::String("Charlie".to_string()));
}
/// Columns omitted from an insert must read back as `Value::Null`.
#[tokio::test]
async fn test_null_values() {
    let executor = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&executor)
        .await
        .expect("Failed to setup table");

    // Only `id` and `name` are supplied; every other column is left unset.
    let insert = Sql {
        text: "INSERT INTO test_users (id, name) VALUES (?, ?)".to_string(),
        params: vec![Value::I64(4), Value::String("David".to_string())],
    };
    execute_all(&executor, &insert)
        .await
        .expect("Failed to insert");

    let select = Sql {
        text: "SELECT id, name, email, age, score, active, data FROM test_users WHERE id = ?"
            .to_string(),
        params: vec![Value::I64(4)],
    };
    let rows = execute_all(&executor, &select)
        .await
        .expect("Failed to select");
    // Check the row count before indexing so an empty result fails with a
    // readable assertion instead of an index-out-of-bounds panic.
    assert_eq!(rows.len(), 1, "Expected 1 row");

    let row = &rows[0];
    assert_eq!(row.get("id"), Some(&Value::I64(4)));
    assert_eq!(row.get("name"), Some(&Value::String("David".to_string())));
    for &column in ["email", "age", "score", "active", "data"].iter() {
        assert_eq!(
            row.get(column),
            Some(&Value::Null),
            "column {} should be NULL",
            column
        );
    }
}
/// Inserts ids 10 through 14 and expects them back in ascending order.
#[tokio::test]
async fn test_multiple_rows() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    for id in 10_i64..15 {
        let name = format!("User{}", id);
        let email = format!("user{}@example.com", id);
        let insert_stmt = Sql {
            text: String::from("INSERT INTO test_users (id, name, email, age, score, active) VALUES (?, ?, ?, ?, ?, ?)"),
            params: vec![
                Value::I64(id),
                Value::String(name),
                Value::String(email),
                Value::I64(20 + id),
                Value::F64(80.0 + id as f64),
                Value::Bool(id % 2 == 0),
            ],
        };
        execute_all(&db, &insert_stmt)
            .await
            .expect("Failed to insert");
    }

    let select_stmt = Sql {
        text: String::from("SELECT id, name FROM test_users WHERE id >= ? ORDER BY id"),
        params: vec![Value::I64(10)],
    };
    let fetched = execute_all(&db, &select_stmt)
        .await
        .expect("Failed to select");
    assert_eq!(fetched.len(), 5);

    // The query orders by id, so the n-th row must carry id 10 + n.
    for (row, expected_id) in fetched.iter().zip(10_i64..) {
        let expected_name = Value::String(format!("User{}", expected_id));
        assert_eq!(row.get("id"), Some(&Value::I64(expected_id)));
        assert_eq!(row.get("name"), Some(&expected_name));
    }
}
/// Reads rows one at a time from `Executor::execute` instead of collecting
/// them through `execute_all`.
#[tokio::test]
async fn test_streaming_execution() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    for id in 20_i64..25 {
        let name = format!("StreamUser{}", id);
        let email = format!("stream{}@example.com", id);
        let insert_stmt = Sql {
            text: String::from("INSERT INTO test_users (id, name, email, age, score, active) VALUES (?, ?, ?, ?, ?, ?)"),
            params: vec![
                Value::I64(id),
                Value::String(name),
                Value::String(email),
                Value::I64(20 + id),
                Value::F64(80.0 + id as f64),
                Value::Bool(true),
            ],
        };
        execute_all(&db, &insert_stmt)
            .await
            .expect("Failed to insert");
    }

    let select_stmt = Sql {
        text: String::from("SELECT id, name FROM test_users WHERE id >= ? ORDER BY id"),
        params: vec![Value::I64(20)],
    };

    let mut row_stream = db.execute(&select_stmt);
    let mut seen_ids = Vec::new();
    while let Some(item) = row_stream.next().await {
        let row = item.expect("Failed to get row from stream");
        if let Some(Value::I64(id)) = row.get("id") {
            seen_ids.push(*id);
        } else {
            panic!("Expected I64 id");
        }
    }

    // One id is pushed per streamed row, so the length is the row count.
    assert_eq!(seen_ids.len(), 5, "Expected 5 rows from stream");
    assert_eq!(seen_ids, vec![20, 21, 22, 23, 24]);
}
/// Selecting the same column twice must keep both positions in the row
/// while name lookup still resolves.
#[tokio::test]
async fn test_duplicate_column_names() {
    let executor = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&executor)
        .await
        .expect("Failed to setup table");

    let insert = Sql {
        text:
            "INSERT INTO test_users (id, name, email, age, score, active) VALUES (?, ?, ?, ?, ?, ?)"
                .to_string(),
        params: vec![
            Value::I64(5),
            Value::String("Eve".to_string()),
            Value::String("eve@example.com".to_string()),
            Value::I64(28),
            Value::F64(90.0),
            Value::Bool(true),
        ],
    };
    execute_all(&executor, &insert)
        .await
        .expect("Failed to insert");

    // `id` appears at positions 0 and 2 of the result.
    let select = Sql {
        text: "SELECT id, name, id FROM test_users WHERE id = ?".to_string(),
        params: vec![Value::I64(5)],
    };
    let rows = execute_all(&executor, &select)
        .await
        .expect("Failed to select");
    // Check the row count before indexing so an empty result fails with a
    // readable assertion instead of an index-out-of-bounds panic.
    assert_eq!(rows.len(), 1, "Expected 1 row");

    let row = &rows[0];
    assert_eq!(row.get("id"), Some(&Value::I64(5)));
    assert_eq!(row.len(), 3);
    assert_eq!(row.get_by_pos(0), Some(&Value::I64(5)));
    assert_eq!(row.get_by_pos(1), Some(&Value::String("Eve".to_string())));
    assert_eq!(row.get_by_pos(2), Some(&Value::I64(5)));
}
/// An `INSERT ... RETURNING` statement must hand back the inserted row.
#[tokio::test]
async fn test_returning_clause() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    let insert_stmt = Sql {
        text: String::from(
            "INSERT INTO test_users (id, name, email) VALUES (?, ?, ?) RETURNING id, name, email",
        ),
        params: vec![
            Value::I64(100),
            Value::String(String::from("Returning")),
            Value::String(String::from("ret@example.com")),
        ],
    };
    let returned = execute_all(&db, &insert_stmt)
        .await
        .expect("Failed to insert with RETURNING");
    assert_eq!(returned.len(), 1);

    // Returned columns paired with the values bound in the insert.
    let expected = [
        ("id", Value::I64(100)),
        ("name", Value::String(String::from("Returning"))),
        ("email", Value::String(String::from("ret@example.com"))),
    ];
    let row = &returned[0];
    for &(column, ref want) in expected.iter() {
        assert_eq!(row.get(column), Some(want));
    }
}
/// An `UPDATE ... RETURNING` statement must hand back the row with its new
/// values.
#[tokio::test]
async fn test_update_returning() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    // Seed the row that the update will modify.
    let seed_stmt = Sql {
        text: String::from("INSERT INTO test_users (id, name, email) VALUES (?, ?, ?)"),
        params: vec![
            Value::I64(101),
            Value::String(String::from("Before")),
            Value::String(String::from("before@example.com")),
        ],
    };
    execute_all(&db, &seed_stmt)
        .await
        .expect("Failed to insert");

    let update_stmt = Sql {
        text: String::from(
            "UPDATE test_users SET name = ?, email = ? WHERE id = ? RETURNING id, name, email",
        ),
        params: vec![
            Value::String(String::from("After")),
            Value::String(String::from("after@example.com")),
            Value::I64(101),
        ],
    };
    let returned = execute_all(&db, &update_stmt)
        .await
        .expect("Failed to update with RETURNING");
    assert_eq!(returned.len(), 1);

    let expected = [
        ("id", Value::I64(101)),
        ("name", Value::String(String::from("After"))),
        ("email", Value::String(String::from("after@example.com"))),
    ];
    let row = &returned[0];
    for &(column, ref want) in expected.iter() {
        assert_eq!(row.get(column), Some(want));
    }
}
/// A `DELETE ... RETURNING` statement must hand back the removed row, and
/// a follow-up select must no longer find it.
#[tokio::test]
async fn test_delete_returning() {
    let db = setup_executor().await.expect("Failed to create executor");
    setup_test_table(&db).await.expect("Failed to setup table");

    // Seed the row that the delete will remove.
    let seed_stmt = Sql {
        text: String::from("INSERT INTO test_users (id, name, email) VALUES (?, ?, ?)"),
        params: vec![
            Value::I64(102),
            Value::String(String::from("ToDelete")),
            Value::String(String::from("delete@example.com")),
        ],
    };
    execute_all(&db, &seed_stmt)
        .await
        .expect("Failed to insert");

    let delete_stmt = Sql {
        text: String::from("DELETE FROM test_users WHERE id = ? RETURNING id, name, email"),
        params: vec![Value::I64(102)],
    };
    let removed = execute_all(&db, &delete_stmt)
        .await
        .expect("Failed to delete with RETURNING");
    assert_eq!(removed.len(), 1);

    let expected = [
        ("id", Value::I64(102)),
        ("name", Value::String(String::from("ToDelete"))),
        ("email", Value::String(String::from("delete@example.com"))),
    ];
    let row = &removed[0];
    for &(column, ref want) in expected.iter() {
        assert_eq!(row.get(column), Some(want));
    }

    // The same id must now match nothing.
    let lookup_stmt = Sql {
        text: String::from("SELECT * FROM test_users WHERE id = ?"),
        params: vec![Value::I64(102)],
    };
    let remaining = execute_all(&db, &lookup_stmt)
        .await
        .expect("Failed to select");
    assert_eq!(remaining.len(), 0);
}
#[tokio::test]
async fn test_client_sqlite_constructor() {
use nautilus_connector::Client;
let client = Client::sqlite("sqlite::memory:").await;
assert!(
client.is_ok(),
"Failed to create SQLite client: {:?}",
client.err()
);
}