1use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
75pub trait UserError: core::fmt::Display + core::fmt::Debug + 'static {}
77impl<T: core::fmt::Display + core::fmt::Debug + 'static> UserError for T {}
78
79#[derive(snafu::Snafu, Debug)]
81pub enum Error {
82 #[snafu(display("{source}:\n{}",
83 source.chain()
84 .map(|e| format!("{e}"))
85 .collect::<Vec<_>>()
86 .join("\n -> ")))]
87 Tele { source: anyhow::Error },
88
89 #[snafu(display("Could not read store file '{path:?}': {source}"))]
90 StoreFileRead {
91 path: std::path::PathBuf,
92 source: std::io::Error,
93 },
94
95 #[snafu(display("Could not delete store file '{path:?}': {source}"))]
96 StoreFileDelete {
97 path: std::path::PathBuf,
98 source: std::io::Error,
99 },
100
101 #[snafu(display("Could not serialize stored '{name}': {source}"))]
102 Serialize {
103 name: String,
104 source: serde_json::Error,
105 },
106
107 #[snafu(display("Could not deserialize stored '{name}': {source}"))]
108 Deserialize {
109 name: String,
110 source: serde_json::Error,
111 },
112
113 #[snafu(display("Could not build schedule: {msg}"))]
114 Schedule { msg: String },
115
116 #[snafu(display("Could not create file {path:?}: {source}"))]
117 CreateFile {
118 path: std::path::PathBuf,
119 source: std::io::Error,
120 },
121
122 #[snafu(display("Could not write file {path:?}: {source}"))]
123 WriteFile {
124 path: std::path::PathBuf,
125 source: std::io::Error,
126 },
127
128 #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
129 RemoteUnresolved {
130 ty: &'static str,
131 depends_on: String,
132 },
133
134 #[snafu(display("Could not save the apply graph: {source}"))]
135 Dot { source: dagga::dot::DotError },
136
137 #[snafu(display(
138 "Could not build apply graph because of a missing resource name for '{missing}'"
139 ))]
140 MissingName { missing: usize },
141
142 #[snafu(display("Could not find a resource by the name '{name}'"))]
143 MissingResource { name: String },
144
145 #[snafu(display("Error during '{name}' creation: {error}"))]
146 Create {
147 name: String,
148 error: Box<dyn UserError>,
149 },
150
151 #[snafu(display("Error during '{name}' read and import: {error}"))]
152 Import {
153 name: String,
154 error: Box<dyn UserError>,
155 },
156
157 #[snafu(display("Error during '{name}' update: {error}"))]
158 Update {
159 name: String,
160 error: Box<dyn UserError>,
161 },
162
163 #[snafu(display("Error during '{name}' destruction: {error}"))]
164 Destroy {
165 name: String,
166 error: Box<dyn UserError>,
167 },
168
169 #[snafu(display("Error during execution of a manual step '{name}': {error}"))]
170 Manual {
171 name: String,
172 error: Box<dyn UserError>,
173 },
174
175 #[snafu(display("Missing previous remote value '{name}'"))]
176 Load { name: String },
177
178 #[snafu(display(
179 "Loading '{id}' would clobber an existing value in the store file, \
180 and these values are not the same"
181 ))]
182 Clobber { id: String },
183
184 #[snafu(display("Could not downcast"))]
185 Downcast,
186
187 #[snafu(display("Missing store file for '{id}'"))]
188 MissingStoreFile { id: String },
189}
190
191impl From<anyhow::Error> for Error {
192 fn from(source: anyhow::Error) -> Self {
193 Error::Tele { source }
194 }
195}
196
197impl From<dagga::dot::DotError> for Error {
198 fn from(source: dagga::dot::DotError) -> Self {
199 Self::Dot { source }
200 }
201}
202
203type Result<T, E = Error> = core::result::Result<T, E>;
204
205#[allow(unreachable_code)]
209pub trait Resource:
210 core::fmt::Debug
211 + Clone
212 + PartialEq
213 + HasDependencies
214 + serde::Serialize
215 + serde::de::DeserializeOwned
216 + 'static
217{
218 type Provider;
222
223 type Error: UserError;
225
226 type Output: core::fmt::Debug
229 + Clone
230 + PartialEq
231 + serde::Serialize
232 + serde::de::DeserializeOwned
233 + 'static;
234
235 fn create(
247 &self,
248 _provider: &Self::Provider,
249 ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
250 unimplemented!(
251 "Resource::create is unimplemented for {}",
252 std::any::type_name::<Self>()
253 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
254 }
255
256 fn read(
268 &self,
269 _provider: &Self::Provider,
270 ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
271 unimplemented!(
272 "Resource::read is unimplemented for {}",
273 std::any::type_name::<Self>()
274 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
275 }
276
277 fn update(
290 &self,
291 _provider: &Self::Provider,
292 _previous_local: &Self,
293 _previous_remote: &Self::Output,
294 ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
295 unimplemented!(
296 "Resource::update is unimplemented for {}",
297 std::any::type_name::<Self>()
298 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
299 }
300
301 fn delete(
313 &self,
314 _provider: &Self::Provider,
315 _previous_remote: &Self::Output,
316 ) -> impl Future<Output = Result<(), Self::Error>> {
317 unimplemented!(
318 "Resource::delete is unimplemented for {}",
319 std::any::type_name::<Self>()
320 ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
321 }
322}
323
324#[derive(Clone, Default, Debug)]
325pub struct Dependencies {
326 inner: Vec<String>,
328}
329
330impl IntoIterator for Dependencies {
331 type Item = String;
332
333 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
334
335 fn into_iter(self) -> Self::IntoIter {
336 self.inner.into_iter()
337 }
338}
339
340impl core::fmt::Display for Dependencies {
341 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342 f.write_str(
343 &self
344 .inner
345 .iter()
346 .map(|u| u.to_string())
347 .collect::<Vec<_>>()
348 .join(", "),
349 )
350 }
351}
352
353impl Dependencies {
354 pub fn merge(self, other: Self) -> Self {
355 Dependencies {
356 inner: [self.inner, other.inner].concat(),
357 }
358 }
359}
360
361pub trait HasDependencies {
366 fn dependencies(&self) -> Dependencies {
367 Dependencies::default()
368 }
369}
370
371#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
375pub enum Action {
376 Load,
377 Create,
378 Read,
379 Update,
380 Destroy,
381}
382
383impl core::fmt::Display for Action {
384 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385 f.write_str(match self {
386 Action::Load => "load",
387 Action::Create => "create",
388 Action::Read => "read",
389 Action::Update => "update",
390 Action::Destroy => "destroy",
391 })
392 }
393}
394
395#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
396struct InertStoreResource {
397 name: String,
398 local: serde_json::Value,
399 remote: serde_json::Value,
400}
401
402impl InertStoreResource {
403 async fn save(
404 &self,
405 resource_id: &str,
406 store_path: impl AsRef<std::path::Path>,
407 ) -> Result<(), Error> {
408 let path = store_file_path(resource_id, &store_path);
409 log::info!("storing {resource_id} to {path:?}");
410
411 let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
412 name: format!("storing {}", resource_id),
413 })?;
414
415 if let Some(parent) = path.parent() {
417 tokio::fs::create_dir_all(&parent)
418 .await
419 .context(CreateFileSnafu { path: parent })?;
420 }
421
422 let mut file = tokio::fs::File::create(&path)
423 .await
424 .context(CreateFileSnafu { path: path.clone() })?;
425 file.write_all(contents.as_bytes())
426 .await
427 .context(WriteFileSnafu { path: path.clone() })?;
428 Ok(())
429 }
430}
431
432#[derive(Clone, Debug)]
433pub struct StoreResource<L, R> {
434 name: String,
436 local_definition: L,
438 action: Action,
439 remote_var: RemoteVar<R>,
440}
441
442impl<L, R> Deref for StoreResource<L, R> {
443 type Target = L;
444
445 fn deref(&self) -> &Self::Target {
446 &self.local_definition
447 }
448}
449
450impl<L, R> AsRef<L> for StoreResource<L, R> {
451 fn as_ref(&self) -> &L {
452 &self.local_definition
453 }
454}
455
456impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
457where
458 L: serde::Serialize + for<'a> serde::Deserialize<'a>,
459 R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
460{
461 type Error = Error;
462
463 fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
464 let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
465 name: value.name.clone(),
466 })?;
467 let output = value.remote_var.get().context(LoadSnafu {
468 name: value.name.clone(),
469 })?;
470 let remote = serde_json::to_value(output).context(SerializeSnafu {
471 name: value.name.clone(),
472 })?;
473 Ok(Self {
474 name: value.name,
475 local,
476 remote,
477 })
478 }
479}
480
481impl<T> StoreResource<T, T::Output>
482where
483 T: Resource,
484 T::Output: Clone,
485{
486 pub fn remote<X: Clone + core::fmt::Debug + 'static>(
488 &self,
489 f: impl Fn(&T::Output) -> X + 'static,
490 ) -> Remote<X> {
491 Remote::new(self, f)
492 }
493
494 pub fn action(&self) -> Action {
499 self.action
500 }
501}
502
503fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
505 store_path.as_ref().join(format!("{name}.json"))
506}
507
508type StoreNodeRunFn<Provider> = Box<
509 dyn FnOnce(
510 &'_ Provider,
512 ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
513>;
514
515struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
516 provider: &'a Provider,
517 store_path: std::path::PathBuf,
518 resource_id: String,
520 action: Action,
521 local_definition_code: T,
522 local_definition_store: Option<T>,
523 remote_var: RemoteVar<T::Output>,
524}
525
526impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
527 async fn run(self) -> Result<()>
528 where
529 T: Resource,
530 {
531 let Self {
532 provider,
533 store_path,
534 resource_id,
535 action,
536 local_definition_code,
537 local_definition_store,
538 remote_var,
539 } = self;
540 log::info!("{action} '{resource_id}':");
541
542 async fn save<T: Resource>(
543 resource_id: &str,
544 local_definition_code: T,
545 remote_var: &RemoteVar<T::Output>,
546 store_path: impl AsRef<std::path::Path>,
547 ) -> Result<(), Error> {
548 let inert_resource = InertStoreResource {
549 name: resource_id.to_owned(),
550 local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
551 name: format!("store {resource_id}"),
552 })?,
553 remote: serde_json::to_value(
554 remote_var.get().context(LoadSnafu { name: resource_id })?,
555 )
556 .context(SerializeSnafu {
557 name: format!("store {resource_id} remote"),
558 })?,
559 };
560 inert_resource.save(resource_id, store_path).await?;
561 Ok(())
562 }
563
564 match action {
565 Action::Load => {
566 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
567 }
568 Action::Create => {
569 let value = local_definition_code
570 .create(provider)
571 .await
572 .map_err(|error| Error::Create {
573 name: resource_id.to_owned(),
574 error: Box::new(error),
575 })?;
576 remote_var.set(Some(value));
577 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
578 }
579 Action::Read => {
580 let value = local_definition_code
581 .read(provider)
582 .await
583 .map_err(|error| Error::Create {
584 name: resource_id.to_owned(),
585 error: Box::new(error),
586 })?;
587 remote_var.set(Some(value));
588 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
589 }
590 Action::Update => {
591 let previous_local = local_definition_store.unwrap();
592 let previous_remote = remote_var.get().context(LoadSnafu {
593 name: resource_id.clone(),
594 })?;
595 if previous_local == local_definition_code {
596 log::warn!(
597 "Skipping '{resource_id}' update as the local value has not changed.\n\
598 If you require an update, consider adding a sentinel value."
599 );
600 } else {
601 let cmp =
602 pretty_assertions::Comparison::new(&previous_local, &local_definition_code);
603 let change_string = format!("{cmp}")
604 .lines()
605 .map(|line| format!(" {line}"))
606 .collect::<Vec<_>>()
607 .join("\n");
608 log::info!("updating '{resource_id}':\n{change_string}");
609 let output = local_definition_code
610 .update(provider, &previous_local, &previous_remote)
611 .await
612 .map_err(|error| Error::Update {
613 name: resource_id.clone(),
614 error: Box::new(error),
615 })?;
616 remote_var.set(Some(output));
617 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
618 }
619 }
620 Action::Destroy => {
621 log::debug!("running destroy action on {resource_id}");
622 let local_definition = local_definition_code.clone();
626 let previous_remote = remote_var.get().context(LoadSnafu {
627 name: resource_id.clone(),
628 })?;
629 local_definition
630 .delete(provider, &previous_remote)
631 .await
632 .map_err(|error| Error::Destroy {
633 name: resource_id.to_owned(),
634 error: Box::new(error),
635 })?;
636
637 log::info!(" {resource_id} is destroyed");
638 let path = store_file_path(&resource_id, &store_path);
639 log::info!(" removing {resource_id} store file {path:?}");
640 tokio::fs::remove_file(&path)
641 .await
642 .context(StoreFileDeleteSnafu { path })?;
643 remote_var.set(None);
644 }
645 }
646
647 log::info!(" success!");
648 Ok(())
649 }
650}
651
652pub struct DestroyResource<T: Resource> {
653 local: T,
654 remote: T::Output,
655}
656
657impl<T: Resource> Deref for DestroyResource<T> {
658 type Target = T;
659
660 fn deref(&self) -> &Self::Target {
661 &self.local
662 }
663}
664
665impl<T: Resource> DestroyResource<T> {
666 pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
669 &self,
670 f: fn(&T::Output) -> X,
671 ) -> Migrated<X> {
672 Migrated(f(&self.remote))
673 }
674}
675
676struct StoreNode<Provider> {
677 name: String,
678 _remote_ty: &'static str,
679 run: StoreNodeRunFn<Provider>,
680}
681
682struct PreviouslyStored<T: Resource> {
683 action: Action,
684 resource: Option<(T, T::Output)>,
685}
686
687pub struct Store<T> {
688 path: std::path::PathBuf,
689 provider: T,
690 remotes: Remotes,
691 graph: dagga::Dag<StoreNode<T>, usize>,
692}
693
694impl<P: 'static> Store<P> {
695 fn read_from_store<T: Resource<Provider = P>>(
696 path: impl AsRef<std::path::Path>,
697 id: &str,
698 ) -> Result<(T, T::Output)> {
699 let path = store_file_path(id, path.as_ref());
700 snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
701
702 log::debug!("{path:?} exists, reading '{id}' from it");
703 let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
704 path: path.to_path_buf(),
705 })?;
706 log::trace!(
707 "contents:\n{}",
708 contents
709 .lines()
710 .map(|line| format!(" {line}"))
711 .collect::<Vec<_>>()
712 .join("\n")
713 );
714 let inert_store_rez: InertStoreResource =
715 serde_json::from_str(&contents).context(DeserializeSnafu {
716 name: id.to_owned(),
717 })?;
718 log::trace!("read inert store resource");
719 log::trace!(
720 "reading local contents: {}",
721 serde_json::to_string_pretty(&inert_store_rez.local)
722 .unwrap()
723 .lines()
724 .map(|line| format!(" {line}"))
725 .collect::<Vec<_>>()
726 .join("\n")
727 );
728 log::trace!("as {}", std::any::type_name::<T>());
729 let stored_definition: T =
730 serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
731 name: id.to_owned(),
732 })?;
733
734 log::trace!(" reading remote output JSON value");
735 let remote_value: T::Output =
736 serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
737 name: format!("remote {id}"),
738 })?;
739 Ok((stored_definition, remote_value))
740 }
741
742 pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
743 Self {
744 path: path.as_ref().to_path_buf(),
745 graph: dagga::Dag::default(),
746 remotes: Default::default(),
747 provider,
748 }
749 }
750
751 pub fn provider(&self) -> &P {
752 &self.provider
753 }
754
755 fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
756 where
757 T: Resource<Provider = P>,
758 {
759 Self::read_from_store(&self.path, id)
760 }
761
762 fn define_resource<T>(
763 &mut self,
764 id: impl AsRef<str>,
765 local_definition: T,
766 action: Action,
767 stored_definition: Option<T>,
768 output: Option<T::Output>,
769 ) -> Result<StoreResource<T, T::Output>, Error>
770 where
771 T: Resource<Provider = P>,
772 {
773 let id = id.as_ref();
774 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
775 remote_var.set(output);
776
777 let remote_var = remote_var.clone();
778 let local_definition_code = local_definition.clone();
779 let local_definition_store = stored_definition.clone();
780 let store_path = self.path.clone();
781 let run: StoreNodeRunFn<T::Provider> = Box::new({
782 let resource_id = id.to_owned();
783 let remote_var = remote_var.clone();
784 let local_definition_code = local_definition_code.clone();
785 let local_definition_store = local_definition_store.clone();
786 move |provider: &T::Provider| {
787 Box::pin(
788 RunAction {
789 provider,
790 store_path,
791 resource_id,
792 action,
793 local_definition_code,
794 local_definition_store,
795 remote_var,
796 }
797 .run(),
798 )
799 }
800 });
801 let ty = std::any::type_name::<T>();
802
803 {
804 log::debug!("adding main node {action} {id}");
806 let node_name = format!("{action} {id}");
807 let dag_node = dagga::Node::new(StoreNode {
808 name: node_name.clone(),
809 _remote_ty: ty,
810 run,
811 })
812 .with_name(node_name)
813 .with_reads({
814 let mut reads = vec![];
816 for dep in local_definition.dependencies() {
817 let var = self
818 .remotes
819 .get(&dep)
820 .context(MissingResourceSnafu { name: dep })?;
821 reads.push(var.key);
822 }
823 reads
824 });
825 let dag_node = match action {
826 Action::Create | Action::Read | Action::Load | Action::Update => {
827 log::debug!(" with result {rez}");
828 dag_node.with_result(rez)
829 }
830 Action::Destroy => {
831 log::debug!(" with move {rez}");
832 dag_node.with_move(rez)
833 }
834 };
835 self.graph.add_node(dag_node);
836 }
837
838 Ok(StoreResource {
839 name: id.to_owned(),
840 local_definition,
841 action,
842 remote_var,
843 })
844 }
845
846 fn determine_action_from_previously_stored<T>(
848 &self,
849 local_definition: &T,
850 id: &str,
851 ) -> Result<PreviouslyStored<T>, Error>
852 where
853 T: Resource<Provider = P>,
854 {
855 match self.read_file(id) {
856 Ok((stored_definition, output)) => {
857 log::debug!(" {output:?}");
860 let action = if *local_definition != stored_definition {
861 log::debug!(" local resource has changed, so this remote is now stale");
862 Action::Update
863 } else {
864 let mut may_need_update = false;
867 for dep in local_definition.dependencies() {
868 let var = self.remotes.get(&dep).context(LoadSnafu { name: dep })?;
869 if var.action != Action::Load {
870 may_need_update = true;
871 break;
872 }
873 }
874 if may_need_update {
875 Action::Update
876 } else {
877 Action::Load
878 }
879 };
880
881 Ok(PreviouslyStored {
882 action,
883 resource: Some((stored_definition, output)),
884 })
885 }
886 Err(Error::MissingStoreFile { id }) => {
887 log::debug!("store file '{id}' does not exist, creating a new resource",);
888 Ok(PreviouslyStored {
889 action: Action::Create,
890 resource: None,
891 })
892 }
893 Err(e) => {
894 log::error!("could not define resource '{id}': {e}");
895 Err(e)
896 }
897 }
898 }
899
900 pub fn resource<T>(
912 &mut self,
913 id: impl AsRef<str>,
914 local_definition: T,
915 ) -> Result<StoreResource<T, T::Output>, Error>
916 where
917 T: Resource<Provider = P>,
918 {
919 let id = id.as_ref();
920 let PreviouslyStored { action, resource } =
921 self.determine_action_from_previously_stored(&local_definition, id)?;
922 let (local, remote) = resource
923 .map(|(local, remote)| (Some(local), Some(remote)))
924 .unwrap_or_default();
925 self.define_resource(id, local_definition, action, local, remote)
926 }
927
928 pub fn import<T>(
938 &mut self,
939 id: impl AsRef<str>,
940 local_definition: T,
941 ) -> Result<StoreResource<T, T::Output>, Error>
942 where
943 T: Resource<Provider = P>,
944 {
945 self.define_resource(id, local_definition, Action::Read, None, None)
946 }
947
948 pub fn load<T>(
959 &mut self,
960 id: impl AsRef<str>,
961 local_definition: T,
962 remote_definition: T::Output,
963 force_overwrite: bool,
964 ) -> Result<StoreResource<T, T::Output>, Error>
965 where
966 T: Resource<Provider = P>,
967 {
968 let id = id.as_ref();
969 if let Ok((stored_definition, output)) = self.read_file(id) {
970 if local_definition == stored_definition && remote_definition == output {
971 if force_overwrite {
972 log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
973 } else {
974 let err = ClobberSnafu { id: id.to_owned() }.build();
975 log::error!("{err}");
976 return Err(err);
977 }
978 }
979 }
980 self.define_resource(
981 id,
982 local_definition,
983 Action::Load,
984 None,
985 Some(remote_definition),
986 )
987 }
988
989 pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
991 where
992 T: Resource<Provider = P>,
993 {
994 let id = id.as_ref();
995 let (local, remote) = self.read_file::<T>(id)?;
996 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
997 remote_var.set(Some(remote.clone()));
998 {
999 log::debug!("adding node {} {id}", Action::Load);
1001 let node_name = format!("load {id}");
1002 let load_node = dagga::Node::new(StoreNode {
1003 name: node_name.clone(),
1004 _remote_ty: std::any::type_name::<T>(),
1005 run: Box::new({
1006 let resource_id = id.to_owned();
1007 let store_path = self.path.clone();
1008 let local = local.clone();
1009 let remote_var = remote_var.clone();
1010 move |provider| {
1011 Box::pin(
1012 RunAction {
1013 provider,
1014 store_path,
1015 resource_id,
1016 action: Action::Load,
1017 local_definition_code: local,
1018 remote_var,
1019 local_definition_store: None,
1020 }
1021 .run(),
1022 )
1023 }
1024 }),
1025 })
1026 .with_name(node_name)
1027 .with_reads({
1028 let mut reads = vec![];
1029 for dep in local.dependencies() {
1030 reads.push(
1031 self.remotes
1032 .get(&dep)
1033 .context(MissingResourceSnafu {
1034 name: id.to_owned(),
1035 })?
1036 .key,
1037 );
1038 }
1039 reads
1040 })
1041 .with_result(rez);
1042 self.graph.add_node(load_node);
1043 }
1044 {
1045 log::debug!("adding node {} {id}", Action::Destroy);
1046 let node_name = format!("destroy {id}");
1047 let destroy_node = StoreNode {
1049 name: node_name.clone(),
1050 _remote_ty: std::any::type_name::<T>(),
1051 run: Box::new({
1052 let resource_id = id.to_owned();
1053 let local = local.clone();
1054 let store_path = self.path.clone();
1055 let remote_var = remote_var.clone();
1056 move |provider| {
1057 Box::pin(
1058 RunAction {
1059 provider,
1060 store_path,
1061 resource_id,
1062 action: Action::Destroy,
1063 local_definition_code: local,
1064 local_definition_store: None,
1065 remote_var,
1066 }
1067 .run(),
1068 )
1069 }
1070 }),
1071 };
1072
1073 self.graph.add_node(
1074 dagga::Node::new(destroy_node)
1075 .with_name(node_name)
1076 .with_move(rez),
1077 );
1078 }
1079
1080 Ok(DestroyResource { local, remote })
1081 }
1082
1083 fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1084 let mut missing_resource_creation = None;
1085 let legend = self.graph.legend()?.with_resources_named(|rez| {
1086 let maybe_name = self.remotes.get_name_by_rez(*rez);
1087 if maybe_name.is_none() {
1088 missing_resource_creation = Some(*rez);
1089 }
1090 maybe_name
1091 });
1092 if let Some(missing) = missing_resource_creation {
1093 log::error!(
1094 "Missing resource {missing}, current resources:\n{}",
1095 self.remotes
1096 );
1097 return MissingNameSnafu { missing }.fail();
1098 }
1099 Ok(legend)
1100 }
1101
1102 pub fn get_schedule_string(&self) -> Result<String, Error> {
1103 let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1104 for node in self.graph.nodes() {
1105 let store_node = node.inner();
1106 let print_node = dagga::Node::new(())
1107 .with_name(store_node.name.clone())
1108 .with_reads(node.get_reads().copied())
1109 .with_results(node.get_results().copied())
1110 .with_moves(node.get_moves().copied());
1111 dag.add_node(print_node);
1112 }
1113 struct Proxy {
1114 inner: Schedule<Node<(), usize>>,
1115 }
1116
1117 impl core::fmt::Display for Proxy {
1118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1119 if self.inner.batches.is_empty() {
1120 f.write_str("--- No changes.\n")?;
1121 f.write_str("--- 🌈🦄\n")?;
1122 }
1123 for (i, batch) in self.inner.batches.iter().enumerate() {
1124 let i = i + 1;
1125 f.write_str("--- step ")?;
1126 f.write_fmt(format_args!("{i}\n"))?;
1127 for node in batch.iter() {
1128 f.write_str(" ")?;
1129 f.write_str(node.name())?;
1130 f.write_str("\n")?;
1131 }
1132 f.write_str("---\n")?;
1133 }
1134 Ok(())
1135 }
1136 }
1137
1138 let proxy = Proxy {
1139 inner: dag.build_schedule().unwrap(),
1140 };
1141 Ok(proxy.to_string())
1142 }
1143
1144 pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1145 if self.graph.is_empty() {
1146 log::warn!("Resource DAG is empty, writing an empty dot file");
1147 }
1148 let legend = self.get_graph_legend()?;
1149 dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1150
1151 Ok(())
1152 }
1153
1154 pub async fn apply(&mut self) -> Result<()> {
1155 let graph = std::mem::take(&mut self.graph);
1156 let schedule = graph
1157 .build_schedule()
1158 .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1159 for (i, batch) in schedule.batches.into_iter().enumerate() {
1160 for (j, node) in batch.into_iter().enumerate() {
1161 log::debug!("applying node {j}, batch {i}");
1162 let store_node = node.into_inner();
1163 (store_node.run)(&self.provider).await?;
1164 }
1165 }
1166 Ok(())
1167 }
1168}