scouter_sql/sql/
postgres.rs

1use crate::sql::cache::entity_cache;
2use crate::sql::cache::init_entity_cache;
3use crate::sql::error::SqlError;
4use crate::sql::traits::{
5    AlertSqlLogic, ArchiveSqlLogic, CustomMetricSqlLogic, GenAIDriftSqlLogic,
6    ObservabilitySqlLogic, ProfileSqlLogic, PsiSqlLogic, SpcSqlLogic, TagSqlLogic, TraceSqlLogic,
7    UserSqlLogic,
8};
9use scouter_settings::DatabaseSettings;
10use scouter_types::{RecordType, ServerRecords, TagRecord, ToDriftRecords, TraceServerRecord};
11use sqlx::ConnectOptions;
12use sqlx::{postgres::PgConnectOptions, Pool, Postgres};
13use std::result::Result::Ok;
14use tokio::try_join;
15use tracing::log::LevelFilter;
16use tracing::{debug, error, info, instrument};
17
18#[derive(Debug, Clone)]
19#[allow(dead_code)]
20pub struct PostgresClient {}
21
22impl SpcSqlLogic for PostgresClient {}
23impl CustomMetricSqlLogic for PostgresClient {}
24impl PsiSqlLogic for PostgresClient {}
25impl GenAIDriftSqlLogic for PostgresClient {}
26impl UserSqlLogic for PostgresClient {}
27impl ProfileSqlLogic for PostgresClient {}
28impl ObservabilitySqlLogic for PostgresClient {}
29impl AlertSqlLogic for PostgresClient {}
30impl ArchiveSqlLogic for PostgresClient {}
31impl TraceSqlLogic for PostgresClient {}
32impl TagSqlLogic for PostgresClient {}
33
34impl PostgresClient {
35    /// Setup the application with the given database pool.
36    ///
37    /// # Returns
38    ///
39    /// * `Result<Pool<Postgres>, anyhow::Error>` - Result of the database pool
40    #[instrument(skip(database_settings))]
41    pub async fn create_db_pool(
42        database_settings: &DatabaseSettings,
43    ) -> Result<Pool<Postgres>, SqlError> {
44        let mut opts: PgConnectOptions = database_settings.connection_uri.parse()?;
45
46        // Sqlx logs a lot of debug information by default, which can be overwhelming.
47        opts = opts.log_statements(LevelFilter::Off);
48
49        let pool = match sqlx::postgres::PgPoolOptions::new()
50            .max_connections(database_settings.max_connections)
51            .connect_with(opts)
52            .await
53        {
54            Ok(pool) => {
55                info!("✅ Successfully connected to database");
56                pool
57            }
58            Err(err) => {
59                error!("🚨 Failed to connect to database {:?}", err);
60                std::process::exit(1);
61            }
62        };
63
64        // setup entity cache
65        init_entity_cache(1000);
66
67        // Run migrations
68        if let Err(err) = Self::run_migrations(&pool).await {
69            error!("🚨 Failed to run migrations {:?}", err);
70            std::process::exit(1);
71        }
72
73        Ok(pool)
74    }
75
76    pub async fn run_migrations(pool: &Pool<Postgres>) -> Result<(), SqlError> {
77        info!("Running migrations");
78        sqlx::migrate!("src/migrations")
79            .run(pool)
80            .await
81            .map_err(SqlError::MigrateError)?;
82
83        debug!("Migrations complete");
84
85        Ok(())
86    }
87}
88
89pub struct MessageHandler {}
90
91impl MessageHandler {
92    const DEFAULT_BATCH_SIZE: usize = 500;
93    #[instrument(skip_all)]
94    pub async fn insert_server_records(
95        pool: &Pool<Postgres>,
96        records: ServerRecords,
97    ) -> Result<(), SqlError> {
98        debug!("Inserting server records: {:?}", records.record_type()?);
99
100        let entity_id = entity_cache()
101            .get_entity_id_from_uid(pool, records.uid()?)
102            .await
103            .inspect_err(|e| error!("Failed to get entity ID from UID: {:?}", e))?;
104
105        match records.record_type()? {
106            RecordType::Spc => {
107                let spc_records = records.to_spc_drift_records()?;
108                debug!("SPC record count: {}", spc_records.len());
109
110                for chunk in spc_records.chunks(Self::DEFAULT_BATCH_SIZE) {
111                    PostgresClient::insert_spc_drift_records_batch(pool, chunk, &entity_id)
112                        .await
113                        .map_err(|e| {
114                            error!("Failed to insert SPC drift records batch: {:?}", e);
115                            e
116                        })?;
117                }
118            }
119
120            RecordType::Psi => {
121                let psi_records = records.to_psi_drift_records()?;
122                debug!("PSI record count: {}", psi_records.len());
123
124                for chunk in psi_records.chunks(Self::DEFAULT_BATCH_SIZE) {
125                    PostgresClient::insert_bin_counts_batch(pool, chunk, &entity_id)
126                        .await
127                        .map_err(|e| {
128                            error!("Failed to insert PSI drift records batch: {:?}", e);
129                            e
130                        })?;
131                }
132            }
133            RecordType::Custom => {
134                let custom_records = records.to_custom_metric_drift_records()?;
135                debug!("Custom record count: {}", custom_records.len());
136
137                for chunk in custom_records.chunks(Self::DEFAULT_BATCH_SIZE) {
138                    PostgresClient::insert_custom_metric_values_batch(pool, chunk, &entity_id)
139                        .await
140                        .map_err(|e| {
141                            error!("Failed to insert custom metric records batch: {:?}", e);
142                            e
143                        })?;
144                }
145            }
146
147            RecordType::GenAIEval => {
148                debug!("LLM Drift record count: {:?}", records.len());
149                let records = records.to_genai_eval_records()?;
150                for record in records {
151                    let _ = PostgresClient::insert_genai_eval_record(pool, record, &entity_id)
152                        .await
153                        .map_err(|e| {
154                            error!("Failed to insert GenAI drift record: {:?}", e);
155                        });
156                }
157            }
158
159            RecordType::GenAITask => {
160                debug!("GenAI Task count: {:?}", records.len());
161                let records = records.to_genai_task_records()?;
162                for chunk in records.chunks(Self::DEFAULT_BATCH_SIZE) {
163                    PostgresClient::insert_eval_task_results_batch(pool, chunk, &entity_id)
164                        .await
165                        .map_err(|e| {
166                            error!("Failed to insert GenAI task records batch: {:?}", e);
167                            e
168                        })?;
169                }
170            }
171
172            RecordType::GenAIWorkflow => {
173                debug!("GenAI Workflow count: {:?}", records.len());
174                let records = records.to_genai_workflow_records()?;
175                for record in records {
176                    let _ = PostgresClient::insert_genai_eval_workflow_record(
177                        pool, &record, &entity_id,
178                    )
179                    .await
180                    .map_err(|e| {
181                        error!("Failed to insert GenAI workflow record: {:?}", e);
182                    });
183                }
184            }
185
186            _ => {
187                error!(
188                    "Unsupported record type for batch insert: {:?}",
189                    records.record_type()?
190                );
191                return Err(SqlError::UnsupportedBatchTypeError);
192            }
193        }
194
195        Ok(())
196    }
197
198    pub async fn insert_trace_server_record(
199        pool: &Pool<Postgres>,
200        records: TraceServerRecord,
201    ) -> Result<(), SqlError> {
202        let (span_batch, baggage_batch, tag_records) = records.to_records()?;
203
204        let (span_result, baggage_result, tag_result) = try_join!(
205            PostgresClient::insert_span_batch(pool, &span_batch),
206            PostgresClient::insert_trace_baggage_batch(pool, &baggage_batch),
207            PostgresClient::insert_tag_batch(pool, &tag_records),
208        )?;
209
210        debug!(
211            span_rows = span_result.rows_affected(),
212            baggage_rows = baggage_result.rows_affected(),
213            total_spans = span_batch.len(),
214            total_baggage = baggage_batch.len(),
215            tag_rows = tag_result.rows_affected(),
216            "Successfully inserted trace server records"
217        );
218        Ok(())
219    }
220
221    pub async fn insert_tag_record(
222        pool: &Pool<Postgres>,
223        record: TagRecord,
224    ) -> Result<(), SqlError> {
225        let result = PostgresClient::insert_tag_batch(pool, std::slice::from_ref(&record)).await?;
226
227        debug!(
228            rows_affected = result.rows_affected(),
229            entity_type = record.entity_type.as_str(),
230            entity_id = record.entity_id.as_str(),
231            key = record.key.as_str(),
232            "Successfully inserted tag record"
233        );
234
235        Ok(())
236    }
237}
238
239/// Runs database integratino tests
240/// Note - binned queries targeting custom intervals with long-term and short-term data are
241/// done in the scouter-server integration tests
242#[cfg(test)]
243mod tests {
244
245    use super::*;
246    use crate::sql::schema::User;
247    use crate::sql::traits::EntitySqlLogic;
248    use chrono::{Duration, Utc};
249    use potato_head::create_uuid7;
250    use rand::Rng;
251    use scouter_semver::VersionType;
252    use scouter_settings::ObjectStorageSettings;
253    use scouter_types::genai::ExecutionPlan;
254    use scouter_types::psi::{Bin, BinType, PsiDriftConfig, PsiFeatureDriftProfile};
255    use scouter_types::spc::SpcDriftProfile;
256    use scouter_types::sql::TraceFilters;
257    use scouter_types::*;
258    use serde_json::Value;
259
260    const SPACE: &str = "space";
261    const NAME: &str = "name";
262    const VERSION: &str = "1.0.0";
263    const SCOPE: &str = "scope";
264    const UID: &str = "test";
265    const ENTITY_ID: i32 = 9999;
266
267    fn random_trace_record() -> TraceRecord {
268        let mut rng = rand::rng();
269        let random_num = rng.random_range(0..1000);
270        let trace_id: String = (0..32)
271            .map(|_| format!("{:x}", rng.random_range(0..16)))
272            .collect();
273        let span_id: String = (0..16)
274            .map(|_| format!("{:x}", rng.random_range(0..16)))
275            .collect();
276        let created_at = Utc::now() + chrono::Duration::milliseconds(random_num);
277
278        TraceRecord {
279            trace_id: trace_id.clone(),
280            created_at,
281            service_name: format!("service_{}", random_num % 10),
282            scope: SCOPE.to_string(),
283            trace_state: "running".to_string(),
284            start_time: created_at,
285            end_time: created_at + chrono::Duration::milliseconds(150),
286            duration_ms: 150,
287            status_code: 0,
288            span_count: 1,
289            status_message: "OK".to_string(),
290            root_span_id: span_id.clone(),
291            tags: vec![],
292            process_attributes: vec![],
293        }
294    }
295
296    fn random_span_record(
297        trace_id: &str,
298        parent_span_id: Option<&str>,
299        service_name: &str,
300    ) -> TraceSpanRecord {
301        let mut rng = rand::rng();
302        let span_id: String = (0..16)
303            .map(|_| format!("{:x}", rng.random_range(0..16)))
304            .collect();
305
306        let random_offset_ms = rng.random_range(0..1000);
307        let duration_ms_val = rng.random_range(50..500);
308
309        let created_at = Utc::now() + chrono::Duration::milliseconds(random_offset_ms);
310        let start_time = created_at;
311        let end_time = start_time + chrono::Duration::milliseconds(duration_ms_val);
312
313        // --- Status and Kind ---
314        let status_code = if rng.random_bool(0.95) { 0 } else { 2 };
315        let span_kind_options = ["SERVER", "CLIENT", "INTERNAL", "PRODUCER", "CONSUMER"];
316        let span_kind = span_kind_options[rng.random_range(0..span_kind_options.len())].to_string();
317
318        TraceSpanRecord {
319            created_at,
320            span_id,
321            trace_id: trace_id.to_string(),
322            parent_span_id: parent_span_id.map(|s| s.to_string()),
323            service_name: service_name.to_string(),
324            scope: SCOPE.to_string(),
325            span_name: format!("{}_{}", "random_operation", rng.random_range(0..10)),
326            span_kind,
327            start_time,
328            end_time,
329            duration_ms: duration_ms_val,
330            status_code,
331            status_message: if status_code == 2 {
332                "Internal Server Error".to_string()
333            } else {
334                "OK".to_string()
335            },
336            attributes: vec![Attribute::default()],
337            events: vec![],
338            links: vec![],
339            label: None,
340            input: Value::default(),
341            output: Value::default(),
342            resource_attributes: vec![],
343        }
344    }
345
346    pub async fn cleanup(pool: &Pool<Postgres>) {
347        sqlx::raw_sql(
348            r#"
349            DELETE
350            FROM scouter.service_entities;
351
352            DELETE
353            FROM scouter.drift_entities;
354
355            DELETE
356            FROM scouter.spc_drift;
357
358            DELETE
359            FROM scouter.observability_metric;
360
361            DELETE
362            FROM scouter.custom_drift;
363
364            DELETE
365            FROM scouter.drift_alert;
366
367            DELETE
368            FROM scouter.drift_profile;
369
370            DELETE
371            FROM scouter.psi_drift;
372
373            DELETE
374            FROM scouter.user;
375
376            DELETE
377            FROM scouter.genai_eval_record;
378
379            DELETE
380            FROM scouter.genai_eval_task;
381
382            DELETE
383            FROM scouter.genai_eval_workflow;
384
385            DELETE
386            FROM scouter.spans;
387
388            DELETE
389            FROM scouter.trace_baggage;
390
391            DELETE
392            FROM scouter.tags;
393            "#,
394        )
395        .fetch_all(pool)
396        .await
397        .unwrap();
398    }
399
400    pub async fn db_pool() -> Pool<Postgres> {
401        let pool = PostgresClient::create_db_pool(&DatabaseSettings::default())
402            .await
403            .unwrap();
404        cleanup(&pool).await;
405        pool
406    }
407
408    pub async fn insert_profile_to_db(
409        pool: &Pool<Postgres>,
410        profile: &DriftProfile,
411        active: bool,
412        deactivate_others: bool,
413    ) -> String {
414        let base_args = profile.get_base_args();
415        let version_request = VersionRequest {
416            version: None,
417            version_type: VersionType::Minor,
418            pre_tag: None,
419            build_tag: None,
420        };
421        let version = PostgresClient::get_next_profile_version(pool, &base_args, version_request)
422            .await
423            .unwrap();
424
425        let result = PostgresClient::insert_drift_profile(
426            pool,
427            profile,
428            &base_args,
429            &version,
430            &active,
431            &deactivate_others,
432        )
433        .await
434        .unwrap();
435
436        result
437    }
438
439    #[tokio::test]
440    async fn test_postgres_start() {
441        let _pool = db_pool().await;
442    }
443
444    #[tokio::test]
445    async fn test_postgres_drift_alert() {
446        let pool = db_pool().await;
447        let entity_id = 9999;
448
449        // Insert 10 alerts with slight delays to ensure different timestamps
450        for i in 0..10 {
451            let alert = AlertMap::Custom(custom::ComparisonMetricAlert {
452                metric_name: "test".to_string(),
453                baseline_value: 0 as f64,
454                observed_value: i as f64,
455                delta: None,
456                alert_threshold: AlertThreshold::Above,
457            });
458
459            let result = PostgresClient::insert_drift_alert(&pool, &entity_id, &alert)
460                .await
461                .unwrap();
462
463            assert_eq!(result.rows_affected(), 1);
464
465            // Small delay to ensure timestamp ordering
466            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
467        }
468
469        // Test 1: Get first page with default limit (50)
470        let request = DriftAlertPaginationRequest {
471            uid: UID.to_string(),
472            active: Some(true),
473            limit: Some(50),
474            ..Default::default()
475        };
476
477        let response = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
478            .await
479            .unwrap();
480
481        assert_eq!(response.items.len(), 10);
482        assert!(!response.has_next); // No more items
483        assert!(!response.has_previous); // First page
484        assert!(response.next_cursor.is_none());
485        assert!(response.previous_cursor.is_some());
486
487        // Test 2: Paginate with limit of 3 - forward direction
488        let request = DriftAlertPaginationRequest {
489            uid: UID.to_string(),
490            active: Some(true),
491            limit: Some(3),
492            direction: Some("next".to_string()),
493            ..Default::default()
494        };
495
496        let page1 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
497            .await
498            .unwrap();
499
500        assert_eq!(page1.items.len(), 3);
501        assert!(page1.has_next);
502        assert!(!page1.has_previous);
503        assert!(page1.next_cursor.is_some());
504        assert!(page1.previous_cursor.is_some());
505
506        // Test 3: Get second page using next cursor
507        let next_cursor = page1.next_cursor.unwrap();
508        let request = DriftAlertPaginationRequest {
509            uid: UID.to_string(),
510            active: Some(true),
511            limit: Some(3),
512            cursor_created_at: Some(next_cursor.created_at),
513            cursor_id: Some(next_cursor.id as i32),
514            direction: Some("next".to_string()),
515            start_datetime: None,
516            end_datetime: None,
517        };
518
519        let page2 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
520            .await
521            .unwrap();
522
523        assert_eq!(page2.items.len(), 3);
524        assert!(page2.has_next); // More items available
525        assert!(page2.has_previous); // Can go back
526        assert!(page2.next_cursor.is_some());
527        assert!(page2.previous_cursor.is_some());
528
529        // Verify no overlap between pages
530        let page1_ids: std::collections::HashSet<_> = page1.items.iter().map(|a| a.id).collect();
531        let page2_ids: std::collections::HashSet<_> = page2.items.iter().map(|a| a.id).collect();
532        assert!(page1_ids.is_disjoint(&page2_ids));
533
534        // Test 4: Navigate backward using previous cursor
535        let prev_cursor = page2.previous_cursor.unwrap();
536        let request = DriftAlertPaginationRequest {
537            uid: UID.to_string(),
538            active: Some(true),
539            limit: Some(3),
540            cursor_created_at: Some(prev_cursor.created_at),
541            cursor_id: Some(prev_cursor.id as i32),
542            direction: Some("previous".to_string()),
543            start_datetime: None,
544            end_datetime: None,
545        };
546
547        let page_back = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
548            .await
549            .unwrap();
550
551        assert_eq!(page_back.items.len(), 3);
552        assert!(page_back.has_next); // Can go forward
553                                     // Should return to page1 items
554        assert_eq!(
555            page_back.items.iter().map(|a| a.id).collect::<Vec<_>>(),
556            page1.items.iter().map(|a| a.id).collect::<Vec<_>>()
557        );
558
559        // Test 5: Filter by active status
560        // Deactivate some alerts first
561        let to_deactivate = &page1.items[0];
562        let update_request = UpdateAlertStatus {
563            id: to_deactivate.id,
564            active: false,
565            space: "test_space".to_string(),
566        };
567
568        PostgresClient::update_drift_alert_status(&pool, &update_request)
569            .await
570            .unwrap();
571
572        // Query only active alerts
573        let request = DriftAlertPaginationRequest {
574            uid: UID.to_string(),
575            active: Some(true),
576            limit: Some(50),
577            ..Default::default()
578        };
579
580        let active_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
581            .await
582            .unwrap();
583
584        assert_eq!(active_alerts.items.len(), 9); // 10 - 1 deactivated
585        assert!(active_alerts.items.iter().all(|a| a.active));
586
587        // Query only inactive alerts
588        let request = DriftAlertPaginationRequest {
589            uid: UID.to_string(),
590            active: Some(false),
591            limit: Some(50),
592            ..Default::default()
593        };
594
595        let inactive_alerts =
596            PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
597                .await
598                .unwrap();
599
600        assert_eq!(inactive_alerts.items.len(), 1);
601        assert!(!inactive_alerts.items[0].active);
602
603        // Test 6: Query all alerts (active and inactive)
604        let request = DriftAlertPaginationRequest {
605            uid: UID.to_string(),
606            active: None, // No filter
607            limit: Some(50),
608            ..Default::default()
609        };
610
611        let all_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
612            .await
613            .unwrap();
614
615        assert_eq!(all_alerts.items.len(), 10);
616
617        // Test 7: Empty result set
618        let request = DriftAlertPaginationRequest {
619            uid: UID.to_string(),
620            active: Some(true),
621            limit: Some(3),
622            ..Default::default()
623        };
624
625        let non_existent = PostgresClient::get_paginated_drift_alerts(&pool, &request, &999999)
626            .await
627            .unwrap();
628
629        assert_eq!(non_existent.items.len(), 0);
630        assert!(!non_existent.has_next);
631        assert!(!non_existent.has_previous);
632        assert!(non_existent.next_cursor.is_none());
633        assert!(non_existent.previous_cursor.is_none());
634    }
635
636    #[tokio::test]
637    async fn test_postgres_spc_drift_record() {
638        let pool = db_pool().await;
639
640        let record1 = SpcRecord {
641            created_at: Utc::now(),
642            uid: UID.to_string(),
643            feature: "test".to_string(),
644            value: 1.0,
645            entity_id: 0,
646        };
647
648        let record2 = SpcRecord {
649            created_at: Utc::now(),
650            uid: UID.to_string(),
651            feature: "test2".to_string(),
652            value: 2.0,
653            entity_id: 0,
654        };
655
656        let result =
657            PostgresClient::insert_spc_drift_records_batch(&pool, &[record1, record2], &ENTITY_ID)
658                .await
659                .unwrap();
660
661        assert_eq!(result.rows_affected(), 2);
662    }
663
664    #[tokio::test]
665    async fn test_postgres_bin_count() {
666        let pool = db_pool().await;
667
668        let record1 = PsiRecord {
669            created_at: Utc::now(),
670            uid: UID.to_string(),
671            feature: "test".to_string(),
672            bin_id: 1,
673            bin_count: 1,
674            entity_id: ENTITY_ID,
675        };
676
677        let record2 = PsiRecord {
678            created_at: Utc::now(),
679            uid: UID.to_string(),
680            feature: "test2".to_string(),
681            bin_id: 2,
682            bin_count: 2,
683            entity_id: ENTITY_ID,
684        };
685
686        let result =
687            PostgresClient::insert_bin_counts_batch(&pool, &[record1, record2], &ENTITY_ID)
688                .await
689                .unwrap();
690
691        assert_eq!(result.rows_affected(), 2);
692    }
693
694    #[tokio::test]
695    async fn test_postgres_observability_record() {
696        let pool = db_pool().await;
697
698        let record = ObservabilityMetrics::default();
699
700        let result = PostgresClient::insert_observability_record(&pool, &record, &ENTITY_ID)
701            .await
702            .unwrap();
703
704        assert_eq!(result.rows_affected(), 1);
705    }
706
707    #[tokio::test]
708    async fn test_postgres_crud_drift_profile() {
709        let pool = db_pool().await;
710
711        let mut spc_profile = SpcDriftProfile::default();
712        let profile = DriftProfile::Spc(spc_profile.clone());
713        let uid = insert_profile_to_db(&pool, &profile, false, false).await;
714        assert!(!uid.is_empty());
715
716        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
717            .await
718            .unwrap();
719
720        spc_profile.scouter_version = "test".to_string();
721
722        let result = PostgresClient::update_drift_profile(
723            &pool,
724            &DriftProfile::Spc(spc_profile.clone()),
725            &entity_id,
726        )
727        .await
728        .unwrap();
729
730        assert_eq!(result.rows_affected(), 1);
731
732        let profile = PostgresClient::get_drift_profile(&pool, &entity_id)
733            .await
734            .unwrap();
735
736        let deserialized = serde_json::from_value::<SpcDriftProfile>(profile.unwrap()).unwrap();
737
738        assert_eq!(deserialized, spc_profile);
739
740        PostgresClient::update_drift_profile_status(
741            &pool,
742            &ProfileStatusRequest {
743                name: spc_profile.config.name.clone(),
744                space: spc_profile.config.space.clone(),
745                version: spc_profile.config.version.clone(),
746                active: false,
747                drift_type: Some(DriftType::Spc),
748                deactivate_others: false,
749            },
750        )
751        .await
752        .unwrap();
753    }
754
755    #[tokio::test]
756    async fn test_postgres_get_features() {
757        let pool = db_pool().await;
758
759        let timestamp = Utc::now();
760
761        for _ in 0..10 {
762            let mut records = Vec::new();
763            for j in 0..10 {
764                let record = SpcRecord {
765                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
766                    uid: UID.to_string(),
767                    feature: format!("test{j}"),
768                    value: j as f64,
769                    entity_id: ENTITY_ID,
770                };
771
772                records.push(record);
773            }
774
775            let result =
776                PostgresClient::insert_spc_drift_records_batch(&pool, &records, &ENTITY_ID)
777                    .await
778                    .unwrap();
779            assert_eq!(result.rows_affected(), records.len() as u64);
780        }
781
782        let features = PostgresClient::get_spc_features(&pool, &ENTITY_ID)
783            .await
784            .unwrap();
785        assert_eq!(features.len(), 10);
786
787        let records =
788            PostgresClient::get_spc_drift_records(&pool, &timestamp, &features, &ENTITY_ID)
789                .await
790                .unwrap();
791
792        assert_eq!(records.features.len(), 10);
793
794        let binned_records = PostgresClient::get_binned_spc_drift_records(
795            &pool,
796            &DriftRequest {
797                uid: UID.to_string(),
798                time_interval: TimeInterval::FifteenMinutes,
799                max_data_points: 10,
800                ..Default::default()
801            },
802            &DatabaseSettings::default().retention_period,
803            &ObjectStorageSettings::default(),
804            &ENTITY_ID,
805        )
806        .await
807        .unwrap();
808
809        assert_eq!(binned_records.features.len(), 10);
810    }
811
812    #[tokio::test]
813    async fn test_postgres_bin_proportions() {
814        let pool = db_pool().await;
815
816        let timestamp = Utc::now();
817
818        let num_features = 3;
819        let num_bins = 5;
820
821        let features = (0..=num_features)
822            .map(|feature| {
823                let bins = (0..=num_bins)
824                    .map(|bind_id| Bin {
825                        id: bind_id,
826                        lower_limit: None,
827                        upper_limit: None,
828                        proportion: 0.0,
829                    })
830                    .collect();
831                let feature_name = format!("feature{feature}");
832                let feature_profile = PsiFeatureDriftProfile {
833                    id: feature_name.clone(),
834                    bins,
835                    timestamp,
836                    bin_type: BinType::Numeric,
837                };
838                (feature_name, feature_profile)
839            })
840            .collect();
841
842        let profile = &DriftProfile::Psi(psi::PsiDriftProfile::new(
843            features,
844            PsiDriftConfig {
845                space: SPACE.to_string(),
846                name: NAME.to_string(),
847                version: VERSION.to_string(),
848                ..Default::default()
849            },
850        ));
851        let uid = insert_profile_to_db(&pool, profile, false, false).await;
852        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
853            .await
854            .unwrap();
855
856        for feature in 0..num_features {
857            for bin in 0..=num_bins {
858                let mut records = Vec::new();
859                for j in 0..=100 {
860                    let record = PsiRecord {
861                        created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
862                        uid: uid.to_string(),
863                        feature: format!("feature{feature}"),
864                        bin_id: bin,
865                        bin_count: rand::rng().random_range(0..10) as i32,
866                        entity_id: ENTITY_ID,
867                    };
868
869                    records.push(record);
870                }
871                PostgresClient::insert_bin_counts_batch(&pool, &records, &entity_id)
872                    .await
873                    .unwrap();
874            }
875        }
876
877        let binned_records = PostgresClient::get_feature_distributions(
878            &pool,
879            &timestamp,
880            &["feature0".to_string()],
881            &entity_id,
882        )
883        .await
884        .unwrap();
885
886        // assert binned_records.features["test"]["decile_1"] is around .5
887        let bin_proportion = binned_records
888            .distributions
889            .get("feature0")
890            .unwrap()
891            .bins
892            .get(&1)
893            .unwrap();
894
895        assert!(*bin_proportion > 0.1 && *bin_proportion < 0.2);
896
897        let binned_records = PostgresClient::get_binned_psi_drift_records(
898            &pool,
899            &DriftRequest {
900                uid: UID.to_string(),
901                time_interval: TimeInterval::OneHour,
902                max_data_points: 1000,
903                ..Default::default()
904            },
905            &DatabaseSettings::default().retention_period,
906            &ObjectStorageSettings::default(),
907            &entity_id,
908        )
909        .await
910        .unwrap();
911        //
912        assert_eq!(binned_records.len(), 3);
913    }
914
915    #[tokio::test]
916    async fn test_postgres_cru_custom_metric() {
917        let pool = db_pool().await;
918        let timestamp = Utc::now();
919
920        let (uid, entity_id) = PostgresClient::create_entity(
921            &pool,
922            SPACE,
923            NAME,
924            VERSION,
925            DriftType::Custom.to_string(),
926        )
927        .await
928        .unwrap();
929
930        for i in 0..2 {
931            let mut records = Vec::new();
932            for j in 0..25 {
933                let record = CustomMetricRecord {
934                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
935                    uid: uid.clone(),
936                    metric: format!("metric{i}"),
937                    value: rand::rng().random_range(0..10) as f64,
938                    entity_id: ENTITY_ID,
939                };
940                records.push(record);
941            }
942            let result =
943                PostgresClient::insert_custom_metric_values_batch(&pool, &records, &entity_id)
944                    .await
945                    .unwrap();
946            assert_eq!(result.rows_affected(), 25);
947        }
948
949        // insert random record to test has statistics funcs handle single record
950        let record = CustomMetricRecord {
951            created_at: Utc::now(),
952            uid: uid.clone(),
953            metric: "metric3".to_string(),
954            value: rand::rng().random_range(0..10) as f64,
955            entity_id: ENTITY_ID,
956        };
957
958        let result =
959            PostgresClient::insert_custom_metric_values_batch(&pool, &[record], &entity_id)
960                .await
961                .unwrap();
962        assert_eq!(result.rows_affected(), 1);
963
964        let metrics = PostgresClient::get_custom_metric_values(
965            &pool,
966            &timestamp,
967            &["metric1".to_string()],
968            &entity_id,
969        )
970        .await
971        .unwrap();
972
973        assert_eq!(metrics.len(), 1);
974
975        let binned_records = PostgresClient::get_binned_custom_drift_records(
976            &pool,
977            &DriftRequest {
978                uid: uid.clone(),
979                time_interval: TimeInterval::OneHour,
980                max_data_points: 1000,
981                ..Default::default()
982            },
983            &DatabaseSettings::default().retention_period,
984            &ObjectStorageSettings::default(),
985            &entity_id,
986        )
987        .await
988        .unwrap();
989        //
990        assert_eq!(binned_records.metrics.len(), 3);
991    }
992
993    #[tokio::test]
994    async fn test_postgres_user() {
995        let pool = db_pool().await;
996        let recovery_codes = vec!["recovery_code_1".to_string(), "recovery_code_2".to_string()];
997
998        // Create
999        let user = User::new(
1000            "user".to_string(),
1001            "pass".to_string(),
1002            "email".to_string(),
1003            recovery_codes,
1004            None,
1005            None,
1006            None,
1007            None,
1008        );
1009        PostgresClient::insert_user(&pool, &user).await.unwrap();
1010
1011        // Read
1012        let mut user = PostgresClient::get_user(&pool, "user")
1013            .await
1014            .unwrap()
1015            .unwrap();
1016
1017        assert_eq!(user.username, "user");
1018        assert_eq!(user.group_permissions, vec!["user"]);
1019        assert_eq!(user.email, "email");
1020
1021        // update user
1022        user.active = false;
1023        user.refresh_token = Some("token".to_string());
1024
1025        // Update
1026        PostgresClient::update_user(&pool, &user).await.unwrap();
1027        let user = PostgresClient::get_user(&pool, "user")
1028            .await
1029            .unwrap()
1030            .unwrap();
1031        assert!(!user.active);
1032        assert_eq!(user.refresh_token.unwrap(), "token");
1033
1034        // get users
1035        let users = PostgresClient::get_users(&pool).await.unwrap();
1036        assert_eq!(users.len(), 1);
1037
1038        // get last admin
1039        let is_last_admin = PostgresClient::is_last_admin(&pool, "user").await.unwrap();
1040        assert!(!is_last_admin);
1041
1042        // delete
1043        PostgresClient::delete_user(&pool, "user").await.unwrap();
1044    }
1045
1046    #[tokio::test]
1047    async fn test_postgres_genai_eval_record_insert_get() {
1048        let pool = db_pool().await;
1049
1050        let (uid, entity_id) = PostgresClient::create_entity(
1051            &pool,
1052            SPACE,
1053            NAME,
1054            VERSION,
1055            DriftType::GenAI.to_string(),
1056        )
1057        .await
1058        .unwrap();
1059
1060        let input = "This is a test input";
1061        let output = "This is a test response";
1062
1063        for j in 0..10 {
1064            let context = serde_json::json!({
1065                "input": input,
1066                "response": output,
1067            });
1068            let record = GenAIEvalRecord {
1069                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1070                context,
1071                status: Status::Pending,
1072                id: 0, // This will be set by the database
1073                uid: format!("test_{}", j),
1074                entity_uid: uid.clone(),
1075                entity_id,
1076                ..Default::default()
1077            };
1078
1079            let boxed = BoxedGenAIEvalRecord::new(record);
1080
1081            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1082                .await
1083                .unwrap();
1084
1085            assert_eq!(result.rows_affected(), 1);
1086        }
1087
1088        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1089            .await
1090            .unwrap();
1091        assert_eq!(features.len(), 10);
1092
1093        // get pending task
1094        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1095            .await
1096            .unwrap();
1097
1098        // assert not empty
1099        assert!(pending_tasks.is_some());
1100
1101        // get pending task with space, name, version
1102        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1103        assert_eq!(*task_input, "This is a test input".to_string());
1104
1105        // update pending task
1106        PostgresClient::update_genai_eval_record_status(
1107            &pool,
1108            &pending_tasks.unwrap(),
1109            Status::Processed,
1110            &(1_i64),
1111        )
1112        .await
1113        .unwrap();
1114
1115        // query processed tasks
1116        let processed_tasks = PostgresClient::get_genai_eval_records(
1117            &pool,
1118            None,
1119            Some(Status::Processed),
1120            &entity_id,
1121        )
1122        .await
1123        .unwrap();
1124
1125        // assert not empty
1126        assert_eq!(processed_tasks.len(), 1);
1127    }
1128
1129    #[tokio::test]
1130    async fn test_postgres_genai_eval_record_pagination() {
1131        let pool = db_pool().await;
1132
1133        let (uid, entity_id) = PostgresClient::create_entity(
1134            &pool,
1135            SPACE,
1136            NAME,
1137            VERSION,
1138            DriftType::GenAI.to_string(),
1139        )
1140        .await
1141        .unwrap();
1142
1143        let input = "This is a test input";
1144        let output = "This is a test response";
1145
1146        // Insert 10 records with increasing timestamps
1147        for j in 0..10 {
1148            let context = serde_json::json!({
1149                "input": input,
1150                "response": output,
1151            });
1152            let record = GenAIEvalRecord {
1153                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1154                context,
1155                status: Status::Pending,
1156                id: 0, // This will be set by the database
1157                uid: format!("test_{}", j),
1158                entity_uid: uid.clone(),
1159                entity_id: ENTITY_ID,
1160                ..Default::default()
1161            };
1162
1163            let boxed = BoxedGenAIEvalRecord::new(record);
1164
1165            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1166                .await
1167                .unwrap();
1168
1169            assert_eq!(result.rows_affected(), 1);
1170        }
1171
1172        // ===== PAGE 1: Get first 5 records (newest) =====
1173        let params = GenAIEvalRecordPaginationRequest {
1174            status: None,
1175            limit: Some(5),
1176            cursor_created_at: None,
1177            cursor_id: None,
1178            direction: None,
1179            ..Default::default()
1180        };
1181
1182        let page1 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
1183            .await
1184            .unwrap();
1185
1186        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1187        assert!(page1.has_next, "Should have next page");
1188        assert!(
1189            !page1.has_previous,
1190            "Should not have previous page (first page)"
1191        );
1192        assert!(page1.next_cursor.is_some(), "Should have next cursor");
1193
1194        // First item should be the NEWEST record (highest ID)
1195        let page1_first = page1.items.first().unwrap();
1196        let page1_last = page1.items.last().unwrap();
1197
1198        assert!(
1199            page1_first.created_at >= page1_last.created_at,
1200            "Page 1 should be sorted newest first (DESC)"
1201        );
1202
1203        // ===== PAGE 2: Get next 5 records (older) =====
1204        let next_cursor = page1.next_cursor.unwrap();
1205
1206        let params = GenAIEvalRecordPaginationRequest {
1207            status: None,
1208            limit: Some(5),
1209            cursor_created_at: Some(next_cursor.created_at),
1210            cursor_id: Some(next_cursor.id),
1211            direction: None,
1212            ..Default::default()
1213        };
1214
1215        let page2 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
1216            .await
1217            .unwrap();
1218
1219        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1220        assert!(!page2.has_next, "Should not have next page (last page)");
1221        assert!(page2.has_previous, "Should have previous page");
1222        assert!(
1223            page2.previous_cursor.is_some(),
1224            "Should have previous cursor"
1225        );
1226
1227        let page2_first = page2.items.first().unwrap();
1228
1229        // Page 2 first item should be OLDER than Page 1 last item
1230        assert!(
1231            page2_first.created_at < page1_last.created_at
1232                || (page2_first.created_at == page1_last.created_at
1233                    && page2_first.id < page1_last.id),
1234            "Page 2 should start with records older than Page 1 last item"
1235        );
1236
1237        // Verify we got all 10 records across both pages
1238        let all_ids: Vec<i64> = page1
1239            .items
1240            .iter()
1241            .chain(page2.items.iter())
1242            .map(|r| r.id)
1243            .collect();
1244
1245        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1246
1247        // All IDs should be unique
1248        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1249        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1250
1251        // ===== BACKWARD PAGINATION TEST =====
1252        // Go back from page 2 to page 1
1253        let previous_cursor = page2.previous_cursor.unwrap();
1254
1255        let params = GenAIEvalRecordPaginationRequest {
1256            status: None,
1257            limit: Some(5),
1258            cursor_created_at: Some(previous_cursor.created_at),
1259            cursor_id: Some(previous_cursor.id),
1260            direction: Some("previous".to_string()),
1261            ..Default::default()
1262        };
1263
1264        let page1_again =
1265            PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
1266                .await
1267                .unwrap();
1268
1269        assert_eq!(
1270            page1_again.items.len(),
1271            5,
1272            "Going back should return 5 records"
1273        );
1274
1275        // Should get the same records as page 1
1276        assert_eq!(
1277            page1_again.items.first().unwrap().id,
1278            page1_first.id,
1279            "Should return to the same first record"
1280        );
1281    }
1282
1283    #[tokio::test]
1284    async fn test_postgres_genai_eval_workflow_pagination() {
1285        let pool = db_pool().await;
1286
1287        let (_uid, entity_id) = PostgresClient::create_entity(
1288            &pool,
1289            SPACE,
1290            NAME,
1291            VERSION,
1292            DriftType::GenAI.to_string(),
1293        )
1294        .await
1295        .unwrap();
1296
1297        // Insert 10 records with increasing timestamps
1298        for j in 0..10 {
1299            let record = GenAIEvalWorkflowResult {
1300                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1301                record_uid: format!("test_{}", j),
1302                entity_id,
1303                ..Default::default()
1304            };
1305
1306            let result =
1307                PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1308                    .await
1309                    .unwrap();
1310
1311            assert_eq!(result.rows_affected(), 1);
1312        }
1313
1314        // ===== PAGE 1: Get first 5 records (newest) =====
1315        let params = GenAIEvalRecordPaginationRequest {
1316            status: None,
1317            limit: Some(5),
1318            cursor_created_at: None,
1319            cursor_id: None,
1320            direction: None,
1321            ..Default::default()
1322        };
1323
1324        let page1 =
1325            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
1326                .await
1327                .unwrap();
1328
1329        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1330        assert!(page1.has_next, "Should have next page");
1331        assert!(
1332            !page1.has_previous,
1333            "Should not have previous page (first page)"
1334        );
1335        assert!(page1.next_cursor.is_some(), "Should have next cursor");
1336
1337        // First item should be the NEWEST record (highest ID)
1338        let page1_first = page1.items.first().unwrap();
1339        let page1_last = page1.items.last().unwrap();
1340
1341        assert!(
1342            page1_first.created_at >= page1_last.created_at,
1343            "Page 1 should be sorted newest first (DESC)"
1344        );
1345
1346        // ===== PAGE 2: Get next 5 records (older) =====
1347        let next_cursor = page1.next_cursor.unwrap();
1348
1349        let params = GenAIEvalRecordPaginationRequest {
1350            status: None,
1351            limit: Some(5),
1352            cursor_created_at: Some(next_cursor.created_at),
1353            cursor_id: Some(next_cursor.id),
1354            direction: None,
1355            ..Default::default()
1356        };
1357
1358        let page2 =
1359            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
1360                .await
1361                .unwrap();
1362
1363        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1364        assert!(!page2.has_next, "Should not have next page (last page)");
1365        assert!(page2.has_previous, "Should have previous page");
1366        assert!(
1367            page2.previous_cursor.is_some(),
1368            "Should have previous cursor"
1369        );
1370
1371        let page2_first = page2.items.first().unwrap();
1372
1373        // Page 2 first item should be OLDER than Page 1 last item
1374        assert!(
1375            page2_first.created_at < page1_last.created_at
1376                || (page2_first.created_at == page1_last.created_at
1377                    && page2_first.id < page1_last.id),
1378            "Page 2 should start with records older than Page 1 last item"
1379        );
1380
1381        // Verify we got all 10 records across both pages
1382        let all_ids: Vec<i64> = page1
1383            .items
1384            .iter()
1385            .chain(page2.items.iter())
1386            .map(|r| r.id)
1387            .collect();
1388
1389        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1390
1391        // All IDs should be unique
1392        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1393        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1394
1395        // ===== BACKWARD PAGINATION TEST =====
1396        // Go back from page 2 to page 1
1397        let previous_cursor = page2.previous_cursor.unwrap();
1398
1399        let params = GenAIEvalRecordPaginationRequest {
1400            status: None,
1401            limit: Some(5),
1402            cursor_created_at: Some(previous_cursor.created_at),
1403            cursor_id: Some(previous_cursor.id),
1404            direction: Some("previous".to_string()),
1405            ..Default::default()
1406        };
1407
1408        let page1_again =
1409            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
1410                .await
1411                .unwrap();
1412
1413        assert_eq!(
1414            page1_again.items.len(),
1415            5,
1416            "Going back should return 5 records"
1417        );
1418
1419        // Should get the same records as page 1
1420        assert_eq!(
1421            page1_again.items.first().unwrap().id,
1422            page1_first.id,
1423            "Should return to the same first record"
1424        );
1425    }
1426
1427    #[tokio::test]
1428    async fn test_postgres_genai_task_result_insert_get() {
1429        let pool = db_pool().await;
1430
1431        let timestamp = Utc::now();
1432
1433        let (uid, entity_id) = PostgresClient::create_entity(
1434            &pool,
1435            SPACE,
1436            NAME,
1437            VERSION,
1438            DriftType::GenAI.to_string(),
1439        )
1440        .await
1441        .unwrap();
1442
1443        let mut records = Vec::new();
1444        for i in 0..2 {
1445            for j in 0..25 {
1446                let record = GenAIEvalTaskResult {
1447                    record_uid: format!("record_uid_{i}_{j}"),
1448                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1449                    start_time: Utc::now(),
1450                    end_time: Utc::now() + chrono::Duration::seconds(1),
1451                    entity_id,
1452                    task_id: format!("task{i}"),
1453                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
1454                    passed: true,
1455                    value: j as f64,
1456                    field_path: Some(format!("field.path.{i}")),
1457                    operator: scouter_types::genai::ComparisonOperator::Contains,
1458                    expected: Value::Null,
1459                    actual: Value::Null,
1460                    message: "All good".to_string(),
1461                    entity_uid: uid.clone(),
1462                    condition: false,
1463                    stage: 0_i32,
1464                };
1465                records.push(record);
1466            }
1467            let result =
1468                PostgresClient::insert_eval_task_results_batch(&pool, &records, &entity_id)
1469                    .await
1470                    .unwrap();
1471            assert_eq!(result.rows_affected(), 25);
1472        }
1473
1474        let metrics = PostgresClient::get_genai_task_values(
1475            &pool,
1476            &timestamp,
1477            &["task1".to_string()],
1478            &entity_id,
1479        )
1480        .await
1481        .unwrap();
1482
1483        assert_eq!(metrics.len(), 1);
1484        let binned_records = PostgresClient::get_binned_genai_task_values(
1485            &pool,
1486            &DriftRequest {
1487                uid: uid.clone(),
1488                time_interval: TimeInterval::OneHour,
1489                max_data_points: 1000,
1490                ..Default::default()
1491            },
1492            &DatabaseSettings::default().retention_period,
1493            &ObjectStorageSettings::default(),
1494            &entity_id,
1495        )
1496        .await
1497        .unwrap();
1498        //
1499        assert_eq!(binned_records.metrics.len(), 2);
1500
1501        let eval_task = PostgresClient::get_genai_eval_task(&pool, &records[0].record_uid)
1502            .await
1503            .unwrap();
1504
1505        assert_eq!(eval_task[0].record_uid, records[0].record_uid);
1506    }
1507
1508    #[tokio::test]
1509    async fn test_postgres_genai_workflow_result_insert_get() {
1510        let pool = db_pool().await;
1511
1512        let timestamp = Utc::now();
1513
1514        let (uid, entity_id) = PostgresClient::create_entity(
1515            &pool,
1516            SPACE,
1517            NAME,
1518            VERSION,
1519            DriftType::GenAI.to_string(),
1520        )
1521        .await
1522        .unwrap();
1523
1524        for i in 0..2 {
1525            for j in 0..25 {
1526                let record = GenAIEvalWorkflowResult {
1527                    record_uid: format!("record_uid_{i}_{j}"),
1528                    created_at: Utc::now() + chrono::Duration::hours(i),
1529                    entity_id,
1530                    total_tasks: 10,
1531                    passed_tasks: 8,
1532                    failed_tasks: 2,
1533                    pass_rate: 0.8,
1534                    duration_ms: 1500,
1535                    entity_uid: uid.clone(),
1536                    execution_plan: ExecutionPlan::default(),
1537                    id: 0,
1538                };
1539                let result =
1540                    PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1541                        .await
1542                        .unwrap();
1543                assert_eq!(result.rows_affected(), 1);
1544            }
1545        }
1546
1547        let metric = PostgresClient::get_genai_workflow_value(&pool, &timestamp, &entity_id)
1548            .await
1549            .unwrap();
1550
1551        assert!(metric.is_some());
1552
1553        let binned_records = PostgresClient::get_binned_genai_workflow_values(
1554            &pool,
1555            &DriftRequest {
1556                uid: uid.clone(),
1557                time_interval: TimeInterval::OneHour,
1558                max_data_points: 1000,
1559                ..Default::default()
1560            },
1561            &DatabaseSettings::default().retention_period,
1562            &ObjectStorageSettings::default(),
1563            &entity_id,
1564        )
1565        .await
1566        .unwrap();
1567        //
1568        assert_eq!(binned_records.metrics.len(), 1);
1569    }
1570
1571    #[tokio::test]
1572    async fn test_postgres_tracing_metrics() {
1573        let pool = db_pool().await;
1574        let script = std::fs::read_to_string("src/tests/script/populate_trace.sql").unwrap();
1575        sqlx::query(&script).execute(&pool).await.unwrap();
1576        let mut filters = TraceFilters::default();
1577
1578        let first_batch = PostgresClient::get_paginated_traces(&pool, filters.clone())
1579            .await
1580            .unwrap();
1581
1582        assert_eq!(
1583            first_batch.items.len(),
1584            50,
1585            "First batch should have 50 records"
1586        );
1587
1588        // test pagination (get last record created_at and trace_id)
1589        let last_record = first_batch.next_cursor.unwrap();
1590        filters = filters.next_page(&last_record);
1591
1592        let next_batch = PostgresClient::get_paginated_traces(&pool, filters.clone())
1593            .await
1594            .unwrap();
1595
1596        // should be another 50 records
1597        assert_eq!(
1598            next_batch.items.len(),
1599            50,
1600            "Next batch should have 50 records"
1601        );
1602
1603        // assert next_batch first record timestamp is <= first_batch last record timestamp
1604        let next_first_record = next_batch.items.first().unwrap();
1605        assert!(
1606            next_first_record.start_time <= last_record.start_time,
1607            "Next batch first record timestamp is not less than or equal to last record timestamp"
1608        );
1609
1610        // test pagination for previous
1611        filters = filters.previous_page(&next_batch.previous_cursor.unwrap());
1612        let previous_batch = PostgresClient::get_paginated_traces(&pool, filters.clone())
1613            .await
1614            .unwrap();
1615        assert_eq!(
1616            previous_batch.items.len(),
1617            50,
1618            "Previous batch should have 50 records"
1619        );
1620
1621        // Filter records to find item with >5 spans
1622        let filtered_record = first_batch
1623            .items
1624            .iter()
1625            .find(|record| record.span_count > 5)
1626            .unwrap();
1627
1628        filters.cursor_start_time = None;
1629        filters.cursor_trace_id = None;
1630
1631        let records = PostgresClient::get_paginated_traces(&pool, filters.clone())
1632            .await
1633            .unwrap();
1634
1635        // Records are randomly generated, so just assert we get some records back
1636        assert!(
1637            !records.items.is_empty(),
1638            "Should return records with specified filters"
1639        );
1640
1641        // get spans for filtered trace
1642        let spans = PostgresClient::get_trace_spans(
1643            &pool,
1644            &filtered_record.trace_id,
1645            Some(&filtered_record.service_name),
1646        )
1647        .await
1648        .unwrap();
1649
1650        assert!(spans.len() == filtered_record.span_count as usize);
1651
1652        let start_time = filtered_record.start_time - chrono::Duration::hours(48);
1653        let end_time = filtered_record.start_time + chrono::Duration::minutes(5);
1654
1655        // make request for trace metrics
1656        let trace_metrics =
1657            PostgresClient::get_trace_metrics(&pool, None, start_time, end_time, "5 minutes", None)
1658                .await
1659                .unwrap();
1660
1661        // assert we have data points
1662        assert!(trace_metrics.len() >= 10);
1663
1664        // get paginated traces with tags
1665        let filters = scouter_types::sql::TraceFilters {
1666            attribute_filters: Some(vec![("component=kafka".to_string())]),
1667            ..Default::default()
1668        };
1669        let tagged_batch = PostgresClient::get_paginated_traces(&pool, filters)
1670            .await
1671            .unwrap();
1672        assert!(!tagged_batch.items.is_empty());
1673    }
1674
1675    #[tokio::test]
1676    async fn test_postgres_tracing_insert() {
1677        let pool = db_pool().await;
1678
1679        // create parent trace
1680        let mut trace_record = random_trace_record();
1681        let trace_id = trace_record.trace_id.clone();
1682
1683        // create spans
1684        let root_span = random_span_record(&trace_id, None, &trace_record.service_name);
1685        let child_span =
1686            random_span_record(&trace_id, Some(&root_span.span_id), &root_span.service_name);
1687
1688        // set root span id in trace record
1689        trace_record.root_span_id = root_span.span_id.clone();
1690
1691        // insert spans
1692        let result =
1693            PostgresClient::insert_span_batch(&pool, &[root_span.clone(), child_span.clone()])
1694                .await
1695                .unwrap();
1696
1697        assert_eq!(result.rows_affected(), 2);
1698
1699        let inserted_created_at = trace_record.created_at;
1700        let inserted_trace_id = trace_record.trace_id.clone();
1701
1702        let trace_filter = TraceFilters {
1703            cursor_start_time: Some(inserted_created_at + Duration::days(1)),
1704            cursor_trace_id: Some(inserted_trace_id),
1705            start_time: Some(inserted_created_at - Duration::minutes(5)),
1706            end_time: Some(inserted_created_at + Duration::days(1)),
1707            ..TraceFilters::default()
1708        };
1709
1710        let traces = PostgresClient::get_paginated_traces(&pool, trace_filter)
1711            .await
1712            .unwrap();
1713
1714        assert_eq!(traces.items.len(), 1);
1715        let retrieved_trace = &traces.items[0];
1716        // assert span count is 2
1717        assert_eq!(retrieved_trace.span_count, 2);
1718
1719        let baggage = TraceBaggageRecord {
1720            created_at: Utc::now(),
1721            trace_id: trace_record.trace_id.clone(),
1722            scope: "test_scope".to_string(),
1723            key: "user_id".to_string(),
1724            value: "12345".to_string(),
1725        };
1726
1727        let result =
1728            PostgresClient::insert_trace_baggage_batch(&pool, std::slice::from_ref(&baggage))
1729                .await
1730                .unwrap();
1731
1732        assert_eq!(result.rows_affected(), 1);
1733
1734        let retrieved_baggage =
1735            PostgresClient::get_trace_baggage_records(&pool, &trace_record.trace_id)
1736                .await
1737                .unwrap();
1738
1739        assert_eq!(retrieved_baggage.len(), 1);
1740    }
1741
1742    #[tokio::test]
1743    async fn test_postgres_tags() {
1744        let pool = db_pool().await;
1745        let uid = create_uuid7();
1746
1747        let tag1 = TagRecord {
1748            entity_id: uid.clone(),
1749            entity_type: "service".to_string(),
1750            key: "env".to_string(),
1751            value: "production".to_string(),
1752        };
1753
1754        let tag2 = TagRecord {
1755            entity_id: uid.clone(),
1756            entity_type: "service".to_string(),
1757            key: "team".to_string(),
1758            value: "backend".to_string(),
1759        };
1760
1761        let result = PostgresClient::insert_tag_batch(&pool, &[tag1.clone(), tag2.clone()])
1762            .await
1763            .unwrap();
1764
1765        assert_eq!(result.rows_affected(), 2);
1766
1767        let tags = PostgresClient::get_tags(&pool, "service", &uid)
1768            .await
1769            .unwrap();
1770
1771        assert_eq!(tags.len(), 2);
1772
1773        let tag_filter = vec![Tag {
1774            key: tags.first().unwrap().key.clone(),
1775            value: tags.first().unwrap().value.clone(),
1776        }];
1777
1778        let entity_id = PostgresClient::get_entity_id_by_tags(&pool, "service", &tag_filter, false)
1779            .await
1780            .unwrap();
1781
1782        assert_eq!(entity_id.first().unwrap(), &uid);
1783    }
1784}