1use crate::sql::aggregator::init_trace_cache;
2use crate::sql::cache::entity_cache;
3use crate::sql::cache::init_entity_cache;
4use crate::sql::error::SqlError;
5use crate::sql::traits::{
6 AlertSqlLogic, ArchiveSqlLogic, CustomMetricSqlLogic, GenAIDriftSqlLogic,
7 ObservabilitySqlLogic, ProfileSqlLogic, PsiSqlLogic, SpcSqlLogic, TagSqlLogic, TraceSqlLogic,
8 UserSqlLogic,
9};
10use scouter_settings::DatabaseSettings;
11use scouter_types::{RecordType, ServerRecords, TagRecord, ToDriftRecords, TraceServerRecord};
12use sqlx::ConnectOptions;
13use sqlx::{postgres::PgConnectOptions, Pool, Postgres};
14use std::result::Result::Ok;
15use std::time::Duration;
16use tokio::try_join;
17use tracing::log::LevelFilter;
18use tracing::{debug, error, info, instrument};
// Maximum number of records written per batch INSERT statement.
const DEFAULT_BATCH_SIZE: usize = 500;
20
/// Stateless facade over the Postgres-backed SQL layer.
///
/// The struct carries no data; behavior is attached via the `*SqlLogic`
/// trait impls below.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PostgresClient {}

// Marker impls — the empty bodies suggest each trait supplies its SQL logic
// through default methods (defined in `crate::sql::traits`; confirm there).
impl SpcSqlLogic for PostgresClient {}
impl CustomMetricSqlLogic for PostgresClient {}
impl PsiSqlLogic for PostgresClient {}
impl GenAIDriftSqlLogic for PostgresClient {}
impl UserSqlLogic for PostgresClient {}
impl ProfileSqlLogic for PostgresClient {}
impl ObservabilitySqlLogic for PostgresClient {}
impl AlertSqlLogic for PostgresClient {}
impl ArchiveSqlLogic for PostgresClient {}
impl TraceSqlLogic for PostgresClient {}
impl TagSqlLogic for PostgresClient {}
36
impl PostgresClient {
    /// Create the application's Postgres connection pool.
    ///
    /// Performs, in order:
    /// 1. parse `connection_uri` and disable sqlx per-statement logging,
    /// 2. build the pool with the size/timeout limits from `database_settings`,
    /// 3. initialize the entity cache and the trace aggregation cache,
    /// 4. run database migrations.
    ///
    /// NOTE(review): a failed connection or failed migration terminates the
    /// whole process via `std::process::exit(1)` instead of returning `Err`
    /// (destructors do not run on `exit`). Presumably intentional for server
    /// startup, but worth confirming given the `Result` signature.
    #[instrument(skip(database_settings))]
    pub async fn create_db_pool(
        database_settings: &DatabaseSettings,
    ) -> Result<Pool<Postgres>, SqlError> {
        let mut opts: PgConnectOptions = database_settings.connection_uri.parse()?;

        // Suppress sqlx's per-statement logging.
        opts = opts.log_statements(LevelFilter::Off);

        let pool = match sqlx::postgres::PgPoolOptions::new()
            .max_connections(database_settings.max_connections)
            .min_connections(database_settings.min_connections)
            .acquire_timeout(Duration::from_secs(
                database_settings.db_acquire_timeout_seconds,
            ))
            .idle_timeout(Duration::from_secs(
                database_settings.db_idle_timeout_seconds,
            ))
            .max_lifetime(Duration::from_secs(
                database_settings.db_max_lifetime_seconds,
            ))
            .test_before_acquire(database_settings.db_test_before_acquire)
            .connect_with(opts)
            .await
        {
            Ok(pool) => {
                info!("✅ Successfully connected to database");
                pool
            }
            Err(err) => {
                error!("🚨 Failed to connect to database {:?}", err);
                std::process::exit(1);
            }
        };

        // Cache for uid -> entity-id lookups used by the message handler.
        init_entity_cache(database_settings.entity_cache_size);

        // Trace aggregation cache; holds a pool clone so it can flush on its
        // own schedule.
        init_trace_cache(
            pool.clone(),
            database_settings.flush_interval,
            database_settings.stale_threshold,
            database_settings.max_cache_size,
        )
        .await?;

        if let Err(err) = Self::run_migrations(&pool).await {
            error!("🚨 Failed to run migrations {:?}", err);
            std::process::exit(1);
        }

        Ok(pool)
    }

    /// Apply all pending migrations from `src/migrations` against `pool`.
    ///
    /// # Errors
    /// Returns `SqlError::MigrateError` if any migration fails.
    pub async fn run_migrations(pool: &Pool<Postgres>) -> Result<(), SqlError> {
        info!("Running migrations");
        sqlx::migrate!("src/migrations")
            .run(pool)
            .await
            .map_err(SqlError::MigrateError)?;

        debug!("Migrations complete");

        Ok(())
    }
}
111
/// Stateless entry point for persisting incoming server records.
pub struct MessageHandler {}
113
impl MessageHandler {
    /// Persist a batch of drift/eval records, dispatching on the record type.
    ///
    /// The entity id is resolved once from the records' uid via the entity
    /// cache, then each supported record type is inserted in chunks of
    /// `DEFAULT_BATCH_SIZE`.
    ///
    /// NOTE(review): the `GenAIEval` and `GenAIWorkflow` arms log-and-continue
    /// on insert errors (`let _ = ...`), while every other arm propagates the
    /// error with `?`. Confirm this best-effort behavior is intentional.
    ///
    /// # Errors
    /// Returns `SqlError::UnsupportedBatchTypeError` for record types without
    /// a batch insert path, or any underlying insert/lookup error.
    #[instrument(skip_all)]
    pub async fn insert_server_records(
        pool: &Pool<Postgres>,
        records: ServerRecords,
    ) -> Result<(), SqlError> {
        debug!("Inserting server records: {:?}", records.record_type()?);

        // One uid -> entity-id lookup for the whole batch.
        let entity_id = entity_cache()
            .get_entity_id_from_uid(pool, records.uid()?)
            .await
            .inspect_err(|e| error!("Failed to get entity ID from UID: {:?}", e))?;

        match records.record_type()? {
            RecordType::Spc => {
                let spc_records = records.to_spc_drift_records()?;
                debug!("SPC record count: {}", spc_records.len());

                for chunk in spc_records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_spc_drift_records_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert SPC drift records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::Psi => {
                let psi_records = records.to_psi_drift_records()?;
                debug!("PSI record count: {}", psi_records.len());

                for chunk in psi_records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_bin_counts_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert PSI drift records batch: {:?}", e);
                            e
                        })?;
                }
            }
            RecordType::Custom => {
                let custom_records = records.to_custom_metric_drift_records()?;
                debug!("Custom record count: {}", custom_records.len());

                for chunk in custom_records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_custom_metric_values_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert custom metric records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::GenAIEval => {
                debug!("LLM Drift record count: {:?}", records.len());
                let records = records.to_genai_eval_records()?;
                // Inserted one-by-one; errors are logged and skipped.
                for record in records {
                    let _ = PostgresClient::insert_genai_eval_record(pool, record, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert GenAI drift record: {:?}", e);
                        });
                }
            }

            RecordType::GenAITask => {
                debug!("GenAI Task count: {:?}", records.len());
                let records = records.to_genai_task_records()?;
                for chunk in records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_eval_task_results_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert GenAI task records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::GenAIWorkflow => {
                debug!("GenAI Workflow count: {:?}", records.len());
                let records = records.to_genai_workflow_records()?;
                // Inserted one-by-one; errors are logged and skipped.
                for record in records {
                    let _ = PostgresClient::insert_genai_eval_workflow_record(
                        pool, &record, &entity_id,
                    )
                    .await
                    .map_err(|e| {
                        error!("Failed to insert GenAI workflow record: {:?}", e);
                    });
                }
            }

            _ => {
                error!(
                    "Unsupported record type for batch insert: {:?}",
                    records.record_type()?
                );
                return Err(SqlError::UnsupportedBatchTypeError);
            }
        }

        Ok(())
    }

    /// Persist one trace payload: update the in-memory trace cache, hand the
    /// spans to the Delta Lake span service, and insert baggage + tags into
    /// Postgres concurrently.
    ///
    /// NOTE(review): if the span service is not initialized the spans are
    /// dropped (logged only) while baggage/tags are still written — confirm
    /// this partial-write behavior is acceptable.
    pub async fn insert_trace_server_record(
        pool: &Pool<Postgres>,
        records: TraceServerRecord,
    ) -> Result<(), SqlError> {
        let (span_batch, baggage_batch, tag_records) = records.to_records()?;
        let span_count = span_batch.len();

        // Feed each span into the trace aggregation cache.
        let trace_cache = crate::sql::aggregator::get_trace_cache().await;
        for span in &span_batch {
            trace_cache.update_trace(span).await;
        }

        if let Some(trace_service) =
            scouter_dataframe::parquet::tracing::service::get_trace_span_service()
        {
            if let Err(e) = trace_service.write_spans(span_batch).await {
                error!("Failed to write spans to Delta Lake: {:?}", e);
            }
        } else {
            error!("TraceSpanService not initialized — spans will be lost");
        }

        // Baggage and tag inserts are independent; run them concurrently.
        let (baggage_result, tag_result) = try_join!(
            PostgresClient::insert_trace_baggage_batch(pool, &baggage_batch),
            PostgresClient::insert_tag_batch(pool, &tag_records),
        )?;

        debug!(
            span_count,
            baggage_rows = baggage_result.rows_affected(),
            total_baggage = baggage_batch.len(),
            tag_rows = tag_result.rows_affected(),
            "Successfully processed trace server records"
        );
        Ok(())
    }

    /// Insert a single tag record via the batch insert path.
    pub async fn insert_tag_record(
        pool: &Pool<Postgres>,
        record: TagRecord,
    ) -> Result<(), SqlError> {
        // Reuse the batch API with a one-element slice; no clone needed.
        let result = PostgresClient::insert_tag_batch(pool, std::slice::from_ref(&record)).await?;

        debug!(
            rows_affected = result.rows_affected(),
            entity_type = record.entity_type.as_str(),
            entity_id = record.entity_id.as_str(),
            key = record.key.as_str(),
            "Successfully inserted tag record"
        );

        Ok(())
    }
}
278
279#[cfg(test)]
283mod tests {
284
285 use super::*;
286 use crate::sql::schema::User;
287 use crate::sql::traits::EntitySqlLogic;
288 use chrono::{Duration, Utc};
289 use potato_head::create_uuid7;
290 use rand::Rng;
291 use scouter_semver::VersionType;
292 use scouter_settings::ObjectStorageSettings;
293 use scouter_types::genai::ExecutionPlan;
294 use scouter_types::psi::{Bin, BinType, PsiDriftConfig, PsiFeatureDriftProfile};
295 use scouter_types::spc::SpcDriftProfile;
296 use scouter_types::*;
297 use serde_json::Value;
298
    // Shared fixture values used across the integration tests below.
    const SPACE: &str = "space";
    const NAME: &str = "name";
    const VERSION: &str = "1.0.0";
    // Arbitrary entity id for inserts that don't create a real entity row.
    const ENTITY_ID: i32 = 9999;
303
304 pub async fn cleanup(pool: &Pool<Postgres>) {
305 sqlx::raw_sql(
306 r#"
307
308 DELETE
309 FROM scouter.drift_entities;
310
311 DELETE
312 FROM scouter.spc_drift;
313
314 DELETE
315 FROM scouter.observability_metric;
316
317 DELETE
318 FROM scouter.custom_drift;
319
320 DELETE
321 FROM scouter.drift_alert;
322
323 DELETE
324 FROM scouter.drift_profile;
325
326 DELETE
327 FROM scouter.psi_drift;
328
329 DELETE
330 FROM scouter.user;
331
332 DELETE
333 FROM scouter.genai_eval_record;
334
335 DELETE
336 FROM scouter.genai_eval_task;
337
338 DELETE
339 FROM scouter.genai_eval_workflow;
340
341 DELETE
342 FROM scouter.spans;
343
344 DELETE
345 FROM scouter.traces;
346
347 DELETE
348 FROM scouter.trace_entities;
349
350 DELETE
351 FROM scouter.trace_baggage;
352
353 DELETE
354 FROM scouter.tags;
355 "#,
356 )
357 .fetch_all(pool)
358 .await
359 .unwrap();
360 }
361
362 pub async fn db_pool() -> Pool<Postgres> {
363 let pool = PostgresClient::create_db_pool(&DatabaseSettings::default())
364 .await
365 .unwrap();
366 cleanup(&pool).await;
367 pool
368 }
369
370 pub async fn insert_profile_to_db(
371 pool: &Pool<Postgres>,
372 profile: &DriftProfile,
373 active: bool,
374 deactivate_others: bool,
375 ) -> String {
376 let base_args = profile.get_base_args();
377 let version_request = VersionRequest {
378 version: None,
379 version_type: VersionType::Minor,
380 pre_tag: None,
381 build_tag: None,
382 };
383 let version = PostgresClient::get_next_profile_version(pool, &base_args, version_request)
384 .await
385 .unwrap();
386
387 let result = PostgresClient::insert_drift_profile(
388 pool,
389 profile,
390 &base_args,
391 &version,
392 &active,
393 &deactivate_others,
394 )
395 .await
396 .unwrap();
397
398 result
399 }
400
401 #[tokio::test]
402 async fn test_postgres_start() {
403 let _pool = db_pool().await;
404 }
405
    #[tokio::test]
    async fn test_postgres_drift_alert() {
        let pool = db_pool().await;
        let entity_id = 9999;

        // Insert 10 alerts with distinct created_at values (the sleep keeps
        // timestamps strictly increasing for cursor pagination).
        for i in 0..10 {
            let alert = AlertMap::Custom(custom::ComparisonMetricAlert {
                metric_name: "test".to_string(),
                baseline_value: 0 as f64,
                observed_value: i as f64,
                delta: None,
                alert_threshold: AlertThreshold::Above,
            });

            let result = PostgresClient::insert_drift_alert(&pool, &entity_id, &alert)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);

            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }

        // A single page large enough to hold everything: no next page.
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(50),
            ..Default::default()
        };

        let response = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(response.items.len(), 10);
        assert!(!response.has_next);
        assert!(!response.has_previous);
        assert!(response.next_cursor.is_none());
        assert!(response.previous_cursor.is_some());

        // Page 1 of 3 with forward pagination.
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            direction: Some("next".to_string()),
            ..Default::default()
        };

        let page1 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page1.items.len(), 3);
        assert!(page1.has_next);
        assert!(!page1.has_previous);
        assert!(page1.next_cursor.is_some());
        assert!(page1.previous_cursor.is_some());

        // Page 2 via page 1's next cursor.
        let next_cursor = page1.next_cursor.unwrap();
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id as i32),
            direction: Some("next".to_string()),
            start_datetime: None,
            end_datetime: None,
        };

        let page2 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page2.items.len(), 3);
        assert!(page2.has_next);
        assert!(page2.has_previous);
        assert!(page2.next_cursor.is_some());
        assert!(page2.previous_cursor.is_some());

        // Pages must not share any alert ids.
        let page1_ids: std::collections::HashSet<_> = page1.items.iter().map(|a| a.id).collect();
        let page2_ids: std::collections::HashSet<_> = page2.items.iter().map(|a| a.id).collect();
        assert!(page1_ids.is_disjoint(&page2_ids));

        // Paging back from page 2 should return exactly page 1.
        let prev_cursor = page2.previous_cursor.unwrap();
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            cursor_created_at: Some(prev_cursor.created_at),
            cursor_id: Some(prev_cursor.id as i32),
            direction: Some("previous".to_string()),
            start_datetime: None,
            end_datetime: None,
        };

        let page_back = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page_back.items.len(), 3);
        assert!(page_back.has_next);
        assert_eq!(
            page_back.items.iter().map(|a| a.id).collect::<Vec<_>>(),
            page1.items.iter().map(|a| a.id).collect::<Vec<_>>()
        );

        // Deactivate one alert and verify the active/inactive filters.
        let to_deactivate = &page1.items[0];
        let update_request = UpdateAlertStatus {
            id: to_deactivate.id,
            active: false,
            space: "test_space".to_string(),
        };

        PostgresClient::update_drift_alert_status(&pool, &update_request)
            .await
            .unwrap();

        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(50),
            ..Default::default()
        };

        let active_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(active_alerts.items.len(), 9);
        assert!(active_alerts.items.iter().all(|a| a.active));

        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(false),
            limit: Some(50),
            ..Default::default()
        };

        let inactive_alerts =
            PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
                .await
                .unwrap();

        assert_eq!(inactive_alerts.items.len(), 1);
        assert!(!inactive_alerts.items[0].active);

        // `active: None` means no status filter: all 10 come back.
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: None,
            limit: Some(50),
            ..Default::default()
        };

        let all_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(all_alerts.items.len(), 10);

        // Querying a non-existent entity yields an empty page with no cursors.
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            ..Default::default()
        };

        let non_existent = PostgresClient::get_paginated_drift_alerts(&pool, &request, &999999)
            .await
            .unwrap();

        assert_eq!(non_existent.items.len(), 0);
        assert!(!non_existent.has_next);
        assert!(!non_existent.has_previous);
        assert!(non_existent.next_cursor.is_none());
        assert!(non_existent.previous_cursor.is_none());
    }
597
598 #[tokio::test]
599 async fn test_postgres_spc_drift_record() {
600 let pool = db_pool().await;
601
602 let record1 = SpcRecord {
603 created_at: Utc::now(),
604 uid: create_uuid7(),
605 feature: "test".to_string(),
606 value: 1.0,
607 entity_id: 0,
608 };
609
610 let record2 = SpcRecord {
611 created_at: Utc::now(),
612 uid: create_uuid7(),
613 feature: "test2".to_string(),
614 value: 2.0,
615 entity_id: 0,
616 };
617
618 let result =
619 PostgresClient::insert_spc_drift_records_batch(&pool, &[record1, record2], &ENTITY_ID)
620 .await
621 .unwrap();
622
623 assert_eq!(result.rows_affected(), 2);
624 }
625
626 #[tokio::test]
627 async fn test_postgres_bin_count() {
628 let pool = db_pool().await;
629
630 let record1 = PsiRecord {
631 created_at: Utc::now(),
632 uid: create_uuid7(),
633 feature: "test".to_string(),
634 bin_id: 1,
635 bin_count: 1,
636 entity_id: ENTITY_ID,
637 };
638
639 let record2 = PsiRecord {
640 created_at: Utc::now(),
641 uid: create_uuid7(),
642 feature: "test2".to_string(),
643 bin_id: 2,
644 bin_count: 2,
645 entity_id: ENTITY_ID,
646 };
647
648 let result =
649 PostgresClient::insert_bin_counts_batch(&pool, &[record1, record2], &ENTITY_ID)
650 .await
651 .unwrap();
652
653 assert_eq!(result.rows_affected(), 2);
654 }
655
656 #[tokio::test]
657 async fn test_postgres_observability_record() {
658 let pool = db_pool().await;
659
660 let record = ObservabilityMetrics::default();
661
662 let result = PostgresClient::insert_observability_record(&pool, &record, &ENTITY_ID)
663 .await
664 .unwrap();
665
666 assert_eq!(result.rows_affected(), 1);
667 }
668
    #[tokio::test]
    async fn test_postgres_crud_drift_profile() {
        let pool = db_pool().await;

        // Create: insert a default SPC profile and resolve its entity id.
        let mut spc_profile = SpcDriftProfile::default();
        let profile = DriftProfile::Spc(spc_profile.clone());
        let uid = insert_profile_to_db(&pool, &profile, false, false).await;
        assert!(!uid.is_empty());

        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
            .await
            .unwrap();

        // Update: change a field and push the modified profile.
        spc_profile.scouter_version = "test".to_string();

        let result = PostgresClient::update_drift_profile(
            &pool,
            &DriftProfile::Spc(spc_profile.clone()),
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(result.rows_affected(), 1);

        // Read: the stored profile round-trips to the updated value.
        let profile = PostgresClient::get_drift_profile(&pool, &entity_id)
            .await
            .unwrap();

        let deserialized = serde_json::from_value::<SpcDriftProfile>(profile.unwrap()).unwrap();

        assert_eq!(deserialized, spc_profile);

        // Status update: deactivating the profile must not error.
        PostgresClient::update_drift_profile_status(
            &pool,
            &ProfileStatusRequest {
                name: spc_profile.config.name.clone(),
                space: spc_profile.config.space.clone(),
                version: spc_profile.config.version.clone(),
                active: false,
                drift_type: Some(DriftType::Spc),
                deactivate_others: false,
            },
        )
        .await
        .unwrap();
    }
716
    #[tokio::test]
    async fn test_postgres_get_features() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        // Insert 10 batches of 10 records, one record per feature name
        // (test0..test9); microsecond offsets keep created_at distinct.
        for _ in 0..10 {
            let mut records = Vec::new();
            for j in 0..10 {
                let record = SpcRecord {
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    uid: create_uuid7(),
                    feature: format!("test{j}"),
                    value: j as f64,
                    entity_id: ENTITY_ID,
                };

                records.push(record);
            }

            let result =
                PostgresClient::insert_spc_drift_records_batch(&pool, &records, &ENTITY_ID)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), records.len() as u64);
        }

        // Distinct feature names for the entity.
        let features = PostgresClient::get_spc_features(&pool, &ENTITY_ID)
            .await
            .unwrap();
        assert_eq!(features.len(), 10);

        // Raw records since `timestamp` grouped per feature.
        let records =
            PostgresClient::get_spc_drift_records(&pool, &timestamp, &features, &ENTITY_ID)
                .await
                .unwrap();

        assert_eq!(records.features.len(), 10);

        // Binned view still covers all 10 features.
        let binned_records = PostgresClient::get_binned_spc_drift_records(
            &pool,
            &DriftRequest {
                uid: create_uuid7(),
                time_interval: TimeInterval::FifteenMinutes,
                max_data_points: 10,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &ENTITY_ID,
        )
        .await
        .unwrap();

        assert_eq!(binned_records.features.len(), 10);
    }
773
    #[tokio::test]
    async fn test_postgres_bin_proportions() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        let num_features = 3;
        let num_bins = 5;

        // Build a PSI profile with empty bins for feature0..feature3.
        let features = (0..=num_features)
            .map(|feature| {
                let bins = (0..=num_bins)
                    .map(|bind_id| Bin {
                        id: bind_id,
                        lower_limit: None,
                        upper_limit: None,
                        proportion: 0.0,
                    })
                    .collect();
                let feature_name = format!("feature{feature}");
                let feature_profile = PsiFeatureDriftProfile {
                    id: feature_name.clone(),
                    bins,
                    timestamp,
                    bin_type: BinType::Numeric,
                };
                (feature_name, feature_profile)
            })
            .collect();

        let profile = &DriftProfile::Psi(psi::PsiDriftProfile::new(
            features,
            PsiDriftConfig {
                space: SPACE.to_string(),
                name: NAME.to_string(),
                version: VERSION.to_string(),
                ..Default::default()
            },
        ));
        let uid = insert_profile_to_db(&pool, profile, false, false).await;
        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
            .await
            .unwrap();

        // 101 random counts per (feature, bin).
        // NOTE(review): the record's entity_id field is ENTITY_ID while the
        // batch insert gets the real `entity_id` — presumably the batch
        // argument wins; confirm the record field is ignored on insert.
        for feature in 0..num_features {
            for bin in 0..=num_bins {
                let mut records = Vec::new();
                for j in 0..=100 {
                    let record = PsiRecord {
                        created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                        uid: create_uuid7(),
                        feature: format!("feature{feature}"),
                        bin_id: bin,
                        bin_count: rand::rng().random_range(0..10) as i32,
                        entity_id: ENTITY_ID,
                    };

                    records.push(record);
                }
                PostgresClient::insert_bin_counts_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            }
        }

        let binned_records = PostgresClient::get_feature_distributions(
            &pool,
            &timestamp,
            &["feature0".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        // With 6 uniformly-random bins, each proportion should land near 1/6.
        let bin_proportion = binned_records
            .distributions
            .get("feature0")
            .unwrap()
            .bins
            .get(&1)
            .unwrap();

        assert!(*bin_proportion > 0.1 && *bin_proportion < 0.2);

        let binned_records = PostgresClient::get_binned_psi_drift_records(
            &pool,
            &DriftRequest {
                uid: create_uuid7(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        assert_eq!(binned_records.len(), 3);
    }
876
    #[tokio::test]
    async fn test_postgres_cru_custom_metric() {
        let pool = db_pool().await;
        let timestamp = Utc::now();

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::Custom.to_string(),
        )
        .await
        .unwrap();

        // Two batches of 25 values for metric0 and metric1.
        for i in 0..2 {
            let mut records = Vec::new();
            for j in 0..25 {
                let record = CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    uid: uid.clone(),
                    metric: format!("metric{i}"),
                    value: rand::rng().random_range(0..10) as f64,
                    entity_id: ENTITY_ID,
                };
                records.push(record);
            }
            let result =
                PostgresClient::insert_custom_metric_values_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), 25);
        }

        // A third metric with a single value.
        let record = CustomMetricRecord {
            created_at: Utc::now(),
            uid: uid.clone(),
            metric: "metric3".to_string(),
            value: rand::rng().random_range(0..10) as f64,
            entity_id: ENTITY_ID,
        };

        let result =
            PostgresClient::insert_custom_metric_values_batch(&pool, &[record], &entity_id)
                .await
                .unwrap();
        assert_eq!(result.rows_affected(), 1);

        // Filtering by name returns only metric1.
        let metrics = PostgresClient::get_custom_metric_values(
            &pool,
            &timestamp,
            &["metric1".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(metrics.len(), 1);

        // Binned view covers all three metrics.
        let binned_records = PostgresClient::get_binned_custom_drift_records(
            &pool,
            &DriftRequest {
                uid: uid.clone(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        assert_eq!(binned_records.metrics.len(), 3);
    }
954
955 #[tokio::test]
956 async fn test_postgres_user() {
957 let pool = db_pool().await;
958 let recovery_codes = vec!["recovery_code_1".to_string(), "recovery_code_2".to_string()];
959
960 let user = User::new(
962 "user".to_string(),
963 "pass".to_string(),
964 "email".to_string(),
965 recovery_codes,
966 None,
967 None,
968 None,
969 None,
970 );
971 PostgresClient::insert_user(&pool, &user).await.unwrap();
972
973 let mut user = PostgresClient::get_user(&pool, "user")
975 .await
976 .unwrap()
977 .unwrap();
978
979 assert_eq!(user.username, "user");
980 assert_eq!(user.group_permissions, vec!["user"]);
981 assert_eq!(user.email, "email");
982
983 user.active = false;
985 user.refresh_token = Some("token".to_string());
986
987 PostgresClient::update_user(&pool, &user).await.unwrap();
989 let user = PostgresClient::get_user(&pool, "user")
990 .await
991 .unwrap()
992 .unwrap();
993 assert!(!user.active);
994 assert_eq!(user.refresh_token.unwrap(), "token");
995
996 let users = PostgresClient::get_users(&pool).await.unwrap();
998 assert_eq!(users.len(), 1);
999
1000 let is_last_admin = PostgresClient::is_last_admin(&pool, "user").await.unwrap();
1002 assert!(!is_last_admin);
1003
1004 PostgresClient::delete_user(&pool, "user").await.unwrap();
1006 }
1007
    #[tokio::test]
    async fn test_postgres_genai_eval_record_reschedule() {
        let pool = db_pool().await;

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let input = "This is a test input";
        let output = "This is a test response";

        // Insert 10 pending eval records with distinct uids/timestamps.
        for j in 0..10 {
            let context = serde_json::json!({
                "input": input,
                "response": output,
            });
            let record = EvalRecord {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                context,
                status: Status::Pending,
                id: 0,
                uid: format!("test_{}", j),
                entity_uid: uid.clone(),
                entity_id,
                tags: vec!["key=value".to_string()],
                ..Default::default()
            };

            let boxed = BoxedEvalRecord::new(record);

            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
            .await
            .unwrap();
        assert_eq!(features.len(), 10);

        // One pending record should be claimable, with its context intact.
        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
            .await
            .unwrap();

        assert!(pending_tasks.is_some());

        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
        assert_eq!(*task_input, "This is a test input".to_string());

        // Rescheduling 30 seconds out must not error.
        PostgresClient::reschedule_genai_eval_record(
            &pool,
            &pending_tasks.as_ref().unwrap().uid,
            Duration::seconds(30),
        )
        .await
        .unwrap();
    }
1077
    #[tokio::test]
    async fn test_postgres_genai_eval_record_insert_get() {
        let pool = db_pool().await;

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let input = "This is a test input";
        let output = "This is a test response";

        // Insert 10 pending eval records.
        for j in 0..10 {
            let context = serde_json::json!({
                "input": input,
                "response": output,
            });
            let record = EvalRecord {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                context,
                status: Status::Pending,
                id: 0,
                uid: format!("test_{}", j),
                entity_uid: uid.clone(),
                entity_id,
                ..Default::default()
            };

            let boxed = BoxedEvalRecord::new(record);

            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
            .await
            .unwrap();
        assert_eq!(features.len(), 10);

        // Claim a pending record and verify its context survived storage.
        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
            .await
            .unwrap();

        assert!(pending_tasks.is_some());

        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
        assert_eq!(*task_input, "This is a test input".to_string());

        // Mark the claimed record processed.
        PostgresClient::update_genai_eval_record_status(
            &pool,
            &pending_tasks.unwrap(),
            Status::Processed,
            &(1_i64),
        )
        .await
        .unwrap();

        // Exactly one record should now filter as Processed.
        let processed_tasks = PostgresClient::get_genai_eval_records(
            &pool,
            None,
            Some(Status::Processed),
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(processed_tasks.len(), 1);
    }
1160
    #[tokio::test]
    async fn test_postgres_genai_eval_record_pagination() {
        let pool = db_pool().await;

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let input = "This is a test input";
        let output = "This is a test response";

        // Insert 10 records with strictly increasing created_at values so
        // cursor pagination is deterministic.
        for j in 0..10 {
            let context = serde_json::json!({
                "input": input,
                "response": output,
            });
            let record = EvalRecord {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                context,
                status: Status::Pending,
                id: 0,
                uid: format!("test_{}", j),
                entity_uid: uid.clone(),
                entity_id: ENTITY_ID,
                ..Default::default()
            };

            let boxed = BoxedEvalRecord::new(record);

            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        // First page of 5, no cursor.
        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: None,
            cursor_id: None,
            direction: None,
            ..Default::default()
        };

        let page1 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
            .await
            .unwrap();

        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
        assert!(page1.has_next, "Should have next page");
        assert!(
            !page1.has_previous,
            "Should not have previous page (first page)"
        );
        assert!(page1.next_cursor.is_some(), "Should have next cursor");

        // Results are ordered newest first.
        let page1_first = page1.items.first().unwrap();
        let page1_last = page1.items.last().unwrap();

        assert!(
            page1_first.created_at >= page1_last.created_at,
            "Page 1 should be sorted newest first (DESC)"
        );

        // Second page via page 1's next cursor.
        let next_cursor = page1.next_cursor.unwrap();

        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id),
            direction: None,
            ..Default::default()
        };

        let page2 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
            .await
            .unwrap();

        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
        assert!(!page2.has_next, "Should not have next page (last page)");
        assert!(page2.has_previous, "Should have previous page");
        assert!(
            page2.previous_cursor.is_some(),
            "Should have previous cursor"
        );

        // Page 2 strictly follows page 1 in (created_at, id) order.
        let page2_first = page2.items.first().unwrap();

        assert!(
            page2_first.created_at < page1_last.created_at
                || (page2_first.created_at == page1_last.created_at
                    && page2_first.id < page1_last.id),
            "Page 2 should start with records older than Page 1 last item"
        );

        // No overlap between pages: all 10 ids unique.
        let all_ids: Vec<i64> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|r| r.id)
            .collect();

        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");

        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");

        // Walking back from page 2 must land on the original first page.
        let previous_cursor = page2.previous_cursor.unwrap();

        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(previous_cursor.created_at),
            cursor_id: Some(previous_cursor.id),
            direction: Some("previous".to_string()),
            ..Default::default()
        };

        let page1_again =
            PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(
            page1_again.items.len(),
            5,
            "Going back should return 5 records"
        );

        assert_eq!(
            page1_again.items.first().unwrap().id,
            page1_first.id,
            "Should return to the same first record"
        );
    }
1314
1315 #[tokio::test]
1316 async fn test_postgres_genai_eval_workflow_pagination() {
1317 let pool = db_pool().await;
1318
1319 let (_uid, entity_id) = PostgresClient::create_entity(
1320 &pool,
1321 SPACE,
1322 NAME,
1323 VERSION,
1324 DriftType::GenAI.to_string(),
1325 )
1326 .await
1327 .unwrap();
1328
1329 for j in 0..10 {
1331 let record = GenAIEvalWorkflowResult {
1332 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1333 record_uid: format!("test_{}", j),
1334 entity_id,
1335 ..Default::default()
1336 };
1337
1338 let result =
1339 PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1340 .await
1341 .unwrap();
1342
1343 assert_eq!(result.rows_affected(), 1);
1344 }
1345
1346 let params = EvalRecordPaginationRequest {
1348 status: None,
1349 limit: Some(5),
1350 cursor_created_at: None,
1351 cursor_id: None,
1352 direction: None,
1353 ..Default::default()
1354 };
1355
1356 let page1 =
1357 PostgresClient::get_paginated_genai_eval_workflow_records(&pool, ¶ms, &entity_id)
1358 .await
1359 .unwrap();
1360
1361 assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1362 assert!(page1.has_next, "Should have next page");
1363 assert!(
1364 !page1.has_previous,
1365 "Should not have previous page (first page)"
1366 );
1367 assert!(page1.next_cursor.is_some(), "Should have next cursor");
1368
1369 let page1_first = page1.items.first().unwrap();
1371 let page1_last = page1.items.last().unwrap();
1372
1373 assert!(
1374 page1_first.created_at >= page1_last.created_at,
1375 "Page 1 should be sorted newest first (DESC)"
1376 );
1377
1378 let next_cursor = page1.next_cursor.unwrap();
1380
1381 let params = EvalRecordPaginationRequest {
1382 status: None,
1383 limit: Some(5),
1384 cursor_created_at: Some(next_cursor.created_at),
1385 cursor_id: Some(next_cursor.id),
1386 direction: None,
1387 ..Default::default()
1388 };
1389
1390 let page2 =
1391 PostgresClient::get_paginated_genai_eval_workflow_records(&pool, ¶ms, &entity_id)
1392 .await
1393 .unwrap();
1394
1395 assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1396 assert!(!page2.has_next, "Should not have next page (last page)");
1397 assert!(page2.has_previous, "Should have previous page");
1398 assert!(
1399 page2.previous_cursor.is_some(),
1400 "Should have previous cursor"
1401 );
1402
1403 let page2_first = page2.items.first().unwrap();
1404
1405 assert!(
1407 page2_first.created_at < page1_last.created_at
1408 || (page2_first.created_at == page1_last.created_at
1409 && page2_first.id < page1_last.id),
1410 "Page 2 should start with records older than Page 1 last item"
1411 );
1412
1413 let all_ids: Vec<i64> = page1
1415 .items
1416 .iter()
1417 .chain(page2.items.iter())
1418 .map(|r| r.id)
1419 .collect();
1420
1421 assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1422
1423 let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1425 assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1426
1427 let previous_cursor = page2.previous_cursor.unwrap();
1430
1431 let params = EvalRecordPaginationRequest {
1432 status: None,
1433 limit: Some(5),
1434 cursor_created_at: Some(previous_cursor.created_at),
1435 cursor_id: Some(previous_cursor.id),
1436 direction: Some("previous".to_string()),
1437 ..Default::default()
1438 };
1439
1440 let page1_again =
1441 PostgresClient::get_paginated_genai_eval_workflow_records(&pool, ¶ms, &entity_id)
1442 .await
1443 .unwrap();
1444
1445 assert_eq!(
1446 page1_again.items.len(),
1447 5,
1448 "Going back should return 5 records"
1449 );
1450
1451 assert_eq!(
1453 page1_again.items.first().unwrap().id,
1454 page1_first.id,
1455 "Should return to the same first record"
1456 );
1457 }
1458
1459 #[tokio::test]
1460 async fn test_postgres_genai_task_result_insert_get() {
1461 let pool = db_pool().await;
1462
1463 let timestamp = Utc::now();
1464
1465 let (uid, entity_id) = PostgresClient::create_entity(
1466 &pool,
1467 SPACE,
1468 NAME,
1469 VERSION,
1470 DriftType::GenAI.to_string(),
1471 )
1472 .await
1473 .unwrap();
1474
1475 let mut records = Vec::new();
1476 for i in 0..2 {
1477 for j in 0..25 {
1478 let record = EvalTaskResult {
1479 record_uid: format!("record_uid_{i}_{j}"),
1480 created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1481 start_time: Utc::now(),
1482 end_time: Utc::now() + chrono::Duration::seconds(1),
1483 entity_id,
1484 task_id: format!("task{i}"),
1485 task_type: scouter_types::genai::EvaluationTaskType::Assertion,
1486 passed: true,
1487 value: j as f64,
1488 assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
1489 operator: scouter_types::genai::ComparisonOperator::Contains,
1490 expected: Value::Null,
1491 actual: Value::Null,
1492 message: "All good".to_string(),
1493 entity_uid: uid.clone(),
1494 condition: false,
1495 stage: 0_i32,
1496 };
1497 records.push(record);
1498 }
1499 let result =
1500 PostgresClient::insert_eval_task_results_batch(&pool, &records, &entity_id)
1501 .await
1502 .unwrap();
1503 assert_eq!(result.rows_affected(), 25);
1504 }
1505
1506 let metrics = PostgresClient::get_genai_task_values(
1507 &pool,
1508 ×tamp,
1509 &["task1".to_string()],
1510 &entity_id,
1511 )
1512 .await
1513 .unwrap();
1514
1515 assert_eq!(metrics.len(), 1);
1516 let binned_records = PostgresClient::get_binned_genai_task_values(
1517 &pool,
1518 &DriftRequest {
1519 uid: uid.clone(),
1520 time_interval: TimeInterval::OneHour,
1521 max_data_points: 1000,
1522 ..Default::default()
1523 },
1524 &DatabaseSettings::default().retention_period,
1525 &ObjectStorageSettings::default(),
1526 &entity_id,
1527 )
1528 .await
1529 .unwrap();
1530 assert_eq!(binned_records.metrics.len(), 2);
1532
1533 let eval_task = PostgresClient::get_genai_eval_task(&pool, &records[0].record_uid)
1534 .await
1535 .unwrap();
1536
1537 assert_eq!(eval_task[0].record_uid, records[0].record_uid);
1538 }
1539
1540 #[tokio::test]
1541 async fn test_postgres_genai_workflow_result_insert_get() {
1542 let pool = db_pool().await;
1543
1544 let timestamp = Utc::now();
1545
1546 let (uid, entity_id) = PostgresClient::create_entity(
1547 &pool,
1548 SPACE,
1549 NAME,
1550 VERSION,
1551 DriftType::GenAI.to_string(),
1552 )
1553 .await
1554 .unwrap();
1555
1556 for i in 0..2 {
1557 for j in 0..25 {
1558 let record = GenAIEvalWorkflowResult {
1559 record_uid: format!("record_uid_{i}_{j}"),
1560 created_at: Utc::now() + chrono::Duration::hours(i),
1561 entity_id,
1562 total_tasks: 10,
1563 passed_tasks: 8,
1564 failed_tasks: 2,
1565 pass_rate: 0.8,
1566 duration_ms: 1500,
1567 entity_uid: uid.clone(),
1568 execution_plan: ExecutionPlan::default(),
1569 id: 0,
1570 };
1571 let result =
1572 PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1573 .await
1574 .unwrap();
1575 assert_eq!(result.rows_affected(), 1);
1576 }
1577 }
1578
1579 let metric = PostgresClient::get_genai_workflow_value(&pool, ×tamp, &entity_id)
1580 .await
1581 .unwrap();
1582
1583 assert!(metric.is_some());
1584
1585 let binned_records = PostgresClient::get_binned_genai_workflow_values(
1586 &pool,
1587 &DriftRequest {
1588 uid: uid.clone(),
1589 time_interval: TimeInterval::OneHour,
1590 max_data_points: 1000,
1591 ..Default::default()
1592 },
1593 &DatabaseSettings::default().retention_period,
1594 &ObjectStorageSettings::default(),
1595 &entity_id,
1596 )
1597 .await
1598 .unwrap();
1599 assert_eq!(binned_records.metrics.len(), 1);
1601 }
1602
1603 #[tokio::test]
1604 async fn test_postgres_tags() {
1605 let pool = db_pool().await;
1606 let uid = create_uuid7();
1607
1608 let tag1 = TagRecord {
1609 entity_id: uid.clone(),
1610 entity_type: "service".to_string(),
1611 key: "env".to_string(),
1612 value: "production".to_string(),
1613 };
1614
1615 let tag2 = TagRecord {
1616 entity_id: uid.clone(),
1617 entity_type: "service".to_string(),
1618 key: "team".to_string(),
1619 value: "backend".to_string(),
1620 };
1621
1622 let result = PostgresClient::insert_tag_batch(&pool, &[tag1.clone(), tag2.clone()])
1623 .await
1624 .unwrap();
1625
1626 assert_eq!(result.rows_affected(), 2);
1627
1628 let tags = PostgresClient::get_tags(&pool, "service", &uid)
1629 .await
1630 .unwrap();
1631
1632 assert_eq!(tags.len(), 2);
1633
1634 let tag_filter = vec![Tag {
1635 key: tags.first().unwrap().key.clone(),
1636 value: tags.first().unwrap().value.clone(),
1637 }];
1638
1639 let entity_id = PostgresClient::get_entity_id_by_tags(&pool, "service", &tag_filter, false)
1640 .await
1641 .unwrap();
1642
1643 assert_eq!(entity_id.first().unwrap(), &uid);
1644 }
1645}