Skip to main content

pacha/registry/
mod.rs

1//! Registry implementation with `SQLite` storage.
2
3mod database;
4
5pub use database::RegistryDb;
6
7use crate::data::{Dataset, DatasetId, Datasheet};
8use crate::error::{PachaError, Result};
9use crate::experiment::{ExperimentRun, RunId};
10use crate::lineage::LineageGraph;
11use crate::model::{Model, ModelCard, ModelId, ModelStage, ModelVersion};
12use crate::recipe::{RecipeId, RecipeReference, TrainingRecipe};
13use crate::storage::ObjectStore;
14use chrono::Utc;
15use std::fs;
16use std::path::{Path, PathBuf};
17
/// Settings that locate a Pacha registry on disk.
#[derive(Debug, Clone)]
pub struct RegistryConfig {
    /// Root directory under which every registry file is kept.
    pub base_path: PathBuf,
}

impl RegistryConfig {
    /// Build a configuration rooted at `base_path`.
    ///
    /// The path is copied into an owned `PathBuf`; nothing is created or
    /// checked on disk here.
    #[must_use]
    pub fn new<P: AsRef<Path>>(base_path: P) -> Self {
        let root: &Path = base_path.as_ref();
        Self { base_path: root.to_path_buf() }
    }

    /// Path of the database file, `<base_path>/registry.db`.
    #[must_use]
    pub fn db_path(&self) -> PathBuf {
        self.under_base("registry.db")
    }

    /// Path of the object directory, `<base_path>/objects`.
    #[must_use]
    pub fn objects_path(&self) -> PathBuf {
        self.under_base("objects")
    }

    /// Path of the configuration file, `<base_path>/config.toml`.
    #[must_use]
    pub fn config_path(&self) -> PathBuf {
        self.under_base("config.toml")
    }

    /// Join a single entry name onto the base path.
    fn under_base(&self, leaf: &str) -> PathBuf {
        self.base_path.join(leaf)
    }
}
50
51impl Default for RegistryConfig {
52    fn default() -> Self {
53        let home = dirs_path();
54        Self::new(home.join(".pacha"))
55    }
56}
57
/// Directory that the default registry location is resolved against.
///
/// Returns the value of the `HOME` environment variable as a path, or `"."`
/// when the variable is not set. The variable is read as an OS string, so a
/// home directory whose name is not valid UTF-8 is still honoured instead of
/// silently falling back to the current directory.
fn dirs_path() -> PathBuf {
    std::env::var_os("HOME").map_or_else(|| PathBuf::from("."), PathBuf::from)
}
61
62/// The main Pacha registry.
63pub struct Registry {
64    config: RegistryConfig,
65    db: RegistryDb,
66    objects: ObjectStore,
67}
68
69impl Registry {
70    /// Create or open a registry at the default location (~/.pacha).
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if initialization fails.
75    pub fn open_default() -> Result<Self> {
76        Self::open(RegistryConfig::default())
77    }
78
79    /// Create or open a registry with the given configuration.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if initialization fails.
84    pub fn open(config: RegistryConfig) -> Result<Self> {
85        // Create base directory
86        fs::create_dir_all(&config.base_path)?;
87
88        // Initialize database
89        let db = RegistryDb::open(config.db_path())?;
90
91        // Initialize object store
92        let objects = ObjectStore::new(config.objects_path())?;
93
94        Ok(Self { config, db, objects })
95    }
96
97    /// Get the registry configuration.
98    #[must_use]
99    pub fn config(&self) -> &RegistryConfig {
100        &self.config
101    }
102
103    /// HELIX-IDEA-007 — atomic point-in-time snapshot of the registry
104    /// SQLite metadata file. Wraps SQLite's `VACUUM INTO 'path'` so the
105    /// target file is a self-consistent copy of the source as of the
106    /// moment the statement begins. Concurrent writers continue against
107    /// the source; their post-snapshot changes are absent from the copy.
108    ///
109    /// The companion content-addressed object store under `objects/` is
110    /// immutable by construction (BLAKE3-keyed) — a consistent snapshot
111    /// of the full registry is `(snapshot.db, cp -r objects/)`. We
112    /// document but do not automate the object copy in v1.
113    ///
114    /// Contract: `contracts/apr-registry-snapshot-v1.yaml`
115    /// (FALSIFY-SNAPSHOT-001..003).
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the underlying SQL fails — most commonly when
120    /// `to` already exists (SQLite refuses to overwrite via VACUUM INTO).
121    pub fn snapshot<P: AsRef<Path>>(&self, to: P) -> Result<()> {
122        self.db.vacuum_into(to)
123    }
124
125    // ==================== Model Registry ====================
126
127    /// Register a new model.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if registration fails.
132    pub fn register_model(
133        &self,
134        name: &str,
135        version: &ModelVersion,
136        artifact: &[u8],
137        card: ModelCard,
138    ) -> Result<ModelId> {
139        // Check if already exists
140        if self.db.model_exists(name, version)? {
141            return Err(PachaError::AlreadyExists {
142                kind: "model".to_string(),
143                name: name.to_string(),
144                version: version.to_string(),
145            });
146        }
147
148        // Store artifact
149        let content_address = self.objects.put(artifact)?;
150
151        // Create model
152        let model = Model {
153            id: ModelId::new(),
154            name: name.to_string(),
155            version: version.clone(),
156            content_address,
157            card,
158            stage: ModelStage::Development,
159            created_at: Utc::now(),
160            updated_at: Utc::now(),
161        };
162
163        // Save to database
164        self.db.insert_model(&model)?;
165
166        Ok(model.id)
167    }
168
169    /// Get a model by name and version.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the model is not found.
174    pub fn get_model(&self, name: &str, version: &ModelVersion) -> Result<Model> {
175        // Contract: configuration-v1.yaml precondition (pv codegen)
176        contract_pre_configuration!(name.as_bytes());
177        self.db.get_model(name, version)
178    }
179
180    /// Get a model by ID.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the model is not found.
185    pub fn get_model_by_id(&self, id: &ModelId) -> Result<Model> {
186        self.db.get_model_by_id(id)
187    }
188
189    /// List all versions of a model.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if the query fails.
194    pub fn list_model_versions(&self, name: &str) -> Result<Vec<ModelVersion>> {
195        contract_pre_ols_fit!();
196        let result = self.db.list_model_versions(name);
197        if let Ok(ref val) = result {
198            contract_post_configuration!(val);
199        }
200        result
201    }
202
203    /// List all model names.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if the query fails.
208    pub fn list_models(&self) -> Result<Vec<String>> {
209        contract_pre_ols_fit!();
210        let result = self.db.list_model_names();
211        if let Ok(ref val) = result {
212            contract_post_configuration!(val);
213        }
214        result
215    }
216
217    /// Transition a model to a new stage.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the transition is invalid.
222    pub fn transition_model_stage(
223        &self,
224        name: &str,
225        version: &ModelVersion,
226        target_stage: ModelStage,
227    ) -> Result<()> {
228        let model = self.get_model(name, version)?;
229        let _new_stage = model.stage.transition_to(target_stage)?;
230        self.db.update_model_stage(&model.id, target_stage)
231    }
232
233    /// Get the artifact data for a model.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the artifact cannot be retrieved.
238    pub fn get_model_artifact(&self, name: &str, version: &ModelVersion) -> Result<Vec<u8>> {
239        let model = self.get_model(name, version)?;
240        self.objects.get(&model.content_address)
241    }
242
243    /// Get model lineage graph.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the query fails.
248    pub fn get_model_lineage(&self, _model_id: &ModelId) -> Result<LineageGraph> {
249        // Returns empty graph until lineage data is populated
250        Ok(LineageGraph::new())
251    }
252
253    // ==================== Dataset Registry ====================
254
255    /// Register a new dataset.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if registration fails.
260    pub fn register_dataset(
261        &self,
262        name: &str,
263        version: &crate::data::DatasetVersion,
264        data: &[u8],
265        datasheet: Datasheet,
266    ) -> Result<DatasetId> {
267        // Check if already exists
268        if self.db.dataset_exists(name, version)? {
269            return Err(PachaError::AlreadyExists {
270                kind: "dataset".to_string(),
271                name: name.to_string(),
272                version: version.to_string(),
273            });
274        }
275
276        // Store data
277        let content_address = self.objects.put(data)?;
278
279        // Create dataset
280        let dataset = Dataset {
281            id: DatasetId::new(),
282            name: name.to_string(),
283            version: version.clone(),
284            content_address,
285            datasheet,
286            created_at: Utc::now(),
287        };
288
289        // Save to database
290        self.db.insert_dataset(&dataset)?;
291
292        Ok(dataset.id)
293    }
294
295    /// Get a dataset by name and version.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the dataset is not found.
300    pub fn get_dataset(
301        &self,
302        name: &str,
303        version: &crate::data::DatasetVersion,
304    ) -> Result<Dataset> {
305        self.db.get_dataset(name, version)
306    }
307
308    /// List all dataset names.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if the query fails.
313    pub fn list_datasets(&self) -> Result<Vec<String>> {
314        contract_pre_configuration!();
315        let result = self.db.list_dataset_names();
316        if let Ok(ref val) = result {
317            contract_post_configuration!(val);
318        }
319        result
320    }
321
322    /// List all versions of a dataset.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the query fails.
327    pub fn list_dataset_versions(&self, name: &str) -> Result<Vec<crate::data::DatasetVersion>> {
328        contract_pre_configuration!(name);
329        let result = self.db.list_dataset_versions(name);
330        if let Ok(ref val) = result {
331            contract_post_configuration!(val);
332        }
333        result
334    }
335
336    /// Get the data for a dataset.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the data cannot be retrieved.
341    pub fn get_dataset_data(
342        &self,
343        name: &str,
344        version: &crate::data::DatasetVersion,
345    ) -> Result<Vec<u8>> {
346        let dataset = self.get_dataset(name, version)?;
347        self.objects.get(&dataset.content_address)
348    }
349
350    // ==================== Recipe Registry ====================
351
352    /// Register a new recipe.
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if registration fails.
357    pub fn register_recipe(&self, recipe: &TrainingRecipe) -> Result<RecipeId> {
358        // Check if already exists
359        if self.db.recipe_exists(&recipe.name, &recipe.version)? {
360            return Err(PachaError::AlreadyExists {
361                kind: "recipe".to_string(),
362                name: recipe.name.clone(),
363                version: recipe.version.to_string(),
364            });
365        }
366
367        let id = recipe.id.clone();
368        self.db.insert_recipe(recipe)?;
369        Ok(id)
370    }
371
372    /// Get a recipe by name and version.
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if the recipe is not found.
377    pub fn get_recipe(
378        &self,
379        name: &str,
380        version: &crate::recipe::RecipeVersion,
381    ) -> Result<TrainingRecipe> {
382        self.db.get_recipe(name, version)
383    }
384
385    /// List all recipe names.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the query fails.
390    pub fn list_recipes(&self) -> Result<Vec<String>> {
391        contract_pre_configuration!();
392        let result = self.db.list_recipe_names();
393        if let Ok(ref val) = result {
394            contract_post_configuration!(val);
395        }
396        result
397    }
398
399    /// List all versions of a recipe.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the query fails.
404    pub fn list_recipe_versions(&self, name: &str) -> Result<Vec<crate::recipe::RecipeVersion>> {
405        contract_pre_expand_recipe!(name);
406        self.db.list_recipe_versions(name)
407    }
408
409    // ==================== Experiment Tracking ====================
410
411    /// Start a new experiment run.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if starting fails.
416    pub fn start_run(&self, mut run: ExperimentRun) -> Result<RunId> {
417        contract_pre_configuration!();
418        run.start();
419        let id = run.run_id.clone();
420        self.db.insert_run(&run)?;
421        Ok(id)
422    }
423
424    /// Update an experiment run.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the update fails.
429    pub fn update_run(&self, run: &ExperimentRun) -> Result<()> {
430        contract_pre_configuration!();
431        self.db.update_run(run)
432    }
433
434    /// Get an experiment run by ID.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the run is not found.
439    pub fn get_run(&self, run_id: &RunId) -> Result<ExperimentRun> {
440        contract_pre_configuration!();
441        self.db.get_run(run_id)
442    }
443
444    /// List runs for a recipe.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if the query fails.
449    pub fn list_runs(&self, recipe_ref: &RecipeReference) -> Result<Vec<ExperimentRun>> {
450        contract_pre_configuration!();
451        self.db.list_runs_for_recipe(recipe_ref)
452    }
453
454    // ==================== Utility ====================
455
456    /// Get storage statistics.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if querying fails.
461    pub fn storage_stats(&self) -> Result<StorageStats> {
462        let total_size = self.objects.total_size()?;
463        let object_count = self.objects.list()?.len();
464        let model_count = self.db.count_models()?;
465        let dataset_count = self.db.count_datasets()?;
466        let recipe_count = self.db.count_recipes()?;
467
468        Ok(StorageStats {
469            total_size_bytes: total_size,
470            object_count,
471            model_count,
472            dataset_count,
473            recipe_count,
474        })
475    }
476}
477
/// Size and count figures reported by `Registry::storage_stats`.
///
/// Every field is a plain integer, so values can be compared with `==` and
/// `StorageStats::default()` yields the all-zero value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct StorageStats {
    /// Total size of all objects in bytes.
    pub total_size_bytes: u64,
    /// Number of content-addressed objects.
    pub object_count: usize,
    /// Number of registered models.
    pub model_count: usize,
    /// Number of registered datasets.
    pub dataset_count: usize,
    /// Number of registered recipes.
    pub recipe_count: usize,
}
492
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::DatasetVersion;
    use crate::recipe::{Hyperparameters, RecipeVersion};
    use tempfile::TempDir;

    /// Open a registry rooted in a new temporary directory.
    ///
    /// The `TempDir` handle is returned with the registry; each test keeps it
    /// bound for the length of the test body.
    fn fresh_registry() -> (TempDir, Registry) {
        let tmp = TempDir::new().unwrap();
        let registry = Registry::open(RegistryConfig::new(tmp.path())).unwrap();
        (tmp, registry)
    }

    /// Opening a registry leaves its base directory on disk.
    #[test]
    fn test_registry_open() {
        let (_tmp, registry) = fresh_registry();
        assert!(registry.config().base_path.exists());
    }

    /// A registered model can be read back with the same id and metadata.
    #[test]
    fn test_register_and_get_model() {
        let (_tmp, registry) = fresh_registry();

        let version = ModelVersion::new(1, 0, 0);
        let card = ModelCard::new("Test model");

        let id = registry
            .register_model("test-model", &version, b"model data", card.clone())
            .unwrap();

        let fetched = registry.get_model("test-model", &version).unwrap();
        assert_eq!(fetched.id, id);
        assert_eq!(fetched.name, "test-model");
        assert_eq!(fetched.version, version);
        assert_eq!(fetched.card.description, card.description);
    }

    /// Registering the same name and version twice is rejected.
    #[test]
    fn test_register_duplicate_model_fails() {
        let (_tmp, registry) = fresh_registry();

        let version = ModelVersion::new(1, 0, 0);
        let card = ModelCard::new("Test model");

        registry.register_model("test-model", &version, b"model data", card.clone()).unwrap();

        let second = registry.register_model("test-model", &version, b"model data", card);
        assert!(matches!(second, Err(PachaError::AlreadyExists { .. })));
    }

    /// Artifact bytes come back unchanged.
    #[test]
    fn test_model_artifact_roundtrip() {
        let (_tmp, registry) = fresh_registry();

        let version = ModelVersion::new(1, 0, 0);
        let artifact = b"model binary data here";

        registry.register_model("test-model", &version, artifact, ModelCard::new("Test")).unwrap();

        let stored = registry.get_model_artifact("test-model", &version).unwrap();
        assert_eq!(stored, artifact);
    }

    /// Development -> Staging is accepted and persisted.
    #[test]
    fn test_model_stage_transition() {
        let (_tmp, registry) = fresh_registry();

        let version = ModelVersion::new(1, 0, 0);
        registry.register_model("test-model", &version, b"data", ModelCard::new("Test")).unwrap();

        registry.transition_model_stage("test-model", &version, ModelStage::Staging).unwrap();

        let after = registry.get_model("test-model", &version).unwrap();
        assert_eq!(after.stage, ModelStage::Staging);
    }

    /// A registered dataset can be read back with the same id and datasheet.
    #[test]
    fn test_register_and_get_dataset() {
        let (_tmp, registry) = fresh_registry();

        let version = DatasetVersion::new(1, 0, 0);
        let datasheet = Datasheet::new("Test dataset");

        let id = registry
            .register_dataset("test-dataset", &version, b"csv,data,here", datasheet.clone())
            .unwrap();

        let fetched = registry.get_dataset("test-dataset", &version).unwrap();
        assert_eq!(fetched.id, id);
        assert_eq!(fetched.datasheet.purpose, datasheet.purpose);
    }

    /// Dataset bytes come back unchanged.
    #[test]
    fn test_dataset_data_roundtrip() {
        let (_tmp, registry) = fresh_registry();

        let version = DatasetVersion::new(1, 0, 0);
        let payload = b"raw dataset bytes";

        registry.register_dataset("test-dataset", &version, payload, Datasheet::new("Test")).unwrap();

        let stored = registry.get_dataset_data("test-dataset", &version).unwrap();
        assert_eq!(stored, payload);
    }

    /// A registered recipe can be read back by name and version.
    #[test]
    fn test_register_and_get_recipe() {
        let (_tmp, registry) = fresh_registry();

        let recipe = TrainingRecipe::builder()
            .name("test-recipe")
            .version(RecipeVersion::new(1, 0, 0))
            .description("Test recipe")
            .hyperparameters(Hyperparameters::default())
            .build();

        let id = registry.register_recipe(&recipe).unwrap();

        let wanted = RecipeVersion::new(1, 0, 0);
        let fetched = registry.get_recipe("test-recipe", &wanted).unwrap();
        assert_eq!(fetched.id, id);
        assert_eq!(fetched.description, "Test recipe");
    }

    /// A started run is stored together with the metric logged before start.
    #[test]
    fn test_experiment_run() {
        let (_tmp, registry) = fresh_registry();

        let mut run = ExperimentRun::new(Hyperparameters::default());
        run.log_metric("loss", 0.5, 100);

        let run_id = registry.start_run(run).unwrap();

        let stored = registry.get_run(&run_id).unwrap();
        assert_eq!(stored.run_id, run_id);
        assert_eq!(stored.metrics.len(), 1);
    }

    /// One model plus one dataset with distinct bytes gives two objects.
    #[test]
    fn test_storage_stats() {
        let (_tmp, registry) = fresh_registry();

        let model_version = ModelVersion::new(1, 0, 0);
        registry
            .register_model("model1", &model_version, b"data1", ModelCard::new("M1"))
            .unwrap();

        let dataset_version = DatasetVersion::new(1, 0, 0);
        registry
            .register_dataset("dataset1", &dataset_version, b"data2", Datasheet::new("D1"))
            .unwrap();

        let stats = registry.storage_stats().unwrap();
        assert_eq!(stats.model_count, 1);
        assert_eq!(stats.dataset_count, 1);
        assert_eq!(stats.object_count, 2);
    }

    /// Names are listed once per model; versions are listed per name.
    #[test]
    fn test_list_operations() {
        let (_tmp, registry) = fresh_registry();

        let fixtures: [(&str, ModelVersion, &[u8], &str); 3] = [
            ("model-a", ModelVersion::new(1, 0, 0), b"data", "A"),
            ("model-a", ModelVersion::new(1, 1, 0), b"data2", "A v1.1"),
            ("model-b", ModelVersion::new(1, 0, 0), b"data3", "B"),
        ];
        for (name, version, bytes, description) in fixtures {
            registry.register_model(name, &version, bytes, ModelCard::new(description)).unwrap();
        }

        let models = registry.list_models().unwrap();
        assert_eq!(models.len(), 2);
        assert!(models.iter().any(|m| m == "model-a"));
        assert!(models.iter().any(|m| m == "model-b"));

        let versions = registry.list_model_versions("model-a").unwrap();
        assert_eq!(versions.len(), 2);
    }
}