1use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
/// Marker trait for errors returned by user-written resource operations.
///
/// Nothing has to implement this by hand: the blanket impl below covers every
/// `'static` type that is both `Debug` and `Display`, which lets such errors be
/// boxed as `Box<dyn UserError>`.
pub trait UserError: core::fmt::Debug + core::fmt::Display + 'static {}

impl<E> UserError for E where E: core::fmt::Debug + core::fmt::Display + 'static {}
78
/// Every error this module can return.
///
/// The `Display` text of each variant comes from its `#[snafu(display(..))]`
/// attribute; the derive also generates the `*Snafu` context selectors used
/// with `.context(..)` throughout this file.
#[derive(snafu::Snafu, Debug)]
pub enum Error {
    /// Wraps an `anyhow::Error`; the message prints the error followed by its
    /// whole source chain, one cause per line.
    #[snafu(display("{source}:\n{}",
        source.chain()
            .map(|e| format!("{e}"))
            .collect::<Vec<_>>()
            .join("\n -> ")))]
    Tele { source: anyhow::Error },

    /// Reading a store file from disk failed.
    #[snafu(display("Could not read store file '{path:?}': {source}"))]
    StoreFileRead {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Removing a store file from disk failed.
    #[snafu(display("Could not delete store file '{path:?}': {source}"))]
    StoreFileDelete {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Converting a value to JSON failed.
    #[snafu(display("Could not serialize stored '{name}': {source}"))]
    Serialize {
        name: String,
        source: serde_json::Error,
    },

    /// Converting JSON back into a typed value failed.
    #[snafu(display("Could not deserialize stored '{name}': {source}"))]
    Deserialize {
        name: String,
        source: serde_json::Error,
    },

    /// The resource graph could not be turned into a schedule.
    #[snafu(display("Could not build schedule: {msg}"))]
    Schedule { msg: String },

    /// Creating (or truncating) a file failed.
    #[snafu(display("Could not create file {path:?}: {source}"))]
    CreateFile {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// Writing to a file failed.
    #[snafu(display("Could not write file {path:?}: {source}"))]
    WriteFile {
        path: std::path::PathBuf,
        source: std::io::Error,
    },

    /// A remote value was requested before it was available.
    #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
    RemoteUnresolved {
        ty: &'static str,
        depends_on: String,
    },

    /// Rendering the apply graph failed.
    #[snafu(display("Could not save the apply graph: {source}"))]
    Dot { source: dagga::dot::DotError },

    /// A graph resource key has no registered name.
    #[snafu(display(
        "Could not build apply graph because of a missing resource name for '{missing}'"
    ))]
    MissingName { missing: usize },

    /// A named resource (e.g. a dependency) is not known to the store.
    #[snafu(display("Could not find a resource by the name '{name}'"))]
    MissingResource { name: String },

    /// The user's `create` implementation returned an error.
    #[snafu(display("Error during '{name}' creation: {error}"))]
    Create {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `read` implementation returned an error.
    #[snafu(display("Error during '{name}' read and import: {error}"))]
    Import {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `update` implementation returned an error.
    #[snafu(display("Error during '{name}' update: {error}"))]
    Update {
        name: String,
        error: Box<dyn UserError>,
    },

    /// The user's `delete` implementation returned an error.
    #[snafu(display("Error during '{name}' destruction: {error}"))]
    Destroy {
        name: String,
        error: Box<dyn UserError>,
    },

    /// A remote value that should already be set was empty.
    #[snafu(display("Missing previous remote value '{name}'"))]
    Load { name: String },

    /// Loading a value would overwrite a different value already stored.
    #[snafu(display(
        "Loading '{id}' would clobber an existing value in the store file, \
         and these values are not the same"
    ))]
    Clobber { id: &'static str },

    /// A dynamic downcast failed.
    #[snafu(display("Could not downcast"))]
    Downcast,

    /// No store file exists on disk for the given resource id.
    #[snafu(display("Missing store file for '{id}'"))]
    MissingStoreFile { id: String },
}
184
185impl From<anyhow::Error> for Error {
186 fn from(source: anyhow::Error) -> Self {
187 Error::Tele { source }
188 }
189}
190
191impl From<dagga::dot::DotError> for Error {
192 fn from(source: dagga::dot::DotError) -> Self {
193 Self::Dot { source }
194 }
195}
196
/// Module-local `Result` whose error type defaults to this file's [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
198
/// A locally-defined piece of infrastructure that can be created, read,
/// updated and deleted through a provider.
///
/// The implementing type is the *local* definition; [`Resource::Output`] is
/// the *remote* value those operations return. Both must be serializable
/// because they are persisted to a JSON store file.
///
/// Every operation has a default body that panics via `unimplemented!`, so
/// implementors only override the operations they support.
// `unreachable_code` is allowed because each default body casts the result of
// the diverging `unimplemented!` call: the cast is never evaluated and only
// serves to give the `impl Future` return type a concrete type.
#[allow(unreachable_code)]
pub trait Resource:
    Clone + PartialEq + HasDependencies + serde::Serialize + serde::de::DeserializeOwned + 'static
{
    /// The client/handle type passed by reference to every operation.
    type Provider;

    /// The error type returned by the operations.
    type Error: UserError;

    /// The remote value produced by `create`, `read` and `update`.
    type Output: core::fmt::Debug
        + Clone
        + PartialEq
        + serde::Serialize
        + serde::de::DeserializeOwned
        + 'static;

    /// Creates the resource and returns its remote value.
    ///
    /// # Panics
    /// The default implementation always panics with the implementing type's name.
    fn create(
        &self,
        _provider: &Self::Provider,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::create is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Reads the remote value of the resource.
    ///
    /// # Panics
    /// The default implementation always panics with the implementing type's name.
    fn read(
        &self,
        _provider: &Self::Provider,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::read is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Updates the resource given the previously stored local definition and
    /// the previous remote value, returning the new remote value.
    ///
    /// # Panics
    /// The default implementation always panics with the implementing type's name.
    fn update(
        &self,
        _provider: &Self::Provider,
        _previous_local: &Self,
        _previous_remote: &Self::Output,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
        unimplemented!(
            "Resource::update is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }

    /// Deletes the resource identified by its previous remote value.
    ///
    /// # Panics
    /// The default implementation always panics with the implementing type's name.
    fn delete(
        &self,
        _provider: &Self::Provider,
        _previous_remote: &Self::Output,
    ) -> impl Future<Output = Result<(), Self::Error>> {
        unimplemented!(
            "Resource::delete is unimplemented for {}",
            std::any::type_name::<Self>()
        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
    }
}
311
/// A list of resource names that another resource depends on.
#[derive(Clone, Default, Debug)]
pub struct Dependencies {
    // Dependency names, in insertion order (duplicates are not removed).
    inner: Vec<String>,
}
317
318impl IntoIterator for Dependencies {
319 type Item = String;
320
321 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
322
323 fn into_iter(self) -> Self::IntoIter {
324 self.inner.into_iter()
325 }
326}
327
328impl core::fmt::Display for Dependencies {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 f.write_str(
331 &self
332 .inner
333 .iter()
334 .map(|u| u.to_string())
335 .collect::<Vec<_>>()
336 .join(", "),
337 )
338 }
339}
340
341impl Dependencies {
342 pub fn merge(self, other: Self) -> Self {
343 Dependencies {
344 inner: [self.inner, other.inner].concat(),
345 }
346 }
347}
348
349pub trait HasDependencies {
354 fn dependencies(&self) -> Dependencies {
355 Dependencies::default()
356 }
357}
358
/// The operation a store node performs on a resource when the store is applied.
#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Action {
    /// Write the already-known local and remote values to the store file.
    Load,
    /// Call `Resource::create` and store the result.
    Create,
    /// Call `Resource::read` and store the result.
    Read,
    /// Call `Resource::update` and store the result.
    Update,
    /// Call `Resource::delete` and remove the store file.
    Destroy,
}
370
371impl core::fmt::Display for Action {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 f.write_str(match self {
374 Action::Load => "load",
375 Action::Create => "create",
376 Action::Read => "read",
377 Action::Update => "update",
378 Action::Destroy => "destroy",
379 })
380 }
381}
382
/// The untyped, on-disk form of a stored resource: this is exactly what is
/// serialized to and deserialized from a store file.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct InertStoreResource {
    // Resource id.
    name: String,
    // The local definition as raw JSON.
    local: serde_json::Value,
    // The remote output as raw JSON.
    remote: serde_json::Value,
}
389
390impl InertStoreResource {
391 async fn save(
392 &self,
393 resource_id: &str,
394 store_path: impl AsRef<std::path::Path>,
395 ) -> Result<(), Error> {
396 let path = store_file_path(resource_id, &store_path);
397 log::info!("storing {resource_id} to {path:?}");
398
399 let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
400 name: format!("storing {}", resource_id),
401 })?;
402
403 let mut file = tokio::fs::File::create(&path)
404 .await
405 .context(CreateFileSnafu { path: path.clone() })?;
406 file.write_all(contents.as_bytes())
407 .await
408 .context(WriteFileSnafu { path: path.clone() })?;
409 Ok(())
410 }
411}
412
/// A resource registered with a store: the local definition `L` paired with a
/// handle to its remote value `R`.
///
/// Dereferences to the local definition.
#[derive(Clone, Debug)]
pub struct StoreResource<L, R> {
    // Resource id.
    name: String,
    // The definition as written in code.
    local_definition: L,

    // Handle to the remote value; `get()` on it is treated as optional by this file.
    remote_var: RemoteVar<R>,
}
422
423impl<L, R> Deref for StoreResource<L, R> {
424 type Target = L;
425
426 fn deref(&self) -> &Self::Target {
427 &self.local_definition
428 }
429}
430
431impl<L, R> AsRef<L> for StoreResource<L, R> {
432 fn as_ref(&self) -> &L {
433 &self.local_definition
434 }
435}
436
437impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
438where
439 L: serde::Serialize + for<'a> serde::Deserialize<'a>,
440 R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
441{
442 type Error = Error;
443
444 fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
445 let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
446 name: value.name.clone(),
447 })?;
448 let output = value.remote_var.get().context(LoadSnafu {
449 name: value.name.clone(),
450 })?;
451 let remote = serde_json::to_value(output).context(SerializeSnafu {
452 name: value.name.clone(),
453 })?;
454 Ok(Self {
455 name: value.name,
456 local,
457 remote,
458 })
459 }
460}
461
impl<T> StoreResource<T, T::Output>
where
    T: Resource,
    T::Output: Clone,
{
    /// Builds a [`Remote`] from this resource and the projection `f`, which
    /// maps the resource's remote output to a value of type `X`.
    pub fn remote<X: Clone + core::fmt::Debug + 'static>(
        &self,
        f: fn(&T::Output) -> X,
    ) -> Remote<X> {
        Remote::new(self, f)
    }
}
475
/// Returns the path of the store file for resource `name` inside the
/// directory `store_path`, i.e. `<store_path>/<name>.json`.
fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
    let mut file_path = store_path.as_ref().to_path_buf();
    file_path.push(format!("{name}.json"));
    file_path
}
480
/// The deferred work of one graph node: a boxed one-shot closure that, given
/// the provider, returns a boxed future which may borrow that provider.
type StoreNodeRunFn<Provider> = Box<
    dyn FnOnce(
        &'_ Provider,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
>;
487
/// Everything needed to perform one [`Action`] on one resource.
struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
    // Provider handed to the resource's operations.
    provider: &'a Provider,
    // Directory holding the store files.
    store_path: std::path::PathBuf,
    // Resource id; also the store file's stem.
    resource_id: &'static str,
    // Which operation to perform.
    action: Action,
    // The local definition the operation is invoked on.
    local_definition_code: T,
    // The local definition previously read from the store file, if any
    // (only read by `Action::Update`).
    local_definition_store: Option<T>,
    // Handle through which the remote value is read and written.
    remote_var: RemoteVar<T::Output>,
}
498
499impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
500 async fn run(self) -> Result<()>
501 where
502 T: Resource,
503 {
504 let Self {
505 provider,
506 store_path,
507 resource_id,
508 action,
509 local_definition_code,
510 local_definition_store,
511 remote_var,
512 } = self;
513 log::info!("{action} '{resource_id}':");
514
515 async fn save<T: Resource>(
516 resource_id: &str,
517 local_definition_code: T,
518 remote_var: &RemoteVar<T::Output>,
519 store_path: impl AsRef<std::path::Path>,
520 ) -> Result<(), Error> {
521 let inert_resource = InertStoreResource {
522 name: resource_id.to_owned(),
523 local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
524 name: format!("store {resource_id}"),
525 })?,
526 remote: serde_json::to_value(
527 remote_var.get().context(LoadSnafu { name: resource_id })?,
528 )
529 .context(SerializeSnafu {
530 name: format!("store {resource_id} remote"),
531 })?,
532 };
533 inert_resource.save(resource_id, store_path).await?;
534 Ok(())
535 }
536
537 match action {
538 Action::Load => {
539 save(resource_id, local_definition_code, &remote_var, store_path).await?;
540 }
541 Action::Create => {
542 let value = local_definition_code
543 .create(provider)
544 .await
545 .map_err(|error| Error::Create {
546 name: resource_id.to_owned(),
547 error: Box::new(error),
548 })?;
549 remote_var.set(Some(value));
550 save(resource_id, local_definition_code, &remote_var, store_path).await?;
551 }
552 Action::Read => {
553 let value = local_definition_code
554 .read(provider)
555 .await
556 .map_err(|error| Error::Create {
557 name: resource_id.to_owned(),
558 error: Box::new(error),
559 })?;
560 remote_var.set(Some(value));
561 save(resource_id, local_definition_code, &remote_var, store_path).await?;
562 }
563 Action::Update => {
564 let previous_local = local_definition_store.unwrap();
565 let previous_remote = remote_var.get().context(LoadSnafu { name: resource_id })?;
566 let output = local_definition_code
567 .update(provider, &previous_local, &previous_remote)
568 .await
569 .map_err(|error| Error::Update {
570 name: resource_id.to_owned(),
571 error: Box::new(error),
572 })?;
573 remote_var.set(Some(output));
574 save(resource_id, local_definition_code, &remote_var, store_path).await?;
575 }
576 Action::Destroy => {
577 log::debug!("running destroy action on {resource_id}");
578 let local_definition = local_definition_code.clone();
582 let previous_remote = remote_var.get().context(LoadSnafu { name: resource_id })?;
583 local_definition
584 .delete(provider, &previous_remote)
585 .await
586 .map_err(|error| Error::Destroy {
587 name: resource_id.to_owned(),
588 error: Box::new(error),
589 })?;
590
591 log::info!(" {resource_id} is destroyed");
592 let path = store_file_path(resource_id, &store_path);
593 log::info!(" removing {resource_id} store file {path:?}");
594 tokio::fs::remove_file(&path)
595 .await
596 .context(StoreFileDeleteSnafu { path })?;
597 remote_var.set(None);
598 }
599 }
600
601 log::info!(" success!");
602 Ok(())
603 }
604}
605
/// A resource scheduled for destruction: the local definition and remote
/// value that were read back from its store file.
///
/// Dereferences to the local definition.
pub struct DestroyResource<T: Resource> {
    // Local definition as stored.
    local: T,
    // Remote value as stored.
    remote: T::Output,
}
610
611impl<T: Resource> Deref for DestroyResource<T> {
612 type Target = T;
613
614 fn deref(&self) -> &Self::Target {
615 &self.local
616 }
617}
618
impl<T: Resource> DestroyResource<T> {
    /// Applies `f` to the stored remote value immediately and wraps the
    /// result in [`Migrated`].
    pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
        &self,
        f: fn(&T::Output) -> X,
    ) -> Migrated<X> {
        Migrated(f(&self.remote))
    }
}
629
/// Payload of one node in the store's resource graph.
struct StoreNode<Provider> {
    // Node name, e.g. "create <id>"; used when printing the schedule.
    name: String,
    // Type name recorded when the node is built; not read in this file.
    _remote_ty: &'static str,
    // The work to run when the node is applied.
    run: StoreNodeRunFn<Provider>,
}
635
/// Tracks declared resources, their on-disk store files and the graph of
/// actions needed to apply them. `T` is the provider type.
pub struct Store<T> {
    // Directory holding one `<id>.json` store file per resource.
    path: std::path::PathBuf,
    // Provider passed to every resource operation.
    provider: T,
    // Registry of remote vars, looked up by resource name.
    remotes: Remotes,
    // Pending actions; taken (and thereby emptied) by `apply`.
    graph: dagga::Dag<StoreNode<T>, usize>,
}
642
643impl<P: 'static> Store<P> {
644 fn read_from_store<T: Resource<Provider = P>>(
645 path: impl AsRef<std::path::Path>,
646 id: &str,
647 ) -> Result<(T, T::Output)> {
648 let path = store_file_path(id, path.as_ref());
649 snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
650
651 log::debug!("{path:?} exists, reading '{id}' from it");
652 let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
653 path: path.to_path_buf(),
654 })?;
655 log::trace!(
656 "contents:\n{}",
657 contents
658 .lines()
659 .map(|line| format!(" {line}"))
660 .collect::<Vec<_>>()
661 .join("\n")
662 );
663 let inert_store_rez: InertStoreResource =
664 serde_json::from_str(&contents).context(DeserializeSnafu {
665 name: id.to_owned(),
666 })?;
667 log::trace!("read inert store resource");
668 log::trace!(
669 "reading local contents: {}",
670 serde_json::to_string_pretty(&inert_store_rez.local)
671 .unwrap()
672 .lines()
673 .map(|line| format!(" {line}"))
674 .collect::<Vec<_>>()
675 .join("\n")
676 );
677 log::trace!("as {}", std::any::type_name::<T>());
678 let stored_definition: T =
679 serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
680 name: id.to_owned(),
681 })?;
682
683 log::trace!(" reading remote output JSON value");
684 let remote_value: T::Output =
685 serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
686 name: format!("remote {id}"),
687 })?;
688 Ok((stored_definition, remote_value))
689 }
690
691 pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
692 Self {
693 path: path.as_ref().to_path_buf(),
694 graph: dagga::Dag::default(),
695 remotes: Default::default(),
696 provider,
697 }
698 }
699
700 fn read_file<T>(&self, id: &'static str) -> Result<(T, T::Output), Error>
701 where
702 T: Resource<Provider = P>,
703 {
704 Self::read_from_store(&self.path, id)
705 }
706
707 fn define_resource<T>(
708 &mut self,
709 id: &'static str,
710 local_definition: T,
711 action: Action,
712 stored_definition: Option<T>,
713 output: Option<T::Output>,
714 ) -> Result<StoreResource<T, T::Output>, Error>
715 where
716 T: Resource<Provider = P>,
717 {
718 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
719 remote_var.set(output);
720
721 let remote_var = remote_var.clone();
722 let local_definition_code = local_definition.clone();
723 let local_definition_store = stored_definition.clone();
724 let store_path = self.path.clone();
725 let run: StoreNodeRunFn<T::Provider> = Box::new({
726 let remote_var = remote_var.clone();
727 let local_definition_code = local_definition_code.clone();
728 let local_definition_store = local_definition_store.clone();
729 move |provider: &T::Provider| {
730 Box::pin(
731 RunAction {
732 provider,
733 store_path,
734 resource_id: id,
735 action,
736 local_definition_code,
737 local_definition_store,
738 remote_var,
739 }
740 .run(),
741 )
742 }
743 });
744 let ty = std::any::type_name::<T>();
745
746 {
747 log::debug!("adding main node {action} {id}");
749 let node_name = format!("{action} {id}");
750 let dag_node = dagga::Node::new(StoreNode {
751 name: node_name.clone(),
752 _remote_ty: ty,
753 run,
754 })
755 .with_name(node_name)
756 .with_reads({
757 let mut reads = vec![];
759 for dep in local_definition.dependencies() {
760 let var = self
761 .remotes
762 .get(&dep)
763 .context(MissingResourceSnafu { name: dep })?;
764 reads.push(var.key);
765 }
766 reads
767 });
768 let dag_node = match action {
769 Action::Create | Action::Read | Action::Load | Action::Update => {
770 log::debug!(" with result {rez}");
771 dag_node.with_result(rez)
772 }
773 Action::Destroy => {
774 log::debug!(" with move {rez}");
775 dag_node.with_move(rez)
776 }
777 };
778 self.graph.add_node(dag_node);
779 }
780
781 Ok(StoreResource {
782 name: id.to_owned(),
783 local_definition,
784 remote_var,
785 })
786 }
787
788 pub fn resource<T>(
800 &mut self,
801 id: &'static str,
802 local_definition: T,
803 ) -> Result<StoreResource<T, T::Output>, Error>
804 where
805 T: Resource<Provider = P>,
806 {
807 let (action, stored_definition, output) =
808 if let Ok((stored_definition, output)) = self.read_file(id) {
809 log::debug!(" {output:?}");
812 let action = if local_definition != stored_definition {
813 log::debug!(" local resource has changed, so this remote is now stale");
814 Action::Update
815 } else {
816 Action::Load
817 };
818
819 (action, Some(stored_definition), Some(output))
820 } else {
821 log::debug!("creating an empty '{id}'");
822 (Action::Create, None, None)
823 };
824 self.define_resource(id, local_definition, action, stored_definition, output)
825 }
826
827 pub fn import<T>(
837 &mut self,
838 id: &'static str,
839 local_definition: T,
840 ) -> Result<StoreResource<T, T::Output>, Error>
841 where
842 T: Resource<Provider = P>,
843 {
844 self.define_resource(id, local_definition, Action::Read, None, None)
845 }
846
847 pub fn load<T>(
858 &mut self,
859 id: &'static str,
860 local_definition: T,
861 remote_definition: T::Output,
862 force_overwrite: bool,
863 ) -> Result<StoreResource<T, T::Output>, Error>
864 where
865 T: Resource<Provider = P>,
866 {
867 if let Ok((stored_definition, output)) = self.read_file(id) {
868 if local_definition == stored_definition && remote_definition == output {
869 if force_overwrite {
870 log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
871 } else {
872 let err = ClobberSnafu { id }.build();
873 log::error!("{err}");
874 return Err(err);
875 }
876 }
877 }
878 self.define_resource(
879 id,
880 local_definition,
881 Action::Load,
882 None,
883 Some(remote_definition),
884 )
885 }
886
887 pub fn destroy<T>(&mut self, id: &'static str) -> Result<DestroyResource<T>, Error>
889 where
890 T: Resource<Provider = P>,
891 {
892 let (local, remote) = self.read_file::<T>(id)?;
893 let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
894 remote_var.set(Some(remote.clone()));
895 {
896 log::debug!("adding node {} {id}", Action::Load);
898 let node_name = format!("load {id}");
899 let load_node = dagga::Node::new(StoreNode {
900 name: node_name.clone(),
901 _remote_ty: std::any::type_name::<T>(),
902 run: Box::new({
903 let store_path = self.path.clone();
904 let local = local.clone();
905 let remote_var = remote_var.clone();
906 move |provider| {
907 Box::pin(
908 RunAction {
909 provider,
910 store_path,
911 resource_id: id,
912 action: Action::Load,
913 local_definition_code: local,
914 remote_var,
915 local_definition_store: None,
916 }
917 .run(),
918 )
919 }
920 }),
921 })
922 .with_name(node_name)
923 .with_reads({
924 let mut reads = vec![];
925 for dep in local.dependencies() {
926 reads.push(
927 self.remotes
928 .get(&dep)
929 .context(MissingResourceSnafu {
930 name: id.to_owned(),
931 })?
932 .key,
933 );
934 }
935 reads
936 })
937 .with_result(rez);
938 self.graph.add_node(load_node);
939 }
940 {
941 log::debug!("adding node {} {id}", Action::Destroy);
942 let node_name = format!("destroy {id}");
943 let destroy_node = StoreNode {
945 name: node_name.clone(),
946 _remote_ty: std::any::type_name::<T>(),
947 run: Box::new({
948 let local = local.clone();
949 let store_path = self.path.clone();
950 let remote_var = remote_var.clone();
951 move |provider| {
952 Box::pin(
953 RunAction {
954 provider,
955 store_path,
956 resource_id: id,
957 action: Action::Destroy,
958 local_definition_code: local,
959 local_definition_store: None,
960 remote_var,
961 }
962 .run(),
963 )
964 }
965 }),
966 };
967
968 self.graph.add_node(
969 dagga::Node::new(destroy_node)
970 .with_name(node_name)
971 .with_move(rez),
972 );
973 }
974
975 Ok(DestroyResource { local, remote })
976 }
977
978 fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
979 let mut missing_resource_creation = None;
980 let legend = self.graph.legend()?.with_resources_named(|rez| {
981 let maybe_name = self.remotes.get_name_by_rez(*rez);
982 if maybe_name.is_none() {
983 missing_resource_creation = Some(*rez);
984 }
985 maybe_name
986 });
987 if let Some(missing) = missing_resource_creation {
988 log::error!(
989 "Missing resource {missing}, current resources:\n{}",
990 self.remotes
991 );
992 return MissingNameSnafu { missing }.fail();
993 }
994 Ok(legend)
995 }
996
997 pub fn get_schedule_string(&self) -> Result<String, Error> {
998 let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
999 for node in self.graph.nodes() {
1000 let store_node = node.inner();
1001 let print_node = dagga::Node::new(())
1002 .with_name(store_node.name.clone())
1003 .with_reads(node.get_reads().copied())
1004 .with_results(node.get_results().copied())
1005 .with_moves(node.get_moves().copied());
1006 dag.add_node(print_node);
1007 }
1008 struct Proxy {
1009 inner: Schedule<Node<(), usize>>,
1010 }
1011
1012 impl core::fmt::Display for Proxy {
1013 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1014 if self.inner.batches.is_empty() {
1015 f.write_str("--- No changes.\n")?;
1016 f.write_str("--- 🌈🦄\n")?;
1017 }
1018 for (i, batch) in self.inner.batches.iter().enumerate() {
1019 let i = i + 1;
1020 f.write_str("--- step ")?;
1021 f.write_fmt(format_args!("{i}\n"))?;
1022 for node in batch.iter() {
1023 f.write_str(" ")?;
1024 f.write_str(node.name())?;
1025 f.write_str("\n")?;
1026 }
1027 f.write_str("---\n")?;
1028 }
1029 Ok(())
1030 }
1031 }
1032
1033 let proxy = Proxy {
1034 inner: dag.build_schedule().unwrap(),
1035 };
1036 Ok(proxy.to_string())
1037 }
1038
1039 pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1040 if self.graph.is_empty() {
1041 log::warn!("Resource DAG is empty, writing an empty dot file");
1042 }
1043 let legend = self.get_graph_legend()?;
1044 dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1045
1046 Ok(())
1047 }
1048
1049 pub async fn apply(&mut self) -> Result<()> {
1050 let graph = std::mem::take(&mut self.graph);
1051 let schedule = graph
1052 .build_schedule()
1053 .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1054 for (i, batch) in schedule.batches.into_iter().enumerate() {
1055 for (j, node) in batch.into_iter().enumerate() {
1056 log::debug!("applying node {j}, batch {i}");
1057 let store_node = node.into_inner();
1058 (store_node.run)(&self.provider).await?;
1059 }
1060 }
1061 Ok(())
1062 }
1063}