json_register/
lib.rs

1//! # JSON Register
2//!
3//! `json-register` is a library for registering JSON objects into a PostgreSQL database
4//! with canonicalisation and caching. It ensures that semantically equivalent JSON objects
5//! are stored only once and assigned a unique identifier.
6//!
7//! This library provides both a Rust API and Python bindings.
8
9#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17use std::sync::atomic::{AtomicU64, Ordering};
18#[cfg(feature = "python")]
19use std::sync::Arc;
20use tracing::{debug, instrument, trace};
21
22mod cache;
23mod canonicalise;
24mod db;
25mod errors;
26
27pub use cache::Cache;
28pub use canonicalise::canonicalise;
29pub use db::Db;
30pub use errors::JsonRegisterError;
31
/// Builds a PostgreSQL connection URL from its components.
///
/// The user name and password are percent-encoded so that reserved characters
/// in credentials (for example `/`, `?`, `#`, `%`, `@` or `:`) cannot be
/// mistaken for URL delimiters or escape sequences. Credentials consisting
/// only of unreserved characters (ASCII letters, digits, `-`, `.`, `_`, `~`)
/// are emitted unchanged. Pass the *raw* credentials: values that are already
/// percent-encoded would be encoded a second time.
///
/// `host` and `database` are inserted verbatim.
///
/// # Arguments
///
/// * `user` - Database user name (raw, not percent-encoded)
/// * `password` - Database password (raw, not percent-encoded)
/// * `host` - Database host (e.g., "localhost")
/// * `port` - Database port (e.g., 5432)
/// * `database` - Database name
///
/// # Returns
///
/// A connection string of the form `postgres://user:password@host:port/database`.
pub fn build_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}",
        percent_encode_userinfo(user),
        percent_encode_userinfo(password),
        host,
        port,
        database
    )
}

/// Percent-encodes every byte of `raw` that is not an RFC 3986 "unreserved"
/// character, producing upper-case `%XX` escapes over the UTF-8 bytes.
fn percent_encode_userinfo(raw: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(raw.len());
    for &byte in raw.as_bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
57
/// Sanitizes a connection string by replacing any password with asterisks.
///
/// This prevents passwords from leaking in error messages, logs, or stack traces.
/// Two places are redacted in URL-style strings (`scheme://...`):
///
/// * the password in the `user:password@host` part, and
/// * the value of a `password=` query parameter.
///
/// The function is deliberately tolerant of passwords that were not
/// percent-encoded: raw `@` and `:` inside the password are handled, and a raw
/// `/` inside the password is handled unless the text between the first `:`
/// and that `/` looks like a port number (in which case the string is assumed
/// to be `host:port/...` without credentials). When in doubt the function
/// redacts more rather than less.
///
/// # Arguments
///
/// * `connection_string` - The connection string to sanitize
///
/// # Returns
///
/// A sanitized connection string with each password replaced by "****".
/// Strings without a `://` scheme separator are returned unchanged.
///
/// # Example
///
/// ```
/// use json_register::sanitize_connection_string;
/// let sanitized = sanitize_connection_string("postgres://user:secret@localhost:5432/db");
/// assert_eq!(sanitized, "postgres://user:****@localhost:5432/db");
/// ```
pub fn sanitize_connection_string(connection_string: &str) -> String {
    // Handle postgres://, postgresql:// or any other URL scheme.
    let (scheme, rest) = match connection_string.find("://") {
        Some(scheme_end) => connection_string.split_at(scheme_end + 3),
        // Not a URL: there is nothing we know how to redact.
        None => return connection_string.to_string(),
    };

    let mut sanitized = String::with_capacity(connection_string.len());
    sanitized.push_str(scheme);

    // Split `user:password` from `@host...`; the FIRST ':' separates user and
    // password (a user name should not contain ':', but a password might).
    let credentials = find_userinfo_end(rest).and_then(|at_idx| {
        let (userinfo, tail) = rest.split_at(at_idx);
        userinfo.split_once(':').map(|(user, _password)| (user, tail))
    });

    let remainder = match credentials {
        Some((user, tail)) => {
            sanitized.push_str(user);
            sanitized.push_str(":****");
            tail
        }
        // No `user:password@` section: keep everything, but still scan the
        // whole remainder for a `password=` query parameter.
        None => rest,
    };

    push_redacting_query_password(&mut sanitized, remainder);
    sanitized
}

/// Returns the byte index (within `rest`) of the `@` that terminates the
/// `user:password` section, or `None` when no such section is recognised.
fn find_userinfo_end(rest: &str) -> Option<usize> {
    let slash_idx = match rest.find('/') {
        Some(idx) => idx,
        // No slash at all: the last '@' in the string ends the credentials.
        None => return rest.rfind('@'),
    };

    // Normal case: the LAST '@' before the first '/' (handles '@' in passwords).
    let authority = &rest[..slash_idx];
    if let Some(at_idx) = authority.rfind('@') {
        return Some(at_idx);
    }

    // No '@' before the first '/'. Either there are no credentials, or the
    // password itself contains a raw '/'. Treat `something:<port>/` as a plain
    // host:port; otherwise assume the slash belongs to a password and fall
    // back to the last '@' so the password is not leaked.
    match authority.split_once(':') {
        Some((_, after_colon)) if after_colon.parse::<u16>().is_err() => rest.rfind('@'),
        _ => None,
    }
}

/// Appends `tail` to `out`, replacing the value of every `password=` query
/// parameter (anything after the first `?`, split on `&`) with "****".
fn push_redacting_query_password(out: &mut String, tail: &str) {
    let (location, query) = match tail.split_once('?') {
        Some(parts) => parts,
        None => {
            out.push_str(tail);
            return;
        }
    };

    out.push_str(location);
    for (position, pair) in query.split('&').enumerate() {
        out.push(if position == 0 { '?' } else { '&' });
        match pair.split_once('=') {
            Some(("password", _)) => out.push_str("password=****"),
            _ => out.push_str(pair),
        }
    }
}
109
110/// The main registry structure that coordinates database interactions and caching.
111///
112/// This struct maintains a connection pool to the PostgreSQL database and an
113/// in-memory LRU cache to speed up lookups of frequently accessed JSON objects.
114pub struct Register {
115    db: Db,
116    cache: Cache,
117    register_single_calls: AtomicU64,
118    register_batch_calls: AtomicU64,
119    total_objects_registered: AtomicU64,
120}
121
122impl Register {
123    /// Creates a new `Register` instance.
124    ///
125    /// # Arguments
126    ///
127    /// * `connection_string` - The PostgreSQL connection string.
128    /// * `table_name` - The name of the table where JSON objects are stored.
129    /// * `id_column` - The name of the column storing the unique ID.
130    /// * `jsonb_column` - The name of the column storing the JSONB data.
131    /// * `pool_size` - The maximum number of connections in the database pool.
132    /// * `lru_cache_size` - The capacity of the in-memory LRU cache.
133    /// * `acquire_timeout_secs` - Optional timeout for acquiring connections (default: 5s).
134    /// * `idle_timeout_secs` - Optional timeout for idle connections (default: 600s).
135    /// * `max_lifetime_secs` - Optional maximum lifetime for connections (default: 1800s).
136    /// * `use_tls` - Optional flag to enable TLS (default: false for backwards compatibility).
137    ///
138    /// # Returns
139    ///
140    /// A `Result` containing the new `Register` instance or a `JsonRegisterError`.
141    #[allow(clippy::too_many_arguments)]
142    pub async fn new(
143        connection_string: &str,
144        table_name: &str,
145        id_column: &str,
146        jsonb_column: &str,
147        pool_size: u32,
148        lru_cache_size: usize,
149        acquire_timeout_secs: Option<u64>,
150        idle_timeout_secs: Option<u64>,
151        max_lifetime_secs: Option<u64>,
152        use_tls: Option<bool>,
153    ) -> Result<Self, JsonRegisterError> {
154        let db = Db::new(
155            connection_string,
156            table_name,
157            id_column,
158            jsonb_column,
159            pool_size,
160            acquire_timeout_secs,
161            idle_timeout_secs,
162            max_lifetime_secs,
163            use_tls,
164        )
165        .await?;
166        let cache = Cache::new(lru_cache_size);
167        Ok(Self {
168            db,
169            cache,
170            register_single_calls: AtomicU64::new(0),
171            register_batch_calls: AtomicU64::new(0),
172            total_objects_registered: AtomicU64::new(0),
173        })
174    }
175
176    /// Registers a single JSON object.
177    ///
178    /// This method canonicalises the input JSON, checks the cache, and if necessary,
179    /// inserts the object into the database. It returns the unique ID associated
180    /// with the JSON object.
181    ///
182    /// # Arguments
183    ///
184    /// * `value` - The JSON value to register.
185    ///
186    /// # Returns
187    ///
188    /// A `Result` containing the unique ID (i32) or a `JsonRegisterError`.
189    #[instrument(skip(self, value))]
190    pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
191        self.register_single_calls.fetch_add(1, Ordering::Relaxed);
192        self.total_objects_registered
193            .fetch_add(1, Ordering::Relaxed);
194
195        let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
196
197        if let Some(id) = self.cache.get(&canonical) {
198            trace!(id, "cache hit");
199            return Ok(id);
200        }
201
202        trace!("cache miss, querying database");
203        let id = self
204            .db
205            .register_object(value)
206            .await
207            .map_err(JsonRegisterError::DbError)?;
208
209        self.cache.put(canonical, id);
210        debug!(id, "object registered and cached");
211
212        Ok(id)
213    }
214
215    /// Registers a batch of JSON objects.
216    ///
217    /// This method processes multiple JSON objects efficiently. It first checks the
218    /// cache for all items. If any are missing, it performs a batch insert/select
219    /// operation in the database. The order of the returned IDs corresponds to the
220    /// order of the input values.
221    ///
222    /// # Arguments
223    ///
224    /// * `values` - A slice of JSON values to register.
225    ///
226    /// # Returns
227    ///
228    /// A `Result` containing a vector of unique IDs or a `JsonRegisterError`.
229    #[instrument(skip(self, values), fields(batch_size = values.len()))]
230    pub async fn register_batch_objects(
231        &self,
232        values: &[Value],
233    ) -> Result<Vec<i32>, JsonRegisterError> {
234        self.register_batch_calls.fetch_add(1, Ordering::Relaxed);
235        self.total_objects_registered
236            .fetch_add(values.len() as u64, Ordering::Relaxed);
237
238        let mut canonicals = Vec::with_capacity(values.len());
239        for value in values {
240            canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
241        }
242
243        // Check cache for existing entries
244        let mut all_cached = true;
245        let mut cached_ids = Vec::with_capacity(values.len());
246        for canonical in &canonicals {
247            if let Some(id) = self.cache.get(canonical) {
248                cached_ids.push(id);
249            } else {
250                all_cached = false;
251                break;
252            }
253        }
254
255        if all_cached {
256            debug!(cached = values.len(), "all objects found in cache");
257            return Ok(cached_ids);
258        }
259
260        // If not all items are in the cache, query the database
261        trace!("cache miss for some objects, querying database");
262        let ids = self
263            .db
264            .register_batch_objects(values)
265            .await
266            .map_err(JsonRegisterError::DbError)?;
267
268        // Update the cache with the newly retrieved IDs
269        for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
270            self.cache.put(canonical, *id);
271        }
272
273        debug!(registered = ids.len(), "batch registered and cached");
274        Ok(ids)
275    }
276
277    /// Returns the current size of the connection pool.
278    ///
279    /// This is the total number of connections (both idle and active) currently
280    /// in the pool. Useful for monitoring pool utilization.
281    ///
282    /// # Returns
283    ///
284    /// The number of connections in the pool.
285    pub fn pool_size(&self) -> usize {
286        self.db.pool_size()
287    }
288
289    /// Returns the number of idle connections in the pool.
290    ///
291    /// Idle connections are available for immediate use. A low idle count
292    /// during high load may indicate the pool is undersized.
293    ///
294    /// # Returns
295    ///
296    /// The number of idle connections.
297    pub fn idle_connections(&self) -> usize {
298        self.db.idle_connections()
299    }
300
301    /// Checks if the connection pool is closed.
302    ///
303    /// A closed pool cannot create new connections and will error on acquire attempts.
304    ///
305    /// # Returns
306    ///
307    /// `true` if the pool is closed, `false` otherwise.
308    pub fn is_closed(&self) -> bool {
309        self.db.is_closed()
310    }
311
312    /// Returns the number of cache hits.
313    ///
314    /// # Returns
315    ///
316    /// The total number of successful cache lookups.
317    pub fn cache_hits(&self) -> u64 {
318        self.cache.hits()
319    }
320
321    /// Returns the number of cache misses.
322    ///
323    /// # Returns
324    ///
325    /// The total number of unsuccessful cache lookups.
326    pub fn cache_misses(&self) -> u64 {
327        self.cache.misses()
328    }
329
330    /// Returns the cache hit rate as a percentage.
331    ///
332    /// # Returns
333    ///
334    /// The hit rate as a float between 0.0 and 100.0.
335    /// Returns 0.0 if no cache operations have occurred.
336    pub fn cache_hit_rate(&self) -> f64 {
337        self.cache.hit_rate()
338    }
339
340    /// Returns the current number of items in the cache.
341    ///
342    /// # Returns
343    ///
344    /// The number of items currently stored in the cache.
345    pub fn cache_size(&self) -> usize {
346        self.cache.size()
347    }
348
349    /// Returns the maximum capacity of the cache.
350    ///
351    /// # Returns
352    ///
353    /// The maximum number of items the cache can hold.
354    pub fn cache_capacity(&self) -> usize {
355        self.cache.capacity()
356    }
357
358    /// Returns the number of cache evictions.
359    ///
360    /// # Returns
361    ///
362    /// The total number of items evicted from the cache.
363    pub fn cache_evictions(&self) -> u64 {
364        self.cache.evictions()
365    }
366
367    /// Returns the number of active database connections.
368    ///
369    /// Active connections are those currently in use (not idle).
370    ///
371    /// # Returns
372    ///
373    /// The number of active connections (pool_size - idle_connections).
374    pub fn active_connections(&self) -> usize {
375        self.pool_size().saturating_sub(self.idle_connections())
376    }
377
378    /// Returns the total number of database queries executed.
379    ///
380    /// # Returns
381    ///
382    /// The total number of queries executed since instance creation.
383    pub fn db_queries_total(&self) -> u64 {
384        self.db.queries_executed()
385    }
386
387    /// Returns the total number of database query errors.
388    ///
389    /// # Returns
390    ///
391    /// The total number of failed queries since instance creation.
392    pub fn db_query_errors(&self) -> u64 {
393        self.db.query_errors()
394    }
395
396    /// Returns the number of times register_object was called.
397    ///
398    /// # Returns
399    ///
400    /// The total number of single object registration calls.
401    pub fn register_single_calls(&self) -> u64 {
402        self.register_single_calls.load(Ordering::Relaxed)
403    }
404
405    /// Returns the number of times register_batch_objects was called.
406    ///
407    /// # Returns
408    ///
409    /// The total number of batch registration calls.
410    pub fn register_batch_calls(&self) -> u64 {
411        self.register_batch_calls.load(Ordering::Relaxed)
412    }
413
414    /// Returns the total number of objects registered.
415    ///
416    /// This counts all objects across both single and batch operations.
417    ///
418    /// # Returns
419    ///
420    /// The total number of objects registered since instance creation.
421    pub fn total_objects_registered(&self) -> u64 {
422        self.total_objects_registered.load(Ordering::Relaxed)
423    }
424
425    /// Returns all telemetry metrics in a single snapshot.
426    ///
427    /// This is useful for OpenTelemetry exporters and monitoring systems
428    /// that need to collect all metrics at once.
429    ///
430    /// # Returns
431    ///
432    /// A `TelemetryMetrics` struct containing all current metric values.
433    pub fn telemetry_metrics(&self) -> TelemetryMetrics {
434        TelemetryMetrics {
435            // Cache metrics
436            cache_hits: self.cache_hits(),
437            cache_misses: self.cache_misses(),
438            cache_hit_rate: self.cache_hit_rate(),
439            cache_size: self.cache_size(),
440            cache_capacity: self.cache_capacity(),
441            cache_evictions: self.cache_evictions(),
442            // Connection pool metrics
443            pool_size: self.pool_size(),
444            idle_connections: self.idle_connections(),
445            active_connections: self.active_connections(),
446            is_closed: self.is_closed(),
447            // Database metrics
448            db_queries_total: self.db_queries_total(),
449            db_query_errors: self.db_query_errors(),
450            // Operation metrics
451            register_single_calls: self.register_single_calls(),
452            register_batch_calls: self.register_batch_calls(),
453            total_objects_registered: self.total_objects_registered(),
454        }
455    }
456}
457
/// A snapshot of all telemetry metrics.
///
/// This struct provides a complete view of the register's performance
/// and is designed to work well with OpenTelemetry exporters. It is plain
/// data: it can be cloned, compared with `==` (handy for detecting changes
/// between two snapshots and in tests) and created zeroed via `Default`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct TelemetryMetrics {
    // Cache metrics
    /// Number of successful cache lookups.
    pub cache_hits: u64,
    /// Number of unsuccessful cache lookups.
    pub cache_misses: u64,
    /// Cache hit rate as a percentage (0.0 to 100.0).
    pub cache_hit_rate: f64,
    /// Number of items currently stored in the cache.
    pub cache_size: usize,
    /// Maximum number of items the cache can hold.
    pub cache_capacity: usize,
    /// Number of items evicted from the cache.
    pub cache_evictions: u64,
    // Connection pool metrics
    /// Number of connections (idle and active) in the pool.
    pub pool_size: usize,
    /// Number of idle connections in the pool.
    pub idle_connections: usize,
    /// Number of connections currently in use (`pool_size - idle_connections`).
    pub active_connections: usize,
    /// Whether the connection pool is closed.
    pub is_closed: bool,
    // Database metrics
    /// Number of database queries executed.
    pub db_queries_total: u64,
    /// Number of database queries that failed.
    pub db_query_errors: u64,
    // Operation metrics
    /// Number of single-object registration calls.
    pub register_single_calls: u64,
    /// Number of batch registration calls.
    pub register_batch_calls: u64,
    /// Number of objects submitted across single and batch calls.
    pub total_objects_registered: u64,
}
484
#[cfg(feature = "python")]
#[pyclass(name = "JsonRegister")]
/// Python-facing wrapper around [`Register`], exposed to Python as `JsonRegister`.
///
/// The register is held behind an `Arc` so the `*_async` methods can move a
/// handle into the futures they return; the blocking methods drive their
/// futures on the Tokio runtime owned by this object.
struct PyJsonRegister {
    /// Shared handle to the underlying Rust register.
    inner: Arc<Register>,
    /// Tokio runtime used by the blocking (non-async) methods.
    rt: Runtime,
}
492
#[cfg(feature = "python")]
#[pymethods]
impl PyJsonRegister {
    #[new]
    #[pyo3(signature = (
        database_name,
        database_host,
        database_port,
        database_user,
        database_password,
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        jsonb_column="json_object",
        pool_size=10,
        acquire_timeout_secs=None,
        idle_timeout_secs=None,
        max_lifetime_secs=None,
        use_tls=None
    ))]
    #[allow(clippy::too_many_arguments)]
    /// Initializes a new `JsonRegister` instance from Python.
    ///
    /// The configuration is validated first (non-empty names, port between 1
    /// and 65535, pool size between 1 and 10000); the first violated rule is
    /// reported as a configuration error. A Tokio runtime is then created and
    /// used to connect to the database. The GIL is released while connecting
    /// so other Python threads keep running.
    ///
    /// # Optional Timeout Parameters
    ///
    /// * `acquire_timeout_secs` - Timeout for acquiring a connection from pool (default: 5)
    /// * `idle_timeout_secs` - Timeout for idle connections before closure (default: 600)
    /// * `max_lifetime_secs` - Maximum lifetime of connections (default: 1800)
    /// * `use_tls` - Enable TLS for database connections (default: False for backwards compatibility)
    fn new(
        py: Python<'_>,
        database_name: String,
        database_host: String,
        database_port: u16,
        database_user: String,
        database_password: String,
        lru_cache_size: usize,
        table_name: &str,
        id_column: &str,
        jsonb_column: &str,
        pool_size: u32,
        acquire_timeout_secs: Option<u64>,
        idle_timeout_secs: Option<u64>,
        max_lifetime_secs: Option<u64>,
        use_tls: Option<bool>,
    ) -> PyResult<Self> {
        // Validation rules as (violated?, message), in the order they are reported.
        let checks = [
            (database_name.is_empty(), "database_name cannot be empty"),
            (database_host.is_empty(), "database_host cannot be empty"),
            (
                database_port == 0,
                "database_port must be between 1 and 65535",
            ),
            (pool_size == 0, "pool_size must be greater than 0"),
            (
                pool_size > 10000,
                "pool_size exceeds reasonable maximum of 10000",
            ),
            (table_name.is_empty(), "table_name cannot be empty"),
            (id_column.is_empty(), "id_column cannot be empty"),
            (jsonb_column.is_empty(), "jsonb_column cannot be empty"),
        ];
        if let Some(&(_, message)) = checks.iter().find(|check| check.0) {
            return Err(JsonRegisterError::Configuration(message.into()).into());
        }

        let connection_string = build_connection_string(
            &database_user,
            &database_password,
            &database_host,
            database_port,
            &database_name,
        );

        let rt = Runtime::new().map_err(|e| JsonRegisterError::RuntimeError(e.to_string()))?;

        // Release the GIL while blocking on the runtime. Holding it would stall
        // every other Python thread for the duration of the connection attempt
        // and can deadlock with the logging bridge, which needs the GIL to
        // forward records emitted on runtime worker threads.
        // NOTE(review): newer PyO3 releases rename `allow_threads` to `detach`;
        // confirm against the PyO3 version pinned in Cargo.toml.
        let inner = py.allow_threads(|| {
            rt.block_on(Register::new(
                &connection_string,
                table_name,
                id_column,
                jsonb_column,
                pool_size,
                lru_cache_size,
                acquire_timeout_secs,
                idle_timeout_secs,
                max_lifetime_secs,
                use_tls,
            ))
        })?;

        Ok(PyJsonRegister {
            inner: Arc::new(inner),
            rt,
        })
    }

    /// Registers a single JSON object from Python.
    ///
    /// Blocks the calling thread until the registration finishes; the GIL is
    /// released while waiting so other Python threads can run.
    fn register_object(&self, py: Python<'_>, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
        // Conversion touches Python objects, so it happens while the GIL is held.
        let value = value_from_python(json_obj)?;
        py.allow_threads(|| self.rt.block_on(self.inner.register_object(&value)))
            .map_err(Into::into)
    }

    /// Registers a batch of JSON objects from Python.
    ///
    /// Blocks the calling thread until the registration finishes; the GIL is
    /// released while waiting so other Python threads can run.
    fn register_batch_objects(
        &self,
        py: Python<'_>,
        json_objects: &Bound<'_, PyList>,
    ) -> PyResult<Vec<i32>> {
        let values = values_from_python_list(json_objects)?;
        py.allow_threads(|| self.rt.block_on(self.inner.register_batch_objects(&values)))
            .map_err(Into::into)
    }

    /// Registers a single JSON object from Python asynchronously.
    ///
    /// This method returns a Python awaitable that can be used with `await`.
    /// Use this in async Python code (e.g., FastAPI, aiohttp) to avoid blocking
    /// the event loop.
    fn register_object_async<'py>(
        &self,
        py: Python<'py>,
        json_obj: &Bound<'_, PyAny>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let value = value_from_python(json_obj)?;
        // The future must own everything it uses, hence the cloned handle.
        let inner = Arc::clone(&self.inner);
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner.register_object(&value).await.map_err(PyErr::from)
        })
    }

    /// Registers a batch of JSON objects from Python asynchronously.
    ///
    /// This method returns a Python awaitable that can be used with `await`.
    /// Use this in async Python code (e.g., FastAPI, aiohttp) to avoid blocking
    /// the event loop.
    fn register_batch_objects_async<'py>(
        &self,
        py: Python<'py>,
        json_objects: &Bound<'_, PyList>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let values = values_from_python_list(json_objects)?;
        let inner = Arc::clone(&self.inner);
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner
                .register_batch_objects(&values)
                .await
                .map_err(PyErr::from)
        })
    }

    /// Returns the current size of the connection pool.
    ///
    /// This is the total number of connections (both idle and active) currently
    /// in the pool. Useful for monitoring pool utilization.
    fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Returns the number of idle connections in the pool.
    ///
    /// Idle connections are available for immediate use. A low idle count
    /// during high load may indicate the pool is undersized.
    fn idle_connections(&self) -> usize {
        self.inner.idle_connections()
    }

    /// Checks if the connection pool is closed.
    ///
    /// A closed pool cannot create new connections and will error on acquire attempts.
    fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Returns the number of cache hits.
    ///
    /// This is the total number of successful cache lookups since the instance was created.
    fn cache_hits(&self) -> u64 {
        self.inner.cache_hits()
    }

    /// Returns the number of cache misses.
    ///
    /// This is the total number of unsuccessful cache lookups since the instance was created.
    fn cache_misses(&self) -> u64 {
        self.inner.cache_misses()
    }

    /// Returns the cache hit rate as a percentage.
    ///
    /// Returns a value between 0.0 and 100.0. Returns 0.0 if no cache operations have occurred.
    fn cache_hit_rate(&self) -> f64 {
        self.inner.cache_hit_rate()
    }

    /// Returns the current number of items in the cache.
    fn cache_size(&self) -> usize {
        self.inner.cache_size()
    }

    /// Returns the maximum capacity of the cache.
    fn cache_capacity(&self) -> usize {
        self.inner.cache_capacity()
    }

    /// Returns the number of cache evictions.
    fn cache_evictions(&self) -> u64 {
        self.inner.cache_evictions()
    }

    /// Returns the number of active database connections.
    fn active_connections(&self) -> usize {
        self.inner.active_connections()
    }

    /// Returns the total number of database queries executed.
    fn db_queries_total(&self) -> u64 {
        self.inner.db_queries_total()
    }

    /// Returns the total number of database query errors.
    fn db_query_errors(&self) -> u64 {
        self.inner.db_query_errors()
    }

    /// Returns the number of times register_object was called.
    fn register_single_calls(&self) -> u64 {
        self.inner.register_single_calls()
    }

    /// Returns the number of times register_batch_objects was called.
    fn register_batch_calls(&self) -> u64 {
        self.inner.register_batch_calls()
    }

    /// Returns the total number of objects registered.
    fn total_objects_registered(&self) -> u64 {
        self.inner.total_objects_registered()
    }
}

/// Converts one Python object into a `serde_json::Value`, reporting conversion
/// failures as `JsonRegisterError::SerializationError`.
#[cfg(feature = "python")]
fn value_from_python(obj: &Bound<'_, PyAny>) -> Result<Value, JsonRegisterError> {
    pythonize::depythonize(obj).map_err(|e| JsonRegisterError::SerializationError(e.to_string()))
}

/// Converts every element of a Python list into a `serde_json::Value`,
/// stopping at the first element that cannot be converted.
#[cfg(feature = "python")]
fn values_from_python_list(
    json_objects: &Bound<'_, PyList>,
) -> Result<Vec<Value>, JsonRegisterError> {
    json_objects
        .iter()
        .map(|obj| value_from_python(&obj))
        .collect()
}
776
#[cfg(feature = "python")]
#[pyfunction(name = "canonicalise")]
/// Converts a Python object to JSON and returns its canonical string form as bytes.
///
/// Fails with a serialization error when the object cannot be converted to
/// JSON, or with a serde error when canonicalisation itself fails.
fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let value: Value = match pythonize::depythonize(json_obj) {
        Ok(converted) => converted,
        Err(e) => return Err(JsonRegisterError::SerializationError(e.to_string()).into()),
    };

    let canonical =
        crate::canonicalise::canonicalise(&value).map_err(JsonRegisterError::SerdeError)?;
    Ok(canonical.into_bytes())
}
787
/// Entry point of the `json_register` Python extension module.
///
/// Sets up log forwarding and registers the `JsonRegister` class, the
/// `canonicalise` function and the `__version__` attribute.
#[cfg(feature = "python")]
#[pymodule]
fn json_register(module: &Bound<'_, PyModule>) -> PyResult<()> {
    // Logging bridge: tracing -> log -> Python logging, so Python applications
    // see Rust tracing events in their own logging output. Both initialisers
    // may fail if a logger is already installed; that is deliberately ignored
    // rather than treated as an import error.
    let _ = tracing_log::LogTracer::init();
    let _ = pyo3_log::try_init();

    module.add_class::<PyJsonRegister>()?;
    module.add_function(wrap_pyfunction!(py_canonicalise, module)?)?;
    module.add("__version__", env!("CARGO_PKG_VERSION"))?;
    Ok(())
}
803
#[cfg(test)]
mod connection_tests {
    use super::*;

    /// Asserts that sanitizing `input` yields exactly `expected`.
    fn assert_sanitized(input: &str, expected: &str) {
        assert_eq!(sanitize_connection_string(input), expected);
    }

    /// A plain `user:password` pair has its password masked.
    #[test]
    fn test_sanitize_connection_string_with_password() {
        assert_sanitized(
            "postgres://user:secret123@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    /// The `postgresql://` scheme works, even with an `@` inside the password.
    #[test]
    fn test_sanitize_connection_string_postgresql_scheme() {
        assert_sanitized(
            "postgresql://admin:p@ssw0rd@db.example.com:5432/production",
            "postgresql://admin:****@db.example.com:5432/production",
        );
    }

    /// Without a password there is nothing to mask and the input comes back unchanged.
    #[test]
    fn test_sanitize_connection_string_no_password() {
        let unchanged = "postgres://user@localhost:5432/mydb";
        assert_sanitized(unchanged, unchanged);
    }

    /// Passwords containing both `@` and `:` are masked in full.
    #[test]
    fn test_sanitize_connection_string_with_special_chars() {
        assert_sanitized(
            "postgres://user:p@ss:word@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    /// The scheme is not inspected, so other URL schemes are sanitized too.
    #[test]
    fn test_sanitize_connection_string_not_postgres() {
        assert_sanitized(
            "mysql://user:password@localhost:3306/mydb",
            "mysql://user:****@localhost:3306/mydb",
        );
    }

    /// Input that is not a URL at all is returned as-is.
    #[test]
    fn test_sanitize_connection_string_malformed() {
        let unchanged = "not a connection string";
        assert_sanitized(unchanged, unchanged);
    }
}
850}