rsdbc_postgres/
lib.rs

1mod ssl_mode;
2
3use std::collections::HashMap;
4use std::future::Future;
5use std::pin::Pin;
6use std::time::Duration;
7use futures::future::BoxFuture;
8// use postgres::{Client, NoTls};
9// use postgres::config::SslMode;
10use tokio_postgres::{Client, NoTls};
11use tokio_postgres::config::SslMode;
12use native_tls::{Certificate, TlsConnector};
13use postgres_native_tls::MakeTlsConnector;
14use std::fs;
15
16use tracing_subscriber::fmt::time;
17use url::Url;
18use rsdbc_core::connection::{Batch, Connection, ConnectionFactory, ConnectionFactoryMetadata, ConnectionFactoryOptions, ConnectionFactoryProvider, ConnectionMetadata, IsolationLevel, Statement, ValidationDepth};
19use rsdbc_core::error::RsdbcErrors;
20use rsdbc_core::{OptionValue, Result, TransactionDefinition};
21
22// TODO: should this take raw string?
23pub struct  PostgresqlConnectionConfiguration {
24    // application_name = "rsdbc-postgresql"
25    pub application_name: String,
26    pub auto_detect_extensions: bool, // true
27    pub compatibility_mode: bool,
28    pub connection_timeout: Duration,
29    pub database: String,
30    pub fetch_size: u64,
31    pub force_binary: bool,
32    pub host: String,
33    pub options: HashMap<String, String>,
34    pub password: String,
35    pub port: u32, // default port
36    pub prepared_statement_cache_queries: i32,
37    pub schema: String,
38    pub socket: String,
39    pub ssl_cert: Option<Url>,
40    // pub hostname_verifier: HostnameVerifier,
41    pub ssl_key: Option<Url>,
42    pub ssl_mode: ssl_mode::SslMode, // TODO: expose own so that we can change internals if we need to
43    pub ssl_password: String,
44    pub ssl_root_cert: Option<Url>,
45    pub statement_timeout: Duration,
46    pub tcp_keep_alive: bool,
47    pub tcp_no_delay: bool, // true
48    pub username: String,
49}
50
51// example of builder see
52// https://github.com/sfackler/rust-native-tls/blob/41522daa6f6e76182c3118a7f9c23f6949e6d59f/src/lib.rs
53impl PostgresqlConnectionConfiguration {
54
55    fn new() -> Self {
56        Self {
57            application_name: "rsdbc-postgresql".to_string(), // TODO: set default somewhere else
58            auto_detect_extensions: false,
59            compatibility_mode: false,
60            connection_timeout: Default::default(),
61            database: "".to_string(),
62            fetch_size: 0,
63            force_binary: false,
64            host: "".to_string(),
65            options: Default::default(),
66            password: "".to_string(),
67            port: 5432,
68            prepared_statement_cache_queries: 0,
69            schema: "".to_string(),
70            socket: "".to_string(),
71            ssl_cert: None,
72            ssl_key: None,
73            ssl_mode: ssl_mode::SslMode::Disable,
74            ssl_password: "".to_string(),
75            ssl_root_cert: None,
76            statement_timeout: Default::default(),
77            tcp_keep_alive: false,
78            tcp_no_delay: false,
79            username: "".to_string()
80        }
81    }
82
83    pub fn application_name(&mut self, name: String) -> &mut Self {
84        self.application_name = name;
85        self
86    }
87
88    // TODO: this might not be necessary....was for java serviceloader
89    pub fn auto_detect_extensions(&mut self, auto_detect: bool) -> &mut Self {
90        self.auto_detect_extensions = auto_detect;
91        self
92    }
93
94    pub fn compatibility_mode(&mut self, compatibility_mode: bool) -> &mut Self {
95        self.compatibility_mode = compatibility_mode;
96        self
97    }
98
99    pub fn connect_timeout(&mut self, timeout: Duration) -> &mut Self {
100        self.connection_timeout = timeout;
101        self
102    }
103
104    // TODO: probably don't need
105    // pub fn codec_registrar(&mut self, registrar: CodecRegistrar) -> &mut Self {
106    //     self.connection_timeout = timeout;
107    //     self
108    // }
109
110    pub fn database(&mut self, database: String) -> &mut Self {
111        self.database = database;
112        self
113    }
114
115    // TODO: check what ssl modes postgres takes.
116    pub fn enable_ssl(&mut self) -> &mut Self {
117        self.ssl_mode = ssl_mode::SslMode::Require; // VERIFY_FULL
118        self
119    }
120
121    // TODO: probably dont need
122    // pub fn extend_with(&mut self, extension: Extension) -> &mut Self {
123    //     self.extensions.push(extension);
124    //     self
125    // }
126
127    // TODO: probably don't need
128    // pub fn error_response_log_level(&mut self, log_level: LogLevel) -> &mut Self {
129    //     self.error_response_log_level = log_level
130    //     self
131    // }
132
133    pub fn fetch_size(&mut self, fetch_size: u64) -> &mut Self {
134        self.fetch_size = fetch_size;
135        self
136    }
137
138    // TODO: add fn for fetch_size to take in a function?
139
140    pub fn force_binary(&mut self, force_binary: bool) -> &mut Self {
141        self.force_binary = force_binary;
142        self
143    }
144
145    pub fn host(&mut self, host: String) -> &mut Self {
146        self.host = host;
147        self
148    }
149
150    // TODO: probably don't need this
151    // pub fn notice_log_level(&mut self, log_level: LogLevel) -> &mut Self {
152    //     self.notice_log_level = log_level;
153    //     self
154    // }
155
156    /// Configure connection initialization parameters.
157    ///
158    /// These parameters are applied once after creating a new connection.
159    /// This is useful for setting up client-specific
160    /// <a href="https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT">runtime parameters</a>
161    /// like statement timeouts, time zones etc.
162    pub fn options(&mut self, options: HashMap<String, String>) -> &mut Self {
163        self.options = options;
164        self
165    }
166
167    pub fn password(&mut self, password: String) -> &mut Self {
168        self.password = password;
169        self
170    }
171
172    // TODO: should default to 5432
173    pub fn port(&mut self, port: u32) -> &mut Self {
174        self.port = port;
175        self
176    }
177
178    /// Configure the preparedStatementCacheQueries. The default is {@code -1}, meaning there's no limit.
179    /// The value of {@code 0} disables the cache. Any other value specifies the cache size.
180    pub fn prepared_statement_cache_queries(&mut self, prepared_statement_cache_queries: i32) -> &mut Self {
181        self.prepared_statement_cache_queries = prepared_statement_cache_queries;
182        self
183    }
184
185    pub fn schema(&mut self, schema: String) -> &mut Self {
186        self.schema = schema;
187        self
188    }
189
190    pub fn socket(&mut self, socket: String) -> &mut Self {
191        self.socket = socket;
192        self.ssl_mode = ssl_mode::SslMode::Disable;
193        self
194    }
195
196    // TODO: how to handle this failing?
197    // Might have to actually use a builder and return Result on call to `build`
198    // /// Configure ssl cert for client certificate authentication.
199    // /// Can point to either a resource or a file.
200    // /// sslCert an X.509 certificate chain file in PEM format
201    // pub fn ssl(&mut self, ssl_cert_path: String) -> &mut Self {
202    //     self.ssl_url(Url::parse(ssl_cert_path.as_str()))
203    // }
204
205    /// Configure ssl cert for client certificate authentication.
206    ///
207    /// sslCert an X.509 certificate chain file in PEM format
208    pub fn ssl_url(&mut self, ssl_cert: Url) -> &mut Self {
209        self.ssl_key = Some(ssl_cert);
210        self
211    }
212
213    // TODO: how to do ssl hostname verifier/verification
214
215
216    // Configure ssl key for client certificate authentication.
217    // Can point to either a resource or a file.
218    // pub fn sslkey(&mut self, sslkey: String) -> &mut Self {
219    //     self.ssl_key(Url::parse(sslkey.as_str()))
220    // }
221
222    /// Configure ssl key for client certificate authentication.
223    ///
224    /// sslKey a PKCS#8 private key file in PEM format
225    pub fn sslkey_url(&mut self, sslkey: Url) -> &mut Self {
226        self.ssl_key = Some(sslkey);
227        self
228    }
229
230    pub fn ssl_mode(&mut self, ssl_mode: ssl_mode::SslMode) -> &mut Self {
231        self.ssl_mode = ssl_mode;
232        self
233    }
234
235    pub fn ssl_password(&mut self, ssl_password: String) -> &mut Self {
236        self.ssl_password = ssl_password;
237        self
238    }
239
240    // Configure ssl root cert for server certificate validation.
241    // Can point to either a resource or a file.
242    // pub fn ssl_root_cert(&mut self, ssl_root_cert: String) -> &mut Self {
243    //     self.ssl_root_cert_url(Url::parse(ssl_root_cert.as_str()))
244    // }
245
246    /// Configure ssl root cert for server certificate validation.
247    ///
248    /// sslRootCert an X.509 certificate chain file in PEM format
249    pub fn ssl_root_cert_url(&mut self, ssl_root_cert: Url) -> &mut Self {
250        self.ssl_root_cert = Some(ssl_root_cert);
251        self
252    }
253
254    pub fn tcp_keep_alive(&mut self, enabled: bool) -> &mut Self {
255        self.tcp_keep_alive = enabled;
256        self
257    }
258
259    pub fn tcp_no_delay(&mut self, enabled: bool) -> &mut Self {
260        self.tcp_no_delay = enabled;
261        self
262    }
263
264    pub fn username(&mut self, username: String) -> &mut Self {
265        self.username = username;
266        self
267    }
268}
269
270pub struct PostgresqlConnectionFactory {
271    pub configuration: PostgresqlConnectionConfiguration
272}
273
274pub struct PostgresqlConnection {
275    // TODO: does this need to hold ref to configuration?
276    client: Client,
277    // conn: tokio_postgres::Connection<S, T>,
278}
279
280impl Connection for PostgresqlConnection {
281
282    // TODO: provide options to build transaction
283    // beginTransaction(TransactionDefinition definition)
284    fn begin_transaction(&mut self) -> Result<()> {
285        // self.client.transaction()
286        // self.client.build_transaction()
287        todo!()
288    }
289
290    fn close(&mut self) -> Result<()> {
291        // self.client.close()
292        todo!()
293    }
294
295    fn commit_transaction(&mut self) {
296        // self.client.
297        todo!()
298    }
299
300    fn create_batch(&mut self) -> Result<Box<dyn Batch>> {
301        todo!()
302    }
303
304    fn create_savepoint(&mut self, name: &str) {
305        todo!()
306    }
307
308    fn create_statement(&mut self, sql: &str) -> Result<Box<dyn Statement<'_> + '_>> {
309        todo!()
310    }
311
312    // TODO: not seeing how to do this...needs more research
313    fn is_auto_commit(&mut self) -> bool {
314        todo!()
315    }
316
317    fn metadata(&mut self) -> Result<Box<dyn ConnectionMetadata>> {
318        // self.client.
319        todo!()
320    }
321
322    fn transaction_isolation_level(&mut self) -> IsolationLevel {
323        todo!()
324    }
325
326    fn release_savepoint(&mut self, name: &str) {
327        todo!()
328    }
329
330    fn rollback_transaction(&mut self) {
331        todo!()
332    }
333
334    fn rollback_transaction_to_savepoint(&mut self, name: String) {
335        todo!()
336    }
337
338    fn auto_commit(&mut self, commit: bool) {
339        todo!()
340    }
341
342    fn set_transaction_isolation_level(&mut self, isolation_level: IsolationLevel) {
343        todo!()
344    }
345
346    fn validate(&mut self, depth: ValidationDepth) -> bool {
347        if self.client.is_closed() {
348            return false;
349        }
350
351        // TODO: where to get duration from?
352        // self.client.is_valid(Duration::from_secs(60)).is_ok()
353
354        // "" vs "SELECT 1"
355
356        let query = self.client.simple_query("SELECT 1");
357        // tokio::time::timeout(Duration::from_secs(60), query)
358
359        // let inner_client = &self.client;
360        // self.connection.block_on(async {
361        //     let trivial_query = inner_client.simple_query("");
362        //     tokio::time::timeout(timeout, trivial_query)
363        //         .await
364        //         .map_err(|_| Error::__private_api_timeout())?
365        //         .map(|_| ())
366        // })
367
368
369        return true;
370    }
371}
372
373impl ConnectionFactory for PostgresqlConnectionFactory {
374    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Box<(dyn Connection + 'static)>>> + Send>> {
375
376        // let tls = if self.configuration.ssl_mode == ssl_mode::SslMode::Disable {
377        //     NoTls
378        // } else {
379        //     // let cert = fs::read("database_cert.pem")?;
380        //     // let cert = Certificate::from_pem(&cert)?;
381        //     // let connector = TlsConnector::builder()
382        //     //     .add_root_certificate(cert)
383        //     //     .build()?;
384        //     // let connector = MakeTlsConnector::new(connector);
385        // };
386
387        // let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
388
389        // let (client, connection) =
390        //     tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap();
391
392        // The connection object performs the actual communication with the database,
393        // so spawn it off to run on its own.
394        // tokio::spawn(async move {
395        //     if let Err(e) = connection.await {
396        //         eprintln!("connection error: {}", e);
397        //     }
398        // });
399
400        // Client::configure().connect()
401
402        todo!()
403    }
404    // fn connect(&self) -> BoxFuture<'_, Result<Box<PostgresqlConnection>>> {
405    //     todo!()
406    // }
407
408
409    // TODO: change to Postgres Connection factory metadata?
410    fn get_metadata(&self) -> Box<dyn ConnectionFactoryMetadata> {
411        todo!()
412    }
413}
414
415impl ConnectionFactoryProvider for PostgresqlConnectionFactory {
416    type C = PostgresqlConnectionFactory;
417
418    fn create(connection_factory_options: ConnectionFactoryOptions) -> Result<Self::C> {
419        let configuration = PostgresqlConnectionConfiguration::new();
420        Ok(PostgresqlConnectionFactory {
421            configuration
422        })
423    }
424}
425
426fn to_rsdbc_err(e: postgres::error::Error) -> rsdbc_core::error::RsdbcErrors {
427    rsdbc_core::error::RsdbcErrors::General(format!("{:?}", e))
428}
429
430
431// pub trait PostgresTransactionDefinition: TransactionDefinition {
432//
433//     fn deferrable(&mut self) -> &mut Self;
434//
435//     fn non_deferrable(&mut self) -> &mut Self;
436//
437//     fn isolation_level(&mut self, isolation_level: IsolationLevel) -> &mut Self {
438//         todo!()
439//     }
440//
441//     fn read_only(&mut self) -> &mut Self {
442//         todo!()
443//     }
444//
445//     fn read_write(&mut self) -> &mut Self {
446//         todo!()
447//     }
448// }
449
450pub struct PostgresTransactionDefinition {
451    pub options: HashMap<String, OptionValue>,
452}
453
454impl PostgresTransactionDefinition {
455
456    fn deferrable(&mut self) -> &mut Self {
457        self.options.insert("deferrable".to_string(), OptionValue::Bool(true));
458        self
459    }
460
461    fn non_deferrable(&mut self) -> &mut Self {
462        self.options.insert("deferrable".to_string(), OptionValue::Bool(false));
463        self
464    }
465
466    fn isolation_level(&mut self, isolation_level: IsolationLevel) -> &mut Self {
467        self.options.insert("isolationLevel".to_string(), OptionValue::String(isolation_level.as_sql().to_string()));
468        self
469    }
470
471    fn read_only(&mut self) -> &mut Self {
472        self.options.insert("readOnly".to_string(), OptionValue::Bool(true));
473        self
474    }
475
476    fn read_write(&mut self) -> &mut Self {
477        self.options.insert("readOnly".to_string(), OptionValue::Bool(false));
478        self
479    }
480
481}
482
483impl TransactionDefinition for PostgresTransactionDefinition {
484    fn get_attribute(&self, attribute: &str) -> OptionValue {
485        todo!()
486    }
487}
488
489// pub struct SimpleTransactionDefinition {
490//     // fn get_attribute<V>(&self, attribute: &str) -> Option<V>;
491//     pub options: HashMap<String, String>,
492// }
493//
494// impl TransactionDefinition for SimpleTransactionDefinition {
495//     fn get_attribute<V>(&self, attribute: &str) -> Option<V> {
496//         todo!()
497//     }
498// }
499//
500// impl PostgresTransactionDefinition for SimpleTransactionDefinition {
501//     fn deferrable(&mut self) -> &mut Self {
502//         todo!()
503//     }
504//
505//     fn non_deferrable(&mut self) -> &mut Self {
506//         todo!()
507//     }
508//
509// }
510
511
512#[cfg(test)]
513mod tests {
514    #[test]
515    fn it_works() {
516        assert_eq!(2 + 2, 4);
517    }
518}