json_register/
lib.rs

1//! # JSON Register
2//!
3//! `json-register` is a library for registering JSON objects into a PostgreSQL database
4//! with canonicalisation and caching. It ensures that semantically equivalent JSON objects
5//! are stored only once and assigned a unique identifier.
6//!
7//! This library provides both a Rust API and Python bindings.
8
9#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17
18mod cache;
19mod canonicalise;
20mod db;
21mod errors;
22
23pub use cache::Cache;
24pub use canonicalise::canonicalise;
25pub use db::Db;
26pub use errors::JsonRegisterError;
27
/// Builds a PostgreSQL connection URL from its components.
///
/// The user name and password are percent-encoded following the RFC 3986
/// userinfo rules. This means credentials containing reserved characters such as
/// `@`, `:`, `/`, `?`, `#` or `%` still produce a well-formed URL, rather than
/// one whose host, port or database is mis-parsed. libpq-style URL parsers
/// decode these escapes again. Credentials made only of unreserved characters
/// (`A-Z a-z 0-9 - . _ ~`) are emitted unchanged.
///
/// A host that contains `:` and is not already bracketed (a bare IPv6 literal
/// such as `::1`) is wrapped in square brackets. This keeps the `:port`
/// separator unambiguous.
///
/// # Arguments
///
/// * `user` - Database user name (percent-encoded into the URL)
/// * `password` - Database password (percent-encoded into the URL)
/// * `host` - Database host (e.g., "localhost", "10.0.0.5", "::1")
/// * `port` - Database port (e.g., 5432)
/// * `database` - Database name (inserted verbatim)
///
/// # Returns
///
/// A `postgres://user:password@host:port/database` connection string.
pub fn build_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    // A bare IPv6 literal must be bracketed, or its colons would be confused
    // with the port separator.
    let host = if host.contains(':') && !host.starts_with('[') {
        format!("[{}]", host)
    } else {
        host.to_string()
    };
    format!(
        "postgres://{}:{}@{}:{}/{}",
        percent_encode_userinfo(user),
        percent_encode_userinfo(password),
        host,
        port,
        database
    )
}

/// Percent-encodes every byte of `component` that is not an RFC 3986
/// unreserved character, making it safe to embed in a URL's userinfo section.
fn percent_encode_userinfo(component: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(component.len());
    // Work on UTF-8 bytes so multi-byte characters become one escape per byte.
    for byte in component.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
53
/// Sanitizes a connection string by replacing the password with asterisks.
///
/// This prevents passwords from leaking in error messages, logs, or stack traces.
///
/// The userinfo section is taken to end at the last `@` before the query string
/// (`?`). This tolerates unencoded `@` and `/` characters inside the password.
/// If no `@` precedes the `?`, which can happen when the password itself
/// contains an unencoded `?`, the last `@` anywhere in the string is used.
/// When the input is ambiguous, the function therefore errs toward masking
/// too much rather than leaking the secret.
///
/// Strings without a `scheme://` prefix, or without a `user:password@`
/// section, are returned unchanged.
///
/// # Arguments
///
/// * `connection_string` - The connection string to sanitize
///
/// # Returns
///
/// A sanitized connection string with the password replaced by "****"
///
/// # Example
///
/// ```
/// use json_register::sanitize_connection_string;
/// let sanitized = sanitize_connection_string("postgres://user:secret@localhost:5432/db");
/// assert_eq!(sanitized, "postgres://user:****@localhost:5432/db");
/// let sanitized = sanitize_connection_string("postgres://user:se/cret@localhost:5432/db");
/// assert_eq!(sanitized, "postgres://user:****@localhost:5432/db");
/// ```
pub fn sanitize_connection_string(connection_string: &str) -> String {
    const MASK: &str = "****";

    // Handle postgres://, postgresql:// and any other `scheme://` prefix.
    let scheme_end = match connection_string.find("://") {
        Some(idx) => idx + 3,
        None => return connection_string.to_string(),
    };
    let (scheme, rest) = connection_string.split_at(scheme_end);

    // Prefer the last '@' before the query string. Otherwise fall back to the
    // last '@' overall, so a password with an unencoded '?' is still masked.
    let query_start = rest.find('?').unwrap_or(rest.len());
    let at_idx = rest[..query_start].rfind('@').or_else(|| rest.rfind('@'));

    if let Some(at_idx) = at_idx {
        let (user_pass, host_db) = rest.split_at(at_idx);
        // The FIRST ':' separates user from password. User names should not
        // contain ':', but passwords might.
        if let Some((user, _password)) = user_pass.split_once(':') {
            return format!("{}{}:{}{}", scheme, user, MASK, host_db);
        }
    }

    // Nothing recognizable as a password: return the input as-is.
    connection_string.to_string()
}
105
106/// The main registry structure that coordinates database interactions and caching.
107///
108/// This struct maintains a connection pool to the PostgreSQL database and an
109/// in-memory LRU cache to speed up lookups of frequently accessed JSON objects.
110pub struct Register {
111    db: Db,
112    cache: Cache,
113}
114
115impl Register {
116    /// Creates a new `Register` instance.
117    ///
118    /// # Arguments
119    ///
120    /// * `connection_string` - The PostgreSQL connection string.
121    /// * `table_name` - The name of the table where JSON objects are stored.
122    /// * `id_column` - The name of the column storing the unique ID.
123    /// * `jsonb_column` - The name of the column storing the JSONB data.
124    /// * `pool_size` - The maximum number of connections in the database pool.
125    /// * `lru_cache_size` - The capacity of the in-memory LRU cache.
126    /// * `acquire_timeout_secs` - Optional timeout for acquiring connections (default: 5s).
127    /// * `idle_timeout_secs` - Optional timeout for idle connections (default: 600s).
128    /// * `max_lifetime_secs` - Optional maximum lifetime for connections (default: 1800s).
129    ///
130    /// # Returns
131    ///
132    /// A `Result` containing the new `Register` instance or a `JsonRegisterError`.
133    #[allow(clippy::too_many_arguments)]
134    pub async fn new(
135        connection_string: &str,
136        table_name: &str,
137        id_column: &str,
138        jsonb_column: &str,
139        pool_size: u32,
140        lru_cache_size: usize,
141        acquire_timeout_secs: Option<u64>,
142        idle_timeout_secs: Option<u64>,
143        max_lifetime_secs: Option<u64>,
144    ) -> Result<Self, JsonRegisterError> {
145        let db = Db::new(
146            connection_string,
147            table_name,
148            id_column,
149            jsonb_column,
150            pool_size,
151            acquire_timeout_secs,
152            idle_timeout_secs,
153            max_lifetime_secs,
154        )
155        .await
156        .map_err(JsonRegisterError::DbError)?;
157        let cache = Cache::new(lru_cache_size);
158        Ok(Self { db, cache })
159    }
160
161    /// Registers a single JSON object.
162    ///
163    /// This method canonicalises the input JSON, checks the cache, and if necessary,
164    /// inserts the object into the database. It returns the unique ID associated
165    /// with the JSON object.
166    ///
167    /// # Arguments
168    ///
169    /// * `value` - The JSON value to register.
170    ///
171    /// # Returns
172    ///
173    /// A `Result` containing the unique ID (i32) or a `JsonRegisterError`.
174    pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
175        let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
176
177        if let Some(id) = self.cache.get(&canonical) {
178            return Ok(id);
179        }
180
181        let id = self
182            .db
183            .register_object(&canonical)
184            .await
185            .map_err(JsonRegisterError::DbError)?;
186
187        self.cache.put(canonical, id);
188
189        Ok(id)
190    }
191
192    /// Registers a batch of JSON objects.
193    ///
194    /// This method processes multiple JSON objects efficiently. It first checks the
195    /// cache for all items. If any are missing, it performs a batch insert/select
196    /// operation in the database. The order of the returned IDs corresponds to the
197    /// order of the input values.
198    ///
199    /// # Arguments
200    ///
201    /// * `values` - A slice of JSON values to register.
202    ///
203    /// # Returns
204    ///
205    /// A `Result` containing a vector of unique IDs or a `JsonRegisterError`.
206    pub async fn register_batch_objects(
207        &self,
208        values: &[Value],
209    ) -> Result<Vec<i32>, JsonRegisterError> {
210        let mut canonicals = Vec::with_capacity(values.len());
211        for value in values {
212            canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
213        }
214
215        // Check cache for existing entries
216        let mut all_cached = true;
217        let mut cached_ids = Vec::with_capacity(values.len());
218        for canonical in &canonicals {
219            if let Some(id) = self.cache.get(canonical) {
220                cached_ids.push(id);
221            } else {
222                all_cached = false;
223                break;
224            }
225        }
226
227        if all_cached {
228            return Ok(cached_ids);
229        }
230
231        // If not all items are in the cache, query the database
232        let ids = self
233            .db
234            .register_batch_objects(&canonicals)
235            .await
236            .map_err(JsonRegisterError::DbError)?;
237
238        // Update the cache with the newly retrieved IDs
239        for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
240            self.cache.put(canonical, *id);
241        }
242
243        Ok(ids)
244    }
245
246    /// Returns the current size of the connection pool.
247    ///
248    /// This is the total number of connections (both idle and active) currently
249    /// in the pool. Useful for monitoring pool utilization.
250    ///
251    /// # Returns
252    ///
253    /// The number of connections in the pool.
254    pub fn pool_size(&self) -> usize {
255        self.db.pool_size()
256    }
257
258    /// Returns the number of idle connections in the pool.
259    ///
260    /// Idle connections are available for immediate use. A low idle count
261    /// during high load may indicate the pool is undersized.
262    ///
263    /// # Returns
264    ///
265    /// The number of idle connections.
266    pub fn idle_connections(&self) -> usize {
267        self.db.idle_connections()
268    }
269
270    /// Checks if the connection pool is closed.
271    ///
272    /// A closed pool cannot create new connections and will error on acquire attempts.
273    ///
274    /// # Returns
275    ///
276    /// `true` if the pool is closed, `false` otherwise.
277    pub fn is_closed(&self) -> bool {
278        self.db.is_closed()
279    }
280
281    /// Returns the number of cache hits.
282    ///
283    /// # Returns
284    ///
285    /// The total number of successful cache lookups.
286    pub fn cache_hits(&self) -> u64 {
287        self.cache.hits()
288    }
289
290    /// Returns the number of cache misses.
291    ///
292    /// # Returns
293    ///
294    /// The total number of unsuccessful cache lookups.
295    pub fn cache_misses(&self) -> u64 {
296        self.cache.misses()
297    }
298
299    /// Returns the cache hit rate as a percentage.
300    ///
301    /// # Returns
302    ///
303    /// The hit rate as a float between 0.0 and 100.0.
304    /// Returns 0.0 if no cache operations have occurred.
305    pub fn cache_hit_rate(&self) -> f64 {
306        self.cache.hit_rate()
307    }
308}
309
#[cfg(feature = "python")]
#[pyclass(name = "JsonRegister")]
/// Python-facing handle (exposed as `JsonRegister`) around a [`Register`].
///
/// Each instance owns its own Tokio runtime. That runtime drives the async
/// `Register` API from Python's synchronous method calls.
struct PyJsonRegister {
    inner: Register,
    rt: Runtime,
}

#[cfg(feature = "python")]
#[pymethods]
impl PyJsonRegister {
    #[new]
    #[pyo3(signature = (
        database_name,
        database_host,
        database_port,
        database_user,
        database_password,
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        jsonb_column="json_object",
        pool_size=10,
        acquire_timeout_secs=None,
        idle_timeout_secs=None,
        max_lifetime_secs=None
    ))]
    #[allow(clippy::too_many_arguments)]
    /// Creates a `JsonRegister` connected to the given PostgreSQL database.
    ///
    /// Arguments are validated before any connection is attempted. The first
    /// invalid argument raises a configuration error.
    ///
    /// # Optional Timeout Parameters
    ///
    /// * `acquire_timeout_secs` - Timeout for acquiring a connection from pool (default: 5)
    /// * `idle_timeout_secs` - Timeout for idle connections before closure (default: 600)
    /// * `max_lifetime_secs` - Maximum lifetime of connections (default: 1800)
    fn new(
        database_name: String,
        database_host: String,
        database_port: u16,
        database_user: String,
        database_password: String,
        lru_cache_size: usize,
        table_name: &str,
        id_column: &str,
        jsonb_column: &str,
        pool_size: u32,
        acquire_timeout_secs: Option<u64>,
        idle_timeout_secs: Option<u64>,
        max_lifetime_secs: Option<u64>,
    ) -> PyResult<Self> {
        // Validation rules in reporting order: the first failing rule is the
        // one raised to Python.
        let rules: [(bool, &'static str); 8] = [
            (database_name.is_empty(), "database_name cannot be empty"),
            (database_host.is_empty(), "database_host cannot be empty"),
            (database_port == 0, "database_port must be between 1 and 65535"),
            (pool_size == 0, "pool_size must be greater than 0"),
            (pool_size > 10000, "pool_size exceeds reasonable maximum of 10000"),
            (table_name.is_empty(), "table_name cannot be empty"),
            (id_column.is_empty(), "id_column cannot be empty"),
            (jsonb_column.is_empty(), "jsonb_column cannot be empty"),
        ];
        if let Some(&(_, message)) = rules.iter().find(|(failed, _)| *failed) {
            return Err(JsonRegisterError::Configuration(message.into()).into());
        }

        let connection_string = build_connection_string(
            &database_user,
            &database_password,
            &database_host,
            database_port,
            &database_name,
        );

        let rt = Runtime::new().map_err(|err| JsonRegisterError::RuntimeError(err.to_string()))?;
        let inner = rt.block_on(Register::new(
            &connection_string,
            table_name,
            id_column,
            jsonb_column,
            pool_size,
            lru_cache_size,
            acquire_timeout_secs,
            idle_timeout_secs,
            max_lifetime_secs,
        ))?;

        Ok(Self { inner, rt })
    }

    /// Registers one Python JSON-like object and returns its ID.
    ///
    /// Conversion failures raise a serialization error; registration errors are
    /// converted to Python exceptions.
    fn register_object(&self, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
        let value: Value = pythonize::depythonize(json_obj)
            .map_err(|err| JsonRegisterError::SerializationError(err.to_string()))?;
        // NOTE(review): block_on runs with the GIL still held. Other Python
        // threads therefore stall during the database round-trip; consider
        // `py.allow_threads` once `Register: Sync` is confirmed.
        let id = self.rt.block_on(self.inner.register_object(&value))?;
        Ok(id)
    }

    /// Registers every object in a Python list and returns their IDs in list order.
    ///
    /// Conversion stops at the first element that cannot be converted to JSON.
    fn register_batch_objects(&self, json_objects: &Bound<'_, PyList>) -> PyResult<Vec<i32>> {
        let values = json_objects
            .iter()
            .map(|obj| {
                pythonize::depythonize(&obj)
                    .map_err(|err| JsonRegisterError::SerializationError(err.to_string()))
            })
            .collect::<Result<Vec<Value>, JsonRegisterError>>()?;
        let ids = self.rt.block_on(self.inner.register_batch_objects(&values))?;
        Ok(ids)
    }

    /// Total number of connections (idle and active) currently held by the pool.
    ///
    /// Useful for monitoring pool utilization.
    fn pool_size(&self) -> usize {
        Register::pool_size(&self.inner)
    }

    /// Number of pooled connections currently idle and ready for immediate use.
    ///
    /// A persistently low value under load may mean the pool is undersized.
    fn idle_connections(&self) -> usize {
        Register::idle_connections(&self.inner)
    }

    /// Whether the connection pool has been closed.
    ///
    /// A closed pool cannot create new connections and will error on acquire attempts.
    fn is_closed(&self) -> bool {
        Register::is_closed(&self.inner)
    }

    /// Count of successful cache lookups since this instance was created.
    fn cache_hits(&self) -> u64 {
        Register::cache_hits(&self.inner)
    }

    /// Count of unsuccessful cache lookups since this instance was created.
    fn cache_misses(&self) -> u64 {
        Register::cache_misses(&self.inner)
    }

    /// Cache hit rate as a percentage in the range 0.0 to 100.0.
    ///
    /// Returns 0.0 if no cache operations have occurred.
    fn cache_hit_rate(&self) -> f64 {
        Register::cache_hit_rate(&self.inner)
    }
}
506
#[cfg(feature = "python")]
#[pyfunction(name = "canonicalise")]
/// Converts a Python object to JSON and returns its canonical string form as bytes.
///
/// Conversion failures raise a serialization error; canonicalisation failures
/// are reported as serde errors.
fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let value: Value = pythonize::depythonize(json_obj)
        .map_err(|err| JsonRegisterError::SerializationError(err.to_string()))?;
    let canonical =
        crate::canonicalise::canonicalise(&value).map_err(JsonRegisterError::SerdeError)?;
    Ok(canonical.into_bytes())
}
517
/// Python extension module `json_register`.
///
/// Exposes the `JsonRegister` class and the `canonicalise` function.
#[cfg(feature = "python")]
#[pymodule]
fn json_register(module: &Bound<'_, PyModule>) -> PyResult<()> {
    module.add_class::<PyJsonRegister>()?;
    let canonicalise_fn = wrap_pyfunction!(py_canonicalise, module)?;
    module.add_function(canonicalise_fn)?;
    Ok(())
}
526
#[cfg(test)]
mod connection_tests {
    use super::*;

    /// Asserts that sanitizing `input` yields exactly `expected`.
    fn assert_sanitized(input: &str, expected: &str) {
        assert_eq!(sanitize_connection_string(input), expected);
    }

    /// Asserts that sanitizing `input` leaves it untouched.
    fn assert_unchanged(input: &str) {
        assert_sanitized(input, input);
    }

    #[test]
    fn test_sanitize_connection_string_with_password() {
        assert_sanitized(
            "postgres://user:secret123@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_postgresql_scheme() {
        assert_sanitized(
            "postgresql://admin:p@ssw0rd@db.example.com:5432/production",
            "postgresql://admin:****@db.example.com:5432/production",
        );
    }

    #[test]
    fn test_sanitize_connection_string_no_password() {
        // A user without a password has nothing to mask.
        assert_unchanged("postgres://user@localhost:5432/mydb");
    }

    #[test]
    fn test_sanitize_connection_string_with_special_chars() {
        // Both '@' and ':' appear inside the password.
        assert_sanitized(
            "postgres://user:p@ss:word@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_not_postgres() {
        // Masking is scheme-agnostic.
        assert_sanitized(
            "mysql://user:password@localhost:3306/mydb",
            "mysql://user:****@localhost:3306/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_malformed() {
        // Input without a scheme is passed through verbatim.
        assert_unchanged("not a connection string");
    }
}