1use crate::artifacts::{
2 Artifact, Artifacts, Playbook, PlaybookBinary, PlaybookIdent, PlaybookSource,
3};
4
5#[cfg(feature = "compiler")]
6use crate::build::AnonPlaybook;
7use crate::cargo::CapabilityIdent;
8
9use cargo_toml::Dependency;
10use std::path::{Path, PathBuf};
11use std::{collections::HashMap, io};
12use tokio::fs;
13
14#[derive(Debug, thiserror::Error)]
15pub enum CacheError {
16 #[error("Not found: {0}")]
17 NotFound(String),
18 #[error("{context}: {error}")]
19 Io {
20 context: String,
21 #[source]
22 error: std::io::Error,
23 },
24}
25
26#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
27pub struct PyroductConfig {
28 pub author: String,
29 pub target: Option<PathBuf>,
30 pub pyroduct: Option<Dependency>,
31 pub build_slots: Option<usize>,
32}
33
34#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
35#[serde(rename_all = "lowercase")]
36pub enum RemoteAddress {
37 Tcp(String),
38 Unix(std::path::PathBuf),
39}
40
41#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
43pub struct LoadedPlaybook {
44 pub binary: PlaybookBinary,
45 pub remote: HashMap<CapabilityIdent, RemoteAddress>,
46 #[serde(default)]
47 pub paths: HashMap<CapabilityIdent, PathBuf>,
48
49 pub log_dir: std::path::PathBuf,
50 pub input_dir: std::path::PathBuf,
51 pub output_dir: std::path::PathBuf,
52
53 #[serde(default = "default_num_workers")]
55 pub num_workers: usize,
56}
57
58fn default_num_workers() -> usize {
59 4
60}
61
62pub struct CacheManager {
65 pub root: PathBuf,
66 pub pyroduct: Option<Dependency>,
67 pub author: String,
68}
69
70impl CacheManager {
71 #[tracing::instrument(skip(root), fields(root = %root.display()))]
72 pub async fn new(
73 root: &Path,
74 pyroduct: Option<Dependency>,
75 author: String,
76 ) -> Result<Self, CacheError> {
77 tracing::debug!("Creating CacheManager instance");
78 if !root.exists() {
79 fs::create_dir_all(&root).await.map_err(|e| {
80 let err = CacheError::Io {
81 context: "Failed to create cache root".to_string(),
82 error: e,
83 };
84 tracing::error!(error = ?err, "Cache root directory creation failed");
85 err
86 })?;
87 }
88
89 let pyroduct = if let Some(mut dep) = pyroduct {
90 crate::cache::resolve_dependency_path(&mut dep, root);
91 Some(dep.clone())
92 } else {
93 None
94 };
95
96 let manager = Self {
97 root: root.to_path_buf(),
98 pyroduct,
99 author,
100 };
101
102 Ok(manager)
103 }
104
105 #[tracing::instrument]
106 pub async fn from_env() -> Result<Self, CacheError> {
107 tracing::debug!("Loading CacheManager from environment");
108 let root = std::env::var("PYRODUCT")
109 .map(PathBuf::from)
110 .unwrap_or_else(|_| {
111 let system_cache = PathBuf::from("/var/lib/pyro-daemon/cache");
113 if system_cache.join("config.toml").exists() {
114 return system_cache;
115 }
116
117 let home = std::env::var("HOME")
118 .or_else(|_| std::env::var("USERPROFILE"))
119 .map(PathBuf::from)
120 .unwrap_or_else(|_| PathBuf::from("."));
121
122 let user_cache = home.join(".pyroduct");
124 if user_cache.join("config.toml").exists() {
125 return user_cache;
126 }
127
128 let macos_cache = home.join("Library/Application Support/pyro-daemon/cache");
130 if macos_cache.join("config.toml").exists() {
131 return macos_cache;
132 }
133
134 home.join(".pyroduct")
136 });
137 let config_path = root.join("config.toml");
138 let content = fs::read_to_string(&config_path)
139 .await
140 .map_err(|error| {
141 let err = CacheError::Io {
142 context: "Failed to read the configuration".to_string(),
143 error,
144 };
145 tracing::error!(error = ?err, "Failed to read CacheManager config file at {:?}", config_path);
146 err
147 })?;
148 let config = toml::from_str::<PyroductConfig>(&content).map_err(|error| {
149 let err = CacheError::Io {
150 context: "Failed to parse the configuration".to_string(),
151 error: io::Error::new(io::ErrorKind::InvalidData, error),
152 };
153 tracing::error!(error = ?err, "Failed to parse CacheManager config toml");
154 err
155 })?;
156
157 Self::new(&root, config.pyroduct, config.author).await
158 }
159
160 #[tracing::instrument(skip(self))]
161 pub async fn init(&self) -> Result<(), CacheError> {
162 tracing::debug!("Initializing CacheManager directories");
163 fs::create_dir_all(self.capabilities_base_dir())
164 .await
165 .map_err(|error| {
166 let err = CacheError::Io {
167 context: format!(
168 "Failed to create capabilities cache dir in {:?}",
169 self.capabilities_base_dir()
170 ),
171 error,
172 };
173 tracing::error!(error = ?err, "Failed to initialize capabilities dir");
174 err
175 })?;
176
177 fs::create_dir_all(self.interfaces_base_dir())
178 .await
179 .map_err(|error| {
180 let err = CacheError::Io {
181 context: "Failed to create interfaces cache dir".to_string(),
182 error,
183 };
184 tracing::error!(error = ?err, "Failed to initialize interfaces dir");
185 err
186 })?;
187
188 let module_dir = self.root.join("modules");
189 fs::create_dir_all(&module_dir).await.map_err(|error| {
190 let err = CacheError::Io {
191 context: "Failed to create modules cache dir".to_string(),
192 error,
193 };
194 tracing::error!(error = ?err, "Failed to initialize modules dir");
195 err
196 })?;
197
198 Ok(())
199 }
200
201 #[tracing::instrument(skip(self))]
202 pub async fn purge(&self) -> Result<(), CacheError> {
203 tracing::debug!("Purging CacheManager directories");
204 let dirs = [
205 self.capabilities_base_dir(),
206 self.interfaces_base_dir(),
207 self.root.join("modules"),
208 ];
209 for dir in dirs {
210 if dir.exists() {
211 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
212 context: format!("Failed to remove cache dir {}", dir.display()),
213 error: e,
214 })?;
215 }
216 }
217 self.init().await?;
218 Ok(())
219 }
220
221 #[tracing::instrument(skip(self))]
222 pub async fn purge_capabilities(&self) -> Result<(), CacheError> {
223 tracing::debug!("Purging Capabilities from CacheManager");
224 let dirs = [
225 self.capabilities_base_dir(),
226 self.interfaces_base_dir(),
227 ];
228 for dir in dirs {
229 if dir.exists() {
230 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
231 context: format!("Failed to remove capabilities cache dir {}", dir.display()),
232 error: e,
233 })?;
234 }
235 }
236 self.init().await?;
237 Ok(())
238 }
239
240 #[tracing::instrument(skip(self))]
241 pub async fn purge_modules(&self) -> Result<(), CacheError> {
242 tracing::debug!("Purging Modules from CacheManager");
243 let dir = self.root.join("modules");
244 if dir.exists() {
245 fs::remove_dir_all(&dir).await.map_err(|e| CacheError::Io {
246 context: format!("Failed to remove modules cache dir {}", dir.display()),
247 error: e,
248 })?;
249 }
250 self.init().await?;
251 Ok(())
252 }
253
254
255 pub async fn list_available_capabilities(
256 &self,
257 ) -> Result<Vec<(String, String, String)>, CacheError> {
258 let base = self.capabilities_base_dir();
259 if !base.exists() {
260 return Ok(Vec::new());
261 }
262
263 let mut results = Vec::new();
264 let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
265 context: "Failed to read capabilities base dir".to_string(),
266 error: e,
267 })?;
268
269 while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
270 context: "Failed to read author entry".to_string(),
271 error: e,
272 })? {
273 let author_path = author_entry.path();
274 if !author_path.is_dir() {
275 continue;
276 }
277 let author_name = author_entry.file_name().to_string_lossy().to_string();
278
279 let mut names = fs::read_dir(&author_path)
280 .await
281 .map_err(|e| CacheError::Io {
282 context: format!("Failed to read author dir: {}", author_path.display()),
283 error: e,
284 })?;
285
286 while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
287 context: "Failed to read name entry".to_string(),
288 error: e,
289 })? {
290 let name_path = name_entry.path();
291 if !name_path.is_dir() {
292 continue;
293 }
294 let cap_name = name_entry.file_name().to_string_lossy().to_string();
295
296 let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
297 context: format!("Failed to read name dir: {}", name_path.display()),
298 error: e,
299 })?;
300
301 while let Some(version_entry) =
302 versions.next_entry().await.map_err(|e| CacheError::Io {
303 context: "Failed to read version entry".to_string(),
304 error: e,
305 })?
306 {
307 let version_path = version_entry.path();
308 if !version_path.is_dir() {
309 continue;
310 }
311 let version = version_entry.file_name().to_string_lossy().to_string();
312
313 if version_path.join("interface.json").exists() {
314 results.push((author_name.clone(), cap_name.clone(), version));
315 }
316 }
317 }
318 }
319
320 Ok(results)
321 }
322
323 pub fn capabilities_base_dir(&self) -> PathBuf {
324 self.root.join("capabilities")
325 }
326
327 pub fn capabilities_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
328 self.capabilities_base_dir()
329 .join(author)
330 .join(name)
331 .join(version)
332 }
333
334 pub fn interface_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
335 self.interfaces_base_dir()
336 .join(author)
337 .join(name)
338 .join(version)
339 }
340
341 pub fn modules_base_dir(&self) -> PathBuf {
344 self.root.join("modules")
345 }
346
347 pub fn module_dir(&self, author: &str, name: &str, version: &str) -> PathBuf {
348 self.modules_base_dir()
349 .join(author)
350 .join(name)
351 .join(version)
352 }
353
354 pub async fn list_available_modules(
355 &self,
356 ) -> Result<Vec<(String, String, String)>, CacheError> {
357 let base = self.modules_base_dir();
358 if !base.exists() {
359 return Ok(Vec::new());
360 }
361
362 let mut results = Vec::new();
363 let mut authors = fs::read_dir(&base).await.map_err(|e| CacheError::Io {
364 context: "Failed to read modules base dir".to_string(),
365 error: e,
366 })?;
367
368 while let Some(author_entry) = authors.next_entry().await.map_err(|e| CacheError::Io {
369 context: "Failed to read author entry".to_string(),
370 error: e,
371 })? {
372 let author_path = author_entry.path();
373 if !author_path.is_dir() {
374 continue;
375 }
376 let author_name = author_entry.file_name().to_string_lossy().to_string();
377
378 let mut names = fs::read_dir(&author_path)
379 .await
380 .map_err(|e| CacheError::Io {
381 context: format!("Failed to read author dir: {}", author_path.display()),
382 error: e,
383 })?;
384
385 while let Some(name_entry) = names.next_entry().await.map_err(|e| CacheError::Io {
386 context: "Failed to read name entry".to_string(),
387 error: e,
388 })? {
389 let name_path = name_entry.path();
390 if !name_path.is_dir() {
391 continue;
392 }
393 let mod_name = name_entry.file_name().to_string_lossy().to_string();
394
395 let mut versions = fs::read_dir(&name_path).await.map_err(|e| CacheError::Io {
396 context: format!("Failed to read name dir: {}", name_path.display()),
397 error: e,
398 })?;
399
400 while let Some(version_entry) =
401 versions.next_entry().await.map_err(|e| CacheError::Io {
402 context: "Failed to read version entry".to_string(),
403 error: e,
404 })?
405 {
406 let version_path = version_entry.path();
407 if !version_path.is_dir() {
408 continue;
409 }
410 let version = version_entry.file_name().to_string_lossy().to_string();
411
412 if version_path.join("spec.json").exists() {
413 results.push((author_name.clone(), mod_name.clone(), version));
414 }
415 }
416 }
417 }
418
419 Ok(results)
420 }
421
422 pub fn interfaces_base_dir(&self) -> PathBuf {
423 self.root.join("interfaces")
424 }
425
426 pub async fn capability_interface_spec(
427 &self,
428 author: &str,
429 name: &str,
430 version: &str,
431 ) -> Result<String, CacheError> {
432 let path = self
433 .capabilities_dir(author, name, version)
434 .join("interface.json");
435 fs::read_to_string(&path)
436 .await
437 .map_err(|error| CacheError::Io {
438 context: format!("Failed to read interface.json from {}", path.display()),
439 error,
440 })
441 }
442
443 pub async fn capability_binary_path(
444 &self,
445 author: &str,
446 name: &str,
447 version: &str,
448 ) -> Result<PathBuf, CacheError> {
449 let base_dir = self.capabilities_dir(author, name, version);
450
451 #[cfg(target_os = "linux")]
452 let lib_file = "lib.so";
453 #[cfg(target_os = "macos")]
454 let lib_file = "lib.dylib";
455 #[cfg(target_os = "windows")]
456 let lib_file = "lib.dll";
457 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
458 let lib_file = "lib.so";
459
460 let path = base_dir.join(lib_file);
461 if !path.exists() {
462 Err(CacheError::NotFound(format!(
463 "Missing {} binary for this system",
464 path.display()
465 )))
466 } else {
467 Ok(path)
468 }
469 }
470
471 pub async fn capability_config_spec(
472 &self,
473 author: &str,
474 name: &str,
475 version: &str,
476 ) -> Result<Option<String>, CacheError> {
477 let path = self
478 .capabilities_dir(author, name, version)
479 .join("config.json");
480 if path.exists() {
481 let content = fs::read_to_string(&path)
482 .await
483 .map_err(|error| CacheError::Io {
484 context: format!("Failed to read config.json from {}", path.display()),
485 error,
486 })?;
487 Ok(Some(content))
488 } else {
489 Ok(None)
490 }
491 }
492
493 #[tracing::instrument(skip(self))]
494 pub async fn remove_module(
495 &self,
496 author: &str,
497 name: &str,
498 version: &str,
499 ) -> Result<(), CacheError> {
500 tracing::debug!("Removing module from cache");
501 let path = self.module_dir(author, name, version);
502 if path.exists() {
503 tokio::fs::remove_dir_all(&path).await.map_err(|error| {
504 let err = CacheError::Io {
505 context: "Unable to remove module".to_string(),
506 error,
507 };
508 tracing::error!(error = ?err, "Failed to remove playbook at {:?}", path);
509 err
510 })?;
511 }
512 Ok(())
513 }
514
515 #[tracing::instrument(skip(self))]
518 pub async fn get_named_binary(
519 &self,
520 author: &str,
521 package: &str,
522 version: &str,
523 ) -> Result<PlaybookBinary, CacheError> {
524 tracing::debug!("Retrieving named playbook binary");
525 let path = self.module_dir(author, package, version);
526 if path.exists() {
527 let binary = PlaybookBinary::from_dir(&path).await.map_err(|error| {
528 let err = CacheError::Io {
529 context: "Unable to load named module binary".to_string(),
530 error,
531 };
532 tracing::error!(error = ?err, "Failed to load named module binary from {:?}", path);
533 err
534 })?;
535 Ok(binary)
536 } else {
537 let err = CacheError::NotFound(format!(
538 "Missing named module binary for {}/{}/{}",
539 author, package, version
540 ));
541 tracing::debug!("Named module binary not found at {:?}", path);
542 Err(err)
543 }
544 }
545
546 #[tracing::instrument(skip(self))]
547 pub async fn get_named_source(
548 &self,
549 author: &str,
550 package: &str,
551 version: &str,
552 ) -> Result<PlaybookSource, CacheError> {
553 tracing::debug!("Retrieving named playbook source");
554 let path = self.module_dir(author, package, version);
555 if path.exists() {
556 let mut source = PlaybookSource::from_dir(&path).await.map_err(|error| {
557 let err = CacheError::Io {
558 context: "Unable to load named module source".to_string(),
559 error,
560 };
561 tracing::error!(error = ?err, "Failed to load named module source from {:?}", path);
562 err
563 })?;
564 source.manifest.module = crate::cargo::CapabilityIdent {
565 author: author.to_string(),
566 package: package.to_string(),
567 version: version.to_string(),
568 };
569 Ok(source)
570 } else {
571 let err = CacheError::NotFound(format!(
572 "Missing named module source for {}/{}/{}",
573 author, package, version
574 ));
575 tracing::debug!("Named module source not found at {:?}", path);
576 Err(err)
577 }
578 }
579
580 #[tracing::instrument(skip(self, artifacts))]
581 pub async fn write_artifacts(&self, artifacts: &Artifacts) -> Result<(), CacheError> {
582 tracing::debug!("Writing artifacts to CacheManager");
583 let res = async {
584 match &artifacts {
585 Artifacts::CapabilityBinary(capability) => {
586 let path = self.capabilities_dir(
587 &capability.ident.author,
588 &capability.ident.package,
589 &capability.ident.version,
590 );
591 capability
592 .write_to_directory(&path)
593 .await
594 .map_err(|e| CacheError::Io {
595 context: format!("Failed to write artifacts to {}", path.display()),
596 error: e,
597 })
598 }
599 Artifacts::CapabilitySource(capability) => {
600 let path = self.capabilities_dir(
601 &capability.manifest.capability.author,
602 &capability.manifest.capability.package,
603 &capability.manifest.capability.version,
604 );
605 capability
606 .write_to_directory(&path)
607 .await
608 .map_err(|e| CacheError::Io {
609 context: format!("Failed to write artifacts to {}", path.display()),
610 error: e,
611 })
612 }
613 Artifacts::Interface(interface) => {
614 let path = self.interface_dir(
615 &interface.manifest.capability.author,
616 &interface.manifest.capability.package,
617 &interface.manifest.capability.version,
618 );
619 fs::create_dir_all(&path)
620 .await
621 .map_err(|e| CacheError::Io {
622 context: format!("Failed to create {}", path.display()),
623 error: e,
624 })?;
625 let mut manifest = interface.manifest.clone();
626 if let Some(pyroduct) = &self.pyroduct {
627 manifest.pyroduct = pyroduct.clone();
628 }
629 let cargo_path = path.join("Cargo.toml");
630 let cargo = manifest.clone().to_interface_manifest();
631 let cargo = toml::to_string_pretty(&cargo).map_err(|e| CacheError::Io {
632 context: format!(
633 "Failed to serialize Cargo.toml to {}",
634 cargo_path.display()
635 ),
636 error: io::Error::new(io::ErrorKind::InvalidData, e),
637 })?;
638 fs::write(&cargo_path, cargo)
639 .await
640 .map_err(|e| CacheError::Io {
641 context: format!(
642 "Failed to write Cargo.toml to {}",
643 cargo_path.display()
644 ),
645 error: e,
646 })?;
647 interface
648 .write_to_directory(&path)
649 .await
650 .map_err(|e| CacheError::Io {
651 context: format!("Failed to write artifacts to {}", path.display()),
652 error: e,
653 })
654 }
655 Artifacts::Playbook(Playbook::Binary(binary)) => {
656 let ident = &binary.spec.ident;
657 let path = self.module_dir(&ident.author, &ident.package, &ident.version);
658 fs::create_dir_all(&path)
659 .await
660 .map_err(|e| CacheError::Io {
661 context: format!("Failed to create module dir {}", path.display()),
662 error: e,
663 })?;
664 binary
665 .write_to_directory(&path)
666 .await
667 .map_err(|e| CacheError::Io {
668 context: format!("Failed to write artifacts to {}", path.display()),
669 error: e,
670 })
671 }
672 Artifacts::Playbook(Playbook::Source(source)) => {
673 let ident = source.ident();
674 let path = self.module_dir(&ident.author, &ident.package, &ident.version);
675 fs::create_dir_all(&path)
676 .await
677 .map_err(|e| CacheError::Io {
678 context: format!("Failed to create module dir {}", path.display()),
679 error: e,
680 })?;
681 source
682 .write_to_directory(&path)
683 .await
684 .map_err(|e| CacheError::Io {
685 context: format!("Failed to write artifacts to {}", path.display()),
686 error: e,
687 })
688 }
689 }
690 }
691 .await;
692
693 if let Err(ref e) = res {
694 tracing::error!(error = ?e, "Failed to write artifacts to cache");
695 } else {
696 tracing::debug!("Successfully wrote artifacts to cache");
697 }
698 res
699 }
700
701 #[tracing::instrument(skip(self, remotes, log_dir, input_dir, output_dir), fields(author = playbook.author, name = playbook.package, version = playbook.version))]
702 pub async fn load_playbook(
703 &self,
704 playbook: PlaybookIdent,
705 remotes: HashMap<CapabilityIdent, RemoteAddress>,
706 log_dir: impl AsRef<Path>,
707 input_dir: impl AsRef<Path>,
708 output_dir: impl AsRef<Path>,
709 num_workers: usize,
710 ) -> Result<LoadedPlaybook, CacheError> {
711 tracing::debug!("Loading playbook");
712 let res = async {
713 let binary = self.get_named_binary(&playbook.author, &playbook.package, &playbook.version).await?;
714 let mut paths = HashMap::new();
715 let mut remote = HashMap::new();
716
717 tracing::debug!(capabilities = ?binary.spec.capabilities, "Loading playbook capabilities");
718 tracing::debug!(
719 config_keys = ?binary
720 .configurations
721 .iter()
722 .map(|c| &c.package)
723 .collect::<Vec<_>>(),
724 "Loaded playbook configuration keys"
725 );
726
727 for cap in &binary.spec.capabilities {
728 if let Some(addr) = remotes.get(cap) {
729 remote.insert(cap.clone(), addr.clone());
730 } else if binary
731 .configurations
732 .iter()
733 .any(|c| c.package == cap.package)
734 {
735 let path = self
736 .capability_binary_path(&cap.author, &cap.package, &cap.version)
737 .await?;
738 paths.insert(cap.clone(), path);
739 } else {
740 return Err(CacheError::NotFound(format!("Capability {} not found", cap.package)));
741 }
742 }
743
744 Ok(LoadedPlaybook {
745 binary,
746 remote,
747 paths,
748 log_dir: log_dir.as_ref().to_path_buf(),
749 input_dir: input_dir.as_ref().to_path_buf(),
750 output_dir: output_dir.as_ref().to_path_buf(),
751 num_workers,
752 })
753 }.await;
754
755 if let Err(ref e) = res {
756 tracing::error!(error = ?e, "Failed to load playbook");
757 } else {
758 tracing::debug!("Successfully loaded playbook");
759 }
760 res
761 }
762
763 #[cfg(feature = "compiler")]
764 pub fn convert_anon_playbook(&self, playbook: AnonPlaybook) -> PlaybookSource {
765 let author = self.author.clone();
766 let mut resolved_capabilities = Vec::new();
767 for cap in &playbook.configurations {
768 resolved_capabilities.push(CapabilityIdent {
769 author: cap.author.clone(),
770 package: cap.package.clone(),
771 version: cap.version.clone(),
772 });
773 }
774 PlaybookSource::new(
775 crate::artifacts::PlaybookIdent {
776 author,
777 package: playbook.package,
778 version: "0.1.0".to_string(),
779 },
780 crate::artifacts::ModuleDependencies {
781 dependencies: playbook.dependencies,
782 capabilities: resolved_capabilities,
783 },
784 playbook.configurations,
785 playbook.source,
786 playbook.interconnect,
787 )
788 }
789}
790
791pub(crate) fn resolve_dependency_path(dep: &mut Dependency, base: &std::path::Path) {
792 if let Dependency::Detailed(detail) = dep
793 && let Some(ref mut p) = detail.path
794 {
795 let path = std::path::Path::new(p.as_str());
796 if path.is_relative() {
797 let absolute = base.join(path);
798 *p = absolute
799 .canonicalize()
800 .unwrap_or(absolute)
801 .to_string_lossy()
802 .into_owned();
803 }
804 }
805}