1use std::{
7 any::Any,
8 collections::HashMap,
9 ops::Deref,
10 sync::{Arc, Mutex},
11};
12
13use snafu::OptionExt;
14
15use crate::HasDependencies;
16
17use super::{
18 Action, Dependencies, DowncastSnafu, Error, RemoteUnresolvedSnafu, Resource, StoreResource,
19};
20
/// Type-erased accessor stored in [`RemoteInner::Var`]: it is called with that
/// variant's `var` and yields the current value of type `X`, or an [`Error`].
type VarFn<X> = Arc<dyn Fn(&Arc<dyn Any>) -> Result<X, Error>>;
22
/// Internal state of a [`Remote`].
#[derive(Clone)]
enum RemoteInner<X> {
    /// A remote without a live variable behind it (this is what deserializing
    /// a [`Remote`] produces); it can only report the value it was stored with.
    Init {
        /// Identifier of the variable this value depends on.
        depends_on: String,
        /// Value recorded alongside the identifier, if there was one.
        last_known_value: Option<X>,
    },
    /// A remote backed by a live, type-erased variable.
    Var {
        /// Identifier of the variable this value depends on.
        depends_on: String,
        /// Computes the value of type `X` from `var`.
        map: VarFn<X>,
        /// The type-erased variable that is handed to `map`.
        var: Arc<dyn Any>,
    },
}
36
37impl<X: std::fmt::Debug> std::fmt::Debug for RemoteInner<X> {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Init {
41 depends_on,
42 last_known_value,
43 } => f
44 .debug_struct("Init")
45 .field("depends_on", depends_on)
46 .field("last_known_value", last_known_value)
47 .finish(),
48 Self::Var {
49 depends_on,
50 map: _,
51 var,
52 } => f
53 .debug_struct("Var")
54 .field("depends_on", depends_on)
55 .field("var", var)
56 .finish(),
57 }
58 }
59}
60
/// A value of type `X` that depends on a named remote variable.
///
/// The value is read with `get`, which either computes it from a live
/// variable or falls back to the last known value the remote was
/// deserialized with.
#[derive(Clone)]
pub struct Remote<X> {
    /// Current state: live variable or last known value.
    inner: RemoteInner<X>,
}
65
66impl<X: Clone + core::fmt::Debug + 'static> std::fmt::Debug for Remote<X> {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 let depends_on = match &self.inner {
69 RemoteInner::Init { depends_on, .. } => depends_on,
70 RemoteInner::Var { depends_on, .. } => depends_on,
71 };
72 f.debug_struct("Remote")
73 .field("depends_on", depends_on)
74 .field("value", &self.get().ok())
75 .finish()
76 }
77}
78
79impl<X: Clone + core::fmt::Debug + PartialEq + 'static> PartialEq for Remote<X> {
80 fn eq(&self, other: &Self) -> bool {
81 if let Ok(here) = self.get() {
82 if let Ok(there) = other.get() {
83 here == there
84 } else {
85 false
86 }
87 } else {
88 false
89 }
90 }
91}
92
/// Serialized form of a [`Remote`]: the identifier it depends on plus the
/// value it resolved to at serialization time, if any.
#[derive(serde::Serialize, serde::Deserialize)]
struct RemoteProxy<T> {
    /// Identifier of the variable the remote depends on.
    depends_on: String,
    /// Value the remote held when it was serialized; `None` if it was unresolved.
    last_known_value: Option<T>,
}
98
99impl<X: serde::Serialize + Clone + core::fmt::Debug + 'static> serde::Serialize for Remote<X> {
100 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
101 where
102 S: serde::Serializer,
103 {
104 let proxy = RemoteProxy {
105 last_known_value: self.get().ok(),
106 depends_on: match &self.inner {
107 RemoteInner::Init { depends_on, .. } => depends_on.clone(),
108 RemoteInner::Var { depends_on, .. } => depends_on.clone(),
109 },
110 };
111 proxy.serialize(serializer)
112 }
113}
114
115impl<'de, X: serde::Deserialize<'de>> serde::Deserialize<'de> for Remote<X> {
116 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
117 where
118 D: serde::Deserializer<'de>,
119 {
120 let RemoteProxy {
121 depends_on,
122 last_known_value,
123 } = RemoteProxy::<X>::deserialize(deserializer)?;
124
125 Ok(Remote {
126 inner: RemoteInner::Init {
127 depends_on,
128 last_known_value,
129 },
130 })
131 }
132}
133
134impl<X: Clone + core::fmt::Debug + 'static> Remote<X> {
135 pub(crate) fn new<T: Resource>(
136 resource: &StoreResource<T, T::Output>,
137 map: impl Fn(&T::Output) -> X + 'static,
138 ) -> Self {
139 log::trace!(
140 "creating mapping of a remote resource '{}'",
141 resource.remote_var.depends_on
142 );
143 let depends_on = resource.remote_var.depends_on.clone();
144 Self {
145 inner: RemoteInner::Var {
146 map: Arc::new({
147 let depends_on = depends_on.clone();
148 move |any: &Arc<dyn Any>| {
149 let remote_var = any.downcast_ref::<RemoteVar<T::Output>>().unwrap();
151 let t_output = remote_var.get().context(RemoteUnresolvedSnafu {
152 ty: core::any::type_name::<X>(),
153 depends_on: depends_on.clone(),
154 })?;
155 Ok(map(&t_output))
156 }
157 }),
158 depends_on,
159 var: Arc::new(resource.remote_var.clone()),
160 },
161 }
162 }
163
164 pub fn get(&self) -> Result<X, Error> {
165 match &self.inner {
166 RemoteInner::Init {
167 depends_on,
168 last_known_value,
169 } => {
170 log::trace!("remote var returning last known value: {last_known_value:?}");
171 Ok(last_known_value.clone().context(RemoteUnresolvedSnafu {
172 ty: core::any::type_name::<X>(),
173 depends_on: depends_on.clone(),
174 })?)
175 }
176 RemoteInner::Var {
177 map,
178 var,
179 depends_on: _,
180 } => map(var),
181 }
182 }
183
184 pub fn map<Y>(&self, f: impl Fn(X) -> Y + 'static) -> Remote<Y> {
185 match &self.inner {
186 RemoteInner::Init {
187 depends_on,
188 last_known_value,
189 } => Remote {
190 inner: RemoteInner::Init {
191 depends_on: depends_on.clone(),
192 last_known_value: last_known_value.clone().map(f),
193 },
194 },
195 RemoteInner::Var {
196 depends_on,
197 map,
198 var,
199 } => Remote {
200 inner: RemoteInner::Var {
201 depends_on: depends_on.clone(),
202 var: var.clone(),
203 map: Arc::new({
204 let map = map.clone();
205 move |any: &Arc<dyn Any>| {
206 let x = map(any)?;
207 Ok(f(x))
208 }
209 }),
210 },
211 },
212 }
213 }
214}
215
216impl<X> HasDependencies for Remote<X> {
217 fn dependencies(&self) -> Dependencies {
218 Dependencies {
219 inner: vec![match &self.inner {
220 RemoteInner::Init { depends_on, .. } => depends_on.clone(),
221 RemoteInner::Var { depends_on, .. } => depends_on.clone(),
222 }],
223 }
224 }
225}
226
/// Shared, optionally-populated slot holding the value of the remote variable
/// named `depends_on`.
///
/// Clones share the same slot, so a value stored through one handle is
/// visible through every other handle.
#[derive(Debug)]
pub(crate) struct RemoteVar<T> {
    /// Identifier of the variable this slot belongs to.
    depends_on: String,
    /// Shared storage; `None` while no value is stored.
    inner: Arc<Mutex<Option<T>>>,
}

// Written by hand so that cloning a handle does not require `T: Clone`.
impl<T> Clone for RemoteVar<T> {
    /// Returns another handle to the same slot.
    fn clone(&self) -> Self {
        let Self { depends_on, inner } = self;
        Self {
            depends_on: depends_on.to_owned(),
            inner: Arc::clone(inner),
        }
    }
}

impl<T: Clone> RemoteVar<T> {
    /// Returns a copy of the stored value, or `None` when the slot is empty.
    ///
    /// # Panics
    ///
    /// Panics if the underlying mutex is poisoned.
    pub fn get(&self) -> Option<T> {
        let slot = self.inner.lock().unwrap();
        slot.as_ref().cloned()
    }

    /// Replaces the stored value; passing `None` empties the slot.
    ///
    /// # Panics
    ///
    /// Panics if the underlying mutex is poisoned.
    pub fn set(&self, value: Option<T>) {
        let mut slot = self.inner.lock().unwrap();
        *slot = value;
    }
}
251
/// One entry of [`Remotes`]: a registered remote variable together with the
/// metadata recorded when it was first requested.
pub(crate) struct Var {
    /// Number assigned at insertion (the size of the registry at that moment).
    pub(crate) key: usize,
    /// Type name of the value the variable was created for.
    pub(crate) ty: &'static str,
    /// Action passed by the request that created the entry.
    pub(crate) action: Action,
    /// The type-erased `RemoteVar<T>` holding the value.
    pub(crate) remote: Box<dyn core::any::Any>,
}
258
/// Registry of remote variables, indexed by their string identifier.
#[derive(Default)]
pub(crate) struct Remotes {
    /// Registered variables keyed by identifier.
    vars: HashMap<String, Var>,
}
264
265impl core::fmt::Display for Remotes {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 for (name, var) in self.vars.iter() {
268 f.write_fmt(format_args!(
269 "name:'{name}' key:{rez} ty:{ty}\n",
270 rez = var.key,
271 ty = var.ty,
272 ))?;
273 }
274 Ok(())
275 }
276}
277
278impl Remotes {
279 pub fn dequeue_var<T: Any>(
285 &mut self,
286 id: &str,
287 action: Action,
288 ) -> Result<(RemoteVar<T>, usize, &'static str), Error> {
289 log::trace!(
290 "requested remote var '{id}' of type {}",
291 core::any::type_name::<T>()
292 );
293 let next_k = self.vars.len();
294 let var = self.vars.entry(id.to_owned()).or_insert_with(|| {
295 log::trace!(" but one doesn't exist, so we're creating a new entry '{next_k}'");
296 Var {
297 key: next_k,
298 ty: std::any::type_name::<T>(),
299 action,
300 remote: Box::new(RemoteVar::<T> {
301 depends_on: id.to_owned(),
302 inner: Default::default(),
303 }),
304 }
305 });
306 let remote: &RemoteVar<T> = var.remote.downcast_ref().context(DowncastSnafu)?;
307 Ok((remote.clone(), var.key, var.ty))
308 }
309
310 pub fn get_name_by_rez(&self, rez: usize) -> Option<String> {
312 for (name, var) in self.vars.iter() {
313 if rez == var.key {
314 return Some(name.clone());
315 }
316 }
317 None
318 }
319
320 pub fn get(&self, id: &str) -> Option<&Var> {
322 self.vars.get(id)
323 }
324}
325
/// Wire format accepted for a [`Migrated`] value: either the object written
/// for a remote, or the bare value itself.
///
/// The enum is untagged; serde tries untagged variants in declaration order,
/// so the remote form is attempted before the plain value.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(untagged)]
enum MigratedProxy<T> {
    /// Data that was stored as a remote.
    Remote(RemoteProxy<T>),
    /// Data that was stored as a plain value.
    Local(T),
}
332
/// A plain value that can also be read from data previously stored as a
/// remote.
///
/// Deserialization goes through [`MigratedProxy`]; serialization writes the
/// wrapped value only.
#[derive(Clone, Debug, PartialEq, serde::Deserialize)]
#[serde(try_from = "MigratedProxy<T>")]
pub struct Migrated<T>(pub(crate) T);
336
337impl<T> Deref for Migrated<T> {
338 type Target = T;
339
340 fn deref(&self) -> &Self::Target {
341 &self.0
342 }
343}
344
345impl<T> TryFrom<MigratedProxy<T>> for Migrated<T> {
346 type Error = &'static str;
347
348 fn try_from(value: MigratedProxy<T>) -> Result<Self, Self::Error> {
349 log::trace!("read a migrated {}", std::any::type_name::<T>());
350 match value {
351 MigratedProxy::Remote(RemoteProxy {
352 depends_on: _,
353 last_known_value,
354 }) => {
355 log::trace!(" from a previous remote");
356 if let Some(value) = last_known_value {
357 Ok(Migrated(value))
358 } else {
359 Err("Missing last known value")
360 }
361 }
362 MigratedProxy::Local(t) => Ok(Migrated(t)),
363 }
364 }
365}
366
367impl<T: serde::Serialize> serde::Serialize for Migrated<T> {
368 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
369 where
370 S: serde::Serializer,
371 {
372 self.0.serialize(serializer)
373 }
374}
375
#[cfg(test)]
mod test {
    use super::*;

    /// The bytes of `"mybucket"`, used as the sample payload.
    const BUCKET_BYTES: [u8; 8] = *b"mybucket";

    /// JSON written for a remote that last resolved to [`BUCKET_BYTES`].
    fn remote_json() -> serde_json::Value {
        serde_json::json!({
            "depends_on": "test-bucket",
            "last_known_value": [109, 121, 98, 117, 99, 107, 101, 116]
        })
    }

    #[test]
    fn migrate_ser() {
        // A migrated value serializes as the bare wrapped value.
        let migrated = Migrated(666u32);
        let s = serde_json::to_string_pretty(&migrated).unwrap();
        assert_eq!("666", &s);

        // The untagged proxy serializes as the plain remote object.
        let proxy = MigratedProxy::Remote(RemoteProxy {
            depends_on: "test-bucket".into(),
            last_known_value: Some(BUCKET_BYTES),
        });
        let value = serde_json::to_value(&proxy).unwrap();
        assert_eq!(remote_json(), value);
    }

    #[test]
    fn migrate_de() {
        // Data stored as a remote is read back as the last known value.
        let migrated: Migrated<[u8; 8]> = serde_json::from_value(remote_json()).unwrap();
        assert_eq!(Migrated(BUCKET_BYTES), migrated);
    }

    #[test]
    fn migrate_de_local() {
        // Data stored as a plain value is read back unchanged.
        let migrated: Migrated<u32> = serde_json::from_value(serde_json::json!(666)).unwrap();
        assert_eq!(Migrated(666u32), migrated);
    }

    #[test]
    fn migrate_de_missing_value() {
        // A remote that never resolved has nothing to migrate from.
        let s = serde_json::json!({
            "depends_on": "test-bucket",
            "last_known_value": null
        });
        assert!(serde_json::from_value::<Migrated<u32>>(s).is_err());
    }
}