llm_registry_db/
postgres.rs

1//! PostgreSQL implementation of AssetRepository
2//!
3//! This module provides a concrete implementation of the AssetRepository trait
4//! using PostgreSQL with SQLx for compile-time verified queries.
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use llm_registry_core::{
9    Asset, AssetId, AssetMetadata, AssetStatus, AssetType, Checksum, HashAlgorithm, Provenance,
10    StorageBackend, StorageLocation,
11};
12use semver::Version;
13use serde_json::Value as JsonValue;
14use sqlx::postgres::PgRow;
15use sqlx::{PgPool, Row};
16use std::collections::HashMap;
17use std::str::FromStr;
18use tracing::{debug, instrument};
19
20use crate::error::{DbError, DbResult};
21use crate::repository::{AssetRepository, SearchQuery, SearchResults, SortField, SortOrder};
22
23/// PostgreSQL implementation of AssetRepository
24#[derive(Debug, Clone)]
25pub struct PostgresAssetRepository {
26    pool: PgPool,
27}
28
29impl PostgresAssetRepository {
30    /// Create a new PostgreSQL asset repository
31    pub fn new(pool: PgPool) -> Self {
32        Self { pool }
33    }
34
35    /// Get a reference to the connection pool
36    pub fn pool(&self) -> &PgPool {
37        &self.pool
38    }
39}
40
41#[async_trait]
42impl AssetRepository for PostgresAssetRepository {
43    #[instrument(skip(self, asset), fields(asset_id = %asset.id, asset_name = %asset.metadata.name))]
44    async fn create(&self, asset: Asset) -> DbResult<Asset> {
45        debug!("Creating asset in database");
46
47        // Start a transaction
48        let mut tx = self.pool.begin().await?;
49
50        // Insert main asset record
51        sqlx::query(
52            r#"
53            INSERT INTO assets (
54                id, name, version, asset_type, status,
55                storage_backend, storage_uri, storage_path, size_bytes,
56                checksum_algorithm, checksum_value,
57                signature_algorithm, signature_value, signature_key_id,
58                description, license, content_type,
59                author, source_repo, commit_hash, build_id,
60                created_at, updated_at, deprecated_at, metadata
61            ) VALUES (
62                $1, $2, $3, $4, $5,
63                $6, $7, $8, $9,
64                $10, $11,
65                $12, $13, $14,
66                $15, $16, $17,
67                $18, $19, $20, $21,
68                $22, $23, $24, $25
69            )
70            "#,
71        )
72        .bind(&asset.id.to_string())
73        .bind(&asset.metadata.name)
74        .bind(&asset.metadata.version.to_string())
75        .bind(&asset.asset_type.to_string())
76        .bind(&asset.status.to_string())
77        .bind(&asset.storage.backend.to_string())
78        .bind(asset.storage.uri.as_ref().unwrap_or(&asset.storage.get_uri()))
79        .bind(if asset.storage.path.is_empty() { None } else { Some(&asset.storage.path) })
80        .bind(asset.metadata.size_bytes.map(|s| s as i64))
81        .bind(&asset.checksum.algorithm.to_string())
82        .bind(&asset.checksum.value)
83        .bind(None::<&str>)
84        .bind(None::<&str>)
85        .bind(None::<&str>)
86        .bind(&asset.metadata.description)
87        .bind(&asset.metadata.license)
88        .bind(&asset.metadata.content_type)
89        .bind(asset.provenance.as_ref().and_then(|p| p.author.as_deref()))
90        .bind(asset.provenance.as_ref().and_then(|p| p.source_repo.as_deref()))
91        .bind(asset.provenance.as_ref().and_then(|p| p.commit_hash.as_deref()))
92        .bind(asset.provenance.as_ref().and_then(|p| p.build_id.as_deref()))
93        .bind(&asset.created_at)
94        .bind(&asset.updated_at)
95        .bind(&asset.deprecated_at)
96        .bind(serde_json::to_value(&asset.metadata.annotations)?)
97        .execute(&mut *tx)
98        .await?;
99
100        // Insert tags
101        for tag in &asset.metadata.tags {
102            sqlx::query(
103                r#"
104                INSERT INTO asset_tags (asset_id, tag)
105                VALUES ($1, $2)
106                ON CONFLICT (asset_id, tag) DO NOTHING
107                "#,
108            )
109            .bind(&asset.id.to_string())
110            .bind(tag)
111            .execute(&mut *tx)
112            .await?;
113        }
114
115        // Insert dependencies
116        for dep in &asset.dependencies {
117            let dep_id = dep.as_id().ok_or_else(|| {
118                DbError::InvalidData("Dependency must be resolved to ID before persisting".to_string())
119            })?;
120
121            sqlx::query(
122                r#"
123                INSERT INTO asset_dependencies (asset_id, dependency_id, version_constraint)
124                VALUES ($1, $2, $3)
125                ON CONFLICT (asset_id, dependency_id) DO NOTHING
126                "#,
127            )
128            .bind(&asset.id.to_string())
129            .bind(&dep_id.to_string())
130            .bind(dep.as_name_version().map(|(_, v)| v))
131            .execute(&mut *tx)
132            .await?;
133        }
134
135        // Commit transaction
136        tx.commit().await?;
137
138        debug!("Asset created successfully");
139        Ok(asset)
140    }
141
142    #[instrument(skip(self), fields(asset_id = %id))]
143    async fn find_by_id(&self, id: &AssetId) -> DbResult<Option<Asset>> {
144        debug!("Finding asset by ID");
145
146        let row = sqlx::query(
147            r#"
148            SELECT
149                id, name, version, asset_type, status,
150                storage_backend, storage_uri, storage_path, size_bytes,
151                checksum_algorithm, checksum_value,
152                signature_algorithm, signature_value, signature_key_id,
153                description, license, content_type,
154                author, source_repo, commit_hash, build_id,
155                created_at, updated_at, deprecated_at, metadata
156            FROM assets
157            WHERE id = $1
158            "#,
159        )
160        .bind(&id.to_string())
161        .fetch_optional(&self.pool)
162        .await?;
163
164        match row {
165            Some(row) => {
166                let asset = row_to_asset(row)?;
167                let asset = self.load_asset_relations(asset).await?;
168                Ok(Some(asset))
169            }
170            None => Ok(None),
171        }
172    }
173
174    #[instrument(skip(self))]
175    async fn find_by_name_and_version(
176        &self,
177        name: &str,
178        version: &Version,
179    ) -> DbResult<Option<Asset>> {
180        debug!("Finding asset by name and version");
181
182        let row = sqlx::query(
183            r#"
184            SELECT
185                id, name, version, asset_type, status,
186                storage_backend, storage_uri, storage_path, size_bytes,
187                checksum_algorithm, checksum_value,
188                signature_algorithm, signature_value, signature_key_id,
189                description, license, content_type,
190                author, source_repo, commit_hash, build_id,
191                created_at, updated_at, deprecated_at, metadata
192            FROM assets
193            WHERE name = $1 AND version = $2
194            "#,
195        )
196        .bind(name)
197        .bind(&version.to_string())
198        .fetch_optional(&self.pool)
199        .await?;
200
201        match row {
202            Some(row) => {
203                let asset = row_to_asset(row)?;
204                let asset = self.load_asset_relations(asset).await?;
205                Ok(Some(asset))
206            }
207            None => Ok(None),
208        }
209    }
210
211    #[instrument(skip(self, ids))]
212    async fn find_by_ids(&self, ids: &[AssetId]) -> DbResult<Vec<Asset>> {
213        if ids.is_empty() {
214            return Ok(Vec::new());
215        }
216
217        debug!("Finding {} assets by IDs", ids.len());
218
219        let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
220
221        let rows = sqlx::query(
222            r#"
223            SELECT
224                id, name, version, asset_type, status,
225                storage_backend, storage_uri, storage_path, size_bytes,
226                checksum_algorithm, checksum_value,
227                signature_algorithm, signature_value, signature_key_id,
228                description, license, content_type,
229                author, source_repo, commit_hash, build_id,
230                created_at, updated_at, deprecated_at, metadata
231            FROM assets
232            WHERE id = ANY($1)
233            "#,
234        )
235        .bind(&id_strings)
236        .fetch_all(&self.pool)
237        .await?;
238
239        let mut assets = Vec::new();
240        for row in rows {
241            let asset = row_to_asset(row)?;
242            let asset = self.load_asset_relations(asset).await?;
243            assets.push(asset);
244        }
245
246        Ok(assets)
247    }
248
249    #[instrument(skip(self, query))]
250    async fn search(&self, query: &SearchQuery) -> DbResult<SearchResults> {
251        debug!("Searching assets with filters");
252
253        // Build dynamic query
254        let mut sql = String::from(
255            r#"
256            SELECT
257                a.id, a.name, a.version, a.asset_type, a.status,
258                a.storage_backend, a.storage_uri, a.storage_path, a.size_bytes,
259                a.checksum_algorithm, a.checksum_value,
260                a.signature_algorithm, a.signature_value, a.signature_key_id,
261                a.description, a.license, a.content_type,
262                a.author, a.source_repo, a.commit_hash, a.build_id,
263                a.created_at, a.updated_at, a.deprecated_at, a.metadata
264            FROM assets a
265            WHERE 1=1
266            "#,
267        );
268
269        let mut conditions = Vec::new();
270        let mut bind_values: Vec<String> = Vec::new();
271        let mut param_num = 1;
272
273        // Text search
274        if let Some(ref text) = query.text {
275            conditions.push(format!(
276                "(a.name ILIKE ${} OR a.description ILIKE ${})",
277                param_num, param_num + 1
278            ));
279            bind_values.push(format!("%{}%", text));
280            bind_values.push(format!("%{}%", text));
281            param_num += 2;
282        }
283
284        // Asset type filter
285        if !query.asset_types.is_empty() {
286            conditions.push(format!("a.asset_type = ANY(${})", param_num));
287            // This is a placeholder - we'll use a different approach for ANY
288            param_num += 1;
289        }
290
291        // Author filter
292        if let Some(ref author) = query.author {
293            conditions.push(format!("a.author = ${}", param_num));
294            bind_values.push(author.clone());
295            param_num += 1;
296        }
297
298        // Storage backend filter
299        if let Some(ref backend) = query.storage_backend {
300            conditions.push(format!("a.storage_backend = ${}", param_num));
301            bind_values.push(backend.clone());
302            param_num += 1;
303        }
304
305        // Deprecated filter
306        if query.exclude_deprecated {
307            conditions.push("a.deprecated_at IS NULL".to_string());
308        }
309
310        // Tag filter - must have all specified tags
311        if !query.tags.is_empty() {
312            let tag_condition = format!(
313                "a.id IN (
314                    SELECT asset_id FROM asset_tags
315                    WHERE tag = ANY(${}::text[])
316                    GROUP BY asset_id
317                    HAVING COUNT(DISTINCT tag) = {}
318                )",
319                param_num,
320                query.tags.len()
321            );
322            conditions.push(tag_condition);
323            #[allow(unused_assignments)]
324            {
325                param_num += 1;
326            }
327        }
328
329        // Add conditions to query
330        if !conditions.is_empty() {
331            sql.push_str(" AND ");
332            sql.push_str(&conditions.join(" AND "));
333        }
334
335        // Add ORDER BY
336        let sort_field = match query.sort_by {
337            SortField::CreatedAt => "a.created_at",
338            SortField::UpdatedAt => "a.updated_at",
339            SortField::Name => "a.name",
340            SortField::Version => "a.version",
341            SortField::SizeBytes => "a.size_bytes",
342        };
343
344        let sort_order = match query.sort_order {
345            SortOrder::Ascending => "ASC",
346            SortOrder::Descending => "DESC",
347        };
348
349        sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
350
351        // Add LIMIT and OFFSET
352        sql.push_str(&format!(" LIMIT {} OFFSET {}", query.limit, query.offset));
353
354        // For simplicity, we'll use a simpler approach - rebuild with sqlx query builder
355        // In production, you'd want to use a query builder or macro for this
356        let mut final_query = sqlx::query(&sql);
357
358        // Bind parameters in order
359        for value in &bind_values {
360            final_query = final_query.bind(value);
361        }
362
363        let types: Vec<String> = query.asset_types.iter().map(|t| t.to_string()).collect();
364        if !query.asset_types.is_empty() {
365            final_query = final_query.bind(&types);
366        }
367
368        if !query.tags.is_empty() {
369            final_query = final_query.bind(&query.tags);
370        }
371
372        let rows = final_query.fetch_all(&self.pool).await?;
373
374        let mut assets = Vec::new();
375        for row in rows {
376            let asset = row_to_asset(row)?;
377            let asset = self.load_asset_relations(asset).await?;
378            assets.push(asset);
379        }
380
381        // Get total count (without pagination)
382        let total = self.count_search_results(query).await?;
383
384        Ok(SearchResults {
385            assets,
386            total,
387            offset: query.offset,
388            limit: query.limit,
389        })
390    }
391
392    #[instrument(skip(self, asset), fields(asset_id = %asset.id))]
393    async fn update(&self, asset: Asset) -> DbResult<Asset> {
394        debug!("Updating asset");
395
396        let mut tx = self.pool.begin().await?;
397
398        let result = sqlx::query(
399            r#"
400            UPDATE assets SET
401                name = $2,
402                version = $3,
403                asset_type = $4,
404                status = $5,
405                storage_backend = $6,
406                storage_uri = $7,
407                storage_path = $8,
408                size_bytes = $9,
409                checksum_algorithm = $10,
410                checksum_value = $11,
411                signature_algorithm = $12,
412                signature_value = $13,
413                signature_key_id = $14,
414                description = $15,
415                license = $16,
416                content_type = $17,
417                author = $18,
418                source_repo = $19,
419                commit_hash = $20,
420                build_id = $21,
421                deprecated_at = $22,
422                metadata = $23,
423                updated_at = $24
424            WHERE id = $1
425            "#,
426        )
427        .bind(&asset.id.to_string())
428        .bind(&asset.metadata.name)
429        .bind(&asset.metadata.version.to_string())
430        .bind(&asset.asset_type.to_string())
431        .bind(&asset.status.to_string())
432        .bind(&asset.storage.backend.to_string())
433        .bind(asset.storage.uri.as_ref().unwrap_or(&asset.storage.get_uri()))
434        .bind(if asset.storage.path.is_empty() { None } else { Some(&asset.storage.path) })
435        .bind(asset.metadata.size_bytes.map(|s| s as i64))
436        .bind(&asset.checksum.algorithm.to_string())
437        .bind(&asset.checksum.value)
438        .bind(None::<&str>)
439        .bind(None::<&str>)
440        .bind(None::<&str>)
441        .bind(&asset.metadata.description)
442        .bind(&asset.metadata.license)
443        .bind(&asset.metadata.content_type)
444        .bind(asset.provenance.as_ref().and_then(|p| p.author.as_deref()))
445        .bind(asset.provenance.as_ref().and_then(|p| p.source_repo.as_deref()))
446        .bind(asset.provenance.as_ref().and_then(|p| p.commit_hash.as_deref()))
447        .bind(asset.provenance.as_ref().and_then(|p| p.build_id.as_deref()))
448        .bind(&asset.deprecated_at)
449        .bind(serde_json::to_value(&asset.metadata.annotations)?)
450        .bind(Utc::now())
451        .execute(&mut *tx)
452        .await?;
453
454        if result.rows_affected() == 0 {
455            return Err(DbError::NotFound(format!("Asset {} not found", asset.id)));
456        }
457
458        // Update tags - delete and re-insert for simplicity
459        sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1")
460            .bind(&asset.id.to_string())
461            .execute(&mut *tx)
462            .await?;
463
464        for tag in &asset.metadata.tags {
465            sqlx::query(
466                r#"
467                INSERT INTO asset_tags (asset_id, tag)
468                VALUES ($1, $2)
469                "#,
470            )
471            .bind(&asset.id.to_string())
472            .bind(tag)
473            .execute(&mut *tx)
474            .await?;
475        }
476
477        tx.commit().await?;
478
479        debug!("Asset updated successfully");
480        Ok(asset)
481    }
482
483    #[instrument(skip(self), fields(asset_id = %id))]
484    async fn delete(&self, id: &AssetId) -> DbResult<()> {
485        debug!("Deleting asset");
486
487        let result = sqlx::query("DELETE FROM assets WHERE id = $1")
488            .bind(&id.to_string())
489            .execute(&self.pool)
490            .await?;
491
492        if result.rows_affected() == 0 {
493            return Err(DbError::NotFound(format!("Asset {} not found", id)));
494        }
495
496        debug!("Asset deleted successfully");
497        Ok(())
498    }
499
500    #[instrument(skip(self))]
501    async fn list_versions(&self, name: &str) -> DbResult<Vec<Asset>> {
502        debug!("Listing versions for asset");
503
504        let rows = sqlx::query(
505            r#"
506            SELECT
507                id, name, version, asset_type, status,
508                storage_backend, storage_uri, storage_path, size_bytes,
509                checksum_algorithm, checksum_value,
510                signature_algorithm, signature_value, signature_key_id,
511                description, license, content_type,
512                author, source_repo, commit_hash, build_id,
513                created_at, updated_at, deprecated_at, metadata
514            FROM assets
515            WHERE name = $1
516            ORDER BY created_at DESC
517            "#,
518        )
519        .bind(name)
520        .fetch_all(&self.pool)
521        .await?;
522
523        let mut assets = Vec::new();
524        for row in rows {
525            let asset = row_to_asset(row)?;
526            let asset = self.load_asset_relations(asset).await?;
527            assets.push(asset);
528        }
529
530        Ok(assets)
531    }
532
533    #[instrument(skip(self), fields(asset_id = %id))]
534    async fn list_dependencies(&self, id: &AssetId) -> DbResult<Vec<Asset>> {
535        debug!("Listing dependencies");
536
537        let rows = sqlx::query(
538            r#"
539            SELECT
540                a.id, a.name, a.version, a.asset_type, a.status,
541                a.storage_backend, a.storage_uri, a.storage_path, a.size_bytes,
542                a.checksum_algorithm, a.checksum_value,
543                a.signature_algorithm, a.signature_value, a.signature_key_id,
544                a.description, a.license, a.content_type,
545                a.author, a.source_repo, a.commit_hash, a.build_id,
546                a.created_at, a.updated_at, a.deprecated_at, a.metadata
547            FROM assets a
548            INNER JOIN asset_dependencies d ON a.id = d.dependency_id
549            WHERE d.asset_id = $1
550            "#,
551        )
552        .bind(&id.to_string())
553        .fetch_all(&self.pool)
554        .await?;
555
556        let mut assets = Vec::new();
557        for row in rows {
558            let asset = row_to_asset(row)?;
559            let asset = self.load_asset_relations(asset).await?;
560            assets.push(asset);
561        }
562
563        Ok(assets)
564    }
565
566    #[instrument(skip(self), fields(asset_id = %id))]
567    async fn list_reverse_dependencies(&self, id: &AssetId) -> DbResult<Vec<Asset>> {
568        debug!("Listing reverse dependencies");
569
570        let rows = sqlx::query(
571            r#"
572            SELECT
573                a.id, a.name, a.version, a.asset_type, a.status,
574                a.storage_backend, a.storage_uri, a.storage_path, a.size_bytes,
575                a.checksum_algorithm, a.checksum_value,
576                a.signature_algorithm, a.signature_value, a.signature_key_id,
577                a.description, a.license, a.content_type,
578                a.author, a.source_repo, a.commit_hash, a.build_id,
579                a.created_at, a.updated_at, a.deprecated_at, a.metadata
580            FROM assets a
581            INNER JOIN asset_dependencies d ON a.id = d.asset_id
582            WHERE d.dependency_id = $1
583            "#,
584        )
585        .bind(&id.to_string())
586        .fetch_all(&self.pool)
587        .await?;
588
589        let mut assets = Vec::new();
590        for row in rows {
591            let asset = row_to_asset(row)?;
592            let asset = self.load_asset_relations(asset).await?;
593            assets.push(asset);
594        }
595
596        Ok(assets)
597    }
598
599    #[instrument(skip(self), fields(asset_id = %id, tag = %tag))]
600    async fn add_tag(&self, id: &AssetId, tag: &str) -> DbResult<()> {
601        debug!("Adding tag to asset");
602
603        sqlx::query(
604            r#"
605            INSERT INTO asset_tags (asset_id, tag)
606            VALUES ($1, $2)
607            ON CONFLICT (asset_id, tag) DO NOTHING
608            "#,
609        )
610        .bind(&id.to_string())
611        .bind(tag)
612        .execute(&self.pool)
613        .await?;
614
615        Ok(())
616    }
617
618    #[instrument(skip(self), fields(asset_id = %id, tag = %tag))]
619    async fn remove_tag(&self, id: &AssetId, tag: &str) -> DbResult<()> {
620        debug!("Removing tag from asset");
621
622        sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1 AND tag = $2")
623            .bind(&id.to_string())
624            .bind(tag)
625            .execute(&self.pool)
626            .await?;
627
628        Ok(())
629    }
630
631    #[instrument(skip(self), fields(asset_id = %id))]
632    async fn get_tags(&self, id: &AssetId) -> DbResult<Vec<String>> {
633        debug!("Getting tags for asset");
634
635        let rows = sqlx::query("SELECT tag FROM asset_tags WHERE asset_id = $1 ORDER BY tag")
636            .bind(&id.to_string())
637            .fetch_all(&self.pool)
638            .await?;
639
640        let tags = rows
641            .iter()
642            .map(|row| row.get::<String, _>("tag"))
643            .collect();
644
645        Ok(tags)
646    }
647
648    #[instrument(skip(self))]
649    async fn list_all_tags(&self) -> DbResult<Vec<String>> {
650        debug!("Listing all tags");
651
652        let rows = sqlx::query("SELECT DISTINCT tag FROM asset_tags ORDER BY tag")
653            .fetch_all(&self.pool)
654            .await?;
655
656        let tags = rows
657            .iter()
658            .map(|row| row.get::<String, _>("tag"))
659            .collect();
660
661        Ok(tags)
662    }
663
664    #[instrument(skip(self))]
665    async fn add_dependency(
666        &self,
667        asset_id: &AssetId,
668        dependency_id: &AssetId,
669        version_constraint: Option<&str>,
670    ) -> DbResult<()> {
671        debug!("Adding dependency relationship");
672
673        // Check for circular dependency
674        if self.would_create_cycle(asset_id, dependency_id).await? {
675            return Err(DbError::CircularDependency(format!(
676                "Adding dependency from {} to {} would create a cycle",
677                asset_id, dependency_id
678            )));
679        }
680
681        sqlx::query(
682            r#"
683            INSERT INTO asset_dependencies (asset_id, dependency_id, version_constraint)
684            VALUES ($1, $2, $3)
685            ON CONFLICT (asset_id, dependency_id) DO UPDATE
686            SET version_constraint = EXCLUDED.version_constraint
687            "#,
688        )
689        .bind(&asset_id.to_string())
690        .bind(&dependency_id.to_string())
691        .bind(version_constraint)
692        .execute(&self.pool)
693        .await?;
694
695        Ok(())
696    }
697
698    #[instrument(skip(self))]
699    async fn remove_dependency(
700        &self,
701        asset_id: &AssetId,
702        dependency_id: &AssetId,
703    ) -> DbResult<()> {
704        debug!("Removing dependency relationship");
705
706        sqlx::query("DELETE FROM asset_dependencies WHERE asset_id = $1 AND dependency_id = $2")
707            .bind(&asset_id.to_string())
708            .bind(&dependency_id.to_string())
709            .execute(&self.pool)
710            .await?;
711
712        Ok(())
713    }
714
715    #[instrument(skip(self))]
716    async fn count_assets(&self) -> DbResult<i64> {
717        let row = sqlx::query("SELECT COUNT(*) as count FROM assets")
718            .fetch_one(&self.pool)
719            .await?;
720
721        Ok(row.get("count"))
722    }
723
724    #[instrument(skip(self))]
725    async fn count_by_type(&self, asset_type: &AssetType) -> DbResult<i64> {
726        let row = sqlx::query("SELECT COUNT(*) as count FROM assets WHERE asset_type = $1")
727            .bind(&asset_type.to_string())
728            .fetch_one(&self.pool)
729            .await?;
730
731        Ok(row.get("count"))
732    }
733
734    #[instrument(skip(self))]
735    async fn health_check(&self) -> DbResult<()> {
736        sqlx::query("SELECT 1")
737            .execute(&self.pool)
738            .await
739            .map(|_| ())
740            .map_err(Into::into)
741    }
742}
743
744impl PostgresAssetRepository {
745    /// Load tags and dependencies for an asset
746    async fn load_asset_relations(&self, mut asset: Asset) -> DbResult<Asset> {
747        // Load tags
748        let tags = self.get_tags(&asset.id).await?;
749        asset.metadata.tags = tags;
750
751        // Load dependency references
752        let dep_rows = sqlx::query(
753            "SELECT dependency_id FROM asset_dependencies WHERE asset_id = $1"
754        )
755        .bind(&asset.id.to_string())
756        .fetch_all(&self.pool)
757        .await?;
758
759        asset.dependencies = dep_rows
760            .iter()
761            .filter_map(|row| {
762                let dep_id_str: String = row.get("dependency_id");
763                AssetId::from_str(&dep_id_str)
764                    .ok()
765                    .map(|id| llm_registry_core::AssetReference::by_id(id))
766            })
767            .collect();
768
769        Ok(asset)
770    }
771
772    /// Check if adding a dependency would create a cycle
773    async fn would_create_cycle(&self, from_id: &AssetId, to_id: &AssetId) -> DbResult<bool> {
774        // Use recursive CTE to check for cycles
775        let row = sqlx::query(
776            r#"
777            WITH RECURSIVE dep_tree AS (
778                SELECT dependency_id, 1 as depth
779                FROM asset_dependencies
780                WHERE asset_id = $1
781
782                UNION ALL
783
784                SELECT d.dependency_id, dt.depth + 1
785                FROM asset_dependencies d
786                INNER JOIN dep_tree dt ON d.asset_id = dt.dependency_id
787                WHERE dt.depth < 100
788            )
789            SELECT COUNT(*) > 0 as has_cycle
790            FROM dep_tree
791            WHERE dependency_id = $2
792            "#,
793        )
794        .bind(&to_id.to_string())
795        .bind(&from_id.to_string())
796        .fetch_one(&self.pool)
797        .await?;
798
799        Ok(row.get("has_cycle"))
800    }
801
802    /// Count search results without pagination
803    async fn count_search_results(&self, query: &SearchQuery) -> DbResult<i64> {
804        // Simplified count query - in production, this should mirror the search logic
805        let mut sql = String::from("SELECT COUNT(*) as count FROM assets a WHERE 1=1");
806
807        if query.exclude_deprecated {
808            sql.push_str(" AND a.deprecated_at IS NULL");
809        }
810
811        if !query.asset_types.is_empty() {
812            let types: Vec<String> = query.asset_types.iter().map(|t| t.to_string()).collect();
813            let placeholders: Vec<String> = types.iter().map(|t| format!("'{}'", t)).collect();
814            sql.push_str(&format!(" AND a.asset_type IN ({})", placeholders.join(", ")));
815        }
816
817        let row = sqlx::query(&sql).fetch_one(&self.pool).await?;
818
819        Ok(row.get("count"))
820    }
821}
822
823/// Convert a database row to an Asset
824fn row_to_asset(row: PgRow) -> DbResult<Asset> {
825    let id_str: String = row.get("id");
826    let id = AssetId::from_str(&id_str)
827        .map_err(|e| DbError::InvalidData(format!("Invalid asset ID: {}", e)))?;
828
829    let version_str: String = row.get("version");
830    let version = Version::parse(&version_str)
831        .map_err(|e| DbError::InvalidData(format!("Invalid version: {}", e)))?;
832
833    let asset_type_str: String = row.get("asset_type");
834    let asset_type = parse_asset_type(&asset_type_str)?;
835
836    let status_str: String = row.get("status");
837    let status = parse_asset_status(&status_str)?;
838
839    let backend_str: String = row.get("storage_backend");
840    let backend = parse_storage_backend_from_db(&backend_str)?;
841
842    let storage_uri: String = row.get("storage_uri");
843    let storage_path: Option<String> = row.get("storage_path");
844
845    let checksum_algo_str: String = row.get("checksum_algorithm");
846    let checksum_algorithm = parse_hash_algorithm(&checksum_algo_str)?;
847    let checksum_value: String = row.get("checksum_value");
848
849    let metadata_json: JsonValue = row.get("metadata");
850    let annotations: HashMap<String, String> = serde_json::from_value(metadata_json)
851        .unwrap_or_default();
852
853    let created_at: DateTime<Utc> = row.get("created_at");
854    let updated_at: DateTime<Utc> = row.get("updated_at");
855    let deprecated_at: Option<DateTime<Utc>> = row.get("deprecated_at");
856
857    let size_bytes: Option<i64> = row.get("size_bytes");
858
859    let provenance = {
860        let author: Option<String> = row.get("author");
861        let source_repo: Option<String> = row.get("source_repo");
862        let commit_hash: Option<String> = row.get("commit_hash");
863        let build_id: Option<String> = row.get("build_id");
864
865        if author.is_some() || source_repo.is_some() {
866            Some(Provenance {
867                author,
868                source_repo,
869                commit_hash,
870                build_id,
871                created_at: Utc::now(),
872                build_metadata: HashMap::new(),
873            })
874        } else {
875            None
876        }
877    };
878
879    let metadata = AssetMetadata {
880        name: row.get("name"),
881        version,
882        description: row.get("description"),
883        license: row.get("license"),
884        tags: Vec::new(), // Loaded separately
885        annotations,
886        size_bytes: size_bytes.map(|s| s as u64),
887        content_type: row.get("content_type"),
888    };
889
890    let storage = StorageLocation {
891        backend,
892        path: storage_path.unwrap_or_default(),
893        uri: Some(storage_uri),
894    };
895
896    let checksum = Checksum {
897        algorithm: checksum_algorithm,
898        value: checksum_value,
899    };
900
901    Ok(Asset {
902        id,
903        asset_type,
904        metadata,
905        status,
906        storage,
907        checksum,
908        provenance,
909        dependencies: Vec::new(), // Loaded separately
910        created_at,
911        updated_at,
912        deprecated_at,
913    })
914}
915
916fn parse_asset_type(s: &str) -> DbResult<AssetType> {
917    match s {
918        "model" => Ok(AssetType::Model),
919        "pipeline" => Ok(AssetType::Pipeline),
920        "test_suite" => Ok(AssetType::TestSuite),
921        "policy" => Ok(AssetType::Policy),
922        "dataset" => Ok(AssetType::Dataset),
923        other => AssetType::custom(other)
924            .map_err(|e| DbError::InvalidData(format!("Invalid asset type: {}", e))),
925    }
926}
927
928fn parse_asset_status(s: &str) -> DbResult<AssetStatus> {
929    AssetStatus::from_str(s)
930        .map_err(|e| DbError::InvalidData(format!("Invalid asset status: {}", e)))
931}
932
933fn parse_storage_backend_from_db(s: &str) -> DbResult<StorageBackend> {
934    // For simplicity in the database, we'll store just the backend type as a string
935    // and reconstruct minimal backend config. In a real system, you'd store full JSON.
936    match s {
937        "S3" => Ok(StorageBackend::S3 {
938            bucket: String::new(),
939            region: String::new(),
940            endpoint: None,
941        }),
942        "GCS" => Ok(StorageBackend::GCS {
943            bucket: String::new(),
944            project_id: String::new(),
945        }),
946        "AzureBlob" => Ok(StorageBackend::AzureBlob {
947            account_name: String::new(),
948            container: String::new(),
949        }),
950        "MinIO" => Ok(StorageBackend::MinIO {
951            bucket: String::new(),
952            endpoint: String::new(),
953        }),
954        "FileSystem" => Ok(StorageBackend::FileSystem {
955            base_path: String::new(),
956        }),
957        _ => Err(DbError::InvalidData(format!(
958            "Invalid storage backend type: {}",
959            s
960        ))),
961    }
962}
963
964fn parse_hash_algorithm(s: &str) -> DbResult<HashAlgorithm> {
965    HashAlgorithm::from_str(s)
966        .map_err(|e| DbError::InvalidData(format!("Invalid hash algorithm: {}", e)))
967}