use crate::sql::cache::entity_cache;
use crate::sql::cache::init_entity_cache;
use crate::sql::error::SqlError;
use crate::sql::traits::{
    AlertSqlLogic, ArchiveSqlLogic, CustomMetricSqlLogic, GenAIDriftSqlLogic,
    ObservabilitySqlLogic, ProfileSqlLogic, PsiSqlLogic, SpcSqlLogic, TagSqlLogic, TraceSqlLogic,
    UserSqlLogic,
};
use scouter_settings::DatabaseSettings;
use scouter_types::{RecordType, ServerRecords, TagRecord, ToDriftRecords, TraceServerRecord};
use sqlx::ConnectOptions;
use sqlx::{postgres::PgConnectOptions, Pool, Postgres};
use std::result::Result::Ok;
use tokio::try_join;
use tracing::log::LevelFilter;
use tracing::{debug, error, info, instrument};

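/// Stateless Postgres client; the SQL operations themselves are supplied by the
/// trait implementations below, so no fields are needed.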
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PostgresClient {}

impl SpcSqlLogic for PostgresClient {}
impl CustomMetricSqlLogic for PostgresClient {}
impl PsiSqlLogic for PostgresClient {}
impl GenAIDriftSqlLogic for PostgresClient {}
impl UserSqlLogic for PostgresClient {}
impl ProfileSqlLogic for PostgresClient {}
impl ObservabilitySqlLogic for PostgresClient {}
impl AlertSqlLogic for PostgresClient {}
impl ArchiveSqlLogic for PostgresClient {}
impl TraceSqlLogic for PostgresClient {}
impl TagSqlLogic for PostgresClient {}

impl PostgresClient {
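    /// Builds a Postgres connection pool from the database settings, initializes the
    /// entity cache, and runs migrations. Exits the process if connecting or running
    /// migrations fails.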
    #[instrument(skip(database_settings))]
    pub async fn create_db_pool(
        database_settings: &DatabaseSettings,
    ) -> Result<Pool<Postgres>, SqlError> {
        let mut opts: PgConnectOptions = database_settings.connection_uri.parse()?;

        opts = opts.log_statements(LevelFilter::Off);

        let pool = match sqlx::postgres::PgPoolOptions::new()
            .max_connections(database_settings.max_connections)
            .connect_with(opts)
            .await
        {
            Ok(pool) => {
                info!("✅ Successfully connected to database");
                pool
            }
            Err(err) => {
                error!("🚨 Failed to connect to database {:?}", err);
                std::process::exit(1);
            }
        };

        init_entity_cache(1000);

        if let Err(err) = Self::run_migrations(&pool).await {
            error!("🚨 Failed to run migrations {:?}", err);
            std::process::exit(1);
        }

        Ok(pool)
    }

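    /// Applies the SQL migrations bundled under `src/migrations` to the given pool.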
    pub async fn run_migrations(pool: &Pool<Postgres>) -> Result<(), SqlError> {
        info!("Running migrations");
        sqlx::migrate!("src/migrations")
            .run(pool)
            .await
            .map_err(SqlError::MigrateError)?;

        debug!("Migrations complete");

        Ok(())
    }
}

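/// Stateless helper for writing incoming server, trace, and tag records to Postgres.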
pub struct MessageHandler {}

impl MessageHandler {
    const DEFAULT_BATCH_SIZE: usize = 500;
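    /// Resolves the entity id for the records' uid, then inserts the records in
    /// batches of `DEFAULT_BATCH_SIZE`, dispatching on the record type.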
    #[instrument(skip_all)]
    pub async fn insert_server_records(
        pool: &Pool<Postgres>,
        records: ServerRecords,
    ) -> Result<(), SqlError> {
        debug!("Inserting server records: {:?}", records.record_type()?);

        let entity_id = entity_cache()
            .get_entity_id_from_uid(pool, records.uid()?)
            .await
            .inspect_err(|e| error!("Failed to get entity ID from UID: {:?}", e))?;

        match records.record_type()? {
            RecordType::Spc => {
                let spc_records = records.to_spc_drift_records()?;
                debug!("SPC record count: {}", spc_records.len());

                for chunk in spc_records.chunks(Self::DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_spc_drift_records_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert SPC drift records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::Psi => {
                let psi_records = records.to_psi_drift_records()?;
                debug!("PSI record count: {}", psi_records.len());

                for chunk in psi_records.chunks(Self::DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_bin_counts_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert PSI drift records batch: {:?}", e);
                            e
                        })?;
                }
            }
            RecordType::Custom => {
                let custom_records = records.to_custom_metric_drift_records()?;
                debug!("Custom record count: {}", custom_records.len());

                for chunk in custom_records.chunks(Self::DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_custom_metric_values_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert custom metric records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::GenAIEval => {
                debug!("LLM Drift record count: {:?}", records.len());
                let records = records.to_genai_eval_records()?;
                for record in records {
                    let _ = PostgresClient::insert_genai_eval_record(pool, record, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert GenAI drift record: {:?}", e);
                        });
                }
            }

            RecordType::GenAITask => {
                debug!("GenAI Task count: {:?}", records.len());
                let records = records.to_genai_task_records()?;
                for chunk in records.chunks(Self::DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_eval_task_results_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert GenAI task records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::GenAIWorkflow => {
                debug!("GenAI Workflow count: {:?}", records.len());
                let records = records.to_genai_workflow_records()?;
                for record in records {
                    let _ = PostgresClient::insert_genai_eval_workflow_record(
                        pool, &record, &entity_id,
                    )
                    .await
                    .map_err(|e| {
                        error!("Failed to insert GenAI workflow record: {:?}", e);
                    });
                }
            }

            _ => {
                error!(
                    "Unsupported record type for batch insert: {:?}",
                    records.record_type()?
                );
                return Err(SqlError::UnsupportedBatchTypeError);
            }
        }

        Ok(())
    }

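    /// Converts a trace server record into span, baggage, and tag batches and inserts
    /// them concurrently.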
    pub async fn insert_trace_server_record(
        pool: &Pool<Postgres>,
        records: TraceServerRecord,
    ) -> Result<(), SqlError> {
        let (span_batch, baggage_batch, tag_records) = records.to_records()?;

        let (span_result, baggage_result, tag_result) = try_join!(
            PostgresClient::insert_span_batch(pool, &span_batch),
            PostgresClient::insert_trace_baggage_batch(pool, &baggage_batch),
            PostgresClient::insert_tag_batch(pool, &tag_records),
        )?;

        debug!(
            span_rows = span_result.rows_affected(),
            baggage_rows = baggage_result.rows_affected(),
            total_spans = span_batch.len(),
            total_baggage = baggage_batch.len(),
            tag_rows = tag_result.rows_affected(),
            "Successfully inserted trace server records"
        );
        Ok(())
    }

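    /// Inserts a single tag record by delegating to the batch insert.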
    pub async fn insert_tag_record(
        pool: &Pool<Postgres>,
        record: TagRecord,
    ) -> Result<(), SqlError> {
        let result = PostgresClient::insert_tag_batch(pool, std::slice::from_ref(&record)).await?;

        debug!(
            rows_affected = result.rows_affected(),
            entity_type = record.entity_type.as_str(),
            entity_id = record.entity_id.as_str(),
            key = record.key.as_str(),
            "Successfully inserted tag record"
        );

        Ok(())
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::sql::schema::User;
    use crate::sql::traits::EntitySqlLogic;
    use chrono::{Duration, Utc};
    use potato_head::create_uuid7;
    use rand::Rng;
    use scouter_semver::VersionType;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::genai::ExecutionPlan;
    use scouter_types::psi::{Bin, BinType, PsiDriftConfig, PsiFeatureDriftProfile};
    use scouter_types::spc::SpcDriftProfile;
    use scouter_types::sql::TraceFilters;
    use scouter_types::*;
    use serde_json::Value;

    const SPACE: &str = "space";
    const NAME: &str = "name";
    const VERSION: &str = "1.0.0";
    const SCOPE: &str = "scope";
    const UID: &str = "test";
    const ENTITY_ID: i32 = 9999;

    fn random_trace_record() -> TraceRecord {
        let mut rng = rand::rng();
        let random_num = rng.random_range(0..1000);
        let trace_id: String = (0..32)
            .map(|_| format!("{:x}", rng.random_range(0..16)))
            .collect();
        let span_id: String = (0..16)
            .map(|_| format!("{:x}", rng.random_range(0..16)))
            .collect();
        let created_at = Utc::now() + chrono::Duration::milliseconds(random_num);

        TraceRecord {
            trace_id: trace_id.clone(),
            created_at,
            service_name: format!("service_{}", random_num % 10),
            scope: SCOPE.to_string(),
            trace_state: "running".to_string(),
            start_time: created_at,
            end_time: created_at + chrono::Duration::milliseconds(150),
            duration_ms: 150,
            status_code: 0,
            span_count: 1,
            status_message: "OK".to_string(),
            root_span_id: span_id.clone(),
            tags: vec![],
            process_attributes: vec![],
        }
    }

    fn random_span_record(
        trace_id: &str,
        parent_span_id: Option<&str>,
        service_name: &str,
    ) -> TraceSpanRecord {
        let mut rng = rand::rng();
        let span_id: String = (0..16)
            .map(|_| format!("{:x}", rng.random_range(0..16)))
            .collect();

        let random_offset_ms = rng.random_range(0..1000);
        let duration_ms_val = rng.random_range(50..500);

        let created_at = Utc::now() + chrono::Duration::milliseconds(random_offset_ms);
        let start_time = created_at;
        let end_time = start_time + chrono::Duration::milliseconds(duration_ms_val);

        let status_code = if rng.random_bool(0.95) { 0 } else { 2 };
        let span_kind_options = ["SERVER", "CLIENT", "INTERNAL", "PRODUCER", "CONSUMER"];
        let span_kind =
            span_kind_options[rng.random_range(0..span_kind_options.len())].to_string();

        TraceSpanRecord {
            created_at,
            span_id,
            trace_id: trace_id.to_string(),
            parent_span_id: parent_span_id.map(|s| s.to_string()),
            service_name: service_name.to_string(),
            scope: SCOPE.to_string(),
            span_name: format!("{}_{}", "random_operation", rng.random_range(0..10)),
            span_kind,
            start_time,
            end_time,
            duration_ms: duration_ms_val,
            status_code,
            status_message: if status_code == 2 {
                "Internal Server Error".to_string()
            } else {
                "OK".to_string()
            },
            attributes: vec![Attribute::default()],
            events: vec![],
            links: vec![],
            label: None,
            input: Value::default(),
            output: Value::default(),
            resource_attributes: vec![],
        }
    }

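    /// Deletes all rows from the scouter tables used by these tests so each test
    /// starts from a clean slate.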
    pub async fn cleanup(pool: &Pool<Postgres>) {
        sqlx::raw_sql(
            r#"
            DELETE FROM scouter.service_entities;
            DELETE FROM scouter.drift_entities;
            DELETE FROM scouter.spc_drift;
            DELETE FROM scouter.observability_metric;
            DELETE FROM scouter.custom_drift;
            DELETE FROM scouter.drift_alert;
            DELETE FROM scouter.drift_profile;
            DELETE FROM scouter.psi_drift;
            DELETE FROM scouter.user;
            DELETE FROM scouter.genai_eval_record;
            DELETE FROM scouter.genai_eval_task;
            DELETE FROM scouter.genai_eval_workflow;
            DELETE FROM scouter.spans;
            DELETE FROM scouter.trace_baggage;
            DELETE FROM scouter.tags;
            "#,
        )
        .fetch_all(pool)
        .await
        .unwrap();
    }

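    /// Builds a pool from the default settings and wipes the tables before handing it
    /// to a test.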
    pub async fn db_pool() -> Pool<Postgres> {
        let pool = PostgresClient::create_db_pool(&DatabaseSettings::default())
            .await
            .unwrap();
        cleanup(&pool).await;
        pool
    }

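    /// Inserts a drift profile with the next minor version and returns the uid of the
    /// stored profile.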
    pub async fn insert_profile_to_db(
        pool: &Pool<Postgres>,
        profile: &DriftProfile,
        active: bool,
        deactivate_others: bool,
    ) -> String {
        let base_args = profile.get_base_args();
        let version_request = VersionRequest {
            version: None,
            version_type: VersionType::Minor,
            pre_tag: None,
            build_tag: None,
        };
        let version = PostgresClient::get_next_profile_version(pool, &base_args, version_request)
            .await
            .unwrap();

        let result = PostgresClient::insert_drift_profile(
            pool,
            profile,
            &base_args,
            &version,
            &active,
            &deactivate_others,
        )
        .await
        .unwrap();

        result
    }

    #[tokio::test]
    async fn test_postgres_start() {
        let _pool = db_pool().await;
    }

    #[tokio::test]
    async fn test_postgres_drift_alert() {
        let pool = db_pool().await;
        let entity_id = 9999;

        for i in 0..10 {
            let alert = AlertMap::Custom(custom::ComparisonMetricAlert {
                metric_name: "test".to_string(),
                baseline_value: 0.0,
                observed_value: i as f64,
                delta: None,
                alert_threshold: AlertThreshold::Above,
            });

            let result = PostgresClient::insert_drift_alert(&pool, &entity_id, &alert)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);

            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }

        // Fetch all alerts in a single page.
        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(true),
            limit: Some(50),
            ..Default::default()
        };

        let response = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(response.items.len(), 10);
        assert!(!response.has_next);
        assert!(!response.has_previous);
        assert!(response.next_cursor.is_none());
        assert!(response.previous_cursor.is_some());

        // First page of three.
        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(true),
            limit: Some(3),
            direction: Some("next".to_string()),
            ..Default::default()
        };

        let page1 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page1.items.len(), 3);
        assert!(page1.has_next);
        assert!(!page1.has_previous);
        assert!(page1.next_cursor.is_some());
        assert!(page1.previous_cursor.is_some());

        // Second page via the next cursor.
        let next_cursor = page1.next_cursor.unwrap();
        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(true),
            limit: Some(3),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id as i32),
            direction: Some("next".to_string()),
            start_datetime: None,
            end_datetime: None,
        };

        let page2 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page2.items.len(), 3);
        assert!(page2.has_next);
        assert!(page2.has_previous);
        assert!(page2.next_cursor.is_some());
        assert!(page2.previous_cursor.is_some());

        // Pages should not overlap.
        let page1_ids: std::collections::HashSet<_> = page1.items.iter().map(|a| a.id).collect();
        let page2_ids: std::collections::HashSet<_> = page2.items.iter().map(|a| a.id).collect();
        assert!(page1_ids.is_disjoint(&page2_ids));

        // Paging back from page 2 should return page 1 again.
        let prev_cursor = page2.previous_cursor.unwrap();
        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(true),
            limit: Some(3),
            cursor_created_at: Some(prev_cursor.created_at),
            cursor_id: Some(prev_cursor.id as i32),
            direction: Some("previous".to_string()),
            start_datetime: None,
            end_datetime: None,
        };

        let page_back = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page_back.items.len(), 3);
        assert!(page_back.has_next);
        assert_eq!(
            page_back.items.iter().map(|a| a.id).collect::<Vec<_>>(),
            page1.items.iter().map(|a| a.id).collect::<Vec<_>>()
        );

        // Deactivate one alert and verify the active/inactive filters.
        let to_deactivate = &page1.items[0];
        let update_request = UpdateAlertStatus {
            id: to_deactivate.id,
            active: false,
            space: "test_space".to_string(),
        };

        PostgresClient::update_drift_alert_status(&pool, &update_request)
            .await
            .unwrap();

        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(true),
            limit: Some(50),
            ..Default::default()
        };

        let active_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(active_alerts.items.len(), 9);
        assert!(active_alerts.items.iter().all(|a| a.active));

        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(false),
            limit: Some(50),
            ..Default::default()
        };

        let inactive_alerts =
            PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
                .await
                .unwrap();

        assert_eq!(inactive_alerts.items.len(), 1);
        assert!(!inactive_alerts.items[0].active);

        // No active filter returns everything.
        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: None,
            limit: Some(50),
            ..Default::default()
        };

        let all_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(all_alerts.items.len(), 10);

        // Unknown entity id returns an empty page.
        let request = DriftAlertPaginationRequest {
            uid: UID.to_string(),
            active: Some(true),
            limit: Some(3),
            ..Default::default()
        };

        let non_existent = PostgresClient::get_paginated_drift_alerts(&pool, &request, &999999)
            .await
            .unwrap();

        assert_eq!(non_existent.items.len(), 0);
        assert!(!non_existent.has_next);
        assert!(!non_existent.has_previous);
        assert!(non_existent.next_cursor.is_none());
        assert!(non_existent.previous_cursor.is_none());
    }

    #[tokio::test]
    async fn test_postgres_spc_drift_record() {
        let pool = db_pool().await;

        let record1 = SpcRecord {
            created_at: Utc::now(),
            uid: UID.to_string(),
            feature: "test".to_string(),
            value: 1.0,
            entity_id: 0,
        };

        let record2 = SpcRecord {
            created_at: Utc::now(),
            uid: UID.to_string(),
            feature: "test2".to_string(),
            value: 2.0,
            entity_id: 0,
        };

        let result =
            PostgresClient::insert_spc_drift_records_batch(&pool, &[record1, record2], &ENTITY_ID)
                .await
                .unwrap();

        assert_eq!(result.rows_affected(), 2);
    }

    #[tokio::test]
    async fn test_postgres_bin_count() {
        let pool = db_pool().await;

        let record1 = PsiRecord {
            created_at: Utc::now(),
            uid: UID.to_string(),
            feature: "test".to_string(),
            bin_id: 1,
            bin_count: 1,
            entity_id: ENTITY_ID,
        };

        let record2 = PsiRecord {
            created_at: Utc::now(),
            uid: UID.to_string(),
            feature: "test2".to_string(),
            bin_id: 2,
            bin_count: 2,
            entity_id: ENTITY_ID,
        };

        let result =
            PostgresClient::insert_bin_counts_batch(&pool, &[record1, record2], &ENTITY_ID)
                .await
                .unwrap();

        assert_eq!(result.rows_affected(), 2);
    }

    #[tokio::test]
    async fn test_postgres_observability_record() {
        let pool = db_pool().await;

        let record = ObservabilityMetrics::default();

        let result = PostgresClient::insert_observability_record(&pool, &record, &ENTITY_ID)
            .await
            .unwrap();

        assert_eq!(result.rows_affected(), 1);
    }

    #[tokio::test]
    async fn test_postgres_crud_drift_profile() {
        let pool = db_pool().await;

        let mut spc_profile = SpcDriftProfile::default();
        let profile = DriftProfile::Spc(spc_profile.clone());
        let uid = insert_profile_to_db(&pool, &profile, false, false).await;
        assert!(!uid.is_empty());

        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
            .await
            .unwrap();

        spc_profile.scouter_version = "test".to_string();

        let result = PostgresClient::update_drift_profile(
            &pool,
            &DriftProfile::Spc(spc_profile.clone()),
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(result.rows_affected(), 1);

        let profile = PostgresClient::get_drift_profile(&pool, &entity_id)
            .await
            .unwrap();

        let deserialized = serde_json::from_value::<SpcDriftProfile>(profile.unwrap()).unwrap();

        assert_eq!(deserialized, spc_profile);

        PostgresClient::update_drift_profile_status(
            &pool,
            &ProfileStatusRequest {
                name: spc_profile.config.name.clone(),
                space: spc_profile.config.space.clone(),
                version: spc_profile.config.version.clone(),
                active: false,
                drift_type: Some(DriftType::Spc),
                deactivate_others: false,
            },
        )
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn test_postgres_get_features() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        for _ in 0..10 {
            let mut records = Vec::new();
            for j in 0..10 {
                let record = SpcRecord {
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    uid: UID.to_string(),
                    feature: format!("test{j}"),
                    value: j as f64,
                    entity_id: ENTITY_ID,
                };

                records.push(record);
            }

            let result =
                PostgresClient::insert_spc_drift_records_batch(&pool, &records, &ENTITY_ID)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), records.len() as u64);
        }

        let features = PostgresClient::get_spc_features(&pool, &ENTITY_ID)
            .await
            .unwrap();
        assert_eq!(features.len(), 10);

        let records =
            PostgresClient::get_spc_drift_records(&pool, &timestamp, &features, &ENTITY_ID)
                .await
                .unwrap();

        assert_eq!(records.features.len(), 10);

        let binned_records = PostgresClient::get_binned_spc_drift_records(
            &pool,
            &DriftRequest {
                uid: UID.to_string(),
                time_interval: TimeInterval::FifteenMinutes,
                max_data_points: 10,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &ENTITY_ID,
        )
        .await
        .unwrap();

        assert_eq!(binned_records.features.len(), 10);
    }

    #[tokio::test]
    async fn test_postgres_bin_proportions() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        let num_features = 3;
        let num_bins = 5;

        let features = (0..=num_features)
            .map(|feature| {
                let bins = (0..=num_bins)
                    .map(|bind_id| Bin {
                        id: bind_id,
                        lower_limit: None,
                        upper_limit: None,
                        proportion: 0.0,
                    })
                    .collect();
                let feature_name = format!("feature{feature}");
                let feature_profile = PsiFeatureDriftProfile {
                    id: feature_name.clone(),
                    bins,
                    timestamp,
                    bin_type: BinType::Numeric,
                };
                (feature_name, feature_profile)
            })
            .collect();

        let profile = &DriftProfile::Psi(psi::PsiDriftProfile::new(
            features,
            PsiDriftConfig {
                space: SPACE.to_string(),
                name: NAME.to_string(),
                version: VERSION.to_string(),
                ..Default::default()
            },
        ));
        let uid = insert_profile_to_db(&pool, profile, false, false).await;
        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
            .await
            .unwrap();

        for feature in 0..num_features {
            for bin in 0..=num_bins {
                let mut records = Vec::new();
                for j in 0..=100 {
                    let record = PsiRecord {
                        created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                        uid: uid.to_string(),
                        feature: format!("feature{feature}"),
                        bin_id: bin,
                        bin_count: rand::rng().random_range(0..10) as i32,
                        entity_id: ENTITY_ID,
                    };

                    records.push(record);
                }
                PostgresClient::insert_bin_counts_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            }
        }

        let binned_records = PostgresClient::get_feature_distributions(
            &pool,
            &timestamp,
            &["feature0".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        let bin_proportion = binned_records
            .distributions
            .get("feature0")
            .unwrap()
            .bins
            .get(&1)
            .unwrap();

        assert!(*bin_proportion > 0.1 && *bin_proportion < 0.2);

        let binned_records = PostgresClient::get_binned_psi_drift_records(
            &pool,
            &DriftRequest {
                uid: UID.to_string(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        assert_eq!(binned_records.len(), 3);
    }

    #[tokio::test]
    async fn test_postgres_cru_custom_metric() {
        let pool = db_pool().await;
        let timestamp = Utc::now();

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::Custom.to_string(),
        )
        .await
        .unwrap();

        for i in 0..2 {
            let mut records = Vec::new();
            for j in 0..25 {
                let record = CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    uid: uid.clone(),
                    metric: format!("metric{i}"),
                    value: rand::rng().random_range(0..10) as f64,
                    entity_id: ENTITY_ID,
                };
                records.push(record);
            }
            let result =
                PostgresClient::insert_custom_metric_values_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), 25);
        }

        let record = CustomMetricRecord {
            created_at: Utc::now(),
            uid: uid.clone(),
            metric: "metric3".to_string(),
            value: rand::rng().random_range(0..10) as f64,
            entity_id: ENTITY_ID,
        };

        let result =
            PostgresClient::insert_custom_metric_values_batch(&pool, &[record], &entity_id)
                .await
                .unwrap();
        assert_eq!(result.rows_affected(), 1);

        let metrics = PostgresClient::get_custom_metric_values(
            &pool,
            &timestamp,
            &["metric1".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(metrics.len(), 1);

        let binned_records = PostgresClient::get_binned_custom_drift_records(
            &pool,
            &DriftRequest {
                uid: uid.clone(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        assert_eq!(binned_records.metrics.len(), 3);
    }

    #[tokio::test]
    async fn test_postgres_user() {
        let pool = db_pool().await;
        let recovery_codes = vec!["recovery_code_1".to_string(), "recovery_code_2".to_string()];

        let user = User::new(
            "user".to_string(),
            "pass".to_string(),
            "email".to_string(),
            recovery_codes,
            None,
            None,
            None,
            None,
        );
        PostgresClient::insert_user(&pool, &user).await.unwrap();

        let mut user = PostgresClient::get_user(&pool, "user")
            .await
            .unwrap()
            .unwrap();

        assert_eq!(user.username, "user");
        assert_eq!(user.group_permissions, vec!["user"]);
        assert_eq!(user.email, "email");

        user.active = false;
        user.refresh_token = Some("token".to_string());

        PostgresClient::update_user(&pool, &user).await.unwrap();
        let user = PostgresClient::get_user(&pool, "user")
            .await
            .unwrap()
            .unwrap();
        assert!(!user.active);
        assert_eq!(user.refresh_token.unwrap(), "token");

        let users = PostgresClient::get_users(&pool).await.unwrap();
        assert_eq!(users.len(), 1);

        let is_last_admin = PostgresClient::is_last_admin(&pool, "user").await.unwrap();
        assert!(!is_last_admin);

        PostgresClient::delete_user(&pool, "user").await.unwrap();
    }

    #[tokio::test]
    async fn test_postgres_genai_eval_record_insert_get() {
        let pool = db_pool().await;

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let input = "This is a test input";
        let output = "This is a test response";

        for j in 0..10 {
            let context = serde_json::json!({
                "input": input,
                "response": output,
            });
            let record = GenAIEvalRecord {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                context,
                status: Status::Pending,
                id: 0,
                uid: format!("test_{}", j),
                entity_uid: uid.clone(),
                entity_id,
                ..Default::default()
            };

            let boxed = BoxedGenAIEvalRecord::new(record);

            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
            .await
            .unwrap();
        assert_eq!(features.len(), 10);

        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
            .await
            .unwrap();

        assert!(pending_tasks.is_some());

        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
        assert_eq!(*task_input, "This is a test input".to_string());

        PostgresClient::update_genai_eval_record_status(
            &pool,
            &pending_tasks.unwrap(),
            Status::Processed,
            &(1_i64),
        )
        .await
        .unwrap();

        let processed_tasks = PostgresClient::get_genai_eval_records(
            &pool,
            None,
            Some(Status::Processed),
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(processed_tasks.len(), 1);
    }

    #[tokio::test]
    async fn test_postgres_genai_eval_record_pagination() {
        let pool = db_pool().await;

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let input = "This is a test input";
        let output = "This is a test response";

        for j in 0..10 {
            let context = serde_json::json!({
                "input": input,
                "response": output,
            });
            let record = GenAIEvalRecord {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                context,
                status: Status::Pending,
                id: 0,
                uid: format!("test_{}", j),
                entity_uid: uid.clone(),
                entity_id: ENTITY_ID,
                ..Default::default()
            };

            let boxed = BoxedGenAIEvalRecord::new(record);

            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        // Page 1: newest five records.
        let params = GenAIEvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: None,
            cursor_id: None,
            direction: None,
            ..Default::default()
        };

        let page1 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
            .await
            .unwrap();

        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
        assert!(page1.has_next, "Should have next page");
        assert!(
            !page1.has_previous,
            "Should not have previous page (first page)"
        );
        assert!(page1.next_cursor.is_some(), "Should have next cursor");

        let page1_first = page1.items.first().unwrap();
        let page1_last = page1.items.last().unwrap();

        assert!(
            page1_first.created_at >= page1_last.created_at,
            "Page 1 should be sorted newest first (DESC)"
        );

        // Page 2: follow the next cursor.
        let next_cursor = page1.next_cursor.unwrap();

        let params = GenAIEvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id),
            direction: None,
            ..Default::default()
        };

        let page2 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
            .await
            .unwrap();

        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
        assert!(!page2.has_next, "Should not have next page (last page)");
        assert!(page2.has_previous, "Should have previous page");
        assert!(
            page2.previous_cursor.is_some(),
            "Should have previous cursor"
        );

        let page2_first = page2.items.first().unwrap();

        assert!(
            page2_first.created_at < page1_last.created_at
                || (page2_first.created_at == page1_last.created_at
                    && page2_first.id < page1_last.id),
            "Page 2 should start with records older than Page 1 last item"
        );

        let all_ids: Vec<i64> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|r| r.id)
            .collect();

        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");

        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");

        // Page back to page 1 via the previous cursor.
        let previous_cursor = page2.previous_cursor.unwrap();

        let params = GenAIEvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(previous_cursor.created_at),
            cursor_id: Some(previous_cursor.id),
            direction: Some("previous".to_string()),
            ..Default::default()
        };

        let page1_again =
            PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(
            page1_again.items.len(),
            5,
            "Going back should return 5 records"
        );

        assert_eq!(
            page1_again.items.first().unwrap().id,
            page1_first.id,
            "Should return to the same first record"
        );
    }

    #[tokio::test]
    async fn test_postgres_genai_eval_workflow_pagination() {
        let pool = db_pool().await;

        let (_uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        for j in 0..10 {
            let record = GenAIEvalWorkflowResult {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                record_uid: format!("test_{}", j),
                entity_id,
                ..Default::default()
            };

            let result =
                PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
                    .await
                    .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        // Page 1: newest five workflow records.
        let params = GenAIEvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: None,
            cursor_id: None,
            direction: None,
            ..Default::default()
        };

        let page1 =
            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
        assert!(page1.has_next, "Should have next page");
        assert!(
            !page1.has_previous,
            "Should not have previous page (first page)"
        );
        assert!(page1.next_cursor.is_some(), "Should have next cursor");

        let page1_first = page1.items.first().unwrap();
        let page1_last = page1.items.last().unwrap();

        assert!(
            page1_first.created_at >= page1_last.created_at,
            "Page 1 should be sorted newest first (DESC)"
        );

        // Page 2: follow the next cursor.
        let next_cursor = page1.next_cursor.unwrap();

        let params = GenAIEvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id),
            direction: None,
            ..Default::default()
        };

        let page2 =
            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
        assert!(!page2.has_next, "Should not have next page (last page)");
        assert!(page2.has_previous, "Should have previous page");
        assert!(
            page2.previous_cursor.is_some(),
            "Should have previous cursor"
        );

        let page2_first = page2.items.first().unwrap();

        assert!(
            page2_first.created_at < page1_last.created_at
                || (page2_first.created_at == page1_last.created_at
                    && page2_first.id < page1_last.id),
            "Page 2 should start with records older than Page 1 last item"
        );

        let all_ids: Vec<i64> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|r| r.id)
            .collect();

        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");

        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");

        // Page back to page 1 via the previous cursor.
        let previous_cursor = page2.previous_cursor.unwrap();

        let params = GenAIEvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(previous_cursor.created_at),
            cursor_id: Some(previous_cursor.id),
            direction: Some("previous".to_string()),
            ..Default::default()
        };

        let page1_again =
            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(
            page1_again.items.len(),
            5,
            "Going back should return 5 records"
        );

        assert_eq!(
            page1_again.items.first().unwrap().id,
            page1_first.id,
            "Should return to the same first record"
        );
    }

    #[tokio::test]
    async fn test_postgres_genai_task_result_insert_get() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let mut records = Vec::new();
        for i in 0..2 {
            for j in 0..25 {
                let record = GenAIEvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    start_time: Utc::now(),
                    end_time: Utc::now() + chrono::Duration::seconds(1),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    field_path: Some(format!("field.path.{i}")),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: uid.clone(),
                    condition: false,
                    stage: 0_i32,
                };
                records.push(record);
            }
            let result =
                PostgresClient::insert_eval_task_results_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), 25);
        }

        let metrics = PostgresClient::get_genai_task_values(
            &pool,
            &timestamp,
            &["task1".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(metrics.len(), 1);
        let binned_records = PostgresClient::get_binned_genai_task_values(
            &pool,
            &DriftRequest {
                uid: uid.clone(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        assert_eq!(binned_records.metrics.len(), 2);

        let eval_task = PostgresClient::get_genai_eval_task(&pool, &records[0].record_uid)
            .await
            .unwrap();

        assert_eq!(eval_task[0].record_uid, records[0].record_uid);
    }

    #[tokio::test]
    async fn test_postgres_genai_workflow_result_insert_get() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        for i in 0..2 {
            for j in 0..25 {
                let record = GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: uid.clone(),
                    execution_plan: ExecutionPlan::default(),
                    id: 0,
                };
                let result =
                    PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
                        .await
                        .unwrap();
                assert_eq!(result.rows_affected(), 1);
            }
        }

        let metric = PostgresClient::get_genai_workflow_value(&pool, &timestamp, &entity_id)
            .await
            .unwrap();

        assert!(metric.is_some());

        let binned_records = PostgresClient::get_binned_genai_workflow_values(
            &pool,
            &DriftRequest {
                uid: uid.clone(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        assert_eq!(binned_records.metrics.len(), 1);
    }

    #[tokio::test]
    async fn test_postgres_tracing_metrics() {
        let pool = db_pool().await;
        let script = std::fs::read_to_string("src/tests/script/populate_trace.sql").unwrap();
        sqlx::query(&script).execute(&pool).await.unwrap();
        let mut filters = TraceFilters::default();

        let first_batch = PostgresClient::get_paginated_traces(&pool, filters.clone())
            .await
            .unwrap();

        assert_eq!(
            first_batch.items.len(),
            50,
            "First batch should have 50 records"
        );

        let last_record = first_batch.next_cursor.unwrap();
        filters = filters.next_page(&last_record);

        let next_batch = PostgresClient::get_paginated_traces(&pool, filters.clone())
            .await
            .unwrap();

        assert_eq!(
            next_batch.items.len(),
            50,
            "Next batch should have 50 records"
        );

        let next_first_record = next_batch.items.first().unwrap();
        assert!(
            next_first_record.start_time <= last_record.start_time,
            "Next batch first record timestamp is not less than or equal to last record timestamp"
        );

        filters = filters.previous_page(&next_batch.previous_cursor.unwrap());
        let previous_batch = PostgresClient::get_paginated_traces(&pool, filters.clone())
            .await
            .unwrap();
        assert_eq!(
            previous_batch.items.len(),
            50,
            "Previous batch should have 50 records"
        );

        let filtered_record = first_batch
            .items
            .iter()
            .find(|record| record.span_count > 5)
            .unwrap();

        filters.cursor_start_time = None;
        filters.cursor_trace_id = None;

        let records = PostgresClient::get_paginated_traces(&pool, filters.clone())
            .await
            .unwrap();

        assert!(
            !records.items.is_empty(),
            "Should return records with specified filters"
        );

        let spans = PostgresClient::get_trace_spans(
            &pool,
            &filtered_record.trace_id,
            Some(&filtered_record.service_name),
        )
        .await
        .unwrap();

        assert!(spans.len() == filtered_record.span_count as usize);

        let start_time = filtered_record.start_time - chrono::Duration::hours(48);
        let end_time = filtered_record.start_time + chrono::Duration::minutes(5);

        let trace_metrics =
            PostgresClient::get_trace_metrics(&pool, None, start_time, end_time, "5 minutes", None)
                .await
                .unwrap();

        assert!(trace_metrics.len() >= 10);

        let filters = scouter_types::sql::TraceFilters {
            attribute_filters: Some(vec!["component=kafka".to_string()]),
            ..Default::default()
        };
        let tagged_batch = PostgresClient::get_paginated_traces(&pool, filters)
            .await
            .unwrap();
        assert!(!tagged_batch.items.is_empty());
    }

    #[tokio::test]
    async fn test_postgres_tracing_insert() {
        let pool = db_pool().await;

        let mut trace_record = random_trace_record();
        let trace_id = trace_record.trace_id.clone();

        let root_span = random_span_record(&trace_id, None, &trace_record.service_name);
        let child_span =
            random_span_record(&trace_id, Some(&root_span.span_id), &root_span.service_name);

        trace_record.root_span_id = root_span.span_id.clone();

        let result =
            PostgresClient::insert_span_batch(&pool, &[root_span.clone(), child_span.clone()])
                .await
                .unwrap();

        assert_eq!(result.rows_affected(), 2);

        let inserted_created_at = trace_record.created_at;
        let inserted_trace_id = trace_record.trace_id.clone();

        let trace_filter = TraceFilters {
            cursor_start_time: Some(inserted_created_at + Duration::days(1)),
            cursor_trace_id: Some(inserted_trace_id),
            start_time: Some(inserted_created_at - Duration::minutes(5)),
            end_time: Some(inserted_created_at + Duration::days(1)),
            ..TraceFilters::default()
        };

        let traces = PostgresClient::get_paginated_traces(&pool, trace_filter)
            .await
            .unwrap();

        assert_eq!(traces.items.len(), 1);
        let retrieved_trace = &traces.items[0];
        assert_eq!(retrieved_trace.span_count, 2);

        let baggage = TraceBaggageRecord {
            created_at: Utc::now(),
            trace_id: trace_record.trace_id.clone(),
            scope: "test_scope".to_string(),
            key: "user_id".to_string(),
            value: "12345".to_string(),
        };

        let result =
            PostgresClient::insert_trace_baggage_batch(&pool, std::slice::from_ref(&baggage))
                .await
                .unwrap();

        assert_eq!(result.rows_affected(), 1);

        let retrieved_baggage =
            PostgresClient::get_trace_baggage_records(&pool, &trace_record.trace_id)
                .await
                .unwrap();

        assert_eq!(retrieved_baggage.len(), 1);
    }

    #[tokio::test]
    async fn test_postgres_tags() {
        let pool = db_pool().await;
        let uid = create_uuid7();

        let tag1 = TagRecord {
            entity_id: uid.clone(),
            entity_type: "service".to_string(),
            key: "env".to_string(),
            value: "production".to_string(),
        };

        let tag2 = TagRecord {
            entity_id: uid.clone(),
            entity_type: "service".to_string(),
            key: "team".to_string(),
            value: "backend".to_string(),
        };

        let result = PostgresClient::insert_tag_batch(&pool, &[tag1.clone(), tag2.clone()])
            .await
            .unwrap();

        assert_eq!(result.rows_affected(), 2);

        let tags = PostgresClient::get_tags(&pool, "service", &uid)
            .await
            .unwrap();

        assert_eq!(tags.len(), 2);

        let tag_filter = vec![Tag {
            key: tags.first().unwrap().key.clone(),
            value: tags.first().unwrap().value.clone(),
        }];

        let entity_id = PostgresClient::get_entity_id_by_tags(&pool, "service", &tag_filter, false)
            .await
            .unwrap();

        assert_eq!(entity_id.first().unwrap(), &uid);
    }
}