use crate::config::Config;
use crate::connect::Connect;
use crate::pools::Pools;
use json::JsonValue;
mod config;
pub mod connect;
mod error;
mod format;
mod packet;
pub mod pools;
pub use error::PgsqlError;
/// Entry point of the crate: holds the settings from which connections and
/// connection pools are created.
#[derive(Debug, Clone)]
pub struct Pgsql {
    /// Settings parsed from the JSON value passed to `Pgsql::new`.
    pub config: Config,
}
impl Pgsql {
pub fn new(config: &JsonValue) -> Result<Self, PgsqlError> {
Ok(Self {
config: Config::new(config),
})
}
pub fn connect(&mut self) -> Result<Connect, PgsqlError> {
Connect::new(self.config.clone())
}
pub fn pools(&mut self) -> Result<Pools, PgsqlError> {
let size = self.config.pool_max.max(1) as usize;
Pools::new(self.config.clone(), size)
}
}
#[cfg(test)]
mod tests {
use super::Pgsql;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;
use std::time::Duration;
/// Frames `payload` as one tagged wire message.
///
/// Layout: the one-byte `tag`, then a big-endian `u32` length that counts
/// itself plus the payload (`payload.len() + 4`), then the payload bytes.
fn pg_msg(tag: u8, payload: &[u8]) -> Vec<u8> {
    // The length field covers its own four bytes but not the tag byte.
    let length_field = payload.len() as u32 + 4;
    let mut frame = Vec::with_capacity(5 + payload.len());
    frame.push(tag);
    frame.extend_from_slice(&length_field.to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}
/// Builds an `R` (authentication) message.
///
/// The body is the big-endian `auth_type` code followed by `extra`, framed
/// with `pg_msg`.
fn pg_auth(auth_type: u32, extra: &[u8]) -> Vec<u8> {
    let code = auth_type.to_be_bytes();
    let body = [&code[..], extra].concat();
    pg_msg(b'R', &body)
}
/// Returns the byte stream the mock server sends once the password exchange
/// is done.
///
/// In PostgreSQL wire-protocol terms the four messages are:
/// AuthenticationOk (`R`, code 0), one ParameterStatus (`S`,
/// `server_version` = `15.0`), BackendKeyData (`K`, the two `u32` values
/// 1 and 2) and ReadyForQuery (`Z`, status `I`).
fn post_auth_ok() -> Vec<u8> {
    // BackendKeyData payload: two big-endian u32 values back to back.
    let key_data: Vec<u8> = [1u32.to_be_bytes(), 2u32.to_be_bytes()].concat();
    [
        pg_auth(0, &[]),
        pg_msg(b'S', b"server_version\x0015.0\x00"),
        pg_msg(b'K', &key_data),
        pg_msg(b'Z', b"I"),
    ]
    .concat()
}
/// Returns the canned reply the mock server sends for every client request.
///
/// In PostgreSQL wire-protocol terms the sequence is: ParseComplete (`1`),
/// BindComplete (`2`), RowDescription (`T`) for one column named `c`,
/// DataRow (`D`) holding the single text value `1`, CommandComplete (`C`,
/// `SELECT 1`) and ReadyForQuery (`Z`, status `I`).
fn simple_query_response() -> Vec<u8> {
    // RowDescription payload describing exactly one column.
    let row_description: Vec<u8> = [
        &1u16.to_be_bytes()[..],    // number of fields
        &b"c\x00"[..],              // field name, NUL-terminated
        &0u32.to_be_bytes()[..],    // table OID
        &1u16.to_be_bytes()[..],    // column attribute number
        &23u32.to_be_bytes()[..],   // type OID
        &4i16.to_be_bytes()[..],    // type size
        &(-1i32).to_be_bytes()[..], // type modifier
        &0u16.to_be_bytes()[..],    // format code
    ]
    .concat();

    // DataRow payload: one column whose value is the single byte `1`.
    let data_row: Vec<u8> = [
        &1u16.to_be_bytes()[..], // number of columns
        &1u32.to_be_bytes()[..], // value length in bytes
        &b"1"[..],               // the value itself
    ]
    .concat();

    [
        pg_msg(b'1', &[]),
        pg_msg(b'2', &[]),
        pg_msg(b'T', &row_description),
        pg_msg(b'D', &data_row),
        pg_msg(b'C', b"SELECT 1\x00"),
        pg_msg(b'Z', b"I"),
    ]
    .concat()
}
/// Starts a throwaway mock server on an ephemeral loopback port and returns
/// that port.
///
/// A background thread accepts connections for about five seconds. Each
/// accepted socket is served on its own thread with a fixed script: request
/// a password (`R` message with code 3), then send `post_auth_ok()`, then
/// answer every later read with `simple_query_response()`.
fn spawn_cleartext_server() -> u16 {
    /// Plays the scripted server side of one session on an accepted socket.
    fn serve_client(mut stream: std::net::TcpStream) {
        stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
        let mut scratch = [0u8; 4096];

        // First read: the client's opening bytes; their content is ignored.
        if stream.read(&mut scratch).unwrap_or(0) == 0 {
            return;
        }
        let _ = stream.write_all(&pg_auth(3, &[]));

        // Second read: the client's reply to the password request; also ignored.
        if stream.read(&mut scratch).unwrap_or(0) == 0 {
            return;
        }
        let _ = stream.write_all(&post_auth_ok());

        // Every later chunk of client data gets the same canned reply;
        // EOF or a read error (including the timeout) ends the session.
        while matches!(stream.read(&mut scratch), Ok(n) if n > 0) {
            let _ = stream.write_all(&simple_query_response());
        }
    }

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();

    thread::spawn(move || {
        // Non-blocking accept lets the loop notice the deadline and exit.
        listener.set_nonblocking(true).unwrap();
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while std::time::Instant::now() < deadline {
            match listener.accept() {
                Ok((stream, _peer)) => {
                    // Switch the accepted socket back to blocking mode for the scripted reads.
                    stream.set_nonblocking(false).ok();
                    thread::spawn(move || serve_client(stream));
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(Duration::from_millis(5));
                }
                Err(_) => break,
            }
        }
    });

    // Short pause before handing the port to the caller.
    thread::sleep(Duration::from_millis(30));
    port
}
/// An empty JSON object must yield a configuration made entirely of defaults.
#[test]
fn new_with_empty_config_uses_defaults() {
    let empty = json::object! {};
    let built = Pgsql::new(&empty);
    assert!(built.is_ok());
    let client = built.unwrap();
    let cfg = &client.config;
    assert!(!cfg.debug);
    assert_eq!(cfg.hostname, "localhost");
    assert_eq!(cfg.hostport, 5432);
    assert_eq!(cfg.username, "postgres");
    assert_eq!(cfg.userpass, "111111");
    assert_eq!(cfg.database, "postgres");
    assert_eq!(cfg.charset, "UTF8");
    assert_eq!(cfg.pool_max, 5);
}
/// When every key is supplied, each value must be carried into the
/// configuration unchanged.
#[test]
fn new_with_full_config_uses_provided_values() {
    let full = json::object! {
        debug: true,
        hostname: "db.example.com",
        hostport: 15432,
        username: "admin",
        userpass: "secret",
        database: "app_db",
        charset: "utf8",
        pool_max: 30,
    };
    let built = Pgsql::new(&full);
    assert!(built.is_ok());
    let client = built.unwrap();
    let cfg = &client.config;
    assert!(cfg.debug);
    assert_eq!(cfg.hostname, "db.example.com");
    assert_eq!(cfg.hostport, 15432);
    assert_eq!(cfg.username, "admin");
    assert_eq!(cfg.userpass, "secret");
    assert_eq!(cfg.database, "app_db");
    assert_eq!(cfg.charset, "utf8");
    assert_eq!(cfg.pool_max, 30);
}
/// Supplied keys override their defaults while every omitted key keeps its
/// default value.
#[test]
fn new_with_partial_config_mixes_values_and_defaults() {
    let partial = json::object! {
        hostname: "127.0.0.1",
        username: "reader",
    };
    let built = Pgsql::new(&partial);
    assert!(built.is_ok());
    let client = built.unwrap();
    let cfg = &client.config;
    assert!(!cfg.debug);
    assert_eq!(cfg.hostname, "127.0.0.1");
    assert_eq!(cfg.hostport, 5432);
    assert_eq!(cfg.username, "reader");
    assert_eq!(cfg.userpass, "111111");
    assert_eq!(cfg.database, "postgres");
    assert_eq!(cfg.charset, "UTF8");
    assert_eq!(cfg.pool_max, 5);
}
/// Connecting to the scripted mock server must succeed.
#[test]
fn pgsql_connect_success() {
    let listen_port = spawn_cleartext_server();
    let settings = json::object! {
        hostname: "127.0.0.1",
        hostport: listen_port as u32,
        username: "u",
        userpass: "p",
        database: "d",
    };
    let mut client = Pgsql::new(&settings).unwrap();
    let outcome = client.connect();
    assert!(outcome.is_ok());
}
/// Connecting to loopback port 1 must report an error.
/// The test presumes nothing is listening on that port.
#[test]
fn pgsql_connect_failure() {
    let settings = json::object! {
        hostname: "127.0.0.1",
        hostport: 1,
        username: "u",
        userpass: "p",
        database: "d",
    };
    let mut client = Pgsql::new(&settings).unwrap();
    let outcome = client.connect();
    assert!(outcome.is_err());
}
/// Building a pool against the scripted mock server must succeed.
#[test]
fn pgsql_pools_success() {
    let listen_port = spawn_cleartext_server();
    let settings = json::object! {
        hostname: "127.0.0.1",
        hostport: listen_port as u32,
        username: "u",
        userpass: "p",
        database: "d",
        pool_max: 2,
    };
    let mut client = Pgsql::new(&settings).unwrap();
    let outcome = client.pools();
    assert!(outcome.is_ok());
}
/// With an unreachable server, building the pool still returns `Ok`, and the
/// pool reports zero connections.
/// The test presumes nothing is listening on loopback port 1.
#[test]
fn pgsql_pools_failure() {
    let settings = json::object! {
        hostname: "127.0.0.1",
        hostport: 1,
        username: "u",
        userpass: "p",
        database: "d",
        pool_max: 2,
    };
    let mut client = Pgsql::new(&settings).unwrap();
    let outcome = client.pools();
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap().total_connections(), 0);
}
}