tele/
lib.rs

1//! # Teleform
2//!
3//! Teleform is a library designed to facilitate Infrastructure as Code (IaC)
4//! using Rust. It provides a flexible and powerful alternative to tools like
5//! Terraform and Pulumi by allowing developers to describe infrastructure
6//! changes as a Directed Acyclic Graph (DAG). Unlike other solutions, Teleform
7//! does not provide wrappers over platform-specific resources, eschewing them
8//! in favor of direct interaction with platform APIs. This removes a layer of
9//! indirection and keeps your infrastructure domain specific.
10//!
11//! ## Key Features
12//!
13//! - **Resource Management**: Define and manage resources directly through Rust
14//!   code, allowing for seamless integration with other libraries.
15//! - **Dependency Tracking**: Automatically track dependencies between
16//!   resources to ensure correct order of operations.
17//! - **Migration Support**: Easily migrate resources and manage changes over
18//!   time.
19//!
20//! ## Usage
21//!
22//! Teleform is typically used by developers to write custom IaC command line
23//! programs executed at a developer workstation.
24//!
25//! These programs are meant to be fluid, changing as often as the
26//! infrastructure, with changes committed and tracked with version control.
27//!
28//! ### Concepts
29//!
30//! Teleform operates on the concept of local and remote states of resources:
31//!
32//! - **Local State**: This is the desired state of the resource as defined in
33//!   your Rust code. It represents the initial configuration of a platform
34//!   resource.
35//! - **Remote State**: This is the state of the resource as it exists on the
36//!   platform (e.g., AWS, Digital Ocean). It reflects the configuration
37//!   and status of the resource.
38//!
39//! Teleform uses these states to determine the necessary actions to apply.
40//! This involves creating, updating, or deleting resources as needed.
41//!
42//! An example usage can be found in `crates/teleform/src/test.rs`,
43//! demonstrating how to define and manage resources using the library's
44//! primitives.
45//!
46//! ## Target Audience
47//!
48//! This library is intended for developers, particularly those in solo or small
49//! team environments, who are looking for a more general and flexible solution
50//! to IaC. It is also suitable for those seeking to migrate away from Terraform.
51//!
52//! ## Error Handling
53//!
54//! Teleform exposes a comprehensive error enum [`Error`], which encompasses all
55//! possible errors that may occur during operations. Functions that can result
56//! in errors return a `Result` type with this [`Error`], ensuring robust error
57//! handling throughout the library.
58
59use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
75/// Marker trait for userland errors.
76pub trait UserError: core::fmt::Display + core::fmt::Debug + 'static {}
77impl<T: core::fmt::Display + core::fmt::Debug + 'static> UserError for T {}
78
79/// Top-level error enum that encompasses all errors.
80#[derive(snafu::Snafu, Debug)]
81pub enum Error {
82    #[snafu(display("{source}:\n{}",
83                source.chain()
84                    .map(|e| format!("{e}"))
85                    .collect::<Vec<_>>()
86                    .join("\n -> ")))]
87    Tele { source: anyhow::Error },
88
89    #[snafu(display("Could not read store file '{path:?}': {source}"))]
90    StoreFileRead {
91        path: std::path::PathBuf,
92        source: std::io::Error,
93    },
94
95    #[snafu(display("Could not delete store file '{path:?}': {source}"))]
96    StoreFileDelete {
97        path: std::path::PathBuf,
98        source: std::io::Error,
99    },
100
101    #[snafu(display("Could not serialize stored '{name}': {source}"))]
102    Serialize {
103        name: String,
104        source: serde_json::Error,
105    },
106
107    #[snafu(display("Could not deserialize stored '{name}': {source}"))]
108    Deserialize {
109        name: String,
110        source: serde_json::Error,
111    },
112
113    #[snafu(display("Could not build schedule: {msg}"))]
114    Schedule { msg: String },
115
116    #[snafu(display("Could not create file {path:?}: {source}"))]
117    CreateFile {
118        path: std::path::PathBuf,
119        source: std::io::Error,
120    },
121
122    #[snafu(display("Could not write file {path:?}: {source}"))]
123    WriteFile {
124        path: std::path::PathBuf,
125        source: std::io::Error,
126    },
127
128    #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
129    RemoteUnresolved {
130        ty: &'static str,
131        depends_on: String,
132    },
133
134    #[snafu(display("Could not save the apply graph: {source}"))]
135    Dot { source: dagga::dot::DotError },
136
137    #[snafu(display(
138        "Could not build apply graph because of a missing resource name for '{missing}'"
139    ))]
140    MissingName { missing: usize },
141
142    #[snafu(display("Could not find a resource by the name '{name}'"))]
143    MissingResource { name: String },
144
145    #[snafu(display("Error during '{name}' creation: {error}"))]
146    Create {
147        name: String,
148        error: Box<dyn UserError>,
149    },
150
151    #[snafu(display("Error during '{name}' read and import: {error}"))]
152    Import {
153        name: String,
154        error: Box<dyn UserError>,
155    },
156
157    #[snafu(display("Error during '{name}' update: {error}"))]
158    Update {
159        name: String,
160        error: Box<dyn UserError>,
161    },
162
163    #[snafu(display("Error during '{name}' destruction: {error}"))]
164    Destroy {
165        name: String,
166        error: Box<dyn UserError>,
167    },
168
169    #[snafu(display("Error during execution of a manual step '{name}': {error}"))]
170    Manual {
171        name: String,
172        error: Box<dyn UserError>,
173    },
174
175    #[snafu(display("Missing previous remote value '{name}'"))]
176    Load { name: String },
177
178    #[snafu(display(
179        "Loading '{id}' would clobber an existing value in the store file, \
180        and these values are not the same"
181    ))]
182    Clobber { id: String },
183
184    #[snafu(display("Could not downcast"))]
185    Downcast,
186
187    #[snafu(display("Missing store file for '{id}'"))]
188    MissingStoreFile { id: String },
189}
190
191impl From<anyhow::Error> for Error {
192    fn from(source: anyhow::Error) -> Self {
193        Error::Tele { source }
194    }
195}
196
197impl From<dagga::dot::DotError> for Error {
198    fn from(source: dagga::dot::DotError) -> Self {
199        Self::Dot { source }
200    }
201}
202
203type Result<T, E = Error> = core::result::Result<T, E>;
204
205/// IaC resources.
206///
207/// Represents a resource created on a platform (ie AWS, Digital Ocean, etc).
208#[allow(unreachable_code)]
209pub trait Resource:
210    Clone + PartialEq + HasDependencies + serde::Serialize + serde::de::DeserializeOwned + 'static
211{
212    /// Type of the platform/resource provider.
213    ///
214    /// For example `aws_config::SdkConfig` in the case of amazon web services.
215    type Provider;
216
217    /// Errors that may occur interacting with the provider.
218    type Error: UserError;
219
220    /// The remote type of this resource, which we can used to fill in
221    /// [`Remote`] values in other resources.
222    type Output: core::fmt::Debug
223        + Clone
224        + PartialEq
225        + serde::Serialize
226        + serde::de::DeserializeOwned
227        + 'static;
228
229    /// Creates a new resource on the platform.
230    ///
231    /// This method should be implemented to define how a resource is created
232    /// using the provider's API. It returns a future that resolves to the
233    /// resource's output type or an error.
234    ///
235    /// ## Note
236    /// This method is explicitly `unimplemented!` for developer convenience.
237    /// It allows you to define only the methods you need. However, take care when
238    /// using this in contexts like long-running daemons, as calling an unimplemented
239    /// method will cause a panic.
240    fn create(
241        &self,
242        _provider: &Self::Provider,
243    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
244        unimplemented!(
245            "Resource::create is unimplemented for {}",
246            std::any::type_name::<Self>()
247        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
248    }
249
250    /// Reads the current state of the resource from the platform.
251    ///
252    /// This method should be implemented to define how to fetch the current
253    /// state of a resource using the provider's API. It returns a future that
254    /// resolves to the resource's output type or an error.
255    ///
256    /// ## Note
257    /// This method is explicitly `unimplemented!` for developer convenience.
258    /// It allows you to define only the methods you need. However, take care when
259    /// using this in contexts like long-running daemons, as calling an unimplemented
260    /// method will cause a panic.
261    fn read(
262        &self,
263        _provider: &Self::Provider,
264    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
265        unimplemented!(
266            "Resource::read is unimplemented for {}",
267            std::any::type_name::<Self>()
268        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
269    }
270
271    /// Updates an existing resource on the platform.
272    ///
273    /// This method should be implemented to define how a resource is updated
274    /// using the provider's API. It takes the previous local and remote states
275    /// of the resource and returns a future that resolves to the updated
276    /// resource's output type or an error.
277    ///
278    /// ## Note
279    /// This method is explicitly `unimplemented!` for developer convenience.
280    /// It allows you to define only the methods you need. However, take care when
281    /// using this in contexts like long-running daemons, as calling an unimplemented
282    /// method will cause a panic.
283    fn update(
284        &self,
285        _provider: &Self::Provider,
286        _previous_local: &Self,
287        _previous_remote: &Self::Output,
288    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
289        unimplemented!(
290            "Resource::update is unimplemented for {}",
291            std::any::type_name::<Self>()
292        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
293    }
294
295    /// Deletes a resource from the platform.
296    ///
297    /// This method should be implemented to define how a resource is deleted
298    /// using the provider's API. It takes the previous remote state of the
299    /// resource and returns a future that resolves to a unit type or an error.
300    ///
301    /// ## Note
302    /// This method is explicitly `unimplemented!` for developer convenience.
303    /// It allows you to define only the methods you need. However, take care when
304    /// using this in contexts like long-running daemons, as calling an unimplemented
305    /// method will cause a panic.
306    fn delete(
307        &self,
308        _provider: &Self::Provider,
309        _previous_remote: &Self::Output,
310    ) -> impl Future<Output = Result<(), Self::Error>> {
311        unimplemented!(
312            "Resource::delete is unimplemented for {}",
313            std::any::type_name::<Self>()
314        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
315    }
316}
317
318#[derive(Clone, Default, Debug)]
319pub struct Dependencies {
320    /// Specifies a dependency on a `Resource`.
321    inner: Vec<String>,
322}
323
324impl IntoIterator for Dependencies {
325    type Item = String;
326
327    type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
328
329    fn into_iter(self) -> Self::IntoIter {
330        self.inner.into_iter()
331    }
332}
333
334impl core::fmt::Display for Dependencies {
335    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336        f.write_str(
337            &self
338                .inner
339                .iter()
340                .map(|u| u.to_string())
341                .collect::<Vec<_>>()
342                .join(", "),
343        )
344    }
345}
346
347impl Dependencies {
348    pub fn merge(self, other: Self) -> Self {
349        Dependencies {
350            inner: [self.inner, other.inner].concat(),
351        }
352    }
353}
354
355/// Tracks dependencies between resources.
356///
357/// This trait can be derived, and has a default implementation that
358/// reports zero dependencies.
359pub trait HasDependencies {
360    fn dependencies(&self) -> Dependencies {
361        Dependencies::default()
362    }
363}
364
365/// `Create`, `Load` and `Update` result in a resource being added to the graph.
366///
367/// `Destroy` moves the resource out of the graph.
368#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
369pub enum Action {
370    Load,
371    Create,
372    Read,
373    Update,
374    Destroy,
375}
376
377impl core::fmt::Display for Action {
378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379        f.write_str(match self {
380            Action::Load => "load",
381            Action::Create => "create",
382            Action::Read => "read",
383            Action::Update => "update",
384            Action::Destroy => "destroy",
385        })
386    }
387}
388
389#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
390struct InertStoreResource {
391    name: String,
392    local: serde_json::Value,
393    remote: serde_json::Value,
394}
395
396impl InertStoreResource {
397    async fn save(
398        &self,
399        resource_id: &str,
400        store_path: impl AsRef<std::path::Path>,
401    ) -> Result<(), Error> {
402        let path = store_file_path(resource_id, &store_path);
403        log::info!("storing {resource_id} to {path:?}");
404
405        let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
406            name: format!("storing {}", resource_id),
407        })?;
408
409        // Ensure the parent directory exists
410        if let Some(parent) = path.parent() {
411            tokio::fs::create_dir_all(&parent)
412                .await
413                .context(CreateFileSnafu { path: parent })?;
414        }
415
416        let mut file = tokio::fs::File::create(&path)
417            .await
418            .context(CreateFileSnafu { path: path.clone() })?;
419        file.write_all(contents.as_bytes())
420            .await
421            .context(WriteFileSnafu { path: path.clone() })?;
422        Ok(())
423    }
424}
425
426#[derive(Clone, Debug)]
427pub struct StoreResource<L, R> {
428    /// Name of the resource from the user's perspective
429    name: String,
430    /// Local definition in _code_
431    local_definition: L,
432    action: Action,
433    remote_var: RemoteVar<R>,
434}
435
436impl<L, R> Deref for StoreResource<L, R> {
437    type Target = L;
438
439    fn deref(&self) -> &Self::Target {
440        &self.local_definition
441    }
442}
443
444impl<L, R> AsRef<L> for StoreResource<L, R> {
445    fn as_ref(&self) -> &L {
446        &self.local_definition
447    }
448}
449
450impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
451where
452    L: serde::Serialize + for<'a> serde::Deserialize<'a>,
453    R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
454{
455    type Error = Error;
456
457    fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
458        let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
459            name: value.name.clone(),
460        })?;
461        let output = value.remote_var.get().context(LoadSnafu {
462            name: value.name.clone(),
463        })?;
464        let remote = serde_json::to_value(output).context(SerializeSnafu {
465            name: value.name.clone(),
466        })?;
467        Ok(Self {
468            name: value.name,
469            local,
470            remote,
471        })
472    }
473}
474
475impl<T> StoreResource<T, T::Output>
476where
477    T: Resource,
478    T::Output: Clone,
479{
480    /// Map a remote value to use in local definitions.
481    pub fn remote<X: Clone + core::fmt::Debug + 'static>(
482        &self,
483        f: impl Fn(&T::Output) -> X + 'static,
484    ) -> Remote<X> {
485        Remote::new(self, f)
486    }
487
488    /// Return the action that would be applied to this resource.
489    ///
490    /// This is useful if you need to trigger invalidations or anything else based on
491    /// whether a resource is created, updated, deleted, etc.
492    pub fn action(&self) -> Action {
493        self.action
494    }
495}
496
497/// The path to an individual resource store file.
498fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
499    store_path.as_ref().join(format!("{name}.json"))
500}
501
502type StoreNodeRunFn<Provider> = Box<
503    dyn FnOnce(
504        // Resource platform provider
505        &'_ Provider,
506    ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
507>;
508
509struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
510    provider: &'a Provider,
511    store_path: std::path::PathBuf,
512    /// Name of the resource being acted on, not the node name.
513    resource_id: String,
514    action: Action,
515    local_definition_code: T,
516    local_definition_store: Option<T>,
517    remote_var: RemoteVar<T::Output>,
518}
519
520impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
521    async fn run(self) -> Result<()>
522    where
523        T: Resource,
524    {
525        let Self {
526            provider,
527            store_path,
528            resource_id,
529            action,
530            local_definition_code,
531            local_definition_store,
532            remote_var,
533        } = self;
534        log::info!("{action} '{resource_id}':");
535
536        async fn save<T: Resource>(
537            resource_id: &str,
538            local_definition_code: T,
539            remote_var: &RemoteVar<T::Output>,
540            store_path: impl AsRef<std::path::Path>,
541        ) -> Result<(), Error> {
542            let inert_resource = InertStoreResource {
543                name: resource_id.to_owned(),
544                local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
545                    name: format!("store {resource_id}"),
546                })?,
547                remote: serde_json::to_value(
548                    remote_var.get().context(LoadSnafu { name: resource_id })?,
549                )
550                .context(SerializeSnafu {
551                    name: format!("store {resource_id} remote"),
552                })?,
553            };
554            inert_resource.save(resource_id, store_path).await?;
555            Ok(())
556        }
557
558        match action {
559            Action::Load => {
560                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
561            }
562            Action::Create => {
563                let value = local_definition_code
564                    .create(provider)
565                    .await
566                    .map_err(|error| Error::Create {
567                        name: resource_id.to_owned(),
568                        error: Box::new(error),
569                    })?;
570                remote_var.set(Some(value));
571                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
572            }
573            Action::Read => {
574                let value = local_definition_code
575                    .read(provider)
576                    .await
577                    .map_err(|error| Error::Create {
578                        name: resource_id.to_owned(),
579                        error: Box::new(error),
580                    })?;
581                remote_var.set(Some(value));
582                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
583            }
584            Action::Update => {
585                let previous_local = local_definition_store.unwrap();
586                let previous_remote = remote_var.get().context(LoadSnafu {
587                    name: resource_id.clone(),
588                })?;
589                let output = local_definition_code
590                    .update(provider, &previous_local, &previous_remote)
591                    .await
592                    .map_err(|error| Error::Update {
593                        name: resource_id.clone(),
594                        error: Box::new(error),
595                    })?;
596                remote_var.set(Some(output));
597                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
598            }
599            Action::Destroy => {
600                log::debug!("running destroy action on {resource_id}");
601                // In the destroy case there is no code-local definition, but there is always
602                // a store definition, so we pass the store definition as the code definition.
603                // This is better IMO than having both code-local and store be optional.
604                let local_definition = local_definition_code.clone();
605                let previous_remote = remote_var.get().context(LoadSnafu {
606                    name: resource_id.clone(),
607                })?;
608                local_definition
609                    .delete(provider, &previous_remote)
610                    .await
611                    .map_err(|error| Error::Destroy {
612                        name: resource_id.to_owned(),
613                        error: Box::new(error),
614                    })?;
615
616                log::info!("  {resource_id} is destroyed");
617                let path = store_file_path(&resource_id, &store_path);
618                log::info!("  removing {resource_id} store file {path:?}");
619                tokio::fs::remove_file(&path)
620                    .await
621                    .context(StoreFileDeleteSnafu { path })?;
622                remote_var.set(None);
623            }
624        }
625
626        log::info!("  success!");
627        Ok(())
628    }
629}
630
631pub struct DestroyResource<T: Resource> {
632    local: T,
633    remote: T::Output,
634}
635
636impl<T: Resource> Deref for DestroyResource<T> {
637    type Target = T;
638
639    fn deref(&self) -> &Self::Target {
640        &self.local
641    }
642}
643
644impl<T: Resource> DestroyResource<T> {
645    /// Map a remote value of a resource scheduled to be destroyed into a
646    /// permanent field of another resource.
647    pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
648        &self,
649        f: fn(&T::Output) -> X,
650    ) -> Migrated<X> {
651        Migrated(f(&self.remote))
652    }
653}
654
655struct StoreNode<Provider> {
656    name: String,
657    _remote_ty: &'static str,
658    run: StoreNodeRunFn<Provider>,
659}
660
661struct PreviouslyStored<T: Resource> {
662    action: Action,
663    resource: Option<(T, T::Output)>,
664}
665
666pub struct Store<T> {
667    path: std::path::PathBuf,
668    provider: T,
669    remotes: Remotes,
670    graph: dagga::Dag<StoreNode<T>, usize>,
671}
672
673impl<P: 'static> Store<P> {
674    fn read_from_store<T: Resource<Provider = P>>(
675        path: impl AsRef<std::path::Path>,
676        id: &str,
677    ) -> Result<(T, T::Output)> {
678        let path = store_file_path(id, path.as_ref());
679        snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
680
681        log::debug!("{path:?} exists, reading '{id}' from it");
682        let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
683            path: path.to_path_buf(),
684        })?;
685        log::trace!(
686            "contents:\n{}",
687            contents
688                .lines()
689                .map(|line| format!("  {line}"))
690                .collect::<Vec<_>>()
691                .join("\n")
692        );
693        let inert_store_rez: InertStoreResource =
694            serde_json::from_str(&contents).context(DeserializeSnafu {
695                name: id.to_owned(),
696            })?;
697        log::trace!("read inert store resource");
698        log::trace!(
699            "reading local contents: {}",
700            serde_json::to_string_pretty(&inert_store_rez.local)
701                .unwrap()
702                .lines()
703                .map(|line| format!("  {line}"))
704                .collect::<Vec<_>>()
705                .join("\n")
706        );
707        log::trace!("as {}", std::any::type_name::<T>());
708        let stored_definition: T =
709            serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
710                name: id.to_owned(),
711            })?;
712
713        log::trace!("  reading remote output JSON value");
714        let remote_value: T::Output =
715            serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
716                name: format!("remote {id}"),
717            })?;
718        Ok((stored_definition, remote_value))
719    }
720
721    pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
722        Self {
723            path: path.as_ref().to_path_buf(),
724            graph: dagga::Dag::default(),
725            remotes: Default::default(),
726            provider,
727        }
728    }
729
730    pub fn provider(&self) -> &P {
731        &self.provider
732    }
733
734    fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
735    where
736        T: Resource<Provider = P>,
737    {
738        Self::read_from_store(&self.path, id)
739    }
740
741    fn define_resource<T>(
742        &mut self,
743        id: impl AsRef<str>,
744        local_definition: T,
745        action: Action,
746        stored_definition: Option<T>,
747        output: Option<T::Output>,
748    ) -> Result<StoreResource<T, T::Output>, Error>
749    where
750        T: Resource<Provider = P>,
751    {
752        let id = id.as_ref();
753        let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
754        remote_var.set(output);
755
756        let remote_var = remote_var.clone();
757        let local_definition_code = local_definition.clone();
758        let local_definition_store = stored_definition.clone();
759        let store_path = self.path.clone();
760        let run: StoreNodeRunFn<T::Provider> = Box::new({
761            let resource_id = id.to_owned();
762            let remote_var = remote_var.clone();
763            let local_definition_code = local_definition_code.clone();
764            let local_definition_store = local_definition_store.clone();
765            move |provider: &T::Provider| {
766                Box::pin(
767                    RunAction {
768                        provider,
769                        store_path,
770                        resource_id,
771                        action,
772                        local_definition_code,
773                        local_definition_store,
774                        remote_var,
775                    }
776                    .run(),
777                )
778            }
779        });
780        let ty = std::any::type_name::<T>();
781
782        {
783            // Add the main action node
784            log::debug!("adding main node {action} {id}");
785            let node_name = format!("{action} {id}");
786            let dag_node = dagga::Node::new(StoreNode {
787                name: node_name.clone(),
788                _remote_ty: ty,
789                run,
790            })
791            .with_name(node_name)
792            .with_reads({
793                // read the resource keys out of "remotes" as dependencies
794                let mut reads = vec![];
795                for dep in local_definition.dependencies() {
796                    let var = self
797                        .remotes
798                        .get(&dep)
799                        .context(MissingResourceSnafu { name: dep })?;
800                    reads.push(var.key);
801                }
802                reads
803            });
804            let dag_node = match action {
805                Action::Create | Action::Read | Action::Load | Action::Update => {
806                    log::debug!("  with result {rez}");
807                    dag_node.with_result(rez)
808                }
809                Action::Destroy => {
810                    log::debug!("  with move {rez}");
811                    dag_node.with_move(rez)
812                }
813            };
814            self.graph.add_node(dag_node);
815        }
816
817        Ok(StoreResource {
818            name: id.to_owned(),
819            local_definition,
820            action,
821            remote_var,
822        })
823    }
824
825    /// Read the stored previous definition and determine the action.
826    fn determine_action_from_previously_stored<T>(
827        &self,
828        local_definition: &T,
829        id: &str,
830    ) -> Result<PreviouslyStored<T>, Error>
831    where
832        T: Resource<Provider = P>,
833    {
834        match self.read_file(id) {
835            Ok((stored_definition, output)) => {
836                // This has already been created and stored, so this is either a simple load,
837                // or an update.
838                log::debug!("  {output:?}");
839                let action = if *local_definition != stored_definition {
840                    log::debug!("  local resource has changed, so this remote is now stale");
841                    Action::Update
842                } else {
843                    // Check if any upstream dependencies are "stale" (updated or deleted),
844                    // which would cause this resource to possibly require an update.
845                    let mut may_need_update = false;
846                    for dep in local_definition.dependencies() {
847                        let var = self.remotes.get(&dep).context(LoadSnafu { name: dep })?;
848                        if var.action != Action::Load {
849                            may_need_update = true;
850                            break;
851                        }
852                    }
853                    if may_need_update {
854                        Action::Update
855                    } else {
856                        Action::Load
857                    }
858                };
859
860                Ok(PreviouslyStored {
861                    action,
862                    resource: Some((stored_definition, output)),
863                })
864            }
865            Err(Error::MissingStoreFile { id }) => {
866                log::debug!("store file '{id}' does not exist, creating a new resource",);
867                Ok(PreviouslyStored {
868                    action: Action::Create,
869                    resource: None,
870                })
871            }
872            Err(e) => {
873                log::error!("could not define resource '{id}': {e}");
874                Err(e)
875            }
876        }
877    }
878
879    /// Defines a resource.
880    ///
881    /// Produces two graph nodes:
882    /// 1. Depending on the result of compairing `local_definition` to the one on file
883    ///    (if it exists), either:
884    ///    - creates the resource on the platform
885    ///    - updates the resource on the platform
886    ///    - loads the resource from a file
887    /// 2. Stores the resource to a file
888    ///
889    /// To import an existing resource from a platform, use [`Store::import`].
890    pub fn resource<T>(
891        &mut self,
892        id: impl AsRef<str>,
893        local_definition: T,
894    ) -> Result<StoreResource<T, T::Output>, Error>
895    where
896        T: Resource<Provider = P>,
897    {
898        let id = id.as_ref();
899        let PreviouslyStored { action, resource } =
900            self.determine_action_from_previously_stored(&local_definition, id)?;
901        let (local, remote) = resource
902            .map(|(local, remote)| (Some(local), Some(remote)))
903            .unwrap_or_default();
904        self.define_resource(id, local_definition, action, local, remote)
905    }
906
907    /// Defines a pre-existing resource, importing it from the platform.
908    ///
909    /// Produces two graph nodes:
910    /// 1. Import the resource from the platform, resulting in the resource
911    /// 2. Store the value to a file
912    ///
913    /// This only needs to be used once in your infrastructure command.
914    /// After the resource is imported and stored to a file it is recommended
915    /// you make a code change to use [`Store::resource`].
916    pub fn import<T>(
917        &mut self,
918        id: impl AsRef<str>,
919        local_definition: T,
920    ) -> Result<StoreResource<T, T::Output>, Error>
921    where
922        T: Resource<Provider = P>,
923    {
924        self.define_resource(id, local_definition, Action::Read, None, None)
925    }
926
927    /// Defines a pre-existing resource, directly writing it to file, without
928    /// querying the platform.
929    ///
930    /// Produces two graph nodes:
931    /// 1. Load the value (noop)
932    /// 2. Store the value
933    ///
934    /// ## Errors
935    /// Errs if `force_overwrite` is `false` _and_ a stored resource already
936    /// exists. This is done to prevent accidental clobbering.
937    pub fn load<T>(
938        &mut self,
939        id: impl AsRef<str>,
940        local_definition: T,
941        remote_definition: T::Output,
942        force_overwrite: bool,
943    ) -> Result<StoreResource<T, T::Output>, Error>
944    where
945        T: Resource<Provider = P>,
946    {
947        let id = id.as_ref();
948        if let Ok((stored_definition, output)) = self.read_file(id) {
949            if local_definition == stored_definition && remote_definition == output {
950                if force_overwrite {
951                    log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
952                } else {
953                    let err = ClobberSnafu { id: id.to_owned() }.build();
954                    log::error!("{err}");
955                    return Err(err);
956                }
957            }
958        }
959        self.define_resource(
960            id,
961            local_definition,
962            Action::Load,
963            None,
964            Some(remote_definition),
965        )
966    }
967
968    /// Destroys a resource.
969    pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
970    where
971        T: Resource<Provider = P>,
972    {
973        let id = id.as_ref();
974        let (local, remote) = self.read_file::<T>(id)?;
975        let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
976        remote_var.set(Some(remote.clone()));
977        {
978            // Destruction requires a load to introduce the resource (for the DAG)
979            log::debug!("adding node {} {id}", Action::Load);
980            let node_name = format!("load {id}");
981            let load_node = dagga::Node::new(StoreNode {
982                name: node_name.clone(),
983                _remote_ty: std::any::type_name::<T>(),
984                run: Box::new({
985                    let resource_id = id.to_owned();
986                    let store_path = self.path.clone();
987                    let local = local.clone();
988                    let remote_var = remote_var.clone();
989                    move |provider| {
990                        Box::pin(
991                            RunAction {
992                                provider,
993                                store_path,
994                                resource_id,
995                                action: Action::Load,
996                                local_definition_code: local,
997                                remote_var,
998                                local_definition_store: None,
999                            }
1000                            .run(),
1001                        )
1002                    }
1003                }),
1004            })
1005            .with_name(node_name)
1006            .with_reads({
1007                let mut reads = vec![];
1008                for dep in local.dependencies() {
1009                    reads.push(
1010                        self.remotes
1011                            .get(&dep)
1012                            .context(MissingResourceSnafu {
1013                                name: id.to_owned(),
1014                            })?
1015                            .key,
1016                    );
1017                }
1018                reads
1019            })
1020            .with_result(rez);
1021            self.graph.add_node(load_node);
1022        }
1023        {
1024            log::debug!("adding node {} {id}", Action::Destroy);
1025            let node_name = format!("destroy {id}");
1026            // Add the destroy node
1027            let destroy_node = StoreNode {
1028                name: node_name.clone(),
1029                _remote_ty: std::any::type_name::<T>(),
1030                run: Box::new({
1031                    let resource_id = id.to_owned();
1032                    let local = local.clone();
1033                    let store_path = self.path.clone();
1034                    let remote_var = remote_var.clone();
1035                    move |provider| {
1036                        Box::pin(
1037                            RunAction {
1038                                provider,
1039                                store_path,
1040                                resource_id,
1041                                action: Action::Destroy,
1042                                local_definition_code: local,
1043                                local_definition_store: None,
1044                                remote_var,
1045                            }
1046                            .run(),
1047                        )
1048                    }
1049                }),
1050            };
1051
1052            self.graph.add_node(
1053                dagga::Node::new(destroy_node)
1054                    .with_name(node_name)
1055                    .with_move(rez),
1056            );
1057        }
1058
1059        Ok(DestroyResource { local, remote })
1060    }
1061
1062    fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1063        let mut missing_resource_creation = None;
1064        let legend = self.graph.legend()?.with_resources_named(|rez| {
1065            let maybe_name = self.remotes.get_name_by_rez(*rez);
1066            if maybe_name.is_none() {
1067                missing_resource_creation = Some(*rez);
1068            }
1069            maybe_name
1070        });
1071        if let Some(missing) = missing_resource_creation {
1072            log::error!(
1073                "Missing resource {missing}, current resources:\n{}",
1074                self.remotes
1075            );
1076            return MissingNameSnafu { missing }.fail();
1077        }
1078        Ok(legend)
1079    }
1080
1081    pub fn get_schedule_string(&self) -> Result<String, Error> {
1082        let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1083        for node in self.graph.nodes() {
1084            let store_node = node.inner();
1085            let print_node = dagga::Node::new(())
1086                .with_name(store_node.name.clone())
1087                .with_reads(node.get_reads().copied())
1088                .with_results(node.get_results().copied())
1089                .with_moves(node.get_moves().copied());
1090            dag.add_node(print_node);
1091        }
1092        struct Proxy {
1093            inner: Schedule<Node<(), usize>>,
1094        }
1095
1096        impl core::fmt::Display for Proxy {
1097            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1098                if self.inner.batches.is_empty() {
1099                    f.write_str("--- No changes.\n")?;
1100                    f.write_str("--- 🌈🦄\n")?;
1101                }
1102                for (i, batch) in self.inner.batches.iter().enumerate() {
1103                    let i = i + 1;
1104                    f.write_str("--- step ")?;
1105                    f.write_fmt(format_args!("{i}\n"))?;
1106                    for node in batch.iter() {
1107                        f.write_str("  ")?;
1108                        f.write_str(node.name())?;
1109                        f.write_str("\n")?;
1110                    }
1111                    f.write_str("---\n")?;
1112                }
1113                Ok(())
1114            }
1115        }
1116
1117        let proxy = Proxy {
1118            inner: dag.build_schedule().unwrap(),
1119        };
1120        Ok(proxy.to_string())
1121    }
1122
1123    pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1124        if self.graph.is_empty() {
1125            log::warn!("Resource DAG is empty, writing an empty dot file");
1126        }
1127        let legend = self.get_graph_legend()?;
1128        dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1129
1130        Ok(())
1131    }
1132
1133    pub async fn apply(&mut self) -> Result<()> {
1134        let graph = std::mem::take(&mut self.graph);
1135        let schedule = graph
1136            .build_schedule()
1137            .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1138        for (i, batch) in schedule.batches.into_iter().enumerate() {
1139            for (j, node) in batch.into_iter().enumerate() {
1140                log::debug!("applying node {j}, batch {i}");
1141                let store_node = node.into_inner();
1142                (store_node.run)(&self.provider).await?;
1143            }
1144        }
1145        Ok(())
1146    }
1147}