1use std::collections::HashMap;
4use std::ffi::{c_void, CStr};
5use std::sync::Arc;
6use std::time::Instant;
7
8use log::debug;
9
10use libpq::Connection;
11use libpq_sys::ExecStatusType::{PGRES_COMMAND_OK, PGRES_TUPLES_OK};
12use libpq_sys::{
13 PGContextVisibility, PQclear, PQconsumeInput, PQfname, PQgetResult, PQgetvalue, PQlibVersion,
14 PQnfields, PQntuples, PQresultStatus, PQresultVerboseErrorMessage, PQsendQuery,
15 PQsetErrorVerbosity, PQsetNoticeReceiver,
16};
17
18use crate::notices::{notice_receiver, Notice, NoticeStorage, Verbosity};
19use crate::value::Value;
20
21pub struct PgwireLite {
26 hostname: String,
27 port: u16,
28 use_tls: bool,
29 verbosity: Verbosity,
30 notices: NoticeStorage,
31}
32
33#[derive(Debug)]
38pub struct QueryResult {
39 pub rows: Vec<HashMap<String, Value>>,
41
42 pub column_names: Vec<String>,
44
45 pub notices: Vec<Notice>,
47
48 pub row_count: i32,
50
51 pub col_count: i32,
53
54 pub notice_count: usize,
56
57 pub status: libpq_sys::ExecStatusType,
59
60 pub elapsed_time_ms: u64,
62}
63
64fn clear_pg_result(result: *mut libpq_sys::PGresult) {
66 if !result.is_null() {
67 unsafe {
68 debug!("Clearing PGresult at {:p}", result);
69 PQclear(result);
70 debug!("PGresult cleared successfully");
71 }
72 }
73}
74
75impl PgwireLite {
76 pub fn new(
98 hostname: &str,
99 port: u16,
100 use_tls: bool,
101 verbosity: &str,
102 ) -> Result<Self, Box<dyn std::error::Error>> {
103 let verbosity_val = match verbosity.to_lowercase().as_str() {
104 "default" => Verbosity::Default,
105 "verbose" => Verbosity::Verbose,
106 "terse" => Verbosity::Terse,
107 "sqlstate" => Verbosity::Sqlstate,
108 "" => Verbosity::Default,
109 _ => Verbosity::Default,
110 };
111
112 match verbosity_val {
114 Verbosity::Terse => log::set_max_level(log::LevelFilter::Warn),
115 Verbosity::Default => log::set_max_level(log::LevelFilter::Info),
116 Verbosity::Verbose => log::set_max_level(log::LevelFilter::Debug),
117 Verbosity::Sqlstate => log::set_max_level(log::LevelFilter::Debug),
118 }
119
120 let notices = Arc::new(std::sync::Mutex::new(Vec::new()));
121
122 Ok(PgwireLite {
123 hostname: hostname.to_string(),
124 port,
125 use_tls,
126 verbosity: verbosity_val,
127 notices,
128 })
129 }
130
131 pub fn libpq_version(&self) -> String {
137 let version = unsafe { PQlibVersion() };
138 let major = version / 10000;
139 let minor = (version / 100) % 100;
140 let patch = version % 100;
141 format!("{}.{}.{}", major, minor, patch)
142 }
143
144 pub fn verbosity(&self) -> String {
150 format!("{:?}", self.verbosity)
151 }
152
153 fn consume_pending_results(conn: &Connection) {
155 debug!("Consuming pending results");
156 unsafe {
157 PQconsumeInput(conn.into());
159
160 loop {
162 let result = PQgetResult(conn.into());
163 if result.is_null() {
164 break;
165 }
166 clear_pg_result(result);
167 }
168 }
169 }
170
171 pub fn query(&self, query: &str) -> Result<QueryResult, Box<dyn std::error::Error>> {
199 debug!("Clearing previous notices");
201 if let Ok(mut notices) = self.notices.lock() {
202 notices.clear();
203 }
204
205 let start_time = Instant::now();
206
207 let conn_str = format!(
209 "host={} port={} sslmode={} application_name=pgwire-lite-client connect_timeout=10 client_encoding=UTF8",
210 self.hostname,
211 self.port,
212 if self.use_tls { "verify-full" } else { "disable" }
213 );
214 debug!("Establishing connection using: {}", conn_str);
215
216 let conn = Connection::new(&conn_str)?;
218
219 unsafe {
221 let ssl_in_use = libpq_sys::PQsslInUse((&conn).into()) != 0;
222 let host_ptr = libpq_sys::PQhost((&conn).into());
223 let port_ptr = libpq_sys::PQport((&conn).into());
224 if !host_ptr.is_null() && !port_ptr.is_null() {
225 let host = CStr::from_ptr(host_ptr).to_string_lossy();
226 let port = CStr::from_ptr(port_ptr).to_string_lossy();
227 debug!("Connected to: {}:{} (ssl: {})", host, port, ssl_in_use);
228 }
229
230 let status = libpq_sys::PQstatus((&conn).into());
232 debug!("Connection status: {:?}", status);
233
234 let tx_status = libpq_sys::PQtransactionStatus((&conn).into());
236 debug!("Transaction status: {:?}", tx_status);
237
238 let server_version = libpq_sys::PQserverVersion((&conn).into());
240 let major = server_version / 10000;
241 let minor = (server_version / 100) % 100;
242 let revision = server_version % 100;
243 debug!(
244 "Server version: {}.{}.{} ({})",
245 major, minor, revision, server_version
246 );
247 }
248
249 debug!("Setting error verbosity to: {:?}", self.verbosity);
251 unsafe {
252 PQsetErrorVerbosity((&conn).into(), self.verbosity.into());
253 }
254
255 debug!("Setting up notice receiver");
257 let notices_ptr = Arc::into_raw(self.notices.clone()) as *mut c_void;
258 unsafe {
259 PQsetNoticeReceiver((&conn).into(), Some(notice_receiver), notices_ptr);
260 }
261
262 let query = if query.ends_with(';') {
264 query.to_string()
265 } else {
266 format!("{};", query)
267 };
268
269 debug!("Sending query: {}", query);
271 let send_success = unsafe { PQsendQuery((&conn).into(), query.as_ptr() as *const i8) };
272 if send_success == 0 {
273 return Err(
275 format!("Error: {}", conn.error_message().unwrap_or("Unknown error")).into(),
276 );
277 }
278
279 debug!("Processing the result");
281 let result = unsafe { PQgetResult((&conn).into()) };
282
283 if result.is_null() {
284 return Err("No result returned".into());
285 }
286
287 let status = unsafe { PQresultStatus(result) };
288
289 if status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK {
290 let error_msg_ptr = unsafe {
292 PQresultVerboseErrorMessage(
293 result,
294 self.verbosity.into(),
295 PGContextVisibility::PQSHOW_CONTEXT_ALWAYS,
296 )
297 };
298
299 let error_msg = if !error_msg_ptr.is_null() {
300 let msg = unsafe { CStr::from_ptr(error_msg_ptr).to_string_lossy().into_owned() };
302 unsafe { libpq_sys::PQfreemem(error_msg_ptr as *mut _) };
304 msg
305 } else {
306 conn.error_message().unwrap_or("Unknown error").to_string()
308 };
309
310 clear_pg_result(result);
311
312 Self::consume_pending_results(&conn);
314
315 return Err(error_msg.trim_end().to_string().into());
317 }
318
319 debug!("Getting column count");
321 let col_count = unsafe { PQnfields(result) };
322
323 debug!("Getting column names");
325 let mut column_names = Vec::with_capacity(col_count as usize);
326 for col_index in 0..col_count {
327 let col_name_ptr = unsafe { PQfname(result, col_index) };
328 if !col_name_ptr.is_null() {
329 let col_name =
330 unsafe { CStr::from_ptr(col_name_ptr).to_string_lossy().into_owned() };
331 column_names.push(col_name);
332 } else {
333 column_names.push(String::from("(unknown)"));
334 }
335 }
336
337 debug!("Getting row count");
339 let row_count = if status == PGRES_TUPLES_OK {
340 unsafe { PQntuples(result) }
341 } else {
342 0
343 };
344
345 let mut rows = Vec::new();
347
348 if status == PGRES_TUPLES_OK {
350 debug!("Processing rows");
351
352 for row_index in 0..row_count {
354 let mut row_data = HashMap::new();
355
356 for col_index in 0..col_count {
358 let value_ptr = unsafe { PQgetvalue(result, row_index, col_index) };
359 let value = if !value_ptr.is_null() {
360 let string_value =
361 unsafe { CStr::from_ptr(value_ptr).to_string_lossy().into_owned() };
362 Value::String(string_value)
363 } else {
364 Value::Null
365 };
366
367 row_data.insert(column_names[col_index as usize].clone(), value);
369 }
370
371 rows.push(row_data);
372 }
373 }
374 debug!("Rows processed: {}", rows.len());
375
376 clear_pg_result(result);
377
378 Self::consume_pending_results(&conn);
380
381 debug!("Collecting notices");
383 let notices = if let Ok(mut lock) = self.notices.lock() {
384 lock.drain(..).collect()
385 } else {
386 Vec::new()
387 };
388 let notice_count = notices.len();
389
390 let elapsed_time_ms = start_time.elapsed().as_millis() as u64;
391
392 drop(conn);
393
394 Ok(QueryResult {
395 rows,
396 column_names,
397 notices,
398 row_count,
399 col_count,
400 notice_count,
401 status,
402 elapsed_time_ms,
403 })
404 }
405}