Skip to main content

pyro_artifacts/
cache.rs

1use crate::artifacts::{
2    Artifact, Artifacts, Playbook, PlaybookBinary, PlaybookIdent, PlaybookSource,
3};
4
5#[cfg(feature = "compiler")]
6use crate::build::AnonPlaybook;
7use crate::cargo::CapabilityIdent;
8
9use cargo_toml::Dependency;
10use std::path::{Path, PathBuf};
11use std::{collections::HashMap, io};
12use tokio::fs;
13
14#[derive(Debug, thiserror::Error)]
15pub enum CacheError {
16    #[error("Not found: {0}")]
17    NotFound(String),
18    #[error("{context}: {error}")]
19    Io {
20        context: String,
21        #[source]
22        error: std::io::Error,
23    },
24}
25
26#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
27pub struct PyroductConfig {
28    pub author: String,
29    pub target: Option<PathBuf>,
30    pub pyroduct: Option<Dependency>,
31    pub build_slots: Option<usize>,
32}
33
34#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
35#[serde(rename_all = "lowercase")]
36pub enum RemoteAddress {
37    Tcp(String),
38    Unix(std::path::PathBuf),
39}
40
41/// A loaded playbook where all the libraries are on disk and the binary is loaded
42#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
43pub struct LoadedPlaybook {
44    pub binary: PlaybookBinary,
45    pub remote: HashMap<CapabilityIdent, RemoteAddress>,
46    #[serde(default)]
47    pub paths: HashMap<CapabilityIdent, PathBuf>,
48
49    pub log_dir: std::path::PathBuf,
50    pub input_dir: std::path::PathBuf,
51    pub output_dir: std::path::PathBuf,
52}
53
54// The lock file is automatically unlocked when `_lock_file` is dropped (fs2 behavior).
55
56pub struct CacheManager {
57    pub root: PathBuf,
58    pub pyroduct: Option<Dependency>,
59    pub author: String,
60}
61
62impl CacheManager {
63    #[tracing::instrument(skip(root), fields(root = %root.display()))]
64    pub async fn new(
65        root: &Path,
66        pyroduct: Option<Dependency>,
67        author: String,
68    ) -> Result<Self, CacheError> {
69        tracing::debug!("Creating CacheManager instance");
70        if !root.exists() {
71            fs::create_dir_all(&root).await.map_err(|e| {
72                let err = CacheError::Io {
73                    context: "Failed to create cache root".to_string(),
74                    error: e,
75                };
76                tracing::error!(error = ?err, "Cache root directory creation failed");
77                err
78            })?;
79        }
80
81        let pyroduct = if let Some(mut dep) = pyroduct {
82            crate::cache::resolve_dependency_path(&mut dep, root);
83            Some(dep.clone())
84        } else {
85            None
86        };
87
88        let manager = Self {
89            root: root.to_path_buf(),
90            pyroduct,
91            author,
92        };
93
94        Ok(manager)
95    }
96
97    #[tracing::instrument]
98    pub async fn from_env() -> Result<Self, CacheError> {
99        tracing::debug!("Loading CacheManager from environment");
100        let root = std::env::var("PYRODUCT")
101            .map(PathBuf::from)
102            .unwrap_or_else(|_| {
103                let home = std::env::var("HOME")
104                    .or_else(|_| std::env::var("USERPROFILE"))
105                    .map(PathBuf::from)
106                    .unwrap_or_else(|_| PathBuf::from("."));
107                home.join(".pyroduct")
108            });
109        let config_path = root.join("config.toml");
110        let content = fs::read_to_string(&config_path)
111            .await
112            .map_err(|error| {
113                let err = CacheError::Io {
114                    context: "Failed to read the configuration".to_string(),
115                    error,
116                };
117                tracing::error!(error = ?err, "Failed to read CacheManager config file at {:?}", config_path);
118                err
119            })?;
120        let config = toml::from_str::<PyroductConfig>(&content).map_err(|error| {
121            let err = CacheError::Io {
122                context: "Failed to parse the configuration".to_string(),
123                error: io::Error::new(io::ErrorKind::InvalidData, error),
124            };
125            tracing::error!(error = ?err, "Failed to parse CacheManager config toml");
126            err
127        })?;
128
129        Self::new(&root, config.pyroduct, config.author).await
130    }
131
132    #[tracing::instrument(skip(self))]
133    pub async fn init(&self) -> Result<(), CacheError> {
134        tracing::debug!("Initializing CacheManager directories");
135        fs::create_dir_all(self.capabilities_base_dir())
136            .await
137            .map_err(|error| {
138                let err = CacheError::Io {
139                    context: format!(
140                        "Failed to create capabilities cache dir in {:?}",
141                        self.capabilities_base_dir()
142                    ),
143                    error,
144                };
145                tracing::error!(error = ?err, "Failed to initialize capabilities dir");
146                err
147            })?;
148
149        fs::create_dir_all(self.interfaces_base_dir())
150            .await
151            .map_err(|error| {
152                let err = CacheError::Io {
153                    context: "Failed to create interfaces cache dir".to_string(),
154                    error,
155                };
156                tracing::error!(error = ?err, "Failed to initialize interfaces dir");
157                err
158            })?;
159
160        let module_dir = self.root.join("modules");
161        fs::create_dir_all(&module_dir).await.map_err(|error| {
162            let err = CacheError::Io {
163                context: "Failed to create modules cache dir".to_string(),
164                error,
165            };
166            tracing::error!(error = ?err, "Failed to initialize modules dir");
167            err
168        })?;
169
170        Ok(())
171    }
172
173    #[tracing::instrument(skip(self))]
174    pub async fn purge(&self) -> Result<(), CacheError> {
175        tracing::debug!("Purging CacheManager directories");
176        let dirs = [
177            self.capabilities_base_dir(),
178            self.interfaces_base_dir(),
179            self.root.join("modules"),
180        ];
181        for dir in dirs {
182            if dir.exists() {
183                fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
184                    context: format!("Failed to remove cache dir {}", dir.display()),
185                    error: e,
186                })?;
187            }
188        }
189        self.init().await?;
190        Ok(())
191    }
192
193    #[tracing::instrument(skip(self))]
194    pub async fn purge_capabilities(&self) -> Result<(), CacheError> {
195        tracing::debug!("Purging Capabilities from CacheManager");
196        let dirs = [
197            self.capabilities_base_dir(),
198            self.interfaces_base_dir(),
199        ];
200        for dir in dirs {
201            if dir.exists() {
202                fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
203                    context: format!("Failed to remove capabilities cache dir {}", dir.display()),
204                    error: e,
205                })?;
206            }
207        }
208        self.init().await?;
209        Ok(())
210    }
211
212    #[tracing::instrument(skip(self))]
213    pub async fn purge_modules(&self) -> Result<(), CacheError> {
214        tracing::debug!("Purging Modules from CacheManager");
215        let dir = self.root.join("modules");
216        if dir.exists() {
217            fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
218                context: format!("Failed to remove modules cache dir {}", dir.display()),
219                error: e,
220            })?;
221        }
222        self.init().await?;
223        Ok(())
224    }
225
226
227    pub async fn list_available_capabilities(
228        &self,
229    ) -> Result<Vec<(String, String, String)>, CacheError> {
230        let base = self.capabilities_base_dir();
231        if !base.exists() {
232            return Ok(Vec::new());
233        }
234
235        let mut results = Vec::new();
236        let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
237            context: "Failed to read capabilities base dir".to_string(),
238            error: e,
239        })?;
240
241        while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
242            context: "Failed to read author entry".to_string(),
243            error: e,
244        })? {
245            let author_path = author_entry.path();
246            if !author_path.is_dir() {
247                continue;
248            }
249            let author_name = author_entry.file_name().to_string_lossy().to_string();
250
251            let mut names = fs::read_dir(&author_path)
252                .await
253                .map_err(|e| CacheError::Io {
254                    context: format!("Failed to read author dir: {}", author_path.display()),
255                    error: e,
256                })?;
257
258            while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
259                context: "Failed to read name entry".to_string(),
260                error: e,
261            })? {
262                let name_path = name_entry.path();
263                if !name_path.is_dir() {
264                    continue;
265                }
266                let cap_name = name_entry.file_name().to_string_lossy().to_string();
267
268                let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
269                    context: format!("Failed to read name dir: {}", name_path.display()),
270                    error: e,
271                })?;
272
273                while let Some(version_entry) =
274                    versions.next_entry().await.map_err(|e| CacheError::Io {
275                        context: "Failed to read version entry".to_string(),
276                        error: e,
277                    })?
278                {
279                    let version_path = version_entry.path();
280                    if !version_path.is_dir() {
281                        continue;
282                    }
283                    let version = version_entry.file_name().to_string_lossy().to_string();
284
285                    if version_path.join("interface.json").exists() {
286                        results.push((author_name.clone(), cap_name.clone(), version));
287                    }
288                }
289            }
290        }
291
292        Ok(results)
293    }
294
295    pub fn capabilities_base_dir(&self) -> PathBuf {
296        self.root.join("capabilities")
297    }
298
299    pub fn capabilities_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
300        self.capabilities_base_dir()
301            .join(author)
302            .join(name)
303            .join(version)
304    }
305
306    pub fn interface_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
307        self.interfaces_base_dir()
308            .join(author)
309            .join(name)
310            .join(version)
311    }
312
313    // --- Named module directory helpers ---
314
315    pub fn modules_base_dir(&self) -> PathBuf {
316        self.root.join("modules")
317    }
318
319    pub fn module_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
320        self.modules_base_dir()
321            .join(author)
322            .join(name)
323            .join(version)
324    }
325
326    pub async fn list_available_modules(
327        &self,
328    ) -> Result<Vec<(String, String, String)>, CacheError> {
329        let base = self.modules_base_dir();
330        if !base.exists() {
331            return Ok(Vec::new());
332        }
333
334        let mut results = Vec::new();
335        let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
336            context: "Failed to read modules base dir".to_string(),
337            error: e,
338        })?;
339
340        while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
341            context: "Failed to read author entry".to_string(),
342            error: e,
343        })? {
344            let author_path = author_entry.path();
345            if !author_path.is_dir() {
346                continue;
347            }
348            let author_name = author_entry.file_name().to_string_lossy().to_string();
349
350            let mut names = fs::read_dir(&author_path)
351                .await
352                .map_err(|e| CacheError::Io {
353                    context: format!("Failed to read author dir: {}", author_path.display()),
354                    error: e,
355                })?;
356
357            while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
358                context: "Failed to read name entry".to_string(),
359                error: e,
360            })? {
361                let name_path = name_entry.path();
362                if !name_path.is_dir() {
363                    continue;
364                }
365                let mod_name = name_entry.file_name().to_string_lossy().to_string();
366
367                let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
368                    context: format!("Failed to read name dir: {}", name_path.display()),
369                    error: e,
370                })?;
371
372                while let Some(version_entry) =
373                    versions.next_entry().await.map_err(|e| CacheError::Io {
374                        context: "Failed to read version entry".to_string(),
375                        error: e,
376                    })?
377                {
378                    let version_path = version_entry.path();
379                    if !version_path.is_dir() {
380                        continue;
381                    }
382                    let version = version_entry.file_name().to_string_lossy().to_string();
383
384                    if version_path.join("spec.json").exists() {
385                        results.push((author_name.clone(), mod_name.clone(), version));
386                    }
387                }
388            }
389        }
390
391        Ok(results)
392    }
393
394    pub fn interfaces_base_dir(&self) -> PathBuf {
395        self.root.join("interfaces")
396    }
397
398    pub async fn capability_interface_spec(
399        &self,
400        author: &str,
401        name: &str,
402        version: &str,
403    ) -> Result<String, CacheError> {
404        let path = self
405            .capabilities_dir(author, name, version)
406            .join("interface.json");
407        fs::read_to_string(&path)
408            .await
409            .map_err(|error| CacheError::Io {
410                context: format!("Failed to read interface.json from {}", path.display()),
411                error,
412            })
413    }
414
415    pub async fn capability_binary_path(
416        &self,
417        author: &str,
418        name: &str,
419        version: &str,
420    ) -> Result<PathBuf, CacheError> {
421        let base_dir = self.capabilities_dir(author, name, version);
422
423        #[cfg(target_os = "linux")]
424        let lib_file = "lib.so";
425        #[cfg(target_os = "macos")]
426        let lib_file = "lib.dylib";
427        #[cfg(target_os = "windows")]
428        let lib_file = "lib.dll";
429        #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
430        let lib_file = "lib.so";
431
432        let path = base_dir.join(lib_file);
433        if !path.exists() {
434            Err(CacheError::NotFound(format!(
435                "Missing {} binary for this system",
436                path.display()
437            )))
438        } else {
439            Ok(path)
440        }
441    }
442
443    pub async fn capability_config_spec(
444        &self,
445        author: &str,
446        name: &str,
447        version: &str,
448    ) -> Result<Option<String>, CacheError> {
449        let path = self
450            .capabilities_dir(author, name, version)
451            .join("config.json");
452        if path.exists() {
453            let content = fs::read_to_string(&path)
454                .await
455                .map_err(|error| CacheError::Io {
456                    context: format!("Failed to read config.json from {}", path.display()),
457                    error,
458                })?;
459            Ok(Some(content))
460        } else {
461            Ok(None)
462        }
463    }
464
465    #[tracing::instrument(skip(self))]
466    pub async fn remove_module(
467        &self,
468        author: &str,
469        name: &str,
470        version: &str,
471    ) -> Result<(), CacheError> {
472        tracing::debug!("Removing module from cache");
473        let path = self.module_dir(author, name, version);
474        if path.exists() {
475            tokio::fs::remove_dir_all(&path).await.map_err(|error| {
476                let err = CacheError::Io {
477                    context: "Unable to remove module".to_string(),
478                    error,
479                };
480                tracing::error!(error = ?err, "Failed to remove playbook at {:?}", path);
481                err
482            })?;
483        }
484        Ok(())
485    }
486
487    // --- Named module lookup ---
488
489    #[tracing::instrument(skip(self))]
490    pub async fn get_named_binary(
491        &self,
492        author: &str,
493        package: &str,
494        version: &str,
495    ) -> Result<PlaybookBinary, CacheError> {
496        tracing::debug!("Retrieving named playbook binary");
497        let path = self.module_dir(author, package, version);
498        if path.exists() {
499            let binary = PlaybookBinary::from_dir(&path).await.map_err(|error| {
500                let err = CacheError::Io {
501                    context: "Unable to load named module binary".to_string(),
502                    error,
503                };
504                tracing::error!(error = ?err, "Failed to load named module binary from {:?}", path);
505                err
506            })?;
507            Ok(binary)
508        } else {
509            let err = CacheError::NotFound(format!(
510                "Missing named module binary for {}/{}/{}",
511                author, package, version
512            ));
513            tracing::debug!("Named module binary not found at {:?}", path);
514            Err(err)
515        }
516    }
517
518    #[tracing::instrument(skip(self))]
519    pub async fn get_named_source(
520        &self,
521        author: &str,
522        package: &str,
523        version: &str,
524    ) -> Result<PlaybookSource, CacheError> {
525        tracing::debug!("Retrieving named playbook source");
526        let path = self.module_dir(author, package, version);
527        if path.exists() {
528            let mut source = PlaybookSource::from_dir(&path).await.map_err(|error| {
529                let err = CacheError::Io {
530                    context: "Unable to load named module source".to_string(),
531                    error,
532                };
533                tracing::error!(error = ?err, "Failed to load named module source from {:?}", path);
534                err
535            })?;
536            source.manifest.module = crate::cargo::CapabilityIdent {
537                author: author.to_string(),
538                package: package.to_string(),
539                version: version.to_string(),
540            };
541            Ok(source)
542        } else {
543            let err = CacheError::NotFound(format!(
544                "Missing named module source for {}/{}/{}",
545                author, package, version
546            ));
547            tracing::debug!("Named module source not found at {:?}", path);
548            Err(err)
549        }
550    }
551
552    #[tracing::instrument(skip(self, artifacts))]
553    pub async fn write_artifacts(&self, artifacts: &Artifacts) -> Result<(), CacheError> {
554        tracing::debug!("Writing artifacts to CacheManager");
555        let res = async {
556            match &artifacts {
557                Artifacts::CapabilityBinary(capability) => {
558                    let path = self.capabilities_dir(
559                        &capability.ident.author,
560                        &capability.ident.package,
561                        &capability.ident.version,
562                    );
563                    capability
564                        .write_to_directory(&path)
565                        .await
566                        .map_err(|e| CacheError::Io {
567                            context: format!("Failed to write artifacts to {}", path.display()),
568                            error: e,
569                        })
570                }
571                Artifacts::CapabilitySource(capability) => {
572                    let path = self.capabilities_dir(
573                        &capability.manifest.capability.author,
574                        &capability.manifest.capability.package,
575                        &capability.manifest.capability.version,
576                    );
577                    capability
578                        .write_to_directory(&path)
579                        .await
580                        .map_err(|e| CacheError::Io {
581                            context: format!("Failed to write artifacts to {}", path.display()),
582                            error: e,
583                        })
584                }
585                Artifacts::Interface(interface) => {
586                    let path = self.interface_dir(
587                        &interface.manifest.capability.author,
588                        &interface.manifest.capability.package,
589                        &interface.manifest.capability.version,
590                    );
591                    fs::create_dir_all(&path)
592                        .await
593                        .map_err(|e| CacheError::Io {
594                            context: format!("Failed to create  {}", path.display()),
595                            error: e,
596                        })?;
597                    let mut manifest = interface.manifest.clone();
598                    if let Some(pyroduct) = &self.pyroduct {
599                        manifest.pyroduct = pyroduct.clone();
600                    }
601                    let cargo_path = path.join("Cargo.toml");
602                    let cargo = manifest.clone().to_interface_manifest();
603                    let cargo = toml::to_string_pretty(&cargo).map_err(|e| CacheError::Io {
604                        context: format!(
605                            "Failed to serialize Cargo.toml to {}",
606                            cargo_path.display()
607                        ),
608                        error: io::Error::new(io::ErrorKind::InvalidData, e),
609                    })?;
610                    fs::write(&cargo_path, cargo)
611                        .await
612                        .map_err(|e| CacheError::Io {
613                            context: format!(
614                                "Failed to write Cargo.toml to {}",
615                                cargo_path.display()
616                            ),
617                            error: e,
618                        })?;
619                    interface
620                        .write_to_directory(&path)
621                        .await
622                        .map_err(|e| CacheError::Io {
623                            context: format!("Failed to write artifacts to {}", path.display()),
624                            error: e,
625                        })
626                }
627                Artifacts::Playbook(Playbook::Binary(binary)) => {
628                    let ident = &binary.spec.ident;
629                    let path = self.module_dir(&ident.author, &ident.package, &ident.version);
630                    fs::create_dir_all(&path)
631                        .await
632                        .map_err(|e| CacheError::Io {
633                            context: format!("Failed to create module dir {}", path.display()),
634                            error: e,
635                        })?;
636                    binary
637                        .write_to_directory(&path)
638                        .await
639                        .map_err(|e| CacheError::Io {
640                            context: format!("Failed to write artifacts to {}", path.display()),
641                            error: e,
642                        })
643                }
644                Artifacts::Playbook(Playbook::Source(source)) => {
645                    let ident = source.ident();
646                    let path = self.module_dir(&ident.author, &ident.package, &ident.version);
647                    fs::create_dir_all(&path)
648                        .await
649                        .map_err(|e| CacheError::Io {
650                            context: format!("Failed to create module dir {}", path.display()),
651                            error: e,
652                        })?;
653                    source
654                        .write_to_directory(&path)
655                        .await
656                        .map_err(|e| CacheError::Io {
657                            context: format!("Failed to write artifacts to {}", path.display()),
658                            error: e,
659                        })
660                }
661            }
662        }
663        .await;
664
665        if let Err(ref e) = res {
666            tracing::error!(error = ?e, "Failed to write artifacts to cache");
667        } else {
668            tracing::debug!("Successfully wrote artifacts to cache");
669        }
670        res
671    }
672
673    #[tracing::instrument(skip(self, remotes, log_dir, input_dir, output_dir), fields(author = playbook.author, name = playbook.package, version = playbook.version))]
674    pub async fn load_playbook(
675        &self,
676        playbook: PlaybookIdent,
677        remotes: HashMap<CapabilityIdent, RemoteAddress>,
678        log_dir: impl AsRef<Path>,
679        input_dir: impl AsRef<Path>,
680        output_dir: impl AsRef<Path>,
681    ) -> Result<LoadedPlaybook, CacheError> {
682        tracing::debug!("Loading playbook");
683        let res = async {
684            let binary = self.get_named_binary(&playbook.author, &playbook.package, &playbook.version).await?;
685            let mut paths = HashMap::new();
686            let mut remote = HashMap::new();
687
688            tracing::debug!(capabilities = ?binary.spec.capabilities, "Loading playbook capabilities");
689            tracing::debug!(
690                config_keys = ?binary
691                    .configurations
692                    .iter()
693                    .map(|c| &c.package)
694                    .collect::<Vec<_>>(),
695                "Loaded playbook configuration keys"
696            );
697
698            for cap in &binary.spec.capabilities {
699                if let Some(addr) = remotes.get(cap) {
700                    remote.insert(cap.clone(), addr.clone());
701                } else if binary
702                    .configurations
703                    .iter()
704                    .any(|c| c.package == cap.package)
705                {
706                    let path = self
707                        .capability_binary_path(&cap.author, &cap.package, &cap.version)
708                        .await?;
709                    paths.insert(cap.clone(), path);
710                } else {
711                    return Err(CacheError::NotFound(format!("Capability {} not found", cap.package)));
712                }
713            }
714
715            Ok(LoadedPlaybook {
716                binary,
717                remote,
718                paths,
719                log_dir: log_dir.as_ref().to_path_buf(),
720                input_dir: input_dir.as_ref().to_path_buf(),
721                output_dir: output_dir.as_ref().to_path_buf(),
722            })
723        }.await;
724
725        if let Err(ref e) = res {
726            tracing::error!(error = ?e, "Failed to load playbook");
727        } else {
728            tracing::debug!("Successfully loaded playbook");
729        }
730        res
731    }
732
733    #[cfg(feature = "compiler")]
734    pub fn convert_anon_playbook(&self, playbook: AnonPlaybook) -> PlaybookSource {
735        let author = self.author.clone();
736        let mut resolved_capabilities = Vec::new();
737        for cap in &playbook.configurations {
738            resolved_capabilities.push(CapabilityIdent {
739                author: cap.author.clone(),
740                package: cap.package.clone(),
741                version: cap.version.clone(),
742            });
743        }
744        PlaybookSource::new(
745            crate::artifacts::PlaybookIdent {
746                author,
747                package: playbook.package,
748                version: "0.1.0".to_string(),
749            },
750            crate::artifacts::ModuleDependencies {
751                dependencies: playbook.dependencies,
752                capabilities: resolved_capabilities,
753            },
754            playbook.configurations,
755            playbook.source,
756            playbook.interconnect,
757        )
758    }
759}
760
761pub(crate) fn resolve_dependency_path(dep: &mut Dependency, base: &std::path::Path) {
762    if let Dependency::Detailed(detail) = dep
763        && let Some(ref mut p) = detail.path
764    {
765        let path = std::path::Path::new(p.as_str());
766        if path.is_relative() {
767            let absolute = base.join(path);
768            *p = absolute
769                .canonicalize()
770                .unwrap_or(absolute)
771                .to_string_lossy()
772                .into_owned();
773        }
774    }
775}