Skip to main content

scouter_sql/sql/
postgres.rs

1use crate::sql::aggregator::init_trace_cache;
2use crate::sql::cache::entity_cache;
3use crate::sql::cache::init_entity_cache;
4use crate::sql::error::SqlError;
5use crate::sql::traits::{
6    AlertSqlLogic, ArchiveSqlLogic, CustomMetricSqlLogic, GenAIDriftSqlLogic,
7    ObservabilitySqlLogic, ProfileSqlLogic, PsiSqlLogic, SpcSqlLogic, TagSqlLogic, TraceSqlLogic,
8    UserSqlLogic,
9};
10use scouter_settings::DatabaseSettings;
11use scouter_types::{RecordType, ServerRecords, TagRecord, ToDriftRecords, TraceServerRecord};
12use sqlx::ConnectOptions;
13use sqlx::{postgres::PgConnectOptions, Pool, Postgres};
14use std::result::Result::Ok;
15use std::time::Duration;
16use tokio::try_join;
17use tracing::log::LevelFilter;
18use tracing::{debug, error, info, instrument};
/// Number of records passed to each batched insert when a record set is split into chunks.
const DEFAULT_BATCH_SIZE: usize = 500;
20
21#[derive(Debug, Clone)]
22#[allow(dead_code)]
23pub struct PostgresClient {}
24
25impl SpcSqlLogic for PostgresClient {}
26impl CustomMetricSqlLogic for PostgresClient {}
27impl PsiSqlLogic for PostgresClient {}
28impl GenAIDriftSqlLogic for PostgresClient {}
29impl UserSqlLogic for PostgresClient {}
30impl ProfileSqlLogic for PostgresClient {}
31impl ObservabilitySqlLogic for PostgresClient {}
32impl AlertSqlLogic for PostgresClient {}
33impl ArchiveSqlLogic for PostgresClient {}
34impl TraceSqlLogic for PostgresClient {}
35impl TagSqlLogic for PostgresClient {}
36
37impl PostgresClient {
38    /// Setup the application with the given database pool.
39    ///
40    /// # Returns
41    ///
42    /// * `Result<Pool<Postgres>, anyhow::Error>` - Result of the database pool
43    #[instrument(skip(database_settings))]
44    pub async fn create_db_pool(
45        database_settings: &DatabaseSettings,
46    ) -> Result<Pool<Postgres>, SqlError> {
47        let mut opts: PgConnectOptions = database_settings.connection_uri.parse()?;
48
49        // Sqlx logs a lot of debug information by default, which can be overwhelming.
50        opts = opts.log_statements(LevelFilter::Off);
51
52        let pool = match sqlx::postgres::PgPoolOptions::new()
53            .max_connections(database_settings.max_connections)
54            .min_connections(database_settings.min_connections)
55            .acquire_timeout(Duration::from_secs(
56                database_settings.db_acquire_timeout_seconds,
57            ))
58            .idle_timeout(Duration::from_secs(
59                database_settings.db_idle_timeout_seconds,
60            ))
61            .max_lifetime(Duration::from_secs(
62                database_settings.db_max_lifetime_seconds,
63            ))
64            .test_before_acquire(database_settings.db_test_before_acquire)
65            .connect_with(opts)
66            .await
67        {
68            Ok(pool) => {
69                info!("✅ Successfully connected to database");
70                pool
71            }
72            Err(err) => {
73                error!("🚨 Failed to connect to database {:?}", err);
74                std::process::exit(1);
75            }
76        };
77
78        // setup entity cache
79        init_entity_cache(database_settings.entity_cache_size);
80
81        // setup trace cache
82        init_trace_cache(
83            pool.clone(),
84            database_settings.flush_interval,
85            database_settings.stale_threshold,
86            database_settings.max_cache_size,
87        )
88        .await?;
89
90        // Run migrations
91        if let Err(err) = Self::run_migrations(&pool).await {
92            error!("🚨 Failed to run migrations {:?}", err);
93            std::process::exit(1);
94        }
95
96        Ok(pool)
97    }
98
99    pub async fn run_migrations(pool: &Pool<Postgres>) -> Result<(), SqlError> {
100        info!("Running migrations");
101        sqlx::migrate!("src/migrations")
102            .run(pool)
103            .await
104            .map_err(SqlError::MigrateError)?;
105
106        debug!("Migrations complete");
107
108        Ok(())
109    }
110}
111
112pub struct MessageHandler {}
113
114impl MessageHandler {
115    #[instrument(skip_all)]
116    pub async fn insert_server_records(
117        pool: &Pool<Postgres>,
118        records: ServerRecords,
119    ) -> Result<(), SqlError> {
120        debug!("Inserting server records: {:?}", records.record_type()?);
121
122        let entity_id = entity_cache()
123            .get_entity_id_from_uid(pool, records.uid()?)
124            .await
125            .inspect_err(|e| error!("Failed to get entity ID from UID: {:?}", e))?;
126
127        match records.record_type()? {
128            RecordType::Spc => {
129                let spc_records = records.to_spc_drift_records()?;
130                debug!("SPC record count: {}", spc_records.len());
131
132                for chunk in spc_records.chunks(DEFAULT_BATCH_SIZE) {
133                    PostgresClient::insert_spc_drift_records_batch(pool, chunk, &entity_id)
134                        .await
135                        .map_err(|e| {
136                            error!("Failed to insert SPC drift records batch: {:?}", e);
137                            e
138                        })?;
139                }
140            }
141
142            RecordType::Psi => {
143                let psi_records = records.to_psi_drift_records()?;
144                debug!("PSI record count: {}", psi_records.len());
145
146                for chunk in psi_records.chunks(DEFAULT_BATCH_SIZE) {
147                    PostgresClient::insert_bin_counts_batch(pool, chunk, &entity_id)
148                        .await
149                        .map_err(|e| {
150                            error!("Failed to insert PSI drift records batch: {:?}", e);
151                            e
152                        })?;
153                }
154            }
155            RecordType::Custom => {
156                let custom_records = records.to_custom_metric_drift_records()?;
157                debug!("Custom record count: {}", custom_records.len());
158
159                for chunk in custom_records.chunks(DEFAULT_BATCH_SIZE) {
160                    PostgresClient::insert_custom_metric_values_batch(pool, chunk, &entity_id)
161                        .await
162                        .map_err(|e| {
163                            error!("Failed to insert custom metric records batch: {:?}", e);
164                            e
165                        })?;
166                }
167            }
168
169            RecordType::GenAIEval => {
170                debug!("LLM Drift record count: {:?}", records.len());
171                let records = records.to_genai_eval_records()?;
172                for record in records {
173                    let _ = PostgresClient::insert_genai_eval_record(pool, record, &entity_id)
174                        .await
175                        .map_err(|e| {
176                            error!("Failed to insert GenAI drift record: {:?}", e);
177                        });
178                }
179            }
180
181            RecordType::GenAITask => {
182                debug!("GenAI Task count: {:?}", records.len());
183                let records = records.to_genai_task_records()?;
184                for chunk in records.chunks(DEFAULT_BATCH_SIZE) {
185                    PostgresClient::insert_eval_task_results_batch(pool, chunk, &entity_id)
186                        .await
187                        .map_err(|e| {
188                            error!("Failed to insert GenAI task records batch: {:?}", e);
189                            e
190                        })?;
191                }
192            }
193
194            RecordType::GenAIWorkflow => {
195                debug!("GenAI Workflow count: {:?}", records.len());
196                let records = records.to_genai_workflow_records()?;
197                for record in records {
198                    let _ = PostgresClient::insert_genai_eval_workflow_record(
199                        pool, &record, &entity_id,
200                    )
201                    .await
202                    .map_err(|e| {
203                        error!("Failed to insert GenAI workflow record: {:?}", e);
204                    });
205                }
206            }
207
208            _ => {
209                error!(
210                    "Unsupported record type for batch insert: {:?}",
211                    records.record_type()?
212                );
213                return Err(SqlError::UnsupportedBatchTypeError);
214            }
215        }
216
217        Ok(())
218    }
219
220    pub async fn insert_trace_server_record(
221        pool: &Pool<Postgres>,
222        records: TraceServerRecord,
223    ) -> Result<(), SqlError> {
224        let (span_batch, baggage_batch, tag_records) = records.to_records()?;
225        let span_count = span_batch.len();
226
227        // ── Update TraceCache for summary aggregation ─────────────────────────
228        // TraceCache accumulates span stats in memory; flush target is TraceSummaryService.
229        let trace_cache = crate::sql::aggregator::get_trace_cache().await;
230        for span in &span_batch {
231            trace_cache.update_trace(span).await;
232        }
233
234        // ── Write spans to Delta Lake (primary trace store) ──────────────────
235        if let Some(trace_service) =
236            scouter_dataframe::parquet::tracing::service::get_trace_span_service()
237        {
238            if let Err(e) = trace_service.write_spans(span_batch).await {
239                error!("Failed to write spans to Delta Lake: {:?}", e);
240            }
241        } else {
242            error!("TraceSpanService not initialized — spans will be lost");
243        }
244
245        // ── Baggage and tags remain in Postgres ───────────────────────────────
246        let (baggage_result, tag_result) = try_join!(
247            PostgresClient::insert_trace_baggage_batch(pool, &baggage_batch),
248            PostgresClient::insert_tag_batch(pool, &tag_records),
249        )?;
250
251        debug!(
252            span_count,
253            baggage_rows = baggage_result.rows_affected(),
254            total_baggage = baggage_batch.len(),
255            tag_rows = tag_result.rows_affected(),
256            "Successfully processed trace server records"
257        );
258        Ok(())
259    }
260
261    pub async fn insert_tag_record(
262        pool: &Pool<Postgres>,
263        record: TagRecord,
264    ) -> Result<(), SqlError> {
265        let result = PostgresClient::insert_tag_batch(pool, std::slice::from_ref(&record)).await?;
266
267        debug!(
268            rows_affected = result.rows_affected(),
269            entity_type = record.entity_type.as_str(),
270            entity_id = record.entity_id.as_str(),
271            key = record.key.as_str(),
272            "Successfully inserted tag record"
273        );
274
275        Ok(())
276    }
277}
278
/// Runs database integration tests.
/// Note - binned queries targeting custom intervals with long-term and short-term data are
/// done in the scouter-server integration tests
282#[cfg(test)]
283mod tests {
284
285    use super::*;
286    use crate::sql::schema::User;
287    use crate::sql::traits::EntitySqlLogic;
288    use chrono::{Duration, Utc};
289    use potato_head::create_uuid7;
290    use rand::Rng;
291    use scouter_semver::VersionType;
292    use scouter_settings::ObjectStorageSettings;
293    use scouter_types::genai::ExecutionPlan;
294    use scouter_types::psi::{Bin, BinType, PsiDriftConfig, PsiFeatureDriftProfile};
295    use scouter_types::spc::SpcDriftProfile;
296    use scouter_types::*;
297    use serde_json::Value;
298
299    const SPACE: &str = "space";
300    const NAME: &str = "name";
301    const VERSION: &str = "1.0.0";
302    const ENTITY_ID: i32 = 9999;
303
304    pub async fn cleanup(pool: &Pool<Postgres>) {
305        sqlx::raw_sql(
306            r#"
307
308            DELETE
309            FROM scouter.drift_entities;
310
311            DELETE
312            FROM scouter.spc_drift;
313
314            DELETE
315            FROM scouter.observability_metric;
316
317            DELETE
318            FROM scouter.custom_drift;
319
320            DELETE
321            FROM scouter.drift_alert;
322
323            DELETE
324            FROM scouter.drift_profile;
325
326            DELETE
327            FROM scouter.psi_drift;
328
329            DELETE
330            FROM scouter.user;
331
332            DELETE
333            FROM scouter.genai_eval_record;
334
335            DELETE
336            FROM scouter.genai_eval_task;
337
338            DELETE
339            FROM scouter.genai_eval_workflow;
340
341            DELETE
342            FROM scouter.spans;
343
344            DELETE
345            FROM scouter.traces;
346
347            DELETE
348            FROM scouter.trace_entities;
349
350            DELETE
351            FROM scouter.trace_baggage;
352
353            DELETE
354            FROM scouter.tags;
355            "#,
356        )
357        .fetch_all(pool)
358        .await
359        .unwrap();
360    }
361
362    pub async fn db_pool() -> Pool<Postgres> {
363        let pool = PostgresClient::create_db_pool(&DatabaseSettings::default())
364            .await
365            .unwrap();
366        cleanup(&pool).await;
367        pool
368    }
369
370    pub async fn insert_profile_to_db(
371        pool: &Pool<Postgres>,
372        profile: &DriftProfile,
373        active: bool,
374        deactivate_others: bool,
375    ) -> String {
376        let base_args = profile.get_base_args();
377        let version_request = VersionRequest {
378            version: None,
379            version_type: VersionType::Minor,
380            pre_tag: None,
381            build_tag: None,
382        };
383        let version = PostgresClient::get_next_profile_version(pool, &base_args, version_request)
384            .await
385            .unwrap();
386
387        let result = PostgresClient::insert_drift_profile(
388            pool,
389            profile,
390            &base_args,
391            &version,
392            &active,
393            &deactivate_others,
394        )
395        .await
396        .unwrap();
397
398        result
399    }
400
401    #[tokio::test]
402    async fn test_postgres_start() {
403        let _pool = db_pool().await;
404    }
405
406    #[tokio::test]
407    async fn test_postgres_drift_alert() {
408        let pool = db_pool().await;
409        let entity_id = 9999;
410
411        // Insert 10 alerts with slight delays to ensure different timestamps
412        for i in 0..10 {
413            let alert = AlertMap::Custom(custom::ComparisonMetricAlert {
414                metric_name: "test".to_string(),
415                baseline_value: 0 as f64,
416                observed_value: i as f64,
417                delta: None,
418                alert_threshold: AlertThreshold::Above,
419            });
420
421            let result = PostgresClient::insert_drift_alert(&pool, &entity_id, &alert)
422                .await
423                .unwrap();
424
425            assert_eq!(result.rows_affected(), 1);
426
427            // Small delay to ensure timestamp ordering
428            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
429        }
430
431        // Test 1: Get first page with default limit (50)
432        let request = DriftAlertPaginationRequest {
433            uid: create_uuid7(),
434            active: Some(true),
435            limit: Some(50),
436            ..Default::default()
437        };
438
439        let response = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
440            .await
441            .unwrap();
442
443        assert_eq!(response.items.len(), 10);
444        assert!(!response.has_next); // No more items
445        assert!(!response.has_previous); // First page
446        assert!(response.next_cursor.is_none());
447        assert!(response.previous_cursor.is_some());
448
449        // Test 2: Paginate with limit of 3 - forward direction
450        let request = DriftAlertPaginationRequest {
451            uid: create_uuid7(),
452            active: Some(true),
453            limit: Some(3),
454            direction: Some("next".to_string()),
455            ..Default::default()
456        };
457
458        let page1 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
459            .await
460            .unwrap();
461
462        assert_eq!(page1.items.len(), 3);
463        assert!(page1.has_next);
464        assert!(!page1.has_previous);
465        assert!(page1.next_cursor.is_some());
466        assert!(page1.previous_cursor.is_some());
467
468        // Test 3: Get second page using next cursor
469        let next_cursor = page1.next_cursor.unwrap();
470        let request = DriftAlertPaginationRequest {
471            uid: create_uuid7(),
472            active: Some(true),
473            limit: Some(3),
474            cursor_created_at: Some(next_cursor.created_at),
475            cursor_id: Some(next_cursor.id as i32),
476            direction: Some("next".to_string()),
477            start_datetime: None,
478            end_datetime: None,
479        };
480
481        let page2 = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
482            .await
483            .unwrap();
484
485        assert_eq!(page2.items.len(), 3);
486        assert!(page2.has_next); // More items available
487        assert!(page2.has_previous); // Can go back
488        assert!(page2.next_cursor.is_some());
489        assert!(page2.previous_cursor.is_some());
490
491        // Verify no overlap between pages
492        let page1_ids: std::collections::HashSet<_> = page1.items.iter().map(|a| a.id).collect();
493        let page2_ids: std::collections::HashSet<_> = page2.items.iter().map(|a| a.id).collect();
494        assert!(page1_ids.is_disjoint(&page2_ids));
495
496        // Test 4: Navigate backward using previous cursor
497        let prev_cursor = page2.previous_cursor.unwrap();
498        let request = DriftAlertPaginationRequest {
499            uid: create_uuid7(),
500            active: Some(true),
501            limit: Some(3),
502            cursor_created_at: Some(prev_cursor.created_at),
503            cursor_id: Some(prev_cursor.id as i32),
504            direction: Some("previous".to_string()),
505            start_datetime: None,
506            end_datetime: None,
507        };
508
509        let page_back = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
510            .await
511            .unwrap();
512
513        assert_eq!(page_back.items.len(), 3);
514        assert!(page_back.has_next); // Can go forward
515                                     // Should return to page1 items
516        assert_eq!(
517            page_back.items.iter().map(|a| a.id).collect::<Vec<_>>(),
518            page1.items.iter().map(|a| a.id).collect::<Vec<_>>()
519        );
520
521        // Test 5: Filter by active status
522        // Deactivate some alerts first
523        let to_deactivate = &page1.items[0];
524        let update_request = UpdateAlertStatus {
525            id: to_deactivate.id,
526            active: false,
527            space: "test_space".to_string(),
528        };
529
530        PostgresClient::update_drift_alert_status(&pool, &update_request)
531            .await
532            .unwrap();
533
534        // Query only active alerts
535        let request = DriftAlertPaginationRequest {
536            uid: create_uuid7(),
537            active: Some(true),
538            limit: Some(50),
539            ..Default::default()
540        };
541
542        let active_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
543            .await
544            .unwrap();
545
546        assert_eq!(active_alerts.items.len(), 9); // 10 - 1 deactivated
547        assert!(active_alerts.items.iter().all(|a| a.active));
548
549        // Query only inactive alerts
550        let request = DriftAlertPaginationRequest {
551            uid: create_uuid7(),
552            active: Some(false),
553            limit: Some(50),
554            ..Default::default()
555        };
556
557        let inactive_alerts =
558            PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
559                .await
560                .unwrap();
561
562        assert_eq!(inactive_alerts.items.len(), 1);
563        assert!(!inactive_alerts.items[0].active);
564
565        // Test 6: Query all alerts (active and inactive)
566        let request = DriftAlertPaginationRequest {
567            uid: create_uuid7(),
568            active: None, // No filter
569            limit: Some(50),
570            ..Default::default()
571        };
572
573        let all_alerts = PostgresClient::get_paginated_drift_alerts(&pool, &request, &entity_id)
574            .await
575            .unwrap();
576
577        assert_eq!(all_alerts.items.len(), 10);
578
579        // Test 7: Empty result set
580        let request = DriftAlertPaginationRequest {
581            uid: create_uuid7(),
582            active: Some(true),
583            limit: Some(3),
584            ..Default::default()
585        };
586
587        let non_existent = PostgresClient::get_paginated_drift_alerts(&pool, &request, &999999)
588            .await
589            .unwrap();
590
591        assert_eq!(non_existent.items.len(), 0);
592        assert!(!non_existent.has_next);
593        assert!(!non_existent.has_previous);
594        assert!(non_existent.next_cursor.is_none());
595        assert!(non_existent.previous_cursor.is_none());
596    }
597
598    #[tokio::test]
599    async fn test_postgres_spc_drift_record() {
600        let pool = db_pool().await;
601
602        let record1 = SpcRecord {
603            created_at: Utc::now(),
604            uid: create_uuid7(),
605            feature: "test".to_string(),
606            value: 1.0,
607            entity_id: 0,
608        };
609
610        let record2 = SpcRecord {
611            created_at: Utc::now(),
612            uid: create_uuid7(),
613            feature: "test2".to_string(),
614            value: 2.0,
615            entity_id: 0,
616        };
617
618        let result =
619            PostgresClient::insert_spc_drift_records_batch(&pool, &[record1, record2], &ENTITY_ID)
620                .await
621                .unwrap();
622
623        assert_eq!(result.rows_affected(), 2);
624    }
625
626    #[tokio::test]
627    async fn test_postgres_bin_count() {
628        let pool = db_pool().await;
629
630        let record1 = PsiRecord {
631            created_at: Utc::now(),
632            uid: create_uuid7(),
633            feature: "test".to_string(),
634            bin_id: 1,
635            bin_count: 1,
636            entity_id: ENTITY_ID,
637        };
638
639        let record2 = PsiRecord {
640            created_at: Utc::now(),
641            uid: create_uuid7(),
642            feature: "test2".to_string(),
643            bin_id: 2,
644            bin_count: 2,
645            entity_id: ENTITY_ID,
646        };
647
648        let result =
649            PostgresClient::insert_bin_counts_batch(&pool, &[record1, record2], &ENTITY_ID)
650                .await
651                .unwrap();
652
653        assert_eq!(result.rows_affected(), 2);
654    }
655
656    #[tokio::test]
657    async fn test_postgres_observability_record() {
658        let pool = db_pool().await;
659
660        let record = ObservabilityMetrics::default();
661
662        let result = PostgresClient::insert_observability_record(&pool, &record, &ENTITY_ID)
663            .await
664            .unwrap();
665
666        assert_eq!(result.rows_affected(), 1);
667    }
668
669    #[tokio::test]
670    async fn test_postgres_crud_drift_profile() {
671        let pool = db_pool().await;
672
673        let mut spc_profile = SpcDriftProfile::default();
674        let profile = DriftProfile::Spc(spc_profile.clone());
675        let uid = insert_profile_to_db(&pool, &profile, false, false).await;
676        assert!(!uid.is_empty());
677
678        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
679            .await
680            .unwrap();
681
682        spc_profile.scouter_version = "test".to_string();
683
684        let result = PostgresClient::update_drift_profile(
685            &pool,
686            &DriftProfile::Spc(spc_profile.clone()),
687            &entity_id,
688        )
689        .await
690        .unwrap();
691
692        assert_eq!(result.rows_affected(), 1);
693
694        let profile = PostgresClient::get_drift_profile(&pool, &entity_id)
695            .await
696            .unwrap();
697
698        let deserialized = serde_json::from_value::<SpcDriftProfile>(profile.unwrap()).unwrap();
699
700        assert_eq!(deserialized, spc_profile);
701
702        PostgresClient::update_drift_profile_status(
703            &pool,
704            &ProfileStatusRequest {
705                name: spc_profile.config.name.clone(),
706                space: spc_profile.config.space.clone(),
707                version: spc_profile.config.version.clone(),
708                active: false,
709                drift_type: Some(DriftType::Spc),
710                deactivate_others: false,
711            },
712        )
713        .await
714        .unwrap();
715    }
716
717    #[tokio::test]
718    async fn test_postgres_get_features() {
719        let pool = db_pool().await;
720
721        let timestamp = Utc::now();
722
723        for _ in 0..10 {
724            let mut records = Vec::new();
725            for j in 0..10 {
726                let record = SpcRecord {
727                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
728                    uid: create_uuid7(),
729                    feature: format!("test{j}"),
730                    value: j as f64,
731                    entity_id: ENTITY_ID,
732                };
733
734                records.push(record);
735            }
736
737            let result =
738                PostgresClient::insert_spc_drift_records_batch(&pool, &records, &ENTITY_ID)
739                    .await
740                    .unwrap();
741            assert_eq!(result.rows_affected(), records.len() as u64);
742        }
743
744        let features = PostgresClient::get_spc_features(&pool, &ENTITY_ID)
745            .await
746            .unwrap();
747        assert_eq!(features.len(), 10);
748
749        let records =
750            PostgresClient::get_spc_drift_records(&pool, &timestamp, &features, &ENTITY_ID)
751                .await
752                .unwrap();
753
754        assert_eq!(records.features.len(), 10);
755
756        let binned_records = PostgresClient::get_binned_spc_drift_records(
757            &pool,
758            &DriftRequest {
759                uid: create_uuid7(),
760                time_interval: TimeInterval::FifteenMinutes,
761                max_data_points: 10,
762                ..Default::default()
763            },
764            &DatabaseSettings::default().retention_period,
765            &ObjectStorageSettings::default(),
766            &ENTITY_ID,
767        )
768        .await
769        .unwrap();
770
771        assert_eq!(binned_records.features.len(), 10);
772    }
773
774    #[tokio::test]
775    async fn test_postgres_bin_proportions() {
776        let pool = db_pool().await;
777
778        let timestamp = Utc::now();
779
780        let num_features = 3;
781        let num_bins = 5;
782
783        let features = (0..=num_features)
784            .map(|feature| {
785                let bins = (0..=num_bins)
786                    .map(|bind_id| Bin {
787                        id: bind_id,
788                        lower_limit: None,
789                        upper_limit: None,
790                        proportion: 0.0,
791                    })
792                    .collect();
793                let feature_name = format!("feature{feature}");
794                let feature_profile = PsiFeatureDriftProfile {
795                    id: feature_name.clone(),
796                    bins,
797                    timestamp,
798                    bin_type: BinType::Numeric,
799                };
800                (feature_name, feature_profile)
801            })
802            .collect();
803
804        let profile = &DriftProfile::Psi(psi::PsiDriftProfile::new(
805            features,
806            PsiDriftConfig {
807                space: SPACE.to_string(),
808                name: NAME.to_string(),
809                version: VERSION.to_string(),
810                ..Default::default()
811            },
812        ));
813        let uid = insert_profile_to_db(&pool, profile, false, false).await;
814        let entity_id = PostgresClient::get_entity_id_from_uid(&pool, &uid)
815            .await
816            .unwrap();
817
818        for feature in 0..num_features {
819            for bin in 0..=num_bins {
820                let mut records = Vec::new();
821                for j in 0..=100 {
822                    let record = PsiRecord {
823                        created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
824                        uid: create_uuid7(),
825                        feature: format!("feature{feature}"),
826                        bin_id: bin,
827                        bin_count: rand::rng().random_range(0..10) as i32,
828                        entity_id: ENTITY_ID,
829                    };
830
831                    records.push(record);
832                }
833                PostgresClient::insert_bin_counts_batch(&pool, &records, &entity_id)
834                    .await
835                    .unwrap();
836            }
837        }
838
839        let binned_records = PostgresClient::get_feature_distributions(
840            &pool,
841            &timestamp,
842            &["feature0".to_string()],
843            &entity_id,
844        )
845        .await
846        .unwrap();
847
848        // assert binned_records.features["test"]["decile_1"] is around .5
849        let bin_proportion = binned_records
850            .distributions
851            .get("feature0")
852            .unwrap()
853            .bins
854            .get(&1)
855            .unwrap();
856
857        assert!(*bin_proportion > 0.1 && *bin_proportion < 0.2);
858
859        let binned_records = PostgresClient::get_binned_psi_drift_records(
860            &pool,
861            &DriftRequest {
862                uid: create_uuid7(),
863                time_interval: TimeInterval::OneHour,
864                max_data_points: 1000,
865                ..Default::default()
866            },
867            &DatabaseSettings::default().retention_period,
868            &ObjectStorageSettings::default(),
869            &entity_id,
870        )
871        .await
872        .unwrap();
873        //
874        assert_eq!(binned_records.len(), 3);
875    }
876
877    #[tokio::test]
878    async fn test_postgres_cru_custom_metric() {
879        let pool = db_pool().await;
880        let timestamp = Utc::now();
881
882        let (uid, entity_id) = PostgresClient::create_entity(
883            &pool,
884            SPACE,
885            NAME,
886            VERSION,
887            DriftType::Custom.to_string(),
888        )
889        .await
890        .unwrap();
891
892        for i in 0..2 {
893            let mut records = Vec::new();
894            for j in 0..25 {
895                let record = CustomMetricRecord {
896                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
897                    uid: uid.clone(),
898                    metric: format!("metric{i}"),
899                    value: rand::rng().random_range(0..10) as f64,
900                    entity_id: ENTITY_ID,
901                };
902                records.push(record);
903            }
904            let result =
905                PostgresClient::insert_custom_metric_values_batch(&pool, &records, &entity_id)
906                    .await
907                    .unwrap();
908            assert_eq!(result.rows_affected(), 25);
909        }
910
911        // insert random record to test has statistics funcs handle single record
912        let record = CustomMetricRecord {
913            created_at: Utc::now(),
914            uid: uid.clone(),
915            metric: "metric3".to_string(),
916            value: rand::rng().random_range(0..10) as f64,
917            entity_id: ENTITY_ID,
918        };
919
920        let result =
921            PostgresClient::insert_custom_metric_values_batch(&pool, &[record], &entity_id)
922                .await
923                .unwrap();
924        assert_eq!(result.rows_affected(), 1);
925
926        let metrics = PostgresClient::get_custom_metric_values(
927            &pool,
928            &timestamp,
929            &["metric1".to_string()],
930            &entity_id,
931        )
932        .await
933        .unwrap();
934
935        assert_eq!(metrics.len(), 1);
936
937        let binned_records = PostgresClient::get_binned_custom_drift_records(
938            &pool,
939            &DriftRequest {
940                uid: uid.clone(),
941                time_interval: TimeInterval::OneHour,
942                max_data_points: 1000,
943                ..Default::default()
944            },
945            &DatabaseSettings::default().retention_period,
946            &ObjectStorageSettings::default(),
947            &entity_id,
948        )
949        .await
950        .unwrap();
951        //
952        assert_eq!(binned_records.metrics.len(), 3);
953    }
954
955    #[tokio::test]
956    async fn test_postgres_user() {
957        let pool = db_pool().await;
958        let recovery_codes = vec!["recovery_code_1".to_string(), "recovery_code_2".to_string()];
959
960        // Create
961        let user = User::new(
962            "user".to_string(),
963            "pass".to_string(),
964            "email".to_string(),
965            recovery_codes,
966            None,
967            None,
968            None,
969            None,
970        );
971        PostgresClient::insert_user(&pool, &user).await.unwrap();
972
973        // Read
974        let mut user = PostgresClient::get_user(&pool, "user")
975            .await
976            .unwrap()
977            .unwrap();
978
979        assert_eq!(user.username, "user");
980        assert_eq!(user.group_permissions, vec!["user"]);
981        assert_eq!(user.email, "email");
982
983        // update user
984        user.active = false;
985        user.refresh_token = Some("token".to_string());
986
987        // Update
988        PostgresClient::update_user(&pool, &user).await.unwrap();
989        let user = PostgresClient::get_user(&pool, "user")
990            .await
991            .unwrap()
992            .unwrap();
993        assert!(!user.active);
994        assert_eq!(user.refresh_token.unwrap(), "token");
995
996        // get users
997        let users = PostgresClient::get_users(&pool).await.unwrap();
998        assert_eq!(users.len(), 1);
999
1000        // get last admin
1001        let is_last_admin = PostgresClient::is_last_admin(&pool, "user").await.unwrap();
1002        assert!(!is_last_admin);
1003
1004        // delete
1005        PostgresClient::delete_user(&pool, "user").await.unwrap();
1006    }
1007
1008    #[tokio::test]
1009    async fn test_postgres_genai_eval_record_reschedule() {
1010        let pool = db_pool().await;
1011
1012        let (uid, entity_id) = PostgresClient::create_entity(
1013            &pool,
1014            SPACE,
1015            NAME,
1016            VERSION,
1017            DriftType::GenAI.to_string(),
1018        )
1019        .await
1020        .unwrap();
1021
1022        let input = "This is a test input";
1023        let output = "This is a test response";
1024
1025        for j in 0..10 {
1026            let context = serde_json::json!({
1027                "input": input,
1028                "response": output,
1029            });
1030            let record = EvalRecord {
1031                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1032                context,
1033                status: Status::Pending,
1034                id: 0, // This will be set by the database
1035                uid: format!("test_{}", j),
1036                entity_uid: uid.clone(),
1037                entity_id,
1038                tags: vec!["key=value".to_string()],
1039                ..Default::default()
1040            };
1041
1042            let boxed = BoxedEvalRecord::new(record);
1043
1044            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1045                .await
1046                .unwrap();
1047
1048            assert_eq!(result.rows_affected(), 1);
1049        }
1050
1051        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1052            .await
1053            .unwrap();
1054        assert_eq!(features.len(), 10);
1055
1056        // get pending task
1057        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1058            .await
1059            .unwrap();
1060
1061        // assert not empty
1062        assert!(pending_tasks.is_some());
1063
1064        // get pending task with space, name, version
1065        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1066        assert_eq!(*task_input, "This is a test input".to_string());
1067
1068        // reschedule task
1069        PostgresClient::reschedule_genai_eval_record(
1070            &pool,
1071            &pending_tasks.as_ref().unwrap().uid,
1072            Duration::seconds(30),
1073        )
1074        .await
1075        .unwrap();
1076    }
1077
1078    #[tokio::test]
1079    async fn test_postgres_genai_eval_record_insert_get() {
1080        let pool = db_pool().await;
1081
1082        let (uid, entity_id) = PostgresClient::create_entity(
1083            &pool,
1084            SPACE,
1085            NAME,
1086            VERSION,
1087            DriftType::GenAI.to_string(),
1088        )
1089        .await
1090        .unwrap();
1091
1092        let input = "This is a test input";
1093        let output = "This is a test response";
1094
1095        for j in 0..10 {
1096            let context = serde_json::json!({
1097                "input": input,
1098                "response": output,
1099            });
1100            let record = EvalRecord {
1101                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1102                context,
1103                status: Status::Pending,
1104                id: 0, // This will be set by the database
1105                uid: format!("test_{}", j),
1106                entity_uid: uid.clone(),
1107                entity_id,
1108                ..Default::default()
1109            };
1110
1111            let boxed = BoxedEvalRecord::new(record);
1112
1113            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1114                .await
1115                .unwrap();
1116
1117            assert_eq!(result.rows_affected(), 1);
1118        }
1119
1120        let features = PostgresClient::get_genai_eval_records(&pool, None, None, &entity_id)
1121            .await
1122            .unwrap();
1123        assert_eq!(features.len(), 10);
1124
1125        // get pending task
1126        let pending_tasks = PostgresClient::get_pending_genai_eval_record(&pool)
1127            .await
1128            .unwrap();
1129
1130        // assert not empty
1131        assert!(pending_tasks.is_some());
1132
1133        // get pending task with space, name, version
1134        let task_input = &pending_tasks.as_ref().unwrap().context["input"];
1135        assert_eq!(*task_input, "This is a test input".to_string());
1136
1137        // update pending task
1138        PostgresClient::update_genai_eval_record_status(
1139            &pool,
1140            &pending_tasks.unwrap(),
1141            Status::Processed,
1142            &(1_i64),
1143        )
1144        .await
1145        .unwrap();
1146
1147        // query processed tasks
1148        let processed_tasks = PostgresClient::get_genai_eval_records(
1149            &pool,
1150            None,
1151            Some(Status::Processed),
1152            &entity_id,
1153        )
1154        .await
1155        .unwrap();
1156
1157        // assert not empty
1158        assert_eq!(processed_tasks.len(), 1);
1159    }
1160
1161    #[tokio::test]
1162    async fn test_postgres_genai_eval_record_pagination() {
1163        let pool = db_pool().await;
1164
1165        let (uid, entity_id) = PostgresClient::create_entity(
1166            &pool,
1167            SPACE,
1168            NAME,
1169            VERSION,
1170            DriftType::GenAI.to_string(),
1171        )
1172        .await
1173        .unwrap();
1174
1175        let input = "This is a test input";
1176        let output = "This is a test response";
1177
1178        // Insert 10 records with increasing timestamps
1179        for j in 0..10 {
1180            let context = serde_json::json!({
1181                "input": input,
1182                "response": output,
1183            });
1184            let record = EvalRecord {
1185                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1186                context,
1187                status: Status::Pending,
1188                id: 0, // This will be set by the database
1189                uid: format!("test_{}", j),
1190                entity_uid: uid.clone(),
1191                entity_id: ENTITY_ID,
1192                ..Default::default()
1193            };
1194
1195            let boxed = BoxedEvalRecord::new(record);
1196
1197            let result = PostgresClient::insert_genai_eval_record(&pool, boxed, &entity_id)
1198                .await
1199                .unwrap();
1200
1201            assert_eq!(result.rows_affected(), 1);
1202        }
1203
1204        // ===== PAGE 1: Get first 5 records (newest) =====
1205        let params = EvalRecordPaginationRequest {
1206            status: None,
1207            limit: Some(5),
1208            cursor_created_at: None,
1209            cursor_id: None,
1210            direction: None,
1211            ..Default::default()
1212        };
1213
1214        let page1 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
1215            .await
1216            .unwrap();
1217
1218        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1219        assert!(page1.has_next, "Should have next page");
1220        assert!(
1221            !page1.has_previous,
1222            "Should not have previous page (first page)"
1223        );
1224        assert!(page1.next_cursor.is_some(), "Should have next cursor");
1225
1226        // First item should be the NEWEST record (highest ID)
1227        let page1_first = page1.items.first().unwrap();
1228        let page1_last = page1.items.last().unwrap();
1229
1230        assert!(
1231            page1_first.created_at >= page1_last.created_at,
1232            "Page 1 should be sorted newest first (DESC)"
1233        );
1234
1235        // ===== PAGE 2: Get next 5 records (older) =====
1236        let next_cursor = page1.next_cursor.unwrap();
1237
1238        let params = EvalRecordPaginationRequest {
1239            status: None,
1240            limit: Some(5),
1241            cursor_created_at: Some(next_cursor.created_at),
1242            cursor_id: Some(next_cursor.id),
1243            direction: None,
1244            ..Default::default()
1245        };
1246
1247        let page2 = PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
1248            .await
1249            .unwrap();
1250
1251        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1252        assert!(!page2.has_next, "Should not have next page (last page)");
1253        assert!(page2.has_previous, "Should have previous page");
1254        assert!(
1255            page2.previous_cursor.is_some(),
1256            "Should have previous cursor"
1257        );
1258
1259        let page2_first = page2.items.first().unwrap();
1260
1261        // Page 2 first item should be OLDER than Page 1 last item
1262        assert!(
1263            page2_first.created_at < page1_last.created_at
1264                || (page2_first.created_at == page1_last.created_at
1265                    && page2_first.id < page1_last.id),
1266            "Page 2 should start with records older than Page 1 last item"
1267        );
1268
1269        // Verify we got all 10 records across both pages
1270        let all_ids: Vec<i64> = page1
1271            .items
1272            .iter()
1273            .chain(page2.items.iter())
1274            .map(|r| r.id)
1275            .collect();
1276
1277        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1278
1279        // All IDs should be unique
1280        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1281        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1282
1283        // ===== BACKWARD PAGINATION TEST =====
1284        // Go back from page 2 to page 1
1285        let previous_cursor = page2.previous_cursor.unwrap();
1286
1287        let params = EvalRecordPaginationRequest {
1288            status: None,
1289            limit: Some(5),
1290            cursor_created_at: Some(previous_cursor.created_at),
1291            cursor_id: Some(previous_cursor.id),
1292            direction: Some("previous".to_string()),
1293            ..Default::default()
1294        };
1295
1296        let page1_again =
1297            PostgresClient::get_paginated_genai_eval_records(&pool, &params, &entity_id)
1298                .await
1299                .unwrap();
1300
1301        assert_eq!(
1302            page1_again.items.len(),
1303            5,
1304            "Going back should return 5 records"
1305        );
1306
1307        // Should get the same records as page 1
1308        assert_eq!(
1309            page1_again.items.first().unwrap().id,
1310            page1_first.id,
1311            "Should return to the same first record"
1312        );
1313    }
1314
1315    #[tokio::test]
1316    async fn test_postgres_genai_eval_workflow_pagination() {
1317        let pool = db_pool().await;
1318
1319        let (_uid, entity_id) = PostgresClient::create_entity(
1320            &pool,
1321            SPACE,
1322            NAME,
1323            VERSION,
1324            DriftType::GenAI.to_string(),
1325        )
1326        .await
1327        .unwrap();
1328
1329        // Insert 10 records with increasing timestamps
1330        for j in 0..10 {
1331            let record = GenAIEvalWorkflowResult {
1332                created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1333                record_uid: format!("test_{}", j),
1334                entity_id,
1335                ..Default::default()
1336            };
1337
1338            let result =
1339                PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1340                    .await
1341                    .unwrap();
1342
1343            assert_eq!(result.rows_affected(), 1);
1344        }
1345
1346        // ===== PAGE 1: Get first 5 records (newest) =====
1347        let params = EvalRecordPaginationRequest {
1348            status: None,
1349            limit: Some(5),
1350            cursor_created_at: None,
1351            cursor_id: None,
1352            direction: None,
1353            ..Default::default()
1354        };
1355
1356        let page1 =
1357            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
1358                .await
1359                .unwrap();
1360
1361        assert_eq!(page1.items.len(), 5, "Page 1 should have 5 records");
1362        assert!(page1.has_next, "Should have next page");
1363        assert!(
1364            !page1.has_previous,
1365            "Should not have previous page (first page)"
1366        );
1367        assert!(page1.next_cursor.is_some(), "Should have next cursor");
1368
1369        // First item should be the NEWEST record (highest ID)
1370        let page1_first = page1.items.first().unwrap();
1371        let page1_last = page1.items.last().unwrap();
1372
1373        assert!(
1374            page1_first.created_at >= page1_last.created_at,
1375            "Page 1 should be sorted newest first (DESC)"
1376        );
1377
1378        // ===== PAGE 2: Get next 5 records (older) =====
1379        let next_cursor = page1.next_cursor.unwrap();
1380
1381        let params = EvalRecordPaginationRequest {
1382            status: None,
1383            limit: Some(5),
1384            cursor_created_at: Some(next_cursor.created_at),
1385            cursor_id: Some(next_cursor.id),
1386            direction: None,
1387            ..Default::default()
1388        };
1389
1390        let page2 =
1391            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
1392                .await
1393                .unwrap();
1394
1395        assert_eq!(page2.items.len(), 5, "Page 2 should have 5 records");
1396        assert!(!page2.has_next, "Should not have next page (last page)");
1397        assert!(page2.has_previous, "Should have previous page");
1398        assert!(
1399            page2.previous_cursor.is_some(),
1400            "Should have previous cursor"
1401        );
1402
1403        let page2_first = page2.items.first().unwrap();
1404
1405        // Page 2 first item should be OLDER than Page 1 last item
1406        assert!(
1407            page2_first.created_at < page1_last.created_at
1408                || (page2_first.created_at == page1_last.created_at
1409                    && page2_first.id < page1_last.id),
1410            "Page 2 should start with records older than Page 1 last item"
1411        );
1412
1413        // Verify we got all 10 records across both pages
1414        let all_ids: Vec<i64> = page1
1415            .items
1416            .iter()
1417            .chain(page2.items.iter())
1418            .map(|r| r.id)
1419            .collect();
1420
1421        assert_eq!(all_ids.len(), 10, "Should have 10 unique records total");
1422
1423        // All IDs should be unique
1424        let unique_ids: std::collections::HashSet<_> = all_ids.iter().collect();
1425        assert_eq!(unique_ids.len(), 10, "All IDs should be unique");
1426
1427        // ===== BACKWARD PAGINATION TEST =====
1428        // Go back from page 2 to page 1
1429        let previous_cursor = page2.previous_cursor.unwrap();
1430
1431        let params = EvalRecordPaginationRequest {
1432            status: None,
1433            limit: Some(5),
1434            cursor_created_at: Some(previous_cursor.created_at),
1435            cursor_id: Some(previous_cursor.id),
1436            direction: Some("previous".to_string()),
1437            ..Default::default()
1438        };
1439
1440        let page1_again =
1441            PostgresClient::get_paginated_genai_eval_workflow_records(&pool, &params, &entity_id)
1442                .await
1443                .unwrap();
1444
1445        assert_eq!(
1446            page1_again.items.len(),
1447            5,
1448            "Going back should return 5 records"
1449        );
1450
1451        // Should get the same records as page 1
1452        assert_eq!(
1453            page1_again.items.first().unwrap().id,
1454            page1_first.id,
1455            "Should return to the same first record"
1456        );
1457    }
1458
1459    #[tokio::test]
1460    async fn test_postgres_genai_task_result_insert_get() {
1461        let pool = db_pool().await;
1462
1463        let timestamp = Utc::now();
1464
1465        let (uid, entity_id) = PostgresClient::create_entity(
1466            &pool,
1467            SPACE,
1468            NAME,
1469            VERSION,
1470            DriftType::GenAI.to_string(),
1471        )
1472        .await
1473        .unwrap();
1474
1475        let mut records = Vec::new();
1476        for i in 0..2 {
1477            for j in 0..25 {
1478                let record = EvalTaskResult {
1479                    record_uid: format!("record_uid_{i}_{j}"),
1480                    created_at: Utc::now() + chrono::Duration::microseconds(j as i64),
1481                    start_time: Utc::now(),
1482                    end_time: Utc::now() + chrono::Duration::seconds(1),
1483                    entity_id,
1484                    task_id: format!("task{i}"),
1485                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
1486                    passed: true,
1487                    value: j as f64,
1488                    assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
1489                    operator: scouter_types::genai::ComparisonOperator::Contains,
1490                    expected: Value::Null,
1491                    actual: Value::Null,
1492                    message: "All good".to_string(),
1493                    entity_uid: uid.clone(),
1494                    condition: false,
1495                    stage: 0_i32,
1496                };
1497                records.push(record);
1498            }
1499            let result =
1500                PostgresClient::insert_eval_task_results_batch(&pool, &records, &entity_id)
1501                    .await
1502                    .unwrap();
1503            assert_eq!(result.rows_affected(), 25);
1504        }
1505
1506        let metrics = PostgresClient::get_genai_task_values(
1507            &pool,
1508            &timestamp,
1509            &["task1".to_string()],
1510            &entity_id,
1511        )
1512        .await
1513        .unwrap();
1514
1515        assert_eq!(metrics.len(), 1);
1516        let binned_records = PostgresClient::get_binned_genai_task_values(
1517            &pool,
1518            &DriftRequest {
1519                uid: uid.clone(),
1520                time_interval: TimeInterval::OneHour,
1521                max_data_points: 1000,
1522                ..Default::default()
1523            },
1524            &DatabaseSettings::default().retention_period,
1525            &ObjectStorageSettings::default(),
1526            &entity_id,
1527        )
1528        .await
1529        .unwrap();
1530        //
1531        assert_eq!(binned_records.metrics.len(), 2);
1532
1533        let eval_task = PostgresClient::get_genai_eval_task(&pool, &records[0].record_uid)
1534            .await
1535            .unwrap();
1536
1537        assert_eq!(eval_task[0].record_uid, records[0].record_uid);
1538    }
1539
1540    #[tokio::test]
1541    async fn test_postgres_genai_workflow_result_insert_get() {
1542        let pool = db_pool().await;
1543
1544        let timestamp = Utc::now();
1545
1546        let (uid, entity_id) = PostgresClient::create_entity(
1547            &pool,
1548            SPACE,
1549            NAME,
1550            VERSION,
1551            DriftType::GenAI.to_string(),
1552        )
1553        .await
1554        .unwrap();
1555
1556        for i in 0..2 {
1557            for j in 0..25 {
1558                let record = GenAIEvalWorkflowResult {
1559                    record_uid: format!("record_uid_{i}_{j}"),
1560                    created_at: Utc::now() + chrono::Duration::hours(i),
1561                    entity_id,
1562                    total_tasks: 10,
1563                    passed_tasks: 8,
1564                    failed_tasks: 2,
1565                    pass_rate: 0.8,
1566                    duration_ms: 1500,
1567                    entity_uid: uid.clone(),
1568                    execution_plan: ExecutionPlan::default(),
1569                    id: 0,
1570                };
1571                let result =
1572                    PostgresClient::insert_genai_eval_workflow_record(&pool, &record, &entity_id)
1573                        .await
1574                        .unwrap();
1575                assert_eq!(result.rows_affected(), 1);
1576            }
1577        }
1578
1579        let metric = PostgresClient::get_genai_workflow_value(&pool, &timestamp, &entity_id)
1580            .await
1581            .unwrap();
1582
1583        assert!(metric.is_some());
1584
1585        let binned_records = PostgresClient::get_binned_genai_workflow_values(
1586            &pool,
1587            &DriftRequest {
1588                uid: uid.clone(),
1589                time_interval: TimeInterval::OneHour,
1590                max_data_points: 1000,
1591                ..Default::default()
1592            },
1593            &DatabaseSettings::default().retention_period,
1594            &ObjectStorageSettings::default(),
1595            &entity_id,
1596        )
1597        .await
1598        .unwrap();
1599        //
1600        assert_eq!(binned_records.metrics.len(), 1);
1601    }
1602
1603    #[tokio::test]
1604    async fn test_postgres_tags() {
1605        let pool = db_pool().await;
1606        let uid = create_uuid7();
1607
1608        let tag1 = TagRecord {
1609            entity_id: uid.clone(),
1610            entity_type: "service".to_string(),
1611            key: "env".to_string(),
1612            value: "production".to_string(),
1613        };
1614
1615        let tag2 = TagRecord {
1616            entity_id: uid.clone(),
1617            entity_type: "service".to_string(),
1618            key: "team".to_string(),
1619            value: "backend".to_string(),
1620        };
1621
1622        let result = PostgresClient::insert_tag_batch(&pool, &[tag1.clone(), tag2.clone()])
1623            .await
1624            .unwrap();
1625
1626        assert_eq!(result.rows_affected(), 2);
1627
1628        let tags = PostgresClient::get_tags(&pool, "service", &uid)
1629            .await
1630            .unwrap();
1631
1632        assert_eq!(tags.len(), 2);
1633
1634        let tag_filter = vec![Tag {
1635            key: tags.first().unwrap().key.clone(),
1636            value: tags.first().unwrap().value.clone(),
1637        }];
1638
1639        let entity_id = PostgresClient::get_entity_id_by_tags(&pool, "service", &tag_filter, false)
1640            .await
1641            .unwrap();
1642
1643        assert_eq!(entity_id.first().unwrap(), &uid);
1644    }
1645}