1use std::{
7 any::Any,
8 collections::HashMap,
9 ops::Deref,
10 sync::{Arc, Mutex},
11};
12
13use snafu::OptionExt;
14
15use crate::HasDependencies;
16
17use super::{
18 Action, Dependencies, DowncastSnafu, Error, RemoteUnresolvedSnafu, Resource, StoreResource,
19};
20
21type VarFn<X> = Arc<dyn Fn(&Arc<dyn Any>) -> Result<X, Error>>;
22
23#[derive(Clone)]
24enum RemoteInner<X> {
25 Init {
26 depends_on: String,
27 last_known_value: Option<X>,
28 },
29 Var {
30 depends_on: String,
31 map: VarFn<X>,
32 var: Arc<dyn Any>,
34 },
35}
36
37impl<X: std::fmt::Debug> std::fmt::Debug for RemoteInner<X> {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Init {
41 depends_on,
42 last_known_value,
43 } => f
44 .debug_struct("Init")
45 .field("depends_on", depends_on)
46 .field("last_known_value", last_known_value)
47 .finish(),
48 Self::Var {
49 depends_on,
50 map: _,
51 var,
52 } => f
53 .debug_struct("Var")
54 .field("depends_on", depends_on)
55 .field("var", var)
56 .finish(),
57 }
58 }
59}
60
61#[derive(Clone, Debug)]
62pub struct Remote<X> {
63 inner: RemoteInner<X>,
64}
65
66impl<X: Clone + core::fmt::Debug + PartialEq + 'static> PartialEq for Remote<X> {
67 fn eq(&self, other: &Self) -> bool {
68 if let Ok(here) = self.get() {
69 if let Ok(there) = other.get() {
70 here == there
71 } else {
72 false
73 }
74 } else {
75 false
76 }
77 }
78}
79
80#[derive(serde::Serialize, serde::Deserialize)]
81struct RemoteProxy<T> {
82 depends_on: String,
83 last_known_value: Option<T>,
84}
85
86impl<X: serde::Serialize + Clone + core::fmt::Debug + 'static> serde::Serialize for Remote<X> {
87 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
88 where
89 S: serde::Serializer,
90 {
91 let proxy = RemoteProxy {
92 last_known_value: self.get().ok(),
93 depends_on: match &self.inner {
94 RemoteInner::Init { depends_on, .. } => depends_on.clone(),
95 RemoteInner::Var { depends_on, .. } => depends_on.clone(),
96 },
97 };
98 proxy.serialize(serializer)
99 }
100}
101
102impl<'de, X: serde::Deserialize<'de>> serde::Deserialize<'de> for Remote<X> {
103 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
104 where
105 D: serde::Deserializer<'de>,
106 {
107 let RemoteProxy {
108 depends_on,
109 last_known_value,
110 } = RemoteProxy::<X>::deserialize(deserializer)?;
111
112 Ok(Remote {
113 inner: RemoteInner::Init {
114 depends_on,
115 last_known_value,
116 },
117 })
118 }
119}
120
121impl<X: Clone + core::fmt::Debug + 'static> Remote<X> {
122 pub(crate) fn new<T: Resource>(
123 resource: &StoreResource<T, T::Output>,
124 map: impl Fn(&T::Output) -> X + 'static,
125 ) -> Self {
126 log::trace!(
127 "creating mapping of a remote resource '{}'",
128 resource.remote_var.depends_on
129 );
130 let depends_on = resource.remote_var.depends_on.clone();
131 Self {
132 inner: RemoteInner::Var {
133 map: Arc::new({
134 let depends_on = depends_on.clone();
135 move |any: &Arc<dyn Any>| {
136 let remote_var = any.downcast_ref::<RemoteVar<T::Output>>().unwrap();
138 let t_output = remote_var.get().context(RemoteUnresolvedSnafu {
139 ty: core::any::type_name::<X>(),
140 depends_on: depends_on.clone(),
141 })?;
142 Ok(map(&t_output))
143 }
144 }),
145 depends_on,
146 var: Arc::new(resource.remote_var.clone()),
147 },
148 }
149 }
150
151 pub fn get(&self) -> Result<X, Error> {
152 match &self.inner {
153 RemoteInner::Init {
154 depends_on,
155 last_known_value,
156 } => {
157 log::trace!("remote var returning last known value: {last_known_value:?}");
158 Ok(last_known_value.clone().context(RemoteUnresolvedSnafu {
159 ty: core::any::type_name::<X>(),
160 depends_on: depends_on.clone(),
161 })?)
162 }
163 RemoteInner::Var {
164 map,
165 var,
166 depends_on: _,
167 } => map(var),
168 }
169 }
170}
171
172impl<X> HasDependencies for Remote<X> {
173 fn dependencies(&self) -> Dependencies {
174 Dependencies {
175 inner: vec![match &self.inner {
176 RemoteInner::Init { depends_on, .. } => depends_on.clone(),
177 RemoteInner::Var { depends_on, .. } => depends_on.clone(),
178 }],
179 }
180 }
181}
182
183#[derive(Debug)]
184pub(crate) struct RemoteVar<T> {
185 depends_on: String,
186 action: Action,
187 inner: Arc<Mutex<Option<T>>>,
188}
189
190impl<T> Clone for RemoteVar<T> {
191 fn clone(&self) -> Self {
192 Self {
193 depends_on: self.depends_on.clone(),
194 action: self.action,
195 inner: self.inner.clone(),
196 }
197 }
198}
199
200impl<T: Clone> RemoteVar<T> {
201 pub fn get(&self) -> Option<T> {
202 self.inner.lock().unwrap().clone()
203 }
204
205 pub fn set(&self, value: Option<T>) {
206 *self.inner.lock().unwrap() = value;
207 }
208}
209
/// A type-erased entry in the [`Remotes`] registry.
pub(crate) struct Var {
    /// Numeric key assigned when the entry was created.
    pub(crate) key: usize,
    /// Type name recorded when the entry was created.
    pub(crate) ty: &'static str,
    /// The type-erased payload; callers recover the concrete type by downcasting.
    pub(crate) remote: Box<dyn Any>,
}
215
216#[derive(Default)]
217pub(crate) struct Remotes {
218 vars: HashMap<String, Var>,
220}
221
222impl core::fmt::Display for Remotes {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 for (name, var) in self.vars.iter() {
225 f.write_fmt(format_args!(
226 "name:'{name}' key:{rez} ty:{ty}\n",
227 rez = var.key,
228 ty = var.ty,
229 ))?;
230 }
231 Ok(())
232 }
233}
234
235impl Remotes {
236 pub fn dequeue_var<T: Any>(
242 &mut self,
243 id: &str,
244 action: Action,
245 ) -> Result<(RemoteVar<T>, usize, &'static str), Error> {
246 log::trace!(
247 "requested remote var '{id}' of type {}",
248 core::any::type_name::<T>()
249 );
250 let next_k = self.vars.len();
251 let var = self.vars.entry(id.to_owned()).or_insert_with(|| {
252 log::trace!(" but one doesn't exist, so we're creating a new entry '{next_k}'");
253 Var {
254 key: next_k,
255 ty: std::any::type_name::<T>(),
256 remote: Box::new(RemoteVar::<T> {
257 depends_on: id.to_owned(),
258 action,
259 inner: Default::default(),
260 }),
261 }
262 });
263 let remote: &RemoteVar<T> = var.remote.downcast_ref().context(DowncastSnafu)?;
264 Ok((remote.clone(), var.key, var.ty))
265 }
266
267 pub fn get_name_by_rez(&self, rez: usize) -> Option<String> {
269 for (name, var) in self.vars.iter() {
270 if rez == var.key {
271 return Some(name.clone());
272 }
273 }
274 None
275 }
276
277 pub fn get(&self, id: &str) -> Option<&Var> {
279 self.vars.get(id)
280 }
281}
282
283#[derive(serde::Serialize, serde::Deserialize)]
284#[serde(untagged)]
285enum MigratedProxy<T> {
286 Remote(RemoteProxy<T>),
287 Local(T),
288}
289
290#[derive(Clone, Debug, PartialEq, serde::Deserialize)]
291#[serde(try_from = "MigratedProxy<T>")]
292pub struct Migrated<T>(pub(crate) T);
293
294impl<T> Deref for Migrated<T> {
295 type Target = T;
296
297 fn deref(&self) -> &Self::Target {
298 &self.0
299 }
300}
301
302impl<T> TryFrom<MigratedProxy<T>> for Migrated<T> {
303 type Error = &'static str;
304
305 fn try_from(value: MigratedProxy<T>) -> Result<Self, Self::Error> {
306 log::trace!("read a migrated {}", std::any::type_name::<T>());
307 match value {
308 MigratedProxy::Remote(RemoteProxy {
309 depends_on: _,
310 last_known_value,
311 }) => {
312 log::trace!(" from a previous remote");
313 if let Some(value) = last_known_value {
314 Ok(Migrated(value))
315 } else {
316 Err("Missing last known value")
317 }
318 }
319 MigratedProxy::Local(t) => Ok(Migrated(t)),
320 }
321 }
322}
323
324impl<T: serde::Serialize> serde::Serialize for Migrated<T> {
325 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
326 where
327 S: serde::Serializer,
328 {
329 self.0.serialize(serializer)
330 }
331}
332
#[cfg(test)]
mod test {
    use super::*;

    /// The bytes of `"mybucket"`, used as the remote value throughout these tests.
    const BUCKET: [u8; 8] = *b"mybucket";

    /// A migrated value serializes as its bare inner value, and a remote proxy serializes
    /// as an object with `depends_on` and `last_known_value`.
    #[test]
    fn migrate_ser() {
        let migrated = Migrated(666u32);
        let s = serde_json::to_string_pretty(&migrated).unwrap();
        assert_eq!("666", &s);

        let proxy = MigratedProxy::Remote(RemoteProxy {
            depends_on: "test-bucket".into(),
            last_known_value: Some(BUCKET),
        });
        let value = serde_json::to_value(&proxy).unwrap();
        assert_eq!(
            serde_json::json!({
                "depends_on": "test-bucket",
                "last_known_value": BUCKET,
            }),
            value
        );
    }

    /// Data written by a `Remote` can be read back as a `Migrated` holding the last known
    /// value.
    #[test]
    fn migrate_de() {
        let s = serde_json::json!({
            "depends_on": "test-bucket",
            "last_known_value": [
                109,
                121,
                98,
                117,
                99,
                107,
                101,
                116
            ]
        });
        let migrated: Migrated<[u8; 8]> = serde_json::from_value(s).unwrap();
        assert_eq!(BUCKET, *migrated);
    }

    /// A bare value (the format `Migrated` itself writes) is read through the `Local`
    /// variant.
    #[test]
    fn migrate_de_local() {
        let migrated: Migrated<u32> = serde_json::from_value(serde_json::json!(666)).unwrap();
        assert_eq!(Migrated(666u32), migrated);
    }

    /// A remote proxy without a last known value cannot be migrated.
    #[test]
    fn migrate_de_missing_value() {
        let s = serde_json::json!({
            "depends_on": "test-bucket",
            "last_known_value": null
        });
        assert!(serde_json::from_value::<Migrated<u32>>(s).is_err());
    }
}