Skip to main content

scouter_sql/sql/
postgres.rs

1use crate::sql::aggregator::init_trace_cache;
2use crate::sql::cache::entity_cache;
3use crate::sql::cache::init_entity_cache;
4use crate::sql::error::SqlError;
5use crate::sql::traits::{
6    AlertSqlLogic, ArchiveSqlLogic, CustomMetricSqlLogic, GenAIDriftSqlLogic,
7    ObservabilitySqlLogic, ProfileSqlLogic, PsiSqlLogic, SpcSqlLogic, TagSqlLogic, TraceSqlLogic,
8    UserSqlLogic,
9};
10use scouter_settings::DatabaseSettings;
11use scouter_types::{RecordType, ServerRecords, TagRecord, ToDriftRecords, TraceServerRecord};
12use sqlx::ConnectOptions;
13use sqlx::{postgres::PgConnectOptions, Pool, Postgres};
14use std::result::Result::Ok;
15use std::time::Duration;
16use tokio::try_join;
17use tracing::log::LevelFilter;
18use tracing::{debug, error, info, instrument};
19const DEFAULT_BATCH_SIZE: usize = 500;
20
/// Stateless handle grouping all Postgres SQL operations for scouter.
/// It is never constructed; the associated functions visible in this file
/// all take a `Pool<Postgres>` explicitly.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PostgresClient {}
24
// Marker impls wiring PostgresClient into each SQL-logic trait. The traits
// (see `crate::sql::traits`) presumably provide default method bodies over a
// `Pool<Postgres>` — TODO confirm; PostgresClient itself holds no state.
impl SpcSqlLogic for PostgresClient {}
impl CustomMetricSqlLogic for PostgresClient {}
impl PsiSqlLogic for PostgresClient {}
impl GenAIDriftSqlLogic for PostgresClient {}
impl UserSqlLogic for PostgresClient {}
impl ProfileSqlLogic for PostgresClient {}
impl ObservabilitySqlLogic for PostgresClient {}
impl AlertSqlLogic for PostgresClient {}
impl ArchiveSqlLogic for PostgresClient {}
impl TraceSqlLogic for PostgresClient {}
impl TagSqlLogic for PostgresClient {}
36
impl PostgresClient {
    /// Setup the application with the given database settings.
    ///
    /// Builds a `PgPoolOptions`-configured pool (sizing/timeout knobs all
    /// come from `database_settings`), then performs one-time startup work:
    /// entity cache init, trace cache init, and SQL migrations.
    ///
    /// # Returns
    ///
    /// * `Result<Pool<Postgres>, SqlError>` - the ready-to-use pool
    ///
    /// NOTE(review): connection and migration failures call
    /// `std::process::exit(1)` instead of returning `Err`, so the `Err` arm
    /// is only reachable from the URI parse and the trace-cache init —
    /// confirm the hard exit is intentional for library callers.
    #[instrument(skip(database_settings))]
    pub async fn create_db_pool(
        database_settings: &DatabaseSettings,
    ) -> Result<Pool<Postgres>, SqlError> {
        let mut opts: PgConnectOptions = database_settings.connection_uri.parse()?;

        // Sqlx logs a lot of debug information by default, which can be overwhelming.
        opts = opts.log_statements(LevelFilter::Off);

        let pool = match sqlx::postgres::PgPoolOptions::new()
            .max_connections(database_settings.max_connections)
            .min_connections(database_settings.min_connections)
            .acquire_timeout(Duration::from_secs(
                database_settings.db_acquire_timeout_seconds,
            ))
            .idle_timeout(Duration::from_secs(
                database_settings.db_idle_timeout_seconds,
            ))
            .max_lifetime(Duration::from_secs(
                database_settings.db_max_lifetime_seconds,
            ))
            .test_before_acquire(database_settings.db_test_before_acquire)
            .connect_with(opts)
            .await
        {
            Ok(pool) => {
                info!("✅ Successfully connected to database");
                pool
            }
            Err(err) => {
                // Hard exit: without a database the server cannot run.
                error!("🚨 Failed to connect to database {:?}", err);
                std::process::exit(1);
            }
        };

        // setup entity cache
        init_entity_cache(database_settings.entity_cache_size);

        // setup trace cache (needs its own clone of the pool for flushing)
        init_trace_cache(
            pool.clone(),
            database_settings.flush_interval,
            database_settings.stale_threshold,
            database_settings.max_cache_size,
        )
        .await?;

        // Run migrations
        if let Err(err) = Self::run_migrations(&pool).await {
            error!("🚨 Failed to run migrations {:?}", err);
            std::process::exit(1);
        }

        Ok(pool)
    }

    /// Apply all pending migrations under `src/migrations` to the database.
    ///
    /// # Errors
    ///
    /// Returns `SqlError::MigrateError` if any migration fails.
    pub async fn run_migrations(pool: &Pool<Postgres>) -> Result<(), SqlError> {
        info!("Running migrations");
        sqlx::migrate!("src/migrations")
            .run(pool)
            .await
            .map_err(SqlError::MigrateError)?;

        debug!("Migrations complete");

        Ok(())
    }
}
111
112pub struct MessageHandler {}
113
impl MessageHandler {
    /// Route a batch of `ServerRecords` to the matching per-type insert.
    ///
    /// The records' UID is resolved to an internal entity id first, then
    /// each supported record type is written — in `DEFAULT_BATCH_SIZE`
    /// chunks where a batch insert exists, one-by-one otherwise.
    ///
    /// # Errors
    ///
    /// Returns `SqlError` on UID resolution failure, on a failed batch
    /// insert, or `UnsupportedBatchTypeError` for record types with no
    /// insert path here.
    #[instrument(skip_all)]
    pub async fn insert_server_records(
        pool: &Pool<Postgres>,
        records: ServerRecords,
    ) -> Result<(), SqlError> {
        debug!("Inserting server records: {:?}", records.record_type()?);

        // Resolve the external UID to the internal integer entity id.
        // Cached lookup; presumably falls back to the database on a miss
        // (it takes `pool`) — TODO confirm in the cache module.
        let entity_id = entity_cache()
            .get_entity_id_from_uid(pool, records.uid()?)
            .await
            .inspect_err(|e| error!("Failed to get entity ID from UID: {:?}", e))?;

        match records.record_type()? {
            RecordType::Spc => {
                let spc_records = records.to_spc_drift_records()?;
                debug!("SPC record count: {}", spc_records.len());

                for chunk in spc_records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_spc_drift_records_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert SPC drift records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::Psi => {
                let psi_records = records.to_psi_drift_records()?;
                debug!("PSI record count: {}", psi_records.len());

                for chunk in psi_records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_bin_counts_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert PSI drift records batch: {:?}", e);
                            e
                        })?;
                }
            }
            RecordType::Custom => {
                let custom_records = records.to_custom_metric_drift_records()?;
                debug!("Custom record count: {}", custom_records.len());

                for chunk in custom_records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_custom_metric_values_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert custom metric records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::GenAIEval => {
                debug!("LLM Drift record count: {:?}", records.len());
                let records = records.to_genai_eval_records()?;
                // NOTE(review): unlike the batch arms above, insert failures
                // here are logged and swallowed (`let _ =`), so the function
                // still returns Ok — confirm best-effort is intended.
                for record in records {
                    let _ = PostgresClient::insert_genai_eval_record(pool, record, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert GenAI drift record: {:?}", e);
                        });
                }
            }

            RecordType::GenAITask => {
                debug!("GenAI Task count: {:?}", records.len());
                let records = records.to_genai_task_records()?;
                for chunk in records.chunks(DEFAULT_BATCH_SIZE) {
                    PostgresClient::insert_eval_task_results_batch(pool, chunk, &entity_id)
                        .await
                        .map_err(|e| {
                            error!("Failed to insert GenAI task records batch: {:?}", e);
                            e
                        })?;
                }
            }

            RecordType::GenAIWorkflow => {
                debug!("GenAI Workflow count: {:?}", records.len());
                let records = records.to_genai_workflow_records()?;
                // NOTE(review): failures are logged and swallowed here as
                // well — inserted one at a time, best-effort.
                for record in records {
                    let _ = PostgresClient::insert_genai_eval_workflow_record(
                        pool, &record, &entity_id,
                    )
                    .await
                    .map_err(|e| {
                        error!("Failed to insert GenAI workflow record: {:?}", e);
                    });
                }
            }

            _ => {
                error!(
                    "Unsupported record type for batch insert: {:?}",
                    records.record_type()?
                );
                return Err(SqlError::UnsupportedBatchTypeError);
            }
        }

        Ok(())
    }

    /// Fan a `TraceServerRecord` out to its three sinks: the in-memory
    /// trace cache (summary aggregation), Delta Lake (span storage), and
    /// Postgres (baggage + tags, written concurrently).
    ///
    /// # Errors
    ///
    /// Returns `SqlError` if record conversion or either Postgres batch
    /// insert fails; Delta Lake write failures are logged but not fatal.
    pub async fn insert_trace_server_record(
        pool: &Pool<Postgres>,
        records: TraceServerRecord,
    ) -> Result<(), SqlError> {
        let (span_batch, baggage_batch, tag_records) = records.to_records()?;
        let span_count = span_batch.len();

        // ── Update TraceCache for summary aggregation ─────────────────────────
        // TraceCache accumulates span stats in memory; flush target is TraceSummaryService.
        let trace_cache = crate::sql::aggregator::get_trace_cache().await;
        for span in &span_batch {
            trace_cache.update_trace(span).await;
        }

        // ── Write spans to Delta Lake (primary trace store) ──────────────────
        if let Some(trace_service) =
            scouter_dataframe::parquet::tracing::service::get_trace_span_service()
        {
            if let Err(e) = trace_service.write_spans(span_batch).await {
                error!("Failed to write spans to Delta Lake: {:?}", e);
            }
        } else {
            error!("TraceSpanService not initialized — spans will be lost");
        }

        // ── Baggage and tags remain in Postgres ───────────────────────────────
        let (baggage_result, tag_result) = try_join!(
            PostgresClient::insert_trace_baggage_batch(pool, &baggage_batch),
            PostgresClient::insert_tag_batch(pool, &tag_records),
        )?;

        debug!(
            span_count,
            baggage_rows = baggage_result.rows_affected(),
            total_baggage = baggage_batch.len(),
            tag_rows = tag_result.rows_affected(),
            "Successfully processed trace server records"
        );
        Ok(())
    }

    /// Insert a single tag record by delegating to the batch insert with a
    /// one-element slice.
    ///
    /// # Errors
    ///
    /// Propagates any `SqlError` from the underlying batch insert.
    pub async fn insert_tag_record(
        pool: &Pool<Postgres>,
        record: TagRecord,
    ) -> Result<(), SqlError> {
        let result = PostgresClient::insert_tag_batch(pool, std::slice::from_ref(&record)).await?;

        debug!(
            rows_affected = result.rows_affected(),
            entity_type = record.entity_type.as_str(),
            entity_id = record.entity_id.as_str(),
            key = record.key.as_str(),
            "Successfully inserted tag record"
        );

        Ok(())
    }
}
278
/// Runs database integration tests
/// Note - binned queries targeting custom intervals with long-term and short-term data are
/// done in the scouter-server integration tests
282#[cfg(test)]
283mod tests {
284
285    use super::*;
286    use crate::sql::schema::User;
287    use crate::sql::traits::EntitySqlLogic;
288    use chrono::{Duration, Utc};
289    use potato_head::create_uuid7;
290    use rand::Rng;
291    use scouter_semver::VersionType;
292    use scouter_settings::ObjectStorageSettings;
293    use scouter_types::genai::ExecutionPlan;
294    use scouter_types::psi::{Bin, BinType, PsiDriftConfig, PsiFeatureDriftProfile};
295    use scouter_types::spc::SpcDriftProfile;
296    use scouter_types::*;
297    use serde_json::Value;
298
    // Shared fixture identifiers reused across the integration tests below.
    const SPACE: &str = "space";
    const NAME: &str = "name";
    const VERSION: &str = "1.0.0";
    // Arbitrary entity id for tests that bypass entity registration.
    const ENTITY_ID: i32 = 9999;
303
304    pub async fn cleanup(pool: &Pool<Postgres>) {
305        sqlx::raw_sql(
306            r#"
307
308            DELETE
309            FROM scouter.drift_entities;
310
311            DELETE
312            FROM scouter.spc_drift;
313
314            DELETE
315            FROM scouter.observability_metric;
316
317            DELETE
318            FROM scouter.custom_drift;
319
320            DELETE
321            FROM scouter.drift_alert;
322
323            DELETE
324            FROM scouter.drift_profile;
325
326            DELETE
327            FROM scouter.psi_drift;
328
329            DELETE
330            FROM scouter.user;
331
332            DELETE
333            FROM scouter.genai_eval_record;
334
335            DELETE
336            FROM scouter.genai_eval_task;
337
338            DELETE
339            FROM scouter.genai_eval_workflow;
340
341            DELETE
342            FROM scouter.spans;
343
344            DELETE
345            FROM scouter.traces;
346
347            DELETE
348            FROM scouter.trace_entities;
349
350            DELETE
351            FROM scouter.trace_baggage;
352
353            DELETE
354            FROM scouter.tags;
355            "#,
356        )
357        .fetch_all(pool)
358        .await
359        .unwrap();
360    }
361
362    pub async fn db_pool() -> Pool<Postgres> {
363        let pool = PostgresClient::create_db_pool(&DatabaseSettings::default())
364            .await
365            .unwrap();
366        cleanup(&pool).await;
367        pool
368    }
369
370    pub async fn insert_profile_to_db(
371        pool: &Pool<Postgres>,
372        profile: &DriftProfile,
373        active: bool,
374        deactivate_others: bool,
375    ) -> String {
376        let base_args = profile.get_base_args();
377        let version_request = VersionRequest {
378            version: None,
379            version_type: VersionType::Minor,
380            pre_tag: None,
381            build_tag: None,
382        };
383        let version = PostgresClient::get_next_profile_version(pool, &base_args, version_request)
384            .await
385            .unwrap();
386
387        let result = PostgresClient::insert_drift_profile(
388            pool,
389            profile,
390            &base_args,
391            &version,
392            &active,
393            &deactivate_others,
394        )
395        .await
396        .unwrap();
397
398        result
399    }
400
401    #[tokio::test]
402    async fn test_postgres_start() {
403        let _pool = db_pool().await;
404    }
405
    /// End-to-end exercise of cursor-based alert pagination: forward and
    /// backward navigation, active/inactive filtering, and the empty-result
    /// shape for an unknown entity.
    #[tokio::test]
    async fn test_postgres_drift_alert() {
        let pool = db_pool().await;
        let entity_id = 9999;

        // Insert 10 alerts with slight delays to ensure different timestamps
        for i in 0..10 {
            let alert = AlertMap::Custom(custom::ComparisonMetricAlert {
                metric_name: "test".to_string(),
                baseline_value: 0 as f64,
                observed_value: i as f64,
                delta: None,
                alert_threshold: AlertThreshold::Above,
            });

            let result = PostgresClient::insert_drift_alert(&pool, &entity_id, &alert)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);

            // Small delay to ensure timestamp ordering
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }

        // Test 1: Get first page with default limit (50)
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(50),
            ..Default::default()
        };

        let response = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(response.items.len(), 10);
        assert!(!response.has_next); // No more items
        assert!(!response.has_previous); // First page
        assert!(response.next_cursor.is_none());
        assert!(response.previous_cursor.is_some());

        // Test 2: Paginate with limit of 3 - forward direction
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            direction: Some("next".to_string()),
            ..Default::default()
        };

        let page1 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page1.items.len(), 3);
        assert!(page1.has_next);
        assert!(!page1.has_previous);
        assert!(page1.next_cursor.is_some());
        assert!(page1.previous_cursor.is_some());

        // Test 3: Get second page using next cursor
        let next_cursor = page1.next_cursor.unwrap();
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id as i32),
            direction: Some("next".to_string()),
            start_datetime: None,
            end_datetime: None,
        };

        let page2 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page2.items.len(), 3);
        assert!(page2.has_next); // More items available
        assert!(page2.has_previous); // Can go back
        assert!(page2.next_cursor.is_some());
        assert!(page2.previous_cursor.is_some());

        // Verify no overlap between pages
        let page1_ids: std::collections::HashSet<_> = page1.items.iter().map(|a| a.id).collect();
        let page2_ids: std::collections::HashSet<_> = page2.items.iter().map(|a| a.id).collect();
        assert!(page1_ids.is_disjoint(&page2_ids));

        // Test 4: Navigate backward using previous cursor
        let prev_cursor = page2.previous_cursor.unwrap();
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            cursor_created_at: Some(prev_cursor.created_at),
            cursor_id: Some(prev_cursor.id as i32),
            direction: Some("previous".to_string()),
            start_datetime: None,
            end_datetime: None,
        };

        let page_back = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(page_back.items.len(), 3);
        assert!(page_back.has_next); // Can go forward
                                     // Should return to page1 items
        assert_eq!(
            page_back.items.iter().map(|a| a.id).collect::<Vec<_>>(),
            page1.items.iter().map(|a| a.id).collect::<Vec<_>>()
        );

        // Test 5: Filter by active status
        // Deactivate some alerts first
        let to_deactivate = &page1.items[0];
        let update_request = UpdateAlertStatus {
            id: to_deactivate.id,
            active: false,
            space: "test_space".to_string(),
        };

        PostgresClient::update_drift_alert_status(&pool, &update_request)
            .await
            .unwrap();

        // Query only active alerts
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(50),
            ..Default::default()
        };

        let active_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(active_alerts.items.len(), 9); // 10 - 1 deactivated
        assert!(active_alerts.items.iter().all(|a| a.active));

        // Query only inactive alerts
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(false),
            limit: Some(50),
            ..Default::default()
        };

        let inactive_alerts =
            PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
                .await
                .unwrap();

        assert_eq!(inactive_alerts.items.len(), 1);
        assert!(!inactive_alerts.items[0].active);

        // Test 6: Query all alerts (active and inactive)
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: None, // No filter
            limit: Some(50),
            ..Default::default()
        };

        let all_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
            .await
            .unwrap();

        assert_eq!(all_alerts.items.len(), 10);

        // Test 7: Empty result set
        let request = DriftAlertPaginationRequest {
            uid: create_uuid7(),
            active: Some(true),
            limit: Some(3),
            ..Default::default()
        };

        let non_existent = PostgresClient::get_paginated_drift_alerts(&pool, &request, &999999)
            .await
            .unwrap();

        assert_eq!(non_existent.items.len(), 0);
        assert!(!non_existent.has_next);
        assert!(!non_existent.has_previous);
        assert!(non_existent.next_cursor.is_none());
        assert!(non_existent.previous_cursor.is_none());
    }
597
598    #[tokio::test]
599    async fn test_postgres_spc_drift_record() {
600        let pool = db_pool().await;
601
602        let record1 = SpcRecord {
603            created_at: Utc::now(),
604            uid: create_uuid7(),
605            feature: "test".to_string(),
606            value: 1.0,
607            entity_id: 0,
608        };
609
610        let record2 = SpcRecord {
611            created_at: Utc::now(),
612            uid: create_uuid7(),
613            feature: "test2".to_string(),
614            value: 2.0,
615            entity_id: 0,
616        };
617
618        let result =
619            PostgresClient::insert_spc_drift_records_batch(&pool, &[record1, record2], &ENTITY_ID)
620                .await
621                .unwrap();
622
623        assert_eq!(result.rows_affected(), 2);
624    }
625
626    #[tokio::test]
627    async fn test_postgres_bin_count() {
628        let pool = db_pool().await;
629
630        let record1 = PsiRecord {
631            created_at: Utc::now(),
632            uid: create_uuid7(),
633            feature: "test".to_string(),
634            bin_id: 1,
635            bin_count: 1,
636            entity_id: ENTITY_ID,
637        };
638
639        let record2 = PsiRecord {
640            created_at: Utc::now(),
641            uid: create_uuid7(),
642            feature: "test2".to_string(),
643            bin_id: 2,
644            bin_count: 2,
645            entity_id: ENTITY_ID,
646        };
647
648        let result =
649            PostgresClient::insert_bin_counts_batch(&pool, &[record1, record2], &ENTITY_ID)
650                .await
651                .unwrap();
652
653        assert_eq!(result.rows_affected(), 2);
654    }
655
656    #[tokio::test]
657    async fn test_postgres_observability_record() {
658        let pool = db_pool().await;
659
660        let record = ObservabilityMetrics::default();
661
662        let result = PostgresClient::insert_observability_record(&pool, &record, &ENTITY_ID)
663            .await
664            .unwrap();
665
666        assert_eq!(result.rows_affected(), 1);
667    }
668
    /// Create/read/update cycle for an SPC drift profile, finishing with a
    /// status update (deactivation).
    #[tokio::test]
    async fn test_postgres_crud_drift_profile() {
        let pool = db_pool().await;

        let mut spc_profile = SpcDriftProfile::default();
        let profile = DriftProfile::Spc(spc_profile.clone());
        let uid = insert_profile_to_db(&pool, &profile, false, false).await;
        assert!(!uid.is_empty());

        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
            .await
            .unwrap();

        // Mutate a field locally, push the update, and verify it round-trips.
        spc_profile.scouter_version = "test".to_string();

        let result = PostgresClient::update_drift_profile(
            &pool,
            &DriftProfile::Spc(spc_profile.clone()),
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(result.rows_affected(), 1);

        let profile = PostgresClient::get_drift_profile(&pool, &entity_id)
            .await
            .unwrap();

        // The stored JSON must deserialize back to the mutated profile.
        let deserialized = serde_json::from_value::<SpcDriftProfile>(profile.unwrap()).unwrap();

        assert_eq!(deserialized, spc_profile);

        PostgresClient::update_drift_profile_status(
            &pool,
            &ProfileStatusRequest {
                name: spc_profile.config.name.clone(),
                space: spc_profile.config.space.clone(),
                version: spc_profile.config.version.clone(),
                active: false,
                drift_type: Some(DriftType::Spc),
                deactivate_others: false,
            },
        )
        .await
        .unwrap();
    }
716
    /// Insert 10 batches of 10 distinct SPC features, then verify feature
    /// listing, raw record retrieval, and binned retrieval all see 10
    /// features.
    #[tokio::test]
    async fn test_postgres_get_features() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        for _ in 0..10 {
            let mut records = Vec::new();
            for j in 0..10 {
                let record = SpcRecord {
                    // Microsecond offsets keep created_at values unique
                    // within a batch.
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    uid: create_uuid7(),
                    feature: format!("test{j}"),
                    value: j as f64,
                    entity_id: ENTITY_ID,
                };

                records.push(record);
            }

            let result =
                PostgresClient::insert_spc_drift_records_batch(&pool, &records, &ENTITY_ID)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), records.len() as u64);
        }

        let features = PostgresClient::get_spc_features(&pool, &ENTITY_ID)
            .await
            .unwrap();
        assert_eq!(features.len(), 10);

        let records =
            PostgresClient::get_spc_drift_records(&pool, &timestamp, &features, &ENTITY_ID)
                .await
                .unwrap();

        assert_eq!(records.features.len(), 10);

        let binned_records = PostgresClient::get_binned_spc_drift_records(
            &pool,
            &DriftRequest {
                uid: create_uuid7(),
                time_interval: TimeInterval::FifteenMinutes,
                max_data_points: 10,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &ENTITY_ID,
        )
        .await
        .unwrap();

        assert_eq!(binned_records.features.len(), 10);
    }
773
    /// Insert uniformly random bin counts for several PSI features and
    /// verify the computed per-bin proportions and binned retrieval.
    #[tokio::test]
    async fn test_postgres_bin_proportions() {
        let pool = db_pool().await;

        let timestamp = Utc::now();

        let num_features = 3;
        let num_bins = 5;

        // Build a profile with (num_features + 1) features, each holding
        // (num_bins + 1) empty bins.
        let features = (0..=num_features)
            .map(|feature| {
                let bins = (0..=num_bins)
                    .map(|bind_id| Bin {
                        id: bind_id,
                        lower_limit: None,
                        upper_limit: None,
                        proportion: 0.0,
                    })
                    .collect();
                let feature_name = format!("feature{feature}");
                let feature_profile = PsiFeatureDriftProfile {
                    id: feature_name.clone(),
                    bins,
                    timestamp,
                    bin_type: BinType::Numeric,
                };
                (feature_name, feature_profile)
            })
            .collect();

        let profile = &DriftProfile::Psi(psi::PsiDriftProfile::new(
            features,
            PsiDriftConfig {
                space: SPACE.to_string(),
                name: NAME.to_string(),
                version: VERSION.to_string(),
                ..Default::default()
            },
        ));
        let uid = insert_profile_to_db(&pool, profile, false, false).await;
        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
            .await
            .unwrap();

        // 101 random counts per (feature, bin) pair for feature0..feature2.
        for feature in 0..num_features {
            for bin in 0..=num_bins {
                let mut records = Vec::new();
                for j in 0..=100 {
                    let record = PsiRecord {
                        created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                        uid: create_uuid7(),
                        feature: format!("feature{feature}"),
                        bin_id: bin,
                        bin_count: rand::rng().random_range(0..10) as i32,
                        entity_id: ENTITY_ID,
                    };

                    records.push(record);
                }
                PostgresClient::insert_bin_counts_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            }
        }

        let binned_records = PostgresClient::get_feature_distributions(
            &pool,
            &timestamp,
            &["feature0".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        // With 6 equally-likely bins (ids 0..=5), each proportion should be
        // near 1/6 ≈ 0.167, hence the (0.1, 0.2) band below.
        let bin_proportion = binned_records
            .distributions
            .get("feature0")
            .unwrap()
            .bins
            .get(&1)
            .unwrap();

        assert!(*bin_proportion > 0.1 && *bin_proportion < 0.2);

        let binned_records = PostgresClient::get_binned_psi_drift_records(
            &pool,
            &DriftRequest {
                uid: create_uuid7(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        // Counts were written for feature0..feature2 only → 3 series.
        assert_eq!(binned_records.len(), 3);
    }
876
    /// Create/read/update flow for custom metrics: batch inserts, a
    /// single-record insert, value retrieval, and binned retrieval.
    #[tokio::test]
    async fn test_postgres_cru_custom_metric() {
        let pool = db_pool().await;
        let timestamp = Utc::now();

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::Custom.to_string(),
        )
        .await
        .unwrap();

        // Two metrics (metric0, metric1), 25 random values each.
        for i in 0..2 {
            let mut records = Vec::new();
            for j in 0..25 {
                let record = CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                    uid: uid.clone(),
                    metric: format!("metric{i}"),
                    value: rand::rng().random_range(0..10) as f64,
                    entity_id: ENTITY_ID,
                };
                records.push(record);
            }
            let result =
                PostgresClient::insert_custom_metric_values_batch(&pool, &records, &entity_id)
                    .await
                    .unwrap();
            assert_eq!(result.rows_affected(), 25);
        }

        // Insert a lone record to verify the statistics functions handle a
        // single data point.
        let record = CustomMetricRecord {
            created_at: Utc::now(),
            uid: uid.clone(),
            metric: "metric3".to_string(),
            value: rand::rng().random_range(0..10) as f64,
            entity_id: ENTITY_ID,
        };

        let result =
            PostgresClient::insert_custom_metric_values_batch(&pool, &[record], &entity_id)
                .await
                .unwrap();
        assert_eq!(result.rows_affected(), 1);

        let metrics = PostgresClient::get_custom_metric_values(
            &pool,
            &timestamp,
            &["metric1".to_string()],
            &entity_id,
        )
        .await
        .unwrap();

        assert_eq!(metrics.len(), 1);

        let binned_records = PostgresClient::get_binned_custom_drift_records(
            &pool,
            &DriftRequest {
                uid: uid.clone(),
                time_interval: TimeInterval::OneHour,
                max_data_points: 1000,
                ..Default::default()
            },
            &DatabaseSettings::default().retention_period,
            &ObjectStorageSettings::default(),
            &entity_id,
        )
        .await
        .unwrap();
        // metric0, metric1, metric3 → 3 distinct metrics expected.
        assert_eq!(binned_records.metrics.len(), 3);
    }
954
955    #[tokio::test]
956    async fn test_postgres_user() {
957        let pool = db_pool().await;
958        let recovery_codes = vec!["recovery_code_1".to_string(), "recovery_code_2".to_string()];
959
960        // Create
961        let user = User::new(
962            "user".to_string(),
963            "pass".to_string(),
964            "email".to_string(),
965            recovery_codes,
966            None,
967            None,
968            None,
969            None,
970        );
971        PostgresClient::insert_user(&pool, &user).await.unwrap();
972
973        // Read
974        let mut user = PostgresClient::get_user(&pool, "user")
975            .await
976            .unwrap()
977            .unwrap();
978
979        assert_eq!(user.username, "user");
980        assert_eq!(user.group_permissions, vec!["user"]);
981        assert_eq!(user.email, "email");
982
983        // update user
984        user.active = false;
985        user.refresh_token = Some("token".to_string());
986
987        // Update
988        PostgresClient::update_user(&pool, &user).await.unwrap();
989        let user = PostgresClient::get_user(&pool, "user")
990            .await
991            .unwrap()
992            .unwrap();
993        assert!(!user.active);
994        assert_eq!(user.refresh_token.unwrap(), "token");
995
996        // get users
997        let users = PostgresClient::get_users(&pool).await.unwrap();
998        assert_eq!(users.len(), 1);
999
1000        // get last admin
1001        let is_last_admin = PostgresClient::is_last_admin(&pool, "user").await.unwrap();
1002        assert!(!is_last_admin);
1003
1004        // delete
1005        PostgresClient::delete_user(&pool, "user").await.unwrap();
1006    }
1007
1008    #[tokio::test]
1009    async fn test_postgres_genai_eval_record_reschedule() {
1010        let pool = db_pool().await;
1011
1012        let (uid, entity_id) = PostgresClient::create_entity(
1013            &pool,
1014            SPACE,
1015            NAME,
1016            VERSION,
1017            DriftType::GenAI.to_string(),
1018        )
1019        .await
1020        .unwrap();
1021
1022        let input = "This is a test input";
1023        let output = "This is a test response";
1024
1025        for j in 0..10 {
1026            let context = serde_json::json!({
1027                "input": input,
1028                "response": output,
1029            });
1030            let record = EvalRecord {
1031                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1032                context,
1033                status: Status::Pending,
1034                id: 0, // This will be set by the database
1035                uid: format!("test_{}", j),
1036                entity_uid: uid.clone(),
1037                entity_id,
1038                ..Default::default()
1039            };
1040
1041            let boxed = BoxedEvalRecord::new(record);
1042
1043            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1044                .await
1045                .unwrap();
1046
1047            assert_eq!(result.rows_affected(), 1);
1048        }
1049
1050        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1051            .await
1052            .unwrap();
1053        assert_eq!(features.len(), 10);
1054
1055        // get pending task
1056        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1057            .await
1058            .unwrap();
1059
1060        // assert not empty
1061        assert!(pending_tasks.is_some());
1062
1063        // get pending task with space, name, version
1064        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1065        assert_eq!(*task_input, "This is a test input".to_string());
1066
1067        // reschedule task
1068        PostgresClient::reschedule_genai_eval_record(
1069            &pool,
1070            &pending_tasks.as_ref().unwrap().uid,
1071            Duration::seconds(30),
1072        )
1073        .await
1074        .unwrap();
1075    }
1076
1077    #[tokio::test]
1078    async fn test_postgres_genai_eval_record_insert_get() {
1079        let pool = db_pool().await;
1080
1081        let (uid, entity_id) = PostgresClient::create_entity(
1082            &pool,
1083            SPACE,
1084            NAME,
1085            VERSION,
1086            DriftType::GenAI.to_string(),
1087        )
1088        .await
1089        .unwrap();
1090
1091        let input = "This is a test input";
1092        let output = "This is a test response";
1093
1094        for j in 0..10 {
1095            let context = serde_json::json!({
1096                "input": input,
1097                "response": output,
1098            });
1099            let record = EvalRecord {
1100                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1101                context,
1102                status: Status::Pending,
1103                id: 0, // This will be set by the database
1104                uid: format!("test_{}", j),
1105                entity_uid: uid.clone(),
1106                entity_id,
1107                ..Default::default()
1108            };
1109
1110            let boxed = BoxedEvalRecord::new(record);
1111
1112            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1113                .await
1114                .unwrap();
1115
1116            assert_eq!(result.rows_affected(), 1);
1117        }
1118
1119        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1120            .await
1121            .unwrap();
1122        assert_eq!(features.len(), 10);
1123
1124        // get pending task
1125        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1126            .await
1127            .unwrap();
1128
1129        // assert not empty
1130        assert!(pending_tasks.is_some());
1131
1132        // get pending task with space, name, version
1133        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1134        assert_eq!(*task_input, "This is a test input".to_string());
1135
1136        // update pending task
1137        PostgresClient::update_genai_eval_record_status(
1138            &pool,
1139            &pending_tasks.unwrap(),
1140            Status::Processed,
1141            &(1_i64),
1142        )
1143        .await
1144        .unwrap();
1145
1146        // query processed tasks
1147        let processed_tasks = PostgresClient::get_genai_eval_records(
1148            &pool,
1149            None,
1150            Some(Status::Processed),
1151            &entity_id,
1152        )
1153        .await
1154        .unwrap();
1155
1156        // assert not empty
1157        assert_eq!(processed_tasks.len(), 1);
1158    }
1159
    // Verifies cursor-based pagination over GenAI eval records: forward paging
    // (newest-first), cursor handoff between pages, uniqueness of ids across
    // pages, and backward paging via previous_cursor. Assertions are
    // order-sensitive — each page depends on the cursor produced by the one
    // before it.
    #[tokio::test]
    async fn test_postgres_genai_eval_record_pagination() {
        let pool = db_pool().await;

        let (uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        let input = "This is a test input";
        let output = "This is a test response";

        // Insert 10 records with increasing timestamps
        for j in 0..10 {
            let context = serde_json::json!({
                "input": input,
                "response": output,
            });
            let record = EvalRecord {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                context,
                status: Status::Pending,
                id: 0, // This will be set by the database
                uid: format!("test_{}", j),
                entity_uid: uid.clone(),
                // NOTE(review): sibling tests set this field from the freshly
                // created `entity_id`; here the ENTITY_ID constant is used while
                // `entity_id` is what is passed to the insert call — confirm the
                // record field is ignored on insert.
                entity_id: ENTITY_ID,
                ..Default::default()
            };

            let boxed = BoxedEvalRecord::new(record);

            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
                .await
                .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        // ===== PAGE 1: Get first 5 records (newest) =====
        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: None,
            cursor_id: None,
            direction: None,
            ..Default::default()
        };

        let page1 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
            .await
            .unwrap();

        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
        assert!(page1.has_next, "Should have next page");
        assert!(
            !page1.has_previous,
            "Should not have previous page (first page)"
        );
        assert!(page1.next_cursor.is_some(), "Should have next cursor");

        // First item should be the NEWEST record (highest ID)
        let page1_first = page1.items.first().unwrap();
        let page1_last = page1.items.last().unwrap();

        assert!(
            page1_first.created_at >= page1_last.created_at,
            "Page 1 should be sorted newest first (DESC)"
        );

        // ===== PAGE 2: Get next 5 records (older) =====
        let next_cursor = page1.next_cursor.unwrap();

        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id),
            direction: None,
            ..Default::default()
        };

        let page2 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
            .await
            .unwrap();

        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
        assert!(!page2.has_next, "Should not have next page (last page)");
        assert!(page2.has_previous, "Should have previous page");
        assert!(
            page2.previous_cursor.is_some(),
            "Should have previous cursor"
        );

        let page2_first = page2.items.first().unwrap();

        // Page 2 first item should be OLDER than Page 1 last item
        // (ties on created_at are broken by id, matching the cursor ordering).
        assert!(
            page2_first.created_at < page1_last.created_at
                || (page2_first.created_at == page1_last.created_at
                    && page2_first.id < page1_last.id),
            "Page 2 should start with records older than Page 1 last item"
        );

        // Verify we got all 10 records across both pages
        let all_ids: Vec<i64> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|r| r.id)
            .collect();

        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");

        // All IDs should be unique
        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");

        // ===== BACKWARD PAGINATION TEST =====
        // Go back from page 2 to page 1
        let previous_cursor = page2.previous_cursor.unwrap();

        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(previous_cursor.created_at),
            cursor_id: Some(previous_cursor.id),
            direction: Some("previous".to_string()),
            ..Default::default()
        };

        let page1_again =
            PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(
            page1_again.items.len(),
            5,
            "Going back should return 5 records"
        );

        // Should get the same records as page 1
        assert_eq!(
            page1_again.items.first().unwrap().id,
            page1_first.id,
            "Should return to the same first record"
        );
    }
1313
    // Verifies cursor-based pagination over GenAI eval *workflow* records:
    // forward paging newest-first, cursor handoff between pages, uniqueness of
    // ids across pages, and backward paging via previous_cursor. Mirrors
    // test_postgres_genai_eval_record_pagination for the workflow table.
    #[tokio::test]
    async fn test_postgres_genai_eval_workflow_pagination() {
        let pool = db_pool().await;

        let (_uid, entity_id) = PostgresClient::create_entity(
            &pool,
            SPACE,
            NAME,
            VERSION,
            DriftType::GenAI.to_string(),
        )
        .await
        .unwrap();

        // Insert 10 records with increasing timestamps
        for j in 0..10 {
            let record = GenAIEvalWorkflowResult {
                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
                record_uid: format!("test_{}", j),
                entity_id,
                ..Default::default()
            };

            let result =
                PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
                    .await
                    .unwrap();

            assert_eq!(result.rows_affected(), 1);
        }

        // ===== PAGE 1: Get first 5 records (newest) =====
        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: None,
            cursor_id: None,
            direction: None,
            ..Default::default()
        };

        let page1 =
            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
        assert!(page1.has_next, "Should have next page");
        assert!(
            !page1.has_previous,
            "Should not have previous page (first page)"
        );
        assert!(page1.next_cursor.is_some(), "Should have next cursor");

        // First item should be the NEWEST record (highest ID)
        let page1_first = page1.items.first().unwrap();
        let page1_last = page1.items.last().unwrap();

        assert!(
            page1_first.created_at >= page1_last.created_at,
            "Page 1 should be sorted newest first (DESC)"
        );

        // ===== PAGE 2: Get next 5 records (older) =====
        let next_cursor = page1.next_cursor.unwrap();

        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(next_cursor.created_at),
            cursor_id: Some(next_cursor.id),
            direction: None,
            ..Default::default()
        };

        let page2 =
            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
        assert!(!page2.has_next, "Should not have next page (last page)");
        assert!(page2.has_previous, "Should have previous page");
        assert!(
            page2.previous_cursor.is_some(),
            "Should have previous cursor"
        );

        let page2_first = page2.items.first().unwrap();

        // Page 2 first item should be OLDER than Page 1 last item
        // (ties on created_at are broken by id, matching the cursor ordering).
        assert!(
            page2_first.created_at < page1_last.created_at
                || (page2_first.created_at == page1_last.created_at
                    && page2_first.id < page1_last.id),
            "Page 2 should start with records older than Page 1 last item"
        );

        // Verify we got all 10 records across both pages
        let all_ids: Vec<i64> = page1
            .items
            .iter()
            .chain(page2.items.iter())
            .map(|r| r.id)
            .collect();

        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");

        // All IDs should be unique
        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");

        // ===== BACKWARD PAGINATION TEST =====
        // Go back from page 2 to page 1
        let previous_cursor = page2.previous_cursor.unwrap();

        let params = EvalRecordPaginationRequest {
            status: None,
            limit: Some(5),
            cursor_created_at: Some(previous_cursor.created_at),
            cursor_id: Some(previous_cursor.id),
            direction: Some("previous".to_string()),
            ..Default::default()
        };

        let page1_again =
            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
                .await
                .unwrap();

        assert_eq!(
            page1_again.items.len(),
            5,
            "Going back should return 5 records"
        );

        // Should get the same records as page 1
        assert_eq!(
            page1_again.items.first().unwrap().id,
            page1_first.id,
            "Should return to the same first record"
        );
    }
1457
1458    #[tokio::test]
1459    async fn test_postgres_genai_task_result_insert_get() {
1460        let pool = db_pool().await;
1461
1462        let timestamp = Utc::now();
1463
1464        let (uid, entity_id) = PostgresClient::create_entity(
1465            &pool,
1466            SPACE,
1467            NAME,
1468            VERSION,
1469            DriftType::GenAI.to_string(),
1470        )
1471        .await
1472        .unwrap();
1473
1474        let mut records = Vec::new();
1475        for i in 0..2 {
1476            for j in 0..25 {
1477                let record = GenAIEvalTaskResult {
1478                    record_uid: format!("record_uid_{i}_{j}"),
1479                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1480                    start_time: Utc::now(),
1481                    end_time: Utc::now() + chrono::Duration::seconds(1),
1482                    entity_id,
1483                    task_id: format!("task{i}"),
1484                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
1485                    passed: true,
1486                    value: j as f64,
1487                    assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
1488                    operator: scouter_types::genai::ComparisonOperator::Contains,
1489                    expected: Value::Null,
1490                    actual: Value::Null,
1491                    message: "All good".to_string(),
1492                    entity_uid: uid.clone(),
1493                    condition: false,
1494                    stage: 0_i32,
1495                };
1496                records.push(record);
1497            }
1498            let result =
1499                PostgresClient::insert_eval_task_results_batch(&pool, &records, &entity_id)
1500                    .await
1501                    .unwrap();
1502            assert_eq!(result.rows_affected(), 25);
1503        }
1504
1505        let metrics = PostgresClient::get_genai_task_values(
1506            &pool,
1507            &timestamp,
1508            &["task1".to_string()],
1509            &entity_id,
1510        )
1511        .await
1512        .unwrap();
1513
1514        assert_eq!(metrics.len(), 1);
1515        let binned_records = PostgresClient::get_binned_genai_task_values(
1516            &pool,
1517            &DriftRequest {
1518                uid: uid.clone(),
1519                time_interval: TimeInterval::OneHour,
1520                max_data_points: 1000,
1521                ..Default::default()
1522            },
1523            &DatabaseSettings::default().retention_period,
1524            &ObjectStorageSettings::default(),
1525            &entity_id,
1526        )
1527        .await
1528        .unwrap();
1529        //
1530        assert_eq!(binned_records.metrics.len(), 2);
1531
1532        let eval_task = PostgresClient::get_genai_eval_task(&pool, &records[0].record_uid)
1533            .await
1534            .unwrap();
1535
1536        assert_eq!(eval_task[0].record_uid, records[0].record_uid);
1537    }
1538
1539    #[tokio::test]
1540    async fn test_postgres_genai_workflow_result_insert_get() {
1541        let pool = db_pool().await;
1542
1543        let timestamp = Utc::now();
1544
1545        let (uid, entity_id) = PostgresClient::create_entity(
1546            &pool,
1547            SPACE,
1548            NAME,
1549            VERSION,
1550            DriftType::GenAI.to_string(),
1551        )
1552        .await
1553        .unwrap();
1554
1555        for i in 0..2 {
1556            for j in 0..25 {
1557                let record = GenAIEvalWorkflowResult {
1558                    record_uid: format!("record_uid_{i}_{j}"),
1559                    created_at: Utc::now() + chrono::Duration::hours(i),
1560                    entity_id,
1561                    total_tasks: 10,
1562                    passed_tasks: 8,
1563                    failed_tasks: 2,
1564                    pass_rate: 0.8,
1565                    duration_ms: 1500,
1566                    entity_uid: uid.clone(),
1567                    execution_plan: ExecutionPlan::default(),
1568                    id: 0,
1569                };
1570                let result =
1571                    PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1572                        .await
1573                        .unwrap();
1574                assert_eq!(result.rows_affected(), 1);
1575            }
1576        }
1577
1578        let metric = PostgresClient::get_genai_workflow_value(&pool, &timestamp, &entity_id)
1579            .await
1580            .unwrap();
1581
1582        assert!(metric.is_some());
1583
1584        let binned_records = PostgresClient::get_binned_genai_workflow_values(
1585            &pool,
1586            &DriftRequest {
1587                uid: uid.clone(),
1588                time_interval: TimeInterval::OneHour,
1589                max_data_points: 1000,
1590                ..Default::default()
1591            },
1592            &DatabaseSettings::default().retention_period,
1593            &ObjectStorageSettings::default(),
1594            &entity_id,
1595        )
1596        .await
1597        .unwrap();
1598        //
1599        assert_eq!(binned_records.metrics.len(), 1);
1600    }
1601
1602    #[tokio::test]
1603    async fn test_postgres_tags() {
1604        let pool = db_pool().await;
1605        let uid = create_uuid7();
1606
1607        let tag1 = TagRecord {
1608            entity_id: uid.clone(),
1609            entity_type: "service".to_string(),
1610            key: "env".to_string(),
1611            value: "production".to_string(),
1612        };
1613
1614        let tag2 = TagRecord {
1615            entity_id: uid.clone(),
1616            entity_type: "service".to_string(),
1617            key: "team".to_string(),
1618            value: "backend".to_string(),
1619        };
1620
1621        let result = PostgresClient::insert_tag_batch(&pool, &[tag1.clone(), tag2.clone()])
1622            .await
1623            .unwrap();
1624
1625        assert_eq!(result.rows_affected(), 2);
1626
1627        let tags = PostgresClient::get_tags(&pool, "service", &uid)
1628            .await
1629            .unwrap();
1630
1631        assert_eq!(tags.len(), 2);
1632
1633        let tag_filter = vec![Tag {
1634            key: tags.first().unwrap().key.clone(),
1635            value: tags.first().unwrap().value.clone(),
1636        }];
1637
1638        let entity_id = PostgresClient::get_entity_id_by_tags(&pool, "service", &tag_filter, false)
1639            .await
1640            .unwrap();
1641
1642        assert_eq!(entity_id.first().unwrap(), &uid);
1643    }
1644}