1#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17use std::sync::atomic::{AtomicU64, Ordering};
18#[cfg(feature = "python")]
19use std::sync::Arc;
20use tracing::{debug, instrument, trace};
21
22mod cache;
23mod canonicalise;
24mod db;
25mod errors;
26
27pub use cache::Cache;
28pub use canonicalise::canonicalise;
29pub use db::Db;
30pub use errors::JsonRegisterError;
31
/// Builds a `postgres://user:password@host:port/database` connection URL.
///
/// `user` and `password` are percent-encoded before being interpolated, so
/// credentials containing URL-reserved characters (`@`, `:`, `/`, `?`, `#`,
/// `%`, whitespace, non-ASCII, ...) cannot be mistaken for URL structure by
/// whatever parses the result. Credentials made only of ASCII letters, digits
/// and `-._~` are emitted exactly as given.
///
/// `host`, `port` and `database` are interpolated verbatim.
///
/// # Arguments
///
/// * `user` - Database user name (raw, not pre-encoded).
/// * `password` - Database password (raw, not pre-encoded).
/// * `host` - Host name or address.
/// * `port` - TCP port.
/// * `database` - Database name.
///
/// # Returns
///
/// The assembled connection URL.
pub fn build_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}",
        percent_encode_userinfo(user),
        percent_encode_userinfo(password),
        host,
        port,
        database
    )
}

/// Percent-encodes every byte of `raw` that is not an RFC 3986 "unreserved"
/// character (ASCII letters, digits, `-`, `.`, `_`, `~`).
fn percent_encode_userinfo(raw: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(raw.len());
    for &byte in raw.as_bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            // Encode the UTF-8 byte as %XX with upper-case hex digits.
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
57
/// Returns `connection_string` with the password in its `user:password@`
/// section replaced by `****`, for safe use in logs and error messages.
///
/// The input is split at the first `://`. The end of the credentials is the
/// last `@` that appears before the first `/` of the remainder, which keeps
/// passwords containing `@` or `:` fully masked. If no `@` precedes the first
/// `/`, the last `@` anywhere in the remainder is used instead, so that a
/// password containing a raw `/` is still masked rather than returned in
/// clear text. The user name is everything before the first `:` of the
/// credentials.
///
/// The fallback deliberately prefers over-masking to leaking: a URL without
/// credentials that has a port and an `@` later in its path or query will be
/// masked as if the text before that `@` were credentials.
///
/// The input is returned unchanged when it has no `://`, no `@`, or no `:`
/// inside the credentials section (i.e. no password to hide).
pub fn sanitize_connection_string(connection_string: &str) -> String {
    let (scheme, rest) = match connection_string.split_once("://") {
        Some(parts) => parts,
        None => return connection_string.to_string(),
    };

    let at_idx = match rest.find('/') {
        // Prefer an '@' inside the authority (before the first '/'); fall
        // back to the last '@' overall for passwords that contain '/'.
        Some(slash_idx) => rest[..slash_idx]
            .rfind('@')
            .or_else(|| rest.rfind('@')),
        None => rest.rfind('@'),
    };

    let (user_pass, host_db) = match at_idx {
        // `host_db` keeps the leading '@'.
        Some(idx) => rest.split_at(idx),
        None => return connection_string.to_string(),
    };

    match user_pass.split_once(':') {
        Some((user, _password)) => format!("{}://{}:****{}", scheme, user, host_db),
        None => connection_string.to_string(),
    }
}
109
/// Registers JSON objects in a database and caches the ids it gets back.
///
/// Combines a [`Db`] handle with a [`Cache`] keyed by the canonical form of
/// each object, plus call counters exposed through the metric accessors.
pub struct Register {
    /// Database handle used to register objects and to report pool/query
    /// statistics.
    db: Db,
    /// Cache mapping a canonicalised object to its id.
    cache: Cache,
    /// Number of calls made to `register_object`.
    register_single_calls: AtomicU64,
    /// Number of calls made to `register_batch_objects`.
    register_batch_calls: AtomicU64,
    /// Number of objects submitted through either registration method
    /// (incremented on entry, before the call is known to succeed).
    total_objects_registered: AtomicU64,
}
121
122impl Register {
123 #[allow(clippy::too_many_arguments)]
142 pub async fn new(
143 connection_string: &str,
144 table_name: &str,
145 id_column: &str,
146 jsonb_column: &str,
147 pool_size: u32,
148 lru_cache_size: usize,
149 acquire_timeout_secs: Option<u64>,
150 idle_timeout_secs: Option<u64>,
151 max_lifetime_secs: Option<u64>,
152 use_tls: Option<bool>,
153 ) -> Result<Self, JsonRegisterError> {
154 let db = Db::new(
155 connection_string,
156 table_name,
157 id_column,
158 jsonb_column,
159 pool_size,
160 acquire_timeout_secs,
161 idle_timeout_secs,
162 max_lifetime_secs,
163 use_tls,
164 )
165 .await?;
166 let cache = Cache::new(lru_cache_size);
167 Ok(Self {
168 db,
169 cache,
170 register_single_calls: AtomicU64::new(0),
171 register_batch_calls: AtomicU64::new(0),
172 total_objects_registered: AtomicU64::new(0),
173 })
174 }
175
176 #[instrument(skip(self, value))]
190 pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
191 self.register_single_calls.fetch_add(1, Ordering::Relaxed);
192 self.total_objects_registered
193 .fetch_add(1, Ordering::Relaxed);
194
195 let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
196
197 if let Some(id) = self.cache.get(&canonical) {
198 trace!(id, "cache hit");
199 return Ok(id);
200 }
201
202 trace!("cache miss, querying database");
203 let id = self
204 .db
205 .register_object(value)
206 .await
207 .map_err(JsonRegisterError::DbError)?;
208
209 self.cache.put(canonical, id);
210 debug!(id, "object registered and cached");
211
212 Ok(id)
213 }
214
215 #[instrument(skip(self, values), fields(batch_size = values.len()))]
230 pub async fn register_batch_objects(
231 &self,
232 values: &[Value],
233 ) -> Result<Vec<i32>, JsonRegisterError> {
234 self.register_batch_calls.fetch_add(1, Ordering::Relaxed);
235 self.total_objects_registered
236 .fetch_add(values.len() as u64, Ordering::Relaxed);
237
238 let mut canonicals = Vec::with_capacity(values.len());
239 for value in values {
240 canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
241 }
242
243 let mut all_cached = true;
245 let mut cached_ids = Vec::with_capacity(values.len());
246 for canonical in &canonicals {
247 if let Some(id) = self.cache.get(canonical) {
248 cached_ids.push(id);
249 } else {
250 all_cached = false;
251 break;
252 }
253 }
254
255 if all_cached {
256 debug!(cached = values.len(), "all objects found in cache");
257 return Ok(cached_ids);
258 }
259
260 trace!("cache miss for some objects, querying database");
262 let ids = self
263 .db
264 .register_batch_objects(values)
265 .await
266 .map_err(JsonRegisterError::DbError)?;
267
268 for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
270 self.cache.put(canonical, *id);
271 }
272
273 debug!(registered = ids.len(), "batch registered and cached");
274 Ok(ids)
275 }
276
277 pub fn pool_size(&self) -> usize {
286 self.db.pool_size()
287 }
288
289 pub fn idle_connections(&self) -> usize {
298 self.db.idle_connections()
299 }
300
301 pub fn is_closed(&self) -> bool {
309 self.db.is_closed()
310 }
311
312 pub fn cache_hits(&self) -> u64 {
318 self.cache.hits()
319 }
320
321 pub fn cache_misses(&self) -> u64 {
327 self.cache.misses()
328 }
329
330 pub fn cache_hit_rate(&self) -> f64 {
337 self.cache.hit_rate()
338 }
339
340 pub fn cache_size(&self) -> usize {
346 self.cache.size()
347 }
348
349 pub fn cache_capacity(&self) -> usize {
355 self.cache.capacity()
356 }
357
358 pub fn cache_evictions(&self) -> u64 {
364 self.cache.evictions()
365 }
366
367 pub fn active_connections(&self) -> usize {
375 self.pool_size().saturating_sub(self.idle_connections())
376 }
377
378 pub fn db_queries_total(&self) -> u64 {
384 self.db.queries_executed()
385 }
386
387 pub fn db_query_errors(&self) -> u64 {
393 self.db.query_errors()
394 }
395
396 pub fn register_single_calls(&self) -> u64 {
402 self.register_single_calls.load(Ordering::Relaxed)
403 }
404
405 pub fn register_batch_calls(&self) -> u64 {
411 self.register_batch_calls.load(Ordering::Relaxed)
412 }
413
414 pub fn total_objects_registered(&self) -> u64 {
422 self.total_objects_registered.load(Ordering::Relaxed)
423 }
424
425 pub fn telemetry_metrics(&self) -> TelemetryMetrics {
434 TelemetryMetrics {
435 cache_hits: self.cache_hits(),
437 cache_misses: self.cache_misses(),
438 cache_hit_rate: self.cache_hit_rate(),
439 cache_size: self.cache_size(),
440 cache_capacity: self.cache_capacity(),
441 cache_evictions: self.cache_evictions(),
442 pool_size: self.pool_size(),
444 idle_connections: self.idle_connections(),
445 active_connections: self.active_connections(),
446 is_closed: self.is_closed(),
447 db_queries_total: self.db_queries_total(),
449 db_query_errors: self.db_query_errors(),
450 register_single_calls: self.register_single_calls(),
452 register_batch_calls: self.register_batch_calls(),
453 total_objects_registered: self.total_objects_registered(),
454 }
455 }
456}
457
/// Point-in-time copy of the metrics exposed by [`Register`], as produced by
/// `Register::telemetry_metrics`.
///
/// Each field holds the value returned by the `Register` accessor of the same
/// name at the moment the snapshot was built.
#[derive(Debug, Clone)]
pub struct TelemetryMetrics {
    /// Value of `Register::cache_hits`.
    pub cache_hits: u64,
    /// Value of `Register::cache_misses`.
    pub cache_misses: u64,
    /// Value of `Register::cache_hit_rate`.
    pub cache_hit_rate: f64,
    /// Value of `Register::cache_size`.
    pub cache_size: usize,
    /// Value of `Register::cache_capacity`.
    pub cache_capacity: usize,
    /// Value of `Register::cache_evictions`.
    pub cache_evictions: u64,
    /// Value of `Register::pool_size`.
    pub pool_size: usize,
    /// Value of `Register::idle_connections`.
    pub idle_connections: usize,
    /// Value of `Register::active_connections` (pool size minus idle
    /// connections, clamped at zero).
    pub active_connections: usize,
    /// Value of `Register::is_closed`.
    pub is_closed: bool,
    /// Value of `Register::db_queries_total`.
    pub db_queries_total: u64,
    /// Value of `Register::db_query_errors`.
    pub db_query_errors: u64,
    /// Value of `Register::register_single_calls`.
    pub register_single_calls: u64,
    /// Value of `Register::register_batch_calls`.
    pub register_batch_calls: u64,
    /// Value of `Register::total_objects_registered`.
    pub total_objects_registered: u64,
}
484
/// Python-facing wrapper around [`Register`], exported to Python under the
/// class name `JsonRegister`.
#[cfg(feature = "python")]
#[pyclass(name = "JsonRegister")]
struct PyJsonRegister {
    // Shared so the `*_async` methods can move a clone into the future they
    // hand back to Python.
    inner: Arc<Register>,
    // Tokio runtime on which the blocking methods drive futures via
    // `block_on`.
    rt: Runtime,
}
492
#[cfg(feature = "python")]
#[pymethods]
impl PyJsonRegister {
    /// Creates a `JsonRegister` connected to the given PostgreSQL database.
    ///
    /// Arguments are validated first (non-empty names, non-zero port,
    /// `pool_size` in `1..=10000`); the first violated rule is reported as a
    /// configuration error. A connection string is then built from the
    /// database arguments, a Tokio runtime is created, and the underlying
    /// register is constructed on that runtime.
    ///
    /// The GIL is released while the register is being constructed so other
    /// Python threads keep running during connection setup.
    #[new]
    #[pyo3(signature = (
        database_name,
        database_host,
        database_port,
        database_user,
        database_password,
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        jsonb_column="json_object",
        pool_size=10,
        acquire_timeout_secs=None,
        idle_timeout_secs=None,
        max_lifetime_secs=None,
        use_tls=None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        py: Python<'_>,
        database_name: String,
        database_host: String,
        database_port: u16,
        database_user: String,
        database_password: String,
        lru_cache_size: usize,
        table_name: &str,
        id_column: &str,
        jsonb_column: &str,
        pool_size: u32,
        acquire_timeout_secs: Option<u64>,
        idle_timeout_secs: Option<u64>,
        max_lifetime_secs: Option<u64>,
        use_tls: Option<bool>,
    ) -> PyResult<Self> {
        // Ordered (violated?, message) pairs; the first violated rule wins,
        // so the order determines which error the caller sees.
        let validations = [
            (database_name.is_empty(), "database_name cannot be empty"),
            (database_host.is_empty(), "database_host cannot be empty"),
            (
                database_port == 0,
                "database_port must be between 1 and 65535",
            ),
            (pool_size == 0, "pool_size must be greater than 0"),
            (
                pool_size > 10000,
                "pool_size exceeds reasonable maximum of 10000",
            ),
            (table_name.is_empty(), "table_name cannot be empty"),
            (id_column.is_empty(), "id_column cannot be empty"),
            (jsonb_column.is_empty(), "jsonb_column cannot be empty"),
        ];
        if let Some(&(_, message)) = validations.iter().find(|(violated, _)| *violated) {
            return Err(JsonRegisterError::Configuration(message.into()).into());
        }

        let connection_string = build_connection_string(
            &database_user,
            &database_password,
            &database_host,
            database_port,
            &database_name,
        );

        let rt = Runtime::new().map_err(|e| JsonRegisterError::RuntimeError(e.to_string()))?;

        // Release the GIL for the blocking connection setup; the error is
        // converted to `PyErr` inside the closure so only `Send` values cross
        // the boundary.
        let inner = py.allow_threads(|| {
            rt.block_on(Register::new(
                &connection_string,
                table_name,
                id_column,
                jsonb_column,
                pool_size,
                lru_cache_size,
                acquire_timeout_secs,
                idle_timeout_secs,
                max_lifetime_secs,
                use_tls,
            ))
            .map_err(PyErr::from)
        })?;

        Ok(PyJsonRegister {
            inner: Arc::new(inner),
            rt,
        })
    }

    /// Registers one JSON-compatible Python object and returns its id.
    ///
    /// Blocks the calling thread until the registration completes; the GIL is
    /// released while waiting.
    fn register_object(&self, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
        // Conversion needs the GIL, so it happens before the GIL is released.
        let value: Value = pythonize::depythonize(json_obj)
            .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
        json_obj.py().allow_threads(|| {
            self.rt
                .block_on(self.inner.register_object(&value))
                .map_err(PyErr::from)
        })
    }

    /// Registers a list of JSON-compatible Python objects and returns their
    /// ids.
    ///
    /// Blocks the calling thread until the registration completes; the GIL is
    /// released while waiting.
    fn register_batch_objects(&self, json_objects: &Bound<'_, PyList>) -> PyResult<Vec<i32>> {
        let mut values = Vec::with_capacity(json_objects.len());
        for obj in json_objects {
            let value: Value = pythonize::depythonize(&obj)
                .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
            values.push(value);
        }
        json_objects.py().allow_threads(|| {
            self.rt
                .block_on(self.inner.register_batch_objects(&values))
                .map_err(PyErr::from)
        })
    }

    /// Awaitable variant of `register_object`.
    ///
    /// The object is converted immediately; the registration itself runs in
    /// the future returned to Python.
    fn register_object_async<'py>(
        &self,
        py: Python<'py>,
        json_obj: &Bound<'_, PyAny>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let value: Value = pythonize::depythonize(json_obj)
            .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
        let inner = Arc::clone(&self.inner);
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner.register_object(&value).await.map_err(PyErr::from)
        })
    }

    /// Awaitable variant of `register_batch_objects`.
    ///
    /// The list is converted immediately; the registration itself runs in the
    /// future returned to Python.
    fn register_batch_objects_async<'py>(
        &self,
        py: Python<'py>,
        json_objects: &Bound<'_, PyList>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let mut values = Vec::with_capacity(json_objects.len());
        for obj in json_objects {
            let value: Value = pythonize::depythonize(&obj)
                .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
            values.push(value);
        }
        let inner = Arc::clone(&self.inner);
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner
                .register_batch_objects(&values)
                .await
                .map_err(PyErr::from)
        })
    }

    /// Returns the connection pool size.
    fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Returns the number of idle connections.
    fn idle_connections(&self) -> usize {
        self.inner.idle_connections()
    }

    /// Returns whether the underlying database handle is closed.
    fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Returns the cache hit count.
    fn cache_hits(&self) -> u64 {
        self.inner.cache_hits()
    }

    /// Returns the cache miss count.
    fn cache_misses(&self) -> u64 {
        self.inner.cache_misses()
    }

    /// Returns the cache hit rate.
    fn cache_hit_rate(&self) -> f64 {
        self.inner.cache_hit_rate()
    }

    /// Returns the current cache size.
    fn cache_size(&self) -> usize {
        self.inner.cache_size()
    }

    /// Returns the cache capacity.
    fn cache_capacity(&self) -> usize {
        self.inner.cache_capacity()
    }

    /// Returns the cache eviction count.
    fn cache_evictions(&self) -> u64 {
        self.inner.cache_evictions()
    }

    /// Returns the pool size minus the idle connections.
    fn active_connections(&self) -> usize {
        self.inner.active_connections()
    }

    /// Returns the number of database queries executed.
    fn db_queries_total(&self) -> u64 {
        self.inner.db_queries_total()
    }

    /// Returns the number of database query errors.
    fn db_query_errors(&self) -> u64 {
        self.inner.db_query_errors()
    }

    /// Returns how many single-object registrations were requested.
    fn register_single_calls(&self) -> u64 {
        self.inner.register_single_calls()
    }

    /// Returns how many batch registrations were requested.
    fn register_batch_calls(&self) -> u64 {
        self.inner.register_batch_calls()
    }

    /// Returns how many objects were submitted for registration in total.
    fn total_objects_registered(&self) -> u64 {
        self.inner.total_objects_registered()
    }
}
776
/// Canonicalises a JSON-compatible Python object and returns the canonical
/// text as bytes.
///
/// Exposed to Python as `canonicalise`. Fails with a serialization error if
/// the object cannot be converted to a JSON value, or with a serde error if
/// canonicalisation itself fails.
#[cfg(feature = "python")]
#[pyfunction(name = "canonicalise")]
fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let value: Value = match pythonize::depythonize(json_obj) {
        Ok(converted) => converted,
        Err(e) => return Err(JsonRegisterError::SerializationError(e.to_string()).into()),
    };

    let canonical = crate::canonicalise::canonicalise(&value)
        .map_err(JsonRegisterError::SerdeError)?;
    Ok(canonical.into_bytes())
}
787
/// Initialises the `json_register` Python module.
///
/// Installs the logging bridges on a best-effort basis (failures are
/// ignored), then registers the `JsonRegister` class, the `canonicalise`
/// function and a `__version__` attribute taken from the crate version.
#[cfg(feature = "python")]
#[pymodule]
fn json_register(module: &Bound<'_, PyModule>) -> PyResult<()> {
    // NOTE(review): both calls below try to install the process-wide `log`
    // logger, which can only be set once, so presumably only the first one
    // takes effect — confirm this ordering is intended.
    let _ = tracing_log::LogTracer::init();
    pyo3_log::try_init().ok();

    module.add_class::<PyJsonRegister>()?;
    module.add_function(wrap_pyfunction!(py_canonicalise, module)?)?;
    module.add("__version__", env!("CARGO_PKG_VERSION"))?;
    Ok(())
}
803
#[cfg(test)]
mod connection_tests {
    use super::*;

    /// Asserts that sanitising `input` yields exactly `expected`.
    #[track_caller]
    fn assert_sanitized(input: &str, expected: &str) {
        assert_eq!(sanitize_connection_string(input), expected);
    }

    /// A plain `user:password@` section has its password masked.
    #[test]
    fn test_sanitize_connection_string_with_password() {
        assert_sanitized(
            "postgres://user:secret123@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    /// The `postgresql://` scheme works, including an `@` inside the password.
    #[test]
    fn test_sanitize_connection_string_postgresql_scheme() {
        assert_sanitized(
            "postgresql://admin:p@ssw0rd@db.example.com:5432/production",
            "postgresql://admin:****@db.example.com:5432/production",
        );
    }

    /// Without a password there is nothing to mask.
    #[test]
    fn test_sanitize_connection_string_no_password() {
        let input = "postgres://user@localhost:5432/mydb";
        assert_sanitized(input, input);
    }

    /// Passwords containing both `@` and `:` are masked in full.
    #[test]
    fn test_sanitize_connection_string_with_special_chars() {
        assert_sanitized(
            "postgres://user:p@ss:word@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    /// Masking is scheme-agnostic: other URL schemes are handled too.
    #[test]
    fn test_sanitize_connection_string_not_postgres() {
        assert_sanitized(
            "mysql://user:password@localhost:3306/mydb",
            "mysql://user:****@localhost:3306/mydb",
        );
    }

    /// Input that is not a URL is returned untouched.
    #[test]
    fn test_sanitize_connection_string_malformed() {
        let input = "not a connection string";
        assert_sanitized(input, input);
    }
}