json_register/
lib.rs

1//! # JSON Register
2//!
3//! `json-register` is a library for registering JSON objects into a PostgreSQL database
4//! with canonicalisation and caching. It ensures that semantically equivalent JSON objects
5//! are stored only once and assigned a unique identifier.
6//!
7//! This library provides both a Rust API and Python bindings.
8
9#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17use std::sync::atomic::{AtomicU64, Ordering};
18
19mod cache;
20mod canonicalise;
21mod db;
22mod errors;
23
24pub use cache::Cache;
25pub use canonicalise::canonicalise;
26pub use db::Db;
27pub use errors::JsonRegisterError;
28
/// Builds a PostgreSQL connection URI from its components.
///
/// `user` and `password` are percent-encoded (every byte outside the RFC 3986
/// "unreserved" set `A-Z a-z 0-9 - . _ ~` becomes `%XX`). Characters such as
/// `@`, `/`, `:`, `#`, `?` and `%` in credentials therefore survive URI parsing
/// instead of corrupting the host or path. `host` and `database` are inserted
/// verbatim.
///
/// # Arguments
///
/// * `user` - Database user name
/// * `password` - Database password
/// * `host` - Database host (e.g., "localhost")
/// * `port` - Database port (e.g., 5432)
/// * `database` - Database name
///
/// # Returns
///
/// A `postgres://user:password@host:port/database` connection string.
pub fn build_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}",
        percent_encode_userinfo(user),
        percent_encode_userinfo(password),
        host,
        port,
        database
    )
}

/// Percent-encodes `component` for use in the userinfo part of a URI.
///
/// Unreserved ASCII bytes pass through unchanged; every other byte (including
/// each byte of a multi-byte UTF-8 character) is emitted as `%XX`.
fn percent_encode_userinfo(component: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(component.len());
    for &byte in component.as_bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
54
/// Sanitizes a connection string by replacing the password with asterisks.
///
/// This prevents passwords from leaking in error messages, logs, or stack traces.
/// Only `scheme://user:password@host...` style strings are recognised; anything
/// else is returned unchanged.
///
/// The userinfo is taken to end at the last `@` before the first `/`, which lets
/// the password contain `@` and `:`. If no `@` precedes the first `/`, that `/`
/// may itself belong to the password, so the last `@` anywhere is used instead.
/// In rare inputs this over-redacts, but it never leaks the secret.
///
/// # Arguments
///
/// * `connection_string` - The connection string to sanitize
///
/// # Returns
///
/// A sanitized connection string with the password replaced by "****"
///
/// # Example
///
/// ```
/// use json_register::sanitize_connection_string;
/// let sanitized = sanitize_connection_string("postgres://user:secret@localhost:5432/db");
/// assert_eq!(sanitized, "postgres://user:****@localhost:5432/db");
/// let sanitized = sanitize_connection_string("postgres://user:pa/ss@localhost:5432/db");
/// assert_eq!(sanitized, "postgres://user:****@localhost:5432/db");
/// ```
pub fn sanitize_connection_string(connection_string: &str) -> String {
    // Handle postgres:// or postgresql:// (or any other `scheme://`) strings.
    if let Some(scheme_end) = connection_string.find("://") {
        let (scheme, rest) = connection_string.split_at(scheme_end + 3);

        // Authority candidate: everything up to the first '/'.
        let authority = rest
            .find('/')
            .map_or(rest, |slash_idx| &rest[..slash_idx]);
        // Prefer the last '@' inside the authority. Otherwise the password may
        // contain '/', so fall back to the last '@' anywhere rather than
        // returning the password unredacted.
        let at_idx = authority.rfind('@').or_else(|| rest.rfind('@'));

        if let Some(at_idx) = at_idx {
            let (user_pass, host_db) = rest.split_at(at_idx);

            // The FIRST ':' separates user from password (the password may contain ':').
            // A '/' in the user part means this '@' came from a path or query,
            // not from userinfo, so there is no password to hide.
            if let Some((user, _password)) = user_pass.split_once(':') {
                if !user.contains('/') {
                    return format!("{}{}:****{}", scheme, user, host_db);
                }
            }
        }
    }

    // If parsing fails, return as-is (no password to hide)
    connection_string.to_string()
}
106
107/// The main registry structure that coordinates database interactions and caching.
108///
109/// This struct maintains a connection pool to the PostgreSQL database and an
110/// in-memory LRU cache to speed up lookups of frequently accessed JSON objects.
111pub struct Register {
112    db: Db,
113    cache: Cache,
114    register_single_calls: AtomicU64,
115    register_batch_calls: AtomicU64,
116    total_objects_registered: AtomicU64,
117}
118
119impl Register {
120    /// Creates a new `Register` instance.
121    ///
122    /// # Arguments
123    ///
124    /// * `connection_string` - The PostgreSQL connection string.
125    /// * `table_name` - The name of the table where JSON objects are stored.
126    /// * `id_column` - The name of the column storing the unique ID.
127    /// * `jsonb_column` - The name of the column storing the JSONB data.
128    /// * `pool_size` - The maximum number of connections in the database pool.
129    /// * `lru_cache_size` - The capacity of the in-memory LRU cache.
130    /// * `acquire_timeout_secs` - Optional timeout for acquiring connections (default: 5s).
131    /// * `idle_timeout_secs` - Optional timeout for idle connections (default: 600s).
132    /// * `max_lifetime_secs` - Optional maximum lifetime for connections (default: 1800s).
133    ///
134    /// # Returns
135    ///
136    /// A `Result` containing the new `Register` instance or a `JsonRegisterError`.
137    #[allow(clippy::too_many_arguments)]
138    pub async fn new(
139        connection_string: &str,
140        table_name: &str,
141        id_column: &str,
142        jsonb_column: &str,
143        pool_size: u32,
144        lru_cache_size: usize,
145        acquire_timeout_secs: Option<u64>,
146        idle_timeout_secs: Option<u64>,
147        max_lifetime_secs: Option<u64>,
148    ) -> Result<Self, JsonRegisterError> {
149        let db = Db::new(
150            connection_string,
151            table_name,
152            id_column,
153            jsonb_column,
154            pool_size,
155            acquire_timeout_secs,
156            idle_timeout_secs,
157            max_lifetime_secs,
158        )
159        .await
160        .map_err(JsonRegisterError::DbError)?;
161        let cache = Cache::new(lru_cache_size);
162        Ok(Self {
163            db,
164            cache,
165            register_single_calls: AtomicU64::new(0),
166            register_batch_calls: AtomicU64::new(0),
167            total_objects_registered: AtomicU64::new(0),
168        })
169    }
170
171    /// Registers a single JSON object.
172    ///
173    /// This method canonicalises the input JSON, checks the cache, and if necessary,
174    /// inserts the object into the database. It returns the unique ID associated
175    /// with the JSON object.
176    ///
177    /// # Arguments
178    ///
179    /// * `value` - The JSON value to register.
180    ///
181    /// # Returns
182    ///
183    /// A `Result` containing the unique ID (i32) or a `JsonRegisterError`.
184    pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
185        self.register_single_calls.fetch_add(1, Ordering::Relaxed);
186        self.total_objects_registered
187            .fetch_add(1, Ordering::Relaxed);
188
189        let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
190
191        if let Some(id) = self.cache.get(&canonical) {
192            return Ok(id);
193        }
194
195        let id = self
196            .db
197            .register_object(&canonical)
198            .await
199            .map_err(JsonRegisterError::DbError)?;
200
201        self.cache.put(canonical, id);
202
203        Ok(id)
204    }
205
206    /// Registers a batch of JSON objects.
207    ///
208    /// This method processes multiple JSON objects efficiently. It first checks the
209    /// cache for all items. If any are missing, it performs a batch insert/select
210    /// operation in the database. The order of the returned IDs corresponds to the
211    /// order of the input values.
212    ///
213    /// # Arguments
214    ///
215    /// * `values` - A slice of JSON values to register.
216    ///
217    /// # Returns
218    ///
219    /// A `Result` containing a vector of unique IDs or a `JsonRegisterError`.
220    pub async fn register_batch_objects(
221        &self,
222        values: &[Value],
223    ) -> Result<Vec<i32>, JsonRegisterError> {
224        self.register_batch_calls.fetch_add(1, Ordering::Relaxed);
225        self.total_objects_registered
226            .fetch_add(values.len() as u64, Ordering::Relaxed);
227
228        let mut canonicals = Vec::with_capacity(values.len());
229        for value in values {
230            canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
231        }
232
233        // Check cache for existing entries
234        let mut all_cached = true;
235        let mut cached_ids = Vec::with_capacity(values.len());
236        for canonical in &canonicals {
237            if let Some(id) = self.cache.get(canonical) {
238                cached_ids.push(id);
239            } else {
240                all_cached = false;
241                break;
242            }
243        }
244
245        if all_cached {
246            return Ok(cached_ids);
247        }
248
249        // If not all items are in the cache, query the database
250        let ids = self
251            .db
252            .register_batch_objects(&canonicals)
253            .await
254            .map_err(JsonRegisterError::DbError)?;
255
256        // Update the cache with the newly retrieved IDs
257        for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
258            self.cache.put(canonical, *id);
259        }
260
261        Ok(ids)
262    }
263
264    /// Returns the current size of the connection pool.
265    ///
266    /// This is the total number of connections (both idle and active) currently
267    /// in the pool. Useful for monitoring pool utilization.
268    ///
269    /// # Returns
270    ///
271    /// The number of connections in the pool.
272    pub fn pool_size(&self) -> usize {
273        self.db.pool_size()
274    }
275
276    /// Returns the number of idle connections in the pool.
277    ///
278    /// Idle connections are available for immediate use. A low idle count
279    /// during high load may indicate the pool is undersized.
280    ///
281    /// # Returns
282    ///
283    /// The number of idle connections.
284    pub fn idle_connections(&self) -> usize {
285        self.db.idle_connections()
286    }
287
288    /// Checks if the connection pool is closed.
289    ///
290    /// A closed pool cannot create new connections and will error on acquire attempts.
291    ///
292    /// # Returns
293    ///
294    /// `true` if the pool is closed, `false` otherwise.
295    pub fn is_closed(&self) -> bool {
296        self.db.is_closed()
297    }
298
299    /// Returns the number of cache hits.
300    ///
301    /// # Returns
302    ///
303    /// The total number of successful cache lookups.
304    pub fn cache_hits(&self) -> u64 {
305        self.cache.hits()
306    }
307
308    /// Returns the number of cache misses.
309    ///
310    /// # Returns
311    ///
312    /// The total number of unsuccessful cache lookups.
313    pub fn cache_misses(&self) -> u64 {
314        self.cache.misses()
315    }
316
317    /// Returns the cache hit rate as a percentage.
318    ///
319    /// # Returns
320    ///
321    /// The hit rate as a float between 0.0 and 100.0.
322    /// Returns 0.0 if no cache operations have occurred.
323    pub fn cache_hit_rate(&self) -> f64 {
324        self.cache.hit_rate()
325    }
326
327    /// Returns the current number of items in the cache.
328    ///
329    /// # Returns
330    ///
331    /// The number of items currently stored in the cache.
332    pub fn cache_size(&self) -> usize {
333        self.cache.size()
334    }
335
336    /// Returns the maximum capacity of the cache.
337    ///
338    /// # Returns
339    ///
340    /// The maximum number of items the cache can hold.
341    pub fn cache_capacity(&self) -> usize {
342        self.cache.capacity()
343    }
344
345    /// Returns the number of cache evictions.
346    ///
347    /// # Returns
348    ///
349    /// The total number of items evicted from the cache.
350    pub fn cache_evictions(&self) -> u64 {
351        self.cache.evictions()
352    }
353
354    /// Returns the number of active database connections.
355    ///
356    /// Active connections are those currently in use (not idle).
357    ///
358    /// # Returns
359    ///
360    /// The number of active connections (pool_size - idle_connections).
361    pub fn active_connections(&self) -> usize {
362        self.pool_size().saturating_sub(self.idle_connections())
363    }
364
365    /// Returns the total number of database queries executed.
366    ///
367    /// # Returns
368    ///
369    /// The total number of queries executed since instance creation.
370    pub fn db_queries_total(&self) -> u64 {
371        self.db.queries_executed()
372    }
373
374    /// Returns the total number of database query errors.
375    ///
376    /// # Returns
377    ///
378    /// The total number of failed queries since instance creation.
379    pub fn db_query_errors(&self) -> u64 {
380        self.db.query_errors()
381    }
382
383    /// Returns the number of times register_object was called.
384    ///
385    /// # Returns
386    ///
387    /// The total number of single object registration calls.
388    pub fn register_single_calls(&self) -> u64 {
389        self.register_single_calls.load(Ordering::Relaxed)
390    }
391
392    /// Returns the number of times register_batch_objects was called.
393    ///
394    /// # Returns
395    ///
396    /// The total number of batch registration calls.
397    pub fn register_batch_calls(&self) -> u64 {
398        self.register_batch_calls.load(Ordering::Relaxed)
399    }
400
401    /// Returns the total number of objects registered.
402    ///
403    /// This counts all objects across both single and batch operations.
404    ///
405    /// # Returns
406    ///
407    /// The total number of objects registered since instance creation.
408    pub fn total_objects_registered(&self) -> u64 {
409        self.total_objects_registered.load(Ordering::Relaxed)
410    }
411
412    /// Returns all telemetry metrics in a single snapshot.
413    ///
414    /// This is useful for OpenTelemetry exporters and monitoring systems
415    /// that need to collect all metrics at once.
416    ///
417    /// # Returns
418    ///
419    /// A `TelemetryMetrics` struct containing all current metric values.
420    pub fn telemetry_metrics(&self) -> TelemetryMetrics {
421        TelemetryMetrics {
422            // Cache metrics
423            cache_hits: self.cache_hits(),
424            cache_misses: self.cache_misses(),
425            cache_hit_rate: self.cache_hit_rate(),
426            cache_size: self.cache_size(),
427            cache_capacity: self.cache_capacity(),
428            cache_evictions: self.cache_evictions(),
429            // Connection pool metrics
430            pool_size: self.pool_size(),
431            idle_connections: self.idle_connections(),
432            active_connections: self.active_connections(),
433            is_closed: self.is_closed(),
434            // Database metrics
435            db_queries_total: self.db_queries_total(),
436            db_query_errors: self.db_query_errors(),
437            // Operation metrics
438            register_single_calls: self.register_single_calls(),
439            register_batch_calls: self.register_batch_calls(),
440            total_objects_registered: self.total_objects_registered(),
441        }
442    }
443}
444
/// A point-in-time snapshot of all telemetry metrics exposed by a `Register`.
///
/// This is plain data, so it can be handed to OpenTelemetry exporters or other
/// monitoring systems, cloned, and compared.
#[derive(Debug, Clone, PartialEq)]
pub struct TelemetryMetrics {
    // Cache metrics
    /// Total number of successful cache lookups.
    pub cache_hits: u64,
    /// Total number of unsuccessful cache lookups.
    pub cache_misses: u64,
    /// Cache hit rate as a percentage in 0.0..=100.0 (0.0 when no lookups occurred).
    pub cache_hit_rate: f64,
    /// Number of items currently stored in the cache.
    pub cache_size: usize,
    /// Maximum number of items the cache can hold.
    pub cache_capacity: usize,
    /// Total number of items evicted from the cache.
    pub cache_evictions: u64,
    // Connection pool metrics
    /// Total connections (idle and active) currently in the pool.
    pub pool_size: usize,
    /// Connections currently idle and available for immediate use.
    pub idle_connections: usize,
    /// Connections currently in use (`pool_size - idle_connections`, saturating).
    pub active_connections: usize,
    /// Whether the connection pool has been closed.
    pub is_closed: bool,
    // Database metrics
    /// Total number of database queries executed.
    pub db_queries_total: u64,
    /// Total number of database queries that failed.
    pub db_query_errors: u64,
    // Operation metrics
    /// Number of single-object registration calls.
    pub register_single_calls: u64,
    /// Number of batch registration calls.
    pub register_batch_calls: u64,
    /// Total objects submitted across single and batch registrations.
    pub total_objects_registered: u64,
}
471
#[cfg(feature = "python")]
/// Python-facing wrapper around the Rust `Register`, exposed as `JsonRegister`.
///
/// Owns a Tokio runtime that drives the async `Register` API from the
/// synchronous Python methods.
#[pyclass(name = "JsonRegister")]
struct PyJsonRegister {
    /// The register performing the actual work.
    inner: Register,
    /// Runtime on which each async `Register` call is blocked.
    rt: Runtime,
}
479
#[cfg(feature = "python")]
#[pymethods]
impl PyJsonRegister {
    #[new]
    #[pyo3(signature = (
        database_name,
        database_host,
        database_port,
        database_user,
        database_password,
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        jsonb_column="json_object",
        pool_size=10,
        acquire_timeout_secs=None,
        idle_timeout_secs=None,
        max_lifetime_secs=None
    ))]
    #[allow(clippy::too_many_arguments)] // one Rust parameter per Python keyword argument
    /// Initializes a new `JsonRegister` instance from Python.
    ///
    /// Validates the configuration, builds the connection string, creates a
    /// dedicated Tokio runtime and constructs the underlying `Register`. The
    /// GIL is released while `Register::new` runs, so other Python threads
    /// are not blocked by the database setup.
    ///
    /// `py` is supplied by PyO3 and is not part of the Python signature.
    ///
    /// # Optional Timeout Parameters
    ///
    /// * `acquire_timeout_secs` - Timeout for acquiring a connection from pool (default: 5)
    /// * `idle_timeout_secs` - Timeout for idle connections before closure (default: 600)
    /// * `max_lifetime_secs` - Maximum lifetime of connections (default: 1800)
    ///
    /// # Errors
    ///
    /// Returns an error converted from `JsonRegisterError` for invalid
    /// configuration, runtime creation failure, or database initialisation failure.
    fn new(
        py: Python<'_>,
        database_name: String,
        database_host: String,
        database_port: u16,
        database_user: String,
        database_password: String,
        lru_cache_size: usize,
        table_name: &str,
        id_column: &str,
        jsonb_column: &str,
        pool_size: u32,
        acquire_timeout_secs: Option<u64>,
        idle_timeout_secs: Option<u64>,
        max_lifetime_secs: Option<u64>,
    ) -> PyResult<Self> {
        // Validate configuration parameters (order determines which error is reported first).
        if database_name.is_empty() {
            return Err(
                JsonRegisterError::Configuration("database_name cannot be empty".into()).into(),
            );
        }

        if database_host.is_empty() {
            return Err(
                JsonRegisterError::Configuration("database_host cannot be empty".into()).into(),
            );
        }

        if database_port == 0 {
            return Err(JsonRegisterError::Configuration(
                "database_port must be between 1 and 65535".into(),
            )
            .into());
        }

        if pool_size == 0 {
            return Err(JsonRegisterError::Configuration(
                "pool_size must be greater than 0".into(),
            )
            .into());
        }

        if pool_size > 10000 {
            return Err(JsonRegisterError::Configuration(
                "pool_size exceeds reasonable maximum of 10000".into(),
            )
            .into());
        }

        if table_name.is_empty() {
            return Err(
                JsonRegisterError::Configuration("table_name cannot be empty".into()).into(),
            );
        }

        if id_column.is_empty() {
            return Err(
                JsonRegisterError::Configuration("id_column cannot be empty".into()).into(),
            );
        }

        if jsonb_column.is_empty() {
            return Err(
                JsonRegisterError::Configuration("jsonb_column cannot be empty".into()).into(),
            );
        }

        let connection_string = build_connection_string(
            &database_user,
            &database_password,
            &database_host,
            database_port,
            &database_name,
        );

        let rt = Runtime::new().map_err(|e| JsonRegisterError::RuntimeError(e.to_string()))?;

        // `Register::new` awaits the database layer, which can block on network
        // I/O; release the GIL so other Python threads keep running meanwhile.
        let inner = py.allow_threads(|| {
            rt.block_on(Register::new(
                &connection_string,
                table_name,
                id_column,
                jsonb_column,
                pool_size,
                lru_cache_size,
                acquire_timeout_secs,
                idle_timeout_secs,
                max_lifetime_secs,
            ))
        })?;

        Ok(PyJsonRegister { inner, rt })
    }

    /// Registers a single JSON object from Python and returns its ID.
    ///
    /// The object is converted with `pythonize::depythonize` while holding the
    /// GIL. The GIL is then released while the registration runs on the
    /// wrapper's runtime.
    fn register_object(&self, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
        let value: Value = pythonize::depythonize(json_obj)
            .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
        json_obj
            .py()
            .allow_threads(|| self.rt.block_on(self.inner.register_object(&value)))
            .map_err(Into::into)
    }

    /// Registers a batch of JSON objects from Python and returns their IDs in order.
    ///
    /// All list items are converted while holding the GIL. The GIL is then
    /// released for the database batch operation.
    fn register_batch_objects(&self, json_objects: &Bound<'_, PyList>) -> PyResult<Vec<i32>> {
        let mut values = Vec::with_capacity(json_objects.len());
        for obj in json_objects {
            let value: Value = pythonize::depythonize(&obj)
                .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
            values.push(value);
        }
        json_objects
            .py()
            .allow_threads(|| self.rt.block_on(self.inner.register_batch_objects(&values)))
            .map_err(Into::into)
    }

    /// Returns the current size of the connection pool.
    ///
    /// This is the total number of connections (both idle and active) currently
    /// in the pool. Useful for monitoring pool utilization.
    fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Returns the number of idle connections in the pool.
    ///
    /// Idle connections are available for immediate use. A low idle count
    /// during high load may indicate the pool is undersized.
    fn idle_connections(&self) -> usize {
        self.inner.idle_connections()
    }

    /// Checks if the connection pool is closed.
    ///
    /// A closed pool cannot create new connections and will error on acquire attempts.
    fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Returns the number of cache hits.
    ///
    /// This is the total number of successful cache lookups since the instance was created.
    fn cache_hits(&self) -> u64 {
        self.inner.cache_hits()
    }

    /// Returns the number of cache misses.
    ///
    /// This is the total number of unsuccessful cache lookups since the instance was created.
    fn cache_misses(&self) -> u64 {
        self.inner.cache_misses()
    }

    /// Returns the cache hit rate as a percentage.
    ///
    /// Returns a value between 0.0 and 100.0. Returns 0.0 if no cache operations have occurred.
    fn cache_hit_rate(&self) -> f64 {
        self.inner.cache_hit_rate()
    }

    /// Returns the current number of items in the cache.
    fn cache_size(&self) -> usize {
        self.inner.cache_size()
    }

    /// Returns the maximum capacity of the cache.
    fn cache_capacity(&self) -> usize {
        self.inner.cache_capacity()
    }

    /// Returns the number of cache evictions.
    fn cache_evictions(&self) -> u64 {
        self.inner.cache_evictions()
    }

    /// Returns the number of active database connections.
    fn active_connections(&self) -> usize {
        self.inner.active_connections()
    }

    /// Returns the total number of database queries executed.
    fn db_queries_total(&self) -> u64 {
        self.inner.db_queries_total()
    }

    /// Returns the total number of database query errors.
    fn db_query_errors(&self) -> u64 {
        self.inner.db_query_errors()
    }

    /// Returns the number of times register_object was called.
    fn register_single_calls(&self) -> u64 {
        self.inner.register_single_calls()
    }

    /// Returns the number of times register_batch_objects was called.
    fn register_batch_calls(&self) -> u64 {
        self.inner.register_batch_calls()
    }

    /// Returns the total number of objects registered.
    fn total_objects_registered(&self) -> u64 {
        self.inner.total_objects_registered()
    }
}
713
#[cfg(feature = "python")]
#[pyfunction(name = "canonicalise")]
/// Canonicalises a Python object and returns the canonical JSON text as bytes.
///
/// The object is first converted to a `serde_json::Value` via `pythonize`;
/// conversion failures surface as `SerializationError`, canonicalisation
/// failures as `SerdeError`.
fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let value: Value = match pythonize::depythonize(json_obj) {
        Ok(parsed) => parsed,
        Err(err) => return Err(JsonRegisterError::SerializationError(err.to_string()).into()),
    };
    let canonical_text = match crate::canonicalise::canonicalise(&value) {
        Ok(text) => text,
        Err(err) => return Err(JsonRegisterError::SerdeError(err).into()),
    };
    Ok(canonical_text.into_bytes())
}
724
/// The `json_register` Python extension module.
///
/// Registers the `JsonRegister` class and the `canonicalise` function.
#[cfg(feature = "python")]
#[pymodule]
fn json_register(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyJsonRegister>()?;
    m.add_function(wrap_pyfunction!(py_canonicalise, m)?)?;
    Ok(())
}
733
#[cfg(test)]
mod connection_tests {
    use super::*;

    /// Asserts that sanitizing `input` produces exactly `expected`.
    fn assert_sanitizes_to(input: &str, expected: &str) {
        let actual = sanitize_connection_string(input);
        assert_eq!(actual, expected, "sanitizing {:?}", input);
    }

    #[test]
    fn test_sanitize_connection_string_with_password() {
        assert_sanitizes_to(
            "postgres://user:secret123@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_postgresql_scheme() {
        // The '@' inside the password must not be mistaken for the host separator.
        assert_sanitizes_to(
            "postgresql://admin:p@ssw0rd@db.example.com:5432/production",
            "postgresql://admin:****@db.example.com:5432/production",
        );
    }

    #[test]
    fn test_sanitize_connection_string_no_password() {
        // Userinfo without ':' carries no password, so nothing changes.
        let unchanged = "postgres://user@localhost:5432/mydb";
        assert_sanitizes_to(unchanged, unchanged);
    }

    #[test]
    fn test_sanitize_connection_string_with_special_chars() {
        assert_sanitizes_to(
            "postgres://user:p@ss:word@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_not_postgres() {
        // Any `scheme://` string is handled, not just PostgreSQL ones.
        assert_sanitizes_to(
            "mysql://user:password@localhost:3306/mydb",
            "mysql://user:****@localhost:3306/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_malformed() {
        // Strings without a scheme are returned untouched.
        let unchanged = "not a connection string";
        assert_sanitizes_to(unchanged, unchanged);
    }
}