1use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
/// Marker trait for errors produced by user-supplied resource code.
///
/// It carries no methods of its own; it only bundles the `Display`, `Debug`
/// and `'static` requirements so the error can be boxed as
/// `Box<dyn UserError>` and printed.
pub trait UserError: std::fmt::Display + std::fmt::Debug + 'static {}

/// Blanket implementation: every `'static` type that is both `Display` and
/// `Debug` is a [`UserError`].
impl<T> UserError for T where T: std::fmt::Display + std::fmt::Debug + 'static {}
78
/// Every error this module can return.
///
/// Context selectors (`…Snafu`) are generated by the `snafu::Snafu` derive.
#[derive(snafu::Snafu, Debug)]
pub enum Error {
    /// A wrapped `anyhow::Error`; the message prints the error followed by
    /// its whole source chain, one cause per line.
    #[snafu(display("{source}:\n{}",
        source.chain()
            .map(|e| format!("{e}"))
            .collect::<Vec<_>>()
            .join("\n -> ")))]
    Tele { source: anyhow::Error },

    /// Reading a resource's store file from disk failed.
    #[snafu(display("Could not read store file '{path:?}': {source}"))]
    StoreFileRead {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Removing a resource's store file from disk failed.
    #[snafu(display("Could not delete store file '{path:?}': {source}"))]
    StoreFileDelete {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Converting a value to JSON failed; `name` says what was being stored.
    #[snafu(display("Could not serialize stored '{name}': {source}"))]
    Serialize {
        name: String,
        source: serde_json::Error,
    },

    /// Parsing stored JSON back into a typed value failed.
    #[snafu(display("Could not deserialize stored '{name}': {source}"))]
    Deserialize {
        name: String,
        source: serde_json::Error,
    },

    /// The resource graph could not be turned into a schedule.
    #[snafu(display("Could not build schedule: {msg}"))]
    Schedule { msg: String },

    /// Creating a store file, or one of its parent directories, failed.
    #[snafu(display("Could not create file {path:?}: {source}"))]
    CreateFile {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Writing the contents of a store file failed.
    #[snafu(display("Could not write file {path:?}: {source}"))]
    WriteFile {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// A remote value of type `ty` was requested before it was resolved.
    #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
    RemoteUnresolved {
        ty: &'static str,
        depends_on: String,
    },

    /// Producing or saving the dot representation of the graph failed.
    #[snafu(display("Could not save the apply graph: {source}"))]
    Dot { source: dagga::dot::DotError },

    /// A graph resource key (`missing`) has no registered name.
    #[snafu(display(
        "Could not build apply graph because of a missing resource name for '{missing}'"
    ))]
    MissingName { missing: usize },

    /// A resource looked up by name is not known to the store.
    #[snafu(display("Could not find a resource by the name '{name}'"))]
    MissingResource { name: String },

    /// The user's `Resource::create` returned an error.
    #[snafu(display("Error during '{name}' creation: {error}"))]
    Create {
        name: String,
        error: Box<dyn UserError>,
    },

    /// A user error raised while reading and importing a resource.
    #[snafu(display("Error during '{name}' read and import: {error}"))]
    Import {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `Resource::update` returned an error.
    #[snafu(display("Error during '{name}' update: {error}"))]
    Update {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `Resource::delete` returned an error.
    #[snafu(display("Error during '{name}' destruction: {error}"))]
    Destroy {
        name: String,
        error: Box<dyn UserError>,
    },

    /// A user error raised by a manual step.
    #[snafu(display("Error during execution of a manual step '{name}': {error}"))]
    Manual {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The remote value of `name` was needed but is not set.
    #[snafu(display("Missing previous remote value '{name}'"))]
    Load { name: String },

    /// Loading `id` would overwrite a different value already in its store
    /// file.
    #[snafu(display(
        "Loading '{id}' would clobber an existing value in the store file, \
         and these values are not the same"
    ))]
    Clobber { id: String },

    /// A dynamic downcast failed.
    #[snafu(display("Could not downcast"))]
    Downcast,

    /// There is no store file on disk for `id`.
    #[snafu(display("Missing store file for '{id}'"))]
    MissingStoreFile { id: String },
}
190
191impl From<anyhow::Error> for Error {
192 fn from(source: anyhow::Error) -> Self {
193 Error::Tele { source }
194 }
195}
196
197impl From<dagga::dot::DotError> for Error {
198 fn from(source: dagga::dot::DotError) -> Self {
199 Self::Dot { source }
200 }
201}
202
/// Module-local `Result` alias whose error type defaults to [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
204
/// A locally defined piece of infrastructure that can be created, read,
/// updated and deleted through a provider.
///
/// Every lifecycle method has a default body that panics with
/// `unimplemented!`, so an implementor only overrides the operations it
/// supports.
// `unreachable_code` is allowed because each default body casts the
// diverging `unimplemented!()` to a boxed future. The cast is never
// evaluated; it only gives the `impl Future` return type a concrete type.
#[allow(unreachable_code)]
pub trait Resource:
    Clone + PartialEq + HasDependencies + serde::Serialize + serde::de::DeserializeOwned + 'static
{
    /// The client/handle type passed by reference to every lifecycle method.
    type Provider;

    /// The error type returned by the lifecycle methods.
    type Error: UserError;

    /// The remote value produced by `create`, `read` and `update`.
    ///
    /// It must be serializable because it is written to the store file next
    /// to the local definition.
    type Output: core::fmt::Debug
        + Clone
        + PartialEq
        + serde::Serialize
        + serde::de::DeserializeOwned
        + 'static;

    /// Creates the resource through `_provider` and returns its remote value.
    ///
    /// # Panics
    /// The default implementation always panics with `unimplemented!`.
    fn create(
        &self,
        _provider: &Self::Provider,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::create is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Reads the remote value of an already existing resource.
    ///
    /// # Panics
    /// The default implementation always panics with `unimplemented!`.
    fn read(
        &self,
        _provider: &Self::Provider,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::read is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Updates the resource to match `self`.
    ///
    /// `_previous_local` is the previously stored local definition and
    /// `_previous_remote` the previously stored remote value.
    ///
    /// # Panics
    /// The default implementation always panics with `unimplemented!`.
    fn update(
        &self,
        _provider: &Self::Provider,
        _previous_local: &Self,
        _previous_remote: &Self::Output,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::update is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Deletes the resource whose last known remote value is
    /// `_previous_remote`.
    ///
    /// # Panics
    /// The default implementation always panics with `unimplemented!`.
    fn delete(
        &self,
        _provider: &Self::Provider,
        _previous_remote: &Self::Output,
    ) -> impl Future<Output = Result<(), Self::Error>> {
        unimplemented!(
            "Resource::delete is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }
}
317
/// The names of the resources that a definition depends on.
#[derive(Clone, Default, Debug)]
pub struct Dependencies {
    /// Dependency names, in insertion order (duplicates are kept).
    inner: Vec<String>,
}

/// Consuming iteration yields each dependency name in order.
impl IntoIterator for Dependencies {
    type Item = String;

    type IntoIter = std::vec::IntoIter<String>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { inner } = self;
        inner.into_iter()
    }
}

/// Renders the names as a single `", "`-separated list.
impl core::fmt::Display for Dependencies {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let joined = self.inner.join(", ");
        f.write_str(&joined)
    }
}

impl Dependencies {
    /// Returns the names of `self` followed by the names of `other`.
    pub fn merge(self, other: Self) -> Self {
        let mut inner = self.inner;
        inner.extend(other.inner);
        Dependencies { inner }
    }
}
354
355pub trait HasDependencies {
360 fn dependencies(&self) -> Dependencies {
361 Dependencies::default()
362 }
363}
364
/// The operation scheduled for a resource.
#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Action {
    /// Use the already-known remote value and write it to the store file.
    Load,
    /// Call `Resource::create` and store the result.
    Create,
    /// Call `Resource::read` and store the result.
    Read,
    /// Call `Resource::update` and store the result.
    Update,
    /// Call `Resource::delete` and remove the store file.
    Destroy,
}
376
377impl core::fmt::Display for Action {
378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379 f.write_str(match self {
380 Action::Load => "load",
381 Action::Create => "create",
382 Action::Read => "read",
383 Action::Update => "update",
384 Action::Destroy => "destroy",
385 })
386 }
387}
388
/// The untyped, on-disk form of a stored resource: both halves are kept as
/// raw JSON values so the file can be read without knowing the concrete
/// resource type.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InertStoreResource {
    /// The resource id.
    name: String,
    /// The serialized local definition.
    local: serde_json::Value,
    /// The serialized remote output.
    remote: serde_json::Value,
}
395
396impl InertStoreResource {
397 async fn save(
398 &self,
399 resource_id: &str,
400 store_path: impl AsRef<std::path::Path>,
401 ) -> Result<(), Error> {
402 let path = store_file_path(resource_id, &store_path);
403 log::info!("storing {resource_id} to {path:?}");
404
405 let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
406 name: format!("storing {}", resource_id),
407 })?;
408
409 if let Some(parent) = path.parent() {
411 tokio::fs::create_dir_all(&parent)
412 .await
413 .context(CreateFileSnafu { path: parent })?;
414 }
415
416 let mut file = tokio::fs::File::create(&path)
417 .await
418 .context(CreateFileSnafu { path: path.clone() })?;
419 file.write_all(contents.as_bytes())
420 .await
421 .context(WriteFileSnafu { path: path.clone() })?;
422 Ok(())
423 }
424}
425
/// A resource that has been registered with a store.
///
/// `L` is the local definition type and `R` the remote output type. The
/// struct dereferences to `L`.
#[derive(Clone, Debug)]
pub struct StoreResource<L, R> {
    /// The resource id.
    name: String,
    /// The definition as written in code.
    local_definition: L,
    /// The action scheduled for this resource.
    action: Action,
    /// Handle to the remote value.
    remote_var: RemoteVar<R>,
}
435
436impl<L, R> Deref for StoreResource<L, R> {
437 type Target = L;
438
439 fn deref(&self) -> &Self::Target {
440 &self.local_definition
441 }
442}
443
444impl<L, R> AsRef<L> for StoreResource<L, R> {
445 fn as_ref(&self) -> &L {
446 &self.local_definition
447 }
448}
449
450impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
451where
452 L: serde::Serialize + for<'a> serde::Deserialize<'a>,
453 R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
454{
455 type Error = Error;
456
457 fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
458 let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
459 name: value.name.clone(),
460 })?;
461 let output = value.remote_var.get().context(LoadSnafu {
462 name: value.name.clone(),
463 })?;
464 let remote = serde_json::to_value(output).context(SerializeSnafu {
465 name: value.name.clone(),
466 })?;
467 Ok(Self {
468 name: value.name,
469 local,
470 remote,
471 })
472 }
473}
474
475impl<T> StoreResource<T, T::Output>
476where
477 T: Resource,
478 T::Output: Clone,
479{
480 pub fn remote<X: Clone + core::fmt::Debug + 'static>(
482 &self,
483 f: impl Fn(&T::Output) -> X + 'static,
484 ) -> Remote<X> {
485 Remote::new(self, f)
486 }
487
488 pub fn action(&self) -> Action {
493 self.action
494 }
495}
496
/// Returns the path of the store file for `name`: `<store_path>/<name>.json`.
fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
    let mut file_path = store_path.as_ref().to_path_buf();
    file_path.push(format!("{name}.json"));
    file_path
}
501
/// A boxed, run-once closure that executes one graph node: it borrows the
/// provider and returns a boxed future that may borrow from it.
type StoreNodeRunFn<Provider> = Box<
    dyn FnOnce(
        &'_ Provider,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
>;
508
/// Everything needed to execute a single [`Action`] for one resource.
struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
    /// The provider passed to the resource's lifecycle methods.
    provider: &'a Provider,
    /// Directory that contains the store files.
    store_path: std::path::PathBuf,
    /// The resource id, which is also the stem of its store file name.
    resource_id: String,
    /// The action to execute.
    action: Action,
    /// The definition as written in code.
    local_definition_code: T,
    /// The definition previously read from the store file, if any.
    local_definition_store: Option<T>,
    /// Handle through which the remote value is read and written.
    remote_var: RemoteVar<T::Output>,
}
519
520impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
521 async fn run(self) -> Result<()>
522 where
523 T: Resource,
524 {
525 let Self {
526 provider,
527 store_path,
528 resource_id,
529 action,
530 local_definition_code,
531 local_definition_store,
532 remote_var,
533 } = self;
534 log::info!("{action} '{resource_id}':");
535
536 async fn save<T: Resource>(
537 resource_id: &str,
538 local_definition_code: T,
539 remote_var: &RemoteVar<T::Output>,
540 store_path: impl AsRef<std::path::Path>,
541 ) -> Result<(), Error> {
542 let inert_resource = InertStoreResource {
543 name: resource_id.to_owned(),
544 local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
545 name: format!("store {resource_id}"),
546 })?,
547 remote: serde_json::to_value(
548 remote_var.get().context(LoadSnafu { name: resource_id })?,
549 )
550 .context(SerializeSnafu {
551 name: format!("store {resource_id} remote"),
552 })?,
553 };
554 inert_resource.save(resource_id, store_path).await?;
555 Ok(())
556 }
557
558 match action {
559 Action::Load => {
560 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
561 }
562 Action::Create => {
563 let value = local_definition_code
564 .create(provider)
565 .await
566 .map_err(|error| Error::Create {
567 name: resource_id.to_owned(),
568 error: Box::new(error),
569 })?;
570 remote_var.set(Some(value));
571 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
572 }
573 Action::Read => {
574 let value = local_definition_code
575 .read(provider)
576 .await
577 .map_err(|error| Error::Create {
578 name: resource_id.to_owned(),
579 error: Box::new(error),
580 })?;
581 remote_var.set(Some(value));
582 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
583 }
584 Action::Update => {
585 let previous_local = local_definition_store.unwrap();
586 let previous_remote = remote_var.get().context(LoadSnafu {
587 name: resource_id.clone(),
588 })?;
589 let output = local_definition_code
590 .update(provider, &previous_local, &previous_remote)
591 .await
592 .map_err(|error| Error::Update {
593 name: resource_id.clone(),
594 error: Box::new(error),
595 })?;
596 remote_var.set(Some(output));
597 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
598 }
599 Action::Destroy => {
600 log::debug!("running destroy action on {resource_id}");
601 let local_definition = local_definition_code.clone();
605 let previous_remote = remote_var.get().context(LoadSnafu {
606 name: resource_id.clone(),
607 })?;
608 local_definition
609 .delete(provider, &previous_remote)
610 .await
611 .map_err(|error| Error::Destroy {
612 name: resource_id.to_owned(),
613 error: Box::new(error),
614 })?;
615
616 log::info!(" {resource_id} is destroyed");
617 let path = store_file_path(&resource_id, &store_path);
618 log::info!(" removing {resource_id} store file {path:?}");
619 tokio::fs::remove_file(&path)
620 .await
621 .context(StoreFileDeleteSnafu { path })?;
622 remote_var.set(None);
623 }
624 }
625
626 log::info!(" success!");
627 Ok(())
628 }
629}
630
/// A resource that has been scheduled for destruction.
///
/// It holds the values read from the resource's store file and dereferences
/// to the local definition.
pub struct DestroyResource<T: Resource> {
    /// The stored local definition.
    local: T,
    /// The stored remote output.
    remote: T::Output,
}
635
636impl<T: Resource> Deref for DestroyResource<T> {
637 type Target = T;
638
639 fn deref(&self) -> &Self::Target {
640 &self.local
641 }
642}
643
644impl<T: Resource> DestroyResource<T> {
645 pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
648 &self,
649 f: fn(&T::Output) -> X,
650 ) -> Migrated<X> {
651 Migrated(f(&self.remote))
652 }
653}
654
/// The payload of one node in the store's graph.
struct StoreNode<Provider> {
    /// Display name of the node, formatted as `"<action> <id>"`.
    name: String,
    /// Type name of the resource the node acts on.
    _remote_ty: &'static str,
    /// The closure that performs the node's action.
    run: StoreNodeRunFn<Provider>,
}
660
/// The outcome of comparing a local definition against its store file.
struct PreviouslyStored<T: Resource> {
    /// The action chosen for the resource.
    action: Action,
    /// The stored local definition and remote output, or `None` when no
    /// store file exists.
    resource: Option<(T, T::Output)>,
}
665
/// A set of resources backed by one JSON store file per resource.
///
/// `T` is the provider type handed to every resource operation.
pub struct Store<T> {
    /// Directory that contains the `<id>.json` store files.
    path: std::path::PathBuf,
    /// The provider passed to resource lifecycle methods.
    provider: T,
    /// Registry of the remote variables of all defined resources.
    remotes: Remotes,
    /// Graph of pending actions; `apply` turns it into a schedule.
    graph: dagga::Dag<StoreNode<T>, usize>,
}
672
673impl<P: 'static> Store<P> {
674 fn read_from_store<T: Resource<Provider = P>>(
675 path: impl AsRef<std::path::Path>,
676 id: &str,
677 ) -> Result<(T, T::Output)> {
678 let path = store_file_path(id, path.as_ref());
679 snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
680
681 log::debug!("{path:?} exists, reading '{id}' from it");
682 let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
683 path: path.to_path_buf(),
684 })?;
685 log::trace!(
686 "contents:\n{}",
687 contents
688 .lines()
689 .map(|line| format!(" {line}"))
690 .collect::<Vec<_>>()
691 .join("\n")
692 );
693 let inert_store_rez: InertStoreResource =
694 serde_json::from_str(&contents).context(DeserializeSnafu {
695 name: id.to_owned(),
696 })?;
697 log::trace!("read inert store resource");
698 log::trace!(
699 "reading local contents: {}",
700 serde_json::to_string_pretty(&inert_store_rez.local)
701 .unwrap()
702 .lines()
703 .map(|line| format!(" {line}"))
704 .collect::<Vec<_>>()
705 .join("\n")
706 );
707 log::trace!("as {}", std::any::type_name::<T>());
708 let stored_definition: T =
709 serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
710 name: id.to_owned(),
711 })?;
712
713 log::trace!(" reading remote output JSON value");
714 let remote_value: T::Output =
715 serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
716 name: format!("remote {id}"),
717 })?;
718 Ok((stored_definition, remote_value))
719 }
720
721 pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
722 Self {
723 path: path.as_ref().to_path_buf(),
724 graph: dagga::Dag::default(),
725 remotes: Default::default(),
726 provider,
727 }
728 }
729
730 pub fn provider(&self) -> &P {
731 &self.provider
732 }
733
734 fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
735 where
736 T: Resource<Provider = P>,
737 {
738 Self::read_from_store(&self.path, id)
739 }
740
741 fn define_resource<T>(
742 &mut self,
743 id: impl AsRef<str>,
744 local_definition: T,
745 action: Action,
746 stored_definition: Option<T>,
747 output: Option<T::Output>,
748 ) -> Result<StoreResource<T, T::Output>, Error>
749 where
750 T: Resource<Provider = P>,
751 {
752 let id = id.as_ref();
753 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
754 remote_var.set(output);
755
756 let remote_var = remote_var.clone();
757 let local_definition_code = local_definition.clone();
758 let local_definition_store = stored_definition.clone();
759 let store_path = self.path.clone();
760 let run: StoreNodeRunFn<T::Provider> = Box::new({
761 let resource_id = id.to_owned();
762 let remote_var = remote_var.clone();
763 let local_definition_code = local_definition_code.clone();
764 let local_definition_store = local_definition_store.clone();
765 move |provider: &T::Provider| {
766 Box::pin(
767 RunAction {
768 provider,
769 store_path,
770 resource_id,
771 action,
772 local_definition_code,
773 local_definition_store,
774 remote_var,
775 }
776 .run(),
777 )
778 }
779 });
780 let ty = std::any::type_name::<T>();
781
782 {
783 log::debug!("adding main node {action} {id}");
785 let node_name = format!("{action} {id}");
786 let dag_node = dagga::Node::new(StoreNode {
787 name: node_name.clone(),
788 _remote_ty: ty,
789 run,
790 })
791 .with_name(node_name)
792 .with_reads({
793 let mut reads = vec![];
795 for dep in local_definition.dependencies() {
796 let var = self
797 .remotes
798 .get(&dep)
799 .context(MissingResourceSnafu { name: dep })?;
800 reads.push(var.key);
801 }
802 reads
803 });
804 let dag_node = match action {
805 Action::Create | Action::Read | Action::Load | Action::Update => {
806 log::debug!(" with result {rez}");
807 dag_node.with_result(rez)
808 }
809 Action::Destroy => {
810 log::debug!(" with move {rez}");
811 dag_node.with_move(rez)
812 }
813 };
814 self.graph.add_node(dag_node);
815 }
816
817 Ok(StoreResource {
818 name: id.to_owned(),
819 local_definition,
820 action,
821 remote_var,
822 })
823 }
824
825 fn determine_action_from_previously_stored<T>(
827 &self,
828 local_definition: &T,
829 id: &str,
830 ) -> Result<PreviouslyStored<T>, Error>
831 where
832 T: Resource<Provider = P>,
833 {
834 match self.read_file(id) {
835 Ok((stored_definition, output)) => {
836 log::debug!(" {output:?}");
839 let action = if *local_definition != stored_definition {
840 log::debug!(" local resource has changed, so this remote is now stale");
841 Action::Update
842 } else {
843 let mut may_need_update = false;
846 for dep in local_definition.dependencies() {
847 let var = self.remotes.get(&dep).context(LoadSnafu { name: dep })?;
848 if var.action != Action::Load {
849 may_need_update = true;
850 break;
851 }
852 }
853 if may_need_update {
854 Action::Update
855 } else {
856 Action::Load
857 }
858 };
859
860 Ok(PreviouslyStored {
861 action,
862 resource: Some((stored_definition, output)),
863 })
864 }
865 Err(Error::MissingStoreFile { id }) => {
866 log::debug!("store file '{id}' does not exist, creating a new resource",);
867 Ok(PreviouslyStored {
868 action: Action::Create,
869 resource: None,
870 })
871 }
872 Err(e) => {
873 log::error!("could not define resource '{id}': {e}");
874 Err(e)
875 }
876 }
877 }
878
879 pub fn resource<T>(
891 &mut self,
892 id: impl AsRef<str>,
893 local_definition: T,
894 ) -> Result<StoreResource<T, T::Output>, Error>
895 where
896 T: Resource<Provider = P>,
897 {
898 let id = id.as_ref();
899 let PreviouslyStored { action, resource } =
900 self.determine_action_from_previously_stored(&local_definition, id)?;
901 let (local, remote) = resource
902 .map(|(local, remote)| (Some(local), Some(remote)))
903 .unwrap_or_default();
904 self.define_resource(id, local_definition, action, local, remote)
905 }
906
907 pub fn import<T>(
917 &mut self,
918 id: impl AsRef<str>,
919 local_definition: T,
920 ) -> Result<StoreResource<T, T::Output>, Error>
921 where
922 T: Resource<Provider = P>,
923 {
924 self.define_resource(id, local_definition, Action::Read, None, None)
925 }
926
927 pub fn load<T>(
938 &mut self,
939 id: impl AsRef<str>,
940 local_definition: T,
941 remote_definition: T::Output,
942 force_overwrite: bool,
943 ) -> Result<StoreResource<T, T::Output>, Error>
944 where
945 T: Resource<Provider = P>,
946 {
947 let id = id.as_ref();
948 if let Ok((stored_definition, output)) = self.read_file(id) {
949 if local_definition == stored_definition && remote_definition == output {
950 if force_overwrite {
951 log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
952 } else {
953 let err = ClobberSnafu { id: id.to_owned() }.build();
954 log::error!("{err}");
955 return Err(err);
956 }
957 }
958 }
959 self.define_resource(
960 id,
961 local_definition,
962 Action::Load,
963 None,
964 Some(remote_definition),
965 )
966 }
967
968 pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
970 where
971 T: Resource<Provider = P>,
972 {
973 let id = id.as_ref();
974 let (local, remote) = self.read_file::<T>(id)?;
975 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
976 remote_var.set(Some(remote.clone()));
977 {
978 log::debug!("adding node {} {id}", Action::Load);
980 let node_name = format!("load {id}");
981 let load_node = dagga::Node::new(StoreNode {
982 name: node_name.clone(),
983 _remote_ty: std::any::type_name::<T>(),
984 run: Box::new({
985 let resource_id = id.to_owned();
986 let store_path = self.path.clone();
987 let local = local.clone();
988 let remote_var = remote_var.clone();
989 move |provider| {
990 Box::pin(
991 RunAction {
992 provider,
993 store_path,
994 resource_id,
995 action: Action::Load,
996 local_definition_code: local,
997 remote_var,
998 local_definition_store: None,
999 }
1000 .run(),
1001 )
1002 }
1003 }),
1004 })
1005 .with_name(node_name)
1006 .with_reads({
1007 let mut reads = vec![];
1008 for dep in local.dependencies() {
1009 reads.push(
1010 self.remotes
1011 .get(&dep)
1012 .context(MissingResourceSnafu {
1013 name: id.to_owned(),
1014 })?
1015 .key,
1016 );
1017 }
1018 reads
1019 })
1020 .with_result(rez);
1021 self.graph.add_node(load_node);
1022 }
1023 {
1024 log::debug!("adding node {} {id}", Action::Destroy);
1025 let node_name = format!("destroy {id}");
1026 let destroy_node = StoreNode {
1028 name: node_name.clone(),
1029 _remote_ty: std::any::type_name::<T>(),
1030 run: Box::new({
1031 let resource_id = id.to_owned();
1032 let local = local.clone();
1033 let store_path = self.path.clone();
1034 let remote_var = remote_var.clone();
1035 move |provider| {
1036 Box::pin(
1037 RunAction {
1038 provider,
1039 store_path,
1040 resource_id,
1041 action: Action::Destroy,
1042 local_definition_code: local,
1043 local_definition_store: None,
1044 remote_var,
1045 }
1046 .run(),
1047 )
1048 }
1049 }),
1050 };
1051
1052 self.graph.add_node(
1053 dagga::Node::new(destroy_node)
1054 .with_name(node_name)
1055 .with_move(rez),
1056 );
1057 }
1058
1059 Ok(DestroyResource { local, remote })
1060 }
1061
1062 fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1063 let mut missing_resource_creation = None;
1064 let legend = self.graph.legend()?.with_resources_named(|rez| {
1065 let maybe_name = self.remotes.get_name_by_rez(*rez);
1066 if maybe_name.is_none() {
1067 missing_resource_creation = Some(*rez);
1068 }
1069 maybe_name
1070 });
1071 if let Some(missing) = missing_resource_creation {
1072 log::error!(
1073 "Missing resource {missing}, current resources:\n{}",
1074 self.remotes
1075 );
1076 return MissingNameSnafu { missing }.fail();
1077 }
1078 Ok(legend)
1079 }
1080
1081 pub fn get_schedule_string(&self) -> Result<String, Error> {
1082 let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1083 for node in self.graph.nodes() {
1084 let store_node = node.inner();
1085 let print_node = dagga::Node::new(())
1086 .with_name(store_node.name.clone())
1087 .with_reads(node.get_reads().copied())
1088 .with_results(node.get_results().copied())
1089 .with_moves(node.get_moves().copied());
1090 dag.add_node(print_node);
1091 }
1092 struct Proxy {
1093 inner: Schedule<Node<(), usize>>,
1094 }
1095
1096 impl core::fmt::Display for Proxy {
1097 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1098 if self.inner.batches.is_empty() {
1099 f.write_str("--- No changes.\n")?;
1100 f.write_str("--- 🌈🦄\n")?;
1101 }
1102 for (i, batch) in self.inner.batches.iter().enumerate() {
1103 let i = i + 1;
1104 f.write_str("--- step ")?;
1105 f.write_fmt(format_args!("{i}\n"))?;
1106 for node in batch.iter() {
1107 f.write_str(" ")?;
1108 f.write_str(node.name())?;
1109 f.write_str("\n")?;
1110 }
1111 f.write_str("---\n")?;
1112 }
1113 Ok(())
1114 }
1115 }
1116
1117 let proxy = Proxy {
1118 inner: dag.build_schedule().unwrap(),
1119 };
1120 Ok(proxy.to_string())
1121 }
1122
1123 pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1124 if self.graph.is_empty() {
1125 log::warn!("Resource DAG is empty, writing an empty dot file");
1126 }
1127 let legend = self.get_graph_legend()?;
1128 dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1129
1130 Ok(())
1131 }
1132
1133 pub async fn apply(&mut self) -> Result<()> {
1134 let graph = std::mem::take(&mut self.graph);
1135 let schedule = graph
1136 .build_schedule()
1137 .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1138 for (i, batch) in schedule.batches.into_iter().enumerate() {
1139 for (j, node) in batch.into_iter().enumerate() {
1140 log::debug!("applying node {j}, batch {i}");
1141 let store_node = node.into_inner();
1142 (store_node.run)(&self.provider).await?;
1143 }
1144 }
1145 Ok(())
1146 }
1147}