1use crate::sql::aggregator::init_trace_cache;
2use crate::sql::cache::entity_cache;
3use crate::sql::cache::init_entity_cache;
4use crate::sql::error::SqlError;
5use crate::sql::traits::{
6 AlertSqlLogic, ArchiveSqlLogic, CustomMetricSqlLogic, GenAIDriftSqlLogic,
7 ObservabilitySqlLogic, ProfileSqlLogic, PsiSqlLogic, SpcSqlLogic, TagSqlLogic, TraceSqlLogic,
8 UserSqlLogic,
9};
10use scouter_settings::DatabaseSettings;
11use scouter_types::{RecordType, ServerRecords, TagRecord, ToDriftRecords, TraceServerRecord};
12use sqlx::ConnectOptions;
13use sqlx::{postgres::PgConnectOptions, Pool, Postgres};
14use std::result::Result::Ok;
15use std::time::Duration;
16use tokio::try_join;
17use tracing::log::LevelFilter;
18use tracing::{debug, error, info, instrument};
/// Upper bound on how many records are handed to a single batched insert call.
const DEFAULT_BATCH_SIZE: usize = 500;
20
21#[derive(Debug, Clone)]
22#[allow(dead_code)]
23pub struct PostgresClient {}
24
25impl SpcSqlLogic for PostgresClient {}
26impl CustomMetricSqlLogic for PostgresClient {}
27impl PsiSqlLogic for PostgresClient {}
28impl GenAIDriftSqlLogic for PostgresClient {}
29impl UserSqlLogic for PostgresClient {}
30impl ProfileSqlLogic for PostgresClient {}
31impl ObservabilitySqlLogic for PostgresClient {}
32impl AlertSqlLogic for PostgresClient {}
33impl ArchiveSqlLogic for PostgresClient {}
34impl TraceSqlLogic for PostgresClient {}
35impl TagSqlLogic for PostgresClient {}
36
37impl PostgresClient {
38 #[instrument(skip(database_settings))]
44 pub async fn create_db_pool(
45 database_settings: &DatabaseSettings,
46 ) -> Result<Pool<Postgres>, SqlError> {
47 let mut opts: PgConnectOptions = database_settings.connection_uri.parse()?;
48
49 opts = opts.log_statements(LevelFilter::Off);
51
52 let pool = match sqlx::postgres::PgPoolOptions::new()
53 .max_connections(database_settings.max_connections)
54 .min_connections(database_settings.min_connections)
55 .acquire_timeout(Duration::from_secs(
56 database_settings.db_acquire_timeout_seconds,
57 ))
58 .idle_timeout(Duration::from_secs(
59 database_settings.db_idle_timeout_seconds,
60 ))
61 .max_lifetime(Duration::from_secs(
62 database_settings.db_max_lifetime_seconds,
63 ))
64 .test_before_acquire(database_settings.db_test_before_acquire)
65 .connect_with(opts)
66 .await
67 {
68 Ok(pool) => {
69 info!("✅ Successfully connected to database");
70 pool
71 }
72 Err(err) => {
73 error!("🚨 Failed to connect to database {:?}", err);
74 std::process::exit(1);
75 }
76 };
77
78 init_entity_cache(database_settings.entity_cache_size);
80
81 init_trace_cache(
83 pool.clone(),
84 database_settings.flush_interval,
85 database_settings.stale_threshold,
86 database_settings.max_cache_size,
87 )
88 .await?;
89
90 if let Err(err) = Self::run_migrations(&pool).await {
92 error!("🚨 Failed to run migrations {:?}", err);
93 std::process::exit(1);
94 }
95
96 Ok(pool)
97 }
98
99 pub async fn run_migrations(pool: &Pool<Postgres>) -> Result<(), SqlError> {
100 info!("Running migrations");
101 sqlx::migrate!("src/migrations")
102 .run(pool)
103 .await
104 .map_err(SqlError::MigrateError)?;
105
106 debug!("Migrations complete");
107
108 Ok(())
109 }
110}
111
112pub struct MessageHandler {}
113
114impl MessageHandler {
115 #[instrument(skip_all)]
116 pub async fn insert_server_records(
117 pool: &Pool<Postgres>,
118 records: ServerRecords,
119 ) -> Result<(), SqlError> {
120 debug!("Inserting server records: {:?}", records.record_type()?);
121
122 let entity_id = entity_cache()
123 .get_entity_id_from_uid(pool, records.uid()?)
124 .await
125 .inspect_err(|e| error!("Failed to get entity ID from UID: {:?}", e))?;
126
127 match records.record_type()? {
128 RecordType::Spc => {
129 let spc_records = records.to_spc_drift_records()?;
130 debug!("SPC record count: {}", spc_records.len());
131
132 for chunk in spc_records.chunks(DEFAULT_BATCH_SIZE) {
133 PostgresClient::insert_spc_drift_records_batch(pool, chunk, &entity_id)
134 .await
135 .map_err(|e| {
136 error!("Failed to insert SPC drift records batch: {:?}", e);
137 e
138 })?;
139 }
140 }
141
142 RecordType::Psi => {
143 let psi_records = records.to_psi_drift_records()?;
144 debug!("PSI record count: {}", psi_records.len());
145
146 for chunk in psi_records.chunks(DEFAULT_BATCH_SIZE) {
147 PostgresClient::insert_bin_counts_batch(pool, chunk, &entity_id)
148 .await
149 .map_err(|e| {
150 error!("Failed to insert PSI drift records batch: {:?}", e);
151 e
152 })?;
153 }
154 }
155 RecordType::Custom => {
156 let custom_records = records.to_custom_metric_drift_records()?;
157 debug!("Custom record count: {}", custom_records.len());
158
159 for chunk in custom_records.chunks(DEFAULT_BATCH_SIZE) {
160 PostgresClient::insert_custom_metric_values_batch(pool, chunk, &entity_id)
161 .await
162 .map_err(|e| {
163 error!("Failed to insert custom metric records batch: {:?}", e);
164 e
165 })?;
166 }
167 }
168
169 RecordType::GenAIEval => {
170 debug!("LLM Drift record count: {:?}", records.len());
171 let records = records.to_genai_eval_records()?;
172 for record in records {
173 let _ = PostgresClient::insert_genai_eval_record(pool, record, &entity_id)
174 .await
175 .map_err(|e| {
176 error!("Failed to insert GenAI drift record: {:?}", e);
177 });
178 }
179 }
180
181 RecordType::GenAITask => {
182 debug!("GenAI Task count: {:?}", records.len());
183 let records = records.to_genai_task_records()?;
184 for chunk in records.chunks(DEFAULT_BATCH_SIZE) {
185 PostgresClient::insert_eval_task_results_batch(pool, chunk, &entity_id)
186 .await
187 .map_err(|e| {
188 error!("Failed to insert GenAI task records batch: {:?}", e);
189 e
190 })?;
191 }
192 }
193
194 RecordType::GenAIWorkflow => {
195 debug!("GenAI Workflow count: {:?}", records.len());
196 let records = records.to_genai_workflow_records()?;
197 for record in records {
198 let _ = PostgresClient::insert_genai_eval_workflow_record(
199 pool, &record, &entity_id,
200 )
201 .await
202 .map_err(|e| {
203 error!("Failed to insert GenAI workflow record: {:?}", e);
204 });
205 }
206 }
207
208 _ => {
209 error!(
210 "Unsupported record type for batch insert: {:?}",
211 records.record_type()?
212 );
213 return Err(SqlError::UnsupportedBatchTypeError);
214 }
215 }
216
217 Ok(())
218 }
219
220 pub async fn insert_trace_server_record(
221 pool: &Pool<Postgres>,
222 records: TraceServerRecord,
223 ) -> Result<(), SqlError> {
224 let (span_batch, baggage_batch, tag_records) = records.to_records()?;
225 let span_count = span_batch.len();
226
227 let trace_cache = crate::sql::aggregator::get_trace_cache().await;
230 for span in &span_batch {
231 trace_cache.update_trace(span).await;
232 }
233
234 if let Some(trace_service) =
236 scouter_dataframe::parquet::tracing::service::get_trace_span_service()
237 {
238 if let Err(e) = trace_service.write_spans(span_batch).await {
239 error!("Failed to write spans to Delta Lake: {:?}", e);
240 }
241 } else {
242 error!("TraceSpanService not initialized — spans will be lost");
243 }
244
245 let (baggage_result, tag_result) = try_join!(
247 PostgresClient::insert_trace_baggage_batch(pool, &baggage_batch),
248 PostgresClient::insert_tag_batch(pool, &tag_records),
249 )?;
250
251 debug!(
252 span_count,
253 baggage_rows = baggage_result.rows_affected(),
254 total_baggage = baggage_batch.len(),
255 tag_rows = tag_result.rows_affected(),
256 "Successfully processed trace server records"
257 );
258 Ok(())
259 }
260
261 pub async fn insert_tag_record(
262 pool: &Pool<Postgres>,
263 record: TagRecord,
264 ) -> Result<(), SqlError> {
265 let result = PostgresClient::insert_tag_batch(pool, std::slice::from_ref(&record)).await?;
266
267 debug!(
268 rows_affected = result.rows_affected(),
269 entity_type = record.entity_type.as_str(),
270 entity_id = record.entity_id.as_str(),
271 key = record.key.as_str(),
272 "Successfully inserted tag record"
273 );
274
275 Ok(())
276 }
277}
278
279#[cfg(test)]
283mod tests {
284
285 use super::*;
286 use crate::sql::schema::User;
287 use crate::sql::traits::EntitySqlLogic;
288 use chrono::{Duration, Utc};
289 use potato_head::create_uuid7;
290 use rand::Rng;
291 use scouter_semver::VersionType;
292 use scouter_settings::ObjectStorageSettings;
293 use scouter_types::genai::ExecutionPlan;
294 use scouter_types::psi::{Bin, BinType, PsiDriftConfig, PsiFeatureDriftProfile};
295 use scouter_types::spc::SpcDriftProfile;
296 use scouter_types::*;
297 use serde_json::Value;
298
// Fixture values shared by the tests in this module.
const SPACE: &str = "space";
const NAME: &str = "name";
const VERSION: &str = "1.0.0";
// Entity id used by tests that write rows without creating an entity first.
const ENTITY_ID: i32 = 9999;
303
304 pub async fn cleanup(pool: &Pool<Postgres>) {
305 sqlx::raw_sql(
306 r#"
307
308 DELETE
309 FROM scouter.drift_entities;
310
311 DELETE
312 FROM scouter.spc_drift;
313
314 DELETE
315 FROM scouter.observability_metric;
316
317 DELETE
318 FROM scouter.custom_drift;
319
320 DELETE
321 FROM scouter.drift_alert;
322
323 DELETE
324 FROM scouter.drift_profile;
325
326 DELETE
327 FROM scouter.psi_drift;
328
329 DELETE
330 FROM scouter.user;
331
332 DELETE
333 FROM scouter.genai_eval_record;
334
335 DELETE
336 FROM scouter.genai_eval_task;
337
338 DELETE
339 FROM scouter.genai_eval_workflow;
340
341 DELETE
342 FROM scouter.spans;
343
344 DELETE
345 FROM scouter.traces;
346
347 DELETE
348 FROM scouter.trace_entities;
349
350 DELETE
351 FROM scouter.trace_baggage;
352
353 DELETE
354 FROM scouter.tags;
355 "#,
356 )
357 .fetch_all(pool)
358 .await
359 .unwrap();
360 }
361
362 pub async fn db_pool() -> Pool<Postgres> {
363 let pool = PostgresClient::create_db_pool(&DatabaseSettings::default())
364 .await
365 .unwrap();
366 cleanup(&pool).await;
367 pool
368 }
369
370 pub async fn insert_profile_to_db(
371 pool: &Pool<Postgres>,
372 profile: &DriftProfile,
373 active: bool,
374 deactivate_others: bool,
375 ) -> String {
376 let base_args = profile.get_base_args();
377 let version_request = VersionRequest {
378 version: None,
379 version_type: VersionType::Minor,
380 pre_tag: None,
381 build_tag: None,
382 };
383 let version = PostgresClient::get_next_profile_version(pool, &base_args, version_request)
384 .await
385 .unwrap();
386
387 let result = PostgresClient::insert_drift_profile(
388 pool,
389 profile,
390 &base_args,
391 &version,
392 &active,
393 &deactivate_others,
394 )
395 .await
396 .unwrap();
397
398 result
399 }
400
401 #[tokio::test]
402 async fn test_postgres_start() {
403 let _pool = db_pool().await;
404 }
405
406 #[tokio::test]
407 async fn test_postgres_drift_alert() {
408 let pool = db_pool().await;
409 let entity_id = 9999;
410
411 for i in 0..10 {
413 let alert = AlertMap::Custom(custom::ComparisonMetricAlert {
414 metric_name: "test".to_string(),
415 baseline_value: 0 as f64,
416 observed_value: i as f64,
417 delta: None,
418 alert_threshold: AlertThreshold::Above,
419 });
420
421 let result = PostgresClient::insert_drift_alert(&pool, &entity_id, &alert)
422 .await
423 .unwrap();
424
425 assert_eq!(result.rows_affected(), 1);
426
427 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
429 }
430
431 let request = DriftAlertPaginationRequest {
433 uid: create_uuid7(),
434 active: Some(true),
435 limit: Some(50),
436 ..Default::default()
437 };
438
439 let response = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
440 .await
441 .unwrap();
442
443 assert_eq!(response.items.len(), 10);
444 assert!(!response.has_next); assert!(!response.has_previous); assert!(response.next_cursor.is_none());
447 assert!(response.previous_cursor.is_some());
448
449 let request = DriftAlertPaginationRequest {
451 uid: create_uuid7(),
452 active: Some(true),
453 limit: Some(3),
454 direction: Some("next".to_string()),
455 ..Default::default()
456 };
457
458 let page1 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
459 .await
460 .unwrap();
461
462 assert_eq!(page1.items.len(), 3);
463 assert!(page1.has_next);
464 assert!(!page1.has_previous);
465 assert!(page1.next_cursor.is_some());
466 assert!(page1.previous_cursor.is_some());
467
468 let next_cursor = page1.next_cursor.unwrap();
470 let request = DriftAlertPaginationRequest {
471 uid: create_uuid7(),
472 active: Some(true),
473 limit: Some(3),
474 cursor_created_at: Some(next_cursor.created_at),
475 cursor_id: Some(next_cursor.id as i32),
476 direction: Some("next".to_string()),
477 start_datetime: None,
478 end_datetime: None,
479 };
480
481 let page2 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
482 .await
483 .unwrap();
484
485 assert_eq!(page2.items.len(), 3);
486 assert!(page2.has_next); assert!(page2.has_previous); assert!(page2.next_cursor.is_some());
489 assert!(page2.previous_cursor.is_some());
490
491 let page1_ids: std::collections::HashSet<_> = page1.items.iter().map(|a| a.id).collect();
493 let page2_ids: std::collections::HashSet<_> = page2.items.iter().map(|a| a.id).collect();
494 assert!(page1_ids.is_disjoint(&page2_ids));
495
496 let prev_cursor = page2.previous_cursor.unwrap();
498 let request = DriftAlertPaginationRequest {
499 uid: create_uuid7(),
500 active: Some(true),
501 limit: Some(3),
502 cursor_created_at: Some(prev_cursor.created_at),
503 cursor_id: Some(prev_cursor.id as i32),
504 direction: Some("previous".to_string()),
505 start_datetime: None,
506 end_datetime: None,
507 };
508
509 let page_back = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
510 .await
511 .unwrap();
512
513 assert_eq!(page_back.items.len(), 3);
514 assert!(page_back.has_next); assert_eq!(
517 page_back.items.iter().map(|a| a.id).collect::<Vec<_>>(),
518 page1.items.iter().map(|a| a.id).collect::<Vec<_>>()
519 );
520
521 let to_deactivate = &page1.items[0];
524 let update_request = UpdateAlertStatus {
525 id: to_deactivate.id,
526 active: false,
527 space: "test_space".to_string(),
528 };
529
530 PostgresClient::update_drift_alert_status(&pool, &update_request)
531 .await
532 .unwrap();
533
534 let request = DriftAlertPaginationRequest {
536 uid: create_uuid7(),
537 active: Some(true),
538 limit: Some(50),
539 ..Default::default()
540 };
541
542 let active_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
543 .await
544 .unwrap();
545
546 assert_eq!(active_alerts.items.len(), 9); assert!(active_alerts.items.iter().all(|a| a.active));
548
549 let request = DriftAlertPaginationRequest {
551 uid: create_uuid7(),
552 active: Some(false),
553 limit: Some(50),
554 ..Default::default()
555 };
556
557 let inactive_alerts =
558 PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
559 .await
560 .unwrap();
561
562 assert_eq!(inactive_alerts.items.len(), 1);
563 assert!(!inactive_alerts.items[0].active);
564
565 let request = DriftAlertPaginationRequest {
567 uid: create_uuid7(),
568 active: None, limit: Some(50),
570 ..Default::default()
571 };
572
573 let all_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
574 .await
575 .unwrap();
576
577 assert_eq!(all_alerts.items.len(), 10);
578
579 let request = DriftAlertPaginationRequest {
581 uid: create_uuid7(),
582 active: Some(true),
583 limit: Some(3),
584 ..Default::default()
585 };
586
587 let non_existent = PostgresClient::get_paginated_drift_alerts(&pool, &request, &999999)
588 .await
589 .unwrap();
590
591 assert_eq!(non_existent.items.len(), 0);
592 assert!(!non_existent.has_next);
593 assert!(!non_existent.has_previous);
594 assert!(non_existent.next_cursor.is_none());
595 assert!(non_existent.previous_cursor.is_none());
596 }
597
598 #[tokio::test]
599 async fn test_postgres_spc_drift_record() {
600 let pool = db_pool().await;
601
602 let record1 = SpcRecord {
603 created_at: Utc::now(),
604 uid: create_uuid7(),
605 feature: "test".to_string(),
606 value: 1.0,
607 entity_id: 0,
608 };
609
610 let record2 = SpcRecord {
611 created_at: Utc::now(),
612 uid: create_uuid7(),
613 feature: "test2".to_string(),
614 value: 2.0,
615 entity_id: 0,
616 };
617
618 let result =
619 PostgresClient::insert_spc_drift_records_batch(&pool, &[record1, record2], &ENTITY_ID)
620 .await
621 .unwrap();
622
623 assert_eq!(result.rows_affected(), 2);
624 }
625
626 #[tokio::test]
627 async fn test_postgres_bin_count() {
628 let pool = db_pool().await;
629
630 let record1 = PsiRecord {
631 created_at: Utc::now(),
632 uid: create_uuid7(),
633 feature: "test".to_string(),
634 bin_id: 1,
635 bin_count: 1,
636 entity_id: ENTITY_ID,
637 };
638
639 let record2 = PsiRecord {
640 created_at: Utc::now(),
641 uid: create_uuid7(),
642 feature: "test2".to_string(),
643 bin_id: 2,
644 bin_count: 2,
645 entity_id: ENTITY_ID,
646 };
647
648 let result =
649 PostgresClient::insert_bin_counts_batch(&pool, &[record1, record2], &ENTITY_ID)
650 .await
651 .unwrap();
652
653 assert_eq!(result.rows_affected(), 2);
654 }
655
656 #[tokio::test]
657 async fn test_postgres_observability_record() {
658 let pool = db_pool().await;
659
660 let record = ObservabilityMetrics::default();
661
662 let result = PostgresClient::insert_observability_record(&pool, &record, &ENTITY_ID)
663 .await
664 .unwrap();
665
666 assert_eq!(result.rows_affected(), 1);
667 }
668
669 #[tokio::test]
670 async fn test_postgres_crud_drift_profile() {
671 let pool = db_pool().await;
672
673 let mut spc_profile = SpcDriftProfile::default();
674 let profile = DriftProfile::Spc(spc_profile.clone());
675 let uid = insert_profile_to_db(&pool, &profile, false, false).await;
676 assert!(!uid.is_empty());
677
678 let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
679 .await
680 .unwrap();
681
682 spc_profile.scouter_version = "test".to_string();
683
684 let result = PostgresClient::update_drift_profile(
685 &pool,
686 &DriftProfile::Spc(spc_profile.clone()),
687 &entity_id,
688 )
689 .await
690 .unwrap();
691
692 assert_eq!(result.rows_affected(), 1);
693
694 let profile = PostgresClient::get_drift_profile(&pool, &entity_id)
695 .await
696 .unwrap();
697
698 let deserialized = serde_json::from_value::<SpcDriftProfile>(profile.unwrap()).unwrap();
699
700 assert_eq!(deserialized, spc_profile);
701
702 PostgresClient::update_drift_profile_status(
703 &pool,
704 &ProfileStatusRequest {
705 name: spc_profile.config.name.clone(),
706 space: spc_profile.config.space.clone(),
707 version: spc_profile.config.version.clone(),
708 active: false,
709 drift_type: Some(DriftType::Spc),
710 deactivate_others: false,
711 },
712 )
713 .await
714 .unwrap();
715 }
716
717 #[tokio::test]
718 async fn test_postgres_get_features() {
719 let pool = db_pool().await;
720
721 let timestamp = Utc::now();
722
723 for _ in 0..10 {
724 let mut records = Vec::new();
725 for j in 0..10 {
726 let record = SpcRecord {
727 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
728 uid: create_uuid7(),
729 feature: format!("test{j}"),
730 value: j as f64,
731 entity_id: ENTITY_ID,
732 };
733
734 records.push(record);
735 }
736
737 let result =
738 PostgresClient::insert_spc_drift_records_batch(&pool, &records, &ENTITY_ID)
739 .await
740 .unwrap();
741 assert_eq!(result.rows_affected(), records.len() as u64);
742 }
743
744 let features = PostgresClient::get_spc_features(&pool, &ENTITY_ID)
745 .await
746 .unwrap();
747 assert_eq!(features.len(), 10);
748
749 let records =
750 PostgresClient::get_spc_drift_records(&pool, ×tamp, &features, &ENTITY_ID)
751 .await
752 .unwrap();
753
754 assert_eq!(records.features.len(), 10);
755
756 let binned_records = PostgresClient::get_binned_spc_drift_records(
757 &pool,
758 &DriftRequest {
759 uid: create_uuid7(),
760 time_interval: TimeInterval::FifteenMinutes,
761 max_data_points: 10,
762 ..Default::default()
763 },
764 &DatabaseSettings::default().retention_period,
765 &ObjectStorageSettings::default(),
766 &ENTITY_ID,
767 )
768 .await
769 .unwrap();
770
771 assert_eq!(binned_records.features.len(), 10);
772 }
773
774 #[tokio::test]
775 async fn test_postgres_bin_proportions() {
776 let pool = db_pool().await;
777
778 let timestamp = Utc::now();
779
780 let num_features = 3;
781 let num_bins = 5;
782
783 let features = (0..=num_features)
784 .map(|feature| {
785 let bins = (0..=num_bins)
786 .map(|bind_id| Bin {
787 id: bind_id,
788 lower_limit: None,
789 upper_limit: None,
790 proportion: 0.0,
791 })
792 .collect();
793 let feature_name = format!("feature{feature}");
794 let feature_profile = PsiFeatureDriftProfile {
795 id: feature_name.clone(),
796 bins,
797 timestamp,
798 bin_type: BinType::Numeric,
799 };
800 (feature_name, feature_profile)
801 })
802 .collect();
803
804 let profile = &DriftProfile::Psi(psi::PsiDriftProfile::new(
805 features,
806 PsiDriftConfig {
807 space: SPACE.to_string(),
808 name: NAME.to_string(),
809 version: VERSION.to_string(),
810 ..Default::default()
811 },
812 ));
813 let uid = insert_profile_to_db(&pool, profile, false, false).await;
814 let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
815 .await
816 .unwrap();
817
818 for feature in 0..num_features {
819 for bin in 0..=num_bins {
820 let mut records = Vec::new();
821 for j in 0..=100 {
822 let record = PsiRecord {
823 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
824 uid: create_uuid7(),
825 feature: format!("feature{feature}"),
826 bin_id: bin,
827 bin_count: rand::rng().random_range(0..10) as i32,
828 entity_id: ENTITY_ID,
829 };
830
831 records.push(record);
832 }
833 PostgresClient::insert_bin_counts_batch(&pool, &records, &entity_id)
834 .await
835 .unwrap();
836 }
837 }
838
839 let binned_records = PostgresClient::get_feature_distributions(
840 &pool,
841 ×tamp,
842 &["feature0".to_string()],
843 &entity_id,
844 )
845 .await
846 .unwrap();
847
848 let bin_proportion = binned_records
850 .distributions
851 .get("feature0")
852 .unwrap()
853 .bins
854 .get(&1)
855 .unwrap();
856
857 assert!(*bin_proportion > 0.1 && *bin_proportion < 0.2);
858
859 let binned_records = PostgresClient::get_binned_psi_drift_records(
860 &pool,
861 &DriftRequest {
862 uid: create_uuid7(),
863 time_interval: TimeInterval::OneHour,
864 max_data_points: 1000,
865 ..Default::default()
866 },
867 &DatabaseSettings::default().retention_period,
868 &ObjectStorageSettings::default(),
869 &entity_id,
870 )
871 .await
872 .unwrap();
873 assert_eq!(binned_records.len(), 3);
875 }
876
877 #[tokio::test]
878 async fn test_postgres_cru_custom_metric() {
879 let pool = db_pool().await;
880 let timestamp = Utc::now();
881
882 let (uid, entity_id) = PostgresClient::create_entity(
883 &pool,
884 SPACE,
885 NAME,
886 VERSION,
887 DriftType::Custom.to_string(),
888 )
889 .await
890 .unwrap();
891
892 for i in 0..2 {
893 let mut records = Vec::new();
894 for j in 0..25 {
895 let record = CustomMetricRecord {
896 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
897 uid: uid.clone(),
898 metric: format!("metric{i}"),
899 value: rand::rng().random_range(0..10) as f64,
900 entity_id: ENTITY_ID,
901 };
902 records.push(record);
903 }
904 let result =
905 PostgresClient::insert_custom_metric_values_batch(&pool, &records, &entity_id)
906 .await
907 .unwrap();
908 assert_eq!(result.rows_affected(), 25);
909 }
910
911 let record = CustomMetricRecord {
913 created_at: Utc::now(),
914 uid: uid.clone(),
915 metric: "metric3".to_string(),
916 value: rand::rng().random_range(0..10) as f64,
917 entity_id: ENTITY_ID,
918 };
919
920 let result =
921 PostgresClient::insert_custom_metric_values_batch(&pool, &[record], &entity_id)
922 .await
923 .unwrap();
924 assert_eq!(result.rows_affected(), 1);
925
926 let metrics = PostgresClient::get_custom_metric_values(
927 &pool,
928 ×tamp,
929 &["metric1".to_string()],
930 &entity_id,
931 )
932 .await
933 .unwrap();
934
935 assert_eq!(metrics.len(), 1);
936
937 let binned_records = PostgresClient::get_binned_custom_drift_records(
938 &pool,
939 &DriftRequest {
940 uid: uid.clone(),
941 time_interval: TimeInterval::OneHour,
942 max_data_points: 1000,
943 ..Default::default()
944 },
945 &DatabaseSettings::default().retention_period,
946 &ObjectStorageSettings::default(),
947 &entity_id,
948 )
949 .await
950 .unwrap();
951 assert_eq!(binned_records.metrics.len(), 3);
953 }
954
955 #[tokio::test]
956 async fn test_postgres_user() {
957 let pool = db_pool().await;
958 let recovery_codes = vec!["recovery_code_1".to_string(), "recovery_code_2".to_string()];
959
960 let user = User::new(
962 "user".to_string(),
963 "pass".to_string(),
964 "email".to_string(),
965 recovery_codes,
966 None,
967 None,
968 None,
969 None,
970 );
971 PostgresClient::insert_user(&pool, &user).await.unwrap();
972
973 let mut user = PostgresClient::get_user(&pool, "user")
975 .await
976 .unwrap()
977 .unwrap();
978
979 assert_eq!(user.username, "user");
980 assert_eq!(user.group_permissions, vec!["user"]);
981 assert_eq!(user.email, "email");
982
983 user.active = false;
985 user.refresh_token = Some("token".to_string());
986
987 PostgresClient::update_user(&pool, &user).await.unwrap();
989 let user = PostgresClient::get_user(&pool, "user")
990 .await
991 .unwrap()
992 .unwrap();
993 assert!(!user.active);
994 assert_eq!(user.refresh_token.unwrap(), "token");
995
996 let users = PostgresClient::get_users(&pool).await.unwrap();
998 assert_eq!(users.len(), 1);
999
1000 let is_last_admin = PostgresClient::is_last_admin(&pool, "user").await.unwrap();
1002 assert!(!is_last_admin);
1003
1004 PostgresClient::delete_user(&pool, "user").await.unwrap();
1006 }
1007
1008 #[tokio::test]
1009 async fn test_postgres_genai_eval_record_reschedule() {
1010 let pool = db_pool().await;
1011
1012 let (uid, entity_id) = PostgresClient::create_entity(
1013 &pool,
1014 SPACE,
1015 NAME,
1016 VERSION,
1017 DriftType::GenAI.to_string(),
1018 )
1019 .await
1020 .unwrap();
1021
1022 let input = "This is a test input";
1023 let output = "This is a test response";
1024
1025 for j in 0..10 {
1026 let context = serde_json::json!({
1027 "input": input,
1028 "response": output,
1029 });
1030 let record = EvalRecord {
1031 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1032 context,
1033 status: Status::Pending,
1034 id: 0, uid: format!("test_{}", j),
1036 entity_uid: uid.clone(),
1037 entity_id,
1038 ..Default::default()
1039 };
1040
1041 let boxed = BoxedEvalRecord::new(record);
1042
1043 let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1044 .await
1045 .unwrap();
1046
1047 assert_eq!(result.rows_affected(), 1);
1048 }
1049
1050 let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1051 .await
1052 .unwrap();
1053 assert_eq!(features.len(), 10);
1054
1055 let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1057 .await
1058 .unwrap();
1059
1060 assert!(pending_tasks.is_some());
1062
1063 let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1065 assert_eq!(*task_input, "This is a test input".to_string());
1066
1067 PostgresClient::reschedule_genai_eval_record(
1069 &pool,
1070 &pending_tasks.as_ref().unwrap().uid,
1071 Duration::seconds(30),
1072 )
1073 .await
1074 .unwrap();
1075 }
1076
1077 #[tokio::test]
1078 async fn test_postgres_genai_eval_record_insert_get() {
1079 let pool = db_pool().await;
1080
1081 let (uid, entity_id) = PostgresClient::create_entity(
1082 &pool,
1083 SPACE,
1084 NAME,
1085 VERSION,
1086 DriftType::GenAI.to_string(),
1087 )
1088 .await
1089 .unwrap();
1090
1091 let input = "This is a test input";
1092 let output = "This is a test response";
1093
1094 for j in 0..10 {
1095 let context = serde_json::json!({
1096 "input": input,
1097 "response": output,
1098 });
1099 let record = EvalRecord {
1100 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1101 context,
1102 status: Status::Pending,
1103 id: 0, uid: format!("test_{}", j),
1105 entity_uid: uid.clone(),
1106 entity_id,
1107 ..Default::default()
1108 };
1109
1110 let boxed = BoxedEvalRecord::new(record);
1111
1112 let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1113 .await
1114 .unwrap();
1115
1116 assert_eq!(result.rows_affected(), 1);
1117 }
1118
1119 let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1120 .await
1121 .unwrap();
1122 assert_eq!(features.len(), 10);
1123
1124 let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1126 .await
1127 .unwrap();
1128
1129 assert!(pending_tasks.is_some());
1131
1132 let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1134 assert_eq!(*task_input, "This is a test input".to_string());
1135
1136 PostgresClient::update_genai_eval_record_status(
1138 &pool,
1139 &pending_tasks.unwrap(),
1140 Status::Processed,
1141 &(1_i64),
1142 )
1143 .await
1144 .unwrap();
1145
1146 let processed_tasks = PostgresClient::get_genai_eval_records(
1148 &pool,
1149 None,
1150 Some(Status::Processed),
1151 &entity_id,
1152 )
1153 .await
1154 .unwrap();
1155
1156 assert_eq!(processed_tasks.len(), 1);
1158 }
1159
1160 #[tokio::test]
1161 async fn test_postgres_genai_eval_record_pagination() {
1162 let pool = db_pool().await;
1163
1164 let (uid, entity_id) = PostgresClient::create_entity(
1165 &pool,
1166 SPACE,
1167 NAME,
1168 VERSION,
1169 DriftType::GenAI.to_string(),
1170 )
1171 .await
1172 .unwrap();
1173
1174 let input = "This is a test input";
1175 let output = "This is a test response";
1176
1177 for j in 0..10 {
1179 let context = serde_json::json!({
1180 "input": input,
1181 "response": output,
1182 });
1183 let record = EvalRecord {
1184 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1185 context,
1186 status: Status::Pending,
1187 id: 0, uid: format!("test_{}", j),
1189 entity_uid: uid.clone(),
1190 entity_id: ENTITY_ID,
1191 ..Default::default()
1192 };
1193
1194 let boxed = BoxedEvalRecord::new(record);
1195
1196 let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1197 .await
1198 .unwrap();
1199
1200 assert_eq!(result.rows_affected(), 1);
1201 }
1202
1203 let params = EvalRecordPaginationRequest {
1205 status: None,
1206 limit: Some(5),
1207 cursor_created_at: None,
1208 cursor_id: None,
1209 direction: None,
1210 ..Default::default()
1211 };
1212
1213 let page1 = PostgresClient::get_paginated_genai_eval_records(&pool, ¶ms, &entity_id)
1214 .await
1215 .unwrap();
1216
1217 assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1218 assert!(page1.has_next, "Should have next page");
1219 assert!(
1220 !page1.has_previous,
1221 "Should not have previous page (first page)"
1222 );
1223 assert!(page1.next_cursor.is_some(), "Should have next cursor");
1224
1225 let page1_first = page1.items.first().unwrap();
1227 let page1_last = page1.items.last().unwrap();
1228
1229 assert!(
1230 page1_first.created_at >= page1_last.created_at,
1231 "Page 1 should be sorted newest first (DESC)"
1232 );
1233
1234 let next_cursor = page1.next_cursor.unwrap();
1236
1237 let params = EvalRecordPaginationRequest {
1238 status: None,
1239 limit: Some(5),
1240 cursor_created_at: Some(next_cursor.created_at),
1241 cursor_id: Some(next_cursor.id),
1242 direction: None,
1243 ..Default::default()
1244 };
1245
1246 let page2 = PostgresClient::get_paginated_genai_eval_records(&pool, ¶ms, &entity_id)
1247 .await
1248 .unwrap();
1249
1250 assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1251 assert!(!page2.has_next, "Should not have next page (last page)");
1252 assert!(page2.has_previous, "Should have previous page");
1253 assert!(
1254 page2.previous_cursor.is_some(),
1255 "Should have previous cursor"
1256 );
1257
1258 let page2_first = page2.items.first().unwrap();
1259
1260 assert!(
1262 page2_first.created_at < page1_last.created_at
1263 || (page2_first.created_at == page1_last.created_at
1264 && page2_first.id < page1_last.id),
1265 "Page 2 should start with records older than Page 1 last item"
1266 );
1267
1268 let all_ids: Vec<i64> = page1
1270 .items
1271 .iter()
1272 .chain(page2.items.iter())
1273 .map(|r| r.id)
1274 .collect();
1275
1276 assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1277
1278 let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1280 assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1281
1282 let previous_cursor = page2.previous_cursor.unwrap();
1285
1286 let params = EvalRecordPaginationRequest {
1287 status: None,
1288 limit: Some(5),
1289 cursor_created_at: Some(previous_cursor.created_at),
1290 cursor_id: Some(previous_cursor.id),
1291 direction: Some("previous".to_string()),
1292 ..Default::default()
1293 };
1294
1295 let page1_again =
1296 PostgresClient::get_paginated_genai_eval_records(&pool, ¶ms, &entity_id)
1297 .await
1298 .unwrap();
1299
1300 assert_eq!(
1301 page1_again.items.len(),
1302 5,
1303 "Going back should return 5 records"
1304 );
1305
1306 assert_eq!(
1308 page1_again.items.first().unwrap().id,
1309 page1_first.id,
1310 "Should return to the same first record"
1311 );
1312 }
1313
1314 #[tokio::test]
1315 async fn test_postgres_genai_eval_workflow_pagination() {
1316 let pool = db_pool().await;
1317
1318 let (_uid, entity_id) = PostgresClient::create_entity(
1319 &pool,
1320 SPACE,
1321 NAME,
1322 VERSION,
1323 DriftType::GenAI.to_string(),
1324 )
1325 .await
1326 .unwrap();
1327
1328 for j in 0..10 {
1330 let record = GenAIEvalWorkflowResult {
1331 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1332 record_uid: format!("test_{}", j),
1333 entity_id,
1334 ..Default::default()
1335 };
1336
1337 let result =
1338 PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1339 .await
1340 .unwrap();
1341
1342 assert_eq!(result.rows_affected(), 1);
1343 }
1344
1345 let params = EvalRecordPaginationRequest {
1347 status: None,
1348 limit: Some(5),
1349 cursor_created_at: None,
1350 cursor_id: None,
1351 direction: None,
1352 ..Default::default()
1353 };
1354
1355 let page1 =
1356 PostgresClient::get_paginated_genai_eval_workflow_records(&pool, ¶ms, &entity_id)
1357 .await
1358 .unwrap();
1359
1360 assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1361 assert!(page1.has_next, "Should have next page");
1362 assert!(
1363 !page1.has_previous,
1364 "Should not have previous page (first page)"
1365 );
1366 assert!(page1.next_cursor.is_some(), "Should have next cursor");
1367
1368 let page1_first = page1.items.first().unwrap();
1370 let page1_last = page1.items.last().unwrap();
1371
1372 assert!(
1373 page1_first.created_at >= page1_last.created_at,
1374 "Page 1 should be sorted newest first (DESC)"
1375 );
1376
1377 let next_cursor = page1.next_cursor.unwrap();
1379
1380 let params = EvalRecordPaginationRequest {
1381 status: None,
1382 limit: Some(5),
1383 cursor_created_at: Some(next_cursor.created_at),
1384 cursor_id: Some(next_cursor.id),
1385 direction: None,
1386 ..Default::default()
1387 };
1388
1389 let page2 =
1390 PostgresClient::get_paginated_genai_eval_workflow_records(&pool, ¶ms, &entity_id)
1391 .await
1392 .unwrap();
1393
1394 assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1395 assert!(!page2.has_next, "Should not have next page (last page)");
1396 assert!(page2.has_previous, "Should have previous page");
1397 assert!(
1398 page2.previous_cursor.is_some(),
1399 "Should have previous cursor"
1400 );
1401
1402 let page2_first = page2.items.first().unwrap();
1403
1404 assert!(
1406 page2_first.created_at < page1_last.created_at
1407 || (page2_first.created_at == page1_last.created_at
1408 && page2_first.id < page1_last.id),
1409 "Page 2 should start with records older than Page 1 last item"
1410 );
1411
1412 let all_ids: Vec<i64> = page1
1414 .items
1415 .iter()
1416 .chain(page2.items.iter())
1417 .map(|r| r.id)
1418 .collect();
1419
1420 assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1421
1422 let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1424 assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1425
1426 let previous_cursor = page2.previous_cursor.unwrap();
1429
1430 let params = EvalRecordPaginationRequest {
1431 status: None,
1432 limit: Some(5),
1433 cursor_created_at: Some(previous_cursor.created_at),
1434 cursor_id: Some(previous_cursor.id),
1435 direction: Some("previous".to_string()),
1436 ..Default::default()
1437 };
1438
1439 let page1_again =
1440 PostgresClient::get_paginated_genai_eval_workflow_records(&pool, ¶ms, &entity_id)
1441 .await
1442 .unwrap();
1443
1444 assert_eq!(
1445 page1_again.items.len(),
1446 5,
1447 "Going back should return 5 records"
1448 );
1449
1450 assert_eq!(
1452 page1_again.items.first().unwrap().id,
1453 page1_first.id,
1454 "Should return to the same first record"
1455 );
1456 }
1457
1458 #[tokio::test]
1459 async fn test_postgres_genai_task_result_insert_get() {
1460 let pool = db_pool().await;
1461
1462 let timestamp = Utc::now();
1463
1464 let (uid, entity_id) = PostgresClient::create_entity(
1465 &pool,
1466 SPACE,
1467 NAME,
1468 VERSION,
1469 DriftType::GenAI.to_string(),
1470 )
1471 .await
1472 .unwrap();
1473
1474 let mut records = Vec::new();
1475 for i in 0..2 {
1476 for j in 0..25 {
1477 let record = EvalTaskResult {
1478 record_uid: format!("record_uid_{i}_{j}"),
1479 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1480 start_time: Utc::now(),
1481 end_time: Utc::now() + chrono::Duration::seconds(1),
1482 entity_id,
1483 task_id: format!("task{i}"),
1484 task_type: scouter_types::genai::EvaluationTaskType::Assertion,
1485 passed: true,
1486 value: j as f64,
1487 assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
1488 operator: scouter_types::genai::ComparisonOperator::Contains,
1489 expected: Value::Null,
1490 actual: Value::Null,
1491 message: "All good".to_string(),
1492 entity_uid: uid.clone(),
1493 condition: false,
1494 stage: 0_i32,
1495 };
1496 records.push(record);
1497 }
1498 let result =
1499 PostgresClient::insert_eval_task_results_batch(&pool, &records, &entity_id)
1500 .await
1501 .unwrap();
1502 assert_eq!(result.rows_affected(), 25);
1503 }
1504
1505 let metrics = PostgresClient::get_genai_task_values(
1506 &pool,
1507 ×tamp,
1508 &["task1".to_string()],
1509 &entity_id,
1510 )
1511 .await
1512 .unwrap();
1513
1514 assert_eq!(metrics.len(), 1);
1515 let binned_records = PostgresClient::get_binned_genai_task_values(
1516 &pool,
1517 &DriftRequest {
1518 uid: uid.clone(),
1519 time_interval: TimeInterval::OneHour,
1520 max_data_points: 1000,
1521 ..Default::default()
1522 },
1523 &DatabaseSettings::default().retention_period,
1524 &ObjectStorageSettings::default(),
1525 &entity_id,
1526 )
1527 .await
1528 .unwrap();
1529 assert_eq!(binned_records.metrics.len(), 2);
1531
1532 let eval_task = PostgresClient::get_genai_eval_task(&pool, &records[0].record_uid)
1533 .await
1534 .unwrap();
1535
1536 assert_eq!(eval_task[0].record_uid, records[0].record_uid);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_postgres_genai_workflow_result_insert_get() {
1541 let pool = db_pool().await;
1542
1543 let timestamp = Utc::now();
1544
1545 let (uid, entity_id) = PostgresClient::create_entity(
1546 &pool,
1547 SPACE,
1548 NAME,
1549 VERSION,
1550 DriftType::GenAI.to_string(),
1551 )
1552 .await
1553 .unwrap();
1554
1555 for i in 0..2 {
1556 for j in 0..25 {
1557 let record = GenAIEvalWorkflowResult {
1558 record_uid: format!("record_uid_{i}_{j}"),
1559 created_at: Utc::now() + chrono::Duration::hours(i),
1560 entity_id,
1561 total_tasks: 10,
1562 passed_tasks: 8,
1563 failed_tasks: 2,
1564 pass_rate: 0.8,
1565 duration_ms: 1500,
1566 entity_uid: uid.clone(),
1567 execution_plan: ExecutionPlan::default(),
1568 id: 0,
1569 };
1570 let result =
1571 PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1572 .await
1573 .unwrap();
1574 assert_eq!(result.rows_affected(), 1);
1575 }
1576 }
1577
1578 let metric = PostgresClient::get_genai_workflow_value(&pool, ×tamp, &entity_id)
1579 .await
1580 .unwrap();
1581
1582 assert!(metric.is_some());
1583
1584 let binned_records = PostgresClient::get_binned_genai_workflow_values(
1585 &pool,
1586 &DriftRequest {
1587 uid: uid.clone(),
1588 time_interval: TimeInterval::OneHour,
1589 max_data_points: 1000,
1590 ..Default::default()
1591 },
1592 &DatabaseSettings::default().retention_period,
1593 &ObjectStorageSettings::default(),
1594 &entity_id,
1595 )
1596 .await
1597 .unwrap();
1598 assert_eq!(binned_records.metrics.len(), 1);
1600 }
1601
1602 #[tokio::test]
1603 async fn test_postgres_tags() {
1604 let pool = db_pool().await;
1605 let uid = create_uuid7();
1606
1607 let tag1 = TagRecord {
1608 entity_id: uid.clone(),
1609 entity_type: "service".to_string(),
1610 key: "env".to_string(),
1611 value: "production".to_string(),
1612 };
1613
1614 let tag2 = TagRecord {
1615 entity_id: uid.clone(),
1616 entity_type: "service".to_string(),
1617 key: "team".to_string(),
1618 value: "backend".to_string(),
1619 };
1620
1621 let result = PostgresClient::insert_tag_batch(&pool, &[tag1.clone(), tag2.clone()])
1622 .await
1623 .unwrap();
1624
1625 assert_eq!(result.rows_affected(), 2);
1626
1627 let tags = PostgresClient::get_tags(&pool, "service", &uid)
1628 .await
1629 .unwrap();
1630
1631 assert_eq!(tags.len(), 2);
1632
1633 let tag_filter = vec![Tag {
1634 key: tags.first().unwrap().key.clone(),
1635 value: tags.first().unwrap().value.clone(),
1636 }];
1637
1638 let entity_id = PostgresClient::get_entity_id_by_tags(&pool, "service", &tag_filter, false)
1639 .await
1640 .unwrap();
1641
1642 assert_eq!(entity_id.first().unwrap(), &uid);
1643 }
1644}