1use crate::artifacts::{
2 Artifact, Artifacts, Playbook, PlaybookBinary, PlaybookIdent, PlaybookSource,
3};
4
5#[cfg(feature = "compiler")]
6use crate::build::AnonPlaybook;
7use crate::cargo::CapabilityIdent;
8
9use cargo_toml::Dependency;
10use std::path::{Path, PathBuf};
11use std::{collections::HashMap, io};
12use tokio::fs;
13
14#[derive(Debug, thiserror::Error)]
15pub enum CacheError {
16 #[error("Not found: {0}")]
17 NotFound(String),
18 #[error("{context}: {error}")]
19 Io {
20 context: String,
21 #[source]
22 error: std::io::Error,
23 },
24}
25
26#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
27pub struct PyroductConfig {
28 pub author: String,
29 pub target: Option<PathBuf>,
30 pub pyroduct: Option<Dependency>,
31 pub build_slots: Option<usize>,
32}
33
34#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
35#[serde(rename_all = "lowercase")]
36pub enum RemoteAddress {
37 Tcp(String),
38 Unix(std::path::PathBuf),
39}
40
41#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
43pub struct LoadedPlaybook {
44 pub binary: PlaybookBinary,
45 pub remote: HashMap<CapabilityIdent, RemoteAddress>,
46 #[serde(default)]
47 pub paths: HashMap<CapabilityIdent, PathBuf>,
48
49 pub log_dir: std::path::PathBuf,
50 pub input_dir: std::path::PathBuf,
51 pub output_dir: std::path::PathBuf,
52}
53
54pub struct CacheManager {
57 pub root: PathBuf,
58 pub pyroduct: Option<Dependency>,
59 pub author: String,
60}
61
62impl CacheManager {
63 #[tracing::instrument(skip(root), fields(root = %root.display()))]
64 pub async fn new(
65 root: &Path,
66 pyroduct: Option<Dependency>,
67 author: String,
68 ) -> Result<Self, CacheError> {
69 tracing::debug!("Creating CacheManager instance");
70 if !root.exists() {
71 fs::create_dir_all(&root).await.map_err(|e| {
72 let err = CacheError::Io {
73 context: "Failed to create cache root".to_string(),
74 error: e,
75 };
76 tracing::error!(error = ?err, "Cache root directory creation failed");
77 err
78 })?;
79 }
80
81 let pyroduct = if let Some(mut dep) = pyroduct {
82 crate::cache::resolve_dependency_path(&mut dep, root);
83 Some(dep.clone())
84 } else {
85 None
86 };
87
88 let manager = Self {
89 root: root.to_path_buf(),
90 pyroduct,
91 author,
92 };
93
94 Ok(manager)
95 }
96
97 #[tracing::instrument]
98 pub async fn from_env() -> Result<Self, CacheError> {
99 tracing::debug!("Loading CacheManager from environment");
100 let root = std::env::var("PYRODUCT")
101 .map(PathBuf::from)
102 .unwrap_or_else(|_| {
103 let home = std::env::var("HOME")
104 .or_else(|_| std::env::var("USERPROFILE"))
105 .map(PathBuf::from)
106 .unwrap_or_else(|_| PathBuf::from("."));
107 home.join(".pyroduct")
108 });
109 let config_path = root.join("config.toml");
110 let content = fs::read_to_string(&config_path)
111 .await
112 .map_err(|error| {
113 let err = CacheError::Io {
114 context: "Failed to read the configuration".to_string(),
115 error,
116 };
117 tracing::error!(error = ?err, "Failed to read CacheManager config file at {:?}", config_path);
118 err
119 })?;
120 let config = toml::from_str::<PyroductConfig>(&content).map_err(|error| {
121 let err = CacheError::Io {
122 context: "Failed to parse the configuration".to_string(),
123 error: io::Error::new(io::ErrorKind::InvalidData, error),
124 };
125 tracing::error!(error = ?err, "Failed to parse CacheManager config toml");
126 err
127 })?;
128
129 Self::new(&root, config.pyroduct, config.author).await
130 }
131
132 #[tracing::instrument(skip(self))]
133 pub async fn init(&self) -> Result<(), CacheError> {
134 tracing::debug!("Initializing CacheManager directories");
135 fs::create_dir_all(self.capabilities_base_dir())
136 .await
137 .map_err(|error| {
138 let err = CacheError::Io {
139 context: format!(
140 "Failed to create capabilities cache dir in {:?}",
141 self.capabilities_base_dir()
142 ),
143 error,
144 };
145 tracing::error!(error = ?err, "Failed to initialize capabilities dir");
146 err
147 })?;
148
149 fs::create_dir_all(self.interfaces_base_dir())
150 .await
151 .map_err(|error| {
152 let err = CacheError::Io {
153 context: "Failed to create interfaces cache dir".to_string(),
154 error,
155 };
156 tracing::error!(error = ?err, "Failed to initialize interfaces dir");
157 err
158 })?;
159
160 let module_dir = self.root.join("modules");
161 fs::create_dir_all(&module_dir).await.map_err(|error| {
162 let err = CacheError::Io {
163 context: "Failed to create modules cache dir".to_string(),
164 error,
165 };
166 tracing::error!(error = ?err, "Failed to initialize modules dir");
167 err
168 })?;
169
170 Ok(())
171 }
172
173 #[tracing::instrument(skip(self))]
174 pub async fn purge(&self) -> Result<(), CacheError> {
175 tracing::debug!("Purging CacheManager directories");
176 let dirs = [
177 self.capabilities_base_dir(),
178 self.interfaces_base_dir(),
179 self.root.join("modules"),
180 ];
181 for dir in dirs {
182 if dir.exists() {
183 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
184 context: format!("Failed to remove cache dir {}", dir.display()),
185 error: e,
186 })?;
187 }
188 }
189 self.init().await?;
190 Ok(())
191 }
192
193 #[tracing::instrument(skip(self))]
194 pub async fn purge_capabilities(&self) -> Result<(), CacheError> {
195 tracing::debug!("Purging Capabilities from CacheManager");
196 let dirs = [
197 self.capabilities_base_dir(),
198 self.interfaces_base_dir(),
199 ];
200 for dir in dirs {
201 if dir.exists() {
202 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
203 context: format!("Failed to remove capabilities cache dir {}", dir.display()),
204 error: e,
205 })?;
206 }
207 }
208 self.init().await?;
209 Ok(())
210 }
211
212 #[tracing::instrument(skip(self))]
213 pub async fn purge_modules(&self) -> Result<(), CacheError> {
214 tracing::debug!("Purging Modules from CacheManager");
215 let dir = self.root.join("modules");
216 if dir.exists() {
217 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
218 context: format!("Failed to remove modules cache dir {}", dir.display()),
219 error: e,
220 })?;
221 }
222 self.init().await?;
223 Ok(())
224 }
225
226
227 pub async fn list_available_capabilities(
228 &self,
229 ) -> Result<Vec<(String, String, String)>, CacheError> {
230 let base = self.capabilities_base_dir();
231 if !base.exists() {
232 return Ok(Vec::new());
233 }
234
235 let mut results = Vec::new();
236 let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
237 context: "Failed to read capabilities base dir".to_string(),
238 error: e,
239 })?;
240
241 while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
242 context: "Failed to read author entry".to_string(),
243 error: e,
244 })? {
245 let author_path = author_entry.path();
246 if !author_path.is_dir() {
247 continue;
248 }
249 let author_name = author_entry.file_name().to_string_lossy().to_string();
250
251 let mut names = fs::read_dir(&author_path)
252 .await
253 .map_err(|e| CacheError::Io {
254 context: format!("Failed to read author dir: {}", author_path.display()),
255 error: e,
256 })?;
257
258 while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
259 context: "Failed to read name entry".to_string(),
260 error: e,
261 })? {
262 let name_path = name_entry.path();
263 if !name_path.is_dir() {
264 continue;
265 }
266 let cap_name = name_entry.file_name().to_string_lossy().to_string();
267
268 let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
269 context: format!("Failed to read name dir: {}", name_path.display()),
270 error: e,
271 })?;
272
273 while let Some(version_entry) =
274 versions.next_entry().await.map_err(|e| CacheError::Io {
275 context: "Failed to read version entry".to_string(),
276 error: e,
277 })?
278 {
279 let version_path = version_entry.path();
280 if !version_path.is_dir() {
281 continue;
282 }
283 let version = version_entry.file_name().to_string_lossy().to_string();
284
285 if version_path.join("interface.json").exists() {
286 results.push((author_name.clone(), cap_name.clone(), version));
287 }
288 }
289 }
290 }
291
292 Ok(results)
293 }
294
295 pub fn capabilities_base_dir(&self) -> PathBuf {
296 self.root.join("capabilities")
297 }
298
299 pub fn capabilities_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
300 self.capabilities_base_dir()
301 .join(author)
302 .join(name)
303 .join(version)
304 }
305
306 pub fn interface_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
307 self.interfaces_base_dir()
308 .join(author)
309 .join(name)
310 .join(version)
311 }
312
313 pub fn modules_base_dir(&self) -> PathBuf {
316 self.root.join("modules")
317 }
318
319 pub fn module_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
320 self.modules_base_dir()
321 .join(author)
322 .join(name)
323 .join(version)
324 }
325
326 pub async fn list_available_modules(
327 &self,
328 ) -> Result<Vec<(String, String, String)>, CacheError> {
329 let base = self.modules_base_dir();
330 if !base.exists() {
331 return Ok(Vec::new());
332 }
333
334 let mut results = Vec::new();
335 let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
336 context: "Failed to read modules base dir".to_string(),
337 error: e,
338 })?;
339
340 while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
341 context: "Failed to read author entry".to_string(),
342 error: e,
343 })? {
344 let author_path = author_entry.path();
345 if !author_path.is_dir() {
346 continue;
347 }
348 let author_name = author_entry.file_name().to_string_lossy().to_string();
349
350 let mut names = fs::read_dir(&author_path)
351 .await
352 .map_err(|e| CacheError::Io {
353 context: format!("Failed to read author dir: {}", author_path.display()),
354 error: e,
355 })?;
356
357 while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
358 context: "Failed to read name entry".to_string(),
359 error: e,
360 })? {
361 let name_path = name_entry.path();
362 if !name_path.is_dir() {
363 continue;
364 }
365 let mod_name = name_entry.file_name().to_string_lossy().to_string();
366
367 let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
368 context: format!("Failed to read name dir: {}", name_path.display()),
369 error: e,
370 })?;
371
372 while let Some(version_entry) =
373 versions.next_entry().await.map_err(|e| CacheError::Io {
374 context: "Failed to read version entry".to_string(),
375 error: e,
376 })?
377 {
378 let version_path = version_entry.path();
379 if !version_path.is_dir() {
380 continue;
381 }
382 let version = version_entry.file_name().to_string_lossy().to_string();
383
384 if version_path.join("spec.json").exists() {
385 results.push((author_name.clone(), mod_name.clone(), version));
386 }
387 }
388 }
389 }
390
391 Ok(results)
392 }
393
394 pub fn interfaces_base_dir(&self) -> PathBuf {
395 self.root.join("interfaces")
396 }
397
398 pub async fn capability_interface_spec(
399 &self,
400 author: &str,
401 name: &str,
402 version: &str,
403 ) -> Result<String, CacheError> {
404 let path = self
405 .capabilities_dir(author, name, version)
406 .join("interface.json");
407 fs::read_to_string(&path)
408 .await
409 .map_err(|error| CacheError::Io {
410 context: format!("Failed to read interface.json from {}", path.display()),
411 error,
412 })
413 }
414
415 pub async fn capability_binary_path(
416 &self,
417 author: &str,
418 name: &str,
419 version: &str,
420 ) -> Result<PathBuf, CacheError> {
421 let base_dir = self.capabilities_dir(author, name, version);
422
423 #[cfg(target_os = "linux")]
424 let lib_file = "lib.so";
425 #[cfg(target_os = "macos")]
426 let lib_file = "lib.dylib";
427 #[cfg(target_os = "windows")]
428 let lib_file = "lib.dll";
429 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
430 let lib_file = "lib.so";
431
432 let path = base_dir.join(lib_file);
433 if !path.exists() {
434 Err(CacheError::NotFound(format!(
435 "Missing {} binary for this system",
436 path.display()
437 )))
438 } else {
439 Ok(path)
440 }
441 }
442
443 pub async fn capability_config_spec(
444 &self,
445 author: &str,
446 name: &str,
447 version: &str,
448 ) -> Result<Option<String>, CacheError> {
449 let path = self
450 .capabilities_dir(author, name, version)
451 .join("config.json");
452 if path.exists() {
453 let content = fs::read_to_string(&path)
454 .await
455 .map_err(|error| CacheError::Io {
456 context: format!("Failed to read config.json from {}", path.display()),
457 error,
458 })?;
459 Ok(Some(content))
460 } else {
461 Ok(None)
462 }
463 }
464
465 #[tracing::instrument(skip(self))]
466 pub async fn remove_module(
467 &self,
468 author: &str,
469 name: &str,
470 version: &str,
471 ) -> Result<(), CacheError> {
472 tracing::debug!("Removing module from cache");
473 let path = self.module_dir(author, name, version);
474 if path.exists() {
475 tokio::fs::remove_dir_all(&path).await.map_err(|error| {
476 let err = CacheError::Io {
477 context: "Unable to remove module".to_string(),
478 error,
479 };
480 tracing::error!(error = ?err, "Failed to remove playbook at {:?}", path);
481 err
482 })?;
483 }
484 Ok(())
485 }
486
487 #[tracing::instrument(skip(self))]
490 pub async fn get_named_binary(
491 &self,
492 author: &str,
493 package: &str,
494 version: &str,
495 ) -> Result<PlaybookBinary, CacheError> {
496 tracing::debug!("Retrieving named playbook binary");
497 let path = self.module_dir(author, package, version);
498 if path.exists() {
499 let binary = PlaybookBinary::from_dir(&path).await.map_err(|error| {
500 let err = CacheError::Io {
501 context: "Unable to load named module binary".to_string(),
502 error,
503 };
504 tracing::error!(error = ?err, "Failed to load named module binary from {:?}", path);
505 err
506 })?;
507 Ok(binary)
508 } else {
509 let err = CacheError::NotFound(format!(
510 "Missing named module binary for {}/{}/{}",
511 author, package, version
512 ));
513 tracing::debug!("Named module binary not found at {:?}", path);
514 Err(err)
515 }
516 }
517
518 #[tracing::instrument(skip(self))]
519 pub async fn get_named_source(
520 &self,
521 author: &str,
522 package: &str,
523 version: &str,
524 ) -> Result<PlaybookSource, CacheError> {
525 tracing::debug!("Retrieving named playbook source");
526 let path = self.module_dir(author, package, version);
527 if path.exists() {
528 let mut source = PlaybookSource::from_dir(&path).await.map_err(|error| {
529 let err = CacheError::Io {
530 context: "Unable to load named module source".to_string(),
531 error,
532 };
533 tracing::error!(error = ?err, "Failed to load named module source from {:?}", path);
534 err
535 })?;
536 source.manifest.module = crate::cargo::CapabilityIdent {
537 author: author.to_string(),
538 package: package.to_string(),
539 version: version.to_string(),
540 };
541 Ok(source)
542 } else {
543 let err = CacheError::NotFound(format!(
544 "Missing named module source for {}/{}/{}",
545 author, package, version
546 ));
547 tracing::debug!("Named module source not found at {:?}", path);
548 Err(err)
549 }
550 }
551
552 #[tracing::instrument(skip(self, artifacts))]
553 pub async fn write_artifacts(&self, artifacts: &Artifacts) -> Result<(), CacheError> {
554 tracing::debug!("Writing artifacts to CacheManager");
555 let res = async {
556 match &artifacts {
557 Artifacts::CapabilityBinary(capability) => {
558 let path = self.capabilities_dir(
559 &capability.ident.author,
560 &capability.ident.package,
561 &capability.ident.version,
562 );
563 capability
564 .write_to_directory(&path)
565 .await
566 .map_err(|e| CacheError::Io {
567 context: format!("Failed to write artifacts to {}", path.display()),
568 error: e,
569 })
570 }
571 Artifacts::CapabilitySource(capability) => {
572 let path = self.capabilities_dir(
573 &capability.manifest.capability.author,
574 &capability.manifest.capability.package,
575 &capability.manifest.capability.version,
576 );
577 capability
578 .write_to_directory(&path)
579 .await
580 .map_err(|e| CacheError::Io {
581 context: format!("Failed to write artifacts to {}", path.display()),
582 error: e,
583 })
584 }
585 Artifacts::Interface(interface) => {
586 let path = self.interface_dir(
587 &interface.manifest.capability.author,
588 &interface.manifest.capability.package,
589 &interface.manifest.capability.version,
590 );
591 fs::create_dir_all(&path)
592 .await
593 .map_err(|e| CacheError::Io {
594 context: format!("Failed to create {}", path.display()),
595 error: e,
596 })?;
597 let mut manifest = interface.manifest.clone();
598 if let Some(pyroduct) = &self.pyroduct {
599 manifest.pyroduct = pyroduct.clone();
600 }
601 let cargo_path = path.join("Cargo.toml");
602 let cargo = manifest.clone().to_interface_manifest();
603 let cargo = toml::to_string_pretty(&cargo).map_err(|e| CacheError::Io {
604 context: format!(
605 "Failed to serialize Cargo.toml to {}",
606 cargo_path.display()
607 ),
608 error: io::Error::new(io::ErrorKind::InvalidData, e),
609 })?;
610 fs::write(&cargo_path, cargo)
611 .await
612 .map_err(|e| CacheError::Io {
613 context: format!(
614 "Failed to write Cargo.toml to {}",
615 cargo_path.display()
616 ),
617 error: e,
618 })?;
619 interface
620 .write_to_directory(&path)
621 .await
622 .map_err(|e| CacheError::Io {
623 context: format!("Failed to write artifacts to {}", path.display()),
624 error: e,
625 })
626 }
627 Artifacts::Playbook(Playbook::Binary(binary)) => {
628 let ident = &binary.spec.ident;
629 let path = self.module_dir(&ident.author, &ident.package, &ident.version);
630 fs::create_dir_all(&path)
631 .await
632 .map_err(|e| CacheError::Io {
633 context: format!("Failed to create module dir {}", path.display()),
634 error: e,
635 })?;
636 binary
637 .write_to_directory(&path)
638 .await
639 .map_err(|e| CacheError::Io {
640 context: format!("Failed to write artifacts to {}", path.display()),
641 error: e,
642 })
643 }
644 Artifacts::Playbook(Playbook::Source(source)) => {
645 let ident = source.ident();
646 let path = self.module_dir(&ident.author, &ident.package, &ident.version);
647 fs::create_dir_all(&path)
648 .await
649 .map_err(|e| CacheError::Io {
650 context: format!("Failed to create module dir {}", path.display()),
651 error: e,
652 })?;
653 source
654 .write_to_directory(&path)
655 .await
656 .map_err(|e| CacheError::Io {
657 context: format!("Failed to write artifacts to {}", path.display()),
658 error: e,
659 })
660 }
661 }
662 }
663 .await;
664
665 if let Err(ref e) = res {
666 tracing::error!(error = ?e, "Failed to write artifacts to cache");
667 } else {
668 tracing::debug!("Successfully wrote artifacts to cache");
669 }
670 res
671 }
672
673 #[tracing::instrument(skip(self, remotes, log_dir, input_dir, output_dir), fields(author = playbook.author, name = playbook.package, version = playbook.version))]
674 pub async fn load_playbook(
675 &self,
676 playbook: PlaybookIdent,
677 remotes: HashMap<CapabilityIdent, RemoteAddress>,
678 log_dir: impl AsRef<Path>,
679 input_dir: impl AsRef<Path>,
680 output_dir: impl AsRef<Path>,
681 ) -> Result<LoadedPlaybook, CacheError> {
682 tracing::debug!("Loading playbook");
683 let res = async {
684 let binary = self.get_named_binary(&playbook.author, &playbook.package, &playbook.version).await?;
685 let mut paths = HashMap::new();
686 let mut remote = HashMap::new();
687
688 tracing::debug!(capabilities = ?binary.spec.capabilities, "Loading playbook capabilities");
689 tracing::debug!(
690 config_keys = ?binary
691 .configurations
692 .iter()
693 .map(|c| &c.package)
694 .collect::<Vec<_>>(),
695 "Loaded playbook configuration keys"
696 );
697
698 for cap in &binary.spec.capabilities {
699 if let Some(addr) = remotes.get(cap) {
700 remote.insert(cap.clone(), addr.clone());
701 } else if binary
702 .configurations
703 .iter()
704 .any(|c| c.package == cap.package)
705 {
706 let path = self
707 .capability_binary_path(&cap.author, &cap.package, &cap.version)
708 .await?;
709 paths.insert(cap.clone(), path);
710 } else {
711 return Err(CacheError::NotFound(format!("Capability {} not found", cap.package)));
712 }
713 }
714
715 Ok(LoadedPlaybook {
716 binary,
717 remote,
718 paths,
719 log_dir: log_dir.as_ref().to_path_buf(),
720 input_dir: input_dir.as_ref().to_path_buf(),
721 output_dir: output_dir.as_ref().to_path_buf(),
722 })
723 }.await;
724
725 if let Err(ref e) = res {
726 tracing::error!(error = ?e, "Failed to load playbook");
727 } else {
728 tracing::debug!("Successfully loaded playbook");
729 }
730 res
731 }
732
/// Turns an anonymous playbook into a named source owned by this manager's author.
///
/// The result gets the fixed version `0.1.0`. Its capability list is derived
/// from the author / package / version of each configuration entry.
#[cfg(feature = "compiler")]
pub fn convert_anon_playbook(&self, playbook: AnonPlaybook) -> PlaybookSource {
    let AnonPlaybook {
        package,
        dependencies,
        configurations,
        source,
        interconnect,
        ..
    } = playbook;

    // One capability identity per configuration entry, in the same order.
    let capabilities: Vec<CapabilityIdent> = configurations
        .iter()
        .map(|cfg| CapabilityIdent {
            author: cfg.author.clone(),
            package: cfg.package.clone(),
            version: cfg.version.clone(),
        })
        .collect();

    let ident = PlaybookIdent {
        author: self.author.clone(),
        package,
        version: "0.1.0".to_string(),
    };
    let module_dependencies = crate::artifacts::ModuleDependencies {
        dependencies,
        capabilities,
    };

    PlaybookSource::new(
        ident,
        module_dependencies,
        configurations,
        source,
        interconnect,
    )
}
759}
760
761pub(crate) fn resolve_dependency_path(dep: &mut Dependency, base: &std::path::Path) {
762 if let Dependency::Detailed(detail) = dep
763 && let Some(ref mut p) = detail.path
764 {
765 let path = std::path::Path::new(p.as_str());
766 if path.is_relative() {
767 let absolute = base.join(path);
768 *p = absolute
769 .canonicalize()
770 .unwrap_or(absolute)
771 .to_string_lossy()
772 .into_owned();
773 }
774 }
775}