1use std::{
7 any::Any,
8 collections::HashMap,
9 ops::Deref,
10 sync::{Arc, Mutex},
11};
12
13use snafu::OptionExt;
14
15use crate::HasDependencies;
16
17use super::{
18 Action, Dependencies, DowncastSnafu, Error, RemoteUnresolvedSnafu, Resource, StoreResource,
19};
20
21type VarFn<X> = Arc<dyn Fn(&Arc<dyn Any>) -> Result<X, Error>>;
22
23#[derive(Clone)]
24enum RemoteInner<X> {
25 Init {
26 depends_on: String,
27 last_known_value: Option<X>,
28 },
29 Var {
30 depends_on: String,
31 map: VarFn<X>,
32 var: Arc<dyn Any>,
34 },
35}
36
37impl<X: std::fmt::Debug> std::fmt::Debug for RemoteInner<X> {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Init {
41 depends_on,
42 last_known_value,
43 } => f
44 .debug_struct("Init")
45 .field("depends_on", depends_on)
46 .field("last_known_value", last_known_value)
47 .finish(),
48 Self::Var {
49 depends_on,
50 map: _,
51 var,
52 } => f
53 .debug_struct("Var")
54 .field("depends_on", depends_on)
55 .field("var", var)
56 .finish(),
57 }
58 }
59}
60
61#[derive(Clone, Debug)]
62pub struct Remote<X> {
63 inner: RemoteInner<X>,
64}
65
66impl<X: Clone + core::fmt::Debug + PartialEq + 'static> PartialEq for Remote<X> {
67 fn eq(&self, other: &Self) -> bool {
68 if let Ok(here) = self.get() {
69 if let Ok(there) = other.get() {
70 here == there
71 } else {
72 false
73 }
74 } else {
75 false
76 }
77 }
78}
79
80#[derive(serde::Serialize, serde::Deserialize)]
81struct RemoteProxy<T> {
82 depends_on: String,
83 last_known_value: Option<T>,
84}
85
86impl<X: serde::Serialize + Clone + core::fmt::Debug + 'static> serde::Serialize for Remote<X> {
87 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
88 where
89 S: serde::Serializer,
90 {
91 let proxy = RemoteProxy {
92 last_known_value: self.get().ok(),
93 depends_on: match &self.inner {
94 RemoteInner::Init { depends_on, .. } => depends_on.clone(),
95 RemoteInner::Var { depends_on, .. } => depends_on.clone(),
96 },
97 };
98 proxy.serialize(serializer)
99 }
100}
101
102impl<'de, X: serde::Deserialize<'de>> serde::Deserialize<'de> for Remote<X> {
103 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
104 where
105 D: serde::Deserializer<'de>,
106 {
107 let RemoteProxy {
108 depends_on,
109 last_known_value,
110 } = RemoteProxy::<X>::deserialize(deserializer)?;
111
112 Ok(Remote {
113 inner: RemoteInner::Init {
114 depends_on,
115 last_known_value,
116 },
117 })
118 }
119}
120
121impl<X: Clone + core::fmt::Debug + 'static> Remote<X> {
122 pub(crate) fn new<T: Resource>(
123 resource: &StoreResource<T, T::Output>,
124 map: impl Fn(&T::Output) -> X + 'static,
125 ) -> Self {
126 log::trace!(
127 "creating mapping of a remote resource '{}'",
128 resource.remote_var.depends_on
129 );
130 let depends_on = resource.remote_var.depends_on.clone();
131 Self {
132 inner: RemoteInner::Var {
133 map: Arc::new({
134 let depends_on = depends_on.clone();
135 move |any: &Arc<dyn Any>| {
136 let remote_var = any.downcast_ref::<RemoteVar<T::Output>>().unwrap();
138 let t_output = remote_var.get().context(RemoteUnresolvedSnafu {
139 ty: core::any::type_name::<X>(),
140 depends_on: depends_on.clone(),
141 })?;
142 Ok(map(&t_output))
143 }
144 }),
145 depends_on,
146 var: Arc::new(resource.remote_var.clone()),
147 },
148 }
149 }
150
151 pub fn get(&self) -> Result<X, Error> {
152 match &self.inner {
153 RemoteInner::Init {
154 depends_on,
155 last_known_value,
156 } => {
157 log::trace!("remote var returning last known value: {last_known_value:?}");
158 Ok(last_known_value.clone().context(RemoteUnresolvedSnafu {
159 ty: core::any::type_name::<X>(),
160 depends_on: depends_on.clone(),
161 })?)
162 }
163 RemoteInner::Var {
164 map,
165 var,
166 depends_on: _,
167 } => map(var),
168 }
169 }
170}
171
172impl<X> HasDependencies for Remote<X> {
173 fn dependencies(&self) -> Dependencies {
174 Dependencies {
175 inner: vec![match &self.inner {
176 RemoteInner::Init { depends_on, .. } => depends_on.clone(),
177 RemoteInner::Var { depends_on, .. } => depends_on.clone(),
178 }],
179 }
180 }
181}
182
183#[derive(Debug)]
184pub(crate) struct RemoteVar<T> {
185 depends_on: String,
186 inner: Arc<Mutex<Option<T>>>,
187}
188
189impl<T> Clone for RemoteVar<T> {
190 fn clone(&self) -> Self {
191 Self {
192 depends_on: self.depends_on.clone(),
193 inner: self.inner.clone(),
194 }
195 }
196}
197
198impl<T: Clone> RemoteVar<T> {
199 pub fn get(&self) -> Option<T> {
200 self.inner.lock().unwrap().clone()
201 }
202
203 pub fn set(&self, value: Option<T>) {
204 *self.inner.lock().unwrap() = value;
205 }
206}
207
208pub(crate) struct Var {
209 pub(crate) key: usize,
210 pub(crate) ty: &'static str,
211 pub(crate) action: Action,
212 pub(crate) remote: Box<dyn core::any::Any>,
213}
214
215#[derive(Default)]
216pub(crate) struct Remotes {
217 vars: HashMap<String, Var>,
219}
220
221impl core::fmt::Display for Remotes {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 for (name, var) in self.vars.iter() {
224 f.write_fmt(format_args!(
225 "name:'{name}' key:{rez} ty:{ty}\n",
226 rez = var.key,
227 ty = var.ty,
228 ))?;
229 }
230 Ok(())
231 }
232}
233
234impl Remotes {
235 pub fn dequeue_var<T: Any>(
241 &mut self,
242 id: &str,
243 action: Action,
244 ) -> Result<(RemoteVar<T>, usize, &'static str), Error> {
245 log::trace!(
246 "requested remote var '{id}' of type {}",
247 core::any::type_name::<T>()
248 );
249 let next_k = self.vars.len();
250 let var = self.vars.entry(id.to_owned()).or_insert_with(|| {
251 log::trace!(" but one doesn't exist, so we're creating a new entry '{next_k}'");
252 Var {
253 key: next_k,
254 ty: std::any::type_name::<T>(),
255 action,
256 remote: Box::new(RemoteVar::<T> {
257 depends_on: id.to_owned(),
258 inner: Default::default(),
259 }),
260 }
261 });
262 let remote: &RemoteVar<T> = var.remote.downcast_ref().context(DowncastSnafu)?;
263 Ok((remote.clone(), var.key, var.ty))
264 }
265
266 pub fn get_name_by_rez(&self, rez: usize) -> Option<String> {
268 for (name, var) in self.vars.iter() {
269 if rez == var.key {
270 return Some(name.clone());
271 }
272 }
273 None
274 }
275
276 pub fn get(&self, id: &str) -> Option<&Var> {
278 self.vars.get(id)
279 }
280}
281
282#[derive(serde::Serialize, serde::Deserialize)]
283#[serde(untagged)]
284enum MigratedProxy<T> {
285 Remote(RemoteProxy<T>),
286 Local(T),
287}
288
289#[derive(Clone, Debug, PartialEq, serde::Deserialize)]
290#[serde(try_from = "MigratedProxy<T>")]
291pub struct Migrated<T>(pub(crate) T);
292
293impl<T> Deref for Migrated<T> {
294 type Target = T;
295
296 fn deref(&self) -> &Self::Target {
297 &self.0
298 }
299}
300
301impl<T> TryFrom<MigratedProxy<T>> for Migrated<T> {
302 type Error = &'static str;
303
304 fn try_from(value: MigratedProxy<T>) -> Result<Self, Self::Error> {
305 log::trace!("read a migrated {}", std::any::type_name::<T>());
306 match value {
307 MigratedProxy::Remote(RemoteProxy {
308 depends_on: _,
309 last_known_value,
310 }) => {
311 log::trace!(" from a previous remote");
312 if let Some(value) = last_known_value {
313 Ok(Migrated(value))
314 } else {
315 Err("Missing last known value")
316 }
317 }
318 MigratedProxy::Local(t) => Ok(Migrated(t)),
319 }
320 }
321}
322
323impl<T: serde::Serialize> serde::Serialize for Migrated<T> {
324 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
325 where
326 S: serde::Serializer,
327 {
328 self.0.serialize(serializer)
329 }
330}
331
332#[cfg(test)]
333mod test {
334 use super::*;
335
336 #[test]
337 fn migrate_ser() {
338 let migrated = Migrated(666u32);
339 let s = serde_json::to_string_pretty(&migrated).unwrap();
340 assert_eq!("666", &s);
341
342 let proxy = MigratedProxy::Remote(RemoteProxy {
343 depends_on: "test-bucket".into(),
344 last_known_value: Some([109, 121, 98, 117, 99, 107, 101, 116]),
345 });
346 let s = serde_json::to_string_pretty(&proxy).unwrap();
347 println!("{s}");
348 }
349
350 #[test]
351 fn migrate_de() {
352 let s = serde_json::json!({
353 "depends_on": "test-bucket",
354 "last_known_value": [
355 109,
356 121,
357 98,
358 117,
359 99,
360 107,
361 101,
362 116
363 ]
364 });
365 let _migrated: Migrated<[u8; 8]> = serde_json::from_value(s).unwrap();
366 }
367}