1use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
75pub trait UserError: core::fmt::Display + core::fmt::Debug + 'static {}
77impl<T: core::fmt::Display + core::fmt::Debug + 'static> UserError for T {}
78
79#[derive(snafu::Snafu, Debug)]
81pub enum Error {
82 #[snafu(display("{source}:\n{}",
83 source.chain()
84 .map(|e| format!("{e}"))
85 .collect::<Vec<_>>()
86 .join("\n -> ")))]
87 Tele { source: anyhow::Error },
88
89 #[snafu(display("Could not read store file '{path:?}': {source}"))]
90 StoreFileRead {
91 path: std::path::PathBuf,
92 source: std::io::Error,
93 },
94
95 #[snafu(display("Could not delete store file '{path:?}': {source}"))]
96 StoreFileDelete {
97 path: std::path::PathBuf,
98 source: std::io::Error,
99 },
100
101 #[snafu(display("Could not serialize stored '{name}': {source}"))]
102 Serialize {
103 name: String,
104 source: serde_json::Error,
105 },
106
107 #[snafu(display("Could not deserialize stored '{name}': {source}"))]
108 Deserialize {
109 name: String,
110 source: serde_json::Error,
111 },
112
113 #[snafu(display("Could not build schedule: {msg}"))]
114 Schedule { msg: String },
115
116 #[snafu(display("Could not create file {path:?}: {source}"))]
117 CreateFile {
118 path: std::path::PathBuf,
119 source: std::io::Error,
120 },
121
122 #[snafu(display("Could not write file {path:?}: {source}"))]
123 WriteFile {
124 path: std::path::PathBuf,
125 source: std::io::Error,
126 },
127
128 #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
129 RemoteUnresolved {
130 ty: &'static str,
131 depends_on: String,
132 },
133
134 #[snafu(display("Could not save the apply graph: {source}"))]
135 Dot { source: dagga::dot::DotError },
136
137 #[snafu(display(
138 "Could not build apply graph because of a missing resource name for '{missing}'"
139 ))]
140 MissingName { missing: usize },
141
142 #[snafu(display("Could not find a resource by the name '{name}'"))]
143 MissingResource { name: String },
144
145 #[snafu(display("Error during '{name}' creation: {error}"))]
146 Create {
147 name: String,
148 error: Box<dyn UserError>,
149 },
150
151 #[snafu(display("Error during '{name}' read and import: {error}"))]
152 Import {
153 name: String,
154 error: Box<dyn UserError>,
155 },
156
157 #[snafu(display("Error during '{name}' update: {error}"))]
158 Update {
159 name: String,
160 error: Box<dyn UserError>,
161 },
162
163 #[snafu(display("Error during '{name}' destruction: {error}"))]
164 Destroy {
165 name: String,
166 error: Box<dyn UserError>,
167 },
168
169 #[snafu(display("Missing previous remote value '{name}'"))]
170 Load { name: String },
171
172 #[snafu(display(
173 "Loading '{id}' would clobber an existing value in the store file, \
174 and these values are not the same"
175 ))]
176 Clobber { id: String },
177
178 #[snafu(display("Could not downcast"))]
179 Downcast,
180
181 #[snafu(display("Missing store file for '{id}'"))]
182 MissingStoreFile { id: String },
183}
184
185impl From<anyhow::Error> for Error {
186 fn from(source: anyhow::Error) -> Self {
187 Error::Tele { source }
188 }
189}
190
191impl From<dagga::dot::DotError> for Error {
192 fn from(source: dagga::dot::DotError) -> Self {
193 Self::Dot { source }
194 }
195}
196
197type Result<T, E = Error> = core::result::Result<T, E>;
198
199#[allow(unreachable_code)]
203pub trait Resource:
204 Clone + PartialEq + HasDependencies + serde::Serialize + serde::de::DeserializeOwned + 'static
205{
206 type Provider;
210
211 type Error: UserError;
213
214 type Output: core::fmt::Debug
217 + Clone
218 + PartialEq
219 + serde::Serialize
220 + serde::de::DeserializeOwned
221 + 'static;
222
223 fn create(
235 &self,
236 _provider: &Self::Provider,
237 ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
238 unimplemented!(
239 "Resource::create is unimplemented for {}",
240 std::any::type_name::<Self>()
241 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
242 }
243
244 fn read(
256 &self,
257 _provider: &Self::Provider,
258 ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
259 unimplemented!(
260 "Resource::read is unimplemented for {}",
261 std::any::type_name::<Self>()
262 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
263 }
264
265 fn update(
278 &self,
279 _provider: &Self::Provider,
280 _previous_local: &Self,
281 _previous_remote: &Self::Output,
282 ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
283 unimplemented!(
284 "Resource::update is unimplemented for {}",
285 std::any::type_name::<Self>()
286 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
287 }
288
289 fn delete(
301 &self,
302 _provider: &Self::Provider,
303 _previous_remote: &Self::Output,
304 ) -> impl Future<Output = Result<(), Self::Error>> {
305 unimplemented!(
306 "Resource::delete is unimplemented for {}",
307 std::any::type_name::<Self>()
308 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
309 }
310}
311
312#[derive(Clone, Default, Debug)]
313pub struct Dependencies {
314 inner: Vec<String>,
316}
317
318impl IntoIterator for Dependencies {
319 type Item = String;
320
321 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
322
323 fn into_iter(self) -> Self::IntoIter {
324 self.inner.into_iter()
325 }
326}
327
328impl core::fmt::Display for Dependencies {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 f.write_str(
331 &self
332 .inner
333 .iter()
334 .map(|u| u.to_string())
335 .collect::<Vec<_>>()
336 .join(", "),
337 )
338 }
339}
340
341impl Dependencies {
342 pub fn merge(self, other: Self) -> Self {
343 Dependencies {
344 inner: [self.inner, other.inner].concat(),
345 }
346 }
347}
348
349pub trait HasDependencies {
354 fn dependencies(&self) -> Dependencies {
355 Dependencies::default()
356 }
357}
358
359#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
363pub enum Action {
364 Load,
365 Create,
366 Read,
367 Update,
368 Destroy,
369}
370
371impl core::fmt::Display for Action {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 f.write_str(match self {
374 Action::Load => "load",
375 Action::Create => "create",
376 Action::Read => "read",
377 Action::Update => "update",
378 Action::Destroy => "destroy",
379 })
380 }
381}
382
383#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
384struct InertStoreResource {
385 name: String,
386 local: serde_json::Value,
387 remote: serde_json::Value,
388}
389
390impl InertStoreResource {
391 async fn save(
392 &self,
393 resource_id: &str,
394 store_path: impl AsRef<std::path::Path>,
395 ) -> Result<(), Error> {
396 let path = store_file_path(resource_id, &store_path);
397 log::info!("storing {resource_id} to {path:?}");
398
399 let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
400 name: format!("storing {}", resource_id),
401 })?;
402
403 if let Some(parent) = path.parent() {
405 tokio::fs::create_dir_all(&parent)
406 .await
407 .context(CreateFileSnafu { path: parent })?;
408 }
409
410 let mut file = tokio::fs::File::create(&path)
411 .await
412 .context(CreateFileSnafu { path: path.clone() })?;
413 file.write_all(contents.as_bytes())
414 .await
415 .context(WriteFileSnafu { path: path.clone() })?;
416 Ok(())
417 }
418}
419
420#[derive(Clone, Debug)]
421pub struct StoreResource<L, R> {
422 name: String,
424 local_definition: L,
426 action: Action,
427 remote_var: RemoteVar<R>,
428}
429
430impl<L, R> Deref for StoreResource<L, R> {
431 type Target = L;
432
433 fn deref(&self) -> &Self::Target {
434 &self.local_definition
435 }
436}
437
438impl<L, R> AsRef<L> for StoreResource<L, R> {
439 fn as_ref(&self) -> &L {
440 &self.local_definition
441 }
442}
443
444impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
445where
446 L: serde::Serialize + for<'a> serde::Deserialize<'a>,
447 R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
448{
449 type Error = Error;
450
451 fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
452 let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
453 name: value.name.clone(),
454 })?;
455 let output = value.remote_var.get().context(LoadSnafu {
456 name: value.name.clone(),
457 })?;
458 let remote = serde_json::to_value(output).context(SerializeSnafu {
459 name: value.name.clone(),
460 })?;
461 Ok(Self {
462 name: value.name,
463 local,
464 remote,
465 })
466 }
467}
468
469impl<T> StoreResource<T, T::Output>
470where
471 T: Resource,
472 T::Output: Clone,
473{
474 pub fn remote<X: Clone + core::fmt::Debug + 'static>(
476 &self,
477 f: impl Fn(&T::Output) -> X + 'static,
478 ) -> Remote<X> {
479 Remote::new(self, f)
480 }
481
482 pub fn action(&self) -> Action {
487 self.action
488 }
489}
490
491fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
493 store_path.as_ref().join(format!("{name}.json"))
494}
495
496type StoreNodeRunFn<Provider> = Box<
497 dyn FnOnce(
498 &'_ Provider,
500 ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
501>;
502
503struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
504 provider: &'a Provider,
505 store_path: std::path::PathBuf,
506 resource_id: String,
508 action: Action,
509 local_definition_code: T,
510 local_definition_store: Option<T>,
511 remote_var: RemoteVar<T::Output>,
512}
513
514impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
515 async fn run(self) -> Result<()>
516 where
517 T: Resource,
518 {
519 let Self {
520 provider,
521 store_path,
522 resource_id,
523 action,
524 local_definition_code,
525 local_definition_store,
526 remote_var,
527 } = self;
528 log::info!("{action} '{resource_id}':");
529
530 async fn save<T: Resource>(
531 resource_id: &str,
532 local_definition_code: T,
533 remote_var: &RemoteVar<T::Output>,
534 store_path: impl AsRef<std::path::Path>,
535 ) -> Result<(), Error> {
536 let inert_resource = InertStoreResource {
537 name: resource_id.to_owned(),
538 local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
539 name: format!("store {resource_id}"),
540 })?,
541 remote: serde_json::to_value(
542 remote_var.get().context(LoadSnafu { name: resource_id })?,
543 )
544 .context(SerializeSnafu {
545 name: format!("store {resource_id} remote"),
546 })?,
547 };
548 inert_resource.save(resource_id, store_path).await?;
549 Ok(())
550 }
551
552 match action {
553 Action::Load => {
554 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
555 }
556 Action::Create => {
557 let value = local_definition_code
558 .create(provider)
559 .await
560 .map_err(|error| Error::Create {
561 name: resource_id.to_owned(),
562 error: Box::new(error),
563 })?;
564 remote_var.set(Some(value));
565 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
566 }
567 Action::Read => {
568 let value = local_definition_code
569 .read(provider)
570 .await
571 .map_err(|error| Error::Create {
572 name: resource_id.to_owned(),
573 error: Box::new(error),
574 })?;
575 remote_var.set(Some(value));
576 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
577 }
578 Action::Update => {
579 let previous_local = local_definition_store.unwrap();
580 let previous_remote = remote_var.get().context(LoadSnafu {
581 name: resource_id.clone(),
582 })?;
583 let output = local_definition_code
584 .update(provider, &previous_local, &previous_remote)
585 .await
586 .map_err(|error| Error::Update {
587 name: resource_id.clone(),
588 error: Box::new(error),
589 })?;
590 remote_var.set(Some(output));
591 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
592 }
593 Action::Destroy => {
594 log::debug!("running destroy action on {resource_id}");
595 let local_definition = local_definition_code.clone();
599 let previous_remote = remote_var.get().context(LoadSnafu {
600 name: resource_id.clone(),
601 })?;
602 local_definition
603 .delete(provider, &previous_remote)
604 .await
605 .map_err(|error| Error::Destroy {
606 name: resource_id.to_owned(),
607 error: Box::new(error),
608 })?;
609
610 log::info!(" {resource_id} is destroyed");
611 let path = store_file_path(&resource_id, &store_path);
612 log::info!(" removing {resource_id} store file {path:?}");
613 tokio::fs::remove_file(&path)
614 .await
615 .context(StoreFileDeleteSnafu { path })?;
616 remote_var.set(None);
617 }
618 }
619
620 log::info!(" success!");
621 Ok(())
622 }
623}
624
625pub struct DestroyResource<T: Resource> {
626 local: T,
627 remote: T::Output,
628}
629
630impl<T: Resource> Deref for DestroyResource<T> {
631 type Target = T;
632
633 fn deref(&self) -> &Self::Target {
634 &self.local
635 }
636}
637
638impl<T: Resource> DestroyResource<T> {
639 pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
642 &self,
643 f: fn(&T::Output) -> X,
644 ) -> Migrated<X> {
645 Migrated(f(&self.remote))
646 }
647}
648
649struct StoreNode<Provider> {
650 name: String,
651 _remote_ty: &'static str,
652 run: StoreNodeRunFn<Provider>,
653}
654
655pub struct Store<T> {
656 path: std::path::PathBuf,
657 provider: T,
658 remotes: Remotes,
659 graph: dagga::Dag<StoreNode<T>, usize>,
660}
661
662impl<P: 'static> Store<P> {
663 fn read_from_store<T: Resource<Provider = P>>(
664 path: impl AsRef<std::path::Path>,
665 id: &str,
666 ) -> Result<(T, T::Output)> {
667 let path = store_file_path(id, path.as_ref());
668 snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
669
670 log::debug!("{path:?} exists, reading '{id}' from it");
671 let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
672 path: path.to_path_buf(),
673 })?;
674 log::trace!(
675 "contents:\n{}",
676 contents
677 .lines()
678 .map(|line| format!(" {line}"))
679 .collect::<Vec<_>>()
680 .join("\n")
681 );
682 let inert_store_rez: InertStoreResource =
683 serde_json::from_str(&contents).context(DeserializeSnafu {
684 name: id.to_owned(),
685 })?;
686 log::trace!("read inert store resource");
687 log::trace!(
688 "reading local contents: {}",
689 serde_json::to_string_pretty(&inert_store_rez.local)
690 .unwrap()
691 .lines()
692 .map(|line| format!(" {line}"))
693 .collect::<Vec<_>>()
694 .join("\n")
695 );
696 log::trace!("as {}", std::any::type_name::<T>());
697 let stored_definition: T =
698 serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
699 name: id.to_owned(),
700 })?;
701
702 log::trace!(" reading remote output JSON value");
703 let remote_value: T::Output =
704 serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
705 name: format!("remote {id}"),
706 })?;
707 Ok((stored_definition, remote_value))
708 }
709
710 pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
711 Self {
712 path: path.as_ref().to_path_buf(),
713 graph: dagga::Dag::default(),
714 remotes: Default::default(),
715 provider,
716 }
717 }
718
719 pub fn provider(&self) -> &P {
720 &self.provider
721 }
722
723 fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
724 where
725 T: Resource<Provider = P>,
726 {
727 Self::read_from_store(&self.path, id)
728 }
729
730 fn define_resource<T>(
731 &mut self,
732 id: impl AsRef<str>,
733 local_definition: T,
734 action: Action,
735 stored_definition: Option<T>,
736 output: Option<T::Output>,
737 ) -> Result<StoreResource<T, T::Output>, Error>
738 where
739 T: Resource<Provider = P>,
740 {
741 let id = id.as_ref();
742 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
743 remote_var.set(output);
744
745 let remote_var = remote_var.clone();
746 let local_definition_code = local_definition.clone();
747 let local_definition_store = stored_definition.clone();
748 let store_path = self.path.clone();
749 let run: StoreNodeRunFn<T::Provider> = Box::new({
750 let resource_id = id.to_owned();
751 let remote_var = remote_var.clone();
752 let local_definition_code = local_definition_code.clone();
753 let local_definition_store = local_definition_store.clone();
754 move |provider: &T::Provider| {
755 Box::pin(
756 RunAction {
757 provider,
758 store_path,
759 resource_id,
760 action,
761 local_definition_code,
762 local_definition_store,
763 remote_var,
764 }
765 .run(),
766 )
767 }
768 });
769 let ty = std::any::type_name::<T>();
770
771 {
772 log::debug!("adding main node {action} {id}");
774 let node_name = format!("{action} {id}");
775 let dag_node = dagga::Node::new(StoreNode {
776 name: node_name.clone(),
777 _remote_ty: ty,
778 run,
779 })
780 .with_name(node_name)
781 .with_reads({
782 let mut reads = vec![];
784 for dep in local_definition.dependencies() {
785 let var = self
786 .remotes
787 .get(&dep)
788 .context(MissingResourceSnafu { name: dep })?;
789 reads.push(var.key);
790 }
791 reads
792 });
793 let dag_node = match action {
794 Action::Create | Action::Read | Action::Load | Action::Update => {
795 log::debug!(" with result {rez}");
796 dag_node.with_result(rez)
797 }
798 Action::Destroy => {
799 log::debug!(" with move {rez}");
800 dag_node.with_move(rez)
801 }
802 };
803 self.graph.add_node(dag_node);
804 }
805
806 Ok(StoreResource {
807 name: id.to_owned(),
808 local_definition,
809 action,
810 remote_var,
811 })
812 }
813
814 pub fn resource<T>(
826 &mut self,
827 id: impl AsRef<str>,
828 local_definition: T,
829 ) -> Result<StoreResource<T, T::Output>, Error>
830 where
831 T: Resource<Provider = P>,
832 {
833 let id = id.as_ref();
834 let (action, stored_definition, output) = match self.read_file(id) {
835 Ok((stored_definition, output)) => {
836 log::debug!(" {output:?}");
839 let action = if local_definition != stored_definition {
840 log::debug!(" local resource has changed, so this remote is now stale");
841 Action::Update
842 } else {
843 Action::Load
844 };
845
846 (action, Some(stored_definition), Some(output))
847 }
848 Err(Error::MissingStoreFile { id }) => {
849 log::debug!("store file '{id}' does not exist, creating a new resource",);
850 (Action::Create, None, None)
851 }
852 Err(e) => {
853 log::error!("could not define resource '{id}': {e}");
854 return Err(e);
855 }
856 };
857 self.define_resource(id, local_definition, action, stored_definition, output)
858 }
859
860 pub fn import<T>(
870 &mut self,
871 id: impl AsRef<str>,
872 local_definition: T,
873 ) -> Result<StoreResource<T, T::Output>, Error>
874 where
875 T: Resource<Provider = P>,
876 {
877 self.define_resource(id, local_definition, Action::Read, None, None)
878 }
879
880 pub fn load<T>(
891 &mut self,
892 id: impl AsRef<str>,
893 local_definition: T,
894 remote_definition: T::Output,
895 force_overwrite: bool,
896 ) -> Result<StoreResource<T, T::Output>, Error>
897 where
898 T: Resource<Provider = P>,
899 {
900 let id = id.as_ref();
901 if let Ok((stored_definition, output)) = self.read_file(id) {
902 if local_definition == stored_definition && remote_definition == output {
903 if force_overwrite {
904 log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
905 } else {
906 let err = ClobberSnafu { id: id.to_owned() }.build();
907 log::error!("{err}");
908 return Err(err);
909 }
910 }
911 }
912 self.define_resource(
913 id,
914 local_definition,
915 Action::Load,
916 None,
917 Some(remote_definition),
918 )
919 }
920
921 pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
923 where
924 T: Resource<Provider = P>,
925 {
926 let id = id.as_ref();
927 let (local, remote) = self.read_file::<T>(id)?;
928 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
929 remote_var.set(Some(remote.clone()));
930 {
931 log::debug!("adding node {} {id}", Action::Load);
933 let node_name = format!("load {id}");
934 let load_node = dagga::Node::new(StoreNode {
935 name: node_name.clone(),
936 _remote_ty: std::any::type_name::<T>(),
937 run: Box::new({
938 let resource_id = id.to_owned();
939 let store_path = self.path.clone();
940 let local = local.clone();
941 let remote_var = remote_var.clone();
942 move |provider| {
943 Box::pin(
944 RunAction {
945 provider,
946 store_path,
947 resource_id,
948 action: Action::Load,
949 local_definition_code: local,
950 remote_var,
951 local_definition_store: None,
952 }
953 .run(),
954 )
955 }
956 }),
957 })
958 .with_name(node_name)
959 .with_reads({
960 let mut reads = vec![];
961 for dep in local.dependencies() {
962 reads.push(
963 self.remotes
964 .get(&dep)
965 .context(MissingResourceSnafu {
966 name: id.to_owned(),
967 })?
968 .key,
969 );
970 }
971 reads
972 })
973 .with_result(rez);
974 self.graph.add_node(load_node);
975 }
976 {
977 log::debug!("adding node {} {id}", Action::Destroy);
978 let node_name = format!("destroy {id}");
979 let destroy_node = StoreNode {
981 name: node_name.clone(),
982 _remote_ty: std::any::type_name::<T>(),
983 run: Box::new({
984 let resource_id = id.to_owned();
985 let local = local.clone();
986 let store_path = self.path.clone();
987 let remote_var = remote_var.clone();
988 move |provider| {
989 Box::pin(
990 RunAction {
991 provider,
992 store_path,
993 resource_id,
994 action: Action::Destroy,
995 local_definition_code: local,
996 local_definition_store: None,
997 remote_var,
998 }
999 .run(),
1000 )
1001 }
1002 }),
1003 };
1004
1005 self.graph.add_node(
1006 dagga::Node::new(destroy_node)
1007 .with_name(node_name)
1008 .with_move(rez),
1009 );
1010 }
1011
1012 Ok(DestroyResource { local, remote })
1013 }
1014
1015 fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1016 let mut missing_resource_creation = None;
1017 let legend = self.graph.legend()?.with_resources_named(|rez| {
1018 let maybe_name = self.remotes.get_name_by_rez(*rez);
1019 if maybe_name.is_none() {
1020 missing_resource_creation = Some(*rez);
1021 }
1022 maybe_name
1023 });
1024 if let Some(missing) = missing_resource_creation {
1025 log::error!(
1026 "Missing resource {missing}, current resources:\n{}",
1027 self.remotes
1028 );
1029 return MissingNameSnafu { missing }.fail();
1030 }
1031 Ok(legend)
1032 }
1033
1034 pub fn get_schedule_string(&self) -> Result<String, Error> {
1035 let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1036 for node in self.graph.nodes() {
1037 let store_node = node.inner();
1038 let print_node = dagga::Node::new(())
1039 .with_name(store_node.name.clone())
1040 .with_reads(node.get_reads().copied())
1041 .with_results(node.get_results().copied())
1042 .with_moves(node.get_moves().copied());
1043 dag.add_node(print_node);
1044 }
1045 struct Proxy {
1046 inner: Schedule<Node<(), usize>>,
1047 }
1048
1049 impl core::fmt::Display for Proxy {
1050 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1051 if self.inner.batches.is_empty() {
1052 f.write_str("--- No changes.\n")?;
1053 f.write_str("--- 🌈🦄\n")?;
1054 }
1055 for (i, batch) in self.inner.batches.iter().enumerate() {
1056 let i = i + 1;
1057 f.write_str("--- step ")?;
1058 f.write_fmt(format_args!("{i}\n"))?;
1059 for node in batch.iter() {
1060 f.write_str(" ")?;
1061 f.write_str(node.name())?;
1062 f.write_str("\n")?;
1063 }
1064 f.write_str("---\n")?;
1065 }
1066 Ok(())
1067 }
1068 }
1069
1070 let proxy = Proxy {
1071 inner: dag.build_schedule().unwrap(),
1072 };
1073 Ok(proxy.to_string())
1074 }
1075
1076 pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1077 if self.graph.is_empty() {
1078 log::warn!("Resource DAG is empty, writing an empty dot file");
1079 }
1080 let legend = self.get_graph_legend()?;
1081 dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1082
1083 Ok(())
1084 }
1085
1086 pub async fn apply(&mut self) -> Result<()> {
1087 let graph = std::mem::take(&mut self.graph);
1088 let schedule = graph
1089 .build_schedule()
1090 .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1091 for (i, batch) in schedule.batches.into_iter().enumerate() {
1092 for (j, node) in batch.into_iter().enumerate() {
1093 log::debug!("applying node {j}, batch {i}");
1094 let store_node = node.into_inner();
1095 (store_node.run)(&self.provider).await?;
1096 }
1097 }
1098 Ok(())
1099 }
1100}