1use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
/// Bound for error values returned by user-written resource operations.
///
/// Anything that is `Display + Debug + 'static` qualifies automatically
/// through the blanket implementation below.
pub trait UserError: core::fmt::Display + core::fmt::Debug + 'static {}

impl<T> UserError for T where T: core::fmt::Display + core::fmt::Debug + 'static {}
78
/// Every error this module can report.
///
/// Context selectors (`…Snafu`) for each variant are generated by the
/// `snafu::Snafu` derive.
#[derive(snafu::Snafu, Debug)]
pub enum Error {
    /// A wrapped `anyhow::Error`.
    ///
    /// The display prints the error followed by every entry of its
    /// `chain()`, joined by `"\n -> "`.
    #[snafu(display("{source}:\n{}",
        source.chain()
            .map(|e| format!("{e}"))
            .collect::<Vec<_>>()
            .join("\n -> ")))]
    Tele { source: anyhow::Error },

    /// Reading a store file from disk failed.
    #[snafu(display("Could not read store file '{path:?}': {source}"))]
    StoreFileRead {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Removing a store file from disk failed.
    #[snafu(display("Could not delete store file '{path:?}': {source}"))]
    StoreFileDelete {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Converting a value to JSON failed.
    #[snafu(display("Could not serialize stored '{name}': {source}"))]
    Serialize {
        name: String,
        source: serde_json::Error,
    },

    /// Converting JSON back into a value failed.
    #[snafu(display("Could not deserialize stored '{name}': {source}"))]
    Deserialize {
        name: String,
        source: serde_json::Error,
    },

    /// The resource graph could not be turned into a schedule.
    #[snafu(display("Could not build schedule: {msg}"))]
    Schedule { msg: String },

    /// Creating a file or directory failed.
    #[snafu(display("Could not create file {path:?}: {source}"))]
    CreateFile {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Writing to a file failed.
    #[snafu(display("Could not write file {path:?}: {source}"))]
    WriteFile {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// A remote value of type `ty` was requested before it was available.
    #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
    RemoteUnresolved {
        ty: &'static str,
        depends_on: String,
    },

    /// Saving the apply graph as a dot file failed.
    #[snafu(display("Could not save the apply graph: {source}"))]
    Dot { source: dagga::dot::DotError },

    /// A graph resource key has no registered name.
    #[snafu(display(
        "Could not build apply graph because of a missing resource name for '{missing}'"
    ))]
    MissingName { missing: usize },

    /// No resource is registered under `name`.
    #[snafu(display("Could not find a resource by the name '{name}'"))]
    MissingResource { name: String },

    /// The user's `create` operation failed.
    #[snafu(display("Error during '{name}' creation: {error}"))]
    Create {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's read/import operation failed.
    #[snafu(display("Error during '{name}' read and import: {error}"))]
    Import {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `update` operation failed.
    #[snafu(display("Error during '{name}' update: {error}"))]
    Update {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `delete` operation failed.
    #[snafu(display("Error during '{name}' destruction: {error}"))]
    Destroy {
        name: String,
        error: Box<dyn UserError>,
    },

    /// A manual step failed.
    #[snafu(display("Error during execution of a manual step '{name}': {error}"))]
    Manual {
        name: String,
        error: Box<dyn UserError>,
    },

    /// A previously known remote value was expected but is absent.
    #[snafu(display("Missing previous remote value '{name}'"))]
    Load { name: String },

    /// Loading would overwrite a differing value already in the store file.
    #[snafu(display(
        "Loading '{id}' would clobber an existing value in the store file, \
         and these values are not the same"
    ))]
    Clobber { id: String },

    /// A downcast failed.
    #[snafu(display("Could not downcast"))]
    Downcast,

    /// There is no store file for the resource `id`.
    #[snafu(display("Missing store file for '{id}'"))]
    MissingStoreFile { id: String },
}
190
191impl From<anyhow::Error> for Error {
192 fn from(source: anyhow::Error) -> Self {
193 Error::Tele { source }
194 }
195}
196
197impl From<dagga::dot::DotError> for Error {
198 fn from(source: dagga::dot::DotError) -> Self {
199 Self::Dot { source }
200 }
201}
202
/// Module-local result type whose error defaults to [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
204
/// A locally defined resource that can be created, read, updated and deleted
/// through a provider.
///
/// Every operation has a default body that panics via `unimplemented!`, so an
/// implementor only needs to override the operations it supports.
///
/// The `unimplemented!(..) as Box<dyn Future<..> + Unpin>` cast in the defaults
/// is never reached at runtime (hence `allow(unreachable_code)`); presumably it
/// exists only to give the `impl Future` return type a concrete type.
#[allow(unreachable_code)]
pub trait Resource:
    core::fmt::Debug
    + Clone
    + PartialEq
    + HasDependencies
    + serde::Serialize
    + serde::de::DeserializeOwned
    + 'static
{
    /// The value passed by reference to every operation.
    type Provider;

    /// The error type returned by the operations.
    type Error: UserError;

    /// The value produced by `create`, `read` and `update`.
    type Output: core::fmt::Debug
        + Clone
        + PartialEq
        + serde::Serialize
        + serde::de::DeserializeOwned
        + 'static;

    /// Creates the resource and returns its output.
    ///
    /// # Panics
    /// The default implementation panics with an "unimplemented" message
    /// naming the implementing type.
    fn create(
        &self,
        _provider: &Self::Provider,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::create is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Reads the resource and returns its output.
    ///
    /// # Panics
    /// The default implementation panics with an "unimplemented" message
    /// naming the implementing type.
    fn read(
        &self,
        _provider: &Self::Provider,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::read is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Updates the resource, given the previous local definition and the
    /// previous output, and returns the new output.
    ///
    /// # Panics
    /// The default implementation panics with an "unimplemented" message
    /// naming the implementing type.
    fn update(
        &self,
        _provider: &Self::Provider,
        _previous_local: &Self,
        _previous_remote: &Self::Output,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::update is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Deletes the resource, given its previous output.
    ///
    /// # Panics
    /// The default implementation panics with an "unimplemented" message
    /// naming the implementing type.
    fn delete(
        &self,
        _provider: &Self::Provider,
        _previous_remote: &Self::Output,
    ) -> impl Future<Output = Result<(), Self::Error>> {
        unimplemented!(
            "Resource::delete is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }
}
323
/// An ordered list of names that a definition depends on.
#[derive(Clone, Default, Debug)]
pub struct Dependencies {
    /// The names, in insertion order; duplicates are kept.
    inner: Vec<String>,
}

/// Consuming iteration yields the owned names in order.
impl IntoIterator for Dependencies {
    type Item = String;

    type IntoIter = std::vec::IntoIter<String>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { inner } = self;
        inner.into_iter()
    }
}

/// Formats the names separated by `", "`.
impl core::fmt::Display for Dependencies {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (position, name) in self.inner.iter().enumerate() {
            if position > 0 {
                f.write_str(", ")?;
            }
            f.write_str(name)?;
        }
        Ok(())
    }
}

impl Dependencies {
    /// Returns the names of `self` followed by the names of `other`.
    pub fn merge(self, other: Self) -> Self {
        let mut inner = self.inner;
        inner.extend(other.inner);
        Self { inner }
    }
}
360
361pub trait HasDependencies {
366 fn dependencies(&self) -> Dependencies {
367 Dependencies::default()
368 }
369}
370
/// The operation scheduled for a resource.
#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Action {
    /// Use an already known remote value and write it to the store file.
    Load,
    /// Call the resource's `create` operation.
    Create,
    /// Call the resource's `read` operation.
    Read,
    /// Call the resource's `update` operation.
    Update,
    /// Call the resource's `delete` operation and remove its store file.
    Destroy,
}
382
383impl core::fmt::Display for Action {
384 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385 f.write_str(match self {
386 Action::Load => "load",
387 Action::Create => "create",
388 Action::Read => "read",
389 Action::Update => "update",
390 Action::Destroy => "destroy",
391 })
392 }
393}
394
/// The type-erased form of a resource that is serialized into a store file.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InertStoreResource {
    /// The resource's name.
    name: String,
    /// The local definition as JSON.
    local: serde_json::Value,
    /// The remote output as JSON.
    remote: serde_json::Value,
}
401
402impl InertStoreResource {
403 async fn save(
404 &self,
405 resource_id: &str,
406 store_path: impl AsRef<std::path::Path>,
407 ) -> Result<(), Error> {
408 let path = store_file_path(resource_id, &store_path);
409 log::info!("storing {resource_id} to {path:?}");
410
411 let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
412 name: format!("storing {}", resource_id),
413 })?;
414
415 if let Some(parent) = path.parent() {
417 tokio::fs::create_dir_all(&parent)
418 .await
419 .context(CreateFileSnafu { path: parent })?;
420 }
421
422 let mut file = tokio::fs::File::create(&path)
423 .await
424 .context(CreateFileSnafu { path: path.clone() })?;
425 file.write_all(contents.as_bytes())
426 .await
427 .context(WriteFileSnafu { path: path.clone() })?;
428 Ok(())
429 }
430}
431
/// A resource registered with a store: its local definition of type `L`
/// together with a handle to its remote value of type `R`.
#[derive(Clone, Debug)]
pub struct StoreResource<L, R> {
    /// The name the resource was registered under.
    name: String,
    /// The definition supplied by the caller.
    local_definition: L,
    /// The action chosen for this resource.
    action: Action,
    /// The variable holding the remote value.
    remote_var: RemoteVar<R>,
}
441
442impl<L, R> Deref for StoreResource<L, R> {
443 type Target = L;
444
445 fn deref(&self) -> &Self::Target {
446 &self.local_definition
447 }
448}
449
450impl<L, R> AsRef<L> for StoreResource<L, R> {
451 fn as_ref(&self) -> &L {
452 &self.local_definition
453 }
454}
455
456impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
457where
458 L: serde::Serialize + for<'a> serde::Deserialize<'a>,
459 R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
460{
461 type Error = Error;
462
463 fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
464 let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
465 name: value.name.clone(),
466 })?;
467 let output = value.remote_var.get().context(LoadSnafu {
468 name: value.name.clone(),
469 })?;
470 let remote = serde_json::to_value(output).context(SerializeSnafu {
471 name: value.name.clone(),
472 })?;
473 Ok(Self {
474 name: value.name,
475 local,
476 remote,
477 })
478 }
479}
480
481impl<T> StoreResource<T, T::Output>
482where
483 T: Resource,
484 T::Output: Clone,
485{
486 pub fn remote<X: Clone + core::fmt::Debug + 'static>(
488 &self,
489 f: impl Fn(&T::Output) -> X + 'static,
490 ) -> Remote<X> {
491 Remote::new(self, f)
492 }
493
494 pub fn action(&self) -> Action {
499 self.action
500 }
501
502 pub fn depends_on<X, Y>(&self, store: &mut Store<T::Provider>, resource: &StoreResource<X, Y>) {
503 let this_var = store.remotes.get(&self.name).unwrap();
504 let that_var = store.remotes.get(&resource.name).unwrap();
505 for node in store.graph.take_nodes() {
506 store.graph.add_node(
507 if node
508 .get_results()
509 .copied()
510 .collect::<Vec<_>>()
511 .contains(&this_var.key)
512 {
513 node.with_read(that_var.key)
514 } else {
515 node
516 },
517 );
518 }
519 }
520}
521
/// Returns the path of the store file for `name`: `<store_path>/<name>.json`.
fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
    let mut path = store_path.as_ref().to_path_buf();
    path.push(format!("{name}.json"));
    path
}
526
/// A boxed one-shot closure that, given a borrowed provider, returns the
/// boxed future performing a node's work.
///
/// The future may borrow the provider for as long as it runs.
type StoreNodeRunFn<Provider> = Box<
    dyn FnOnce(
        &'_ Provider,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
>;
533
/// Everything needed to execute one [`Action`] for one resource.
struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
    /// The provider passed to the resource operation.
    provider: &'a Provider,
    /// The directory holding the store files.
    store_path: std::path::PathBuf,
    /// The resource's name; also the stem of its store file.
    resource_id: String,
    /// The action to execute.
    action: Action,
    /// The resource definition the action operates on.
    local_definition_code: T,
    /// The previous definition passed to `update`; only the `Update` action
    /// reads it.
    local_definition_store: Option<T>,
    /// The variable that receives (or supplies) the remote value.
    remote_var: RemoteVar<T::Output>,
}
544
545impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
546 async fn run(self) -> Result<()>
547 where
548 T: Resource,
549 {
550 let Self {
551 provider,
552 store_path,
553 resource_id,
554 action,
555 local_definition_code,
556 local_definition_store,
557 remote_var,
558 } = self;
559 log::info!("{action} '{resource_id}':");
560
561 async fn save<T: Resource>(
562 resource_id: &str,
563 local_definition_code: T,
564 remote_var: &RemoteVar<T::Output>,
565 store_path: impl AsRef<std::path::Path>,
566 ) -> Result<(), Error> {
567 let inert_resource = InertStoreResource {
568 name: resource_id.to_owned(),
569 local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
570 name: format!("store {resource_id}"),
571 })?,
572 remote: serde_json::to_value(
573 remote_var.get().context(LoadSnafu { name: resource_id })?,
574 )
575 .context(SerializeSnafu {
576 name: format!("store {resource_id} remote"),
577 })?,
578 };
579 inert_resource.save(resource_id, store_path).await?;
580 Ok(())
581 }
582
583 match action {
584 Action::Load => {
585 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
586 }
587 Action::Create => {
588 let value = local_definition_code
589 .create(provider)
590 .await
591 .map_err(|error| Error::Create {
592 name: resource_id.to_owned(),
593 error: Box::new(error),
594 })?;
595 remote_var.set(Some(value));
596 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
597 }
598 Action::Read => {
599 let value = local_definition_code
600 .read(provider)
601 .await
602 .map_err(|error| Error::Create {
603 name: resource_id.to_owned(),
604 error: Box::new(error),
605 })?;
606 remote_var.set(Some(value));
607 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
608 }
609 Action::Update => {
610 let previous_local = local_definition_store.unwrap();
611 let previous_remote = remote_var.get().context(LoadSnafu {
612 name: resource_id.clone(),
613 })?;
614 if previous_local == local_definition_code {
615 log::warn!(
616 "Skipping '{resource_id}' update as the local value has not changed.\n\
617 If you require an update, consider adding a sentinel value."
618 );
619 } else {
620 let cmp =
621 pretty_assertions::Comparison::new(&previous_local, &local_definition_code);
622 let change_string = format!("{cmp}")
623 .lines()
624 .map(|line| format!(" {line}"))
625 .collect::<Vec<_>>()
626 .join("\n");
627 log::info!("updating '{resource_id}':\n{change_string}");
628 let output = local_definition_code
629 .update(provider, &previous_local, &previous_remote)
630 .await
631 .map_err(|error| Error::Update {
632 name: resource_id.clone(),
633 error: Box::new(error),
634 })?;
635 remote_var.set(Some(output));
636 save(&resource_id, local_definition_code, &remote_var, store_path).await?;
637 }
638 }
639 Action::Destroy => {
640 log::debug!("running destroy action on {resource_id}");
641 let local_definition = local_definition_code.clone();
645 let previous_remote = remote_var.get().context(LoadSnafu {
646 name: resource_id.clone(),
647 })?;
648 local_definition
649 .delete(provider, &previous_remote)
650 .await
651 .map_err(|error| Error::Destroy {
652 name: resource_id.to_owned(),
653 error: Box::new(error),
654 })?;
655
656 log::info!(" {resource_id} is destroyed");
657 let path = store_file_path(&resource_id, &store_path);
658 log::info!(" removing {resource_id} store file {path:?}");
659 tokio::fs::remove_file(&path)
660 .await
661 .context(StoreFileDeleteSnafu { path })?;
662 remote_var.set(None);
663 }
664 }
665
666 log::info!(" success!");
667 Ok(())
668 }
669}
670
/// A resource scheduled for destruction, with the definition and output that
/// were read from its store file.
pub struct DestroyResource<T: Resource> {
    /// The stored local definition.
    local: T,
    /// The stored remote output.
    remote: T::Output,
}
675
676impl<T: Resource> Deref for DestroyResource<T> {
677 type Target = T;
678
679 fn deref(&self) -> &Self::Target {
680 &self.local
681 }
682}
683
684impl<T: Resource> DestroyResource<T> {
685 pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
688 &self,
689 f: fn(&T::Output) -> X,
690 ) -> Migrated<X> {
691 Migrated(f(&self.remote))
692 }
693}
694
/// The payload of one node in the store's graph.
struct StoreNode<Provider> {
    /// The node's display name, e.g. `"create <id>"`.
    name: String,
    /// The type name of the resource this node was built for; not read by
    /// the code visible here.
    _remote_ty: &'static str,
    /// The work to perform when the node is applied.
    run: StoreNodeRunFn<Provider>,
}
700
/// The outcome of comparing a local definition with its store file.
struct PreviouslyStored<T: Resource> {
    /// The action chosen for the resource.
    action: Action,
    /// The stored definition and output, or `None` when no store file exists.
    resource: Option<(T, T::Output)>,
}
705
/// Holds the provider, the store directory, the registered remote variables
/// and the graph of pending actions.
pub struct Store<T> {
    /// The directory holding the store files.
    path: std::path::PathBuf,
    /// The provider handed to every node when it runs.
    provider: T,
    /// The registered remote variables, looked up by resource name.
    remotes: Remotes,
    /// The pending actions; resources are identified by `usize` keys.
    graph: dagga::Dag<StoreNode<T>, usize>,
}
712
713impl<P: 'static> Store<P> {
714 fn read_from_store<T: Resource<Provider = P>>(
715 path: impl AsRef<std::path::Path>,
716 id: &str,
717 ) -> Result<(T, T::Output)> {
718 let path = store_file_path(id, path.as_ref());
719 snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
720
721 log::debug!("{path:?} exists, reading '{id}' from it");
722 let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
723 path: path.to_path_buf(),
724 })?;
725 log::trace!(
726 "contents:\n{}",
727 contents
728 .lines()
729 .map(|line| format!(" {line}"))
730 .collect::<Vec<_>>()
731 .join("\n")
732 );
733 let inert_store_rez: InertStoreResource =
734 serde_json::from_str(&contents).context(DeserializeSnafu {
735 name: id.to_owned(),
736 })?;
737 log::trace!("read inert store resource");
738 log::trace!(
739 "reading local contents: {}",
740 serde_json::to_string_pretty(&inert_store_rez.local)
741 .unwrap()
742 .lines()
743 .map(|line| format!(" {line}"))
744 .collect::<Vec<_>>()
745 .join("\n")
746 );
747 log::trace!("as {}", std::any::type_name::<T>());
748 let stored_definition: T =
749 serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
750 name: id.to_owned(),
751 })?;
752
753 log::trace!(" reading remote output JSON value");
754 let remote_value: T::Output =
755 serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
756 name: format!("remote {id}"),
757 })?;
758 Ok((stored_definition, remote_value))
759 }
760
761 pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
762 Self {
763 path: path.as_ref().to_path_buf(),
764 graph: dagga::Dag::default(),
765 remotes: Default::default(),
766 provider,
767 }
768 }
769
770 pub fn provider(&self) -> &P {
771 &self.provider
772 }
773
774 fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
775 where
776 T: Resource<Provider = P>,
777 {
778 Self::read_from_store(&self.path, id)
779 }
780
/// Adds a barrier to the underlying graph.
///
/// NOTE(review): presumably nodes added afterwards are scheduled after all
/// nodes added before the barrier — confirm against dagga's `add_barrier`.
pub fn barrier(&mut self) {
    self.graph.add_barrier();
}
785
786 fn define_resource<T>(
787 &mut self,
788 id: impl AsRef<str>,
789 local_definition: T,
790 action: Action,
791 stored_definition: Option<T>,
792 output: Option<T::Output>,
793 ) -> Result<StoreResource<T, T::Output>, Error>
794 where
795 T: Resource<Provider = P>,
796 {
797 let id = id.as_ref();
798 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
799 remote_var.set(output);
800
801 let remote_var = remote_var.clone();
802 let local_definition_code = local_definition.clone();
803 let local_definition_store = stored_definition.clone();
804 let store_path = self.path.clone();
805 let run: StoreNodeRunFn<T::Provider> = Box::new({
806 let resource_id = id.to_owned();
807 let remote_var = remote_var.clone();
808 let local_definition_code = local_definition_code.clone();
809 let local_definition_store = local_definition_store.clone();
810 move |provider: &T::Provider| {
811 Box::pin(
812 RunAction {
813 provider,
814 store_path,
815 resource_id,
816 action,
817 local_definition_code,
818 local_definition_store,
819 remote_var,
820 }
821 .run(),
822 )
823 }
824 });
825 let ty = std::any::type_name::<T>();
826
827 {
828 log::debug!("adding main node {action} {id}");
830 let node_name = format!("{action} {id}");
831 let dag_node = dagga::Node::new(StoreNode {
832 name: node_name.clone(),
833 _remote_ty: ty,
834 run,
835 })
836 .with_name(node_name)
837 .with_reads({
838 let mut reads = vec![];
840 for dep in local_definition.dependencies() {
841 let var = self
842 .remotes
843 .get(&dep)
844 .context(MissingResourceSnafu { name: dep })?;
845 reads.push(var.key);
846 }
847 reads
848 });
849 let dag_node = match action {
850 Action::Create | Action::Read | Action::Load | Action::Update => {
851 log::debug!(" with result {rez}");
852 dag_node.with_result(rez)
853 }
854 Action::Destroy => {
855 log::debug!(" with move {rez}");
856 dag_node.with_move(rez)
857 }
858 };
859 self.graph.add_node(dag_node);
860 }
861
862 Ok(StoreResource {
863 name: id.to_owned(),
864 local_definition,
865 action,
866 remote_var,
867 })
868 }
869
870 fn determine_action_from_previously_stored<T>(
872 &self,
873 local_definition: &T,
874 id: &str,
875 ) -> Result<PreviouslyStored<T>, Error>
876 where
877 T: Resource<Provider = P>,
878 {
879 match self.read_file(id) {
880 Ok((stored_definition, output)) => {
881 log::debug!(" {output:?}");
884 let action = if *local_definition != stored_definition {
885 log::debug!(" local resource has changed, so this remote is now stale");
886 Action::Update
887 } else {
888 let mut may_need_update = false;
891 for dep in local_definition.dependencies() {
892 let var = self.remotes.get(&dep).context(LoadSnafu { name: dep })?;
893 if var.action != Action::Load {
894 may_need_update = true;
895 break;
896 }
897 }
898 if may_need_update {
899 Action::Update
900 } else {
901 Action::Load
902 }
903 };
904
905 Ok(PreviouslyStored {
906 action,
907 resource: Some((stored_definition, output)),
908 })
909 }
910 Err(Error::MissingStoreFile { id }) => {
911 log::debug!("store file '{id}' does not exist, creating a new resource",);
912 Ok(PreviouslyStored {
913 action: Action::Create,
914 resource: None,
915 })
916 }
917 Err(e) => {
918 log::error!("could not define resource '{id}': {e}");
919 Err(e)
920 }
921 }
922 }
923
924 pub fn resource<T>(
936 &mut self,
937 id: impl AsRef<str>,
938 local_definition: T,
939 ) -> Result<StoreResource<T, T::Output>, Error>
940 where
941 T: Resource<Provider = P>,
942 {
943 let id = id.as_ref();
944 let PreviouslyStored { action, resource } =
945 self.determine_action_from_previously_stored(&local_definition, id)?;
946 let (local, remote) = resource
947 .map(|(local, remote)| (Some(local), Some(remote)))
948 .unwrap_or_default();
949 self.define_resource(id, local_definition, action, local, remote)
950 }
951
952 pub fn import<T>(
962 &mut self,
963 id: impl AsRef<str>,
964 local_definition: T,
965 ) -> Result<StoreResource<T, T::Output>, Error>
966 where
967 T: Resource<Provider = P>,
968 {
969 self.define_resource(id, local_definition, Action::Read, None, None)
970 }
971
972 pub fn load<T>(
983 &mut self,
984 id: impl AsRef<str>,
985 local_definition: T,
986 remote_definition: T::Output,
987 force_overwrite: bool,
988 ) -> Result<StoreResource<T, T::Output>, Error>
989 where
990 T: Resource<Provider = P>,
991 {
992 let id = id.as_ref();
993 if let Ok((stored_definition, output)) = self.read_file(id) {
994 if local_definition == stored_definition && remote_definition == output {
995 if force_overwrite {
996 log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
997 } else {
998 let err = ClobberSnafu { id: id.to_owned() }.build();
999 log::error!("{err}");
1000 return Err(err);
1001 }
1002 }
1003 }
1004 self.define_resource(
1005 id,
1006 local_definition,
1007 Action::Load,
1008 None,
1009 Some(remote_definition),
1010 )
1011 }
1012
1013 pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
1015 where
1016 T: Resource<Provider = P>,
1017 {
1018 let id = id.as_ref();
1019 let (local, remote) = self.read_file::<T>(id)?;
1020 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
1021 remote_var.set(Some(remote.clone()));
1022 {
1023 log::debug!("adding node {} {id}", Action::Load);
1025 let node_name = format!("load {id}");
1026 let load_node = dagga::Node::new(StoreNode {
1027 name: node_name.clone(),
1028 _remote_ty: std::any::type_name::<T>(),
1029 run: Box::new({
1030 let resource_id = id.to_owned();
1031 let store_path = self.path.clone();
1032 let local = local.clone();
1033 let remote_var = remote_var.clone();
1034 move |provider| {
1035 Box::pin(
1036 RunAction {
1037 provider,
1038 store_path,
1039 resource_id,
1040 action: Action::Load,
1041 local_definition_code: local,
1042 remote_var,
1043 local_definition_store: None,
1044 }
1045 .run(),
1046 )
1047 }
1048 }),
1049 })
1050 .with_name(node_name)
1051 .with_reads({
1052 let mut reads = vec![];
1053 for dep in local.dependencies() {
1054 reads.push(
1055 self.remotes
1056 .get(&dep)
1057 .context(MissingResourceSnafu {
1058 name: id.to_owned(),
1059 })?
1060 .key,
1061 );
1062 }
1063 reads
1064 })
1065 .with_result(rez);
1066 self.graph.add_node(load_node);
1067 }
1068 {
1069 log::debug!("adding node {} {id}", Action::Destroy);
1070 let node_name = format!("destroy {id}");
1071 let destroy_node = StoreNode {
1073 name: node_name.clone(),
1074 _remote_ty: std::any::type_name::<T>(),
1075 run: Box::new({
1076 let resource_id = id.to_owned();
1077 let local = local.clone();
1078 let store_path = self.path.clone();
1079 let remote_var = remote_var.clone();
1080 move |provider| {
1081 Box::pin(
1082 RunAction {
1083 provider,
1084 store_path,
1085 resource_id,
1086 action: Action::Destroy,
1087 local_definition_code: local,
1088 local_definition_store: None,
1089 remote_var,
1090 }
1091 .run(),
1092 )
1093 }
1094 }),
1095 };
1096
1097 self.graph.add_node(
1098 dagga::Node::new(destroy_node)
1099 .with_name(node_name)
1100 .with_move(rez),
1101 );
1102 }
1103
1104 Ok(DestroyResource { local, remote })
1105 }
1106
/// Currently a no-op: the body is empty and `resource` is not used.
///
/// NOTE(review): this looks like an unfinished ordering helper — confirm the
/// intended behavior before relying on it.
pub fn after<T: Resource>(&mut self, resource: &StoreResource<T, T::Output>) {}
1108
1109 fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1110 let mut missing_resource_creation = None;
1111 let legend = self.graph.legend()?.with_resources_named(|rez| {
1112 let maybe_name = self.remotes.get_name_by_rez(*rez);
1113 if maybe_name.is_none() {
1114 missing_resource_creation = Some(*rez);
1115 }
1116 maybe_name
1117 });
1118 if let Some(missing) = missing_resource_creation {
1119 log::error!(
1120 "Missing resource {missing}, current resources:\n{}",
1121 self.remotes
1122 );
1123 return MissingNameSnafu { missing }.fail();
1124 }
1125 Ok(legend)
1126 }
1127
1128 pub fn get_schedule_string(&self) -> Result<String, Error> {
1129 let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1130 for node in self.graph.nodes() {
1131 let store_node = node.inner();
1132 let print_node = dagga::Node::new(())
1133 .with_name(store_node.name.clone())
1134 .with_reads(node.get_reads().copied())
1135 .with_results(node.get_results().copied())
1136 .with_moves(node.get_moves().copied())
1137 .with_barrier(node.get_barrier());
1138 dag.add_node(print_node);
1139 }
1140 struct Proxy {
1141 inner: Schedule<Node<(), usize>>,
1142 }
1143
1144 impl core::fmt::Display for Proxy {
1145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1146 if self.inner.batches.is_empty() {
1147 f.write_str("--- No changes.\n")?;
1148 f.write_str("--- 🌈🦄\n")?;
1149 }
1150 for (i, batch) in self.inner.batches.iter().enumerate() {
1151 let i = i + 1;
1152 f.write_str("--- step ")?;
1153 f.write_fmt(format_args!("{i}\n"))?;
1154 for node in batch.iter() {
1155 f.write_str(" ")?;
1156 f.write_str(node.name())?;
1157 f.write_str("\n")?;
1158 }
1159 f.write_str("---\n")?;
1160 }
1161 Ok(())
1162 }
1163 }
1164
1165 let proxy = Proxy {
1166 inner: dag.build_schedule().unwrap(),
1167 };
1168 Ok(proxy.to_string())
1169 }
1170
1171 pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1172 if self.graph.is_empty() {
1173 log::warn!("Resource DAG is empty, writing an empty dot file");
1174 }
1175 let legend = self.get_graph_legend()?;
1176 dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1177
1178 Ok(())
1179 }
1180
1181 pub async fn apply(&mut self) -> Result<()> {
1182 let graph = std::mem::take(&mut self.graph);
1183 let schedule = graph
1184 .build_schedule()
1185 .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1186 for (i, batch) in schedule.batches.into_iter().enumerate() {
1187 for (j, node) in batch.into_iter().enumerate() {
1188 log::debug!("applying node {j}, batch {i}");
1189 let store_node = node.into_inner();
1190 (store_node.run)(&self.provider).await?;
1191 }
1192 }
1193 Ok(())
1194 }
1195}