tele/
lib.rs

1//! # Teleform
2//!
3//! Teleform is a library designed to facilitate Infrastructure as Code (IaC)
4//! using Rust. It provides a flexible and powerful alternative to tools like
5//! Terraform and Pulumi by allowing developers to describe infrastructure
6//! changes as a Directed Acyclic Graph (DAG). Unlike other solutions, Teleform
7//! does not provide wrappers over platform-specific resources, eschewing them
8//! in favor of direct interaction with platform APIs. This removes a layer of
9//! indirection and keeps your infrastructure domain specific.
10//!
11//! ## Key Features
12//!
13//! - **Resource Management**: Define and manage resources directly through Rust
14//!   code, allowing for seamless integration with other libraries.
15//! - **Dependency Tracking**: Automatically track dependencies between
16//!   resources to ensure correct order of operations.
17//! - **Migration Support**: Easily migrate resources and manage changes over
18//!   time.
19//!
20//! ## Usage
21//!
22//! Teleform is typically used by developers to write custom IaC command line
23//! programs executed at a developer workstation.
24//!
25//! These programs are meant to be fluid, changing as often as the
26//! infrastructure, with changes committed and tracked with version control.
27//!
28//! ### Concepts
29//!
30//! Teleform operates on the concept of local and remote states of resources:
31//!
32//! - **Local State**: This is the desired state of the resource as defined in
33//!   your Rust code. It represents the initial configuration of a platform
34//!   resource.
35//! - **Remote State**: This is the state of the resource as it exists on the
36//!   platform (e.g., AWS, Digital Ocean). It reflects the configuration
37//!   and status of the resource.
38//!
39//! Teleform uses these states to determine the necessary actions to apply.
40//! This involves creating, updating, or deleting resources as needed.
41//!
42//! An example usage can be found in `crates/teleform/src/test.rs`,
43//! demonstrating how to define and manage resources using the library's
44//! primitives.
45//!
46//! ## Target Audience
47//!
48//! This library is intended for developers, particularly those in solo or small
49//! team environments, who are looking for a more general and flexible solution
50//! to IaC. It is also suitable for those seeking to migrate away from Terraform.
51//!
52//! ## Error Handling
53//!
54//! Teleform exposes a comprehensive error enum [`Error`], which encompasses all
55//! possible errors that may occur during operations. Functions that can result
56//! in errors return a `Result` type with this [`Error`], ensuring robust error
57//! handling throughout the library.
58
59use std::{future::Future, ops::Deref, pin::Pin};
60
61use dagga::{dot::DagLegend, Node, Schedule};
62use snafu::prelude::*;
63use tokio::io::AsyncWriteExt;
64
65pub use teleform_derive::HasDependencies;
66
67mod has_dependencies_impl;
68pub mod remote;
69#[cfg(test)]
70mod test;
71pub mod utils;
72
73use remote::{Migrated, Remote, RemoteVar, Remotes};
74
75/// Marker trait for userland errors.
76pub trait UserError: core::fmt::Display + core::fmt::Debug + 'static {}
77impl<T: core::fmt::Display + core::fmt::Debug + 'static> UserError for T {}
78
79/// Top-level error enum that encompasses all errors.
80#[derive(snafu::Snafu, Debug)]
81pub enum Error {
82    #[snafu(display("{source}:\n{}",
83                source.chain()
84                    .map(|e| format!("{e}"))
85                    .collect::<Vec<_>>()
86                    .join("\n -> ")))]
87    Tele { source: anyhow::Error },
88
89    #[snafu(display("Could not read store file '{path:?}': {source}"))]
90    StoreFileRead {
91        path: std::path::PathBuf,
92        source: std::io::Error,
93    },
94
95    #[snafu(display("Could not delete store file '{path:?}': {source}"))]
96    StoreFileDelete {
97        path: std::path::PathBuf,
98        source: std::io::Error,
99    },
100
101    #[snafu(display("Could not serialize stored '{name}': {source}"))]
102    Serialize {
103        name: String,
104        source: serde_json::Error,
105    },
106
107    #[snafu(display("Could not deserialize stored '{name}': {source}"))]
108    Deserialize {
109        name: String,
110        source: serde_json::Error,
111    },
112
113    #[snafu(display("Could not build schedule: {msg}"))]
114    Schedule { msg: String },
115
116    #[snafu(display("Could not create file {path:?}: {source}"))]
117    CreateFile {
118        path: std::path::PathBuf,
119        source: std::io::Error,
120    },
121
122    #[snafu(display("Could not write file {path:?}: {source}"))]
123    WriteFile {
124        path: std::path::PathBuf,
125        source: std::io::Error,
126    },
127
128    #[snafu(display("Remote value of {ty:?} is unresolved. Depends on {depends_on}"))]
129    RemoteUnresolved {
130        ty: &'static str,
131        depends_on: String,
132    },
133
134    #[snafu(display("Could not save the apply graph: {source}"))]
135    Dot { source: dagga::dot::DotError },
136
137    #[snafu(display(
138        "Could not build apply graph because of a missing resource name for '{missing}'"
139    ))]
140    MissingName { missing: usize },
141
142    #[snafu(display("Could not find a resource by the name '{name}'"))]
143    MissingResource { name: String },
144
145    #[snafu(display("Error during '{name}' creation: {error}"))]
146    Create {
147        name: String,
148        error: Box<dyn UserError>,
149    },
150
151    #[snafu(display("Error during '{name}' read and import: {error}"))]
152    Import {
153        name: String,
154        error: Box<dyn UserError>,
155    },
156
157    #[snafu(display("Error during '{name}' update: {error}"))]
158    Update {
159        name: String,
160        error: Box<dyn UserError>,
161    },
162
163    #[snafu(display("Error during '{name}' destruction: {error}"))]
164    Destroy {
165        name: String,
166        error: Box<dyn UserError>,
167    },
168
169    #[snafu(display("Error during execution of a manual step '{name}': {error}"))]
170    Manual {
171        name: String,
172        error: Box<dyn UserError>,
173    },
174
175    #[snafu(display("Missing previous remote value '{name}'"))]
176    Load { name: String },
177
178    #[snafu(display(
179        "Loading '{id}' would clobber an existing value in the store file, \
180        and these values are not the same"
181    ))]
182    Clobber { id: String },
183
184    #[snafu(display("Could not downcast"))]
185    Downcast,
186
187    #[snafu(display("Missing store file for '{id}'"))]
188    MissingStoreFile { id: String },
189}
190
191impl From<anyhow::Error> for Error {
192    fn from(source: anyhow::Error) -> Self {
193        Error::Tele { source }
194    }
195}
196
197impl From<dagga::dot::DotError> for Error {
198    fn from(source: dagga::dot::DotError) -> Self {
199        Self::Dot { source }
200    }
201}
202
203type Result<T, E = Error> = core::result::Result<T, E>;
204
205/// IaC resources.
206///
207/// Represents a resource created on a platform (ie AWS, Digital Ocean, etc).
208#[allow(unreachable_code)]
209pub trait Resource:
210    core::fmt::Debug
211    + Clone
212    + PartialEq
213    + HasDependencies
214    + serde::Serialize
215    + serde::de::DeserializeOwned
216    + 'static
217{
218    /// Type of the platform/resource provider.
219    ///
220    /// For example `aws_config::SdkConfig` in the case of amazon web services.
221    type Provider;
222
223    /// Errors that may occur interacting with the provider.
224    type Error: UserError;
225
226    /// The remote type of this resource, which we can used to fill in
227    /// [`Remote`] values in other resources.
228    type Output: core::fmt::Debug
229        + Clone
230        + PartialEq
231        + serde::Serialize
232        + serde::de::DeserializeOwned
233        + 'static;
234
235    /// Creates a new resource on the platform.
236    ///
237    /// This method should be implemented to define how a resource is created
238    /// using the provider's API. It returns a future that resolves to the
239    /// resource's output type or an error.
240    ///
241    /// ## Note
242    /// This method is explicitly `unimplemented!` for developer convenience.
243    /// It allows you to define only the methods you need. However, take care when
244    /// using this in contexts like long-running daemons, as calling an unimplemented
245    /// method will cause a panic.
246    fn create(
247        &self,
248        _provider: &Self::Provider,
249    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
250        unimplemented!(
251            "Resource::create is unimplemented for {}",
252            std::any::type_name::<Self>()
253        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
254    }
255
256    /// Reads the current state of the resource from the platform.
257    ///
258    /// This method should be implemented to define how to fetch the current
259    /// state of a resource using the provider's API. It returns a future that
260    /// resolves to the resource's output type or an error.
261    ///
262    /// ## Note
263    /// This method is explicitly `unimplemented!` for developer convenience.
264    /// It allows you to define only the methods you need. However, take care when
265    /// using this in contexts like long-running daemons, as calling an unimplemented
266    /// method will cause a panic.
267    fn read(
268        &self,
269        _provider: &Self::Provider,
270    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
271        unimplemented!(
272            "Resource::read is unimplemented for {}",
273            std::any::type_name::<Self>()
274        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
275    }
276
277    /// Updates an existing resource on the platform.
278    ///
279    /// This method should be implemented to define how a resource is updated
280    /// using the provider's API. It takes the previous local and remote states
281    /// of the resource and returns a future that resolves to the updated
282    /// resource's output type or an error.
283    ///
284    /// ## Note
285    /// This method is explicitly `unimplemented!` for developer convenience.
286    /// It allows you to define only the methods you need. However, take care when
287    /// using this in contexts like long-running daemons, as calling an unimplemented
288    /// method will cause a panic.
289    fn update(
290        &self,
291        _provider: &Self::Provider,
292        _previous_local: &Self,
293        _previous_remote: &Self::Output,
294    ) -> impl Future<Output = Result<Self::Output, Self::Error>> {
295        unimplemented!(
296            "Resource::update is unimplemented for {}",
297            std::any::type_name::<Self>()
298        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
299    }
300
301    /// Deletes a resource from the platform.
302    ///
303    /// This method should be implemented to define how a resource is deleted
304    /// using the provider's API. It takes the previous remote state of the
305    /// resource and returns a future that resolves to a unit type or an error.
306    ///
307    /// ## Note
308    /// This method is explicitly `unimplemented!` for developer convenience.
309    /// It allows you to define only the methods you need. However, take care when
310    /// using this in contexts like long-running daemons, as calling an unimplemented
311    /// method will cause a panic.
312    fn delete(
313        &self,
314        _provider: &Self::Provider,
315        _previous_remote: &Self::Output,
316    ) -> impl Future<Output = Result<(), Self::Error>> {
317        unimplemented!(
318            "Resource::delete is unimplemented for {}",
319            std::any::type_name::<Self>()
320        ) as Box<dyn Future<Output = Result<_, _>> + Unpin>
321    }
322}
323
324#[derive(Clone, Default, Debug)]
325pub struct Dependencies {
326    /// Specifies a dependency on a `Resource`.
327    inner: Vec<String>,
328}
329
330impl IntoIterator for Dependencies {
331    type Item = String;
332
333    type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
334
335    fn into_iter(self) -> Self::IntoIter {
336        self.inner.into_iter()
337    }
338}
339
340impl core::fmt::Display for Dependencies {
341    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342        f.write_str(
343            &self
344                .inner
345                .iter()
346                .map(|u| u.to_string())
347                .collect::<Vec<_>>()
348                .join(", "),
349        )
350    }
351}
352
353impl Dependencies {
354    pub fn merge(self, other: Self) -> Self {
355        Dependencies {
356            inner: [self.inner, other.inner].concat(),
357        }
358    }
359}
360
361/// Tracks dependencies between resources.
362///
363/// This trait can be derived, and has a default implementation that
364/// reports zero dependencies.
365pub trait HasDependencies {
366    fn dependencies(&self) -> Dependencies {
367        Dependencies::default()
368    }
369}
370
371/// `Create`, `Load` and `Update` result in a resource being added to the graph.
372///
373/// `Destroy` moves the resource out of the graph.
374#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
375pub enum Action {
376    Load,
377    Create,
378    Read,
379    Update,
380    Destroy,
381}
382
383impl core::fmt::Display for Action {
384    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385        f.write_str(match self {
386            Action::Load => "load",
387            Action::Create => "create",
388            Action::Read => "read",
389            Action::Update => "update",
390            Action::Destroy => "destroy",
391        })
392    }
393}
394
395#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
396struct InertStoreResource {
397    name: String,
398    local: serde_json::Value,
399    remote: serde_json::Value,
400}
401
402impl InertStoreResource {
403    async fn save(
404        &self,
405        resource_id: &str,
406        store_path: impl AsRef<std::path::Path>,
407    ) -> Result<(), Error> {
408        let path = store_file_path(resource_id, &store_path);
409        log::info!("storing {resource_id} to {path:?}");
410
411        let contents = serde_json::to_string_pretty(self).context(SerializeSnafu {
412            name: format!("storing {}", resource_id),
413        })?;
414
415        // Ensure the parent directory exists
416        if let Some(parent) = path.parent() {
417            tokio::fs::create_dir_all(&parent)
418                .await
419                .context(CreateFileSnafu { path: parent })?;
420        }
421
422        let mut file = tokio::fs::File::create(&path)
423            .await
424            .context(CreateFileSnafu { path: path.clone() })?;
425        file.write_all(contents.as_bytes())
426            .await
427            .context(WriteFileSnafu { path: path.clone() })?;
428        Ok(())
429    }
430}
431
432#[derive(Clone, Debug)]
433pub struct StoreResource<L, R> {
434    /// Name of the resource from the user's perspective
435    name: String,
436    /// Local definition in _code_
437    local_definition: L,
438    action: Action,
439    remote_var: RemoteVar<R>,
440}
441
442impl<L, R> Deref for StoreResource<L, R> {
443    type Target = L;
444
445    fn deref(&self) -> &Self::Target {
446        &self.local_definition
447    }
448}
449
450impl<L, R> AsRef<L> for StoreResource<L, R> {
451    fn as_ref(&self) -> &L {
452        &self.local_definition
453    }
454}
455
456impl<L, R> TryFrom<StoreResource<L, R>> for InertStoreResource
457where
458    L: serde::Serialize + for<'a> serde::Deserialize<'a>,
459    R: Clone + serde::Serialize + for<'a> serde::Deserialize<'a>,
460{
461    type Error = Error;
462
463    fn try_from(value: StoreResource<L, R>) -> std::result::Result<Self, Self::Error> {
464        let local = serde_json::to_value(value.local_definition).context(SerializeSnafu {
465            name: value.name.clone(),
466        })?;
467        let output = value.remote_var.get().context(LoadSnafu {
468            name: value.name.clone(),
469        })?;
470        let remote = serde_json::to_value(output).context(SerializeSnafu {
471            name: value.name.clone(),
472        })?;
473        Ok(Self {
474            name: value.name,
475            local,
476            remote,
477        })
478    }
479}
480
481impl<T> StoreResource<T, T::Output>
482where
483    T: Resource,
484    T::Output: Clone,
485{
486    /// Map a remote value to use in local definitions.
487    pub fn remote<X: Clone + core::fmt::Debug + 'static>(
488        &self,
489        f: impl Fn(&T::Output) -> X + 'static,
490    ) -> Remote<X> {
491        Remote::new(self, f)
492    }
493
494    /// Return the action that would be applied to this resource.
495    ///
496    /// This is useful if you need to trigger invalidations or anything else based on
497    /// whether a resource is created, updated, deleted, etc.
498    pub fn action(&self) -> Action {
499        self.action
500    }
501
502    pub fn depends_on<X, Y>(&self, store: &mut Store<T::Provider>, resource: &StoreResource<X, Y>) {
503        let this_var = store.remotes.get(&self.name).unwrap();
504        let that_var = store.remotes.get(&resource.name).unwrap();
505        for node in store.graph.take_nodes() {
506            store.graph.add_node(
507                if node
508                    .get_results()
509                    .copied()
510                    .collect::<Vec<_>>()
511                    .contains(&this_var.key)
512                {
513                    node.with_read(that_var.key)
514                } else {
515                    node
516                },
517            );
518        }
519    }
520}
521
522/// The path to an individual resource store file.
523fn store_file_path(name: &str, store_path: impl AsRef<std::path::Path>) -> std::path::PathBuf {
524    store_path.as_ref().join(format!("{name}.json"))
525}
526
527type StoreNodeRunFn<Provider> = Box<
528    dyn FnOnce(
529        // Resource platform provider
530        &'_ Provider,
531    ) -> Pin<Box<dyn Future<Output = Result<()>> + '_>>,
532>;
533
534struct RunAction<'a, Provider, T: Resource<Provider = Provider>> {
535    provider: &'a Provider,
536    store_path: std::path::PathBuf,
537    /// Name of the resource being acted on, not the node name.
538    resource_id: String,
539    action: Action,
540    local_definition_code: T,
541    local_definition_store: Option<T>,
542    remote_var: RemoteVar<T::Output>,
543}
544
545impl<Provider, T: Resource<Provider = Provider>> RunAction<'_, Provider, T> {
546    async fn run(self) -> Result<()>
547    where
548        T: Resource,
549    {
550        let Self {
551            provider,
552            store_path,
553            resource_id,
554            action,
555            local_definition_code,
556            local_definition_store,
557            remote_var,
558        } = self;
559        log::info!("{action} '{resource_id}':");
560
561        async fn save<T: Resource>(
562            resource_id: &str,
563            local_definition_code: T,
564            remote_var: &RemoteVar<T::Output>,
565            store_path: impl AsRef<std::path::Path>,
566        ) -> Result<(), Error> {
567            let inert_resource = InertStoreResource {
568                name: resource_id.to_owned(),
569                local: serde_json::to_value(local_definition_code).context(SerializeSnafu {
570                    name: format!("store {resource_id}"),
571                })?,
572                remote: serde_json::to_value(
573                    remote_var.get().context(LoadSnafu { name: resource_id })?,
574                )
575                .context(SerializeSnafu {
576                    name: format!("store {resource_id} remote"),
577                })?,
578            };
579            inert_resource.save(resource_id, store_path).await?;
580            Ok(())
581        }
582
583        match action {
584            Action::Load => {
585                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
586            }
587            Action::Create => {
588                let value = local_definition_code
589                    .create(provider)
590                    .await
591                    .map_err(|error| Error::Create {
592                        name: resource_id.to_owned(),
593                        error: Box::new(error),
594                    })?;
595                remote_var.set(Some(value));
596                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
597            }
598            Action::Read => {
599                let value = local_definition_code
600                    .read(provider)
601                    .await
602                    .map_err(|error| Error::Create {
603                        name: resource_id.to_owned(),
604                        error: Box::new(error),
605                    })?;
606                remote_var.set(Some(value));
607                save(&resource_id, local_definition_code, &remote_var, store_path).await?;
608            }
609            Action::Update => {
610                let previous_local = local_definition_store.unwrap();
611                let previous_remote = remote_var.get().context(LoadSnafu {
612                    name: resource_id.clone(),
613                })?;
614                if previous_local == local_definition_code {
615                    log::warn!(
616                        "Skipping '{resource_id}' update as the local value has not changed.\n\
617                        If you require an update, consider adding a sentinel value."
618                    );
619                } else {
620                    let cmp =
621                        pretty_assertions::Comparison::new(&previous_local, &local_definition_code);
622                    let change_string = format!("{cmp}")
623                        .lines()
624                        .map(|line| format!("  {line}"))
625                        .collect::<Vec<_>>()
626                        .join("\n");
627                    log::info!("updating '{resource_id}':\n{change_string}");
628                    let output = local_definition_code
629                        .update(provider, &previous_local, &previous_remote)
630                        .await
631                        .map_err(|error| Error::Update {
632                            name: resource_id.clone(),
633                            error: Box::new(error),
634                        })?;
635                    remote_var.set(Some(output));
636                    save(&resource_id, local_definition_code, &remote_var, store_path).await?;
637                }
638            }
639            Action::Destroy => {
640                log::debug!("running destroy action on {resource_id}");
641                // In the destroy case there is no code-local definition, but there is always
642                // a store definition, so we pass the store definition as the code definition.
643                // This is better IMO than having both code-local and store be optional.
644                let local_definition = local_definition_code.clone();
645                let previous_remote = remote_var.get().context(LoadSnafu {
646                    name: resource_id.clone(),
647                })?;
648                local_definition
649                    .delete(provider, &previous_remote)
650                    .await
651                    .map_err(|error| Error::Destroy {
652                        name: resource_id.to_owned(),
653                        error: Box::new(error),
654                    })?;
655
656                log::info!("  {resource_id} is destroyed");
657                let path = store_file_path(&resource_id, &store_path);
658                log::info!("  removing {resource_id} store file {path:?}");
659                tokio::fs::remove_file(&path)
660                    .await
661                    .context(StoreFileDeleteSnafu { path })?;
662                remote_var.set(None);
663            }
664        }
665
666        log::info!("  success!");
667        Ok(())
668    }
669}
670
671pub struct DestroyResource<T: Resource> {
672    local: T,
673    remote: T::Output,
674}
675
676impl<T: Resource> Deref for DestroyResource<T> {
677    type Target = T;
678
679    fn deref(&self) -> &Self::Target {
680        &self.local
681    }
682}
683
684impl<T: Resource> DestroyResource<T> {
685    /// Map a remote value of a resource scheduled to be destroyed into a
686    /// permanent field of another resource.
687    pub fn migrate<X: Clone + core::fmt::Debug + 'static>(
688        &self,
689        f: fn(&T::Output) -> X,
690    ) -> Migrated<X> {
691        Migrated(f(&self.remote))
692    }
693}
694
695struct StoreNode<Provider> {
696    name: String,
697    _remote_ty: &'static str,
698    run: StoreNodeRunFn<Provider>,
699}
700
701struct PreviouslyStored<T: Resource> {
702    action: Action,
703    resource: Option<(T, T::Output)>,
704}
705
706pub struct Store<T> {
707    path: std::path::PathBuf,
708    provider: T,
709    remotes: Remotes,
710    graph: dagga::Dag<StoreNode<T>, usize>,
711}
712
713impl<P: 'static> Store<P> {
714    fn read_from_store<T: Resource<Provider = P>>(
715        path: impl AsRef<std::path::Path>,
716        id: &str,
717    ) -> Result<(T, T::Output)> {
718        let path = store_file_path(id, path.as_ref());
719        snafu::ensure!(path.exists(), MissingStoreFileSnafu { id: id.to_owned() });
720
721        log::debug!("{path:?} exists, reading '{id}' from it");
722        let contents = std::fs::read_to_string(&path).context(StoreFileReadSnafu {
723            path: path.to_path_buf(),
724        })?;
725        log::trace!(
726            "contents:\n{}",
727            contents
728                .lines()
729                .map(|line| format!("  {line}"))
730                .collect::<Vec<_>>()
731                .join("\n")
732        );
733        let inert_store_rez: InertStoreResource =
734            serde_json::from_str(&contents).context(DeserializeSnafu {
735                name: id.to_owned(),
736            })?;
737        log::trace!("read inert store resource");
738        log::trace!(
739            "reading local contents: {}",
740            serde_json::to_string_pretty(&inert_store_rez.local)
741                .unwrap()
742                .lines()
743                .map(|line| format!("  {line}"))
744                .collect::<Vec<_>>()
745                .join("\n")
746        );
747        log::trace!("as {}", std::any::type_name::<T>());
748        let stored_definition: T =
749            serde_json::from_value(inert_store_rez.local).context(DeserializeSnafu {
750                name: id.to_owned(),
751            })?;
752
753        log::trace!("  reading remote output JSON value");
754        let remote_value: T::Output =
755            serde_json::from_value(inert_store_rez.remote).context(DeserializeSnafu {
756                name: format!("remote {id}"),
757            })?;
758        Ok((stored_definition, remote_value))
759    }
760
761    pub fn new(path: impl AsRef<std::path::Path>, provider: P) -> Self {
762        Self {
763            path: path.as_ref().to_path_buf(),
764            graph: dagga::Dag::default(),
765            remotes: Default::default(),
766            provider,
767        }
768    }
769
770    pub fn provider(&self) -> &P {
771        &self.provider
772    }
773
774    fn read_file<T>(&self, id: &str) -> Result<(T, T::Output), Error>
775    where
776        T: Resource<Provider = P>,
777    {
778        Self::read_from_store(&self.path, id)
779    }
780
781    /// Adds a barrier after which all resources will be run after those defined before.
782    pub fn barrier(&mut self) {
783        self.graph.add_barrier();
784    }
785
786    fn define_resource<T>(
787        &mut self,
788        id: impl AsRef<str>,
789        local_definition: T,
790        action: Action,
791        stored_definition: Option<T>,
792        output: Option<T::Output>,
793    ) -> Result<StoreResource<T, T::Output>, Error>
794    where
795        T: Resource<Provider = P>,
796    {
797        let id = id.as_ref();
798        let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, action)?;
799        remote_var.set(output);
800
801        let remote_var = remote_var.clone();
802        let local_definition_code = local_definition.clone();
803        let local_definition_store = stored_definition.clone();
804        let store_path = self.path.clone();
805        let run: StoreNodeRunFn<T::Provider> = Box::new({
806            let resource_id = id.to_owned();
807            let remote_var = remote_var.clone();
808            let local_definition_code = local_definition_code.clone();
809            let local_definition_store = local_definition_store.clone();
810            move |provider: &T::Provider| {
811                Box::pin(
812                    RunAction {
813                        provider,
814                        store_path,
815                        resource_id,
816                        action,
817                        local_definition_code,
818                        local_definition_store,
819                        remote_var,
820                    }
821                    .run(),
822                )
823            }
824        });
825        let ty = std::any::type_name::<T>();
826
827        {
828            // Add the main action node
829            log::debug!("adding main node {action} {id}");
830            let node_name = format!("{action} {id}");
831            let dag_node = dagga::Node::new(StoreNode {
832                name: node_name.clone(),
833                _remote_ty: ty,
834                run,
835            })
836            .with_name(node_name)
837            .with_reads({
838                // read the resource keys out of "remotes" as dependencies
839                let mut reads = vec![];
840                for dep in local_definition.dependencies() {
841                    let var = self
842                        .remotes
843                        .get(&dep)
844                        .context(MissingResourceSnafu { name: dep })?;
845                    reads.push(var.key);
846                }
847                reads
848            });
849            let dag_node = match action {
850                Action::Create | Action::Read | Action::Load | Action::Update => {
851                    log::debug!("  with result {rez}");
852                    dag_node.with_result(rez)
853                }
854                Action::Destroy => {
855                    log::debug!("  with move {rez}");
856                    dag_node.with_move(rez)
857                }
858            };
859            self.graph.add_node(dag_node);
860        }
861
862        Ok(StoreResource {
863            name: id.to_owned(),
864            local_definition,
865            action,
866            remote_var,
867        })
868    }
869
870    /// Read the stored previous definition and determine the action.
871    fn determine_action_from_previously_stored<T>(
872        &self,
873        local_definition: &T,
874        id: &str,
875    ) -> Result<PreviouslyStored<T>, Error>
876    where
877        T: Resource<Provider = P>,
878    {
879        match self.read_file(id) {
880            Ok((stored_definition, output)) => {
881                // This has already been created and stored, so this is either a simple load,
882                // or an update.
883                log::debug!("  {output:?}");
884                let action = if *local_definition != stored_definition {
885                    log::debug!("  local resource has changed, so this remote is now stale");
886                    Action::Update
887                } else {
888                    // Check if any upstream dependencies are "stale" (updated or deleted),
889                    // which would cause this resource to possibly require an update.
890                    let mut may_need_update = false;
891                    for dep in local_definition.dependencies() {
892                        let var = self.remotes.get(&dep).context(LoadSnafu { name: dep })?;
893                        if var.action != Action::Load {
894                            may_need_update = true;
895                            break;
896                        }
897                    }
898                    if may_need_update {
899                        Action::Update
900                    } else {
901                        Action::Load
902                    }
903                };
904
905                Ok(PreviouslyStored {
906                    action,
907                    resource: Some((stored_definition, output)),
908                })
909            }
910            Err(Error::MissingStoreFile { id }) => {
911                log::debug!("store file '{id}' does not exist, creating a new resource",);
912                Ok(PreviouslyStored {
913                    action: Action::Create,
914                    resource: None,
915                })
916            }
917            Err(e) => {
918                log::error!("could not define resource '{id}': {e}");
919                Err(e)
920            }
921        }
922    }
923
924    /// Defines a resource.
925    ///
926    /// Produces two graph nodes:
927    /// 1. Depending on the result of compairing `local_definition` to the one on file
928    ///    (if it exists), either:
929    ///    - creates the resource on the platform
930    ///    - updates the resource on the platform
931    ///    - loads the resource from a file
932    /// 2. Stores the resource to a file
933    ///
934    /// To import an existing resource from a platform, use [`Store::import`].
935    pub fn resource<T>(
936        &mut self,
937        id: impl AsRef<str>,
938        local_definition: T,
939    ) -> Result<StoreResource<T, T::Output>, Error>
940    where
941        T: Resource<Provider = P>,
942    {
943        let id = id.as_ref();
944        let PreviouslyStored { action, resource } =
945            self.determine_action_from_previously_stored(&local_definition, id)?;
946        let (local, remote) = resource
947            .map(|(local, remote)| (Some(local), Some(remote)))
948            .unwrap_or_default();
949        self.define_resource(id, local_definition, action, local, remote)
950    }
951
952    /// Defines a pre-existing resource, importing it from the platform.
953    ///
954    /// Produces two graph nodes:
955    /// 1. Import the resource from the platform, resulting in the resource
956    /// 2. Store the value to a file
957    ///
958    /// This only needs to be used once in your infrastructure command.
959    /// After the resource is imported and stored to a file it is recommended
960    /// you make a code change to use [`Store::resource`].
961    pub fn import<T>(
962        &mut self,
963        id: impl AsRef<str>,
964        local_definition: T,
965    ) -> Result<StoreResource<T, T::Output>, Error>
966    where
967        T: Resource<Provider = P>,
968    {
969        self.define_resource(id, local_definition, Action::Read, None, None)
970    }
971
972    /// Defines a pre-existing resource, directly writing it to file, without
973    /// querying the platform.
974    ///
975    /// Produces two graph nodes:
976    /// 1. Load the value (noop)
977    /// 2. Store the value
978    ///
979    /// ## Errors
980    /// Errs if `force_overwrite` is `false` _and_ a stored resource already
981    /// exists. This is done to prevent accidental clobbering.
982    pub fn load<T>(
983        &mut self,
984        id: impl AsRef<str>,
985        local_definition: T,
986        remote_definition: T::Output,
987        force_overwrite: bool,
988    ) -> Result<StoreResource<T, T::Output>, Error>
989    where
990        T: Resource<Provider = P>,
991    {
992        let id = id.as_ref();
993        if let Ok((stored_definition, output)) = self.read_file(id) {
994            if local_definition == stored_definition && remote_definition == output {
995                if force_overwrite {
996                    log::warn!("loading '{id}' is clobbering an existing value, but `force_overwrite` is `true`");
997                } else {
998                    let err = ClobberSnafu { id: id.to_owned() }.build();
999                    log::error!("{err}");
1000                    return Err(err);
1001                }
1002            }
1003        }
1004        self.define_resource(
1005            id,
1006            local_definition,
1007            Action::Load,
1008            None,
1009            Some(remote_definition),
1010        )
1011    }
1012
1013    /// Destroys a resource.
1014    pub fn destroy<T>(&mut self, id: impl AsRef<str>) -> Result<DestroyResource<T>, Error>
1015    where
1016        T: Resource<Provider = P>,
1017    {
1018        let id = id.as_ref();
1019        let (local, remote) = self.read_file::<T>(id)?;
1020        let (remote_var, rez, _ty) = self.remotes.dequeue_var::<T::Output>(id, Action::Destroy)?;
1021        remote_var.set(Some(remote.clone()));
1022        {
1023            // Destruction requires a load to introduce the resource (for the DAG)
1024            log::debug!("adding node {} {id}", Action::Load);
1025            let node_name = format!("load {id}");
1026            let load_node = dagga::Node::new(StoreNode {
1027                name: node_name.clone(),
1028                _remote_ty: std::any::type_name::<T>(),
1029                run: Box::new({
1030                    let resource_id = id.to_owned();
1031                    let store_path = self.path.clone();
1032                    let local = local.clone();
1033                    let remote_var = remote_var.clone();
1034                    move |provider| {
1035                        Box::pin(
1036                            RunAction {
1037                                provider,
1038                                store_path,
1039                                resource_id,
1040                                action: Action::Load,
1041                                local_definition_code: local,
1042                                remote_var,
1043                                local_definition_store: None,
1044                            }
1045                            .run(),
1046                        )
1047                    }
1048                }),
1049            })
1050            .with_name(node_name)
1051            .with_reads({
1052                let mut reads = vec![];
1053                for dep in local.dependencies() {
1054                    reads.push(
1055                        self.remotes
1056                            .get(&dep)
1057                            .context(MissingResourceSnafu {
1058                                name: id.to_owned(),
1059                            })?
1060                            .key,
1061                    );
1062                }
1063                reads
1064            })
1065            .with_result(rez);
1066            self.graph.add_node(load_node);
1067        }
1068        {
1069            log::debug!("adding node {} {id}", Action::Destroy);
1070            let node_name = format!("destroy {id}");
1071            // Add the destroy node
1072            let destroy_node = StoreNode {
1073                name: node_name.clone(),
1074                _remote_ty: std::any::type_name::<T>(),
1075                run: Box::new({
1076                    let resource_id = id.to_owned();
1077                    let local = local.clone();
1078                    let store_path = self.path.clone();
1079                    let remote_var = remote_var.clone();
1080                    move |provider| {
1081                        Box::pin(
1082                            RunAction {
1083                                provider,
1084                                store_path,
1085                                resource_id,
1086                                action: Action::Destroy,
1087                                local_definition_code: local,
1088                                local_definition_store: None,
1089                                remote_var,
1090                            }
1091                            .run(),
1092                        )
1093                    }
1094                }),
1095            };
1096
1097            self.graph.add_node(
1098                dagga::Node::new(destroy_node)
1099                    .with_name(node_name)
1100                    .with_move(rez),
1101            );
1102        }
1103
1104        Ok(DestroyResource { local, remote })
1105    }
1106
1107    pub fn after<T: Resource>(&mut self, resource: &StoreResource<T, T::Output>) {}
1108
1109    fn get_graph_legend(&self) -> Result<DagLegend<usize>> {
1110        let mut missing_resource_creation = None;
1111        let legend = self.graph.legend()?.with_resources_named(|rez| {
1112            let maybe_name = self.remotes.get_name_by_rez(*rez);
1113            if maybe_name.is_none() {
1114                missing_resource_creation = Some(*rez);
1115            }
1116            maybe_name
1117        });
1118        if let Some(missing) = missing_resource_creation {
1119            log::error!(
1120                "Missing resource {missing}, current resources:\n{}",
1121                self.remotes
1122            );
1123            return MissingNameSnafu { missing }.fail();
1124        }
1125        Ok(legend)
1126    }
1127
1128    pub fn get_schedule_string(&self) -> Result<String, Error> {
1129        let mut dag: dagga::Dag<(), usize> = dagga::Dag::default();
1130        for node in self.graph.nodes() {
1131            let store_node = node.inner();
1132            let print_node = dagga::Node::new(())
1133                .with_name(store_node.name.clone())
1134                .with_reads(node.get_reads().copied())
1135                .with_results(node.get_results().copied())
1136                .with_moves(node.get_moves().copied())
1137                .with_barrier(node.get_barrier());
1138            dag.add_node(print_node);
1139        }
1140        struct Proxy {
1141            inner: Schedule<Node<(), usize>>,
1142        }
1143
1144        impl core::fmt::Display for Proxy {
1145            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1146                if self.inner.batches.is_empty() {
1147                    f.write_str("--- No changes.\n")?;
1148                    f.write_str("--- 🌈🦄\n")?;
1149                }
1150                for (i, batch) in self.inner.batches.iter().enumerate() {
1151                    let i = i + 1;
1152                    f.write_str("--- step ")?;
1153                    f.write_fmt(format_args!("{i}\n"))?;
1154                    for node in batch.iter() {
1155                        f.write_str("  ")?;
1156                        f.write_str(node.name())?;
1157                        f.write_str("\n")?;
1158                    }
1159                    f.write_str("---\n")?;
1160                }
1161                Ok(())
1162            }
1163        }
1164
1165        let proxy = Proxy {
1166            inner: dag.build_schedule().unwrap(),
1167        };
1168        Ok(proxy.to_string())
1169    }
1170
1171    pub fn save_apply_graph(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
1172        if self.graph.is_empty() {
1173            log::warn!("Resource DAG is empty, writing an empty dot file");
1174        }
1175        let legend = self.get_graph_legend()?;
1176        dagga::dot::save_as_dot(&legend, path).context(DotSnafu)?;
1177
1178        Ok(())
1179    }
1180
1181    pub async fn apply(&mut self) -> Result<()> {
1182        let graph = std::mem::take(&mut self.graph);
1183        let schedule = graph
1184            .build_schedule()
1185            .map_err(|e| Error::Schedule { msg: e.to_string() })?;
1186        for (i, batch) in schedule.batches.into_iter().enumerate() {
1187            for (j, node) in batch.into_iter().enumerate() {
1188                log::debug!("applying node {j}, batch {i}");
1189                let store_node = node.into_inner();
1190                (store_node.run)(&self.provider).await?;
1191            }
1192        }
1193        Ok(())
1194    }
1195}