tele/
remote.rs

1//! Remote values.
2//!
3//! Remote values are values that are determined after creating
4//! or reading a resource from a provider.
5
6use std::{
7    any::Any,
8    collections::HashMap,
9    ops::Deref,
10    sync::{Arc, Mutex},
11};
12
13use snafu::OptionExt;
14
15use crate::HasDependencies;
16
17use super::{
18    Action, Dependencies, DowncastSnafu, Error, RemoteUnresolvedSnafu, Resource, StoreResource,
19};
20
21type VarFn<X> = Arc<dyn Fn(&Arc<dyn Any>) -> Result<X, Error>>;
22
23#[derive(Clone)]
24enum RemoteInner<X> {
25    Init {
26        depends_on: String,
27        last_known_value: Option<X>,
28    },
29    Var {
30        depends_on: String,
31        map: VarFn<X>,
32        // RemoteVar<T::Output>
33        var: Arc<dyn Any>,
34    },
35}
36
37impl<X: std::fmt::Debug> std::fmt::Debug for RemoteInner<X> {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Init {
41                depends_on,
42                last_known_value,
43            } => f
44                .debug_struct("Init")
45                .field("depends_on", depends_on)
46                .field("last_known_value", last_known_value)
47                .finish(),
48            Self::Var {
49                depends_on,
50                map: _,
51                var,
52            } => f
53                .debug_struct("Var")
54                .field("depends_on", depends_on)
55                .field("var", var)
56                .finish(),
57        }
58    }
59}
60
61#[derive(Clone)]
62pub struct Remote<X> {
63    inner: RemoteInner<X>,
64}
65
66impl<X: Clone + core::fmt::Debug + 'static> std::fmt::Debug for Remote<X> {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        let depends_on = match &self.inner {
69            RemoteInner::Init { depends_on, .. } => depends_on,
70            RemoteInner::Var { depends_on, .. } => depends_on,
71        };
72        f.debug_struct("Remote")
73            .field("depends_on", depends_on)
74            .field("value", &self.get().ok())
75            .finish()
76    }
77}
78
79impl<X: Clone + core::fmt::Debug + PartialEq + 'static> PartialEq for Remote<X> {
80    fn eq(&self, other: &Self) -> bool {
81        if let Ok(here) = self.get() {
82            if let Ok(there) = other.get() {
83                here == there
84            } else {
85                false
86            }
87        } else {
88            false
89        }
90    }
91}
92
93#[derive(serde::Serialize, serde::Deserialize)]
94struct RemoteProxy<T> {
95    depends_on: String,
96    last_known_value: Option<T>,
97}
98
99impl<X: serde::Serialize + Clone + core::fmt::Debug + 'static> serde::Serialize for Remote<X> {
100    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
101    where
102        S: serde::Serializer,
103    {
104        let proxy = RemoteProxy {
105            last_known_value: self.get().ok(),
106            depends_on: match &self.inner {
107                RemoteInner::Init { depends_on, .. } => depends_on.clone(),
108                RemoteInner::Var { depends_on, .. } => depends_on.clone(),
109            },
110        };
111        proxy.serialize(serializer)
112    }
113}
114
115impl<'de, X: serde::Deserialize<'de>> serde::Deserialize<'de> for Remote<X> {
116    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
117    where
118        D: serde::Deserializer<'de>,
119    {
120        let RemoteProxy {
121            depends_on,
122            last_known_value,
123        } = RemoteProxy::<X>::deserialize(deserializer)?;
124
125        Ok(Remote {
126            inner: RemoteInner::Init {
127                depends_on,
128                last_known_value,
129            },
130        })
131    }
132}
133
134impl<X: Clone + core::fmt::Debug + 'static> Remote<X> {
135    pub(crate) fn new<T: Resource>(
136        resource: &StoreResource<T, T::Output>,
137        map: impl Fn(&T::Output) -> X + 'static,
138    ) -> Self {
139        log::trace!(
140            "creating mapping of a remote resource '{}'",
141            resource.remote_var.depends_on
142        );
143        let depends_on = resource.remote_var.depends_on.clone();
144        Self {
145            inner: RemoteInner::Var {
146                map: Arc::new({
147                    let depends_on = depends_on.clone();
148                    move |any: &Arc<dyn Any>| {
149                        // UNWRAP: safe because this is an invariant
150                        let remote_var = any.downcast_ref::<RemoteVar<T::Output>>().unwrap();
151                        let t_output = remote_var.get().context(RemoteUnresolvedSnafu {
152                            ty: core::any::type_name::<X>(),
153                            depends_on: depends_on.clone(),
154                        })?;
155                        Ok(map(&t_output))
156                    }
157                }),
158                depends_on,
159                var: Arc::new(resource.remote_var.clone()),
160            },
161        }
162    }
163
164    pub fn get(&self) -> Result<X, Error> {
165        match &self.inner {
166            RemoteInner::Init {
167                depends_on,
168                last_known_value,
169            } => {
170                log::trace!("remote var returning last known value: {last_known_value:?}");
171                Ok(last_known_value.clone().context(RemoteUnresolvedSnafu {
172                    ty: core::any::type_name::<X>(),
173                    depends_on: depends_on.clone(),
174                })?)
175            }
176            RemoteInner::Var {
177                map,
178                var,
179                depends_on: _,
180            } => map(var),
181        }
182    }
183
184    pub fn map<Y>(&self, f: impl Fn(X) -> Y + 'static) -> Remote<Y> {
185        match &self.inner {
186            RemoteInner::Init {
187                depends_on,
188                last_known_value,
189            } => Remote {
190                inner: RemoteInner::Init {
191                    depends_on: depends_on.clone(),
192                    last_known_value: last_known_value.clone().map(f),
193                },
194            },
195            RemoteInner::Var {
196                depends_on,
197                map,
198                var,
199            } => Remote {
200                inner: RemoteInner::Var {
201                    depends_on: depends_on.clone(),
202                    var: var.clone(),
203                    map: Arc::new({
204                        let map = map.clone();
205                        move |any: &Arc<dyn Any>| {
206                            let x = map(any)?;
207                            Ok(f(x))
208                        }
209                    }),
210                },
211            },
212        }
213    }
214}
215
216impl<X> HasDependencies for Remote<X> {
217    fn dependencies(&self) -> Dependencies {
218        Dependencies {
219            inner: vec![match &self.inner {
220                RemoteInner::Init { depends_on, .. } => depends_on.clone(),
221                RemoteInner::Var { depends_on, .. } => depends_on.clone(),
222            }],
223        }
224    }
225}
226
227#[derive(Debug)]
228pub(crate) struct RemoteVar<T> {
229    depends_on: String,
230    inner: Arc<Mutex<Option<T>>>,
231}
232
233impl<T> Clone for RemoteVar<T> {
234    fn clone(&self) -> Self {
235        Self {
236            depends_on: self.depends_on.clone(),
237            inner: self.inner.clone(),
238        }
239    }
240}
241
242impl<T: Clone> RemoteVar<T> {
243    pub fn get(&self) -> Option<T> {
244        self.inner.lock().unwrap().clone()
245    }
246
247    pub fn set(&self, value: Option<T>) {
248        *self.inner.lock().unwrap() = value;
249    }
250}
251
252pub(crate) struct Var {
253    pub(crate) key: usize,
254    pub(crate) ty: &'static str,
255    pub(crate) action: Action,
256    pub(crate) remote: Box<dyn core::any::Any>,
257}
258
259#[derive(Default)]
260pub(crate) struct Remotes {
261    /// Map of resource name to key + RemoteVar<T>
262    vars: HashMap<String, Var>,
263}
264
265impl core::fmt::Display for Remotes {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        for (name, var) in self.vars.iter() {
268            f.write_fmt(format_args!(
269                "name:'{name}' key:{rez} ty:{ty}\n",
270                rez = var.key,
271                ty = var.ty,
272            ))?;
273        }
274        Ok(())
275    }
276}
277
278impl Remotes {
279    /// Returns a new `RemoteVar<T>` and its resource key.
280    ///
281    /// ## Errors
282    /// Errs if a var by the given name exists but is of a different type than the type
283    /// requested.
284    pub fn dequeue_var<T: Any>(
285        &mut self,
286        id: &str,
287        action: Action,
288    ) -> Result<(RemoteVar<T>, usize, &'static str), Error> {
289        log::trace!(
290            "requested remote var '{id}' of type {}",
291            core::any::type_name::<T>()
292        );
293        let next_k = self.vars.len();
294        let var = self.vars.entry(id.to_owned()).or_insert_with(|| {
295            log::trace!("   but one doesn't exist, so we're creating a new entry '{next_k}'");
296            Var {
297                key: next_k,
298                ty: std::any::type_name::<T>(),
299                action,
300                remote: Box::new(RemoteVar::<T> {
301                    depends_on: id.to_owned(),
302                    inner: Default::default(),
303                }),
304            }
305        });
306        let remote: &RemoteVar<T> = var.remote.downcast_ref().context(DowncastSnafu)?;
307        Ok((remote.clone(), var.key, var.ty))
308    }
309
310    /// Returns the name of a resource by key
311    pub fn get_name_by_rez(&self, rez: usize) -> Option<String> {
312        for (name, var) in self.vars.iter() {
313            if rez == var.key {
314                return Some(name.clone());
315            }
316        }
317        None
318    }
319
320    /// Returns the key of the resource with the given name.
321    pub fn get(&self, id: &str) -> Option<&Var> {
322        self.vars.get(id)
323    }
324}
325
326#[derive(serde::Serialize, serde::Deserialize)]
327#[serde(untagged)]
328enum MigratedProxy<T> {
329    Remote(RemoteProxy<T>),
330    Local(T),
331}
332
333#[derive(Clone, Debug, PartialEq, serde::Deserialize)]
334#[serde(try_from = "MigratedProxy<T>")]
335pub struct Migrated<T>(pub(crate) T);
336
337impl<T> Deref for Migrated<T> {
338    type Target = T;
339
340    fn deref(&self) -> &Self::Target {
341        &self.0
342    }
343}
344
345impl<T> TryFrom<MigratedProxy<T>> for Migrated<T> {
346    type Error = &'static str;
347
348    fn try_from(value: MigratedProxy<T>) -> Result<Self, Self::Error> {
349        log::trace!("read a migrated {}", std::any::type_name::<T>());
350        match value {
351            MigratedProxy::Remote(RemoteProxy {
352                depends_on: _,
353                last_known_value,
354            }) => {
355                log::trace!("  from a previous remote");
356                if let Some(value) = last_known_value {
357                    Ok(Migrated(value))
358                } else {
359                    Err("Missing last known value")
360                }
361            }
362            MigratedProxy::Local(t) => Ok(Migrated(t)),
363        }
364    }
365}
366
367impl<T: serde::Serialize> serde::Serialize for Migrated<T> {
368    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
369    where
370        S: serde::Serializer,
371    {
372        self.0.serialize(serializer)
373    }
374}
375
376#[cfg(test)]
377mod test {
378    use super::*;
379
380    #[test]
381    fn migrate_ser() {
382        let migrated = Migrated(666u32);
383        let s = serde_json::to_string_pretty(&migrated).unwrap();
384        assert_eq!("666", &s);
385
386        let proxy = MigratedProxy::Remote(RemoteProxy {
387            depends_on: "test-bucket".into(),
388            last_known_value: Some([109, 121, 98, 117, 99, 107, 101, 116]),
389        });
390        let s = serde_json::to_string_pretty(&proxy).unwrap();
391        println!("{s}");
392    }
393
394    #[test]
395    fn migrate_de() {
396        let s = serde_json::json!({
397          "depends_on": "test-bucket",
398          "last_known_value": [
399            109,
400            121,
401            98,
402            117,
403            99,
404            107,
405            101,
406            116
407          ]
408        });
409        let _migrated: Migrated<[u8; 8]> = serde_json::from_value(s).unwrap();
410    }
411}