tele/
remote.rs

1//! Remote values.
2//!
3//! Remote values are values that are determined after creating
4//! or reading a resource from a provider.
5
6use std::{
7    any::Any,
8    collections::HashMap,
9    ops::Deref,
10    sync::{Arc, Mutex},
11};
12
13use snafu::OptionExt;
14
15use crate::HasDependencies;
16
17use super::{
18    Action, Dependencies, DowncastSnafu, Error, RemoteUnresolvedSnafu, Resource, StoreResource,
19};
20
21type VarFn<X> = Arc<dyn Fn(&Arc<dyn Any>) -> Result<X, Error>>;
22
23#[derive(Clone)]
24enum RemoteInner<X> {
25    Init {
26        depends_on: String,
27        last_known_value: Option<X>,
28    },
29    Var {
30        depends_on: String,
31        map: VarFn<X>,
32        // RemoteVar<T::Output>
33        var: Arc<dyn Any>,
34    },
35}
36
37impl<X: std::fmt::Debug> std::fmt::Debug for RemoteInner<X> {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Init {
41                depends_on,
42                last_known_value,
43            } => f
44                .debug_struct("Init")
45                .field("depends_on", depends_on)
46                .field("last_known_value", last_known_value)
47                .finish(),
48            Self::Var {
49                depends_on,
50                map: _,
51                var,
52            } => f
53                .debug_struct("Var")
54                .field("depends_on", depends_on)
55                .field("var", var)
56                .finish(),
57        }
58    }
59}
60
61#[derive(Clone, Debug)]
62pub struct Remote<X> {
63    inner: RemoteInner<X>,
64}
65
66impl<X: Clone + core::fmt::Debug + PartialEq + 'static> PartialEq for Remote<X> {
67    fn eq(&self, other: &Self) -> bool {
68        if let Ok(here) = self.get() {
69            if let Ok(there) = other.get() {
70                here == there
71            } else {
72                false
73            }
74        } else {
75            false
76        }
77    }
78}
79
80#[derive(serde::Serialize, serde::Deserialize)]
81struct RemoteProxy<T> {
82    depends_on: String,
83    last_known_value: Option<T>,
84}
85
86impl<X: serde::Serialize + Clone + core::fmt::Debug + 'static> serde::Serialize for Remote<X> {
87    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
88    where
89        S: serde::Serializer,
90    {
91        let proxy = RemoteProxy {
92            last_known_value: self.get().ok(),
93            depends_on: match &self.inner {
94                RemoteInner::Init { depends_on, .. } => depends_on.clone(),
95                RemoteInner::Var { depends_on, .. } => depends_on.clone(),
96            },
97        };
98        proxy.serialize(serializer)
99    }
100}
101
102impl<'de, X: serde::Deserialize<'de>> serde::Deserialize<'de> for Remote<X> {
103    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
104    where
105        D: serde::Deserializer<'de>,
106    {
107        let RemoteProxy {
108            depends_on,
109            last_known_value,
110        } = RemoteProxy::<X>::deserialize(deserializer)?;
111
112        Ok(Remote {
113            inner: RemoteInner::Init {
114                depends_on,
115                last_known_value,
116            },
117        })
118    }
119}
120
121impl<X: Clone + core::fmt::Debug + 'static> Remote<X> {
122    pub(crate) fn new<T: Resource>(
123        resource: &StoreResource<T, T::Output>,
124        map: impl Fn(&T::Output) -> X + 'static,
125    ) -> Self {
126        log::trace!(
127            "creating mapping of a remote resource '{}'",
128            resource.remote_var.depends_on
129        );
130        let depends_on = resource.remote_var.depends_on.clone();
131        Self {
132            inner: RemoteInner::Var {
133                map: Arc::new({
134                    let depends_on = depends_on.clone();
135                    move |any: &Arc<dyn Any>| {
136                        // UNWRAP: safe because this is an invariant
137                        let remote_var = any.downcast_ref::<RemoteVar<T::Output>>().unwrap();
138                        let t_output = remote_var.get().context(RemoteUnresolvedSnafu {
139                            ty: core::any::type_name::<X>(),
140                            depends_on: depends_on.clone(),
141                        })?;
142                        Ok(map(&t_output))
143                    }
144                }),
145                depends_on,
146                var: Arc::new(resource.remote_var.clone()),
147            },
148        }
149    }
150
151    pub fn get(&self) -> Result<X, Error> {
152        match &self.inner {
153            RemoteInner::Init {
154                depends_on,
155                last_known_value,
156            } => {
157                log::trace!("remote var returning last known value: {last_known_value:?}");
158                Ok(last_known_value.clone().context(RemoteUnresolvedSnafu {
159                    ty: core::any::type_name::<X>(),
160                    depends_on: depends_on.clone(),
161                })?)
162            }
163            RemoteInner::Var {
164                map,
165                var,
166                depends_on: _,
167            } => map(var),
168        }
169    }
170}
171
172impl<X> HasDependencies for Remote<X> {
173    fn dependencies(&self) -> Dependencies {
174        Dependencies {
175            inner: vec![match &self.inner {
176                RemoteInner::Init { depends_on, .. } => depends_on.clone(),
177                RemoteInner::Var { depends_on, .. } => depends_on.clone(),
178            }],
179        }
180    }
181}
182
183#[derive(Debug)]
184pub(crate) struct RemoteVar<T> {
185    depends_on: String,
186    inner: Arc<Mutex<Option<T>>>,
187}
188
189impl<T> Clone for RemoteVar<T> {
190    fn clone(&self) -> Self {
191        Self {
192            depends_on: self.depends_on.clone(),
193            inner: self.inner.clone(),
194        }
195    }
196}
197
198impl<T: Clone> RemoteVar<T> {
199    pub fn get(&self) -> Option<T> {
200        self.inner.lock().unwrap().clone()
201    }
202
203    pub fn set(&self, value: Option<T>) {
204        *self.inner.lock().unwrap() = value;
205    }
206}
207
208pub(crate) struct Var {
209    pub(crate) key: usize,
210    pub(crate) ty: &'static str,
211    pub(crate) action: Action,
212    pub(crate) remote: Box<dyn core::any::Any>,
213}
214
215#[derive(Default)]
216pub(crate) struct Remotes {
217    /// Map of resource name to key + RemoteVar<T>
218    vars: HashMap<String, Var>,
219}
220
221impl core::fmt::Display for Remotes {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        for (name, var) in self.vars.iter() {
224            f.write_fmt(format_args!(
225                "name:'{name}' key:{rez} ty:{ty}\n",
226                rez = var.key,
227                ty = var.ty,
228            ))?;
229        }
230        Ok(())
231    }
232}
233
234impl Remotes {
235    /// Returns a new `RemoteVar<T>` and its resource key.
236    ///
237    /// ## Errors
238    /// Errs if a var by the given name exists but is of a different type than the type
239    /// requested.
240    pub fn dequeue_var<T: Any>(
241        &mut self,
242        id: &str,
243        action: Action,
244    ) -> Result<(RemoteVar<T>, usize, &'static str), Error> {
245        log::trace!(
246            "requested remote var '{id}' of type {}",
247            core::any::type_name::<T>()
248        );
249        let next_k = self.vars.len();
250        let var = self.vars.entry(id.to_owned()).or_insert_with(|| {
251            log::trace!("   but one doesn't exist, so we're creating a new entry '{next_k}'");
252            Var {
253                key: next_k,
254                ty: std::any::type_name::<T>(),
255                action,
256                remote: Box::new(RemoteVar::<T> {
257                    depends_on: id.to_owned(),
258                    inner: Default::default(),
259                }),
260            }
261        });
262        let remote: &RemoteVar<T> = var.remote.downcast_ref().context(DowncastSnafu)?;
263        Ok((remote.clone(), var.key, var.ty))
264    }
265
266    /// Returns the name of a resource by key
267    pub fn get_name_by_rez(&self, rez: usize) -> Option<String> {
268        for (name, var) in self.vars.iter() {
269            if rez == var.key {
270                return Some(name.clone());
271            }
272        }
273        None
274    }
275
276    /// Returns the key of the resource with the given name.
277    pub fn get(&self, id: &str) -> Option<&Var> {
278        self.vars.get(id)
279    }
280}
281
282#[derive(serde::Serialize, serde::Deserialize)]
283#[serde(untagged)]
284enum MigratedProxy<T> {
285    Remote(RemoteProxy<T>),
286    Local(T),
287}
288
289#[derive(Clone, Debug, PartialEq, serde::Deserialize)]
290#[serde(try_from = "MigratedProxy<T>")]
291pub struct Migrated<T>(pub(crate) T);
292
293impl<T> Deref for Migrated<T> {
294    type Target = T;
295
296    fn deref(&self) -> &Self::Target {
297        &self.0
298    }
299}
300
301impl<T> TryFrom<MigratedProxy<T>> for Migrated<T> {
302    type Error = &'static str;
303
304    fn try_from(value: MigratedProxy<T>) -> Result<Self, Self::Error> {
305        log::trace!("read a migrated {}", std::any::type_name::<T>());
306        match value {
307            MigratedProxy::Remote(RemoteProxy {
308                depends_on: _,
309                last_known_value,
310            }) => {
311                log::trace!("  from a previous remote");
312                if let Some(value) = last_known_value {
313                    Ok(Migrated(value))
314                } else {
315                    Err("Missing last known value")
316                }
317            }
318            MigratedProxy::Local(t) => Ok(Migrated(t)),
319        }
320    }
321}
322
323impl<T: serde::Serialize> serde::Serialize for Migrated<T> {
324    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
325    where
326        S: serde::Serializer,
327    {
328        self.0.serialize(serializer)
329    }
330}
331
332#[cfg(test)]
333mod test {
334    use super::*;
335
336    #[test]
337    fn migrate_ser() {
338        let migrated = Migrated(666u32);
339        let s = serde_json::to_string_pretty(&migrated).unwrap();
340        assert_eq!("666", &s);
341
342        let proxy = MigratedProxy::Remote(RemoteProxy {
343            depends_on: "test-bucket".into(),
344            last_known_value: Some([109, 121, 98, 117, 99, 107, 101, 116]),
345        });
346        let s = serde_json::to_string_pretty(&proxy).unwrap();
347        println!("{s}");
348    }
349
350    #[test]
351    fn migrate_de() {
352        let s = serde_json::json!({
353          "depends_on": "test-bucket",
354          "last_known_value": [
355            109,
356            121,
357            98,
358            117,
359            99,
360            107,
361            101,
362            116
363          ]
364        });
365        let _migrated: Migrated<[u8; 8]> = serde_json::from_value(s).unwrap();
366    }
367}