1use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use llm_registry_core::{
9 Asset, AssetId, AssetMetadata, AssetStatus, AssetType, Checksum, HashAlgorithm, Provenance,
10 StorageBackend, StorageLocation,
11};
12use semver::Version;
13use serde_json::Value as JsonValue;
14use sqlx::postgres::PgRow;
15use sqlx::{PgPool, Row};
16use std::collections::HashMap;
17use std::str::FromStr;
18use tracing::{debug, instrument};
19
20use crate::error::{DbError, DbResult};
21use crate::repository::{AssetRepository, SearchQuery, SearchResults, SortField, SortOrder};
22
23#[derive(Debug, Clone)]
25pub struct PostgresAssetRepository {
26 pool: PgPool,
27}
28
29impl PostgresAssetRepository {
30 pub fn new(pool: PgPool) -> Self {
32 Self { pool }
33 }
34
35 pub fn pool(&self) -> &PgPool {
37 &self.pool
38 }
39}
40
41#[async_trait]
42impl AssetRepository for PostgresAssetRepository {
43 #[instrument(skip(self, asset), fields(asset_id = %asset.id, asset_name = %asset.metadata.name))]
44 async fn create(&self, asset: Asset) -> DbResult<Asset> {
45 debug!("Creating asset in database");
46
47 let mut tx = self.pool.begin().await?;
49
50 sqlx::query(
52 r#"
53 INSERT INTO assets (
54 id, name, version, asset_type, status,
55 storage_backend, storage_uri, storage_path, size_bytes,
56 checksum_algorithm, checksum_value,
57 signature_algorithm, signature_value, signature_key_id,
58 description, license, content_type,
59 author, source_repo, commit_hash, build_id,
60 created_at, updated_at, deprecated_at, metadata
61 ) VALUES (
62 $1, $2, $3, $4, $5,
63 $6, $7, $8, $9,
64 $10, $11,
65 $12, $13, $14,
66 $15, $16, $17,
67 $18, $19, $20, $21,
68 $22, $23, $24, $25
69 )
70 "#,
71 )
72 .bind(&asset.id.to_string())
73 .bind(&asset.metadata.name)
74 .bind(&asset.metadata.version.to_string())
75 .bind(&asset.asset_type.to_string())
76 .bind(&asset.status.to_string())
77 .bind(&asset.storage.backend.to_string())
78 .bind(asset.storage.uri.as_ref().unwrap_or(&asset.storage.get_uri()))
79 .bind(if asset.storage.path.is_empty() { None } else { Some(&asset.storage.path) })
80 .bind(asset.metadata.size_bytes.map(|s| s as i64))
81 .bind(&asset.checksum.algorithm.to_string())
82 .bind(&asset.checksum.value)
83 .bind(None::<&str>)
84 .bind(None::<&str>)
85 .bind(None::<&str>)
86 .bind(&asset.metadata.description)
87 .bind(&asset.metadata.license)
88 .bind(&asset.metadata.content_type)
89 .bind(asset.provenance.as_ref().and_then(|p| p.author.as_deref()))
90 .bind(asset.provenance.as_ref().and_then(|p| p.source_repo.as_deref()))
91 .bind(asset.provenance.as_ref().and_then(|p| p.commit_hash.as_deref()))
92 .bind(asset.provenance.as_ref().and_then(|p| p.build_id.as_deref()))
93 .bind(&asset.created_at)
94 .bind(&asset.updated_at)
95 .bind(&asset.deprecated_at)
96 .bind(serde_json::to_value(&asset.metadata.annotations)?)
97 .execute(&mut *tx)
98 .await?;
99
100 for tag in &asset.metadata.tags {
102 sqlx::query(
103 r#"
104 INSERT INTO asset_tags (asset_id, tag)
105 VALUES ($1, $2)
106 ON CONFLICT (asset_id, tag) DO NOTHING
107 "#,
108 )
109 .bind(&asset.id.to_string())
110 .bind(tag)
111 .execute(&mut *tx)
112 .await?;
113 }
114
115 for dep in &asset.dependencies {
117 let dep_id = dep.as_id().ok_or_else(|| {
118 DbError::InvalidData("Dependency must be resolved to ID before persisting".to_string())
119 })?;
120
121 sqlx::query(
122 r#"
123 INSERT INTO asset_dependencies (asset_id, dependency_id, version_constraint)
124 VALUES ($1, $2, $3)
125 ON CONFLICT (asset_id, dependency_id) DO NOTHING
126 "#,
127 )
128 .bind(&asset.id.to_string())
129 .bind(&dep_id.to_string())
130 .bind(dep.as_name_version().map(|(_, v)| v))
131 .execute(&mut *tx)
132 .await?;
133 }
134
135 tx.commit().await?;
137
138 debug!("Asset created successfully");
139 Ok(asset)
140 }
141
142 #[instrument(skip(self), fields(asset_id = %id))]
143 async fn find_by_id(&self, id: &AssetId) -> DbResult<Option<Asset>> {
144 debug!("Finding asset by ID");
145
146 let row = sqlx::query(
147 r#"
148 SELECT
149 id, name, version, asset_type, status,
150 storage_backend, storage_uri, storage_path, size_bytes,
151 checksum_algorithm, checksum_value,
152 signature_algorithm, signature_value, signature_key_id,
153 description, license, content_type,
154 author, source_repo, commit_hash, build_id,
155 created_at, updated_at, deprecated_at, metadata
156 FROM assets
157 WHERE id = $1
158 "#,
159 )
160 .bind(&id.to_string())
161 .fetch_optional(&self.pool)
162 .await?;
163
164 match row {
165 Some(row) => {
166 let asset = row_to_asset(row)?;
167 let asset = self.load_asset_relations(asset).await?;
168 Ok(Some(asset))
169 }
170 None => Ok(None),
171 }
172 }
173
174 #[instrument(skip(self))]
175 async fn find_by_name_and_version(
176 &self,
177 name: &str,
178 version: &Version,
179 ) -> DbResult<Option<Asset>> {
180 debug!("Finding asset by name and version");
181
182 let row = sqlx::query(
183 r#"
184 SELECT
185 id, name, version, asset_type, status,
186 storage_backend, storage_uri, storage_path, size_bytes,
187 checksum_algorithm, checksum_value,
188 signature_algorithm, signature_value, signature_key_id,
189 description, license, content_type,
190 author, source_repo, commit_hash, build_id,
191 created_at, updated_at, deprecated_at, metadata
192 FROM assets
193 WHERE name = $1 AND version = $2
194 "#,
195 )
196 .bind(name)
197 .bind(&version.to_string())
198 .fetch_optional(&self.pool)
199 .await?;
200
201 match row {
202 Some(row) => {
203 let asset = row_to_asset(row)?;
204 let asset = self.load_asset_relations(asset).await?;
205 Ok(Some(asset))
206 }
207 None => Ok(None),
208 }
209 }
210
211 #[instrument(skip(self, ids))]
212 async fn find_by_ids(&self, ids: &[AssetId]) -> DbResult<Vec<Asset>> {
213 if ids.is_empty() {
214 return Ok(Vec::new());
215 }
216
217 debug!("Finding {} assets by IDs", ids.len());
218
219 let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
220
221 let rows = sqlx::query(
222 r#"
223 SELECT
224 id, name, version, asset_type, status,
225 storage_backend, storage_uri, storage_path, size_bytes,
226 checksum_algorithm, checksum_value,
227 signature_algorithm, signature_value, signature_key_id,
228 description, license, content_type,
229 author, source_repo, commit_hash, build_id,
230 created_at, updated_at, deprecated_at, metadata
231 FROM assets
232 WHERE id = ANY($1)
233 "#,
234 )
235 .bind(&id_strings)
236 .fetch_all(&self.pool)
237 .await?;
238
239 let mut assets = Vec::new();
240 for row in rows {
241 let asset = row_to_asset(row)?;
242 let asset = self.load_asset_relations(asset).await?;
243 assets.push(asset);
244 }
245
246 Ok(assets)
247 }
248
249 #[instrument(skip(self, query))]
250 async fn search(&self, query: &SearchQuery) -> DbResult<SearchResults> {
251 debug!("Searching assets with filters");
252
253 let mut sql = String::from(
255 r#"
256 SELECT
257 a.id, a.name, a.version, a.asset_type, a.status,
258 a.storage_backend, a.storage_uri, a.storage_path, a.size_bytes,
259 a.checksum_algorithm, a.checksum_value,
260 a.signature_algorithm, a.signature_value, a.signature_key_id,
261 a.description, a.license, a.content_type,
262 a.author, a.source_repo, a.commit_hash, a.build_id,
263 a.created_at, a.updated_at, a.deprecated_at, a.metadata
264 FROM assets a
265 WHERE 1=1
266 "#,
267 );
268
269 let mut conditions = Vec::new();
270 let mut bind_values: Vec<String> = Vec::new();
271 let mut param_num = 1;
272
273 if let Some(ref text) = query.text {
275 conditions.push(format!(
276 "(a.name ILIKE ${} OR a.description ILIKE ${})",
277 param_num, param_num + 1
278 ));
279 bind_values.push(format!("%{}%", text));
280 bind_values.push(format!("%{}%", text));
281 param_num += 2;
282 }
283
284 if !query.asset_types.is_empty() {
286 conditions.push(format!("a.asset_type = ANY(${})", param_num));
287 param_num += 1;
289 }
290
291 if let Some(ref author) = query.author {
293 conditions.push(format!("a.author = ${}", param_num));
294 bind_values.push(author.clone());
295 param_num += 1;
296 }
297
298 if let Some(ref backend) = query.storage_backend {
300 conditions.push(format!("a.storage_backend = ${}", param_num));
301 bind_values.push(backend.clone());
302 param_num += 1;
303 }
304
305 if query.exclude_deprecated {
307 conditions.push("a.deprecated_at IS NULL".to_string());
308 }
309
310 if !query.tags.is_empty() {
312 let tag_condition = format!(
313 "a.id IN (
314 SELECT asset_id FROM asset_tags
315 WHERE tag = ANY(${}::text[])
316 GROUP BY asset_id
317 HAVING COUNT(DISTINCT tag) = {}
318 )",
319 param_num,
320 query.tags.len()
321 );
322 conditions.push(tag_condition);
323 #[allow(unused_assignments)]
324 {
325 param_num += 1;
326 }
327 }
328
329 if !conditions.is_empty() {
331 sql.push_str(" AND ");
332 sql.push_str(&conditions.join(" AND "));
333 }
334
335 let sort_field = match query.sort_by {
337 SortField::CreatedAt => "a.created_at",
338 SortField::UpdatedAt => "a.updated_at",
339 SortField::Name => "a.name",
340 SortField::Version => "a.version",
341 SortField::SizeBytes => "a.size_bytes",
342 };
343
344 let sort_order = match query.sort_order {
345 SortOrder::Ascending => "ASC",
346 SortOrder::Descending => "DESC",
347 };
348
349 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
350
351 sql.push_str(&format!(" LIMIT {} OFFSET {}", query.limit, query.offset));
353
354 let mut final_query = sqlx::query(&sql);
357
358 for value in &bind_values {
360 final_query = final_query.bind(value);
361 }
362
363 let types: Vec<String> = query.asset_types.iter().map(|t| t.to_string()).collect();
364 if !query.asset_types.is_empty() {
365 final_query = final_query.bind(&types);
366 }
367
368 if !query.tags.is_empty() {
369 final_query = final_query.bind(&query.tags);
370 }
371
372 let rows = final_query.fetch_all(&self.pool).await?;
373
374 let mut assets = Vec::new();
375 for row in rows {
376 let asset = row_to_asset(row)?;
377 let asset = self.load_asset_relations(asset).await?;
378 assets.push(asset);
379 }
380
381 let total = self.count_search_results(query).await?;
383
384 Ok(SearchResults {
385 assets,
386 total,
387 offset: query.offset,
388 limit: query.limit,
389 })
390 }
391
392 #[instrument(skip(self, asset), fields(asset_id = %asset.id))]
393 async fn update(&self, asset: Asset) -> DbResult<Asset> {
394 debug!("Updating asset");
395
396 let mut tx = self.pool.begin().await?;
397
398 let result = sqlx::query(
399 r#"
400 UPDATE assets SET
401 name = $2,
402 version = $3,
403 asset_type = $4,
404 status = $5,
405 storage_backend = $6,
406 storage_uri = $7,
407 storage_path = $8,
408 size_bytes = $9,
409 checksum_algorithm = $10,
410 checksum_value = $11,
411 signature_algorithm = $12,
412 signature_value = $13,
413 signature_key_id = $14,
414 description = $15,
415 license = $16,
416 content_type = $17,
417 author = $18,
418 source_repo = $19,
419 commit_hash = $20,
420 build_id = $21,
421 deprecated_at = $22,
422 metadata = $23,
423 updated_at = $24
424 WHERE id = $1
425 "#,
426 )
427 .bind(&asset.id.to_string())
428 .bind(&asset.metadata.name)
429 .bind(&asset.metadata.version.to_string())
430 .bind(&asset.asset_type.to_string())
431 .bind(&asset.status.to_string())
432 .bind(&asset.storage.backend.to_string())
433 .bind(asset.storage.uri.as_ref().unwrap_or(&asset.storage.get_uri()))
434 .bind(if asset.storage.path.is_empty() { None } else { Some(&asset.storage.path) })
435 .bind(asset.metadata.size_bytes.map(|s| s as i64))
436 .bind(&asset.checksum.algorithm.to_string())
437 .bind(&asset.checksum.value)
438 .bind(None::<&str>)
439 .bind(None::<&str>)
440 .bind(None::<&str>)
441 .bind(&asset.metadata.description)
442 .bind(&asset.metadata.license)
443 .bind(&asset.metadata.content_type)
444 .bind(asset.provenance.as_ref().and_then(|p| p.author.as_deref()))
445 .bind(asset.provenance.as_ref().and_then(|p| p.source_repo.as_deref()))
446 .bind(asset.provenance.as_ref().and_then(|p| p.commit_hash.as_deref()))
447 .bind(asset.provenance.as_ref().and_then(|p| p.build_id.as_deref()))
448 .bind(&asset.deprecated_at)
449 .bind(serde_json::to_value(&asset.metadata.annotations)?)
450 .bind(Utc::now())
451 .execute(&mut *tx)
452 .await?;
453
454 if result.rows_affected() == 0 {
455 return Err(DbError::NotFound(format!("Asset {} not found", asset.id)));
456 }
457
458 sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1")
460 .bind(&asset.id.to_string())
461 .execute(&mut *tx)
462 .await?;
463
464 for tag in &asset.metadata.tags {
465 sqlx::query(
466 r#"
467 INSERT INTO asset_tags (asset_id, tag)
468 VALUES ($1, $2)
469 "#,
470 )
471 .bind(&asset.id.to_string())
472 .bind(tag)
473 .execute(&mut *tx)
474 .await?;
475 }
476
477 tx.commit().await?;
478
479 debug!("Asset updated successfully");
480 Ok(asset)
481 }
482
483 #[instrument(skip(self), fields(asset_id = %id))]
484 async fn delete(&self, id: &AssetId) -> DbResult<()> {
485 debug!("Deleting asset");
486
487 let result = sqlx::query("DELETE FROM assets WHERE id = $1")
488 .bind(&id.to_string())
489 .execute(&self.pool)
490 .await?;
491
492 if result.rows_affected() == 0 {
493 return Err(DbError::NotFound(format!("Asset {} not found", id)));
494 }
495
496 debug!("Asset deleted successfully");
497 Ok(())
498 }
499
500 #[instrument(skip(self))]
501 async fn list_versions(&self, name: &str) -> DbResult<Vec<Asset>> {
502 debug!("Listing versions for asset");
503
504 let rows = sqlx::query(
505 r#"
506 SELECT
507 id, name, version, asset_type, status,
508 storage_backend, storage_uri, storage_path, size_bytes,
509 checksum_algorithm, checksum_value,
510 signature_algorithm, signature_value, signature_key_id,
511 description, license, content_type,
512 author, source_repo, commit_hash, build_id,
513 created_at, updated_at, deprecated_at, metadata
514 FROM assets
515 WHERE name = $1
516 ORDER BY created_at DESC
517 "#,
518 )
519 .bind(name)
520 .fetch_all(&self.pool)
521 .await?;
522
523 let mut assets = Vec::new();
524 for row in rows {
525 let asset = row_to_asset(row)?;
526 let asset = self.load_asset_relations(asset).await?;
527 assets.push(asset);
528 }
529
530 Ok(assets)
531 }
532
533 #[instrument(skip(self), fields(asset_id = %id))]
534 async fn list_dependencies(&self, id: &AssetId) -> DbResult<Vec<Asset>> {
535 debug!("Listing dependencies");
536
537 let rows = sqlx::query(
538 r#"
539 SELECT
540 a.id, a.name, a.version, a.asset_type, a.status,
541 a.storage_backend, a.storage_uri, a.storage_path, a.size_bytes,
542 a.checksum_algorithm, a.checksum_value,
543 a.signature_algorithm, a.signature_value, a.signature_key_id,
544 a.description, a.license, a.content_type,
545 a.author, a.source_repo, a.commit_hash, a.build_id,
546 a.created_at, a.updated_at, a.deprecated_at, a.metadata
547 FROM assets a
548 INNER JOIN asset_dependencies d ON a.id = d.dependency_id
549 WHERE d.asset_id = $1
550 "#,
551 )
552 .bind(&id.to_string())
553 .fetch_all(&self.pool)
554 .await?;
555
556 let mut assets = Vec::new();
557 for row in rows {
558 let asset = row_to_asset(row)?;
559 let asset = self.load_asset_relations(asset).await?;
560 assets.push(asset);
561 }
562
563 Ok(assets)
564 }
565
566 #[instrument(skip(self), fields(asset_id = %id))]
567 async fn list_reverse_dependencies(&self, id: &AssetId) -> DbResult<Vec<Asset>> {
568 debug!("Listing reverse dependencies");
569
570 let rows = sqlx::query(
571 r#"
572 SELECT
573 a.id, a.name, a.version, a.asset_type, a.status,
574 a.storage_backend, a.storage_uri, a.storage_path, a.size_bytes,
575 a.checksum_algorithm, a.checksum_value,
576 a.signature_algorithm, a.signature_value, a.signature_key_id,
577 a.description, a.license, a.content_type,
578 a.author, a.source_repo, a.commit_hash, a.build_id,
579 a.created_at, a.updated_at, a.deprecated_at, a.metadata
580 FROM assets a
581 INNER JOIN asset_dependencies d ON a.id = d.asset_id
582 WHERE d.dependency_id = $1
583 "#,
584 )
585 .bind(&id.to_string())
586 .fetch_all(&self.pool)
587 .await?;
588
589 let mut assets = Vec::new();
590 for row in rows {
591 let asset = row_to_asset(row)?;
592 let asset = self.load_asset_relations(asset).await?;
593 assets.push(asset);
594 }
595
596 Ok(assets)
597 }
598
599 #[instrument(skip(self), fields(asset_id = %id, tag = %tag))]
600 async fn add_tag(&self, id: &AssetId, tag: &str) -> DbResult<()> {
601 debug!("Adding tag to asset");
602
603 sqlx::query(
604 r#"
605 INSERT INTO asset_tags (asset_id, tag)
606 VALUES ($1, $2)
607 ON CONFLICT (asset_id, tag) DO NOTHING
608 "#,
609 )
610 .bind(&id.to_string())
611 .bind(tag)
612 .execute(&self.pool)
613 .await?;
614
615 Ok(())
616 }
617
618 #[instrument(skip(self), fields(asset_id = %id, tag = %tag))]
619 async fn remove_tag(&self, id: &AssetId, tag: &str) -> DbResult<()> {
620 debug!("Removing tag from asset");
621
622 sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1 AND tag = $2")
623 .bind(&id.to_string())
624 .bind(tag)
625 .execute(&self.pool)
626 .await?;
627
628 Ok(())
629 }
630
631 #[instrument(skip(self), fields(asset_id = %id))]
632 async fn get_tags(&self, id: &AssetId) -> DbResult<Vec<String>> {
633 debug!("Getting tags for asset");
634
635 let rows = sqlx::query("SELECT tag FROM asset_tags WHERE asset_id = $1 ORDER BY tag")
636 .bind(&id.to_string())
637 .fetch_all(&self.pool)
638 .await?;
639
640 let tags = rows
641 .iter()
642 .map(|row| row.get::<String, _>("tag"))
643 .collect();
644
645 Ok(tags)
646 }
647
648 #[instrument(skip(self))]
649 async fn list_all_tags(&self) -> DbResult<Vec<String>> {
650 debug!("Listing all tags");
651
652 let rows = sqlx::query("SELECT DISTINCT tag FROM asset_tags ORDER BY tag")
653 .fetch_all(&self.pool)
654 .await?;
655
656 let tags = rows
657 .iter()
658 .map(|row| row.get::<String, _>("tag"))
659 .collect();
660
661 Ok(tags)
662 }
663
664 #[instrument(skip(self))]
665 async fn add_dependency(
666 &self,
667 asset_id: &AssetId,
668 dependency_id: &AssetId,
669 version_constraint: Option<&str>,
670 ) -> DbResult<()> {
671 debug!("Adding dependency relationship");
672
673 if self.would_create_cycle(asset_id, dependency_id).await? {
675 return Err(DbError::CircularDependency(format!(
676 "Adding dependency from {} to {} would create a cycle",
677 asset_id, dependency_id
678 )));
679 }
680
681 sqlx::query(
682 r#"
683 INSERT INTO asset_dependencies (asset_id, dependency_id, version_constraint)
684 VALUES ($1, $2, $3)
685 ON CONFLICT (asset_id, dependency_id) DO UPDATE
686 SET version_constraint = EXCLUDED.version_constraint
687 "#,
688 )
689 .bind(&asset_id.to_string())
690 .bind(&dependency_id.to_string())
691 .bind(version_constraint)
692 .execute(&self.pool)
693 .await?;
694
695 Ok(())
696 }
697
698 #[instrument(skip(self))]
699 async fn remove_dependency(
700 &self,
701 asset_id: &AssetId,
702 dependency_id: &AssetId,
703 ) -> DbResult<()> {
704 debug!("Removing dependency relationship");
705
706 sqlx::query("DELETE FROM asset_dependencies WHERE asset_id = $1 AND dependency_id = $2")
707 .bind(&asset_id.to_string())
708 .bind(&dependency_id.to_string())
709 .execute(&self.pool)
710 .await?;
711
712 Ok(())
713 }
714
715 #[instrument(skip(self))]
716 async fn count_assets(&self) -> DbResult<i64> {
717 let row = sqlx::query("SELECT COUNT(*) as count FROM assets")
718 .fetch_one(&self.pool)
719 .await?;
720
721 Ok(row.get("count"))
722 }
723
724 #[instrument(skip(self))]
725 async fn count_by_type(&self, asset_type: &AssetType) -> DbResult<i64> {
726 let row = sqlx::query("SELECT COUNT(*) as count FROM assets WHERE asset_type = $1")
727 .bind(&asset_type.to_string())
728 .fetch_one(&self.pool)
729 .await?;
730
731 Ok(row.get("count"))
732 }
733
734 #[instrument(skip(self))]
735 async fn health_check(&self) -> DbResult<()> {
736 sqlx::query("SELECT 1")
737 .execute(&self.pool)
738 .await
739 .map(|_| ())
740 .map_err(Into::into)
741 }
742}
743
744impl PostgresAssetRepository {
745 async fn load_asset_relations(&self, mut asset: Asset) -> DbResult<Asset> {
747 let tags = self.get_tags(&asset.id).await?;
749 asset.metadata.tags = tags;
750
751 let dep_rows = sqlx::query(
753 "SELECT dependency_id FROM asset_dependencies WHERE asset_id = $1"
754 )
755 .bind(&asset.id.to_string())
756 .fetch_all(&self.pool)
757 .await?;
758
759 asset.dependencies = dep_rows
760 .iter()
761 .filter_map(|row| {
762 let dep_id_str: String = row.get("dependency_id");
763 AssetId::from_str(&dep_id_str)
764 .ok()
765 .map(|id| llm_registry_core::AssetReference::by_id(id))
766 })
767 .collect();
768
769 Ok(asset)
770 }
771
772 async fn would_create_cycle(&self, from_id: &AssetId, to_id: &AssetId) -> DbResult<bool> {
774 let row = sqlx::query(
776 r#"
777 WITH RECURSIVE dep_tree AS (
778 SELECT dependency_id, 1 as depth
779 FROM asset_dependencies
780 WHERE asset_id = $1
781
782 UNION ALL
783
784 SELECT d.dependency_id, dt.depth + 1
785 FROM asset_dependencies d
786 INNER JOIN dep_tree dt ON d.asset_id = dt.dependency_id
787 WHERE dt.depth < 100
788 )
789 SELECT COUNT(*) > 0 as has_cycle
790 FROM dep_tree
791 WHERE dependency_id = $2
792 "#,
793 )
794 .bind(&to_id.to_string())
795 .bind(&from_id.to_string())
796 .fetch_one(&self.pool)
797 .await?;
798
799 Ok(row.get("has_cycle"))
800 }
801
802 async fn count_search_results(&self, query: &SearchQuery) -> DbResult<i64> {
804 let mut sql = String::from("SELECT COUNT(*) as count FROM assets a WHERE 1=1");
806
807 if query.exclude_deprecated {
808 sql.push_str(" AND a.deprecated_at IS NULL");
809 }
810
811 if !query.asset_types.is_empty() {
812 let types: Vec<String> = query.asset_types.iter().map(|t| t.to_string()).collect();
813 let placeholders: Vec<String> = types.iter().map(|t| format!("'{}'", t)).collect();
814 sql.push_str(&format!(" AND a.asset_type IN ({})", placeholders.join(", ")));
815 }
816
817 let row = sqlx::query(&sql).fetch_one(&self.pool).await?;
818
819 Ok(row.get("count"))
820 }
821}
822
823fn row_to_asset(row: PgRow) -> DbResult<Asset> {
825 let id_str: String = row.get("id");
826 let id = AssetId::from_str(&id_str)
827 .map_err(|e| DbError::InvalidData(format!("Invalid asset ID: {}", e)))?;
828
829 let version_str: String = row.get("version");
830 let version = Version::parse(&version_str)
831 .map_err(|e| DbError::InvalidData(format!("Invalid version: {}", e)))?;
832
833 let asset_type_str: String = row.get("asset_type");
834 let asset_type = parse_asset_type(&asset_type_str)?;
835
836 let status_str: String = row.get("status");
837 let status = parse_asset_status(&status_str)?;
838
839 let backend_str: String = row.get("storage_backend");
840 let backend = parse_storage_backend_from_db(&backend_str)?;
841
842 let storage_uri: String = row.get("storage_uri");
843 let storage_path: Option<String> = row.get("storage_path");
844
845 let checksum_algo_str: String = row.get("checksum_algorithm");
846 let checksum_algorithm = parse_hash_algorithm(&checksum_algo_str)?;
847 let checksum_value: String = row.get("checksum_value");
848
849 let metadata_json: JsonValue = row.get("metadata");
850 let annotations: HashMap<String, String> = serde_json::from_value(metadata_json)
851 .unwrap_or_default();
852
853 let created_at: DateTime<Utc> = row.get("created_at");
854 let updated_at: DateTime<Utc> = row.get("updated_at");
855 let deprecated_at: Option<DateTime<Utc>> = row.get("deprecated_at");
856
857 let size_bytes: Option<i64> = row.get("size_bytes");
858
859 let provenance = {
860 let author: Option<String> = row.get("author");
861 let source_repo: Option<String> = row.get("source_repo");
862 let commit_hash: Option<String> = row.get("commit_hash");
863 let build_id: Option<String> = row.get("build_id");
864
865 if author.is_some() || source_repo.is_some() {
866 Some(Provenance {
867 author,
868 source_repo,
869 commit_hash,
870 build_id,
871 created_at: Utc::now(),
872 build_metadata: HashMap::new(),
873 })
874 } else {
875 None
876 }
877 };
878
879 let metadata = AssetMetadata {
880 name: row.get("name"),
881 version,
882 description: row.get("description"),
883 license: row.get("license"),
884 tags: Vec::new(), annotations,
886 size_bytes: size_bytes.map(|s| s as u64),
887 content_type: row.get("content_type"),
888 };
889
890 let storage = StorageLocation {
891 backend,
892 path: storage_path.unwrap_or_default(),
893 uri: Some(storage_uri),
894 };
895
896 let checksum = Checksum {
897 algorithm: checksum_algorithm,
898 value: checksum_value,
899 };
900
901 Ok(Asset {
902 id,
903 asset_type,
904 metadata,
905 status,
906 storage,
907 checksum,
908 provenance,
909 dependencies: Vec::new(), created_at,
911 updated_at,
912 deprecated_at,
913 })
914}
915
916fn parse_asset_type(s: &str) -> DbResult<AssetType> {
917 match s {
918 "model" => Ok(AssetType::Model),
919 "pipeline" => Ok(AssetType::Pipeline),
920 "test_suite" => Ok(AssetType::TestSuite),
921 "policy" => Ok(AssetType::Policy),
922 "dataset" => Ok(AssetType::Dataset),
923 other => AssetType::custom(other)
924 .map_err(|e| DbError::InvalidData(format!("Invalid asset type: {}", e))),
925 }
926}
927
928fn parse_asset_status(s: &str) -> DbResult<AssetStatus> {
929 AssetStatus::from_str(s)
930 .map_err(|e| DbError::InvalidData(format!("Invalid asset status: {}", e)))
931}
932
933fn parse_storage_backend_from_db(s: &str) -> DbResult<StorageBackend> {
934 match s {
937 "S3" => Ok(StorageBackend::S3 {
938 bucket: String::new(),
939 region: String::new(),
940 endpoint: None,
941 }),
942 "GCS" => Ok(StorageBackend::GCS {
943 bucket: String::new(),
944 project_id: String::new(),
945 }),
946 "AzureBlob" => Ok(StorageBackend::AzureBlob {
947 account_name: String::new(),
948 container: String::new(),
949 }),
950 "MinIO" => Ok(StorageBackend::MinIO {
951 bucket: String::new(),
952 endpoint: String::new(),
953 }),
954 "FileSystem" => Ok(StorageBackend::FileSystem {
955 base_path: String::new(),
956 }),
957 _ => Err(DbError::InvalidData(format!(
958 "Invalid storage backend type: {}",
959 s
960 ))),
961 }
962}
963
964fn parse_hash_algorithm(s: &str) -> DbResult<HashAlgorithm> {
965 HashAlgorithm::from_str(s)
966 .map_err(|e| DbError::InvalidData(format!("Invalid hash algorithm: {}", e)))
967}