json_register/
lib.rs

1//! # JSON Register
2//!
3//! `json-register` is a library for registering JSON objects into a PostgreSQL database
4//! with canonicalisation and caching. It ensures that semantically equivalent JSON objects
5//! are stored only once and assigned a unique identifier.
6//!
7//! This library provides both a Rust API and Python bindings.
8
9#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17use std::sync::atomic::{AtomicU64, Ordering};
18
19mod cache;
20mod canonicalise;
21mod db;
22mod errors;
23
24pub use cache::Cache;
25pub use canonicalise::canonicalise;
26pub use db::Db;
27pub use errors::JsonRegisterError;
28
29/// Builds a PostgreSQL connection string from its components.
30///
31/// # Arguments
32///
33/// * `user` - Database user name
34/// * `password` - Database password
35/// * `host` - Database host (e.g., "localhost")
36/// * `port` - Database port (e.g., 5432)
37/// * `database` - Database name
38///
39/// # Returns
40///
41/// A formatted PostgreSQL connection string
42pub fn build_connection_string(
43    user: &str,
44    password: &str,
45    host: &str,
46    port: u16,
47    database: &str,
48) -> String {
49    format!(
50        "postgres://{}:{}@{}:{}/{}",
51        user, password, host, port, database
52    )
53}
54
55/// Sanitizes a connection string by replacing the password with asterisks.
56///
57/// This prevents passwords from leaking in error messages, logs, or stack traces.
58///
59/// # Arguments
60///
61/// * `connection_string` - The connection string to sanitize
62///
63/// # Returns
64///
65/// A sanitized connection string with the password replaced by "****"
66///
67/// # Example
68///
69/// ```
70/// use json_register::sanitize_connection_string;
71/// let sanitized = sanitize_connection_string("postgres://user:secret@localhost:5432/db");
72/// assert_eq!(sanitized, "postgres://user:****@localhost:5432/db");
73/// ```
74pub fn sanitize_connection_string(connection_string: &str) -> String {
75    // Handle postgres:// or postgresql:// schemes
76    if let Some(scheme_end) = connection_string.find("://") {
77        let scheme = &connection_string[..scheme_end + 3];
78        let rest = &connection_string[scheme_end + 3..];
79
80        // Find the LAST @ symbol before any / (to handle @ in passwords)
81        // The @ separates user:password from host:port/db
82        let at_idx = if let Some(slash_idx) = rest.find('/') {
83            // Find last @ before the slash
84            rest[..slash_idx].rfind('@')
85        } else {
86            // No slash, find last @ in entire string
87            rest.rfind('@')
88        };
89
90        if let Some(at_idx) = at_idx {
91            let user_pass = &rest[..at_idx];
92            let host_db = &rest[at_idx..];
93
94            // Find FIRST : separator between user and password
95            // (username shouldn't have :, but password might)
96            if let Some(colon_idx) = user_pass.find(':') {
97                let user = &user_pass[..colon_idx];
98                return format!("{}{}:****{}", scheme, user, host_db);
99            }
100        }
101    }
102
103    // If parsing fails, return as-is (no password to hide)
104    connection_string.to_string()
105}
106
107/// The main registry structure that coordinates database interactions and caching.
108///
109/// This struct maintains a connection pool to the PostgreSQL database and an
110/// in-memory LRU cache to speed up lookups of frequently accessed JSON objects.
111pub struct Register {
112    db: Db,
113    cache: Cache,
114    register_single_calls: AtomicU64,
115    register_batch_calls: AtomicU64,
116    total_objects_registered: AtomicU64,
117}
118
119impl Register {
120    /// Creates a new `Register` instance.
121    ///
122    /// # Arguments
123    ///
124    /// * `connection_string` - The PostgreSQL connection string.
125    /// * `table_name` - The name of the table where JSON objects are stored.
126    /// * `id_column` - The name of the column storing the unique ID.
127    /// * `jsonb_column` - The name of the column storing the JSONB data.
128    /// * `pool_size` - The maximum number of connections in the database pool.
129    /// * `lru_cache_size` - The capacity of the in-memory LRU cache.
130    /// * `acquire_timeout_secs` - Optional timeout for acquiring connections (default: 5s).
131    /// * `idle_timeout_secs` - Optional timeout for idle connections (default: 600s).
132    /// * `max_lifetime_secs` - Optional maximum lifetime for connections (default: 1800s).
133    /// * `use_tls` - Optional flag to enable TLS (default: false for backwards compatibility).
134    ///
135    /// # Returns
136    ///
137    /// A `Result` containing the new `Register` instance or a `JsonRegisterError`.
138    #[allow(clippy::too_many_arguments)]
139    pub async fn new(
140        connection_string: &str,
141        table_name: &str,
142        id_column: &str,
143        jsonb_column: &str,
144        pool_size: u32,
145        lru_cache_size: usize,
146        acquire_timeout_secs: Option<u64>,
147        idle_timeout_secs: Option<u64>,
148        max_lifetime_secs: Option<u64>,
149        use_tls: Option<bool>,
150    ) -> Result<Self, JsonRegisterError> {
151        let db = Db::new(
152            connection_string,
153            table_name,
154            id_column,
155            jsonb_column,
156            pool_size,
157            acquire_timeout_secs,
158            idle_timeout_secs,
159            max_lifetime_secs,
160            use_tls,
161        )
162        .await?;
163        let cache = Cache::new(lru_cache_size);
164        Ok(Self {
165            db,
166            cache,
167            register_single_calls: AtomicU64::new(0),
168            register_batch_calls: AtomicU64::new(0),
169            total_objects_registered: AtomicU64::new(0),
170        })
171    }
172
173    /// Registers a single JSON object.
174    ///
175    /// This method canonicalises the input JSON, checks the cache, and if necessary,
176    /// inserts the object into the database. It returns the unique ID associated
177    /// with the JSON object.
178    ///
179    /// # Arguments
180    ///
181    /// * `value` - The JSON value to register.
182    ///
183    /// # Returns
184    ///
185    /// A `Result` containing the unique ID (i32) or a `JsonRegisterError`.
186    pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
187        self.register_single_calls.fetch_add(1, Ordering::Relaxed);
188        self.total_objects_registered
189            .fetch_add(1, Ordering::Relaxed);
190
191        let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
192
193        if let Some(id) = self.cache.get(&canonical) {
194            return Ok(id);
195        }
196
197        let id = self
198            .db
199            .register_object(value)
200            .await
201            .map_err(JsonRegisterError::DbError)?;
202
203        self.cache.put(canonical, id);
204
205        Ok(id)
206    }
207
208    /// Registers a batch of JSON objects.
209    ///
210    /// This method processes multiple JSON objects efficiently. It first checks the
211    /// cache for all items. If any are missing, it performs a batch insert/select
212    /// operation in the database. The order of the returned IDs corresponds to the
213    /// order of the input values.
214    ///
215    /// # Arguments
216    ///
217    /// * `values` - A slice of JSON values to register.
218    ///
219    /// # Returns
220    ///
221    /// A `Result` containing a vector of unique IDs or a `JsonRegisterError`.
222    pub async fn register_batch_objects(
223        &self,
224        values: &[Value],
225    ) -> Result<Vec<i32>, JsonRegisterError> {
226        self.register_batch_calls.fetch_add(1, Ordering::Relaxed);
227        self.total_objects_registered
228            .fetch_add(values.len() as u64, Ordering::Relaxed);
229
230        let mut canonicals = Vec::with_capacity(values.len());
231        for value in values {
232            canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
233        }
234
235        // Check cache for existing entries
236        let mut all_cached = true;
237        let mut cached_ids = Vec::with_capacity(values.len());
238        for canonical in &canonicals {
239            if let Some(id) = self.cache.get(canonical) {
240                cached_ids.push(id);
241            } else {
242                all_cached = false;
243                break;
244            }
245        }
246
247        if all_cached {
248            return Ok(cached_ids);
249        }
250
251        // If not all items are in the cache, query the database
252        let ids = self
253            .db
254            .register_batch_objects(values)
255            .await
256            .map_err(JsonRegisterError::DbError)?;
257
258        // Update the cache with the newly retrieved IDs
259        for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
260            self.cache.put(canonical, *id);
261        }
262
263        Ok(ids)
264    }
265
266    /// Returns the current size of the connection pool.
267    ///
268    /// This is the total number of connections (both idle and active) currently
269    /// in the pool. Useful for monitoring pool utilization.
270    ///
271    /// # Returns
272    ///
273    /// The number of connections in the pool.
274    pub fn pool_size(&self) -> usize {
275        self.db.pool_size()
276    }
277
278    /// Returns the number of idle connections in the pool.
279    ///
280    /// Idle connections are available for immediate use. A low idle count
281    /// during high load may indicate the pool is undersized.
282    ///
283    /// # Returns
284    ///
285    /// The number of idle connections.
286    pub fn idle_connections(&self) -> usize {
287        self.db.idle_connections()
288    }
289
290    /// Checks if the connection pool is closed.
291    ///
292    /// A closed pool cannot create new connections and will error on acquire attempts.
293    ///
294    /// # Returns
295    ///
296    /// `true` if the pool is closed, `false` otherwise.
297    pub fn is_closed(&self) -> bool {
298        self.db.is_closed()
299    }
300
301    /// Returns the number of cache hits.
302    ///
303    /// # Returns
304    ///
305    /// The total number of successful cache lookups.
306    pub fn cache_hits(&self) -> u64 {
307        self.cache.hits()
308    }
309
310    /// Returns the number of cache misses.
311    ///
312    /// # Returns
313    ///
314    /// The total number of unsuccessful cache lookups.
315    pub fn cache_misses(&self) -> u64 {
316        self.cache.misses()
317    }
318
319    /// Returns the cache hit rate as a percentage.
320    ///
321    /// # Returns
322    ///
323    /// The hit rate as a float between 0.0 and 100.0.
324    /// Returns 0.0 if no cache operations have occurred.
325    pub fn cache_hit_rate(&self) -> f64 {
326        self.cache.hit_rate()
327    }
328
329    /// Returns the current number of items in the cache.
330    ///
331    /// # Returns
332    ///
333    /// The number of items currently stored in the cache.
334    pub fn cache_size(&self) -> usize {
335        self.cache.size()
336    }
337
338    /// Returns the maximum capacity of the cache.
339    ///
340    /// # Returns
341    ///
342    /// The maximum number of items the cache can hold.
343    pub fn cache_capacity(&self) -> usize {
344        self.cache.capacity()
345    }
346
347    /// Returns the number of cache evictions.
348    ///
349    /// # Returns
350    ///
351    /// The total number of items evicted from the cache.
352    pub fn cache_evictions(&self) -> u64 {
353        self.cache.evictions()
354    }
355
356    /// Returns the number of active database connections.
357    ///
358    /// Active connections are those currently in use (not idle).
359    ///
360    /// # Returns
361    ///
362    /// The number of active connections (pool_size - idle_connections).
363    pub fn active_connections(&self) -> usize {
364        self.pool_size().saturating_sub(self.idle_connections())
365    }
366
367    /// Returns the total number of database queries executed.
368    ///
369    /// # Returns
370    ///
371    /// The total number of queries executed since instance creation.
372    pub fn db_queries_total(&self) -> u64 {
373        self.db.queries_executed()
374    }
375
376    /// Returns the total number of database query errors.
377    ///
378    /// # Returns
379    ///
380    /// The total number of failed queries since instance creation.
381    pub fn db_query_errors(&self) -> u64 {
382        self.db.query_errors()
383    }
384
385    /// Returns the number of times register_object was called.
386    ///
387    /// # Returns
388    ///
389    /// The total number of single object registration calls.
390    pub fn register_single_calls(&self) -> u64 {
391        self.register_single_calls.load(Ordering::Relaxed)
392    }
393
394    /// Returns the number of times register_batch_objects was called.
395    ///
396    /// # Returns
397    ///
398    /// The total number of batch registration calls.
399    pub fn register_batch_calls(&self) -> u64 {
400        self.register_batch_calls.load(Ordering::Relaxed)
401    }
402
403    /// Returns the total number of objects registered.
404    ///
405    /// This counts all objects across both single and batch operations.
406    ///
407    /// # Returns
408    ///
409    /// The total number of objects registered since instance creation.
410    pub fn total_objects_registered(&self) -> u64 {
411        self.total_objects_registered.load(Ordering::Relaxed)
412    }
413
414    /// Returns all telemetry metrics in a single snapshot.
415    ///
416    /// This is useful for OpenTelemetry exporters and monitoring systems
417    /// that need to collect all metrics at once.
418    ///
419    /// # Returns
420    ///
421    /// A `TelemetryMetrics` struct containing all current metric values.
422    pub fn telemetry_metrics(&self) -> TelemetryMetrics {
423        TelemetryMetrics {
424            // Cache metrics
425            cache_hits: self.cache_hits(),
426            cache_misses: self.cache_misses(),
427            cache_hit_rate: self.cache_hit_rate(),
428            cache_size: self.cache_size(),
429            cache_capacity: self.cache_capacity(),
430            cache_evictions: self.cache_evictions(),
431            // Connection pool metrics
432            pool_size: self.pool_size(),
433            idle_connections: self.idle_connections(),
434            active_connections: self.active_connections(),
435            is_closed: self.is_closed(),
436            // Database metrics
437            db_queries_total: self.db_queries_total(),
438            db_query_errors: self.db_query_errors(),
439            // Operation metrics
440            register_single_calls: self.register_single_calls(),
441            register_batch_calls: self.register_batch_calls(),
442            total_objects_registered: self.total_objects_registered(),
443        }
444    }
445}
446
447/// A snapshot of all telemetry metrics.
448///
449/// This struct provides a complete view of the register's performance
450/// and is designed to work well with OpenTelemetry exporters.
451#[derive(Debug, Clone)]
452pub struct TelemetryMetrics {
453    // Cache metrics
454    pub cache_hits: u64,
455    pub cache_misses: u64,
456    pub cache_hit_rate: f64,
457    pub cache_size: usize,
458    pub cache_capacity: usize,
459    pub cache_evictions: u64,
460    // Connection pool metrics
461    pub pool_size: usize,
462    pub idle_connections: usize,
463    pub active_connections: usize,
464    pub is_closed: bool,
465    // Database metrics
466    pub db_queries_total: u64,
467    pub db_query_errors: u64,
468    // Operation metrics
469    pub register_single_calls: u64,
470    pub register_batch_calls: u64,
471    pub total_objects_registered: u64,
472}
473
474#[cfg(feature = "python")]
475#[pyclass(name = "JsonRegister")]
476/// Python wrapper for the `Register` struct.
477struct PyJsonRegister {
478    inner: Register,
479    rt: Runtime,
480}
481
482#[cfg(feature = "python")]
483#[pymethods]
484impl PyJsonRegister {
485    #[new]
486    #[pyo3(signature = (
487        database_name,
488        database_host,
489        database_port,
490        database_user,
491        database_password,
492        lru_cache_size=1000,
493        table_name="json_objects",
494        id_column="id",
495        jsonb_column="json_object",
496        pool_size=10,
497        acquire_timeout_secs=None,
498        idle_timeout_secs=None,
499        max_lifetime_secs=None,
500        use_tls=None
501    ))]
502    #[allow(clippy::too_many_arguments)]
503    /// Initializes a new `JsonRegister` instance from Python.
504    ///
505    /// # Optional Timeout Parameters
506    ///
507    /// * `acquire_timeout_secs` - Timeout for acquiring a connection from pool (default: 5)
508    /// * `idle_timeout_secs` - Timeout for idle connections before closure (default: 600)
509    /// * `max_lifetime_secs` - Maximum lifetime of connections (default: 1800)
510    /// * `use_tls` - Enable TLS for database connections (default: False for backwards compatibility)
511    fn new(
512        database_name: String,
513        database_host: String,
514        database_port: u16,
515        database_user: String,
516        database_password: String,
517        lru_cache_size: usize,
518        table_name: &str,
519        id_column: &str,
520        jsonb_column: &str,
521        pool_size: u32,
522        acquire_timeout_secs: Option<u64>,
523        idle_timeout_secs: Option<u64>,
524        max_lifetime_secs: Option<u64>,
525        use_tls: Option<bool>,
526    ) -> PyResult<Self> {
527        // Validate configuration parameters
528        if database_name.is_empty() {
529            return Err(
530                JsonRegisterError::Configuration("database_name cannot be empty".into()).into(),
531            );
532        }
533
534        if database_host.is_empty() {
535            return Err(
536                JsonRegisterError::Configuration("database_host cannot be empty".into()).into(),
537            );
538        }
539
540        if database_port == 0 {
541            return Err(JsonRegisterError::Configuration(
542                "database_port must be between 1 and 65535".into(),
543            )
544            .into());
545        }
546
547        if pool_size == 0 {
548            return Err(JsonRegisterError::Configuration(
549                "pool_size must be greater than 0".into(),
550            )
551            .into());
552        }
553
554        if pool_size > 10000 {
555            return Err(JsonRegisterError::Configuration(
556                "pool_size exceeds reasonable maximum of 10000".into(),
557            )
558            .into());
559        }
560
561        if table_name.is_empty() {
562            return Err(
563                JsonRegisterError::Configuration("table_name cannot be empty".into()).into(),
564            );
565        }
566
567        if id_column.is_empty() {
568            return Err(
569                JsonRegisterError::Configuration("id_column cannot be empty".into()).into(),
570            );
571        }
572
573        if jsonb_column.is_empty() {
574            return Err(
575                JsonRegisterError::Configuration("jsonb_column cannot be empty".into()).into(),
576            );
577        }
578
579        let connection_string = build_connection_string(
580            &database_user,
581            &database_password,
582            &database_host,
583            database_port,
584            &database_name,
585        );
586
587        let rt = Runtime::new().map_err(|e| JsonRegisterError::RuntimeError(e.to_string()))?;
588
589        let inner = rt.block_on(async {
590            Register::new(
591                &connection_string,
592                table_name,
593                id_column,
594                jsonb_column,
595                pool_size,
596                lru_cache_size,
597                acquire_timeout_secs,
598                idle_timeout_secs,
599                max_lifetime_secs,
600                use_tls,
601            )
602            .await
603        })?;
604
605        Ok(PyJsonRegister { inner, rt })
606    }
607
608    /// Registers a single JSON object from Python.
609    fn register_object(&self, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
610        let value: Value = pythonize::depythonize(json_obj)
611            .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
612        self.rt
613            .block_on(self.inner.register_object(&value))
614            .map_err(Into::into)
615    }
616
617    /// Registers a batch of JSON objects from Python.
618    fn register_batch_objects(&self, json_objects: &Bound<'_, PyList>) -> PyResult<Vec<i32>> {
619        let mut values = Vec::with_capacity(json_objects.len());
620        for obj in json_objects {
621            let value: Value = pythonize::depythonize(&obj)
622                .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
623            values.push(value);
624        }
625        self.rt
626            .block_on(self.inner.register_batch_objects(&values))
627            .map_err(Into::into)
628    }
629
630    /// Returns the current size of the connection pool.
631    ///
632    /// This is the total number of connections (both idle and active) currently
633    /// in the pool. Useful for monitoring pool utilization.
634    fn pool_size(&self) -> usize {
635        self.inner.pool_size()
636    }
637
638    /// Returns the number of idle connections in the pool.
639    ///
640    /// Idle connections are available for immediate use. A low idle count
641    /// during high load may indicate the pool is undersized.
642    fn idle_connections(&self) -> usize {
643        self.inner.idle_connections()
644    }
645
646    /// Checks if the connection pool is closed.
647    ///
648    /// A closed pool cannot create new connections and will error on acquire attempts.
649    fn is_closed(&self) -> bool {
650        self.inner.is_closed()
651    }
652
653    /// Returns the number of cache hits.
654    ///
655    /// This is the total number of successful cache lookups since the instance was created.
656    fn cache_hits(&self) -> u64 {
657        self.inner.cache_hits()
658    }
659
660    /// Returns the number of cache misses.
661    ///
662    /// This is the total number of unsuccessful cache lookups since the instance was created.
663    fn cache_misses(&self) -> u64 {
664        self.inner.cache_misses()
665    }
666
667    /// Returns the cache hit rate as a percentage.
668    ///
669    /// Returns a value between 0.0 and 100.0. Returns 0.0 if no cache operations have occurred.
670    fn cache_hit_rate(&self) -> f64 {
671        self.inner.cache_hit_rate()
672    }
673
674    /// Returns the current number of items in the cache.
675    fn cache_size(&self) -> usize {
676        self.inner.cache_size()
677    }
678
679    /// Returns the maximum capacity of the cache.
680    fn cache_capacity(&self) -> usize {
681        self.inner.cache_capacity()
682    }
683
684    /// Returns the number of cache evictions.
685    fn cache_evictions(&self) -> u64 {
686        self.inner.cache_evictions()
687    }
688
689    /// Returns the number of active database connections.
690    fn active_connections(&self) -> usize {
691        self.inner.active_connections()
692    }
693
694    /// Returns the total number of database queries executed.
695    fn db_queries_total(&self) -> u64 {
696        self.inner.db_queries_total()
697    }
698
699    /// Returns the total number of database query errors.
700    fn db_query_errors(&self) -> u64 {
701        self.inner.db_query_errors()
702    }
703
704    /// Returns the number of times register_object was called.
705    fn register_single_calls(&self) -> u64 {
706        self.inner.register_single_calls()
707    }
708
709    /// Returns the number of times register_batch_objects was called.
710    fn register_batch_calls(&self) -> u64 {
711        self.inner.register_batch_calls()
712    }
713
714    /// Returns the total number of objects registered.
715    fn total_objects_registered(&self) -> u64 {
716        self.inner.total_objects_registered()
717    }
718}
719
720#[cfg(feature = "python")]
721#[pyfunction(name = "canonicalise")]
722/// Canonicalises a Python object into its JSON string representation (as bytes).
723fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
724    let value: Value = pythonize::depythonize(json_obj)
725        .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
726    crate::canonicalise::canonicalise(&value)
727        .map(|s| s.into_bytes())
728        .map_err(|e| JsonRegisterError::SerdeError(e).into())
729}
730
731/// A Python module implemented in Rust.
732#[cfg(feature = "python")]
733#[pymodule]
734fn json_register(_m: &Bound<'_, PyModule>) -> PyResult<()> {
735    _m.add_class::<PyJsonRegister>()?;
736    _m.add_function(wrap_pyfunction!(py_canonicalise, _m)?)?;
737    Ok(())
738}
739
740#[cfg(test)]
741mod connection_tests {
742    use super::*;
743
744    #[test]
745    fn test_sanitize_connection_string_with_password() {
746        let input = "postgres://user:secret123@localhost:5432/mydb";
747        let expected = "postgres://user:****@localhost:5432/mydb";
748        assert_eq!(sanitize_connection_string(input), expected);
749    }
750
751    #[test]
752    fn test_sanitize_connection_string_postgresql_scheme() {
753        let input = "postgresql://admin:p@ssw0rd@db.example.com:5432/production";
754        let expected = "postgresql://admin:****@db.example.com:5432/production";
755        assert_eq!(sanitize_connection_string(input), expected);
756    }
757
758    #[test]
759    fn test_sanitize_connection_string_no_password() {
760        // No password in connection string
761        let input = "postgres://user@localhost:5432/mydb";
762        assert_eq!(sanitize_connection_string(input), input);
763    }
764
765    #[test]
766    fn test_sanitize_connection_string_with_special_chars() {
767        let input = "postgres://user:p@ss:word@localhost:5432/mydb";
768        let expected = "postgres://user:****@localhost:5432/mydb";
769        assert_eq!(sanitize_connection_string(input), expected);
770    }
771
772    #[test]
773    fn test_sanitize_connection_string_not_postgres() {
774        // Works with other schemes too
775        let input = "mysql://user:password@localhost:3306/mydb";
776        let expected = "mysql://user:****@localhost:3306/mydb";
777        assert_eq!(sanitize_connection_string(input), expected);
778    }
779
780    #[test]
781    fn test_sanitize_connection_string_malformed() {
782        // Malformed string - return as-is
783        let input = "not a connection string";
784        assert_eq!(sanitize_connection_string(input), input);
785    }
786}