1#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17use std::sync::atomic::{AtomicU64, Ordering};
18
19mod cache;
20mod canonicalise;
21mod db;
22mod errors;
23
24pub use cache::Cache;
25pub use canonicalise::canonicalise;
26pub use db::Db;
27pub use errors::JsonRegisterError;
28
/// Percent-encodes `raw` so it can be embedded in the userinfo part of a URL.
///
/// Only RFC 3986 "unreserved" characters (ASCII letters, digits, `-`, `.`, `_`, `~`)
/// are kept verbatim; every other byte of the UTF-8 encoding becomes `%XX`.
fn percent_encode_userinfo(raw: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(raw.len());
    for byte in raw.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}

/// Builds a PostgreSQL connection URL of the form
/// `postgres://user:password@host:port/database`.
///
/// # Arguments
///
/// * `user` - Database user name, given in its raw (not URL-encoded) form.
/// * `password` - Database password, given in its raw (not URL-encoded) form.
/// * `host` - Host name or address, inserted verbatim.
/// * `port` - TCP port, inserted verbatim.
/// * `database` - Database name, inserted verbatim.
///
/// # Returns
///
/// The assembled connection string. `user` and `password` are percent-encoded so
/// that characters such as `@`, `:`, `/`, `?` or `#` inside the credentials cannot
/// be mistaken for URL delimiters. Credentials consisting only of unreserved
/// characters are emitted unchanged.
pub fn build_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}",
        percent_encode_userinfo(user),
        percent_encode_userinfo(password),
        host,
        port,
        database
    )
}
54
/// Returns a copy of `connection_string` with the password replaced by `****`,
/// suitable for logs and error messages.
///
/// The input is expected to look like `scheme://user:password@host[:port]/database`.
/// The scheme is not checked, so any `scheme://user:password@...` string is masked.
///
/// The user/password section is located as follows:
///
/// 1. The last `@` before the first `/` after the scheme is taken as the end of
///    the credentials (so passwords containing `@` or `:` are masked completely).
/// 2. If no `@` is found there but a `/` exists, the password itself may contain a
///    raw `/`. In that case the last `@` before the first `?` or `#` is used
///    instead, so such a password is still masked rather than leaked.
///
/// The string is returned unchanged when it has no `://`, no `@` in the searched
/// region, or no `:` in the credentials section (i.e. no password is present).
pub fn sanitize_connection_string(connection_string: &str) -> String {
    let (scheme, rest) = match connection_string.split_once("://") {
        Some(parts) => parts,
        None => return connection_string.to_string(),
    };

    // Primary search: the authority section ends at the first '/'.
    let authority_end = rest.find('/').unwrap_or(rest.len());
    let at_idx = rest[..authority_end].rfind('@').or_else(|| {
        // Fallback for passwords containing a raw '/': look further, but never
        // into the query string or fragment, where an '@' is ordinary data.
        let credentials_limit = rest
            .find(|c: char| c == '?' || c == '#')
            .unwrap_or(rest.len());
        rest[..credentials_limit].rfind('@')
    });

    let at_idx = match at_idx {
        Some(idx) => idx,
        None => return connection_string.to_string(),
    };

    // `host_db` keeps its leading '@' so it can be appended as-is.
    let (user_pass, host_db) = rest.split_at(at_idx);

    match user_pass.split_once(':') {
        Some((user, _password)) => format!("{}://{}:****{}", scheme, user, host_db),
        // No ':' means there is no password to hide.
        None => connection_string.to_string(),
    }
}
106
/// Registers JSON objects in a database table and hands back their integer ids,
/// keeping an in-memory cache of canonical form -> id in front of the database.
///
/// Also tracks call counters that are exposed through the accessor methods and
/// `telemetry_metrics`.
pub struct Register {
    /// Database access layer used for all lookups/inserts.
    db: Db,
    /// Cache keyed by the canonical form of a JSON object.
    cache: Cache,
    /// Number of `register_object` invocations (counted on entry).
    register_single_calls: AtomicU64,
    /// Number of `register_batch_objects` invocations (counted on entry).
    register_batch_calls: AtomicU64,
    /// Total number of objects submitted through either registration method.
    total_objects_registered: AtomicU64,
}
118
119impl Register {
120 #[allow(clippy::too_many_arguments)]
138 pub async fn new(
139 connection_string: &str,
140 table_name: &str,
141 id_column: &str,
142 jsonb_column: &str,
143 pool_size: u32,
144 lru_cache_size: usize,
145 acquire_timeout_secs: Option<u64>,
146 idle_timeout_secs: Option<u64>,
147 max_lifetime_secs: Option<u64>,
148 ) -> Result<Self, JsonRegisterError> {
149 let db = Db::new(
150 connection_string,
151 table_name,
152 id_column,
153 jsonb_column,
154 pool_size,
155 acquire_timeout_secs,
156 idle_timeout_secs,
157 max_lifetime_secs,
158 )
159 .await
160 .map_err(JsonRegisterError::DbError)?;
161 let cache = Cache::new(lru_cache_size);
162 Ok(Self {
163 db,
164 cache,
165 register_single_calls: AtomicU64::new(0),
166 register_batch_calls: AtomicU64::new(0),
167 total_objects_registered: AtomicU64::new(0),
168 })
169 }
170
171 pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
185 self.register_single_calls.fetch_add(1, Ordering::Relaxed);
186 self.total_objects_registered
187 .fetch_add(1, Ordering::Relaxed);
188
189 let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
190
191 if let Some(id) = self.cache.get(&canonical) {
192 return Ok(id);
193 }
194
195 let id = self
196 .db
197 .register_object(&canonical)
198 .await
199 .map_err(JsonRegisterError::DbError)?;
200
201 self.cache.put(canonical, id);
202
203 Ok(id)
204 }
205
206 pub async fn register_batch_objects(
221 &self,
222 values: &[Value],
223 ) -> Result<Vec<i32>, JsonRegisterError> {
224 self.register_batch_calls.fetch_add(1, Ordering::Relaxed);
225 self.total_objects_registered
226 .fetch_add(values.len() as u64, Ordering::Relaxed);
227
228 let mut canonicals = Vec::with_capacity(values.len());
229 for value in values {
230 canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
231 }
232
233 let mut all_cached = true;
235 let mut cached_ids = Vec::with_capacity(values.len());
236 for canonical in &canonicals {
237 if let Some(id) = self.cache.get(canonical) {
238 cached_ids.push(id);
239 } else {
240 all_cached = false;
241 break;
242 }
243 }
244
245 if all_cached {
246 return Ok(cached_ids);
247 }
248
249 let ids = self
251 .db
252 .register_batch_objects(&canonicals)
253 .await
254 .map_err(JsonRegisterError::DbError)?;
255
256 for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
258 self.cache.put(canonical, *id);
259 }
260
261 Ok(ids)
262 }
263
264 pub fn pool_size(&self) -> usize {
273 self.db.pool_size()
274 }
275
276 pub fn idle_connections(&self) -> usize {
285 self.db.idle_connections()
286 }
287
288 pub fn is_closed(&self) -> bool {
296 self.db.is_closed()
297 }
298
299 pub fn cache_hits(&self) -> u64 {
305 self.cache.hits()
306 }
307
308 pub fn cache_misses(&self) -> u64 {
314 self.cache.misses()
315 }
316
317 pub fn cache_hit_rate(&self) -> f64 {
324 self.cache.hit_rate()
325 }
326
327 pub fn cache_size(&self) -> usize {
333 self.cache.size()
334 }
335
336 pub fn cache_capacity(&self) -> usize {
342 self.cache.capacity()
343 }
344
345 pub fn cache_evictions(&self) -> u64 {
351 self.cache.evictions()
352 }
353
354 pub fn active_connections(&self) -> usize {
362 self.pool_size().saturating_sub(self.idle_connections())
363 }
364
365 pub fn db_queries_total(&self) -> u64 {
371 self.db.queries_executed()
372 }
373
374 pub fn db_query_errors(&self) -> u64 {
380 self.db.query_errors()
381 }
382
383 pub fn register_single_calls(&self) -> u64 {
389 self.register_single_calls.load(Ordering::Relaxed)
390 }
391
392 pub fn register_batch_calls(&self) -> u64 {
398 self.register_batch_calls.load(Ordering::Relaxed)
399 }
400
401 pub fn total_objects_registered(&self) -> u64 {
409 self.total_objects_registered.load(Ordering::Relaxed)
410 }
411
412 pub fn telemetry_metrics(&self) -> TelemetryMetrics {
421 TelemetryMetrics {
422 cache_hits: self.cache_hits(),
424 cache_misses: self.cache_misses(),
425 cache_hit_rate: self.cache_hit_rate(),
426 cache_size: self.cache_size(),
427 cache_capacity: self.cache_capacity(),
428 cache_evictions: self.cache_evictions(),
429 pool_size: self.pool_size(),
431 idle_connections: self.idle_connections(),
432 active_connections: self.active_connections(),
433 is_closed: self.is_closed(),
434 db_queries_total: self.db_queries_total(),
436 db_query_errors: self.db_query_errors(),
437 register_single_calls: self.register_single_calls(),
439 register_batch_calls: self.register_batch_calls(),
440 total_objects_registered: self.total_objects_registered(),
441 }
442 }
443}
444
/// Snapshot of the metrics exposed by `Register`, as produced by
/// `Register::telemetry_metrics`. Each field holds the value returned by the
/// `Register` accessor of the same name at the time the snapshot was taken.
#[derive(Debug, Clone)]
pub struct TelemetryMetrics {
    /// Value of `Register::cache_hits`.
    pub cache_hits: u64,
    /// Value of `Register::cache_misses`.
    pub cache_misses: u64,
    /// Value of `Register::cache_hit_rate`.
    pub cache_hit_rate: f64,
    /// Value of `Register::cache_size`.
    pub cache_size: usize,
    /// Value of `Register::cache_capacity`.
    pub cache_capacity: usize,
    /// Value of `Register::cache_evictions`.
    pub cache_evictions: u64,
    /// Value of `Register::pool_size`.
    pub pool_size: usize,
    /// Value of `Register::idle_connections`.
    pub idle_connections: usize,
    /// Value of `Register::active_connections` (pool size minus idle, clamped at zero).
    pub active_connections: usize,
    /// Value of `Register::is_closed`.
    pub is_closed: bool,
    /// Value of `Register::db_queries_total`.
    pub db_queries_total: u64,
    /// Value of `Register::db_query_errors`.
    pub db_query_errors: u64,
    /// Value of `Register::register_single_calls`.
    pub register_single_calls: u64,
    /// Value of `Register::register_batch_calls`.
    pub register_batch_calls: u64,
    /// Value of `Register::total_objects_registered`.
    pub total_objects_registered: u64,
}
471
/// Python-facing wrapper around `Register`, exported to Python as `JsonRegister`.
///
/// Only compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
#[pyclass(name = "JsonRegister")]
struct PyJsonRegister {
    /// The underlying async register. Declared before `rt`, so it is dropped first.
    inner: Register,
    /// Tokio runtime used to drive `inner`'s async methods to completion.
    rt: Runtime,
}
479
#[cfg(feature = "python")]
#[pymethods]
impl PyJsonRegister {
    /// Validates the configuration, connects to the database and returns a new
    /// `JsonRegister`.
    ///
    /// Defaults (from the Python signature): `lru_cache_size=1000`,
    /// `table_name="json_objects"`, `id_column="id"`, `jsonb_column="json_object"`,
    /// `pool_size=10`, and no acquire/idle/lifetime timeouts.
    ///
    /// Raises a configuration error if `database_name`, `database_host`,
    /// `table_name`, `id_column` or `jsonb_column` is empty, if `database_port`
    /// is 0, or if `pool_size` is 0 or greater than 10000. Raises a runtime error
    /// if the Tokio runtime cannot be created, and propagates any error from
    /// `Register::new`.
    #[new]
    #[pyo3(signature = (
        database_name,
        database_host,
        database_port,
        database_user,
        database_password,
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        jsonb_column="json_object",
        pool_size=10,
        acquire_timeout_secs=None,
        idle_timeout_secs=None,
        max_lifetime_secs=None
    ))]
    // Every argument maps onto one keyword of the Python constructor.
    #[allow(clippy::too_many_arguments)]
    fn new(
        database_name: String,
        database_host: String,
        database_port: u16,
        database_user: String,
        database_password: String,
        lru_cache_size: usize,
        table_name: &str,
        id_column: &str,
        jsonb_column: &str,
        pool_size: u32,
        acquire_timeout_secs: Option<u64>,
        idle_timeout_secs: Option<u64>,
        max_lifetime_secs: Option<u64>,
    ) -> PyResult<Self> {
        // (is_invalid, message) pairs; the first failing entry is reported.
        let checks: [(bool, &str); 8] = [
            (database_name.is_empty(), "database_name cannot be empty"),
            (database_host.is_empty(), "database_host cannot be empty"),
            (
                database_port == 0,
                "database_port must be between 1 and 65535",
            ),
            (pool_size == 0, "pool_size must be greater than 0"),
            (
                pool_size > 10000,
                "pool_size exceeds reasonable maximum of 10000",
            ),
            (table_name.is_empty(), "table_name cannot be empty"),
            (id_column.is_empty(), "id_column cannot be empty"),
            (jsonb_column.is_empty(), "jsonb_column cannot be empty"),
        ];
        for &(is_invalid, message) in checks.iter() {
            if is_invalid {
                return Err(JsonRegisterError::Configuration(message.into()).into());
            }
        }

        let connection_string = build_connection_string(
            &database_user,
            &database_password,
            &database_host,
            database_port,
            &database_name,
        );

        let rt = Runtime::new().map_err(|e| JsonRegisterError::RuntimeError(e.to_string()))?;

        // Drive the async constructor to completion on the freshly created runtime.
        let inner = rt.block_on(Register::new(
            &connection_string,
            table_name,
            id_column,
            jsonb_column,
            pool_size,
            lru_cache_size,
            acquire_timeout_secs,
            idle_timeout_secs,
            max_lifetime_secs,
        ))?;

        Ok(PyJsonRegister { inner, rt })
    }

    /// Converts a Python object to JSON, registers it and returns its id.
    ///
    /// Raises a serialization error if the object cannot be converted, and
    /// propagates any error from the underlying register.
    fn register_object(&self, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
        let value: Value = pythonize::depythonize(json_obj)
            .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
        let id = self.rt.block_on(self.inner.register_object(&value))?;
        Ok(id)
    }

    /// Converts every item of a Python list to JSON, registers them as one batch
    /// and returns their ids.
    ///
    /// Conversion stops at the first item that cannot be converted, which raises
    /// a serialization error. Errors from the underlying register are propagated.
    fn register_batch_objects(&self, json_objects: &Bound<'_, PyList>) -> PyResult<Vec<i32>> {
        let values = json_objects
            .iter()
            .map(|item| {
                pythonize::depythonize(&item)
                    .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))
            })
            .collect::<Result<Vec<Value>, JsonRegisterError>>()?;
        let ids = self
            .rt
            .block_on(self.inner.register_batch_objects(&values))?;
        Ok(ids)
    }

    /// Returns the connection pool size reported by the underlying register.
    fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Returns the number of idle connections reported by the underlying register.
    fn idle_connections(&self) -> usize {
        self.inner.idle_connections()
    }

    /// Returns whether the underlying register reports its pool as closed.
    fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Returns the cache hit counter.
    fn cache_hits(&self) -> u64 {
        self.inner.cache_hits()
    }

    /// Returns the cache miss counter.
    fn cache_misses(&self) -> u64 {
        self.inner.cache_misses()
    }

    /// Returns the cache hit rate.
    fn cache_hit_rate(&self) -> f64 {
        self.inner.cache_hit_rate()
    }

    /// Returns the current cache size.
    fn cache_size(&self) -> usize {
        self.inner.cache_size()
    }

    /// Returns the cache capacity.
    fn cache_capacity(&self) -> usize {
        self.inner.cache_capacity()
    }

    /// Returns the cache eviction counter.
    fn cache_evictions(&self) -> u64 {
        self.inner.cache_evictions()
    }

    /// Returns the pool size minus the idle connections (never below zero).
    fn active_connections(&self) -> usize {
        self.inner.active_connections()
    }

    /// Returns the number of database queries executed.
    fn db_queries_total(&self) -> u64 {
        self.inner.db_queries_total()
    }

    /// Returns the number of database query errors.
    fn db_query_errors(&self) -> u64 {
        self.inner.db_query_errors()
    }

    /// Returns how many single-object registrations were requested.
    fn register_single_calls(&self) -> u64 {
        self.inner.register_single_calls()
    }

    /// Returns how many batch registrations were requested.
    fn register_batch_calls(&self) -> u64 {
        self.inner.register_batch_calls()
    }

    /// Returns the total number of objects submitted for registration.
    fn total_objects_registered(&self) -> u64 {
        self.inner.total_objects_registered()
    }
}
713
/// Converts a Python object to JSON and returns the bytes of its canonical form.
///
/// Exposed to Python as `canonicalise`. Raises a serialization error if the
/// object cannot be converted, or a serde error if canonicalisation fails.
#[cfg(feature = "python")]
#[pyfunction(name = "canonicalise")]
fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let value: Value = pythonize::depythonize(json_obj)
        .map_err(|e| JsonRegisterError::SerializationError(e.to_string()))?;
    let canonical =
        crate::canonicalise::canonicalise(&value).map_err(JsonRegisterError::SerdeError)?;
    Ok(canonical.into_bytes())
}
724
/// Python module definition: registers the `JsonRegister` class and the
/// `canonicalise` function.
#[cfg(feature = "python")]
#[pymodule]
fn json_register(module: &Bound<'_, PyModule>) -> PyResult<()> {
    module.add_class::<PyJsonRegister>()?;
    let canonicalise_fn = wrap_pyfunction!(py_canonicalise, module)?;
    module.add_function(canonicalise_fn)?;
    Ok(())
}
733
#[cfg(test)]
mod connection_tests {
    use super::*;

    /// Asserts that sanitising `input` produces exactly `expected`.
    #[track_caller]
    fn assert_sanitized(input: &str, expected: &str) {
        assert_eq!(sanitize_connection_string(input), expected);
    }

    /// Asserts that sanitising `input` leaves it untouched.
    #[track_caller]
    fn assert_unchanged(input: &str) {
        assert_sanitized(input, input);
    }

    #[test]
    fn test_sanitize_connection_string_with_password() {
        assert_sanitized(
            "postgres://user:secret123@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_postgresql_scheme() {
        // The password itself contains an '@'.
        assert_sanitized(
            "postgresql://admin:p@ssw0rd@db.example.com:5432/production",
            "postgresql://admin:****@db.example.com:5432/production",
        );
    }

    #[test]
    fn test_sanitize_connection_string_no_password() {
        // Without a password there is nothing to mask.
        assert_unchanged("postgres://user@localhost:5432/mydb");
    }

    #[test]
    fn test_sanitize_connection_string_with_special_chars() {
        // The password contains both '@' and ':'.
        assert_sanitized(
            "postgres://user:p@ss:word@localhost:5432/mydb",
            "postgres://user:****@localhost:5432/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_not_postgres() {
        // The scheme is not inspected, so other URL schemes are masked too.
        assert_sanitized(
            "mysql://user:password@localhost:3306/mydb",
            "mysql://user:****@localhost:3306/mydb",
        );
    }

    #[test]
    fn test_sanitize_connection_string_malformed() {
        // Input without "://" is returned as-is.
        assert_unchanged("not a connection string");
    }
}