1use crate::artifacts::{
2 Artifact, Artifacts, Playbook, PlaybookBinary, PlaybookIdent, PlaybookSource,
3};
4
5#[cfg(feature = "compiler")]
6use crate::build::AnonPlaybook;
7use crate::cargo::CapabilityIdent;
8
9use cargo_toml::Dependency;
10use std::path::{Path, PathBuf};
11use std::{collections::HashMap, io};
12use tokio::fs;
13
14#[derive(Debug, thiserror::Error)]
15pub enum CacheError {
16 #[error("Not found: {0}")]
17 NotFound(String),
18 #[error("{context}: {error}")]
19 Io {
20 context: String,
21 #[source]
22 error: std::io::Error,
23 },
24}
25
26#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
27pub struct PyroductConfig {
28 pub author: String,
29 pub target: Option<PathBuf>,
30 pub pyroduct: Option<Dependency>,
31 pub build_slots: Option<usize>,
32}
33
34#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
35#[serde(rename_all = "lowercase")]
36pub enum RemoteAddress {
37 Tcp(String),
38 Unix(std::path::PathBuf),
39}
40
41#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
43pub struct LoadedPlaybook {
44 pub binary: PlaybookBinary,
45 pub remote: HashMap<CapabilityIdent, RemoteAddress>,
46 #[serde(default)]
47 pub paths: HashMap<CapabilityIdent, PathBuf>,
48
49 pub log_dir: std::path::PathBuf,
50 pub input_dir: std::path::PathBuf,
51 pub output_dir: std::path::PathBuf,
52
53 #[serde(default = "default_num_workers")]
55 pub num_workers: usize,
56}
57
58fn default_num_workers() -> usize {
59 4
60}
61
62pub struct CacheManager {
65 pub root: PathBuf,
66 pub pyroduct: Option<Dependency>,
67 pub author: String,
68}
69
70impl CacheManager {
71 #[tracing::instrument(skip(root), fields(root = %root.display()))]
72 pub async fn new(
73 root: &Path,
74 pyroduct: Option<Dependency>,
75 author: String,
76 ) -> Result<Self, CacheError> {
77 tracing::debug!("Creating CacheManager instance");
78 if !root.exists() {
79 fs::create_dir_all(&root).await.map_err(|e| {
80 let err = CacheError::Io {
81 context: "Failed to create cache root".to_string(),
82 error: e,
83 };
84 tracing::error!(error = ?err, "Cache root directory creation failed");
85 err
86 })?;
87 }
88
89 let pyroduct = if let Some(mut dep) = pyroduct {
90 crate::cache::resolve_dependency_path(&mut dep, root);
91 Some(dep.clone())
92 } else {
93 None
94 };
95
96 let manager = Self {
97 root: root.to_path_buf(),
98 pyroduct,
99 author,
100 };
101
102 Ok(manager)
103 }
104
105 #[tracing::instrument]
106 pub async fn from_env() -> Result<Self, CacheError> {
107 tracing::debug!("Loading CacheManager from environment");
108 let root = std::env::var("PYRODUCT")
109 .map(PathBuf::from)
110 .unwrap_or_else(|_| {
111 let system_cache = PathBuf::from("/var/lib/pyro-daemon/cache");
113 if system_cache.join("config.toml").exists() {
114 return system_cache;
115 }
116
117 let home = std::env::var("HOME")
118 .or_else(|_| std::env::var("USERPROFILE"))
119 .map(PathBuf::from)
120 .unwrap_or_else(|_| PathBuf::from("."));
121
122 let macos_cache = home.join("Library/Application Support/pyro-daemon/cache");
124 if macos_cache.join("config.toml").exists() {
125 return macos_cache;
126 }
127
128 home.join(".pyroduct")
130 });
131 let config_path = root.join("config.toml");
132 let content = fs::read_to_string(&config_path)
133 .await
134 .map_err(|error| {
135 let err = CacheError::Io {
136 context: "Failed to read the configuration".to_string(),
137 error,
138 };
139 tracing::error!(error = ?err, "Failed to read CacheManager config file at {:?}", config_path);
140 err
141 })?;
142 let config = toml::from_str::<PyroductConfig>(&content).map_err(|error| {
143 let err = CacheError::Io {
144 context: "Failed to parse the configuration".to_string(),
145 error: io::Error::new(io::ErrorKind::InvalidData, error),
146 };
147 tracing::error!(error = ?err, "Failed to parse CacheManager config toml");
148 err
149 })?;
150
151 Self::new(&root, config.pyroduct, config.author).await
152 }
153
154 #[tracing::instrument(skip(self))]
155 pub async fn init(&self) -> Result<(), CacheError> {
156 tracing::debug!("Initializing CacheManager directories");
157 fs::create_dir_all(self.capabilities_base_dir())
158 .await
159 .map_err(|error| {
160 let err = CacheError::Io {
161 context: format!(
162 "Failed to create capabilities cache dir in {:?}",
163 self.capabilities_base_dir()
164 ),
165 error,
166 };
167 tracing::error!(error = ?err, "Failed to initialize capabilities dir");
168 err
169 })?;
170
171 fs::create_dir_all(self.interfaces_base_dir())
172 .await
173 .map_err(|error| {
174 let err = CacheError::Io {
175 context: "Failed to create interfaces cache dir".to_string(),
176 error,
177 };
178 tracing::error!(error = ?err, "Failed to initialize interfaces dir");
179 err
180 })?;
181
182 let module_dir = self.root.join("modules");
183 fs::create_dir_all(&module_dir).await.map_err(|error| {
184 let err = CacheError::Io {
185 context: "Failed to create modules cache dir".to_string(),
186 error,
187 };
188 tracing::error!(error = ?err, "Failed to initialize modules dir");
189 err
190 })?;
191
192 Ok(())
193 }
194
195 #[tracing::instrument(skip(self))]
196 pub async fn purge(&self) -> Result<(), CacheError> {
197 tracing::debug!("Purging CacheManager directories");
198 let dirs = [
199 self.capabilities_base_dir(),
200 self.interfaces_base_dir(),
201 self.root.join("modules"),
202 ];
203 for dir in dirs {
204 if dir.exists() {
205 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
206 context: format!("Failed to remove cache dir {}", dir.display()),
207 error: e,
208 })?;
209 }
210 }
211 self.init().await?;
212 Ok(())
213 }
214
215 #[tracing::instrument(skip(self))]
216 pub async fn purge_capabilities(&self) -> Result<(), CacheError> {
217 tracing::debug!("Purging Capabilities from CacheManager");
218 let dirs = [
219 self.capabilities_base_dir(),
220 self.interfaces_base_dir(),
221 ];
222 for dir in dirs {
223 if dir.exists() {
224 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
225 context: format!("Failed to remove capabilities cache dir {}", dir.display()),
226 error: e,
227 })?;
228 }
229 }
230 self.init().await?;
231 Ok(())
232 }
233
234 #[tracing::instrument(skip(self))]
235 pub async fn purge_modules(&self) -> Result<(), CacheError> {
236 tracing::debug!("Purging Modules from CacheManager");
237 let dir = self.root.join("modules");
238 if dir.exists() {
239 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
240 context: format!("Failed to remove modules cache dir {}", dir.display()),
241 error: e,
242 })?;
243 }
244 self.init().await?;
245 Ok(())
246 }
247
248
249 pub async fn list_available_capabilities(
250 &self,
251 ) -> Result<Vec<(String, String, String)>, CacheError> {
252 let base = self.capabilities_base_dir();
253 if !base.exists() {
254 return Ok(Vec::new());
255 }
256
257 let mut results = Vec::new();
258 let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
259 context: "Failed to read capabilities base dir".to_string(),
260 error: e,
261 })?;
262
263 while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
264 context: "Failed to read author entry".to_string(),
265 error: e,
266 })? {
267 let author_path = author_entry.path();
268 if !author_path.is_dir() {
269 continue;
270 }
271 let author_name = author_entry.file_name().to_string_lossy().to_string();
272
273 let mut names = fs::read_dir(&author_path)
274 .await
275 .map_err(|e| CacheError::Io {
276 context: format!("Failed to read author dir: {}", author_path.display()),
277 error: e,
278 })?;
279
280 while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
281 context: "Failed to read name entry".to_string(),
282 error: e,
283 })? {
284 let name_path = name_entry.path();
285 if !name_path.is_dir() {
286 continue;
287 }
288 let cap_name = name_entry.file_name().to_string_lossy().to_string();
289
290 let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
291 context: format!("Failed to read name dir: {}", name_path.display()),
292 error: e,
293 })?;
294
295 while let Some(version_entry) =
296 versions.next_entry().await.map_err(|e| CacheError::Io {
297 context: "Failed to read version entry".to_string(),
298 error: e,
299 })?
300 {
301 let version_path = version_entry.path();
302 if !version_path.is_dir() {
303 continue;
304 }
305 let version = version_entry.file_name().to_string_lossy().to_string();
306
307 if version_path.join("interface.json").exists() {
308 results.push((author_name.clone(), cap_name.clone(), version));
309 }
310 }
311 }
312 }
313
314 Ok(results)
315 }
316
317 pub fn capabilities_base_dir(&self) -> PathBuf {
318 self.root.join("capabilities")
319 }
320
321 pub fn capabilities_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
322 self.capabilities_base_dir()
323 .join(author)
324 .join(name)
325 .join(version)
326 }
327
328 pub fn interface_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
329 self.interfaces_base_dir()
330 .join(author)
331 .join(name)
332 .join(version)
333 }
334
335 pub fn modules_base_dir(&self) -> PathBuf {
338 self.root.join("modules")
339 }
340
341 pub fn module_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
342 self.modules_base_dir()
343 .join(author)
344 .join(name)
345 .join(version)
346 }
347
348 pub async fn list_available_modules(
349 &self,
350 ) -> Result<Vec<(String, String, String)>, CacheError> {
351 let base = self.modules_base_dir();
352 if !base.exists() {
353 return Ok(Vec::new());
354 }
355
356 let mut results = Vec::new();
357 let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
358 context: "Failed to read modules base dir".to_string(),
359 error: e,
360 })?;
361
362 while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
363 context: "Failed to read author entry".to_string(),
364 error: e,
365 })? {
366 let author_path = author_entry.path();
367 if !author_path.is_dir() {
368 continue;
369 }
370 let author_name = author_entry.file_name().to_string_lossy().to_string();
371
372 let mut names = fs::read_dir(&author_path)
373 .await
374 .map_err(|e| CacheError::Io {
375 context: format!("Failed to read author dir: {}", author_path.display()),
376 error: e,
377 })?;
378
379 while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
380 context: "Failed to read name entry".to_string(),
381 error: e,
382 })? {
383 let name_path = name_entry.path();
384 if !name_path.is_dir() {
385 continue;
386 }
387 let mod_name = name_entry.file_name().to_string_lossy().to_string();
388
389 let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
390 context: format!("Failed to read name dir: {}", name_path.display()),
391 error: e,
392 })?;
393
394 while let Some(version_entry) =
395 versions.next_entry().await.map_err(|e| CacheError::Io {
396 context: "Failed to read version entry".to_string(),
397 error: e,
398 })?
399 {
400 let version_path = version_entry.path();
401 if !version_path.is_dir() {
402 continue;
403 }
404 let version = version_entry.file_name().to_string_lossy().to_string();
405
406 if version_path.join("spec.json").exists() {
407 results.push((author_name.clone(), mod_name.clone(), version));
408 }
409 }
410 }
411 }
412
413 Ok(results)
414 }
415
416 pub fn interfaces_base_dir(&self) -> PathBuf {
417 self.root.join("interfaces")
418 }
419
420 pub async fn capability_interface_spec(
421 &self,
422 author: &str,
423 name: &str,
424 version: &str,
425 ) -> Result<String, CacheError> {
426 let path = self
427 .capabilities_dir(author, name, version)
428 .join("interface.json");
429 fs::read_to_string(&path)
430 .await
431 .map_err(|error| CacheError::Io {
432 context: format!("Failed to read interface.json from {}", path.display()),
433 error,
434 })
435 }
436
437 pub async fn capability_binary_path(
438 &self,
439 author: &str,
440 name: &str,
441 version: &str,
442 ) -> Result<PathBuf, CacheError> {
443 let base_dir = self.capabilities_dir(author, name, version);
444
445 #[cfg(target_os = "linux")]
446 let lib_file = "lib.so";
447 #[cfg(target_os = "macos")]
448 let lib_file = "lib.dylib";
449 #[cfg(target_os = "windows")]
450 let lib_file = "lib.dll";
451 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
452 let lib_file = "lib.so";
453
454 let path = base_dir.join(lib_file);
455 if !path.exists() {
456 Err(CacheError::NotFound(format!(
457 "Missing {} binary for this system",
458 path.display()
459 )))
460 } else {
461 Ok(path)
462 }
463 }
464
465 pub async fn capability_config_spec(
466 &self,
467 author: &str,
468 name: &str,
469 version: &str,
470 ) -> Result<Option<String>, CacheError> {
471 let path = self
472 .capabilities_dir(author, name, version)
473 .join("config.json");
474 if path.exists() {
475 let content = fs::read_to_string(&path)
476 .await
477 .map_err(|error| CacheError::Io {
478 context: format!("Failed to read config.json from {}", path.display()),
479 error,
480 })?;
481 Ok(Some(content))
482 } else {
483 Ok(None)
484 }
485 }
486
487 #[tracing::instrument(skip(self))]
488 pub async fn remove_module(
489 &self,
490 author: &str,
491 name: &str,
492 version: &str,
493 ) -> Result<(), CacheError> {
494 tracing::debug!("Removing module from cache");
495 let path = self.module_dir(author, name, version);
496 if path.exists() {
497 tokio::fs::remove_dir_all(&path).await.map_err(|error| {
498 let err = CacheError::Io {
499 context: "Unable to remove module".to_string(),
500 error,
501 };
502 tracing::error!(error = ?err, "Failed to remove playbook at {:?}", path);
503 err
504 })?;
505 }
506 Ok(())
507 }
508
509 #[tracing::instrument(skip(self))]
512 pub async fn get_named_binary(
513 &self,
514 author: &str,
515 package: &str,
516 version: &str,
517 ) -> Result<PlaybookBinary, CacheError> {
518 tracing::debug!("Retrieving named playbook binary");
519 let path = self.module_dir(author, package, version);
520 if path.exists() {
521 let binary = PlaybookBinary::from_dir(&path).await.map_err(|error| {
522 let err = CacheError::Io {
523 context: "Unable to load named module binary".to_string(),
524 error,
525 };
526 tracing::error!(error = ?err, "Failed to load named module binary from {:?}", path);
527 err
528 })?;
529 Ok(binary)
530 } else {
531 let err = CacheError::NotFound(format!(
532 "Missing named module binary for {}/{}/{}",
533 author, package, version
534 ));
535 tracing::debug!("Named module binary not found at {:?}", path);
536 Err(err)
537 }
538 }
539
540 #[tracing::instrument(skip(self))]
541 pub async fn get_named_source(
542 &self,
543 author: &str,
544 package: &str,
545 version: &str,
546 ) -> Result<PlaybookSource, CacheError> {
547 tracing::debug!("Retrieving named playbook source");
548 let path = self.module_dir(author, package, version);
549 if path.exists() {
550 let mut source = PlaybookSource::from_dir(&path).await.map_err(|error| {
551 let err = CacheError::Io {
552 context: "Unable to load named module source".to_string(),
553 error,
554 };
555 tracing::error!(error = ?err, "Failed to load named module source from {:?}", path);
556 err
557 })?;
558 source.manifest.module = crate::cargo::CapabilityIdent {
559 author: author.to_string(),
560 package: package.to_string(),
561 version: version.to_string(),
562 };
563 Ok(source)
564 } else {
565 let err = CacheError::NotFound(format!(
566 "Missing named module source for {}/{}/{}",
567 author, package, version
568 ));
569 tracing::debug!("Named module source not found at {:?}", path);
570 Err(err)
571 }
572 }
573
574 #[tracing::instrument(skip(self, artifacts))]
575 pub async fn write_artifacts(&self, artifacts: &Artifacts) -> Result<(), CacheError> {
576 tracing::debug!("Writing artifacts to CacheManager");
577 let res = async {
578 match &artifacts {
579 Artifacts::CapabilityBinary(capability) => {
580 let path = self.capabilities_dir(
581 &capability.ident.author,
582 &capability.ident.package,
583 &capability.ident.version,
584 );
585 capability
586 .write_to_directory(&path)
587 .await
588 .map_err(|e| CacheError::Io {
589 context: format!("Failed to write artifacts to {}", path.display()),
590 error: e,
591 })
592 }
593 Artifacts::CapabilitySource(capability) => {
594 let path = self.capabilities_dir(
595 &capability.manifest.capability.author,
596 &capability.manifest.capability.package,
597 &capability.manifest.capability.version,
598 );
599 capability
600 .write_to_directory(&path)
601 .await
602 .map_err(|e| CacheError::Io {
603 context: format!("Failed to write artifacts to {}", path.display()),
604 error: e,
605 })
606 }
607 Artifacts::Interface(interface) => {
608 let path = self.interface_dir(
609 &interface.manifest.capability.author,
610 &interface.manifest.capability.package,
611 &interface.manifest.capability.version,
612 );
613 fs::create_dir_all(&path)
614 .await
615 .map_err(|e| CacheError::Io {
616 context: format!("Failed to create {}", path.display()),
617 error: e,
618 })?;
619 let mut manifest = interface.manifest.clone();
620 if let Some(pyroduct) = &self.pyroduct {
621 manifest.pyroduct = pyroduct.clone();
622 }
623 let cargo_path = path.join("Cargo.toml");
624 let cargo = manifest.clone().to_interface_manifest();
625 let cargo = toml::to_string_pretty(&cargo).map_err(|e| CacheError::Io {
626 context: format!(
627 "Failed to serialize Cargo.toml to {}",
628 cargo_path.display()
629 ),
630 error: io::Error::new(io::ErrorKind::InvalidData, e),
631 })?;
632 fs::write(&cargo_path, cargo)
633 .await
634 .map_err(|e| CacheError::Io {
635 context: format!(
636 "Failed to write Cargo.toml to {}",
637 cargo_path.display()
638 ),
639 error: e,
640 })?;
641 interface
642 .write_to_directory(&path)
643 .await
644 .map_err(|e| CacheError::Io {
645 context: format!("Failed to write artifacts to {}", path.display()),
646 error: e,
647 })
648 }
649 Artifacts::Playbook(Playbook::Binary(binary)) => {
650 let ident = &binary.spec.ident;
651 let path = self.module_dir(&ident.author, &ident.package, &ident.version);
652 fs::create_dir_all(&path)
653 .await
654 .map_err(|e| CacheError::Io {
655 context: format!("Failed to create module dir {}", path.display()),
656 error: e,
657 })?;
658 binary
659 .write_to_directory(&path)
660 .await
661 .map_err(|e| CacheError::Io {
662 context: format!("Failed to write artifacts to {}", path.display()),
663 error: e,
664 })
665 }
666 Artifacts::Playbook(Playbook::Source(source)) => {
667 let ident = source.ident();
668 let path = self.module_dir(&ident.author, &ident.package, &ident.version);
669 fs::create_dir_all(&path)
670 .await
671 .map_err(|e| CacheError::Io {
672 context: format!("Failed to create module dir {}", path.display()),
673 error: e,
674 })?;
675 source
676 .write_to_directory(&path)
677 .await
678 .map_err(|e| CacheError::Io {
679 context: format!("Failed to write artifacts to {}", path.display()),
680 error: e,
681 })
682 }
683 }
684 }
685 .await;
686
687 if let Err(ref e) = res {
688 tracing::error!(error = ?e, "Failed to write artifacts to cache");
689 } else {
690 tracing::debug!("Successfully wrote artifacts to cache");
691 }
692 res
693 }
694
695 #[tracing::instrument(skip(self, remotes, log_dir, input_dir, output_dir), fields(author = playbook.author, name = playbook.package, version = playbook.version))]
696 pub async fn load_playbook(
697 &self,
698 playbook: PlaybookIdent,
699 remotes: HashMap<CapabilityIdent, RemoteAddress>,
700 log_dir: impl AsRef<Path>,
701 input_dir: impl AsRef<Path>,
702 output_dir: impl AsRef<Path>,
703 num_workers: usize,
704 ) -> Result<LoadedPlaybook, CacheError> {
705 tracing::debug!("Loading playbook");
706 let res = async {
707 let binary = self.get_named_binary(&playbook.author, &playbook.package, &playbook.version).await?;
708 let mut paths = HashMap::new();
709 let mut remote = HashMap::new();
710
711 tracing::debug!(capabilities = ?binary.spec.capabilities, "Loading playbook capabilities");
712 tracing::debug!(
713 config_keys = ?binary
714 .configurations
715 .iter()
716 .map(|c| &c.package)
717 .collect::<Vec<_>>(),
718 "Loaded playbook configuration keys"
719 );
720
721 for cap in &binary.spec.capabilities {
722 if let Some(addr) = remotes.get(cap) {
723 remote.insert(cap.clone(), addr.clone());
724 } else if binary
725 .configurations
726 .iter()
727 .any(|c| c.package == cap.package)
728 {
729 let path = self
730 .capability_binary_path(&cap.author, &cap.package, &cap.version)
731 .await?;
732 paths.insert(cap.clone(), path);
733 } else {
734 return Err(CacheError::NotFound(format!("Capability {} not found", cap.package)));
735 }
736 }
737
738 Ok(LoadedPlaybook {
739 binary,
740 remote,
741 paths,
742 log_dir: log_dir.as_ref().to_path_buf(),
743 input_dir: input_dir.as_ref().to_path_buf(),
744 output_dir: output_dir.as_ref().to_path_buf(),
745 num_workers,
746 })
747 }.await;
748
749 if let Err(ref e) = res {
750 tracing::error!(error = ?e, "Failed to load playbook");
751 } else {
752 tracing::debug!("Successfully loaded playbook");
753 }
754 res
755 }
756
757 #[cfg(feature = "compiler")]
758 pub fn convert_anon_playbook(&self, playbook: AnonPlaybook) -> PlaybookSource {
759 let author = self.author.clone();
760 let mut resolved_capabilities = Vec::new();
761 for cap in &playbook.configurations {
762 resolved_capabilities.push(CapabilityIdent {
763 author: cap.author.clone(),
764 package: cap.package.clone(),
765 version: cap.version.clone(),
766 });
767 }
768 PlaybookSource::new(
769 crate::artifacts::PlaybookIdent {
770 author,
771 package: playbook.package,
772 version: "0.1.0".to_string(),
773 },
774 crate::artifacts::ModuleDependencies {
775 dependencies: playbook.dependencies,
776 capabilities: resolved_capabilities,
777 },
778 playbook.configurations,
779 playbook.source,
780 playbook.interconnect,
781 )
782 }
783}
784
785pub(crate) fn resolve_dependency_path(dep: &mut Dependency, base: &std::path::Path) {
786 if let Dependency::Detailed(detail) = dep
787 && let Some(ref mut p) = detail.path
788 {
789 let path = std::path::Path::new(p.as_str());
790 if path.is_relative() {
791 let absolute = base.join(path);
792 *p = absolute
793 .canonicalize()
794 .unwrap_or(absolute)
795 .to_string_lossy()
796 .into_owned();
797 }
798 }
799}