tele/
lib.rs

1//! # Teleform
2//!
3//! Teleform is a library designed to facilitate Infrastructure as Code (IaC)
4//! using Rust. It provides a flexible and powerful alternative to tools like
5//! Terraform and Pulumi by allowing developers to describe infrastructure
6//! changes as a Directed Acyclic Graph (DAG). Unlike other solutions, Teleform
7//! does not provide wrappers over platform-specific resources, eschewing them
8//! in favor of direct interaction with platform APIs. This removes a layer of
9//! indirection and keeps your infrastructure domain specific.
10//!
11//! ## Key Features
12//!
13//! - **Resource Management**: Define and manage resources directly through Rust
14//!   code, allowing for seamless integration with other libraries.
15//! - **Dependency Tracking**: Automatically track dependencies between
16//!   resources to ensure correct order of operations.
17//! - **Migration Support**: Easily migrate resources and manage changes over
18//!   time.
19//!
20//! ## Usage
21//!
22//! Teleform is typically used by developers to write custom IaC command line
23//! programs executed at a developer workstation.
24//!
25//! These programs are meant to be fluid, changing as often as the
26//! infrastructure, with changes committed and tracked with version control.
27//!
28//! ### Concepts
29//!
30//! Teleform operates on the concept of local and remote states of resources:
31//!
32//! - **Local State**: This is the desired state of the resource as defined in
33//!   your Rust code. It represents the initial configuration of a platform
34//!   resource.
35//! - **Remote State**: This is the state of the resource as it exists on the
36//!   platform (e.g., AWS, Digital Ocean). It reflects the configuration
37//!   and status of the resource.
38//!
39//! Teleform uses these states to determine the necessary actions to apply.
40//! This involves creating, updating, or deleting resources as needed.
41//!
42//! An example usage can be found in `crates/teleform/src/test.rs`,
43//! demonstrating how to define and manage resources using the library's
44//! primitives.
45//!
46//! ## Target Audience
47//!
48//! This library is intended for developers, particularly those in solo or small
49//! team environments, who are looking for a more general and flexible solution
50//! to IaC. It is also suitable for those seeking to migrate away from Terraform.
51//!
52//! ## Error Handling
53//!
54//! Teleform exposes a comprehensive error enum [`Error`], which encompasses all
55//! possible errors that may occur during operations. Functions that can result
56//! in errors return a `Result` type with this [`Error`], ensuring robust error
57//! handling throughout the library.
58
59use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
75/// Marker trait for userland errors.
76pub trait UserError: core::fmt::Display + core::fmt::Debug + 'static {}
77impl<T: core::fmt::Display + core::fmt::Debug + 'static> UserError for T {}
78
79/// Top-level error enum that encompasses all errors.
80#[derive(snafu::Snafu, Debug)]
81pub enum Error {
82    #[snafu(display("{source}:\n{}",
83                source.chain()
84                    .map(|e| format!("{e}"))
85                    .collect::<Vec<_>>()
86                    .join("\n -> ")))]
87    Tele { source: anyhow::Error },
88
89    #[snafu(display("Could not read store file '{path:?}': {source}"))]
90    StoreFileRead {
91        path: std::path::PathBuf,
92        source: std::io::Error,
93    },
94
95    #[snafu(display("Could not delete store file '{path:?}': {source}"))]
96    StoreFileDelete {
97        path: std::path::PathBuf,
98        source: std::io::Error,
99    },
100
101    #[snafu(display("Could not serialize stored '{name}': {source}"))]
102    Serialize {
103        name: String,
104        source: serde_json::Error,
105    },
106
107    #[snafu(display("Could not deserialize stored '{name}': {source}"))]
108    Deserialize {
109        name: String,
110        source: serde_json::Error,
111    },
112
113    #[snafu(display("Could not build schedule: {msg}"))]
114    Schedule { msg: String },
115
116    #[snafu(display("Could not create file {path:?}: {source}"))]
117    CreateFile {
118        path: std::path::PathBuf,
119        source: std::io::Error,
120    },
121
122    #[snafu(display("Could not write file {path:?}: {source}"))]
123    WriteFile {
124        path: std::path::PathBuf,
125        source: std::io::Error,
126    },
127
128    #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
129    RemoteUnresolved {
130        ty: &'static str,
131        depends_on: String,
132    },
133
134    #[snafu(display("Could not save the apply graph: {source}"))]
135    Dot { source: dagga::dot::DotError },
136
137    #[snafu(display(
138        "Could not build apply graph because of a missing resource name for '{missing}'"
139    ))]
140    MissingName { missing: usize },
141
142    #[snafu(display("Could not find a resource by the name '{name}'"))]
143    MissingResource { name: String },
144
145    #[snafu(display("Error during '{name}' creation: {error}"))]
146    Create {
147        name: String,
148        error: Box<dyn UserError>,
149    },
150
151    #[snafu(display("Error during '{name}' read and import: {error}"))]
152    Import {
153        name: String,
154        error: Box<dyn UserError>,
155    },
156
157    #[snafu(display("Error during '{name}' update: {error}"))]
158    Update {
159        name: String,
160        error: Box<dyn UserError>,
161    },
162
163    #[snafu(display("Error during '{name}' destruction: {error}"))]
164    Destroy {
165        name: String,
166        error: Box<dyn UserError>,
167    },
168
169    #[snafu(display("Error during execution of a manual step '{name}': {error}"))]
170    Manual {
171        name: String,
172        error: Box<dyn UserError>,
173    },
174
175    #[snafu(display("Missing previous remote value '{name}'"))]
176    Load { name: String },
177
178    #[snafu(display(
179        "Loading '{id}' would clobber an existing value in the store file, \
180        and these values are not the same"
181    ))]
182    Clobber { id: String },
183
184    #[snafu(display("Could not downcast"))]
185    Downcast,
186
187    #[snafu(display("Missing store file for '{id}'"))]
188    MissingStoreFile { id: String },
189}
190
191impl From<anyhow::Error> for Error {
192    fn from(source: anyhow::Error) -> Self {
193        Error::Tele { source }
194    }
195}
196
197impl From<dagga::dot::DotError> for Error {
198    fn from(source: dagga::dot::DotError) -> Self {
199        Self::Dot { source }
200    }
201}
202
203type Result<T, E = Error> = core::result::Result<T, E>;
204
205/// IaC resources.
206///
207/// Represents a resource created on a platform (ie AWS, Digital Ocean, etc).
208#[allow(unreachable_code)]
209pub trait Resource:
210    core::fmt::Debug
211    + Clone
212    + PartialEq
213    + HasDependencies
214    + serde::Serialize
215    + serde::de::DeserializeOwned
216    + 'static
217{
218    /// Type of the platform/resource provider.
219    ///
220    /// For example `aws_config::SdkConfig` in the case of amazon web services.
221    type Provider;
222
223    /// Errors that may occur interacting with the provider.
224    type Error: UserError;
225
226    /// The remote type of this resource, which we can used to fill in
227    /// [`Remote`] values in other resources.
228    type Output: core::fmt::Debug
229        + Clone
230        + PartialEq
231        + serde::Serialize
232        + serde::de::DeserializeOwned
233        + 'static;
234
235    /// Creates a new resource on the platform.
236    ///
237    /// This method should be implemented to define how a resource is created
238    /// using the provider's API. It returns a future that resolves to the
239    /// resource's output type or an error.
240    ///
241    /// ## Note
242    /// This method is explicitly `unimplemented!` for developer convenience.
243    /// It allows you to define only the methods you need. However, take care when
244    /// using this in contexts like long-running daemons, as calling an unimplemented
245    /// method will cause a panic.
246    fn create(
247        &self,
248        _provider: &Self::Provider,
249    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
250        unimplemented!(
251            "Resource::create is unimplemented for {}",
252            std::any::type_name::<Self>()
253        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
254    }
255
256    /// Reads the current state of the resource from the platform.
257    ///
258    /// This method should be implemented to define how to fetch the current
259    /// state of a resource using the provider's API. It returns a future that
260    /// resolves to the resource's output type or an error.
261    ///
262    /// ## Note
263    /// This method is explicitly `unimplemented!` for developer convenience.
264    /// It allows you to define only the methods you need. However, take care when
265    /// using this in contexts like long-running daemons, as calling an unimplemented
266    /// method will cause a panic.
267    fn read(
268        &self,
269        _provider: &Self::Provider,
270    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
271        unimplemented!(
272            "Resource::read is unimplemented for {}",
273            std::any::type_name::<Self>()
274        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
275    }
276
277    /// Updates an existing resource on the platform.
278    ///
279    /// This method should be implemented to define how a resource is updated
280    /// using the provider's API. It takes the previous local and remote states
281    /// of the resource and returns a future that resolves to the updated
282    /// resource's output type or an error.
283    ///
284    /// ## Note
285    /// This method is explicitly `unimplemented!` for developer convenience.
286    /// It allows you to define only the methods you need. However, take care when
287    /// using this in contexts like long-running daemons, as calling an unimplemented
288    /// method will cause a panic.
289    fn update(
290        &self,
291        _provider: &Self::Provider,
292        _previous_local: &Self,
293        _previous_remote: &Self::Output,
294    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
295        unimplemented!(
296            "Resource::update is unimplemented for {}",
297            std::any::type_name::<Self>()
298        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
299    }
300
301    /// Deletes a resource from the platform.
302    ///
303    /// This method should be implemented to define how a resource is deleted
304    /// using the provider's API. It takes the previous remote state of the
305    /// resource and returns a future that resolves to a unit type or an error.
306    ///
307    /// ## Note
308    /// This method is explicitly `unimplemented!` for developer convenience.
309    /// It allows you to define only the methods you need. However, take care when
310    /// using this in contexts like long-running daemons, as calling an unimplemented
311    /// method will cause a panic.
312    fn delete(
313        &self,
314        _provider: &Self::Provider,
315        _previous_remote: &Self::Output,
316    ) -> impl Future<Output = Result<(), Self::Error>> {
317        unimplemented!(
318            "Resource::delete is unimplemented for {}",
319            std::any::type_name::<Self>()
320        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
321    }
322}
323
324#[derive(Clone, Default, Debug)]
325pub struct Dependencies {
326    /// Specifies a dependency on a `Resource`.
327    inner: Vec<String>,
328}
329
330impl IntoIterator for Dependencies {
331    type Item = String;
332
333    type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
334
335    fn into_iter(self) -> Self::IntoIter {
336        self.inner.into_iter()
337    }
338}
339
340impl core::fmt::Display for Dependencies {
341    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342        f.write_str(
343            &self
344                .inner
345                .iter()
346                .map(|u| u.to_string())
347                .collect::<Vec<_>>()
348                .join(", "),
349        )
350    }
351}
352
353impl Dependencies {
354    pub fn merge(self, other: Self) -> Self {
355        Dependencies {
356            inner: [self.inner, other.inner].concat(),
357        }
358    }
359}
360
361/// Tracks dependencies between resources.
362///
363/// This trait can be derived, and has a default implementation that
364/// reports zero dependencies.
365pub trait HasDependencies {
366    fn dependencies(&self) -> Dependencies {
367        Dependencies::default()
368    }
369}
370
371/// `Create`, `Load` and `Update` result in a resource being added to the graph.
372///
373/// `Destroy` moves the resource out of the graph.
374#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
375pub enum Action {
376    Load,
377    Create,
378    Read,
379    Update,
380    Destroy,
381}
382
383impl core::fmt::Display for Action {
384    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385        f.write_str(match self {
386            Action::Load => "load",
387            Action::Create => "create",
388            Action::Read => "read",
389            Action::Update => "update",
390            Action::Destroy => "destroy",
391        })
392    }
393}
394
395#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
396struct InertStoreResource {
397    name: String,
398    local: serde_json::Value,
399    remote: serde_json::Value,
400}
401
402impl InertStoreResource {
403    async fn save(
404        &self,
405        resource_id: &str,
406        store_path: impl AsRef<std::path::Path>,
407    ) -> Result<(), Error> {
408        let path = store_file_path(resource_id, &store_path);
409        log::info!("storing {resource_id} to {path:?}");
410
411        let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
412            name: format!("storing {}", resource_id),
413        })?;
414
415        // Ensure the parent directory exists
416        if let Some(parent) = path.parent() {
417            tokio::fs::create_dir_all(&parent)
418                .await
419                .context(CreateFileSnafu { path: parent })?;
420        }
421
422        let mut file = tokio::fs::File::create(&path)
423            .await
424            .context(CreateFileSnafu { path: path.clone() })?;
425        file.write_all(contents.as_bytes())
426            .await
427            .context(WriteFileSnafu { path: path.clone() })?;
428        Ok(())
429    }
430}
431
432#[derive(Clone, Debug)]
433pub struct StoreResource<L, R> {
434    /// Name of the resource from the user's perspective
435    name: String,
436    /// Local definition in _code_
437    local_definition: L,
438    action: Action,
439    remote_var: RemoteVar<R>,
440}
441
442impl<L, R> Deref for StoreResource<L, R> {
443    type Target = L;
444
445    fn deref(&self) -> &Self::Target {
446        &self.local_definition
447    }
448}
449
450impl<L, R> AsRef<L> for StoreResource<L, R> {
451    fn as_ref(&self) -> &L {
452        &self.local_definition
453    }
454}
455
456impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
457where
458    L: serde::Serialize + for<'a> serde::Deserialize<'a>,
459    R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
460{
461    type Error = Error;
462
463    fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
464        let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
465            name: value.name.clone(),
466        })?;
467        let output = value.remote_var.get().context(LoadSnafu {
468            name: value.name.clone(),
469        })?;
470        let remote = serde_json::to_value(output).context(SerializeSnafu {
471            name: value.name.clone(),
472        })?;
473        Ok(Self {
474            name: value.name,
475            local,
476            remote,
477        })
478    }
479}
480
481impl<T> StoreResource<T, T::Output>
482where
483    T: Resource,
484    T::Output: Clone,
485{
486    /// Map a remote value to use in local definitions.
487    pub fn remote<X: Clone + core::fmt::Debug + 'static>(
488        &self,
489        f: impl Fn(&T::Output) -> X + 'static,
490    ) -> Remote<X> {
491        Remote::new(self, f)
492    }
493
494    /// Return the action that would be applied to this resource.
495    ///
496    /// This is useful if you need to trigger invalidations or anything else based on
497    /// whether a resource is created, updated, deleted, etc.
498    pub fn action(&self) -> Action {
499        self.action
500    }
501}
502
503/// The path to an individual resource store file.
504fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
505    store_path.as_ref().join(format!("{name}.json"))
506}
507
508type StoreNodeRunFn<Provider> = Box<
509    dyn FnOnce(
510        // Resource platform provider
511        &'_ Provider,
512    ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
513>;
514
515struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
516    provider: &'a Provider,
517    store_path: std::path::PathBuf,
518    /// Name of the resource being acted on, not the node name.
519    resource_id: String,
520    action: Action,
521    local_definition_code: T,
522    local_definition_store: Option<T>,
523    remote_var: RemoteVar<T::Output>,
524}
525
526impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
527    async fn run(self) -> Result<()>
528    where
529        T: Resource,
530    {
531        let Self {
532            provider,
533            store_path,
534            resource_id,
535            action,
536            local_definition_code,
537            local_definition_store,
538            remote_var,
539        } = self;
540        log::info!("{action} '{resource_id}':");
541
542        async fn save<T: Resource>(
543            resource_id: &str,
544            local_definition_code: T,
545            remote_var: &RemoteVar<T::Output>,
546            store_path: impl AsRef<std::path::Path>,
547        ) -> Result<(), Error> {
548            let inert_resource = InertStoreResource {
549                name: resource_id.to_owned(),
550                local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
551                    name: format!("store {resource_id}"),
552                })?,
553                remote: serde_json::to_value(
554                    remote_var.get().context(LoadSnafu { name: resource_id })?,
555                )
556                .context(SerializeSnafu {
557                    name: format!("store {resource_id} remote"),
558                })?,
559            };
560            inert_resource.save(resource_id, store_path).await?;
561            Ok(())
562        }
563
564        match action {
565            Action::Load => {
566                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
567            }
568            Action::Create => {
569                let value = local_definition_code
570                    .create(provider)
571                    .await
572                    .map_err(|error| Error::Create {
573                        name: resource_id.to_owned(),
574                        error: Box::new(error),
575                    })?;
576                remote_var.set(Some(value));
577                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
578            }
579            Action::Read => {
580                let value = local_definition_code
581                    .read(provider)
582                    .await
583                    .map_err(|error| Error::Create {
584                        name: resource_id.to_owned(),
585                        error: Box::new(error),
586                    })?;
587                remote_var.set(Some(value));
588                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
589            }
590            Action::Update => {
591                let previous_local = local_definition_store.unwrap();
592                let previous_remote = remote_var.get().context(LoadSnafu {
593                    name: resource_id.clone(),
594                })?;
595                if previous_local == local_definition_code {
596                    log::warn!(
597                        "Skipping '{resource_id}' update as the local value has not changed.\n\
598                        If you require an update, consider adding a sentinel value."
599                    );
600                } else {
601                    let cmp =
602                        pretty_assertions::Comparison::new(&previous_local, &local_definition_code);
603                    let change_string = format!("{cmp}")
604                        .lines()
605                        .map(|line| format!("  {line}"))
606                        .collect::<Vec<_>>()
607                        .join("\n");
608                    log::info!("updating '{resource_id}':\n{change_string}");
609                    let output = local_definition_code
610                        .update(provider, &previous_local, &previous_remote)
611                        .await
612                        .map_err(|error| Error::Update {
613                            name: resource_id.clone(),
614                            error: Box::new(error),
615                        })?;
616                    remote_var.set(Some(output));
617                    save(&resource_id, local_definition_code, &remote_var, store_path).await?;
618                }
619            }
620            Action::Destroy => {
621                log::debug!("running destroy action on {resource_id}");
622                // In the destroy case there is no code-local definition, but there is always
623                // a store definition, so we pass the store definition as the code definition.
624                // This is better IMO than having both code-local and store be optional.
625                let local_definition = local_definition_code.clone();
626                let previous_remote = remote_var.get().context(LoadSnafu {
627                    name: resource_id.clone(),
628                })?;
629                local_definition
630                    .delete(provider, &previous_remote)
631                    .await
632                    .map_err(|error| Error::Destroy {
633                        name: resource_id.to_owned(),
634                        error: Box::new(error),
635                    })?;
636
637                log::info!("  {resource_id} is destroyed");
638                let path = store_file_path(&resource_id, &store_path);
639                log::info!("  removing {resource_id} store file {path:?}");
640                tokio::fs::remove_file(&path)
641                    .await
642                    .context(StoreFileDeleteSnafu { path })?;
643                remote_var.set(None);
644            }
645        }
646
647        log::info!("  success!");
648        Ok(())
649    }
650}
651
652pub struct DestroyResource<T: Resource> {
653    local: T,
654    remote: T::Output,
655}
656
657impl<T: Resource> Deref for DestroyResource<T> {
658    type Target = T;
659
660    fn deref(&self) -> &Self::Target {
661        &self.local
662    }
663}
664
665impl<T: Resource> DestroyResource<T> {
666    /// Map a remote value of a resource scheduled to be destroyed into a
667    /// permanent field of another resource.
668    pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
669        &self,
670        f: fn(&T::Output) -> X,
671    ) -> Migrated<X> {
672        Migrated(f(&self.remote))
673    }
674}
675
676struct StoreNode<Provider> {
677    name: String,
678    _remote_ty: &'static str,
679    run: StoreNodeRunFn<Provider>,
680}
681
682struct PreviouslyStored<T: Resource> {
683    action: Action,
684    resource: Option<(T, T::Output)>,
685}
686
687pub struct Store<T> {
688    path: std::path::PathBuf,
689    provider: T,
690    remotes: Remotes,
691    graph: dagga::Dag<StoreNode<T>, usize>,
692}
693
694impl<P: 'static> Store<P> {
695    fn read_from_store<T: Resource<Provider = P>>(
696        path: impl AsRef<std::path::Path>,
697        id: &str,
698    ) -> Result<(T, T::Output)> {
699        let path = store_file_path(id, path.as_ref());
700        snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
701
702        log::debug!("{path:?} exists, reading '{id}' from it");
703        let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
704            path: path.to_path_buf(),
705        })?;
706        log::trace!(
707            "contents:\n{}",
708            contents
709                .lines()
710                .map(|line| format!("  {line}"))
711                .collect::<Vec<_>>()
712                .join("\n")
713        );
714        let inert_store_rez: InertStoreResource =
715            serde_json::from_str(&contents).context(DeserializeSnafu {
716                name: id.to_owned(),
717            })?;
718        log::trace!("read inert store resource");
719        log::trace!(
720            "reading local contents: {}",
721            serde_json::to_string_pretty(&inert_store_rez.local)
722                .unwrap()
723                .lines()
724                .map(|line| format!("  {line}"))
725                .collect::<Vec<_>>()
726                .join("\n")
727        );
728        log::trace!("as {}", std::any::type_name::<T>());
729        let stored_definition: T =
730            serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
731                name: id.to_owned(),
732            })?;
733
734        log::trace!("  reading remote output JSON value");
735        let remote_value: T::Output =
736            serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
737                name: format!("remote {id}"),
738            })?;
739        Ok((stored_definition, remote_value))
740    }
741
742    pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
743        Self {
744            path: path.as_ref().to_path_buf(),
745            graph: dagga::Dag::default(),
746            remotes: Default::default(),
747            provider,
748        }
749    }
750
751    pub fn provider(&self) -> &P {
752        &self.provider
753    }
754
755    fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
756    where
757        T: Resource<Provider = P>,
758    {
759        Self::read_from_store(&self.path, id)
760    }
761
762    fn define_resource<T>(
763        &mut self,
764        id: impl AsRef<str>,
765        local_definition: T,
766        action: Action,
767        stored_definition: Option<T>,
768        output: Option<T::Output>,
769    ) -> Result<StoreResource<T, T::Output>, Error>
770    where
771        T: Resource<Provider = P>,
772    {
773        let id = id.as_ref();
774        let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
775        remote_var.set(output);
776
777        let remote_var = remote_var.clone();
778        let local_definition_code = local_definition.clone();
779        let local_definition_store = stored_definition.clone();
780        let store_path = self.path.clone();
781        let run: StoreNodeRunFn<T::Provider> = Box::new({
782            let resource_id = id.to_owned();
783            let remote_var = remote_var.clone();
784            let local_definition_code = local_definition_code.clone();
785            let local_definition_store = local_definition_store.clone();
786            move |provider: &T::Provider| {
787                Box::pin(
788                    RunAction {
789                        provider,
790                        store_path,
791                        resource_id,
792                        action,
793                        local_definition_code,
794                        local_definition_store,
795                        remote_var,
796                    }
797                    .run(),
798                )
799            }
800        });
801        let ty = std::any::type_name::<T>();
802
803        {
804            // Add the main action node
805            log::debug!("adding main node {action} {id}");
806            let node_name = format!("{action} {id}");
807            let dag_node = dagga::Node::new(StoreNode {
808                name: node_name.clone(),
809                _remote_ty: ty,
810                run,
811            })
812            .with_name(node_name)
813            .with_reads({
814                // read the resource keys out of "remotes" as dependencies
815                let mut reads = vec![];
816                for dep in local_definition.dependencies() {
817                    let var = self
818                        .remotes
819                        .get(&dep)
820                        .context(MissingResourceSnafu { name: dep })?;
821                    reads.push(var.key);
822                }
823                reads
824            });
825            let dag_node = match action {
826                Action::Create | Action::Read | Action::Load | Action::Update => {
827                    log::debug!("  with result {rez}");
828                    dag_node.with_result(rez)
829                }
830                Action::Destroy => {
831                    log::debug!("  with move {rez}");
832                    dag_node.with_move(rez)
833                }
834            };
835            self.graph.add_node(dag_node);
836        }
837
838        Ok(StoreResource {
839            name: id.to_owned(),
840            local_definition,
841            action,
842            remote_var,
843        })
844    }
845
846    /// Read the stored previous definition and determine the action.
847    fn determine_action_from_previously_stored<T>(
848        &self,
849        local_definition: &T,
850        id: &str,
851    ) -> Result<PreviouslyStored<T>, Error>
852    where
853        T: Resource<Provider = P>,
854    {
855        match self.read_file(id) {
856            Ok((stored_definition, output)) => {
857                // This has already been created and stored, so this is either a simple load,
858                // or an update.
859                log::debug!("  {output:?}");
860                let action = if *local_definition != stored_definition {
861                    log::debug!("  local resource has changed, so this remote is now stale");
862                    Action::Update
863                } else {
864                    // Check if any upstream dependencies are "stale" (updated or deleted),
865                    // which would cause this resource to possibly require an update.
866                    let mut may_need_update = false;
867                    for dep in local_definition.dependencies() {
868                        let var = self.remotes.get(&dep).context(LoadSnafu { name: dep })?;
869                        if var.action != Action::Load {
870                            may_need_update = true;
871                            break;
872                        }
873                    }
874                    if may_need_update {
875                        Action::Update
876                    } else {
877                        Action::Load
878                    }
879                };
880
881                Ok(PreviouslyStored {
882                    action,
883                    resource: Some((stored_definition, output)),
884                })
885            }
886            Err(Error::MissingStoreFile { id }) => {
887                log::debug!("store file '{id}' does not exist, creating a new resource",);
888                Ok(PreviouslyStored {
889                    action: Action::Create,
890                    resource: None,
891                })
892            }
893            Err(e) => {
894                log::error!("could not define resource '{id}': {e}");
895                Err(e)
896            }
897        }
898    }
899
900    /// Defines a resource.
901    ///
902    /// Produces two graph nodes:
903    /// 1. Depending on the result of compairing `local_definition` to the one on file
904    ///    (if it exists), either:
905    ///    - creates the resource on the platform
906    ///    - updates the resource on the platform
907    ///    - loads the resource from a file
908    /// 2. Stores the resource to a file
909    ///
910    /// To import an existing resource from a platform, use [`Store::import`].
911    pub fn resource<T>(
912        &mut self,
913        id: impl AsRef<str>,
914        local_definition: T,
915    ) -> Result<StoreResource<T, T::Output>, Error>
916    where
917        T: Resource<Provider = P>,
918    {
919        let id = id.as_ref();
920        let PreviouslyStored { action, resource } =
921            self.determine_action_from_previously_stored(&local_definition, id)?;
922        let (local, remote) = resource
923            .map(|(local, remote)| (Some(local), Some(remote)))
924            .unwrap_or_default();
925        self.define_resource(id, local_definition, action, local, remote)
926    }
927
928    /// Defines a pre-existing resource, importing it from the platform.
929    ///
930    /// Produces two graph nodes:
931    /// 1. Import the resource from the platform, resulting in the resource
932    /// 2. Store the value to a file
933    ///
934    /// This only needs to be used once in your infrastructure command.
935    /// After the resource is imported and stored to a file it is recommended
936    /// you make a code change to use [`Store::resource`].
937    pub fn import<T>(
938        &mut self,
939        id: impl AsRef<str>,
940        local_definition: T,
941    ) -> Result<StoreResource<T, T::Output>, Error>
942    where
943        T: Resource<Provider = P>,
944    {
945        self.define_resource(id, local_definition, Action::Read, None, None)
946    }
947
948    /// Defines a pre-existing resource, directly writing it to file, without
949    /// querying the platform.
950    ///
951    /// Produces two graph nodes:
952    /// 1. Load the value (noop)
953    /// 2. Store the value
954    ///
955    /// ## Errors
956    /// Errs if `force_overwrite` is `false` _and_ a stored resource already
957    /// exists. This is done to prevent accidental clobbering.
958    pub fn load<T>(
959        &mut self,
960        id: impl AsRef<str>,
961        local_definition: T,
962        remote_definition: T::Output,
963        force_overwrite: bool,
964    ) -> Result<StoreResource<T, T::Output>, Error>
965    where
966        T: Resource<Provider = P>,
967    {
968        let id = id.as_ref();
969        if let Ok((stored_definition, output)) = self.read_file(id) {
970            if local_definition == stored_definition && remote_definition == output {
971                if force_overwrite {
972                    log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
973                } else {
974                    let err = ClobberSnafu { id: id.to_owned() }.build();
975                    log::error!("{err}");
976                    return Err(err);
977                }
978            }
979        }
980        self.define_resource(
981            id,
982            local_definition,
983            Action::Load,
984            None,
985            Some(remote_definition),
986        )
987    }
988
989    /// Destroys a resource.
990    pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
991    where
992        T: Resource<Provider = P>,
993    {
994        let id = id.as_ref();
995        let (local, remote) = self.read_file::<T>(id)?;
996        let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
997        remote_var.set(Some(remote.clone()));
998        {
999            // Destruction requires a load to introduce the resource (for the DAG)
1000            log::debug!("adding node {} {id}", Action::Load);
1001            let node_name = format!("load {id}");
1002            let load_node = dagga::Node::new(StoreNode {
1003                name: node_name.clone(),
1004                _remote_ty: std::any::type_name::<T>(),
1005                run: Box::new({
1006                    let resource_id = id.to_owned();
1007                    let store_path = self.path.clone();
1008                    let local = local.clone();
1009                    let remote_var = remote_var.clone();
1010                    move |provider| {
1011                        Box::pin(
1012                            RunAction {
1013                                provider,
1014                                store_path,
1015                                resource_id,
1016                                action: Action::Load,
1017                                local_definition_code: local,
1018                                remote_var,
1019                                local_definition_store: None,
1020                            }
1021                            .run(),
1022                        )
1023                    }
1024                }),
1025            })
1026            .with_name(node_name)
1027            .with_reads({
1028                let mut reads = vec![];
1029                for dep in local.dependencies() {
1030                    reads.push(
1031                        self.remotes
1032                            .get(&dep)
1033                            .context(MissingResourceSnafu {
1034                                name: id.to_owned(),
1035                            })?
1036                            .key,
1037                    );
1038                }
1039                reads
1040            })
1041            .with_result(rez);
1042            self.graph.add_node(load_node);
1043        }
1044        {
1045            log::debug!("adding node {} {id}", Action::Destroy);
1046            let node_name = format!("destroy {id}");
1047            // Add the destroy node
1048            let destroy_node = StoreNode {
1049                name: node_name.clone(),
1050                _remote_ty: std::any::type_name::<T>(),
1051                run: Box::new({
1052                    let resource_id = id.to_owned();
1053                    let local = local.clone();
1054                    let store_path = self.path.clone();
1055                    let remote_var = remote_var.clone();
1056                    move |provider| {
1057                        Box::pin(
1058                            RunAction {
1059                                provider,
1060                                store_path,
1061                                resource_id,
1062                                action: Action::Destroy,
1063                                local_definition_code: local,
1064                                local_definition_store: None,
1065                                remote_var,
1066                            }
1067                            .run(),
1068                        )
1069                    }
1070                }),
1071            };
1072
1073            self.graph.add_node(
1074                dagga::Node::new(destroy_node)
1075                    .with_name(node_name)
1076                    .with_move(rez),
1077            );
1078        }
1079
1080        Ok(DestroyResource { local, remote })
1081    }
1082
1083    fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1084        let mut missing_resource_creation = None;
1085        let legend = self.graph.legend()?.with_resources_named(|rez| {
1086            let maybe_name = self.remotes.get_name_by_rez(*rez);
1087            if maybe_name.is_none() {
1088                missing_resource_creation = Some(*rez);
1089            }
1090            maybe_name
1091        });
1092        if let Some(missing) = missing_resource_creation {
1093            log::error!(
1094                "Missing resource {missing}, current resources:\n{}",
1095                self.remotes
1096            );
1097            return MissingNameSnafu { missing }.fail();
1098        }
1099        Ok(legend)
1100    }
1101
1102    pub fn get_schedule_string(&self) -> Result<String, Error> {
1103        let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1104        for node in self.graph.nodes() {
1105            let store_node = node.inner();
1106            let print_node = dagga::Node::new(())
1107                .with_name(store_node.name.clone())
1108                .with_reads(node.get_reads().copied())
1109                .with_results(node.get_results().copied())
1110                .with_moves(node.get_moves().copied());
1111            dag.add_node(print_node);
1112        }
1113        struct Proxy {
1114            inner: Schedule<Node<(), usize>>,
1115        }
1116
1117        impl core::fmt::Display for Proxy {
1118            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1119                if self.inner.batches.is_empty() {
1120                    f.write_str("--- No changes.\n")?;
1121                    f.write_str("--- 🌈🦄\n")?;
1122                }
1123                for (i, batch) in self.inner.batches.iter().enumerate() {
1124                    let i = i + 1;
1125                    f.write_str("--- step ")?;
1126                    f.write_fmt(format_args!("{i}\n"))?;
1127                    for node in batch.iter() {
1128                        f.write_str("  ")?;
1129                        f.write_str(node.name())?;
1130                        f.write_str("\n")?;
1131                    }
1132                    f.write_str("---\n")?;
1133                }
1134                Ok(())
1135            }
1136        }
1137
1138        let proxy = Proxy {
1139            inner: dag.build_schedule().unwrap(),
1140        };
1141        Ok(proxy.to_string())
1142    }
1143
1144    pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1145        if self.graph.is_empty() {
1146            log::warn!("Resource DAG is empty, writing an empty dot file");
1147        }
1148        let legend = self.get_graph_legend()?;
1149        dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1150
1151        Ok(())
1152    }
1153
1154    pub async fn apply(&mut self) -> Result<()> {
1155        let graph = std::mem::take(&mut self.graph);
1156        let schedule = graph
1157            .build_schedule()
1158            .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1159        for (i, batch) in schedule.batches.into_iter().enumerate() {
1160            for (j, node) in batch.into_iter().enumerate() {
1161                log::debug!("applying node {j}, batch {i}");
1162                let store_node = node.into_inner();
1163                (store_node.run)(&self.provider).await?;
1164            }
1165        }
1166        Ok(())
1167    }
1168}