tele/
remote.rs

1//! Remote values.
2//!
3//! Remote values are values that are determined after creating
4//! or reading a resource from a provider.
5
6use std::{
7    any::Any,
8    collections::HashMap,
9    ops::Deref,
10    sync::{Arc, Mutex},
11};
12
13use snafu::OptionExt;
14
15use crate::HasDependencies;
16
17use super::{
18    Action, Dependencies, DowncastSnafu, Error, RemoteUnresolvedSnafu, Resource, StoreResource,
19};
20
21type VarFn<X> = Arc<dyn Fn(&Arc<dyn Any>) -> Result<X, Error>>;
22
23#[derive(Clone)]
24enum RemoteInner<X> {
25    Init {
26        depends_on: String,
27        last_known_value: Option<X>,
28    },
29    Var {
30        depends_on: String,
31        map: VarFn<X>,
32        // RemoteVar<T::Output>
33        var: Arc<dyn Any>,
34    },
35}
36
37impl<X: std::fmt::Debug> std::fmt::Debug for RemoteInner<X> {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Init {
41                depends_on,
42                last_known_value,
43            } => f
44                .debug_struct("Init")
45                .field("depends_on", depends_on)
46                .field("last_known_value", last_known_value)
47                .finish(),
48            Self::Var {
49                depends_on,
50                map: _,
51                var,
52            } => f
53                .debug_struct("Var")
54                .field("depends_on", depends_on)
55                .field("var", var)
56                .finish(),
57        }
58    }
59}
60
61#[derive(Clone, Debug)]
62pub struct Remote<X> {
63    inner: RemoteInner<X>,
64}
65
66impl<X: Clone + core::fmt::Debug + PartialEq + 'static> PartialEq for Remote<X> {
67    fn eq(&self, other: &Self) -> bool {
68        if let Ok(here) = self.get() {
69            if let Ok(there) = other.get() {
70                here == there
71            } else {
72                false
73            }
74        } else {
75            false
76        }
77    }
78}
79
80#[derive(serde::Serialize, serde::Deserialize)]
81struct RemoteProxy<T> {
82    depends_on: String,
83    last_known_value: Option<T>,
84}
85
86impl<X: serde::Serialize + Clone + core::fmt::Debug + 'static> serde::Serialize for Remote<X> {
87    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
88    where
89        S: serde::Serializer,
90    {
91        let proxy = RemoteProxy {
92            last_known_value: self.get().ok(),
93            depends_on: match &self.inner {
94                RemoteInner::Init { depends_on, .. } => depends_on.clone(),
95                RemoteInner::Var { depends_on, .. } => depends_on.clone(),
96            },
97        };
98        proxy.serialize(serializer)
99    }
100}
101
102impl<'de, X: serde::Deserialize<'de>> serde::Deserialize<'de> for Remote<X> {
103    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
104    where
105        D: serde::Deserializer<'de>,
106    {
107        let RemoteProxy {
108            depends_on,
109            last_known_value,
110        } = RemoteProxy::<X>::deserialize(deserializer)?;
111
112        Ok(Remote {
113            inner: RemoteInner::Init {
114                depends_on,
115                last_known_value,
116            },
117        })
118    }
119}
120
121impl<X: Clone + core::fmt::Debug + 'static> Remote<X> {
122    pub(crate) fn new<T: Resource>(
123        resource: &StoreResource<T, T::Output>,
124        map: impl Fn(&T::Output) -> X + 'static,
125    ) -> Self {
126        log::trace!(
127            "creating mapping of a remote resource '{}'",
128            resource.remote_var.depends_on
129        );
130        let depends_on = resource.remote_var.depends_on.clone();
131        Self {
132            inner: RemoteInner::Var {
133                map: Arc::new({
134                    let depends_on = depends_on.clone();
135                    move |any: &Arc<dyn Any>| {
136                        // UNWRAP: safe because this is an invariant
137                        let remote_var = any.downcast_ref::<RemoteVar<T::Output>>().unwrap();
138                        let t_output = remote_var.get().context(RemoteUnresolvedSnafu {
139                            ty: core::any::type_name::<X>(),
140                            depends_on: depends_on.clone(),
141                        })?;
142                        Ok(map(&t_output))
143                    }
144                }),
145                depends_on,
146                var: Arc::new(resource.remote_var.clone()),
147            },
148        }
149    }
150
151    pub fn get(&self) -> Result<X, Error> {
152        match &self.inner {
153            RemoteInner::Init {
154                depends_on,
155                last_known_value,
156            } => {
157                log::trace!("remote var returning last known value: {last_known_value:?}");
158                Ok(last_known_value.clone().context(RemoteUnresolvedSnafu {
159                    ty: core::any::type_name::<X>(),
160                    depends_on: depends_on.clone(),
161                })?)
162            }
163            RemoteInner::Var {
164                map,
165                var,
166                depends_on: _,
167            } => map(var),
168        }
169    }
170}
171
172impl<X> HasDependencies for Remote<X> {
173    fn dependencies(&self) -> Dependencies {
174        Dependencies {
175            inner: vec![match &self.inner {
176                RemoteInner::Init { depends_on, .. } => depends_on.clone(),
177                RemoteInner::Var { depends_on, .. } => depends_on.clone(),
178            }],
179        }
180    }
181}
182
183#[derive(Debug)]
184pub(crate) struct RemoteVar<T> {
185    depends_on: String,
186    action: Action,
187    inner: Arc<Mutex<Option<T>>>,
188}
189
190impl<T> Clone for RemoteVar<T> {
191    fn clone(&self) -> Self {
192        Self {
193            depends_on: self.depends_on.clone(),
194            action: self.action,
195            inner: self.inner.clone(),
196        }
197    }
198}
199
200impl<T: Clone> RemoteVar<T> {
201    pub fn get(&self) -> Option<T> {
202        self.inner.lock().unwrap().clone()
203    }
204
205    pub fn set(&self, value: Option<T>) {
206        *self.inner.lock().unwrap() = value;
207    }
208}
209
210pub(crate) struct Var {
211    pub(crate) key: usize,
212    pub(crate) ty: &'static str,
213    pub(crate) remote: Box<dyn core::any::Any>,
214}
215
216#[derive(Default)]
217pub(crate) struct Remotes {
218    /// Map of resource name to key + RemoteVar<T>
219    vars: HashMap<String, Var>,
220}
221
222impl core::fmt::Display for Remotes {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        for (name, var) in self.vars.iter() {
225            f.write_fmt(format_args!(
226                "name:'{name}' key:{rez} ty:{ty}\n",
227                rez = var.key,
228                ty = var.ty,
229            ))?;
230        }
231        Ok(())
232    }
233}
234
235impl Remotes {
236    /// Returns a new `RemoteVar<T>` and its resource key.
237    ///
238    /// ## Errors
239    /// Errs if a var by the given name exists but is of a different type than the type
240    /// requested.
241    pub fn dequeue_var<T: Any>(
242        &mut self,
243        id: &str,
244        action: Action,
245    ) -> Result<(RemoteVar<T>, usize, &'static str), Error> {
246        log::trace!(
247            "requested remote var '{id}' of type {}",
248            core::any::type_name::<T>()
249        );
250        let next_k = self.vars.len();
251        let var = self.vars.entry(id.to_owned()).or_insert_with(|| {
252            log::trace!("   but one doesn't exist, so we're creating a new entry '{next_k}'");
253            Var {
254                key: next_k,
255                ty: std::any::type_name::<T>(),
256                remote: Box::new(RemoteVar::<T> {
257                    depends_on: id.to_owned(),
258                    action,
259                    inner: Default::default(),
260                }),
261            }
262        });
263        let remote: &RemoteVar<T> = var.remote.downcast_ref().context(DowncastSnafu)?;
264        Ok((remote.clone(), var.key, var.ty))
265    }
266
267    /// Returns the name of a resource by key
268    pub fn get_name_by_rez(&self, rez: usize) -> Option<String> {
269        for (name, var) in self.vars.iter() {
270            if rez == var.key {
271                return Some(name.clone());
272            }
273        }
274        None
275    }
276
277    /// Returns the key of the resource with the given name.
278    pub fn get(&self, id: &str) -> Option<&Var> {
279        self.vars.get(id)
280    }
281}
282
283#[derive(serde::Serialize, serde::Deserialize)]
284#[serde(untagged)]
285enum MigratedProxy<T> {
286    Remote(RemoteProxy<T>),
287    Local(T),
288}
289
290#[derive(Clone, Debug, PartialEq, serde::Deserialize)]
291#[serde(try_from = "MigratedProxy<T>")]
292pub struct Migrated<T>(pub(crate) T);
293
294impl<T> Deref for Migrated<T> {
295    type Target = T;
296
297    fn deref(&self) -> &Self::Target {
298        &self.0
299    }
300}
301
302impl<T> TryFrom<MigratedProxy<T>> for Migrated<T> {
303    type Error = &'static str;
304
305    fn try_from(value: MigratedProxy<T>) -> Result<Self, Self::Error> {
306        log::trace!("read a migrated {}", std::any::type_name::<T>());
307        match value {
308            MigratedProxy::Remote(RemoteProxy {
309                depends_on: _,
310                last_known_value,
311            }) => {
312                log::trace!("  from a previous remote");
313                if let Some(value) = last_known_value {
314                    Ok(Migrated(value))
315                } else {
316                    Err("Missing last known value")
317                }
318            }
319            MigratedProxy::Local(t) => Ok(Migrated(t)),
320        }
321    }
322}
323
324impl<T: serde::Serialize> serde::Serialize for Migrated<T> {
325    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
326    where
327        S: serde::Serializer,
328    {
329        self.0.serialize(serializer)
330    }
331}
332
333#[cfg(test)]
334mod test {
335    use super::*;
336
337    #[test]
338    fn migrate_ser() {
339        let migrated = Migrated(666u32);
340        let s = serde_json::to_string_pretty(&migrated).unwrap();
341        assert_eq!("666", &s);
342
343        let proxy = MigratedProxy::Remote(RemoteProxy {
344            depends_on: "test-bucket".into(),
345            last_known_value: Some([109, 121, 98, 117, 99, 107, 101, 116]),
346        });
347        let s = serde_json::to_string_pretty(&proxy).unwrap();
348        println!("{s}");
349    }
350
351    #[test]
352    fn migrate_de() {
353        let s = serde_json::json!({
354          "depends_on": "test-bucket",
355          "last_known_value": [
356            109,
357            121,
358            98,
359            117,
360            99,
361            107,
362            101,
363            116
364          ]
365        });
366        let _migrated: Migrated<[u8; 8]> = serde_json::from_value(s).unwrap();
367    }
368}