1#[cfg(feature = "python")]
10use pyo3::prelude::*;
11#[cfg(feature = "python")]
12use pyo3::types::PyList;
13#[cfg(feature = "python")]
14use tokio::runtime::Runtime;
15
16use serde_json::Value;
17
18mod cache;
19mod canonicalise;
20mod db;
21mod errors;
22
23pub use cache::Cache;
24pub use canonicalise::canonicalise;
25pub use db::Db;
26pub use errors::JsonRegisterError;
27
/// Percent-encodes `component` for use in the userinfo part of a URL.
///
/// Every byte outside the RFC 3986 "unreserved" set (`A-Z a-z 0-9 - . _ ~`)
/// is written as `%XX` (upper-case hex). Reserved delimiters such as `@`, `:`,
/// `/`, `?`, `#` and `%` therefore cannot be mistaken for URL structure.
/// Multi-byte UTF-8 characters are encoded byte by byte.
fn percent_encode_userinfo(component: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(component.len());
    for byte in component.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}

/// Builds a `postgres://user:password@host:port/database` connection URL.
///
/// `user` and `password` are percent-encoded, so credentials containing URL
/// delimiters (`@`, `:`, `/`, `?`, `#`, `%`, spaces, ...) still yield a
/// well-formed URL. Plain alphanumeric credentials are emitted unchanged.
/// `host` and `database` are inserted verbatim.
///
/// NOTE(review): this relies on the connection layer percent-decoding the
/// userinfo, as standard PostgreSQL URL parsers do — confirm against `Db`.
pub fn build_connection_string(
    user: &str,
    password: &str,
    host: &str,
    port: u16,
    database: &str,
) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}",
        percent_encode_userinfo(user),
        percent_encode_userinfo(password),
        host,
        port,
        database
    )
}
53
/// Masks the password in a URL-style connection string so it is safe to log.
///
/// `scheme://user:password@host...` becomes `scheme://user:****@host...`.
/// The username is everything before the first `:` of the userinfo. The
/// userinfo ends at the *last* `@` found, so unencoded passwords containing
/// `@` or `:` are still fully masked.
///
/// The `@` is first looked for in the authority, i.e. the text before the
/// first `/` after the scheme. If the authority contains no `@`, the search is
/// widened to everything before the query/fragment (`?` / `#`). This catches
/// unencoded passwords containing `/`, which would otherwise leak verbatim.
/// As a trade-off, an unauthenticated URL whose *path* contains `@` may be
/// over-masked.
///
/// The input is returned unchanged when it has no `://`, no `@`, or no `:` in
/// the userinfo (i.e. no password).
pub fn sanitize_connection_string(connection_string: &str) -> String {
    if let Some(scheme_end) = connection_string.find("://") {
        // "://" is ASCII, so `scheme_end + 3` is always a char boundary.
        let (scheme, rest) = connection_string.split_at(scheme_end + 3);

        // The authority normally ends at the first '/'.
        let authority_end = rest.find('/').unwrap_or(rest.len());
        let at_idx = rest[..authority_end].rfind('@').or_else(|| {
            // No '@' in the authority: the '/' may belong to an unencoded
            // password ("user:pa/ss@host/db"), so search up to the query.
            let pre_query_end = rest
                .find(|c: char| c == '?' || c == '#')
                .unwrap_or(rest.len());
            rest[..pre_query_end].rfind('@')
        });

        if let Some(at_idx) = at_idx {
            let (user_pass, host_db) = rest.split_at(at_idx);
            if let Some(colon_idx) = user_pass.find(':') {
                let user = &user_pass[..colon_idx];
                return format!("{}{}:****{}", scheme, user, host_db);
            }
        }
    }

    connection_string.to_string()
}
105
106pub struct Register {
111 db: Db,
112 cache: Cache,
113}
114
115impl Register {
116 #[allow(clippy::too_many_arguments)]
134 pub async fn new(
135 connection_string: &str,
136 table_name: &str,
137 id_column: &str,
138 jsonb_column: &str,
139 pool_size: u32,
140 lru_cache_size: usize,
141 acquire_timeout_secs: Option<u64>,
142 idle_timeout_secs: Option<u64>,
143 max_lifetime_secs: Option<u64>,
144 ) -> Result<Self, JsonRegisterError> {
145 let db = Db::new(
146 connection_string,
147 table_name,
148 id_column,
149 jsonb_column,
150 pool_size,
151 acquire_timeout_secs,
152 idle_timeout_secs,
153 max_lifetime_secs,
154 )
155 .await
156 .map_err(JsonRegisterError::DbError)?;
157 let cache = Cache::new(lru_cache_size);
158 Ok(Self { db, cache })
159 }
160
161 pub async fn register_object(&self, value: &Value) -> Result<i32, JsonRegisterError> {
175 let canonical = canonicalise(value).map_err(JsonRegisterError::SerdeError)?;
176
177 if let Some(id) = self.cache.get(&canonical) {
178 return Ok(id);
179 }
180
181 let id = self
182 .db
183 .register_object(&canonical)
184 .await
185 .map_err(JsonRegisterError::DbError)?;
186
187 self.cache.put(canonical, id);
188
189 Ok(id)
190 }
191
192 pub async fn register_batch_objects(
207 &self,
208 values: &[Value],
209 ) -> Result<Vec<i32>, JsonRegisterError> {
210 let mut canonicals = Vec::with_capacity(values.len());
211 for value in values {
212 canonicals.push(canonicalise(value).map_err(JsonRegisterError::SerdeError)?);
213 }
214
215 let mut all_cached = true;
217 let mut cached_ids = Vec::with_capacity(values.len());
218 for canonical in &canonicals {
219 if let Some(id) = self.cache.get(canonical) {
220 cached_ids.push(id);
221 } else {
222 all_cached = false;
223 break;
224 }
225 }
226
227 if all_cached {
228 return Ok(cached_ids);
229 }
230
231 let ids = self
233 .db
234 .register_batch_objects(&canonicals)
235 .await
236 .map_err(JsonRegisterError::DbError)?;
237
238 for (canonical, id) in canonicals.into_iter().zip(ids.iter()) {
240 self.cache.put(canonical, *id);
241 }
242
243 Ok(ids)
244 }
245
246 pub fn pool_size(&self) -> usize {
255 self.db.pool_size()
256 }
257
258 pub fn idle_connections(&self) -> usize {
267 self.db.idle_connections()
268 }
269
270 pub fn is_closed(&self) -> bool {
278 self.db.is_closed()
279 }
280
281 pub fn cache_hits(&self) -> u64 {
287 self.cache.hits()
288 }
289
290 pub fn cache_misses(&self) -> u64 {
296 self.cache.misses()
297 }
298
299 pub fn cache_hit_rate(&self) -> f64 {
306 self.cache.hit_rate()
307 }
308}
309
#[cfg(feature = "python")]
/// Python-facing wrapper around `Register`, exported as `JsonRegister`.
///
/// Owns a dedicated Tokio runtime; every async call is driven to completion
/// with `Runtime::block_on`, so the Python API is synchronous.
#[pyclass(name = "JsonRegister")]
struct PyJsonRegister {
    // Fields drop in declaration order: the register (including its `Db`)
    // is dropped before the runtime it was created on.
    inner: Register,
    rt: Runtime,
}

#[cfg(feature = "python")]
#[pymethods]
impl PyJsonRegister {
    /// Validates the configuration, connects to PostgreSQL and builds the register.
    ///
    /// Invalid arguments raise an error converted from
    /// `JsonRegisterError::Configuration`; runtime or database start-up
    /// failures are converted from the corresponding `JsonRegisterError`.
    #[new]
    #[pyo3(signature = (
        database_name,
        database_host,
        database_port,
        database_user,
        database_password,
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        jsonb_column="json_object",
        pool_size=10,
        acquire_timeout_secs=None,
        idle_timeout_secs=None,
        max_lifetime_secs=None
    ))]
    // Mirrors the Python keyword-argument signature above.
    #[allow(clippy::too_many_arguments)]
    fn new(
        database_name: String,
        database_host: String,
        database_port: u16,
        database_user: String,
        database_password: String,
        lru_cache_size: usize,
        table_name: &str,
        id_column: &str,
        jsonb_column: &str,
        pool_size: u32,
        acquire_timeout_secs: Option<u64>,
        idle_timeout_secs: Option<u64>,
        max_lifetime_secs: Option<u64>,
    ) -> PyResult<Self> {
        // Rules are checked in order; the first violated one is reported.
        let rules = [
            (database_name.is_empty(), "database_name cannot be empty"),
            (database_host.is_empty(), "database_host cannot be empty"),
            (database_port == 0, "database_port must be between 1 and 65535"),
            (pool_size == 0, "pool_size must be greater than 0"),
            (pool_size > 10000, "pool_size exceeds reasonable maximum of 10000"),
            (table_name.is_empty(), "table_name cannot be empty"),
            (id_column.is_empty(), "id_column cannot be empty"),
            (jsonb_column.is_empty(), "jsonb_column cannot be empty"),
        ];
        if let Some(&(_, message)) = rules.iter().find(|rule| rule.0) {
            return Err(JsonRegisterError::Configuration(message.into()).into());
        }

        let connection_string = build_connection_string(
            &database_user,
            &database_password,
            &database_host,
            database_port,
            &database_name,
        );

        let rt = Runtime::new().map_err(|e| JsonRegisterError::RuntimeError(e.to_string()))?;

        let inner = rt.block_on(Register::new(
            &connection_string,
            table_name,
            id_column,
            jsonb_column,
            pool_size,
            lru_cache_size,
            acquire_timeout_secs,
            idle_timeout_secs,
            max_lifetime_secs,
        ))?;

        Ok(Self { inner, rt })
    }

    /// Converts a Python object to JSON and returns its registered id.
    fn register_object(&self, json_obj: &Bound<'_, PyAny>) -> PyResult<i32> {
        let parsed: Value = pythonize::depythonize(json_obj)
            .map_err(|err| JsonRegisterError::SerializationError(err.to_string()))?;
        // NOTE(review): the GIL remains held while `block_on` waits on the DB.
        let id = self.rt.block_on(self.inner.register_object(&parsed))?;
        Ok(id)
    }

    /// Converts every element of a Python list to JSON and registers the batch.
    fn register_batch_objects(&self, json_objects: &Bound<'_, PyList>) -> PyResult<Vec<i32>> {
        // Convert everything up front; the first conversion error aborts.
        let parsed = json_objects
            .iter()
            .map(|item| -> Result<Value, JsonRegisterError> {
                pythonize::depythonize(&item)
                    .map_err(|err| JsonRegisterError::SerializationError(err.to_string()))
            })
            .collect::<Result<Vec<Value>, JsonRegisterError>>()?;
        let ids = self.rt.block_on(self.inner.register_batch_objects(&parsed))?;
        Ok(ids)
    }

    /// Pool size as reported by the underlying register.
    fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Idle connection count as reported by the underlying register.
    fn idle_connections(&self) -> usize {
        self.inner.idle_connections()
    }

    /// Whether the underlying connection pool reports itself closed.
    fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Number of cache hits so far.
    fn cache_hits(&self) -> u64 {
        self.inner.cache_hits()
    }

    /// Number of cache misses so far.
    fn cache_misses(&self) -> u64 {
        self.inner.cache_misses()
    }

    /// Cache hit rate as computed by the underlying cache.
    fn cache_hit_rate(&self) -> f64 {
        self.inner.cache_hit_rate()
    }
}
506
#[cfg(feature = "python")]
/// Python `canonicalise(obj)`: converts `obj` to JSON and returns its canonical
/// text as UTF-8 bytes (a `Vec<u8>`; the concrete Python type depends on how
/// PyO3 converts `Vec<u8>`).
#[pyfunction(name = "canonicalise")]
fn py_canonicalise(json_obj: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let value: Value = match pythonize::depythonize(json_obj) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(JsonRegisterError::SerializationError(err.to_string()).into());
        }
    };

    match crate::canonicalise::canonicalise(&value) {
        Ok(text) => Ok(text.into_bytes()),
        Err(err) => Err(JsonRegisterError::SerdeError(err).into()),
    }
}
517
#[cfg(feature = "python")]
/// Python module `json_register`: exposes the `JsonRegister` class and the
/// `canonicalise` function.
#[pymodule]
fn json_register(module: &Bound<'_, PyModule>) -> PyResult<()> {
    module.add_class::<PyJsonRegister>()?;
    let canonicalise_fn = wrap_pyfunction!(py_canonicalise, module)?;
    module.add_function(canonicalise_fn)?;
    Ok(())
}
526
#[cfg(test)]
mod connection_tests {
    use super::*;

    #[test]
    fn test_sanitize_connection_string_with_password() {
        let input = "postgres://user:secret123@localhost:5432/mydb";
        let expected = "postgres://user:****@localhost:5432/mydb";
        assert_eq!(sanitize_connection_string(input), expected);
    }

    #[test]
    fn test_sanitize_connection_string_postgresql_scheme() {
        let input = "postgresql://admin:p@ssw0rd@db.example.com:5432/production";
        let expected = "postgresql://admin:****@db.example.com:5432/production";
        assert_eq!(sanitize_connection_string(input), expected);
    }

    #[test]
    fn test_sanitize_connection_string_no_password() {
        let input = "postgres://user@localhost:5432/mydb";
        // Nothing to mask: the string must come back unchanged.
        assert_eq!(sanitize_connection_string(input), input);
    }

    #[test]
    fn test_sanitize_connection_string_with_special_chars() {
        let input = "postgres://user:p@ss:word@localhost:5432/mydb";
        let expected = "postgres://user:****@localhost:5432/mydb";
        assert_eq!(sanitize_connection_string(input), expected);
    }

    #[test]
    fn test_sanitize_connection_string_not_postgres() {
        let input = "mysql://user:password@localhost:3306/mydb";
        // Masking is scheme-agnostic.
        let expected = "mysql://user:****@localhost:3306/mydb";
        assert_eq!(sanitize_connection_string(input), expected);
    }

    #[test]
    fn test_sanitize_connection_string_malformed() {
        let input = "not a connection string";
        // No "://" separator: returned unchanged.
        assert_eq!(sanitize_connection_string(input), input);
    }

    #[test]
    fn test_sanitize_connection_string_keeps_query_string() {
        let input = "postgres://user:secret@localhost:5432/mydb?sslmode=require";
        let expected = "postgres://user:****@localhost:5432/mydb?sslmode=require";
        assert_eq!(sanitize_connection_string(input), expected);
    }

    #[test]
    fn test_sanitize_connection_string_without_path() {
        let input = "postgres://user:secret@localhost:5432";
        let expected = "postgres://user:****@localhost:5432";
        assert_eq!(sanitize_connection_string(input), expected);
    }

    #[test]
    fn test_sanitize_connection_string_empty() {
        assert_eq!(sanitize_connection_string(""), "");
    }

    #[test]
    fn test_build_connection_string_plain_parts() {
        assert_eq!(
            build_connection_string("user", "secret123", "localhost", 5432, "mydb"),
            "postgres://user:secret123@localhost:5432/mydb"
        );
    }

    #[test]
    fn test_build_then_sanitize_hides_password() {
        let built = build_connection_string("user", "secret123", "localhost", 5432, "mydb");
        let sanitized = sanitize_connection_string(&built);
        assert!(!sanitized.contains("secret123"));
        assert_eq!(sanitized, "postgres://user:****@localhost:5432/mydb");
    }
}